summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNirbheek Chauhan <nirbheek@centricular.com>2014-07-24 17:56:02 (GMT)
committerNirbheek Chauhan <nirbheek@centricular.com>2014-07-24 20:55:42 (GMT)
commite0d453a3a2c120264b13c44f004025b0330547a8 (patch)
tree4517a7e8f3100f1f75b4cecd68486b42d4f7f0df
parentfae0aa58c19a1918e7b7750b4862e2abcbc55788 (diff)
downloadsoup-transcoding-proxy-e0d453a3a2c120264b13c44f004025b0330547a8.zip
soup-transcoding-proxy-e0d453a3a2c120264b13c44f004025b0330547a8.tar.gz
Implement concurrent RTP-UDP streams, and a REST API for status and auth
When the --token-server=ADDR/MASK argument is passed to the server, the token verification framework is enabled, and the specified subnet is allowed to access the REST API to add/revoke/list tokens that allow clients to connect, and to list/abort streams running on the server. Details about the REST API are documented in the file "REST-API". There were also some organisational and name changes in the code.
-rw-r--r--Makefile2
-rw-r--r--README71
-rw-r--r--REST-API49
-rw-r--r--src/debug/local-play.c12
-rw-r--r--src/debug/local-play.h2
-rw-r--r--src/encode.c32
-rw-r--r--src/encode.h2
-rw-r--r--src/lib.c85
-rw-r--r--src/lib.h75
-rw-r--r--src/main.c670
10 files changed, 828 insertions, 172 deletions
diff --git a/Makefile b/Makefile
index daf90c8..3af6ebc 100644
--- a/Makefile
+++ b/Makefile
@@ -2,7 +2,7 @@ DEBUG_CFLAGS := -O0 -ggdb -Wall -fsanitize=address -fno-omit-frame-pointer -DENC
CFLAGS := -O0 -ggdb -Wall -fno-omit-frame-pointer -DENCODE_DEBUG
#CFLAGS := -O3 -march=native -Wall
PKGCONFIG := pkg-config
-LIBS := $(shell $(PKGCONFIG) --libs --cflags glib-2.0 gio-unix-2.0 libsoup-2.4 gstreamer-pbutils-1.0 gstreamer-video-1.0) -lrt
+LIBS := $(shell $(PKGCONFIG) --libs --cflags glib-2.0 gio-unix-2.0 libsoup-2.4 gstreamer-pbutils-1.0 gstreamer-video-1.0 json-glib-1.0) -lrt
SERVER_NAME := stp-server
SRC_OBJS := $(addprefix src/,debug/local-play.o lib.o encode.o)
diff --git a/README b/README
index 50c490a..6772060 100644
--- a/README
+++ b/README
@@ -7,54 +7,77 @@ Runs on port 8000 by default. Further options can be seen with the --help flag:
$ ./stp-server --help
-The server supports multiple PUT streams and multiple GET streams for each PUT
-stream.
+The server supports multiple HTTP PUT streams and multiple RTP/UDP and HTTP GET
+streams for each PUT stream.
Usage with curl:
----------------
-Sending a stream (PUT):
+> Sending a stream (PUT) for only HTTP GET streaming:
- $ curl -v "http://localhost:8000/somepath" -T - < [some video file]
+ $ curl -v "http://localhost:8000/[sessionid]?type=http" -T - < [video file]
This will use Chunked encoding and send the file in chunks.
Optionally, you can rate limit to emulate a live stream (--limit-rate)
Trying to create two streams on the same path will return a 409.
-Reading the webm output stream (GET):
+> Reading the webm output stream (GET):
- $ curl -v "http://localhost:8000/somepath" > [some output file]
+ $ curl -v "http://localhost:8000/[sessionid]" > [output file]
The server supports multiple PUT requests on different paths, and multiple GET
requests from these paths.
-A running/encoding stream can be aborted by sending a GET request with the query
-"abort" to a path where a stream is running.
+> Sending a stream (PUT) for only RTP-UDP streaming to 127.0.0.1:5004:
+
+ $ curl -v "http://localhost:8000/[sessionid]?type=rtp-udp&udp-clients=127.0.0.1:5004" -T - < [video file]
+
+> Sending a stream (PUT) for both RTP-UDP and HTTP GET streaming:
+
+ $ curl -v "http://localhost:8000/[sessionid]?type=rtp-udp,http&udp-clients=127.0.0.1:5004" -T - < [video file]
+
+> curl handles only HTTP, so there is no way to read the RTP-UDP stream using it
- $ curl -v "http://localhost:8000/somepath?abort"
Usage with souphttp:
--------------------
-Sending a stream from a file (PUT):
+> Sending a stream from a file (PUT) for only HTTP GET streaming:
$ gst-launch-1.0 filesrc location=[video file] ! \
- souphttpclientsink location="http://localhost:8000/somepath"
+ souphttpclientsink location="http://localhost:8000/[sessionid]?type=http"
This will use a persistent HTTP connection and Content-Length + Content-Range
headers to send the stream data.
-Reading a webm output stream (GET):
+> Sending a stream from a file (PUT) for only RTP-UDP streaming:
+
+ $ gst-launch-1.0 filesrc location=[video file] ! \
+ souphttpclientsink location="http://localhost:8000/[sessionid]?type=rtp-udp&udp-clients=localhost:5004"
+
+> Reading a webm output stream (GET):
- $ gst-launch-1.0 souphttpsrc location="http://localhost:8000/somepath" ! \
+ $ gst-launch-1.0 souphttpsrc location="http://localhost:8000/[sessionid]" ! \
filesink location=[some output file]
-Known bugs:
------------
-* Doing a massive PUT in a single go (say, 300-400MB or more) of a WebM
- file causes the server to become overwhelmed. This problem doesn't happen for
- non-WebM file dumps. Note however, that this is not a problem in the usual
- mode of operation using live streams.
-* Opening multiple GET streams from the same PUT stream works, but clients other
- than the first one might timeout due to a bug that is being investigated.
-* There are no queue size limit handling for PUT streams, and hence the server
- can take a lot of memory if the PUT stream is sending data faster than it can
- be encoded. In most circumstances, this is actually a feature. :-)
+> Reading an RTP-UDP output stream:
+
+ $ gst-launch-1.0 udpsrc host=127.0.0.1 port=5004 caps="application/x-rtp" ! \
+ rtpvp8depay ! vp8dec ! videoconvert ! autovideosink
+
+> Sending a stream from a file (PUT) for both RTP-UDP and HTTP GET streaming:
+
+ $ gst-launch-1.0 filesrc location=[video file] ! \
+ souphttpclientsink location="http://localhost:8000/[sessionid]?type=rtp-udp,http&udp-clients=localhost:5004"
+
+"udp-clients" is a comma-separated list of UDP host:ports to broadcast the RTP
+stream to.
+
+Token Validation:
+-----------------
+When the --token-server option is specified, only valid sessionids
+("[sessionid]" in the above examples) are allowed to PUT or GET streams. In
+addition, for RTP-UDP streams, the host and port must also be valid. These are
+checked as soon as a client connects, so these tokens must be added to the
+server's internal list before a client can connect to the server. See the
+REST-API file for details about this.
+
+If --token-server is not specified, token validation is disabled.
diff --git a/REST-API b/REST-API
new file mode 100644
index 0000000..ec1c2ca
--- /dev/null
+++ b/REST-API
@@ -0,0 +1,49 @@
+The server now implements two REST APIs:
+
+1) A token-based validation API for clients that want to stream
+2) A simple API to list or abort streams running on the server
+
+
+Token-based Validation:
+=======================
+
+The server maintains a hash table of (sessionid, udp-host, udp-port) tuples
+against which incoming (HTTP PUT/POST) and outgoing (HTTP GET) streams are
+validated. The following API is available to manipulate this table:
+
+Adding Tokens:
+--------------
+
+> Adding a token for an HTTP stream
+ POST http://[host]:8000/add-token?type=http&sessionid=[sessionid]
+
+> Adding a token for an RTP-UDP stream on host:port
+ POST http://[host]:8000/add-token?type=rtp-udp&sessionid=[sessionid]&udp-clients=[host]:[port]
+
+> Adding a token for an HTTP stream and RTP-UDP streams on host1:port1,host2:port2
+ POST http://[host]:8000/add-token?type=http,rtp-udp&sessionid=[sessionid]&udp-clients=[host1]:[port1],[host2]:[port2]
+
+Revoking Tokens:
+----------------
+
+> Revoking a token
+ DELETE http://[host]:8000/revoke-token?sessionid=[sessionid]
+
+Listing Tokens:
+----------------
+
+> Listing all tokens
+ GET http://[host]:8000/list-tokens
+
+This will return a JSON array of dicts with the "sessionid", "host", and "port"
+for each token.
+
+
+Listing and Aborting Running Streams:
+=====================================
+
+> Listing all streams
+ GET http://[host]:8000/list-streams
+
+> Aborting a running stream
+ DELETE http://[host]:8000/abort-stream?sessionid=[sessionid]
diff --git a/src/debug/local-play.c b/src/debug/local-play.c
index 8dbb90d..4302c2f 100644
--- a/src/debug/local-play.c
+++ b/src/debug/local-play.c
@@ -23,8 +23,8 @@
#include "local-play.h"
static void
-pad_has_video_caps (TranscodeServerCtx *ctx,
- GstPad *decodebin_pad)
+pad_has_video_caps (STPServerCtx *ctx,
+ GstPad *decodebin_pad)
{
GstPad *sink_pad;
GstElement *bin;
@@ -60,8 +60,8 @@ out:
}
static void
-pad_has_audio_caps (TranscodeServerCtx *ctx,
- GstPad *decodebin_pad)
+pad_has_audio_caps (STPServerCtx *ctx,
+ GstPad *decodebin_pad)
{
GstPad *sink_pad;
GstElement *bin;
@@ -116,7 +116,7 @@ decodebin_pad_added (GstElement *decodebin,
{
GstCaps *pad_caps;
GstStructure *structure;
- TranscodeServerCtx *ctx = user_data;
+ STPServerCtx *ctx = user_data;
if (!gst_pad_has_current_caps (src_pad)) {
g_critical ("Decodebin pad doesn't have current caps");
@@ -140,7 +140,7 @@ decodebin_pad_added (GstElement *decodebin,
}
void
-stp_play_from_msg (TranscodeServerCtx *ctx)
+stp_play_from_msg (STPServerCtx *ctx)
{
GstBus *bus;
GstElement *src, *decodebin;
diff --git a/src/debug/local-play.h b/src/debug/local-play.h
index 131a2af..eb8c283 100644
--- a/src/debug/local-play.h
+++ b/src/debug/local-play.h
@@ -25,6 +25,6 @@
#include "../lib.h"
-void stp_play_from_msg (TranscodeServerCtx *ctx);
+void stp_play_from_msg (STPServerCtx *ctx);
#endif /* _SST_LOCAL_PLAY */
diff --git a/src/encode.c b/src/encode.c
index 2de4a9a..a18fe5e 100644
--- a/src/encode.c
+++ b/src/encode.c
@@ -117,9 +117,9 @@ on_pad_probe_buffer (GstPad *pad,
}
static void
-on_demuxer_pad_added (GstElement *demuxer,
- GstPad *srcpad,
- GstElement *pipeline)
+on_demuxer_pad_added (GstElement *demuxer,
+ GstPad *srcpad,
+ STPServerCtx *ctx)
{
char *name;
GstCaps *caps;
@@ -135,8 +135,9 @@ on_demuxer_pad_added (GstElement *demuxer,
/* Broadcast VP8 RTP over UDP://localhost:5004 */
rtppay = gst_element_factory_make ("rtpvp8pay", "rtpvp8pay");
- udpsink = gst_element_factory_make ("udpsink", "udpsink");
- gst_bin_add_many (GST_BIN (pipeline), rtppay, udpsink, NULL);
+ udpsink = gst_element_factory_make ("multiudpsink", "multiudpsink");
+ g_object_set (udpsink, "clients", ctx->udp_clients, NULL);
+ gst_bin_add_many (GST_BIN (ctx->pipeline), rtppay, udpsink, NULL);
gst_element_link (rtppay, udpsink);
if (!gst_element_sync_state_with_parent (rtppay) ||
@@ -154,8 +155,8 @@ out:
}
static void
-do_udp_rtp_broadcast (GstElement *decodebin,
- GstElement *pipeline)
+do_udp_rtp_broadcast (GstElement *decodebin,
+ STPServerCtx *ctx)
{
GstPadTemplate *template;
GstPad *srcpad, *sinkpad;
@@ -168,10 +169,10 @@ do_udp_rtp_broadcast (GstElement *decodebin,
g_object_set (demuxer, "location", "filesink.webm", NULL);*/
demuxer = gst_element_factory_make ("matroskademux", "matroskademux");
- gst_bin_add_many (GST_BIN (pipeline), q, demuxer, NULL);
+ gst_bin_add_many (GST_BIN (ctx->pipeline), q, demuxer, NULL);
gst_element_link (q, demuxer);
- tee = gst_bin_get_by_name (GST_BIN (pipeline), "tee");
+ tee = gst_bin_get_by_name (GST_BIN (ctx->pipeline), "tee");
template = gst_pad_template_new ("teesrc", GST_PAD_SRC,
GST_PAD_REQUEST, GST_CAPS_ANY);
srcpad = gst_element_request_pad (tee, template, NULL, GST_CAPS_ANY);
@@ -190,7 +191,7 @@ do_udp_rtp_broadcast (GstElement *decodebin,
(GstPadProbeCallback) on_pad_probe_buffer, NULL, NULL);
g_signal_connect (demuxer, "pad-added",
- G_CALLBACK (on_demuxer_pad_added), pipeline);
+ G_CALLBACK (on_demuxer_pad_added), ctx);
out:
gst_object_unref (template);
@@ -239,7 +240,7 @@ create_webm_profile (void)
}
void
-stp_encode_from_msg (TranscodeServerCtx *ctx)
+stp_encode_from_msg (STPServerCtx *ctx)
{
#ifdef ENCODE_DEBUG
char *tmp, *filename;
@@ -288,7 +289,7 @@ stp_encode_from_msg (TranscodeServerCtx *ctx)
q2 = gst_element_factory_make ("queue", "q2");
filesink = gst_element_factory_make ("filesink", "filesink");
- tmp = g_uri_escape_string (ctx->path, NULL, TRUE);
+ tmp = g_uri_escape_string (ctx->sessionid, NULL, TRUE);
filename = g_strdup_printf ("debug-encode-%s.webm", tmp);
g_object_set (filesink, "location", filename, NULL);
@@ -322,9 +323,10 @@ stp_encode_from_msg (TranscodeServerCtx *ctx)
* corresponding sink pad on decodebin */
g_signal_connect (decodebin, "pad-added",
G_CALLBACK (on_decodebin_pad_added), encodebin);
- /* When finished adding pads, try streaming it all over RTP/UDP */
- g_signal_connect (decodebin, "no-more-pads",
- G_CALLBACK (do_udp_rtp_broadcast), ctx->pipeline);
+ if (ctx->stream_type & STP_STREAM_TYPE_RTP_UDP)
+ /* If requested to, when finished adding pads, stream things over RTP/UDP */
+ g_signal_connect (decodebin, "no-more-pads",
+ G_CALLBACK (do_udp_rtp_broadcast), ctx);
bus = gst_pipeline_get_bus (GST_PIPELINE (ctx->pipeline));
gst_bus_add_signal_watch (bus);
diff --git a/src/encode.h b/src/encode.h
index 15ac859..3a68909 100644
--- a/src/encode.h
+++ b/src/encode.h
@@ -25,6 +25,6 @@
#include "lib.h"
-void stp_encode_from_msg (TranscodeServerCtx *ctx);
+void stp_encode_from_msg (STPServerCtx *ctx);
#endif /* _SST_ENCODE */
diff --git a/src/lib.c b/src/lib.c
index 838f9e2..cc36292 100644
--- a/src/lib.c
+++ b/src/lib.c
@@ -22,26 +22,76 @@
#include "lib.h"
-static void stp_disconnect_cleanup_client (TranscodeClientCtx *ctx);
+static void stp_disconnect_cleanup_client (STPClientCtx *ctx);
gboolean
-invoke_g_hash_table_remove (TranscodeServerCtx *ctx)
+invoke_g_hash_table_remove (STPServerCtx *ctx)
{
- g_hash_table_remove (ctx->parent_ctx_table, ctx->path);
+ g_hash_table_remove (ctx->parent_ctx_table, ctx->sessionid);
return G_SOURCE_REMOVE;
}
gboolean
-invoke_g_free_client_context (TranscodeClientCtx *ctx)
+invoke_g_free_client_context (STPClientCtx *ctx)
{
g_free (ctx);
return G_SOURCE_REMOVE;
}
+guint
+stp_get_stream_type_from_string (const char *type)
+{
+ if (type == NULL)
+ return STP_STREAM_TYPE_ALL;
+
+ if (soup_str_case_equal (type, "http"))
+ return STP_STREAM_TYPE_HTTP;
+
+ if (soup_str_case_equal (type, "rtp-udp"))
+ return STP_STREAM_TYPE_RTP_UDP;
+
+ if (soup_str_case_equal (type, "rtp-udp,http") ||
+ soup_str_case_equal (type, "http,rtp-udp"))
+ return STP_STREAM_TYPE_ALL;
+
+ return STP_STREAM_TYPE_INVALID;
+}
+
+const char*
+stp_get_stream_type_string (guint stream_type)
+{
+ switch (stream_type) {
+ case STP_STREAM_TYPE_HTTP:
+ return "http";
+ case STP_STREAM_TYPE_RTP_UDP:
+ return "rtp-udp";
+ case STP_STREAM_TYPE_ALL:
+ return "http,rtp-udp";
+ default:
+ return "invalid";
+ }
+}
+
+gboolean
+stp_validate_token_server (GInetAddressMask *mask,
+ SoupClientContext *client)
+{
+ gboolean ret;
+ GInetAddress *addr;
+
+ if (!mask)
+ return TRUE;
+
+ addr = g_inet_address_new_from_string (soup_client_context_get_host (client));
+ ret = g_inet_address_mask_matches (mask, addr);
+ g_object_unref (addr);
+ return ret;
+}
+
gboolean
-stp_on_gst_bus_message (GstBus *bus,
- GstMessage *msg,
- TranscodeServerCtx *ctx)
+stp_on_gst_bus_message (GstBus *bus,
+ GstMessage *msg,
+ STPServerCtx *ctx)
{
GError *error = NULL;
char *tmp = NULL;
@@ -85,10 +135,13 @@ stp_on_gst_bus_message (GstBus *bus,
* which initiates a shutdown for all clients and then
* the server itself */
void
-stp_cleanup_transcode_server_ctx (TranscodeServerCtx *ctx)
+stp_server_ctx_cleanup (STPServerCtx *ctx)
{
g_debug (">>> Doing server cleanup");
+ g_source_remove (ctx->timeout_handler_id);
+ ctx->timeout_handler_id = 0;
+
/* Disconnect and free the context for each client */
g_list_foreach (ctx->clients, (GFunc)stp_disconnect_cleanup_client, NULL);
g_list_free (ctx->clients);
@@ -99,13 +152,14 @@ stp_cleanup_transcode_server_ctx (TranscodeServerCtx *ctx)
gst_object_unref (ctx->pipeline);
}
+ g_free (ctx->udp_clients);
g_free (ctx);
}
static GstPadProbeReturn
pad_blocked_cleanup_cb (GstPad *srcpad,
GstPadProbeInfo *info,
- TranscodeClientCtx *ctx)
+ STPClientCtx *ctx)
{
GstElement *tee;
GstElement *sinkbin = GST_ELEMENT (gst_element_get_parent (ctx->appsink));
@@ -133,10 +187,10 @@ pad_blocked_cleanup_cb (GstPad *srcpad,
* branch for the client from the pipeline, and then freeing the
* context. */
void
-stp_cleanup_transcode_client_ctx (TranscodeClientCtx *ctx)
+stp_client_ctx_cleanup (STPClientCtx *ctx)
{
GstPad *srcpad;
- TranscodeServerCtx *server_ctx = ctx->server_ctx;
+ STPServerCtx *server_ctx = ctx->server_ctx;
GstElement *sinkbin = GST_ELEMENT (gst_element_get_parent (ctx->appsink));
stp_print_status (">>> Doing client cleanup.");
@@ -157,11 +211,18 @@ stp_cleanup_transcode_client_ctx (TranscodeClientCtx *ctx)
stp_print_status (".");
}
+void
+stp_stream_token_free (STPStreamToken *token)
+{
+ g_free (token->udp_clients);
+ g_free (token);
+}
+
/* When shutting down a client due to a server shutdown,
* we just need to remove the client timeout handler and
* free the context. The rest is owned by the pipeline. */
static void
-stp_disconnect_cleanup_client (TranscodeClientCtx *ctx)
+stp_disconnect_cleanup_client (STPClientCtx *ctx)
{
stp_print_status (">>> Disconnecting client on server shutdown\n");
diff --git a/src/lib.h b/src/lib.h
index 4813321..f97b0de 100644
--- a/src/lib.h
+++ b/src/lib.h
@@ -27,39 +27,50 @@
#include <gst/gst.h>
#include <libsoup/soup.h>
-typedef struct _TranscodeServerCtx TranscodeServerCtx;
-typedef struct _TranscodeClientCtx TranscodeClientCtx;
-
-enum stp_stream_status {
- STP_STATUS_NONE,
- STP_STATUS_STREAMING,
- STP_STATUS_FLUSHING,
- STP_STATUS_FINISHED,
-};
+typedef struct _STPServerCtx STPServerCtx;
+typedef struct _STPClientCtx STPClientCtx;
+typedef struct _STPStreamToken STPStreamToken;
+
+#define STP_STREAM_TYPE_INVALID (1 << 0)
+#define STP_STREAM_TYPE_HTTP (1 << 1)
+#define STP_STREAM_TYPE_RTP_UDP (1 << 2)
+#define STP_STREAM_TYPE_RTP_TCP (1 << 3) /* Not supported yet */
+#define STP_STREAM_TYPE_ALL STP_STREAM_TYPE_HTTP | STP_STREAM_TYPE_RTP_UDP
-struct _TranscodeServerCtx {
+#define STP_STATUS_NONE g_intern_static_string ("none")
+#define STP_STATUS_STREAMING g_intern_static_string ("streaming")
+#define STP_STATUS_FLUSHING g_intern_static_string ("flushing")
+#define STP_STATUS_FINISHED g_intern_static_string ("finished")
+
+struct _STPServerCtx {
SoupMessage *msg;
GstElement *pipeline;
GstElement *appsrc;
- /* If the encoding is not chunked, we'll get multiple requests
- * with separate Content-Length headers on the same path */
- SoupEncoding encoding;
- /* Set to TRUE when the incoming stream ends; let's us know if
+ /* Also used as the key in the parent hash table */
+ char *sessionid;
+ /* Comman-separated list of host:port pairs */
+ char *udp_clients;
+ /* RTP or HTTP or both */
+ guint stream_type;
+ /* Keeps track of the stream running status, let's us know if
* we need to set the PUT response on EOS/ERROR on the pipeline,
* and whether to reject further data when using a persistent
* Content-Length + Content-Range PUT stream. */
- enum stp_stream_status status;
+ const char *status;
+
+ /* If the encoding is not chunked, we'll get multiple requests
+ * with separate Content-Length headers on the same path */
+ SoupEncoding encoding;
+ guint timeout_handler_id;
guint seconds_since_read;
/* List of client contexts */
GList *clients;
/* Reference to the parent context hash table */
GHashTable *parent_ctx_table;
- /* Reference to the key in the parent hash table */
- char *path;
};
-struct _TranscodeClientCtx {
+struct _STPClientCtx {
/* We don't hold refs to any of these */
SoupMessage *msg;
GstElement *appsink;
@@ -70,7 +81,13 @@ struct _TranscodeClientCtx {
gulong finished_handler_id;
guint seconds_since_write;
/* The transcode server context; we don't hold a ref to this */
- TranscodeServerCtx *server_ctx;
+ STPServerCtx *server_ctx;
+};
+
+struct _STPStreamToken {
+ guint stream_type;
+ /* Comma-separated host:port list */
+ char *udp_clients;
};
#ifdef ENCODE_DEBUG
@@ -79,18 +96,22 @@ struct _TranscodeClientCtx {
#define stp_print_status(...) do {} while (0)
#endif
-gboolean invoke_g_hash_table_remove (TranscodeServerCtx *ctx);
-gboolean invoke_g_free_client_context (TranscodeClientCtx *ctx);
+gboolean invoke_g_hash_table_remove (STPServerCtx *ctx);
+gboolean invoke_g_free_client_context (STPClientCtx *ctx);
+
+const char* stp_get_stream_type_string (guint stream_type);
+guint stp_get_stream_type_from_string (const char *type);
+
+gboolean stp_validate_token_server (GInetAddressMask *mask,
+ SoupClientContext *client);
-void stp_cleanup_transcode_server_ctx (TranscodeServerCtx *ctx);
-void stp_cleanup_transcode_client_ctx (TranscodeClientCtx *ctx);
+void stp_server_ctx_cleanup (STPServerCtx *ctx);
+void stp_client_ctx_cleanup (STPClientCtx *ctx);
+void stp_stream_token_free (STPStreamToken *token);
gboolean stp_on_gst_bus_message (GstBus *bus,
GstMessage *msg,
- TranscodeServerCtx *ctx);
-gboolean stp_unref_gst_buffer (GstBuffer **buffer,
- guint idx,
- gpointer user_data);
+ STPServerCtx *ctx);
GstBuffer* stp_get_streamheader_from_caps (GstCaps *caps);
GstBuffer* stp_get_gst_buffer (SoupBuffer *chunk);
diff --git a/src/main.c b/src/main.c
index 63c3a13..926dc86 100644
--- a/src/main.c
+++ b/src/main.c
@@ -22,6 +22,7 @@
#include <glib-unix.h>
#include <gst/video/video.h>
+#include <json-glib/json-glib.h>
#include "lib.h"
#include "encode.h"
@@ -30,31 +31,47 @@
#include "debug/local-play.h"
#endif
+#define STP_REST_LIST_STREAMS "/list-streams"
+#define STP_REST_ABORT_STREAM "/abort-stream"
+#define STP_REST_ADD_TOKEN "/add-token"
+#define STP_REST_REVOKE_TOKEN "/revoke-token"
+#define STP_REST_LIST_TOKENS "/list-tokens"
+
+typedef struct _STPHashTables STPHashTables;
+
+struct _STPHashTables {
+ GHashTable *ctxs;
+ GHashTable *tokens;
+};
+
static int port = 8000;
static int client_timeout = 10;
static int server_timeout = 5;
+static char *token_server_addrmask = NULL;
+static GInetAddressMask *stp_inet_addrmask = NULL;
static GOptionEntry entries[] =
{
{ "port", 'p', 0, G_OPTION_ARG_INT, &port, "Port to listen on (default: 8000)", "PORT" },
{ "timeout", 't', 0, G_OPTION_ARG_INT, &client_timeout, "Client connection timeout (default: 10s)", "SECONDS" },
+ { "token-server", 's', 0, G_OPTION_ARG_STRING, &token_server_addrmask, "Subnet (or IP addr) for the token server (default: disabled)", "ADDR/PREFIX" },
{ NULL }
};
static SoupServer *server;
-static void stream_finished_cb (SoupMessage *msg,
- TranscodeServerCtx *ctx);
+static void stream_finished_cb (SoupMessage *msg,
+ STPServerCtx *ctx);
-static TranscodeServerCtx*
+static STPServerCtx*
get_server_ctx_from_msg (SoupMessage *msg,
GHashTable *ctx_table)
{
SoupURI *uri;
- TranscodeServerCtx *ctx;
+ STPServerCtx *ctx;
g_object_get (msg, "uri", &uri, NULL);
- ctx = g_hash_table_lookup (ctx_table, soup_uri_get_path (uri));
+ ctx = g_hash_table_lookup (ctx_table, &soup_uri_get_path (uri)[1]);
soup_uri_free (uri);
if (!ctx)
@@ -63,8 +80,67 @@ get_server_ctx_from_msg (SoupMessage *msg,
return ctx;
}
+/* Validates the query, parses it, and fetches a STPStreamToken from it */
+static STPStreamToken*
+stp_validate_fetch_token_from_query (GHashTable *tokens,
+ const char *sessionid,
+ const char *query_string,
+ guint *http_status_code)
+{
+ GHashTable *query;
+ char *type, *udp_clients;
+ STPStreamToken *token, *valid_token;
+
+ if (!query_string) {
+ *http_status_code = SOUP_STATUS_BAD_REQUEST;
+ return NULL;
+ }
+
+ g_debug ("Recv query string: %s\n", query_string);
+
+ if (stp_inet_addrmask &&
+ !g_hash_table_contains (tokens, sessionid)) {
+ *http_status_code = SOUP_STATUS_FORBIDDEN;
+ return NULL;
+ }
+
+ query = soup_form_decode (query_string);
+
+ token = g_new0 (STPStreamToken, 1);
+
+ type = g_hash_table_lookup (query, "type");
+ token->stream_type = stp_get_stream_type_from_string (type);
+
+ valid_token = g_hash_table_lookup (tokens, sessionid);
+ if (stp_inet_addrmask &&
+ valid_token->stream_type != token->stream_type) {
+ *http_status_code = SOUP_STATUS_FORBIDDEN;
+ g_free (token);
+ token = NULL;
+ goto out;
+ }
+
+ if (!(token->stream_type & STP_STREAM_TYPE_RTP_UDP))
+ goto out;
+
+ /* Host and port are only needed for RTP streaming */
+ udp_clients = g_hash_table_lookup (query, "udp-clients");
+ if (!udp_clients) {
+ *http_status_code = SOUP_STATUS_BAD_REQUEST;
+ g_free (token);
+ token = NULL;
+ goto out;
+ }
+
+ token->udp_clients = g_strdup (udp_clients);
+
+out:
+ g_hash_table_destroy (query);
+ return token;
+}
+
static void
-stp_abort_server_ctx (TranscodeServerCtx *ctx)
+stp_abort_server_ctx (STPServerCtx *ctx)
{
/* Abort all processing for this stream */
if (ctx->status == STP_STATUS_STREAMING &&
@@ -73,6 +149,7 @@ stp_abort_server_ctx (TranscodeServerCtx *ctx)
ctx->status = STP_STATUS_FINISHED;
soup_message_set_status (ctx->msg, SOUP_STATUS_GONE);
}
+ stream_finished_cb (ctx->msg, ctx);
/* Disconnect all clients, and shut down the stream */
invoke_g_hash_table_remove (ctx);
}
@@ -80,7 +157,7 @@ stp_abort_server_ctx (TranscodeServerCtx *ctx)
/* If it's been more than 10 seconds since the last time we got
* a chunk for a PUT request, we timeout and drop the connection */
static gboolean
-increment_read_timer (TranscodeServerCtx *ctx)
+increment_read_timer (STPServerCtx *ctx)
{
ctx->seconds_since_read += 2;
if (ctx->seconds_since_read < server_timeout)
@@ -94,7 +171,7 @@ increment_read_timer (TranscodeServerCtx *ctx)
/* If it's been more than 10 seconds since the last time we wrote
* a chunk for a GET response, we timeout and drop the connection */
static gboolean
-increment_write_timer (TranscodeClientCtx *ctx)
+increment_write_timer (STPClientCtx *ctx)
{
ctx->seconds_since_write += 2;
if (ctx->seconds_since_write < client_timeout)
@@ -105,13 +182,13 @@ increment_write_timer (TranscodeClientCtx *ctx)
* connections, and clients are just left hanging */
soup_message_set_status (ctx->msg, SOUP_STATUS_REQUEST_TIMEOUT);
soup_message_body_complete (ctx->msg->response_body);
- stp_cleanup_transcode_client_ctx (ctx);
+ stp_client_ctx_cleanup (ctx);
return G_SOURCE_REMOVE;
}
static void
client_eos_cb (GstElement *appsink,
- TranscodeClientCtx *ctx)
+ STPClientCtx *ctx)
{
g_debug ("Received EOS for client");
soup_message_set_status (ctx->msg, SOUP_STATUS_OK);
@@ -120,7 +197,7 @@ client_eos_cb (GstElement *appsink,
static void
can_write_next_client_chunk_cb (SoupMessage *msg,
- TranscodeClientCtx *ctx)
+ STPClientCtx *ctx)
{
ctx->seconds_since_write = 0;
ctx->msg = msg;
@@ -128,7 +205,7 @@ can_write_next_client_chunk_cb (SoupMessage *msg,
}
static gboolean
-invoke_write_client_chunk (TranscodeClientCtx *ctx)
+invoke_write_client_chunk (STPClientCtx *ctx)
{
GstMapInfo info;
GstSample *sample;
@@ -180,7 +257,7 @@ invoke_write_client_chunk (TranscodeClientCtx *ctx)
static GstFlowReturn
write_client_chunk_cb (GstElement *appsink,
- TranscodeClientCtx *ctx)
+ STPClientCtx *ctx)
{
g_main_context_invoke_full (NULL, G_PRIORITY_DEFAULT,
(GSourceFunc)invoke_write_client_chunk,
@@ -190,15 +267,15 @@ write_client_chunk_cb (GstElement *appsink,
static void
client_finished_cb (SoupMessage *msg,
- TranscodeClientCtx *ctx)
+ STPClientCtx *ctx)
{
stp_print_status ("Client finished/aborted, doing cleanup...\n");
- stp_cleanup_transcode_client_ctx (ctx);
+ stp_client_ctx_cleanup (ctx);
}
static void
-stream_finished_cb (SoupMessage *msg,
- TranscodeServerCtx *ctx)
+stream_finished_cb (SoupMessage *msg,
+ STPServerCtx *ctx)
{
gboolean ret;
@@ -226,9 +303,9 @@ handle_request_cb (SoupServer *server,
const char *path,
GHashTable *query,
SoupClientContext *client,
- GHashTable *ctx_table)
+ STPHashTables *tables)
{
- TranscodeServerCtx *server_ctx;
+ STPServerCtx *server_ctx;
if (!msg)
return;
@@ -240,7 +317,10 @@ handle_request_cb (SoupServer *server,
goto PUT;
else if (msg->method == SOUP_METHOD_GET)
goto GET;
- else {
+ else if (msg->method == SOUP_METHOD_DELETE) {
+ soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED);
+ return;
+ } else {
soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED);
return;
}
@@ -249,10 +329,14 @@ PUT:
{
/* The PUT request has finished streaming. Cleanup
* will be done on EOS/ERROR in the gst pipeline. */
- server_ctx = get_server_ctx_from_msg (msg, ctx_table);
- if (server_ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH)
- server_ctx->status = STP_STATUS_FLUSHING;
- soup_message_set_status (server_ctx->msg, SOUP_STATUS_OK);
+ server_ctx = get_server_ctx_from_msg (msg, tables->ctxs);
+ if (!server_ctx) {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
+ } else {
+ if (server_ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH)
+ server_ctx->status = STP_STATUS_FLUSHING;
+ soup_message_set_status (msg, SOUP_STATUS_OK);
+ }
return;
}
@@ -268,9 +352,9 @@ GET:
GstElement *encodebin, *bin, *tee, *q2, *appsink;
GstPadTemplate *template;
GstPad *srcpad, *sinkpadq2;
- TranscodeClientCtx *client_ctx;
+ STPClientCtx *client_ctx;
- server_ctx = get_server_ctx_from_msg (msg, ctx_table);
+ server_ctx = get_server_ctx_from_msg (msg, tables->ctxs);
if (server_ctx == NULL ||
server_ctx->pipeline == NULL) {
/* There's no request streaming on this path (yet) */
@@ -296,10 +380,10 @@ GET:
g_assert_not_reached ();
}
- g_print ("New GET request on %s\n", path);
+ g_debug ("New GET request on %s\n", path);
/* Connect appsink to tee, and start streaming */
- client_ctx = g_new0 (TranscodeClientCtx, 1);
+ client_ctx = g_new0 (STPClientCtx, 1);
client_ctx->msg = msg;
client_ctx->server_ctx = server_ctx;
@@ -410,15 +494,15 @@ out:
err:
soup_message_set_status (client_ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
soup_message_body_complete (client_ctx->msg->response_body);
- stp_cleanup_transcode_client_ctx (client_ctx);
+ stp_client_ctx_cleanup (client_ctx);
goto out;
}
}
static void
-got_request_body_chunk (SoupMessage *msg,
- SoupBuffer *chunk,
- TranscodeServerCtx *ctx)
+got_request_body_chunk (SoupMessage *msg,
+ SoupBuffer *chunk,
+ STPServerCtx *ctx)
{
GstBuffer *buffer;
GstFlowReturn ret;
@@ -448,18 +532,17 @@ got_request_body_chunk (SoupMessage *msg,
}
static void
-request_ended_no_body_cb (SoupMessage *msg,
- TranscodeServerCtx *ctx)
+request_ended_no_body_cb (SoupMessage *msg,
+ STPServerCtx *ctx)
{
g_critical ("Request ended without a body!");
invoke_g_hash_table_remove (ctx);
- g_free (ctx);
}
static void
-got_first_request_body_chunk (SoupMessage *msg,
- SoupBuffer *chunk,
- TranscodeServerCtx *ctx)
+got_first_request_body_chunk (SoupMessage *msg,
+ SoupBuffer *chunk,
+ STPServerCtx *ctx)
{
int num;
@@ -512,10 +595,11 @@ got_first_request_body_chunk (SoupMessage *msg,
}
static void
-got_request_headers (SoupMessage *msg,
- GHashTable *ctx_table)
+got_request_headers (SoupMessage *msg,
+ STPHashTables *tables)
{
SoupURI *uri;
+ const char *path, *sessionid, *query;
#ifdef HEADERS_DEBUG
SoupMessageHeadersIter iter;
@@ -526,42 +610,72 @@ got_request_headers (SoupMessage *msg,
#endif
g_object_get (msg, "uri", &uri, NULL);
- g_debug ("%s on uri %s", msg->method, soup_uri_get_path (uri));
+ path = soup_uri_get_path (uri);
+ query = soup_uri_get_query (uri);
+ g_debug ("%s on uri %s", msg->method, path);
+
+ /* Token API methods are handled in the server callbacks,
+ * so we ignore those paths here. */
+ if (g_str_has_prefix (path, STP_REST_LIST_STREAMS) ||
+ g_str_has_prefix (path, STP_REST_ABORT_STREAM) ||
+ g_str_has_prefix (path, STP_REST_ADD_TOKEN) ||
+ g_str_has_prefix (path, STP_REST_REVOKE_TOKEN) ||
+ g_str_has_prefix (path, STP_REST_LIST_TOKENS))
+ goto out;
+
+ /* Remove the leading '/' to get the session id */
+ sessionid = &path[1];
if (msg->method == SOUP_METHOD_PUT ||
msg->method == SOUP_METHOD_POST)
+ /* Incoming stream; do everything needed */
goto PUT;
else if (msg->method == SOUP_METHOD_GET)
+ /* Just set response headers as appropriate */
goto GET;
- else {
+ else if (msg->method == SOUP_METHOD_DELETE) {
+ soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED);
+ goto out;
+ } else {
/* Nothing to do; server doesn't implement this method */
soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED);
goto out;
}
PUT: {
+ guint http_status_code;
const char *content_range;
SoupEncoding encoding;
gboolean connection_exists;
- TranscodeServerCtx *ctx;
+ STPServerCtx *ctx;
+ STPStreamToken *token;
+
+ token = stp_validate_fetch_token_from_query (tables->tokens,
+ sessionid, query,
+ &http_status_code);
+ if (!token) {
+ soup_message_set_status (msg, http_status_code);
+ goto out;
+ }
- connection_exists = g_hash_table_contains (ctx_table, soup_uri_get_path (uri));
+ connection_exists = g_hash_table_contains (tables->ctxs, sessionid);
/* Data might be sent to us in one of three ways:
* 1. Chunked encoding with the stream (and request body) sent in chunks
* 2. Content-Length encoding with the entire stream in one chunk
* 3. A persistent Content-Length encoding connection, and subsequent
- * requests can send more stream data, forever, subject to a timeout */
+ * requests can send more stream data, forever, subject to a timeout.
+ * The first request will send a Content-Length request, and
+ * subsequent ones will only have Content-Length + Content-Range set */
encoding = soup_message_headers_get_encoding (msg->request_headers);
switch (encoding) {
case SOUP_ENCODING_CHUNKED:
g_debug ("Chunked encoding detected!");
if (connection_exists) {
/* There's already a chunked request streaming on this path */
- g_critical ("Recv duplicate request on the same URI: %s",
- soup_uri_get_path (uri));
+ g_critical ("Recv duplicate request on the same URI: %s", path);
soup_message_set_status (msg, SOUP_STATUS_CONFLICT);
- goto out;
+ goto PUT_out;
}
break;
case SOUP_ENCODING_CONTENT_LENGTH:
@@ -573,10 +687,15 @@ PUT: {
if (connection_exists && !content_range) {
/* A connection already exists, and this one isn't a continuation
* of the previous one, so it's a conflict */
- g_critical ("Recv Content-Length PUT on '%s' without Content-Range",
- soup_uri_get_path (uri));
+ g_critical ("Recv Content-Length PUT on '%s' without Content-Range", path);
soup_message_set_status (msg, SOUP_STATUS_CONFLICT);
- goto out;
+ goto PUT_out;
+ }
+ if (!connection_exists && content_range) {
+ /* We cancelled this connection, but we got another request which is
+ * a continuation of the previous one */
+ soup_message_set_status (msg, SOUP_STATUS_GONE);
+ goto PUT_out;
}
break;
case SOUP_ENCODING_EOF:
@@ -584,9 +703,12 @@ PUT: {
case SOUP_ENCODING_NONE:
case SOUP_ENCODING_BYTERANGES:
g_critical ("Unknown encoding!");
- goto out;
+ soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST);
+ goto PUT_out;
default:
g_critical ("Unsupported encoding?");
+ soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST);
+ goto PUT_out;
}
/* Whether the incoming stream is chunked or fixed length, we want to
@@ -596,28 +718,32 @@ PUT: {
if (connection_exists) {
g_debug ("Stream already exists, connecting everything to that...");
- ctx = get_server_ctx_from_msg (msg, ctx_table);
+ ctx = get_server_ctx_from_msg (msg, tables->ctxs);
if (ctx->status != STP_STATUS_STREAMING) {
- g_critical ("Recv more data on '%s' after timeout",
- soup_uri_get_path (uri));
+ g_critical ("Recv more data on '%s' after timeout", path);
soup_message_set_status (msg, SOUP_STATUS_REQUEST_TIMEOUT);
- goto out;
+ goto PUT_out;
}
/* The chunks we'll get are a continuation of the previous one */
g_signal_connect (msg, "got-chunk",
G_CALLBACK (got_request_body_chunk), ctx);
+ PUT_out:
+ stp_stream_token_free (token);
goto out;
}
/* This is a new connection; treat it as such */
- ctx = g_new0 (TranscodeServerCtx, 1);
+ ctx = g_new0 (STPServerCtx, 1);
+ ctx->stream_type = token->stream_type;
+ if (ctx->stream_type & STP_STREAM_TYPE_RTP_UDP)
+ ctx->udp_clients = token->udp_clients;
ctx->msg = msg;
ctx->encoding = encoding;
ctx->status = STP_STATUS_STREAMING;
- ctx->parent_ctx_table = ctx_table;
- ctx->path = g_strdup (soup_uri_get_path (uri));
- g_hash_table_insert (ctx_table, ctx->path, ctx);
- g_print ("New PUT stream on %s\n", ctx->path);
+ ctx->parent_ctx_table = tables->ctxs;
+ ctx->sessionid = g_strdup (sessionid);
+ g_hash_table_insert (tables->ctxs, ctx->sessionid, ctx);
+ g_debug ("New PUT stream on %s\n", path);
g_signal_connect (msg, "got-chunk",
G_CALLBACK (got_first_request_body_chunk), ctx);
@@ -626,27 +752,20 @@ PUT: {
g_signal_connect (msg, "finished",
G_CALLBACK (request_ended_no_body_cb), ctx);
if (encoding == SOUP_ENCODING_CONTENT_LENGTH)
- g_timeout_add_seconds (2, (GSourceFunc)increment_read_timer, ctx);
+ ctx->timeout_handler_id =
+ g_timeout_add_seconds (2, (GSourceFunc)increment_read_timer, ctx);
goto out;
}
GET: {
- const char *query;
- TranscodeServerCtx *server_ctx;
+ STPServerCtx *server_ctx;
- if (!g_hash_table_contains (ctx_table,
- soup_uri_get_path (uri))) {
- g_critical ("No stream on URI: %s", soup_uri_get_path (uri));
- soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
- goto out;
- }
+ server_ctx = g_hash_table_lookup (tables->ctxs, sessionid);
- query = soup_uri_get_query (uri);
- if (query && g_strcmp0 (query, "abort") == 0) {
- /* We just use the "uri" field for this, so this is OK */
- server_ctx = get_server_ctx_from_msg (msg, ctx_table);
- stp_abort_server_ctx (server_ctx);
- soup_message_set_status (msg, SOUP_STATUS_OK);
+ if (!server_ctx ||
+ !(server_ctx->stream_type & STP_STREAM_TYPE_HTTP)) {
+ stp_print_status ("No HTTP GET stream on URI: %s\n", path);
+ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
goto out;
}
@@ -666,14 +785,351 @@ static void
request_started_cb (SoupServer *server,
SoupMessage *msg,
SoupClientContext *client,
- GHashTable *ctx_table)
+ STPHashTables *tables)
{
g_debug ("New %s request started", msg->method);
/* SoupMessage is useful only once we have headers,
* so we do all initialization there */
g_signal_connect (msg, "got-headers",
- G_CALLBACK (got_request_headers), ctx_table);
+ G_CALLBACK (got_request_headers), tables);
+}
+
+static void
+handle_add_token_cb (SoupServer *server,
+ SoupMessage *msg,
+ const char *path,
+ GHashTable *query,
+ SoupClientContext *client,
+ STPHashTables *tables)
+{
+ if (msg->method == SOUP_METHOD_POST) {
+ goto POST;
+ } else if (g_strcmp0 (path, STP_REST_ADD_TOKEN) != 0) {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
+ goto out;
+ } else if (msg->method == SOUP_METHOD_PUT ||
+ msg->method == SOUP_METHOD_GET ||
+ msg->method == SOUP_METHOD_DELETE) {
+ soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED);
+ goto out;
+ } else {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED);
+ goto out;
+ }
+
+POST: {
+ STPStreamToken *token;
+ const char *type, *sessionid, *udp_clients;
+
+ if (!stp_validate_token_server (stp_inet_addrmask, client)) {
+ soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN);
+ goto out;
+ }
+
+ if (!query ||
+ !g_hash_table_contains (query, "sessionid") ||
+ !g_hash_table_contains (query, "type")) {
+ soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST);
+ goto out;
+ }
+
+ sessionid = g_hash_table_lookup (query, "sessionid");
+ type = g_hash_table_lookup (query, "type");
+ udp_clients = g_hash_table_lookup (query, "udp-clients");
+
+ token = g_new0 (STPStreamToken, 1);
+ token->stream_type = stp_get_stream_type_from_string (type);
+
+ if ((udp_clients == NULL) &&
+ (token->stream_type & STP_STREAM_TYPE_RTP_UDP)) {
+ g_free (token);
+ soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST);
+ goto out;
+ }
+
+ token->udp_clients = g_strdup (udp_clients);
+
+ /* We don't care if there's already an identical token, and
+ * we don't care if there's ongoing streams with this token */
+ g_hash_table_insert (tables->tokens, g_strdup (sessionid), token);
+
+ soup_message_set_status (msg, SOUP_STATUS_OK);
+ }
+
+out:
+ return;
+}
+
+static void
+handle_revoke_token_cb (SoupServer *server,
+ SoupMessage *msg,
+ const char *path,
+ GHashTable *query,
+ SoupClientContext *client,
+ STPHashTables *tables)
+{
+ if (msg->method == SOUP_METHOD_DELETE) {
+ goto DELETE;
+ } else if (g_strcmp0 (path, STP_REST_REVOKE_TOKEN) != 0) {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
+ goto out;
+ } else if (msg->method == SOUP_METHOD_PUT ||
+ msg->method == SOUP_METHOD_GET ||
+ msg->method == SOUP_METHOD_POST) {
+ soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED);
+ goto out;
+ } else {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED);
+ goto out;
+ }
+
+DELETE: {
+ char *sessionid;
+
+ if (!stp_validate_token_server (stp_inet_addrmask, client)) {
+ soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN);
+ goto out;
+ }
+
+ if (!query ||
+ !g_hash_table_contains (query, "sessionid")) {
+ soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST);
+ goto out;
+ }
+
+ sessionid = g_hash_table_lookup (query, "sessionid");
+
+ if (!g_hash_table_remove (tables->tokens, sessionid)) {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
+ goto out;
+ }
+
+ if (g_hash_table_contains (tables->ctxs, sessionid))
+ /* If there's a stream using this session id, we remove the token,
+ * but can't guarantee that the stream will stop. */
+ soup_message_set_status (msg, SOUP_STATUS_ACCEPTED);
+ else
+ soup_message_set_status (msg, SOUP_STATUS_OK);
+ }
+
+out:
+ return;
+}
+
+static void
+handle_list_tokens_cb (SoupServer *server,
+ SoupMessage *msg,
+ const char *path,
+ GHashTable *query,
+ SoupClientContext *client,
+ STPHashTables *tables)
+{
+ if (msg->method == SOUP_METHOD_GET) {
+ goto GET;
+ } else if (g_strcmp0 (path, STP_REST_LIST_TOKENS) != 0) {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
+ goto out;
+ } else if (msg->method == SOUP_METHOD_PUT ||
+ msg->method == SOUP_METHOD_POST ||
+ msg->method == SOUP_METHOD_DELETE) {
+ soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED);
+ goto out;
+ } else {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED);
+ goto out;
+ }
+
+GET: {
+ gsize length;
+ char *response;
+ GHashTableIter i;
+ JsonBuilder *b;
+ JsonNode *dict;
+ JsonGenerator *gen;
+ gpointer key, value;
+ STPStreamToken *token;
+ const char *tmp;
+
+ if (!stp_validate_token_server (stp_inet_addrmask, client)) {
+ soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN);
+ goto out;
+ }
+
+ g_hash_table_iter_init (&i, tables->tokens);
+ b = json_builder_new ();
+ json_builder_begin_array (b);
+ while (g_hash_table_iter_next (&i, &key, &value)) {
+ token = value;
+ json_builder_begin_object (b);
+
+ json_builder_set_member_name (b, "sessionid");
+ json_builder_add_string_value (b, key);
+
+ json_builder_set_member_name (b, "udp-clients");
+ tmp = token->udp_clients;
+ json_builder_add_string_value (b, tmp? tmp : "");
+
+ json_builder_set_member_name (b, "stream-type");
+ tmp = stp_get_stream_type_string (token->stream_type);
+ json_builder_add_string_value (b, tmp);
+
+ json_builder_end_object (b);
+ }
+ json_builder_end_array (b);
+ dict = json_builder_get_root (b);
+ g_object_unref (b);
+
+ gen = json_generator_new ();
+ json_generator_set_root (gen, dict);
+ json_node_free (dict);
+
+ json_generator_set_pretty (gen, TRUE);
+ response = json_generator_to_data (gen, &length);
+ g_object_unref (gen);
+
+ soup_message_set_status (msg, SOUP_STATUS_OK);
+ soup_message_body_append (msg->response_body,
+ SOUP_MEMORY_TAKE,
+ response, length);
+ }
+out:
+ return;
+}
+
+static void
+handle_list_streams_cb (SoupServer *server,
+ SoupMessage *msg,
+ const char *path,
+ GHashTable *query,
+ SoupClientContext *client,
+ STPHashTables *tables)
+{
+ if (msg->method == SOUP_METHOD_GET) {
+ goto GET;
+ } else if (g_strcmp0 (path, STP_REST_LIST_STREAMS) != 0) {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
+ goto out;
+ } else if (msg->method == SOUP_METHOD_PUT ||
+ msg->method == SOUP_METHOD_POST ||
+ msg->method == SOUP_METHOD_DELETE) {
+ soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED);
+ goto out;
+ } else {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED);
+ goto out;
+ }
+
+GET: {
+ gsize length;
+ const char *tmp;
+ JsonBuilder *b;
+ JsonNode *dict;
+ JsonGenerator *gen;
+ GHashTableIter i;
+ gpointer key, value;
+ STPServerCtx *ctx;
+ char *sessionid, *response;
+
+ if (!stp_validate_token_server (stp_inet_addrmask, client)) {
+ soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN);
+ goto out;
+ }
+
+ g_hash_table_iter_init (&i, tables->ctxs);
+ b = json_builder_new ();
+ json_builder_begin_array (b);
+ while (g_hash_table_iter_next (&i, &key, &value)) {
+ ctx = value;
+ sessionid = key;
+ json_builder_begin_object (b);
+
+ json_builder_set_member_name (b, "type");
+ tmp = stp_get_stream_type_string (ctx->stream_type);
+ json_builder_add_string_value (b, tmp);
+
+ json_builder_set_member_name (b, "sessionid");
+ json_builder_add_string_value (b, sessionid);
+
+ json_builder_set_member_name (b, "udp-clients");
+ tmp = ctx->udp_clients;
+ json_builder_add_string_value (b, tmp? tmp : "");
+
+ json_builder_set_member_name (b, "status");
+ json_builder_add_string_value (b, ctx->status);
+
+ json_builder_end_object (b);
+ }
+ json_builder_end_array (b);
+ dict = json_builder_get_root (b);
+ g_object_unref (b);
+
+ gen = json_generator_new ();
+ json_generator_set_root (gen, dict);
+ json_node_free (dict);
+
+ json_generator_set_pretty (gen, TRUE);
+ response = json_generator_to_data (gen, &length);
+ g_object_unref (gen);
+
+ soup_message_set_status (msg, SOUP_STATUS_OK);
+ soup_message_body_append (msg->response_body,
+ SOUP_MEMORY_TAKE,
+ response, length);
+ }
+out:
+ return;
+}
+
+static void
+handle_abort_stream_cb (SoupServer *server,
+ SoupMessage *msg,
+ const char *path,
+ GHashTable *query,
+ SoupClientContext *client,
+ STPHashTables *tables)
+{
+ if (msg->method == SOUP_METHOD_DELETE) {
+ goto DELETE;
+ } else if (g_strcmp0 (path, STP_REST_ABORT_STREAM) != 0) {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
+ goto out;
+ } else if (msg->method == SOUP_METHOD_PUT ||
+ msg->method == SOUP_METHOD_GET ||
+ msg->method == SOUP_METHOD_POST) {
+ soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED);
+ goto out;
+ } else {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED);
+ goto out;
+ }
+
+DELETE: {
+ const char *sessionid;
+ STPServerCtx *ctx;
+
+ if (!stp_validate_token_server (stp_inet_addrmask, client)) {
+ soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN);
+ goto out;
+ }
+
+ sessionid = g_hash_table_lookup (query, "sessionid");
+ if (!sessionid) {
+ soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST);
+ goto out;
+ }
+
+ ctx = g_hash_table_lookup (tables->ctxs, sessionid);
+ if (!ctx) {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
+ goto out;
+ }
+
+ stp_abort_server_ctx (ctx);
+ soup_message_set_status (msg, SOUP_STATUS_OK);
+ }
+out:
+ return;
}
gboolean
@@ -687,7 +1143,7 @@ int
main (int argc,
char *argv[])
{
- GHashTable *ctx_table;
+ STPHashTables *tables;
GOptionContext *optctx;
GError *error = NULL;
@@ -702,20 +1158,64 @@ main (int argc,
}
g_option_context_free (optctx);
- /* Keys are paths, and values are TranscodeServerCtxs */
- ctx_table = g_hash_table_new_full (g_str_hash, g_str_equal,
- (GDestroyNotify)g_free,
- (GDestroyNotify)stp_cleanup_transcode_server_ctx);
+ if (token_server_addrmask) {
+ stp_inet_addrmask = g_inet_address_mask_new_from_string (token_server_addrmask,
+ &error);
+ if (!stp_inet_addrmask) {
+ g_printerr ("Error parsing token server address mask: %s\n", error->message);
+ return 1;
+ }
+ }
+
+ tables = g_new0 (STPHashTables, 1);
+ /* Keys are paths, and values are STPServerCtxs */
+ tables->ctxs = g_hash_table_new_full (g_str_hash, g_str_equal,
+ (GDestroyNotify)g_free,
+ (GDestroyNotify)stp_server_ctx_cleanup);
+ /* Keys are "session-id:host:port", and there are no values.
+ * If host or port are missing, they will be blank. */
+ tables->tokens = g_hash_table_new_full (g_str_hash, g_str_equal,
+ (GDestroyNotify)g_free,
+ (GDestroyNotify)stp_stream_token_free);
/* TODO: Accept command-line argument for host */
server = soup_server_new (SOUP_SERVER_SERVER_HEADER,
"soup-transcode-proxy ",
SOUP_SERVER_PORT, port, NULL);
+
+ /* Handle the end of POST/PUT, and all GET requests */
soup_server_add_handler (server, NULL,
(SoupServerCallback)handle_request_cb,
- ctx_table, NULL);
+ tables, NULL);
+
+ /* NOTE: When adding a new path handler, an exception must be made in
+ * got_request_headers() for that path, otherwise a 405 will be returned */
+ soup_server_add_handler (server, STP_REST_ABORT_STREAM,
+ (SoupServerCallback)handle_abort_stream_cb,
+ tables, NULL);
+ soup_server_add_handler (server, STP_REST_LIST_STREAMS,
+ (SoupServerCallback)handle_list_streams_cb,
+ tables, NULL);
+
+ /* REST API for token handling, if requested */
+ if (token_server_addrmask) {
+ soup_server_add_handler (server, STP_REST_ADD_TOKEN,
+ (SoupServerCallback)handle_add_token_cb,
+ tables, NULL);
+ soup_server_add_handler (server, STP_REST_REVOKE_TOKEN,
+ (SoupServerCallback)handle_revoke_token_cb,
+ tables, NULL);
+ soup_server_add_handler (server, STP_REST_LIST_TOKENS,
+ (SoupServerCallback)handle_list_tokens_cb,
+ tables, NULL);
+ }
+
+ /* Since chunked streaming requests send all the data in a single request,
+ * we need to handle those when we get the headers. Hence, all PUT stream
+ * handling is done here. */
g_signal_connect (server, "request-started",
- G_CALLBACK (request_started_cb), ctx_table);
+ G_CALLBACK (request_started_cb), tables);
+
g_unix_signal_add (SIGINT, (GSourceFunc)exit_on_signal_cb, server);
soup_server_run (server);