diff --git a/Makefile b/Makefile index f708de9..7c036af 100644 --- a/Makefile +++ b/Makefile @@ -58,6 +58,7 @@ bin/res/%.txt: res/%.* bin/sesimos: bin/server.o bin/logger.o bin/cache_handler.o bin/async.o bin/workers.o \ bin/worker/request_handler.o bin/worker/tcp_acceptor.o \ bin/worker/fastcgi_handler.o bin/worker/local_handler.o bin/worker/proxy_handler.o \ + bin/worker/ws_frame_handler.o \ bin/lib/http_static.o bin/res/default.o bin/res/proxy.o bin/res/style.o \ bin/res/icon_error.o bin/res/icon_info.o bin/res/icon_success.o bin/res/icon_warning.o \ bin/res/globe.o \ @@ -88,6 +89,8 @@ bin/worker/local_handler.o: src/worker/func.h bin/worker/proxy_handler.o: src/worker/func.h +bin/worker/ws_frame_handler.o: src/worker/func.h + bin/lib/compress.o: src/lib/compress.h bin/lib/config.o: src/lib/config.h src/lib/utils.h src/lib/uri.h src/logger.h diff --git a/src/lib/websocket.c b/src/lib/websocket.c index bbb4e46..41ef49b 100644 --- a/src/lib/websocket.c +++ b/src/lib/websocket.c @@ -6,22 +6,14 @@ * @date 2022-08-16 */ -#include "../defs.h" #include "../logger.h" #include "websocket.h" #include "utils.h" #include #include -#include -#include static const char ws_key_uuid[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; -static volatile sig_atomic_t terminate = 0; - -void ws_terminate(int _) { - terminate = 1; -} int ws_calc_accept_key(const char *key, char *accept_key) { if (key == NULL || accept_key == NULL) @@ -144,65 +136,3 @@ int ws_send_frame_header(sock *s, ws_frame *frame) { return 0; } - -int ws_handle_connection(sock *s1, sock *s2) { - sock *poll_socks[2] = {s1, s2}; - sock *readable[2], *error[2]; - int n_sock = 2, n_readable = 0, n_error = 0; - ws_frame frame; - char buf[CHUNK_SIZE]; - int closes = 0; - long ret; - - signal(SIGINT, ws_terminate); - signal(SIGTERM, ws_terminate); - - while (!terminate && closes != 3) { - ret = sock_poll_read(poll_socks, readable, error, n_sock, &n_readable, &n_error, WS_TIMEOUT * 1000); - if (terminate) { - break; - } else if (ret < 0) { - error("Unable to poll sockets"); - return -1; - } else if (n_readable == 0) { - error("Connection timed out"); - return -2; - } else if (n_error > 0) { - error("Peer closed connection"); - return -3; - } - - for (int i = 0; i < n_readable; i++) { - sock *s = readable[i]; - sock *o = (s == s1) ? s2 : s1; - if (ws_recv_frame_header(s, &frame) != 0) return -3; - - // debug("WebSocket: Peer %s, Opcode=0x%X, Len=%li", (s == s1) ? "1" : "2", frame.opcode, frame.len); - - if (frame.opcode == 0x8) { - n_sock--; - if (s == s1) { - poll_socks[0] = s2; - closes |= 1; - } else { - closes |= 2; - } - } - - if (ws_send_frame_header(o, &frame) != 0) return -3; - - if (frame.len > 0) { - ret = sock_splice(o, s, buf, sizeof(buf), frame.len); - if (ret < 0) { - error("Unable to forward data in WebSocket"); - return -4; - } else if (ret != frame.len) { - error("Unable to forward correct number of bytes in WebSocket"); - return -4; - } - } - } - } - - return 0; -} diff --git a/src/lib/websocket.h b/src/lib/websocket.h index f67b3b4..3772967 100644 --- a/src/lib/websocket.h +++ b/src/lib/websocket.h @@ -30,6 +30,4 @@ int ws_recv_frame_header(sock *s, ws_frame *frame); int ws_send_frame_header(sock *s, ws_frame *frame); -int ws_handle_connection(sock *s1, sock *s2); - #endif //SESIMOS_WEBSOCKET_H diff --git a/src/worker/func.h b/src/worker/func.h index 83a7b79..2af7f74 100644 --- a/src/worker/func.h +++ b/src/worker/func.h @@ -38,7 +38,8 @@ typedef struct { typedef struct { client_ctx_t *client; - sock *s1, *s2, *s, *r; + sock *socket; + void *other; } ws_ctx_t; void tcp_acceptor_func(client_ctx_t *ctx); @@ -61,4 +62,8 @@ void tcp_close(client_ctx_t *ctx); void proxy_close(proxy_ctx_t *ctx); +int ws_handle_connection(client_ctx_t *ctx); + +void ws_close(ws_ctx_t *ctx); + #endif //SESIMOS_FUNC_H diff --git a/src/worker/proxy_handler.c b/src/worker/proxy_handler.c index 840dfc2..3fb3570 100644 --- a/src/worker/proxy_handler.c +++ b/src/worker/proxy_handler.c @@ -27,23 +27,16 @@ void proxy_handler_func(client_ctx_t *ctx) { if (ctx->use_proxy == 0) { proxy_close(ctx->proxy); - request_complete(ctx); - handle_request(ctx); } else if (ctx->use_proxy == 1) { proxy_handler_2(ctx); - request_complete(ctx); } else if (ctx->use_proxy == 2) { // WebSocket - sock_set_timeout(&ctx->socket, WS_TIMEOUT); - sock_set_timeout(&ctx->proxy->proxy, WS_TIMEOUT); - info("Upgrading connection to WebSocket connection"); - if (ws_handle_connection(&ctx->socket, &ctx->proxy->proxy) != 0) { - ctx->c_keep_alive = 0; - proxy_close(ctx->proxy); - } - info("WebSocket connection closed"); + ws_handle_connection(ctx); return; } + + request_complete(ctx); + handle_request(ctx); } static int proxy_handler_1(client_ctx_t *ctx) { diff --git a/src/worker/ws_frame_handler.c b/src/worker/ws_frame_handler.c index deb4616..4072f7b 100644 --- a/src/worker/ws_frame_handler.c +++ b/src/worker/ws_frame_handler.c @@ -6,15 +6,89 @@ * @date 2022-12-30 */ +#include "../defs.h" #include "func.h" +#include "../logger.h" +#include "../lib/websocket.h" +#include "../workers.h" +#include static int ws_frame_handler(ws_ctx_t *ctx); void ws_frame_handler_func(ws_ctx_t *ctx) { + logger_set_prefix("[%*s]%s", INET6_ADDRSTRLEN, ctx->client->socket.s_addr, ctx->client->log_prefix); + if (ws_frame_handler(ctx) == 0) { + if (ctx->client->ws_close == 3) { + ws_close(ctx); + } else { + ws_handle_frame(ctx); + } + } else { + ws_close(ctx); + } +} + +int ws_handle_connection(client_ctx_t *ctx) { + info("Upgrading to WebSocket connection"); + sock_set_timeout(&ctx->socket, WS_TIMEOUT); + sock_set_timeout(&ctx->proxy->proxy, WS_TIMEOUT); + + ws_ctx_t *a = malloc(sizeof(ws_ctx_t)); + ws_ctx_t *b = malloc(sizeof(ws_ctx_t)); + + a->other = b, b->other = a; + a->client = ctx, b->client = ctx; + a->socket = &ctx->socket, b->socket = &ctx->proxy->proxy; + + ws_handle_frame(a); + ws_handle_frame(b); + + return 0; } static int ws_frame_handler(ws_ctx_t *ctx) { - // TODO WebSocket Frame Handler + ws_frame frame; + char buf[CHUNK_SIZE]; + + sock *socket = ctx->socket; + sock *other = (ctx->socket == &ctx->client->socket) ? &ctx->client->proxy->proxy : &ctx->client->socket; + + if (ws_recv_frame_header(socket, &frame) != 0) + return -1; + + debug("WebSocket: Peer %s, Opcode=0x%X, Len=%li", (ctx->socket == &ctx->client->socket) ? "1" : "2", frame.opcode, frame.len); + + if (frame.opcode == 0x8) { + ctx->client->ws_close |= (ctx->socket == &ctx->client->socket) ? 1 : 2; + } + + if (ws_send_frame_header(other, &frame) != 0) + return -1; + + if (frame.len > 0) { + long ret = sock_splice(other, socket, buf, sizeof(buf), frame.len); + if (ret < 0) { + error("Unable to forward data in WebSocket"); + return -1; + } else if (ret != frame.len) { + error("Unable to forward correct number of bytes in WebSocket"); + return -1; + } + } + return 0; } + +void ws_close(ws_ctx_t *ctx) { + ws_ctx_t *other = ctx->other; + if (other) { + other->other = NULL; + logger_set_prefix("[%*s]%s", INET6_ADDRSTRLEN, ctx->client->socket.s_addr, ctx->client->log_prefix); + info("Closing WebSocket connection"); + proxy_close(ctx->client->proxy); + tcp_close(ctx->client); + } + free(ctx); + errno = 0; +} diff --git a/src/workers.c b/src/workers.c index 889cf17..34c62ce 100644 --- a/src/workers.c +++ b/src/workers.c @@ -12,15 +12,16 @@ #include "worker/func.h" #include "async.h" -static mpmc_t tcp_acceptor_ctx, request_handler_ctx, - local_handler_ctx, fastcgi_handler_cxt, proxy_handler_ctx; +static mpmc_t tcp_acceptor_ctx, request_handler_ctx, local_handler_ctx, fastcgi_handler_cxt, proxy_handler_ctx, + ws_frame_handler_ctx; int workers_init(void) { - mpmc_init(&tcp_acceptor_ctx, 8, 64, (void (*)(void *)) tcp_acceptor_func, "tcp"); - mpmc_init(&request_handler_ctx, 16, 64, (void (*)(void *)) request_handler_func, "req"); - mpmc_init(&local_handler_ctx, 16, 64, (void (*)(void *)) local_handler_func, "local"); - mpmc_init(&fastcgi_handler_cxt, 16, 64, (void (*)(void *)) fastcgi_handler_func, "fcgi"); - mpmc_init(&proxy_handler_ctx, 16, 64, (void (*)(void *)) proxy_handler_func, "proxy"); + mpmc_init(&tcp_acceptor_ctx, 8, 64, (void (*)(void *)) tcp_acceptor_func, "tcp"); + mpmc_init(&request_handler_ctx, 16, 64, (void (*)(void *)) request_handler_func, "req"); + mpmc_init(&local_handler_ctx, 16, 64, (void (*)(void *)) local_handler_func, "local"); + mpmc_init(&fastcgi_handler_cxt, 16, 64, (void (*)(void *)) fastcgi_handler_func, "fcgi"); + mpmc_init(&proxy_handler_ctx, 16, 64, (void (*)(void *)) proxy_handler_func, "proxy"); + mpmc_init(&ws_frame_handler_ctx, 16, 64, (void (*)(void *)) ws_frame_handler_func, "ws"); return -1; } @@ -30,6 +31,7 @@ void workers_stop(void) { mpmc_stop(&fastcgi_handler_cxt); mpmc_stop(&proxy_handler_ctx); mpmc_stop(&request_handler_ctx); + mpmc_stop(&ws_frame_handler_ctx); } void workers_destroy(void) { @@ -38,6 +40,7 @@ void workers_destroy(void) { mpmc_destroy(&fastcgi_handler_cxt); mpmc_destroy(&proxy_handler_ctx); mpmc_destroy(&request_handler_ctx); + mpmc_destroy(&ws_frame_handler_ctx); } int tcp_accept(client_ctx_t *ctx) { @@ -68,3 +71,11 @@ int fastcgi_handle(client_ctx_t *ctx) { int proxy_handle(client_ctx_t *ctx) { return mpmc_queue(&proxy_handler_ctx, ctx); } + +static int ws_handle_frame_cb(ws_ctx_t *ctx) { + return mpmc_queue(&ws_frame_handler_ctx, ctx); +} + +int ws_handle_frame(ws_ctx_t *ctx) { + return async(ctx->socket, POLLIN, 0, (void (*)(void *)) ws_handle_frame_cb, ctx, (void (*)(void *)) ws_close, ctx); +} diff --git a/src/workers.h b/src/workers.h index dda7138..ad3c8e7 100644 --- a/src/workers.h +++ b/src/workers.h @@ -27,4 +27,6 @@ int fastcgi_handle(client_ctx_t *ctx); int proxy_handle(client_ctx_t *ctx); +int ws_handle_frame(ws_ctx_t *ctx); + #endif //SESIMOS_WORKERS_H