/* * vim: set sts=2 sw=2 et : * * License: LGPL-2.1+ * Copyright (c) 2014 Nirbheek Chauhan * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, * Boston, MA 02110-1301, USA. */ #include #include #include #include "lib.h" #include "encode.h" #ifdef PLAY_DEBUG #include "debug/local-play.h" #endif #define STP_REST_LIST_STREAMS "/api/list-streams" #define STP_REST_ABORT_STREAM "/api/abort-stream" #define STP_REST_ADD_TOKEN "/api/add-token" #define STP_REST_REVOKE_TOKEN "/api/revoke-token" #define STP_REST_LIST_TOKENS "/api/list-tokens" typedef struct _STPHashTables STPHashTables; struct _STPHashTables { GHashTable *ctxs; GHashTable *tokens; }; static int port = 8000; static int client_timeout = 10; static int server_timeout = 5; static char *token_server_addrmask = NULL; static gboolean conflict_is_restart = FALSE; static GInetAddressMask *stp_inet_addrmask = NULL; static GOptionEntry entries[] = { { "port", 'p', 0, G_OPTION_ARG_INT, &port, "Port to listen on (default: 8000)", "PORT" }, { "client-timeout", 0, 0, G_OPTION_ARG_INT, &client_timeout, "Outgoing client connection timeout (default: 10s)", "SECONDS" }, { "stream-timeout", 0, 0, G_OPTION_ARG_INT, &server_timeout, "Incoming stream connection timeout (default: 5s)", "SECONDS" }, { "token-server", 's', 0, G_OPTION_ARG_STRING, &token_server_addrmask, "Subnet (or IP addr) for the token server (default: disabled)", "ADDR/PREFIX" }, { "conflict-is-restart", 0, 0, G_OPTION_ARG_NONE, &conflict_is_restart, "If a new PUT stream is received at the same path as an existing one, treat it as a restart instead of reporting a 409/CONFLICT (default: no)", NULL }, { NULL } }; static SoupServer *server; static void stream_finished_cb (SoupMessage *msg, STPServerCtx *ctx); static STPServerCtx* get_server_ctx_from_msg (SoupMessage *msg, GHashTable *ctx_table) { SoupURI *uri; STPServerCtx *ctx; g_object_get (msg, "uri", &uri, NULL); ctx = g_hash_table_lookup (ctx_table, &soup_uri_get_path (uri)[1]); soup_uri_free (uri); if (!ctx) g_critical ("No matching context found for msg!\n"); return ctx; } /* Validates the query, parses it, and fetches a STPStreamToken from it */ static STPStreamToken* stp_validate_fetch_token_from_query (GHashTable *tokens, const char *sessionid, const char *query_string, guint *http_status_code) { GHashTable *query; char *type, *udp_clients; STPStreamToken *token, *perms_token; if (!query_string) { g_debug ("Rejecting stream: no query string specified"); *http_status_code = SOUP_STATUS_BAD_REQUEST; return NULL; } g_debug ("Recv query string: %s\n", query_string); if (stp_inet_addrmask && !g_hash_table_contains (tokens, sessionid)) { g_debug ("Rejecting stream: unauthorized session id"); *http_status_code = SOUP_STATUS_FORBIDDEN; return NULL; } query = soup_form_decode (query_string); token = g_new0 (STPStreamToken, 1); type = g_hash_table_lookup (query, "type"); token->stream_type = stp_get_stream_type_from_string (type); perms_token = g_hash_table_lookup (tokens, sessionid); if (stp_inet_addrmask && perms_token && perms_token->stream_type != token->stream_type) { g_debug ("Rejecting stream: Stream type does not match"); *http_status_code = SOUP_STATUS_FORBIDDEN; goto err; } if (!(token->stream_type & STP_STREAM_TYPE_RTP_UDP)) goto out; /* Only needed for RTP streaming */ udp_clients = g_hash_table_lookup (query, "udp-clients"); token->udp_clients = g_strdup (udp_clients); if (!token->udp_clients) { g_debug ("Rejecting stream: Stream type is UDP but no UDP clients specified"); *http_status_code = SOUP_STATUS_BAD_REQUEST; goto err; } if (perms_token && !stp_clients_is_subset (perms_token->udp_clients, token->udp_clients)) { g_debug ("Rejecting stream: UDP client list does not match"); *http_status_code = SOUP_STATUS_FORBIDDEN; goto err; } out: g_hash_table_destroy (query); return token; err: g_free (token); token = NULL; goto out; } static void stp_abort_server_ctx (STPServerCtx *ctx) { /* Abort all processing for this stream */ if (ctx->status == STP_STATUS_STREAMING && ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH) { /* Close the PUT stream if necessary */ ctx->status = STP_STATUS_FINISHED; soup_message_set_status (ctx->msg, SOUP_STATUS_GONE); } stream_finished_cb (ctx->msg, ctx); /* Disconnect all clients, and shut down the stream */ invoke_g_hash_table_remove (ctx); } /* If it's been more than 5 seconds since the last time we got * a chunk for a PUT request, we timeout and drop the connection */ static gboolean increment_read_timer (STPServerCtx *ctx) { ctx->seconds_since_read += 2; g_debug ("Checking stream read time (%i)...", ctx->seconds_since_read); if (ctx->seconds_since_read < server_timeout) return G_SOURCE_CONTINUE; g_debug ("Stream timed out, sending EOS\n"); stream_finished_cb (ctx->msg, ctx); return G_SOURCE_REMOVE; } /* If it's been more than 10 seconds since the last time we wrote * a chunk for a GET response, we timeout and drop the connection */ static gboolean increment_write_timer (STPClientCtx *ctx) { ctx->seconds_since_write += 2; if (ctx->seconds_since_write < client_timeout) return G_SOURCE_CONTINUE; g_printerr ("Client timed out, cleaning up\n"); /* FIXME: This isn't actually setting the status for client * connections, and clients are just left hanging */ soup_message_set_status (ctx->msg, SOUP_STATUS_REQUEST_TIMEOUT); soup_message_body_complete (ctx->msg->response_body); stp_client_ctx_cleanup (ctx); return G_SOURCE_REMOVE; } static void client_eos_cb (GstElement *appsink, STPClientCtx *ctx) { g_debug ("Received EOS for client"); soup_message_set_status (ctx->msg, SOUP_STATUS_OK); soup_message_body_complete (ctx->msg->response_body); } static void can_write_next_client_chunk_cb (SoupMessage *msg, STPClientCtx *ctx) { ctx->seconds_since_write = 0; ctx->msg = msg; stp_print_status ("_"); } static gboolean invoke_write_client_chunk (STPClientCtx *ctx) { GstMapInfo info; GstSample *sample; GstBuffer *buffer; gboolean eos; stp_print_status ("*"); /* XXX: If samples are coming in too quickly, this can get invoked * after the client has disconnected and the SoupMessage is invalid. * When the client disconnects, timeout_handler_id is set to 0. */ if (G_UNLIKELY (ctx->timeout_handler_id == 0)) return G_SOURCE_REMOVE; g_signal_emit_by_name (ctx->appsink, "pull-sample", &sample); if (!sample) { g_debug ("Null sample, ending stream"); g_object_get (ctx->appsink, "eos", &eos, NULL); if (eos) soup_message_set_status (ctx->msg, SOUP_STATUS_OK); else soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); soup_message_body_complete (ctx->msg->response_body); return G_SOURCE_REMOVE; } buffer = gst_sample_get_buffer (sample); /* Skip all frames till the next keyframe */ if (!ctx->keyframe_found && GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) { gst_sample_unref (sample); return G_SOURCE_REMOVE; } ctx->keyframe_found = TRUE; gst_buffer_map (buffer, &info, GST_MAP_READ); soup_message_body_append (ctx->msg->response_body, SOUP_MEMORY_COPY, info.data, info.size); /* copy */ gst_buffer_unmap (buffer, &info); gst_sample_unref (sample); soup_server_unpause_message (server, ctx->msg); stp_print_status ("."); return G_SOURCE_REMOVE; } static GstFlowReturn write_client_chunk_cb (GstElement *appsink, STPClientCtx *ctx) { g_main_context_invoke_full (NULL, G_PRIORITY_DEFAULT, (GSourceFunc)invoke_write_client_chunk, ctx, NULL); return GST_FLOW_OK; } static void client_finished_cb (SoupMessage *msg, STPClientCtx *ctx) { stp_print_status ("Client finished/aborted, doing cleanup...\n"); stp_client_ctx_cleanup (ctx); } static void stream_finished_cb (SoupMessage *msg, STPServerCtx *ctx) { gboolean ret; stp_print_status ("Stream finished/aborted, queueing EOS... "); if (!ctx->appsrc) { ctx->status = STP_STATUS_FINISHED; stp_print_status ("No need.\n"); return; } ctx->status = STP_STATUS_FLUSHING; /* Incoming stream has ended */ g_signal_emit_by_name (ctx->appsrc, "end-of-stream", &ret); if (ret != GST_FLOW_OK) g_critical ("\nUnable to emit end-of-stream after an aborted stream"); else stp_print_status ("Done.\n"); } /* This is called when the entire request body has been read */ static void handle_request_cb (SoupServer *server, SoupMessage *msg, const char *path, GHashTable *query, SoupClientContext *client, STPHashTables *tables) { STPServerCtx *server_ctx; if (!msg) return; g_debug ("Handling %s request on path %s", msg->method, path); if (msg->method == SOUP_METHOD_PUT || msg->method == SOUP_METHOD_POST) goto PUT; else if (msg->method == SOUP_METHOD_GET) goto GET; else if (msg->method == SOUP_METHOD_DELETE) { soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED); return; } else { soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); return; } PUT: { /* The PUT request has finished streaming. Cleanup * will be done on EOS/ERROR in the gst pipeline. */ server_ctx = get_server_ctx_from_msg (msg, tables->ctxs); if (!server_ctx) { soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); } else { if (server_ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH) server_ctx->status = STP_STATUS_FLUSHING; soup_message_set_status (msg, SOUP_STATUS_OK); } return; } GET: { /* A GET request was received. We connect from the pipeline to the * client requesting the stream and start writing the response. */ GstCaps *caps = NULL; GstEvent *event; GstBuffer *buffer; GstFlowReturn ret; GstStateChangeReturn state_change; GstElement *encodebin, *bin, *tee, *q2, *appsink; GstPadTemplate *template; GstPad *srcpad, *sinkpadq2; STPClientCtx *client_ctx; server_ctx = get_server_ctx_from_msg (msg, tables->ctxs); if (server_ctx == NULL || server_ctx->pipeline == NULL) { /* There's no request streaming on this path (yet) */ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); return; } /* Check if the pipeline successfully started PLAYING */ state_change = gst_element_get_state (server_ctx->pipeline, NULL, NULL, 100*GST_MSECOND); switch (state_change) { case GST_STATE_CHANGE_SUCCESS: break; case GST_STATE_CHANGE_FAILURE: /* PUT stream should've */ g_critical ("GET request on %s, but state change failure?", path); soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); return; case GST_STATE_CHANGE_ASYNC: soup_message_set_status (msg, SOUP_STATUS_SERVICE_UNAVAILABLE); return; default: g_assert_not_reached (); } g_debug ("New GET request on %s\n", path); /* Connect appsink to tee, and start streaming */ client_ctx = g_new0 (STPClientCtx, 1); client_ctx->msg = msg; client_ctx->server_ctx = server_ctx; bin = gst_bin_new (NULL); /* queue ! appsink */ q2 = gst_element_factory_make ("queue", NULL); appsink = gst_element_factory_make ("appsink", NULL); g_object_set (appsink, "drop", FALSE, "emit-signals", TRUE, "max-buffers", 0, NULL); gst_bin_add_many (GST_BIN (bin), q2, appsink, NULL); gst_element_link (q2, appsink); sinkpadq2 = gst_element_get_static_pad (q2, "sink"); client_ctx->ghostsinkpad = gst_ghost_pad_new (NULL, sinkpadq2); gst_element_add_pad (bin, client_ctx->ghostsinkpad); gst_object_unref (sinkpadq2); /* Request a new src pad from the tee in the pipeline */ tee = gst_bin_get_by_name (GST_BIN (server_ctx->pipeline), "tee"); template = gst_pad_template_new ("teesrc", GST_PAD_SRC, GST_PAD_REQUEST, GST_CAPS_ANY); srcpad = gst_element_request_pad (tee, template, NULL, GST_CAPS_ANY); /* Add to pipeline and sync state */ gst_bin_add (GST_BIN (server_ctx->pipeline), bin); if (!gst_element_sync_state_with_parent (bin)) { g_critical ("Unable to sync appsink bin with parent pipeline\n"); goto err; } g_debug ("appsink bin state synced successfully"); /* t. ! queue ! appsink */ gst_pad_link (srcpad, client_ctx->ghostsinkpad); /* Send the WebM stream header through the src pad first */ caps = gst_pad_get_current_caps (srcpad); buffer = stp_get_streamheader_from_caps (caps); if (!buffer) { g_critical ("Unable to get streamheader from caps"); goto force_key_unit; } ret = gst_pad_push (srcpad, buffer); if (ret != GST_FLOW_OK) { g_critical ("Unable to push streamheader: %s", gst_flow_get_name (ret)); goto err; } force_key_unit: encodebin = gst_bin_get_by_name (GST_BIN (server_ctx->pipeline), "encodebin"); event = gst_video_event_new_downstream_force_key_unit (GST_CLOCK_TIME_NONE, GST_CLOCK_TIME_NONE, GST_CLOCK_TIME_NONE, FALSE, 1); if (!gst_element_send_event (encodebin, event)) g_critical ("Couldn't send force key unit event!"); gst_object_unref (encodebin); client_ctx->appsink = appsink; g_signal_connect (appsink, "new-sample", G_CALLBACK (write_client_chunk_cb), client_ctx); g_signal_connect (appsink, "eos", G_CALLBACK (client_eos_cb), client_ctx); g_signal_connect (client_ctx->msg, "wrote-chunk", G_CALLBACK (can_write_next_client_chunk_cb), client_ctx); client_ctx->finished_handler_id = g_signal_connect (client_ctx->msg, "finished", G_CALLBACK (client_finished_cb), client_ctx); client_ctx->timeout_handler_id = \ g_timeout_add_seconds (2, (GSourceFunc)increment_write_timer, client_ctx); server_ctx->clients = g_list_prepend (server_ctx->clients, client_ctx); /* Since we set the response as chunked, the server will * automatically pause the message for us, and the response * won't be completed till we call soup_message_body_complete() */ soup_message_set_status (msg, SOUP_STATUS_OK); /* Make sure that browsers never cache our responses */ soup_message_headers_append (msg->response_headers, "Cache-Control", "no-cache"); soup_message_headers_append (msg->response_headers, "Cache-Control", "no-store"); soup_message_headers_append (msg->response_headers, "Cache-Control", "must-revalidate"); soup_message_headers_append (msg->response_headers, "Pragma", "no-cache"); soup_message_headers_append (msg->response_headers, "Expires", "0"); soup_message_headers_append (msg->response_headers, "Content-Type", "video/webm"); out: gst_object_unref (template); gst_object_unref (srcpad); gst_object_unref (tee); if (caps) gst_caps_unref (caps); return; err: soup_message_set_status (client_ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); soup_message_body_complete (client_ctx->msg->response_body); stp_client_ctx_cleanup (client_ctx); goto out; } } static void got_request_body_chunk (SoupMessage *msg, SoupBuffer *chunk, STPServerCtx *ctx) { GstBuffer *buffer; GstFlowReturn ret; /* XXX: Unfortunately, there doesn't seem to be a way to cancel the * reception of further chunks when we've prematurely set the response * (say due to an error), so we check if the status code is set and skip * all further chunks */ if (msg->status_code != 0 || ctx->status != STP_STATUS_STREAMING) return; /* We need to update this every chunk */ ctx->msg = msg; ctx->seconds_since_read = 0; g_debug ("Got data, resetting stream read time..."); /* Actually just a ref */ buffer = stp_get_gst_buffer (chunk); g_signal_emit_by_name (ctx->appsrc, "push-buffer", buffer, &ret); gst_buffer_unref (buffer); if (ret != GST_FLOW_OK) { g_critical ("Unable to push buffer\n"); soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); invoke_g_hash_table_remove (ctx); return; } } static void request_ended_no_body_cb (SoupMessage *msg, STPServerCtx *ctx) { g_critical ("Request ended without a body!"); invoke_g_hash_table_remove (ctx); } static void got_first_request_body_chunk (SoupMessage *msg, SoupBuffer *chunk, STPServerCtx *ctx) { int num; /* Disconnect the ctx cleanup function */ num = g_signal_handlers_disconnect_by_func (msg, G_CALLBACK (request_ended_no_body_cb), ctx); if (num != 1) g_critical ("Unable to remove signal handler for cleanup function!\n"); /* Disconnect us from got-chunk */ num = g_signal_handlers_disconnect_by_func (msg, G_CALLBACK (got_first_request_body_chunk), ctx); if (num != 1) g_critical ("Unable to remove signal handler for first request chunk!\n"); ctx->msg = msg; ctx->pipeline = gst_pipeline_new ("pipe"); /* The chunked request is copied into this stream * for consumption by the gst pipeline */ #ifdef PLAY_DEBUG stp_play_from_msg (ctx); #else stp_encode_from_msg (ctx); #endif got_request_body_chunk (msg, chunk, ctx); if (gst_element_set_state (ctx->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) { g_critical ("Unable to set pipeline to PLAYING\n"); soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); invoke_g_hash_table_remove (ctx); } else { g_debug ("Set pipeline to PLAYING"); } /* Connect a different method for all further chunks */ g_signal_connect (msg, "got-chunk", G_CALLBACK (got_request_body_chunk), ctx); if (ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH) /* Our incoming stream ends if the stream ends only if * we're using a chunked request encoding. * * This will also be called when the client sending * the PUT request disconnects prematurely */ g_signal_connect (msg, "finished", G_CALLBACK (stream_finished_cb), ctx); } static void got_request_headers (SoupMessage *msg, STPHashTables *tables) { SoupURI *uri; const char *path, *sessionid, *query; #ifdef HEADERS_DEBUG SoupMessageHeadersIter iter; const char *name, *value; soup_message_headers_iter_init (&iter, msg->request_headers); while (soup_message_headers_iter_next (&iter, &name, &value)) g_debug ("%s: %s", name, value); #endif g_object_get (msg, "uri", &uri, NULL); path = soup_uri_get_path (uri); query = soup_uri_get_query (uri); g_debug ("%s on uri %s", msg->method, path); /* Token API methods are handled in the server callbacks, * so we ignore those paths here. */ if (g_str_has_prefix (path, STP_REST_LIST_STREAMS) || g_str_has_prefix (path, STP_REST_ABORT_STREAM) || g_str_has_prefix (path, STP_REST_ADD_TOKEN) || g_str_has_prefix (path, STP_REST_REVOKE_TOKEN) || g_str_has_prefix (path, STP_REST_LIST_TOKENS)) goto out; /* Remove the leading '/' to get the session id */ sessionid = &path[1]; if (msg->method == SOUP_METHOD_PUT || msg->method == SOUP_METHOD_POST) /* Incoming stream; do everything needed */ goto PUT; else if (msg->method == SOUP_METHOD_GET) /* Just set response headers as appropriate */ goto GET; else if (msg->method == SOUP_METHOD_DELETE) { soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED); goto out; } else { /* Nothing to do; server doesn't implement this method */ soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); goto out; } PUT: { guint http_status_code; const char *content_range; SoupEncoding encoding; gboolean connection_exists; STPServerCtx *ctx; STPStreamToken *token; token = stp_validate_fetch_token_from_query (tables->tokens, sessionid, query, &http_status_code); if (!token) { soup_message_set_status (msg, http_status_code); goto out; } connection_exists = g_hash_table_contains (tables->ctxs, sessionid); /* Data might be sent to us in one of three ways: * 1. Chunked encoding with the stream (and request body) sent in chunks * 2. Content-Length encoding with the entire stream in one chunk * 3. A persistent Content-Length encoding connection, and subsequent * requests can send more stream data, forever, subject to a timeout. * The first request will send a Content-Length request, and * subsequent ones will only have Content-Length + Content-Range set */ encoding = soup_message_headers_get_encoding (msg->request_headers); switch (encoding) { case SOUP_ENCODING_CHUNKED: g_debug ("Chunked encoding detected!"); if (connection_exists) { /* There's already a chunked request streaming on this path */ g_critical ("Recv duplicate request on the same URI: %s", path); soup_message_set_status (msg, SOUP_STATUS_CONFLICT); goto PUT_out; } break; case SOUP_ENCODING_CONTENT_LENGTH: g_debug ("Content-Length encoding detected!"); content_range = soup_message_headers_get_one (msg->request_headers, "Content-Range"); /* TODO: Right now, we don't check if the Content-Range is valid, * and just pass the data to the pipeline. */ if (connection_exists && !content_range) { /* A connection already exists, and this one isn't a continuation * of the previous one, so it's a conflict... unless the * '--conflict-is-restart' option has been specified. */ if (conflict_is_restart) { ctx = get_server_ctx_from_msg (msg, tables->ctxs); stream_finished_cb (ctx->msg, ctx); invoke_g_hash_table_remove (ctx); goto new_conn; } g_critical ("Recv Content-Length PUT on '%s' without Content-Range", path); soup_message_set_status (msg, SOUP_STATUS_CONFLICT); goto PUT_out; } if (!connection_exists && content_range) { /* We cancelled this connection, but we got another request which is * a continuation of the previous one */ soup_message_set_status (msg, SOUP_STATUS_GONE); goto PUT_out; } break; case SOUP_ENCODING_EOF: case SOUP_ENCODING_UNRECOGNIZED: case SOUP_ENCODING_NONE: case SOUP_ENCODING_BYTERANGES: g_critical ("Unknown encoding!"); soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST); goto PUT_out; default: g_critical ("Unsupported encoding?"); soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST); goto PUT_out; } /* Whether the incoming stream is chunked or fixed length, we want to * handle it chunked, so tell libsoup to not collect chunks for * forming a complete request body */ soup_message_body_set_accumulate (msg->request_body, FALSE); if (connection_exists) { g_debug ("Stream already exists, connecting everything to that..."); ctx = get_server_ctx_from_msg (msg, tables->ctxs); if (ctx->status != STP_STATUS_STREAMING) { g_critical ("Recv more data on '%s' after timeout", path); soup_message_set_status (msg, SOUP_STATUS_REQUEST_TIMEOUT); goto PUT_out; } /* The chunks we'll get are a continuation of the previous one */ g_signal_connect (msg, "got-chunk", G_CALLBACK (got_request_body_chunk), ctx); goto PUT_out; } new_conn: /* This is a new connection; treat it as such */ ctx = g_new0 (STPServerCtx, 1); ctx->stream_type = token->stream_type; if (ctx->stream_type & STP_STREAM_TYPE_RTP_UDP) ctx->udp_clients = g_strdup (token->udp_clients); ctx->msg = msg; ctx->encoding = encoding; ctx->status = STP_STATUS_STREAMING; ctx->parent_ctx_table = tables->ctxs; ctx->sessionid = g_strdup (sessionid); g_hash_table_insert (tables->ctxs, ctx->sessionid, ctx); g_debug ("New PUT stream on %s\n", path); g_signal_connect (msg, "got-chunk", G_CALLBACK (got_first_request_body_chunk), ctx); /* If we don't get any data after this, we have to cleanup ctx * otherwise we'll leak it */ g_signal_connect (msg, "finished", G_CALLBACK (request_ended_no_body_cb), ctx); if (encoding == SOUP_ENCODING_CONTENT_LENGTH) ctx->timeout_handler_id = g_timeout_add_seconds (2, (GSourceFunc)increment_read_timer, ctx); PUT_out: stp_stream_token_free (token); goto out; } GET: { STPServerCtx *server_ctx; server_ctx = g_hash_table_lookup (tables->ctxs, sessionid); if (!server_ctx || !(server_ctx->stream_type & STP_STREAM_TYPE_HTTP)) { stp_print_status ("No HTTP GET stream on URI: %s\n", path); soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); goto out; } /* Our response will be chunked, and not with a fixed Content-Length */ soup_message_headers_set_encoding (msg->response_headers, SOUP_ENCODING_CHUNKED); /* The request body isn't fixed length, so tell libsoup to * not collect chunks for forming a complete request body */ soup_message_body_set_accumulate (msg->response_body, FALSE); } out: soup_uri_free (uri); return; } static void request_started_cb (SoupServer *server, SoupMessage *msg, SoupClientContext *client, STPHashTables *tables) { g_debug ("New %s request started", msg->method); /* SoupMessage is useful only once we have headers, * so we do all initialization there */ g_signal_connect (msg, "got-headers", G_CALLBACK (got_request_headers), tables); } static void handle_add_token_cb (SoupServer *server, SoupMessage *msg, const char *path, GHashTable *query, SoupClientContext *client, STPHashTables *tables) { if (msg->method == SOUP_METHOD_POST) { goto POST; } else if (g_strcmp0 (path, STP_REST_ADD_TOKEN) != 0) { soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); goto out; } else if (msg->method == SOUP_METHOD_PUT || msg->method == SOUP_METHOD_GET || msg->method == SOUP_METHOD_DELETE) { soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED); goto out; } else { soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); goto out; } POST: { STPStreamToken *token; const char *type, *sessionid, *udp_clients; if (!stp_validate_token_server (stp_inet_addrmask, client)) { g_debug ("Attempted to access token API from unauthorized host"); soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN); goto out; } if (!query || !g_hash_table_contains (query, "sessionid") || !g_hash_table_contains (query, "type")) { soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST); goto out; } sessionid = g_hash_table_lookup (query, "sessionid"); type = g_hash_table_lookup (query, "type"); udp_clients = g_hash_table_lookup (query, "udp-clients"); token = g_new0 (STPStreamToken, 1); token->stream_type = stp_get_stream_type_from_string (type); if ((udp_clients == NULL) && (token->stream_type & STP_STREAM_TYPE_RTP_UDP)) { g_free (token); soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST); goto out; } token->udp_clients = g_strdup (udp_clients); /* We don't care if there's already an identical token, and * we don't care if there's ongoing streams with this token */ g_hash_table_insert (tables->tokens, g_strdup (sessionid), token); soup_message_set_status (msg, SOUP_STATUS_OK); } out: return; } static void handle_revoke_token_cb (SoupServer *server, SoupMessage *msg, const char *path, GHashTable *query, SoupClientContext *client, STPHashTables *tables) { if (msg->method == SOUP_METHOD_DELETE) { goto DELETE; } else if (g_strcmp0 (path, STP_REST_REVOKE_TOKEN) != 0) { soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); goto out; } else if (msg->method == SOUP_METHOD_PUT || msg->method == SOUP_METHOD_GET || msg->method == SOUP_METHOD_POST) { soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED); goto out; } else { soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); goto out; } DELETE: { char *sessionid; if (!stp_validate_token_server (stp_inet_addrmask, client)) { g_debug ("Attempted to access token API from unauthorized host"); soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN); goto out; } if (!query || !g_hash_table_contains (query, "sessionid")) { soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST); goto out; } sessionid = g_hash_table_lookup (query, "sessionid"); if (!g_hash_table_remove (tables->tokens, sessionid)) { soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); goto out; } if (g_hash_table_contains (tables->ctxs, sessionid)) /* If there's a stream using this session id, we remove the token, * but can't guarantee that the stream will stop. */ soup_message_set_status (msg, SOUP_STATUS_ACCEPTED); else soup_message_set_status (msg, SOUP_STATUS_OK); } out: return; } static void handle_list_tokens_cb (SoupServer *server, SoupMessage *msg, const char *path, GHashTable *query, SoupClientContext *client, STPHashTables *tables) { if (msg->method == SOUP_METHOD_GET) { goto GET; } else if (g_strcmp0 (path, STP_REST_LIST_TOKENS) != 0) { soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); goto out; } else if (msg->method == SOUP_METHOD_PUT || msg->method == SOUP_METHOD_POST || msg->method == SOUP_METHOD_DELETE) { soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED); goto out; } else { soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); goto out; } GET: { gsize length; char *response; GHashTableIter i; JsonBuilder *b; JsonNode *dict; JsonGenerator *gen; gpointer key, value; STPStreamToken *token; const char *tmp; if (!stp_validate_token_server (stp_inet_addrmask, client)) { g_debug ("Attempted to access token API from unauthorized host"); soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN); goto out; } g_hash_table_iter_init (&i, tables->tokens); b = json_builder_new (); json_builder_begin_array (b); while (g_hash_table_iter_next (&i, &key, &value)) { token = value; json_builder_begin_object (b); json_builder_set_member_name (b, "sessionid"); json_builder_add_string_value (b, key); json_builder_set_member_name (b, "udp-clients"); tmp = token->udp_clients; json_builder_add_string_value (b, tmp? tmp : ""); json_builder_set_member_name (b, "stream-type"); tmp = stp_get_stream_type_string (token->stream_type); json_builder_add_string_value (b, tmp); json_builder_end_object (b); } json_builder_end_array (b); dict = json_builder_get_root (b); g_object_unref (b); gen = json_generator_new (); json_generator_set_root (gen, dict); json_node_free (dict); json_generator_set_pretty (gen, TRUE); response = json_generator_to_data (gen, &length); g_object_unref (gen); soup_message_set_status (msg, SOUP_STATUS_OK); soup_message_body_append (msg->response_body, SOUP_MEMORY_TAKE, response, length); } out: return; } static void handle_list_streams_cb (SoupServer *server, SoupMessage *msg, const char *path, GHashTable *query, SoupClientContext *client, STPHashTables *tables) { if (msg->method == SOUP_METHOD_GET) { goto GET; } else if (g_strcmp0 (path, STP_REST_LIST_STREAMS) != 0) { soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); goto out; } else if (msg->method == SOUP_METHOD_PUT || msg->method == SOUP_METHOD_POST || msg->method == SOUP_METHOD_DELETE) { soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED); goto out; } else { soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); goto out; } GET: { gsize length; const char *tmp; JsonBuilder *b; JsonNode *dict; JsonGenerator *gen; GHashTableIter i; gpointer key, value; STPServerCtx *ctx; char *sessionid, *response; if (!stp_validate_token_server (stp_inet_addrmask, client)) { g_debug ("Attempted to access token API from unauthorized host"); soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN); goto out; } g_hash_table_iter_init (&i, tables->ctxs); b = json_builder_new (); json_builder_begin_array (b); while (g_hash_table_iter_next (&i, &key, &value)) { ctx = value; sessionid = key; json_builder_begin_object (b); json_builder_set_member_name (b, "type"); tmp = stp_get_stream_type_string (ctx->stream_type); json_builder_add_string_value (b, tmp); json_builder_set_member_name (b, "sessionid"); json_builder_add_string_value (b, sessionid); json_builder_set_member_name (b, "udp-clients"); tmp = ctx->udp_clients; json_builder_add_string_value (b, tmp? tmp : ""); json_builder_set_member_name (b, "status"); json_builder_add_string_value (b, ctx->status); json_builder_end_object (b); } json_builder_end_array (b); dict = json_builder_get_root (b); g_object_unref (b); gen = json_generator_new (); json_generator_set_root (gen, dict); json_node_free (dict); json_generator_set_pretty (gen, TRUE); response = json_generator_to_data (gen, &length); g_object_unref (gen); soup_message_set_status (msg, SOUP_STATUS_OK); soup_message_body_append (msg->response_body, SOUP_MEMORY_TAKE, response, length); } out: return; } static void handle_abort_stream_cb (SoupServer *server, SoupMessage *msg, const char *path, GHashTable *query, SoupClientContext *client, STPHashTables *tables) { if (msg->method == SOUP_METHOD_DELETE) { goto DELETE; } else if (g_strcmp0 (path, STP_REST_ABORT_STREAM) != 0) { soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); goto out; } else if (msg->method == SOUP_METHOD_PUT || msg->method == SOUP_METHOD_GET || msg->method == SOUP_METHOD_POST) { soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED); goto out; } else { soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); goto out; } DELETE: { const char *sessionid; STPServerCtx *ctx; if (!stp_validate_token_server (stp_inet_addrmask, client)) { g_debug ("Attempted to access token API from unauthorized host"); soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN); goto out; } sessionid = g_hash_table_lookup (query, "sessionid"); if (!sessionid) { soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST); goto out; } ctx = g_hash_table_lookup (tables->ctxs, sessionid); if (!ctx) { soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); goto out; } stp_abort_server_ctx (ctx); soup_message_set_status (msg, SOUP_STATUS_OK); } out: return; } gboolean exit_on_signal_cb (SoupServer *server) { soup_server_quit (server); g_object_unref (server); return G_SOURCE_REMOVE; } int main (int argc, char *argv[]) { STPHashTables *tables; GOptionContext *optctx; GError *error = NULL; gst_init (&argc, &argv); optctx = g_option_context_new ("- Soup Transcoding Proxy"); g_option_context_add_main_entries (optctx, entries, NULL); g_option_context_add_group (optctx, gst_init_get_option_group ()); if (!g_option_context_parse (optctx, &argc, &argv, &error)) { g_printerr ("Error parsing options: %s\n", error->message); return 1; } g_option_context_free (optctx); if (token_server_addrmask) { stp_inet_addrmask = g_inet_address_mask_new_from_string (token_server_addrmask, &error); if (!stp_inet_addrmask) { g_printerr ("Error parsing token server address mask: %s\n", error->message); return 1; } } tables = g_new0 (STPHashTables, 1); /* Keys are paths, and values are STPServerCtxs */ tables->ctxs = g_hash_table_new_full (g_str_hash, g_str_equal, (GDestroyNotify)g_free, (GDestroyNotify)stp_server_ctx_cleanup); /* Keys are "session-id:host:port", and there are no values. * If host or port are missing, they will be blank. */ tables->tokens = g_hash_table_new_full (g_str_hash, g_str_equal, (GDestroyNotify)g_free, (GDestroyNotify)stp_stream_token_free); /* TODO: Accept command-line argument for host */ server = soup_server_new (SOUP_SERVER_SERVER_HEADER, "soup-transcode-proxy ", SOUP_SERVER_PORT, port, NULL); /* Handle the end of POST/PUT, and all GET requests */ soup_server_add_handler (server, NULL, (SoupServerCallback)handle_request_cb, tables, NULL); /* NOTE: When adding a new path handler, an exception must be made in * got_request_headers() for that path, otherwise a 405 will be returned */ soup_server_add_handler (server, STP_REST_ABORT_STREAM, (SoupServerCallback)handle_abort_stream_cb, tables, NULL); soup_server_add_handler (server, STP_REST_LIST_STREAMS, (SoupServerCallback)handle_list_streams_cb, tables, NULL); /* REST API for token handling, if requested */ if (token_server_addrmask) { soup_server_add_handler (server, STP_REST_ADD_TOKEN, (SoupServerCallback)handle_add_token_cb, tables, NULL); soup_server_add_handler (server, STP_REST_REVOKE_TOKEN, (SoupServerCallback)handle_revoke_token_cb, tables, NULL); soup_server_add_handler (server, STP_REST_LIST_TOKENS, (SoupServerCallback)handle_list_tokens_cb, tables, NULL); } /* Since chunked streaming requests send all the data in a single request, * we need to handle those when we get the headers. Hence, all PUT stream * handling is done here. */ g_signal_connect (server, "request-started", G_CALLBACK (request_started_cb), tables); g_unix_signal_add (SIGINT, (GSourceFunc)exit_on_signal_cb, server); soup_server_run (server); g_hash_table_unref (tables->tokens); g_hash_table_unref (tables->ctxs); g_free (tables); return 0; }