Compare commits
3 Commits
cf3cff0746
...
f92c26c350
Author | SHA1 | Date | |
---|---|---|---|
f92c26c350
|
|||
f9b3cc29ab
|
|||
5c72a0cb60
|
25
src/async.c
25
src/async.c
@@ -37,11 +37,11 @@ static pthread_t thread = -1;
|
|||||||
static int async_add_to_queue(evt_listen_t *evt) {
|
static int async_add_to_queue(evt_listen_t *evt) {
|
||||||
// TODO locking
|
// TODO locking
|
||||||
memcpy(&listen->q[listen->n++], evt, sizeof(*evt));
|
memcpy(&listen->q[listen->n++], evt, sizeof(*evt));
|
||||||
if (thread != -1) pthread_kill(thread, SIGUSR1);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int async(int fd, short events, int flags, void cb(void *), void *arg, void err_cb(void *), void *err_arg) {
|
int async(int fd, short events, int flags, void cb(void *), void *arg, void err_cb(void *), void *err_arg) {
|
||||||
|
struct pollfd fds[1] = {{.fd = fd, .events = events}};
|
||||||
evt_listen_t evt = {
|
evt_listen_t evt = {
|
||||||
.fd = fd,
|
.fd = fd,
|
||||||
.events = events,
|
.events = events,
|
||||||
@@ -51,7 +51,28 @@ int async(int fd, short events, int flags, void cb(void *), void *arg, void err_
|
|||||||
.err_cb = err_cb,
|
.err_cb = err_cb,
|
||||||
.err_arg = err_arg,
|
.err_arg = err_arg,
|
||||||
};
|
};
|
||||||
return async_add_to_queue(&evt);
|
|
||||||
|
// check, if fd is already ready
|
||||||
|
if (poll(fds, 1, 0) == 1) {
|
||||||
|
// fd already read
|
||||||
|
if (fds[0].revents & events) {
|
||||||
|
// specified event(s) occurred
|
||||||
|
cb(arg);
|
||||||
|
|
||||||
|
if (!(flags & ASYNC_KEEP))
|
||||||
|
return 0;
|
||||||
|
} else if (fds[0].revents & (POLLERR | POLLHUP | POLLNVAL)) {
|
||||||
|
// error occurred
|
||||||
|
err_cb(err_arg);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int ret = async_add_to_queue(&evt);
|
||||||
|
if (ret == 0 && thread != -1)
|
||||||
|
pthread_kill(thread, SIGUSR1);
|
||||||
|
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
void async_thread(void) {
|
void async_thread(void) {
|
||||||
|
10
src/client.c
10
src/client.c
@@ -52,7 +52,8 @@ void client_terminate(int _) {
|
|||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
int client_request_handler(client_ctx_t *cctx, sock *client, unsigned long client_num, unsigned int req_num, const char *restrict log_client_prefix) {
|
int client_request_handler(client_ctx_t *cctx) {
|
||||||
|
sock *client = &cctx->socket;
|
||||||
struct timespec begin, end;
|
struct timespec begin, end;
|
||||||
long ret;
|
long ret;
|
||||||
|
|
||||||
@@ -140,8 +141,7 @@ int client_request_handler(client_ctx_t *cctx, sock *client, unsigned long clien
|
|||||||
strcpy(host, host_ptr);
|
strcpy(host, host_ptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
sprintf(log_req_prefix, "[%s%*s%s]%s", BLD_STR, INET6_ADDRSTRLEN, host, CLR_STR, log_client_prefix);
|
logger_set_prefix("[%s%*s%s]%s", BLD_STR, INET6_ADDRSTRLEN, host, CLR_STR, cctx->log_prefix);
|
||||||
logger_set_prefix(log_req_prefix);
|
|
||||||
info(BLD_STR "%s %s", req.method, req.uri);
|
info(BLD_STR "%s %s", req.method, req.uri);
|
||||||
|
|
||||||
conf = get_host_config(host);
|
conf = get_host_config(host);
|
||||||
@@ -378,7 +378,7 @@ int client_request_handler(client_ctx_t *cctx, sock *client, unsigned long clien
|
|||||||
http_add_header_field(&res.hdr, "Last-Modified", last_modified);
|
http_add_header_field(&res.hdr, "Last-Modified", last_modified);
|
||||||
|
|
||||||
res.status = http_get_status(200);
|
res.status = http_get_status(200);
|
||||||
if (fastcgi_init(&fcgi_conn, mode, client_num, req_num, client, &req, &uri) != 0) {
|
if (fastcgi_init(&fcgi_conn, mode, 0 /* TODO */, cctx->req_num, client, &req, &uri) != 0) {
|
||||||
res.status = http_get_status(503);
|
res.status = http_get_status(503);
|
||||||
sprintf(err_msg, "Unable to communicate with FastCGI socket.");
|
sprintf(err_msg, "Unable to communicate with FastCGI socket.");
|
||||||
goto respond;
|
goto respond;
|
||||||
@@ -783,7 +783,7 @@ int client_connection_handler(client_ctx_t *ctx, unsigned long client_num, const
|
|||||||
ctx->s_keep_alive = 1;
|
ctx->s_keep_alive = 1;
|
||||||
ctx->c_keep_alive = 1;
|
ctx->c_keep_alive = 1;
|
||||||
while (ctx->c_keep_alive && ctx->s_keep_alive && req_num < REQ_PER_CONNECTION) {
|
while (ctx->c_keep_alive && ctx->s_keep_alive && req_num < REQ_PER_CONNECTION) {
|
||||||
ret = client_request_handler(ctx, client, client_num, req_num++, log_client_prefix);
|
ret = client_request_handler(ctx);
|
||||||
logger_set_prefix(log_conn_prefix);
|
logger_set_prefix(log_conn_prefix);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -27,6 +27,6 @@ typedef struct {
|
|||||||
|
|
||||||
host_config_t *get_host_config(const char *host);
|
host_config_t *get_host_config(const char *host);
|
||||||
|
|
||||||
void *client_handler(client_ctx_t *client);
|
int client_request_handler(client_ctx_t *cctx);
|
||||||
|
|
||||||
#endif //SESIMOS_CLIENT_H
|
#endif //SESIMOS_CLIENT_H
|
||||||
|
@@ -8,15 +8,21 @@
|
|||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
#include <signal.h>
|
#include <signal.h>
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
mpmc_t *ctx;
|
||||||
|
int worker_id;
|
||||||
|
} mpmc_arg_t;
|
||||||
|
|
||||||
static void *mpmc_worker(void *arg);
|
static void *mpmc_worker(void *arg);
|
||||||
|
|
||||||
int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *prefix) {
|
int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *name) {
|
||||||
ctx->alive = 1;
|
ctx->alive = 1;
|
||||||
ctx->n_workers = n_workers;
|
ctx->n_workers = n_workers;
|
||||||
ctx->size = buf_size, ctx->max_size = buf_size;
|
ctx->size = buf_size, ctx->max_size = buf_size;
|
||||||
ctx->rd = 0, ctx->wr = 0;
|
ctx->rd = 0, ctx->wr = 0;
|
||||||
ctx->buffer = NULL, ctx->workers = NULL;
|
ctx->buffer = NULL, ctx->workers = NULL;
|
||||||
ctx->consumer = consumer;
|
ctx->consumer = consumer;
|
||||||
|
ctx->name = name;
|
||||||
|
|
||||||
if (sem_init(&ctx->free, 0, ctx->size) != 0 ||
|
if (sem_init(&ctx->free, 0, ctx->size) != 0 ||
|
||||||
sem_init(&ctx->used, 0, 0) != 0 ||
|
sem_init(&ctx->used, 0, 0) != 0 ||
|
||||||
@@ -61,9 +67,9 @@ int mpmc_queue(mpmc_t *ctx, void *obj) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// lock rd field
|
// lock wr field
|
||||||
try_again_2:
|
try_again_2:
|
||||||
if (sem_wait(&ctx->lck_rd) != 0) {
|
if (sem_wait(&ctx->lck_wr) != 0) {
|
||||||
if (errno == EINTR) {
|
if (errno == EINTR) {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
goto try_again_2;
|
goto try_again_2;
|
||||||
@@ -73,11 +79,11 @@ int mpmc_queue(mpmc_t *ctx, void *obj) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int p = ctx->rd;
|
int p = ctx->wr;
|
||||||
ctx->rd = (ctx->rd + 1) % ctx->size;
|
ctx->wr = (ctx->wr + 1) % ctx->size;
|
||||||
|
|
||||||
// unlock rd field
|
// unlock wr field
|
||||||
sem_post(&ctx->lck_rd);
|
sem_post(&ctx->lck_wr);
|
||||||
|
|
||||||
// fill buffer with object
|
// fill buffer with object
|
||||||
ctx->buffer[p] = obj;
|
ctx->buffer[p] = obj;
|
||||||
@@ -90,6 +96,11 @@ int mpmc_queue(mpmc_t *ctx, void *obj) {
|
|||||||
|
|
||||||
static void *mpmc_worker(void *arg) {
|
static void *mpmc_worker(void *arg) {
|
||||||
mpmc_t *ctx = arg;
|
mpmc_t *ctx = arg;
|
||||||
|
|
||||||
|
int id;
|
||||||
|
for (id = 0; id < ctx->n_workers && ctx->workers[id] != pthread_self(); id++);
|
||||||
|
logger_set_name("%s/%i", ctx->name, id);
|
||||||
|
|
||||||
while (ctx->alive) {
|
while (ctx->alive) {
|
||||||
// wait for buffer to be filled
|
// wait for buffer to be filled
|
||||||
if (sem_wait(&ctx->used) != 0) {
|
if (sem_wait(&ctx->used) != 0) {
|
||||||
@@ -103,8 +114,8 @@ static void *mpmc_worker(void *arg) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// lock wr field
|
// lock rd field
|
||||||
if (sem_wait(&ctx->lck_wr) != 0) {
|
if (sem_wait(&ctx->lck_rd) != 0) {
|
||||||
if (errno == EINTR) {
|
if (errno == EINTR) {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
sem_post(&ctx->used);
|
sem_post(&ctx->used);
|
||||||
@@ -117,11 +128,11 @@ static void *mpmc_worker(void *arg) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int p = ctx->wr;
|
int p = ctx->rd;
|
||||||
ctx->wr = (ctx->wr + 1) % ctx->size;
|
ctx->rd = (ctx->rd + 1) % ctx->size;
|
||||||
|
|
||||||
// unlock wr field
|
// unlock rd field
|
||||||
sem_post(&ctx->lck_wr);
|
sem_post(&ctx->lck_rd);
|
||||||
|
|
||||||
// consume object
|
// consume object
|
||||||
ctx->consumer(ctx->buffer[p]);
|
ctx->consumer(ctx->buffer[p]);
|
||||||
|
@@ -13,9 +13,10 @@ typedef struct {
|
|||||||
void **buffer;
|
void **buffer;
|
||||||
pthread_t *workers;
|
pthread_t *workers;
|
||||||
void (*consumer)(void *obj);
|
void (*consumer)(void *obj);
|
||||||
|
const char* name;
|
||||||
} mpmc_t;
|
} mpmc_t;
|
||||||
|
|
||||||
int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *prefix);
|
int mpmc_init(mpmc_t *ctx, int n_workers, int buf_size, void (*consumer)(void *obj), const char *name);
|
||||||
|
|
||||||
int mpmc_queue(mpmc_t *ctx, void *obj);
|
int mpmc_queue(mpmc_t *ctx, void *obj);
|
||||||
|
|
||||||
|
@@ -130,6 +130,7 @@ int sock_close(sock *s) {
|
|||||||
close(s->socket);
|
close(s->socket);
|
||||||
s->socket = 0;
|
s->socket = 0;
|
||||||
s->enc = 0;
|
s->enc = 0;
|
||||||
|
errno = 0;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
27
src/logger.c
27
src/logger.c
@@ -162,10 +162,13 @@ static int logger_remaining(void) {
|
|||||||
return val;
|
return val;
|
||||||
}
|
}
|
||||||
|
|
||||||
void logger_set_name(const char *restrict name) {
|
void logger_set_name(const char *restrict format, ...) {
|
||||||
|
va_list args;
|
||||||
|
|
||||||
if (key_name == -1) {
|
if (key_name == -1) {
|
||||||
// not initialized
|
// not initialized
|
||||||
strncpy(global_name, name, sizeof(global_name));
|
va_start(args, format);
|
||||||
|
vsnprintf(global_name, sizeof(global_name), format, args);
|
||||||
} else {
|
} else {
|
||||||
int ret;
|
int ret;
|
||||||
void *ptr = pthread_getspecific(key_name);
|
void *ptr = pthread_getspecific(key_name);
|
||||||
@@ -177,14 +180,22 @@ void logger_set_name(const char *restrict name) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
strncpy(ptr, name, LOG_NAME_LEN);
|
|
||||||
|
va_start(args, format);
|
||||||
|
vsnprintf(ptr, LOG_NAME_LEN, format, args);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cleanup
|
||||||
|
va_end(args);
|
||||||
}
|
}
|
||||||
|
|
||||||
void logger_set_prefix(const char *restrict prefix) {
|
void logger_set_prefix(const char *restrict format, ...) {
|
||||||
|
va_list args;
|
||||||
|
|
||||||
if (key_prefix == -1) {
|
if (key_prefix == -1) {
|
||||||
// not initialized
|
// not initialized
|
||||||
strncpy(global_prefix, prefix, sizeof(global_prefix));
|
va_start(args, format);
|
||||||
|
vsnprintf(global_prefix, sizeof(global_prefix), format, args);
|
||||||
} else {
|
} else {
|
||||||
int ret;
|
int ret;
|
||||||
void *ptr = pthread_getspecific(key_prefix);
|
void *ptr = pthread_getspecific(key_prefix);
|
||||||
@@ -196,8 +207,12 @@ void logger_set_prefix(const char *restrict prefix) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
strncpy(ptr, prefix, LOG_PREFIX_LEN);
|
va_start(args, format);
|
||||||
|
vsnprintf(ptr, LOG_PREFIX_LEN, format, args);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cleanup
|
||||||
|
va_end(args);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *logger_thread(void *arg) {
|
static void *logger_thread(void *arg) {
|
||||||
|
@@ -29,9 +29,9 @@ typedef unsigned char log_lvl_t;
|
|||||||
|
|
||||||
void logmsgf(log_lvl_t level, const char *restrict format, ...);
|
void logmsgf(log_lvl_t level, const char *restrict format, ...);
|
||||||
|
|
||||||
void logger_set_name(const char *restrict name);
|
void logger_set_name(const char *restrict format, ...);
|
||||||
|
|
||||||
void logger_set_prefix(const char *restrict prefix);
|
void logger_set_prefix(const char *restrict format, ...);
|
||||||
|
|
||||||
int logger_init(void);
|
int logger_init(void);
|
||||||
|
|
||||||
|
@@ -4,6 +4,8 @@
|
|||||||
#include "../lib/mpmc.h"
|
#include "../lib/mpmc.h"
|
||||||
#include "../lib/utils.h"
|
#include "../lib/utils.h"
|
||||||
#include "tcp_closer.h"
|
#include "tcp_closer.h"
|
||||||
|
#include "../async.h"
|
||||||
|
#include "../server.h"
|
||||||
|
|
||||||
static mpmc_t mpmc_ctx;
|
static mpmc_t mpmc_ctx;
|
||||||
|
|
||||||
@@ -26,6 +28,12 @@ void request_handler_destroy(void) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void request_handler_func(client_ctx_t *ctx) {
|
static void request_handler_func(client_ctx_t *ctx) {
|
||||||
// TODO
|
client_request_handler(ctx);
|
||||||
tcp_close(ctx);
|
|
||||||
|
if (ctx->c_keep_alive && ctx->s_keep_alive && ctx->req_num < REQ_PER_CONNECTION) {
|
||||||
|
async(ctx->socket.socket, POLLIN, 0, (void (*)(void *)) handle_request, ctx, (void (*)(void *)) tcp_close, ctx);
|
||||||
|
logger_set_prefix(ctx->log_prefix);
|
||||||
|
} else {
|
||||||
|
tcp_close(ctx);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -19,7 +19,7 @@ static mpmc_t mpmc_ctx;
|
|||||||
static void tcp_acceptor_func(client_ctx_t *ctx);
|
static void tcp_acceptor_func(client_ctx_t *ctx);
|
||||||
|
|
||||||
int tcp_acceptor_init(int n_workers, int buf_size) {
|
int tcp_acceptor_init(int n_workers, int buf_size) {
|
||||||
return mpmc_init(&mpmc_ctx, n_workers, buf_size, (void (*)(void *)) tcp_acceptor_func, "tcp/a");
|
return mpmc_init(&mpmc_ctx, n_workers, buf_size, (void (*)(void *)) tcp_acceptor_func, "tcp_a");
|
||||||
}
|
}
|
||||||
|
|
||||||
int tcp_accept(client_ctx_t *ctx) {
|
int tcp_accept(client_ctx_t *ctx) {
|
||||||
@@ -36,7 +36,6 @@ void tcp_acceptor_destroy(void) {
|
|||||||
|
|
||||||
static void tcp_acceptor_func(client_ctx_t *ctx) {
|
static void tcp_acceptor_func(client_ctx_t *ctx) {
|
||||||
struct sockaddr_in6 server_addr;
|
struct sockaddr_in6 server_addr;
|
||||||
char log_client_prefix[256];
|
|
||||||
|
|
||||||
inet_ntop(ctx->socket.addr.ipv6.sin6_family, &ctx->socket.addr.ipv6.sin6_addr, ctx->_c_addr, sizeof(ctx->_c_addr));
|
inet_ntop(ctx->socket.addr.ipv6.sin6_family, &ctx->socket.addr.ipv6.sin6_addr, ctx->_c_addr, sizeof(ctx->_c_addr));
|
||||||
if (strncmp(ctx->_c_addr, "::ffff:", 7) == 0) {
|
if (strncmp(ctx->_c_addr, "::ffff:", 7) == 0) {
|
||||||
@@ -54,12 +53,11 @@ static void tcp_acceptor_func(client_ctx_t *ctx) {
|
|||||||
ctx->s_addr = ctx->_s_addr;
|
ctx->s_addr = ctx->_s_addr;
|
||||||
}
|
}
|
||||||
|
|
||||||
sprintf(log_client_prefix, "[%s%4i%s]%s[%*s][%5i]%s", (int) ctx->socket.enc ? HTTPS_STR : HTTP_STR,
|
sprintf(ctx->log_prefix, "[%s%4i%s]%s[%*s][%5i]%s", (int) ctx->socket.enc ? HTTPS_STR : HTTP_STR,
|
||||||
ntohs(server_addr.sin6_port), CLR_STR, /*color_table[0]*/ "", INET6_ADDRSTRLEN, ctx->addr,
|
ntohs(server_addr.sin6_port), CLR_STR, /*color_table[0]*/ "", INET6_ADDRSTRLEN, ctx->addr,
|
||||||
ntohs(ctx->socket.addr.ipv6.sin6_port), CLR_STR);
|
ntohs(ctx->socket.addr.ipv6.sin6_port), CLR_STR);
|
||||||
|
|
||||||
sprintf(ctx->log_prefix, "[%*s]%s", INET6_ADDRSTRLEN, ctx->s_addr, log_client_prefix);
|
logger_set_prefix("[%*s]%s", INET6_ADDRSTRLEN, ctx->s_addr, ctx->log_prefix);
|
||||||
logger_set_prefix(ctx->log_prefix);
|
|
||||||
|
|
||||||
int ret;
|
int ret;
|
||||||
char buf[1024];
|
char buf[1024];
|
||||||
@@ -117,7 +115,7 @@ static void tcp_acceptor_func(client_ctx_t *ctx) {
|
|||||||
client->_errno = errno;
|
client->_errno = errno;
|
||||||
client->_ssl_error = ERR_get_error();
|
client->_ssl_error = ERR_get_error();
|
||||||
if (ret <= 0) {
|
if (ret <= 0) {
|
||||||
error("Unable to perform handshake: %s", sock_strerror(client));
|
info("Unable to perform handshake: %s", sock_strerror(client));
|
||||||
tcp_close(ctx);
|
tcp_close(ctx);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@@ -11,7 +11,7 @@ static mpmc_t mpmc_ctx;
|
|||||||
static void tcp_closer_func(client_ctx_t *ctx);
|
static void tcp_closer_func(client_ctx_t *ctx);
|
||||||
|
|
||||||
int tcp_closer_init(int n_workers, int buf_size) {
|
int tcp_closer_init(int n_workers, int buf_size) {
|
||||||
return mpmc_init(&mpmc_ctx, n_workers, buf_size, (void (*)(void *)) tcp_closer_func, "tcp/c");
|
return mpmc_init(&mpmc_ctx, n_workers, buf_size, (void (*)(void *)) tcp_closer_func, "tcp_c");
|
||||||
}
|
}
|
||||||
|
|
||||||
int tcp_close(client_ctx_t *ctx) {
|
int tcp_close(client_ctx_t *ctx) {
|
||||||
@@ -27,7 +27,7 @@ void tcp_closer_destroy(void) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void tcp_closer_func(client_ctx_t *ctx) {
|
static void tcp_closer_func(client_ctx_t *ctx) {
|
||||||
logger_set_prefix(ctx->log_prefix);
|
logger_set_prefix("[%*s]%s", INET6_ADDRSTRLEN, ctx->s_addr, ctx->log_prefix);
|
||||||
|
|
||||||
sock_close(&ctx->socket);
|
sock_close(&ctx->socket);
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user