summaryrefslogtreecommitdiff
path: root/src/main.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/main.c')
-rw-r--r--src/main.c562
1 files changed, 562 insertions, 0 deletions
diff --git a/src/main.c b/src/main.c
new file mode 100644
index 0000000..a2430f5
--- /dev/null
+++ b/src/main.c
@@ -0,0 +1,562 @@
+/* License: LGPL-2.1
+ * vim: set sts=2 sw=2 et : */
+
+#include "lib.h"
+#include "encode.h"
+
+#ifdef DEBUG
+#include "debug/local-play.h"
+#endif
+
+#include <fcntl.h>
+#include <string.h>
+
+static SoupServer *server;
+
+static TranscodeServerCtx*
+get_server_ctx_from_msg (SoupMessage *msg,
+ GHashTable *ctx_table)
+{
+ SoupURI *uri;
+ TranscodeServerCtx *ctx;
+
+ g_object_get (msg, "uri", &uri, NULL);
+ ctx = g_hash_table_lookup (ctx_table, soup_uri_get_path (uri));
+ soup_uri_free (uri);
+
+ return ctx;
+}
+
+static void
+enough_data_pause_message (GstElement *appsrc,
+ guint size,
+ TranscodeServerCtx *ctx)
+{
+ /* Pausing the request can cause weirdness if
+ * the recv rate is too high. So, we don't do this. */
+ //g_print ("Enough data, pause. ");
+ //soup_server_pause_message (server, ctx->msg);
+}
+
+static void
+need_data_unpause_message (GstElement *appsrc,
+ guint size,
+ TranscodeServerCtx *ctx)
+{
+ //g_print ("Need data, unpause. \n");
+ //soup_server_unpause_message (server, ctx->msg);
+}
+
+static GstPadProbeReturn
+tee_src_pad_blocked_cb (GstPad *srcpad,
+ GstPadProbeInfo *info,
+ TranscodeClientCtx *ctx)
+{
+ GstCaps *caps;
+ GstPad *sinkpad;
+ GstBuffer *buffer;
+ GstState state;
+ GstStateChangeReturn ret;
+ TranscodeServerCtx *server_ctx = ctx->server_ctx;
+ GstElement *sinkbin = GST_ELEMENT (gst_element_get_parent (ctx->appsink));
+
+ /* Remove the probe, XXX: hence unblocking the pipeline? */
+ gst_pad_remove_probe (srcpad, GST_PAD_PROBE_INFO_ID (info));
+
+ gst_bin_add (GST_BIN (server_ctx->pipeline), sinkbin);
+
+ /* t. ! queue ! appsink */
+ sinkpad = gst_element_get_static_pad (sinkbin, "qsink");
+ gst_pad_link (srcpad, sinkpad);
+
+ /* Send the WebM stream header through the pipeline first */
+ caps = gst_pad_get_current_caps (srcpad);
+ /* TODO: cache keyframes */
+ buffer = stp_get_streamheader_from_caps (caps);
+ if (!buffer)
+ goto err;
+ gst_pad_push (srcpad, buffer);
+
+ ret = gst_element_get_state (server_ctx->pipeline, &state, NULL, 1);
+ g_print ("Linked pads, removed probe. State was %s:%s.\n",
+ gst_element_state_change_return_get_name (ret),
+ gst_element_state_get_name (state));
+
+ if (gst_element_set_state (server_ctx->pipeline, GST_STATE_PLAYING) ==
+ GST_STATE_CHANGE_FAILURE) {
+ g_critical ("Unable to set pipeline back to PLAYING\n");
+ goto err;
+ } else {
+ g_print ("pipeline set to PLAYING successfully\n");
+ }
+
+out:
+ gst_caps_unref (caps);
+ gst_object_unref (sinkpad);
+ gst_object_unref (sinkbin);
+ return GST_PAD_PROBE_OK;
+err:
+ soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
+ stp_cleanup_transcode_client_ctx (ctx);
+ /* FIXME: PROBE_OK for errors too? What to do here? */
+ goto out;
+}
+
+static GstFlowReturn
+write_first_client_chunk_cb (GstElement *appsink,
+ TranscodeClientCtx *ctx)
+{
+ GstMapInfo info;
+ GstSample *sample;
+ GstBuffer *buffer;
+ GstMemory *memory;
+
+ g_assert (ctx->first_sample_handler_id);
+
+ g_signal_handler_disconnect (appsink, ctx->first_sample_handler_id);
+ ctx->first_sample_handler_id = 0;
+
+ /* XXX: If no samples are available, this will block till a sample is available */
+ g_signal_emit_by_name (appsink, "pull-sample", &sample);
+ buffer = gst_sample_get_buffer (sample);
+ memory = gst_buffer_get_all_memory (buffer); /* copy */
+ gst_memory_map (memory, &info, GST_MAP_READ);
+
+ soup_message_body_append (ctx->msg->response_body, SOUP_MEMORY_COPY,
+ info.data, info.size); /* copy */
+
+ gst_memory_unmap (memory, &info);
+ gst_memory_unref (memory);
+ gst_sample_unref (sample);
+
+ soup_server_unpause_message (server, ctx->msg);
+ g_print (".");
+ return GST_FLOW_OK;
+}
+
+static void
+write_next_client_chunk_cb (SoupMessage *msg,
+ TranscodeClientCtx *ctx)
+{
+ GstMapInfo info;
+ GstSample *sample;
+ GstBuffer *buffer;
+ GstMemory *memory;
+ gboolean eos;
+
+ g_signal_emit_by_name (ctx->appsink, "pull-sample", &sample);
+ if (!sample) {
+ g_print ("Null sample, ending stream\n");
+ g_object_get (ctx->appsink, "eos", &eos, NULL);
+ if (eos)
+ soup_message_set_status (msg, SOUP_STATUS_OK);
+ else
+ soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
+ soup_message_body_complete (msg->response_body);
+ return;
+ }
+
+ buffer = gst_sample_get_buffer (sample);
+ memory = gst_buffer_get_all_memory (buffer); /* copy */
+ gst_memory_map (memory, &info, GST_MAP_READ);
+
+ soup_message_body_append (msg->response_body, SOUP_MEMORY_COPY,
+ info.data, info.size); /* copy */
+
+ gst_memory_unmap (memory, &info);
+ gst_memory_unref (memory);
+ gst_sample_unref (sample);
+
+ soup_server_unpause_message (server, msg);
+ g_print (".");
+ return;
+}
+
+static void
+client_finished_cb (SoupMessage *msg,
+ TranscodeClientCtx *ctx)
+{
+ stp_cleanup_transcode_client_ctx (ctx);
+}
+
+static void
+handle_request_cb (SoupServer *server,
+ SoupMessage *msg,
+ const char *path,
+ GHashTable *query,
+ SoupClientContext *client,
+ GHashTable *ctx_table)
+{
+ TranscodeServerCtx *server_ctx;
+
+ if (!msg)
+ return;
+
+ g_print ("Handling %s request on path %s\n", msg->method, path);
+
+ if (msg->method == SOUP_METHOD_PUT ||
+ msg->method == SOUP_METHOD_POST)
+ goto PUT;
+ else if (msg->method == SOUP_METHOD_GET)
+ goto GET;
+ else
+ /* We return NOT_IMPLEMENTED for this
+ * when we get the request headers */
+ g_assert_not_reached ();
+
+PUT:
+ {
+ /* The PUT request has finished streaming. Cleanup
+ * will be done on EOS/ERROR in the gst pipeline. */
+ server_ctx = get_server_ctx_from_msg (msg, ctx_table);
+ server_ctx->request_finished = TRUE;
+ soup_message_set_status (server_ctx->msg, SOUP_STATUS_OK);
+ return;
+ }
+
+GET:
+ {
+ /* A GET request was received. We connect from the pipeline to the
+ * client requesting the stream and start writing the response. */
+ GstElement *bin, *tee, *q2, *appsink;
+ GstPadTemplate *template;
+ GstPad *srcpad, *sinkpadq2;
+ TranscodeClientCtx *client_ctx;
+ GstStateChangeReturn state;
+
+ server_ctx = get_server_ctx_from_msg (msg, ctx_table);
+ if (server_ctx == NULL ||
+ server_ctx->pipeline == NULL) {
+ /* There's no request streaming on this path (yet) */
+ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
+ return;
+ }
+
+ /* Check if the pipeline successfully started PLAYING */
+ state = gst_element_get_state (server_ctx->pipeline,
+ NULL, NULL, 100*GST_MSECOND);
+ switch (state) {
+ case GST_STATE_CHANGE_SUCCESS:
+ break;
+ case GST_STATE_CHANGE_FAILURE:
+ /* PUT stream should've */
+ g_critical ("GET request, but state change failure?");
+ case GST_STATE_CHANGE_ASYNC:
+ soup_message_set_status (msg, SOUP_STATUS_SERVICE_UNAVAILABLE);
+ return;
+ default:
+ g_assert_not_reached ();
+ }
+
+ /* Connect appsink to tee, and start streaming */
+ client_ctx = g_new0 (TranscodeClientCtx, 1);
+ client_ctx->msg = msg;
+ client_ctx->server_ctx = server_ctx;
+
+ bin = gst_bin_new (NULL);
+
+ /* queue ! appsink */
+ q2 = gst_element_factory_make ("queue", "q2");
+ appsink = gst_element_factory_make ("appsink", NULL);
+ g_object_set (appsink, "drop", FALSE,
+ "emit-signals", TRUE,
+ "max-buffers", 0, NULL);
+ gst_bin_add_many (GST_BIN (bin), q2, appsink, NULL);
+ gst_element_link (q2, appsink);
+
+ sinkpadq2 = gst_element_get_static_pad (q2, "sink");
+ gst_element_add_pad (bin, gst_ghost_pad_new ("qsink", sinkpadq2));
+ gst_object_unref (sinkpadq2);
+
+ /* Set to PAUSED so the bin can pre-roll */
+ if (gst_element_set_state (bin, GST_STATE_PAUSED) ==
+ GST_STATE_CHANGE_FAILURE) {
+ g_critical ("Unable to set appsink to PAUSED\n");
+ soup_message_set_status (client_ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
+ stp_cleanup_transcode_client_ctx (client_ctx);
+ } else {
+ g_print ("sinkbin set to PAUSED successfully\n");
+ }
+
+ /* Request a new src pad from the tee in the pipeline */
+ tee = gst_bin_get_by_name (GST_BIN (server_ctx->pipeline), "tee");
+ template = gst_pad_template_new ("teesrc", GST_PAD_SRC,
+ GST_PAD_REQUEST, GST_CAPS_ANY);
+ srcpad = gst_element_request_pad (tee, template, NULL, GST_CAPS_ANY);
+
+ gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_BLOCK,
+ (GstPadProbeCallback) tee_src_pad_blocked_cb,
+ client_ctx, NULL);
+ gst_object_unref (srcpad);
+
+ gst_object_ref (appsink);
+ client_ctx->appsink = appsink;
+
+ client_ctx->first_sample_handler_id =
+ g_signal_connect (appsink, "new-sample",
+ G_CALLBACK (write_first_client_chunk_cb), client_ctx);
+ g_signal_connect (client_ctx->msg, "wrote-chunk",
+ G_CALLBACK (write_next_client_chunk_cb), client_ctx);
+ g_signal_connect (client_ctx->msg, "finished",
+ G_CALLBACK (client_finished_cb), client_ctx);
+
+ server_ctx->clients = g_list_prepend (server_ctx->clients, client_ctx);
+
+ /* Since we set the response as chunked, the server will
+ * automatically pause the message for us, and the response
+ * won't be completed till we call soup_message_body_complete() */
+ soup_message_set_status (msg, SOUP_STATUS_OK);
+ }
+}
+
+static void
+got_request_body_chunk (SoupMessage *msg,
+ SoupBuffer *chunk,
+ TranscodeServerCtx *ctx)
+{
+ GstBuffer *buffer;
+ GstFlowReturn ret;
+
+ /* XXX: Unfortunately, there doesn't seem to be a way to cancel the
+ * reception of further chunks when we've prematurely set the response
+ * (say due to an error), so we check if the status code is set and skip
+ * all further chunks */
+ if (msg->status_code != 0)
+ return;
+
+ /* We need to update this every chunk */
+ ctx->msg = msg;
+
+ /* Actually just a ref */
+ buffer = stp_get_gst_buffer (chunk);
+ g_signal_emit_by_name (ctx->appsrc, "push-buffer", buffer, &ret);
+ gst_buffer_unref (buffer);
+ if (ret != GST_FLOW_OK) {
+ g_critical ("Unable to push buffer\n");
+ soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
+ stp_cleanup_transcode_server_ctx (ctx);
+ return;
+ }
+
+ /* We can only set the pipeline to playing when
+ * the first chunk has been received */
+ if (!ctx->pipeline_is_playing) {
+ if (gst_element_set_state (ctx->pipeline, GST_STATE_PLAYING) ==
+ GST_STATE_CHANGE_FAILURE) {
+ g_critical ("Unable to set pipeline to PLAYING\n");
+ soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
+ stp_cleanup_transcode_server_ctx (ctx);
+ } else {
+ g_print ("Set pipeline to PLAYING\n");
+ ctx->pipeline_is_playing = TRUE;
+ }
+ }
+}
+
+static void
+got_request_headers (SoupMessage *msg,
+ GHashTable *ctx_table)
+{
+ TranscodeServerCtx *ctx;
+ SoupURI *uri;
+
+#ifdef DEBUG
+ SoupMessageHeadersIter iter;
+ const char *name, *value;
+ soup_message_headers_iter_init (&iter, msg->request_headers);
+ while (soup_message_headers_iter_next (&iter, &name, &value))
+ g_print ("%s: %s\n", name, value);
+#endif
+
+ g_object_get (msg, "uri", &uri, NULL);
+ g_print ("%s on uri %s\n", msg->method, soup_uri_get_path (uri));
+
+ if (msg->method == SOUP_METHOD_PUT ||
+ msg->method == SOUP_METHOD_POST)
+ goto PUT;
+ else if (msg->method == SOUP_METHOD_GET)
+ goto GET;
+ else {
+ /* Nothing to do; server doesn't implement this method */
+ soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED);
+ goto out;
+ }
+
+PUT: {
+ if (g_hash_table_contains (ctx_table, soup_uri_get_path (uri))) {
+ /* There's already a request streaming on this path */
+ g_print ("Recv duplicate request on the same URI: %s\n",
+ soup_uri_get_path (uri));
+ soup_message_set_status (msg, SOUP_STATUS_CONFLICT);
+ goto out;
+ }
+
+ ctx = g_new0 (TranscodeServerCtx, 1);
+ ctx->msg = msg;
+ ctx->parent_ctx_table = ctx_table;
+ ctx->path = g_strdup (soup_uri_get_path (uri));
+ g_hash_table_insert (ctx_table, ctx->path, ctx);
+
+ /* We only do this when we receive the first chunk */
+ if (!ctx->pipeline) {
+ ctx->pipeline = gst_pipeline_new ("pipe");
+ /* The chunked request is copied into this stream
+ * for consumption by the gst pipeline */
+#ifdef DEBUG
+ stp_play_from_msg (ctx);
+#else
+ stp_encode_from_msg (ctx);
+#endif
+ g_signal_connect (ctx->appsrc, "need-data",
+ G_CALLBACK (need_data_unpause_message), ctx);
+ g_signal_connect (ctx->appsrc, "enough-data",
+ G_CALLBACK (enough_data_pause_message), ctx);
+ }
+
+ /* The request body isn't fixed length, so tell libsoup to
+ * not collect chunks for forming a complete request body */
+ soup_message_body_set_accumulate (msg->request_body, FALSE);
+
+ g_signal_connect (msg, "got-chunk",
+ G_CALLBACK (got_request_body_chunk), ctx);
+ goto out;
+ }
+
+GET: {
+ if (!g_hash_table_contains (ctx_table, soup_uri_get_path (uri))) {
+ g_print ("No stream on URI: %s\n", soup_uri_get_path (uri));
+ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
+ goto out;
+ }
+
+ /* Our response will be chunked, and not with a fixed Content-Length */
+ soup_message_headers_set_encoding (msg->response_headers, SOUP_ENCODING_CHUNKED);
+ /* The request body isn't fixed length, so tell libsoup to
+ * not collect chunks for forming a complete request body */
+ soup_message_body_set_accumulate (msg->response_body, FALSE);
+ }
+
+out:
+ soup_uri_free (uri);
+ return;
+}
+
+static void
+request_started_cb (SoupServer *server,
+ SoupMessage *msg,
+ SoupClientContext *client,
+ GHashTable *ctx_table)
+{
+ g_print ("New %s request started\n", msg->method);
+
+ /* SoupMessage is useful only once we have headers,
+ * so we do all initialization there */
+ g_signal_connect (msg, "got-headers",
+ G_CALLBACK (got_request_headers), ctx_table);
+}
+
+static void
+request_read_cb (SoupServer *server,
+ SoupMessage *msg,
+ SoupClientContext *client,
+ GHashTable *ctx_table)
+{
+ if (msg->method == SOUP_METHOD_PUT ||
+ msg->method == SOUP_METHOD_POST)
+ goto PUT;
+ else if (msg->method == SOUP_METHOD_GET)
+ goto GET;
+ else
+ /* We return NOT_IMPLEMENTED for this
+ * when we get the request headers */
+ g_assert_not_reached ();
+
+PUT:
+ {
+ gboolean ret;
+ TranscodeServerCtx *ctx;
+
+ ctx = get_server_ctx_from_msg (msg, ctx_table);
+ g_return_if_fail (ctx);
+
+ /* Incoming stream has ended */
+ g_signal_emit_by_name (ctx->appsrc, "end-of-stream", &ret);
+ if (!ret)
+ g_printerr ("Unable to emit end-of-stream\n");
+ }
+
+GET:
+ {
+ /* Nothing to do */
+ }
+
+ g_print ("%s request read successfully\n", msg->method);
+}
+
+static void
+request_finished_cb (SoupServer *server,
+ SoupMessage *msg,
+ SoupClientContext *client,
+ GHashTable *ctx_table)
+{
+ g_print ("%s request ended\n", msg->method);
+}
+
+static void
+request_aborted_cb (SoupServer *server,
+ SoupMessage *msg,
+ SoupClientContext *client,
+ GHashTable *ctx_table)
+{
+ g_print ("%s request aborted!\n", msg->method);
+ if (msg->method == SOUP_METHOD_PUT ||
+ msg->method == SOUP_METHOD_POST)
+ goto PUT;
+ else if (msg->method == SOUP_METHOD_GET)
+ goto GET;
+ else
+ /* We return NOT_IMPLEMENTED for this
+ * when we get the request headers */
+ g_assert_not_reached ();
+
+PUT:
+ {
+ /* FIXME: Is there anything to do here? */
+ }
+
+GET:
+ {
+ /* We do all cleanup in the "finished" msg signal, nothing to do here */
+ }
+}
+
+int
+main (int argc,
+ char *argv[])
+{
+ GHashTable *ctx_table;
+
+ gst_init (&argc, &argv);
+
+ /* Keys are paths, and values are TranscodeServerCtxs */
+ ctx_table = g_hash_table_new_full (g_str_hash, g_str_equal,
+ (GDestroyNotify)g_free,
+ (GDestroyNotify)stp_cleanup_transcode_server_ctx);
+ server = soup_server_new (SOUP_SERVER_SERVER_HEADER,
+ "soup-transcode-proxy ",
+ SOUP_SERVER_PORT, 8000, NULL);
+ soup_server_add_handler (server, NULL,
+ (SoupServerCallback)handle_request_cb,
+ ctx_table, NULL);
+ g_signal_connect (server, "request-started",
+ G_CALLBACK (request_started_cb), ctx_table);
+ g_signal_connect (server, "request-read",
+ G_CALLBACK (request_read_cb), ctx_table);
+ g_signal_connect (server, "request-finished",
+ G_CALLBACK (request_finished_cb), ctx_table);
+ g_signal_connect (server, "request-aborted",
+ G_CALLBACK (request_aborted_cb), ctx_table);
+
+ soup_server_run (server);
+ return 0;
+}