3 Commits

Author SHA1 Message Date
f92c26c350 async - check if fd is already ready 2022-12-29 11:31:52 +01:00
f9b3cc29ab Update logger to use format 2022-12-29 11:09:45 +01:00
5c72a0cb60 Request handler 2022-12-29 10:27:54 +01:00
11 changed files with 95 additions and 40 deletions

View File

@@ -37,11 +37,11 @@ static pthread_t thread = -1;
static int async_add_to_queue(evt_listen_t *evt) {
// TODO locking
memcpy(&listen->q[listen->n++], evt, sizeof(*evt));
if (thread != -1) pthread_kill(thread, SIGUSR1);
return 0;
}
int async(int fd, short events, int flags, void cb(void *), void *arg, void err_cb(void *), void *err_arg) {
struct pollfd fds[1] = {{.fd = fd, .events = events}};
evt_listen_t evt = {
.fd = fd,
.events = events,
@@ -51,7 +51,28 @@ int async(int fd, short events, int flags, void cb(void *), void *arg, void err_
.err_cb = err_cb,
.err_arg = err_arg,
};
return async_add_to_queue(&evt);
// check, if fd is already ready
if (poll(fds, 1, 0) == 1) {
// fd already read
if (fds[0].revents & events) {
// specified event(s) occurred
cb(arg);
if (!(flags & ASYNC_KEEP))
return 0;
} else if (fds[0].revents & (POLLERR | POLLHUP | POLLNVAL)) {
// error occurred
err_cb(err_arg);
return 0;
}
}
int ret = async_add_to_queue(&evt);
if (ret == 0 && thread != -1)
pthread_kill(thread, SIGUSR1);
return ret;
}
void async_thread(void) {

View File

@@ -52,7 +52,8 @@ void client_terminate(int _) {
}
*/
int client_request_handler(client_ctx_t *cctx, sock *client, unsigned long client_num, unsigned int req_num, const char *restrict log_client_prefix) {
int client_request_handler(client_ctx_t *cctx) {
sock *client = &cctx->socket;
struct timespec begin, end;
long ret;
@@ -140,8 +141,7 @@ int client_request_handler(client_ctx_t *cctx, sock *client, unsigned long clien
strcpy(host, host_ptr);
}
sprintf(log_req_prefix, "[%s%*s%s]%s", BLD_STR, INET6_ADDRSTRLEN, host, CLR_STR, log_client_prefix);
logger_set_prefix(log_req_prefix);
logger_set_prefix("[%s%*s%s]%s", BLD_STR, INET6_ADDRSTRLEN, host, CLR_STR, cctx->log_prefix);
info(BLD_STR "%s %s", req.method, req.uri);
conf = get_host_config(host);
@@ -378,7 +378,7 @@ int client_request_handler(client_ctx_t *cctx, sock *client, unsigned long clien
http_add_header_field(&res.hdr, "Last-Modified", last_modified);
res.status = http_get_status(200);
if (fastcgi_init(&fcgi_conn, mode, client_num, req_num, client, &req, &uri) != 0) {
if (fastcgi_init(&fcgi_conn, mode, 0 /* TODO */, cctx->req_num, client, &req, &uri) != 0) {
res.status = http_get_status(503);
sprintf(err_msg, "Unable to communicate with FastCGI socket.");
goto respond;
@@ -783,7 +783,7 @@ int client_connection_handler(client_ctx_t *ctx, unsigned long client_num, const
ctx->s_keep_alive = 1;
ctx->c_keep_alive = 1;
while (ctx->c_keep_alive && ctx->s_keep_alive && req_num < REQ_PER_CONNECTION) {
ret = client_request_handler(ctx, client, client_num, req_num++, log_client_prefix);
ret = client_request_handler(ctx);
logger_set_prefix(log_conn_prefix);
}

View File

@@ -27,6 +27,6 @@ typedef struct {
host_config_t *get_host_config(const char *host);
void *client_handler(client_ctx_t *client);
int client_request_handler(client_ctx_t *cctx);
#endif //SESIMOS_CLIENT_H

View File

@@ -8,15 +8,21 @@
#include <pthread.h>
#include <signal.h>
typedef struct {
mpmc_t *ctx;
int worker_id;
} mpmc_arg_t;
static void *mpmc_worker(void *arg);
int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *prefix) {
int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *name) {
ctx->alive = 1;
ctx->n_workers = n_workers;
ctx->size = buf_size, ctx->max_size = buf_size;
ctx->rd = 0, ctx->wr = 0;
ctx->buffer = NULL, ctx->workers = NULL;
ctx->consumer = consumer;
ctx->name = name;
if (sem_init(&ctx->free, 0, ctx->size) != 0 ||
sem_init(&ctx->used, 0, 0) != 0 ||
@@ -61,9 +67,9 @@ int mpmc_queue(mpmc_t *ctx, void *obj) {
}
}
// lock rd field
// lock wr field
try_again_2:
if (sem_wait(&ctx->lck_rd) != 0) {
if (sem_wait(&ctx->lck_wr) != 0) {
if (errno == EINTR) {
errno = 0;
goto try_again_2;
@@ -73,11 +79,11 @@ int mpmc_queue(mpmc_t *ctx, void *obj) {
}
}
int p = ctx->rd;
ctx->rd = (ctx->rd + 1) % ctx->size;
int p = ctx->wr;
ctx->wr = (ctx->wr + 1) % ctx->size;
// unlock rd field
sem_post(&ctx->lck_rd);
// unlock wr field
sem_post(&ctx->lck_wr);
// fill buffer with object
ctx->buffer[p] = obj;
@@ -90,6 +96,11 @@ int mpmc_queue(mpmc_t *ctx, void *obj) {
static void *mpmc_worker(void *arg) {
mpmc_t *ctx = arg;
int id;
for (id = 0; id < ctx->n_workers && ctx->workers[id] != pthread_self(); id++);
logger_set_name("%s/%i", ctx->name, id);
while (ctx->alive) {
// wait for buffer to be filled
if (sem_wait(&ctx->used) != 0) {
@@ -103,8 +114,8 @@ static void *mpmc_worker(void *arg) {
}
}
// lock wr field
if (sem_wait(&ctx->lck_wr) != 0) {
// lock rd field
if (sem_wait(&ctx->lck_rd) != 0) {
if (errno == EINTR) {
errno = 0;
sem_post(&ctx->used);
@@ -117,11 +128,11 @@ static void *mpmc_worker(void *arg) {
}
}
int p = ctx->wr;
ctx->wr = (ctx->wr + 1) % ctx->size;
int p = ctx->rd;
ctx->rd = (ctx->rd + 1) % ctx->size;
// unlock wr field
sem_post(&ctx->lck_wr);
// unlock rd field
sem_post(&ctx->lck_rd);
// consume object
ctx->consumer(ctx->buffer[p]);

View File

@@ -13,9 +13,10 @@ typedef struct {
void **buffer;
pthread_t *workers;
void (*consumer)(void *obj);
const char* name;
} mpmc_t;
int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *prefix);
int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *name);
int mpmc_queue(mpmc_t *ctx, void *obj);

View File

@@ -130,6 +130,7 @@ int sock_close(sock *s) {
close(s->socket);
s->socket = 0;
s->enc = 0;
errno = 0;
return 0;
}

View File

@@ -162,10 +162,13 @@ static int logger_remaining(void) {
return val;
}
void logger_set_name(const char *restrict name) {
void logger_set_name(const char *restrict format, ...) {
va_list args;
if (key_name == -1) {
// not initialized
strncpy(global_name, name, sizeof(global_name));
va_start(args, format);
vsnprintf(global_name, sizeof(global_name), format, args);
} else {
int ret;
void *ptr = pthread_getspecific(key_name);
@@ -177,14 +180,22 @@ void logger_set_name(const char *restrict name) {
return;
}
}
strncpy(ptr, name, LOG_NAME_LEN);
va_start(args, format);
vsnprintf(ptr, LOG_NAME_LEN, format, args);
}
// cleanup
va_end(args);
}
void logger_set_prefix(const char *restrict prefix) {
void logger_set_prefix(const char *restrict format, ...) {
va_list args;
if (key_prefix == -1) {
// not initialized
strncpy(global_prefix, prefix, sizeof(global_prefix));
va_start(args, format);
vsnprintf(global_prefix, sizeof(global_prefix), format, args);
} else {
int ret;
void *ptr = pthread_getspecific(key_prefix);
@@ -196,8 +207,12 @@ void logger_set_prefix(const char *restrict prefix) {
return;
}
}
strncpy(ptr, prefix, LOG_PREFIX_LEN);
va_start(args, format);
vsnprintf(ptr, LOG_PREFIX_LEN, format, args);
}
// cleanup
va_end(args);
}
static void *logger_thread(void *arg) {

View File

@@ -29,9 +29,9 @@ typedef unsigned char log_lvl_t;
void logmsgf(log_lvl_t level, const char *restrict format, ...);
void logger_set_name(const char *restrict name);
void logger_set_name(const char *restrict format, ...);
void logger_set_prefix(const char *restrict prefix);
void logger_set_prefix(const char *restrict format, ...);
int logger_init(void);

View File

@@ -4,6 +4,8 @@
#include "../lib/mpmc.h"
#include "../lib/utils.h"
#include "tcp_closer.h"
#include "../async.h"
#include "../server.h"
static mpmc_t mpmc_ctx;
@@ -26,6 +28,12 @@ void request_handler_destroy(void) {
}
static void request_handler_func(client_ctx_t *ctx) {
// TODO
client_request_handler(ctx);
if (ctx->c_keep_alive && ctx->s_keep_alive && ctx->req_num < REQ_PER_CONNECTION) {
async(ctx->socket.socket, POLLIN, 0, (void (*)(void *)) handle_request, ctx, (void (*)(void *)) tcp_close, ctx);
logger_set_prefix(ctx->log_prefix);
} else {
tcp_close(ctx);
}
}

View File

@@ -19,7 +19,7 @@ static mpmc_t mpmc_ctx;
static void tcp_acceptor_func(client_ctx_t *ctx);
int tcp_acceptor_init(int n_workers, int buf_size) {
return mpmc_init(&mpmc_ctx, n_workers, buf_size, (void (*)(void *)) tcp_acceptor_func, "tcp/a");
return mpmc_init(&mpmc_ctx, n_workers, buf_size, (void (*)(void *)) tcp_acceptor_func, "tcp_a");
}
int tcp_accept(client_ctx_t *ctx) {
@@ -36,7 +36,6 @@ void tcp_acceptor_destroy(void) {
static void tcp_acceptor_func(client_ctx_t *ctx) {
struct sockaddr_in6 server_addr;
char log_client_prefix[256];
inet_ntop(ctx->socket.addr.ipv6.sin6_family, &ctx->socket.addr.ipv6.sin6_addr, ctx->_c_addr, sizeof(ctx->_c_addr));
if (strncmp(ctx->_c_addr, "::ffff:", 7) == 0) {
@@ -54,12 +53,11 @@ static void tcp_acceptor_func(client_ctx_t *ctx) {
ctx->s_addr = ctx->_s_addr;
}
sprintf(log_client_prefix, "[%s%4i%s]%s[%*s][%5i]%s", (int) ctx->socket.enc ? HTTPS_STR : HTTP_STR,
sprintf(ctx->log_prefix, "[%s%4i%s]%s[%*s][%5i]%s", (int) ctx->socket.enc ? HTTPS_STR : HTTP_STR,
ntohs(server_addr.sin6_port), CLR_STR, /*color_table[0]*/ "", INET6_ADDRSTRLEN, ctx->addr,
ntohs(ctx->socket.addr.ipv6.sin6_port), CLR_STR);
sprintf(ctx->log_prefix, "[%*s]%s", INET6_ADDRSTRLEN, ctx->s_addr, log_client_prefix);
logger_set_prefix(ctx->log_prefix);
logger_set_prefix("[%*s]%s", INET6_ADDRSTRLEN, ctx->s_addr, ctx->log_prefix);
int ret;
char buf[1024];
@@ -117,7 +115,7 @@ static void tcp_acceptor_func(client_ctx_t *ctx) {
client->_errno = errno;
client->_ssl_error = ERR_get_error();
if (ret <= 0) {
error("Unable to perform handshake: %s", sock_strerror(client));
info("Unable to perform handshake: %s", sock_strerror(client));
tcp_close(ctx);
return;
}

View File

@@ -11,7 +11,7 @@ static mpmc_t mpmc_ctx;
static void tcp_closer_func(client_ctx_t *ctx);
int tcp_closer_init(int n_workers, int buf_size) {
return mpmc_init(&mpmc_ctx, n_workers, buf_size, (void (*)(void *)) tcp_closer_func, "tcp/c");
return mpmc_init(&mpmc_ctx, n_workers, buf_size, (void (*)(void *)) tcp_closer_func, "tcp_c");
}
int tcp_close(client_ctx_t *ctx) {
@@ -27,7 +27,7 @@ void tcp_closer_destroy(void) {
}
static void tcp_closer_func(client_ctx_t *ctx) {
logger_set_prefix(ctx->log_prefix);
logger_set_prefix("[%*s]%s", INET6_ADDRSTRLEN, ctx->s_addr, ctx->log_prefix);
sock_close(&ctx->socket);