Async working with connection handlers

This commit is contained in:
2022-12-29 01:49:00 +01:00
parent dfc659dec0
commit b90ed61e03
15 changed files with 505 additions and 50 deletions

158
src/lib/mpmc.c Normal file
View File

@ -0,0 +1,158 @@
#include "mpmc.h"
#include "../logger.h"
#include <errno.h>
#include <malloc.h>
#include <memory.h>
#include <pthread.h>
#include <signal.h>
static void *mpmc_worker(void *arg);
int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *prefix) {
ctx->alive = 1;
ctx->n_workers = n_workers;
ctx->size = buf_size, ctx->max_size = buf_size;
ctx->rd = 0, ctx->wr = 0;
ctx->buffer = NULL, ctx->workers = NULL;
ctx->consumer = consumer;
if (sem_init(&ctx->free, 0, ctx->size) != 0 ||
sem_init(&ctx->used, 0, 0) != 0 ||
sem_init(&ctx->lck_rd, 0, 1) != 0 ||
sem_init(&ctx->lck_wr, 0, 1) != 0)
{
mpmc_destroy(ctx);
return -1;
}
if ((ctx->buffer = malloc(ctx->size * sizeof(void *))) == NULL ||
(ctx->workers = malloc(ctx->n_workers * sizeof(pthread_t))) == NULL)
{
mpmc_destroy(ctx);
return -1;
}
memset(ctx->buffer, 0, ctx->size * sizeof(void *));
memset(ctx->workers, 0, ctx->n_workers * sizeof(pthread_t));
for (int i = 0; i < ctx->n_workers; i++) {
int ret;
if ((ret = pthread_create(&ctx->workers[i], NULL, mpmc_worker, ctx)) != 0) {
mpmc_destroy(ctx);
errno = ret;
return -1;
}
}
return 0;
}
int mpmc_queue(mpmc_t *ctx, void *obj) {
// wait for buffer to be emptied
try_again_1:
if (sem_wait(&ctx->free) != 0) {
if (errno == EINTR) {
goto try_again_1;
} else {
return -1;
}
}
// lock rd field
try_again_2:
if (sem_wait(&ctx->lck_rd) != 0) {
if (errno == EINTR) {
goto try_again_2;
} else {
sem_post(&ctx->free);
return -1;
}
}
int p = ctx->rd;
ctx->rd = (ctx->rd + 1) % ctx->size;
// unlock rd field
sem_post(&ctx->lck_rd);
// fill buffer with object
ctx->buffer[p] = obj;
// inform worker
sem_post(&ctx->used);
return 0;
}
static void *mpmc_worker(void *arg) {
mpmc_t *ctx = arg;
while (ctx->alive) {
// wait for buffer to be filled
if (sem_wait(&ctx->used) != 0) {
if (errno == EINTR) {
continue;
} else {
critical("Unable to lock semaphore");
break;
}
}
// lock wr field
if (sem_wait(&ctx->lck_wr) != 0) {
if (errno == EINTR) {
sem_post(&ctx->used);
continue;
} else {
critical("Unable to lock semaphore");
sem_post(&ctx->used);
break;
}
}
int p = ctx->wr;
ctx->wr = (ctx->wr + 1) % ctx->size;
// unlock wr field
sem_post(&ctx->lck_wr);
// consume object
ctx->consumer(ctx->buffer[p]);
logger_set_prefix("");
// unlock slot in buffer
sem_post(&ctx->free);
}
return NULL;
}
void mpmc_stop(mpmc_t *ctx) {
ctx->alive = 0;
}
void mpmc_destroy(mpmc_t *ctx) {
int e = errno;
// stop threads, if running
mpmc_stop(ctx);
for (int i = 0; i < ctx->n_workers; i++) {
if (ctx->workers[i] == 0) break;
// FIXME
pthread_kill(ctx->workers[i], SIGUSR1);
//pthread_join(ctx->workers[i], NULL);
pthread_cancel(ctx->workers[i]);
}
sem_destroy(&ctx->free);
sem_destroy(&ctx->used);
sem_destroy(&ctx->lck_rd);
sem_destroy(&ctx->lck_wr);
free(ctx->buffer);
free(ctx->workers);
// reset errno
errno = e;
}