diff options
-rw-r--r-- | src/main.c | 126 |
1 files changed, 56 insertions, 70 deletions
@@ -60,59 +60,6 @@ get_server_ctx_from_msg (SoupMessage *msg, return ctx; } -static GstPadProbeReturn -tee_src_pad_blocked_cb (GstPad *srcpad, - GstPadProbeInfo *info, - TranscodeClientCtx *ctx) -{ - GstCaps *caps; - GstBuffer *buffer; - GstState state; - GstStateChangeReturn ret; - TranscodeServerCtx *server_ctx = ctx->server_ctx; - GstElement *sinkbin = GST_ELEMENT (gst_element_get_parent (ctx->appsink)); - - /* Remove the probe, XXX: hence unblocking the pipeline? */ - gst_pad_remove_probe (srcpad, GST_PAD_PROBE_INFO_ID (info)); - - gst_bin_add (GST_BIN (server_ctx->pipeline), sinkbin); - - /* t. ! queue ! appsink */ - gst_pad_link (srcpad, ctx->ghostsinkpad); - - /* Send the WebM stream header through the pipeline first */ - caps = gst_pad_get_current_caps (srcpad); - /* TODO: cache keyframes */ - buffer = stp_get_streamheader_from_caps (caps); - if (!buffer) - goto err; - gst_pad_push (srcpad, buffer); - - ret = gst_element_get_state (server_ctx->pipeline, &state, NULL, 1); - g_print ("Linked pads, removed probe. State was %s:%s.\n", - gst_element_state_change_return_get_name (ret), - gst_element_state_get_name (state)); - - if (gst_element_set_state (server_ctx->pipeline, GST_STATE_PLAYING) == - GST_STATE_CHANGE_FAILURE) { - g_critical ("Unable to set pipeline back to PLAYING\n"); - goto err; - } else { - g_print ("pipeline set to PLAYING successfully\n"); - } - -out: - gst_caps_unref (caps); - gst_object_unref (sinkbin); - return GST_PAD_PROBE_OK; -err: - soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); - soup_message_body_complete (ctx->msg->response_body); - stp_cleanup_transcode_client_ctx (ctx); - /* FIXME: PROBE_OK for errors too? What to do here? */ - goto out; -} - /* If it's been more than 10 seconds since the last time we got * a chunk for a PUT request, we timeout and drop the connection */ static gboolean @@ -277,11 +224,15 @@ GET: { /* A GET request was received. We connect from the pipeline to the * client requesting the stream and start writing the response. */ + GstCaps *caps; + GstState state; + GstBuffer *buffer; + GstFlowReturn ret; + GstStateChangeReturn state_change; GstElement *bin, *tee, *q2, *appsink; GstPadTemplate *template; GstPad *srcpad, *sinkpadq2; TranscodeClientCtx *client_ctx; - GstStateChangeReturn state; server_ctx = get_server_ctx_from_msg (msg, ctx_table); if (server_ctx == NULL || @@ -292,14 +243,16 @@ GET: } /* Check if the pipeline successfully started PLAYING */ - state = gst_element_get_state (server_ctx->pipeline, - NULL, NULL, 100*GST_MSECOND); - switch (state) { + state_change = gst_element_get_state (server_ctx->pipeline, + NULL, NULL, 100*GST_MSECOND); + switch (state_change) { case GST_STATE_CHANGE_SUCCESS: break; case GST_STATE_CHANGE_FAILURE: /* PUT stream should've */ g_critical ("GET request, but state change failure?"); + soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); + return; case GST_STATE_CHANGE_ASYNC: soup_message_set_status (msg, SOUP_STATUS_SERVICE_UNAVAILABLE); return; @@ -329,25 +282,49 @@ GET: gst_element_add_pad (bin, client_ctx->ghostsinkpad); gst_object_unref (sinkpadq2); - /* Set to PAUSED so the bin can pre-roll */ - if (gst_element_set_state (bin, GST_STATE_PAUSED) == - GST_STATE_CHANGE_FAILURE) { - g_critical ("Unable to set appsink to PAUSED\n"); - soup_message_set_status (client_ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); - stp_cleanup_transcode_client_ctx (client_ctx); - } else { - g_print ("sinkbin set to PAUSED successfully\n"); - } - /* Request a new src pad from the tee in the pipeline */ tee = gst_bin_get_by_name (GST_BIN (server_ctx->pipeline), "tee"); template = gst_pad_template_new ("teesrc", GST_PAD_SRC, GST_PAD_REQUEST, GST_CAPS_ANY); srcpad = gst_element_request_pad (tee, template, NULL, GST_CAPS_ANY); - gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_BLOCK, - (GstPadProbeCallback) tee_src_pad_blocked_cb, - client_ctx, NULL); + /* Add to pipeline and sync state */ + gst_bin_add (GST_BIN (server_ctx->pipeline), bin); + + if (!gst_element_sync_state_with_parent (bin)) { + g_critical ("Unable to sync appsink bin with parent pipeline\n"); + return; + } + + g_print ("appsink bin state synced successfully\n"); + + /* t. ! queue ! appsink */ + gst_pad_link (srcpad, client_ctx->ghostsinkpad); + + /* Send the WebM stream header through the src pad first */ + caps = gst_pad_get_current_caps (srcpad); + + /* TODO: cache keyframes */ + buffer = stp_get_streamheader_from_caps (caps); + if (!buffer) { + g_printerr ("Unable to get streamheader from caps\n"); + goto nostreamheader; + } + + ret = gst_pad_push (srcpad, buffer); + if (ret != GST_FLOW_OK) { + g_printerr ("Unable to push buffer: %s\n", + gst_flow_get_name (ret)); + goto err; + } + + state_change = gst_element_get_state (server_ctx->pipeline, + &state, NULL, 1); + g_print ("Pushed buffer. State was %s:%s.\n", + gst_element_state_change_return_get_name (state_change), + gst_element_state_get_name (state)); + +nostreamheader: gst_object_unref (template); gst_object_unref (srcpad); gst_object_unref (tee); @@ -371,6 +348,15 @@ GET: * automatically pause the message for us, and the response * won't be completed till we call soup_message_body_complete() */ soup_message_set_status (msg, SOUP_STATUS_OK); + +out: + gst_caps_unref (caps); + return; +err: + soup_message_set_status (client_ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); + soup_message_body_complete (client_ctx->msg->response_body); + stp_cleanup_transcode_client_ctx (client_ctx); + goto out; } } |