From 4d3eb464ab471e1754f25d6dc9673fc2533e0b5c Mon Sep 17 00:00:00 2001 From: Nirbheek Chauhan Date: Fri, 11 Jul 2014 16:57:02 +0530 Subject: server: Add support for persistent connection streams Now we also support Content-Length + Content-Range persistent HTTP connections for stream data. If no further data is received before `server_timeout`, we assume the stream has been closed. This is used by souphttpclientsink for sending streams. --- src/lib.c | 4 +- src/lib.h | 13 ++-- src/main.c | 199 ++++++++++++++++++++++++++++++++++++++++++++++++------------- 3 files changed, 169 insertions(+), 47 deletions(-) diff --git a/src/lib.c b/src/lib.c index 750df87..2d86b7c 100644 --- a/src/lib.c +++ b/src/lib.c @@ -43,7 +43,7 @@ stp_on_gst_bus_message (GstBus *bus, g_free (tmp); /* Setting the server response will only work if the request * hasn't already finished, so we check that */ - if (!ctx->request_finished) + if (!ctx->stream_finished) soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); g_hash_table_remove (ctx->parent_ctx_table, ctx->path); break; @@ -73,7 +73,7 @@ stp_cleanup_transcode_server_ctx (TranscodeServerCtx *ctx) /* Cleanup gstreamer pipeline */ gst_element_set_state (ctx->pipeline, GST_STATE_NULL); gst_object_unref (ctx->pipeline); - ctx->pipeline_is_playing = FALSE; + g_free (ctx->path); g_free (ctx); } diff --git a/src/lib.h b/src/lib.h index e194949..f1d9588 100644 --- a/src/lib.h +++ b/src/lib.h @@ -34,12 +34,15 @@ struct _TranscodeServerCtx { SoupMessage *msg; GstElement *pipeline; GstElement *appsrc; - /* The pipeline status can be queried from the pipeline itself, - * but it's probably better for branching to have it separately too */ - gboolean pipeline_is_playing; + /* If the encoding is not chunked, we'll get multiple requests + * with separate Content-Length headers on the same path */ + SoupEncoding encoding; /* Set to TRUE when the incoming stream ends; let's us know if - * we need to set the PUT response on EOS/ERROR on the pipeline */ - gboolean request_finished; + * we need to set the PUT response on EOS/ERROR on the pipeline, + * and whether to reject further data when using a persistent + * Content-Length + Content-Range PUT stream. */ + gboolean stream_finished; + guint seconds_since_read; /* List of client contexts */ GList *clients; diff --git a/src/main.c b/src/main.c index 15ea208..83e9609 100644 --- a/src/main.c +++ b/src/main.c @@ -29,6 +29,7 @@ static int port = 8000; static int client_timeout = 10; +static int server_timeout = 5; static GOptionEntry entries[] = { @@ -39,6 +40,9 @@ static GOptionEntry entries[] = static SoupServer *server; +static void stream_finished_cb (SoupMessage *msg, + TranscodeServerCtx *ctx); + static TranscodeServerCtx* get_server_ctx_from_msg (SoupMessage *msg, GHashTable *ctx_table) @@ -109,6 +113,20 @@ err: goto out; } +/* If it's been more than 10 seconds since the last time we got + * a chunk for a PUT request, we timeout and drop the connection */ +static gboolean +increment_read_timer (TranscodeServerCtx *ctx) +{ + ctx->seconds_since_read += 2; + if (ctx->seconds_since_read < server_timeout) + return G_SOURCE_CONTINUE; + + g_printerr ("Stream timed out, sending EOS\n"); + stream_finished_cb (ctx->msg, ctx); + return G_SOURCE_REMOVE; +} + /* If it's been more than 10 seconds since the last time we wrote * a chunk for a GET response, we timeout and drop the connection */ static gboolean @@ -209,7 +227,7 @@ stream_finished_cb (SoupMessage *msg, g_print ("Stream finished/aborted, queueing EOS... "); - ctx->request_finished = TRUE; + ctx->stream_finished = TRUE; /* Incoming stream has ended */ g_signal_emit_by_name (ctx->appsrc, "end-of-stream", &ret); if (ret != GST_FLOW_OK) @@ -249,7 +267,8 @@ PUT: /* The PUT request has finished streaming. Cleanup * will be done on EOS/ERROR in the gst pipeline. */ server_ctx = get_server_ctx_from_msg (msg, ctx_table); - server_ctx->request_finished = TRUE; + if (server_ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH) + server_ctx->stream_finished = TRUE; soup_message_set_status (server_ctx->msg, SOUP_STATUS_OK); return; } @@ -371,6 +390,7 @@ got_request_body_chunk (SoupMessage *msg, /* We need to update this every chunk */ ctx->msg = msg; + ctx->seconds_since_read = 0; /* Actually just a ref */ buffer = stp_get_gst_buffer (chunk); @@ -382,27 +402,76 @@ got_request_body_chunk (SoupMessage *msg, stp_cleanup_transcode_server_ctx (ctx); return; } +} - /* We can only set the pipeline to playing when - * the first chunk has been received */ - if (!ctx->pipeline_is_playing) { - if (gst_element_set_state (ctx->pipeline, GST_STATE_PLAYING) == - GST_STATE_CHANGE_FAILURE) { - g_critical ("Unable to set pipeline to PLAYING\n"); - soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); - stp_cleanup_transcode_server_ctx (ctx); - } else { - g_print ("Set pipeline to PLAYING\n"); - ctx->pipeline_is_playing = TRUE; - } +static void +request_ended_no_body_cb (SoupMessage *msg, + TranscodeServerCtx *ctx) +{ + g_print ("Request ended without a body!\n"); + g_hash_table_remove (ctx->parent_ctx_table, ctx->path); + g_free (ctx); +} + +static void +got_first_request_body_chunk (SoupMessage *msg, + SoupBuffer *chunk, + TranscodeServerCtx *ctx) +{ + int num; + + /* Disconnect the ctx cleanup function */ + num = g_signal_handlers_disconnect_by_func (msg, + G_CALLBACK (request_ended_no_body_cb), ctx); + if (num != 1) + g_critical ("Unable to remove signal handler for cleanup function!\n"); + + /* Disconnect us from got-chunk */ + num = g_signal_handlers_disconnect_by_func (msg, + G_CALLBACK (got_first_request_body_chunk), ctx); + if (num != 1) + g_critical ("Unable to remove signal handler for first request chunk!\n"); + + ctx->msg = msg; + ctx->pipeline = gst_pipeline_new ("pipe"); + + /* The chunked request is copied into this stream + * for consumption by the gst pipeline */ +#ifdef PLAY_DEBUG + stp_play_from_msg (ctx); +#else + stp_encode_from_msg (ctx); +#endif + + got_request_body_chunk (msg, chunk, ctx); + + if (gst_element_set_state (ctx->pipeline, GST_STATE_PLAYING) == + GST_STATE_CHANGE_FAILURE) { + g_critical ("Unable to set pipeline to PLAYING\n"); + soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); + stp_cleanup_transcode_server_ctx (ctx); + } else { + g_print ("Set pipeline to PLAYING\n"); } + + /* Connect a different method for all further chunks */ + g_signal_connect (msg, "got-chunk", + G_CALLBACK (got_request_body_chunk), ctx); + + if (ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH) + /* Our incoming stream ends if the stream ends only if + * we're using a chunked request encoding. + * + * This will also be called when the client sending + * the PUT request disconnects prematurely */ + g_signal_connect (msg, "finished", + G_CALLBACK (stream_finished_cb), ctx); } static void got_request_headers (SoupMessage *msg, GHashTable *ctx_table) { - TranscodeServerCtx *ctx; SoupURI *uri; #ifdef HEADERS_DEBUG @@ -428,42 +497,92 @@ got_request_headers (SoupMessage *msg, } PUT: { - if (g_hash_table_contains (ctx_table, soup_uri_get_path (uri))) { - /* There's already a request streaming on this path */ - g_print ("Recv duplicate request on the same URI: %s\n", - soup_uri_get_path (uri)); - soup_message_set_status (msg, SOUP_STATUS_CONFLICT); + const char *content_range; + SoupEncoding encoding; + gboolean connection_exists; + TranscodeServerCtx *ctx; + + connection_exists = g_hash_table_contains (ctx_table, soup_uri_get_path (uri)); + + /* Data might be sent to us in one of three ways: + * 1. Chunked encoding with the stream (and request body) sent in chunks + * 2. Content-Length encoding with the entire stream in one chunk + * 3. A persistent Content-Length encoding connection, and subsequent + * requests can send more stream data, forever, subject to a timeout */ + encoding = soup_message_headers_get_encoding (msg->request_headers); + switch (encoding) { + case SOUP_ENCODING_CHUNKED: + g_print ("Chunked encoding detected!\n"); + if (connection_exists) { + /* There's already a chunked request streaming on this path */ + g_printerr ("Recv duplicate request on the same URI: %s\n", + soup_uri_get_path (uri)); + soup_message_set_status (msg, SOUP_STATUS_CONFLICT); + goto out; + } + break; + case SOUP_ENCODING_CONTENT_LENGTH: + g_print ("Content-Length encoding detected!\n"); + content_range = soup_message_headers_get_one (msg->request_headers, + "Content-Range"); + /* TODO: Right now, we don't check if the Content-Range is valid, + * and just pass the data to the pipeline. */ + if (connection_exists && !content_range) { + /* A connection already exists, and this one isn't a continuation + * of the previous one, so it's a conflict */ + g_printerr ("Recv Content-Length PUT on '%s' without Content-Range\n", + soup_uri_get_path (uri)); + soup_message_set_status (msg, SOUP_STATUS_CONFLICT); + goto out; + } + break; + case SOUP_ENCODING_EOF: + case SOUP_ENCODING_UNRECOGNIZED: + case SOUP_ENCODING_NONE: + case SOUP_ENCODING_BYTERANGES: + g_critical ("Unknown encoding!\n"); + goto out; + default: + g_assert_not_reached (); + } + + /* Whether the incoming stream is chunked or fixed length, we want to + * handle it chunked, so tell libsoup to not collect chunks for + * forming a complete request body */ + soup_message_body_set_accumulate (msg->request_body, FALSE); + + if (connection_exists) { + g_print ("Stream already exists, connecting everything to that...\n"); + ctx = get_server_ctx_from_msg (msg, ctx_table); + if (ctx->stream_finished) { + g_printerr ("Recv more data on '%s' after timeout\n", + soup_uri_get_path (uri)); + soup_message_set_status (msg, SOUP_STATUS_REQUEST_TIMEOUT); + goto out; + } + /* The chunks we'll get are a continuation of the previous one */ + g_signal_connect (msg, "got-chunk", + G_CALLBACK (got_request_body_chunk), ctx); goto out; } + /* This is a new connection; treat it as such */ ctx = g_new0 (TranscodeServerCtx, 1); ctx->msg = msg; + ctx->encoding = encoding; ctx->parent_ctx_table = ctx_table; ctx->path = g_strdup (soup_uri_get_path (uri)); g_hash_table_insert (ctx_table, ctx->path, ctx); - - /* We only do this when we receive the first chunk */ - if (!ctx->pipeline) { - ctx->pipeline = gst_pipeline_new ("pipe"); - /* The chunked request is copied into this stream - * for consumption by the gst pipeline */ -#ifdef PLAY_DEBUG - stp_play_from_msg (ctx); -#else - stp_encode_from_msg (ctx); -#endif - } - - /* The request body isn't fixed length, so tell libsoup to - * not collect chunks for forming a complete request body */ - soup_message_body_set_accumulate (msg->request_body, FALSE); + g_print ("New stream on %s!\n", ctx->path); g_signal_connect (msg, "got-chunk", - G_CALLBACK (got_request_body_chunk), ctx); - /* This will also be called when the client sending - * the PUT request disconnects prematurely */ + G_CALLBACK (got_first_request_body_chunk), ctx); + /* If we don't get any data after this, we have to cleanup ctx + * otherwise we'll leak it */ g_signal_connect (msg, "finished", - G_CALLBACK (stream_finished_cb), ctx); + G_CALLBACK (request_ended_no_body_cb), ctx); + if (encoding == SOUP_ENCODING_CONTENT_LENGTH) + g_timeout_add_seconds (2, (GSourceFunc)increment_read_timer, ctx); goto out; } -- cgit v0.11.2-2-gd1dd