From e0d453a3a2c120264b13c44f004025b0330547a8 Mon Sep 17 00:00:00 2001 From: Nirbheek Chauhan Date: Thu, 24 Jul 2014 23:26:02 +0530 Subject: Implement concurrent RTP-UDP streams, and a REST API for status and auth When the --token-server=ADDR/MASK argument is passed to the server, the token verification framework is enabled, and the specified subnet is allowed to access the REST API to add/revoke/list tokens that allow clients to connect, and to list/abort streams running on the server. Details about the REST API are documented in the file "REST-API". There were also some organisational and name changes in the code. --- Makefile | 2 +- README | 71 ++++-- REST-API | 49 ++++ src/debug/local-play.c | 12 +- src/debug/local-play.h | 2 +- src/encode.c | 32 +-- src/encode.h | 2 +- src/lib.c | 85 ++++++- src/lib.h | 75 ++++-- src/main.c | 670 ++++++++++++++++++++++++++++++++++++++++++------- 10 files changed, 828 insertions(+), 172 deletions(-) create mode 100644 REST-API diff --git a/Makefile b/Makefile index daf90c8..3af6ebc 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ DEBUG_CFLAGS := -O0 -ggdb -Wall -fsanitize=address -fno-omit-frame-pointer -DENC CFLAGS := -O0 -ggdb -Wall -fno-omit-frame-pointer -DENCODE_DEBUG #CFLAGS := -O3 -march=native -Wall PKGCONFIG := pkg-config -LIBS := $(shell $(PKGCONFIG) --libs --cflags glib-2.0 gio-unix-2.0 libsoup-2.4 gstreamer-pbutils-1.0 gstreamer-video-1.0) -lrt +LIBS := $(shell $(PKGCONFIG) --libs --cflags glib-2.0 gio-unix-2.0 libsoup-2.4 gstreamer-pbutils-1.0 gstreamer-video-1.0 json-glib-1.0) -lrt SERVER_NAME := stp-server SRC_OBJS := $(addprefix src/,debug/local-play.o lib.o encode.o) diff --git a/README b/README index 50c490a..6772060 100644 --- a/README +++ b/README @@ -7,54 +7,77 @@ Runs on port 8000 by default. Further options can be seen with the --help flag: $ ./stp-server --help -The server supports multiple PUT streams and multiple GET streams for each PUT -stream. +The server supports multiple HTTP PUT streams and multiple RTP/UDP and HTTP GET +streams for each PUT stream. Usage with curl: ---------------- -Sending a stream (PUT): +> Sending a stream (PUT) for only HTTP GET streaming: - $ curl -v "http://localhost:8000/somepath" -T - < [some video file] + $ curl -v "http://localhost:8000/[sessionid]?type=http" -T - < [video file] This will use Chunked encoding and send the file in chunks. Optionally, you can rate limit to emulate a live stream (--limit-rate) Trying to create two streams on the same path will return a 409. -Reading the webm output stream (GET): +> Reading the webm output stream (GET): - $ curl -v "http://localhost:8000/somepath" > [some output file] + $ curl -v "http://localhost:8000/[sessionid]" > [output file] The server supports multiple PUT requests on different paths, and multiple GET requests from these paths. -A running/encoding stream can be aborted by sending a GET request with the query -"abort" to a path where a stream is running. +> Sending a stream (PUT) for only RTP-UDP streaming to 127.0.0.1:5004: + + $ curl -v "http://localhost:8000/[sessionid]?type=rtp-udp&udp-clients=127.0.0.1:5004" -T - < [video file] + +> Sending a stream (PUT) for both RTP-UDP and HTTP GET streaming: + + $ curl -v "http://localhost:8000/[sessionid]?type=rtp-udp,http&udp-clients=127.0.0.1:5004" -T - < [video file] + +> curl handles only HTTP, so there is no way to read the RTP-UDP stream using it - $ curl -v "http://localhost:8000/somepath?abort" Usage with souphttp: -------------------- -Sending a stream from a file (PUT): +> Sending a stream from a file (PUT) for only HTTP GET streaming: $ gst-launch-1.0 filesrc location=[video file] ! \ - souphttpclientsink location="http://localhost:8000/somepath" + souphttpclientsink location="http://localhost:8000/[sessionid]?type=http" This will use a persistent HTTP connection and Content-Length + Content-Range headers to send the stream data. -Reading a webm output stream (GET): +> Sending a stream from a file (PUT) for only RTP-UDP streaming: + + $ gst-launch-1.0 filesrc location=[video file] ! \ + souphttpclientsink location="http://localhost:8000/[sessionid]?type=rtp-udp&udp-clients=localhost:5004" + +> Reading a webm output stream (GET): - $ gst-launch-1.0 souphttpsrc location="http://localhost:8000/somepath" ! \ + $ gst-launch-1.0 souphttpsrc location="http://localhost:8000/[sessionid]" ! \ filesink location=[some output file] -Known bugs: ------------ -* Doing a massive PUT in a single go (say, 300-400MB or more) of a WebM - file causes the server to become overwhelmed. This problem doesn't happen for - non-WebM file dumps. Note however, that this is not a problem in the usual - mode of operation using live streams. -* Opening multiple GET streams from the same PUT stream works, but clients other - than the first one might timeout due to a bug that is being investigated. -* There are no queue size limit handling for PUT streams, and hence the server - can take a lot of memory if the PUT stream is sending data faster than it can - be encoded. In most circumstances, this is actually a feature. :-) +> Reading an RTP-UDP output stream: + + $ gst-launch-1.0 udpsrc host=127.0.0.1 port=5004 caps="application/x-rtp" ! \ + rtpvp8depay ! vp8dec ! videoconvert ! autovideosink + +> Sending a stream from a file (PUT) for both RTP-UDP and HTTP GET streaming: + + $ gst-launch-1.0 filesrc location=[video file] ! \ + souphttpclientsink location="http://localhost:8000/[sessionid]?type=rtp-udp,http&udp-clients=localhost:5004" + +"udp-clients" is a comma-separated list of UDP host:ports to broadcast the RTP +stream to. + +Token Validation: +----------------- +When the --token-server option is specified, only valid sessionids +("[sessionid]" in the above examples) are allowed to PUT or GET streams. In +addition, for RTP-UDP streams, the host and port must also be valid. These are +checked as soon as a client connects, so these tokens must be added to the +server's internal list before a client can connect to the server. See the +REST-API file for details about this. + +If --token-server is not specified, token validation is disabled. diff --git a/REST-API b/REST-API new file mode 100644 index 0000000..ec1c2ca --- /dev/null +++ b/REST-API @@ -0,0 +1,49 @@ +The server now implements two REST APIs: + +1) A token-based validation API for clients that want to stream +2) A simple API to list or abort streams running on the server + + +Token-based Validation: +======================= + +The server maintains a hash table of (sessionid, udp-host, udp-port) tuples +against which incoming (HTTP PUT/POST) and outgoing (HTTP GET) streams are +validated. The following API is available to manipulate this table: + +Adding Tokens: +-------------- + +> Adding a token for an HTTP stream + POST http://[host]:8000/add-token?type=http&sessionid=[sessionid] + +> Adding a token for an RTP-UDP stream on host:port + POST http://[host]:8000/add-token?type=rtp-udp&sessionid=[sessionid]&udp-clients=[host]:[port] + +> Adding a token for an HTTP stream and RTP-UDP streams on host1:port1,host2:port2 + POST http://[host]:8000/add-token?type=http,rtp-udp&sessionid=[sessionid]&udp-clients=[host1]:[port1],[host2]:[port2] + +Revoking Tokens: +---------------- + +> Revoking a token + DELETE http://[host]:8000/revoke-token?sessionid=[sessionid] + +Listing Tokens: +---------------- + +> Listing all tokens + GET http://[host]:8000/list-tokens + +This will return a JSON array of dicts with the "sessionid", "host", and "port" +for each token. + + +Listing and Aborting Running Streams: +===================================== + +> Listing all streams + GET http://[host]:8000/list-streams + +> Aborting a running stream + DELETE http://[host]:8000/abort-stream?sessionid=[sessionid] diff --git a/src/debug/local-play.c b/src/debug/local-play.c index 8dbb90d..4302c2f 100644 --- a/src/debug/local-play.c +++ b/src/debug/local-play.c @@ -23,8 +23,8 @@ #include "local-play.h" static void -pad_has_video_caps (TranscodeServerCtx *ctx, - GstPad *decodebin_pad) +pad_has_video_caps (STPServerCtx *ctx, + GstPad *decodebin_pad) { GstPad *sink_pad; GstElement *bin; @@ -60,8 +60,8 @@ out: } static void -pad_has_audio_caps (TranscodeServerCtx *ctx, - GstPad *decodebin_pad) +pad_has_audio_caps (STPServerCtx *ctx, + GstPad *decodebin_pad) { GstPad *sink_pad; GstElement *bin; @@ -116,7 +116,7 @@ decodebin_pad_added (GstElement *decodebin, { GstCaps *pad_caps; GstStructure *structure; - TranscodeServerCtx *ctx = user_data; + STPServerCtx *ctx = user_data; if (!gst_pad_has_current_caps (src_pad)) { g_critical ("Decodebin pad doesn't have current caps"); @@ -140,7 +140,7 @@ decodebin_pad_added (GstElement *decodebin, } void -stp_play_from_msg (TranscodeServerCtx *ctx) +stp_play_from_msg (STPServerCtx *ctx) { GstBus *bus; GstElement *src, *decodebin; diff --git a/src/debug/local-play.h b/src/debug/local-play.h index 131a2af..eb8c283 100644 --- a/src/debug/local-play.h +++ b/src/debug/local-play.h @@ -25,6 +25,6 @@ #include "../lib.h" -void stp_play_from_msg (TranscodeServerCtx *ctx); +void stp_play_from_msg (STPServerCtx *ctx); #endif /* _SST_LOCAL_PLAY */ diff --git a/src/encode.c b/src/encode.c index 2de4a9a..a18fe5e 100644 --- a/src/encode.c +++ b/src/encode.c @@ -117,9 +117,9 @@ on_pad_probe_buffer (GstPad *pad, } static void -on_demuxer_pad_added (GstElement *demuxer, - GstPad *srcpad, - GstElement *pipeline) +on_demuxer_pad_added (GstElement *demuxer, + GstPad *srcpad, + STPServerCtx *ctx) { char *name; GstCaps *caps; @@ -135,8 +135,9 @@ on_demuxer_pad_added (GstElement *demuxer, /* Broadcast VP8 RTP over UDP://localhost:5004 */ rtppay = gst_element_factory_make ("rtpvp8pay", "rtpvp8pay"); - udpsink = gst_element_factory_make ("udpsink", "udpsink"); - gst_bin_add_many (GST_BIN (pipeline), rtppay, udpsink, NULL); + udpsink = gst_element_factory_make ("multiudpsink", "multiudpsink"); + g_object_set (udpsink, "clients", ctx->udp_clients, NULL); + gst_bin_add_many (GST_BIN (ctx->pipeline), rtppay, udpsink, NULL); gst_element_link (rtppay, udpsink); if (!gst_element_sync_state_with_parent (rtppay) || @@ -154,8 +155,8 @@ out: } static void -do_udp_rtp_broadcast (GstElement *decodebin, - GstElement *pipeline) +do_udp_rtp_broadcast (GstElement *decodebin, + STPServerCtx *ctx) { GstPadTemplate *template; GstPad *srcpad, *sinkpad; @@ -168,10 +169,10 @@ do_udp_rtp_broadcast (GstElement *decodebin, g_object_set (demuxer, "location", "filesink.webm", NULL);*/ demuxer = gst_element_factory_make ("matroskademux", "matroskademux"); - gst_bin_add_many (GST_BIN (pipeline), q, demuxer, NULL); + gst_bin_add_many (GST_BIN (ctx->pipeline), q, demuxer, NULL); gst_element_link (q, demuxer); - tee = gst_bin_get_by_name (GST_BIN (pipeline), "tee"); + tee = gst_bin_get_by_name (GST_BIN (ctx->pipeline), "tee"); template = gst_pad_template_new ("teesrc", GST_PAD_SRC, GST_PAD_REQUEST, GST_CAPS_ANY); srcpad = gst_element_request_pad (tee, template, NULL, GST_CAPS_ANY); @@ -190,7 +191,7 @@ do_udp_rtp_broadcast (GstElement *decodebin, (GstPadProbeCallback) on_pad_probe_buffer, NULL, NULL); g_signal_connect (demuxer, "pad-added", - G_CALLBACK (on_demuxer_pad_added), pipeline); + G_CALLBACK (on_demuxer_pad_added), ctx); out: gst_object_unref (template); @@ -239,7 +240,7 @@ create_webm_profile (void) } void -stp_encode_from_msg (TranscodeServerCtx *ctx) +stp_encode_from_msg (STPServerCtx *ctx) { #ifdef ENCODE_DEBUG char *tmp, *filename; @@ -288,7 +289,7 @@ stp_encode_from_msg (TranscodeServerCtx *ctx) q2 = gst_element_factory_make ("queue", "q2"); filesink = gst_element_factory_make ("filesink", "filesink"); - tmp = g_uri_escape_string (ctx->path, NULL, TRUE); + tmp = g_uri_escape_string (ctx->sessionid, NULL, TRUE); filename = g_strdup_printf ("debug-encode-%s.webm", tmp); g_object_set (filesink, "location", filename, NULL); @@ -322,9 +323,10 @@ stp_encode_from_msg (TranscodeServerCtx *ctx) * corresponding sink pad on decodebin */ g_signal_connect (decodebin, "pad-added", G_CALLBACK (on_decodebin_pad_added), encodebin); - /* When finished adding pads, try streaming it all over RTP/UDP */ - g_signal_connect (decodebin, "no-more-pads", - G_CALLBACK (do_udp_rtp_broadcast), ctx->pipeline); + if (ctx->stream_type & STP_STREAM_TYPE_RTP_UDP) + /* If requested to, when finished adding pads, stream things over RTP/UDP */ + g_signal_connect (decodebin, "no-more-pads", + G_CALLBACK (do_udp_rtp_broadcast), ctx); bus = gst_pipeline_get_bus (GST_PIPELINE (ctx->pipeline)); gst_bus_add_signal_watch (bus); diff --git a/src/encode.h b/src/encode.h index 15ac859..3a68909 100644 --- a/src/encode.h +++ b/src/encode.h @@ -25,6 +25,6 @@ #include "lib.h" -void stp_encode_from_msg (TranscodeServerCtx *ctx); +void stp_encode_from_msg (STPServerCtx *ctx); #endif /* _SST_ENCODE */ diff --git a/src/lib.c b/src/lib.c index 838f9e2..cc36292 100644 --- a/src/lib.c +++ b/src/lib.c @@ -22,26 +22,76 @@ #include "lib.h" -static void stp_disconnect_cleanup_client (TranscodeClientCtx *ctx); +static void stp_disconnect_cleanup_client (STPClientCtx *ctx); gboolean -invoke_g_hash_table_remove (TranscodeServerCtx *ctx) +invoke_g_hash_table_remove (STPServerCtx *ctx) { - g_hash_table_remove (ctx->parent_ctx_table, ctx->path); + g_hash_table_remove (ctx->parent_ctx_table, ctx->sessionid); return G_SOURCE_REMOVE; } gboolean -invoke_g_free_client_context (TranscodeClientCtx *ctx) +invoke_g_free_client_context (STPClientCtx *ctx) { g_free (ctx); return G_SOURCE_REMOVE; } +guint +stp_get_stream_type_from_string (const char *type) +{ + if (type == NULL) + return STP_STREAM_TYPE_ALL; + + if (soup_str_case_equal (type, "http")) + return STP_STREAM_TYPE_HTTP; + + if (soup_str_case_equal (type, "rtp-udp")) + return STP_STREAM_TYPE_RTP_UDP; + + if (soup_str_case_equal (type, "rtp-udp,http") || + soup_str_case_equal (type, "http,rtp-udp")) + return STP_STREAM_TYPE_ALL; + + return STP_STREAM_TYPE_INVALID; +} + +const char* +stp_get_stream_type_string (guint stream_type) +{ + switch (stream_type) { + case STP_STREAM_TYPE_HTTP: + return "http"; + case STP_STREAM_TYPE_RTP_UDP: + return "rtp-udp"; + case STP_STREAM_TYPE_ALL: + return "http,rtp-udp"; + default: + return "invalid"; + } +} + +gboolean +stp_validate_token_server (GInetAddressMask *mask, + SoupClientContext *client) +{ + gboolean ret; + GInetAddress *addr; + + if (!mask) + return TRUE; + + addr = g_inet_address_new_from_string (soup_client_context_get_host (client)); + ret = g_inet_address_mask_matches (mask, addr); + g_object_unref (addr); + return ret; +} + gboolean -stp_on_gst_bus_message (GstBus *bus, - GstMessage *msg, - TranscodeServerCtx *ctx) +stp_on_gst_bus_message (GstBus *bus, + GstMessage *msg, + STPServerCtx *ctx) { GError *error = NULL; char *tmp = NULL; @@ -85,10 +135,13 @@ stp_on_gst_bus_message (GstBus *bus, * which initiates a shutdown for all clients and then * the server itself */ void -stp_cleanup_transcode_server_ctx (TranscodeServerCtx *ctx) +stp_server_ctx_cleanup (STPServerCtx *ctx) { g_debug (">>> Doing server cleanup"); + g_source_remove (ctx->timeout_handler_id); + ctx->timeout_handler_id = 0; + /* Disconnect and free the context for each client */ g_list_foreach (ctx->clients, (GFunc)stp_disconnect_cleanup_client, NULL); g_list_free (ctx->clients); @@ -99,13 +152,14 @@ stp_cleanup_transcode_server_ctx (TranscodeServerCtx *ctx) gst_object_unref (ctx->pipeline); } + g_free (ctx->udp_clients); g_free (ctx); } static GstPadProbeReturn pad_blocked_cleanup_cb (GstPad *srcpad, GstPadProbeInfo *info, - TranscodeClientCtx *ctx) + STPClientCtx *ctx) { GstElement *tee; GstElement *sinkbin = GST_ELEMENT (gst_element_get_parent (ctx->appsink)); @@ -133,10 +187,10 @@ pad_blocked_cleanup_cb (GstPad *srcpad, * branch for the client from the pipeline, and then freeing the * context. */ void -stp_cleanup_transcode_client_ctx (TranscodeClientCtx *ctx) +stp_client_ctx_cleanup (STPClientCtx *ctx) { GstPad *srcpad; - TranscodeServerCtx *server_ctx = ctx->server_ctx; + STPServerCtx *server_ctx = ctx->server_ctx; GstElement *sinkbin = GST_ELEMENT (gst_element_get_parent (ctx->appsink)); stp_print_status (">>> Doing client cleanup."); @@ -157,11 +211,18 @@ stp_cleanup_transcode_client_ctx (TranscodeClientCtx *ctx) stp_print_status ("."); } +void +stp_stream_token_free (STPStreamToken *token) +{ + g_free (token->udp_clients); + g_free (token); +} + /* When shutting down a client due to a server shutdown, * we just need to remove the client timeout handler and * free the context. The rest is owned by the pipeline. */ static void -stp_disconnect_cleanup_client (TranscodeClientCtx *ctx) +stp_disconnect_cleanup_client (STPClientCtx *ctx) { stp_print_status (">>> Disconnecting client on server shutdown\n"); diff --git a/src/lib.h b/src/lib.h index 4813321..f97b0de 100644 --- a/src/lib.h +++ b/src/lib.h @@ -27,39 +27,50 @@ #include #include -typedef struct _TranscodeServerCtx TranscodeServerCtx; -typedef struct _TranscodeClientCtx TranscodeClientCtx; - -enum stp_stream_status { - STP_STATUS_NONE, - STP_STATUS_STREAMING, - STP_STATUS_FLUSHING, - STP_STATUS_FINISHED, -}; +typedef struct _STPServerCtx STPServerCtx; +typedef struct _STPClientCtx STPClientCtx; +typedef struct _STPStreamToken STPStreamToken; + +#define STP_STREAM_TYPE_INVALID (1 << 0) +#define STP_STREAM_TYPE_HTTP (1 << 1) +#define STP_STREAM_TYPE_RTP_UDP (1 << 2) +#define STP_STREAM_TYPE_RTP_TCP (1 << 3) /* Not supported yet */ +#define STP_STREAM_TYPE_ALL STP_STREAM_TYPE_HTTP | STP_STREAM_TYPE_RTP_UDP -struct _TranscodeServerCtx { +#define STP_STATUS_NONE g_intern_static_string ("none") +#define STP_STATUS_STREAMING g_intern_static_string ("streaming") +#define STP_STATUS_FLUSHING g_intern_static_string ("flushing") +#define STP_STATUS_FINISHED g_intern_static_string ("finished") + +struct _STPServerCtx { SoupMessage *msg; GstElement *pipeline; GstElement *appsrc; - /* If the encoding is not chunked, we'll get multiple requests - * with separate Content-Length headers on the same path */ - SoupEncoding encoding; - /* Set to TRUE when the incoming stream ends; let's us know if + /* Also used as the key in the parent hash table */ + char *sessionid; + /* Comman-separated list of host:port pairs */ + char *udp_clients; + /* RTP or HTTP or both */ + guint stream_type; + /* Keeps track of the stream running status, let's us know if * we need to set the PUT response on EOS/ERROR on the pipeline, * and whether to reject further data when using a persistent * Content-Length + Content-Range PUT stream. */ - enum stp_stream_status status; + const char *status; + + /* If the encoding is not chunked, we'll get multiple requests + * with separate Content-Length headers on the same path */ + SoupEncoding encoding; + guint timeout_handler_id; guint seconds_since_read; /* List of client contexts */ GList *clients; /* Reference to the parent context hash table */ GHashTable *parent_ctx_table; - /* Reference to the key in the parent hash table */ - char *path; }; -struct _TranscodeClientCtx { +struct _STPClientCtx { /* We don't hold refs to any of these */ SoupMessage *msg; GstElement *appsink; @@ -70,7 +81,13 @@ struct _TranscodeClientCtx { gulong finished_handler_id; guint seconds_since_write; /* The transcode server context; we don't hold a ref to this */ - TranscodeServerCtx *server_ctx; + STPServerCtx *server_ctx; +}; + +struct _STPStreamToken { + guint stream_type; + /* Comma-separated host:port list */ + char *udp_clients; }; #ifdef ENCODE_DEBUG @@ -79,18 +96,22 @@ struct _TranscodeClientCtx { #define stp_print_status(...) do {} while (0) #endif -gboolean invoke_g_hash_table_remove (TranscodeServerCtx *ctx); -gboolean invoke_g_free_client_context (TranscodeClientCtx *ctx); +gboolean invoke_g_hash_table_remove (STPServerCtx *ctx); +gboolean invoke_g_free_client_context (STPClientCtx *ctx); + +const char* stp_get_stream_type_string (guint stream_type); +guint stp_get_stream_type_from_string (const char *type); + +gboolean stp_validate_token_server (GInetAddressMask *mask, + SoupClientContext *client); -void stp_cleanup_transcode_server_ctx (TranscodeServerCtx *ctx); -void stp_cleanup_transcode_client_ctx (TranscodeClientCtx *ctx); +void stp_server_ctx_cleanup (STPServerCtx *ctx); +void stp_client_ctx_cleanup (STPClientCtx *ctx); +void stp_stream_token_free (STPStreamToken *token); gboolean stp_on_gst_bus_message (GstBus *bus, GstMessage *msg, - TranscodeServerCtx *ctx); -gboolean stp_unref_gst_buffer (GstBuffer **buffer, - guint idx, - gpointer user_data); + STPServerCtx *ctx); GstBuffer* stp_get_streamheader_from_caps (GstCaps *caps); GstBuffer* stp_get_gst_buffer (SoupBuffer *chunk); diff --git a/src/main.c b/src/main.c index 63c3a13..926dc86 100644 --- a/src/main.c +++ b/src/main.c @@ -22,6 +22,7 @@ #include #include +#include #include "lib.h" #include "encode.h" @@ -30,31 +31,47 @@ #include "debug/local-play.h" #endif +#define STP_REST_LIST_STREAMS "/list-streams" +#define STP_REST_ABORT_STREAM "/abort-stream" +#define STP_REST_ADD_TOKEN "/add-token" +#define STP_REST_REVOKE_TOKEN "/revoke-token" +#define STP_REST_LIST_TOKENS "/list-tokens" + +typedef struct _STPHashTables STPHashTables; + +struct _STPHashTables { + GHashTable *ctxs; + GHashTable *tokens; +}; + static int port = 8000; static int client_timeout = 10; static int server_timeout = 5; +static char *token_server_addrmask = NULL; +static GInetAddressMask *stp_inet_addrmask = NULL; static GOptionEntry entries[] = { { "port", 'p', 0, G_OPTION_ARG_INT, &port, "Port to listen on (default: 8000)", "PORT" }, { "timeout", 't', 0, G_OPTION_ARG_INT, &client_timeout, "Client connection timeout (default: 10s)", "SECONDS" }, + { "token-server", 's', 0, G_OPTION_ARG_STRING, &token_server_addrmask, "Subnet (or IP addr) for the token server (default: disabled)", "ADDR/PREFIX" }, { NULL } }; static SoupServer *server; -static void stream_finished_cb (SoupMessage *msg, - TranscodeServerCtx *ctx); +static void stream_finished_cb (SoupMessage *msg, + STPServerCtx *ctx); -static TranscodeServerCtx* +static STPServerCtx* get_server_ctx_from_msg (SoupMessage *msg, GHashTable *ctx_table) { SoupURI *uri; - TranscodeServerCtx *ctx; + STPServerCtx *ctx; g_object_get (msg, "uri", &uri, NULL); - ctx = g_hash_table_lookup (ctx_table, soup_uri_get_path (uri)); + ctx = g_hash_table_lookup (ctx_table, &soup_uri_get_path (uri)[1]); soup_uri_free (uri); if (!ctx) @@ -63,8 +80,67 @@ get_server_ctx_from_msg (SoupMessage *msg, return ctx; } +/* Validates the query, parses it, and fetches a STPStreamToken from it */ +static STPStreamToken* +stp_validate_fetch_token_from_query (GHashTable *tokens, + const char *sessionid, + const char *query_string, + guint *http_status_code) +{ + GHashTable *query; + char *type, *udp_clients; + STPStreamToken *token, *valid_token; + + if (!query_string) { + *http_status_code = SOUP_STATUS_BAD_REQUEST; + return NULL; + } + + g_debug ("Recv query string: %s\n", query_string); + + if (stp_inet_addrmask && + !g_hash_table_contains (tokens, sessionid)) { + *http_status_code = SOUP_STATUS_FORBIDDEN; + return NULL; + } + + query = soup_form_decode (query_string); + + token = g_new0 (STPStreamToken, 1); + + type = g_hash_table_lookup (query, "type"); + token->stream_type = stp_get_stream_type_from_string (type); + + valid_token = g_hash_table_lookup (tokens, sessionid); + if (stp_inet_addrmask && + valid_token->stream_type != token->stream_type) { + *http_status_code = SOUP_STATUS_FORBIDDEN; + g_free (token); + token = NULL; + goto out; + } + + if (!(token->stream_type & STP_STREAM_TYPE_RTP_UDP)) + goto out; + + /* Host and port are only needed for RTP streaming */ + udp_clients = g_hash_table_lookup (query, "udp-clients"); + if (!udp_clients) { + *http_status_code = SOUP_STATUS_BAD_REQUEST; + g_free (token); + token = NULL; + goto out; + } + + token->udp_clients = g_strdup (udp_clients); + +out: + g_hash_table_destroy (query); + return token; +} + static void -stp_abort_server_ctx (TranscodeServerCtx *ctx) +stp_abort_server_ctx (STPServerCtx *ctx) { /* Abort all processing for this stream */ if (ctx->status == STP_STATUS_STREAMING && @@ -73,6 +149,7 @@ stp_abort_server_ctx (TranscodeServerCtx *ctx) ctx->status = STP_STATUS_FINISHED; soup_message_set_status (ctx->msg, SOUP_STATUS_GONE); } + stream_finished_cb (ctx->msg, ctx); /* Disconnect all clients, and shut down the stream */ invoke_g_hash_table_remove (ctx); } @@ -80,7 +157,7 @@ stp_abort_server_ctx (TranscodeServerCtx *ctx) /* If it's been more than 10 seconds since the last time we got * a chunk for a PUT request, we timeout and drop the connection */ static gboolean -increment_read_timer (TranscodeServerCtx *ctx) +increment_read_timer (STPServerCtx *ctx) { ctx->seconds_since_read += 2; if (ctx->seconds_since_read < server_timeout) @@ -94,7 +171,7 @@ increment_read_timer (TranscodeServerCtx *ctx) /* If it's been more than 10 seconds since the last time we wrote * a chunk for a GET response, we timeout and drop the connection */ static gboolean -increment_write_timer (TranscodeClientCtx *ctx) +increment_write_timer (STPClientCtx *ctx) { ctx->seconds_since_write += 2; if (ctx->seconds_since_write < client_timeout) @@ -105,13 +182,13 @@ increment_write_timer (TranscodeClientCtx *ctx) * connections, and clients are just left hanging */ soup_message_set_status (ctx->msg, SOUP_STATUS_REQUEST_TIMEOUT); soup_message_body_complete (ctx->msg->response_body); - stp_cleanup_transcode_client_ctx (ctx); + stp_client_ctx_cleanup (ctx); return G_SOURCE_REMOVE; } static void client_eos_cb (GstElement *appsink, - TranscodeClientCtx *ctx) + STPClientCtx *ctx) { g_debug ("Received EOS for client"); soup_message_set_status (ctx->msg, SOUP_STATUS_OK); @@ -120,7 +197,7 @@ client_eos_cb (GstElement *appsink, static void can_write_next_client_chunk_cb (SoupMessage *msg, - TranscodeClientCtx *ctx) + STPClientCtx *ctx) { ctx->seconds_since_write = 0; ctx->msg = msg; @@ -128,7 +205,7 @@ can_write_next_client_chunk_cb (SoupMessage *msg, } static gboolean -invoke_write_client_chunk (TranscodeClientCtx *ctx) +invoke_write_client_chunk (STPClientCtx *ctx) { GstMapInfo info; GstSample *sample; @@ -180,7 +257,7 @@ invoke_write_client_chunk (TranscodeClientCtx *ctx) static GstFlowReturn write_client_chunk_cb (GstElement *appsink, - TranscodeClientCtx *ctx) + STPClientCtx *ctx) { g_main_context_invoke_full (NULL, G_PRIORITY_DEFAULT, (GSourceFunc)invoke_write_client_chunk, @@ -190,15 +267,15 @@ write_client_chunk_cb (GstElement *appsink, static void client_finished_cb (SoupMessage *msg, - TranscodeClientCtx *ctx) + STPClientCtx *ctx) { stp_print_status ("Client finished/aborted, doing cleanup...\n"); - stp_cleanup_transcode_client_ctx (ctx); + stp_client_ctx_cleanup (ctx); } static void -stream_finished_cb (SoupMessage *msg, - TranscodeServerCtx *ctx) +stream_finished_cb (SoupMessage *msg, + STPServerCtx *ctx) { gboolean ret; @@ -226,9 +303,9 @@ handle_request_cb (SoupServer *server, const char *path, GHashTable *query, SoupClientContext *client, - GHashTable *ctx_table) + STPHashTables *tables) { - TranscodeServerCtx *server_ctx; + STPServerCtx *server_ctx; if (!msg) return; @@ -240,7 +317,10 @@ handle_request_cb (SoupServer *server, goto PUT; else if (msg->method == SOUP_METHOD_GET) goto GET; - else { + else if (msg->method == SOUP_METHOD_DELETE) { + soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED); + return; + } else { soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); return; } @@ -249,10 +329,14 @@ PUT: { /* The PUT request has finished streaming. Cleanup * will be done on EOS/ERROR in the gst pipeline. */ - server_ctx = get_server_ctx_from_msg (msg, ctx_table); - if (server_ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH) - server_ctx->status = STP_STATUS_FLUSHING; - soup_message_set_status (server_ctx->msg, SOUP_STATUS_OK); + server_ctx = get_server_ctx_from_msg (msg, tables->ctxs); + if (!server_ctx) { + soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); + } else { + if (server_ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH) + server_ctx->status = STP_STATUS_FLUSHING; + soup_message_set_status (msg, SOUP_STATUS_OK); + } return; } @@ -268,9 +352,9 @@ GET: GstElement *encodebin, *bin, *tee, *q2, *appsink; GstPadTemplate *template; GstPad *srcpad, *sinkpadq2; - TranscodeClientCtx *client_ctx; + STPClientCtx *client_ctx; - server_ctx = get_server_ctx_from_msg (msg, ctx_table); + server_ctx = get_server_ctx_from_msg (msg, tables->ctxs); if (server_ctx == NULL || server_ctx->pipeline == NULL) { /* There's no request streaming on this path (yet) */ @@ -296,10 +380,10 @@ GET: g_assert_not_reached (); } - g_print ("New GET request on %s\n", path); + g_debug ("New GET request on %s\n", path); /* Connect appsink to tee, and start streaming */ - client_ctx = g_new0 (TranscodeClientCtx, 1); + client_ctx = g_new0 (STPClientCtx, 1); client_ctx->msg = msg; client_ctx->server_ctx = server_ctx; @@ -410,15 +494,15 @@ out: err: soup_message_set_status (client_ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); soup_message_body_complete (client_ctx->msg->response_body); - stp_cleanup_transcode_client_ctx (client_ctx); + stp_client_ctx_cleanup (client_ctx); goto out; } } static void -got_request_body_chunk (SoupMessage *msg, - SoupBuffer *chunk, - TranscodeServerCtx *ctx) +got_request_body_chunk (SoupMessage *msg, + SoupBuffer *chunk, + STPServerCtx *ctx) { GstBuffer *buffer; GstFlowReturn ret; @@ -448,18 +532,17 @@ got_request_body_chunk (SoupMessage *msg, } static void -request_ended_no_body_cb (SoupMessage *msg, - TranscodeServerCtx *ctx) +request_ended_no_body_cb (SoupMessage *msg, + STPServerCtx *ctx) { g_critical ("Request ended without a body!"); invoke_g_hash_table_remove (ctx); - g_free (ctx); } static void -got_first_request_body_chunk (SoupMessage *msg, - SoupBuffer *chunk, - TranscodeServerCtx *ctx) +got_first_request_body_chunk (SoupMessage *msg, + SoupBuffer *chunk, + STPServerCtx *ctx) { int num; @@ -512,10 +595,11 @@ got_first_request_body_chunk (SoupMessage *msg, } static void -got_request_headers (SoupMessage *msg, - GHashTable *ctx_table) +got_request_headers (SoupMessage *msg, + STPHashTables *tables) { SoupURI *uri; + const char *path, *sessionid, *query; #ifdef HEADERS_DEBUG SoupMessageHeadersIter iter; @@ -526,42 +610,72 @@ got_request_headers (SoupMessage *msg, #endif g_object_get (msg, "uri", &uri, NULL); - g_debug ("%s on uri %s", msg->method, soup_uri_get_path (uri)); + path = soup_uri_get_path (uri); + query = soup_uri_get_query (uri); + g_debug ("%s on uri %s", msg->method, path); + + /* Token API methods are handled in the server callbacks, + * so we ignore those paths here. */ + if (g_str_has_prefix (path, STP_REST_LIST_STREAMS) || + g_str_has_prefix (path, STP_REST_ABORT_STREAM) || + g_str_has_prefix (path, STP_REST_ADD_TOKEN) || + g_str_has_prefix (path, STP_REST_REVOKE_TOKEN) || + g_str_has_prefix (path, STP_REST_LIST_TOKENS)) + goto out; + + /* Remove the leading '/' to get the session id */ + sessionid = &path[1]; if (msg->method == SOUP_METHOD_PUT || msg->method == SOUP_METHOD_POST) + /* Incoming stream; do everything needed */ goto PUT; else if (msg->method == SOUP_METHOD_GET) + /* Just set response headers as appropriate */ goto GET; - else { + else if (msg->method == SOUP_METHOD_DELETE) { + soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED); + goto out; + } else { /* Nothing to do; server doesn't implement this method */ soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); goto out; } PUT: { + guint http_status_code; const char *content_range; SoupEncoding encoding; gboolean connection_exists; - TranscodeServerCtx *ctx; + STPServerCtx *ctx; + STPStreamToken *token; + + token = stp_validate_fetch_token_from_query (tables->tokens, + sessionid, query, + &http_status_code); + if (!token) { + soup_message_set_status (msg, http_status_code); + goto out; + } - connection_exists = g_hash_table_contains (ctx_table, soup_uri_get_path (uri)); + connection_exists = g_hash_table_contains (tables->ctxs, sessionid); /* Data might be sent to us in one of three ways: * 1. Chunked encoding with the stream (and request body) sent in chunks * 2. Content-Length encoding with the entire stream in one chunk * 3. A persistent Content-Length encoding connection, and subsequent - * requests can send more stream data, forever, subject to a timeout */ + * requests can send more stream data, forever, subject to a timeout. + * The first request will send a Content-Length request, and + * subsequent ones will only have Content-Length + Content-Range set */ encoding = soup_message_headers_get_encoding (msg->request_headers); switch (encoding) { case SOUP_ENCODING_CHUNKED: g_debug ("Chunked encoding detected!"); if (connection_exists) { /* There's already a chunked request streaming on this path */ - g_critical ("Recv duplicate request on the same URI: %s", - soup_uri_get_path (uri)); + g_critical ("Recv duplicate request on the same URI: %s", path); soup_message_set_status (msg, SOUP_STATUS_CONFLICT); - goto out; + goto PUT_out; } break; case SOUP_ENCODING_CONTENT_LENGTH: @@ -573,10 +687,15 @@ PUT: { if (connection_exists && !content_range) { /* A connection already exists, and this one isn't a continuation * of the previous one, so it's a conflict */ - g_critical ("Recv Content-Length PUT on '%s' without Content-Range", - soup_uri_get_path (uri)); + g_critical ("Recv Content-Length PUT on '%s' without Content-Range", path); soup_message_set_status (msg, SOUP_STATUS_CONFLICT); - goto out; + goto PUT_out; + } + if (!connection_exists && content_range) { + /* We cancelled this connection, but we got another request which is + * a continuation of the previous one */ + soup_message_set_status (msg, SOUP_STATUS_GONE); + goto PUT_out; } break; case SOUP_ENCODING_EOF: @@ -584,9 +703,12 @@ PUT: { case SOUP_ENCODING_NONE: case SOUP_ENCODING_BYTERANGES: g_critical ("Unknown encoding!"); - goto out; + soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST); + goto PUT_out; default: g_critical ("Unsupported encoding?"); + soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST); + goto PUT_out; } /* Whether the incoming stream is chunked or fixed length, we want to @@ -596,28 +718,32 @@ PUT: { if (connection_exists) { g_debug ("Stream already exists, connecting everything to that..."); - ctx = get_server_ctx_from_msg (msg, ctx_table); + ctx = get_server_ctx_from_msg (msg, tables->ctxs); if (ctx->status != STP_STATUS_STREAMING) { - g_critical ("Recv more data on '%s' after timeout", - soup_uri_get_path (uri)); + g_critical ("Recv more data on '%s' after timeout", path); soup_message_set_status (msg, SOUP_STATUS_REQUEST_TIMEOUT); - goto out; + goto PUT_out; } /* The chunks we'll get are a continuation of the previous one */ g_signal_connect (msg, "got-chunk", G_CALLBACK (got_request_body_chunk), ctx); + PUT_out: + stp_stream_token_free (token); goto out; } /* This is a new connection; treat it as such */ - ctx = g_new0 (TranscodeServerCtx, 1); + ctx = g_new0 (STPServerCtx, 1); + ctx->stream_type = token->stream_type; + if (ctx->stream_type & STP_STREAM_TYPE_RTP_UDP) + ctx->udp_clients = token->udp_clients; ctx->msg = msg; ctx->encoding = encoding; ctx->status = STP_STATUS_STREAMING; - ctx->parent_ctx_table = ctx_table; - ctx->path = g_strdup (soup_uri_get_path (uri)); - g_hash_table_insert (ctx_table, ctx->path, ctx); - g_print ("New PUT stream on %s\n", ctx->path); + ctx->parent_ctx_table = tables->ctxs; + ctx->sessionid = g_strdup (sessionid); + g_hash_table_insert (tables->ctxs, ctx->sessionid, ctx); + g_debug ("New PUT stream on %s\n", path); g_signal_connect (msg, "got-chunk", G_CALLBACK (got_first_request_body_chunk), ctx); @@ -626,27 +752,20 @@ PUT: { g_signal_connect (msg, "finished", G_CALLBACK (request_ended_no_body_cb), ctx); if (encoding == SOUP_ENCODING_CONTENT_LENGTH) - g_timeout_add_seconds (2, (GSourceFunc)increment_read_timer, ctx); + ctx->timeout_handler_id = + g_timeout_add_seconds (2, (GSourceFunc)increment_read_timer, ctx); goto out; } GET: { - const char *query; - TranscodeServerCtx *server_ctx; + STPServerCtx *server_ctx; - if (!g_hash_table_contains (ctx_table, - soup_uri_get_path (uri))) { - g_critical ("No stream on URI: %s", soup_uri_get_path (uri)); - soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); - goto out; - } + server_ctx = g_hash_table_lookup (tables->ctxs, sessionid); - query = soup_uri_get_query (uri); - if (query && g_strcmp0 (query, "abort") == 0) { - /* We just use the "uri" field for this, so this is OK */ - server_ctx = get_server_ctx_from_msg (msg, ctx_table); - stp_abort_server_ctx (server_ctx); - soup_message_set_status (msg, SOUP_STATUS_OK); + if (!server_ctx || + !(server_ctx->stream_type & STP_STREAM_TYPE_HTTP)) { + stp_print_status ("No HTTP GET stream on URI: %s\n", path); + soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); goto out; } @@ -666,14 +785,351 @@ static void request_started_cb (SoupServer *server, SoupMessage *msg, SoupClientContext *client, - GHashTable *ctx_table) + STPHashTables *tables) { g_debug ("New %s request started", msg->method); /* SoupMessage is useful only once we have headers, * so we do all initialization there */ g_signal_connect (msg, "got-headers", - G_CALLBACK (got_request_headers), ctx_table); + G_CALLBACK (got_request_headers), tables); +} + +static void +handle_add_token_cb (SoupServer *server, + SoupMessage *msg, + const char *path, + GHashTable *query, + SoupClientContext *client, + STPHashTables *tables) +{ + if (msg->method == SOUP_METHOD_POST) { + goto POST; + } else if (g_strcmp0 (path, STP_REST_ADD_TOKEN) != 0) { + soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); + goto out; + } else if (msg->method == SOUP_METHOD_PUT || + msg->method == SOUP_METHOD_GET || + msg->method == SOUP_METHOD_DELETE) { + soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED); + goto out; + } else { + soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); + goto out; + } + +POST: { + STPStreamToken *token; + const char *type, *sessionid, *udp_clients; + + if (!stp_validate_token_server (stp_inet_addrmask, client)) { + soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN); + goto out; + } + + if (!query || + !g_hash_table_contains (query, "sessionid") || + !g_hash_table_contains (query, "type")) { + soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST); + goto out; + } + + sessionid = g_hash_table_lookup (query, "sessionid"); + type = g_hash_table_lookup (query, "type"); + udp_clients = g_hash_table_lookup (query, "udp-clients"); + + token = g_new0 (STPStreamToken, 1); + token->stream_type = stp_get_stream_type_from_string (type); + + if ((udp_clients == NULL) && + (token->stream_type & STP_STREAM_TYPE_RTP_UDP)) { + g_free (token); + soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST); + goto out; + } + + token->udp_clients = g_strdup (udp_clients); + + /* We don't care if there's already an identical token, and + * we don't care if there's ongoing streams with this token */ + g_hash_table_insert (tables->tokens, g_strdup (sessionid), token); + + soup_message_set_status (msg, SOUP_STATUS_OK); + } + +out: + return; +} + +static void +handle_revoke_token_cb (SoupServer *server, + SoupMessage *msg, + const char *path, + GHashTable *query, + SoupClientContext *client, + STPHashTables *tables) +{ + if (msg->method == SOUP_METHOD_DELETE) { + goto DELETE; + } else if (g_strcmp0 (path, STP_REST_REVOKE_TOKEN) != 0) { + soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); + goto out; + } else if (msg->method == SOUP_METHOD_PUT || + msg->method == SOUP_METHOD_GET || + msg->method == SOUP_METHOD_POST) { + soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED); + goto out; + } else { + soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); + goto out; + } + +DELETE: { + char *sessionid; + + if (!stp_validate_token_server (stp_inet_addrmask, client)) { + soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN); + goto out; + } + + if (!query || + !g_hash_table_contains (query, "sessionid")) { + soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST); + goto out; + } + + sessionid = g_hash_table_lookup (query, "sessionid"); + + if (!g_hash_table_remove (tables->tokens, sessionid)) { + soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); + goto out; + } + + if (g_hash_table_contains (tables->ctxs, sessionid)) + /* If there's a stream using this session id, we remove the token, + * but can't guarantee that the stream will stop. */ + soup_message_set_status (msg, SOUP_STATUS_ACCEPTED); + else + soup_message_set_status (msg, SOUP_STATUS_OK); + } + +out: + return; +} + +static void +handle_list_tokens_cb (SoupServer *server, + SoupMessage *msg, + const char *path, + GHashTable *query, + SoupClientContext *client, + STPHashTables *tables) +{ + if (msg->method == SOUP_METHOD_GET) { + goto GET; + } else if (g_strcmp0 (path, STP_REST_LIST_TOKENS) != 0) { + soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); + goto out; + } else if (msg->method == SOUP_METHOD_PUT || + msg->method == SOUP_METHOD_POST || + msg->method == SOUP_METHOD_DELETE) { + soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED); + goto out; + } else { + soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); + goto out; + } + +GET: { + gsize length; + char *response; + GHashTableIter i; + JsonBuilder *b; + JsonNode *dict; + JsonGenerator *gen; + gpointer key, value; + STPStreamToken *token; + const char *tmp; + + if (!stp_validate_token_server (stp_inet_addrmask, client)) { + soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN); + goto out; + } + + g_hash_table_iter_init (&i, tables->tokens); + b = json_builder_new (); + json_builder_begin_array (b); + while (g_hash_table_iter_next (&i, &key, &value)) { + token = value; + json_builder_begin_object (b); + + json_builder_set_member_name (b, "sessionid"); + json_builder_add_string_value (b, key); + + json_builder_set_member_name (b, "udp-clients"); + tmp = token->udp_clients; + json_builder_add_string_value (b, tmp? tmp : ""); + + json_builder_set_member_name (b, "stream-type"); + tmp = stp_get_stream_type_string (token->stream_type); + json_builder_add_string_value (b, tmp); + + json_builder_end_object (b); + } + json_builder_end_array (b); + dict = json_builder_get_root (b); + g_object_unref (b); + + gen = json_generator_new (); + json_generator_set_root (gen, dict); + json_node_free (dict); + + json_generator_set_pretty (gen, TRUE); + response = json_generator_to_data (gen, &length); + g_object_unref (gen); + + soup_message_set_status (msg, SOUP_STATUS_OK); + soup_message_body_append (msg->response_body, + SOUP_MEMORY_TAKE, + response, length); + } +out: + return; +} + +static void +handle_list_streams_cb (SoupServer *server, + SoupMessage *msg, + const char *path, + GHashTable *query, + SoupClientContext *client, + STPHashTables *tables) +{ + if (msg->method == SOUP_METHOD_GET) { + goto GET; + } else if (g_strcmp0 (path, STP_REST_LIST_STREAMS) != 0) { + soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); + goto out; + } else if (msg->method == SOUP_METHOD_PUT || + msg->method == SOUP_METHOD_POST || + msg->method == SOUP_METHOD_DELETE) { + soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED); + goto out; + } else { + soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); + goto out; + } + +GET: { + gsize length; + const char *tmp; + JsonBuilder *b; + JsonNode *dict; + JsonGenerator *gen; + GHashTableIter i; + gpointer key, value; + STPServerCtx *ctx; + char *sessionid, *response; + + if (!stp_validate_token_server (stp_inet_addrmask, client)) { + soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN); + goto out; + } + + g_hash_table_iter_init (&i, tables->ctxs); + b = json_builder_new (); + json_builder_begin_array (b); + while (g_hash_table_iter_next (&i, &key, &value)) { + ctx = value; + sessionid = key; + json_builder_begin_object (b); + + json_builder_set_member_name (b, "type"); + tmp = stp_get_stream_type_string (ctx->stream_type); + json_builder_add_string_value (b, tmp); + + json_builder_set_member_name (b, "sessionid"); + json_builder_add_string_value (b, sessionid); + + json_builder_set_member_name (b, "udp-clients"); + tmp = ctx->udp_clients; + json_builder_add_string_value (b, tmp? tmp : ""); + + json_builder_set_member_name (b, "status"); + json_builder_add_string_value (b, ctx->status); + + json_builder_end_object (b); + } + json_builder_end_array (b); + dict = json_builder_get_root (b); + g_object_unref (b); + + gen = json_generator_new (); + json_generator_set_root (gen, dict); + json_node_free (dict); + + json_generator_set_pretty (gen, TRUE); + response = json_generator_to_data (gen, &length); + g_object_unref (gen); + + soup_message_set_status (msg, SOUP_STATUS_OK); + soup_message_body_append (msg->response_body, + SOUP_MEMORY_TAKE, + response, length); + } +out: + return; +} + +static void +handle_abort_stream_cb (SoupServer *server, + SoupMessage *msg, + const char *path, + GHashTable *query, + SoupClientContext *client, + STPHashTables *tables) +{ + if (msg->method == SOUP_METHOD_DELETE) { + goto DELETE; + } else if (g_strcmp0 (path, STP_REST_ABORT_STREAM) != 0) { + soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); + goto out; + } else if (msg->method == SOUP_METHOD_PUT || + msg->method == SOUP_METHOD_GET || + msg->method == SOUP_METHOD_POST) { + soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED); + goto out; + } else { + soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); + goto out; + } + +DELETE: { + const char *sessionid; + STPServerCtx *ctx; + + if (!stp_validate_token_server (stp_inet_addrmask, client)) { + soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN); + goto out; + } + + sessionid = g_hash_table_lookup (query, "sessionid"); + if (!sessionid) { + soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST); + goto out; + } + + ctx = g_hash_table_lookup (tables->ctxs, sessionid); + if (!ctx) { + soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); + goto out; + } + + stp_abort_server_ctx (ctx); + soup_message_set_status (msg, SOUP_STATUS_OK); + } +out: + return; } gboolean @@ -687,7 +1143,7 @@ int main (int argc, char *argv[]) { - GHashTable *ctx_table; + STPHashTables *tables; GOptionContext *optctx; GError *error = NULL; @@ -702,20 +1158,64 @@ main (int argc, } g_option_context_free (optctx); - /* Keys are paths, and values are TranscodeServerCtxs */ - ctx_table = g_hash_table_new_full (g_str_hash, g_str_equal, - (GDestroyNotify)g_free, - (GDestroyNotify)stp_cleanup_transcode_server_ctx); + if (token_server_addrmask) { + stp_inet_addrmask = g_inet_address_mask_new_from_string (token_server_addrmask, + &error); + if (!stp_inet_addrmask) { + g_printerr ("Error parsing token server address mask: %s\n", error->message); + return 1; + } + } + + tables = g_new0 (STPHashTables, 1); + /* Keys are paths, and values are STPServerCtxs */ + tables->ctxs = g_hash_table_new_full (g_str_hash, g_str_equal, + (GDestroyNotify)g_free, + (GDestroyNotify)stp_server_ctx_cleanup); + /* Keys are "session-id:host:port", and there are no values. + * If host or port are missing, they will be blank. */ + tables->tokens = g_hash_table_new_full (g_str_hash, g_str_equal, + (GDestroyNotify)g_free, + (GDestroyNotify)stp_stream_token_free); /* TODO: Accept command-line argument for host */ server = soup_server_new (SOUP_SERVER_SERVER_HEADER, "soup-transcode-proxy ", SOUP_SERVER_PORT, port, NULL); + + /* Handle the end of POST/PUT, and all GET requests */ soup_server_add_handler (server, NULL, (SoupServerCallback)handle_request_cb, - ctx_table, NULL); + tables, NULL); + + /* NOTE: When adding a new path handler, an exception must be made in + * got_request_headers() for that path, otherwise a 405 will be returned */ + soup_server_add_handler (server, STP_REST_ABORT_STREAM, + (SoupServerCallback)handle_abort_stream_cb, + tables, NULL); + soup_server_add_handler (server, STP_REST_LIST_STREAMS, + (SoupServerCallback)handle_list_streams_cb, + tables, NULL); + + /* REST API for token handling, if requested */ + if (token_server_addrmask) { + soup_server_add_handler (server, STP_REST_ADD_TOKEN, + (SoupServerCallback)handle_add_token_cb, + tables, NULL); + soup_server_add_handler (server, STP_REST_REVOKE_TOKEN, + (SoupServerCallback)handle_revoke_token_cb, + tables, NULL); + soup_server_add_handler (server, STP_REST_LIST_TOKENS, + (SoupServerCallback)handle_list_tokens_cb, + tables, NULL); + } + + /* Since chunked streaming requests send all the data in a single request, + * we need to handle those when we get the headers. Hence, all PUT stream + * handling is done here. */ g_signal_connect (server, "request-started", - G_CALLBACK (request_started_cb), ctx_table); + G_CALLBACK (request_started_cb), tables); + g_unix_signal_add (SIGINT, (GSourceFunc)exit_on_signal_cb, server); soup_server_run (server); -- cgit v0.11.2-2-gd1dd