diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/debug/local-play.c | 147 | ||||
-rw-r--r-- | src/debug/local-play.h | 11 | ||||
-rw-r--r-- | src/encode.c | 166 | ||||
-rw-r--r-- | src/encode.h | 11 | ||||
-rw-r--r-- | src/lib.c | 166 | ||||
-rw-r--r-- | src/lib.h | 57 | ||||
-rw-r--r-- | src/main.c | 562 |
7 files changed, 1120 insertions, 0 deletions
diff --git a/src/debug/local-play.c b/src/debug/local-play.c new file mode 100644 index 0000000..497d567 --- /dev/null +++ b/src/debug/local-play.c @@ -0,0 +1,147 @@ +/* License: LGPL-2.1 + * vim: set sts=2 sw=2 et : */ + +#include "local-play.h" + +static void +pad_has_video_caps (TranscodeServerCtx *ctx, + GstPad *decodebin_pad) +{ + GstPad *sink_pad; + GstElement *bin; + GstElement *videoconv, *videoscale, *videosink; + + bin = gst_bin_new (NULL); + videoconv = gst_element_factory_make ("videoconvert", "videoconv"); + videoscale = gst_element_factory_make ("videoscale", "videoscale"); + videosink = gst_element_factory_make ("autovideosink", "videosink"); + + gst_bin_add_many (GST_BIN (bin), videoconv, videoscale, videosink, NULL); + gst_element_link_many (videoconv, videoscale, videosink, NULL); + + sink_pad = gst_element_get_static_pad (videoconv, "sink"); + gst_element_add_pad (bin, gst_ghost_pad_new ("sink", sink_pad)); + gst_object_unref (sink_pad); + + gst_bin_add (GST_BIN (ctx->pipeline), bin); + + sink_pad = gst_element_get_static_pad (bin, "sink"); + if (gst_pad_link (decodebin_pad, sink_pad) != GST_PAD_LINK_OK) { + g_critical ("ERROR: Unable to link video pads"); + soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); + goto out; + } + + /* Since these elements have been added to the pipeline after the state was + * changed from GST_STATE_NULL, they have to be moved to PAUSED manually */ + gst_element_set_state (bin, GST_STATE_PAUSED); +out: + gst_object_unref (sink_pad); + return; +} + +static void +pad_has_audio_caps (TranscodeServerCtx *ctx, + GstPad *decodebin_pad) +{ + GstPad *sink_pad; + GstElement *bin; + GstElement *audioconv, *audioresample, *audiosink; + + bin = gst_bin_new (NULL); + audioconv = gst_element_factory_make ("audioconvert", "audioconv"); + audioresample = gst_element_factory_make ("audioresample", "audioresample"); + audiosink = gst_element_factory_make ("autoaudiosink", "audiosink"); + + gst_bin_add_many (GST_BIN (bin), audioconv, audioresample, audiosink, NULL); + gst_element_link_many (audioconv, audioresample, audiosink, NULL); + + sink_pad = gst_element_get_static_pad (audioconv, "sink"); + gst_element_add_pad (bin, gst_ghost_pad_new ("sink", sink_pad)); + gst_object_unref (sink_pad); + + gst_bin_add (GST_BIN (ctx->pipeline), bin); + + sink_pad = gst_element_get_static_pad (bin, "sink"); + if (gst_pad_link (decodebin_pad, sink_pad) != GST_PAD_LINK_OK) { + g_critical ("ERROR: Unable to link video pads"); + soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); + goto out; + } + + /* Since these elements have been added to the pipeline after the state was + * changed from GST_STATE_NULL, they have to be moved to PAUSED manually */ + gst_element_set_state (bin, GST_STATE_PAUSED); +out: + gst_object_unref (sink_pad); + return; +} + +static gboolean +structure_printf (GQuark field_id, + const GValue *value, + gpointer user_data) +{ + char *char_value; + + char_value = g_strdup_value_contents (value); + g_print ("%s = %s\n", g_quark_to_string (field_id), char_value); + g_free (char_value); + return TRUE; +} + +static void +decodebin_pad_added (GstElement *decodebin, + GstPad *src_pad, + gpointer user_data) +{ + GstCaps *pad_caps; + GstStructure *structure; + TranscodeServerCtx *ctx = user_data; + + if (!gst_pad_has_current_caps (src_pad)) { + g_critical ("Decodebin pad doesn't have current caps"); + soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); + return; + } + + pad_caps = gst_pad_get_current_caps (src_pad); + structure = gst_caps_get_structure (pad_caps, 0); + gst_structure_foreach (structure, structure_printf, NULL); + + if (gst_structure_has_name (structure, "audio/x-raw")) + pad_has_audio_caps (ctx, src_pad); + else if (gst_structure_has_name (structure, "video/x-raw")) + pad_has_video_caps (ctx, src_pad); + else + g_critical ("Found unsupported caps: %s", + gst_structure_get_name (structure)); + + gst_caps_unref (pad_caps); +} + +void +stp_play_from_msg (TranscodeServerCtx *ctx) +{ + GstBus *bus; + GstElement *src, *decodebin; + + g_debug ("Constructing pipeline\n"); + src = gst_element_factory_make ("appsrc", "src"); + g_object_set (src, "is-live", TRUE, + "stream-type", 0, NULL); + ctx->appsrc = src; + + decodebin = gst_element_factory_make ("decodebin", "decodebin"); + gst_bin_add_many (GST_BIN (ctx->pipeline), src, decodebin, NULL); + gst_element_link (src, decodebin); + + g_signal_connect (decodebin, "pad-added", + G_CALLBACK (decodebin_pad_added), ctx); + + bus = gst_pipeline_get_bus (GST_PIPELINE (ctx->pipeline)); + gst_bus_add_signal_watch (bus); + g_signal_connect (bus, "message", G_CALLBACK (stp_on_gst_bus_message), ctx); + g_object_unref (bus); +} + diff --git a/src/debug/local-play.h b/src/debug/local-play.h new file mode 100644 index 0000000..a56a572 --- /dev/null +++ b/src/debug/local-play.h @@ -0,0 +1,11 @@ +/* License: LGPL-2.1 + * vim: set sts=2 sw=2 et : */ + +#ifndef _SST_LOCAL_PLAY +#define _SST_LOCAL_PLAY + +#include "../lib.h" + +void stp_play_from_msg (TranscodeServerCtx *ctx); + +#endif /* _SST_LOCAL_PLAY */ diff --git a/src/encode.c b/src/encode.c new file mode 100644 index 0000000..caaaa46 --- /dev/null +++ b/src/encode.c @@ -0,0 +1,166 @@ +/* License: LGPL-2.1 + * vim: set sts=2 sw=2 et : */ + +#include "encode.h" + +#include <gst/pbutils/encoding-profile.h> + +static gboolean +on_autoplug_continue (GstElement *decodebin, + GstPad *srcpad, + GstCaps *caps, + GstElement *encodebin) +{ + gboolean ret; + char *name; + GstPad *sinkpad; + + name = gst_caps_to_string (caps); + g_signal_emit_by_name (encodebin, "request-pad", caps, &sinkpad); + + if (sinkpad != NULL) + ret = FALSE, g_debug ("encodebin can passthrough %s\n", name); + else + ret = TRUE, g_debug ("encodebin cannot passthrough %s\n", name); + + g_free (name); + return ret; +} + +static void +on_decodebin_pad_added (GstElement *decodebin, + GstPad *srcpad, + GstElement *encodebin) +{ + GstCaps *caps; + GstPad *sinkpad = NULL; + char *name; + + caps = gst_pad_query_caps (srcpad, NULL); + name = gst_caps_to_string (caps); + + /* FIXME: We don't try to fetch a compatible pad for raw audio because + * that somehow always fails to link. Transmageddon does the same. */ + if (!g_str_has_prefix (name, "audio/x-raw")) + /* If we successfully requested a compatible sink pad in + * "autoplug-continue", we can fetch that here. */ + sinkpad = gst_element_get_compatible_pad (encodebin, srcpad, NULL); + + if (!sinkpad) { + /* We request a sink pad for the decoded stream */ + g_signal_emit_by_name (encodebin, "request-pad", caps, &sinkpad); + if (!sinkpad) { + g_printerr ("Failed to request a new sink pad for %s\n", name); + goto out; + } + g_debug ("Requested a new sink pad for %s\n", name); + } else { + g_debug ("Found an existing sink pad for %s\n", name); + } + + if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) + g_printerr ("Couldn't link pads for %s\n", name); + +out: + g_free (name); + gst_caps_unref (caps); + return; +} + +static GstEncodingProfile * +create_webm_profile (void) +{ + GstEncodingProfile *t; + GstEncodingContainerProfile *prof; + GstCaps *caps; + GstPreset *vp8preset; + + caps = gst_caps_from_string ("video/webm"); + prof = gst_encoding_container_profile_new ("WebM audio/video", "Standard WEBM/VP8/VORBIS", caps, NULL); + gst_caps_unref (caps); + + vp8preset = GST_PRESET (gst_element_factory_make ("vp8enc", "vp8preset")); + /* FIXME: This thing still doesn't encode fast enough in real time */ + g_object_set (vp8preset, + "cpu-used", 5, + "end-usage", 1, + "max-quantizer", 56, + "threads", 4, + "undershoot", 95, + NULL); + gst_preset_save_preset (vp8preset, "stp_vp8preset"); + + caps = gst_caps_from_string ("video/x-vp8"); + t = (GstEncodingProfile*) gst_encoding_video_profile_new (caps, "stp_vp8preset", + NULL, 0); + gst_encoding_container_profile_add_profile (prof, t); + gst_caps_unref (caps); + + caps = gst_caps_from_string ("audio/x-vorbis"); + t = (GstEncodingProfile*) gst_encoding_audio_profile_new (caps, NULL, NULL, 0); + gst_encoding_container_profile_add_profile (prof, t); + gst_caps_unref (caps); + + /* What about application/x-ass? */ + + return (GstEncodingProfile*) prof; +} + +void +stp_encode_from_msg (TranscodeServerCtx *ctx) +{ + GstBus *bus; + GstElement *src, *decodebin, *encodebin; + GstElement *tee, *q1, *fakesink; + GstEncodingProfile *profile; + + g_debug ("Constructing pipeline\n"); + + src = gst_element_factory_make ("appsrc", "src"); + g_object_set (src, "is-live", TRUE, + "emit-signals", FALSE, + "stream-type", 0, NULL); + ctx->appsrc = src; + + decodebin = gst_element_factory_make ("decodebin", "decodebin"); + + gst_bin_add_many (GST_BIN (ctx->pipeline), src, decodebin, NULL); + gst_element_link (src, decodebin); + + /* TODO: Allow setting of a scaling factor to + * allow realtime encoding of all streams */ + profile = create_webm_profile (); + encodebin = gst_element_factory_make ("encodebin", "encodebin"); + g_object_set (encodebin, + "profile", profile, + "avoid-reencoding", TRUE, NULL); + tee = gst_element_factory_make ("tee", "tee"); + q1 = gst_element_factory_make ("queue", "q1"); +#ifndef DEBUG + fakesink = gst_element_factory_make ("filesink", "filesink"); + g_object_set (fakesink, "location", "debug-output.webm", NULL); +#else + fakesink = gst_element_factory_make ("fakesink", "fakesink"); +#endif + + gst_bin_add_many (GST_BIN (ctx->pipeline), encodebin, tee, q1, fakesink, NULL); + gst_element_link_many (encodebin, tee, q1, fakesink, NULL); + + /* The pads of decodebin and encodebin are dynamic, + * so those will be linked when streams/pads are added */ + + /* When decodebin finds a stream that can be decoded, we check + * if we can pass that directly to encodebin instead of letting + * decodebin find a decoding element automatically */ + g_signal_connect (decodebin, "autoplug-continue", + G_CALLBACK (on_autoplug_continue), encodebin); + /* When decodebin exposes a source pad, we need to request a + * corresponding sink pad on decodebin */ + g_signal_connect (decodebin, "pad-added", + G_CALLBACK (on_decodebin_pad_added), encodebin); + + bus = gst_pipeline_get_bus (GST_PIPELINE (ctx->pipeline)); + gst_bus_add_signal_watch (bus); + g_signal_connect (bus, "message", G_CALLBACK (stp_on_gst_bus_message), ctx); + g_object_unref (bus); +} diff --git a/src/encode.h b/src/encode.h new file mode 100644 index 0000000..9c0d897 --- /dev/null +++ b/src/encode.h @@ -0,0 +1,11 @@ +/* License: LGPL-2.1 + * vim: set sts=2 sw=2 et : */ + +#ifndef _SST_ENCODE +#define _SST_ENCODE + +#include "lib.h" + +void stp_encode_from_msg (TranscodeServerCtx *ctx); + +#endif /* _SST_ENCODE */ diff --git a/src/lib.c b/src/lib.c new file mode 100644 index 0000000..584da39 --- /dev/null +++ b/src/lib.c @@ -0,0 +1,166 @@ +/* License: LGPL-2.1 + * vim: set sts=2 sw=2 et : */ + +#include "lib.h" + +#include <poll.h> +#include <unistd.h> +#include <sys/stat.h> +#include <glib-object.h> +#include <gio/gunixinputstream.h> + +gboolean +stp_on_gst_bus_message (GstBus *bus, + GstMessage *msg, + TranscodeServerCtx *ctx) +{ + GError *error = NULL; + char *tmp = NULL; + + switch (GST_MESSAGE_TYPE (msg)) { + case GST_MESSAGE_ERROR: + gst_message_parse_error (msg, &error, &tmp); + g_printerr ("ERROR from element %s: %s\n", + GST_OBJECT_NAME (msg->src), error->message); + g_printerr ("Debug info: %s\n", tmp); + g_error_free (error); + g_free (tmp); + /* Setting the server response will only work if the request + * hasn't already finished, so we check that */ + if (!ctx->request_finished) + soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); + g_hash_table_remove (ctx->parent_ctx_table, ctx->path); + break; + case GST_MESSAGE_EOS: + g_print ("End of file\n"); + g_hash_table_remove (ctx->parent_ctx_table, ctx->path); + break; + default: + //g_print ("%s\n", gst_message_type_get_name (msg->type)); + break; + } + + return TRUE; +} + +void +stp_cleanup_transcode_server_ctx (TranscodeServerCtx *ctx) +{ + g_print (">>> Doing server cleanup\n"); + + /* Closing the stream should send a HUP on the fd and + * trigger cleanup in write_get_response_body_chunk() */ + g_list_foreach (ctx->clients, (GFunc)stp_close_client_ctx, NULL); + g_list_free (ctx->clients); + + /* Cleanup gstreamer pipeline */ + gst_element_set_state (ctx->pipeline, GST_STATE_NULL); + gst_object_unref (ctx->pipeline); + ctx->pipeline_is_playing = FALSE; + + g_free (ctx); +} + +static GstPadProbeReturn +pad_blocked_cleanup_cb (GstPad *srcpad, + GstPadProbeInfo *info, + TranscodeClientCtx *ctx) +{ + GstPad *sinkpad; + GstElement *tee; + GstElement *sinkbin = GST_ELEMENT (gst_element_get_parent (ctx->appsink)); + + g_print ("."); + /* Remove the probe, XXX: hence unblocking the pipeline? */ + gst_pad_remove_probe (srcpad, GST_PAD_PROBE_INFO_ID (info)); + + tee = gst_pad_get_parent_element (srcpad); + sinkpad = gst_element_get_static_pad (sinkbin, "qsink"); + gst_pad_unlink (srcpad, sinkpad); + gst_element_remove_pad (tee, srcpad); + + gst_object_unref (sinkpad); + gst_object_unref (tee); + + gst_bin_remove (GST_BIN (ctx->server_ctx->pipeline), sinkbin); + gst_object_unref (sinkbin); + gst_object_unref (ctx->appsink); + + g_free (ctx); + g_print (" Client cleanup done!\n"); + return GST_PAD_PROBE_OK; +} + +void +stp_cleanup_transcode_client_ctx (TranscodeClientCtx *ctx) +{ + GstPad *sinkpad, *srcpad; + TranscodeServerCtx *server_ctx = ctx->server_ctx; + GstElement *sinkbin = GST_ELEMENT (gst_element_get_parent (ctx->appsink)); + + g_print (">>> Doing client cleanup ."); + + /* If we're cleaning up because the server is shutting down, + * this will be NULL */ + if (server_ctx) + server_ctx->clients = g_list_remove (server_ctx->clients, ctx); + + /* Block sinkpad and srcpad, then unlink and remove */ + sinkpad = gst_element_get_static_pad (sinkbin, "qsink"); + srcpad = gst_pad_get_peer (sinkpad); + + gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_BLOCK, + (GstPadProbeCallback) pad_blocked_cleanup_cb, + ctx, NULL); + gst_object_unref (sinkbin); + g_print ("."); +} + +void +stp_close_client_ctx (TranscodeClientCtx *ctx) +{ + g_print (">>> Doing client cleanup\n"); + + ctx->server_ctx = NULL; + + g_free (ctx); +} + +/* Returns a copy of the streamheader GstBuffer */ +GstBuffer* +stp_get_streamheader_from_caps (GstCaps *caps) +{ + GArray *array; + GstStructure *s; + const GValue *value; + + s = gst_caps_get_structure (caps, 0); + value = gst_structure_get_value (s, "streamheader"); + g_return_val_if_fail (G_IS_VALUE (value), NULL); + + array = g_value_peek_pointer (value); + value = &g_array_index (array, GValue, 0); + g_return_val_if_fail (G_VALUE_TYPE (value) == GST_TYPE_BUFFER, NULL); + + return gst_buffer_copy (g_value_peek_pointer (value)); +} + +/* Returns a GstBuffer which has consumed the passed-in data */ +GstBuffer* +stp_get_gst_buffer (SoupBuffer *chunk) +{ + gsize len; + const guint8 *d; + SoupBuffer *copy; + GstBuffer *buffer; + GstMemory *memory; + + copy = soup_buffer_copy (chunk); + soup_buffer_get_data (copy, &d, &len); + buffer = gst_buffer_new (); + memory = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY, + (guint8*)d, len, 0, len, copy, + (GDestroyNotify)soup_buffer_free); + gst_buffer_append_memory (buffer, memory); + return buffer; +} diff --git a/src/lib.h b/src/lib.h new file mode 100644 index 0000000..187bae7 --- /dev/null +++ b/src/lib.h @@ -0,0 +1,57 @@ +/* License: LGPL-2.1 + * vim: set sts=2 sw=2 et : */ + +#ifndef _SST_LIB +#define _SST_LIB + +#include <glib.h> +#include <gst/gst.h> +#include <libsoup/soup.h> + +typedef struct _TranscodeServerCtx TranscodeServerCtx; +typedef struct _TranscodeClientCtx TranscodeClientCtx; + +struct _TranscodeServerCtx { + SoupMessage *msg; + GstElement *pipeline; + GstElement *appsrc; + /* The pipeline status can be queried from the pipeline itself, + * but it's probably better for branching to have it separately too */ + gboolean pipeline_is_playing; + /* Set to TRUE when the incoming stream ends; let's us know if + * we need to set the PUT response on EOS/ERROR on the pipeline */ + gboolean request_finished; + + /* List of client contexts */ + GList *clients; + /* Reference to the parent context hash table */ + GHashTable *parent_ctx_table; + /* Reference to the key in the parent hash table */ + char *path; +}; + +struct _TranscodeClientCtx { + SoupMessage *msg; + GstElement *appsink; + gulong first_sample_handler_id; + /* The transcode server context */ + TranscodeServerCtx *server_ctx; +}; + +void stp_cleanup_transcode_server_ctx (TranscodeServerCtx *ctx); +void stp_cleanup_transcode_client_ctx (TranscodeClientCtx *ctx); +void stp_close_client_ctx (TranscodeClientCtx *ctx); + +gboolean stp_on_gst_bus_message (GstBus *bus, + GstMessage *msg, + TranscodeServerCtx *ctx); + +gboolean stp_copy_chunk_to_fd (SoupBuffer *chunk, + int fd); +gboolean stp_copy_fd_to_body (TranscodeClientCtx *ctx); +gboolean stp_fd_has_data_to_read (int fd); + +GstBuffer* stp_get_streamheader_from_caps (GstCaps *caps); +GstBuffer* stp_get_gst_buffer (SoupBuffer *chunk); + +#endif /* _SST_LIB */ diff --git a/src/main.c b/src/main.c new file mode 100644 index 0000000..a2430f5 --- /dev/null +++ b/src/main.c @@ -0,0 +1,562 @@ +/* License: LGPL-2.1 + * vim: set sts=2 sw=2 et : */ + +#include "lib.h" +#include "encode.h" + +#ifdef DEBUG +#include "debug/local-play.h" +#endif + +#include <fcntl.h> +#include <string.h> + +static SoupServer *server; + +static TranscodeServerCtx* +get_server_ctx_from_msg (SoupMessage *msg, + GHashTable *ctx_table) +{ + SoupURI *uri; + TranscodeServerCtx *ctx; + + g_object_get (msg, "uri", &uri, NULL); + ctx = g_hash_table_lookup (ctx_table, soup_uri_get_path (uri)); + soup_uri_free (uri); + + return ctx; +} + +static void +enough_data_pause_message (GstElement *appsrc, + guint size, + TranscodeServerCtx *ctx) +{ + /* Pausing the request can cause weirdness if + * the recv rate is too high. So, we don't do this. */ + //g_print ("Enough data, pause. "); + //soup_server_pause_message (server, ctx->msg); +} + +static void +need_data_unpause_message (GstElement *appsrc, + guint size, + TranscodeServerCtx *ctx) +{ + //g_print ("Need data, unpause. \n"); + //soup_server_unpause_message (server, ctx->msg); +} + +static GstPadProbeReturn +tee_src_pad_blocked_cb (GstPad *srcpad, + GstPadProbeInfo *info, + TranscodeClientCtx *ctx) +{ + GstCaps *caps; + GstPad *sinkpad; + GstBuffer *buffer; + GstState state; + GstStateChangeReturn ret; + TranscodeServerCtx *server_ctx = ctx->server_ctx; + GstElement *sinkbin = GST_ELEMENT (gst_element_get_parent (ctx->appsink)); + + /* Remove the probe, XXX: hence unblocking the pipeline? */ + gst_pad_remove_probe (srcpad, GST_PAD_PROBE_INFO_ID (info)); + + gst_bin_add (GST_BIN (server_ctx->pipeline), sinkbin); + + /* t. ! queue ! appsink */ + sinkpad = gst_element_get_static_pad (sinkbin, "qsink"); + gst_pad_link (srcpad, sinkpad); + + /* Send the WebM stream header through the pipeline first */ + caps = gst_pad_get_current_caps (srcpad); + /* TODO: cache keyframes */ + buffer = stp_get_streamheader_from_caps (caps); + if (!buffer) + goto err; + gst_pad_push (srcpad, buffer); + + ret = gst_element_get_state (server_ctx->pipeline, &state, NULL, 1); + g_print ("Linked pads, removed probe. State was %s:%s.\n", + gst_element_state_change_return_get_name (ret), + gst_element_state_get_name (state)); + + if (gst_element_set_state (server_ctx->pipeline, GST_STATE_PLAYING) == + GST_STATE_CHANGE_FAILURE) { + g_critical ("Unable to set pipeline back to PLAYING\n"); + goto err; + } else { + g_print ("pipeline set to PLAYING successfully\n"); + } + +out: + gst_caps_unref (caps); + gst_object_unref (sinkpad); + gst_object_unref (sinkbin); + return GST_PAD_PROBE_OK; +err: + soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); + stp_cleanup_transcode_client_ctx (ctx); + /* FIXME: PROBE_OK for errors too? What to do here? */ + goto out; +} + +static GstFlowReturn +write_first_client_chunk_cb (GstElement *appsink, + TranscodeClientCtx *ctx) +{ + GstMapInfo info; + GstSample *sample; + GstBuffer *buffer; + GstMemory *memory; + + g_assert (ctx->first_sample_handler_id); + + g_signal_handler_disconnect (appsink, ctx->first_sample_handler_id); + ctx->first_sample_handler_id = 0; + + /* XXX: If no samples are available, this will block till a sample is available */ + g_signal_emit_by_name (appsink, "pull-sample", &sample); + buffer = gst_sample_get_buffer (sample); + memory = gst_buffer_get_all_memory (buffer); /* copy */ + gst_memory_map (memory, &info, GST_MAP_READ); + + soup_message_body_append (ctx->msg->response_body, SOUP_MEMORY_COPY, + info.data, info.size); /* copy */ + + gst_memory_unmap (memory, &info); + gst_memory_unref (memory); + gst_sample_unref (sample); + + soup_server_unpause_message (server, ctx->msg); + g_print ("."); + return GST_FLOW_OK; +} + +static void +write_next_client_chunk_cb (SoupMessage *msg, + TranscodeClientCtx *ctx) +{ + GstMapInfo info; + GstSample *sample; + GstBuffer *buffer; + GstMemory *memory; + gboolean eos; + + g_signal_emit_by_name (ctx->appsink, "pull-sample", &sample); + if (!sample) { + g_print ("Null sample, ending stream\n"); + g_object_get (ctx->appsink, "eos", &eos, NULL); + if (eos) + soup_message_set_status (msg, SOUP_STATUS_OK); + else + soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); + soup_message_body_complete (msg->response_body); + return; + } + + buffer = gst_sample_get_buffer (sample); + memory = gst_buffer_get_all_memory (buffer); /* copy */ + gst_memory_map (memory, &info, GST_MAP_READ); + + soup_message_body_append (msg->response_body, SOUP_MEMORY_COPY, + info.data, info.size); /* copy */ + + gst_memory_unmap (memory, &info); + gst_memory_unref (memory); + gst_sample_unref (sample); + + soup_server_unpause_message (server, msg); + g_print ("."); + return; +} + +static void +client_finished_cb (SoupMessage *msg, + TranscodeClientCtx *ctx) +{ + stp_cleanup_transcode_client_ctx (ctx); +} + +static void +handle_request_cb (SoupServer *server, + SoupMessage *msg, + const char *path, + GHashTable *query, + SoupClientContext *client, + GHashTable *ctx_table) +{ + TranscodeServerCtx *server_ctx; + + if (!msg) + return; + + g_print ("Handling %s request on path %s\n", msg->method, path); + + if (msg->method == SOUP_METHOD_PUT || + msg->method == SOUP_METHOD_POST) + goto PUT; + else if (msg->method == SOUP_METHOD_GET) + goto GET; + else + /* We return NOT_IMPLEMENTED for this + * when we get the request headers */ + g_assert_not_reached (); + +PUT: + { + /* The PUT request has finished streaming. Cleanup + * will be done on EOS/ERROR in the gst pipeline. */ + server_ctx = get_server_ctx_from_msg (msg, ctx_table); + server_ctx->request_finished = TRUE; + soup_message_set_status (server_ctx->msg, SOUP_STATUS_OK); + return; + } + +GET: + { + /* A GET request was received. We connect from the pipeline to the + * client requesting the stream and start writing the response. */ + GstElement *bin, *tee, *q2, *appsink; + GstPadTemplate *template; + GstPad *srcpad, *sinkpadq2; + TranscodeClientCtx *client_ctx; + GstStateChangeReturn state; + + server_ctx = get_server_ctx_from_msg (msg, ctx_table); + if (server_ctx == NULL || + server_ctx->pipeline == NULL) { + /* There's no request streaming on this path (yet) */ + soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); + return; + } + + /* Check if the pipeline successfully started PLAYING */ + state = gst_element_get_state (server_ctx->pipeline, + NULL, NULL, 100*GST_MSECOND); + switch (state) { + case GST_STATE_CHANGE_SUCCESS: + break; + case GST_STATE_CHANGE_FAILURE: + /* PUT stream should've */ + g_critical ("GET request, but state change failure?"); + case GST_STATE_CHANGE_ASYNC: + soup_message_set_status (msg, SOUP_STATUS_SERVICE_UNAVAILABLE); + return; + default: + g_assert_not_reached (); + } + + /* Connect appsink to tee, and start streaming */ + client_ctx = g_new0 (TranscodeClientCtx, 1); + client_ctx->msg = msg; + client_ctx->server_ctx = server_ctx; + + bin = gst_bin_new (NULL); + + /* queue ! appsink */ + q2 = gst_element_factory_make ("queue", "q2"); + appsink = gst_element_factory_make ("appsink", NULL); + g_object_set (appsink, "drop", FALSE, + "emit-signals", TRUE, + "max-buffers", 0, NULL); + gst_bin_add_many (GST_BIN (bin), q2, appsink, NULL); + gst_element_link (q2, appsink); + + sinkpadq2 = gst_element_get_static_pad (q2, "sink"); + gst_element_add_pad (bin, gst_ghost_pad_new ("qsink", sinkpadq2)); + gst_object_unref (sinkpadq2); + + /* Set to PAUSED so the bin can pre-roll */ + if (gst_element_set_state (bin, GST_STATE_PAUSED) == + GST_STATE_CHANGE_FAILURE) { + g_critical ("Unable to set appsink to PAUSED\n"); + soup_message_set_status (client_ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); + stp_cleanup_transcode_client_ctx (client_ctx); + } else { + g_print ("sinkbin set to PAUSED successfully\n"); + } + + /* Request a new src pad from the tee in the pipeline */ + tee = gst_bin_get_by_name (GST_BIN (server_ctx->pipeline), "tee"); + template = gst_pad_template_new ("teesrc", GST_PAD_SRC, + GST_PAD_REQUEST, GST_CAPS_ANY); + srcpad = gst_element_request_pad (tee, template, NULL, GST_CAPS_ANY); + + gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_BLOCK, + (GstPadProbeCallback) tee_src_pad_blocked_cb, + client_ctx, NULL); + gst_object_unref (srcpad); + + gst_object_ref (appsink); + client_ctx->appsink = appsink; + + client_ctx->first_sample_handler_id = + g_signal_connect (appsink, "new-sample", + G_CALLBACK (write_first_client_chunk_cb), client_ctx); + g_signal_connect (client_ctx->msg, "wrote-chunk", + G_CALLBACK (write_next_client_chunk_cb), client_ctx); + g_signal_connect (client_ctx->msg, "finished", + G_CALLBACK (client_finished_cb), client_ctx); + + server_ctx->clients = g_list_prepend (server_ctx->clients, client_ctx); + + /* Since we set the response as chunked, the server will + * automatically pause the message for us, and the response + * won't be completed till we call soup_message_body_complete() */ + soup_message_set_status (msg, SOUP_STATUS_OK); + } +} + +static void +got_request_body_chunk (SoupMessage *msg, + SoupBuffer *chunk, + TranscodeServerCtx *ctx) +{ + GstBuffer *buffer; + GstFlowReturn ret; + + /* XXX: Unfortunately, there doesn't seem to be a way to cancel the + * reception of further chunks when we've prematurely set the response + * (say due to an error), so we check if the status code is set and skip + * all further chunks */ + if (msg->status_code != 0) + return; + + /* We need to update this every chunk */ + ctx->msg = msg; + + /* Actually just a ref */ + buffer = stp_get_gst_buffer (chunk); + g_signal_emit_by_name (ctx->appsrc, "push-buffer", buffer, &ret); + gst_buffer_unref (buffer); + if (ret != GST_FLOW_OK) { + g_critical ("Unable to push buffer\n"); + soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); + stp_cleanup_transcode_server_ctx (ctx); + return; + } + + /* We can only set the pipeline to playing when + * the first chunk has been received */ + if (!ctx->pipeline_is_playing) { + if (gst_element_set_state (ctx->pipeline, GST_STATE_PLAYING) == + GST_STATE_CHANGE_FAILURE) { + g_critical ("Unable to set pipeline to PLAYING\n"); + soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); + stp_cleanup_transcode_server_ctx (ctx); + } else { + g_print ("Set pipeline to PLAYING\n"); + ctx->pipeline_is_playing = TRUE; + } + } +} + +static void +got_request_headers (SoupMessage *msg, + GHashTable *ctx_table) +{ + TranscodeServerCtx *ctx; + SoupURI *uri; + +#ifdef DEBUG + SoupMessageHeadersIter iter; + const char *name, *value; + soup_message_headers_iter_init (&iter, msg->request_headers); + while (soup_message_headers_iter_next (&iter, &name, &value)) + g_print ("%s: %s\n", name, value); +#endif + + g_object_get (msg, "uri", &uri, NULL); + g_print ("%s on uri %s\n", msg->method, soup_uri_get_path (uri)); + + if (msg->method == SOUP_METHOD_PUT || + msg->method == SOUP_METHOD_POST) + goto PUT; + else if (msg->method == SOUP_METHOD_GET) + goto GET; + else { + /* Nothing to do; server doesn't implement this method */ + soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); + goto out; + } + +PUT: { + if (g_hash_table_contains (ctx_table, soup_uri_get_path (uri))) { + /* There's already a request streaming on this path */ + g_print ("Recv duplicate request on the same URI: %s\n", + soup_uri_get_path (uri)); + soup_message_set_status (msg, SOUP_STATUS_CONFLICT); + goto out; + } + + ctx = g_new0 (TranscodeServerCtx, 1); + ctx->msg = msg; + ctx->parent_ctx_table = ctx_table; + ctx->path = g_strdup (soup_uri_get_path (uri)); + g_hash_table_insert (ctx_table, ctx->path, ctx); + + /* We only do this when we receive the first chunk */ + if (!ctx->pipeline) { + ctx->pipeline = gst_pipeline_new ("pipe"); + /* The chunked request is copied into this stream + * for consumption by the gst pipeline */ +#ifdef DEBUG + stp_play_from_msg (ctx); +#else + stp_encode_from_msg (ctx); +#endif + g_signal_connect (ctx->appsrc, "need-data", + G_CALLBACK (need_data_unpause_message), ctx); + g_signal_connect (ctx->appsrc, "enough-data", + G_CALLBACK (enough_data_pause_message), ctx); + } + + /* The request body isn't fixed length, so tell libsoup to + * not collect chunks for forming a complete request body */ + soup_message_body_set_accumulate (msg->request_body, FALSE); + + g_signal_connect (msg, "got-chunk", + G_CALLBACK (got_request_body_chunk), ctx); + goto out; + } + +GET: { + if (!g_hash_table_contains (ctx_table, soup_uri_get_path (uri))) { + g_print ("No stream on URI: %s\n", soup_uri_get_path (uri)); + soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); + goto out; + } + + /* Our response will be chunked, and not with a fixed Content-Length */ + soup_message_headers_set_encoding (msg->response_headers, SOUP_ENCODING_CHUNKED); + /* The request body isn't fixed length, so tell libsoup to + * not collect chunks for forming a complete request body */ + soup_message_body_set_accumulate (msg->response_body, FALSE); + } + +out: + soup_uri_free (uri); + return; +} + +static void +request_started_cb (SoupServer *server, + SoupMessage *msg, + SoupClientContext *client, + GHashTable *ctx_table) +{ + g_print ("New %s request started\n", msg->method); + + /* SoupMessage is useful only once we have headers, + * so we do all initialization there */ + g_signal_connect (msg, "got-headers", + G_CALLBACK (got_request_headers), ctx_table); +} + +static void +request_read_cb (SoupServer *server, + SoupMessage *msg, + SoupClientContext *client, + GHashTable *ctx_table) +{ + if (msg->method == SOUP_METHOD_PUT || + msg->method == SOUP_METHOD_POST) + goto PUT; + else if (msg->method == SOUP_METHOD_GET) + goto GET; + else + /* We return NOT_IMPLEMENTED for this + * when we get the request headers */ + g_assert_not_reached (); + +PUT: + { + gboolean ret; + TranscodeServerCtx *ctx; + + ctx = get_server_ctx_from_msg (msg, ctx_table); + g_return_if_fail (ctx); + + /* Incoming stream has ended */ + g_signal_emit_by_name (ctx->appsrc, "end-of-stream", &ret); + if (!ret) + g_printerr ("Unable to emit end-of-stream\n"); + } + +GET: + { + /* Nothing to do */ + } + + g_print ("%s request read successfully\n", msg->method); +} + +static void +request_finished_cb (SoupServer *server, + SoupMessage *msg, + SoupClientContext *client, + GHashTable *ctx_table) +{ + g_print ("%s request ended\n", msg->method); +} + +static void +request_aborted_cb (SoupServer *server, + SoupMessage *msg, + SoupClientContext *client, + GHashTable *ctx_table) +{ + g_print ("%s request aborted!\n", msg->method); + if (msg->method == SOUP_METHOD_PUT || + msg->method == SOUP_METHOD_POST) + goto PUT; + else if (msg->method == SOUP_METHOD_GET) + goto GET; + else + /* We return NOT_IMPLEMENTED for this + * when we get the request headers */ + g_assert_not_reached (); + +PUT: + { + /* FIXME: Is there anything to do here? */ + } + +GET: + { + /* We do all cleanup in the "finished" msg signal, nothing to do here */ + } +} + +int +main (int argc, + char *argv[]) +{ + GHashTable *ctx_table; + + gst_init (&argc, &argv); + + /* Keys are paths, and values are TranscodeServerCtxs */ + ctx_table = g_hash_table_new_full (g_str_hash, g_str_equal, + (GDestroyNotify)g_free, + (GDestroyNotify)stp_cleanup_transcode_server_ctx); + server = soup_server_new (SOUP_SERVER_SERVER_HEADER, + "soup-transcode-proxy ", + SOUP_SERVER_PORT, 8000, NULL); + soup_server_add_handler (server, NULL, + (SoupServerCallback)handle_request_cb, + ctx_table, NULL); + g_signal_connect (server, "request-started", + G_CALLBACK (request_started_cb), ctx_table); + g_signal_connect (server, "request-read", + G_CALLBACK (request_read_cb), ctx_table); + g_signal_connect (server, "request-finished", + G_CALLBACK (request_finished_cb), ctx_table); + g_signal_connect (server, "request-aborted", + G_CALLBACK (request_aborted_cb), ctx_table); + + soup_server_run (server); + return 0; +} |