Update logger to use format

This commit is contained in:
2022-12-29 11:09:45 +01:00
parent 5c72a0cb60
commit f9b3cc29ab
8 changed files with 57 additions and 32 deletions

View File

@ -8,15 +8,21 @@
#include <pthread.h>
#include <signal.h>
typedef struct {
mpmc_t *ctx;
int worker_id;
} mpmc_arg_t;
static void *mpmc_worker(void *arg);
int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *prefix) {
int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *name) {
ctx->alive = 1;
ctx->n_workers = n_workers;
ctx->size = buf_size, ctx->max_size = buf_size;
ctx->rd = 0, ctx->wr = 0;
ctx->buffer = NULL, ctx->workers = NULL;
ctx->consumer = consumer;
ctx->name = name;
if (sem_init(&ctx->free, 0, ctx->size) != 0 ||
sem_init(&ctx->used, 0, 0) != 0 ||
@ -61,9 +67,9 @@ int mpmc_queue(mpmc_t *ctx, void *obj) {
}
}
// lock rd field
// lock wr field
try_again_2:
if (sem_wait(&ctx->lck_rd) != 0) {
if (sem_wait(&ctx->lck_wr) != 0) {
if (errno == EINTR) {
errno = 0;
goto try_again_2;
@ -73,11 +79,11 @@ int mpmc_queue(mpmc_t *ctx, void *obj) {
}
}
int p = ctx->rd;
ctx->rd = (ctx->rd + 1) % ctx->size;
int p = ctx->wr;
ctx->wr = (ctx->wr + 1) % ctx->size;
// unlock rd field
sem_post(&ctx->lck_rd);
// unlock wr field
sem_post(&ctx->lck_wr);
// fill buffer with object
ctx->buffer[p] = obj;
@ -90,6 +96,11 @@ int mpmc_queue(mpmc_t *ctx, void *obj) {
static void *mpmc_worker(void *arg) {
mpmc_t *ctx = arg;
int id;
for (id = 0; id < ctx->n_workers && ctx->workers[id] != pthread_self(); id++);
logger_set_name("%s/%i", ctx->name, id);
while (ctx->alive) {
// wait for buffer to be filled
if (sem_wait(&ctx->used) != 0) {
@ -103,8 +114,8 @@ static void *mpmc_worker(void *arg) {
}
}
// lock wr field
if (sem_wait(&ctx->lck_wr) != 0) {
// lock rd field
if (sem_wait(&ctx->lck_rd) != 0) {
if (errno == EINTR) {
errno = 0;
sem_post(&ctx->used);
@ -117,11 +128,11 @@ static void *mpmc_worker(void *arg) {
}
}
int p = ctx->wr;
ctx->wr = (ctx->wr + 1) % ctx->size;
int p = ctx->rd;
ctx->rd = (ctx->rd + 1) % ctx->size;
// unlock wr field
sem_post(&ctx->lck_wr);
// unlock rd field
sem_post(&ctx->lck_rd);
// consume object
ctx->consumer(ctx->buffer[p]);