summaryrefslogtreecommitdiff
path: root/src/main.c
diff options
context:
space:
mode:
authorNirbheek Chauhan <nirbheek@centricular.com>2014-07-24 17:56:02 (GMT)
committerNirbheek Chauhan <nirbheek@centricular.com>2014-07-24 20:55:42 (GMT)
commite0d453a3a2c120264b13c44f004025b0330547a8 (patch)
tree4517a7e8f3100f1f75b4cecd68486b42d4f7f0df /src/main.c
parentfae0aa58c19a1918e7b7750b4862e2abcbc55788 (diff)
downloadsoup-transcoding-proxy-e0d453a3a2c120264b13c44f004025b0330547a8.zip
soup-transcoding-proxy-e0d453a3a2c120264b13c44f004025b0330547a8.tar.gz
Implement concurrent RTP-UDP streams, and a REST API for status and auth
When the --token-server=ADDR/MASK argument is passed to the server, the token verification framework is enabled, and the specified subnet is allowed to access the REST API to add/revoke/list tokens that allow clients to connect, and to list/abort streams running on the server. Details about the REST API are documented in the file "REST-API". There were also some organisational and name changes in the code.
Diffstat (limited to 'src/main.c')
-rw-r--r--src/main.c670
1 files changed, 585 insertions, 85 deletions
diff --git a/src/main.c b/src/main.c
index 63c3a13..926dc86 100644
--- a/src/main.c
+++ b/src/main.c
@@ -22,6 +22,7 @@
#include <glib-unix.h>
#include <gst/video/video.h>
+#include <json-glib/json-glib.h>
#include "lib.h"
#include "encode.h"
@@ -30,31 +31,47 @@
#include "debug/local-play.h"
#endif
+#define STP_REST_LIST_STREAMS "/list-streams"
+#define STP_REST_ABORT_STREAM "/abort-stream"
+#define STP_REST_ADD_TOKEN "/add-token"
+#define STP_REST_REVOKE_TOKEN "/revoke-token"
+#define STP_REST_LIST_TOKENS "/list-tokens"
+
+typedef struct _STPHashTables STPHashTables;
+
+struct _STPHashTables {
+ GHashTable *ctxs;
+ GHashTable *tokens;
+};
+
static int port = 8000;
static int client_timeout = 10;
static int server_timeout = 5;
+static char *token_server_addrmask = NULL;
+static GInetAddressMask *stp_inet_addrmask = NULL;
static GOptionEntry entries[] =
{
{ "port", 'p', 0, G_OPTION_ARG_INT, &port, "Port to listen on (default: 8000)", "PORT" },
{ "timeout", 't', 0, G_OPTION_ARG_INT, &client_timeout, "Client connection timeout (default: 10s)", "SECONDS" },
+ { "token-server", 's', 0, G_OPTION_ARG_STRING, &token_server_addrmask, "Subnet (or IP addr) for the token server (default: disabled)", "ADDR/PREFIX" },
{ NULL }
};
static SoupServer *server;
-static void stream_finished_cb (SoupMessage *msg,
- TranscodeServerCtx *ctx);
+static void stream_finished_cb (SoupMessage *msg,
+ STPServerCtx *ctx);
-static TranscodeServerCtx*
+static STPServerCtx*
get_server_ctx_from_msg (SoupMessage *msg,
GHashTable *ctx_table)
{
SoupURI *uri;
- TranscodeServerCtx *ctx;
+ STPServerCtx *ctx;
g_object_get (msg, "uri", &uri, NULL);
- ctx = g_hash_table_lookup (ctx_table, soup_uri_get_path (uri));
+ ctx = g_hash_table_lookup (ctx_table, &soup_uri_get_path (uri)[1]);
soup_uri_free (uri);
if (!ctx)
@@ -63,8 +80,67 @@ get_server_ctx_from_msg (SoupMessage *msg,
return ctx;
}
+/* Validates the query, parses it, and fetches a STPStreamToken from it */
+static STPStreamToken*
+stp_validate_fetch_token_from_query (GHashTable *tokens,
+ const char *sessionid,
+ const char *query_string,
+ guint *http_status_code)
+{
+ GHashTable *query;
+ char *type, *udp_clients;
+ STPStreamToken *token, *valid_token;
+
+ if (!query_string) {
+ *http_status_code = SOUP_STATUS_BAD_REQUEST;
+ return NULL;
+ }
+
+ g_debug ("Recv query string: %s\n", query_string);
+
+ if (stp_inet_addrmask &&
+ !g_hash_table_contains (tokens, sessionid)) {
+ *http_status_code = SOUP_STATUS_FORBIDDEN;
+ return NULL;
+ }
+
+ query = soup_form_decode (query_string);
+
+ token = g_new0 (STPStreamToken, 1);
+
+ type = g_hash_table_lookup (query, "type");
+ token->stream_type = stp_get_stream_type_from_string (type);
+
+ valid_token = g_hash_table_lookup (tokens, sessionid);
+ if (stp_inet_addrmask &&
+ valid_token->stream_type != token->stream_type) {
+ *http_status_code = SOUP_STATUS_FORBIDDEN;
+ g_free (token);
+ token = NULL;
+ goto out;
+ }
+
+ if (!(token->stream_type & STP_STREAM_TYPE_RTP_UDP))
+ goto out;
+
+ /* Host and port are only needed for RTP streaming */
+ udp_clients = g_hash_table_lookup (query, "udp-clients");
+ if (!udp_clients) {
+ *http_status_code = SOUP_STATUS_BAD_REQUEST;
+ g_free (token);
+ token = NULL;
+ goto out;
+ }
+
+ token->udp_clients = g_strdup (udp_clients);
+
+out:
+ g_hash_table_destroy (query);
+ return token;
+}
+
static void
-stp_abort_server_ctx (TranscodeServerCtx *ctx)
+stp_abort_server_ctx (STPServerCtx *ctx)
{
/* Abort all processing for this stream */
if (ctx->status == STP_STATUS_STREAMING &&
@@ -73,6 +149,7 @@ stp_abort_server_ctx (TranscodeServerCtx *ctx)
ctx->status = STP_STATUS_FINISHED;
soup_message_set_status (ctx->msg, SOUP_STATUS_GONE);
}
+ stream_finished_cb (ctx->msg, ctx);
/* Disconnect all clients, and shut down the stream */
invoke_g_hash_table_remove (ctx);
}
@@ -80,7 +157,7 @@ stp_abort_server_ctx (TranscodeServerCtx *ctx)
/* If it's been more than 10 seconds since the last time we got
* a chunk for a PUT request, we timeout and drop the connection */
static gboolean
-increment_read_timer (TranscodeServerCtx *ctx)
+increment_read_timer (STPServerCtx *ctx)
{
ctx->seconds_since_read += 2;
if (ctx->seconds_since_read < server_timeout)
@@ -94,7 +171,7 @@ increment_read_timer (TranscodeServerCtx *ctx)
/* If it's been more than 10 seconds since the last time we wrote
* a chunk for a GET response, we timeout and drop the connection */
static gboolean
-increment_write_timer (TranscodeClientCtx *ctx)
+increment_write_timer (STPClientCtx *ctx)
{
ctx->seconds_since_write += 2;
if (ctx->seconds_since_write < client_timeout)
@@ -105,13 +182,13 @@ increment_write_timer (TranscodeClientCtx *ctx)
* connections, and clients are just left hanging */
soup_message_set_status (ctx->msg, SOUP_STATUS_REQUEST_TIMEOUT);
soup_message_body_complete (ctx->msg->response_body);
- stp_cleanup_transcode_client_ctx (ctx);
+ stp_client_ctx_cleanup (ctx);
return G_SOURCE_REMOVE;
}
static void
client_eos_cb (GstElement *appsink,
- TranscodeClientCtx *ctx)
+ STPClientCtx *ctx)
{
g_debug ("Received EOS for client");
soup_message_set_status (ctx->msg, SOUP_STATUS_OK);
@@ -120,7 +197,7 @@ client_eos_cb (GstElement *appsink,
static void
can_write_next_client_chunk_cb (SoupMessage *msg,
- TranscodeClientCtx *ctx)
+ STPClientCtx *ctx)
{
ctx->seconds_since_write = 0;
ctx->msg = msg;
@@ -128,7 +205,7 @@ can_write_next_client_chunk_cb (SoupMessage *msg,
}
static gboolean
-invoke_write_client_chunk (TranscodeClientCtx *ctx)
+invoke_write_client_chunk (STPClientCtx *ctx)
{
GstMapInfo info;
GstSample *sample;
@@ -180,7 +257,7 @@ invoke_write_client_chunk (TranscodeClientCtx *ctx)
static GstFlowReturn
write_client_chunk_cb (GstElement *appsink,
- TranscodeClientCtx *ctx)
+ STPClientCtx *ctx)
{
g_main_context_invoke_full (NULL, G_PRIORITY_DEFAULT,
(GSourceFunc)invoke_write_client_chunk,
@@ -190,15 +267,15 @@ write_client_chunk_cb (GstElement *appsink,
static void
client_finished_cb (SoupMessage *msg,
- TranscodeClientCtx *ctx)
+ STPClientCtx *ctx)
{
stp_print_status ("Client finished/aborted, doing cleanup...\n");
- stp_cleanup_transcode_client_ctx (ctx);
+ stp_client_ctx_cleanup (ctx);
}
static void
-stream_finished_cb (SoupMessage *msg,
- TranscodeServerCtx *ctx)
+stream_finished_cb (SoupMessage *msg,
+ STPServerCtx *ctx)
{
gboolean ret;
@@ -226,9 +303,9 @@ handle_request_cb (SoupServer *server,
const char *path,
GHashTable *query,
SoupClientContext *client,
- GHashTable *ctx_table)
+ STPHashTables *tables)
{
- TranscodeServerCtx *server_ctx;
+ STPServerCtx *server_ctx;
if (!msg)
return;
@@ -240,7 +317,10 @@ handle_request_cb (SoupServer *server,
goto PUT;
else if (msg->method == SOUP_METHOD_GET)
goto GET;
- else {
+ else if (msg->method == SOUP_METHOD_DELETE) {
+ soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED);
+ return;
+ } else {
soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED);
return;
}
@@ -249,10 +329,14 @@ PUT:
{
/* The PUT request has finished streaming. Cleanup
* will be done on EOS/ERROR in the gst pipeline. */
- server_ctx = get_server_ctx_from_msg (msg, ctx_table);
- if (server_ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH)
- server_ctx->status = STP_STATUS_FLUSHING;
- soup_message_set_status (server_ctx->msg, SOUP_STATUS_OK);
+ server_ctx = get_server_ctx_from_msg (msg, tables->ctxs);
+ if (!server_ctx) {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
+ } else {
+ if (server_ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH)
+ server_ctx->status = STP_STATUS_FLUSHING;
+ soup_message_set_status (msg, SOUP_STATUS_OK);
+ }
return;
}
@@ -268,9 +352,9 @@ GET:
GstElement *encodebin, *bin, *tee, *q2, *appsink;
GstPadTemplate *template;
GstPad *srcpad, *sinkpadq2;
- TranscodeClientCtx *client_ctx;
+ STPClientCtx *client_ctx;
- server_ctx = get_server_ctx_from_msg (msg, ctx_table);
+ server_ctx = get_server_ctx_from_msg (msg, tables->ctxs);
if (server_ctx == NULL ||
server_ctx->pipeline == NULL) {
/* There's no request streaming on this path (yet) */
@@ -296,10 +380,10 @@ GET:
g_assert_not_reached ();
}
- g_print ("New GET request on %s\n", path);
+ g_debug ("New GET request on %s\n", path);
/* Connect appsink to tee, and start streaming */
- client_ctx = g_new0 (TranscodeClientCtx, 1);
+ client_ctx = g_new0 (STPClientCtx, 1);
client_ctx->msg = msg;
client_ctx->server_ctx = server_ctx;
@@ -410,15 +494,15 @@ out:
err:
soup_message_set_status (client_ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
soup_message_body_complete (client_ctx->msg->response_body);
- stp_cleanup_transcode_client_ctx (client_ctx);
+ stp_client_ctx_cleanup (client_ctx);
goto out;
}
}
static void
-got_request_body_chunk (SoupMessage *msg,
- SoupBuffer *chunk,
- TranscodeServerCtx *ctx)
+got_request_body_chunk (SoupMessage *msg,
+ SoupBuffer *chunk,
+ STPServerCtx *ctx)
{
GstBuffer *buffer;
GstFlowReturn ret;
@@ -448,18 +532,17 @@ got_request_body_chunk (SoupMessage *msg,
}
static void
-request_ended_no_body_cb (SoupMessage *msg,
- TranscodeServerCtx *ctx)
+request_ended_no_body_cb (SoupMessage *msg,
+ STPServerCtx *ctx)
{
g_critical ("Request ended without a body!");
invoke_g_hash_table_remove (ctx);
- g_free (ctx);
}
static void
-got_first_request_body_chunk (SoupMessage *msg,
- SoupBuffer *chunk,
- TranscodeServerCtx *ctx)
+got_first_request_body_chunk (SoupMessage *msg,
+ SoupBuffer *chunk,
+ STPServerCtx *ctx)
{
int num;
@@ -512,10 +595,11 @@ got_first_request_body_chunk (SoupMessage *msg,
}
static void
-got_request_headers (SoupMessage *msg,
- GHashTable *ctx_table)
+got_request_headers (SoupMessage *msg,
+ STPHashTables *tables)
{
SoupURI *uri;
+ const char *path, *sessionid, *query;
#ifdef HEADERS_DEBUG
SoupMessageHeadersIter iter;
@@ -526,42 +610,72 @@ got_request_headers (SoupMessage *msg,
#endif
g_object_get (msg, "uri", &uri, NULL);
- g_debug ("%s on uri %s", msg->method, soup_uri_get_path (uri));
+ path = soup_uri_get_path (uri);
+ query = soup_uri_get_query (uri);
+ g_debug ("%s on uri %s", msg->method, path);
+
+ /* Token API methods are handled in the server callbacks,
+ * so we ignore those paths here. */
+ if (g_str_has_prefix (path, STP_REST_LIST_STREAMS) ||
+ g_str_has_prefix (path, STP_REST_ABORT_STREAM) ||
+ g_str_has_prefix (path, STP_REST_ADD_TOKEN) ||
+ g_str_has_prefix (path, STP_REST_REVOKE_TOKEN) ||
+ g_str_has_prefix (path, STP_REST_LIST_TOKENS))
+ goto out;
+
+ /* Remove the leading '/' to get the session id */
+ sessionid = &path[1];
if (msg->method == SOUP_METHOD_PUT ||
msg->method == SOUP_METHOD_POST)
+ /* Incoming stream; do everything needed */
goto PUT;
else if (msg->method == SOUP_METHOD_GET)
+ /* Just set response headers as appropriate */
goto GET;
- else {
+ else if (msg->method == SOUP_METHOD_DELETE) {
+ soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED);
+ goto out;
+ } else {
/* Nothing to do; server doesn't implement this method */
soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED);
goto out;
}
PUT: {
+ guint http_status_code;
const char *content_range;
SoupEncoding encoding;
gboolean connection_exists;
- TranscodeServerCtx *ctx;
+ STPServerCtx *ctx;
+ STPStreamToken *token;
+
+ token = stp_validate_fetch_token_from_query (tables->tokens,
+ sessionid, query,
+ &http_status_code);
+ if (!token) {
+ soup_message_set_status (msg, http_status_code);
+ goto out;
+ }
- connection_exists = g_hash_table_contains (ctx_table, soup_uri_get_path (uri));
+ connection_exists = g_hash_table_contains (tables->ctxs, sessionid);
/* Data might be sent to us in one of three ways:
* 1. Chunked encoding with the stream (and request body) sent in chunks
* 2. Content-Length encoding with the entire stream in one chunk
* 3. A persistent Content-Length encoding connection, and subsequent
- * requests can send more stream data, forever, subject to a timeout */
+ * requests can send more stream data, forever, subject to a timeout.
+ * The first request will send a Content-Length request, and
+ * subsequent ones will only have Content-Length + Content-Range set */
encoding = soup_message_headers_get_encoding (msg->request_headers);
switch (encoding) {
case SOUP_ENCODING_CHUNKED:
g_debug ("Chunked encoding detected!");
if (connection_exists) {
/* There's already a chunked request streaming on this path */
- g_critical ("Recv duplicate request on the same URI: %s",
- soup_uri_get_path (uri));
+ g_critical ("Recv duplicate request on the same URI: %s", path);
soup_message_set_status (msg, SOUP_STATUS_CONFLICT);
- goto out;
+ goto PUT_out;
}
break;
case SOUP_ENCODING_CONTENT_LENGTH:
@@ -573,10 +687,15 @@ PUT: {
if (connection_exists && !content_range) {
/* A connection already exists, and this one isn't a continuation
* of the previous one, so it's a conflict */
- g_critical ("Recv Content-Length PUT on '%s' without Content-Range",
- soup_uri_get_path (uri));
+ g_critical ("Recv Content-Length PUT on '%s' without Content-Range", path);
soup_message_set_status (msg, SOUP_STATUS_CONFLICT);
- goto out;
+ goto PUT_out;
+ }
+ if (!connection_exists && content_range) {
+ /* We cancelled this connection, but we got another request which is
+ * a continuation of the previous one */
+ soup_message_set_status (msg, SOUP_STATUS_GONE);
+ goto PUT_out;
}
break;
case SOUP_ENCODING_EOF:
@@ -584,9 +703,12 @@ PUT: {
case SOUP_ENCODING_NONE:
case SOUP_ENCODING_BYTERANGES:
g_critical ("Unknown encoding!");
- goto out;
+ soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST);
+ goto PUT_out;
default:
g_critical ("Unsupported encoding?");
+ soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST);
+ goto PUT_out;
}
/* Whether the incoming stream is chunked or fixed length, we want to
@@ -596,28 +718,32 @@ PUT: {
if (connection_exists) {
g_debug ("Stream already exists, connecting everything to that...");
- ctx = get_server_ctx_from_msg (msg, ctx_table);
+ ctx = get_server_ctx_from_msg (msg, tables->ctxs);
if (ctx->status != STP_STATUS_STREAMING) {
- g_critical ("Recv more data on '%s' after timeout",
- soup_uri_get_path (uri));
+ g_critical ("Recv more data on '%s' after timeout", path);
soup_message_set_status (msg, SOUP_STATUS_REQUEST_TIMEOUT);
- goto out;
+ goto PUT_out;
}
/* The chunks we'll get are a continuation of the previous one */
g_signal_connect (msg, "got-chunk",
G_CALLBACK (got_request_body_chunk), ctx);
+ PUT_out:
+ stp_stream_token_free (token);
goto out;
}
/* This is a new connection; treat it as such */
- ctx = g_new0 (TranscodeServerCtx, 1);
+ ctx = g_new0 (STPServerCtx, 1);
+ ctx->stream_type = token->stream_type;
+ if (ctx->stream_type & STP_STREAM_TYPE_RTP_UDP)
+ ctx->udp_clients = token->udp_clients;
ctx->msg = msg;
ctx->encoding = encoding;
ctx->status = STP_STATUS_STREAMING;
- ctx->parent_ctx_table = ctx_table;
- ctx->path = g_strdup (soup_uri_get_path (uri));
- g_hash_table_insert (ctx_table, ctx->path, ctx);
- g_print ("New PUT stream on %s\n", ctx->path);
+ ctx->parent_ctx_table = tables->ctxs;
+ ctx->sessionid = g_strdup (sessionid);
+ g_hash_table_insert (tables->ctxs, ctx->sessionid, ctx);
+ g_debug ("New PUT stream on %s\n", path);
g_signal_connect (msg, "got-chunk",
G_CALLBACK (got_first_request_body_chunk), ctx);
@@ -626,27 +752,20 @@ PUT: {
g_signal_connect (msg, "finished",
G_CALLBACK (request_ended_no_body_cb), ctx);
if (encoding == SOUP_ENCODING_CONTENT_LENGTH)
- g_timeout_add_seconds (2, (GSourceFunc)increment_read_timer, ctx);
+ ctx->timeout_handler_id =
+ g_timeout_add_seconds (2, (GSourceFunc)increment_read_timer, ctx);
goto out;
}
GET: {
- const char *query;
- TranscodeServerCtx *server_ctx;
+ STPServerCtx *server_ctx;
- if (!g_hash_table_contains (ctx_table,
- soup_uri_get_path (uri))) {
- g_critical ("No stream on URI: %s", soup_uri_get_path (uri));
- soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
- goto out;
- }
+ server_ctx = g_hash_table_lookup (tables->ctxs, sessionid);
- query = soup_uri_get_query (uri);
- if (query && g_strcmp0 (query, "abort") == 0) {
- /* We just use the "uri" field for this, so this is OK */
- server_ctx = get_server_ctx_from_msg (msg, ctx_table);
- stp_abort_server_ctx (server_ctx);
- soup_message_set_status (msg, SOUP_STATUS_OK);
+ if (!server_ctx ||
+ !(server_ctx->stream_type & STP_STREAM_TYPE_HTTP)) {
+ stp_print_status ("No HTTP GET stream on URI: %s\n", path);
+ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
goto out;
}
@@ -666,14 +785,351 @@ static void
request_started_cb (SoupServer *server,
SoupMessage *msg,
SoupClientContext *client,
- GHashTable *ctx_table)
+ STPHashTables *tables)
{
g_debug ("New %s request started", msg->method);
/* SoupMessage is useful only once we have headers,
* so we do all initialization there */
g_signal_connect (msg, "got-headers",
- G_CALLBACK (got_request_headers), ctx_table);
+ G_CALLBACK (got_request_headers), tables);
+}
+
+static void
+handle_add_token_cb (SoupServer *server,
+ SoupMessage *msg,
+ const char *path,
+ GHashTable *query,
+ SoupClientContext *client,
+ STPHashTables *tables)
+{
+ if (msg->method == SOUP_METHOD_POST) {
+ goto POST;
+ } else if (g_strcmp0 (path, STP_REST_ADD_TOKEN) != 0) {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
+ goto out;
+ } else if (msg->method == SOUP_METHOD_PUT ||
+ msg->method == SOUP_METHOD_GET ||
+ msg->method == SOUP_METHOD_DELETE) {
+ soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED);
+ goto out;
+ } else {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED);
+ goto out;
+ }
+
+POST: {
+ STPStreamToken *token;
+ const char *type, *sessionid, *udp_clients;
+
+ if (!stp_validate_token_server (stp_inet_addrmask, client)) {
+ soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN);
+ goto out;
+ }
+
+ if (!query ||
+ !g_hash_table_contains (query, "sessionid") ||
+ !g_hash_table_contains (query, "type")) {
+ soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST);
+ goto out;
+ }
+
+ sessionid = g_hash_table_lookup (query, "sessionid");
+ type = g_hash_table_lookup (query, "type");
+ udp_clients = g_hash_table_lookup (query, "udp-clients");
+
+ token = g_new0 (STPStreamToken, 1);
+ token->stream_type = stp_get_stream_type_from_string (type);
+
+ if ((udp_clients == NULL) &&
+ (token->stream_type & STP_STREAM_TYPE_RTP_UDP)) {
+ g_free (token);
+ soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST);
+ goto out;
+ }
+
+ token->udp_clients = g_strdup (udp_clients);
+
+ /* We don't care if there's already an identical token, and
+ * we don't care if there's ongoing streams with this token */
+ g_hash_table_insert (tables->tokens, g_strdup (sessionid), token);
+
+ soup_message_set_status (msg, SOUP_STATUS_OK);
+ }
+
+out:
+ return;
+}
+
+static void
+handle_revoke_token_cb (SoupServer *server,
+ SoupMessage *msg,
+ const char *path,
+ GHashTable *query,
+ SoupClientContext *client,
+ STPHashTables *tables)
+{
+ if (msg->method == SOUP_METHOD_DELETE) {
+ goto DELETE;
+ } else if (g_strcmp0 (path, STP_REST_REVOKE_TOKEN) != 0) {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
+ goto out;
+ } else if (msg->method == SOUP_METHOD_PUT ||
+ msg->method == SOUP_METHOD_GET ||
+ msg->method == SOUP_METHOD_POST) {
+ soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED);
+ goto out;
+ } else {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED);
+ goto out;
+ }
+
+DELETE: {
+ char *sessionid;
+
+ if (!stp_validate_token_server (stp_inet_addrmask, client)) {
+ soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN);
+ goto out;
+ }
+
+ if (!query ||
+ !g_hash_table_contains (query, "sessionid")) {
+ soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST);
+ goto out;
+ }
+
+ sessionid = g_hash_table_lookup (query, "sessionid");
+
+ if (!g_hash_table_remove (tables->tokens, sessionid)) {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
+ goto out;
+ }
+
+ if (g_hash_table_contains (tables->ctxs, sessionid))
+ /* If there's a stream using this session id, we remove the token,
+ * but can't guarantee that the stream will stop. */
+ soup_message_set_status (msg, SOUP_STATUS_ACCEPTED);
+ else
+ soup_message_set_status (msg, SOUP_STATUS_OK);
+ }
+
+out:
+ return;
+}
+
+static void
+handle_list_tokens_cb (SoupServer *server,
+ SoupMessage *msg,
+ const char *path,
+ GHashTable *query,
+ SoupClientContext *client,
+ STPHashTables *tables)
+{
+ if (msg->method == SOUP_METHOD_GET) {
+ goto GET;
+ } else if (g_strcmp0 (path, STP_REST_LIST_TOKENS) != 0) {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
+ goto out;
+ } else if (msg->method == SOUP_METHOD_PUT ||
+ msg->method == SOUP_METHOD_POST ||
+ msg->method == SOUP_METHOD_DELETE) {
+ soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED);
+ goto out;
+ } else {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED);
+ goto out;
+ }
+
+GET: {
+ gsize length;
+ char *response;
+ GHashTableIter i;
+ JsonBuilder *b;
+ JsonNode *dict;
+ JsonGenerator *gen;
+ gpointer key, value;
+ STPStreamToken *token;
+ const char *tmp;
+
+ if (!stp_validate_token_server (stp_inet_addrmask, client)) {
+ soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN);
+ goto out;
+ }
+
+ g_hash_table_iter_init (&i, tables->tokens);
+ b = json_builder_new ();
+ json_builder_begin_array (b);
+ while (g_hash_table_iter_next (&i, &key, &value)) {
+ token = value;
+ json_builder_begin_object (b);
+
+ json_builder_set_member_name (b, "sessionid");
+ json_builder_add_string_value (b, key);
+
+ json_builder_set_member_name (b, "udp-clients");
+ tmp = token->udp_clients;
+ json_builder_add_string_value (b, tmp? tmp : "");
+
+ json_builder_set_member_name (b, "stream-type");
+ tmp = stp_get_stream_type_string (token->stream_type);
+ json_builder_add_string_value (b, tmp);
+
+ json_builder_end_object (b);
+ }
+ json_builder_end_array (b);
+ dict = json_builder_get_root (b);
+ g_object_unref (b);
+
+ gen = json_generator_new ();
+ json_generator_set_root (gen, dict);
+ json_node_free (dict);
+
+ json_generator_set_pretty (gen, TRUE);
+ response = json_generator_to_data (gen, &length);
+ g_object_unref (gen);
+
+ soup_message_set_status (msg, SOUP_STATUS_OK);
+ soup_message_body_append (msg->response_body,
+ SOUP_MEMORY_TAKE,
+ response, length);
+ }
+out:
+ return;
+}
+
+static void
+handle_list_streams_cb (SoupServer *server,
+ SoupMessage *msg,
+ const char *path,
+ GHashTable *query,
+ SoupClientContext *client,
+ STPHashTables *tables)
+{
+ if (msg->method == SOUP_METHOD_GET) {
+ goto GET;
+ } else if (g_strcmp0 (path, STP_REST_LIST_STREAMS) != 0) {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
+ goto out;
+ } else if (msg->method == SOUP_METHOD_PUT ||
+ msg->method == SOUP_METHOD_POST ||
+ msg->method == SOUP_METHOD_DELETE) {
+ soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED);
+ goto out;
+ } else {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED);
+ goto out;
+ }
+
+GET: {
+ gsize length;
+ const char *tmp;
+ JsonBuilder *b;
+ JsonNode *dict;
+ JsonGenerator *gen;
+ GHashTableIter i;
+ gpointer key, value;
+ STPServerCtx *ctx;
+ char *sessionid, *response;
+
+ if (!stp_validate_token_server (stp_inet_addrmask, client)) {
+ soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN);
+ goto out;
+ }
+
+ g_hash_table_iter_init (&i, tables->ctxs);
+ b = json_builder_new ();
+ json_builder_begin_array (b);
+ while (g_hash_table_iter_next (&i, &key, &value)) {
+ ctx = value;
+ sessionid = key;
+ json_builder_begin_object (b);
+
+ json_builder_set_member_name (b, "type");
+ tmp = stp_get_stream_type_string (ctx->stream_type);
+ json_builder_add_string_value (b, tmp);
+
+ json_builder_set_member_name (b, "sessionid");
+ json_builder_add_string_value (b, sessionid);
+
+ json_builder_set_member_name (b, "udp-clients");
+ tmp = ctx->udp_clients;
+ json_builder_add_string_value (b, tmp? tmp : "");
+
+ json_builder_set_member_name (b, "status");
+ json_builder_add_string_value (b, ctx->status);
+
+ json_builder_end_object (b);
+ }
+ json_builder_end_array (b);
+ dict = json_builder_get_root (b);
+ g_object_unref (b);
+
+ gen = json_generator_new ();
+ json_generator_set_root (gen, dict);
+ json_node_free (dict);
+
+ json_generator_set_pretty (gen, TRUE);
+ response = json_generator_to_data (gen, &length);
+ g_object_unref (gen);
+
+ soup_message_set_status (msg, SOUP_STATUS_OK);
+ soup_message_body_append (msg->response_body,
+ SOUP_MEMORY_TAKE,
+ response, length);
+ }
+out:
+ return;
+}
+
+static void
+handle_abort_stream_cb (SoupServer *server,
+ SoupMessage *msg,
+ const char *path,
+ GHashTable *query,
+ SoupClientContext *client,
+ STPHashTables *tables)
+{
+ if (msg->method == SOUP_METHOD_DELETE) {
+ goto DELETE;
+ } else if (g_strcmp0 (path, STP_REST_ABORT_STREAM) != 0) {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
+ goto out;
+ } else if (msg->method == SOUP_METHOD_PUT ||
+ msg->method == SOUP_METHOD_GET ||
+ msg->method == SOUP_METHOD_POST) {
+ soup_message_set_status (msg, SOUP_STATUS_METHOD_NOT_ALLOWED);
+ goto out;
+ } else {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED);
+ goto out;
+ }
+
+DELETE: {
+ const char *sessionid;
+ STPServerCtx *ctx;
+
+ if (!stp_validate_token_server (stp_inet_addrmask, client)) {
+ soup_message_set_status (msg, SOUP_STATUS_FORBIDDEN);
+ goto out;
+ }
+
+ sessionid = g_hash_table_lookup (query, "sessionid");
+ if (!sessionid) {
+ soup_message_set_status (msg, SOUP_STATUS_BAD_REQUEST);
+ goto out;
+ }
+
+ ctx = g_hash_table_lookup (tables->ctxs, sessionid);
+ if (!ctx) {
+ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND);
+ goto out;
+ }
+
+ stp_abort_server_ctx (ctx);
+ soup_message_set_status (msg, SOUP_STATUS_OK);
+ }
+out:
+ return;
}
gboolean
@@ -687,7 +1143,7 @@ int
main (int argc,
char *argv[])
{
- GHashTable *ctx_table;
+ STPHashTables *tables;
GOptionContext *optctx;
GError *error = NULL;
@@ -702,20 +1158,64 @@ main (int argc,
}
g_option_context_free (optctx);
- /* Keys are paths, and values are TranscodeServerCtxs */
- ctx_table = g_hash_table_new_full (g_str_hash, g_str_equal,
- (GDestroyNotify)g_free,
- (GDestroyNotify)stp_cleanup_transcode_server_ctx);
+ if (token_server_addrmask) {
+ stp_inet_addrmask = g_inet_address_mask_new_from_string (token_server_addrmask,
+ &error);
+ if (!stp_inet_addrmask) {
+ g_printerr ("Error parsing token server address mask: %s\n", error->message);
+ return 1;
+ }
+ }
+
+ tables = g_new0 (STPHashTables, 1);
+ /* Keys are paths, and values are STPServerCtxs */
+ tables->ctxs = g_hash_table_new_full (g_str_hash, g_str_equal,
+ (GDestroyNotify)g_free,
+ (GDestroyNotify)stp_server_ctx_cleanup);
+ /* Keys are "session-id:host:port", and there are no values.
+ * If host or port are missing, they will be blank. */
+ tables->tokens = g_hash_table_new_full (g_str_hash, g_str_equal,
+ (GDestroyNotify)g_free,
+ (GDestroyNotify)stp_stream_token_free);
/* TODO: Accept command-line argument for host */
server = soup_server_new (SOUP_SERVER_SERVER_HEADER,
"soup-transcode-proxy ",
SOUP_SERVER_PORT, port, NULL);
+
+ /* Handle the end of POST/PUT, and all GET requests */
soup_server_add_handler (server, NULL,
(SoupServerCallback)handle_request_cb,
- ctx_table, NULL);
+ tables, NULL);
+
+ /* NOTE: When adding a new path handler, an exception must be made in
+ * got_request_headers() for that path, otherwise a 405 will be returned */
+ soup_server_add_handler (server, STP_REST_ABORT_STREAM,
+ (SoupServerCallback)handle_abort_stream_cb,
+ tables, NULL);
+ soup_server_add_handler (server, STP_REST_LIST_STREAMS,
+ (SoupServerCallback)handle_list_streams_cb,
+ tables, NULL);
+
+ /* REST API for token handling, if requested */
+ if (token_server_addrmask) {
+ soup_server_add_handler (server, STP_REST_ADD_TOKEN,
+ (SoupServerCallback)handle_add_token_cb,
+ tables, NULL);
+ soup_server_add_handler (server, STP_REST_REVOKE_TOKEN,
+ (SoupServerCallback)handle_revoke_token_cb,
+ tables, NULL);
+ soup_server_add_handler (server, STP_REST_LIST_TOKENS,
+ (SoupServerCallback)handle_list_tokens_cb,
+ tables, NULL);
+ }
+
+ /* Since chunked streaming requests send all the data in a single request,
+ * we need to handle those when we get the headers. Hence, all PUT stream
+ * handling is done here. */
g_signal_connect (server, "request-started",
- G_CALLBACK (request_started_cb), ctx_table);
+ G_CALLBACK (request_started_cb), tables);
+
g_unix_signal_add (SIGINT, (GSourceFunc)exit_on_signal_cb, server);
soup_server_run (server);