diff --git a/src/lib/proxy.c b/src/lib/proxy.c index 26c5874..0f2adbb 100644 --- a/src/lib/proxy.c +++ b/src/lib/proxy.c @@ -141,6 +141,7 @@ proxy_ctx_t *proxy_get_by_conf(host_config_t *conf) { void proxy_unlock_ctx(proxy_ctx_t *ctx) { int n = (int) ((ctx - proxies) / MAX_PROXY_CNX_PER_HOST); ctx->in_use = 0; + ctx->client = NULL; sem_post(&available[n]); } @@ -567,62 +568,9 @@ int proxy_init(proxy_ctx_t **proxy_ptr, http_req *req, http_res *res, http_statu } int proxy_send(proxy_ctx_t *proxy, sock *client, unsigned long len_to_send, int flags) { - char buffer[CHUNK_SIZE], *ptr; - long ret = 0, snd_len; - - do { - snd_len = 0; - if (flags & PROXY_CHUNKED) { - ret = sock_recv_chunk_header(&proxy->proxy); - if (ret < 0) { - if (ret == -1) { - error("Unable to receive from server: Malformed chunk header"); - } else { - error("Unable to receive from server"); - } - break; - } - - len_to_send = ret; - ret = 1; - } - while (snd_len < len_to_send) { - ret = sock_recv(&proxy->proxy, buffer, CHUNK_SIZE < (len_to_send - snd_len) ? CHUNK_SIZE : len_to_send - snd_len, 0); - if (ret <= 0) { - error("Unable to receive from server"); - break; - } - ptr = buffer; - long buf_len = ret; - ret = 1; - - if (flags & PROXY_CHUNKED) ret = sock_send_chunk_header(client, buf_len); - if (ret <= 0) goto err; - - ret = sock_send_x(client, ptr, buf_len, 0); - if (ret <= 0) goto err; - snd_len += ret; - - if (flags & PROXY_CHUNKED) ret = sock_send_chunk_trailer(client); - if (ret <= 0) { - err: - error("Unable to send"); - break; - } - } - if (ret <= 0) break; - if (flags & PROXY_CHUNKED) if ((ret = sock_recv_chunk_trailer(&proxy->proxy)) == -1) break; - } while ((flags & PROXY_CHUNKED) && len_to_send > 0); - - if (ret <= 0) return -1; - - if (flags & PROXY_CHUNKED) { - if (sock_send_last_chunk(client) == -1) { - error("Unable to send"); - return -1; - } - } - + char buffer[CHUNK_SIZE]; + if (sock_splice(client, &proxy->proxy, buffer, sizeof(buffer), len_to_send) == -1) + return -1; return 0; } diff --git a/src/lib/sock.c b/src/lib/sock.c index ee72b6e..275e819 100644 --- a/src/lib/sock.c +++ b/src/lib/sock.c @@ -252,31 +252,26 @@ long sock_splice_chunked(sock *dst, sock *src, void *buf, unsigned long buf_len, unsigned long send_len = 0, next_len; do { - ret = sock_recv_chunk_header(src); - if (ret < 0) { - errno = EPROTO; - return -2; - } + if ((ret = sock_recv_chunk_header(src)) == -1) + return -1; next_len = ret; - if (flags & SOCK_CHUNKED) { + if (flags & SOCK_CHUNKED) if (sock_send_chunk_header(dst, next_len) == -1) return -1; - } - if ((ret = sock_splice(dst, src, buf, buf_len, next_len)) < 0) + if ((ret = sock_splice(dst, src, buf, buf_len, next_len)) == -1) return ret; send_len += ret; - if (flags & SOCK_CHUNKED) { - if (sock_send_chunk_trailer(dst) == -1) - return -1; - } - if (sock_recv_chunk_trailer(src) == -1) return -1; + + if (flags & SOCK_CHUNKED) + if (sock_send_chunk_trailer(dst) == -1) + return -1; } while (!(flags & SOCK_SINGLE_CHUNK) && next_len != 0); return (long) send_len; @@ -364,7 +359,7 @@ int sock_recv_chunk_trailer(sock *s) { if (sock_recv_x(s, buf, sizeof(buf), MSG_PEEK) == -1) return -1; - if (buf[0] != '\r' || buf[1] == '\n') { + if (buf[0] != '\r' || buf[1] != '\n') { errno = EPROTO; return -1; } diff --git a/src/server.c b/src/server.c index 89295e9..164876a 100644 --- a/src/server.c +++ b/src/server.c @@ -198,12 +198,13 @@ static void terminate_gracefully(int sig) { workers_stop(); workers_destroy(); - while (list_size(clients) > 0) - tcp_close(clients[0]); - logger_set_prefix(""); proxy_close_all(); + while (list_size(clients) > 0) + tcp_close(clients[0]); + logger_set_prefix(""); + async_stop(); } diff --git a/src/worker/chunk_handler.c b/src/worker/chunk_handler.c index be102af..acd0f95 100644 --- a/src/worker/chunk_handler.c +++ b/src/worker/chunk_handler.c @@ -21,7 +21,7 @@ void chunk_handler_func(chunk_ctx_t *ctx) { // error error("Unable to splice chunk"); errno = 0; - ctx->err_cb(ctx->client); + ctx->err_cb(ctx); } else if (sent == 0) { // last chunk ctx->client->chunks_transferred = 1; diff --git a/src/worker/func.h b/src/worker/func.h index 7692397..bebae25 100644 --- a/src/worker/func.h +++ b/src/worker/func.h @@ -77,4 +77,6 @@ int ws_handle_connection(client_ctx_t *ctx); void ws_close(ws_ctx_t *ctx); +int handle_chunks(client_ctx_t *ctx, sock *socket, int flags, void (*next_cb)(chunk_ctx_t *), void (*err_cb)(chunk_ctx_t *)); + #endif //SESIMOS_FUNC_H diff --git a/src/worker/proxy_handler.c b/src/worker/proxy_handler.c index 5ea066d..2077e13 100644 --- a/src/worker/proxy_handler.c +++ b/src/worker/proxy_handler.c @@ -28,14 +28,15 @@ void proxy_handler_func(client_ctx_t *ctx) { if (ret == 1) { proxy_unlock_ctx(ctx->proxy); - ctx->proxy->client = NULL; ctx->proxy = NULL; } else if (ctx->use_proxy == 0) { proxy_close(ctx->proxy); } else if (ctx->use_proxy == 1) { - proxy_handler_2(ctx); + if (proxy_handler_2(ctx) == 1) { + // chunked + return; + } proxy_unlock_ctx(ctx->proxy); - ctx->proxy->client = NULL; ctx->proxy = NULL; } else if (ctx->use_proxy == 2) { // WebSocket @@ -107,6 +108,19 @@ static int proxy_handler_1(client_ctx_t *ctx) { return streq(ctx->req.method, "HEAD") ? 1 : 0; } +static void proxy_chunk_next_cb(chunk_ctx_t *ctx) { + proxy_unlock_ctx(ctx->client->proxy); + ctx->client->proxy = NULL; + + request_complete(ctx->client); + handle_request(ctx->client); +} + +static void proxy_chunk_err_cb(chunk_ctx_t *ctx) { + ctx->client->c_keep_alive = 0; + proxy_chunk_next_cb(ctx); +} + static int proxy_handler_2(client_ctx_t *ctx) { const char *transfer_encoding = http_get_header_field(&ctx->res.hdr, "Transfer-Encoding"); int chunked = strcontains(transfer_encoding, "chunked"); @@ -114,10 +128,13 @@ static int proxy_handler_2(client_ctx_t *ctx) { const char *content_len = http_get_header_field(&ctx->res.hdr, "Content-Length"); unsigned long len_to_send = (content_len != NULL) ? strtol(content_len, NULL, 10) : 0; - int flags = (chunked ? PROXY_CHUNKED : 0); - int ret = proxy_send(ctx->proxy, &ctx->socket, len_to_send, flags); + if (chunked) { + handle_chunks(ctx, &ctx->proxy->proxy, SOCK_CHUNKED, proxy_chunk_next_cb, proxy_chunk_err_cb); + return 1; + } - if (ret < 0) { + int ret; + if ((ret = proxy_send(ctx->proxy, &ctx->socket, len_to_send, 0)) == -1) { ctx->c_keep_alive = 0; }