async: Try to avoid race conditions when adding/removing fds
This commit is contained in:
+16
-6
@@ -137,20 +137,20 @@ static int async_add_to_queue(evt_listen_t *evt) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int async_exec(evt_listen_t *evt, async_evt_t r_events) {
|
||||
static int async_exec_cb(evt_listen_t *evt, async_evt_t r_events, void (**cb)(void *)) {
|
||||
int ret, e = errno;
|
||||
if (r_events & evt->events) {
|
||||
// specified event(s) occurred
|
||||
if (!(evt->flags & ASYNC_IGNORE_PENDING) && evt->socket && !sock_has_pending(evt->socket, 0)) {
|
||||
evt->err_cb(evt->arg);
|
||||
*cb = evt->err_cb;
|
||||
ret = 0;
|
||||
} else {
|
||||
evt->cb(evt->arg);
|
||||
*cb = evt->cb;
|
||||
ret = (evt->flags & ASYNC_KEEP) ? 1 : 0;
|
||||
}
|
||||
} else if (r_events & (POLLERR | POLLHUP | POLLNVAL)) {
|
||||
// error occurred
|
||||
evt->err_cb(evt->arg);
|
||||
*cb = evt->err_cb;
|
||||
ret = 0;
|
||||
} else {
|
||||
// no event occurred
|
||||
@@ -161,6 +161,13 @@ static int async_exec(evt_listen_t *evt, async_evt_t r_events) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int async_exec(evt_listen_t *evt, async_evt_t r_events) {
|
||||
void (*cb)(void *) = NULL;
|
||||
const int ret = async_exec_cb(evt, r_events, &cb);
|
||||
if (cb != NULL) cb(evt->arg);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int async_check(evt_listen_t *evt) {
|
||||
struct pollfd fds[1] = {{
|
||||
.fd = evt->fd,
|
||||
@@ -333,7 +340,6 @@ void async_thread(void) {
|
||||
if (errno == EINTR) {
|
||||
// interrupt
|
||||
errno = 0;
|
||||
continue;
|
||||
} else {
|
||||
// other error
|
||||
critical("Unable to poll for events");
|
||||
@@ -345,7 +351,8 @@ void async_thread(void) {
|
||||
evt_listen_t *evt = events[i].data.ptr;
|
||||
if (!list_contains(local, &evt)) continue;
|
||||
|
||||
if (async_exec(evt, async_e2a(events[i].events)) == 0) {
|
||||
void (*cb)(void *) = NULL;
|
||||
if (async_exec_cb(evt, async_e2a(events[i].events), &cb) == 0) {
|
||||
logger_set_prefix("");
|
||||
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, evt->fd, NULL) == -1) {
|
||||
if (errno == EBADF || errno == ENOENT || errno == EPERM) {
|
||||
@@ -363,7 +370,10 @@ void async_thread(void) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (cb != NULL) cb(evt->arg);
|
||||
free(evt);
|
||||
} else {
|
||||
if (cb != NULL) cb(evt->arg);
|
||||
}
|
||||
logger_set_prefix("");
|
||||
}
|
||||
|
||||
@@ -127,13 +127,12 @@ static void proxy_chunk_next_cb(chunk_ctx_t *ctx) {
|
||||
}
|
||||
|
||||
static void proxy_chunk_err_cb(chunk_ctx_t *ctx) {
|
||||
ctx->client->c_keep_alive = 0;
|
||||
proxy_close(ctx->client->proxy);
|
||||
proxy_unlock_ctx(ctx->client->proxy);
|
||||
|
||||
ctx->client->proxy = NULL;
|
||||
request_complete(ctx->client);
|
||||
handle_request(ctx->client);
|
||||
tcp_close(ctx->client);
|
||||
}
|
||||
|
||||
static int proxy_handler_2(client_ctx_t *ctx) {
|
||||
|
||||
Reference in New Issue
Block a user