summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore2
-rw-r--r--Makefile20
-rw-r--r--src/debug/local-play.c147
-rw-r--r--src/debug/local-play.h11
-rw-r--r--src/encode.c166
-rw-r--r--src/encode.h11
-rw-r--r--src/lib.c166
-rw-r--r--src/lib.h57
-rw-r--r--src/main.c562
9 files changed, 1141 insertions, 1 deletions
diff --git a/.gitignore b/.gitignore
index aa309fc..7360884 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,4 +3,4 @@
*.swp
*.webm
*.log
-sst-server
+stp-server
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..f87b792
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,20 @@
+DEBUG_CFLAGS := -O0 -ggdb -Wall -fsanitize=address -fno-omit-frame-pointer
+CFLAGS := -O3 -march=native -Wall
+PKGCONFIG := pkg-config
+LIBS := $(shell $(PKGCONFIG) --libs --cflags glib-2.0 gio-unix-2.0 libsoup-2.4 gstreamer-pbutils-1.0) -lrt
+
+SERVER_NAME := stp-server
+SRC_OBJS := $(addprefix src/,debug/local-play.o lib.o encode.o)
+
+VPATH := debug
+
+%.o: %.c %.h
+ $(CC) -c $(CFLAGS) $(LIBS) $< -o $@
+
+$(SERVER_NAME): src/main.c $(SRC_OBJS)
+ $(CC) $(CFLAGS) $(LIBS) $^ -o $@
+
+all: sst-server
+
+clean:
+ rm -f $(SRC_OBJS) $(SERVER_NAME) src/*.webm
diff --git a/src/debug/local-play.c b/src/debug/local-play.c
new file mode 100644
index 0000000..497d567
--- /dev/null
+++ b/src/debug/local-play.c
@@ -0,0 +1,147 @@
+/* License: LGPL-2.1
+ * vim: set sts=2 sw=2 et : */
+
+#include "local-play.h"
+
+static void
+pad_has_video_caps (TranscodeServerCtx *ctx,
+ GstPad *decodebin_pad)
+{
+ GstPad *sink_pad;
+ GstElement *bin;
+ GstElement *videoconv, *videoscale, *videosink;
+
+ bin = gst_bin_new (NULL);
+ videoconv = gst_element_factory_make ("videoconvert", "videoconv");
+ videoscale = gst_element_factory_make ("videoscale", "videoscale");
+ videosink = gst_element_factory_make ("autovideosink", "videosink");
+
+ gst_bin_add_many (GST_BIN (bin), videoconv, videoscale, videosink, NULL);
+ gst_element_link_many (videoconv, videoscale, videosink, NULL);
+
+ sink_pad = gst_element_get_static_pad (videoconv, "sink");
+ gst_element_add_pad (bin, gst_ghost_pad_new ("sink", sink_pad));
+ gst_object_unref (sink_pad);
+
+ gst_bin_add (GST_BIN (ctx->pipeline), bin);
+
+ sink_pad = gst_element_get_static_pad (bin, "sink");
+ if (gst_pad_link (decodebin_pad, sink_pad) != GST_PAD_LINK_OK) {
+ g_critical ("ERROR: Unable to link video pads");
+ soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
+ goto out;
+ }
+
+ /* Since these elements have been added to the pipeline after the state was
+ * changed from GST_STATE_NULL, they have to be moved to PAUSED manually */
+ gst_element_set_state (bin, GST_STATE_PAUSED);
+out:
+ gst_object_unref (sink_pad);
+ return;
+}
+
+static void
+pad_has_audio_caps (TranscodeServerCtx *ctx,
+ GstPad *decodebin_pad)
+{
+ GstPad *sink_pad;
+ GstElement *bin;
+ GstElement *audioconv, *audioresample, *audiosink;
+
+ bin = gst_bin_new (NULL);
+ audioconv = gst_element_factory_make ("audioconvert", "audioconv");
+ audioresample = gst_element_factory_make ("audioresample", "audioresample");
+ audiosink = gst_element_factory_make ("autoaudiosink", "audiosink");
+
+ gst_bin_add_many (GST_BIN (bin), audioconv, audioresample, audiosink, NULL);
+ gst_element_link_many (audioconv, audioresample, audiosink, NULL);
+
+ sink_pad = gst_element_get_static_pad (audioconv, "sink");
+ gst_element_add_pad (bin, gst_ghost_pad_new ("sink", sink_pad));
+ gst_object_unref (sink_pad);
+
+ gst_bin_add (GST_BIN (ctx->pipeline), bin);
+
+ sink_pad = gst_element_get_static_pad (bin, "sink");
+ if (gst_pad_link (decodebin_pad, sink_pad) != GST_PAD_LINK_OK) {
+ g_critical ("ERROR: Unable to link video pads");
+ soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
+ goto out;
+ }
+
+ /* Since these elements have been added to the pipeline after the state was
+ * changed from GST_STATE_NULL, they have to be moved to PAUSED manually */
+ gst_element_set_state (bin, GST_STATE_PAUSED);
+out:
+ gst_object_unref (sink_pad);
+ return;
+}
+
+static gboolean
+structure_printf (GQuark field_id,
+ const GValue *value,
+ gpointer user_data)
+{
+ char *char_value;
+
+ char_value = g_strdup_value_contents (value);
+ g_print ("%s = %s\n", g_quark_to_string (field_id), char_value);
+ g_free (char_value);
+ return TRUE;
+}
+
+static void
+decodebin_pad_added (GstElement *decodebin,
+ GstPad *src_pad,
+ gpointer user_data)
+{
+ GstCaps *pad_caps;
+ GstStructure *structure;
+ TranscodeServerCtx *ctx = user_data;
+
+ if (!gst_pad_has_current_caps (src_pad)) {
+ g_critical ("Decodebin pad doesn't have current caps");
+ soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
+ return;
+ }
+
+ pad_caps = gst_pad_get_current_caps (src_pad);
+ structure = gst_caps_get_structure (pad_caps, 0);
+ gst_structure_foreach (structure, structure_printf, NULL);
+
+ if (gst_structure_has_name (structure, "audio/x-raw"))
+ pad_has_audio_caps (ctx, src_pad);
+ else if (gst_structure_has_name (structure, "video/x-raw"))
+ pad_has_video_caps (ctx, src_pad);
+ else
+ g_critical ("Found unsupported caps: %s",
+ gst_structure_get_name (structure));
+
+ gst_caps_unref (pad_caps);
+}
+
+void
+stp_play_from_msg (TranscodeServerCtx *ctx)
+{
+ GstBus *bus;
+ GstElement *src, *decodebin;
+
+ g_debug ("Constructing pipeline\n");
+ src = gst_element_factory_make ("appsrc", "src");
+ g_object_set (src, "is-live", TRUE,
+ "stream-type", 0, NULL);
+ ctx->appsrc = src;
+
+ decodebin = gst_element_factory_make ("decodebin", "decodebin");
+ gst_bin_add_many (GST_BIN (ctx->pipeline), src, decodebin, NULL);
+ gst_element_link (src, decodebin);
+
+ g_signal_connect (decodebin, "pad-added",
+ G_CALLBACK (decodebin_pad_added), ctx);
+
+ bus = gst_pipeline_get_bus (GST_PIPELINE (ctx->pipeline));
+ gst_bus_add_signal_watch (bus);
+ g_signal_connect (bus, "message", G_CALLBACK (stp_on_gst_bus_message), ctx);
+ g_object_unref (bus);
+}
+
diff --git a/src/debug/local-play.h b/src/debug/local-play.h
new file mode 100644
index 0000000..a56a572
--- /dev/null
+++ b/src/debug/local-play.h
@@ -0,0 +1,11 @@
+/* License: LGPL-2.1
+ * vim: set sts=2 sw=2 et : */
+
+#ifndef _SST_LOCAL_PLAY
+#define _SST_LOCAL_PLAY
+
+#include "../lib.h"
+
+void stp_play_from_msg (TranscodeServerCtx *ctx);
+
+#endif /* _SST_LOCAL_PLAY */
diff --git a/src/encode.c b/src/encode.c
new file mode 100644
index 0000000..caaaa46
--- /dev/null
+++ b/src/encode.c
@@ -0,0 +1,166 @@
+/* License: LGPL-2.1
+ * vim: set sts=2 sw=2 et : */
+
+#include "encode.h"
+
+#include <gst/pbutils/encoding-profile.h>
+
+static gboolean
+on_autoplug_continue (GstElement *decodebin,
+ GstPad *srcpad,
+ GstCaps *caps,
+ GstElement *encodebin)
+{
+ gboolean ret;
+ char *name;
+ GstPad *sinkpad;
+
+ name = gst_caps_to_string (caps);
+ g_signal_emit_by_name (encodebin, "request-pad", caps, &sinkpad);
+
+ if (sinkpad != NULL)
+ ret = FALSE, g_debug ("encodebin can passthrough %s\n", name);
+ else
+ ret = TRUE, g_debug ("encodebin cannot passthrough %s\n", name);
+
+ g_free (name);
+ return ret;
+}
+
+static void
+on_decodebin_pad_added (GstElement *decodebin,
+ GstPad *srcpad,
+ GstElement *encodebin)
+{
+ GstCaps *caps;
+ GstPad *sinkpad = NULL;
+ char *name;
+
+ caps = gst_pad_query_caps (srcpad, NULL);
+ name = gst_caps_to_string (caps);
+
+ /* FIXME: We don't try to fetch a compatible pad for raw audio because
+ * that somehow always fails to link. Transmageddon does the same. */
+ if (!g_str_has_prefix (name, "audio/x-raw"))
+ /* If we successfully requested a compatible sink pad in
+ * "autoplug-continue", we can fetch that here. */
+ sinkpad = gst_element_get_compatible_pad (encodebin, srcpad, NULL);
+
+ if (!sinkpad) {
+ /* We request a sink pad for the decoded stream */
+ g_signal_emit_by_name (encodebin, "request-pad", caps, &sinkpad);
+ if (!sinkpad) {
+ g_printerr ("Failed to request a new sink pad for %s\n", name);
+ goto out;
+ }
+ g_debug ("Requested a new sink pad for %s\n", name);
+ } else {
+ g_debug ("Found an existing sink pad for %s\n", name);
+ }
+
+ if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK)
+ g_printerr ("Couldn't link pads for %s\n", name);
+
+out:
+ g_free (name);
+ gst_caps_unref (caps);
+ return;
+}
+
+static GstEncodingProfile *
+create_webm_profile (void)
+{
+ GstEncodingProfile *t;
+ GstEncodingContainerProfile *prof;
+ GstCaps *caps;
+ GstPreset *vp8preset;
+
+ caps = gst_caps_from_string ("video/webm");
+ prof = gst_encoding_container_profile_new ("WebM audio/video", "Standard WEBM/VP8/VORBIS", caps, NULL);
+ gst_caps_unref (caps);
+
+ vp8preset = GST_PRESET (gst_element_factory_make ("vp8enc", "vp8preset"));
+ /* FIXME: This thing still doesn't encode fast enough in real time */
+ g_object_set (vp8preset,
+ "cpu-used", 5,
+ "end-usage", 1,
+ "max-quantizer", 56,
+ "threads", 4,
+ "undershoot", 95,
+ NULL);
+ gst_preset_save_preset (vp8preset, "stp_vp8preset");
+
+ caps = gst_caps_from_string ("video/x-vp8");
+ t = (GstEncodingProfile*) gst_encoding_video_profile_new (caps, "stp_vp8preset",
+ NULL, 0);
+ gst_encoding_container_profile_add_profile (prof, t);
+ gst_caps_unref (caps);
+
+ caps = gst_caps_from_string ("audio/x-vorbis");
+ t = (GstEncodingProfile*) gst_encoding_audio_profile_new (caps, NULL, NULL, 0);
+ gst_encoding_container_profile_add_profile (prof, t);
+ gst_caps_unref (caps);
+
+ /* What about application/x-ass? */
+
+ return (GstEncodingProfile*) prof;
+}
+
+void
+stp_encode_from_msg (TranscodeServerCtx *ctx)
+{
+ GstBus *bus;
+ GstElement *src, *decodebin, *encodebin;
+ GstElement *tee, *q1, *fakesink;
+ GstEncodingProfile *profile;
+
+ g_debug ("Constructing pipeline\n");
+
+ src = gst_element_factory_make ("appsrc", "src");
+ g_object_set (src, "is-live", TRUE,
+ "emit-signals", FALSE,
+ "stream-type", 0, NULL);
+ ctx->appsrc = src;
+
+ decodebin = gst_element_factory_make ("decodebin", "decodebin");
+
+ gst_bin_add_many (GST_BIN (ctx->pipeline), src, decodebin, NULL);
+ gst_element_link (src, decodebin);
+
+ /* TODO: Allow setting of a scaling factor to
+ * allow realtime encoding of all streams */
+ profile = create_webm_profile ();
+ encodebin = gst_element_factory_make ("encodebin", "encodebin");
+ g_object_set (encodebin,
+ "profile", profile,
+ "avoid-reencoding", TRUE, NULL);
+ tee = gst_element_factory_make ("tee", "tee");
+ q1 = gst_element_factory_make ("queue", "q1");
+#ifndef DEBUG
+ fakesink = gst_element_factory_make ("filesink", "filesink");
+ g_object_set (fakesink, "location", "debug-output.webm", NULL);
+#else
+ fakesink = gst_element_factory_make ("fakesink", "fakesink");
+#endif
+
+ gst_bin_add_many (GST_BIN (ctx->pipeline), encodebin, tee, q1, fakesink, NULL);
+ gst_element_link_many (encodebin, tee, q1, fakesink, NULL);
+
+ /* The pads of decodebin and encodebin are dynamic,
+ * so those will be linked when streams/pads are added */
+
+ /* When decodebin finds a stream that can be decoded, we check
+ * if we can pass that directly to encodebin instead of letting
+ * decodebin find a decoding element automatically */
+ g_signal_connect (decodebin, "autoplug-continue",
+ G_CALLBACK (on_autoplug_continue), encodebin);
+ /* When decodebin exposes a source pad, we need to request a
+ * corresponding sink pad on decodebin */
+ g_signal_connect (decodebin, "pad-added",
+ G_CALLBACK (on_decodebin_pad_added), encodebin);
+
+ bus = gst_pipeline_get_bus (GST_PIPELINE (ctx->pipeline));
+ gst_bus_add_signal_watch (bus);
+ g_signal_connect (bus, "message", G_CALLBACK (stp_on_gst_bus_message), ctx);
+ g_object_unref (bus);
+}
diff --git a/src/encode.h b/src/encode.h
new file mode 100644
index 0000000..9c0d897
--- /dev/null
+++ b/src/encode.h
@@ -0,0 +1,11 @@
+/* License: LGPL-2.1
+ * vim: set sts=2 sw=2 et : */
+
+#ifndef _SST_ENCODE
+#define _SST_ENCODE
+
+#include "lib.h"
+
+void stp_encode_from_msg (TranscodeServerCtx *ctx);
+
+#endif /* _SST_ENCODE */
diff --git a/src/lib.c b/src/lib.c
new file mode 100644
index 0000000..584da39
--- /dev/null
+++ b/src/lib.c
@@ -0,0 +1,166 @@
+/* License: LGPL-2.1
+ * vim: set sts=2 sw=2 et : */
+
+#include "lib.h"
+
+#include <poll.h>
+#include <unistd.h>
+#include <sys/stat.h>
+#include <glib-object.h>
+#include <gio/gunixinputstream.h>
+
+gboolean
+stp_on_gst_bus_message (GstBus *bus,
+ GstMessage *msg,
+ TranscodeServerCtx *ctx)
+{
+ GError *error = NULL;
+ char *tmp = NULL;
+
+ switch (GST_MESSAGE_TYPE (msg)) {
+ case GST_MESSAGE_ERROR:
+ gst_message_parse_error (msg, &error, &tmp);
+ g_printerr ("ERROR from element %s: %s\n",
+ GST_OBJECT_NAME (msg->src), error->message);
+ g_printerr ("Debug info: %s\n", tmp);
+ g_error_free (error);
+ g_free (tmp);
+ /* Setting the server response will only work if the request
+ * hasn't already finished, so we check that */
+ if (!ctx->request_finished)
+ soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
+ g_hash_table_remove (ctx->parent_ctx_table, ctx->path);
+ break;
+ case GST_MESSAGE_EOS:
+ g_print ("End of file\n");
+ g_hash_table_remove (ctx->parent_ctx_table, ctx->path);
+ break;
+ default:
+ //g_print ("%s\n", gst_message_type_get_name (msg->type));
+ break;
+ }
+
+ return TRUE;
+}
+
+void
+stp_cleanup_transcode_server_ctx (TranscodeServerCtx *ctx)
+{
+ g_print (">>> Doing server cleanup\n");
+
+ /* Closing the stream should send a HUP on the fd and
+ * trigger cleanup in write_get_response_body_chunk() */
+ g_list_foreach (ctx->clients, (GFunc)stp_close_client_ctx, NULL);
+ g_list_free (ctx->clients);
+
+ /* Cleanup gstreamer pipeline */
+ gst_element_set_state (ctx->pipeline, GST_STATE_NULL);
+ gst_object_unref (ctx->pipeline);
+ ctx->pipeline_is_playing = FALSE;
+
+ g_free (ctx);
+}
+
+static GstPadProbeReturn
+pad_blocked_cleanup_cb (GstPad *srcpad,
+ GstPadProbeInfo *info,
+ TranscodeClientCtx *ctx)
+{
+ GstPad *sinkpad;
+ GstElement *tee;
+ GstElement *sinkbin = GST_ELEMENT (gst_element_get_parent (ctx->appsink));
+
+ g_print (".");
+ /* Remove the probe, XXX: hence unblocking the pipeline? */
+ gst_pad_remove_probe (srcpad, GST_PAD_PROBE_INFO_ID (info));
+
+ tee = gst_pad_get_parent_element (srcpad);
+ sinkpad = gst_element_get_static_pad (sinkbin, "qsink");
+ gst_pad_unlink (srcpad, sinkpad);
+ gst_element_remove_pad (tee, srcpad);
+
+ gst_object_unref (sinkpad);
+ gst_object_unref (tee);
+
+ gst_bin_remove (GST_BIN (ctx->server_ctx->pipeline), sinkbin);
+ gst_object_unref (sinkbin);
+ gst_object_unref (ctx->appsink);
+
+ g_free (ctx);
+ g_print (" Client cleanup done!\n");
+ return GST_PAD_PROBE_OK;
+}
+
+void
+stp_cleanup_transcode_client_ctx (TranscodeClientCtx *ctx)
+{
+ GstPad *sinkpad, *srcpad;
+ TranscodeServerCtx *server_ctx = ctx->server_ctx;
+ GstElement *sinkbin = GST_ELEMENT (gst_element_get_parent (ctx->appsink));
+
+ g_print (">>> Doing client cleanup .");
+
+ /* If we're cleaning up because the server is shutting down,
+ * this will be NULL */
+ if (server_ctx)
+ server_ctx->clients = g_list_remove (server_ctx->clients, ctx);
+
+ /* Block sinkpad and srcpad, then unlink and remove */
+ sinkpad = gst_element_get_static_pad (sinkbin, "qsink");
+ srcpad = gst_pad_get_peer (sinkpad);
+
+ gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_BLOCK,
+ (GstPadProbeCallback) pad_blocked_cleanup_cb,
+ ctx, NULL);
+ gst_object_unref (sinkbin);
+ g_print (".");
+}
+
+void
+stp_close_client_ctx (TranscodeClientCtx *ctx)
+{
+ g_print (">>> Doing client cleanup\n");
+
+ ctx->server_ctx = NULL;
+
+ g_free (ctx);
+}
+
+/* Returns a copy of the streamheader GstBuffer */
+GstBuffer*
+stp_get_streamheader_from_caps (GstCaps *caps)
+{
+ GArray *array;
+ GstStructure *s;
+ const GValue *value;
+
+ s = gst_caps_get_structure (caps, 0);
+ value = gst_structure_get_value (s, "streamheader");
+ g_return_val_if_fail (G_IS_VALUE (value), NULL);
+
+ array = g_value_peek_pointer (value);
+ value = &g_array_index (array, GValue, 0);
+ g_return_val_if_fail (G_VALUE_TYPE (value) == GST_TYPE_BUFFER, NULL);
+
+ return gst_buffer_copy (g_value_peek_pointer (value));
+}
+
+/* Returns a GstBuffer which has consumed the passed-in data */
+GstBuffer*
+stp_get_gst_buffer (SoupBuffer *chunk)
+{
+ gsize len;
+ const guint8 *d;
+ SoupBuffer *copy;
+ GstBuffer *buffer;
+ GstMemory *memory;
+
+ copy = soup_buffer_copy (chunk);
+ soup_buffer_get_data (copy, &d, &len);
+ buffer = gst_buffer_new ();
+ memory = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY,
+ (guint8*)d, len, 0, len, copy,
+ (GDestroyNotify)soup_buffer_free);
+ gst_buffer_append_memory (buffer, memory);
+ return buffer;
+}
diff --git a/src/lib.h b/src/lib.h
new file mode 100644
index 0000000..187bae7
--- /dev/null
+++ b/src/lib.h
@@ -0,0 +1,57 @@
+/* License: LGPL-2.1
+ * vim: set sts=2 sw=2 et : */
+
+#ifndef _SST_LIB
+#define _SST_LIB
+
+#include <glib.h>
+#include <gst/gst.h>
+#include <libsoup/soup.h>
+
+typedef struct _TranscodeServerCtx TranscodeServerCtx;
+typedef struct _TranscodeClientCtx TranscodeClientCtx;
+
+struct _TranscodeServerCtx {
+ SoupMessage *msg;
+ GstElement *pipeline;
+ GstElement *appsrc;
+ /* The pipeline status can be queried from the pipeline itself,
+ * but it's probably better for branching to have it separately too */
+ gboolean pipeline_is_playing;
+ /* Set to TRUE when the incoming stream ends; let's us know if
+ * we need to set the PUT response on EOS/ERROR on the pipeline */
+ gboolean request_finished;
+
+ /* List of client contexts */
+ GList *clients;
+ /* Reference to the parent context hash table */
+ GHashTable *parent_ctx_table;
+ /* Reference to the key in the parent hash table */
+ char *path;
+};
+
+struct _TranscodeClientCtx {
+ SoupMessage *msg;
+ GstElement *appsink;
+ gulong first_sample_handler_id;
+ /* The transcode server context */
+ TranscodeServerCtx *server_ctx;
+};
+
+void stp_cleanup_transcode_server_ctx (TranscodeServerCtx *ctx);
+void stp_cleanup_transcode_client_ctx (TranscodeClientCtx *ctx);
+void stp_close_client_ctx (TranscodeClientCtx *ctx);
+
+gboolean stp_on_gst_bus_message (GstBus *bus,
+ GstMessage *msg,
+ TranscodeServerCtx *ctx);
+
+gboolean stp_copy_chunk_to_fd (SoupBuffer *chunk,
+ int fd);
+gboolean stp_copy_fd_to_body (TranscodeClientCtx *ctx);
+gboolean stp_fd_has_data_to_read (int fd);
+
+GstBuffer* stp_get_streamheader_from_caps (GstCaps *caps);
+GstBuffer* stp_get_gst_buffer (SoupBuffer *chunk);
+
+#endif /* _SST_LIB */
diff --git a/src/main.c b/src/main.c
new file mode 100644
index 0000000..a2430f5
--- /dev/null
+++ b/src/main.c
@@ -0,0 +1,562 @@
+/* License: LGPL-2.1
+ * vim: set sts=2 sw=2 et : */
+
+#include "lib.h"
+#include "encode.h"
+
+#ifdef DEBUG
+#include "debug/local-play.h"
+#endif
+
+#include <fcntl.h>
+#include <string.h>
+
+static SoupServer *server;
+
+static TranscodeServerCtx*
+get_server_ctx_from_msg (SoupMessage *msg,
+ GHashTable *ctx_table)
+{
+ SoupURI *uri;
+ TranscodeServerCtx *ctx;
+
+ g_object_get (msg, "uri", &uri, NULL);
+ ctx = g_hash_table_lookup (ctx_table, soup_uri_get_path (uri));
+ soup_uri_free (uri);
+
+ return ctx;
+}
+
+static void
+enough_data_pause_message (GstElement *appsrc,
+ guint size,
+ TranscodeServerCtx *ctx)
+{
+ /* Pausing the request can cause weirdness if
+ * the recv rate is too high. So, we don't do this. */
+ //g_print ("Enough data, pause. ");
+ //soup_server_pause_message (server, ctx->msg);
+}
+
+static void
+need_data_unpause_message (GstElement *appsrc,
+ guint size,
+ TranscodeServerCtx *ctx)
+{
+ //g_print ("Need data, unpause. \n");
+ //soup_server_unpause_message (server, ctx->msg);
+}
+
+static GstPadProbeReturn
+tee_src_pad_blocked_cb (GstPad *srcpad,
+ GstPadProbeInfo *info,
+ TranscodeClientCtx *ctx)
+{
+ GstCaps *caps;
+ GstPad *sinkpad;
+ GstBuffer *buffer;
+ GstState state;
+ GstStateChangeReturn ret;
+ TranscodeServerCtx *server_ctx = ctx->server_ctx;
+ GstElement *sinkbin = GST_ELEMENT (gst_element_get_parent (ctx->appsink));
+
+ /* Remove the probe, XXX: hence unblocking the pipeline? */
+ gst_pad_remove_probe (srcpad, GST_PAD_PROBE_INFO_ID (info));
+
+ gst_bin_add (GST_BIN (server_ctx->pipeline), sinkbin);
+
+ /* t. ! queue ! appsink */
+ sinkpad = gst_element_get_static_pad (sinkbin, "qsink");
+ gst_pad_link (srcpad, sinkpad);
+
+ /* Send the WebM stream header through the pipeline first */
+ caps = gst_pad_get_current_caps (srcpad);
+ /* TODO: cache keyframes */
+ buffer = stp_get_streamheader_from_caps (caps);
+ if (!buffer)
+ goto err;
+ gst_pad_push (srcpad, buffer);
+
+ ret = gst_element_get_state (server_ctx->pipeline, &state, NULL, 1);
+ g_print ("Linked pads, removed probe. State was %s:%s.\n",
+ gst_element_state_change_return_get_name (ret),
+ gst_element_state_get_name (state));
+
+ if (gst_element_set_state (server_ctx->pipeline, GST_STATE_PLAYING) ==
+ GST_STATE_CHANGE_FAILURE) {
+ g_critical ("Unable to set pipeline back to PLAYING\n");
+ goto err;
+ } else {
+ g_print ("pipeline set to PLAYING successfully\n");
+ }
+
+out:
+ gst_caps_unref (caps);
+ gst_object_unref (sinkpad);
+ gst_object_unref (sinkbin);
+ return GST_PAD_PROBE_OK;
+err:
+ soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
+ stp_cleanup_transcode_client_ctx (ctx);
+ /* FIXME: PROBE_OK for errors too? What to do here? */
+ goto out;
+}
+
+static GstFlowReturn
+write_first_client_chunk_cb (GstElement *appsink,
+ TranscodeClientCtx *ctx)
+{
+ GstMapInfo info;
+ GstSample *sample;
+ GstBuffer *buffer;
+ GstMemory *memory;
+
+ g_assert (ctx->first_sample_handler_id);
+
+ g_signal_handler_disconnect (appsink, ctx->first_sample_handler_id);
+ ctx->first_sample_handler_id = 0;
+
+ /* XXX: If no samples are available, this will block till a sample is available */
+ g_signal_emit_by_name (appsink, "pull-sample", &sample);
+ buffer = gst_sample_get_buffer (sample);
+ memory = gst_buffer_get_all_memory (buffer); /* copy */
+ gst_memory_map (memory, &info, GST_MAP_READ);
+
+ soup_message_body_append (ctx->msg->response_body, SOUP_MEMORY_COPY,
+ info.data, info.size); /* copy */
+
+ gst_memory_unmap (memory, &info);
+ gst_memory_unref (memory);
+ gst_sample_unref (sample);
+
+ soup_server_unpause_message (server, ctx->msg);
+ g_print (".");
+ return GST_FLOW_OK;
+}
+
+static void
+write_next_client_chunk_cb (SoupMessage *msg,
+ TranscodeClientCtx *ctx)
+{
+ GstMapInfo info;
+ GstSample *sample;
+ GstBuffer *buffer;
+ GstMemory *memory;
+ gboolean eos;
+
+ g_signal_emit_by_name (ctx->appsink, "pull-sample", &sample);
+ if (!sample) {
+ g_print ("Null sample, ending stream\n");
+ g_object_get (ctx->appsink, "eos", &eos, NULL);
+ if (eos)
+ soup_message_set_status (msg, SOUP_STATUS_OK);
+ else
+ soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
+ soup_message_body_complete (msg->response_body);
+ return;
+ }
+
+ buffer = gst_sample_get_buffer (sample);
+ memory = gst_buffer_get_all_memory (buffer); /* copy */
+ gst_memory_map (memory, &info, GST_MAP_READ);
+
+ soup_message_body_append (msg->response_body, SOUP_MEMORY_COPY,
+ info.data, info.size); /* copy */
+
+ gst_memory_unmap (memory, &info);
+ gst_memory_unref (memory);
+ gst_sample_unref (sample);
+
+ soup_server_unpause_message (server, msg);
+ g_print (".");
+ return;
+}
+
+static void
+client_finished_cb (SoupMessage *msg,
+ TranscodeClientCtx *ctx)
+{
+ stp_cleanup_transcode_client_ctx (ctx);
+}
+
+static void
+handle_request_cb (SoupServer *server,
+ SoupMessage *msg,
+ const char *path,
+ GHashTable *query,
+ SoupClientContext *client,
+ GHashTable *ctx_table)
+{
+ TranscodeServerCtx *server_ctx;
+
+ if (!msg)
+ return;
+
+ g_print ("Handling %s request on path %s\n", msg->method, path);
+
+ if (msg->method == SOUP_METHOD_PUT ||
+ msg->method == SOUP_METHOD_POST)
+ goto PUT;
+ else if (msg->method == SOUP_METHOD_GET)
+ goto GET;
+ else
+ /* We return NOT_IMPLEMENTED for this
+ * when we get the request headers */
+ g_assert_not_reached ();
+
+PUT:
+ {
+ /* The PUT request has finished streaming. Cleanup
+ * will be done on EOS/ERROR in the gst pipeline. */
+ server_ctx = get_server_ctx_from_msg (msg, ctx_table);
+ server_ctx->request_finished = TRUE;
+ soup_message_set_status (server_ctx->msg, SOUP_STATUS_OK);
+ return;
+ }
+
+GET:
+ {
+ /* A GET request was received. We connect from the pipeline to the
+ * client requesting the stream and start writing the response. */
+ GstElement *bin, *tee, *q2, *appsink;
+ GstPadTemplate *template;
+ GstPad *srcpad, *sinkpadq2;
+ TranscodeClientCtx *client_ctx;
+ GstStateChangeReturn state;
+
+ server_ctx = get_server_ctx_from_msg (msg, ctx_table);
+ if (server_ctx == NULL ||
+ server_ctx->pipeline == NULL) {
+ /* There's no request streaming on this path (yet) */
+ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
+ return;
+ }
+
+ /* Check if the pipeline successfully started PLAYING */
+ state = gst_element_get_state (server_ctx->pipeline,
+ NULL, NULL, 100*GST_MSECOND);
+ switch (state) {
+ case GST_STATE_CHANGE_SUCCESS:
+ break;
+ case GST_STATE_CHANGE_FAILURE:
+ /* PUT stream should've */
+ g_critical ("GET request, but state change failure?");
+ case GST_STATE_CHANGE_ASYNC:
+ soup_message_set_status (msg, SOUP_STATUS_SERVICE_UNAVAILABLE);
+ return;
+ default:
+ g_assert_not_reached ();
+ }
+
+ /* Connect appsink to tee, and start streaming */
+ client_ctx = g_new0 (TranscodeClientCtx, 1);
+ client_ctx->msg = msg;
+ client_ctx->server_ctx = server_ctx;
+
+ bin = gst_bin_new (NULL);
+
+ /* queue ! appsink */
+ q2 = gst_element_factory_make ("queue", "q2");
+ appsink = gst_element_factory_make ("appsink", NULL);
+ g_object_set (appsink, "drop", FALSE,
+ "emit-signals", TRUE,
+ "max-buffers", 0, NULL);
+ gst_bin_add_many (GST_BIN (bin), q2, appsink, NULL);
+ gst_element_link (q2, appsink);
+
+ sinkpadq2 = gst_element_get_static_pad (q2, "sink");
+ gst_element_add_pad (bin, gst_ghost_pad_new ("qsink", sinkpadq2));
+ gst_object_unref (sinkpadq2);
+
+ /* Set to PAUSED so the bin can pre-roll */
+ if (gst_element_set_state (bin, GST_STATE_PAUSED) ==
+ GST_STATE_CHANGE_FAILURE) {
+ g_critical ("Unable to set appsink to PAUSED\n");
+ soup_message_set_status (client_ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
+ stp_cleanup_transcode_client_ctx (client_ctx);
+ } else {
+ g_print ("sinkbin set to PAUSED successfully\n");
+ }
+
+ /* Request a new src pad from the tee in the pipeline */
+ tee = gst_bin_get_by_name (GST_BIN (server_ctx->pipeline), "tee");
+ template = gst_pad_template_new ("teesrc", GST_PAD_SRC,
+ GST_PAD_REQUEST, GST_CAPS_ANY);
+ srcpad = gst_element_request_pad (tee, template, NULL, GST_CAPS_ANY);
+
+ gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_BLOCK,
+ (GstPadProbeCallback) tee_src_pad_blocked_cb,
+ client_ctx, NULL);
+ gst_object_unref (srcpad);
+
+ gst_object_ref (appsink);
+ client_ctx->appsink = appsink;
+
+ client_ctx->first_sample_handler_id =
+ g_signal_connect (appsink, "new-sample",
+ G_CALLBACK (write_first_client_chunk_cb), client_ctx);
+ g_signal_connect (client_ctx->msg, "wrote-chunk",
+ G_CALLBACK (write_next_client_chunk_cb), client_ctx);
+ g_signal_connect (client_ctx->msg, "finished",
+ G_CALLBACK (client_finished_cb), client_ctx);
+
+ server_ctx->clients = g_list_prepend (server_ctx->clients, client_ctx);
+
+ /* Since we set the response as chunked, the server will
+ * automatically pause the message for us, and the response
+ * won't be completed till we call soup_message_body_complete() */
+ soup_message_set_status (msg, SOUP_STATUS_OK);
+ }
+}
+
+static void
+got_request_body_chunk (SoupMessage *msg,
+ SoupBuffer *chunk,
+ TranscodeServerCtx *ctx)
+{
+ GstBuffer *buffer;
+ GstFlowReturn ret;
+
+ /* XXX: Unfortunately, there doesn't seem to be a way to cancel the
+ * reception of further chunks when we've prematurely set the response
+ * (say due to an error), so we check if the status code is set and skip
+ * all further chunks */
+ if (msg->status_code != 0)
+ return;
+
+ /* We need to update this every chunk */
+ ctx->msg = msg;
+
+ /* Actually just a ref */
+ buffer = stp_get_gst_buffer (chunk);
+ g_signal_emit_by_name (ctx->appsrc, "push-buffer", buffer, &ret);
+ gst_buffer_unref (buffer);
+ if (ret != GST_FLOW_OK) {
+ g_critical ("Unable to push buffer\n");
+ soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
+ stp_cleanup_transcode_server_ctx (ctx);
+ return;
+ }
+
+ /* We can only set the pipeline to playing when
+ * the first chunk has been received */
+ if (!ctx->pipeline_is_playing) {
+ if (gst_element_set_state (ctx->pipeline, GST_STATE_PLAYING) ==
+ GST_STATE_CHANGE_FAILURE) {
+ g_critical ("Unable to set pipeline to PLAYING\n");
+ soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
+ stp_cleanup_transcode_server_ctx (ctx);
+ } else {
+ g_print ("Set pipeline to PLAYING\n");
+ ctx->pipeline_is_playing = TRUE;
+ }
+ }
+}
+
+static void
+got_request_headers (SoupMessage *msg,
+ GHashTable *ctx_table)
+{
+ TranscodeServerCtx *ctx;
+ SoupURI *uri;
+
+#ifdef DEBUG
+ SoupMessageHeadersIter iter;
+ const char *name, *value;
+ soup_message_headers_iter_init (&iter, msg->request_headers);
+ while (soup_message_headers_iter_next (&iter, &name, &value))
+ g_print ("%s: %s\n", name, value);
+#endif
+
+ g_object_get (msg, "uri", &uri, NULL);
+ g_print ("%s on uri %s\n", msg->method, soup_uri_get_path (uri));
+
+ if (msg->method == SOUP_METHOD_PUT ||
+ msg->method == SOUP_METHOD_POST)
+ goto PUT;
+ else if (msg->method == SOUP_METHOD_GET)
+ goto GET;
+ else {
+ /* Nothing to do; server doesn't implement this method */
+ soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED);
+ goto out;
+ }
+
+PUT: {
+ if (g_hash_table_contains (ctx_table, soup_uri_get_path (uri))) {
+ /* There's already a request streaming on this path */
+ g_print ("Recv duplicate request on the same URI: %s\n",
+ soup_uri_get_path (uri));
+ soup_message_set_status (msg, SOUP_STATUS_CONFLICT);
+ goto out;
+ }
+
+ ctx = g_new0 (TranscodeServerCtx, 1);
+ ctx->msg = msg;
+ ctx->parent_ctx_table = ctx_table;
+ ctx->path = g_strdup (soup_uri_get_path (uri));
+ g_hash_table_insert (ctx_table, ctx->path, ctx);
+
+ /* We only do this when we receive the first chunk */
+ if (!ctx->pipeline) {
+ ctx->pipeline = gst_pipeline_new ("pipe");
+ /* The chunked request is copied into this stream
+ * for consumption by the gst pipeline */
+#ifdef DEBUG
+ stp_play_from_msg (ctx);
+#else
+ stp_encode_from_msg (ctx);
+#endif
+ g_signal_connect (ctx->appsrc, "need-data",
+ G_CALLBACK (need_data_unpause_message), ctx);
+ g_signal_connect (ctx->appsrc, "enough-data",
+ G_CALLBACK (enough_data_pause_message), ctx);
+ }
+
+ /* The request body isn't fixed length, so tell libsoup to
+ * not collect chunks for forming a complete request body */
+ soup_message_body_set_accumulate (msg->request_body, FALSE);
+
+ g_signal_connect (msg, "got-chunk",
+ G_CALLBACK (got_request_body_chunk), ctx);
+ goto out;
+ }
+
+GET: {
+ if (!g_hash_table_contains (ctx_table, soup_uri_get_path (uri))) {
+ g_print ("No stream on URI: %s\n", soup_uri_get_path (uri));
+ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
+ goto out;
+ }
+
+ /* Our response will be chunked, and not with a fixed Content-Length */
+ soup_message_headers_set_encoding (msg->response_headers, SOUP_ENCODING_CHUNKED);
+ /* The request body isn't fixed length, so tell libsoup to
+ * not collect chunks for forming a complete request body */
+ soup_message_body_set_accumulate (msg->response_body, FALSE);
+ }
+
+out:
+ soup_uri_free (uri);
+ return;
+}
+
+static void
+request_started_cb (SoupServer *server,
+ SoupMessage *msg,
+ SoupClientContext *client,
+ GHashTable *ctx_table)
+{
+ g_print ("New %s request started\n", msg->method);
+
+ /* SoupMessage is useful only once we have headers,
+ * so we do all initialization there */
+ g_signal_connect (msg, "got-headers",
+ G_CALLBACK (got_request_headers), ctx_table);
+}
+
+static void
+request_read_cb (SoupServer *server,
+ SoupMessage *msg,
+ SoupClientContext *client,
+ GHashTable *ctx_table)
+{
+ if (msg->method == SOUP_METHOD_PUT ||
+ msg->method == SOUP_METHOD_POST)
+ goto PUT;
+ else if (msg->method == SOUP_METHOD_GET)
+ goto GET;
+ else
+ /* We return NOT_IMPLEMENTED for this
+ * when we get the request headers */
+ g_assert_not_reached ();
+
+PUT:
+ {
+ gboolean ret;
+ TranscodeServerCtx *ctx;
+
+ ctx = get_server_ctx_from_msg (msg, ctx_table);
+ g_return_if_fail (ctx);
+
+ /* Incoming stream has ended */
+ g_signal_emit_by_name (ctx->appsrc, "end-of-stream", &ret);
+ if (!ret)
+ g_printerr ("Unable to emit end-of-stream\n");
+ }
+
+GET:
+ {
+ /* Nothing to do */
+ }
+
+ g_print ("%s request read successfully\n", msg->method);
+}
+
+static void
+request_finished_cb (SoupServer *server,
+ SoupMessage *msg,
+ SoupClientContext *client,
+ GHashTable *ctx_table)
+{
+ g_print ("%s request ended\n", msg->method);
+}
+
+static void
+request_aborted_cb (SoupServer *server,
+ SoupMessage *msg,
+ SoupClientContext *client,
+ GHashTable *ctx_table)
+{
+ g_print ("%s request aborted!\n", msg->method);
+ if (msg->method == SOUP_METHOD_PUT ||
+ msg->method == SOUP_METHOD_POST)
+ goto PUT;
+ else if (msg->method == SOUP_METHOD_GET)
+ goto GET;
+ else
+ /* We return NOT_IMPLEMENTED for this
+ * when we get the request headers */
+ g_assert_not_reached ();
+
+PUT:
+ {
+ /* FIXME: Is there anything to do here? */
+ }
+
+GET:
+ {
+ /* We do all cleanup in the "finished" msg signal, nothing to do here */
+ }
+}
+
+int
+main (int argc,
+ char *argv[])
+{
+ GHashTable *ctx_table;
+
+ gst_init (&argc, &argv);
+
+ /* Keys are paths, and values are TranscodeServerCtxs */
+ ctx_table = g_hash_table_new_full (g_str_hash, g_str_equal,
+ (GDestroyNotify)g_free,
+ (GDestroyNotify)stp_cleanup_transcode_server_ctx);
+ server = soup_server_new (SOUP_SERVER_SERVER_HEADER,
+ "soup-transcode-proxy ",
+ SOUP_SERVER_PORT, 8000, NULL);
+ soup_server_add_handler (server, NULL,
+ (SoupServerCallback)handle_request_cb,
+ ctx_table, NULL);
+ g_signal_connect (server, "request-started",
+ G_CALLBACK (request_started_cb), ctx_table);
+ g_signal_connect (server, "request-read",
+ G_CALLBACK (request_read_cb), ctx_table);
+ g_signal_connect (server, "request-finished",
+ G_CALLBACK (request_finished_cb), ctx_table);
+ g_signal_connect (server, "request-aborted",
+ G_CALLBACK (request_aborted_cb), ctx_table);
+
+ soup_server_run (server);
+ return 0;
+}