index 9c43b231fed482812ffd8cda547f5d266c44f5f6..89d2289397e770ee548a414609734870eecabf05 100644 (file)
* Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
* Copyright (C) 2007 Jan Schmidt <jan@fluendo.com>
* Copyright (C) 2007 Wim Taymans <wim@fluendo.com>
+ * Copyright (C) 2011 Sebastian Dröge <sebastian.droege@collabora.co.uk>
*
* gstmultiqueue.c:
*
* Boston, MA 02111-1307, USA.
*/
+/**
+ * SECTION:element-multiqueue
+ * @see_also: #GstQueue
+ *
+ * <refsect2>
+ * <para>
+ * Multiqueue is similar to a normal #GstQueue with the following additional
+ * features:
+ * <orderedlist>
+ * <listitem>
+ * <itemizedlist><title>Multiple streamhandling</title>
+ * <listitem><para>
+ * The element handles queueing data on more than one stream at once. To
+ * achieve such a feature it has request sink pads (sink%d) and
+ * 'sometimes' src pads (src%d).
+ * </para><para>
+ * When requesting a given sinkpad with gst_element_get_request_pad(),
+ * the associated srcpad for that stream will be created.
+ * Example: requesting sink1 will generate src1.
+ * </para></listitem>
+ * </itemizedlist>
+ * </listitem>
+ * <listitem>
+ * <itemizedlist><title>Non-starvation on multiple streams</title>
+ * <listitem><para>
+ * If more than one stream is used with the element, the streams' queues
+ * will be dynamically grown (up to a limit), in order to ensure that no
+ * stream is risking data starvation. This guarantees that at any given
+ * time there are at least N bytes queued and available for each individual
+ * stream.
+ * </para><para>
+ * If an EOS event comes through a srcpad, the associated queue will be
+ * considered as 'not-empty' in the queue-size-growing algorithm.
+ * </para></listitem>
+ * </itemizedlist>
+ * </listitem>
+ * <listitem>
+ * <itemizedlist><title>Non-linked srcpads graceful handling</title>
+ * <listitem><para>
+ * In order to better support dynamic switching between streams, the multiqueue
+ * (unlike the current GStreamer queue) continues to push buffers on non-linked
+ * pads rather than shutting down.
+ * </para><para>
+ * In addition, to prevent a non-linked stream from very quickly consuming all
+ * available buffers and thus 'racing ahead' of the other streams, the element
+ * must ensure that buffers and inlined events for a non-linked stream are pushed
+ * in the same order as they were received, relative to the other streams
+ * controlled by the element. This means that a buffer cannot be pushed to a
+ * non-linked pad any sooner than buffers in any other stream which were received
+ * before it.
+ * </para></listitem>
+ * </itemizedlist>
+ * </listitem>
+ * </orderedlist>
+ * </para>
+ * <para>
+ * Data is queued until one of the limits specified by the
+ * #GstMultiQueue:max-size-buffers, #GstMultiQueue:max-size-bytes and/or
+ * #GstMultiQueue:max-size-time properties has been reached. Any attempt to push
+ * more buffers into the queue will block the pushing thread until more space
+ * becomes available. #GstMultiQueue:extra-size-buffers,
+ * </para>
+ * <para>
+ * #GstMultiQueue:extra-size-bytes and #GstMultiQueue:extra-size-time are
+ * currently unused.
+ * </para>
+ * <para>
+ * The default queue size limits are 5 buffers, 10MB of data, or
+ * two second worth of data, whichever is reached first. Note that the number
+ * of buffers will dynamically grow depending on the fill level of
+ * other queues.
+ * </para>
+ * <para>
+ * The #GstMultiQueue::underrun signal is emitted when all of the queues
+ * are empty. The #GstMultiQueue::overrun signal is emitted when one of the
+ * queues is filled.
+ * Both signals are emitted from the context of the streaming thread.
+ * </para>
+ * </refsect2>
+ *
+ * Last reviewed on 2008-01-25 (0.10.17)
+ */
+
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <gst/gst.h>
+#include <stdio.h>
#include "gstmultiqueue.h"
+#include <gst/glib-compat-private.h>
/**
* GstSingleQueue:
/* flowreturn of previous srcpad push */
GstFlowReturn srcresult;
+
+ /* segments */
GstSegment sink_segment;
GstSegment src_segment;
+ /* position of src/sink */
+ GstClockTime sinktime, srctime;
+ /* TRUE if either position needs to be recalculated */
+ gboolean sink_tainted, src_tainted;
+
/* queue of data */
GstDataQueue *queue;
GstDataQueueSize max_size, extra_size;
GstClockTime cur_time;
gboolean is_eos;
- gboolean inextra; /* TRUE if the queue is currently in extradata mode */
+ gboolean flushing;
/* Protected by global lock */
guint32 nextid; /* ID of the next object waiting to be pushed */
guint32 oldid; /* ID of the last object pushed (last in a series) */
+ guint32 last_oldid; /* Previously observed old_id, reset to MAXUINT32 on flush */
+ GstClockTime next_time; /* End running time of next buffer to be pushed */
+ GstClockTime last_time; /* Start running time of last pushed buffer */
GCond *turn; /* SingleQueue turn waiting conditional */
};
guint32 posid;
};
-static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue);
+static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue, gint id);
static void gst_single_queue_free (GstSingleQueue * squeue);
static void wake_up_next_non_linked (GstMultiQueue * mq);
static void compute_high_id (GstMultiQueue * mq);
+static void compute_high_time (GstMultiQueue * mq);
+static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
+static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink%d",
GST_PAD_SINK,
GST_DEBUG_CATEGORY_STATIC (multi_queue_debug);
#define GST_CAT_DEFAULT (multi_queue_debug)
+/* Signals and args */
+enum
+{
+ SIGNAL_UNDERRUN,
+ SIGNAL_OVERRUN,
+ LAST_SIGNAL
+};
+
/* default limits, we try to keep up to 2 seconds of data and if there is not
* time, up to 10 MB. The number of buffers is dynamically scaled to make sure
* there is data in the queues. Normally, the byte and time limits are not hit
/* second limits. When we hit one of the above limits we are probably dealing
* with a badly muxed file and we scale the limits to these emergency values.
- * This is currently not yet implemented. */
+ * This is currently not yet implemented.
+ * Since we dynamically scale the queue buffer size up to the limits but avoid
+ * going above the max-size-buffers when we can, we don't really need this
+ * aditional extra size. */
#define DEFAULT_EXTRA_SIZE_BYTES 10 * 1024 * 1024 /* 10 MB */
#define DEFAULT_EXTRA_SIZE_BUFFERS 5
#define DEFAULT_EXTRA_SIZE_TIME 3 * GST_SECOND
-/* Signals and args */
-enum
-{
- SIGNAL_UNDERRUN,
- SIGNAL_OVERRUN,
- LAST_SIGNAL
-};
+#define DEFAULT_USE_BUFFERING FALSE
+#define DEFAULT_LOW_PERCENT 10
+#define DEFAULT_HIGH_PERCENT 99
+#define DEFAULT_SYNC_BY_RUNNING_TIME FALSE
enum
{
- ARG_0,
- ARG_EXTRA_SIZE_BYTES,
- ARG_EXTRA_SIZE_BUFFERS,
- ARG_EXTRA_SIZE_TIME,
- ARG_MAX_SIZE_BYTES,
- ARG_MAX_SIZE_BUFFERS,
- ARG_MAX_SIZE_TIME,
+ PROP_0,
+ PROP_EXTRA_SIZE_BYTES,
+ PROP_EXTRA_SIZE_BUFFERS,
+ PROP_EXTRA_SIZE_TIME,
+ PROP_MAX_SIZE_BYTES,
+ PROP_MAX_SIZE_BUFFERS,
+ PROP_MAX_SIZE_TIME,
+ PROP_USE_BUFFERING,
+ PROP_LOW_PERCENT,
+ PROP_HIGH_PERCENT,
+ PROP_SYNC_BY_RUNNING_TIME,
+ PROP_LAST
};
#define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START { \
static GstPad *gst_multi_queue_request_new_pad (GstElement * element,
GstPadTemplate * temp, const gchar * name);
static void gst_multi_queue_release_pad (GstElement * element, GstPad * pad);
+static GstStateChangeReturn gst_multi_queue_change_state (GstElement *
+ element, GstStateChange transition);
static void gst_multi_queue_loop (GstPad * pad);
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
- gobject_class->set_property =
- GST_DEBUG_FUNCPTR (gst_multi_queue_set_property);
- gobject_class->get_property =
- GST_DEBUG_FUNCPTR (gst_multi_queue_get_property);
+ gobject_class->set_property = gst_multi_queue_set_property;
+ gobject_class->get_property = gst_multi_queue_get_property;
/* SIGNALS */
+
+ /**
+ * GstMultiQueue::underrun:
+ * @multiqueue: the multqueue instance
+ *
+ * This signal is emitted from the streaming thread when there is
+ * no data in any of the queues inside the multiqueue instance (underrun).
+ *
+ * This indicates either starvation or EOS from the upstream data sources.
+ */
gst_multi_queue_signals[SIGNAL_UNDERRUN] =
g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL,
g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
+ /**
+ * GstMultiQueue::overrun:
+ * @multiqueue: the multiqueue instance
+ *
+ * Reports that one of the queues in the multiqueue is full (overrun).
+ * A queue is full if the total amount of data inside it (num-buffers, time,
+ * size) is higher than the boundary values which can be set through the
+ * GObject properties.
+ *
+ * This can be used as an indicator of pre-roll.
+ */
gst_multi_queue_signals[SIGNAL_OVERRUN] =
g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL,
/* PROPERTIES */
- g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
+ g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
"Max. amount of data in the queue (bytes, 0=disable)",
- 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE));
- g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
+ 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
- "Max. number of buffers in the queue (0=disable)",
- 0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, G_PARAM_READWRITE));
- g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
+ "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
+ DEFAULT_MAX_SIZE_BUFFERS,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
- "Max. amount of data in the queue (in ns, 0=disable)",
- 0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE));
+ "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
+ DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BYTES,
+ g_object_class_install_property (gobject_class, PROP_EXTRA_SIZE_BYTES,
g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)",
- "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)",
- 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES, G_PARAM_READWRITE));
- g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BUFFERS,
+ "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)"
+ " (NOT IMPLEMENTED)",
+ 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_EXTRA_SIZE_BUFFERS,
g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)",
- "Amount of buffers the queues can grow if one of them is empty (0=disable)",
- 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS, G_PARAM_READWRITE));
- g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_TIME,
+ "Amount of buffers the queues can grow if one of them is empty (0=disable)"
+ " (NOT IMPLEMENTED)",
+ 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_EXTRA_SIZE_TIME,
g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)",
- "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)",
- 0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME, G_PARAM_READWRITE));
-
- gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_multi_queue_finalize);
+ "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)"
+ " (NOT IMPLEMENTED)",
+ 0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ /**
+ * GstMultiQueue:use-buffering
+ *
+ * Enable the buffering option in multiqueue so that BUFFERING messages are
+ * emited based on low-/high-percent thresholds.
+ *
+ * Since: 0.10.26
+ */
+ g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
+ g_param_spec_boolean ("use-buffering", "Use buffering",
+ "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
+ DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ /**
+ * GstMultiQueue:low-percent
+ *
+ * Low threshold percent for buffering to start.
+ *
+ * Since: 0.10.26
+ */
+ g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
+ g_param_spec_int ("low-percent", "Low percent",
+ "Low threshold for buffering to start", 0, 100,
+ DEFAULT_LOW_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ /**
+ * GstMultiQueue:high-percent
+ *
+ * High threshold percent for buffering to finish.
+ *
+ * Since: 0.10.26
+ */
+ g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
+ g_param_spec_int ("high-percent", "High percent",
+ "High threshold for buffering to finish", 0, 100,
+ DEFAULT_HIGH_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ /**
+ * GstMultiQueue:sync-by-running-time
+ *
+ * If enabled multiqueue will synchronize deactivated or not-linked streams
+ * to the activated and linked streams by taking the running time.
+ * Otherwise multiqueue will synchronize the deactivated or not-linked
+ * streams by keeping the order in which buffers and events arrived compared
+ * to active and linked streams.
+ *
+ * Since: 0.10.36
+ */
+ g_object_class_install_property (gobject_class, PROP_SYNC_BY_RUNNING_TIME,
+ g_param_spec_boolean ("sync-by-running-time", "Sync By Running Time",
+ "Synchronize deactivated or not-linked streams by running time",
+ DEFAULT_SYNC_BY_RUNNING_TIME,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ gobject_class->finalize = gst_multi_queue_finalize;
gstelement_class->request_new_pad =
GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad);
gstelement_class->release_pad =
GST_DEBUG_FUNCPTR (gst_multi_queue_release_pad);
+ gstelement_class->change_state =
+ GST_DEBUG_FUNCPTR (gst_multi_queue_change_state);
}
static void
mqueue->extra_size.visible = DEFAULT_EXTRA_SIZE_BUFFERS;
mqueue->extra_size.time = DEFAULT_EXTRA_SIZE_TIME;
+ mqueue->use_buffering = DEFAULT_USE_BUFFERING;
+ mqueue->low_percent = DEFAULT_LOW_PERCENT;
+ mqueue->high_percent = DEFAULT_HIGH_PERCENT;
+
+ mqueue->sync_by_running_time = DEFAULT_SYNC_BY_RUNNING_TIME;
+
mqueue->counter = 1;
mqueue->highid = -1;
- mqueue->nextnotlinked = -1;
+ mqueue->high_time = GST_CLOCK_TIME_NONE;
mqueue->qlock = g_mutex_new ();
}
g_list_foreach (mqueue->queues, (GFunc) gst_single_queue_free, NULL);
g_list_free (mqueue->queues);
mqueue->queues = NULL;
+ mqueue->queues_cookie++;
/* free/unref instance data */
g_mutex_free (mqueue->qlock);
GstMultiQueue *mq = GST_MULTI_QUEUE (object);
switch (prop_id) {
- case ARG_MAX_SIZE_BYTES:
+ case PROP_MAX_SIZE_BYTES:
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
mq->max_size.bytes = g_value_get_uint (value);
SET_CHILD_PROPERTY (mq, bytes);
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
break;
- case ARG_MAX_SIZE_BUFFERS:
+ case PROP_MAX_SIZE_BUFFERS:
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
mq->max_size.visible = g_value_get_uint (value);
SET_CHILD_PROPERTY (mq, visible);
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
break;
- case ARG_MAX_SIZE_TIME:
+ case PROP_MAX_SIZE_TIME:
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
mq->max_size.time = g_value_get_uint64 (value);
SET_CHILD_PROPERTY (mq, time);
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
break;
- case ARG_EXTRA_SIZE_BYTES:
+ case PROP_EXTRA_SIZE_BYTES:
mq->extra_size.bytes = g_value_get_uint (value);
break;
- case ARG_EXTRA_SIZE_BUFFERS:
+ case PROP_EXTRA_SIZE_BUFFERS:
mq->extra_size.visible = g_value_get_uint (value);
break;
- case ARG_EXTRA_SIZE_TIME:
+ case PROP_EXTRA_SIZE_TIME:
mq->extra_size.time = g_value_get_uint64 (value);
break;
+ case PROP_USE_BUFFERING:
+ mq->use_buffering = g_value_get_boolean (value);
+ break;
+ case PROP_LOW_PERCENT:
+ mq->low_percent = g_value_get_int (value);
+ break;
+ case PROP_HIGH_PERCENT:
+ mq->high_percent = g_value_get_int (value);
+ break;
+ case PROP_SYNC_BY_RUNNING_TIME:
+ mq->sync_by_running_time = g_value_get_boolean (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
switch (prop_id) {
- case ARG_EXTRA_SIZE_BYTES:
+ case PROP_EXTRA_SIZE_BYTES:
g_value_set_uint (value, mq->extra_size.bytes);
break;
- case ARG_EXTRA_SIZE_BUFFERS:
+ case PROP_EXTRA_SIZE_BUFFERS:
g_value_set_uint (value, mq->extra_size.visible);
break;
- case ARG_EXTRA_SIZE_TIME:
+ case PROP_EXTRA_SIZE_TIME:
g_value_set_uint64 (value, mq->extra_size.time);
break;
- case ARG_MAX_SIZE_BYTES:
+ case PROP_MAX_SIZE_BYTES:
g_value_set_uint (value, mq->max_size.bytes);
break;
- case ARG_MAX_SIZE_BUFFERS:
+ case PROP_MAX_SIZE_BUFFERS:
g_value_set_uint (value, mq->max_size.visible);
break;
- case ARG_MAX_SIZE_TIME:
+ case PROP_MAX_SIZE_TIME:
g_value_set_uint64 (value, mq->max_size.time);
break;
+ case PROP_USE_BUFFERING:
+ g_value_set_boolean (value, mq->use_buffering);
+ break;
+ case PROP_LOW_PERCENT:
+ g_value_set_int (value, mq->low_percent);
+ break;
+ case PROP_HIGH_PERCENT:
+ g_value_set_int (value, mq->high_percent);
+ break;
+ case PROP_SYNC_BY_RUNNING_TIME:
+ g_value_set_boolean (value, mq->sync_by_running_time);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
}
+static GstIterator *
+gst_multi_queue_iterate_internal_links (GstPad * pad)
+{
+ GstIterator *it = NULL;
+ GstPad *opad;
+ GstSingleQueue *squeue;
+ GstMultiQueue *mq = GST_MULTI_QUEUE (gst_pad_get_parent (pad));
+
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+ squeue = gst_pad_get_element_private (pad);
+ if (!squeue)
+ goto out;
+
+ if (squeue->sinkpad == pad)
+ opad = gst_object_ref (squeue->srcpad);
+ else if (squeue->srcpad == pad)
+ opad = gst_object_ref (squeue->sinkpad);
+ else
+ goto out;
+
+ it = gst_iterator_new_single (GST_TYPE_PAD, opad,
+ (GstCopyFunction) gst_object_ref, (GFreeFunc) gst_object_unref);
+
+ gst_object_unref (opad);
+
+out:
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+ gst_object_unref (mq);
+
+ return it;
+}
+
/*
* GstElement methods
{
GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
GstSingleQueue *squeue;
+ gint temp_id = -1;
- GST_LOG_OBJECT (element, "name : %s", name);
+ if (name) {
+ sscanf (name + 4, "%d", &temp_id);
+ GST_LOG_OBJECT (element, "name : %s (id %d)", GST_STR_NULL (name), temp_id);
+ }
/* Create a new single queue, add the sink and source pad and return the sink pad */
- squeue = gst_single_queue_new (mqueue);
-
- GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
- mqueue->queues = g_list_append (mqueue->queues, squeue);
- GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
+ squeue = gst_single_queue_new (mqueue, temp_id);
GST_DEBUG_OBJECT (mqueue, "Returning pad %s:%s",
GST_DEBUG_PAD_NAME (squeue->sinkpad));
- return squeue->sinkpad;
+ return squeue ? squeue->sinkpad : NULL;
}
static void
/* remove it from the list */
mqueue->queues = g_list_delete_link (mqueue->queues, tmp);
+ mqueue->queues_cookie++;
/* FIXME : recompute next-non-linked */
GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
gst_pad_set_active (sq->srcpad, FALSE);
gst_pad_set_active (sq->sinkpad, FALSE);
+ gst_pad_set_element_private (sq->srcpad, NULL);
+ gst_pad_set_element_private (sq->sinkpad, NULL);
gst_element_remove_pad (element, sq->srcpad);
gst_element_remove_pad (element, sq->sinkpad);
gst_single_queue_free (sq);
}
+static GstStateChangeReturn
+gst_multi_queue_change_state (GstElement * element, GstStateChange transition)
+{
+ GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
+ GstSingleQueue *sq = NULL;
+ GstStateChangeReturn result;
+
+ switch (transition) {
+ case GST_STATE_CHANGE_READY_TO_PAUSED:{
+ GList *tmp;
+
+ /* Set all pads to non-flushing */
+ GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
+ for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
+ sq = (GstSingleQueue *) tmp->data;
+ sq->flushing = FALSE;
+ }
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
+ break;
+ }
+ case GST_STATE_CHANGE_PAUSED_TO_READY:{
+ GList *tmp;
+
+ /* Un-wait all waiting pads */
+ GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
+ for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
+ sq = (GstSingleQueue *) tmp->data;
+ sq->flushing = TRUE;
+ g_cond_signal (sq->turn);
+ }
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
+ break;
+ }
+ default:
+ break;
+ }
+
+ result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+
+ switch (transition) {
+ default:
+ break;
+ }
+
+ return result;
+
+
+
+}
+
static gboolean
gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
{
@@ -430,6 +747,8 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
sq->srcresult = GST_FLOW_WRONG_STATE;
gst_data_queue_set_flushing (sq->queue, TRUE);
+ sq->flushing = TRUE;
+
/* wake up non-linked task */
GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
sq->id);
@@ -439,20 +758,30 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
result = gst_pad_pause_task (sq->srcpad);
+ sq->sink_tainted = sq->src_tainted = TRUE;
} else {
gst_data_queue_flush (sq->queue);
gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
/* All pads start off not-linked for a smooth kick-off */
- sq->srcresult = GST_FLOW_NOT_LINKED;
+ sq->srcresult = GST_FLOW_OK;
sq->cur_time = 0;
sq->max_size.visible = mq->max_size.visible;
sq->is_eos = FALSE;
- sq->inextra = FALSE;
sq->nextid = 0;
sq->oldid = 0;
+ sq->last_oldid = G_MAXUINT32;
+ sq->next_time = GST_CLOCK_TIME_NONE;
+ sq->last_time = GST_CLOCK_TIME_NONE;
gst_data_queue_set_flushing (sq->queue, FALSE);
+ /* Reset high time to be recomputed next */
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+ mq->high_time = GST_CLOCK_TIME_NONE;
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+
+ sq->flushing = FALSE;
+
GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
result =
gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop,
@@ -461,6 +790,78 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
return result;
}
+static void
+update_buffering (GstMultiQueue * mq, GstSingleQueue * sq)
+{
+ GstDataQueueSize size;
+ gint percent, tmp;
+ gboolean post = FALSE;
+
+ /* nothing to dowhen we are not in buffering mode */
+ if (!mq->use_buffering)
+ return;
+
+ gst_data_queue_get_level (sq->queue, &size);
+
+ GST_DEBUG_OBJECT (mq,
+ "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
+ G_GUINT64_FORMAT, sq->id, size.visible, sq->max_size.visible,
+ size.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
+
+ /* get bytes and time percentages and take the max */
+ if (sq->is_eos) {
+ percent = 100;
+ } else {
+ percent = 0;
+ if (sq->max_size.time > 0) {
+ tmp = (sq->cur_time * 100) / sq->max_size.time;
+ percent = MAX (percent, tmp);
+ }
+ if (sq->max_size.bytes > 0) {
+ tmp = (size.bytes * 100) / sq->max_size.bytes;
+ percent = MAX (percent, tmp);
+ }
+ }
+
+ if (mq->buffering) {
+ post = TRUE;
+ if (percent >= mq->high_percent) {
+ mq->buffering = FALSE;
+ }
+ /* make sure it increases */
+ percent = MAX (mq->percent, percent);
+
+ if (percent == mq->percent)
+ /* don't post if nothing changed */
+ post = FALSE;
+ else
+ /* else keep last value we posted */
+ mq->percent = percent;
+ } else {
+ if (percent < mq->low_percent) {
+ mq->buffering = TRUE;
+ mq->percent = percent;
+ post = TRUE;
+ }
+ }
+ if (post) {
+ GstMessage *message;
+
+ /* scale to high percent so that it becomes the 100% mark */
+ percent = percent * 100 / mq->high_percent;
+ /* clip */
+ if (percent > 100)
+ percent = 100;
+
+ GST_DEBUG_OBJECT (mq, "buffering %d percent", percent);
+ message = gst_message_new_buffering (GST_OBJECT_CAST (mq), percent);
+
+ gst_element_post_message (GST_ELEMENT_CAST (mq), message);
+ } else {
+ GST_DEBUG_OBJECT (mq, "filled %d percent", percent);
+ }
+}
+
/* calculate the diff between running time on the sink and src of the queue.
* This is the total amount of time in the queue.
* WITH LOCK TAKEN */
{
gint64 sink_time, src_time;
- sink_time =
- gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME,
- sq->sink_segment.last_stop);
-
- src_time = gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME,
- sq->src_segment.last_stop);
+ if (sq->sink_tainted) {
+ sink_time = sq->sinktime =
+ gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME,
+ sq->sink_segment.last_stop);
+
+ if (G_UNLIKELY (sink_time != GST_CLOCK_TIME_NONE))
+ /* if we have a time, we become untainted and use the time */
+ sq->sink_tainted = FALSE;
+ } else
+ sink_time = sq->sinktime;
+
+ if (sq->src_tainted) {
+ src_time = sq->srctime =
+ gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME,
+ sq->src_segment.last_stop);
+ /* if we have a time, we become untainted and use the time */
+ if (G_UNLIKELY (src_time != GST_CLOCK_TIME_NONE))
+ sq->src_tainted = FALSE;
+ } else
+ src_time = sq->srctime;
GST_DEBUG_OBJECT (mq,
"queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id,
GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
- /* This allows for streams with out of order timestamping - sometimes the
+ /* This allows for streams with out of order timestamping - sometimes the
* emerging timestamp is later than the arriving one(s) */
- if (sink_time >= src_time)
+ if (G_LIKELY (sink_time != -1 && src_time != -1 && sink_time > src_time))
sq->cur_time = sink_time - src_time;
else
sq->cur_time = 0;
+
+ /* updating the time level can change the buffering state */
+ update_buffering (mq, sq);
+
+ return;
}
/* take a NEWSEGMENT event and apply the values to segment, updating the time
gst_segment_set_newsegment_full (segment, update,
rate, arate, format, start, stop, time);
+ if (segment == &sq->sink_segment)
+ sq->sink_tainted = TRUE;
+ else
+ sq->src_tainted = TRUE;
+
GST_DEBUG_OBJECT (mq,
"queue %d, configured NEWSEGMENT %" GST_SEGMENT_FORMAT, sq->id, segment);
@@ -549,11 +974,81 @@ apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp,
gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp);
+ if (segment == &sq->sink_segment)
+ sq->sink_tainted = TRUE;
+ else
+ sq->src_tainted = TRUE;
+
/* calc diff with other end */
update_time_level (mq, sq);
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
}
+static GstClockTime
+get_running_time (GstSegment * segment, GstMiniObject * object, gboolean end)
+{
+ GstClockTime time = GST_CLOCK_TIME_NONE;
+
+ if (GST_IS_BUFFER (object)) {
+ GstBuffer *buf = GST_BUFFER_CAST (object);
+
+ if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
+ time = GST_BUFFER_TIMESTAMP (buf);
+ if (end && GST_BUFFER_DURATION_IS_VALID (buf))
+ time += GST_BUFFER_DURATION (buf);
+ if (time > segment->stop)
+ time = segment->stop;
+ time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, time);
+ }
+ } else if (GST_IS_BUFFER_LIST (object)) {
+ GstBufferList *list = GST_BUFFER_LIST_CAST (object);
+ GstBufferListIterator *it = gst_buffer_list_iterate (list);
+ GstBuffer *buf;
+
+ do {
+ while ((buf = gst_buffer_list_iterator_next (it))) {
+ if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
+ time = GST_BUFFER_TIMESTAMP (buf);
+ if (end && GST_BUFFER_DURATION_IS_VALID (buf))
+ time += GST_BUFFER_DURATION (buf);
+ if (time > segment->stop)
+ time = segment->stop;
+ time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, time);
+ if (!end)
+ goto done;
+ } else if (!end) {
+ goto done;
+ }
+ }
+ } while (gst_buffer_list_iterator_next_group (it));
+ } else if (GST_IS_EVENT (object)) {
+ GstEvent *event = GST_EVENT_CAST (object);
+
+ /* For newsegment events return the running time of the start position */
+ if (GST_EVENT_TYPE (event) == GST_EVENT_NEWSEGMENT) {
+ GstSegment new_segment = *segment;
+ gboolean update;
+ gdouble rate, applied_rate;
+ GstFormat format;
+ gint64 start, stop, position;
+
+ gst_event_parse_new_segment_full (event, &update, &rate, &applied_rate,
+ &format, &start, &stop, &position);
+ if (format == GST_FORMAT_TIME) {
+ gst_segment_set_newsegment_full (&new_segment, update, rate,
+ applied_rate, format, start, stop, position);
+
+ time =
+ gst_segment_to_running_time (&new_segment, GST_FORMAT_TIME,
+ new_segment.start);
+ }
+ }
+ }
+
+done:
+ return time;
+}
+
static GstFlowReturn
gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
GstMiniObject * object)
if (GST_IS_BUFFER (object)) {
GstBuffer *buffer;
GstClockTime timestamp, duration;
+ GstCaps *caps;
buffer = GST_BUFFER_CAST (object);
timestamp = GST_BUFFER_TIMESTAMP (buffer);
duration = GST_BUFFER_DURATION (buffer);
+ caps = GST_BUFFER_CAPS (buffer);
apply_buffer (mq, sq, timestamp, duration, &sq->src_segment);
"SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT,
sq->id, buffer, GST_TIME_ARGS (timestamp));
+ /* Set caps on pad before pushing, this avoids core calling the acceptcaps
+ * function on the srcpad, which will call acceptcaps upstream, which might
+ * not accept these caps (anymore). */
+ if (caps && caps != GST_PAD_CAPS (sq->srcpad))
+ gst_pad_set_caps (sq->srcpad, caps);
+
result = gst_pad_push (sq->srcpad, buffer);
} else if (GST_IS_EVENT (object)) {
GstEvent *event;
{
if (item->object)
gst_mini_object_unref (item->object);
- g_free (item);
+ g_slice_free (GstMultiQueueItem, item);
}
/* takes ownership of passed mini object! */
static GstMultiQueueItem *
-gst_multi_queue_item_new (GstMiniObject * object, guint32 curid)
+gst_multi_queue_buffer_item_new (GstMiniObject * object, guint32 curid)
{
GstMultiQueueItem *item;
- item = g_new (GstMultiQueueItem, 1);
+ item = g_slice_new (GstMultiQueueItem);
item->object = object;
item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
item->posid = curid;
- if (GST_IS_BUFFER (object)) {
- item->size = GST_BUFFER_SIZE (object);
- item->duration = GST_BUFFER_DURATION (object);
- if (item->duration == GST_CLOCK_TIME_NONE)
- item->duration = 0;
- item->visible = TRUE;
- } else {
- item->size = 0;
+ item->size = GST_BUFFER_SIZE (object);
+ item->duration = GST_BUFFER_DURATION (object);
+ if (item->duration == GST_CLOCK_TIME_NONE)
item->duration = 0;
- item->visible = FALSE;
- }
+ item->visible = TRUE;
+ return item;
+}
+
+static GstMultiQueueItem *
+gst_multi_queue_event_item_new (GstMiniObject * object, guint32 curid)
+{
+ GstMultiQueueItem *item;
+
+ item = g_slice_new (GstMultiQueueItem);
+ item->object = object;
+ item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
+ item->posid = curid;
+
+ item->size = 0;
+ item->duration = 0;
+ item->visible = FALSE;
return item;
}
GstMultiQueueItem *item;
GstDataQueueItem *sitem;
GstMultiQueue *mq;
- GstMiniObject *object;
+ GstMiniObject *object = NULL;
guint32 newid;
- guint32 oldid = -1;
GstFlowReturn result;
+ GstClockTime next_time;
sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
mq = sq->mqueue;
- do {
- GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
+ GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
- /* Get something from the queue, blocking until that happens, or we get
- * flushed */
- if (!(gst_data_queue_pop (sq->queue, &sitem)))
- goto out_flushing;
+ if (sq->flushing)
+ goto out_flushing;
- item = (GstMultiQueueItem *) sitem;
- newid = item->posid;
+ /* Get something from the queue, blocking until that happens, or we get
+ * flushed */
+ if (!(gst_data_queue_pop (sq->queue, &sitem)))
+ goto out_flushing;
- /* steal the object and destroy the item */
- object = gst_multi_queue_item_steal_object (item);
- gst_multi_queue_item_destroy (item);
+ item = (GstMultiQueueItem *) sitem;
+ newid = item->posid;
- GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
- sq->id, newid, oldid);
+ /* steal the object and destroy the item */
+ object = gst_multi_queue_item_steal_object (item);
+ gst_multi_queue_item_destroy (item);
- /* If we're not-linked, we do some extra work because we might need to
- * wait before pushing. If we're linked but there's a gap in the IDs,
- * or it's the first loop, or we just passed the previous highid,
- * we might need to wake some sleeping pad up, so there's extra work
- * there too */
- if (sq->srcresult == GST_FLOW_NOT_LINKED ||
- (oldid == -1) || (newid != (oldid + 1)) || oldid > mq->highid) {
- GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
- gst_flow_get_name (sq->srcresult));
+ /* Get running time of the item. Events will have GST_CLOCK_TIME_NONE */
+ next_time = get_running_time (&sq->src_segment, object, TRUE);
- GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+ GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
+ sq->id, newid, sq->last_oldid);
- /* Update the nextid so other threads know when to wake us up */
- sq->nextid = newid;
+ /* If we're not-linked, we do some extra work because we might need to
+ * wait before pushing. If we're linked but there's a gap in the IDs,
+ * or it's the first loop, or we just passed the previous highid,
+ * we might need to wake some sleeping pad up, so there's extra work
+ * there too */
+ if (sq->srcresult == GST_FLOW_NOT_LINKED
+ || (sq->last_oldid == G_MAXUINT32) || (newid != (sq->last_oldid + 1))
+ || sq->last_oldid > mq->highid) {
+ GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
+ gst_flow_get_name (sq->srcresult));
- /* Update the oldid (the last ID we output) for highid tracking */
- if (oldid != -1)
- sq->oldid = oldid;
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
- if (sq->srcresult == GST_FLOW_NOT_LINKED) {
- /* Go to sleep until it's time to push this buffer */
+ /* Check again if we're flushing after the lock is taken,
+ * the flush flag might have been changed in the meantime */
+ if (sq->flushing) {
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+ goto out_flushing;
+ }
- /* Recompute the highid */
- compute_high_id (mq);
- while (newid > mq->highid && sq->srcresult == GST_FLOW_NOT_LINKED) {
- GST_DEBUG_OBJECT (mq, "queue %d sleeping for not-linked wakeup with "
- "newid %u and highid %u", sq->id, newid, mq->highid);
+ /* Update the nextid so other threads know when to wake us up */
+ sq->nextid = newid;
+ sq->next_time = next_time;
+ /* Update the oldid (the last ID we output) for highid tracking */
+ if (sq->last_oldid != G_MAXUINT32)
+ sq->oldid = sq->last_oldid;
- /* Wake up all non-linked pads before we sleep */
- wake_up_next_non_linked (mq);
+ if (sq->srcresult == GST_FLOW_NOT_LINKED) {
+ /* Go to sleep until it's time to push this buffer */
- mq->numwaiting++;
- g_cond_wait (sq->turn, mq->qlock);
- mq->numwaiting--;
+ /* Recompute the highid */
+ compute_high_id (mq);
+ /* Recompute the high time */
+ compute_high_time (mq);
- GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
- "wakeup with newid %u and highid %u", sq->id, newid, mq->highid);
- }
+ while (((mq->sync_by_running_time && next_time != GST_CLOCK_TIME_NONE &&
+ (mq->high_time == GST_CLOCK_TIME_NONE
+ || next_time >= mq->high_time))
+ || (!mq->sync_by_running_time && newid > mq->highid))
+ && sq->srcresult == GST_FLOW_NOT_LINKED) {
- /* Re-compute the high_id in case someone else pushed */
- compute_high_id (mq);
- } else {
- compute_high_id (mq);
- /* Wake up all non-linked pads */
+ GST_DEBUG_OBJECT (mq,
+ "queue %d sleeping for not-linked wakeup with "
+ "newid %u, highid %u, next_time %" GST_TIME_FORMAT
+ ", high_time %" GST_TIME_FORMAT, sq->id, newid, mq->highid,
+ GST_TIME_ARGS (next_time), GST_TIME_ARGS (mq->high_time));
+
+ /* Wake up all non-linked pads before we sleep */
wake_up_next_non_linked (mq);
+
+ mq->numwaiting++;
+ g_cond_wait (sq->turn, mq->qlock);
+ mq->numwaiting--;
+
+ if (sq->flushing) {
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+ goto out_flushing;
+ }
+
+ /* Recompute the high time */
+ compute_high_time (mq);
+
+ GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
+ "wakeup with newid %u, highid %u, next_time %" GST_TIME_FORMAT
+ ", high_time %" GST_TIME_FORMAT, sq->id, newid, mq->highid,
+ GST_TIME_ARGS (next_time), GST_TIME_ARGS (mq->high_time));
}
- /* We're done waiting, we can clear the nextid */
- sq->nextid = 0;
- GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+ /* Re-compute the high_id in case someone else pushed */
+ compute_high_id (mq);
+ } else {
+ compute_high_id (mq);
+ /* Wake up all non-linked pads */
+ wake_up_next_non_linked (mq);
}
+ /* We're done waiting, we can clear the nextid and nexttime */
+ sq->nextid = 0;
+ sq->next_time = GST_CLOCK_TIME_NONE;
- GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s",
- gst_flow_get_name (sq->srcresult));
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+ }
+
+ if (sq->flushing)
+ goto out_flushing;
+
+ GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s",
+ gst_flow_get_name (sq->srcresult));
+
+ /* Update time stats */
+ next_time = get_running_time (&sq->src_segment, object, FALSE);
+ if (next_time != GST_CLOCK_TIME_NONE) {
+ if (sq->last_time == GST_CLOCK_TIME_NONE || sq->last_time < next_time)
+ sq->last_time = next_time;
+ if (mq->high_time == GST_CLOCK_TIME_NONE || mq->high_time <= next_time) {
+ /* Wake up all non-linked pads now that we advanced the high time */
+ mq->high_time = next_time;
+ wake_up_next_non_linked (mq);
+ }
+ }
- /* Try to push out the new object */
- result = gst_single_queue_push_one (mq, sq, object);
- sq->srcresult = result;
+ /* Try to push out the new object */
+ result = gst_single_queue_push_one (mq, sq, object);
+ sq->srcresult = result;
+ object = NULL;
- if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED)
- goto out_flushing;
+ if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED
+ && result != GST_FLOW_UNEXPECTED)
+ goto out_flushing;
- GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s",
- gst_flow_get_name (sq->srcresult));
+ GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s",
+ gst_flow_get_name (sq->srcresult));
- oldid = newid;
- }
- while (TRUE);
+ sq->last_oldid = newid;
+
+ return;
out_flushing:
{
+ if (object)
+ gst_mini_object_unref (object);
+
/* Need to make sure wake up any sleeping pads when we exit */
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
compute_high_id (mq);
wake_up_next_non_linked (mq);
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+ /* upstream needs to see fatal result ASAP to shut things down,
+ * but might be stuck in one of our other full queues;
+ * so empty this one and trigger dynamic queue growth. At
+ * this point the srcresult is not OK, NOT_LINKED
+ * or UNEXPECTED, i.e. a real failure */
+ gst_data_queue_flush (sq->queue);
+ single_queue_underrun_cb (sq->queue, sq);
gst_data_queue_set_flushing (sq->queue, TRUE);
gst_pad_pause_task (sq->srcpad);
GST_CAT_LOG_OBJECT (multi_queue_debug, mq,
* gst_multi_queue_chain:
*
* This is similar to GstQueue's chain function, except:
- * _ we don't have leak behavioures,
+ * _ we don't have leak behaviours,
* _ we push with a unique id (curid)
*/
static GstFlowReturn
GstSingleQueue *sq;
GstMultiQueue *mq;
GstMultiQueueItem *item;
- GstFlowReturn ret = GST_FLOW_OK;
guint32 curid;
GstClockTime timestamp, duration;
sq = gst_pad_get_element_private (pad);
- mq = (GstMultiQueue *) gst_pad_get_parent (pad);
+ mq = sq->mqueue;
+
+ /* if eos, we are always full, so avoid hanging incoming indefinitely */
+ if (sq->is_eos)
+ goto was_eos;
/* Get a unique incrementing id */
- GST_MULTI_QUEUE_MUTEX_LOCK (mq);
- curid = mq->counter++;
- GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+ curid = G_ATOMIC_INT_ADD ((gint *) & mq->counter, 1);
GST_LOG_OBJECT (mq, "SingleQueue %d : about to enqueue buffer %p with id %d",
sq->id, buffer, curid);
- item = gst_multi_queue_item_new (GST_MINI_OBJECT_CAST (buffer), curid);
+ item = gst_multi_queue_buffer_item_new (GST_MINI_OBJECT_CAST (buffer), curid);
timestamp = GST_BUFFER_TIMESTAMP (buffer);
duration = GST_BUFFER_DURATION (buffer);
apply_buffer (mq, sq, timestamp, duration, &sq->sink_segment);
done:
- gst_object_unref (mq);
-
- return ret;
+ return sq->srcresult;
/* ERRORS */
flushing:
{
- ret = sq->srcresult;
GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
- sq->id, gst_flow_get_name (ret));
+ sq->id, gst_flow_get_name (sq->srcresult));
gst_multi_queue_item_destroy (item);
goto done;
}
+was_eos:
+ {
+ GST_DEBUG_OBJECT (mq, "we are EOS, dropping buffer, return UNEXPECTED");
+ gst_buffer_unref (buffer);
+ return GST_FLOW_UNEXPECTED;
+ }
}
static gboolean
sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
if (active) {
- /* All pads start off not-linked for a smooth kick-off */
- sq->srcresult = GST_FLOW_NOT_LINKED;
+ /* All pads start off linked until they push one buffer */
+ sq->srcresult = GST_FLOW_OK;
} else {
sq->srcresult = GST_FLOW_WRONG_STATE;
gst_data_queue_flush (sq->queue);
break;
}
- /* Get an unique incrementing id */
- GST_MULTI_QUEUE_MUTEX_LOCK (mq);
- curid = mq->counter++;
- GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+ /* if eos, we are always full, so avoid hanging incoming indefinitely */
+ if (sq->is_eos)
+ goto was_eos;
+
+ /* Get an unique incrementing id. */
+ curid = G_ATOMIC_INT_ADD ((gint *) & mq->counter, 1);
- item = gst_multi_queue_item_new ((GstMiniObject *) event, curid);
+ item = gst_multi_queue_event_item_new ((GstMiniObject *) event, curid);
GST_DEBUG_OBJECT (mq,
"SingleQueue %d : Enqueuing event %p of type %s with id %d",
switch (type) {
case GST_EVENT_EOS:
sq->is_eos = TRUE;
+ /* EOS affects the buffering state */
+ update_buffering (mq, sq);
+ single_queue_overrun_cb (sq->queue, sq);
break;
case GST_EVENT_NEWSEGMENT:
apply_segment (mq, sq, sref, &sq->sink_segment);
gst_multi_queue_item_destroy (item);
goto done;
}
+was_eos:
+ {
+ GST_DEBUG_OBJECT (mq, "we are EOS, dropping event, return FALSE");
+ gst_event_unref (event);
+ res = FALSE;
+ goto done;
+ }
}
static GstCaps *
return result;
}
+static gboolean
+gst_multi_queue_acceptcaps (GstPad * pad, GstCaps * caps)
+{
+ GstSingleQueue *sq = gst_pad_get_element_private (pad);
+ GstPad *otherpad;
+ gboolean result;
+
+ otherpad = (pad == sq->srcpad) ? sq->sinkpad : sq->srcpad;
+
+ GST_LOG_OBJECT (otherpad, "Accept caps from the peer of this pad");
+
+ result = gst_pad_peer_accept_caps (otherpad, caps);
+
+ return result;
+}
+
static GstFlowReturn
gst_multi_queue_bufferalloc (GstPad * pad, guint64 offset, guint size,
GstCaps * caps, GstBuffer ** buf)
return result;
}
-static gboolean
-gst_multi_queue_acceptcaps (GstPad * pad, GstCaps * caps)
-{
- return TRUE;
-}
-
static gboolean
gst_multi_queue_src_event (GstPad * pad, GstEvent * event)
{
GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
if (sq->srcresult == GST_FLOW_NOT_LINKED) {
- if (sq->nextid != 0 && sq->nextid <= mq->highid) {
+ if ((mq->sync_by_running_time && mq->high_time != GST_CLOCK_TIME_NONE
+ && sq->next_time != GST_CLOCK_TIME_NONE
+ && sq->next_time >= mq->high_time)
+ || (sq->nextid != 0 && sq->nextid <= mq->highid)) {
GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
g_cond_signal (sq->turn);
}
lowest);
}
-#define IS_FILLED(format, value) ((sq->max_size.format) != 0 && \
- (sq->max_size.format) <= (value))
+/* WITH LOCK TAKEN */
+static void
+compute_high_time (GstMultiQueue * mq)
+{
+ /* The high-id is either the highest id among the linked pads, or if all
+ * pads are not-linked, it's the lowest not-linked pad */
+ GList *tmp;
+ GstClockTime highest = GST_CLOCK_TIME_NONE;
+ GstClockTime lowest = GST_CLOCK_TIME_NONE;
+
+ for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
+ GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
+
+ GST_LOG_OBJECT (mq,
+ "inspecting sq:%d , next_time:%" GST_TIME_FORMAT ", last_time:%"
+ GST_TIME_FORMAT ", srcresult:%s", sq->id, GST_TIME_ARGS (sq->next_time),
+ GST_TIME_ARGS (sq->last_time), gst_flow_get_name (sq->srcresult));
+
+ if (sq->srcresult == GST_FLOW_NOT_LINKED) {
+ /* No need to consider queues which are not waiting */
+ if (sq->next_time == GST_CLOCK_TIME_NONE) {
+ GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id);
+ continue;
+ }
+
+ if (lowest == GST_CLOCK_TIME_NONE || sq->next_time < lowest)
+ lowest = sq->next_time;
+ } else if (sq->srcresult != GST_FLOW_UNEXPECTED) {
+ /* If we don't have a global highid, or the global highid is lower than
+ * this single queue's last outputted id, store the queue's one,
+ * unless the singlequeue is at EOS (srcresult = UNEXPECTED) */
+ if (highest == GST_CLOCK_TIME_NONE || sq->last_time > highest)
+ highest = sq->last_time;
+ }
+ }
+
+ mq->high_time = highest;
+
+ GST_LOG_OBJECT (mq,
+ "High time is now : %" GST_TIME_FORMAT ", lowest non-linked %"
+ GST_TIME_FORMAT, GST_TIME_ARGS (mq->high_time), GST_TIME_ARGS (lowest));
+}
+
+#define IS_FILLED(q, format, value) (((q)->max_size.format) != 0 && \
+ ((q)->max_size.format) <= (value))
/*
* GstSingleQueue functions
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
- GstSingleQueue *ssq = (GstSingleQueue *) tmp->data;
+ GstSingleQueue *oq = (GstSingleQueue *) tmp->data;
GstDataQueueSize ssize;
- GST_LOG_OBJECT (mq, "Checking Queue %d", ssq->id);
+ GST_LOG_OBJECT (mq, "Checking Queue %d", oq->id);
- if (gst_data_queue_is_empty (ssq->queue)) {
- GST_LOG_OBJECT (mq, "Queue %d is empty", ssq->id);
- if (IS_FILLED (visible, size.visible)) {
- sq->max_size.visible++;
+ if (gst_data_queue_is_empty (oq->queue)) {
+ GST_LOG_OBJECT (mq, "Queue %d is empty", oq->id);
+ if (IS_FILLED (sq, visible, size.visible)) {
+ sq->max_size.visible = size.visible + 1;
GST_DEBUG_OBJECT (mq,
"Another queue is empty, bumping single queue %d max visible to %d",
sq->id, sq->max_size.visible);
}
- GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
- goto beach;
}
/* check if we reached the hard time/bytes limits */
- gst_data_queue_get_level (ssq->queue, &ssize);
+ gst_data_queue_get_level (oq->queue, &ssize);
GST_DEBUG_OBJECT (mq,
"queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
- G_GUINT64_FORMAT, ssq->id, ssize.visible, sq->max_size.visible,
- ssize.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
-
- /* if this queue is filled completely we must signal overrun */
- if (IS_FILLED (bytes, ssize.bytes) || IS_FILLED (time, sq->cur_time)) {
- GST_LOG_OBJECT (mq, "Queue %d is filled", ssq->id);
+ G_GUINT64_FORMAT, oq->id, ssize.visible, oq->max_size.visible,
+ ssize.bytes, oq->max_size.bytes, oq->cur_time, oq->max_size.time);
+
+ /* if this queue is filled completely we must signal overrun.
+ * FIXME, this seems wrong in many ways
+ * - we're comparing the filled level of this queue against the
+ * values of the other one
+ * - we should only do this after we found no empty queues, ie, move
+ * this check outside of the loop
+ * - the debug statement talks about a different queue than the one
+ * we are checking here.
+ */
+ if (sq->is_eos || IS_FILLED (sq, bytes, ssize.bytes) ||
+ IS_FILLED (sq, time, sq->cur_time)) {
+ GST_LOG_OBJECT (mq, "Queue %d is filled", oq->id);
filled = TRUE;
}
}
/* Overrun is always forwarded, since this is blocking the upstream element */
if (filled) {
GST_DEBUG_OBJECT (mq, "A queue is filled, signalling overrun");
- g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_OVERRUN], 0);
+ g_signal_emit (mq, gst_multi_queue_signals[SIGNAL_OVERRUN], 0);
}
-beach:
return;
}
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
- GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
+ GstSingleQueue *oq = (GstSingleQueue *) tmp->data;
- if (gst_data_queue_is_full (sq->queue)) {
+ if (gst_data_queue_is_full (oq->queue)) {
GstDataQueueSize size;
- gst_data_queue_get_level (sq->queue, &size);
- if (IS_FILLED (visible, size.visible)) {
- sq->max_size.visible++;
+ gst_data_queue_get_level (oq->queue, &size);
+ if (IS_FILLED (oq, visible, size.visible)) {
+ oq->max_size.visible = size.visible + 1;
GST_DEBUG_OBJECT (mq,
- "queue %d is filled, bumping its max visible to %d", sq->id,
- sq->max_size.visible);
- gst_data_queue_limits_changed (sq->queue);
+ "queue %d is filled, bumping its max visible to %d", oq->id,
+ oq->max_size.visible);
+ gst_data_queue_limits_changed (oq->queue);
}
}
- if (!gst_data_queue_is_empty (sq->queue))
+ if (!gst_data_queue_is_empty (oq->queue))
empty = FALSE;
}
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
if (empty) {
GST_DEBUG_OBJECT (mq, "All queues are empty, signalling it");
- g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_UNDERRUN], 0);
+ g_signal_emit (mq, gst_multi_queue_signals[SIGNAL_UNDERRUN], 0);
}
}
guint64 time, GstSingleQueue * sq)
{
gboolean res;
+ GstMultiQueue *mq = sq->mqueue;
- GST_DEBUG ("queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT
- "/%" G_GUINT64_FORMAT, sq->id, visible, sq->max_size.visible, bytes,
+ GST_DEBUG_OBJECT (mq,
+ "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
+ G_GUINT64_FORMAT, sq->id, visible, sq->max_size.visible, bytes,
sq->max_size.bytes, sq->cur_time, sq->max_size.time);
/* we are always filled on EOS */
if (sq->is_eos)
return TRUE;
- /* we never go past the max visible items */
- if (IS_FILLED (visible, visible))
+ /* we never go past the max visible items unless we are in buffering mode */
+ if (!mq->use_buffering && IS_FILLED (sq, visible, visible))
return TRUE;
- if (sq->cur_time != 0) {
- /* if we have valid time in the queue, check */
- res = IS_FILLED (time, sq->cur_time);
- } else {
- /* no valid time, check bytes */
- res = IS_FILLED (bytes, bytes);
- }
+ /* check time or bytes */
+ res = IS_FILLED (sq, time, sq->cur_time) || IS_FILLED (sq, bytes, bytes);
+
return res;
}
}
static GstSingleQueue *
-gst_single_queue_new (GstMultiQueue * mqueue)
+gst_single_queue_new (GstMultiQueue * mqueue, gint id)
{
GstSingleQueue *sq;
- gchar *tmp;
+ gchar *name;
+ GList *tmp;
+ gint temp_id = (id == -1) ? 0 : id;
+
+ GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
+
+ /* Find an unused queue ID, if possible the passed one */
+ for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
+ GstSingleQueue *sq2 = (GstSingleQueue *) tmp->data;
+ /* This works because the IDs are sorted in ascending order */
+ if (sq2->id == temp_id) {
+ /* If this ID was requested by the caller return NULL,
+ * otherwise just get us the next one */
+ if (id == -1)
+ temp_id = sq2->id + 1;
+ else
+ return NULL;
+ } else if (sq2->id > temp_id) {
+ break;
+ }
+ }
sq = g_new0 (GstSingleQueue, 1);
+ mqueue->nbqueues++;
+ sq->id = temp_id;
- GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
- sq->id = mqueue->nbqueues++;
+ mqueue->queues = g_list_insert_before (mqueue->queues, tmp, sq);
+ mqueue->queues_cookie++;
/* copy over max_size and extra_size so we don't need to take the lock
* any longer when checking if the queue is full. */
sq->extra_size.bytes = mqueue->extra_size.bytes;
sq->extra_size.time = mqueue->extra_size.time;
- GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
-
GST_DEBUG_OBJECT (mqueue, "Creating GstSingleQueue id:%d", sq->id);
sq->mqueue = mqueue;
sq->srcresult = GST_FLOW_WRONG_STATE;
- sq->queue = gst_data_queue_new ((GstDataQueueCheckFullFunction)
- single_queue_check_full, sq);
+ sq->queue = gst_data_queue_new_full ((GstDataQueueCheckFullFunction)
+ single_queue_check_full,
+ (GstDataQueueFullCallback) single_queue_overrun_cb,
+ (GstDataQueueEmptyCallback) single_queue_underrun_cb, sq);
sq->is_eos = FALSE;
+ sq->flushing = FALSE;
gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
sq->nextid = 0;
sq->oldid = 0;
+ sq->next_time = GST_CLOCK_TIME_NONE;
+ sq->last_time = GST_CLOCK_TIME_NONE;
sq->turn = g_cond_new ();
- /* attach to underrun/overrun signals to handle non-starvation */
- g_signal_connect (G_OBJECT (sq->queue), "full",
- G_CALLBACK (single_queue_overrun_cb), sq);
- g_signal_connect (G_OBJECT (sq->queue), "empty",
- G_CALLBACK (single_queue_underrun_cb), sq);
+ sq->sinktime = GST_CLOCK_TIME_NONE;
+ sq->srctime = GST_CLOCK_TIME_NONE;
+ sq->sink_tainted = TRUE;
+ sq->src_tainted = TRUE;
- tmp = g_strdup_printf ("sink%d", sq->id);
- sq->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
- g_free (tmp);
+ name = g_strdup_printf ("sink%d", sq->id);
+ sq->sinkpad = gst_pad_new_from_static_template (&sinktemplate, name);
+ g_free (name);
gst_pad_set_chain_function (sq->sinkpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_chain));
GST_DEBUG_FUNCPTR (gst_multi_queue_sink_event));
gst_pad_set_getcaps_function (sq->sinkpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_getcaps));
+ gst_pad_set_acceptcaps_function (sq->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_multi_queue_acceptcaps));
gst_pad_set_bufferalloc_function (sq->sinkpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_bufferalloc));
+ gst_pad_set_iterate_internal_links_function (sq->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_multi_queue_iterate_internal_links));
- tmp = g_strdup_printf ("src%d", sq->id);
- sq->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
- g_free (tmp);
+ name = g_strdup_printf ("src%d", sq->id);
+ sq->srcpad = gst_pad_new_from_static_template (&srctemplate, name);
+ g_free (name);
gst_pad_set_activatepush_function (sq->srcpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_src_activate_push));
- gst_pad_set_acceptcaps_function (sq->srcpad,
- GST_DEBUG_FUNCPTR (gst_multi_queue_acceptcaps));
gst_pad_set_getcaps_function (sq->srcpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_getcaps));
+ gst_pad_set_acceptcaps_function (sq->srcpad,
+ GST_DEBUG_FUNCPTR (gst_multi_queue_acceptcaps));
gst_pad_set_event_function (sq->srcpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_src_event));
gst_pad_set_query_function (sq->srcpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_src_query));
+ gst_pad_set_iterate_internal_links_function (sq->srcpad,
+ GST_DEBUG_FUNCPTR (gst_multi_queue_iterate_internal_links));
gst_pad_set_element_private (sq->sinkpad, (gpointer) sq);
gst_pad_set_element_private (sq->srcpad, (gpointer) sq);
- gst_pad_set_active (sq->srcpad, TRUE);
- gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad);
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
- gst_pad_set_active (sq->sinkpad, TRUE);
+ /* only activate the pads when we are not in the NULL state
+ * and add the pad under the state_lock to prevend state changes
+ * between activating and adding */
+ g_static_rec_mutex_lock (GST_STATE_GET_LOCK (mqueue));
+ if (GST_STATE_TARGET (mqueue) != GST_STATE_NULL) {
+ gst_pad_set_active (sq->srcpad, TRUE);
+ gst_pad_set_active (sq->sinkpad, TRUE);
+ }
+ gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad);
gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad);
+ g_static_rec_mutex_unlock (GST_STATE_GET_LOCK (mqueue));
GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added",
sq->id);