WebSocket async working
This commit is contained in:
3
Makefile
3
Makefile
@ -58,6 +58,7 @@ bin/res/%.txt: res/%.*
|
||||
bin/sesimos: bin/server.o bin/logger.o bin/cache_handler.o bin/async.o bin/workers.o \
|
||||
bin/worker/request_handler.o bin/worker/tcp_acceptor.o \
|
||||
bin/worker/fastcgi_handler.o bin/worker/local_handler.o bin/worker/proxy_handler.o \
|
||||
bin/worker/ws_frame_handler.o \
|
||||
bin/lib/http_static.o bin/res/default.o bin/res/proxy.o bin/res/style.o \
|
||||
bin/res/icon_error.o bin/res/icon_info.o bin/res/icon_success.o bin/res/icon_warning.o \
|
||||
bin/res/globe.o \
|
||||
@ -88,6 +89,8 @@ bin/worker/local_handler.o: src/worker/func.h
|
||||
|
||||
bin/worker/proxy_handler.o: src/worker/func.h
|
||||
|
||||
bin/worker/ws_frame_handler.o: src/worker/func.h
|
||||
|
||||
bin/lib/compress.o: src/lib/compress.h
|
||||
|
||||
bin/lib/config.o: src/lib/config.h src/lib/utils.h src/lib/uri.h src/logger.h
|
||||
|
@ -6,22 +6,14 @@
|
||||
* @date 2022-08-16
|
||||
*/
|
||||
|
||||
#include "../defs.h"
|
||||
#include "../logger.h"
|
||||
#include "websocket.h"
|
||||
#include "utils.h"
|
||||
|
||||
#include <string.h>
|
||||
#include <openssl/sha.h>
|
||||
#include <errno.h>
|
||||
#include <signal.h>
|
||||
|
||||
static const char ws_key_uuid[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
|
||||
static volatile sig_atomic_t terminate = 0;
|
||||
|
||||
void ws_terminate(int _) {
|
||||
terminate = 1;
|
||||
}
|
||||
|
||||
int ws_calc_accept_key(const char *key, char *accept_key) {
|
||||
if (key == NULL || accept_key == NULL)
|
||||
@ -144,65 +136,3 @@ int ws_send_frame_header(sock *s, ws_frame *frame) {
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int ws_handle_connection(sock *s1, sock *s2) {
|
||||
sock *poll_socks[2] = {s1, s2};
|
||||
sock *readable[2], *error[2];
|
||||
int n_sock = 2, n_readable = 0, n_error = 0;
|
||||
ws_frame frame;
|
||||
char buf[CHUNK_SIZE];
|
||||
int closes = 0;
|
||||
long ret;
|
||||
|
||||
signal(SIGINT, ws_terminate);
|
||||
signal(SIGTERM, ws_terminate);
|
||||
|
||||
while (!terminate && closes != 3) {
|
||||
ret = sock_poll_read(poll_socks, readable, error, n_sock, &n_readable, &n_error, WS_TIMEOUT * 1000);
|
||||
if (terminate) {
|
||||
break;
|
||||
} else if (ret < 0) {
|
||||
error("Unable to poll sockets");
|
||||
return -1;
|
||||
} else if (n_readable == 0) {
|
||||
error("Connection timed out");
|
||||
return -2;
|
||||
} else if (n_error > 0) {
|
||||
error("Peer closed connection");
|
||||
return -3;
|
||||
}
|
||||
|
||||
for (int i = 0; i < n_readable; i++) {
|
||||
sock *s = readable[i];
|
||||
sock *o = (s == s1) ? s2 : s1;
|
||||
if (ws_recv_frame_header(s, &frame) != 0) return -3;
|
||||
|
||||
// debug("WebSocket: Peer %s, Opcode=0x%X, Len=%li", (s == s1) ? "1" : "2", frame.opcode, frame.len);
|
||||
|
||||
if (frame.opcode == 0x8) {
|
||||
n_sock--;
|
||||
if (s == s1) {
|
||||
poll_socks[0] = s2;
|
||||
closes |= 1;
|
||||
} else {
|
||||
closes |= 2;
|
||||
}
|
||||
}
|
||||
|
||||
if (ws_send_frame_header(o, &frame) != 0) return -3;
|
||||
|
||||
if (frame.len > 0) {
|
||||
ret = sock_splice(o, s, buf, sizeof(buf), frame.len);
|
||||
if (ret < 0) {
|
||||
error("Unable to forward data in WebSocket");
|
||||
return -4;
|
||||
} else if (ret != frame.len) {
|
||||
error("Unable to forward correct number of bytes in WebSocket");
|
||||
return -4;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -30,6 +30,4 @@ int ws_recv_frame_header(sock *s, ws_frame *frame);
|
||||
|
||||
int ws_send_frame_header(sock *s, ws_frame *frame);
|
||||
|
||||
int ws_handle_connection(sock *s1, sock *s2);
|
||||
|
||||
#endif //SESIMOS_WEBSOCKET_H
|
||||
|
@ -38,7 +38,8 @@ typedef struct {
|
||||
|
||||
typedef struct {
|
||||
client_ctx_t *client;
|
||||
sock *s1, *s2, *s, *r;
|
||||
sock *socket;
|
||||
void *other;
|
||||
} ws_ctx_t;
|
||||
|
||||
void tcp_acceptor_func(client_ctx_t *ctx);
|
||||
@ -61,4 +62,8 @@ void tcp_close(client_ctx_t *ctx);
|
||||
|
||||
void proxy_close(proxy_ctx_t *ctx);
|
||||
|
||||
int ws_handle_connection(client_ctx_t *ctx);
|
||||
|
||||
void ws_close(ws_ctx_t *ctx);
|
||||
|
||||
#endif //SESIMOS_FUNC_H
|
||||
|
@ -27,23 +27,16 @@ void proxy_handler_func(client_ctx_t *ctx) {
|
||||
|
||||
if (ctx->use_proxy == 0) {
|
||||
proxy_close(ctx->proxy);
|
||||
request_complete(ctx);
|
||||
handle_request(ctx);
|
||||
} else if (ctx->use_proxy == 1) {
|
||||
proxy_handler_2(ctx);
|
||||
request_complete(ctx);
|
||||
} else if (ctx->use_proxy == 2) {
|
||||
// WebSocket
|
||||
sock_set_timeout(&ctx->socket, WS_TIMEOUT);
|
||||
sock_set_timeout(&ctx->proxy->proxy, WS_TIMEOUT);
|
||||
info("Upgrading connection to WebSocket connection");
|
||||
if (ws_handle_connection(&ctx->socket, &ctx->proxy->proxy) != 0) {
|
||||
ctx->c_keep_alive = 0;
|
||||
proxy_close(ctx->proxy);
|
||||
}
|
||||
info("WebSocket connection closed");
|
||||
ws_handle_connection(ctx);
|
||||
return;
|
||||
}
|
||||
|
||||
request_complete(ctx);
|
||||
handle_request(ctx);
|
||||
}
|
||||
|
||||
static int proxy_handler_1(client_ctx_t *ctx) {
|
||||
|
@ -6,15 +6,89 @@
|
||||
* @date 2022-12-30
|
||||
*/
|
||||
|
||||
#include "../defs.h"
|
||||
#include "func.h"
|
||||
#include "../logger.h"
|
||||
#include "../lib/websocket.h"
|
||||
#include "../workers.h"
|
||||
#include <errno.h>
|
||||
|
||||
static int ws_frame_handler(ws_ctx_t *ctx);
|
||||
|
||||
void ws_frame_handler_func(ws_ctx_t *ctx) {
|
||||
logger_set_prefix("[%*s]%s", INET6_ADDRSTRLEN, ctx->client->socket.s_addr, ctx->client->log_prefix);
|
||||
|
||||
if (ws_frame_handler(ctx) == 0) {
|
||||
if (ctx->client->ws_close == 3) {
|
||||
ws_close(ctx);
|
||||
} else {
|
||||
ws_handle_frame(ctx);
|
||||
}
|
||||
} else {
|
||||
ws_close(ctx);
|
||||
}
|
||||
}
|
||||
|
||||
int ws_handle_connection(client_ctx_t *ctx) {
|
||||
info("Upgrading to WebSocket connection");
|
||||
sock_set_timeout(&ctx->socket, WS_TIMEOUT);
|
||||
sock_set_timeout(&ctx->proxy->proxy, WS_TIMEOUT);
|
||||
|
||||
ws_ctx_t *a = malloc(sizeof(ws_ctx_t));
|
||||
ws_ctx_t *b = malloc(sizeof(ws_ctx_t));
|
||||
|
||||
a->other = b, b->other = a;
|
||||
a->client = ctx, b->client = ctx;
|
||||
a->socket = &ctx->socket, b->socket = &ctx->proxy->proxy;
|
||||
|
||||
ws_handle_frame(a);
|
||||
ws_handle_frame(b);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int ws_frame_handler(ws_ctx_t *ctx) {
|
||||
// TODO WebSocket Frame Handler
|
||||
ws_frame frame;
|
||||
char buf[CHUNK_SIZE];
|
||||
|
||||
sock *socket = ctx->socket;
|
||||
sock *other = (ctx->socket == &ctx->client->socket) ? &ctx->client->proxy->proxy : &ctx->client->socket;
|
||||
|
||||
if (ws_recv_frame_header(socket, &frame) != 0)
|
||||
return -1;
|
||||
|
||||
debug("WebSocket: Peer %s, Opcode=0x%X, Len=%li", (ctx->socket == &ctx->client->socket) ? "1" : "2", frame.opcode, frame.len);
|
||||
|
||||
if (frame.opcode == 0x8) {
|
||||
ctx->client->ws_close |= (ctx->socket == &ctx->client->socket) ? 1 : 2;
|
||||
}
|
||||
|
||||
if (ws_send_frame_header(other, &frame) != 0)
|
||||
return -1;
|
||||
|
||||
if (frame.len > 0) {
|
||||
long ret = sock_splice(other, socket, buf, sizeof(buf), frame.len);
|
||||
if (ret < 0) {
|
||||
error("Unable to forward data in WebSocket");
|
||||
return -1;
|
||||
} else if (ret != frame.len) {
|
||||
error("Unable to forward correct number of bytes in WebSocket");
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void ws_close(ws_ctx_t *ctx) {
|
||||
ws_ctx_t *other = ctx->other;
|
||||
if (other) {
|
||||
other->other = NULL;
|
||||
logger_set_prefix("[%*s]%s", INET6_ADDRSTRLEN, ctx->client->socket.s_addr, ctx->client->log_prefix);
|
||||
info("Closing WebSocket connection");
|
||||
proxy_close(ctx->client->proxy);
|
||||
tcp_close(ctx->client);
|
||||
}
|
||||
free(ctx);
|
||||
errno = 0;
|
||||
}
|
||||
|
@ -12,8 +12,8 @@
|
||||
#include "worker/func.h"
|
||||
#include "async.h"
|
||||
|
||||
static mpmc_t tcp_acceptor_ctx, request_handler_ctx,
|
||||
local_handler_ctx, fastcgi_handler_cxt, proxy_handler_ctx;
|
||||
static mpmc_t tcp_acceptor_ctx, request_handler_ctx, local_handler_ctx, fastcgi_handler_cxt, proxy_handler_ctx,
|
||||
ws_frame_handler_ctx;
|
||||
|
||||
int workers_init(void) {
|
||||
mpmc_init(&tcp_acceptor_ctx, 8, 64, (void (*)(void *)) tcp_acceptor_func, "tcp");
|
||||
@ -21,6 +21,7 @@ int workers_init(void) {
|
||||
mpmc_init(&local_handler_ctx, 16, 64, (void (*)(void *)) local_handler_func, "local");
|
||||
mpmc_init(&fastcgi_handler_cxt, 16, 64, (void (*)(void *)) fastcgi_handler_func, "fcgi");
|
||||
mpmc_init(&proxy_handler_ctx, 16, 64, (void (*)(void *)) proxy_handler_func, "proxy");
|
||||
mpmc_init(&ws_frame_handler_ctx, 16, 64, (void (*)(void *)) ws_frame_handler_func, "ws");
|
||||
return -1;
|
||||
}
|
||||
|
||||
@ -30,6 +31,7 @@ void workers_stop(void) {
|
||||
mpmc_stop(&fastcgi_handler_cxt);
|
||||
mpmc_stop(&proxy_handler_ctx);
|
||||
mpmc_stop(&request_handler_ctx);
|
||||
mpmc_stop(&ws_frame_handler_ctx);
|
||||
}
|
||||
|
||||
void workers_destroy(void) {
|
||||
@ -38,6 +40,7 @@ void workers_destroy(void) {
|
||||
mpmc_destroy(&fastcgi_handler_cxt);
|
||||
mpmc_destroy(&proxy_handler_ctx);
|
||||
mpmc_destroy(&request_handler_ctx);
|
||||
mpmc_destroy(&ws_frame_handler_ctx);
|
||||
}
|
||||
|
||||
int tcp_accept(client_ctx_t *ctx) {
|
||||
@ -68,3 +71,11 @@ int fastcgi_handle(client_ctx_t *ctx) {
|
||||
int proxy_handle(client_ctx_t *ctx) {
|
||||
return mpmc_queue(&proxy_handler_ctx, ctx);
|
||||
}
|
||||
|
||||
static int ws_handle_frame_cb(ws_ctx_t *ctx) {
|
||||
return mpmc_queue(&ws_frame_handler_ctx, ctx);
|
||||
}
|
||||
|
||||
int ws_handle_frame(ws_ctx_t *ctx) {
|
||||
return async(ctx->socket, POLLIN, 0, (void (*)(void *)) ws_handle_frame_cb, ctx, (void (*)(void *)) ws_close, ctx);
|
||||
}
|
||||
|
@ -27,4 +27,6 @@ int fastcgi_handle(client_ctx_t *ctx);
|
||||
|
||||
int proxy_handle(client_ctx_t *ctx);
|
||||
|
||||
int ws_handle_frame(ws_ctx_t *ctx);
|
||||
|
||||
#endif //SESIMOS_WORKERS_H
|
||||
|
Reference in New Issue
Block a user