diff options
Diffstat (limited to 'src/main.c')
-rw-r--r-- | src/main.c | 670 |
1 files changed, 585 insertions, 85 deletions
@@ -22,6 +22,7 @@ #include <glib-unix.h> #include <gst/video/video.h> +#include <json-glib/json-glib.h> #include "lib.h" #include "encode.h" @@ -30,31 +31,47 @@ #include "debug/local-play.h" #endif +#define STP_REST_LIST_STREAMS "/list-streams" +#define STP_REST_ABORT_STREAM "/abort-stream" +#define STP_REST_ADD_TOKEN "/add-token" +#define STP_REST_REVOKE_TOKEN "/revoke-token" +#define STP_REST_LIST_TOKENS "/list-tokens" + +typedef struct _STPHashTables STPHashTables; + +struct _STPHashTables { + GHashTable *ctxs; + GHashTable *tokens; +}; + static int port = 8000; static int client_timeout = 10; static int server_timeout = 5; +static char *token_server_addrmask = NULL; +static GInetAddressMask *stp_inet_addrmask = NULL; static GOptionEntry entries[] = { { "port", 'p', 0, G_OPTION_ARG_INT, &port, "Port to listen on (default: 8000)", "PORT" }, { "timeout", 't', 0, G_OPTION_ARG_INT, &client_timeout, "Client connection timeout (default: 10s)", "SECONDS" }, + { "token-server", 's', 0, G_OPTION_ARG_STRING, &token_server_addrmask, "Subnet (or IP addr) for the token server (default: disabled)", "ADDR/PREFIX" }, { NULL } }; static SoupServer *server; -static void stream_finished_cb (SoupMessage *msg, - TranscodeServerCtx *ctx); +static void stream_finished_cb (SoupMessage *msg, + STPServerCtx *ctx); -static TranscodeServerCtx* +static STPServerCtx* get_server_ctx_from_msg (SoupMessage *msg, GHashTable *ctx_table) { SoupURI *uri; - TranscodeServerCtx *ctx; + STPServerCtx *ctx; g_object_get (msg, "uri", &uri, NULL); - ctx = g_hash_table_lookup (ctx_table, soup_uri_get_path (uri)); + ctx = g_hash_table_lookup (ctx_table, &soup_uri_get_path (uri)[1]); soup_uri_free (uri); if (!ctx) @@ -63,8 +80,67 @@ get_server_ctx_from_msg (SoupMessage *msg, return ctx; } +/* Validates the query, parses it, and fetches a STPStreamToken from it */ +static STPStreamToken* +stp_validate_fetch_token_from_query (GHashTable *tokens, + const char *sessionid, + const char *query_string, + guint *http_status_code) +{ + GHashTable *query; + char *type, *udp_clients; + STPStreamToken *token, *valid_token; + + if (!query_string) { + *http_status_code = SOUP_STATUS_BAD_REQUEST; + return NULL; + } + + g_debug ("Recv query string: %s\n", query_string); + + if (stp_inet_addrmask && + !g_hash_table_contains (tokens, sessionid)) { + *http_status_code = SOUP_STATUS_FORBIDDEN; + return NULL; + } + + query = soup_form_decode (query_string); + + token = g_new0 (STPStreamToken, 1); + + type = g_hash_table_lookup (query, "type"); + token->stream_type = stp_get_stream_type_from_string (type); + + valid_token = g_hash_table_lookup (tokens, sessionid); + if (stp_inet_addrmask && + valid_token->stream_type != token->stream_type) { + *http_status_code = SOUP_STATUS_FORBIDDEN; + g_free (token); + token = NULL; + goto out; + } + + if (!(token->stream_type & STP_STREAM_TYPE_RTP_UDP)) + goto out; + + /* Host and port are only needed for RTP streaming */ + udp_clients = g_hash_table_lookup (query, "udp-clients"); + if (!udp_clients) { + *http_status_code = SOUP_STATUS_BAD_REQUEST; + g_free (token); + token = NULL; + goto out; + } + + token->udp_clients = g_strdup (udp_clients); + +out: + g_hash_table_destroy (query); + return token; +} + static void -stp_abort_server_ctx (TranscodeServerCtx *ctx) +stp_abort_server_ctx (STPServerCtx *ctx) { /* Abort all processing for this stream */ if (ctx->status == STP_STATUS_STREAMING && @@ -73,6 +149,7 @@ stp_abort_server_ctx (TranscodeServerCtx *ctx) ctx->status = STP_STATUS_FINISHED; soup_message_set_status (ctx->msg, SOUP_STATUS_GONE); } + stream_finished_cb (ctx->msg, ctx); /* Disconnect all clients, and shut down the stream */ invoke_g_hash_table_remove (ctx); } @@ -80,7 +157,7 @@ stp_abort_server_ctx (TranscodeServerCtx *ctx) /* If it's been more than 10 seconds since the last time we got * a chunk for a PUT request, we timeout and drop the connection */ static gboolean -increment_read_timer (TranscodeServerCtx *ctx) +increment_read_timer (STPServerCtx *ctx) { ctx->seconds_since_read += 2; if (ctx->seconds_since_read < server_timeout) @@ -94,7 +171,7 @@ increment_read_timer (TranscodeServerCtx *ctx) /* If it's been more than 10 seconds since the last time we wrote * a chunk for a GET response, we timeout and drop the connection */ static gboolean -increment_write_timer (TranscodeClientCtx *ctx) +increment_write_timer (STPClientCtx *ctx) { ctx->seconds_since_write += 2; if (ctx->seconds_since_write < client_timeout) @@ -105,13 +182,13 @@ increment_write_timer (TranscodeClientCtx *ctx) * connections, and clients are just left hanging */ soup_message_set_status (ctx->msg, SOUP_STATUS_REQUEST_TIMEOUT); soup_message_body_complete (ctx->msg->response_body); - stp_cleanup_transcode_client_ctx (ctx); + stp_client_ctx_cleanup (ctx); return G_SOURCE_REMOVE; } static void client_eos_cb (GstElement *appsink, - TranscodeClientCtx *ctx) + STPClientCtx *ctx) { g_debug ("Received EOS for client"); soup_message_set_status (ctx->msg, SOUP_STATUS_OK); @@ -120,7 +197,7 @@ client_eos_cb (GstElement *appsink, static void can_write_next_client_chunk_cb (SoupMessage *msg, - TranscodeClientCtx *ctx) + STPClientCtx *ctx) { ctx->seconds_since_write = 0; ctx->msg = msg; @@ -128,7 +205,7 @@ can_write_next_client_chunk_cb (SoupMessage *msg, } static gboolean -invoke_write_client_chunk (TranscodeClientCtx *ctx) +invoke_write_client_chunk (STPClientCtx *ctx) { GstMapInfo info; GstSample *sample; @@ -180,7 +257,7 @@ invoke_write_client_chunk (TranscodeClientCtx *ctx) static GstFlowReturn write_client_chunk_cb (GstElement *appsink, - TranscodeClientCtx *ctx) + STPClientCtx *ctx) { g_main_context_invoke_full (NULL, G_PRIORITY_DEFAULT, (GSourceFunc)invoke_write_client_chunk, @@ -190,15 +267,15 @@ write_client_chunk_cb (GstElement *appsink, static void client_finished_cb (SoupMessage *msg, - TranscodeClientCtx *ctx) + STPClientCtx *ctx) { stp_print_status ("Client finished/aborted, doing cleanup...\n"); - stp_cleanup_transcode_client_ctx (ctx); + stp_client_ctx_cleanup (ctx); } static void -stream_finished_cb (SoupMessage *msg, - TranscodeServerCtx *ctx) +stream_finished_cb (SoupMessage *msg, + STPServerCtx *ctx) { gboolean ret; @@ -226,9 +303,9 @@ handle_request_cb (SoupServer *server, const char *path, GHashTable *query, SoupClientContext *client, - GHashTable *ctx_table) + STPHashTables *tables) { - TranscodeServerCtx *server_ctx; + STPServerCtx *server_ctx; if (!msg) return; @@ -240,7 +317,10 @@ handle_request_cb (SoupServer *server, goto PUT; else if (msg->method == SOUP_METHOD_GET) goto GET; - else { + else if (msg->method == SOUP_METHOD_DELETE) { + soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED); + return; + } else { soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); return; } @@ -249,10 +329,14 @@ PUT: { /* The PUT request has finished streaming. Cleanup * will be done on EOS/ERROR in the gst pipeline. */ - server_ctx = get_server_ctx_from_msg (msg, ctx_table); - if (server_ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH) - server_ctx->status = STP_STATUS_FLUSHING; - soup_message_set_status (server_ctx->msg, SOUP_STATUS_OK); + server_ctx = get_server_ctx_from_msg (msg, tables->ctxs); + if (!server_ctx) { + soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); + } else { + if (server_ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH) + server_ctx->status = STP_STATUS_FLUSHING; + soup_message_set_status (msg, SOUP_STATUS_OK); + } return; } @@ -268,9 +352,9 @@ GET: GstElement *encodebin, *bin, *tee, *q2, *appsink; GstPadTemplate *template; GstPad *srcpad, *sinkpadq2; - TranscodeClientCtx *client_ctx; + STPClientCtx *client_ctx; - server_ctx = get_server_ctx_from_msg (msg, ctx_table); + server_ctx = get_server_ctx_from_msg (msg, tables->ctxs); if (server_ctx == NULL || server_ctx->pipeline == NULL) { /* There's no request streaming on this path (yet) */ @@ -296,10 +380,10 @@ GET: g_assert_not_reached (); } - g_print ("New GET request on %s\n", path); + g_debug ("New GET request on %s\n", path); /* Connect appsink to tee, and start streaming */ - client_ctx = g_new0 (TranscodeClientCtx, 1); + client_ctx = g_new0 (STPClientCtx, 1); client_ctx->msg = msg; client_ctx->server_ctx = server_ctx; @@ -410,15 +494,15 @@ out: err: soup_message_set_status (client_ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); soup_message_body_complete (client_ctx->msg->response_body); - stp_cleanup_transcode_client_ctx (client_ctx); + stp_client_ctx_cleanup (client_ctx); goto out; } } static void -got_request_body_chunk (SoupMessage *msg, - SoupBuffer *chunk, - TranscodeServerCtx *ctx) +got_request_body_chunk (SoupMessage *msg, + SoupBuffer *chunk, + STPServerCtx *ctx) { GstBuffer *buffer; GstFlowReturn ret; @@ -448,18 +532,17 @@ got_request_body_chunk (SoupMessage *msg, } static void -request_ended_no_body_cb (SoupMessage *msg, - TranscodeServerCtx *ctx) +request_ended_no_body_cb (SoupMessage *msg, + STPServerCtx *ctx) { g_critical ("Request ended without a body!"); invoke_g_hash_table_remove (ctx); - g_free (ctx); } static void -got_first_request_body_chunk (SoupMessage *msg, - SoupBuffer *chunk, - TranscodeServerCtx *ctx) +got_first_request_body_chunk (SoupMessage *msg, + SoupBuffer *chunk, + STPServerCtx *ctx) { int num; @@ -512,10 +595,11 @@ got_first_request_body_chunk (SoupMessage *msg, } static void -got_request_headers (SoupMessage *msg, - GHashTable *ctx_table) +got_request_headers (SoupMessage *msg, + STPHashTables *tables) { SoupURI *uri; + const char *path, *sessionid, *query; #ifdef HEADERS_DEBUG SoupMessageHeadersIter iter; @@ -526,42 +610,72 @@ got_request_headers (SoupMessage *msg, #endif g_object_get (msg, "uri", &uri, NULL); - g_debug ("%s on uri %s", msg->method, soup_uri_get_path (uri)); + path = soup_uri_get_path (uri); + query = soup_uri_get_query (uri); + g_debug ("%s on uri %s", msg->method, path); + + /* Token API methods are handled in the server callbacks, + * so we ignore those paths here. */ + if (g_str_has_prefix (path, STP_REST_LIST_STREAMS) || + g_str_has_prefix (path, STP_REST_ABORT_STREAM) || + g_str_has_prefix (path, STP_REST_ADD_TOKEN) || + g_str_has_prefix (path, STP_REST_REVOKE_TOKEN) || + g_str_has_prefix (path, STP_REST_LIST_TOKENS)) + goto out; + + /* Remove the leading '/' to get the session id */ + sessionid = &path[1]; if (msg->method == SOUP_METHOD_PUT || msg->method == SOUP_METHOD_POST) + /* Incoming stream; do everything needed */ goto PUT; else if (msg->method == SOUP_METHOD_GET) + /* Just set response headers as appropriate */ goto GET; - else { + else if (msg->method == SOUP_METHOD_DELETE) { + soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED); + goto out; + } else { /* Nothing to do; server doesn't implement this method */ soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); goto out; } PUT: { + guint http_status_code; const char *content_range; SoupEncoding encoding; gboolean connection_exists; - TranscodeServerCtx *ctx; + STPServerCtx *ctx; + STPStreamToken *token; + + token = stp_validate_fetch_token_from_query (tables->tokens, + sessionid, query, + &http_status_code); + if (!token) { + soup_message_set_status (msg, http_status_code); + goto out; + } - connection_exists = g_hash_table_contains (ctx_table, soup_uri_get_path (uri)); + connection_exists = g_hash_table_contains (tables->ctxs, sessionid); /* Data might be sent to us in one of three ways: * 1. Chunked encoding with the stream (and request body) sent in chunks * 2. Content-Length encoding with the entire stream in one chunk * 3. A persistent Content-Length encoding connection, and subsequent - * requests can send more stream data, forever, subject to a timeout */ + * requests can send more stream data, forever, subject to a timeout. + * The first request will send a Content-Length request, and + * subsequent ones will only have Content-Length + Content-Range set */ encoding = soup_message_headers_get_encoding (msg->request_headers); switch (encoding) { case SOUP_ENCODING_CHUNKED: g_debug ("Chunked encoding detected!"); if (connection_exists) { /* There's already a chunked request streaming on this path */ - g_critical ("Recv duplicate request on the same URI: %s", - soup_uri_get_path (uri)); + g_critical ("Recv duplicate request on the same URI: %s", path); soup_message_set_status (msg, SOUP_STATUS_CONFLICT); - goto out; + goto PUT_out; } break; case SOUP_ENCODING_CONTENT_LENGTH: @@ -573,10 +687,15 @@ PUT: { if (connection_exists && !content_range) { /* A connection already exists, and this one isn't a continuation * of the previous one, so it's a conflict */ - g_critical ("Recv Content-Length PUT on '%s' without Content-Range", - soup_uri_get_path (uri)); + g_critical ("Recv Content-Length PUT on '%s' without Content-Range", path); soup_message_set_status (msg, SOUP_STATUS_CONFLICT); - goto out; + goto PUT_out; + } + if (!connection_exists && content_range) { + /* We cancelled this connection, but we got another request which is + * a continuation of the previous one */ + soup_message_set_status (msg, SOUP_STATUS_GONE); + goto PUT_out; } break; case SOUP_ENCODING_EOF: @@ -584,9 +703,12 @@ PUT: { case SOUP_ENCODING_NONE: case SOUP_ENCODING_BYTERANGES: g_critical ("Unknown encoding!"); - goto out; + soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST); + goto PUT_out; default: g_critical ("Unsupported encoding?"); + soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST); + goto PUT_out; } /* Whether the incoming stream is chunked or fixed length, we want to @@ -596,28 +718,32 @@ PUT: { if (connection_exists) { g_debug ("Stream already exists, connecting everything to that..."); - ctx = get_server_ctx_from_msg (msg, ctx_table); + ctx = get_server_ctx_from_msg (msg, tables->ctxs); if (ctx->status != STP_STATUS_STREAMING) { - g_critical ("Recv more data on '%s' after timeout", - soup_uri_get_path (uri)); + g_critical ("Recv more data on '%s' after timeout", path); soup_message_set_status (msg, SOUP_STATUS_REQUEST_TIMEOUT); - goto out; + goto PUT_out; } /* The chunks we'll get are a continuation of the previous one */ g_signal_connect (msg, "got-chunk", G_CALLBACK (got_request_body_chunk), ctx); + PUT_out: + stp_stream_token_free (token); goto out; } /* This is a new connection; treat it as such */ - ctx = g_new0 (TranscodeServerCtx, 1); + ctx = g_new0 (STPServerCtx, 1); + ctx->stream_type = token->stream_type; + if (ctx->stream_type & STP_STREAM_TYPE_RTP_UDP) + ctx->udp_clients = token->udp_clients; ctx->msg = msg; ctx->encoding = encoding; ctx->status = STP_STATUS_STREAMING; - ctx->parent_ctx_table = ctx_table; - ctx->path = g_strdup (soup_uri_get_path (uri)); - g_hash_table_insert (ctx_table, ctx->path, ctx); - g_print ("New PUT stream on %s\n", ctx->path); + ctx->parent_ctx_table = tables->ctxs; + ctx->sessionid = g_strdup (sessionid); + g_hash_table_insert (tables->ctxs, ctx->sessionid, ctx); + g_debug ("New PUT stream on %s\n", path); g_signal_connect (msg, "got-chunk", G_CALLBACK (got_first_request_body_chunk), ctx); @@ -626,27 +752,20 @@ PUT: { g_signal_connect (msg, "finished", G_CALLBACK (request_ended_no_body_cb), ctx); if (encoding == SOUP_ENCODING_CONTENT_LENGTH) - g_timeout_add_seconds (2, (GSourceFunc)increment_read_timer, ctx); + ctx->timeout_handler_id = + g_timeout_add_seconds (2, (GSourceFunc)increment_read_timer, ctx); goto out; } GET: { - const char *query; - TranscodeServerCtx *server_ctx; + STPServerCtx *server_ctx; - if (!g_hash_table_contains (ctx_table, - soup_uri_get_path (uri))) { - g_critical ("No stream on URI: %s", soup_uri_get_path (uri)); - soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); - goto out; - } + server_ctx = g_hash_table_lookup (tables->ctxs, sessionid); - query = soup_uri_get_query (uri); - if (query && g_strcmp0 (query, "abort") == 0) { - /* We just use the "uri" field for this, so this is OK */ - server_ctx = get_server_ctx_from_msg (msg, ctx_table); - stp_abort_server_ctx (server_ctx); - soup_message_set_status (msg, SOUP_STATUS_OK); + if (!server_ctx || + !(server_ctx->stream_type & STP_STREAM_TYPE_HTTP)) { + stp_print_status ("No HTTP GET stream on URI: %s\n", path); + soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); goto out; } @@ -666,14 +785,351 @@ static void request_started_cb (SoupServer *server, SoupMessage *msg, SoupClientContext *client, - GHashTable *ctx_table) + STPHashTables *tables) { g_debug ("New %s request started", msg->method); /* SoupMessage is useful only once we have headers, * so we do all initialization there */ g_signal_connect (msg, "got-headers", - G_CALLBACK (got_request_headers), ctx_table); + G_CALLBACK (got_request_headers), tables); +} + +static void +handle_add_token_cb (SoupServer *server, + SoupMessage *msg, + const char *path, + GHashTable *query, + SoupClientContext *client, + STPHashTables *tables) +{ + if (msg->method == SOUP_METHOD_POST) { + goto POST; + } else if (g_strcmp0 (path, STP_REST_ADD_TOKEN) != 0) { + soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); + goto out; + } else if (msg->method == SOUP_METHOD_PUT || + msg->method == SOUP_METHOD_GET || + msg->method == SOUP_METHOD_DELETE) { + soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED); + goto out; + } else { + soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); + goto out; + } + +POST: { + STPStreamToken *token; + const char *type, *sessionid, *udp_clients; + + if (!stp_validate_token_server (stp_inet_addrmask, client)) { + soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN); + goto out; + } + + if (!query || + !g_hash_table_contains (query, "sessionid") || + !g_hash_table_contains (query, "type")) { + soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST); + goto out; + } + + sessionid = g_hash_table_lookup (query, "sessionid"); + type = g_hash_table_lookup (query, "type"); + udp_clients = g_hash_table_lookup (query, "udp-clients"); + + token = g_new0 (STPStreamToken, 1); + token->stream_type = stp_get_stream_type_from_string (type); + + if ((udp_clients == NULL) && + (token->stream_type & STP_STREAM_TYPE_RTP_UDP)) { + g_free (token); + soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST); + goto out; + } + + token->udp_clients = g_strdup (udp_clients); + + /* We don't care if there's already an identical token, and + * we don't care if there's ongoing streams with this token */ + g_hash_table_insert (tables->tokens, g_strdup (sessionid), token); + + soup_message_set_status (msg, SOUP_STATUS_OK); + } + +out: + return; +} + +static void +handle_revoke_token_cb (SoupServer *server, + SoupMessage *msg, + const char *path, + GHashTable *query, + SoupClientContext *client, + STPHashTables *tables) +{ + if (msg->method == SOUP_METHOD_DELETE) { + goto DELETE; + } else if (g_strcmp0 (path, STP_REST_REVOKE_TOKEN) != 0) { + soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); + goto out; + } else if (msg->method == SOUP_METHOD_PUT || + msg->method == SOUP_METHOD_GET || + msg->method == SOUP_METHOD_POST) { + soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED); + goto out; + } else { + soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); + goto out; + } + +DELETE: { + char *sessionid; + + if (!stp_validate_token_server (stp_inet_addrmask, client)) { + soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN); + goto out; + } + + if (!query || + !g_hash_table_contains (query, "sessionid")) { + soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST); + goto out; + } + + sessionid = g_hash_table_lookup (query, "sessionid"); + + if (!g_hash_table_remove (tables->tokens, sessionid)) { + soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); + goto out; + } + + if (g_hash_table_contains (tables->ctxs, sessionid)) + /* If there's a stream using this session id, we remove the token, + * but can't guarantee that the stream will stop. */ + soup_message_set_status (msg, SOUP_STATUS_ACCEPTED); + else + soup_message_set_status (msg, SOUP_STATUS_OK); + } + +out: + return; +} + +static void +handle_list_tokens_cb (SoupServer *server, + SoupMessage *msg, + const char *path, + GHashTable *query, + SoupClientContext *client, + STPHashTables *tables) +{ + if (msg->method == SOUP_METHOD_GET) { + goto GET; + } else if (g_strcmp0 (path, STP_REST_LIST_TOKENS) != 0) { + soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); + goto out; + } else if (msg->method == SOUP_METHOD_PUT || + msg->method == SOUP_METHOD_POST || + msg->method == SOUP_METHOD_DELETE) { + soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED); + goto out; + } else { + soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); + goto out; + } + +GET: { + gsize length; + char *response; + GHashTableIter i; + JsonBuilder *b; + JsonNode *dict; + JsonGenerator *gen; + gpointer key, value; + STPStreamToken *token; + const char *tmp; + + if (!stp_validate_token_server (stp_inet_addrmask, client)) { + soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN); + goto out; + } + + g_hash_table_iter_init (&i, tables->tokens); + b = json_builder_new (); + json_builder_begin_array (b); + while (g_hash_table_iter_next (&i, &key, &value)) { + token = value; + json_builder_begin_object (b); + + json_builder_set_member_name (b, "sessionid"); + json_builder_add_string_value (b, key); + + json_builder_set_member_name (b, "udp-clients"); + tmp = token->udp_clients; + json_builder_add_string_value (b, tmp? tmp : ""); + + json_builder_set_member_name (b, "stream-type"); + tmp = stp_get_stream_type_string (token->stream_type); + json_builder_add_string_value (b, tmp); + + json_builder_end_object (b); + } + json_builder_end_array (b); + dict = json_builder_get_root (b); + g_object_unref (b); + + gen = json_generator_new (); + json_generator_set_root (gen, dict); + json_node_free (dict); + + json_generator_set_pretty (gen, TRUE); + response = json_generator_to_data (gen, &length); + g_object_unref (gen); + + soup_message_set_status (msg, SOUP_STATUS_OK); + soup_message_body_append (msg->response_body, + SOUP_MEMORY_TAKE, + response, length); + } +out: + return; +} + +static void +handle_list_streams_cb (SoupServer *server, + SoupMessage *msg, + const char *path, + GHashTable *query, + SoupClientContext *client, + STPHashTables *tables) +{ + if (msg->method == SOUP_METHOD_GET) { + goto GET; + } else if (g_strcmp0 (path, STP_REST_LIST_STREAMS) != 0) { + soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); + goto out; + } else if (msg->method == SOUP_METHOD_PUT || + msg->method == SOUP_METHOD_POST || + msg->method == SOUP_METHOD_DELETE) { + soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED); + goto out; + } else { + soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); + goto out; + } + +GET: { + gsize length; + const char *tmp; + JsonBuilder *b; + JsonNode *dict; + JsonGenerator *gen; + GHashTableIter i; + gpointer key, value; + STPServerCtx *ctx; + char *sessionid, *response; + + if (!stp_validate_token_server (stp_inet_addrmask, client)) { + soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN); + goto out; + } + + g_hash_table_iter_init (&i, tables->ctxs); + b = json_builder_new (); + json_builder_begin_array (b); + while (g_hash_table_iter_next (&i, &key, &value)) { + ctx = value; + sessionid = key; + json_builder_begin_object (b); + + json_builder_set_member_name (b, "type"); + tmp = stp_get_stream_type_string (ctx->stream_type); + json_builder_add_string_value (b, tmp); + + json_builder_set_member_name (b, "sessionid"); + json_builder_add_string_value (b, sessionid); + + json_builder_set_member_name (b, "udp-clients"); + tmp = ctx->udp_clients; + json_builder_add_string_value (b, tmp? tmp : ""); + + json_builder_set_member_name (b, "status"); + json_builder_add_string_value (b, ctx->status); + + json_builder_end_object (b); + } + json_builder_end_array (b); + dict = json_builder_get_root (b); + g_object_unref (b); + + gen = json_generator_new (); + json_generator_set_root (gen, dict); + json_node_free (dict); + + json_generator_set_pretty (gen, TRUE); + response = json_generator_to_data (gen, &length); + g_object_unref (gen); + + soup_message_set_status (msg, SOUP_STATUS_OK); + soup_message_body_append (msg->response_body, + SOUP_MEMORY_TAKE, + response, length); + } +out: + return; +} + +static void +handle_abort_stream_cb (SoupServer *server, + SoupMessage *msg, + const char *path, + GHashTable *query, + SoupClientContext *client, + STPHashTables *tables) +{ + if (msg->method == SOUP_METHOD_DELETE) { + goto DELETE; + } else if (g_strcmp0 (path, STP_REST_ABORT_STREAM) != 0) { + soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); + goto out; + } else if (msg->method == SOUP_METHOD_PUT || + msg->method == SOUP_METHOD_GET || + msg->method == SOUP_METHOD_POST) { + soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED); + goto out; + } else { + soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); + goto out; + } + +DELETE: { + const char *sessionid; + STPServerCtx *ctx; + + if (!stp_validate_token_server (stp_inet_addrmask, client)) { + soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN); + goto out; + } + + sessionid = g_hash_table_lookup (query, "sessionid"); + if (!sessionid) { + soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST); + goto out; + } + + ctx = g_hash_table_lookup (tables->ctxs, sessionid); + if (!ctx) { + soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); + goto out; + } + + stp_abort_server_ctx (ctx); + soup_message_set_status (msg, SOUP_STATUS_OK); + } +out: + return; } gboolean @@ -687,7 +1143,7 @@ int main (int argc, char *argv[]) { - GHashTable *ctx_table; + STPHashTables *tables; GOptionContext *optctx; GError *error = NULL; @@ -702,20 +1158,64 @@ main (int argc, } g_option_context_free (optctx); - /* Keys are paths, and values are TranscodeServerCtxs */ - ctx_table = g_hash_table_new_full (g_str_hash, g_str_equal, - (GDestroyNotify)g_free, - (GDestroyNotify)stp_cleanup_transcode_server_ctx); + if (token_server_addrmask) { + stp_inet_addrmask = g_inet_address_mask_new_from_string (token_server_addrmask, + &error); + if (!stp_inet_addrmask) { + g_printerr ("Error parsing token server address mask: %s\n", error->message); + return 1; + } + } + + tables = g_new0 (STPHashTables, 1); + /* Keys are paths, and values are STPServerCtxs */ + tables->ctxs = g_hash_table_new_full (g_str_hash, g_str_equal, + (GDestroyNotify)g_free, + (GDestroyNotify)stp_server_ctx_cleanup); + /* Keys are "session-id:host:port", and there are no values. + * If host or port are missing, they will be blank. */ + tables->tokens = g_hash_table_new_full (g_str_hash, g_str_equal, + (GDestroyNotify)g_free, + (GDestroyNotify)stp_stream_token_free); /* TODO: Accept command-line argument for host */ server = soup_server_new (SOUP_SERVER_SERVER_HEADER, "soup-transcode-proxy ", SOUP_SERVER_PORT, port, NULL); + + /* Handle the end of POST/PUT, and all GET requests */ soup_server_add_handler (server, NULL, (SoupServerCallback)handle_request_cb, - ctx_table, NULL); + tables, NULL); + + /* NOTE: When adding a new path handler, an exception must be made in + * got_request_headers() for that path, otherwise a 405 will be returned */ + soup_server_add_handler (server, STP_REST_ABORT_STREAM, + (SoupServerCallback)handle_abort_stream_cb, + tables, NULL); + soup_server_add_handler (server, STP_REST_LIST_STREAMS, + (SoupServerCallback)handle_list_streams_cb, + tables, NULL); + + /* REST API for token handling, if requested */ + if (token_server_addrmask) { + soup_server_add_handler (server, STP_REST_ADD_TOKEN, + (SoupServerCallback)handle_add_token_cb, + tables, NULL); + soup_server_add_handler (server, STP_REST_REVOKE_TOKEN, + (SoupServerCallback)handle_revoke_token_cb, + tables, NULL); + soup_server_add_handler (server, STP_REST_LIST_TOKENS, + (SoupServerCallback)handle_list_tokens_cb, + tables, NULL); + } + + /* Since chunked streaming requests send all the data in a single request, + * we need to handle those when we get the headers. Hence, all PUT stream + * handling is done here. */ g_signal_connect (server, "request-started", - G_CALLBACK (request_started_cb), ctx_table); + G_CALLBACK (request_started_cb), tables); + g_unix_signal_add (SIGINT, (GSourceFunc)exit_on_signal_cb, server); soup_server_run (server); |