diff --git a/src/client.c b/src/client.c index fd9f191..a837ed8 100644 --- a/src/client.c +++ b/src/client.c @@ -141,8 +141,7 @@ int client_request_handler(client_ctx_t *cctx) { strcpy(host, host_ptr); } - sprintf(log_req_prefix, "[%s%*s%s]%s", BLD_STR, INET6_ADDRSTRLEN, host, CLR_STR, cctx->log_prefix); - logger_set_prefix(log_req_prefix); + logger_set_prefix("[%s%*s%s]%s", BLD_STR, INET6_ADDRSTRLEN, host, CLR_STR, cctx->log_prefix); info(BLD_STR "%s %s", req.method, req.uri); conf = get_host_config(host); diff --git a/src/lib/mpmc.c b/src/lib/mpmc.c index 0188def..66ff678 100644 --- a/src/lib/mpmc.c +++ b/src/lib/mpmc.c @@ -8,15 +8,21 @@ #include #include +typedef struct { + mpmc_t *ctx; + int worker_id; +} mpmc_arg_t; + static void *mpmc_worker(void *arg); -int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *prefix) { +int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *name) { ctx->alive = 1; ctx->n_workers = n_workers; ctx->size = buf_size, ctx->max_size = buf_size; ctx->rd = 0, ctx->wr = 0; ctx->buffer = NULL, ctx->workers = NULL; ctx->consumer = consumer; + ctx->name = name; if (sem_init(&ctx->free, 0, ctx->size) != 0 || sem_init(&ctx->used, 0, 0) != 0 || @@ -61,9 +67,9 @@ int mpmc_queue(mpmc_t *ctx, void *obj) { } } - // lock rd field + // lock wr field try_again_2: - if (sem_wait(&ctx->lck_rd) != 0) { + if (sem_wait(&ctx->lck_wr) != 0) { if (errno == EINTR) { errno = 0; goto try_again_2; @@ -73,11 +79,11 @@ int mpmc_queue(mpmc_t *ctx, void *obj) { } } - int p = ctx->rd; - ctx->rd = (ctx->rd + 1) % ctx->size; + int p = ctx->wr; + ctx->wr = (ctx->wr + 1) % ctx->size; - // unlock rd field - sem_post(&ctx->lck_rd); + // unlock wr field + sem_post(&ctx->lck_wr); // fill buffer with object ctx->buffer[p] = obj; @@ -90,6 +96,11 @@ int mpmc_queue(mpmc_t *ctx, void *obj) { static void *mpmc_worker(void *arg) { mpmc_t *ctx = arg; + + int id; + for (id = 0; id < ctx->n_workers && ctx->workers[id] != pthread_self(); id++); + logger_set_name("%s/%i", ctx->name, id); + while (ctx->alive) { // wait for buffer to be filled if (sem_wait(&ctx->used) != 0) { @@ -103,8 +114,8 @@ static void *mpmc_worker(void *arg) { } } - // lock wr field - if (sem_wait(&ctx->lck_wr) != 0) { + // lock rd field + if (sem_wait(&ctx->lck_rd) != 0) { if (errno == EINTR) { errno = 0; sem_post(&ctx->used); @@ -117,11 +128,11 @@ static void *mpmc_worker(void *arg) { } } - int p = ctx->wr; - ctx->wr = (ctx->wr + 1) % ctx->size; + int p = ctx->rd; + ctx->rd = (ctx->rd + 1) % ctx->size; - // unlock wr field - sem_post(&ctx->lck_wr); + // unlock rd field + sem_post(&ctx->lck_rd); // consume object ctx->consumer(ctx->buffer[p]); diff --git a/src/lib/mpmc.h b/src/lib/mpmc.h index 4d62cb8..a551a9c 100644 --- a/src/lib/mpmc.h +++ b/src/lib/mpmc.h @@ -13,9 +13,10 @@ typedef struct { void **buffer; pthread_t *workers; void (*consumer)(void *obj); + const char* name; } mpmc_t; -int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *prefix); +int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *name); int mpmc_queue(mpmc_t *ctx, void *obj); diff --git a/src/lib/sock.c b/src/lib/sock.c index 4cfff6c..975ad97 100644 --- a/src/lib/sock.c +++ b/src/lib/sock.c @@ -130,6 +130,7 @@ int sock_close(sock *s) { close(s->socket); s->socket = 0; s->enc = 0; + errno = 0; return 0; } diff --git a/src/logger.c b/src/logger.c index cfa9764..df3c6c6 100644 --- a/src/logger.c +++ b/src/logger.c @@ -162,10 +162,13 @@ static int logger_remaining(void) { return val; } -void logger_set_name(const char *restrict name) { +void logger_set_name(const char *restrict format, ...) { + va_list args; + if (key_name == -1) { // not initialized - strncpy(global_name, name, sizeof(global_name)); + va_start(args, format); + vsnprintf(global_name, sizeof(global_name), format, args); } else { int ret; void *ptr = pthread_getspecific(key_name); @@ -177,14 +180,22 @@ void logger_set_name(const char *restrict name) { return; } } - strncpy(ptr, name, LOG_NAME_LEN); + + va_start(args, format); + vsnprintf(ptr, LOG_NAME_LEN, format, args); } + + // cleanup + va_end(args); } -void logger_set_prefix(const char *restrict prefix) { +void logger_set_prefix(const char *restrict format, ...) { + va_list args; + if (key_prefix == -1) { // not initialized - strncpy(global_prefix, prefix, sizeof(global_prefix)); + va_start(args, format); + vsnprintf(global_prefix, sizeof(global_prefix), format, args); } else { int ret; void *ptr = pthread_getspecific(key_prefix); @@ -196,8 +207,12 @@ void logger_set_prefix(const char *restrict prefix) { return; } } - strncpy(ptr, prefix, LOG_PREFIX_LEN); + va_start(args, format); + vsnprintf(ptr, LOG_PREFIX_LEN, format, args); } + + // cleanup + va_end(args); } static void *logger_thread(void *arg) { diff --git a/src/logger.h b/src/logger.h index 778701d..efad784 100644 --- a/src/logger.h +++ b/src/logger.h @@ -29,9 +29,9 @@ typedef unsigned char log_lvl_t; void logmsgf(log_lvl_t level, const char *restrict format, ...); -void logger_set_name(const char *restrict name); +void logger_set_name(const char *restrict format, ...); -void logger_set_prefix(const char *restrict prefix); +void logger_set_prefix(const char *restrict format, ...); int logger_init(void); diff --git a/src/worker/tcp_acceptor.c b/src/worker/tcp_acceptor.c index 8909e8a..8014b45 100644 --- a/src/worker/tcp_acceptor.c +++ b/src/worker/tcp_acceptor.c @@ -19,7 +19,7 @@ static mpmc_t mpmc_ctx; static void tcp_acceptor_func(client_ctx_t *ctx); int tcp_acceptor_init(int n_workers, int buf_size) { - return mpmc_init(&mpmc_ctx, n_workers, buf_size, (void (*)(void *)) tcp_acceptor_func, "tcp/a"); + return mpmc_init(&mpmc_ctx, n_workers, buf_size, (void (*)(void *)) tcp_acceptor_func, "tcp_a"); } int tcp_accept(client_ctx_t *ctx) { @@ -36,7 +36,6 @@ void tcp_acceptor_destroy(void) { static void tcp_acceptor_func(client_ctx_t *ctx) { struct sockaddr_in6 server_addr; - char log_client_prefix[256]; inet_ntop(ctx->socket.addr.ipv6.sin6_family, &ctx->socket.addr.ipv6.sin6_addr, ctx->_c_addr, sizeof(ctx->_c_addr)); if (strncmp(ctx->_c_addr, "::ffff:", 7) == 0) { @@ -54,12 +53,11 @@ static void tcp_acceptor_func(client_ctx_t *ctx) { ctx->s_addr = ctx->_s_addr; } - sprintf(log_client_prefix, "[%s%4i%s]%s[%*s][%5i]%s", (int) ctx->socket.enc ? HTTPS_STR : HTTP_STR, + sprintf(ctx->log_prefix, "[%s%4i%s]%s[%*s][%5i]%s", (int) ctx->socket.enc ? HTTPS_STR : HTTP_STR, ntohs(server_addr.sin6_port), CLR_STR, /*color_table[0]*/ "", INET6_ADDRSTRLEN, ctx->addr, ntohs(ctx->socket.addr.ipv6.sin6_port), CLR_STR); - sprintf(ctx->log_prefix, "[%*s]%s", INET6_ADDRSTRLEN, ctx->s_addr, log_client_prefix); - logger_set_prefix(ctx->log_prefix); + logger_set_prefix("[%*s]%s", INET6_ADDRSTRLEN, ctx->s_addr, ctx->log_prefix); int ret; char buf[1024]; @@ -117,7 +115,7 @@ static void tcp_acceptor_func(client_ctx_t *ctx) { client->_errno = errno; client->_ssl_error = ERR_get_error(); if (ret <= 0) { - error("Unable to perform handshake: %s", sock_strerror(client)); + info("Unable to perform handshake: %s", sock_strerror(client)); tcp_close(ctx); return; } diff --git a/src/worker/tcp_closer.c b/src/worker/tcp_closer.c index c775c22..3a9a824 100644 --- a/src/worker/tcp_closer.c +++ b/src/worker/tcp_closer.c @@ -11,7 +11,7 @@ static mpmc_t mpmc_ctx; static void tcp_closer_func(client_ctx_t *ctx); int tcp_closer_init(int n_workers, int buf_size) { - return mpmc_init(&mpmc_ctx, n_workers, buf_size, (void (*)(void *)) tcp_closer_func, "tcp/c"); + return mpmc_init(&mpmc_ctx, n_workers, buf_size, (void (*)(void *)) tcp_closer_func, "tcp_c"); } int tcp_close(client_ctx_t *ctx) { @@ -27,7 +27,7 @@ void tcp_closer_destroy(void) { } static void tcp_closer_func(client_ctx_t *ctx) { - logger_set_prefix(ctx->log_prefix); + logger_set_prefix("[%*s]%s", INET6_ADDRSTRLEN, ctx->s_addr, ctx->log_prefix); sock_close(&ctx->socket);