diff --git a/src/async.c b/src/async.c index 615c9c5..5f7664d 100644 --- a/src/async.c +++ b/src/async.c @@ -25,6 +25,7 @@ typedef struct { int fd; + int list_idx; sock *socket; async_evt_t events; int flags; @@ -208,6 +209,7 @@ static int async_add(evt_listen_t *evt) { int async_fd(int fd, async_evt_t events, int flags, void *arg, void cb(void *), void to_cb(void *), void err_cb(void *)) { evt_listen_t evt = { .fd = fd, + .list_idx = -1, .socket = NULL, .events = events, .flags = flags, @@ -222,6 +224,7 @@ int async_fd(int fd, async_evt_t events, int flags, void *arg, void cb(void *), int async(sock *s, async_evt_t events, int flags, void *arg, void cb(void *), void to_cb(void *), void err_cb(void *)) { evt_listen_t evt = { .fd = s->socket, + .list_idx = -1, .socket = s, .events = events, .flags = flags, @@ -289,6 +292,7 @@ void async_thread(void) { // fill local list and epoll instance with previously added queue entries for (int i = 0; i < l->n; i++) { evt_listen_t *evt = l->q[i]; + evt->list_idx = list_size(local); local = list_append(local, &evt); if (local == NULL) { critical("Unable to resize async local list"); @@ -303,14 +307,21 @@ void async_thread(void) { // fd already exists, delete old one warning("Unable to add file descriptor to epoll instance"); errno = 0; - if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, evt->fd, NULL) != -1) + if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, evt->fd, NULL) != -1) { + local = list_remove(local, evt->list_idx); + if (local == NULL) { + critical("Unable to resize async local list"); + return; + } + notice("Removed old file descriptor from epoll instance"); continue; + } } else if (errno == EBADF || errno == EPERM) { // fd probably already closed or does not support epoll somehow // FIXME should not happen warning("Unable to add file descriptor to epoll instance"); errno = 0; - local = list_delete(local, &evt); + local = list_remove(local, evt->list_idx); if (local == NULL) { critical("Unable to resize async local list"); return; @@ -328,7 +339,7 @@ void async_thread(void) { // calculate wait timeout min_ts = -1000, cur_ts = clock_micros(); for (int i = 0; i < list_size(local); i++) { - evt_listen_t *evt = local[i]; + const evt_listen_t *evt = local[i]; if (!evt->socket || evt->socket->timeout_us < 0) continue; ts = evt->socket->ts_last + evt->socket->timeout_us - cur_ts; @@ -349,7 +360,10 @@ void async_thread(void) { for (int i = 0; i < num_fds; i++) { evt_listen_t *evt = events[i].data.ptr; - if (!list_contains(local, &evt)) continue; + if (evt != local[evt->list_idx]) { + error("epoll instance reported event not contained in local list"); + continue; + } void (*cb)(void *) = NULL; if (async_exec_cb(evt, async_e2a(events[i].events), &cb) == 0) { @@ -364,7 +378,7 @@ void async_thread(void) { } } - local = list_delete(local, &evt); + local = list_remove(local, evt->list_idx); if (local == NULL) { critical("Unable to resize async local list"); return; @@ -381,7 +395,7 @@ void async_thread(void) { // check, if some socket ran into a timeout cur_ts = clock_micros(); for (int i = 0; i < list_size(local); i++) { - evt_listen_t *evt = local[i]; + const evt_listen_t *evt = local[i]; if (!evt->socket) continue; if (evt->socket->timeout_us >= 0 && (cur_ts - evt->socket->ts_last) >= evt->socket->timeout_us) {