Compare commits
3 Commits
cf3cff0746
...
f92c26c350
Author | SHA1 | Date | |
---|---|---|---|
f92c26c350
|
|||
f9b3cc29ab
|
|||
5c72a0cb60
|
25
src/async.c
25
src/async.c
@@ -37,11 +37,11 @@ static pthread_t thread = -1;
|
||||
static int async_add_to_queue(evt_listen_t *evt) {
|
||||
// TODO locking
|
||||
memcpy(&listen->q[listen->n++], evt, sizeof(*evt));
|
||||
if (thread != -1) pthread_kill(thread, SIGUSR1);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int async(int fd, short events, int flags, void cb(void *), void *arg, void err_cb(void *), void *err_arg) {
|
||||
struct pollfd fds[1] = {{.fd = fd, .events = events}};
|
||||
evt_listen_t evt = {
|
||||
.fd = fd,
|
||||
.events = events,
|
||||
@@ -51,7 +51,28 @@ int async(int fd, short events, int flags, void cb(void *), void *arg, void err_
|
||||
.err_cb = err_cb,
|
||||
.err_arg = err_arg,
|
||||
};
|
||||
return async_add_to_queue(&evt);
|
||||
|
||||
// check, if fd is already ready
|
||||
if (poll(fds, 1, 0) == 1) {
|
||||
// fd already read
|
||||
if (fds[0].revents & events) {
|
||||
// specified event(s) occurred
|
||||
cb(arg);
|
||||
|
||||
if (!(flags & ASYNC_KEEP))
|
||||
return 0;
|
||||
} else if (fds[0].revents & (POLLERR | POLLHUP | POLLNVAL)) {
|
||||
// error occurred
|
||||
err_cb(err_arg);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
int ret = async_add_to_queue(&evt);
|
||||
if (ret == 0 && thread != -1)
|
||||
pthread_kill(thread, SIGUSR1);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void async_thread(void) {
|
||||
|
10
src/client.c
10
src/client.c
@@ -52,7 +52,8 @@ void client_terminate(int _) {
|
||||
}
|
||||
*/
|
||||
|
||||
int client_request_handler(client_ctx_t *cctx, sock *client, unsigned long client_num, unsigned int req_num, const char *restrict log_client_prefix) {
|
||||
int client_request_handler(client_ctx_t *cctx) {
|
||||
sock *client = &cctx->socket;
|
||||
struct timespec begin, end;
|
||||
long ret;
|
||||
|
||||
@@ -140,8 +141,7 @@ int client_request_handler(client_ctx_t *cctx, sock *client, unsigned long clien
|
||||
strcpy(host, host_ptr);
|
||||
}
|
||||
|
||||
sprintf(log_req_prefix, "[%s%*s%s]%s", BLD_STR, INET6_ADDRSTRLEN, host, CLR_STR, log_client_prefix);
|
||||
logger_set_prefix(log_req_prefix);
|
||||
logger_set_prefix("[%s%*s%s]%s", BLD_STR, INET6_ADDRSTRLEN, host, CLR_STR, cctx->log_prefix);
|
||||
info(BLD_STR "%s %s", req.method, req.uri);
|
||||
|
||||
conf = get_host_config(host);
|
||||
@@ -378,7 +378,7 @@ int client_request_handler(client_ctx_t *cctx, sock *client, unsigned long clien
|
||||
http_add_header_field(&res.hdr, "Last-Modified", last_modified);
|
||||
|
||||
res.status = http_get_status(200);
|
||||
if (fastcgi_init(&fcgi_conn, mode, client_num, req_num, client, &req, &uri) != 0) {
|
||||
if (fastcgi_init(&fcgi_conn, mode, 0 /* TODO */, cctx->req_num, client, &req, &uri) != 0) {
|
||||
res.status = http_get_status(503);
|
||||
sprintf(err_msg, "Unable to communicate with FastCGI socket.");
|
||||
goto respond;
|
||||
@@ -783,7 +783,7 @@ int client_connection_handler(client_ctx_t *ctx, unsigned long client_num, const
|
||||
ctx->s_keep_alive = 1;
|
||||
ctx->c_keep_alive = 1;
|
||||
while (ctx->c_keep_alive && ctx->s_keep_alive && req_num < REQ_PER_CONNECTION) {
|
||||
ret = client_request_handler(ctx, client, client_num, req_num++, log_client_prefix);
|
||||
ret = client_request_handler(ctx);
|
||||
logger_set_prefix(log_conn_prefix);
|
||||
}
|
||||
|
||||
|
@@ -27,6 +27,6 @@ typedef struct {
|
||||
|
||||
host_config_t *get_host_config(const char *host);
|
||||
|
||||
void *client_handler(client_ctx_t *client);
|
||||
int client_request_handler(client_ctx_t *cctx);
|
||||
|
||||
#endif //SESIMOS_CLIENT_H
|
||||
|
@@ -8,15 +8,21 @@
|
||||
#include <pthread.h>
|
||||
#include <signal.h>
|
||||
|
||||
typedef struct {
|
||||
mpmc_t *ctx;
|
||||
int worker_id;
|
||||
} mpmc_arg_t;
|
||||
|
||||
static void *mpmc_worker(void *arg);
|
||||
|
||||
int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *prefix) {
|
||||
int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *name) {
|
||||
ctx->alive = 1;
|
||||
ctx->n_workers = n_workers;
|
||||
ctx->size = buf_size, ctx->max_size = buf_size;
|
||||
ctx->rd = 0, ctx->wr = 0;
|
||||
ctx->buffer = NULL, ctx->workers = NULL;
|
||||
ctx->consumer = consumer;
|
||||
ctx->name = name;
|
||||
|
||||
if (sem_init(&ctx->free, 0, ctx->size) != 0 ||
|
||||
sem_init(&ctx->used, 0, 0) != 0 ||
|
||||
@@ -61,9 +67,9 @@ int mpmc_queue(mpmc_t *ctx, void *obj) {
|
||||
}
|
||||
}
|
||||
|
||||
// lock rd field
|
||||
// lock wr field
|
||||
try_again_2:
|
||||
if (sem_wait(&ctx->lck_rd) != 0) {
|
||||
if (sem_wait(&ctx->lck_wr) != 0) {
|
||||
if (errno == EINTR) {
|
||||
errno = 0;
|
||||
goto try_again_2;
|
||||
@@ -73,11 +79,11 @@ int mpmc_queue(mpmc_t *ctx, void *obj) {
|
||||
}
|
||||
}
|
||||
|
||||
int p = ctx->rd;
|
||||
ctx->rd = (ctx->rd + 1) % ctx->size;
|
||||
int p = ctx->wr;
|
||||
ctx->wr = (ctx->wr + 1) % ctx->size;
|
||||
|
||||
// unlock rd field
|
||||
sem_post(&ctx->lck_rd);
|
||||
// unlock wr field
|
||||
sem_post(&ctx->lck_wr);
|
||||
|
||||
// fill buffer with object
|
||||
ctx->buffer[p] = obj;
|
||||
@@ -90,6 +96,11 @@ int mpmc_queue(mpmc_t *ctx, void *obj) {
|
||||
|
||||
static void *mpmc_worker(void *arg) {
|
||||
mpmc_t *ctx = arg;
|
||||
|
||||
int id;
|
||||
for (id = 0; id < ctx->n_workers && ctx->workers[id] != pthread_self(); id++);
|
||||
logger_set_name("%s/%i", ctx->name, id);
|
||||
|
||||
while (ctx->alive) {
|
||||
// wait for buffer to be filled
|
||||
if (sem_wait(&ctx->used) != 0) {
|
||||
@@ -103,8 +114,8 @@ static void *mpmc_worker(void *arg) {
|
||||
}
|
||||
}
|
||||
|
||||
// lock wr field
|
||||
if (sem_wait(&ctx->lck_wr) != 0) {
|
||||
// lock rd field
|
||||
if (sem_wait(&ctx->lck_rd) != 0) {
|
||||
if (errno == EINTR) {
|
||||
errno = 0;
|
||||
sem_post(&ctx->used);
|
||||
@@ -117,11 +128,11 @@ static void *mpmc_worker(void *arg) {
|
||||
}
|
||||
}
|
||||
|
||||
int p = ctx->wr;
|
||||
ctx->wr = (ctx->wr + 1) % ctx->size;
|
||||
int p = ctx->rd;
|
||||
ctx->rd = (ctx->rd + 1) % ctx->size;
|
||||
|
||||
// unlock wr field
|
||||
sem_post(&ctx->lck_wr);
|
||||
// unlock rd field
|
||||
sem_post(&ctx->lck_rd);
|
||||
|
||||
// consume object
|
||||
ctx->consumer(ctx->buffer[p]);
|
||||
|
@@ -13,9 +13,10 @@ typedef struct {
|
||||
void **buffer;
|
||||
pthread_t *workers;
|
||||
void (*consumer)(void *obj);
|
||||
const char* name;
|
||||
} mpmc_t;
|
||||
|
||||
int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *prefix);
|
||||
int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *name);
|
||||
|
||||
int mpmc_queue(mpmc_t *ctx, void *obj);
|
||||
|
||||
|
@@ -130,6 +130,7 @@ int sock_close(sock *s) {
|
||||
close(s->socket);
|
||||
s->socket = 0;
|
||||
s->enc = 0;
|
||||
errno = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
27
src/logger.c
27
src/logger.c
@@ -162,10 +162,13 @@ static int logger_remaining(void) {
|
||||
return val;
|
||||
}
|
||||
|
||||
void logger_set_name(const char *restrict name) {
|
||||
void logger_set_name(const char *restrict format, ...) {
|
||||
va_list args;
|
||||
|
||||
if (key_name == -1) {
|
||||
// not initialized
|
||||
strncpy(global_name, name, sizeof(global_name));
|
||||
va_start(args, format);
|
||||
vsnprintf(global_name, sizeof(global_name), format, args);
|
||||
} else {
|
||||
int ret;
|
||||
void *ptr = pthread_getspecific(key_name);
|
||||
@@ -177,14 +180,22 @@ void logger_set_name(const char *restrict name) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
strncpy(ptr, name, LOG_NAME_LEN);
|
||||
|
||||
va_start(args, format);
|
||||
vsnprintf(ptr, LOG_NAME_LEN, format, args);
|
||||
}
|
||||
|
||||
// cleanup
|
||||
va_end(args);
|
||||
}
|
||||
|
||||
void logger_set_prefix(const char *restrict prefix) {
|
||||
void logger_set_prefix(const char *restrict format, ...) {
|
||||
va_list args;
|
||||
|
||||
if (key_prefix == -1) {
|
||||
// not initialized
|
||||
strncpy(global_prefix, prefix, sizeof(global_prefix));
|
||||
va_start(args, format);
|
||||
vsnprintf(global_prefix, sizeof(global_prefix), format, args);
|
||||
} else {
|
||||
int ret;
|
||||
void *ptr = pthread_getspecific(key_prefix);
|
||||
@@ -196,8 +207,12 @@ void logger_set_prefix(const char *restrict prefix) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
strncpy(ptr, prefix, LOG_PREFIX_LEN);
|
||||
va_start(args, format);
|
||||
vsnprintf(ptr, LOG_PREFIX_LEN, format, args);
|
||||
}
|
||||
|
||||
// cleanup
|
||||
va_end(args);
|
||||
}
|
||||
|
||||
static void *logger_thread(void *arg) {
|
||||
|
@@ -29,9 +29,9 @@ typedef unsigned char log_lvl_t;
|
||||
|
||||
void logmsgf(log_lvl_t level, const char *restrict format, ...);
|
||||
|
||||
void logger_set_name(const char *restrict name);
|
||||
void logger_set_name(const char *restrict format, ...);
|
||||
|
||||
void logger_set_prefix(const char *restrict prefix);
|
||||
void logger_set_prefix(const char *restrict format, ...);
|
||||
|
||||
int logger_init(void);
|
||||
|
||||
|
@@ -4,6 +4,8 @@
|
||||
#include "../lib/mpmc.h"
|
||||
#include "../lib/utils.h"
|
||||
#include "tcp_closer.h"
|
||||
#include "../async.h"
|
||||
#include "../server.h"
|
||||
|
||||
static mpmc_t mpmc_ctx;
|
||||
|
||||
@@ -26,6 +28,12 @@ void request_handler_destroy(void) {
|
||||
}
|
||||
|
||||
static void request_handler_func(client_ctx_t *ctx) {
|
||||
// TODO
|
||||
tcp_close(ctx);
|
||||
client_request_handler(ctx);
|
||||
|
||||
if (ctx->c_keep_alive && ctx->s_keep_alive && ctx->req_num < REQ_PER_CONNECTION) {
|
||||
async(ctx->socket.socket, POLLIN, 0, (void (*)(void *)) handle_request, ctx, (void (*)(void *)) tcp_close, ctx);
|
||||
logger_set_prefix(ctx->log_prefix);
|
||||
} else {
|
||||
tcp_close(ctx);
|
||||
}
|
||||
}
|
||||
|
@@ -19,7 +19,7 @@ static mpmc_t mpmc_ctx;
|
||||
static void tcp_acceptor_func(client_ctx_t *ctx);
|
||||
|
||||
int tcp_acceptor_init(int n_workers, int buf_size) {
|
||||
return mpmc_init(&mpmc_ctx, n_workers, buf_size, (void (*)(void *)) tcp_acceptor_func, "tcp/a");
|
||||
return mpmc_init(&mpmc_ctx, n_workers, buf_size, (void (*)(void *)) tcp_acceptor_func, "tcp_a");
|
||||
}
|
||||
|
||||
int tcp_accept(client_ctx_t *ctx) {
|
||||
@@ -36,7 +36,6 @@ void tcp_acceptor_destroy(void) {
|
||||
|
||||
static void tcp_acceptor_func(client_ctx_t *ctx) {
|
||||
struct sockaddr_in6 server_addr;
|
||||
char log_client_prefix[256];
|
||||
|
||||
inet_ntop(ctx->socket.addr.ipv6.sin6_family, &ctx->socket.addr.ipv6.sin6_addr, ctx->_c_addr, sizeof(ctx->_c_addr));
|
||||
if (strncmp(ctx->_c_addr, "::ffff:", 7) == 0) {
|
||||
@@ -54,12 +53,11 @@ static void tcp_acceptor_func(client_ctx_t *ctx) {
|
||||
ctx->s_addr = ctx->_s_addr;
|
||||
}
|
||||
|
||||
sprintf(log_client_prefix, "[%s%4i%s]%s[%*s][%5i]%s", (int) ctx->socket.enc ? HTTPS_STR : HTTP_STR,
|
||||
sprintf(ctx->log_prefix, "[%s%4i%s]%s[%*s][%5i]%s", (int) ctx->socket.enc ? HTTPS_STR : HTTP_STR,
|
||||
ntohs(server_addr.sin6_port), CLR_STR, /*color_table[0]*/ "", INET6_ADDRSTRLEN, ctx->addr,
|
||||
ntohs(ctx->socket.addr.ipv6.sin6_port), CLR_STR);
|
||||
|
||||
sprintf(ctx->log_prefix, "[%*s]%s", INET6_ADDRSTRLEN, ctx->s_addr, log_client_prefix);
|
||||
logger_set_prefix(ctx->log_prefix);
|
||||
logger_set_prefix("[%*s]%s", INET6_ADDRSTRLEN, ctx->s_addr, ctx->log_prefix);
|
||||
|
||||
int ret;
|
||||
char buf[1024];
|
||||
@@ -117,7 +115,7 @@ static void tcp_acceptor_func(client_ctx_t *ctx) {
|
||||
client->_errno = errno;
|
||||
client->_ssl_error = ERR_get_error();
|
||||
if (ret <= 0) {
|
||||
error("Unable to perform handshake: %s", sock_strerror(client));
|
||||
info("Unable to perform handshake: %s", sock_strerror(client));
|
||||
tcp_close(ctx);
|
||||
return;
|
||||
}
|
||||
|
@@ -11,7 +11,7 @@ static mpmc_t mpmc_ctx;
|
||||
static void tcp_closer_func(client_ctx_t *ctx);
|
||||
|
||||
int tcp_closer_init(int n_workers, int buf_size) {
|
||||
return mpmc_init(&mpmc_ctx, n_workers, buf_size, (void (*)(void *)) tcp_closer_func, "tcp/c");
|
||||
return mpmc_init(&mpmc_ctx, n_workers, buf_size, (void (*)(void *)) tcp_closer_func, "tcp_c");
|
||||
}
|
||||
|
||||
int tcp_close(client_ctx_t *ctx) {
|
||||
@@ -27,7 +27,7 @@ void tcp_closer_destroy(void) {
|
||||
}
|
||||
|
||||
static void tcp_closer_func(client_ctx_t *ctx) {
|
||||
logger_set_prefix(ctx->log_prefix);
|
||||
logger_set_prefix("[%*s]%s", INET6_ADDRSTRLEN, ctx->s_addr, ctx->log_prefix);
|
||||
|
||||
sock_close(&ctx->socket);
|
||||
|
||||
|
Reference in New Issue
Block a user