From df0f28945f2c32f519457e838eb424401eb586e9 Mon Sep 17 00:00:00 2001 From: Nirbheek Chauhan Date: Mon, 14 Jul 2014 21:16:39 +0530 Subject: server: Always send data from the last keyframe Store non-keyframe buffers sent to fakesink, and push them to all new clients --- src/encode.c | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++++-------- src/lib.c | 15 +++++++++++++- src/lib.h | 5 +++++ src/main.c | 28 ++++++++++++++++---------- 4 files changed, 94 insertions(+), 19 deletions(-) diff --git a/src/encode.c b/src/encode.c index 00eb791..3862aa6 100644 --- a/src/encode.c +++ b/src/encode.c @@ -95,6 +95,27 @@ out: return; } +static void +update_keyframe_buffer_cb (GstElement *fakesink, + GstBuffer *buffer, + GstPad *pad, + TranscodeServerCtx *ctx) +{ + if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DECODE_ONLY) || + GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_CORRUPTED) || + GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_GAP) || + GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DROPPABLE)) + return; + + if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) + gst_buffer_list_foreach (ctx->keyframe, + stp_unref_gst_buffer, NULL); + + gst_buffer_ref (buffer); + gst_buffer_list_add (ctx->keyframe, buffer); + return; +} + static GstEncodingProfile * create_webm_profile (void) { @@ -140,6 +161,9 @@ stp_encode_from_msg (TranscodeServerCtx *ctx) { #ifdef ENCODE_DEBUG char *tmp, *filename; + GstPad *srcpad, *sinkpad; + GstElement *filesink, *q2; + GstPadTemplate *template; #endif GstBus *bus; GstElement *src, *decodebin, *encodebin; @@ -166,23 +190,48 @@ stp_encode_from_msg (TranscodeServerCtx *ctx) g_object_set (encodebin, "profile", profile, "avoid-reencoding", TRUE, NULL); + tee = gst_element_factory_make ("tee", "tee"); q1 = gst_element_factory_make ("queue", "q1"); + fakesink = gst_element_factory_make ("fakesink", "fakesink"); + + /* Ensure that the stream is always realtime */ + g_object_set (fakesink, + "sync", TRUE, + "signal-handoffs", TRUE, NULL); + + gst_bin_add_many (GST_BIN (ctx->pipeline), encodebin, tee, q1, fakesink, NULL); + gst_element_link_many (encodebin, tee, q1, fakesink, NULL); + #ifdef ENCODE_DEBUG - fakesink = gst_element_factory_make ("filesink", "filesink"); + q2 = gst_element_factory_make ("queue", "q2"); + filesink = gst_element_factory_make ("filesink", "filesink"); + tmp = g_uri_escape_string (ctx->path, NULL, TRUE); filename = g_strdup_printf ("debug-encode-%s.webm", tmp); - g_object_set (fakesink, "location", filename, NULL); + g_object_set (filesink, "location", filename, NULL); + + gst_bin_add_many (GST_BIN (ctx->pipeline), q2, filesink, NULL); + gst_element_link (q2, filesink); + + /* Link pads */ + template = gst_pad_template_new ("teesrc", GST_PAD_SRC, + GST_PAD_REQUEST, GST_CAPS_ANY); + srcpad = gst_element_request_pad (tee, template, NULL, GST_CAPS_ANY); + sinkpad = gst_element_get_static_pad (q2, "sink"); + + gst_pad_link (srcpad, sinkpad); + + gst_object_unref (template); + gst_object_unref (srcpad); g_free (filename); g_free (tmp); -#else - fakesink = gst_element_factory_make ("fakesink", "fakesink"); #endif - /* Ensure that the stream is always realtime */ - g_object_set (fakesink, "sync", TRUE, NULL); - gst_bin_add_many (GST_BIN (ctx->pipeline), encodebin, tee, q1, fakesink, NULL); - gst_element_link_many (encodebin, tee, q1, fakesink, NULL); + ctx->keyframe = gst_buffer_list_new (); + + g_signal_connect (fakesink, "handoff", + G_CALLBACK (update_keyframe_buffer_cb), ctx); /* The pads of decodebin and encodebin are dynamic, * so those will be linked when streams/pads are added */ diff --git a/src/lib.c b/src/lib.c index b22e82f..f527e04 100644 --- a/src/lib.c +++ b/src/lib.c @@ -59,6 +59,15 @@ stp_on_gst_bus_message (GstBus *bus, return TRUE; } +gboolean stp_unref_gst_buffer (GstBuffer **buffer, + guint idx, + gpointer user_data) +{ + gst_buffer_unref (*buffer); + *buffer = NULL; + return TRUE; +} + /* When the incoming stream reaches EOS, we call this * which initiates a shutdown for all clients and then * the server itself */ @@ -75,6 +84,9 @@ stp_cleanup_transcode_server_ctx (TranscodeServerCtx *ctx) gst_element_set_state (ctx->pipeline, GST_STATE_NULL); gst_object_unref (ctx->pipeline); + gst_buffer_list_foreach (ctx->keyframe, stp_unref_gst_buffer, NULL); + gst_buffer_list_unref (ctx->keyframe); + g_free (ctx); } @@ -100,7 +112,6 @@ pad_blocked_cleanup_cb (GstPad *srcpad, gst_bin_remove (GST_BIN (ctx->server_ctx->pipeline), sinkbin); gst_object_unref (sinkbin); - g_mutex_clear (&ctx->can_write_chunk); stp_print_status (" Client cleanup done!\n"); return GST_PAD_PROBE_OK; } @@ -128,6 +139,8 @@ stp_cleanup_transcode_client_ctx (TranscodeClientCtx *ctx) gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_BLOCK, (GstPadProbeCallback) pad_blocked_cleanup_cb, ctx, (GDestroyNotify)g_free); + + g_mutex_clear (&ctx->can_write_chunk); gst_object_unref (sinkbin); stp_print_status ("."); } diff --git a/src/lib.h b/src/lib.h index 9bbd790..815d18a 100644 --- a/src/lib.h +++ b/src/lib.h @@ -34,6 +34,8 @@ struct _TranscodeServerCtx { SoupMessage *msg; GstElement *pipeline; GstElement *appsrc; + /* Contains buffers from the previous keyframe till right now */ + GstBufferList *keyframe; /* If the encoding is not chunked, we'll get multiple requests * with separate Content-Length headers on the same path */ SoupEncoding encoding; @@ -76,6 +78,9 @@ void stp_cleanup_transcode_client_ctx (TranscodeClientCtx *ctx); gboolean stp_on_gst_bus_message (GstBus *bus, GstMessage *msg, TranscodeServerCtx *ctx); +gboolean stp_unref_gst_buffer (GstBuffer **buffer, + guint idx, + gpointer user_data); GstBuffer* stp_get_streamheader_from_caps (GstCaps *caps); GstBuffer* stp_get_gst_buffer (SoupBuffer *chunk); diff --git a/src/main.c b/src/main.c index 30e16c9..abd4f46 100644 --- a/src/main.c +++ b/src/main.c @@ -224,7 +224,7 @@ GET: { /* A GET request was received. We connect from the pipeline to the * client requesting the stream and start writing the response. */ - GstCaps *caps; + GstCaps *caps = NULL; GstState state; GstBuffer *buffer; GstFlowReturn ret; @@ -293,7 +293,7 @@ GET: if (!gst_element_sync_state_with_parent (bin)) { g_critical ("Unable to sync appsink bin with parent pipeline\n"); - return; + goto err; } g_debug ("appsink bin state synced successfully\n"); @@ -308,12 +308,21 @@ GET: buffer = stp_get_streamheader_from_caps (caps); if (!buffer) { g_printerr ("Unable to get streamheader from caps\n"); - goto nostreamheader; + goto push_keyframe; } ret = gst_pad_push (srcpad, buffer); if (ret != GST_FLOW_OK) { - g_printerr ("Unable to push buffer: %s\n", + g_printerr ("Unable to push streamheader: %s\n", + gst_flow_get_name (ret)); + goto err; + } + +push_keyframe: + ret = gst_pad_push_list (srcpad, + gst_buffer_list_copy (server_ctx->keyframe)); + if (ret != GST_FLOW_OK) { + g_printerr ("Unable to push keyframe data: %s\n", gst_flow_get_name (ret)); goto err; } @@ -324,11 +333,6 @@ GET: gst_element_state_change_return_get_name (state_change), gst_element_state_get_name (state)); -nostreamheader: - gst_object_unref (template); - gst_object_unref (srcpad); - gst_object_unref (tee); - client_ctx->appsink = appsink; g_signal_connect (appsink, "new-sample", @@ -350,7 +354,11 @@ nostreamheader: soup_message_set_status (msg, SOUP_STATUS_OK); out: - gst_caps_unref (caps); + gst_object_unref (template); + gst_object_unref (srcpad); + gst_object_unref (tee); + if (caps) + gst_caps_unref (caps); return; err: soup_message_set_status (client_ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); -- cgit v0.11.2-2-gd1dd