From d6044621bfa82debbb70f3609d08b69badac36c2 Mon Sep 17 00:00:00 2001 From: Nirbheek Chauhan Date: Thu, 24 Jul 2014 03:21:50 +0530 Subject: server: Track stream state better, and abort better We now differentiate between STREAMING, FLUSHING, and FINISHED. Server aborting is now consolidated into a single function that's called from everywhere. --- src/lib.c | 7 ++++--- src/lib.h | 12 +++++++++++- src/main.c | 45 ++++++++++++++++++++++++++++++--------------- 3 files changed, 45 insertions(+), 19 deletions(-) diff --git a/src/lib.c b/src/lib.c index 019b4f7..868e060 100644 --- a/src/lib.c +++ b/src/lib.c @@ -24,14 +24,14 @@ static void stp_disconnect_cleanup_client (TranscodeClientCtx *ctx); -static gboolean +gboolean invoke_g_hash_table_remove (TranscodeServerCtx *ctx) { g_hash_table_remove (ctx->parent_ctx_table, ctx->path); return G_SOURCE_REMOVE; } -static gboolean +gboolean invoke_g_free_client_context (TranscodeClientCtx *ctx) { g_free (ctx); @@ -56,7 +56,8 @@ stp_on_gst_bus_message (GstBus *bus, g_free (tmp); /* Setting the server response will only work if the request * hasn't already finished, so we check that */ - if (!ctx->stream_finished) + if (ctx->status == STP_STATUS_STREAMING && + ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH) soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); /* Cleanup in the default main context, * because GHashTable is not thread-safe */ diff --git a/src/lib.h b/src/lib.h index fe63114..4813321 100644 --- a/src/lib.h +++ b/src/lib.h @@ -30,6 +30,13 @@ typedef struct _TranscodeServerCtx TranscodeServerCtx; typedef struct _TranscodeClientCtx TranscodeClientCtx; +enum stp_stream_status { + STP_STATUS_NONE, + STP_STATUS_STREAMING, + STP_STATUS_FLUSHING, + STP_STATUS_FINISHED, +}; + struct _TranscodeServerCtx { SoupMessage *msg; GstElement *pipeline; @@ -41,7 +48,7 @@ struct _TranscodeServerCtx { * we need to set the PUT response on EOS/ERROR on the pipeline, * and whether to reject further data when using a persistent * Content-Length + Content-Range PUT stream. */ - gboolean stream_finished; + enum stp_stream_status status; guint seconds_since_read; /* List of client contexts */ @@ -72,6 +79,9 @@ struct _TranscodeClientCtx { #define stp_print_status(...) do {} while (0) #endif +gboolean invoke_g_hash_table_remove (TranscodeServerCtx *ctx); +gboolean invoke_g_free_client_context (TranscodeClientCtx *ctx); + void stp_cleanup_transcode_server_ctx (TranscodeServerCtx *ctx); void stp_cleanup_transcode_client_ctx (TranscodeClientCtx *ctx); diff --git a/src/main.c b/src/main.c index 7cc2d25..fb62d53 100644 --- a/src/main.c +++ b/src/main.c @@ -63,6 +63,20 @@ get_server_ctx_from_msg (SoupMessage *msg, return ctx; } +static void +stp_abort_server_ctx (TranscodeServerCtx *ctx) +{ + /* Abort all processing for this stream */ + if (ctx->status == STP_STATUS_STREAMING && + ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH) { + /* Close the PUT stream if necessary */ + ctx->status = STP_STATUS_FINISHED; + soup_message_set_status (ctx->msg, SOUP_STATUS_GONE); + } + /* Disconnect all clients, and shut down the stream */ + invoke_g_hash_table_remove (ctx); +} + /* If it's been more than 10 seconds since the last time we got * a chunk for a PUT request, we timeout and drop the connection */ static gboolean @@ -190,7 +204,13 @@ stream_finished_cb (SoupMessage *msg, stp_print_status ("Stream finished/aborted, queueing EOS... "); - ctx->stream_finished = TRUE; + if (!ctx->appsrc) { + ctx->status = STP_STATUS_FINISHED; + stp_print_status ("No need.\n"); + return; + } + + ctx->status = STP_STATUS_FLUSHING; /* Incoming stream has ended */ g_signal_emit_by_name (ctx->appsrc, "end-of-stream", &ret); if (ret != GST_FLOW_OK) @@ -231,7 +251,7 @@ PUT: * will be done on EOS/ERROR in the gst pipeline. */ server_ctx = get_server_ctx_from_msg (msg, ctx_table); if (server_ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH) - server_ctx->stream_finished = TRUE; + server_ctx->status = STP_STATUS_FLUSHING; soup_message_set_status (server_ctx->msg, SOUP_STATUS_OK); return; } @@ -407,7 +427,8 @@ got_request_body_chunk (SoupMessage *msg, * reception of further chunks when we've prematurely set the response * (say due to an error), so we check if the status code is set and skip * all further chunks */ - if (msg->status_code != 0) + if (msg->status_code != 0 || + ctx->status != STP_STATUS_STREAMING) return; /* We need to update this every chunk */ @@ -421,7 +442,7 @@ got_request_body_chunk (SoupMessage *msg, if (ret != GST_FLOW_OK) { g_critical ("Unable to push buffer\n"); soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); - g_hash_table_remove (ctx->parent_ctx_table, ctx->path); + invoke_g_hash_table_remove (ctx); return; } } @@ -431,7 +452,7 @@ request_ended_no_body_cb (SoupMessage *msg, TranscodeServerCtx *ctx) { g_critical ("Request ended without a body!"); - g_hash_table_remove (ctx->parent_ctx_table, ctx->path); + invoke_g_hash_table_remove (ctx); g_free (ctx); } @@ -471,7 +492,7 @@ got_first_request_body_chunk (SoupMessage *msg, GST_STATE_CHANGE_FAILURE) { g_critical ("Unable to set pipeline to PLAYING\n"); soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); - g_hash_table_remove (ctx->parent_ctx_table, ctx->path); + invoke_g_hash_table_remove (ctx); } else { g_debug ("Set pipeline to PLAYING"); } @@ -576,7 +597,7 @@ PUT: { if (connection_exists) { g_debug ("Stream already exists, connecting everything to that..."); ctx = get_server_ctx_from_msg (msg, ctx_table); - if (ctx->stream_finished) { + if (ctx->status != STP_STATUS_STREAMING) { g_critical ("Recv more data on '%s' after timeout", soup_uri_get_path (uri)); soup_message_set_status (msg, SOUP_STATUS_REQUEST_TIMEOUT); @@ -592,6 +613,7 @@ PUT: { ctx = g_new0 (TranscodeServerCtx, 1); ctx->msg = msg; ctx->encoding = encoding; + ctx->status = STP_STATUS_STREAMING; ctx->parent_ctx_table = ctx_table; ctx->path = g_strdup (soup_uri_get_path (uri)); g_hash_table_insert (ctx_table, ctx->path, ctx); @@ -623,14 +645,7 @@ GET: { if (query && g_strcmp0 (query, "abort") == 0) { /* We just use the "uri" field for this, so this is OK */ server_ctx = get_server_ctx_from_msg (msg, ctx_table); - /* Abort all processing for this stream */ - if (!server_ctx->stream_finished) { - /* Close the PUT stream if necessary */ - server_ctx->stream_finished = TRUE; - soup_message_set_status (server_ctx->msg, SOUP_STATUS_GONE); - } - /* Disconnect all clients, and shut down the stream */ - g_hash_table_remove (ctx_table, server_ctx->path); + stp_abort_server_ctx (server_ctx); soup_message_set_status (msg, SOUP_STATUS_OK); goto out; } -- cgit v0.11.2-2-gd1dd