/*
 * vim: set sts=2 sw=2 et :
 *
 * License: LGPL-2.1+
 * Copyright (c) 2014 Nirbheek Chauhan
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */

#include "lib.h"
#include "encode.h"

#ifdef PLAY_DEBUG
#include "debug/local-play.h"
#endif

/* Period (in seconds) of the read/write inactivity timers. Each tick adds
 * this amount to the matching "seconds since ..." counter. */
#define TIMER_TICK_SECONDS 2

/* TCP port the HTTP server listens on (overridable with --port) */
static int port = 8000;
/* Seconds a GET client may go without a chunk being written before it is
 * dropped (overridable with --timeout) */
static int client_timeout = 10;
/* Seconds a Content-Length PUT stream may go without receiving a chunk
 * before it is considered finished */
static int server_timeout = 5;

static GOptionEntry entries[] =
{
  { "port", 'p', 0, G_OPTION_ARG_INT, &port,
    "Port to listen on (default: 8000)", "PORT" },
  { "timeout", 't', 0, G_OPTION_ARG_INT, &client_timeout,
    "Client connection timeout (default: 10s)", "SECONDS" },
  { NULL }
};

/* The single server instance; also used by the appsink callbacks to
 * unpause messages */
static SoupServer *server;

static void stream_finished_cb (SoupMessage *msg, TranscodeServerCtx *ctx);

/*
 * Look up the stream context registered for the path of @msg's URI.
 *
 * Returns the context stored in @ctx_table, or NULL (after logging a
 * critical) when no stream is registered on that path.
 */
static TranscodeServerCtx*
get_server_ctx_from_msg (SoupMessage *msg,
                         GHashTable  *ctx_table)
{
  SoupURI *uri;
  TranscodeServerCtx *ctx;

  g_object_get (msg, "uri", &uri, NULL);
  ctx = g_hash_table_lookup (ctx_table, soup_uri_get_path (uri));
  soup_uri_free (uri);

  if (!ctx)
    g_critical ("No matching context found for msg!\n");

  return ctx;
}

/*
 * Blocking pad probe installed on a freshly requested tee src pad.
 *
 * Adds the client's sink bin to the stream pipeline, links it to the tee,
 * pushes the stream header obtained from the pad's caps, and sets the
 * pipeline to PLAYING again. On any failure the client's response is
 * completed with a 500 and the client context is cleaned up.
 */
static GstPadProbeReturn
tee_src_pad_blocked_cb (GstPad             *srcpad,
                        GstPadProbeInfo    *info,
                        TranscodeClientCtx *ctx)
{
  GstCaps *caps = NULL;
  GstBuffer *buffer;
  GstState state = GST_STATE_VOID_PENDING;
  GstStateChangeReturn ret;
  TranscodeServerCtx *server_ctx = ctx->server_ctx;
  /* gst_element_get_parent() returns a new reference; dropped at "out" */
  GstElement *sinkbin =
    GST_ELEMENT (gst_element_get_parent (ctx->appsink));

  /* Remove the probe, XXX: hence unblocking the pipeline? */
  gst_pad_remove_probe (srcpad, GST_PAD_PROBE_INFO_ID (info));

  gst_bin_add (GST_BIN (server_ctx->pipeline), sinkbin);

  /* t. ! queue ! appsink */
  if (gst_pad_link (srcpad, ctx->ghostsinkpad) != GST_PAD_LINK_OK) {
    g_critical ("Unable to link tee src pad to client sinkbin\n");
    goto err;
  }

  /* Send the WebM stream header through the pipeline first */
  caps = gst_pad_get_current_caps (srcpad);
  /* TODO: cache keyframes */
  buffer = caps ? stp_get_streamheader_from_caps (caps) : NULL;
  if (!buffer)
    goto err;
  gst_pad_push (srcpad, buffer);

  ret = gst_element_get_state (server_ctx->pipeline, &state, NULL, 1);
  g_print ("Linked pads, removed probe. State was %s:%s.\n",
           gst_element_state_change_return_get_name (ret),
           gst_element_state_get_name (state));

  if (gst_element_set_state (server_ctx->pipeline, GST_STATE_PLAYING) ==
      GST_STATE_CHANGE_FAILURE) {
    g_critical ("Unable to set pipeline back to PLAYING\n");
    goto err;
  } else {
    g_print ("pipeline set to PLAYING successfully\n");
  }

out:
  /* caps is NULL when we failed before (or while) fetching them */
  if (caps)
    gst_caps_unref (caps);
  gst_object_unref (sinkbin);
  return GST_PAD_PROBE_OK;

err:
  soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
  soup_message_body_complete (ctx->msg->response_body);
  stp_cleanup_transcode_client_ctx (ctx);
  /* FIXME: PROBE_OK for errors too? What to do here? */
  goto out;
}

/* If it's been at least server_timeout seconds since the last time we got
 * a chunk for a PUT request, we time out and end the stream.
 *
 * GSourceFunc; runs every TIMER_TICK_SECONDS. */
static gboolean
increment_read_timer (TranscodeServerCtx *ctx)
{
  ctx->seconds_since_read += TIMER_TICK_SECONDS;

  if (ctx->seconds_since_read < server_timeout)
    return G_SOURCE_CONTINUE;

  g_printerr ("Stream timed out, sending EOS\n");
  stream_finished_cb (ctx->msg, ctx);
  return G_SOURCE_REMOVE;
}

/* If it's been at least client_timeout seconds since the last time we wrote
 * a chunk for a GET response, we time out and drop the connection.
 *
 * GSourceFunc; runs every TIMER_TICK_SECONDS. */
static gboolean
increment_write_timer (TranscodeClientCtx *ctx)
{
  ctx->seconds_since_write += TIMER_TICK_SECONDS;

  if (ctx->seconds_since_write < client_timeout)
    return G_SOURCE_CONTINUE;

  g_printerr ("Client timed out, cleaning up\n");
  soup_message_set_status (ctx->msg, SOUP_STATUS_REQUEST_TIMEOUT);
  soup_message_body_complete (ctx->msg->response_body);
  stp_cleanup_transcode_client_ctx (ctx);
  return G_SOURCE_REMOVE;
}

/* appsink "eos" handler: the stream ended, so complete the GET response */
static void
client_eos_cb (GstElement         *appsink,
               TranscodeClientCtx *ctx)
{
  g_print ("Received EOS for client\n");
  soup_message_set_status (ctx->msg, SOUP_STATUS_OK);
  soup_message_body_complete (ctx->msg->response_body);
}

/* SoupMessage "wrote-chunk" handler: the previous chunk is on the wire, so
 * allow write_client_chunk_cb() to append the next one and reset the
 * client's inactivity counter.
 *
 * NOTE(review): the mutex is used as a binary semaphore and is unlocked
 * here while it was locked in write_client_chunk_cb(), which is presumably
 * invoked from a different (streaming) thread. GLib documents unlocking a
 * mutex from a thread that doesn't hold it as undefined behaviour; fixing
 * this needs a different primitive in TranscodeClientCtx. */
static void
can_write_next_client_chunk_cb (SoupMessage        *msg,
                                TranscodeClientCtx *ctx)
{
  g_mutex_unlock (&ctx->can_write_chunk);
  ctx->seconds_since_write = 0;
  ctx->msg = msg;
  g_print ("_");
}

/*
 * appsink "new-sample" handler: pull the sample, append its contents to the
 * GET response body as one chunk, and unpause the message so it is sent.
 *
 * Always returns GST_FLOW_OK; a failure only ends this client's response
 * instead of propagating an error into the pipeline.
 */
static GstFlowReturn
write_client_chunk_cb (GstElement         *appsink,
                       TranscodeClientCtx *ctx)
{
  GstMapInfo info;
  GstSample *sample = NULL;
  GstBuffer *buffer;
  GstMemory *memory;
  gboolean eos = FALSE;

  if (!g_mutex_trylock (&ctx->can_write_chunk)) {
    g_print ("!");
    /* We cannot safely append to the message body till
     * the previous chunk is written out, otherwise we get
     * a segfault. This is likely because SoupMessageBody
     * uses a GSList for the chunks, which isn't MT-safe. */
    return GST_FLOW_OK;
  }

  g_print ("*");

  g_signal_emit_by_name (ctx->appsink, "pull-sample", &sample);
  if (!sample) {
    g_print ("Null sample, ending stream\n");
    g_object_get (ctx->appsink, "eos", &eos, NULL);
    if (eos)
      soup_message_set_status (ctx->msg, SOUP_STATUS_OK);
    else
      soup_message_set_status (ctx->msg,
                               SOUP_STATUS_INTERNAL_SERVER_ERROR);
    soup_message_body_complete (ctx->msg->response_body);
    return GST_FLOW_OK;
  }

  buffer = gst_sample_get_buffer (sample);
  /* Merged view of all memory blocks in the buffer; may copy */
  memory = gst_buffer_get_all_memory (buffer);
  if (!memory || !gst_memory_map (memory, &info, GST_MAP_READ)) {
    /* Without a valid mapping info.data/info.size are garbage; end this
     * client's response the same way as for a failed pull */
    g_critical ("Unable to map sample memory for reading\n");
    if (memory)
      gst_memory_unref (memory);
    gst_sample_unref (sample);
    soup_message_set_status (ctx->msg,
                             SOUP_STATUS_INTERNAL_SERVER_ERROR);
    soup_message_body_complete (ctx->msg->response_body);
    return GST_FLOW_OK;
  }

  /* SOUP_MEMORY_COPY: libsoup keeps its own copy, so the mapping can be
   * released right away */
  soup_message_body_append (ctx->msg->response_body, SOUP_MEMORY_COPY,
                            info.data, info.size);
  gst_memory_unmap (memory, &info);
  gst_memory_unref (memory);
  gst_sample_unref (sample);

  soup_server_unpause_message (server, ctx->msg);
  g_print (".");

  return GST_FLOW_OK;
}

/* SoupMessage "finished" handler for GET responses: release the client */
static void
client_finished_cb (SoupMessage        *msg,
                    TranscodeClientCtx *ctx)
{
  g_print ("Client finished/aborted, doing cleanup...\n");
  stp_cleanup_transcode_client_ctx (ctx);
}

/*
 * Mark the incoming stream as finished and queue EOS on the pipeline's
 * appsrc. Used both as the PUT message's "finished" handler and by the
 * read timeout.
 */
static void
stream_finished_cb (SoupMessage        *msg,
                    TranscodeServerCtx *ctx)
{
  /* "end-of-stream" reports a GstFlowReturn; default to an error so a
   * failed/skipped emission isn't mistaken for success */
  GstFlowReturn ret = GST_FLOW_ERROR;

  g_print ("Stream finished/aborted, queueing EOS... ");

  ctx->stream_finished = TRUE;

  /* Incoming stream has ended. appsrc is still unset if the read timer
   * fires before the first body chunk arrived. */
  if (ctx->appsrc)
    g_signal_emit_by_name (ctx->appsrc, "end-of-stream", &ret);

  if (ret != GST_FLOW_OK)
    g_printerr ("\nUnable to emit end-of-stream after an aborted stream\n");
  else
    g_print ("Done.\n");
}

/* Handler part for PUT/POST: the request body has been fully read.
 * Cleanup will be done on EOS/ERROR in the gst pipeline. */
static void
handle_put_request_read (SoupMessage *msg,
                         GHashTable  *ctx_table)
{
  TranscodeServerCtx *server_ctx;

  server_ctx = get_server_ctx_from_msg (msg, ctx_table);
  if (!server_ctx) {
    /* The stream context is gone; there's nothing to finish */
    soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
    return;
  }

  if (server_ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH)
    server_ctx->stream_finished = TRUE;

  /* Set the status on the message being handled; server_ctx->msg is only
   * refreshed when a body chunk arrives and may refer to an older request */
  soup_message_set_status (msg, SOUP_STATUS_OK);
}

/* Handler part for GET: build a "queue ! appsink" bin for this client and
 * arrange for it to be attached to the stream's tee, then start the
 * (chunked) response. */
static void
handle_get_request (SoupMessage *msg,
                    GHashTable  *ctx_table)
{
  GstElement *bin, *tee, *q2, *appsink;
  GstPadTemplate *template;
  GstPad *srcpad, *sinkpadq2;
  TranscodeClientCtx *client_ctx;
  TranscodeServerCtx *server_ctx;

  server_ctx = get_server_ctx_from_msg (msg, ctx_table);
  if (server_ctx == NULL || server_ctx->pipeline == NULL) {
    /* There's no request streaming on this path (yet) */
    soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
    return;
  }

  /* Check if the pipeline successfully started PLAYING */
  switch (gst_element_get_state (server_ctx->pipeline, NULL, NULL,
                                 100 * GST_MSECOND)) {
    case GST_STATE_CHANGE_SUCCESS:
    case GST_STATE_CHANGE_NO_PREROLL:
      /* NO_PREROLL is documented as equivalent to SUCCESS for the caller */
      break;
    case GST_STATE_CHANGE_FAILURE:
      /* PUT stream should've */
      g_critical ("GET request, but state change failure?");
      /* fallthrough */
    case GST_STATE_CHANGE_ASYNC:
    default:
      soup_message_set_status (msg, SOUP_STATUS_SERVICE_UNAVAILABLE);
      return;
  }

  /* queue ! appsink */
  bin = gst_bin_new (NULL);
  q2 = gst_element_factory_make ("queue", NULL);
  appsink = gst_element_factory_make ("appsink", NULL);
  if (!q2 || !appsink) {
    g_critical ("Unable to create queue/appsink elements\n");
    if (q2)
      gst_object_unref (q2);
    if (appsink)
      gst_object_unref (appsink);
    gst_object_unref (bin);
    soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
    return;
  }

  /* Connect appsink to tee, and start streaming */
  client_ctx = g_new0 (TranscodeClientCtx, 1);
  client_ctx->msg = msg;
  client_ctx->server_ctx = server_ctx;
  g_mutex_init (&client_ctx->can_write_chunk);

  g_object_set (appsink, "drop", FALSE, "emit-signals", TRUE,
                "max-buffers", 0, NULL);
  gst_bin_add_many (GST_BIN (bin), q2, appsink, NULL);
  gst_element_link (q2, appsink);

  sinkpadq2 = gst_element_get_static_pad (q2, "sink");
  client_ctx->ghostsinkpad = gst_ghost_pad_new (NULL, sinkpadq2);
  gst_element_add_pad (bin, client_ctx->ghostsinkpad);
  gst_object_unref (sinkpadq2);

  /* Set to PAUSED so the bin can pre-roll */
  if (gst_element_set_state (bin, GST_STATE_PAUSED) ==
      GST_STATE_CHANGE_FAILURE) {
    g_critical ("Unable to set appsink to PAUSED\n");
    soup_message_set_status (client_ctx->msg,
                             SOUP_STATUS_INTERNAL_SERVER_ERROR);
    stp_cleanup_transcode_client_ctx (client_ctx);
    /* client_ctx must not be touched after cleanup.
     * NOTE(review): whether the cleanup also releases `bin` can't be told
     * from here; it may be leaked on this path — confirm. */
    return;
  } else {
    g_print ("sinkbin set to PAUSED successfully\n");
  }

  /* Request a new src pad from the tee in the pipeline */
  tee = gst_bin_get_by_name (GST_BIN (server_ctx->pipeline), "tee");
  srcpad = NULL;
  if (tee) {
    template = gst_pad_template_new ("teesrc", GST_PAD_SRC,
                                     GST_PAD_REQUEST, GST_CAPS_ANY);
    srcpad = gst_element_request_pad (tee, template, NULL, GST_CAPS_ANY);
  }
  if (!srcpad) {
    g_critical ("Unable to request a src pad from the tee\n");
    if (tee)
      gst_object_unref (tee);
    soup_message_set_status (client_ctx->msg,
                             SOUP_STATUS_INTERNAL_SERVER_ERROR);
    stp_cleanup_transcode_client_ctx (client_ctx);
    return;
  }

  /* Everything the probe callback and the appsink callbacks read from
   * client_ctx must be in place before the probe is installed, since the
   * probe can fire as soon as data flows through the tee */
  gst_object_ref (appsink);
  client_ctx->appsink = appsink;
  g_signal_connect (appsink, "new-sample",
                    G_CALLBACK (write_client_chunk_cb), client_ctx);
  g_signal_connect (appsink, "eos",
                    G_CALLBACK (client_eos_cb), client_ctx);
  g_signal_connect (client_ctx->msg, "wrote-chunk",
                    G_CALLBACK (can_write_next_client_chunk_cb),
                    client_ctx);
  g_signal_connect (client_ctx->msg, "finished",
                    G_CALLBACK (client_finished_cb), client_ctx);

  client_ctx->timeout_handler_id =
    g_timeout_add_seconds (TIMER_TICK_SECONDS,
                           (GSourceFunc) increment_write_timer,
                           client_ctx);

  server_ctx->clients = g_list_prepend (server_ctx->clients, client_ctx);

  /* Since we set the response as chunked, the server will
   * automatically pause the message for us, and the response
   * won't be completed till we call soup_message_body_complete().
   * Set before the probe is added so an error status set by the probe
   * callback isn't overwritten here. */
  soup_message_set_status (msg, SOUP_STATUS_OK);

  gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_BLOCK,
                     (GstPadProbeCallback) tee_src_pad_blocked_cb,
                     client_ctx, NULL);
  gst_object_unref (srcpad);
  /* gst_bin_get_by_name() returned a new reference */
  gst_object_unref (tee);
}

/* This is called when the entire request body has been read */
static void
handle_request_cb (SoupServer        *server,
                   SoupMessage       *msg,
                   const char        *path,
                   GHashTable        *query,
                   SoupClientContext *client,
                   GHashTable        *ctx_table)
{
  if (!msg)
    return;

  g_print ("Handling %s request on path %s\n", msg->method, path);

  if (msg->method == SOUP_METHOD_PUT || msg->method == SOUP_METHOD_POST)
    handle_put_request_read (msg, ctx_table);
  else if (msg->method == SOUP_METHOD_GET)
    handle_get_request (msg, ctx_table);
  else
    /* got_request_headers() already answers these with NOT_IMPLEMENTED;
     * repeat it here rather than asserting on client-controlled input */
    soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED);
}

/*
 * "got-chunk" handler for PUT/POST bodies: hand the chunk to the pipeline's
 * appsrc and reset the read inactivity counter. On push failure the request
 * is answered with a 500 and the stream context is cleaned up.
 */
static void
got_request_body_chunk (SoupMessage        *msg,
                        SoupBuffer         *chunk,
                        TranscodeServerCtx *ctx)
{
  GstBuffer *buffer;
  /* Default to an error so a failed emission is never read as success */
  GstFlowReturn ret = GST_FLOW_ERROR;

  /* XXX: Unfortunately, there doesn't seem to be a way to cancel the
   * reception of further chunks when we've prematurely set the response
   * (say due to an error), so we check if the status code is set and skip
   * all further chunks */
  if (msg->status_code != 0)
    return;

  /* We need to update this every chunk */
  ctx->msg = msg;
  ctx->seconds_since_read = 0;

  /* Actually just a ref */
  buffer = stp_get_gst_buffer (chunk);
  g_signal_emit_by_name (ctx->appsrc, "push-buffer", buffer, &ret);
  gst_buffer_unref (buffer);

  if (ret != GST_FLOW_OK) {
    g_critical ("Unable to push buffer\n");
    soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
    stp_cleanup_transcode_server_ctx (ctx);
    return;
  }
}

/*
 * "finished" handler used while a new PUT has not delivered any body yet:
 * drop the stream context that was registered when the headers arrived.
 */
static void
request_ended_no_body_cb (SoupMessage        *msg,
                          TranscodeServerCtx *ctx)
{
  g_print ("Request ended without a body!\n");

  /* The read timer (added for Content-Length requests) holds ctx as its
   * user data and its source id isn't stored anywhere, so remove it by
   * user data before ctx goes away. Returns FALSE harmlessly when no such
   * timer exists. */
  g_source_remove_by_user_data (ctx);

  /* NOTE(review): the table is created with
   * stp_cleanup_transcode_server_ctx() as its value destroy function, so
   * this removal presumably already runs that cleanup on ctx; confirm that
   * the g_free() below is not a double free. */
  g_hash_table_remove (ctx->parent_ctx_table, ctx->path);
  g_free (ctx);
}

/*
 * "got-chunk" handler for the first body chunk of a new PUT/POST stream:
 * creates the pipeline, feeds it the first chunk, starts it PLAYING and
 * swaps in the handlers used for the rest of the stream.
 */
static void
got_first_request_body_chunk (SoupMessage        *msg,
                              SoupBuffer         *chunk,
                              TranscodeServerCtx *ctx)
{
  int num;

  /* Disconnect the ctx cleanup function */
  num = g_signal_handlers_disconnect_by_func (msg,
            G_CALLBACK (request_ended_no_body_cb), ctx);
  if (num != 1)
    g_critical ("Unable to remove signal handler for cleanup function!\n");

  /* Disconnect us from got-chunk */
  num = g_signal_handlers_disconnect_by_func (msg,
            G_CALLBACK (got_first_request_body_chunk), ctx);
  if (num != 1)
    g_critical ("Unable to remove signal handler for first request chunk!\n");

  ctx->msg = msg;
  ctx->pipeline = gst_pipeline_new ("pipe");

  /* The chunked request is copied into this stream
   * for consumption by the gst pipeline */
#ifdef PLAY_DEBUG
  stp_play_from_msg (ctx);
#else
  stp_encode_from_msg (ctx);
#endif

  got_request_body_chunk (msg, chunk, ctx);
  if (msg->status_code != 0)
    /* A response status is already set: either the push failed (and ctx
     * has been cleaned up) or the chunk was skipped. Don't touch ctx. */
    return;

  if (gst_element_set_state (ctx->pipeline, GST_STATE_PLAYING) ==
      GST_STATE_CHANGE_FAILURE) {
    g_critical ("Unable to set pipeline to PLAYING\n");
    soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
    stp_cleanup_transcode_server_ctx (ctx);
    /* ctx must not be used (or connected to signals) after cleanup */
    return;
  } else {
    g_print ("Set pipeline to PLAYING\n");
  }

  /* Connect a different method for all further chunks */
  g_signal_connect (msg, "got-chunk",
                    G_CALLBACK (got_request_body_chunk), ctx);

  if (ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH)
    /* Our incoming stream ends if the stream ends only if
     * we're using a chunked request encoding.
     *
     * This will also be called when the client sending
     * the PUT request disconnects prematurely */
    g_signal_connect (msg, "finished",
                      G_CALLBACK (stream_finished_cb), ctx);
}

/* "got-headers" part for PUT/POST: validate the request encoding, then
 * either attach the request to the existing stream on this path or register
 * a new stream context for it. */
static void
setup_put_request (SoupMessage *msg,
                   SoupURI     *uri,
                   GHashTable  *ctx_table)
{
  const char *content_range;
  const char *path = soup_uri_get_path (uri);
  SoupEncoding encoding;
  gboolean connection_exists;
  TranscodeServerCtx *ctx;

  connection_exists = g_hash_table_contains (ctx_table, path);

  /* Data might be sent to us in one of three ways:
   * 1. Chunked encoding with the stream (and request body) sent in chunks
   * 2. Content-Length encoding with the entire stream in one chunk
   * 3. A persistent Content-Length encoding connection, and subsequent
   *    requests can send more stream data, forever, subject to a timeout */
  encoding = soup_message_headers_get_encoding (msg->request_headers);
  switch (encoding) {
    case SOUP_ENCODING_CHUNKED:
      g_print ("Chunked encoding detected!\n");
      if (connection_exists) {
        /* There's already a chunked request streaming on this path */
        g_printerr ("Recv duplicate request on the same URI: %s\n", path);
        soup_message_set_status (msg, SOUP_STATUS_CONFLICT);
        return;
      }
      break;
    case SOUP_ENCODING_CONTENT_LENGTH:
      g_print ("Content-Length encoding detected!\n");
      content_range = soup_message_headers_get_one (msg->request_headers,
                                                    "Content-Range");
      /* TODO: Right now, we don't check if the Content-Range is valid,
       * and just pass the data to the pipeline. */
      if (connection_exists && !content_range) {
        /* A connection already exists, and this one isn't a continuation
         * of the previous one, so it's a conflict */
        g_printerr ("Recv Content-Length PUT on '%s' without Content-Range\n",
                    path);
        soup_message_set_status (msg, SOUP_STATUS_CONFLICT);
        return;
      }
      break;
    case SOUP_ENCODING_EOF:
    case SOUP_ENCODING_UNRECOGNIZED:
    case SOUP_ENCODING_NONE:
    case SOUP_ENCODING_BYTERANGES:
    default:
      g_critical ("Unknown encoding!\n");
      /* Answer right away; leaving the status unset would let the request
       * reach the handler without any stream context behind it */
      soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST);
      return;
  }

  /* Whether the incoming stream is chunked or fixed length, we want to
   * handle it chunked, so tell libsoup to not collect chunks for
   * forming a complete request body */
  soup_message_body_set_accumulate (msg->request_body, FALSE);

  if (connection_exists) {
    g_print ("Stream already exists, connecting everything to that...\n");
    ctx = get_server_ctx_from_msg (msg, ctx_table);
    if (ctx->stream_finished) {
      g_printerr ("Recv more data on '%s' after timeout\n", path);
      soup_message_set_status (msg, SOUP_STATUS_REQUEST_TIMEOUT);
      return;
    }
    /* The chunks we'll get are a continuation of the previous one */
    g_signal_connect (msg, "got-chunk",
                      G_CALLBACK (got_request_body_chunk), ctx);
    return;
  }

  /* This is a new connection; treat it as such */
  ctx = g_new0 (TranscodeServerCtx, 1);
  ctx->msg = msg;
  ctx->encoding = encoding;
  ctx->parent_ctx_table = ctx_table;
  ctx->path = g_strdup (path);
  g_hash_table_insert (ctx_table, ctx->path, ctx);
  g_print ("New stream on %s!\n", ctx->path);

  g_signal_connect (msg, "got-chunk",
                    G_CALLBACK (got_first_request_body_chunk), ctx);
  /* If we don't get any data after this, we have to cleanup ctx
   * otherwise we'll leak it */
  g_signal_connect (msg, "finished",
                    G_CALLBACK (request_ended_no_body_cb), ctx);

  if (encoding == SOUP_ENCODING_CONTENT_LENGTH)
    g_timeout_add_seconds (TIMER_TICK_SECONDS,
                           (GSourceFunc) increment_read_timer, ctx);
}

/* "got-headers" part for GET: answer 404 when nothing streams on the path,
 * handle the "?abort" query, or prepare a chunked response. */
static void
setup_get_request (SoupMessage *msg,
                   SoupURI     *uri,
                   GHashTable  *ctx_table)
{
  const char *path = soup_uri_get_path (uri);
  TranscodeServerCtx *server_ctx;

  if (!g_hash_table_contains (ctx_table, path)) {
    g_print ("No stream on URI: %s\n", path);
    soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
    return;
  }

  /* g_strcmp0() copes with a NULL (absent) query */
  if (g_strcmp0 (soup_uri_get_query (uri), "abort") == 0) {
    /* We just use the "uri" field for this, so this is OK */
    server_ctx = get_server_ctx_from_msg (msg, ctx_table);
    /* Abort all processing for this stream */
    if (!server_ctx->stream_finished) {
      /* Close the PUT stream if necessary.
       * NOTE(review): server_ctx->msg is only refreshed per body chunk;
       * confirm it can't refer to an already finished message here. */
      server_ctx->stream_finished = TRUE;
      soup_message_set_status (server_ctx->msg, SOUP_STATUS_OK);
    }
    /* Disconnect all clients, and shut down the stream */
    stp_cleanup_transcode_server_ctx (server_ctx);
    soup_message_set_status (msg, SOUP_STATUS_OK);
    return;
  }

  /* Our response will be chunked, and not with a fixed Content-Length */
  soup_message_headers_set_encoding (msg->response_headers,
                                     SOUP_ENCODING_CHUNKED);

  /* The response body isn't fixed length, so tell libsoup to
   * not collect chunks for forming a complete response body */
  soup_message_body_set_accumulate (msg->response_body, FALSE);
}

/*
 * SoupMessage "got-headers" handler: all per-request initialisation happens
 * here, dispatched on the request method. Unsupported methods are answered
 * with NOT_IMPLEMENTED.
 */
static void
got_request_headers (SoupMessage *msg,
                     GHashTable  *ctx_table)
{
  SoupURI *uri;

#ifdef HEADERS_DEBUG
  SoupMessageHeadersIter iter;
  const char *name, *value;

  soup_message_headers_iter_init (&iter, msg->request_headers);
  while (soup_message_headers_iter_next (&iter, &name, &value))
    g_print ("%s: %s\n", name, value);
#endif

  g_object_get (msg, "uri", &uri, NULL);
  g_print ("%s on uri %s\n", msg->method, soup_uri_get_path (uri));

  if (msg->method == SOUP_METHOD_PUT || msg->method == SOUP_METHOD_POST)
    setup_put_request (msg, uri, ctx_table);
  else if (msg->method == SOUP_METHOD_GET)
    setup_get_request (msg, uri, ctx_table);
  else
    /* Nothing to do; server doesn't implement this method */
    soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED);

  soup_uri_free (uri);
}

/* SoupServer "request-started" handler */
static void
request_started_cb (SoupServer        *server,
                    SoupMessage       *msg,
                    SoupClientContext *client,
                    GHashTable        *ctx_table)
{
  g_print ("New %s request started\n", msg->method);
  /* SoupMessage is useful only once we have headers,
   * so we do all initialization there */
  g_signal_connect (msg, "got-headers",
                    G_CALLBACK (got_request_headers), ctx_table);
}

/* SoupServer "request-read" handler; logging only.
 *
 * PUT/POST: everything is handled in the "finished" signal.
 * GET: nothing to do.
 * Other methods were answered with NOT_IMPLEMENTED at header time; they
 * must not trip an assertion here since the method is client-controlled. */
static void
request_read_cb (SoupServer        *server,
                 SoupMessage       *msg,
                 SoupClientContext *client,
                 GHashTable        *ctx_table)
{
  g_print ("%s request read successfully\n", msg->method);
}

/* SoupServer "request-finished" handler; logging only */
static void
request_finished_cb (SoupServer        *server,
                     SoupMessage       *msg,
                     SoupClientContext *client,
                     GHashTable        *ctx_table)
{
  g_print ("%s request ended\n", msg->method);
}

/* SoupServer "request-aborted" handler; logging only.
 *
 * PUT/POST: FIXME: Is there anything to do here?
 * GET: we do all cleanup in the "finished" msg signal.
 * Other methods: nothing to do; never assert on the request method. */
static void
request_aborted_cb (SoupServer        *server,
                    SoupMessage       *msg,
                    SoupClientContext *client,
                    GHashTable        *ctx_table)
{
  g_print ("%s request aborted!\n", msg->method);
}

/*
 * Parse the command line, create the HTTP server and run it.
 *
 * Returns 1 when option parsing fails or the server can't be created,
 * 0 once the server's main loop has ended.
 */
int
main (int   argc,
      char *argv[])
{
  GHashTable *ctx_table;
  GOptionContext *optctx;
  GError *error = NULL;

  gst_init (&argc, &argv);

  optctx = g_option_context_new ("- Soup Transcoding Proxy");
  g_option_context_add_main_entries (optctx, entries, NULL);
  g_option_context_add_group (optctx, gst_init_get_option_group ());
  if (!g_option_context_parse (optctx, &argc, &argv, &error)) {
    g_printerr ("Error parsing options: %s\n", error->message);
    g_error_free (error);
    g_option_context_free (optctx);
    return 1;
  }
  g_option_context_free (optctx);

  /* Keys are paths, and values are TranscodeServerCtxs */
  ctx_table = g_hash_table_new_full (g_str_hash, g_str_equal,
                (GDestroyNotify) g_free,
                (GDestroyNotify) stp_cleanup_transcode_server_ctx);

  /* TODO: Accept command-line argument for host */
  server = soup_server_new (SOUP_SERVER_SERVER_HEADER,
                            "soup-transcode-proxy ",
                            SOUP_SERVER_PORT, port,
                            NULL);
  if (!server) {
    /* Without this check every call below would operate on NULL and the
     * process would exit "successfully" without ever serving anything */
    g_printerr ("Unable to create server on port %d\n", port);
    return 1;
  }

  soup_server_add_handler (server, NULL,
                           (SoupServerCallback) handle_request_cb,
                           ctx_table, NULL);
  g_signal_connect (server, "request-started",
                    G_CALLBACK (request_started_cb), ctx_table);
  g_signal_connect (server, "request-read",
                    G_CALLBACK (request_read_cb), ctx_table);
  g_signal_connect (server, "request-finished",
                    G_CALLBACK (request_finished_cb), ctx_table);
  g_signal_connect (server, "request-aborted",
                    G_CALLBACK (request_aborted_cb), ctx_table);

  soup_server_run (server);

  return 0;
}