diff --git a/src/lib/mpmc.c b/src/lib/mpmc.c index ff9f340..91b9062 100644 --- a/src/lib/mpmc.c +++ b/src/lib/mpmc.c @@ -10,7 +10,7 @@ static void *mpmc_worker(void *arg); -int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *name) { +int mpmc_init(mpmc_t *ctx, const int n_workers, const int buf_size, void (*consumer)(void *), const char *name) { ctx->alive = 1; ctx->n_workers = n_workers; ctx->size = buf_size, ctx->max_size = buf_size; @@ -72,7 +72,7 @@ int mpmc_queue(mpmc_t *ctx, void *obj) { } } - int p = ctx->wr; + const int p = ctx->wr; ctx->wr = (ctx->wr + 1) % ctx->size; // unlock wr field @@ -121,14 +121,14 @@ static void *mpmc_worker(void *arg) { } } - int p = ctx->rd; + const int p = ctx->rd; ctx->rd = (ctx->rd + 1) % ctx->size; // unlock rd field sem_post(&ctx->lck_rd); // consume object - ctx->consumer(ctx->buffer[p]); + ctx->consumer((void *)ctx->buffer[p]); logger_set_prefix(""); // unlock slot in buffer @@ -143,9 +143,9 @@ void mpmc_stop(mpmc_t *ctx) { } void mpmc_destroy(mpmc_t *ctx) { - int e = errno; + const int e = errno; - // stop threads, if running + // stop threads if running mpmc_stop(ctx); for (int i = 0; i < ctx->n_workers; i++) { if (ctx->workers[i] == -1) break; diff --git a/src/lib/mpmc.h b/src/lib/mpmc.h index a551a9c..901c7c3 100644 --- a/src/lib/mpmc.h +++ b/src/lib/mpmc.h @@ -5,18 +5,18 @@ #include typedef struct { - unsigned char alive; + volatile unsigned int alive:1; int n_workers; - int rd, wr; + volatile int rd, wr; sem_t free, used, lck_rd, lck_wr; int size, max_size; - void **buffer; + volatile void **buffer; pthread_t *workers; void (*consumer)(void *obj); const char* name; } mpmc_t; -int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *name); +int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *), const char *name); int mpmc_queue(mpmc_t *ctx, void *obj);