summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNirbheek Chauhan <nirbheek@centricular.com>2014-07-23 21:51:50 (GMT)
committerNirbheek Chauhan <nirbheek@centricular.com>2014-07-23 21:51:50 (GMT)
commitd6044621bfa82debbb70f3609d08b69badac36c2 (patch)
treee41b823bc83a0111085ebb3d66c7cce3dac1329c
parent478b78c77e32612d84f2bfa7cc7dcc70ed59d5fb (diff)
downloadsoup-transcoding-proxy-d6044621bfa82debbb70f3609d08b69badac36c2.zip
soup-transcoding-proxy-d6044621bfa82debbb70f3609d08b69badac36c2.tar.gz
server: Track stream state better, and abort better
We now differentiate between STREAMING, FLUSHING, and FINISHED. Server aborting is now consolidated into a single function that's called from everywhere.
-rw-r--r--src/lib.c7
-rw-r--r--src/lib.h12
-rw-r--r--src/main.c45
3 files changed, 45 insertions, 19 deletions
diff --git a/src/lib.c b/src/lib.c
index 019b4f7..868e060 100644
--- a/src/lib.c
+++ b/src/lib.c
@@ -24,14 +24,14 @@
static void stp_disconnect_cleanup_client (TranscodeClientCtx *ctx);
-static gboolean
+gboolean
invoke_g_hash_table_remove (TranscodeServerCtx *ctx)
{
g_hash_table_remove (ctx->parent_ctx_table, ctx->path);
return G_SOURCE_REMOVE;
}
-static gboolean
+gboolean
invoke_g_free_client_context (TranscodeClientCtx *ctx)
{
g_free (ctx);
@@ -56,7 +56,8 @@ stp_on_gst_bus_message (GstBus *bus,
g_free (tmp);
/* Setting the server response will only work if the request
* hasn't already finished, so we check that */
- if (!ctx->stream_finished)
+ if (ctx->status == STP_STATUS_STREAMING &&
+ ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH)
soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
/* Cleanup in the default main context,
* because GHashTable is not thread-safe */
diff --git a/src/lib.h b/src/lib.h
index fe63114..4813321 100644
--- a/src/lib.h
+++ b/src/lib.h
@@ -30,6 +30,13 @@
typedef struct _TranscodeServerCtx TranscodeServerCtx;
typedef struct _TranscodeClientCtx TranscodeClientCtx;
+enum stp_stream_status {
+ STP_STATUS_NONE,
+ STP_STATUS_STREAMING,
+ STP_STATUS_FLUSHING,
+ STP_STATUS_FINISHED,
+};
+
struct _TranscodeServerCtx {
SoupMessage *msg;
GstElement *pipeline;
@@ -41,7 +48,7 @@ struct _TranscodeServerCtx {
* we need to set the PUT response on EOS/ERROR on the pipeline,
* and whether to reject further data when using a persistent
* Content-Length + Content-Range PUT stream. */
- gboolean stream_finished;
+ enum stp_stream_status status;
guint seconds_since_read;
/* List of client contexts */
@@ -72,6 +79,9 @@ struct _TranscodeClientCtx {
#define stp_print_status(...) do {} while (0)
#endif
+gboolean invoke_g_hash_table_remove (TranscodeServerCtx *ctx);
+gboolean invoke_g_free_client_context (TranscodeClientCtx *ctx);
+
void stp_cleanup_transcode_server_ctx (TranscodeServerCtx *ctx);
void stp_cleanup_transcode_client_ctx (TranscodeClientCtx *ctx);
diff --git a/src/main.c b/src/main.c
index 7cc2d25..fb62d53 100644
--- a/src/main.c
+++ b/src/main.c
@@ -63,6 +63,20 @@ get_server_ctx_from_msg (SoupMessage *msg,
return ctx;
}
+static void
+stp_abort_server_ctx (TranscodeServerCtx *ctx)
+{
+ /* Abort all processing for this stream */
+ if (ctx->status == STP_STATUS_STREAMING &&
+ ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH) {
+ /* Close the PUT stream if necessary */
+ ctx->status = STP_STATUS_FINISHED;
+ soup_message_set_status (ctx->msg, SOUP_STATUS_GONE);
+ }
+ /* Disconnect all clients, and shut down the stream */
+ invoke_g_hash_table_remove (ctx);
+}
+
/* If it's been more than 10 seconds since the last time we got
* a chunk for a PUT request, we timeout and drop the connection */
static gboolean
@@ -190,7 +204,13 @@ stream_finished_cb (SoupMessage *msg,
stp_print_status ("Stream finished/aborted, queueing EOS... ");
- ctx->stream_finished = TRUE;
+ if (!ctx->appsrc) {
+ ctx->status = STP_STATUS_FINISHED;
+ stp_print_status ("No need.\n");
+ return;
+ }
+
+ ctx->status = STP_STATUS_FLUSHING;
/* Incoming stream has ended */
g_signal_emit_by_name (ctx->appsrc, "end-of-stream", &ret);
if (ret != GST_FLOW_OK)
@@ -231,7 +251,7 @@ PUT:
* will be done on EOS/ERROR in the gst pipeline. */
server_ctx = get_server_ctx_from_msg (msg, ctx_table);
if (server_ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH)
- server_ctx->stream_finished = TRUE;
+ server_ctx->status = STP_STATUS_FLUSHING;
soup_message_set_status (server_ctx->msg, SOUP_STATUS_OK);
return;
}
@@ -407,7 +427,8 @@ got_request_body_chunk (SoupMessage *msg,
* reception of further chunks when we've prematurely set the response
* (say due to an error), so we check if the status code is set and skip
* all further chunks */
- if (msg->status_code != 0)
+ if (msg->status_code != 0 ||
+ ctx->status != STP_STATUS_STREAMING)
return;
/* We need to update this every chunk */
@@ -421,7 +442,7 @@ got_request_body_chunk (SoupMessage *msg,
if (ret != GST_FLOW_OK) {
g_critical ("Unable to push buffer\n");
soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
- g_hash_table_remove (ctx->parent_ctx_table, ctx->path);
+ invoke_g_hash_table_remove (ctx);
return;
}
}
@@ -431,7 +452,7 @@ request_ended_no_body_cb (SoupMessage *msg,
TranscodeServerCtx *ctx)
{
g_critical ("Request ended without a body!");
- g_hash_table_remove (ctx->parent_ctx_table, ctx->path);
+ invoke_g_hash_table_remove (ctx);
g_free (ctx);
}
@@ -471,7 +492,7 @@ got_first_request_body_chunk (SoupMessage *msg,
GST_STATE_CHANGE_FAILURE) {
g_critical ("Unable to set pipeline to PLAYING\n");
soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
- g_hash_table_remove (ctx->parent_ctx_table, ctx->path);
+ invoke_g_hash_table_remove (ctx);
} else {
g_debug ("Set pipeline to PLAYING");
}
@@ -576,7 +597,7 @@ PUT: {
if (connection_exists) {
g_debug ("Stream already exists, connecting everything to that...");
ctx = get_server_ctx_from_msg (msg, ctx_table);
- if (ctx->stream_finished) {
+ if (ctx->status != STP_STATUS_STREAMING) {
g_critical ("Recv more data on '%s' after timeout",
soup_uri_get_path (uri));
soup_message_set_status (msg, SOUP_STATUS_REQUEST_TIMEOUT);
@@ -592,6 +613,7 @@ PUT: {
ctx = g_new0 (TranscodeServerCtx, 1);
ctx->msg = msg;
ctx->encoding = encoding;
+ ctx->status = STP_STATUS_STREAMING;
ctx->parent_ctx_table = ctx_table;
ctx->path = g_strdup (soup_uri_get_path (uri));
g_hash_table_insert (ctx_table, ctx->path, ctx);
@@ -623,14 +645,7 @@ GET: {
if (query && g_strcmp0 (query, "abort") == 0) {
/* We just use the "uri" field for this, so this is OK */
server_ctx = get_server_ctx_from_msg (msg, ctx_table);
- /* Abort all processing for this stream */
- if (!server_ctx->stream_finished) {
- /* Close the PUT stream if necessary */
- server_ctx->stream_finished = TRUE;
- soup_message_set_status (server_ctx->msg, SOUP_STATUS_GONE);
- }
- /* Disconnect all clients, and shut down the stream */
- g_hash_table_remove (ctx_table, server_ctx->path);
+ stp_abort_server_ctx (server_ctx);
soup_message_set_status (msg, SOUP_STATUS_OK);
goto out;
}