From b90ed61e0341d46ce048b481a3d08b8b1ac06df3 Mon Sep 17 00:00:00 2001 From: Lorenz Stechauner Date: Thu, 29 Dec 2022 01:49:00 +0100 Subject: [PATCH] Async working with connection handlers --- Makefile | 18 +++- src/async.c | 10 +++ src/async.h | 4 + src/cache_handler.c | 8 +- src/client.h | 15 ++-- src/lib/mpmc.c | 158 +++++++++++++++++++++++++++++++++++ src/lib/mpmc.h | 26 ++++++ src/server.c | 66 +++++++-------- src/server.h | 3 +- src/worker/request_handler.c | 31 +++++++ src/worker/request_handler.h | 15 ++++ src/worker/tcp_acceptor.c | 131 +++++++++++++++++++++++++++++ src/worker/tcp_acceptor.h | 15 ++++ src/worker/tcp_closer.c | 40 +++++++++ src/worker/tcp_closer.h | 15 ++++ 15 files changed, 505 insertions(+), 50 deletions(-) create mode 100644 src/lib/mpmc.c create mode 100644 src/lib/mpmc.h create mode 100644 src/worker/request_handler.c create mode 100644 src/worker/request_handler.h create mode 100644 src/worker/tcp_acceptor.c create mode 100644 src/worker/tcp_acceptor.h create mode 100644 src/worker/tcp_closer.c create mode 100644 src/worker/tcp_closer.h diff --git a/Makefile b/Makefile index 23090e9..9dd4cb9 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ DEBIAN_OPTS=-D CACHE_MAGIC_FILE="\"/usr/share/file/magic.mgc\"" -D PHP_FPM_SOCKE .PHONY: all prod debug default debian permit clean test all: prod -default: bin bin/lib bin/sesimos +default: bin bin/lib bin/worker bin/sesimos prod: CFLAGS += -O3 prod: default @@ -29,6 +29,8 @@ bin: bin/lib: mkdir -p bin/lib +bin/worker: + mkdir -p bin/worker bin/test: test/mock_*.c test/test_*.c src/lib/utils.c src/lib/sock.c $(CC) -o $@ $(CFLAGS) $^ -lcriterion @@ -40,10 +42,14 @@ bin/%.o: src/%.c bin/lib/%.o: src/lib/%.c $(CC) -c -o $@ $(CFLAGS) $< +bin/worker/%.o: src/worker/%.c + $(CC) -c -o $@ $(CFLAGS) $< + bin/sesimos: bin/server.o bin/client.o bin/logger.o bin/cache_handler.o bin/async.o \ + bin/worker/request_handler.o bin/worker/tcp_acceptor.o bin/worker/tcp_closer.o \ bin/lib/compress.o bin/lib/config.o bin/lib/fastcgi.o bin/lib/geoip.o \ bin/lib/http.o bin/lib/http_static.o bin/lib/proxy.o bin/lib/sock.o bin/lib/uri.o \ - bin/lib/utils.o bin/lib/websocket.o + bin/lib/utils.o bin/lib/websocket.o bin/lib/mpmc.o $(CC) -o $@ $^ $(CFLAGS) $(LDFLAGS) @@ -60,6 +66,12 @@ bin/cache_handler.o: src/cache_handler.h src/lib/utils.h src/lib/uri.h src/lib/c bin/async.o: src/async.h src/logger.h +bin/worker/request_handler.o: src/worker/request_handler.h + +bin/worker/tcp_acceptor.o: src/worker/tcp_acceptor.h + +bin/worker/tcp_closer.o: src/worker/tcp_closer.h + bin/lib/compress.o: src/lib/compress.h bin/lib/config.o: src/lib/config.h src/lib/utils.h src/lib/uri.h src/logger.h @@ -71,6 +83,8 @@ bin/lib/geoip.o: src/lib/geoip.h bin/lib/http.o: src/lib/http.h src/lib/utils.h src/lib/compress.h src/lib/sock.h src/logger.h +bin/lib/mpmc.o: src/lib/mpmc.h src/logger.h + bin/lib/proxy.o: src/lib/proxy.h src/defs.h src/server.h src/lib/compress.h src/logger.h bin/lib/sock.o: src/lib/sock.h diff --git a/src/async.c b/src/async.c index 2440994..5704afd 100644 --- a/src/async.c +++ b/src/async.c @@ -13,6 +13,7 @@ #include #include #include +#include typedef struct { int fd; @@ -31,10 +32,12 @@ typedef struct { static listen_queue_t listen1, listen2, *listen = &listen1; static volatile sig_atomic_t alive = 1; +static pthread_t thread = -1; static int async_add_to_queue(evt_listen_t *evt) { // TODO locking memcpy(&listen->q[listen->n++], evt, sizeof(*evt)); + if (thread != -1) pthread_kill(thread, SIGUSR1); return 0; } @@ -55,6 +58,8 @@ void async_thread(void) { int num_fds; struct pollfd fds[256]; // TODO dynamic + thread = pthread_self(); + // main event loop while (alive) { // swap listen queue @@ -70,6 +75,7 @@ void async_thread(void) { if (poll(fds, num_fds, -1) < 0) { if (errno == EINTR) { // interrupt + errno = 0; } else { // other error critical("Unable to poll for events"); @@ -101,3 +107,7 @@ void async_thread(void) { l->n = 0; } } + +void async_stop(void) { + alive = 0; +} diff --git a/src/async.h b/src/async.h index 413c2ca..89f71d7 100644 --- a/src/async.h +++ b/src/async.h @@ -9,10 +9,14 @@ #ifndef SESIMOS_ASYNC_H #define SESIMOS_ASYNC_H +#include + #define ASYNC_KEEP 1 int async(int fd, short events, int flags, void cb(void *), void *arg, void err_cb(void *), void *err_arg); void async_thread(void); +void async_stop(void); + #endif //SESIMOS_ASYNC_H diff --git a/src/cache_handler.c b/src/cache_handler.c index 76bd7a4..47f5975 100644 --- a/src/cache_handler.c +++ b/src/cache_handler.c @@ -203,7 +203,7 @@ static void cache_process_entry(cache_entry_t *entry) { static void *cache_thread(void *arg) { logger_set_name("cache"); - while (alive) { + while (server_alive) { pthread_testcancel(); if (sem_wait(&sem_used) != 0) { if (errno == EINTR) { @@ -257,10 +257,14 @@ int cache_init(void) { } close(fd); + errno = 0; } // try to initialize all three semaphores - if (sem_init(&sem_lock, 0, 1) != 0|| sem_init(&sem_free, 0, 1) != 0 || sem_init(&sem_used, 0, 0) != 0) { + if (sem_init(&sem_lock, 0, 1) != 0 || + sem_init(&sem_free, 0, 1) != 0 || + sem_init(&sem_used, 0, 0) != 0) + { critical("Unable to initialize semaphore"); return -1; } diff --git a/src/client.h b/src/client.h index aade7bb..d2ce3cd 100644 --- a/src/client.h +++ b/src/client.h @@ -16,14 +16,13 @@ typedef struct { sock socket; - char *addr; - char *s_addr; - unsigned char s_keep_alive:1; - unsigned char c_keep_alive:1; - char cc[3]; - char host[256]; - char _c_addr[INET6_ADDRSTRLEN + 1]; - char _s_addr[INET6_ADDRSTRLEN + 1]; + int req_num; + char *addr, *s_addr; + unsigned char in_use: 1, s_keep_alive:1, c_keep_alive:1; + char cc[3], host[256]; + char log_prefix[512]; + char _c_addr[INET6_ADDRSTRLEN + 1], _s_addr[INET6_ADDRSTRLEN + 1]; + struct timespec begin, end; } client_ctx_t; host_config_t *get_host_config(const char *host); diff --git a/src/lib/mpmc.c b/src/lib/mpmc.c new file mode 100644 index 0000000..a59ecb6 --- /dev/null +++ b/src/lib/mpmc.c @@ -0,0 +1,158 @@ + +#include "mpmc.h" +#include "../logger.h" + +#include +#include +#include +#include +#include + +static void *mpmc_worker(void *arg); + +int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *prefix) { + ctx->alive = 1; + ctx->n_workers = n_workers; + ctx->size = buf_size, ctx->max_size = buf_size; + ctx->rd = 0, ctx->wr = 0; + ctx->buffer = NULL, ctx->workers = NULL; + ctx->consumer = consumer; + + if (sem_init(&ctx->free, 0, ctx->size) != 0 || + sem_init(&ctx->used, 0, 0) != 0 || + sem_init(&ctx->lck_rd, 0, 1) != 0 || + sem_init(&ctx->lck_wr, 0, 1) != 0) + { + mpmc_destroy(ctx); + return -1; + } + + if ((ctx->buffer = malloc(ctx->size * sizeof(void *))) == NULL || + (ctx->workers = malloc(ctx->n_workers * sizeof(pthread_t))) == NULL) + { + mpmc_destroy(ctx); + return -1; + } + + memset(ctx->buffer, 0, ctx->size * sizeof(void *)); + memset(ctx->workers, 0, ctx->n_workers * sizeof(pthread_t)); + + for (int i = 0; i < ctx->n_workers; i++) { + int ret; + if ((ret = pthread_create(&ctx->workers[i], NULL, mpmc_worker, ctx)) != 0) { + mpmc_destroy(ctx); + errno = ret; + return -1; + } + } + + return 0; +} + +int mpmc_queue(mpmc_t *ctx, void *obj) { + // wait for buffer to be emptied + try_again_1: + if (sem_wait(&ctx->free) != 0) { + if (errno == EINTR) { + goto try_again_1; + } else { + return -1; + } + } + + // lock rd field + try_again_2: + if (sem_wait(&ctx->lck_rd) != 0) { + if (errno == EINTR) { + goto try_again_2; + } else { + sem_post(&ctx->free); + return -1; + } + } + + int p = ctx->rd; + ctx->rd = (ctx->rd + 1) % ctx->size; + + // unlock rd field + sem_post(&ctx->lck_rd); + + // fill buffer with object + ctx->buffer[p] = obj; + + // inform worker + sem_post(&ctx->used); + + return 0; +} + +static void *mpmc_worker(void *arg) { + mpmc_t *ctx = arg; + while (ctx->alive) { + // wait for buffer to be filled + if (sem_wait(&ctx->used) != 0) { + if (errno == EINTR) { + continue; + } else { + critical("Unable to lock semaphore"); + break; + } + } + + // lock wr field + if (sem_wait(&ctx->lck_wr) != 0) { + if (errno == EINTR) { + sem_post(&ctx->used); + continue; + } else { + critical("Unable to lock semaphore"); + sem_post(&ctx->used); + break; + } + } + + int p = ctx->wr; + ctx->wr = (ctx->wr + 1) % ctx->size; + + // unlock wr field + sem_post(&ctx->lck_wr); + + // consume object + ctx->consumer(ctx->buffer[p]); + logger_set_prefix(""); + + // unlock slot in buffer + sem_post(&ctx->free); + } + + return NULL; +} + +void mpmc_stop(mpmc_t *ctx) { + ctx->alive = 0; +} + +void mpmc_destroy(mpmc_t *ctx) { + int e = errno; + + // stop threads, if running + mpmc_stop(ctx); + for (int i = 0; i < ctx->n_workers; i++) { + if (ctx->workers[i] == 0) break; + // FIXME + pthread_kill(ctx->workers[i], SIGUSR1); + //pthread_join(ctx->workers[i], NULL); + pthread_cancel(ctx->workers[i]); + } + + sem_destroy(&ctx->free); + sem_destroy(&ctx->used); + sem_destroy(&ctx->lck_rd); + sem_destroy(&ctx->lck_wr); + free(ctx->buffer); + free(ctx->workers); + + // reset errno + errno = e; +} + diff --git a/src/lib/mpmc.h b/src/lib/mpmc.h new file mode 100644 index 0000000..4d62cb8 --- /dev/null +++ b/src/lib/mpmc.h @@ -0,0 +1,26 @@ + +#ifndef SESIMOS_MPMC_H +#define SESIMOS_MPMC_H + +#include + +typedef struct { + unsigned char alive; + int n_workers; + int rd, wr; + sem_t free, used, lck_rd, lck_wr; + int size, max_size; + void **buffer; + pthread_t *workers; + void (*consumer)(void *obj); +} mpmc_t; + +int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *prefix); + +int mpmc_queue(mpmc_t *ctx, void *obj); + +void mpmc_stop(mpmc_t *ctx); + +void mpmc_destroy(mpmc_t *ctx); + +#endif //SESIMOS_MPMC_H diff --git a/src/server.c b/src/server.c index 8af8457..79bbd94 100644 --- a/src/server.c +++ b/src/server.c @@ -11,11 +11,14 @@ #include "client.h" #include "logger.h" #include "async.h" +#include "worker/tcp_acceptor.h" #include "cache_handler.h" #include "lib/config.h" #include "lib/proxy.h" #include "lib/geoip.h" +#include "worker/tcp_closer.h" +#include "worker/request_handler.h" #include #include @@ -33,11 +36,10 @@ #include -volatile sig_atomic_t alive = 1; +volatile sig_atomic_t server_alive = 1; const char *config_file; static int sockets[NUM_SOCKETS]; -static pthread_t children[MAX_CHILDREN]; static SSL_CTX *contexts[CONFIG_MAX_CERT_CONFIG]; static client_ctx_t clients[MAX_CLIENTS]; // TODO dynamic @@ -62,10 +64,11 @@ static void accept_cb(void *arg) { int fd = sockets[i]; int j; - for (j = 0; j < MAX_CHILDREN; j++) { - if (children[j] == 0) break; + for (j = 0; j < MAX_CLIENTS; j++) { + if (clients[j].in_use == 0) break; } client_ctx_t *client_ctx = &clients[j]; + client_ctx->in_use = 1; sock *client = &client_ctx->socket; client->ctx = contexts[0]; @@ -78,11 +81,8 @@ static void accept_cb(void *arg) { client->socket = client_fd; client->enc = (i == 1); - pthread_t ret = pthread_create(&children[j], NULL, (void *(*)(void *)) &client_handler, client); - if (ret != 0) { - errno = (int) ret; - critical("Unable to create thread"); - } + + tcp_accept(client_ctx); } static void accept_err_cb(void *arg) { @@ -95,18 +95,9 @@ static void terminate_forcefully(int sig) { fprintf(stderr, "\n"); notice("Terminating forcefully!"); - int ret; - for (int i = 0; i < MAX_CHILDREN; i++) { - if (children[i] != 0) { - if ((ret = pthread_kill(children[i], SIGKILL)) < 0) { - errno = ret; - error("Unable to wait for child process (PID %i)", children[i]); - errno = 0; - } - } - } - geoip_free(); + + notice("Goodbye"); exit(2); } @@ -114,38 +105,37 @@ static void terminate_gracefully(int sig) { fprintf(stderr, "\n"); notice("Terminating gracefully..."); - alive = 0; + server_alive = 0; signal(SIGINT, terminate_forcefully); signal(SIGTERM, terminate_forcefully); + tcp_acceptor_stop(); + request_handler_stop(); + tcp_closer_stop(); + + tcp_acceptor_destroy(); + request_handler_destroy(); + tcp_closer_destroy(); + + for (int i = 0; i < NUM_SOCKETS; i++) { shutdown(sockets[i], SHUT_RDWR); close(sockets[i]); } - int ret; - for (int i = 0; i < MAX_CHILDREN; i++) { - if (children[i] != 0) { - ret = pthread_kill(children[i], SIGKILL); - if (ret < 0) { - critical("Unable to wait for child process (PID %i)", children[i]); - } else if (ret == children[i]) { - children[i] = 0; - } - } - } - - info("Goodbye"); + notice("Goodbye"); geoip_free(); exit(0); } +static void nothing(int sig) {} + int main(int argc, char *const argv[]) { const int YES = 1; int ret; memset(sockets, 0, sizeof(sockets)); - memset(children, 0, sizeof(children)); + memset(clients, 0, sizeof(clients)); const struct sockaddr_in6 addresses[2] = { {.sin6_family = AF_INET6, .sin6_addr = IN6ADDR_ANY_INIT, .sin6_port = htons(80)}, @@ -221,6 +211,7 @@ int main(int argc, char *const argv[]) { signal(SIGINT, terminate_gracefully); signal(SIGTERM, terminate_gracefully); + signal(SIGUSR1, nothing); if ((ret = geoip_init(config.geoip_dir)) != 0) { if (ret == -1) { @@ -271,11 +262,14 @@ int main(int argc, char *const argv[]) { } } + tcp_acceptor_init(CNX_HANDLER_WORKERS, 64); + tcp_closer_init(CNX_HANDLER_WORKERS, 64); + request_handler_init(REQ_HANDLER_WORKERS, 64); + for (int i = 0; i < NUM_SOCKETS; i++) { async(sockets[i], POLLIN, ASYNC_KEEP, accept_cb, &sockets[i], accept_err_cb, &sockets[i]); } - errno = 0; notice("Ready to accept connections"); async_thread(); diff --git a/src/server.h b/src/server.h index 79d491b..dc773c3 100644 --- a/src/server.h +++ b/src/server.h @@ -14,7 +14,6 @@ #include #define NUM_SOCKETS 2 -#define MAX_CHILDREN 64 #define LISTEN_BACKLOG 16 #define REQ_PER_CONNECTION 200 #define CLIENT_TIMEOUT 3600 @@ -25,6 +24,6 @@ #define CNX_HANDLER_WORKERS 8 #define REQ_HANDLER_WORKERS 16 -extern volatile sig_atomic_t alive; +extern volatile sig_atomic_t server_alive; #endif //SESIMOS_SERVER_H diff --git a/src/worker/request_handler.c b/src/worker/request_handler.c new file mode 100644 index 0000000..684324e --- /dev/null +++ b/src/worker/request_handler.c @@ -0,0 +1,31 @@ + +#include "request_handler.h" +#include "../logger.h" +#include "../lib/mpmc.h" +#include "../lib/utils.h" +#include "tcp_closer.h" + +static mpmc_t mpmc_ctx; + +static void request_handler_func(client_ctx_t *ctx); + +int request_handler_init(int n_workers, int buf_size) { + return mpmc_init(&mpmc_ctx, n_workers, buf_size, (void (*)(void *)) request_handler_func, "req"); +} + +int handle_request(client_ctx_t *ctx) { + return mpmc_queue(&mpmc_ctx, ctx); +} + +void request_handler_stop(void) { + mpmc_stop(&mpmc_ctx); +} + +void request_handler_destroy(void) { + mpmc_destroy(&mpmc_ctx); +} + +static void request_handler_func(client_ctx_t *ctx) { + // TODO + tcp_close(ctx); +} diff --git a/src/worker/request_handler.h b/src/worker/request_handler.h new file mode 100644 index 0000000..5d096dd --- /dev/null +++ b/src/worker/request_handler.h @@ -0,0 +1,15 @@ + +#ifndef SESIMOS_REQUEST_HANDLER_H +#define SESIMOS_REQUEST_HANDLER_H + +#include "../client.h" + +int request_handler_init(int n_workers, int buf_size); + +int handle_request(client_ctx_t *ctx); + +void request_handler_stop(void); + +void request_handler_destroy(void); + +#endif //SESIMOS_REQUEST_HANDLER_H diff --git a/src/worker/tcp_acceptor.c b/src/worker/tcp_acceptor.c new file mode 100644 index 0000000..8909e8a --- /dev/null +++ b/src/worker/tcp_acceptor.c @@ -0,0 +1,131 @@ + +#include "tcp_acceptor.h" +#include "../logger.h" +#include "../lib/mpmc.h" +#include "../lib/utils.h" +#include "../server.h" +#include "../lib/geoip.h" +#include "../async.h" +#include "tcp_closer.h" +#include "request_handler.h" + +#include +#include +#include +#include + +static mpmc_t mpmc_ctx; + +static void tcp_acceptor_func(client_ctx_t *ctx); + +int tcp_acceptor_init(int n_workers, int buf_size) { + return mpmc_init(&mpmc_ctx, n_workers, buf_size, (void (*)(void *)) tcp_acceptor_func, "tcp/a"); +} + +int tcp_accept(client_ctx_t *ctx) { + return mpmc_queue(&mpmc_ctx, ctx); +} + +void tcp_acceptor_stop(void) { + mpmc_stop(&mpmc_ctx); +} + +void tcp_acceptor_destroy(void) { + mpmc_destroy(&mpmc_ctx); +} + +static void tcp_acceptor_func(client_ctx_t *ctx) { + struct sockaddr_in6 server_addr; + char log_client_prefix[256]; + + inet_ntop(ctx->socket.addr.ipv6.sin6_family, &ctx->socket.addr.ipv6.sin6_addr, ctx->_c_addr, sizeof(ctx->_c_addr)); + if (strncmp(ctx->_c_addr, "::ffff:", 7) == 0) { + ctx->addr = ctx->_c_addr + 7; + } else { + ctx->addr = ctx->_c_addr; + } + + socklen_t len = sizeof(server_addr); + getsockname(ctx->socket.socket, (struct sockaddr *) &server_addr, &len); + inet_ntop(server_addr.sin6_family, (void *) &server_addr.sin6_addr, ctx->_s_addr, sizeof(ctx->_s_addr)); + if (strncmp(ctx->_s_addr, "::ffff:", 7) == 0) { + ctx->s_addr = ctx->_s_addr + 7; + } else { + ctx->s_addr = ctx->_s_addr; + } + + sprintf(log_client_prefix, "[%s%4i%s]%s[%*s][%5i]%s", (int) ctx->socket.enc ? HTTPS_STR : HTTP_STR, + ntohs(server_addr.sin6_port), CLR_STR, /*color_table[0]*/ "", INET6_ADDRSTRLEN, ctx->addr, + ntohs(ctx->socket.addr.ipv6.sin6_port), CLR_STR); + + sprintf(ctx->log_prefix, "[%*s]%s", INET6_ADDRSTRLEN, ctx->s_addr, log_client_prefix); + logger_set_prefix(ctx->log_prefix); + + int ret; + char buf[1024]; + sock *client = &ctx->socket; + + clock_gettime(CLOCK_MONOTONIC, &ctx->begin); + + if (config.dns_server[0] != 0) { + sprintf(buf, "dig @%s +short +time=1 -x %s", config.dns_server, ctx->addr); + FILE *dig = popen(buf, "r"); + if (dig == NULL) { + error("Unable to start dig: %s", strerror(errno)); + goto dig_err; + } + unsigned long read = fread(buf, 1, sizeof(buf), dig); + ret = pclose(dig); + if (ret != 0) { + error("Dig terminated with exit code %i", ret); + goto dig_err; + } + char *ptr = memchr(buf, '\n', read); + if (ptr == buf || ptr == NULL) { + goto dig_err; + } + ptr[-1] = 0; + strncpy(ctx->host, buf, sizeof(ctx->host)); + } else { + dig_err: + ctx->host[0] = 0; + } + + ctx->cc[0] = 0; + geoip_lookup_country(&client->addr.sock, ctx->cc); + + info("Connection accepted from %s %s%s%s[%s]", ctx->addr, ctx->host[0] != 0 ? "(" : "", + ctx->host[0] != 0 ? ctx->host : "", ctx->host[0] != 0 ? ") " : "", + ctx->cc[0] != 0 ? ctx->cc : "N/A"); + + struct timeval client_timeout = {.tv_sec = CLIENT_TIMEOUT, .tv_usec = 0}; + if (setsockopt(client->socket, SOL_SOCKET, SO_RCVTIMEO, &client_timeout, sizeof(client_timeout)) == -1 || + setsockopt(client->socket, SOL_SOCKET, SO_SNDTIMEO, &client_timeout, sizeof(client_timeout)) == -1) + { + error("Unable to set timeout for socket"); + tcp_close(ctx); + return; + } + + if (client->enc) { + client->ssl = SSL_new(client->ctx); + SSL_set_fd(client->ssl, client->socket); + SSL_set_accept_state(client->ssl); + + ret = SSL_accept(client->ssl); + client->_last_ret = ret; + client->_errno = errno; + client->_ssl_error = ERR_get_error(); + if (ret <= 0) { + error("Unable to perform handshake: %s", sock_strerror(client)); + tcp_close(ctx); + return; + } + } + + ctx->req_num = 0; + ctx->s_keep_alive = 1; + ctx->c_keep_alive = 1; + + async(ctx->socket.socket, POLLIN, 0, (void (*)(void *)) handle_request, ctx, (void (*)(void *)) tcp_close, ctx); +} diff --git a/src/worker/tcp_acceptor.h b/src/worker/tcp_acceptor.h new file mode 100644 index 0000000..cdfd341 --- /dev/null +++ b/src/worker/tcp_acceptor.h @@ -0,0 +1,15 @@ + +#ifndef SESIMOS_TCP_ACCEPTOR_H +#define SESIMOS_TCP_ACCEPTOR_H + +#include "../client.h" + +int tcp_acceptor_init(int n_workers, int buf_size); + +int tcp_accept(client_ctx_t *ctx); + +void tcp_acceptor_stop(void); + +void tcp_acceptor_destroy(void); + +#endif //SESIMOS_TCP_ACCEPTOR_H diff --git a/src/worker/tcp_closer.c b/src/worker/tcp_closer.c new file mode 100644 index 0000000..c775c22 --- /dev/null +++ b/src/worker/tcp_closer.c @@ -0,0 +1,40 @@ + +#include "tcp_closer.h" +#include "../logger.h" +#include "../lib/mpmc.h" +#include "../lib/utils.h" + +#include + +static mpmc_t mpmc_ctx; + +static void tcp_closer_func(client_ctx_t *ctx); + +int tcp_closer_init(int n_workers, int buf_size) { + return mpmc_init(&mpmc_ctx, n_workers, buf_size, (void (*)(void *)) tcp_closer_func, "tcp/c"); +} + +int tcp_close(client_ctx_t *ctx) { + return mpmc_queue(&mpmc_ctx, ctx); +} + +void tcp_closer_stop(void) { + mpmc_stop(&mpmc_ctx); +} + +void tcp_closer_destroy(void) { + mpmc_destroy(&mpmc_ctx); +} + +static void tcp_closer_func(client_ctx_t *ctx) { + logger_set_prefix(ctx->log_prefix); + + sock_close(&ctx->socket); + + char buf[32]; + clock_gettime(CLOCK_MONOTONIC, &ctx->end); + unsigned long micros = (ctx->end.tv_nsec - ctx->begin.tv_nsec) / 1000 + (ctx->end.tv_sec - ctx->begin.tv_sec) * 1000000; + info("Connection closed (%s)", format_duration(micros, buf)); + + memset(ctx, 0, sizeof(*ctx)); +} diff --git a/src/worker/tcp_closer.h b/src/worker/tcp_closer.h new file mode 100644 index 0000000..380e7f5 --- /dev/null +++ b/src/worker/tcp_closer.h @@ -0,0 +1,15 @@ + +#ifndef SESIMOS_TCP_CLOSER_H +#define SESIMOS_TCP_CLOSER_H + +#include "../client.h" + +int tcp_closer_init(int n_workers, int buf_size); + +int tcp_close(client_ctx_t *ctx); + +void tcp_closer_stop(void); + +void tcp_closer_destroy(void); + +#endif //SESIMOS_TCP_CLOSER_H