/* License: LGPL-2.1
 * vim: set sts=2 sw=2 et :
 */

#include "lib.h"

static void stp_shutdown_client (TranscodeClientCtx *ctx);

/* Bus message handler for a transcode server pipeline.
 *
 * On GST_MESSAGE_ERROR the error and its debug string are printed, the
 * HTTP response status is set to 500 if the request has not finished yet,
 * and the server's entry (keyed by ctx->path) is removed from
 * ctx->parent_ctx_table. On GST_MESSAGE_EOS the entry is removed as well.
 * All other message types are ignored.
 *
 * NOTE(review): ctx is presumably released by the hash table's value
 * destroy function on removal, so it is not touched afterwards -- confirm
 * against the code that creates parent_ctx_table.
 *
 * Always returns TRUE. */
gboolean
stp_on_gst_bus_message (GstBus *bus,
                        GstMessage *msg,
                        TranscodeServerCtx *ctx)
{
  GError *error = NULL;
  char *tmp = NULL;

  switch (GST_MESSAGE_TYPE (msg)) {
    case GST_MESSAGE_ERROR:
      gst_message_parse_error (msg, &error, &tmp);
      g_printerr ("ERROR from element %s: %s\n",
                  GST_OBJECT_NAME (msg->src), error->message);
      g_printerr ("Debug info: %s\n", tmp);
      g_error_free (error);
      g_free (tmp);
      /* Setting the server response will only work if the request
       * hasn't already finished, so we check that */
      if (!ctx->request_finished) {
        soup_message_set_status (ctx->msg,
                                 SOUP_STATUS_INTERNAL_SERVER_ERROR);
      }
      g_hash_table_remove (ctx->parent_ctx_table, ctx->path);
      break;
    case GST_MESSAGE_EOS:
      g_print ("End of file\n");
      g_hash_table_remove (ctx->parent_ctx_table, ctx->path);
      break;
    default:
      break;
  }

  return TRUE;
}

/* When the incoming stream reaches EOS, we call this
 * which initiates a shutdown for all clients and then
 * the server itself.
 *
 * Frees ctx; the pointer must not be used after this returns. */
void
stp_cleanup_transcode_server_ctx (TranscodeServerCtx *ctx)
{
  GList *node;
  GList *next;

  g_print (">>> Doing server cleanup\n");

  /* stp_shutdown_client() unlinks the client from the list it is stored
   * in, so the next node is saved before each call. An explicit loop is
   * used instead of g_list_foreach() to avoid calling a one-argument
   * function through a GFunc cast. */
  for (node = ctx->clients; node != NULL; node = next) {
    next = node->next;
    stp_shutdown_client (node->data);
  }
  g_list_free (ctx->clients);

  /* Cleanup gstreamer pipeline */
  gst_element_set_state (ctx->pipeline, GST_STATE_NULL);
  gst_object_unref (ctx->pipeline);
  ctx->pipeline_is_playing = FALSE;

  g_free (ctx);
}

/* Blocking pad probe callback installed by
 * stp_cleanup_transcode_client_ctx() on the pad that feeds the client's
 * sink branch.
 *
 * Removes the probe, unlinks srcpad from the client's ghost sink pad,
 * removes srcpad from its parent element, shuts the client's sink bin
 * down and removes it from the server pipeline, then frees ctx.
 *
 * The unref of srcpad drops the reference that
 * stp_cleanup_transcode_client_ctx() obtained from gst_pad_get_peer(). */
static GstPadProbeReturn
pad_blocked_cleanup_cb (GstPad *srcpad,
                        GstPadProbeInfo *info,
                        TranscodeClientCtx *ctx)
{
  GstElement *tee;
  GstElement *sinkbin = GST_ELEMENT (gst_element_get_parent (ctx->appsink));

  g_print (".");

  /* Remove the probe */
  gst_pad_remove_probe (srcpad, GST_PAD_PROBE_INFO_ID (info));

  /* Detach the client's branch from the element that owns srcpad */
  tee = gst_pad_get_parent_element (srcpad);
  gst_pad_unlink (srcpad, ctx->ghostsinkpad);
  gst_element_remove_pad (tee, srcpad);
  gst_object_unref (srcpad);
  gst_object_unref (tee);

  /* Stop the sink bin and take it out of the server pipeline */
  gst_element_set_state (sinkbin, GST_STATE_NULL);
  gst_bin_remove (GST_BIN (ctx->server_ctx->pipeline), sinkbin);
  gst_object_unref (sinkbin);

  gst_object_unref (ctx->appsink);
  g_free (ctx);
  g_print (" Client cleanup done!\n");

  return GST_PAD_PROBE_OK;
}

/* When a client leaves or is kicked, we set the response status
 * and then call cleanup here. This involves removing the appsink
 * branch for the client from the pipeline, and then freeing the
 * context.
 *
 * The actual teardown and the g_free() of ctx happen in
 * pad_blocked_cleanup_cb(), so ctx must not be used by the caller after
 * this function has been called. */
void
stp_cleanup_transcode_client_ctx (TranscodeClientCtx *ctx)
{
  GstPad *srcpad;
  TranscodeServerCtx *server_ctx = ctx->server_ctx;
  /* NOTE(review): sinkbin is only referenced and released in this
   * function; presumably the reference keeps the bin alive while the
   * probe is being installed -- confirm before removing. */
  GstElement *sinkbin = GST_ELEMENT (gst_element_get_parent (ctx->appsink));

  g_print (">>> Doing client cleanup.");

  g_source_remove (ctx->timeout_handler_id);
  server_ctx->clients = g_list_remove (server_ctx->clients, ctx);

  /* Block sinkpad and srcpad, then unlink and remove.
   * The reference returned by gst_pad_get_peer() is released by the
   * probe callback. */
  srcpad = gst_pad_get_peer (ctx->ghostsinkpad);
  gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_BLOCK,
                     (GstPadProbeCallback) pad_blocked_cleanup_cb,
                     ctx, NULL);

  gst_object_unref (sinkbin);
  g_print (".");
}

/* When shutting down a client due to a server shutdown, we
 * don't need to bother with removing the appsink branch of
 * the pipeline, and the response is set by the "eos" callback
 * on the appsink, so we just free the context.
 *
 * Also removes the client's timeout source and unlinks the client from
 * its server's client list. */
static void
stp_shutdown_client (TranscodeClientCtx *ctx)
{
  TranscodeServerCtx *server_ctx = ctx->server_ctx;

  g_print (">>> Doing client cleanup on server shutdown\n");

  g_source_remove (ctx->timeout_handler_id);
  server_ctx->clients = g_list_remove (server_ctx->clients, ctx);

  gst_object_unref (ctx->appsink);
  g_free (ctx);
}

/* Returns a copy of the streamheader GstBuffer.
 *
 * Looks up the "streamheader" field in the first structure of caps and
 * copies the first buffer stored in it. Returns NULL (via
 * g_return_val_if_fail) when the field is missing, is not a non-empty
 * GstValueArray, or its first element does not hold a GstBuffer.
 *
 * The caller owns the returned buffer. */
GstBuffer*
stp_get_streamheader_from_caps (GstCaps *caps)
{
  GstStructure *s;
  const GValue *value;

  s = gst_caps_get_structure (caps, 0);
  value = gst_structure_get_value (s, "streamheader");
  g_return_val_if_fail (G_IS_VALUE (value), NULL);

  /* Go through the GstValueArray accessors rather than poking at the
   * underlying storage, and make sure there is a first element at all. */
  g_return_val_if_fail (GST_VALUE_HOLDS_ARRAY (value), NULL);
  g_return_val_if_fail (gst_value_array_get_size (value) > 0, NULL);

  value = gst_value_array_get_value (value, 0);
  g_return_val_if_fail (GST_VALUE_HOLDS_BUFFER (value), NULL);

  return gst_buffer_copy (gst_value_get_buffer (value));
}

/* Returns a GstBuffer wrapping a copy of the passed-in chunk.
 *
 * The chunk itself is left untouched: it is duplicated with
 * soup_buffer_copy() and the duplicate's data is wrapped, read-only, in
 * a GstMemory. The duplicate is handed to the memory as user data with
 * soup_buffer_free() as its destroy notify. */
GstBuffer*
stp_get_gst_buffer (SoupBuffer *chunk)
{
  gsize len;
  const guint8 *d;
  SoupBuffer *copy;
  GstBuffer *buffer;
  GstMemory *memory;

  copy = soup_buffer_copy (chunk);
  soup_buffer_get_data (copy, &d, &len);

  buffer = gst_buffer_new ();
  memory = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY,
                                   (guint8*)d, len, 0, len,
                                   copy, (GDestroyNotify)soup_buffer_free);
  gst_buffer_append_memory (buffer, memory);

  return buffer;
}