From 478b78c77e32612d84f2bfa7cc7dcc70ed59d5fb Mon Sep 17 00:00:00 2001 From: Nirbheek Chauhan Date: Tue, 22 Jul 2014 20:25:52 +0530 Subject: encode: POC for streaming incoming video streams to RTP/UDP The UDP broadcast happens to localhost:5004 by default. Doesn't handle multiple video streams properly yet, because this is a POC. --- src/encode.c | 106 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) diff --git a/src/encode.c b/src/encode.c index 915cd00..2de4a9a 100644 --- a/src/encode.c +++ b/src/encode.c @@ -95,6 +95,109 @@ out: return; } +static GstPadProbeReturn +on_pad_probe_buffer (GstPad *pad, + GstPadProbeInfo *info, + gpointer user_data) +{ + GstBuffer *buffer; + static int count; + + /* Due to a bug, the DISCONT flag is set on the first two buffers, + * which matroskademux doesn't handle properly. So, we rewrite that flag. */ + buffer = gst_pad_probe_info_get_buffer (info); + if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT)) + count++; + GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT); + + if (count != 2) + return GST_PAD_PROBE_PASS; + else + return GST_PAD_PROBE_REMOVE; +} + +static void +on_demuxer_pad_added (GstElement *demuxer, + GstPad *srcpad, + GstElement *pipeline) +{ + char *name; + GstCaps *caps; + GstPad *sinkpad; + GstElement *rtppay, *udpsink; + + caps = gst_pad_query_caps (srcpad, NULL); + name = gst_caps_to_string (caps); + + g_debug ("Pad added: %s\n", name); + if (!g_str_has_prefix (name, "video/x-vp8")) + goto out; + + /* Broadcast VP8 RTP over UDP://localhost:5004 */ + rtppay = gst_element_factory_make ("rtpvp8pay", "rtpvp8pay"); + udpsink = gst_element_factory_make ("udpsink", "udpsink"); + gst_bin_add_many (GST_BIN (pipeline), rtppay, udpsink, NULL); + gst_element_link (rtppay, udpsink); + + if (!gst_element_sync_state_with_parent (rtppay) || + !gst_element_sync_state_with_parent (udpsink)) { + g_critical ("Unable to sync rtppay ! udpsink with parent pipeline\n"); + goto out; + } + + sinkpad = gst_element_get_static_pad (rtppay, "sink"); + gst_pad_link (srcpad, sinkpad); + stp_print_status ("Streaming a vp8 stream on UDP\n"); +out: + g_free (name); + gst_object_unref (sinkpad); +} + +static void +do_udp_rtp_broadcast (GstElement *decodebin, + GstElement *pipeline) +{ + GstPadTemplate *template; + GstPad *srcpad, *sinkpad; + GstElement *demuxer, *q, *tee; + + /* We demux the encode stream, and rtp all the vp8 streams we find. + * Hacky, but quick for this POC. */ + q = gst_element_factory_make ("queue", "rtpq"); + /*demuxer = gst_element_factory_make ("filesink", "matroskademux"); + g_object_set (demuxer, "location", "filesink.webm", NULL);*/ + demuxer = gst_element_factory_make ("matroskademux", "matroskademux"); + + gst_bin_add_many (GST_BIN (pipeline), q, demuxer, NULL); + gst_element_link (q, demuxer); + + tee = gst_bin_get_by_name (GST_BIN (pipeline), "tee"); + template = gst_pad_template_new ("teesrc", GST_PAD_SRC, + GST_PAD_REQUEST, GST_CAPS_ANY); + srcpad = gst_element_request_pad (tee, template, NULL, GST_CAPS_ANY); + sinkpad = gst_element_get_static_pad (q, "sink"); + + if (!gst_element_sync_state_with_parent (q) || + !gst_element_sync_state_with_parent (demuxer)) { + g_critical ("Unable to sync q ! demuxer with parent pipeline\n"); + goto out; + } + + gst_pad_link (srcpad, sinkpad); + + /* Due to a bug in webmmux, DISCONT is sent repeatedly for no reason */ + gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_BUFFER, + (GstPadProbeCallback) on_pad_probe_buffer, NULL, NULL); + + g_signal_connect (demuxer, "pad-added", + G_CALLBACK (on_demuxer_pad_added), pipeline); + +out: + gst_object_unref (template); + gst_object_unref (srcpad); + gst_object_unref (tee); +} + static GstEncodingProfile * create_webm_profile (void) { @@ -219,6 +322,9 @@ stp_encode_from_msg (TranscodeServerCtx *ctx) * corresponding sink pad on decodebin */ g_signal_connect (decodebin, "pad-added", G_CALLBACK (on_decodebin_pad_added), encodebin); + /* When finished adding pads, try streaming it all over RTP/UDP */ + g_signal_connect (decodebin, "no-more-pads", + G_CALLBACK (do_udp_rtp_broadcast), ctx->pipeline); bus = gst_pipeline_get_bus (GST_PIPELINE (ctx->pipeline)); gst_bus_add_signal_watch (bus); -- cgit v0.11.2-2-gd1dd