async: Remove old fds in epoll instance
This commit is contained in:
+20
-6
@@ -25,6 +25,7 @@
|
|||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int fd;
|
int fd;
|
||||||
|
int list_idx;
|
||||||
sock *socket;
|
sock *socket;
|
||||||
async_evt_t events;
|
async_evt_t events;
|
||||||
int flags;
|
int flags;
|
||||||
@@ -208,6 +209,7 @@ static int async_add(evt_listen_t *evt) {
|
|||||||
int async_fd(int fd, async_evt_t events, int flags, void *arg, void cb(void *), void to_cb(void *), void err_cb(void *)) {
|
int async_fd(int fd, async_evt_t events, int flags, void *arg, void cb(void *), void to_cb(void *), void err_cb(void *)) {
|
||||||
evt_listen_t evt = {
|
evt_listen_t evt = {
|
||||||
.fd = fd,
|
.fd = fd,
|
||||||
|
.list_idx = -1,
|
||||||
.socket = NULL,
|
.socket = NULL,
|
||||||
.events = events,
|
.events = events,
|
||||||
.flags = flags,
|
.flags = flags,
|
||||||
@@ -222,6 +224,7 @@ int async_fd(int fd, async_evt_t events, int flags, void *arg, void cb(void *),
|
|||||||
int async(sock *s, async_evt_t events, int flags, void *arg, void cb(void *), void to_cb(void *), void err_cb(void *)) {
|
int async(sock *s, async_evt_t events, int flags, void *arg, void cb(void *), void to_cb(void *), void err_cb(void *)) {
|
||||||
evt_listen_t evt = {
|
evt_listen_t evt = {
|
||||||
.fd = s->socket,
|
.fd = s->socket,
|
||||||
|
.list_idx = -1,
|
||||||
.socket = s,
|
.socket = s,
|
||||||
.events = events,
|
.events = events,
|
||||||
.flags = flags,
|
.flags = flags,
|
||||||
@@ -289,6 +292,7 @@ void async_thread(void) {
|
|||||||
// fill local list and epoll instance with previously added queue entries
|
// fill local list and epoll instance with previously added queue entries
|
||||||
for (int i = 0; i < l->n; i++) {
|
for (int i = 0; i < l->n; i++) {
|
||||||
evt_listen_t *evt = l->q[i];
|
evt_listen_t *evt = l->q[i];
|
||||||
|
evt->list_idx = list_size(local);
|
||||||
local = list_append(local, &evt);
|
local = list_append(local, &evt);
|
||||||
if (local == NULL) {
|
if (local == NULL) {
|
||||||
critical("Unable to resize async local list");
|
critical("Unable to resize async local list");
|
||||||
@@ -303,14 +307,21 @@ void async_thread(void) {
|
|||||||
// fd already exists, delete old one
|
// fd already exists, delete old one
|
||||||
warning("Unable to add file descriptor to epoll instance");
|
warning("Unable to add file descriptor to epoll instance");
|
||||||
errno = 0;
|
errno = 0;
|
||||||
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, evt->fd, NULL) != -1)
|
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, evt->fd, NULL) != -1) {
|
||||||
|
local = list_remove(local, evt->list_idx);
|
||||||
|
if (local == NULL) {
|
||||||
|
critical("Unable to resize async local list");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
notice("Removed old file descriptor from epoll instance");
|
||||||
continue;
|
continue;
|
||||||
|
}
|
||||||
} else if (errno == EBADF || errno == EPERM) {
|
} else if (errno == EBADF || errno == EPERM) {
|
||||||
// fd probably already closed or does not support epoll somehow
|
// fd probably already closed or does not support epoll somehow
|
||||||
// FIXME should not happen
|
// FIXME should not happen
|
||||||
warning("Unable to add file descriptor to epoll instance");
|
warning("Unable to add file descriptor to epoll instance");
|
||||||
errno = 0;
|
errno = 0;
|
||||||
local = list_delete(local, &evt);
|
local = list_remove(local, evt->list_idx);
|
||||||
if (local == NULL) {
|
if (local == NULL) {
|
||||||
critical("Unable to resize async local list");
|
critical("Unable to resize async local list");
|
||||||
return;
|
return;
|
||||||
@@ -328,7 +339,7 @@ void async_thread(void) {
|
|||||||
// calculate wait timeout
|
// calculate wait timeout
|
||||||
min_ts = -1000, cur_ts = clock_micros();
|
min_ts = -1000, cur_ts = clock_micros();
|
||||||
for (int i = 0; i < list_size(local); i++) {
|
for (int i = 0; i < list_size(local); i++) {
|
||||||
evt_listen_t *evt = local[i];
|
const evt_listen_t *evt = local[i];
|
||||||
if (!evt->socket || evt->socket->timeout_us < 0) continue;
|
if (!evt->socket || evt->socket->timeout_us < 0) continue;
|
||||||
|
|
||||||
ts = evt->socket->ts_last + evt->socket->timeout_us - cur_ts;
|
ts = evt->socket->ts_last + evt->socket->timeout_us - cur_ts;
|
||||||
@@ -349,7 +360,10 @@ void async_thread(void) {
|
|||||||
|
|
||||||
for (int i = 0; i < num_fds; i++) {
|
for (int i = 0; i < num_fds; i++) {
|
||||||
evt_listen_t *evt = events[i].data.ptr;
|
evt_listen_t *evt = events[i].data.ptr;
|
||||||
if (!list_contains(local, &evt)) continue;
|
if (evt != local[evt->list_idx]) {
|
||||||
|
error("epoll instance reported event not contained in local list");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
void (*cb)(void *) = NULL;
|
void (*cb)(void *) = NULL;
|
||||||
if (async_exec_cb(evt, async_e2a(events[i].events), &cb) == 0) {
|
if (async_exec_cb(evt, async_e2a(events[i].events), &cb) == 0) {
|
||||||
@@ -364,7 +378,7 @@ void async_thread(void) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
local = list_delete(local, &evt);
|
local = list_remove(local, evt->list_idx);
|
||||||
if (local == NULL) {
|
if (local == NULL) {
|
||||||
critical("Unable to resize async local list");
|
critical("Unable to resize async local list");
|
||||||
return;
|
return;
|
||||||
@@ -381,7 +395,7 @@ void async_thread(void) {
|
|||||||
// check, if some socket ran into a timeout
|
// check, if some socket ran into a timeout
|
||||||
cur_ts = clock_micros();
|
cur_ts = clock_micros();
|
||||||
for (int i = 0; i < list_size(local); i++) {
|
for (int i = 0; i < list_size(local); i++) {
|
||||||
evt_listen_t *evt = local[i];
|
const evt_listen_t *evt = local[i];
|
||||||
if (!evt->socket) continue;
|
if (!evt->socket) continue;
|
||||||
|
|
||||||
if (evt->socket->timeout_us >= 0 && (cur_ts - evt->socket->ts_last) >= evt->socket->timeout_us) {
|
if (evt->socket->timeout_us >= 0 && (cur_ts - evt->socket->ts_last) >= evt->socket->timeout_us) {
|
||||||
|
|||||||
Reference in New Issue
Block a user