/* License: LGPL-2.1
 * vim: set sts=2 sw=2 et : */

#include "lib.h"

/*
 * Bus message handler for a transcoding pipeline.
 *
 * On GST_MESSAGE_ERROR: prints the error and debug string, sets the HTTP
 * status of ctx->msg to 500 if the request has not finished yet, and removes
 * ctx->path from ctx->parent_ctx_table.
 * On GST_MESSAGE_EOS: removes ctx->path from ctx->parent_ctx_table.
 * All other message types are ignored.
 *
 * NOTE(review): removing the table entry presumably triggers destruction of
 * ctx through the table's value-destroy function; confirm where the table is
 * created.
 *
 * Always returns TRUE.
 */
gboolean
stp_on_gst_bus_message (GstBus *bus,
                        GstMessage *msg,
                        TranscodeServerCtx *ctx)
{
  GError *error = NULL;
  char *tmp = NULL;

  switch (GST_MESSAGE_TYPE (msg)) {
    case GST_MESSAGE_ERROR:
      gst_message_parse_error (msg, &error, &tmp);
      g_printerr ("ERROR from element %s: %s\n",
                  GST_OBJECT_NAME (msg->src), error->message);
      /* The debug string is optional; never hand NULL to a %s conversion */
      g_printerr ("Debug info: %s\n", tmp ? tmp : "none");
      g_error_free (error);
      g_free (tmp);
      /* Setting the server response will only work if the request
       * hasn't already finished, so we check that */
      if (!ctx->request_finished)
        soup_message_set_status (ctx->msg,
                                 SOUP_STATUS_INTERNAL_SERVER_ERROR);
      g_hash_table_remove (ctx->parent_ctx_table, ctx->path);
      break;
    case GST_MESSAGE_EOS:
      g_print ("End of file\n");
      g_hash_table_remove (ctx->parent_ctx_table, ctx->path);
      break;
    default:
      /* Other message types are not handled here */
      break;
  }

  return TRUE;
}

/*
 * Tears down a server context: closes every client context in ctx->clients,
 * frees the list, sets the pipeline to GST_STATE_NULL, drops the pipeline
 * reference and frees ctx itself. ctx must not be used afterwards.
 */
void
stp_cleanup_transcode_server_ctx (TranscodeServerCtx *ctx)
{
  g_print (">>> Doing server cleanup\n");

  /* Closing the stream should send a HUP on the fd and
   * trigger cleanup in write_get_response_body_chunk() */
  /* g_list_free_full() invokes a one-argument callback per element and then
   * frees the list, so stp_close_client_ctx() is no longer called through a
   * two-argument GFunc cast. */
  g_list_free_full (ctx->clients, (GDestroyNotify) stp_close_client_ctx);
  ctx->clients = NULL;

  /* Cleanup gstreamer pipeline */
  gst_element_set_state (ctx->pipeline, GST_STATE_NULL);
  gst_object_unref (ctx->pipeline);
  ctx->pipeline_is_playing = FALSE;

  g_free (ctx);
}

/*
 * Pad probe callback installed by stp_cleanup_transcode_client_ctx().
 *
 * Removes its own probe, unlinks srcpad from the "qsink" pad of the bin that
 * contains ctx->appsink, removes srcpad from its parent element, removes the
 * bin from the server pipeline, drops the appsink reference and frees ctx.
 *
 * Returns GST_PAD_PROBE_OK.
 */
static GstPadProbeReturn
pad_blocked_cleanup_cb (GstPad *srcpad,
                        GstPadProbeInfo *info,
                        TranscodeClientCtx *ctx)
{
  GstPad *sinkpad;
  GstElement *tee;
  GstElement *sinkbin = GST_ELEMENT (gst_element_get_parent (ctx->appsink));

  g_print (".");

  /* Remove the probe, XXX: hence unblocking the pipeline? */
  gst_pad_remove_probe (srcpad, GST_PAD_PROBE_INFO_ID (info));

  /* Detach srcpad from the sink bin and from the element that owns it */
  tee = gst_pad_get_parent_element (srcpad);
  sinkpad = gst_element_get_static_pad (sinkbin, "qsink");
  gst_pad_unlink (srcpad, sinkpad);
  gst_element_remove_pad (tee, srcpad);
  gst_object_unref (sinkpad);
  gst_object_unref (tee);

  /* server_ctx is cleared when the server is shutting down (see
   * stp_close_client_ctx()), so only touch the pipeline if it is still set */
  if (ctx->server_ctx != NULL)
    gst_bin_remove (GST_BIN (ctx->server_ctx->pipeline), sinkbin);
  gst_object_unref (sinkbin);

  gst_object_unref (ctx->appsink);
  g_free (ctx);

  g_print (" Client cleanup done!\n");
  return GST_PAD_PROBE_OK;
}

/*
 * Starts cleanup of a single client context.
 *
 * Removes ctx from its server's client list (when it still has a server) and
 * installs a blocking probe on the pad feeding the client's sink bin. The
 * actual teardown, including freeing ctx, happens in
 * pad_blocked_cleanup_cb() once the probe fires, so ctx must not be touched
 * by the caller after this function returns.
 */
void
stp_cleanup_transcode_client_ctx (TranscodeClientCtx *ctx)
{
  GstPad *sinkpad, *srcpad;
  TranscodeServerCtx *server_ctx = ctx->server_ctx;
  GstElement *sinkbin = GST_ELEMENT (gst_element_get_parent (ctx->appsink));

  g_print (">>> Doing client cleanup.");

  /* If we're cleaning up because the server is shutting down,
   * this will be NULL */
  if (server_ctx)
    server_ctx->clients = g_list_remove (server_ctx->clients, ctx);

  /* Block sinkpad and srcpad, then unlink and remove */
  sinkpad = gst_element_get_static_pad (sinkbin, "qsink");
  srcpad = gst_pad_get_peer (sinkpad);

  if (srcpad != NULL) {
    /* The callback may run (and free ctx) at any point after this call,
     * so ctx is not dereferenced below. */
    gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_BLOCK,
                       (GstPadProbeCallback) pad_blocked_cleanup_cb,
                       ctx, NULL);
    /* gst_pad_get_peer() returned a new reference; release it */
    gst_object_unref (srcpad);
  } else {
    g_printerr ("Client sink is not linked; pad-blocked cleanup skipped\n");
  }

  /* gst_element_get_static_pad() returned a new reference; release it */
  if (sinkpad != NULL)
    gst_object_unref (sinkpad);
  gst_object_unref (sinkbin);

  g_print (".");
}

/*
 * Detaches a client context from its server (server_ctx is set to NULL)
 * and frees the context structure. No GStreamer objects are released here.
 */
void
stp_close_client_ctx (TranscodeClientCtx *ctx)
{
  g_print (">>> Doing client cleanup\n");
  ctx->server_ctx = NULL;
  g_free (ctx);
}

/*
 * Returns a copy of the first buffer in the "streamheader" field of the
 * first structure of caps.
 *
 * Returns NULL (after a g_return_val_if_fail() warning) when the field is
 * missing, is not a GstValueArray, is empty, or its first entry does not
 * hold a GstBuffer. The caller owns the returned buffer.
 */
GstBuffer*
stp_get_streamheader_from_caps (GstCaps *caps)
{
  GstStructure *s;
  const GValue *value;
  const GValue *first;

  s = gst_caps_get_structure (caps, 0);
  value = gst_structure_get_value (s, "streamheader");
  g_return_val_if_fail (value != NULL && GST_VALUE_HOLDS_ARRAY (value), NULL);
  /* Guard against an empty array before reading element 0 */
  g_return_val_if_fail (gst_value_array_get_size (value) > 0, NULL);

  first = gst_value_array_get_value (value, 0);
  g_return_val_if_fail (first != NULL
                        && G_VALUE_TYPE (first) == GST_TYPE_BUFFER, NULL);

  return gst_buffer_copy (gst_value_get_buffer (first));
}

/*
 * Returns a new GstBuffer wrapping the data of a copy of chunk.
 *
 * chunk itself is not freed here. The copy made with soup_buffer_copy() is
 * attached to the wrapped memory with soup_buffer_free() as its destroy
 * notify. If the memory cannot be created, the copy is freed and an empty
 * buffer is returned.
 */
GstBuffer*
stp_get_gst_buffer (SoupBuffer *chunk)
{
  gsize len;
  const guint8 *d;
  SoupBuffer *copy;
  GstBuffer *buffer;
  GstMemory *memory;

  copy = soup_buffer_copy (chunk);
  soup_buffer_get_data (copy, &d, &len);

  buffer = gst_buffer_new ();
  memory = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY,
                                   (guint8 *) d, len, 0, len,
                                   copy, (GDestroyNotify) soup_buffer_free);
  if (memory == NULL) {
    /* Nothing took ownership of the copy, so release it here */
    soup_buffer_free (copy);
    return buffer;
  }

  gst_buffer_append_memory (buffer, memory);

  return buffer;
}