/* * vim: set sts=2 sw=2 et : * * License: LGPL-2.1+ * Copyright (c) 2014 Nirbheek Chauhan * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, * Boston, MA 02110-1301, USA. */ #include #include "lib.h" #include "encode.h" #ifdef PLAY_DEBUG #include "debug/local-play.h" #endif static int port = 8000; static int client_timeout = 10; static int server_timeout = 5; static GOptionEntry entries[] = { { "port", 'p', 0, G_OPTION_ARG_INT, &port, "Port to listen on (default: 8000)", "PORT" }, { "timeout", 't', 0, G_OPTION_ARG_INT, &client_timeout, "Client connection timeout (default: 10s)", "SECONDS" }, { NULL } }; static SoupServer *server; static void stream_finished_cb (SoupMessage *msg, TranscodeServerCtx *ctx); static TranscodeServerCtx* get_server_ctx_from_msg (SoupMessage *msg, GHashTable *ctx_table) { SoupURI *uri; TranscodeServerCtx *ctx; g_object_get (msg, "uri", &uri, NULL); ctx = g_hash_table_lookup (ctx_table, soup_uri_get_path (uri)); soup_uri_free (uri); if (!ctx) g_critical ("No matching context found for msg!\n"); return ctx; } /* If it's been more than 10 seconds since the last time we got * a chunk for a PUT request, we timeout and drop the connection */ static gboolean increment_read_timer (TranscodeServerCtx *ctx) { ctx->seconds_since_read += 2; if (ctx->seconds_since_read < server_timeout) return G_SOURCE_CONTINUE; g_printerr ("Stream timed out, sending EOS\n"); stream_finished_cb (ctx->msg, ctx); return G_SOURCE_REMOVE; } /* If it's been more than 10 seconds since the last time we wrote * a chunk for a GET response, we timeout and drop the connection */ static gboolean increment_write_timer (TranscodeClientCtx *ctx) { ctx->seconds_since_write += 2; if (ctx->seconds_since_write < client_timeout) return G_SOURCE_CONTINUE; g_printerr ("Client timed out, cleaning up\n"); /* FIXME: This isn't actually setting the status for client * connections, and clients are just left hanging */ soup_message_set_status (ctx->msg, SOUP_STATUS_REQUEST_TIMEOUT); soup_message_body_complete (ctx->msg->response_body); stp_cleanup_transcode_client_ctx (ctx); return G_SOURCE_REMOVE; } static void client_eos_cb (GstElement *appsink, TranscodeClientCtx *ctx) { g_debug ("Received EOS for client"); soup_message_set_status (ctx->msg, SOUP_STATUS_OK); soup_message_body_complete (ctx->msg->response_body); } static void can_write_next_client_chunk_cb (SoupMessage *msg, TranscodeClientCtx *ctx) { ctx->seconds_since_write = 0; ctx->msg = msg; stp_print_status ("_"); } static gboolean invoke_write_client_chunk (TranscodeClientCtx *ctx) { GstMapInfo info; GstSample *sample; GstBuffer *buffer; gboolean eos; stp_print_status ("*"); /* XXX: If samples are coming in too quickly, this can get invoked * after the client has disconnected and the SoupMessage is invalid * So, we check if the msg is still a msg before trying to append. */ if (G_UNLIKELY (!SOUP_IS_MESSAGE (ctx->msg))) return FALSE; g_signal_emit_by_name (ctx->appsink, "pull-sample", &sample); if (!sample) { g_debug ("Null sample, ending stream"); g_object_get (ctx->appsink, "eos", &eos, NULL); if (eos) soup_message_set_status (ctx->msg, SOUP_STATUS_OK); else soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); soup_message_body_complete (ctx->msg->response_body); return FALSE; } buffer = gst_sample_get_buffer (sample); /* Skip all frames till the next keyframe */ if (!ctx->keyframe_found && GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) { gst_sample_unref (sample); return FALSE; } ctx->keyframe_found = TRUE; gst_buffer_map (buffer, &info, GST_MAP_READ); soup_message_body_append (ctx->msg->response_body, SOUP_MEMORY_COPY, info.data, info.size); /* copy */ gst_buffer_unmap (buffer, &info); gst_sample_unref (sample); soup_server_unpause_message (server, ctx->msg); stp_print_status ("."); return FALSE; } static GstFlowReturn write_client_chunk_cb (GstElement *appsink, TranscodeClientCtx *ctx) { g_main_context_invoke_full (NULL, G_PRIORITY_DEFAULT, (GSourceFunc)invoke_write_client_chunk, ctx, NULL); return GST_FLOW_OK; } static void client_finished_cb (SoupMessage *msg, TranscodeClientCtx *ctx) { stp_print_status ("Client finished/aborted, doing cleanup...\n"); stp_cleanup_transcode_client_ctx (ctx); } static void stream_finished_cb (SoupMessage *msg, TranscodeServerCtx *ctx) { gboolean ret; stp_print_status ("Stream finished/aborted, queueing EOS... "); ctx->stream_finished = TRUE; /* Incoming stream has ended */ g_signal_emit_by_name (ctx->appsrc, "end-of-stream", &ret); if (ret != GST_FLOW_OK) g_critical ("\nUnable to emit end-of-stream after an aborted stream"); else stp_print_status ("Done.\n"); } /* This is called when the entire request body has been read */ static void handle_request_cb (SoupServer *server, SoupMessage *msg, const char *path, GHashTable *query, SoupClientContext *client, GHashTable *ctx_table) { TranscodeServerCtx *server_ctx; if (!msg) return; g_debug ("Handling %s request on path %s", msg->method, path); if (msg->method == SOUP_METHOD_PUT || msg->method == SOUP_METHOD_POST) goto PUT; else if (msg->method == SOUP_METHOD_GET) goto GET; else /* We return NOT_IMPLEMENTED for this * when we get the request headers */ g_assert_not_reached (); PUT: { /* The PUT request has finished streaming. Cleanup * will be done on EOS/ERROR in the gst pipeline. */ server_ctx = get_server_ctx_from_msg (msg, ctx_table); if (server_ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH) server_ctx->stream_finished = TRUE; soup_message_set_status (server_ctx->msg, SOUP_STATUS_OK); return; } GET: { /* A GET request was received. We connect from the pipeline to the * client requesting the stream and start writing the response. */ GstCaps *caps = NULL; GstBuffer *buffer; GstFlowReturn ret; GstStateChangeReturn state_change; GstElement *bin, *tee, *q2, *appsink; GstPadTemplate *template; GstPad *srcpad, *sinkpadq2; TranscodeClientCtx *client_ctx; server_ctx = get_server_ctx_from_msg (msg, ctx_table); if (server_ctx == NULL || server_ctx->pipeline == NULL) { /* There's no request streaming on this path (yet) */ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); return; } /* Check if the pipeline successfully started PLAYING */ state_change = gst_element_get_state (server_ctx->pipeline, NULL, NULL, 100*GST_MSECOND); switch (state_change) { case GST_STATE_CHANGE_SUCCESS: break; case GST_STATE_CHANGE_FAILURE: /* PUT stream should've */ g_critical ("GET request on %s, but state change failure?", path); soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); return; case GST_STATE_CHANGE_ASYNC: soup_message_set_status (msg, SOUP_STATUS_SERVICE_UNAVAILABLE); return; default: g_assert_not_reached (); } g_print ("New GET request on %s\n", path); /* Connect appsink to tee, and start streaming */ client_ctx = g_new0 (TranscodeClientCtx, 1); client_ctx->msg = msg; client_ctx->server_ctx = server_ctx; bin = gst_bin_new (NULL); /* queue ! appsink */ q2 = gst_element_factory_make ("queue", NULL); appsink = gst_element_factory_make ("appsink", NULL); g_object_set (appsink, "drop", FALSE, "emit-signals", TRUE, "max-buffers", 0, NULL); gst_bin_add_many (GST_BIN (bin), q2, appsink, NULL); gst_element_link (q2, appsink); sinkpadq2 = gst_element_get_static_pad (q2, "sink"); client_ctx->ghostsinkpad = gst_ghost_pad_new (NULL, sinkpadq2); gst_element_add_pad (bin, client_ctx->ghostsinkpad); gst_object_unref (sinkpadq2); /* Request a new src pad from the tee in the pipeline */ tee = gst_bin_get_by_name (GST_BIN (server_ctx->pipeline), "tee"); template = gst_pad_template_new ("teesrc", GST_PAD_SRC, GST_PAD_REQUEST, GST_CAPS_ANY); srcpad = gst_element_request_pad (tee, template, NULL, GST_CAPS_ANY); /* Add to pipeline and sync state */ gst_bin_add (GST_BIN (server_ctx->pipeline), bin); if (!gst_element_sync_state_with_parent (bin)) { g_critical ("Unable to sync appsink bin with parent pipeline\n"); goto err; } g_debug ("appsink bin state synced successfully"); /* t. ! queue ! appsink */ gst_pad_link (srcpad, client_ctx->ghostsinkpad); /* Send the WebM stream header through the src pad first */ caps = gst_pad_get_current_caps (srcpad); buffer = stp_get_streamheader_from_caps (caps); if (!buffer) { g_critical ("Unable to get streamheader from caps"); goto nostreamheader; } ret = gst_pad_push (srcpad, buffer); if (ret != GST_FLOW_OK) { g_critical ("Unable to push streamheader: %s", gst_flow_get_name (ret)); goto err; } nostreamheader: client_ctx->appsink = appsink; g_signal_connect (appsink, "new-sample", G_CALLBACK (write_client_chunk_cb), client_ctx); g_signal_connect (appsink, "eos", G_CALLBACK (client_eos_cb), client_ctx); g_signal_connect (client_ctx->msg, "wrote-chunk", G_CALLBACK (can_write_next_client_chunk_cb), client_ctx); client_ctx->finished_handler_id = g_signal_connect (client_ctx->msg, "finished", G_CALLBACK (client_finished_cb), client_ctx); client_ctx->timeout_handler_id = \ g_timeout_add_seconds (2, (GSourceFunc)increment_write_timer, client_ctx); server_ctx->clients = g_list_prepend (server_ctx->clients, client_ctx); /* Since we set the response as chunked, the server will * automatically pause the message for us, and the response * won't be completed till we call soup_message_body_complete() */ soup_message_set_status (msg, SOUP_STATUS_OK); out: gst_object_unref (template); gst_object_unref (srcpad); gst_object_unref (tee); if (caps) gst_caps_unref (caps); return; err: soup_message_set_status (client_ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); soup_message_body_complete (client_ctx->msg->response_body); stp_cleanup_transcode_client_ctx (client_ctx); goto out; } } static void got_request_body_chunk (SoupMessage *msg, SoupBuffer *chunk, TranscodeServerCtx *ctx) { GstBuffer *buffer; GstFlowReturn ret; /* XXX: Unfortunately, there doesn't seem to be a way to cancel the * reception of further chunks when we've prematurely set the response * (say due to an error), so we check if the status code is set and skip * all further chunks */ if (msg->status_code != 0) return; /* We need to update this every chunk */ ctx->msg = msg; ctx->seconds_since_read = 0; /* Actually just a ref */ buffer = stp_get_gst_buffer (chunk); g_signal_emit_by_name (ctx->appsrc, "push-buffer", buffer, &ret); gst_buffer_unref (buffer); if (ret != GST_FLOW_OK) { g_critical ("Unable to push buffer\n"); soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); g_hash_table_remove (ctx->parent_ctx_table, ctx->path); return; } } static void request_ended_no_body_cb (SoupMessage *msg, TranscodeServerCtx *ctx) { g_critical ("Request ended without a body!"); g_hash_table_remove (ctx->parent_ctx_table, ctx->path); g_free (ctx); } static void got_first_request_body_chunk (SoupMessage *msg, SoupBuffer *chunk, TranscodeServerCtx *ctx) { int num; /* Disconnect the ctx cleanup function */ num = g_signal_handlers_disconnect_by_func (msg, G_CALLBACK (request_ended_no_body_cb), ctx); if (num != 1) g_critical ("Unable to remove signal handler for cleanup function!\n"); /* Disconnect us from got-chunk */ num = g_signal_handlers_disconnect_by_func (msg, G_CALLBACK (got_first_request_body_chunk), ctx); if (num != 1) g_critical ("Unable to remove signal handler for first request chunk!\n"); ctx->msg = msg; ctx->pipeline = gst_pipeline_new ("pipe"); /* The chunked request is copied into this stream * for consumption by the gst pipeline */ #ifdef PLAY_DEBUG stp_play_from_msg (ctx); #else stp_encode_from_msg (ctx); #endif got_request_body_chunk (msg, chunk, ctx); if (gst_element_set_state (ctx->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) { g_critical ("Unable to set pipeline to PLAYING\n"); soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); g_hash_table_remove (ctx->parent_ctx_table, ctx->path); } else { g_debug ("Set pipeline to PLAYING"); } /* Connect a different method for all further chunks */ g_signal_connect (msg, "got-chunk", G_CALLBACK (got_request_body_chunk), ctx); if (ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH) /* Our incoming stream ends if the stream ends only if * we're using a chunked request encoding. * * This will also be called when the client sending * the PUT request disconnects prematurely */ g_signal_connect (msg, "finished", G_CALLBACK (stream_finished_cb), ctx); } static void got_request_headers (SoupMessage *msg, GHashTable *ctx_table) { SoupURI *uri; #ifdef HEADERS_DEBUG SoupMessageHeadersIter iter; const char *name, *value; soup_message_headers_iter_init (&iter, msg->request_headers); while (soup_message_headers_iter_next (&iter, &name, &value)) g_debug ("%s: %s", name, value); #endif g_object_get (msg, "uri", &uri, NULL); g_debug ("%s on uri %s", msg->method, soup_uri_get_path (uri)); if (msg->method == SOUP_METHOD_PUT || msg->method == SOUP_METHOD_POST) goto PUT; else if (msg->method == SOUP_METHOD_GET) goto GET; else { /* Nothing to do; server doesn't implement this method */ soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); goto out; } PUT: { const char *content_range; SoupEncoding encoding; gboolean connection_exists; TranscodeServerCtx *ctx; connection_exists = g_hash_table_contains (ctx_table, soup_uri_get_path (uri)); /* Data might be sent to us in one of three ways: * 1. Chunked encoding with the stream (and request body) sent in chunks * 2. Content-Length encoding with the entire stream in one chunk * 3. A persistent Content-Length encoding connection, and subsequent * requests can send more stream data, forever, subject to a timeout */ encoding = soup_message_headers_get_encoding (msg->request_headers); switch (encoding) { case SOUP_ENCODING_CHUNKED: g_debug ("Chunked encoding detected!"); if (connection_exists) { /* There's already a chunked request streaming on this path */ g_critical ("Recv duplicate request on the same URI: %s", soup_uri_get_path (uri)); soup_message_set_status (msg, SOUP_STATUS_CONFLICT); goto out; } break; case SOUP_ENCODING_CONTENT_LENGTH: g_debug ("Content-Length encoding detected!"); content_range = soup_message_headers_get_one (msg->request_headers, "Content-Range"); /* TODO: Right now, we don't check if the Content-Range is valid, * and just pass the data to the pipeline. */ if (connection_exists && !content_range) { /* A connection already exists, and this one isn't a continuation * of the previous one, so it's a conflict */ g_critical ("Recv Content-Length PUT on '%s' without Content-Range", soup_uri_get_path (uri)); soup_message_set_status (msg, SOUP_STATUS_CONFLICT); goto out; } break; case SOUP_ENCODING_EOF: case SOUP_ENCODING_UNRECOGNIZED: case SOUP_ENCODING_NONE: case SOUP_ENCODING_BYTERANGES: g_critical ("Unknown encoding!\n"); goto out; default: g_assert_not_reached (); } /* Whether the incoming stream is chunked or fixed length, we want to * handle it chunked, so tell libsoup to not collect chunks for * forming a complete request body */ soup_message_body_set_accumulate (msg->request_body, FALSE); if (connection_exists) { g_debug ("Stream already exists, connecting everything to that..."); ctx = get_server_ctx_from_msg (msg, ctx_table); if (ctx->stream_finished) { g_critical ("Recv more data on '%s' after timeout", soup_uri_get_path (uri)); soup_message_set_status (msg, SOUP_STATUS_REQUEST_TIMEOUT); goto out; } /* The chunks we'll get are a continuation of the previous one */ g_signal_connect (msg, "got-chunk", G_CALLBACK (got_request_body_chunk), ctx); goto out; } /* This is a new connection; treat it as such */ ctx = g_new0 (TranscodeServerCtx, 1); ctx->msg = msg; ctx->encoding = encoding; ctx->parent_ctx_table = ctx_table; ctx->path = g_strdup (soup_uri_get_path (uri)); g_hash_table_insert (ctx_table, ctx->path, ctx); g_print ("New PUT stream on %s\n", ctx->path); g_signal_connect (msg, "got-chunk", G_CALLBACK (got_first_request_body_chunk), ctx); /* If we don't get any data after this, we have to cleanup ctx * otherwise we'll leak it */ g_signal_connect (msg, "finished", G_CALLBACK (request_ended_no_body_cb), ctx); if (encoding == SOUP_ENCODING_CONTENT_LENGTH) g_timeout_add_seconds (2, (GSourceFunc)increment_read_timer, ctx); goto out; } GET: { const char *query; TranscodeServerCtx *server_ctx; if (!g_hash_table_contains (ctx_table, soup_uri_get_path (uri))) { g_critical ("No stream on URI: %s", soup_uri_get_path (uri)); soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); goto out; } query = soup_uri_get_query (uri); if (query && g_strcmp0 (query, "abort") == 0) { /* We just use the "uri" field for this, so this is OK */ server_ctx = get_server_ctx_from_msg (msg, ctx_table); /* Abort all processing for this stream */ if (!server_ctx->stream_finished) { /* Close the PUT stream if necessary */ server_ctx->stream_finished = TRUE; soup_message_set_status (server_ctx->msg, SOUP_STATUS_GONE); } /* Disconnect all clients, and shut down the stream */ g_hash_table_remove (ctx_table, server_ctx->path); soup_message_set_status (msg, SOUP_STATUS_OK); goto out; } /* Our response will be chunked, and not with a fixed Content-Length */ soup_message_headers_set_encoding (msg->response_headers, SOUP_ENCODING_CHUNKED); /* The request body isn't fixed length, so tell libsoup to * not collect chunks for forming a complete request body */ soup_message_body_set_accumulate (msg->response_body, FALSE); } out: soup_uri_free (uri); return; } static void request_started_cb (SoupServer *server, SoupMessage *msg, SoupClientContext *client, GHashTable *ctx_table) { g_debug ("New %s request started", msg->method); /* SoupMessage is useful only once we have headers, * so we do all initialization there */ g_signal_connect (msg, "got-headers", G_CALLBACK (got_request_headers), ctx_table); } static void request_read_cb (SoupServer *server, SoupMessage *msg, SoupClientContext *client, GHashTable *ctx_table) { if (msg->method == SOUP_METHOD_PUT || msg->method == SOUP_METHOD_POST) goto PUT; else if (msg->method == SOUP_METHOD_GET) goto GET; else /* We return NOT_IMPLEMENTED for this * when we get the request headers */ g_assert_not_reached (); PUT: { /* Everything is handled in the "finished" signal. Nothing to do. */ } GET: { /* Nothing to do */ } g_debug ("%s request read successfully", msg->method); } static void request_finished_cb (SoupServer *server, SoupMessage *msg, SoupClientContext *client, GHashTable *ctx_table) { g_debug ("%s request ended", msg->method); } static void request_aborted_cb (SoupServer *server, SoupMessage *msg, SoupClientContext *client, GHashTable *ctx_table) { g_debug ("%s request aborted!", msg->method); if (msg->method == SOUP_METHOD_PUT || msg->method == SOUP_METHOD_POST) goto PUT; else if (msg->method == SOUP_METHOD_GET) goto GET; else /* We return NOT_IMPLEMENTED for this * when we get the request headers */ g_assert_not_reached (); PUT: { /* FIXME: Is there anything to do here? */ } GET: { /* We do all cleanup in the "finished" msg signal, nothing to do here */ } } gboolean exit_on_signal_cb (SoupServer *server) { soup_server_quit (server); return G_SOURCE_REMOVE; } int main (int argc, char *argv[]) { GHashTable *ctx_table; GOptionContext *optctx; GError *error = NULL; gst_init (&argc, &argv); optctx = g_option_context_new ("- Soup Transcoding Proxy"); g_option_context_add_main_entries (optctx, entries, NULL); g_option_context_add_group (optctx, gst_init_get_option_group ()); if (!g_option_context_parse (optctx, &argc, &argv, &error)) { g_printerr ("Error parsing options: %s\n", error->message); return 1; } g_option_context_free (optctx); /* Keys are paths, and values are TranscodeServerCtxs */ ctx_table = g_hash_table_new_full (g_str_hash, g_str_equal, (GDestroyNotify)g_free, (GDestroyNotify)stp_cleanup_transcode_server_ctx); /* TODO: Accept command-line argument for host */ server = soup_server_new (SOUP_SERVER_SERVER_HEADER, "soup-transcode-proxy ", SOUP_SERVER_PORT, port, NULL); soup_server_add_handler (server, NULL, (SoupServerCallback)handle_request_cb, ctx_table, NULL); g_signal_connect (server, "request-started", G_CALLBACK (request_started_cb), ctx_table); g_signal_connect (server, "request-read", G_CALLBACK (request_read_cb), ctx_table); g_signal_connect (server, "request-finished", G_CALLBACK (request_finished_cb), ctx_table); g_signal_connect (server, "request-aborted", G_CALLBACK (request_aborted_cb), ctx_table); g_unix_signal_add (SIGINT, (GSourceFunc)exit_on_signal_cb, server); soup_server_run (server); return 0; }