summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNirbheek Chauhan <nirbheek@centricular.com>2014-07-11 11:27:02 (GMT)
committerNirbheek Chauhan <nirbheek@centricular.com>2014-07-11 11:31:01 (GMT)
commit4d3eb464ab471e1754f25d6dc9673fc2533e0b5c (patch)
treee891f3226532294bc3fab5aefa9b8f8cf1888619
parent4e9b88df39128d01139414263e9cf387e3339814 (diff)
downloadsoup-transcoding-proxy-4d3eb464ab471e1754f25d6dc9673fc2533e0b5c.zip
soup-transcoding-proxy-4d3eb464ab471e1754f25d6dc9673fc2533e0b5c.tar.gz
server: Add support for persistent connection streams
Now we also support Content-Length + Content-Range persistent HTTP connections for stream data. If no further data is received before `server_timeout`, we assume the stream has been closed. This is used by souphttpclientsink for sending streams.
-rw-r--r--src/lib.c4
-rw-r--r--src/lib.h13
-rw-r--r--src/main.c199
3 files changed, 169 insertions, 47 deletions
diff --git a/src/lib.c b/src/lib.c
index 750df87..2d86b7c 100644
--- a/src/lib.c
+++ b/src/lib.c
@@ -43,7 +43,7 @@ stp_on_gst_bus_message (GstBus *bus,
g_free (tmp);
/* Setting the server response will only work if the request
* hasn't already finished, so we check that */
- if (!ctx->request_finished)
+ if (!ctx->stream_finished)
soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
g_hash_table_remove (ctx->parent_ctx_table, ctx->path);
break;
@@ -73,7 +73,7 @@ stp_cleanup_transcode_server_ctx (TranscodeServerCtx *ctx)
/* Cleanup gstreamer pipeline */
gst_element_set_state (ctx->pipeline, GST_STATE_NULL);
gst_object_unref (ctx->pipeline);
- ctx->pipeline_is_playing = FALSE;
+ g_free (ctx->path);
g_free (ctx);
}
diff --git a/src/lib.h b/src/lib.h
index e194949..f1d9588 100644
--- a/src/lib.h
+++ b/src/lib.h
@@ -34,12 +34,15 @@ struct _TranscodeServerCtx {
SoupMessage *msg;
GstElement *pipeline;
GstElement *appsrc;
- /* The pipeline status can be queried from the pipeline itself,
- * but it's probably better for branching to have it separately too */
- gboolean pipeline_is_playing;
+ /* If the encoding is not chunked, we'll get multiple requests
+ * with separate Content-Length headers on the same path */
+ SoupEncoding encoding;
/* Set to TRUE when the incoming stream ends; let's us know if
- * we need to set the PUT response on EOS/ERROR on the pipeline */
- gboolean request_finished;
+ * we need to set the PUT response on EOS/ERROR on the pipeline,
+ * and whether to reject further data when using a persistent
+ * Content-Length + Content-Range PUT stream. */
+ gboolean stream_finished;
+ guint seconds_since_read;
/* List of client contexts */
GList *clients;
diff --git a/src/main.c b/src/main.c
index 15ea208..83e9609 100644
--- a/src/main.c
+++ b/src/main.c
@@ -29,6 +29,7 @@
static int port = 8000;
static int client_timeout = 10;
+static int server_timeout = 5;
static GOptionEntry entries[] =
{
@@ -39,6 +40,9 @@ static GOptionEntry entries[] =
static SoupServer *server;
+static void stream_finished_cb (SoupMessage *msg,
+ TranscodeServerCtx *ctx);
+
static TranscodeServerCtx*
get_server_ctx_from_msg (SoupMessage *msg,
GHashTable *ctx_table)
@@ -109,6 +113,20 @@ err:
goto out;
}
+/* If it's been more than 10 seconds since the last time we got
+ * a chunk for a PUT request, we timeout and drop the connection */
+static gboolean
+increment_read_timer (TranscodeServerCtx *ctx)
+{
+ ctx->seconds_since_read += 2;
+ if (ctx->seconds_since_read < server_timeout)
+ return G_SOURCE_CONTINUE;
+
+ g_printerr ("Stream timed out, sending EOS\n");
+ stream_finished_cb (ctx->msg, ctx);
+ return G_SOURCE_REMOVE;
+}
+
/* If it's been more than 10 seconds since the last time we wrote
* a chunk for a GET response, we timeout and drop the connection */
static gboolean
@@ -209,7 +227,7 @@ stream_finished_cb (SoupMessage *msg,
g_print ("Stream finished/aborted, queueing EOS... ");
- ctx->request_finished = TRUE;
+ ctx->stream_finished = TRUE;
/* Incoming stream has ended */
g_signal_emit_by_name (ctx->appsrc, "end-of-stream", &ret);
if (ret != GST_FLOW_OK)
@@ -249,7 +267,8 @@ PUT:
/* The PUT request has finished streaming. Cleanup
* will be done on EOS/ERROR in the gst pipeline. */
server_ctx = get_server_ctx_from_msg (msg, ctx_table);
- server_ctx->request_finished = TRUE;
+ if (server_ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH)
+ server_ctx->stream_finished = TRUE;
soup_message_set_status (server_ctx->msg, SOUP_STATUS_OK);
return;
}
@@ -371,6 +390,7 @@ got_request_body_chunk (SoupMessage *msg,
/* We need to update this every chunk */
ctx->msg = msg;
+ ctx->seconds_since_read = 0;
/* Actually just a ref */
buffer = stp_get_gst_buffer (chunk);
@@ -382,27 +402,76 @@ got_request_body_chunk (SoupMessage *msg,
stp_cleanup_transcode_server_ctx (ctx);
return;
}
+}
- /* We can only set the pipeline to playing when
- * the first chunk has been received */
- if (!ctx->pipeline_is_playing) {
- if (gst_element_set_state (ctx->pipeline, GST_STATE_PLAYING) ==
- GST_STATE_CHANGE_FAILURE) {
- g_critical ("Unable to set pipeline to PLAYING\n");
- soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
- stp_cleanup_transcode_server_ctx (ctx);
- } else {
- g_print ("Set pipeline to PLAYING\n");
- ctx->pipeline_is_playing = TRUE;
- }
+static void
+request_ended_no_body_cb (SoupMessage *msg,
+ TranscodeServerCtx *ctx)
+{
+ g_print ("Request ended without a body!\n");
+ g_hash_table_remove (ctx->parent_ctx_table, ctx->path);
+ g_free (ctx);
+}
+
+static void
+got_first_request_body_chunk (SoupMessage *msg,
+ SoupBuffer *chunk,
+ TranscodeServerCtx *ctx)
+{
+ int num;
+
+ /* Disconnect the ctx cleanup function */
+ num = g_signal_handlers_disconnect_by_func (msg,
+ G_CALLBACK (request_ended_no_body_cb), ctx);
+ if (num != 1)
+ g_critical ("Unable to remove signal handler for cleanup function!\n");
+
+ /* Disconnect us from got-chunk */
+ num = g_signal_handlers_disconnect_by_func (msg,
+ G_CALLBACK (got_first_request_body_chunk), ctx);
+ if (num != 1)
+ g_critical ("Unable to remove signal handler for first request chunk!\n");
+
+ ctx->msg = msg;
+ ctx->pipeline = gst_pipeline_new ("pipe");
+
+ /* The chunked request is copied into this stream
+ * for consumption by the gst pipeline */
+#ifdef PLAY_DEBUG
+ stp_play_from_msg (ctx);
+#else
+ stp_encode_from_msg (ctx);
+#endif
+
+ got_request_body_chunk (msg, chunk, ctx);
+
+ if (gst_element_set_state (ctx->pipeline, GST_STATE_PLAYING) ==
+ GST_STATE_CHANGE_FAILURE) {
+ g_critical ("Unable to set pipeline to PLAYING\n");
+ soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
+ stp_cleanup_transcode_server_ctx (ctx);
+ } else {
+ g_print ("Set pipeline to PLAYING\n");
}
+
+ /* Connect a different method for all further chunks */
+ g_signal_connect (msg, "got-chunk",
+ G_CALLBACK (got_request_body_chunk), ctx);
+
+ if (ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH)
+ /* Our incoming stream ends if the stream ends only if
+ * we're using a chunked request encoding.
+ *
+ * This will also be called when the client sending
+ * the PUT request disconnects prematurely */
+ g_signal_connect (msg, "finished",
+ G_CALLBACK (stream_finished_cb), ctx);
}
static void
got_request_headers (SoupMessage *msg,
GHashTable *ctx_table)
{
- TranscodeServerCtx *ctx;
SoupURI *uri;
#ifdef HEADERS_DEBUG
@@ -428,42 +497,92 @@ got_request_headers (SoupMessage *msg,
}
PUT: {
- if (g_hash_table_contains (ctx_table, soup_uri_get_path (uri))) {
- /* There's already a request streaming on this path */
- g_print ("Recv duplicate request on the same URI: %s\n",
- soup_uri_get_path (uri));
- soup_message_set_status (msg, SOUP_STATUS_CONFLICT);
+ const char *content_range;
+ SoupEncoding encoding;
+ gboolean connection_exists;
+ TranscodeServerCtx *ctx;
+
+ connection_exists = g_hash_table_contains (ctx_table, soup_uri_get_path (uri));
+
+ /* Data might be sent to us in one of three ways:
+ * 1. Chunked encoding with the stream (and request body) sent in chunks
+ * 2. Content-Length encoding with the entire stream in one chunk
+ * 3. A persistent Content-Length encoding connection, and subsequent
+ * requests can send more stream data, forever, subject to a timeout */
+ encoding = soup_message_headers_get_encoding (msg->request_headers);
+ switch (encoding) {
+ case SOUP_ENCODING_CHUNKED:
+ g_print ("Chunked encoding detected!\n");
+ if (connection_exists) {
+ /* There's already a chunked request streaming on this path */
+ g_printerr ("Recv duplicate request on the same URI: %s\n",
+ soup_uri_get_path (uri));
+ soup_message_set_status (msg, SOUP_STATUS_CONFLICT);
+ goto out;
+ }
+ break;
+ case SOUP_ENCODING_CONTENT_LENGTH:
+ g_print ("Content-Length encoding detected!\n");
+ content_range = soup_message_headers_get_one (msg->request_headers,
+ "Content-Range");
+ /* TODO: Right now, we don't check if the Content-Range is valid,
+ * and just pass the data to the pipeline. */
+ if (connection_exists && !content_range) {
+ /* A connection already exists, and this one isn't a continuation
+ * of the previous one, so it's a conflict */
+ g_printerr ("Recv Content-Length PUT on '%s' without Content-Range\n",
+ soup_uri_get_path (uri));
+ soup_message_set_status (msg, SOUP_STATUS_CONFLICT);
+ goto out;
+ }
+ break;
+ case SOUP_ENCODING_EOF:
+ case SOUP_ENCODING_UNRECOGNIZED:
+ case SOUP_ENCODING_NONE:
+ case SOUP_ENCODING_BYTERANGES:
+ g_critical ("Unknown encoding!\n");
+ goto out;
+ default:
+ g_assert_not_reached ();
+ }
+
+ /* Whether the incoming stream is chunked or fixed length, we want to
+ * handle it chunked, so tell libsoup to not collect chunks for
+ * forming a complete request body */
+ soup_message_body_set_accumulate (msg->request_body, FALSE);
+
+ if (connection_exists) {
+ g_print ("Stream already exists, connecting everything to that...\n");
+ ctx = get_server_ctx_from_msg (msg, ctx_table);
+ if (ctx->stream_finished) {
+ g_printerr ("Recv more data on '%s' after timeout\n",
+ soup_uri_get_path (uri));
+ soup_message_set_status (msg, SOUP_STATUS_REQUEST_TIMEOUT);
+ goto out;
+ }
+ /* The chunks we'll get are a continuation of the previous one */
+ g_signal_connect (msg, "got-chunk",
+ G_CALLBACK (got_request_body_chunk), ctx);
goto out;
}
+ /* This is a new connection; treat it as such */
ctx = g_new0 (TranscodeServerCtx, 1);
ctx->msg = msg;
+ ctx->encoding = encoding;
ctx->parent_ctx_table = ctx_table;
ctx->path = g_strdup (soup_uri_get_path (uri));
g_hash_table_insert (ctx_table, ctx->path, ctx);
-
- /* We only do this when we receive the first chunk */
- if (!ctx->pipeline) {
- ctx->pipeline = gst_pipeline_new ("pipe");
- /* The chunked request is copied into this stream
- * for consumption by the gst pipeline */
-#ifdef PLAY_DEBUG
- stp_play_from_msg (ctx);
-#else
- stp_encode_from_msg (ctx);
-#endif
- }
-
- /* The request body isn't fixed length, so tell libsoup to
- * not collect chunks for forming a complete request body */
- soup_message_body_set_accumulate (msg->request_body, FALSE);
+ g_print ("New stream on %s!\n", ctx->path);
g_signal_connect (msg, "got-chunk",
- G_CALLBACK (got_request_body_chunk), ctx);
- /* This will also be called when the client sending
- * the PUT request disconnects prematurely */
+ G_CALLBACK (got_first_request_body_chunk), ctx);
+ /* If we don't get any data after this, we have to cleanup ctx
+ * otherwise we'll leak it */
g_signal_connect (msg, "finished",
- G_CALLBACK (stream_finished_cb), ctx);
+ G_CALLBACK (request_ended_no_body_cb), ctx);
+ if (encoding == SOUP_ENCODING_CONTENT_LENGTH)
+ g_timeout_add_seconds (2, (GSourceFunc)increment_read_timer, ctx);
goto out;
}