diff options
author | Nirbheek Chauhan <nirbheek@centricular.com> | 2014-07-10 15:12:34 (GMT) |
---|---|---|
committer | Nirbheek Chauhan <nirbheek@centricular.com> | 2014-07-10 15:14:52 (GMT) |
commit | aefc695d53e0d519d438741ca96923b95212f73a (patch) | |
tree | 8c598f51500299ad3863a6ba5e3d091409d0ef03 | |
parent | 720b406ed6382187bbb60ec9038828178d58736b (diff) | |
download | soup-transcoding-proxy-aefc695d53e0d519d438741ca96923b95212f73a.zip soup-transcoding-proxy-aefc695d53e0d519d438741ca96923b95212f73a.tar.gz |
appsink: Add a client timeout, and handle eos separately
If no chunks are written to a client for 10 seconds, we end that stream and
clean it up. This is currently happening due to a bug, but might happen due to
bad/slow clients as well.
Instead of handling EOS from null samples, handle EOS through the signal
handler. This is a more reliable way of handling EOS, and also works when the
whole pipeline is torn down because the PUT stream ended.
-rw-r--r-- | src/lib.c | 2 | ||||
-rw-r--r-- | src/lib.h | 8 | ||||
-rw-r--r-- | src/main.c | 45 |
3 files changed, 47 insertions, 8 deletions
@@ -90,6 +90,8 @@ stp_cleanup_transcode_client_ctx (TranscodeClientCtx *ctx) g_print (">>> Doing client cleanup."); + g_source_remove (ctx->timeout_handler_id); + server_ctx->clients = g_list_remove (server_ctx->clients, ctx); /* Block sinkpad and srcpad, then unlink and remove */ @@ -32,11 +32,13 @@ struct _TranscodeServerCtx { struct _TranscodeClientCtx { SoupMessage *msg; - GstElement *appsink; + GstElement *appsink; /* We hold an extra ref to this */ GstPad *ghostsinkpad; + GMutex can_write_chunk; - gulong first_sample_handler_id; - /* The transcode server context */ + guint timeout_handler_id; + guint seconds_since_write; + /* The transcode server context; we don't hold a ref to this */ TranscodeServerCtx *server_ctx; }; @@ -9,10 +9,12 @@ #endif static int port = 8000; +static int client_timeout = 10; static GOptionEntry entries[] = { { "port", 'p', 0, G_OPTION_ARG_INT, &port, "Port to listen on (default: 8000)", "PORT" }, + { "timeout", 't', 0, G_OPTION_ARG_INT, &client_timeout, "Client connection timeout (default: 10s)", "SECONDS" }, { NULL } }; @@ -99,16 +101,45 @@ out: return GST_PAD_PROBE_OK; err: soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); + soup_message_body_complete (ctx->msg->response_body); stp_cleanup_transcode_client_ctx (ctx); /* FIXME: PROBE_OK for errors too? What to do here? */ goto out; } +/* If it's been more than 10 seconds since the last time we wrote + * a chunk for a GET response, we timeout and drop the connection */ +static gboolean +increment_write_timer (TranscodeClientCtx *ctx) +{ + ctx->seconds_since_write++; + if (ctx->seconds_since_write < client_timeout) + return G_SOURCE_CONTINUE; + + g_printerr ("Client timed out, cleaning up\n"); + soup_message_set_status (ctx->msg, SOUP_STATUS_REQUEST_TIMEOUT); + soup_message_body_complete (ctx->msg->response_body); + stp_cleanup_transcode_client_ctx (ctx); + return G_SOURCE_REMOVE; +} + +static void +client_eos_cb (GstElement *appsink, + TranscodeClientCtx *ctx) +{ + g_print ("Received EOS for client\n"); + soup_message_set_status (ctx->msg, SOUP_STATUS_OK); + soup_message_body_complete (ctx->msg->response_body); +} + static void can_write_next_client_chunk_cb (SoupMessage *msg, TranscodeClientCtx *ctx) { g_mutex_unlock (&ctx->can_write_chunk); + ctx->seconds_since_write = 0; + ctx->msg = msg; + g_print ("_"); } static GstFlowReturn @@ -121,13 +152,14 @@ write_client_chunk_cb (GstElement *appsink, GstMemory *memory; gboolean eos; - if (!g_mutex_trylock (&ctx->can_write_chunk)) + if (!g_mutex_trylock (&ctx->can_write_chunk)) { + g_print ("!"); /* We cannot safely append to the message body till * the previous chunk is written out, otherwise we get * a segfault. This is likely because SoupMessageBody * uses a GSList for the chunks, which isn't MT-safe. */ return GST_FLOW_OK; - + } g_print ("*"); g_signal_emit_by_name (ctx->appsink, "pull-sample", &sample); @@ -297,13 +329,16 @@ GET: gst_object_ref (appsink); client_ctx->appsink = appsink; - client_ctx->first_sample_handler_id = - g_signal_connect (appsink, "new-sample", - G_CALLBACK (write_client_chunk_cb), client_ctx); + g_signal_connect (appsink, "new-sample", + G_CALLBACK (write_client_chunk_cb), client_ctx); + g_signal_connect (appsink, "eos", + G_CALLBACK (client_eos_cb), client_ctx); g_signal_connect (client_ctx->msg, "wrote-chunk", G_CALLBACK (can_write_next_client_chunk_cb), client_ctx); g_signal_connect (client_ctx->msg, "finished", G_CALLBACK (client_finished_cb), client_ctx); + client_ctx->timeout_handler_id = \ + g_timeout_add_seconds (2, (GSourceFunc)increment_write_timer, client_ctx); server_ctx->clients = g_list_prepend (server_ctx->clients, client_ctx); |