Async working (in concept)
This commit is contained in:
		
							
								
								
									
										4
									
								
								Makefile
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								Makefile
									
									
									
									
									
								
							@@ -40,7 +40,7 @@ bin/%.o: src/%.c
 | 
			
		||||
bin/lib/%.o: src/lib/%.c
 | 
			
		||||
	$(CC) -c -o $@ $(CFLAGS) $<
 | 
			
		||||
 | 
			
		||||
bin/sesimos: bin/server.o bin/client.o bin/logger.o bin/cache_handler.o \
 | 
			
		||||
bin/sesimos: bin/server.o bin/client.o bin/logger.o bin/cache_handler.o bin/async.o \
 | 
			
		||||
			 bin/lib/compress.o bin/lib/config.o bin/lib/fastcgi.o bin/lib/geoip.o \
 | 
			
		||||
			 bin/lib/http.o bin/lib/http_static.o bin/lib/proxy.o bin/lib/sock.o bin/lib/uri.o \
 | 
			
		||||
		     bin/lib/utils.o bin/lib/websocket.o
 | 
			
		||||
@@ -58,6 +58,8 @@ bin/logger.o: src/logger.h
 | 
			
		||||
 | 
			
		||||
bin/cache_handler.o: src/cache_handler.h src/lib/utils.h src/lib/uri.h src/lib/compress.h src/logger.h
 | 
			
		||||
 | 
			
		||||
bin/async.o: src/async.h src/logger.h
 | 
			
		||||
 | 
			
		||||
bin/lib/compress.o: src/lib/compress.h
 | 
			
		||||
 | 
			
		||||
bin/lib/config.o: src/lib/config.h src/lib/utils.h src/lib/uri.h src/logger.h
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										75
									
								
								src/async.c
									
									
									
									
									
								
							
							
						
						
									
										75
									
								
								src/async.c
									
									
									
									
									
								
							@@ -1,12 +1,18 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Sesimos - secure, simple, modern web server
 | 
			
		||||
 * @brief Async handler
 | 
			
		||||
 * @file src/async.c
 | 
			
		||||
 * @author Lorenz Stechauner
 | 
			
		||||
 * @date 2022-12-28
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
#include "async.h"
 | 
			
		||||
#include "logger.h"
 | 
			
		||||
 | 
			
		||||
#include <stdio.h>
 | 
			
		||||
#include <poll.h>
 | 
			
		||||
#include <signal.h>
 | 
			
		||||
#include <errno.h>
 | 
			
		||||
 | 
			
		||||
#include <memory.h>
 | 
			
		||||
 | 
			
		||||
typedef struct {
 | 
			
		||||
    int fd;
 | 
			
		||||
@@ -23,21 +29,31 @@ typedef struct {
 | 
			
		||||
    evt_listen_t q[256];
 | 
			
		||||
} listen_queue_t;
 | 
			
		||||
 | 
			
		||||
static listen_queue_t listen1, listen2, *listen = &listen1;
 | 
			
		||||
static volatile sig_atomic_t alive = 1;
 | 
			
		||||
 | 
			
		||||
static listen_queue_t listen1;
 | 
			
		||||
static listen_queue_t listen2;
 | 
			
		||||
listen_queue_t *listen = &listen1;
 | 
			
		||||
static int async_add_to_queue(evt_listen_t *evt) {
 | 
			
		||||
    // TODO locking
 | 
			
		||||
    memcpy(&listen->q[listen->n++], evt, sizeof(*evt));
 | 
			
		||||
    return 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
volatile sig_atomic_t alive = 1;
 | 
			
		||||
 | 
			
		||||
int async(int fd, int events, int flags, void (*cb)(void *), void *arg, void (*err_cb)(void *), void *err_arg) {
 | 
			
		||||
    return -1;
 | 
			
		||||
int async(int fd, short events, int flags, void cb(void *), void *arg, void err_cb(void *), void *err_arg) {
 | 
			
		||||
    evt_listen_t evt = {
 | 
			
		||||
            .fd = fd,
 | 
			
		||||
            .events = events,
 | 
			
		||||
            .flags = flags,
 | 
			
		||||
            .cb = cb,
 | 
			
		||||
            .arg = arg,
 | 
			
		||||
            .err_cb = err_cb,
 | 
			
		||||
            .err_arg = err_arg,
 | 
			
		||||
    };
 | 
			
		||||
    return async_add_to_queue(&evt);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void async_thread(void) {
 | 
			
		||||
 | 
			
		||||
    int num_fds = 0;
 | 
			
		||||
    struct pollfd fds[256];
 | 
			
		||||
    int num_fds;
 | 
			
		||||
    struct pollfd fds[256];  // TODO dynamic
 | 
			
		||||
 | 
			
		||||
    // main event loop
 | 
			
		||||
    while (alive) {
 | 
			
		||||
@@ -45,17 +61,15 @@ void async_thread(void) {
 | 
			
		||||
        listen_queue_t *l = listen;
 | 
			
		||||
        listen = (listen == &listen1) ? &listen2 : &listen1;
 | 
			
		||||
 | 
			
		||||
        // fill fds with newly added
 | 
			
		||||
        for (int i = 0; i < l->n; i++, num_fds++) {
 | 
			
		||||
            fds[num_fds].fd = l->q[i].fd;
 | 
			
		||||
            fds[num_fds].events = l->q[i].events;
 | 
			
		||||
        // fill fds with newly added queue entries
 | 
			
		||||
        for (num_fds = 0; num_fds < l->n; num_fds++) {
 | 
			
		||||
            fds[num_fds].fd = l->q[num_fds].fd;
 | 
			
		||||
            fds[num_fds].events = l->q[num_fds].events;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        int ready_fds = poll(fds, num_fds, -1);
 | 
			
		||||
        if (ready_fds < 0) {
 | 
			
		||||
        if (poll(fds, num_fds, -1) < 0) {
 | 
			
		||||
            if (errno == EINTR) {
 | 
			
		||||
                // interrupt
 | 
			
		||||
                continue;
 | 
			
		||||
            } else {
 | 
			
		||||
                // other error
 | 
			
		||||
                critical("Unable to poll for events");
 | 
			
		||||
@@ -64,7 +78,26 @@ void async_thread(void) {
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        for (int i = 0; i < num_fds; i++) {
 | 
			
		||||
            // TODO
 | 
			
		||||
            evt_listen_t *e = &l->q[i];
 | 
			
		||||
            if (fds[i].revents & e->events) {
 | 
			
		||||
                // specified event(s) occurred
 | 
			
		||||
                e->cb(e->arg);
 | 
			
		||||
 | 
			
		||||
                if (e->flags & ASYNC_KEEP)
 | 
			
		||||
                    async_add_to_queue(e);
 | 
			
		||||
            } else if (fds[i].revents & (POLLERR | POLLHUP | POLLNVAL)) {
 | 
			
		||||
                // error occurred
 | 
			
		||||
                e->err_cb(e->err_arg);
 | 
			
		||||
            } else {
 | 
			
		||||
                // no event occurred
 | 
			
		||||
                async_add_to_queue(e);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            // reset errno to prevent strange behaviour
 | 
			
		||||
            errno = 0;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // reset size of queue
 | 
			
		||||
        l->n = 0;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										13
									
								
								src/async.h
									
									
									
									
									
								
							
							
						
						
									
										13
									
								
								src/async.h
									
									
									
									
									
								
							@@ -1,12 +1,17 @@
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Sesimos - secure, simple, modern web server
 | 
			
		||||
 * @brief Async handler (header file)
 | 
			
		||||
 * @file src/async.h
 | 
			
		||||
 * @author Lorenz Stechauner
 | 
			
		||||
 * @date 2022-12-28
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
#ifndef SESIMOS_ASYNC_H
 | 
			
		||||
#define SESIMOS_ASYNC_H
 | 
			
		||||
 | 
			
		||||
#define async_read(fd, cb, arg, err_cb, err_arg) async(fd, 0, 0, cb, arg, err, err_arg)
 | 
			
		||||
#define async_read_keep(fd, cb, arg, err_cb, err_arg) async(fd, 0, 0, cb, arg, err, err_arg)
 | 
			
		||||
#define ASYNC_KEEP 1
 | 
			
		||||
 | 
			
		||||
int async(int fd, int events, int flags, void (*cb)(void *), void *arg, void (*err_cb)(void *), void *err_arg);
 | 
			
		||||
int async(int fd, short events, int flags, void cb(void *), void *arg, void err_cb(void *), void *err_arg);
 | 
			
		||||
 | 
			
		||||
void async_thread(void);
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										95
									
								
								src/server.c
									
									
									
									
									
								
							
							
						
						
									
										95
									
								
								src/server.c
									
									
									
									
									
								
							@@ -10,6 +10,7 @@
 | 
			
		||||
#include "server.h"
 | 
			
		||||
#include "client.h"
 | 
			
		||||
#include "logger.h"
 | 
			
		||||
#include "async.h"
 | 
			
		||||
 | 
			
		||||
#include "cache_handler.h"
 | 
			
		||||
#include "lib/config.h"
 | 
			
		||||
@@ -39,7 +40,7 @@ static int sockets[NUM_SOCKETS];
 | 
			
		||||
static pthread_t children[MAX_CHILDREN];
 | 
			
		||||
static SSL_CTX *contexts[CONFIG_MAX_CERT_CONFIG];
 | 
			
		||||
 | 
			
		||||
static client_ctx_t clients[MAX_CLIENTS];
 | 
			
		||||
static client_ctx_t clients[MAX_CLIENTS];  // TODO dynamic
 | 
			
		||||
 | 
			
		||||
static int clean() {
 | 
			
		||||
    remove("/var/sesimos/server/cache");
 | 
			
		||||
@@ -56,12 +57,38 @@ static int ssl_servername_cb(SSL *ssl, int *ad, void *arg) {
 | 
			
		||||
    return SSL_TLSEXT_ERR_OK;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void accept_cb() {
 | 
			
		||||
static void accept_cb(void *arg) {
 | 
			
		||||
    int i = (int) (((int *) arg) - sockets);
 | 
			
		||||
    int fd = sockets[i];
 | 
			
		||||
 | 
			
		||||
    int j;
 | 
			
		||||
    for (j = 0; j < MAX_CHILDREN; j++) {
 | 
			
		||||
        if (children[j] == 0) break;
 | 
			
		||||
    }
 | 
			
		||||
    client_ctx_t *client_ctx = &clients[j];
 | 
			
		||||
    sock *client = &client_ctx->socket;
 | 
			
		||||
 | 
			
		||||
    client->ctx = contexts[0];
 | 
			
		||||
    socklen_t addr_len = sizeof(client->addr);
 | 
			
		||||
    int client_fd = accept(fd, &client->addr.sock, &addr_len);
 | 
			
		||||
    if (client_fd < 0) {
 | 
			
		||||
        critical("Unable to accept connection");
 | 
			
		||||
        return;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    client->socket = client_fd;
 | 
			
		||||
    client->enc = (i == 1);
 | 
			
		||||
    pthread_t ret = pthread_create(&children[j], NULL, (void *(*)(void *)) &client_handler, client);
 | 
			
		||||
    if (ret != 0) {
 | 
			
		||||
        errno = (int) ret;
 | 
			
		||||
        critical("Unable to create thread");
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void accept_err_cb() {
 | 
			
		||||
 | 
			
		||||
static void accept_err_cb(void *arg) {
 | 
			
		||||
    int i = (int) (((int *) arg) - sockets);
 | 
			
		||||
    int fd = sockets[i];
 | 
			
		||||
    // TODO accept error callback
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void terminate_forcefully(int sig) {
 | 
			
		||||
@@ -115,9 +142,6 @@ static void terminate_gracefully(int sig) {
 | 
			
		||||
 | 
			
		||||
int main(int argc, char *const argv[]) {
 | 
			
		||||
    const int YES = 1;
 | 
			
		||||
    struct pollfd poll_fds[NUM_SOCKETS];
 | 
			
		||||
    int ready_sockets_num;
 | 
			
		||||
    long client_num = 0;
 | 
			
		||||
    int ret;
 | 
			
		||||
 | 
			
		||||
    memset(sockets, 0, sizeof(sockets));
 | 
			
		||||
@@ -248,66 +272,15 @@ int main(int argc, char *const argv[]) {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    for (int i = 0; i < NUM_SOCKETS; i++) {
 | 
			
		||||
        poll_fds[i].fd = sockets[i];
 | 
			
		||||
        poll_fds[i].events = POLLIN;
 | 
			
		||||
        async(sockets[i], POLLIN, ASYNC_KEEP, accept_cb, &sockets[i], accept_err_cb, &sockets[i]);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    errno = 0;
 | 
			
		||||
    notice("Ready to accept connections");
 | 
			
		||||
 | 
			
		||||
    while (alive) {
 | 
			
		||||
        ready_sockets_num = poll(poll_fds, NUM_SOCKETS, 1000);
 | 
			
		||||
        if (ready_sockets_num < 0) {
 | 
			
		||||
            critical("Unable to poll sockets");
 | 
			
		||||
            terminate_gracefully(0);
 | 
			
		||||
            return 1;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        for (int i = 0; i < NUM_SOCKETS; i++) {
 | 
			
		||||
            if (poll_fds[i].revents & POLLIN) {
 | 
			
		||||
                int j;
 | 
			
		||||
                for (j = 0; j < MAX_CHILDREN; j++) {
 | 
			
		||||
                    if (children[j] == 0) break;
 | 
			
		||||
                }
 | 
			
		||||
                client_ctx_t *client_ctx = &clients[j];
 | 
			
		||||
                sock *client = &client_ctx->socket;
 | 
			
		||||
 | 
			
		||||
                client->ctx = contexts[0];
 | 
			
		||||
                socklen_t addr_len = sizeof(client->addr);
 | 
			
		||||
                int client_fd = accept(sockets[i], &client->addr.sock, &addr_len);
 | 
			
		||||
                if (client_fd < 0) {
 | 
			
		||||
                    critical("Unable to accept connection");
 | 
			
		||||
                    continue;
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                client->socket = client_fd;
 | 
			
		||||
                client->enc = (i == 1);
 | 
			
		||||
                ret = pthread_create(&children[j], NULL, (void *(*)(void *)) &client_handler, client);
 | 
			
		||||
                if (ret != 0) {
 | 
			
		||||
                    errno = ret;
 | 
			
		||||
                    critical("Unable to create child process");
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                client_num++;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // TODO outsource in thread
 | 
			
		||||
        /*
 | 
			
		||||
        void *ret_val = NULL;
 | 
			
		||||
        for (int i = 0; i < MAX_CHILDREN; i++) {
 | 
			
		||||
            if (children[i] != 0) {
 | 
			
		||||
                ret = pthread_timed(children[i], &ret_val);
 | 
			
		||||
                if (ret < 0) {
 | 
			
		||||
                    critical("Unable to wait for thread (PID %i)", children[i]);
 | 
			
		||||
                } else if (ret == children[i]) {
 | 
			
		||||
                    children[i] = 0;
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        */
 | 
			
		||||
    }
 | 
			
		||||
    async_thread();
 | 
			
		||||
 | 
			
		||||
    // cleanup
 | 
			
		||||
    geoip_free();
 | 
			
		||||
    return 0;
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user