From 3839e9836d2e4740efd9610a1aa50dd792e78a59 Mon Sep 17 00:00:00 2001 From: Nirbheek Chauhan Date: Wed, 9 Jul 2014 21:38:32 +0530 Subject: appsink: Use a GMutex to control when we pull samples We can only pull a new sample and write it out if the previous one has been written, otherwise we get a segfault in SoupMessage. --- src/lib.h | 1 + src/main.c | 62 ++++++++++++++++++++++++++++---------------------------------- 2 files changed, 29 insertions(+), 34 deletions(-) diff --git a/src/lib.h b/src/lib.h index 733212c..14ec096 100644 --- a/src/lib.h +++ b/src/lib.h @@ -34,6 +34,7 @@ struct _TranscodeClientCtx { SoupMessage *msg; GstElement *appsink; GstPad *ghostsinkpad; + GMutex can_write_chunk; gulong first_sample_handler_id; /* The transcode server context */ TranscodeServerCtx *server_ctx; diff --git a/src/main.c b/src/main.c index 1a710ed..7834c53 100644 --- a/src/main.c +++ b/src/main.c @@ -105,14 +105,29 @@ err: } static void -write_next_client_chunk_cb (SoupMessage *msg, - TranscodeClientCtx *ctx) +can_write_next_client_chunk_cb (SoupMessage *msg, + TranscodeClientCtx *ctx) +{ + g_mutex_unlock (&ctx->can_write_chunk); +} + +static GstFlowReturn +write_client_chunk_cb (GstElement *appsink, + TranscodeClientCtx *ctx) { GstMapInfo info; GstSample *sample; GstBuffer *buffer; GstMemory *memory; gboolean eos; + + if (!g_mutex_trylock (&ctx->can_write_chunk)) + /* We cannot safely append to the message body till + * the previous chunk is written out, otherwise we get + * a segfault. This is likely because SoupMessageBody + * uses a GSList for the chunks, which isn't MT-safe. */ + return GST_FLOW_OK; + g_print ("*"); g_signal_emit_by_name (ctx->appsink, "pull-sample", &sample); @@ -120,39 +135,26 @@ write_next_client_chunk_cb (SoupMessage *msg, g_print ("Null sample, ending stream\n"); g_object_get (ctx->appsink, "eos", &eos, NULL); if (eos) - soup_message_set_status (msg, SOUP_STATUS_OK); + soup_message_set_status (ctx->msg, SOUP_STATUS_OK); else - soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); - soup_message_body_complete (msg->response_body); - return; + soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); + soup_message_body_complete (ctx->msg->response_body); + return GST_FLOW_OK; } buffer = gst_sample_get_buffer (sample); memory = gst_buffer_get_all_memory (buffer); /* copy */ gst_memory_map (memory, &info, GST_MAP_READ); - soup_message_body_append (msg->response_body, SOUP_MEMORY_COPY, + soup_message_body_append (ctx->msg->response_body, SOUP_MEMORY_COPY, info.data, info.size); /* copy */ gst_memory_unmap (memory, &info); gst_memory_unref (memory); gst_sample_unref (sample); - soup_server_unpause_message (server, msg); + soup_server_unpause_message (server, ctx->msg); g_print ("."); - return; -} - -static GstFlowReturn -write_first_client_chunk_cb (GstElement *appsink, - TranscodeClientCtx *ctx) -{ - g_assert (ctx->first_sample_handler_id); - - //g_signal_handler_disconnect (appsink, ctx->first_sample_handler_id); - //ctx->first_sample_handler_id = 0; - - write_next_client_chunk_cb (ctx->msg, ctx); return GST_FLOW_OK; } @@ -253,6 +255,7 @@ GET: client_ctx = g_new0 (TranscodeClientCtx, 1); client_ctx->msg = msg; client_ctx->server_ctx = server_ctx; + g_mutex_init (&client_ctx->can_write_chunk); bin = gst_bin_new (NULL); @@ -296,9 +299,9 @@ GET: client_ctx->first_sample_handler_id = g_signal_connect (appsink, "new-sample", - G_CALLBACK (write_first_client_chunk_cb), client_ctx); - /*g_signal_connect (client_ctx->msg, "wrote-chunk", - G_CALLBACK (write_next_client_chunk_cb), client_ctx);*/ + G_CALLBACK (write_client_chunk_cb), client_ctx); + g_signal_connect (client_ctx->msg, "wrote-chunk", + G_CALLBACK (can_write_next_client_chunk_cb), client_ctx); g_signal_connect (client_ctx->msg, "finished", G_CALLBACK (client_finished_cb), client_ctx); @@ -479,16 +482,7 @@ request_read_cb (SoupServer *server, PUT: { - gboolean ret; - TranscodeServerCtx *ctx; - - ctx = get_server_ctx_from_msg (msg, ctx_table); - g_return_if_fail (ctx); - - /* Incoming stream has ended */ - g_signal_emit_by_name (ctx->appsrc, "end-of-stream", &ret); - if (!ret) - g_printerr ("Unable to emit end-of-stream\n"); + /* Everything is handled in the "finished" signal. Nothing to do. */ } GET: -- cgit v0.11.2-2-gd1dd