4 Commits

Author SHA1 Message Date
lorenz.stechauner f4697ce0f3 Fix typo 2023-07-11 01:57:48 +02:00
lorenz.stechauner 72c2e24050 Small improvements in async 2023-07-11 01:51:47 +02:00
lorenz.stechauner 745509cab1 Add debug message when joining mpmc workers 2023-07-11 01:50:36 +02:00
lorenz.stechauner 35d3612d9b Cleanup on request timeout 2023-07-11 01:50:00 +02:00
4 changed files with 49 additions and 5 deletions
+37
View File
@@ -49,6 +49,13 @@ static short async_a2p(async_evt_t events) {
if (events & ASYNC_IN) ret |= POLLIN;
if (events & ASYNC_PRI) ret |= POLLPRI;
if (events & ASYNC_OUT) ret |= POLLOUT;
if (events & ASYNC_ERR) ret |= POLLERR;
if (events & ASYNC_HUP) ret |= POLLHUP;
if (events & ASYNC_RDNORM) ret |= POLLRDNORM;
if (events & ASYNC_RDBAND) ret |= POLLRDBAND;
if (events & ASYNC_WRNORM) ret |= POLLWRNORM;
if (events & ASYNC_WRBAND) ret |= POLLWRBAND;
if (events & ASYNC_MSG) ret |= POLLMSG;
return ret;
}
@@ -57,6 +64,13 @@ static unsigned int async_a2e(async_evt_t events) {
if (events & ASYNC_IN) ret |= EPOLLIN;
if (events & ASYNC_PRI) ret |= EPOLLPRI;
if (events & ASYNC_OUT) ret |= EPOLLOUT;
if (events & ASYNC_ERR) ret |= EPOLLERR;
if (events & ASYNC_HUP) ret |= EPOLLHUP;
if (events & ASYNC_RDNORM) ret |= EPOLLRDNORM;
if (events & ASYNC_RDBAND) ret |= EPOLLRDBAND;
if (events & ASYNC_WRNORM) ret |= EPOLLWRNORM;
if (events & ASYNC_WRBAND) ret |= EPOLLWRBAND;
if (events & ASYNC_MSG) ret |= EPOLLMSG;
return ret;
}
@@ -67,6 +81,11 @@ static async_evt_t async_p2a(short events) {
if (events & POLLOUT) ret |= ASYNC_OUT;
if (events & POLLERR) ret |= ASYNC_ERR;
if (events & POLLHUP) ret |= ASYNC_HUP;
if (events & POLLRDNORM) ret |= ASYNC_RDNORM;
if (events & POLLRDBAND) ret |= ASYNC_RDBAND;
if (events & POLLWRNORM) ret |= ASYNC_WRNORM;
if (events & POLLWRBAND) ret |= ASYNC_WRBAND;
if (events & POLLMSG) ret |= ASYNC_MSG;
return ret;
}
@@ -77,9 +96,22 @@ static async_evt_t async_e2a(unsigned int events) {
if (events & EPOLLOUT) ret |= ASYNC_OUT;
if (events & EPOLLERR) ret |= ASYNC_ERR;
if (events & EPOLLHUP) ret |= ASYNC_HUP;
if (events & EPOLLRDNORM) ret |= ASYNC_RDNORM;
if (events & EPOLLRDBAND) ret |= ASYNC_RDBAND;
if (events & EPOLLWRNORM) ret |= ASYNC_WRNORM;
if (events & EPOLLWRBAND) ret |= ASYNC_WRBAND;
if (events & EPOLLMSG) ret |= ASYNC_MSG;
return ret;
}
static short async_e2p(unsigned int events) {
return async_a2p(async_e2a(events));
}
static unsigned int async_p2e(short events) {
return async_a2e(async_p2a(events));
}
static int async_add_to_queue(evt_listen_t *evt) {
while (sem_wait(&lock) != 0) {
if (errno == EINTR) {
@@ -246,11 +278,13 @@ void async_thread(void) {
while (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, evt->fd, &ev) == -1) {
if (errno == EEXIST) {
// fd already exists, delete old one
warning("Unable to add file descriptor to epoll instance");
errno = 0;
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, evt->fd, NULL) != -1)
continue;
} else if (errno == EBADF) {
// fd probably already closed
warning("Unable to add file descriptor to epoll instance");
errno = 0;
local = list_delete(local, &evt);
if (local == NULL) {
@@ -277,6 +311,7 @@ void async_thread(void) {
if (min_ts == -1000 || ts < min_ts) min_ts = ts;
}
// epoll is used in level-triggered mode, so buffers are taken into account
if ((num_fds = epoll_wait(epoll_fd, events, ASYNC_MAX_EVENTS, (int) (min_ts / 1000))) == -1) {
if (errno == EINTR) {
// interrupt
@@ -298,6 +333,7 @@ void async_thread(void) {
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, evt->fd, NULL) == -1) {
if (errno == EBADF || errno == ENOENT) {
// already closed fd or not found, do not die
warning("Unable to remove file descriptor from epoll instance");
errno = 0;
} else {
critical("Unable to remove file descriptor from epoll instance");
@@ -328,6 +364,7 @@ void async_thread(void) {
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, evt->fd, NULL) == -1) {
if (errno == EBADF || errno == ENOENT) {
// already closed fd or not found, do not die
warning("Unable to remove file descriptor from epoll instance");
errno = 0;
} else {
critical("Unable to remove file descriptor from epoll instance");
+10 -5
View File
@@ -14,11 +14,16 @@
#define ASYNC_KEEP 1
#define ASYNC_IGNORE_PENDING 2
#define ASYNC_IN 0x01
#define ASYNC_PRI 0x02
#define ASYNC_OUT 0x04
#define ASYNC_ERR 0x08
#define ASYNC_HUP 0x10
#define ASYNC_IN 0x001
#define ASYNC_PRI 0x002
#define ASYNC_OUT 0x004
#define ASYNC_ERR 0x008
#define ASYNC_HUP 0x010
#define ASYNC_RDNORM 0x040
#define ASYNC_RDBAND 0x080
#define ASYNC_WRNORM 0x100
#define ASYNC_WRBAND 0x200
#define ASYNC_MSG 0x400
#define ASYNC_WAIT_READ ASYNC_IN
#define ASYNC_WAIT_WRITE ASYNC_OUT
+1
View File
@@ -149,6 +149,7 @@ void mpmc_destroy(mpmc_t *ctx) {
mpmc_stop(ctx);
for (int i = 0; i < ctx->n_workers; i++) {
if (ctx->workers[i] == -1) break;
debug("Waiting for worker %s/%i to finish...", ctx->name, i);
pthread_kill(ctx->workers[i], SIGUSR1);
pthread_join(ctx->workers[i], NULL);
}
+1
View File
@@ -392,5 +392,6 @@ void timeout_request(client_ctx_t *ctx) {
ctx->res.status = http_get_status(408);
respond(ctx);
request_complete(ctx);
tcp_close(ctx);
}