Handle timeouts in epoll

This commit is contained in:
2023-01-08 21:58:27 +01:00
parent 83ca2467de
commit 4ff22bd0c6
10 changed files with 131 additions and 36 deletions

View File

@ -9,6 +9,7 @@
#include "async.h"
#include "logger.h"
#include "lib/list.h"
#include "lib/utils.h"
#include <poll.h>
#include <sys/epoll.h>
@ -28,6 +29,7 @@ typedef struct {
int flags;
void *arg;
void (*cb)(void *);
void (*to_cb)(void *);
void (*err_cb)(void *);
} evt_listen_t;
@ -154,7 +156,7 @@ static int async_add(evt_listen_t *evt) {
return ret;
}
int async_fd(int fd, async_evt_t events, int flags, void *arg, void cb(void *), void err_cb(void *)) {
int async_fd(int fd, async_evt_t events, int flags, void *arg, void cb(void *), void to_cb(void *), void err_cb(void *)) {
evt_listen_t evt = {
.fd = fd,
.socket = NULL,
@ -162,12 +164,13 @@ int async_fd(int fd, async_evt_t events, int flags, void *arg, void cb(void *),
.flags = flags,
.arg = arg,
.cb = cb,
.to_cb = to_cb,
.err_cb = err_cb,
};
return async_add(&evt);
}
int async(sock *s, async_evt_t events, int flags, void *arg, void cb(void *), void err_cb(void *)) {
int async(sock *s, async_evt_t events, int flags, void *arg, void cb(void *), void to_cb(void *), void err_cb(void *)) {
evt_listen_t evt = {
.fd = s->socket,
.socket = s,
@ -175,6 +178,7 @@ int async(sock *s, async_evt_t events, int flags, void *arg, void cb(void *), vo
.flags = flags,
.arg = arg,
.cb = cb,
.to_cb = to_cb,
.err_cb = err_cb,
};
return async_add(&evt);
@ -206,6 +210,8 @@ void async_free(void) {
void async_thread(void) {
struct epoll_event ev, events[ASYNC_MAX_EVENTS];
int num_fds;
long ts, min_ts, cur_ts;
listen_queue_t *l;
evt_listen_t **local = list_create(sizeof(evt_listen_t *), 16);
if (local == NULL) {
critical("Unable to create async local list");
@ -217,7 +223,7 @@ void async_thread(void) {
// main event loop
while (alive) {
// swap listen queue
listen_queue_t *l = listen_q;
l = listen_q;
listen_q = (listen_q == &listen1) ? &listen2 : &listen1;
// fill local list and epoll instance with previously added queue entries
@ -240,7 +246,17 @@ void async_thread(void) {
// reset size of queue
l->n = 0;
if ((num_fds = epoll_wait(epoll_fd, events, ASYNC_MAX_EVENTS, -1)) == -1) {
// calculate wait timeout
min_ts = -1000, cur_ts = clock_micros();;
for (int i = 0; i < list_size(local); i++) {
evt_listen_t *evt = local[i];
if (!evt->socket) continue;
ts = evt->socket->ts_last + evt->socket->timeout_us - cur_ts;
if (min_ts == -1000 || ts < min_ts) min_ts = ts;
}
if ((num_fds = epoll_wait(epoll_fd, events, ASYNC_MAX_EVENTS, (int) (min_ts / 1000))) == -1) {
if (errno == EINTR) {
// interrupt
errno = 0;
@ -274,6 +290,31 @@ void async_thread(void) {
free(evt);
}
}
// check, if some socket ran into a timeout
cur_ts = clock_micros();
for (int i = 0; i < list_size(local); i++) {
evt_listen_t *evt = local[i];
if (!evt->socket) continue;
if ((cur_ts - evt->socket->ts_last) >= evt->socket->timeout_us) {
evt->to_cb(evt->arg);
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, evt->fd, NULL) == -1) {
if (errno == EBADF) {
// already closed fd, do not die
errno = 0;
} else {
critical("Unable to remove file descriptor from epoll instance");
return;
}
}
local = list_remove(local, i--);
}
}
logger_set_prefix("");
errno = 0;
}
// cleanup

View File

@ -24,9 +24,9 @@
typedef unsigned int async_evt_t;
int async(sock *s, async_evt_t events, int flags, void *arg, void cb(void *), void err_cb(void *));
int async(sock *s, async_evt_t events, int flags, void *arg, void cb(void *), void to_cb(void *), void err_cb(void *));
int async_fd(int fd, async_evt_t events, int flags, void *arg, void cb(void *), void err_cb(void *));
int async_fd(int fd, async_evt_t events, int flags, void *arg, void cb(void *), void to_cb(void *), void err_cb(void *));
int async_init(void);

View File

@ -59,7 +59,7 @@ const char *sock_strerror(sock *s) {
}
}
int sock_set_timeout_micros(sock *s, long recv_micros, long send_micros) {
int sock_set_socket_timeout_micros(sock *s, long recv_micros, long send_micros) {
struct timeval recv_to = {.tv_sec = recv_micros / 1000000, .tv_usec = recv_micros % 1000000},
send_to = {.tv_sec = send_micros / 1000000, .tv_usec = send_micros % 1000000};
@ -72,8 +72,20 @@ int sock_set_timeout_micros(sock *s, long recv_micros, long send_micros) {
return 0;
}
int sock_set_timeout(sock *s, int sec) {
return sock_set_timeout_micros(s, sec * 1000000L, sec * 1000000L);
int sock_set_socket_timeout(sock *s, double sec) {
return sock_set_socket_timeout_micros(s, (long) (sec * 1000000L), (long) (sec * 1000000L));
}
int sock_set_timeout_micros(sock *s, long micros) {
if (micros < 0)
return -1;
s->timeout_us = micros;
return 0;
}
int sock_set_timeout(sock *s, double sec) {
return sock_set_timeout_micros(s, (long) (sec * 1000000));
}
long sock_send(sock *s, void *buf, unsigned long len, int flags) {
@ -86,7 +98,12 @@ long sock_send(sock *s, void *buf, unsigned long len, int flags) {
}
s->_last_ret = ret;
s->_errno = errno;
return ret >= 0 ? ret : -1;
if (ret >= 0) {
s->ts_last = clock_micros();
return ret;
} else {
return -1;
}
}
long sock_recv(sock *s, void *buf, unsigned long len, int flags) {
@ -100,7 +117,12 @@ long sock_recv(sock *s, void *buf, unsigned long len, int flags) {
}
s->_last_ret = ret;
s->_errno = errno;
return ret >= 0 ? ret : -1;
if (ret >= 0) {
s->ts_last = clock_micros();
return ret;
} else {
return -1;
}
}
long sock_splice(sock *dst, sock *src, void *buf, unsigned long buf_len, unsigned long len) {

View File

@ -23,6 +23,7 @@ typedef struct {
char *addr, *s_addr;
SSL_CTX *ctx;
SSL *ssl;
long ts_start, ts_last, timeout_us;
long _last_ret;
int _errno;
unsigned long _ssl_error;
@ -32,9 +33,13 @@ int sock_enc_error(sock *s);
const char *sock_strerror(sock *s);
int sock_set_timeout_micros(sock *s, long recv_micros, long send_micros);
int sock_set_socket_timeout_micros(sock *s, long recv_micros, long send_micros);
int sock_set_timeout(sock *s, int sec);
int sock_set_socket_timeout(sock *s, double sec);
int sock_set_timeout_micros(sock *s, long micros);
int sock_set_timeout(sock *s, double sec);
long sock_send(sock *s, void *buf, unsigned long len, int flags);

View File

@ -211,7 +211,6 @@ long clock_micros(void) {
return time.tv_sec * 1000000 + time.tv_nsec / 1000;
}
long clock_cpu(void) {
struct timespec time;
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &time);

View File

@ -18,6 +18,7 @@
#include "workers.h"
#include "worker/func.h"
#include "lib/list.h"
#include "lib/utils.h"
#include <stdio.h>
#include <getopt.h>
@ -98,6 +99,10 @@ static void accept_cb(void *arg) {
client->socket = client_fd;
client->enc = (i == 1);
client->ts_start = clock_micros();
client->ts_last = client->ts_start;
client_ctx->cnx_s = client->ts_start;
client_ctx->cnx_e = -1, client_ctx->req_s = -1, client_ctx->req_e = -1, client_ctx->res_ts = -1;
tcp_accept(client_ctx);
}
@ -301,12 +306,11 @@ int main(int argc, char *const argv[]) {
workers_init();
for (int i = 0; i < NUM_SOCKETS; i++) {
async_fd(sockets[i], ASYNC_WAIT_READ, ASYNC_KEEP, &sockets[i], accept_cb, accept_err_cb);
async_fd(sockets[i], ASYNC_WAIT_READ, ASYNC_KEEP, &sockets[i], accept_cb, accept_err_cb, accept_err_cb);
}
notice("Ready to accept connections");
// TODO handle timeouts in epoll
async_thread();
notice("Goodbye!");

View File

@ -58,6 +58,8 @@ int respond(client_ctx_t *ctx);
void request_complete(client_ctx_t *ctx);
void timeout_request(client_ctx_t *ctx);
void tcp_close(client_ctx_t *ctx);
void proxy_close(proxy_ctx_t *ctx);

View File

@ -41,15 +41,7 @@ void request_handler_func(client_ctx_t *ctx) {
}
}
static int request_handler(client_ctx_t *ctx) {
sock *client = &ctx->socket;
char *err_msg = ctx->err_msg;
long ret;
char buf0[1024], buf1[1024];
err_msg[0] = 0;
static void init_ctx(client_ctx_t *ctx) {
ctx->conf = NULL;
ctx->file = NULL;
ctx->proxy = NULL;
@ -62,24 +54,36 @@ static int request_handler(client_ctx_t *ctx) {
ctx->msg_buf_ptr = NULL;
ctx->req_host[0] = 0;
ctx->err_msg[0] = 0;
ctx->req_s = ctx->socket.ts_last;
memset(&ctx->uri, 0, sizeof(ctx->uri));
memset(&ctx->req, 0, sizeof(ctx->req));
memset(&ctx->res, 0, sizeof(ctx->res));
http_res *res = &ctx->res;
res->status = http_get_status(501);
http_init_hdr(&res->hdr);
res->hdr.last_field_num = -1;
sprintf(res->version, "1.1");
ctx->res.status = http_get_status(501);
http_init_hdr(&ctx->res.hdr);
ctx->res.hdr.last_field_num = -1;
sprintf(ctx->res.version, "1.1");
ctx->status.status = 0;
ctx->status.origin = NONE;
ctx->status.ws_key = NULL;
}
static int request_handler(client_ctx_t *ctx) {
sock *client = &ctx->socket;
char *err_msg = ctx->err_msg;
http_res *res = &ctx->res;
http_status_ctx *status = &ctx->status;
status->status = 0;
status->origin = NONE;
status->ws_key = NULL;
long ret;
char buf0[1024], buf1[1024];
ctx->req_s = clock_micros();
init_ctx(ctx);
http_add_header_field(&res->hdr, "Date", http_get_date(buf0, sizeof(buf0)));
http_add_header_field(&res->hdr, "Server", SERVER_STR);
/*if (ret <= 0) {
@ -387,3 +391,14 @@ void request_complete(client_ctx_t *ctx) {
http_free_req(&ctx->req);
http_free_res(&ctx->res);
}
void timeout_request(client_ctx_t *ctx) {
init_ctx(ctx);
logger_set_prefix("[%*s]%s", INET6_ADDRSTRLEN, ctx->socket.s_addr, ctx->log_prefix);
ctx->s_keep_alive = 0;
ctx->res.status = http_get_status(408);
respond(ctx);
tcp_close(ctx);
}

View File

@ -90,7 +90,7 @@ static int tcp_acceptor(client_ctx_t *ctx) {
ctx->host[0] != 0 ? ctx->host : "", ctx->host[0] != 0 ? ") " : "",
ctx->cc[0] != 0 ? ctx->cc : "N/A");
if (sock_set_timeout(client, CLIENT_TIMEOUT)) {
if (sock_set_socket_timeout(client, 1) != 0 || sock_set_timeout(client, CLIENT_TIMEOUT) != 0) {
error("Unable to set timeout for socket");
return -1;
}
@ -108,6 +108,7 @@ static int tcp_acceptor(client_ctx_t *ctx) {
info("Unable to perform handshake: %s", sock_strerror(client));
return - 1;
}
client->ts_last = clock_micros();
}
ctx->req_num = 0;

View File

@ -53,7 +53,10 @@ static int handle_request_cb(client_ctx_t *ctx) {
int handle_request(client_ctx_t *ctx) {
if (ctx->c_keep_alive && ctx->s_keep_alive) {
return async(&ctx->socket, ASYNC_WAIT_READ, 0, ctx, (void (*)(void *)) handle_request_cb, (void (*)(void *)) tcp_close);
return async(&ctx->socket, ASYNC_WAIT_READ, 0, ctx,
(void (*)(void *)) handle_request_cb,
(void (*)(void *)) timeout_request,
(void (*)(void *)) tcp_close);
} else {
tcp_close(ctx);
return 0;
@ -77,5 +80,8 @@ static int ws_handle_frame_cb(ws_ctx_t *ctx) {
}
int ws_handle_frame(ws_ctx_t *ctx) {
return async(ctx->socket, ASYNC_WAIT_READ, 0, ctx, (void (*)(void *)) ws_handle_frame_cb, (void (*)(void *)) ws_close);
return async(ctx->socket, ASYNC_WAIT_READ, 0, ctx,
(void (*)(void *)) ws_handle_frame_cb,
(void (*)(void *)) ws_close,
(void (*)(void *)) ws_close);
}