/* License: LGPL-2.1 * vim: set sts=2 sw=2 et : */ #include "lib.h" #include "encode.h" #ifdef DEBUG #include "debug/local-play.h" #endif static int port = 8000; static GOptionEntry entries[] = { { "port", 'p', 0, G_OPTION_ARG_INT, &port, "Port to listen on (default: 8000)", "PORT" }, { NULL } }; static SoupServer *server; static TranscodeServerCtx* get_server_ctx_from_msg (SoupMessage *msg, GHashTable *ctx_table) { SoupURI *uri; TranscodeServerCtx *ctx; g_object_get (msg, "uri", &uri, NULL); ctx = g_hash_table_lookup (ctx_table, soup_uri_get_path (uri)); soup_uri_free (uri); return ctx; } static void enough_data_pause_message (GstElement *appsrc, guint size, TranscodeServerCtx *ctx) { /* Pausing the request can cause weirdness if * the recv rate is too high. So, we don't do this. */ //g_print ("Enough data, pause. "); //soup_server_pause_message (server, ctx->msg); } static void need_data_unpause_message (GstElement *appsrc, guint size, TranscodeServerCtx *ctx) { //g_print ("Need data, unpause. \n"); //soup_server_unpause_message (server, ctx->msg); } static GstPadProbeReturn tee_src_pad_blocked_cb (GstPad *srcpad, GstPadProbeInfo *info, TranscodeClientCtx *ctx) { GstCaps *caps; GstBuffer *buffer; GstState state; GstStateChangeReturn ret; TranscodeServerCtx *server_ctx = ctx->server_ctx; GstElement *sinkbin = GST_ELEMENT (gst_element_get_parent (ctx->appsink)); /* Remove the probe, XXX: hence unblocking the pipeline? */ gst_pad_remove_probe (srcpad, GST_PAD_PROBE_INFO_ID (info)); gst_bin_add (GST_BIN (server_ctx->pipeline), sinkbin); /* t. ! queue ! appsink */ gst_pad_link (srcpad, ctx->ghostsinkpad); /* Send the WebM stream header through the pipeline first */ caps = gst_pad_get_current_caps (srcpad); /* TODO: cache keyframes */ buffer = stp_get_streamheader_from_caps (caps); if (!buffer) goto err; gst_pad_push (srcpad, buffer); ret = gst_element_get_state (server_ctx->pipeline, &state, NULL, 1); g_print ("Linked pads, removed probe. State was %s:%s.\n", gst_element_state_change_return_get_name (ret), gst_element_state_get_name (state)); if (gst_element_set_state (server_ctx->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) { g_critical ("Unable to set pipeline back to PLAYING\n"); goto err; } else { g_print ("pipeline set to PLAYING successfully\n"); } out: gst_caps_unref (caps); gst_object_unref (sinkbin); return GST_PAD_PROBE_OK; err: soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); stp_cleanup_transcode_client_ctx (ctx); /* FIXME: PROBE_OK for errors too? What to do here? */ goto out; } static void write_next_client_chunk_cb (SoupMessage *msg, TranscodeClientCtx *ctx) { GstMapInfo info; GstSample *sample; GstBuffer *buffer; GstMemory *memory; gboolean eos; g_print ("*"); g_signal_emit_by_name (ctx->appsink, "pull-sample", &sample); if (!sample) { g_print ("Null sample, ending stream\n"); g_object_get (ctx->appsink, "eos", &eos, NULL); if (eos) soup_message_set_status (msg, SOUP_STATUS_OK); else soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); soup_message_body_complete (msg->response_body); return; } buffer = gst_sample_get_buffer (sample); memory = gst_buffer_get_all_memory (buffer); /* copy */ gst_memory_map (memory, &info, GST_MAP_READ); soup_message_body_append (msg->response_body, SOUP_MEMORY_COPY, info.data, info.size); /* copy */ gst_memory_unmap (memory, &info); gst_memory_unref (memory); gst_sample_unref (sample); soup_server_unpause_message (server, msg); g_print ("."); return; } static GstFlowReturn write_first_client_chunk_cb (GstElement *appsink, TranscodeClientCtx *ctx) { g_assert (ctx->first_sample_handler_id); //g_signal_handler_disconnect (appsink, ctx->first_sample_handler_id); //ctx->first_sample_handler_id = 0; write_next_client_chunk_cb (ctx->msg, ctx); return GST_FLOW_OK; } static void client_finished_cb (SoupMessage *msg, TranscodeClientCtx *ctx) { g_print ("Client finished/aborted, doing cleanup...\n"); stp_cleanup_transcode_client_ctx (ctx); } static void stream_finished_cb (SoupMessage *msg, TranscodeServerCtx *ctx) { gboolean ret; g_print ("Stream finished/aborted, sending EOS...\n"); ctx->request_finished = TRUE; /* Incoming stream has ended */ g_signal_emit_by_name (ctx->appsrc, "end-of-stream", &ret); if (!ret) g_printerr ("Unable to emit end-of-stream after an aborted stream\n"); } static void handle_request_cb (SoupServer *server, SoupMessage *msg, const char *path, GHashTable *query, SoupClientContext *client, GHashTable *ctx_table) { TranscodeServerCtx *server_ctx; if (!msg) return; g_print ("Handling %s request on path %s\n", msg->method, path); if (msg->method == SOUP_METHOD_PUT || msg->method == SOUP_METHOD_POST) goto PUT; else if (msg->method == SOUP_METHOD_GET) goto GET; else /* We return NOT_IMPLEMENTED for this * when we get the request headers */ g_assert_not_reached (); PUT: { /* The PUT request has finished streaming. Cleanup * will be done on EOS/ERROR in the gst pipeline. */ server_ctx = get_server_ctx_from_msg (msg, ctx_table); server_ctx->request_finished = TRUE; soup_message_set_status (server_ctx->msg, SOUP_STATUS_OK); return; } GET: { /* A GET request was received. We connect from the pipeline to the * client requesting the stream and start writing the response. */ GstElement *bin, *tee, *q2, *appsink; GstPadTemplate *template; GstPad *srcpad, *sinkpadq2; TranscodeClientCtx *client_ctx; GstStateChangeReturn state; server_ctx = get_server_ctx_from_msg (msg, ctx_table); if (server_ctx == NULL || server_ctx->pipeline == NULL) { /* There's no request streaming on this path (yet) */ soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); return; } /* Check if the pipeline successfully started PLAYING */ state = gst_element_get_state (server_ctx->pipeline, NULL, NULL, 100*GST_MSECOND); switch (state) { case GST_STATE_CHANGE_SUCCESS: break; case GST_STATE_CHANGE_FAILURE: /* PUT stream should've */ g_critical ("GET request, but state change failure?"); case GST_STATE_CHANGE_ASYNC: soup_message_set_status (msg, SOUP_STATUS_SERVICE_UNAVAILABLE); return; default: g_assert_not_reached (); } /* Connect appsink to tee, and start streaming */ client_ctx = g_new0 (TranscodeClientCtx, 1); client_ctx->msg = msg; client_ctx->server_ctx = server_ctx; bin = gst_bin_new (NULL); /* queue ! appsink */ q2 = gst_element_factory_make ("queue", NULL); appsink = gst_element_factory_make ("appsink", NULL); g_object_set (appsink, "drop", FALSE, "emit-signals", TRUE, "max-buffers", 0, NULL); gst_bin_add_many (GST_BIN (bin), q2, appsink, NULL); gst_element_link (q2, appsink); sinkpadq2 = gst_element_get_static_pad (q2, "sink"); client_ctx->ghostsinkpad = gst_ghost_pad_new (NULL, sinkpadq2); gst_element_add_pad (bin, client_ctx->ghostsinkpad); gst_object_unref (sinkpadq2); /* Set to PAUSED so the bin can pre-roll */ if (gst_element_set_state (bin, GST_STATE_PAUSED) == GST_STATE_CHANGE_FAILURE) { g_critical ("Unable to set appsink to PAUSED\n"); soup_message_set_status (client_ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); stp_cleanup_transcode_client_ctx (client_ctx); } else { g_print ("sinkbin set to PAUSED successfully\n"); } /* Request a new src pad from the tee in the pipeline */ tee = gst_bin_get_by_name (GST_BIN (server_ctx->pipeline), "tee"); template = gst_pad_template_new ("teesrc", GST_PAD_SRC, GST_PAD_REQUEST, GST_CAPS_ANY); srcpad = gst_element_request_pad (tee, template, NULL, GST_CAPS_ANY); gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_BLOCK, (GstPadProbeCallback) tee_src_pad_blocked_cb, client_ctx, NULL); gst_object_unref (srcpad); gst_object_ref (appsink); client_ctx->appsink = appsink; client_ctx->first_sample_handler_id = g_signal_connect (appsink, "new-sample", G_CALLBACK (write_first_client_chunk_cb), client_ctx); /*g_signal_connect (client_ctx->msg, "wrote-chunk", G_CALLBACK (write_next_client_chunk_cb), client_ctx);*/ g_signal_connect (client_ctx->msg, "finished", G_CALLBACK (client_finished_cb), client_ctx); server_ctx->clients = g_list_prepend (server_ctx->clients, client_ctx); /* Since we set the response as chunked, the server will * automatically pause the message for us, and the response * won't be completed till we call soup_message_body_complete() */ soup_message_set_status (msg, SOUP_STATUS_OK); } } static void got_request_body_chunk (SoupMessage *msg, SoupBuffer *chunk, TranscodeServerCtx *ctx) { GstBuffer *buffer; GstFlowReturn ret; /* XXX: Unfortunately, there doesn't seem to be a way to cancel the * reception of further chunks when we've prematurely set the response * (say due to an error), so we check if the status code is set and skip * all further chunks */ if (msg->status_code != 0) return; /* We need to update this every chunk */ ctx->msg = msg; /* Actually just a ref */ buffer = stp_get_gst_buffer (chunk); g_signal_emit_by_name (ctx->appsrc, "push-buffer", buffer, &ret); gst_buffer_unref (buffer); if (ret != GST_FLOW_OK) { g_critical ("Unable to push buffer\n"); soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); stp_cleanup_transcode_server_ctx (ctx); return; } /* We can only set the pipeline to playing when * the first chunk has been received */ if (!ctx->pipeline_is_playing) { if (gst_element_set_state (ctx->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) { g_critical ("Unable to set pipeline to PLAYING\n"); soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); stp_cleanup_transcode_server_ctx (ctx); } else { g_print ("Set pipeline to PLAYING\n"); ctx->pipeline_is_playing = TRUE; } } } static void got_request_headers (SoupMessage *msg, GHashTable *ctx_table) { TranscodeServerCtx *ctx; SoupURI *uri; #ifdef DEBUG SoupMessageHeadersIter iter; const char *name, *value; soup_message_headers_iter_init (&iter, msg->request_headers); while (soup_message_headers_iter_next (&iter, &name, &value)) g_print ("%s: %s\n", name, value); #endif g_object_get (msg, "uri", &uri, NULL); g_print ("%s on uri %s\n", msg->method, soup_uri_get_path (uri)); if (msg->method == SOUP_METHOD_PUT || msg->method == SOUP_METHOD_POST) goto PUT; else if (msg->method == SOUP_METHOD_GET) goto GET; else { /* Nothing to do; server doesn't implement this method */ soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED); goto out; } PUT: { if (g_hash_table_contains (ctx_table, soup_uri_get_path (uri))) { /* There's already a request streaming on this path */ g_print ("Recv duplicate request on the same URI: %s\n", soup_uri_get_path (uri)); soup_message_set_status (msg, SOUP_STATUS_CONFLICT); goto out; } ctx = g_new0 (TranscodeServerCtx, 1); ctx->msg = msg; ctx->parent_ctx_table = ctx_table; ctx->path = g_strdup (soup_uri_get_path (uri)); g_hash_table_insert (ctx_table, ctx->path, ctx); /* We only do this when we receive the first chunk */ if (!ctx->pipeline) { ctx->pipeline = gst_pipeline_new ("pipe"); /* The chunked request is copied into this stream * for consumption by the gst pipeline */ #ifdef DEBUG stp_play_from_msg (ctx); #else stp_encode_from_msg (ctx); #endif g_signal_connect (ctx->appsrc, "need-data", G_CALLBACK (need_data_unpause_message), ctx); g_signal_connect (ctx->appsrc, "enough-data", G_CALLBACK (enough_data_pause_message), ctx); } /* The request body isn't fixed length, so tell libsoup to * not collect chunks for forming a complete request body */ soup_message_body_set_accumulate (msg->request_body, FALSE); g_signal_connect (msg, "got-chunk", G_CALLBACK (got_request_body_chunk), ctx); /* This will also be called when the client sending * the PUT request disconnects prematurely */ g_signal_connect (msg, "finished", G_CALLBACK (stream_finished_cb), ctx); goto out; } GET: { if (!g_hash_table_contains (ctx_table, soup_uri_get_path (uri))) { g_print ("No stream on URI: %s\n", soup_uri_get_path (uri)); soup_message_set_status (msg, SOUP_STATUS_NOT_FOUND); goto out; } /* Our response will be chunked, and not with a fixed Content-Length */ soup_message_headers_set_encoding (msg->response_headers, SOUP_ENCODING_CHUNKED); /* The request body isn't fixed length, so tell libsoup to * not collect chunks for forming a complete request body */ soup_message_body_set_accumulate (msg->response_body, FALSE); } out: soup_uri_free (uri); return; } static void request_started_cb (SoupServer *server, SoupMessage *msg, SoupClientContext *client, GHashTable *ctx_table) { g_print ("New %s request started\n", msg->method); /* SoupMessage is useful only once we have headers, * so we do all initialization there */ g_signal_connect (msg, "got-headers", G_CALLBACK (got_request_headers), ctx_table); } static void request_read_cb (SoupServer *server, SoupMessage *msg, SoupClientContext *client, GHashTable *ctx_table) { if (msg->method == SOUP_METHOD_PUT || msg->method == SOUP_METHOD_POST) goto PUT; else if (msg->method == SOUP_METHOD_GET) goto GET; else /* We return NOT_IMPLEMENTED for this * when we get the request headers */ g_assert_not_reached (); PUT: { gboolean ret; TranscodeServerCtx *ctx; ctx = get_server_ctx_from_msg (msg, ctx_table); g_return_if_fail (ctx); /* Incoming stream has ended */ g_signal_emit_by_name (ctx->appsrc, "end-of-stream", &ret); if (!ret) g_printerr ("Unable to emit end-of-stream\n"); } GET: { /* Nothing to do */ } g_print ("%s request read successfully\n", msg->method); } static void request_finished_cb (SoupServer *server, SoupMessage *msg, SoupClientContext *client, GHashTable *ctx_table) { g_print ("%s request ended\n", msg->method); } static void request_aborted_cb (SoupServer *server, SoupMessage *msg, SoupClientContext *client, GHashTable *ctx_table) { g_print ("%s request aborted!\n", msg->method); if (msg->method == SOUP_METHOD_PUT || msg->method == SOUP_METHOD_POST) goto PUT; else if (msg->method == SOUP_METHOD_GET) goto GET; else /* We return NOT_IMPLEMENTED for this * when we get the request headers */ g_assert_not_reached (); PUT: { /* FIXME: Is there anything to do here? */ } GET: { /* We do all cleanup in the "finished" msg signal, nothing to do here */ } } int main (int argc, char *argv[]) { GHashTable *ctx_table; GOptionContext *optctx; GError *error = NULL; gst_init (&argc, &argv); optctx = g_option_context_new ("- Soup Transcoding Proxy"); g_option_context_add_main_entries (optctx, entries, NULL); g_option_context_add_group (optctx, gst_init_get_option_group ()); if (!g_option_context_parse (optctx, &argc, &argv, &error)) { g_printerr ("Error parsing options: %s\n", error->message); return 1; } g_option_context_free (optctx); /* Keys are paths, and values are TranscodeServerCtxs */ ctx_table = g_hash_table_new_full (g_str_hash, g_str_equal, (GDestroyNotify)g_free, (GDestroyNotify)stp_cleanup_transcode_server_ctx); /* TODO: Accept command-line argument for host */ server = soup_server_new (SOUP_SERVER_SERVER_HEADER, "soup-transcode-proxy ", SOUP_SERVER_PORT, port, NULL); soup_server_add_handler (server, NULL, (SoupServerCallback)handle_request_cb, ctx_table, NULL); g_signal_connect (server, "request-started", G_CALLBACK (request_started_cb), ctx_table); g_signal_connect (server, "request-read", G_CALLBACK (request_read_cb), ctx_table); g_signal_connect (server, "request-finished", G_CALLBACK (request_finished_cb), ctx_table); g_signal_connect (server, "request-aborted", G_CALLBACK (request_aborted_cb), ctx_table); soup_server_run (server); return 0; }