summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorNirbheek Chauhan <nirbheek@centricular.com>2014-07-14 15:46:39 (GMT)
committerNirbheek Chauhan <nirbheek@centricular.com>2014-07-14 15:46:40 (GMT)
commitdf0f28945f2c32f519457e838eb424401eb586e9 (patch)
tree0069fb922da82ac01f0fa3a0eea572da7f863d35 /src
parentfeee319e6b6cb4774cce72d90d1a60f5ecc9bbda (diff)
downloadsoup-transcoding-proxy-df0f28945f2c32f519457e838eb424401eb586e9.zip
soup-transcoding-proxy-df0f28945f2c32f519457e838eb424401eb586e9.tar.gz
server: Always send data from the last keyframe
Store non-keyframe buffers sent to fakesink, and push them to all new clients
Diffstat (limited to 'src')
-rw-r--r--src/encode.c65
-rw-r--r--src/lib.c15
-rw-r--r--src/lib.h5
-rw-r--r--src/main.c28
4 files changed, 94 insertions, 19 deletions
diff --git a/src/encode.c b/src/encode.c
index 00eb791..3862aa6 100644
--- a/src/encode.c
+++ b/src/encode.c
@@ -95,6 +95,27 @@ out:
return;
}
+static void
+update_keyframe_buffer_cb (GstElement *fakesink,
+ GstBuffer *buffer,
+ GstPad *pad,
+ TranscodeServerCtx *ctx)
+{
+ if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DECODE_ONLY) ||
+ GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_CORRUPTED) ||
+ GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_GAP) ||
+ GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DROPPABLE))
+ return;
+
+ if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT))
+ gst_buffer_list_foreach (ctx->keyframe,
+ stp_unref_gst_buffer, NULL);
+
+ gst_buffer_ref (buffer);
+ gst_buffer_list_add (ctx->keyframe, buffer);
+ return;
+}
+
static GstEncodingProfile *
create_webm_profile (void)
{
@@ -140,6 +161,9 @@ stp_encode_from_msg (TranscodeServerCtx *ctx)
{
#ifdef ENCODE_DEBUG
char *tmp, *filename;
+ GstPad *srcpad, *sinkpad;
+ GstElement *filesink, *q2;
+ GstPadTemplate *template;
#endif
GstBus *bus;
GstElement *src, *decodebin, *encodebin;
@@ -166,23 +190,48 @@ stp_encode_from_msg (TranscodeServerCtx *ctx)
g_object_set (encodebin,
"profile", profile,
"avoid-reencoding", TRUE, NULL);
+
tee = gst_element_factory_make ("tee", "tee");
q1 = gst_element_factory_make ("queue", "q1");
+ fakesink = gst_element_factory_make ("fakesink", "fakesink");
+
+ /* Ensure that the stream is always realtime */
+ g_object_set (fakesink,
+ "sync", TRUE,
+ "signal-handoffs", TRUE, NULL);
+
+ gst_bin_add_many (GST_BIN (ctx->pipeline), encodebin, tee, q1, fakesink, NULL);
+ gst_element_link_many (encodebin, tee, q1, fakesink, NULL);
+
#ifdef ENCODE_DEBUG
- fakesink = gst_element_factory_make ("filesink", "filesink");
+ q2 = gst_element_factory_make ("queue", "q2");
+ filesink = gst_element_factory_make ("filesink", "filesink");
+
tmp = g_uri_escape_string (ctx->path, NULL, TRUE);
filename = g_strdup_printf ("debug-encode-%s.webm", tmp);
- g_object_set (fakesink, "location", filename, NULL);
+ g_object_set (filesink, "location", filename, NULL);
+
+ gst_bin_add_many (GST_BIN (ctx->pipeline), q2, filesink, NULL);
+ gst_element_link (q2, filesink);
+
+ /* Link pads */
+ template = gst_pad_template_new ("teesrc", GST_PAD_SRC,
+ GST_PAD_REQUEST, GST_CAPS_ANY);
+ srcpad = gst_element_request_pad (tee, template, NULL, GST_CAPS_ANY);
+ sinkpad = gst_element_get_static_pad (q2, "sink");
+
+ gst_pad_link (srcpad, sinkpad);
+
+ gst_object_unref (template);
+ gst_object_unref (srcpad);
g_free (filename);
g_free (tmp);
-#else
- fakesink = gst_element_factory_make ("fakesink", "fakesink");
#endif
- /* Ensure that the stream is always realtime */
- g_object_set (fakesink, "sync", TRUE, NULL);
- gst_bin_add_many (GST_BIN (ctx->pipeline), encodebin, tee, q1, fakesink, NULL);
- gst_element_link_many (encodebin, tee, q1, fakesink, NULL);
+ ctx->keyframe = gst_buffer_list_new ();
+
+ g_signal_connect (fakesink, "handoff",
+ G_CALLBACK (update_keyframe_buffer_cb), ctx);
/* The pads of decodebin and encodebin are dynamic,
* so those will be linked when streams/pads are added */
diff --git a/src/lib.c b/src/lib.c
index b22e82f..f527e04 100644
--- a/src/lib.c
+++ b/src/lib.c
@@ -59,6 +59,15 @@ stp_on_gst_bus_message (GstBus *bus,
return TRUE;
}
+gboolean stp_unref_gst_buffer (GstBuffer **buffer,
+ guint idx,
+ gpointer user_data)
+{
+ gst_buffer_unref (*buffer);
+ *buffer = NULL;
+ return TRUE;
+}
+
/* When the incoming stream reaches EOS, we call this
* which initiates a shutdown for all clients and then
* the server itself */
@@ -75,6 +84,9 @@ stp_cleanup_transcode_server_ctx (TranscodeServerCtx *ctx)
gst_element_set_state (ctx->pipeline, GST_STATE_NULL);
gst_object_unref (ctx->pipeline);
+ gst_buffer_list_foreach (ctx->keyframe, stp_unref_gst_buffer, NULL);
+ gst_buffer_list_unref (ctx->keyframe);
+
g_free (ctx);
}
@@ -100,7 +112,6 @@ pad_blocked_cleanup_cb (GstPad *srcpad,
gst_bin_remove (GST_BIN (ctx->server_ctx->pipeline), sinkbin);
gst_object_unref (sinkbin);
- g_mutex_clear (&ctx->can_write_chunk);
stp_print_status (" Client cleanup done!\n");
return GST_PAD_PROBE_OK;
}
@@ -128,6 +139,8 @@ stp_cleanup_transcode_client_ctx (TranscodeClientCtx *ctx)
gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_BLOCK,
(GstPadProbeCallback) pad_blocked_cleanup_cb,
ctx, (GDestroyNotify)g_free);
+
+ g_mutex_clear (&ctx->can_write_chunk);
gst_object_unref (sinkbin);
stp_print_status (".");
}
diff --git a/src/lib.h b/src/lib.h
index 9bbd790..815d18a 100644
--- a/src/lib.h
+++ b/src/lib.h
@@ -34,6 +34,8 @@ struct _TranscodeServerCtx {
SoupMessage *msg;
GstElement *pipeline;
GstElement *appsrc;
+ /* Contains buffers from the previous keyframe till right now */
+ GstBufferList *keyframe;
/* If the encoding is not chunked, we'll get multiple requests
* with separate Content-Length headers on the same path */
SoupEncoding encoding;
@@ -76,6 +78,9 @@ void stp_cleanup_transcode_client_ctx (TranscodeClientCtx *ctx);
gboolean stp_on_gst_bus_message (GstBus *bus,
GstMessage *msg,
TranscodeServerCtx *ctx);
+gboolean stp_unref_gst_buffer (GstBuffer **buffer,
+ guint idx,
+ gpointer user_data);
GstBuffer* stp_get_streamheader_from_caps (GstCaps *caps);
GstBuffer* stp_get_gst_buffer (SoupBuffer *chunk);
diff --git a/src/main.c b/src/main.c
index 30e16c9..abd4f46 100644
--- a/src/main.c
+++ b/src/main.c
@@ -224,7 +224,7 @@ GET:
{
/* A GET request was received. We connect from the pipeline to the
* client requesting the stream and start writing the response. */
- GstCaps *caps;
+ GstCaps *caps = NULL;
GstState state;
GstBuffer *buffer;
GstFlowReturn ret;
@@ -293,7 +293,7 @@ GET:
if (!gst_element_sync_state_with_parent (bin)) {
g_critical ("Unable to sync appsink bin with parent pipeline\n");
- return;
+ goto err;
}
g_debug ("appsink bin state synced successfully\n");
@@ -308,12 +308,21 @@ GET:
buffer = stp_get_streamheader_from_caps (caps);
if (!buffer) {
g_printerr ("Unable to get streamheader from caps\n");
- goto nostreamheader;
+ goto push_keyframe;
}
ret = gst_pad_push (srcpad, buffer);
if (ret != GST_FLOW_OK) {
- g_printerr ("Unable to push buffer: %s\n",
+ g_printerr ("Unable to push streamheader: %s\n",
+ gst_flow_get_name (ret));
+ goto err;
+ }
+
+push_keyframe:
+ ret = gst_pad_push_list (srcpad,
+ gst_buffer_list_copy (server_ctx->keyframe));
+ if (ret != GST_FLOW_OK) {
+ g_printerr ("Unable to push keyframe data: %s\n",
gst_flow_get_name (ret));
goto err;
}
@@ -324,11 +333,6 @@ GET:
gst_element_state_change_return_get_name (state_change),
gst_element_state_get_name (state));
-nostreamheader:
- gst_object_unref (template);
- gst_object_unref (srcpad);
- gst_object_unref (tee);
-
client_ctx->appsink = appsink;
g_signal_connect (appsink, "new-sample",
@@ -350,7 +354,11 @@ nostreamheader:
soup_message_set_status (msg, SOUP_STATUS_OK);
out:
- gst_caps_unref (caps);
+ gst_object_unref (template);
+ gst_object_unref (srcpad);
+ gst_object_unref (tee);
+ if (caps)
+ gst_caps_unref (caps);
return;
err:
soup_message_set_status (client_ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);