diff --git a/src/async.c b/src/async.c index 4a33cb0..7a5af32 100644 --- a/src/async.c +++ b/src/async.c @@ -9,6 +9,7 @@ #include "async.h" #include "logger.h" #include "lib/list.h" +#include "lib/utils.h" #include #include @@ -28,6 +29,7 @@ typedef struct { int flags; void *arg; void (*cb)(void *); + void (*to_cb)(void *); void (*err_cb)(void *); } evt_listen_t; @@ -154,7 +156,7 @@ static int async_add(evt_listen_t *evt) { return ret; } -int async_fd(int fd, async_evt_t events, int flags, void *arg, void cb(void *), void err_cb(void *)) { +int async_fd(int fd, async_evt_t events, int flags, void *arg, void cb(void *), void to_cb(void *), void err_cb(void *)) { evt_listen_t evt = { .fd = fd, .socket = NULL, @@ -162,12 +164,13 @@ int async_fd(int fd, async_evt_t events, int flags, void *arg, void cb(void *), .flags = flags, .arg = arg, .cb = cb, + .to_cb = to_cb, .err_cb = err_cb, }; return async_add(&evt); } -int async(sock *s, async_evt_t events, int flags, void *arg, void cb(void *), void err_cb(void *)) { +int async(sock *s, async_evt_t events, int flags, void *arg, void cb(void *), void to_cb(void *), void err_cb(void *)) { evt_listen_t evt = { .fd = s->socket, .socket = s, @@ -175,6 +178,7 @@ int async(sock *s, async_evt_t events, int flags, void *arg, void cb(void *), vo .flags = flags, .arg = arg, .cb = cb, + .to_cb = to_cb, .err_cb = err_cb, }; return async_add(&evt); @@ -206,6 +210,8 @@ void async_free(void) { void async_thread(void) { struct epoll_event ev, events[ASYNC_MAX_EVENTS]; int num_fds; + long ts, min_ts, cur_ts; + listen_queue_t *l; evt_listen_t **local = list_create(sizeof(evt_listen_t *), 16); if (local == NULL) { critical("Unable to create async local list"); @@ -217,7 +223,7 @@ void async_thread(void) { // main event loop while (alive) { // swap listen queue - listen_queue_t *l = listen_q; + l = listen_q; listen_q = (listen_q == &listen1) ? &listen2 : &listen1; // fill local list and epoll instance with previously added queue entries @@ -240,7 +246,17 @@ void async_thread(void) { // reset size of queue l->n = 0; - if ((num_fds = epoll_wait(epoll_fd, events, ASYNC_MAX_EVENTS, -1)) == -1) { + // calculate wait timeout + min_ts = -1000, cur_ts = clock_micros();; + for (int i = 0; i < list_size(local); i++) { + evt_listen_t *evt = local[i]; + if (!evt->socket) continue; + + ts = evt->socket->ts_last + evt->socket->timeout_us - cur_ts; + if (min_ts == -1000 || ts < min_ts) min_ts = ts; + } + + if ((num_fds = epoll_wait(epoll_fd, events, ASYNC_MAX_EVENTS, (int) (min_ts / 1000))) == -1) { if (errno == EINTR) { // interrupt errno = 0; @@ -274,6 +290,31 @@ void async_thread(void) { free(evt); } } + + // check, if some socket ran into a timeout + cur_ts = clock_micros(); + for (int i = 0; i < list_size(local); i++) { + evt_listen_t *evt = local[i]; + if (!evt->socket) continue; + + if ((cur_ts - evt->socket->ts_last) >= evt->socket->timeout_us) { + evt->to_cb(evt->arg); + + if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, evt->fd, NULL) == -1) { + if (errno == EBADF) { + // already closed fd, do not die + errno = 0; + } else { + critical("Unable to remove file descriptor from epoll instance"); + return; + } + } + + local = list_remove(local, i--); + } + } + logger_set_prefix(""); + errno = 0; } // cleanup diff --git a/src/async.h b/src/async.h index 0ac5c4a..3f5ca97 100644 --- a/src/async.h +++ b/src/async.h @@ -24,9 +24,9 @@ typedef unsigned int async_evt_t; -int async(sock *s, async_evt_t events, int flags, void *arg, void cb(void *), void err_cb(void *)); +int async(sock *s, async_evt_t events, int flags, void *arg, void cb(void *), void to_cb(void *), void err_cb(void *)); -int async_fd(int fd, async_evt_t events, int flags, void *arg, void cb(void *), void err_cb(void *)); +int async_fd(int fd, async_evt_t events, int flags, void *arg, void cb(void *), void to_cb(void *), void err_cb(void *)); int async_init(void); diff --git a/src/lib/sock.c b/src/lib/sock.c index 3b864dd..4643a58 100644 --- a/src/lib/sock.c +++ b/src/lib/sock.c @@ -59,7 +59,7 @@ const char *sock_strerror(sock *s) { } } -int sock_set_timeout_micros(sock *s, long recv_micros, long send_micros) { +int sock_set_socket_timeout_micros(sock *s, long recv_micros, long send_micros) { struct timeval recv_to = {.tv_sec = recv_micros / 1000000, .tv_usec = recv_micros % 1000000}, send_to = {.tv_sec = send_micros / 1000000, .tv_usec = send_micros % 1000000}; @@ -72,8 +72,20 @@ int sock_set_timeout_micros(sock *s, long recv_micros, long send_micros) { return 0; } -int sock_set_timeout(sock *s, int sec) { - return sock_set_timeout_micros(s, sec * 1000000L, sec * 1000000L); +int sock_set_socket_timeout(sock *s, double sec) { + return sock_set_socket_timeout_micros(s, (long) (sec * 1000000L), (long) (sec * 1000000L)); +} + +int sock_set_timeout_micros(sock *s, long micros) { + if (micros < 0) + return -1; + + s->timeout_us = micros; + return 0; +} + +int sock_set_timeout(sock *s, double sec) { + return sock_set_timeout_micros(s, (long) (sec * 1000000)); } long sock_send(sock *s, void *buf, unsigned long len, int flags) { @@ -86,7 +98,12 @@ long sock_send(sock *s, void *buf, unsigned long len, int flags) { } s->_last_ret = ret; s->_errno = errno; - return ret >= 0 ? ret : -1; + if (ret >= 0) { + s->ts_last = clock_micros(); + return ret; + } else { + return -1; + } } long sock_recv(sock *s, void *buf, unsigned long len, int flags) { @@ -100,7 +117,12 @@ long sock_recv(sock *s, void *buf, unsigned long len, int flags) { } s->_last_ret = ret; s->_errno = errno; - return ret >= 0 ? ret : -1; + if (ret >= 0) { + s->ts_last = clock_micros(); + return ret; + } else { + return -1; + } } long sock_splice(sock *dst, sock *src, void *buf, unsigned long buf_len, unsigned long len) { diff --git a/src/lib/sock.h b/src/lib/sock.h index b379042..37e7cc2 100644 --- a/src/lib/sock.h +++ b/src/lib/sock.h @@ -23,6 +23,7 @@ typedef struct { char *addr, *s_addr; SSL_CTX *ctx; SSL *ssl; + long ts_start, ts_last, timeout_us; long _last_ret; int _errno; unsigned long _ssl_error; @@ -32,9 +33,13 @@ int sock_enc_error(sock *s); const char *sock_strerror(sock *s); -int sock_set_timeout_micros(sock *s, long recv_micros, long send_micros); +int sock_set_socket_timeout_micros(sock *s, long recv_micros, long send_micros); -int sock_set_timeout(sock *s, int sec); +int sock_set_socket_timeout(sock *s, double sec); + +int sock_set_timeout_micros(sock *s, long micros); + +int sock_set_timeout(sock *s, double sec); long sock_send(sock *s, void *buf, unsigned long len, int flags); diff --git a/src/lib/utils.c b/src/lib/utils.c index 7a53c61..9faa07e 100644 --- a/src/lib/utils.c +++ b/src/lib/utils.c @@ -211,7 +211,6 @@ long clock_micros(void) { return time.tv_sec * 1000000 + time.tv_nsec / 1000; } - long clock_cpu(void) { struct timespec time; clock_gettime(CLOCK_THREAD_CPUTIME_ID, &time); diff --git a/src/server.c b/src/server.c index 7c1a2d5..d48c06b 100644 --- a/src/server.c +++ b/src/server.c @@ -18,6 +18,7 @@ #include "workers.h" #include "worker/func.h" #include "lib/list.h" +#include "lib/utils.h" #include #include @@ -98,6 +99,10 @@ static void accept_cb(void *arg) { client->socket = client_fd; client->enc = (i == 1); + client->ts_start = clock_micros(); + client->ts_last = client->ts_start; + client_ctx->cnx_s = client->ts_start; + client_ctx->cnx_e = -1, client_ctx->req_s = -1, client_ctx->req_e = -1, client_ctx->res_ts = -1; tcp_accept(client_ctx); } @@ -301,12 +306,11 @@ int main(int argc, char *const argv[]) { workers_init(); for (int i = 0; i < NUM_SOCKETS; i++) { - async_fd(sockets[i], ASYNC_WAIT_READ, ASYNC_KEEP, &sockets[i], accept_cb, accept_err_cb); + async_fd(sockets[i], ASYNC_WAIT_READ, ASYNC_KEEP, &sockets[i], accept_cb, accept_err_cb, accept_err_cb); } notice("Ready to accept connections"); - // TODO handle timeouts in epoll async_thread(); notice("Goodbye!"); diff --git a/src/worker/func.h b/src/worker/func.h index 2af7f74..5c3c19b 100644 --- a/src/worker/func.h +++ b/src/worker/func.h @@ -58,6 +58,8 @@ int respond(client_ctx_t *ctx); void request_complete(client_ctx_t *ctx); +void timeout_request(client_ctx_t *ctx); + void tcp_close(client_ctx_t *ctx); void proxy_close(proxy_ctx_t *ctx); diff --git a/src/worker/request_handler.c b/src/worker/request_handler.c index ed2131c..c42f5ec 100644 --- a/src/worker/request_handler.c +++ b/src/worker/request_handler.c @@ -41,15 +41,7 @@ void request_handler_func(client_ctx_t *ctx) { } } -static int request_handler(client_ctx_t *ctx) { - sock *client = &ctx->socket; - char *err_msg = ctx->err_msg; - - long ret; - char buf0[1024], buf1[1024]; - - err_msg[0] = 0; - +static void init_ctx(client_ctx_t *ctx) { ctx->conf = NULL; ctx->file = NULL; ctx->proxy = NULL; @@ -62,24 +54,36 @@ static int request_handler(client_ctx_t *ctx) { ctx->msg_buf_ptr = NULL; ctx->req_host[0] = 0; ctx->err_msg[0] = 0; + ctx->req_s = ctx->socket.ts_last; memset(&ctx->uri, 0, sizeof(ctx->uri)); memset(&ctx->req, 0, sizeof(ctx->req)); memset(&ctx->res, 0, sizeof(ctx->res)); - http_res *res = &ctx->res; - res->status = http_get_status(501); - http_init_hdr(&res->hdr); - res->hdr.last_field_num = -1; - sprintf(res->version, "1.1"); + ctx->res.status = http_get_status(501); + http_init_hdr(&ctx->res.hdr); + ctx->res.hdr.last_field_num = -1; + sprintf(ctx->res.version, "1.1"); + + ctx->status.status = 0; + ctx->status.origin = NONE; + ctx->status.ws_key = NULL; +} + +static int request_handler(client_ctx_t *ctx) { + sock *client = &ctx->socket; + char *err_msg = ctx->err_msg; + http_res *res = &ctx->res; http_status_ctx *status = &ctx->status; - status->status = 0; - status->origin = NONE; - status->ws_key = NULL; + + long ret; + char buf0[1024], buf1[1024]; ctx->req_s = clock_micros(); + init_ctx(ctx); + http_add_header_field(&res->hdr, "Date", http_get_date(buf0, sizeof(buf0))); http_add_header_field(&res->hdr, "Server", SERVER_STR); /*if (ret <= 0) { @@ -387,3 +391,14 @@ void request_complete(client_ctx_t *ctx) { http_free_req(&ctx->req); http_free_res(&ctx->res); } + +void timeout_request(client_ctx_t *ctx) { + init_ctx(ctx); + logger_set_prefix("[%*s]%s", INET6_ADDRSTRLEN, ctx->socket.s_addr, ctx->log_prefix); + + ctx->s_keep_alive = 0; + ctx->res.status = http_get_status(408); + + respond(ctx); + tcp_close(ctx); +} diff --git a/src/worker/tcp_acceptor.c b/src/worker/tcp_acceptor.c index 1403584..12d4b37 100644 --- a/src/worker/tcp_acceptor.c +++ b/src/worker/tcp_acceptor.c @@ -90,7 +90,7 @@ static int tcp_acceptor(client_ctx_t *ctx) { ctx->host[0] != 0 ? ctx->host : "", ctx->host[0] != 0 ? ") " : "", ctx->cc[0] != 0 ? ctx->cc : "N/A"); - if (sock_set_timeout(client, CLIENT_TIMEOUT)) { + if (sock_set_socket_timeout(client, 1) != 0 || sock_set_timeout(client, CLIENT_TIMEOUT) != 0) { error("Unable to set timeout for socket"); return -1; } @@ -108,6 +108,7 @@ static int tcp_acceptor(client_ctx_t *ctx) { info("Unable to perform handshake: %s", sock_strerror(client)); return - 1; } + client->ts_last = clock_micros(); } ctx->req_num = 0; diff --git a/src/workers.c b/src/workers.c index 4cd6b2b..3485e23 100644 --- a/src/workers.c +++ b/src/workers.c @@ -53,7 +53,10 @@ static int handle_request_cb(client_ctx_t *ctx) { int handle_request(client_ctx_t *ctx) { if (ctx->c_keep_alive && ctx->s_keep_alive) { - return async(&ctx->socket, ASYNC_WAIT_READ, 0, ctx, (void (*)(void *)) handle_request_cb, (void (*)(void *)) tcp_close); + return async(&ctx->socket, ASYNC_WAIT_READ, 0, ctx, + (void (*)(void *)) handle_request_cb, + (void (*)(void *)) timeout_request, + (void (*)(void *)) tcp_close); } else { tcp_close(ctx); return 0; @@ -77,5 +80,8 @@ static int ws_handle_frame_cb(ws_ctx_t *ctx) { } int ws_handle_frame(ws_ctx_t *ctx) { - return async(ctx->socket, ASYNC_WAIT_READ, 0, ctx, (void (*)(void *)) ws_handle_frame_cb, (void (*)(void *)) ws_close); + return async(ctx->socket, ASYNC_WAIT_READ, 0, ctx, + (void (*)(void *)) ws_handle_frame_cb, + (void (*)(void *)) ws_close, + (void (*)(void *)) ws_close); }