mpcm: Add volatile to some fields
This commit is contained in:
+6
-6
@@ -10,7 +10,7 @@
|
|||||||
|
|
||||||
static void *mpmc_worker(void *arg);
|
static void *mpmc_worker(void *arg);
|
||||||
|
|
||||||
int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *name) {
|
int mpmc_init(mpmc_t *ctx, const int n_workers, const int buf_size, void (*consumer)(void *), const char *name) {
|
||||||
ctx->alive = 1;
|
ctx->alive = 1;
|
||||||
ctx->n_workers = n_workers;
|
ctx->n_workers = n_workers;
|
||||||
ctx->size = buf_size, ctx->max_size = buf_size;
|
ctx->size = buf_size, ctx->max_size = buf_size;
|
||||||
@@ -72,7 +72,7 @@ int mpmc_queue(mpmc_t *ctx, void *obj) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int p = ctx->wr;
|
const int p = ctx->wr;
|
||||||
ctx->wr = (ctx->wr + 1) % ctx->size;
|
ctx->wr = (ctx->wr + 1) % ctx->size;
|
||||||
|
|
||||||
// unlock wr field
|
// unlock wr field
|
||||||
@@ -121,14 +121,14 @@ static void *mpmc_worker(void *arg) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int p = ctx->rd;
|
const int p = ctx->rd;
|
||||||
ctx->rd = (ctx->rd + 1) % ctx->size;
|
ctx->rd = (ctx->rd + 1) % ctx->size;
|
||||||
|
|
||||||
// unlock rd field
|
// unlock rd field
|
||||||
sem_post(&ctx->lck_rd);
|
sem_post(&ctx->lck_rd);
|
||||||
|
|
||||||
// consume object
|
// consume object
|
||||||
ctx->consumer(ctx->buffer[p]);
|
ctx->consumer((void *)ctx->buffer[p]);
|
||||||
logger_set_prefix("");
|
logger_set_prefix("");
|
||||||
|
|
||||||
// unlock slot in buffer
|
// unlock slot in buffer
|
||||||
@@ -143,9 +143,9 @@ void mpmc_stop(mpmc_t *ctx) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void mpmc_destroy(mpmc_t *ctx) {
|
void mpmc_destroy(mpmc_t *ctx) {
|
||||||
int e = errno;
|
const int e = errno;
|
||||||
|
|
||||||
// stop threads, if running
|
// stop threads if running
|
||||||
mpmc_stop(ctx);
|
mpmc_stop(ctx);
|
||||||
for (int i = 0; i < ctx->n_workers; i++) {
|
for (int i = 0; i < ctx->n_workers; i++) {
|
||||||
if (ctx->workers[i] == -1) break;
|
if (ctx->workers[i] == -1) break;
|
||||||
|
|||||||
+4
-4
@@ -5,18 +5,18 @@
|
|||||||
#include <semaphore.h>
|
#include <semaphore.h>
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
unsigned char alive;
|
volatile unsigned int alive:1;
|
||||||
int n_workers;
|
int n_workers;
|
||||||
int rd, wr;
|
volatile int rd, wr;
|
||||||
sem_t free, used, lck_rd, lck_wr;
|
sem_t free, used, lck_rd, lck_wr;
|
||||||
int size, max_size;
|
int size, max_size;
|
||||||
void **buffer;
|
volatile void **buffer;
|
||||||
pthread_t *workers;
|
pthread_t *workers;
|
||||||
void (*consumer)(void *obj);
|
void (*consumer)(void *obj);
|
||||||
const char* name;
|
const char* name;
|
||||||
} mpmc_t;
|
} mpmc_t;
|
||||||
|
|
||||||
int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *name);
|
int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *), const char *name);
|
||||||
|
|
||||||
int mpmc_queue(mpmc_t *ctx, void *obj);
|
int mpmc_queue(mpmc_t *ctx, void *obj);
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user