Allow proxies to delay chunks
This commit is contained in:
		@@ -141,6 +141,7 @@ proxy_ctx_t *proxy_get_by_conf(host_config_t *conf) {
 | 
			
		||||
void proxy_unlock_ctx(proxy_ctx_t *ctx) {
 | 
			
		||||
    int n = (int) ((ctx - proxies) / MAX_PROXY_CNX_PER_HOST);
 | 
			
		||||
    ctx->in_use = 0;
 | 
			
		||||
    ctx->client = NULL;
 | 
			
		||||
    sem_post(&available[n]);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -567,62 +568,9 @@ int proxy_init(proxy_ctx_t **proxy_ptr, http_req *req, http_res *res, http_statu
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int proxy_send(proxy_ctx_t *proxy, sock *client, unsigned long len_to_send, int flags) {
 | 
			
		||||
    char buffer[CHUNK_SIZE], *ptr;
 | 
			
		||||
    long ret = 0, snd_len;
 | 
			
		||||
 | 
			
		||||
    do {
 | 
			
		||||
        snd_len = 0;
 | 
			
		||||
        if (flags & PROXY_CHUNKED) {
 | 
			
		||||
            ret = sock_recv_chunk_header(&proxy->proxy);
 | 
			
		||||
            if (ret < 0) {
 | 
			
		||||
                if (ret == -1) {
 | 
			
		||||
                    error("Unable to receive from server: Malformed chunk header");
 | 
			
		||||
                } else {
 | 
			
		||||
                    error("Unable to receive from server");
 | 
			
		||||
                }
 | 
			
		||||
                break;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            len_to_send = ret;
 | 
			
		||||
            ret = 1;
 | 
			
		||||
        }
 | 
			
		||||
        while (snd_len < len_to_send) {
 | 
			
		||||
            ret = sock_recv(&proxy->proxy, buffer, CHUNK_SIZE < (len_to_send - snd_len) ? CHUNK_SIZE : len_to_send - snd_len, 0);
 | 
			
		||||
            if (ret <= 0) {
 | 
			
		||||
                error("Unable to receive from server");
 | 
			
		||||
                break;
 | 
			
		||||
            }
 | 
			
		||||
            ptr = buffer;
 | 
			
		||||
            long buf_len = ret;
 | 
			
		||||
            ret = 1;
 | 
			
		||||
 | 
			
		||||
            if (flags & PROXY_CHUNKED) ret = sock_send_chunk_header(client, buf_len);
 | 
			
		||||
            if (ret <= 0) goto err;
 | 
			
		||||
 | 
			
		||||
            ret = sock_send_x(client, ptr, buf_len, 0);
 | 
			
		||||
            if (ret <= 0) goto err;
 | 
			
		||||
            snd_len += ret;
 | 
			
		||||
 | 
			
		||||
            if (flags & PROXY_CHUNKED) ret = sock_send_chunk_trailer(client);
 | 
			
		||||
            if (ret <= 0) {
 | 
			
		||||
                err:
 | 
			
		||||
                error("Unable to send");
 | 
			
		||||
                break;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        if (ret <= 0) break;
 | 
			
		||||
        if (flags & PROXY_CHUNKED) if ((ret = sock_recv_chunk_trailer(&proxy->proxy)) == -1) break;
 | 
			
		||||
    } while ((flags & PROXY_CHUNKED) && len_to_send > 0);
 | 
			
		||||
 | 
			
		||||
    if (ret <= 0) return -1;
 | 
			
		||||
 | 
			
		||||
    if (flags & PROXY_CHUNKED) {
 | 
			
		||||
        if (sock_send_last_chunk(client) == -1) {
 | 
			
		||||
            error("Unable to send");
 | 
			
		||||
            return -1;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    char buffer[CHUNK_SIZE];
 | 
			
		||||
    if (sock_splice(client, &proxy->proxy, buffer, sizeof(buffer), len_to_send) == -1)
 | 
			
		||||
        return -1;
 | 
			
		||||
    return 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -252,31 +252,26 @@ long sock_splice_chunked(sock *dst, sock *src, void *buf, unsigned long buf_len,
 | 
			
		||||
    unsigned long send_len = 0, next_len;
 | 
			
		||||
 | 
			
		||||
    do {
 | 
			
		||||
        ret = sock_recv_chunk_header(src);
 | 
			
		||||
        if (ret < 0) {
 | 
			
		||||
            errno = EPROTO;
 | 
			
		||||
            return -2;
 | 
			
		||||
        }
 | 
			
		||||
        if ((ret = sock_recv_chunk_header(src)) == -1)
 | 
			
		||||
            return -1;
 | 
			
		||||
 | 
			
		||||
        next_len = ret;
 | 
			
		||||
 | 
			
		||||
        if (flags & SOCK_CHUNKED) {
 | 
			
		||||
        if (flags & SOCK_CHUNKED)
 | 
			
		||||
            if (sock_send_chunk_header(dst, next_len) == -1)
 | 
			
		||||
                return -1;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if ((ret = sock_splice(dst, src, buf, buf_len, next_len)) < 0)
 | 
			
		||||
        if ((ret = sock_splice(dst, src, buf, buf_len, next_len)) == -1)
 | 
			
		||||
            return ret;
 | 
			
		||||
 | 
			
		||||
        send_len += ret;
 | 
			
		||||
 | 
			
		||||
        if (flags & SOCK_CHUNKED) {
 | 
			
		||||
            if (sock_send_chunk_trailer(dst) == -1)
 | 
			
		||||
                return -1;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (sock_recv_chunk_trailer(src) == -1)
 | 
			
		||||
            return -1;
 | 
			
		||||
 | 
			
		||||
        if (flags & SOCK_CHUNKED)
 | 
			
		||||
            if (sock_send_chunk_trailer(dst) == -1)
 | 
			
		||||
                return -1;
 | 
			
		||||
    } while (!(flags & SOCK_SINGLE_CHUNK) && next_len != 0);
 | 
			
		||||
 | 
			
		||||
    return (long) send_len;
 | 
			
		||||
@@ -364,7 +359,7 @@ int sock_recv_chunk_trailer(sock *s) {
 | 
			
		||||
    if (sock_recv_x(s, buf, sizeof(buf), MSG_PEEK) == -1)
 | 
			
		||||
        return -1;
 | 
			
		||||
 | 
			
		||||
    if (buf[0] != '\r' || buf[1] == '\n') {
 | 
			
		||||
    if (buf[0] != '\r' || buf[1] != '\n') {
 | 
			
		||||
        errno = EPROTO;
 | 
			
		||||
        return -1;
 | 
			
		||||
    }
 | 
			
		||||
 
 | 
			
		||||
@@ -198,12 +198,13 @@ static void terminate_gracefully(int sig) {
 | 
			
		||||
    workers_stop();
 | 
			
		||||
    workers_destroy();
 | 
			
		||||
 | 
			
		||||
    while (list_size(clients) > 0)
 | 
			
		||||
        tcp_close(clients[0]);
 | 
			
		||||
 | 
			
		||||
    logger_set_prefix("");
 | 
			
		||||
    proxy_close_all();
 | 
			
		||||
 | 
			
		||||
    while (list_size(clients) > 0)
 | 
			
		||||
        tcp_close(clients[0]);
 | 
			
		||||
    logger_set_prefix("");
 | 
			
		||||
 | 
			
		||||
    async_stop();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -21,7 +21,7 @@ void chunk_handler_func(chunk_ctx_t *ctx) {
 | 
			
		||||
        // error
 | 
			
		||||
        error("Unable to splice chunk");
 | 
			
		||||
        errno = 0;
 | 
			
		||||
        ctx->err_cb(ctx->client);
 | 
			
		||||
        ctx->err_cb(ctx);
 | 
			
		||||
    } else if (sent == 0) {
 | 
			
		||||
        // last chunk
 | 
			
		||||
        ctx->client->chunks_transferred = 1;
 | 
			
		||||
 
 | 
			
		||||
@@ -77,4 +77,6 @@ int ws_handle_connection(client_ctx_t *ctx);
 | 
			
		||||
 | 
			
		||||
void ws_close(ws_ctx_t *ctx);
 | 
			
		||||
 | 
			
		||||
int handle_chunks(client_ctx_t *ctx, sock *socket, int flags, void (*next_cb)(chunk_ctx_t *), void (*err_cb)(chunk_ctx_t *));
 | 
			
		||||
 | 
			
		||||
#endif //SESIMOS_FUNC_H
 | 
			
		||||
 
 | 
			
		||||
@@ -28,14 +28,15 @@ void proxy_handler_func(client_ctx_t *ctx) {
 | 
			
		||||
 | 
			
		||||
    if (ret == 1) {
 | 
			
		||||
        proxy_unlock_ctx(ctx->proxy);
 | 
			
		||||
        ctx->proxy->client = NULL;
 | 
			
		||||
        ctx->proxy = NULL;
 | 
			
		||||
    } else if (ctx->use_proxy == 0) {
 | 
			
		||||
        proxy_close(ctx->proxy);
 | 
			
		||||
    } else if (ctx->use_proxy == 1) {
 | 
			
		||||
        proxy_handler_2(ctx);
 | 
			
		||||
        if (proxy_handler_2(ctx) == 1) {
 | 
			
		||||
            // chunked
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
        proxy_unlock_ctx(ctx->proxy);
 | 
			
		||||
        ctx->proxy->client = NULL;
 | 
			
		||||
        ctx->proxy = NULL;
 | 
			
		||||
    } else if (ctx->use_proxy == 2) {
 | 
			
		||||
        // WebSocket
 | 
			
		||||
@@ -107,6 +108,19 @@ static int proxy_handler_1(client_ctx_t *ctx) {
 | 
			
		||||
    return streq(ctx->req.method, "HEAD") ? 1 : 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void proxy_chunk_next_cb(chunk_ctx_t *ctx) {
 | 
			
		||||
    proxy_unlock_ctx(ctx->client->proxy);
 | 
			
		||||
    ctx->client->proxy = NULL;
 | 
			
		||||
 | 
			
		||||
    request_complete(ctx->client);
 | 
			
		||||
    handle_request(ctx->client);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void proxy_chunk_err_cb(chunk_ctx_t *ctx) {
 | 
			
		||||
    ctx->client->c_keep_alive = 0;
 | 
			
		||||
    proxy_chunk_next_cb(ctx);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static int proxy_handler_2(client_ctx_t *ctx) {
 | 
			
		||||
    const char *transfer_encoding = http_get_header_field(&ctx->res.hdr, "Transfer-Encoding");
 | 
			
		||||
    int chunked = strcontains(transfer_encoding, "chunked");
 | 
			
		||||
@@ -114,10 +128,13 @@ static int proxy_handler_2(client_ctx_t *ctx) {
 | 
			
		||||
    const char *content_len = http_get_header_field(&ctx->res.hdr, "Content-Length");
 | 
			
		||||
    unsigned long len_to_send = (content_len != NULL) ? strtol(content_len, NULL, 10) : 0;
 | 
			
		||||
 | 
			
		||||
    int flags = (chunked ? PROXY_CHUNKED : 0);
 | 
			
		||||
    int ret = proxy_send(ctx->proxy, &ctx->socket, len_to_send, flags);
 | 
			
		||||
    if (chunked) {
 | 
			
		||||
        handle_chunks(ctx, &ctx->proxy->proxy, SOCK_CHUNKED, proxy_chunk_next_cb, proxy_chunk_err_cb);
 | 
			
		||||
        return 1;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (ret < 0) {
 | 
			
		||||
    int ret;
 | 
			
		||||
    if ((ret = proxy_send(ctx->proxy, &ctx->socket, len_to_send, 0)) == -1) {
 | 
			
		||||
        ctx->c_keep_alive = 0;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user