diff options
author | Nirbheek Chauhan <nirbheek@centricular.com> | 2014-07-09 08:43:04 (GMT) |
---|---|---|
committer | Nirbheek Chauhan <nirbheek.chauhan@gmail.com> | 2014-07-09 08:43:04 (GMT) |
commit | 160c3dbb2ce9172ea8cf3ddb559a4efd9fde1683 (patch) | |
tree | 0123dba499f5ada7d82a6ff676732efb7b8a23bd /src/main.c | |
parent | 8ef516c328ef2cd5b3664be62274a08192608ffb (diff) | |
download | soup-transcoding-proxy-160c3dbb2ce9172ea8cf3ddb559a4efd9fde1683.zip soup-transcoding-proxy-160c3dbb2ce9172ea8cf3ddb559a4efd9fde1683.tar.gz |
Add source files
Diffstat (limited to 'src/main.c')
-rw-r--r-- | src/main.c | 562 |
1 files changed, 562 insertions, 0 deletions
diff --git a/src/main.c b/src/main.c new file mode 100644 index 0000000..a2430f5 --- /dev/null +++ b/src/main.c @@ -0,0 +1,562 @@ +/* License: LGPL-2.1 + * vim: set sts=2 sw=2 et : */ + +#include "lib.h" +#include "encode.h" + +#ifdef DEBUG +#include "debug/local-play.h" +#endif + +#include <fcntl.h> +#include <string.h> + +static SoupServer *server; + +static TranscodeServerCtx* +get_server_ctx_from_msg (SoupMessage *msg, + GHashTable *ctx_table) +{ + SoupURI *uri; + TranscodeServerCtx *ctx; + + g_object_get (msg, "uri", &uri, NULL); + ctx = g_hash_table_lookup (ctx_table, soup_uri_get_path (uri)); + soup_uri_free (uri); + + return ctx; +} + +static void +enough_data_pause_message (GstElement *appsrc, + guint size, + TranscodeServerCtx *ctx) +{ + /* Pausing the request can cause weirdness if + * the recv rate is too high. So, we don't do this. */ + //g_print ("Enough data, pause. "); + //soup_server_pause_message (server, ctx->msg); +} + +static void +need_data_unpause_message (GstElement *appsrc, + guint size, + TranscodeServerCtx *ctx) +{ + //g_print ("Need data, unpause. \n"); + //soup_server_unpause_message (server, ctx->msg); +} + +static GstPadProbeReturn +tee_src_pad_blocked_cb (GstPad *srcpad, + GstPadProbeInfo *info, + TranscodeClientCtx *ctx) +{ + GstCaps *caps; + GstPad *sinkpad; + GstBuffer *buffer; + GstState state; + GstStateChangeReturn ret; + TranscodeServerCtx *server_ctx = ctx->server_ctx; + GstElement *sinkbin = GST_ELEMENT (gst_element_get_parent (ctx->appsink)); + + /* Remove the probe, XXX: hence unblocking the pipeline? */ + gst_pad_remove_probe (srcpad, GST_PAD_PROBE_INFO_ID (info)); + + gst_bin_add (GST_BIN (server_ctx->pipeline), sinkbin); + + /* t. ! queue ! appsink */ + sinkpad = gst_element_get_static_pad (sinkbin, "qsink"); + gst_pad_link (srcpad, sinkpad); + + /* Send the WebM stream header through the pipeline first */ + caps = gst_pad_get_current_caps (srcpad); + /* TODO: cache keyframes */ + buffer = stp_get_streamheader_from_caps (caps); + if (!buffer) + goto err; + gst_pad_push (srcpad, buffer); + + ret = gst_element_get_state (server_ctx->pipeline, &state, NULL, 1); + g_print ("Linked pads, removed probe. State was %s:%s.\n", + gst_element_state_change_return_get_name (ret), + gst_element_state_get_name (state)); + + if (gst_element_set_state (server_ctx->pipeline, GST_STATE_PLAYING) == + GST_STATE_CHANGE_FAILURE) { + g_critical ("Unable to set pipeline back to PLAYING\n"); + goto err; + } else { + g_print ("pipeline set to PLAYING successfully\n"); + } + +out: + gst_caps_unref (caps); + gst_object_unref (sinkpad); + gst_object_unref (sinkbin); + return GST_PAD_PROBE_OK; +err: + soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); + stp_cleanup_transcode_client_ctx (ctx); + /* FIXME: PROBE_OK for errors too? What to do here? */ + goto out; +} + +static GstFlowReturn +write_first_client_chunk_cb (GstElement *appsink, + TranscodeClientCtx *ctx) +{ + GstMapInfo info; + GstSample *sample; + GstBuffer *buffer; + GstMemory *memory; + + g_assert (ctx->first_sample_handler_id); + + g_signal_handler_disconnect (appsink, ctx->first_sample_handler_id); + ctx->first_sample_handler_id = 0; + + /* XXX: If no samples are available, this will block till a sample is available */ + g_signal_emit_by_name (appsink, "pull-sample", &sample); + buffer = gst_sample_get_buffer (sample); + memory = gst_buffer_get_all_memory (buffer); /* copy */ + gst_memory_map (memory, &info, GST_MAP_READ); + + soup_message_body_append (ctx->msg->response_body, SOUP_MEMORY_COPY, + info.data, info.size); /* copy */ + + gst_memory_unmap (memory, &info); + gst_memory_unref (memory); + gst_sample_unref (sample); + + soup_server_unpause_message (server, ctx->msg); + g_print ("."); + return GST_FLOW_OK; +} + +static void +write_next_client_chunk_cb (SoupMessage *msg, + TranscodeClientCtx *ctx) +{ + GstMapInfo info; + GstSample *sample; + GstBuffer *buffer; + GstMemory *memory; + gboolean eos; + + g_signal_emit_by_name (ctx->appsink, "pull-sample", &sample); + if (!sample) { + g_print ("Null sample, ending stream\n"); + g_object_get (ctx->appsink, "eos", &eos, NULL); + if (eos) + soup_message_set_status (msg, SOUP_STATUS_OK); + else + soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); + soup_message_body_complete (msg->response_body); + return; + } + + buffer = gst_sample_get_buffer (sample); + memory = gst_buffer_get_all_memory (buffer); /* copy */ + gst_memory_map (memory, &info, GST_MAP_READ); + + soup_message_body_append (msg->response_body, SOUP_MEMORY_COPY, + info.data, info.size); /* copy */ + + gst_memory_unmap (memory, &info); + gst_memory_unref (memory); + gst_sample_unref (sample); + + soup_server_unpause_message (server, msg); + g_print ("."); + return; +} + +static void +client_finished_cb (SoupMessage *msg, + TranscodeClientCtx *ctx) +{ + stp_cleanup_transcode_client_ctx (ctx); +} + +static void +handle_request_cb (SoupServer *server, + SoupMessage *msg, + const char *path, + GHashTable *query, + SoupClientContext *client, + GHashTable *ctx_table) +{ + TranscodeServerCtx *server_ctx; + + if (!msg) + return; + + g_print ("Handling %s request on path %s\n", msg->method, path); + + if (msg->method == SOUP_METHOD_PUT || + msg->method == SOUP_METHOD_POST) + goto PUT; + else if (msg->method == SOUP_METHOD_GET) + goto GET; + else + /* We return NOT_IMPLEMENTED for this + * when we get the request headers */ + g_assert_not_reached (); + +PUT: + { + /* The PUT request has finished streaming. Cleanup + * will be done on EOS/ERROR in the gst pipeline. */ + server_ctx = get_server_ctx_from_msg (msg, ctx_table); + server_ctx->request_finished = TRUE; + soup_message_set_status (server_ctx->msg, SOUP_STATUS_OK); + return; + } + +GET: + { + /* A GET request was received. We connect from the pipeline to the + * client requesting the stream and start writing the response. */ + GstElement *bin, *tee, *q2, *appsink; + GstPadTemplate *template; + GstPad *srcpad, *sinkpadq2; + TranscodeClientCtx *client_ctx; + GstStateChangeReturn state; + + server_ctx = get_server_ctx_from_msg (msg, ctx_table); + if (server_ctx == NULL || + server_ctx->pipeline == NULL) { + /* There's no request streaming on this path (yet) */ + soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); + return; + } + + /* Check if the pipeline successfully started PLAYING */ + state = gst_element_get_state (server_ctx->pipeline, + NULL, NULL, 100*GST_MSECOND); + switch (state) { + case GST_STATE_CHANGE_SUCCESS: + break; + case GST_STATE_CHANGE_FAILURE: + /* PUT stream should've */ + g_critical ("GET request, but state change failure?"); + case GST_STATE_CHANGE_ASYNC: + soup_message_set_status (msg, SOUP_STATUS_SERVICE_UNAVAILABLE); + return; + default: + g_assert_not_reached (); + } + + /* Connect appsink to tee, and start streaming */ + client_ctx = g_new0 (TranscodeClientCtx, 1); + client_ctx->msg = msg; + client_ctx->server_ctx = server_ctx; + + bin = gst_bin_new (NULL); + + /* queue ! appsink */ + q2 = gst_element_factory_make ("queue", "q2"); + appsink = gst_element_factory_make ("appsink", NULL); + g_object_set (appsink, "drop", FALSE, + "emit-signals", TRUE, + "max-buffers", 0, NULL); + gst_bin_add_many (GST_BIN (bin), q2, appsink, NULL); + gst_element_link (q2, appsink); + + sinkpadq2 = gst_element_get_static_pad (q2, "sink"); + gst_element_add_pad (bin, gst_ghost_pad_new ("qsink", sinkpadq2)); + gst_object_unref (sinkpadq2); + + /* Set to PAUSED so the bin can pre-roll */ + if (gst_element_set_state (bin, GST_STATE_PAUSED) == + GST_STATE_CHANGE_FAILURE) { + g_critical ("Unable to set appsink to PAUSED\n"); + soup_message_set_status (client_ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); + stp_cleanup_transcode_client_ctx (client_ctx); + } else { + g_print ("sinkbin set to PAUSED successfully\n"); + } + + /* Request a new src pad from the tee in the pipeline */ + tee = gst_bin_get_by_name (GST_BIN (server_ctx->pipeline), "tee"); + template = gst_pad_template_new ("teesrc", GST_PAD_SRC, + GST_PAD_REQUEST, GST_CAPS_ANY); + srcpad = gst_element_request_pad (tee, template, NULL, GST_CAPS_ANY); + + gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_BLOCK, + (GstPadProbeCallback) tee_src_pad_blocked_cb, + client_ctx, NULL); + gst_object_unref (srcpad); + + gst_object_ref (appsink); + client_ctx->appsink = appsink; + + client_ctx->first_sample_handler_id = + g_signal_connect (appsink, "new-sample", + G_CALLBACK (write_first_client_chunk_cb), client_ctx); + g_signal_connect (client_ctx->msg, "wrote-chunk", + G_CALLBACK (write_next_client_chunk_cb), client_ctx); + g_signal_connect (client_ctx->msg, "finished", + G_CALLBACK (client_finished_cb), client_ctx); + + server_ctx->clients = g_list_prepend (server_ctx->clients, client_ctx); + + /* Since we set the response as chunked, the server will + * automatically pause the message for us, and the response + * won't be completed till we call soup_message_body_complete() */ + soup_message_set_status (msg, SOUP_STATUS_OK); + } +} + +static void +got_request_body_chunk (SoupMessage *msg, + SoupBuffer *chunk, + TranscodeServerCtx *ctx) +{ + GstBuffer *buffer; + GstFlowReturn ret; + + /* XXX: Unfortunately, there doesn't seem to be a way to cancel the + * reception of further chunks when we've prematurely set the response + * (say due to an error), so we check if the status code is set and skip + * all further chunks */ + if (msg->status_code != 0) + return; + + /* We need to update this every chunk */ + ctx->msg = msg; + + /* Actually just a ref */ + buffer = stp_get_gst_buffer (chunk); + g_signal_emit_by_name (ctx->appsrc, "push-buffer", buffer, &ret); + gst_buffer_unref (buffer); + if (ret != GST_FLOW_OK) { + g_critical ("Unable to push buffer\n"); + soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); + stp_cleanup_transcode_server_ctx (ctx); + return; + } + + /* We can only set the pipeline to playing when + * the first chunk has been received */ + if (!ctx->pipeline_is_playing) { + if (gst_element_set_state (ctx->pipeline, GST_STATE_PLAYING) == + GST_STATE_CHANGE_FAILURE) { + g_critical ("Unable to set pipeline to PLAYING\n"); + soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); + stp_cleanup_transcode_server_ctx (ctx); + } else { + g_print ("Set pipeline to PLAYING\n"); + ctx->pipeline_is_playing = TRUE; + } + } +} + +static void +got_request_headers (SoupMessage *msg, + GHashTable *ctx_table) +{ + TranscodeServerCtx *ctx; + SoupURI *uri; + +#ifdef DEBUG + SoupMessageHeadersIter iter; + const char *name, *value; + soup_message_headers_iter_init (&iter, msg->request_headers); + while (soup_message_headers_iter_next (&iter, &name, &value)) + g_print ("%s: %s\n", name, value); +#endif + + g_object_get (msg, "uri", &uri, NULL); + g_print ("%s on uri %s\n", msg->method, soup_uri_get_path (uri)); + + if (msg->method == SOUP_METHOD_PUT || + msg->method == SOUP_METHOD_POST) + goto PUT; + else if (msg->method == SOUP_METHOD_GET) + goto GET; + else { + /* Nothing to do; server doesn't implement this method */ + soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); + goto out; + } + +PUT: { + if (g_hash_table_contains (ctx_table, soup_uri_get_path (uri))) { + /* There's already a request streaming on this path */ + g_print ("Recv duplicate request on the same URI: %s\n", + soup_uri_get_path (uri)); + soup_message_set_status (msg, SOUP_STATUS_CONFLICT); + goto out; + } + + ctx = g_new0 (TranscodeServerCtx, 1); + ctx->msg = msg; + ctx->parent_ctx_table = ctx_table; + ctx->path = g_strdup (soup_uri_get_path (uri)); + g_hash_table_insert (ctx_table, ctx->path, ctx); + + /* We only do this when we receive the first chunk */ + if (!ctx->pipeline) { + ctx->pipeline = gst_pipeline_new ("pipe"); + /* The chunked request is copied into this stream + * for consumption by the gst pipeline */ +#ifdef DEBUG + stp_play_from_msg (ctx); +#else + stp_encode_from_msg (ctx); +#endif + g_signal_connect (ctx->appsrc, "need-data", + G_CALLBACK (need_data_unpause_message), ctx); + g_signal_connect (ctx->appsrc, "enough-data", + G_CALLBACK (enough_data_pause_message), ctx); + } + + /* The request body isn't fixed length, so tell libsoup to + * not collect chunks for forming a complete request body */ + soup_message_body_set_accumulate (msg->request_body, FALSE); + + g_signal_connect (msg, "got-chunk", + G_CALLBACK (got_request_body_chunk), ctx); + goto out; + } + +GET: { + if (!g_hash_table_contains (ctx_table, soup_uri_get_path (uri))) { + g_print ("No stream on URI: %s\n", soup_uri_get_path (uri)); + soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); + goto out; + } + + /* Our response will be chunked, and not with a fixed Content-Length */ + soup_message_headers_set_encoding (msg->response_headers, SOUP_ENCODING_CHUNKED); + /* The request body isn't fixed length, so tell libsoup to + * not collect chunks for forming a complete request body */ + soup_message_body_set_accumulate (msg->response_body, FALSE); + } + +out: + soup_uri_free (uri); + return; +} + +static void +request_started_cb (SoupServer *server, + SoupMessage *msg, + SoupClientContext *client, + GHashTable *ctx_table) +{ + g_print ("New %s request started\n", msg->method); + + /* SoupMessage is useful only once we have headers, + * so we do all initialization there */ + g_signal_connect (msg, "got-headers", + G_CALLBACK (got_request_headers), ctx_table); +} + +static void +request_read_cb (SoupServer *server, + SoupMessage *msg, + SoupClientContext *client, + GHashTable *ctx_table) +{ + if (msg->method == SOUP_METHOD_PUT || + msg->method == SOUP_METHOD_POST) + goto PUT; + else if (msg->method == SOUP_METHOD_GET) + goto GET; + else + /* We return NOT_IMPLEMENTED for this + * when we get the request headers */ + g_assert_not_reached (); + +PUT: + { + gboolean ret; + TranscodeServerCtx *ctx; + + ctx = get_server_ctx_from_msg (msg, ctx_table); + g_return_if_fail (ctx); + + /* Incoming stream has ended */ + g_signal_emit_by_name (ctx->appsrc, "end-of-stream", &ret); + if (!ret) + g_printerr ("Unable to emit end-of-stream\n"); + } + +GET: + { + /* Nothing to do */ + } + + g_print ("%s request read successfully\n", msg->method); +} + +static void +request_finished_cb (SoupServer *server, + SoupMessage *msg, + SoupClientContext *client, + GHashTable *ctx_table) +{ + g_print ("%s request ended\n", msg->method); +} + +static void +request_aborted_cb (SoupServer *server, + SoupMessage *msg, + SoupClientContext *client, + GHashTable *ctx_table) +{ + g_print ("%s request aborted!\n", msg->method); + if (msg->method == SOUP_METHOD_PUT || + msg->method == SOUP_METHOD_POST) + goto PUT; + else if (msg->method == SOUP_METHOD_GET) + goto GET; + else + /* We return NOT_IMPLEMENTED for this + * when we get the request headers */ + g_assert_not_reached (); + +PUT: + { + /* FIXME: Is there anything to do here? */ + } + +GET: + { + /* We do all cleanup in the "finished" msg signal, nothing to do here */ + } +} + +int +main (int argc, + char *argv[]) +{ + GHashTable *ctx_table; + + gst_init (&argc, &argv); + + /* Keys are paths, and values are TranscodeServerCtxs */ + ctx_table = g_hash_table_new_full (g_str_hash, g_str_equal, + (GDestroyNotify)g_free, + (GDestroyNotify)stp_cleanup_transcode_server_ctx); + server = soup_server_new (SOUP_SERVER_SERVER_HEADER, + "soup-transcode-proxy ", + SOUP_SERVER_PORT, 8000, NULL); + soup_server_add_handler (server, NULL, + (SoupServerCallback)handle_request_cb, + ctx_table, NULL); + g_signal_connect (server, "request-started", + G_CALLBACK (request_started_cb), ctx_table); + g_signal_connect (server, "request-read", + G_CALLBACK (request_read_cb), ctx_table); + g_signal_connect (server, "request-finished", + G_CALLBACK (request_finished_cb), ctx_table); + g_signal_connect (server, "request-aborted", + G_CALLBACK (request_aborted_cb), ctx_table); + + soup_server_run (server); + return 0; +} |