/* * vim: set sts=2 sw=2 et : * * License: LGPL-2.1+ * Copyright (c) 2014 Nirbheek Chauhan * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, * Boston, MA 02110-1301, USA. */ #include "lib.h" static void stp_disconnect_cleanup_client (TranscodeClientCtx *ctx); gboolean invoke_g_hash_table_remove (TranscodeServerCtx *ctx) { g_hash_table_remove (ctx->parent_ctx_table, ctx->path); return G_SOURCE_REMOVE; } gboolean invoke_g_free_client_context (TranscodeClientCtx *ctx) { g_free (ctx); return G_SOURCE_REMOVE; } gboolean stp_on_gst_bus_message (GstBus *bus, GstMessage *msg, TranscodeServerCtx *ctx) { GError *error = NULL; char *tmp = NULL; switch (GST_MESSAGE_TYPE (msg)) { case GST_MESSAGE_ERROR: gst_message_parse_error (msg, &error, &tmp); g_critical ("ERROR from element %s: %s", GST_OBJECT_NAME (msg->src), error->message); g_critical ("Debug info: %s", tmp); g_error_free (error); g_free (tmp); /* Setting the server response will only work if the request * hasn't already finished, so we check that */ if (ctx->status == STP_STATUS_STREAMING && ctx->encoding != SOUP_ENCODING_CONTENT_LENGTH) soup_message_set_status (ctx->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); /* Cleanup in the default main context, * because GHashTable is not thread-safe */ g_main_context_invoke_full (NULL, G_PRIORITY_LOW, (GSourceFunc)invoke_g_hash_table_remove, ctx, NULL); break; case GST_MESSAGE_EOS: g_debug ("End of file"); /* Cleanup in the default main context, * because GHashTable is not thread-safe */ g_main_context_invoke_full (NULL, G_PRIORITY_LOW, (GSourceFunc)invoke_g_hash_table_remove, ctx, NULL); break; default: //stp_print_status ("%s\n", gst_message_type_get_name (msg->type)); break; } return TRUE; } /* When the incoming stream reaches EOS, we call this * which initiates a shutdown for all clients and then * the server itself */ void stp_cleanup_transcode_server_ctx (TranscodeServerCtx *ctx) { g_debug (">>> Doing server cleanup"); /* Disconnect and free the context for each client */ g_list_foreach (ctx->clients, (GFunc)stp_disconnect_cleanup_client, NULL); g_list_free (ctx->clients); /* Cleanup gstreamer pipeline */ if (ctx->pipeline) { gst_element_set_state (ctx->pipeline, GST_STATE_NULL); gst_object_unref (ctx->pipeline); } g_free (ctx); } static GstPadProbeReturn pad_blocked_cleanup_cb (GstPad *srcpad, GstPadProbeInfo *info, TranscodeClientCtx *ctx) { GstElement *tee; GstElement *sinkbin = GST_ELEMENT (gst_element_get_parent (ctx->appsink)); stp_print_status ("."); /* Remove the probe */ gst_pad_remove_probe (srcpad, GST_PAD_PROBE_INFO_ID (info)); tee = gst_pad_get_parent_element (srcpad); gst_pad_unlink (srcpad, ctx->ghostsinkpad); gst_element_release_request_pad (tee, srcpad); gst_object_unref (srcpad); gst_object_unref (tee); gst_element_set_state (sinkbin, GST_STATE_NULL); gst_bin_remove (GST_BIN (ctx->server_ctx->pipeline), sinkbin); gst_object_unref (sinkbin); stp_print_status (" Client cleanup done!\n"); return GST_PAD_PROBE_OK; } /* When a client leaves or is kicked, we set the response status * and then call cleanup here. This involves removing the appsink * branch for the client from the pipeline, and then freeing the * context. */ void stp_cleanup_transcode_client_ctx (TranscodeClientCtx *ctx) { GstPad *srcpad; TranscodeServerCtx *server_ctx = ctx->server_ctx; GstElement *sinkbin = GST_ELEMENT (gst_element_get_parent (ctx->appsink)); stp_print_status (">>> Doing client cleanup."); g_source_remove (ctx->timeout_handler_id); ctx->timeout_handler_id = 0; server_ctx->clients = g_list_remove (server_ctx->clients, ctx); /* Block sinkpad and srcpad, then unlink and remove */ srcpad = gst_pad_get_peer (ctx->ghostsinkpad); gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_BLOCK, (GstPadProbeCallback) pad_blocked_cleanup_cb, ctx, (GDestroyNotify)g_free); gst_object_unref (sinkbin); stp_print_status ("."); } /* When shutting down a client due to a server shutdown, * we just need to remove the client timeout handler and * free the context. The rest is owned by the pipeline. */ static void stp_disconnect_cleanup_client (TranscodeClientCtx *ctx) { stp_print_status (">>> Disconnecting client on server shutdown\n"); /* FIXME: This isn't actually setting the status for client * connections, and clients are just left hanging */ soup_message_set_status (ctx->msg, SOUP_STATUS_GONE); soup_message_body_complete (ctx->msg->response_body); g_source_remove (ctx->timeout_handler_id); ctx->timeout_handler_id = 0; /* Don't call the "finished" handler and do a double-free */ g_signal_handler_disconnect (ctx->msg, ctx->finished_handler_id); /* There might still be functions which use the ctx waiting * to be invoked on the next main context dispatch, so we * free this at the end of all those dispatches. */ g_main_context_invoke_full (NULL, G_PRIORITY_LOW, (GSourceFunc)invoke_g_free_client_context, ctx, NULL); } /* Returns a copy of the streamheader GstBuffer */ GstBuffer* stp_get_streamheader_from_caps (GstCaps *caps) { GstStructure *s; const GValue *array, *value; s = gst_caps_get_structure (caps, 0); array = gst_structure_get_value (s, "streamheader"); g_return_val_if_fail (GST_VALUE_HOLDS_ARRAY (array), NULL); value = gst_value_array_get_value (array, 0); g_return_val_if_fail (G_VALUE_TYPE (value) == GST_TYPE_BUFFER, NULL); return gst_buffer_copy (g_value_peek_pointer (value)); } /* Returns a GstBuffer which has consumed the passed-in data */ GstBuffer* stp_get_gst_buffer (SoupBuffer *chunk) { gsize len; const guint8 *d; SoupBuffer *copy; GstBuffer *buffer; GstMemory *memory; copy = soup_buffer_copy (chunk); soup_buffer_get_data (copy, &d, &len); buffer = gst_buffer_new (); memory = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY, (guint8*)d, len, 0, len, copy, (GDestroyNotify)soup_buffer_free); gst_buffer_append_memory (buffer, memory); return buffer; }