summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNirbheek Chauhan <nirbheek@centricular.com>2014-07-10 15:12:34 (GMT)
committerNirbheek Chauhan <nirbheek@centricular.com>2014-07-10 15:14:52 (GMT)
commitaefc695d53e0d519d438741ca96923b95212f73a (patch)
tree8c598f51500299ad3863a6ba5e3d091409d0ef03
parent720b406ed6382187bbb60ec9038828178d58736b (diff)
downloadsoup-transcoding-proxy-aefc695d53e0d519d438741ca96923b95212f73a.zip
soup-transcoding-proxy-aefc695d53e0d519d438741ca96923b95212f73a.tar.gz
appsink: Add a client timeout, and handle eos separately
If no chunks are written to a client for 10 seconds, we end that stream and clean it up. This is currently happening due to a bug, but might happen due to bad/slow clients as well. Instead of handling EOS from null samples, handle EOS through the signal handler. This is a more reliable way of handling EOS, and also works when the whole pipeline is torn down because the PUT stream ended.
-rw-r--r--src/lib.c2
-rw-r--r--src/lib.h8
-rw-r--r--src/main.c45
3 files changed, 47 insertions, 8 deletions
diff --git a/src/lib.c b/src/lib.c
index cb24fa6..b36015d 100644
--- a/src/lib.c
+++ b/src/lib.c
@@ -90,6 +90,8 @@ stp_cleanup_transcode_client_ctx (TranscodeClientCtx *ctx)
g_print (">>> Doing client cleanup.");
+ g_source_remove (ctx->timeout_handler_id);
+
server_ctx->clients = g_list_remove (server_ctx->clients, ctx);
/* Block sinkpad and srcpad, then unlink and remove */
diff --git a/src/lib.h b/src/lib.h
index 14ec096..26cb770 100644
--- a/src/lib.h
+++ b/src/lib.h
@@ -32,11 +32,13 @@ struct _TranscodeServerCtx {
struct _TranscodeClientCtx {
SoupMessage *msg;
- GstElement *appsink;
+ GstElement *appsink; /* We hold an extra ref to this */
GstPad *ghostsinkpad;
+
GMutex can_write_chunk;
- gulong first_sample_handler_id;
- /* The transcode server context */
+ guint timeout_handler_id;
+ guint seconds_since_write;
+ /* The transcode server context; we don't hold a ref to this */
TranscodeServerCtx *server_ctx;
};
diff --git a/src/main.c b/src/main.c
index db08a60..3d0933c 100644
--- a/src/main.c
+++ b/src/main.c
@@ -9,10 +9,12 @@
#endif
static int port = 8000;
+static int client_timeout = 10;
static GOptionEntry entries[] =
{
{ "port", 'p', 0, G_OPTION_ARG_INT, &port, "Port to listen on (default: 8000)", "PORT" },
+ { "timeout", 't', 0, G_OPTION_ARG_INT, &client_timeout, "Client connection timeout (default: 10s)", "SECONDS" },
{ NULL }
};
@@ -99,16 +101,45 @@ out:
return GST_PAD_PROBE_OK;
err:
soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
+ soup_message_body_complete (ctx->msg->response_body);
stp_cleanup_transcode_client_ctx (ctx);
/* FIXME: PROBE_OK for errors too? What to do here? */
goto out;
}
+/* If it's been more than 10 seconds since the last time we wrote
+ * a chunk for a GET response, we timeout and drop the connection */
+static gboolean
+increment_write_timer (TranscodeClientCtx *ctx)
+{
+ ctx->seconds_since_write++;
+ if (ctx->seconds_since_write < client_timeout)
+ return G_SOURCE_CONTINUE;
+
+ g_printerr ("Client timed out, cleaning up\n");
+ soup_message_set_status (ctx->msg, SOUP_STATUS_REQUEST_TIMEOUT);
+ soup_message_body_complete (ctx->msg->response_body);
+ stp_cleanup_transcode_client_ctx (ctx);
+ return G_SOURCE_REMOVE;
+}
+
+static void
+client_eos_cb (GstElement *appsink,
+ TranscodeClientCtx *ctx)
+{
+ g_print ("Received EOS for client\n");
+ soup_message_set_status (ctx->msg, SOUP_STATUS_OK);
+ soup_message_body_complete (ctx->msg->response_body);
+}
+
static void
can_write_next_client_chunk_cb (SoupMessage *msg,
TranscodeClientCtx *ctx)
{
g_mutex_unlock (&ctx->can_write_chunk);
+ ctx->seconds_since_write = 0;
+ ctx->msg = msg;
+ g_print ("_");
}
static GstFlowReturn
@@ -121,13 +152,14 @@ write_client_chunk_cb (GstElement *appsink,
GstMemory *memory;
gboolean eos;
- if (!g_mutex_trylock (&ctx->can_write_chunk))
+ if (!g_mutex_trylock (&ctx->can_write_chunk)) {
+ g_print ("!");
/* We cannot safely append to the message body till
* the previous chunk is written out, otherwise we get
* a segfault. This is likely because SoupMessageBody
* uses a GSList for the chunks, which isn't MT-safe. */
return GST_FLOW_OK;
-
+ }
g_print ("*");
g_signal_emit_by_name (ctx->appsink, "pull-sample", &sample);
@@ -297,13 +329,16 @@ GET:
gst_object_ref (appsink);
client_ctx->appsink = appsink;
- client_ctx->first_sample_handler_id =
- g_signal_connect (appsink, "new-sample",
- G_CALLBACK (write_client_chunk_cb), client_ctx);
+ g_signal_connect (appsink, "new-sample",
+ G_CALLBACK (write_client_chunk_cb), client_ctx);
+ g_signal_connect (appsink, "eos",
+ G_CALLBACK (client_eos_cb), client_ctx);
g_signal_connect (client_ctx->msg, "wrote-chunk",
G_CALLBACK (can_write_next_client_chunk_cb), client_ctx);
g_signal_connect (client_ctx->msg, "finished",
G_CALLBACK (client_finished_cb), client_ctx);
+ client_ctx->timeout_handler_id = \
+ g_timeout_add_seconds (2, (GSourceFunc)increment_write_timer, client_ctx);
server_ctx->clients = g_list_prepend (server_ctx->clients, client_ctx);