index c3ead2cbc19810952937922b7024181413a200f0..89d2289397e770ee548a414609734870eecabf05 100644 (file)
* Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
* Copyright (C) 2007 Jan Schmidt <jan@fluendo.com>
* Copyright (C) 2007 Wim Taymans <wim@fluendo.com>
+ * Copyright (C) 2011 Sebastian Dröge <sebastian.droege@collabora.co.uk>
*
* gstmultiqueue.c:
*
* <itemizedlist><title>Multiple streamhandling</title>
* <listitem><para>
* The element handles queueing data on more than one stream at once. To
- * achieve such a feature it has request sink pads (sink%d) and
- * 'sometimes' src pads (src%d).
+ * achieve such a feature it has request sink pads (sink%d) and
+ * 'sometimes' src pads (src%d).
* </para><para>
* When requesting a given sinkpad with gst_element_get_request_pad(),
* the associated srcpad for that stream will be created.
#endif
#include <gst/gst.h>
+#include <stdio.h>
#include "gstmultiqueue.h"
+#include <gst/glib-compat-private.h>
/**
* GstSingleQueue:
GstDataQueueSize max_size, extra_size;
GstClockTime cur_time;
gboolean is_eos;
+ gboolean flushing;
/* Protected by global lock */
guint32 nextid; /* ID of the next object waiting to be pushed */
guint32 oldid; /* ID of the last object pushed (last in a series) */
+ guint32 last_oldid; /* Previously observed old_id, reset to MAXUINT32 on flush */
+ GstClockTime next_time; /* End running time of next buffer to be pushed */
+ GstClockTime last_time; /* Start running time of last pushed buffer */
GCond *turn; /* SingleQueue turn waiting conditional */
};
guint32 posid;
};
-static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue);
+static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue, gint id);
static void gst_single_queue_free (GstSingleQueue * squeue);
static void wake_up_next_non_linked (GstMultiQueue * mq);
static void compute_high_id (GstMultiQueue * mq);
+static void compute_high_time (GstMultiQueue * mq);
static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
#define DEFAULT_EXTRA_SIZE_BUFFERS 5
#define DEFAULT_EXTRA_SIZE_TIME 3 * GST_SECOND
+#define DEFAULT_USE_BUFFERING FALSE
+#define DEFAULT_LOW_PERCENT 10
+#define DEFAULT_HIGH_PERCENT 99
+#define DEFAULT_SYNC_BY_RUNNING_TIME FALSE
+
enum
{
PROP_0,
PROP_MAX_SIZE_BYTES,
PROP_MAX_SIZE_BUFFERS,
PROP_MAX_SIZE_TIME,
+ PROP_USE_BUFFERING,
+ PROP_LOW_PERCENT,
+ PROP_HIGH_PERCENT,
+ PROP_SYNC_BY_RUNNING_TIME,
PROP_LAST
};
static GstPad *gst_multi_queue_request_new_pad (GstElement * element,
GstPadTemplate * temp, const gchar * name);
static void gst_multi_queue_release_pad (GstElement * element, GstPad * pad);
+static GstStateChangeReturn gst_multi_queue_change_state (GstElement *
+ element, GstStateChange transition);
static void gst_multi_queue_loop (GstPad * pad);
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
- gobject_class->set_property =
- GST_DEBUG_FUNCPTR (gst_multi_queue_set_property);
- gobject_class->get_property =
- GST_DEBUG_FUNCPTR (gst_multi_queue_get_property);
+ gobject_class->set_property = gst_multi_queue_set_property;
+ gobject_class->get_property = gst_multi_queue_get_property;
/* SIGNALS */
0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_multi_queue_finalize);
+ /**
+ * GstMultiQueue:use-buffering
+ *
+ * Enable the buffering option in multiqueue so that BUFFERING messages are
+ * emited based on low-/high-percent thresholds.
+ *
+ * Since: 0.10.26
+ */
+ g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
+ g_param_spec_boolean ("use-buffering", "Use buffering",
+ "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
+ DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ /**
+ * GstMultiQueue:low-percent
+ *
+ * Low threshold percent for buffering to start.
+ *
+ * Since: 0.10.26
+ */
+ g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
+ g_param_spec_int ("low-percent", "Low percent",
+ "Low threshold for buffering to start", 0, 100,
+ DEFAULT_LOW_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ /**
+ * GstMultiQueue:high-percent
+ *
+ * High threshold percent for buffering to finish.
+ *
+ * Since: 0.10.26
+ */
+ g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
+ g_param_spec_int ("high-percent", "High percent",
+ "High threshold for buffering to finish", 0, 100,
+ DEFAULT_HIGH_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ /**
+ * GstMultiQueue:sync-by-running-time
+ *
+ * If enabled multiqueue will synchronize deactivated or not-linked streams
+ * to the activated and linked streams by taking the running time.
+ * Otherwise multiqueue will synchronize the deactivated or not-linked
+ * streams by keeping the order in which buffers and events arrived compared
+ * to active and linked streams.
+ *
+ * Since: 0.10.36
+ */
+ g_object_class_install_property (gobject_class, PROP_SYNC_BY_RUNNING_TIME,
+ g_param_spec_boolean ("sync-by-running-time", "Sync By Running Time",
+ "Synchronize deactivated or not-linked streams by running time",
+ DEFAULT_SYNC_BY_RUNNING_TIME,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ gobject_class->finalize = gst_multi_queue_finalize;
gstelement_class->request_new_pad =
GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad);
gstelement_class->release_pad =
GST_DEBUG_FUNCPTR (gst_multi_queue_release_pad);
+ gstelement_class->change_state =
+ GST_DEBUG_FUNCPTR (gst_multi_queue_change_state);
}
static void
mqueue->extra_size.visible = DEFAULT_EXTRA_SIZE_BUFFERS;
mqueue->extra_size.time = DEFAULT_EXTRA_SIZE_TIME;
+ mqueue->use_buffering = DEFAULT_USE_BUFFERING;
+ mqueue->low_percent = DEFAULT_LOW_PERCENT;
+ mqueue->high_percent = DEFAULT_HIGH_PERCENT;
+
+ mqueue->sync_by_running_time = DEFAULT_SYNC_BY_RUNNING_TIME;
+
mqueue->counter = 1;
mqueue->highid = -1;
- mqueue->nextnotlinked = -1;
+ mqueue->high_time = GST_CLOCK_TIME_NONE;
mqueue->qlock = g_mutex_new ();
}
case PROP_EXTRA_SIZE_TIME:
mq->extra_size.time = g_value_get_uint64 (value);
break;
+ case PROP_USE_BUFFERING:
+ mq->use_buffering = g_value_get_boolean (value);
+ break;
+ case PROP_LOW_PERCENT:
+ mq->low_percent = g_value_get_int (value);
+ break;
+ case PROP_HIGH_PERCENT:
+ mq->high_percent = g_value_get_int (value);
+ break;
+ case PROP_SYNC_BY_RUNNING_TIME:
+ mq->sync_by_running_time = g_value_get_boolean (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
case PROP_MAX_SIZE_TIME:
g_value_set_uint64 (value, mq->max_size.time);
break;
+ case PROP_USE_BUFFERING:
+ g_value_set_boolean (value, mq->use_buffering);
+ break;
+ case PROP_LOW_PERCENT:
+ g_value_set_int (value, mq->low_percent);
+ break;
+ case PROP_HIGH_PERCENT:
+ g_value_set_int (value, mq->high_percent);
+ break;
+ case PROP_SYNC_BY_RUNNING_TIME:
+ g_value_set_boolean (value, mq->sync_by_running_time);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
{
GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
GstSingleQueue *squeue;
+ gint temp_id = -1;
- GST_LOG_OBJECT (element, "name : %s", GST_STR_NULL (name));
+ if (name) {
+ sscanf (name + 4, "%d", &temp_id);
+ GST_LOG_OBJECT (element, "name : %s (id %d)", GST_STR_NULL (name), temp_id);
+ }
/* Create a new single queue, add the sink and source pad and return the sink pad */
- squeue = gst_single_queue_new (mqueue);
-
- GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
- mqueue->queues = g_list_append (mqueue->queues, squeue);
- mqueue->queues_cookie++;
- GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
+ squeue = gst_single_queue_new (mqueue, temp_id);
GST_DEBUG_OBJECT (mqueue, "Returning pad %s:%s",
GST_DEBUG_PAD_NAME (squeue->sinkpad));
- return squeue->sinkpad;
+ return squeue ? squeue->sinkpad : NULL;
}
static void
gst_single_queue_free (sq);
}
+static GstStateChangeReturn
+gst_multi_queue_change_state (GstElement * element, GstStateChange transition)
+{
+ GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
+ GstSingleQueue *sq = NULL;
+ GstStateChangeReturn result;
+
+ switch (transition) {
+ case GST_STATE_CHANGE_READY_TO_PAUSED:{
+ GList *tmp;
+
+ /* Set all pads to non-flushing */
+ GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
+ for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
+ sq = (GstSingleQueue *) tmp->data;
+ sq->flushing = FALSE;
+ }
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
+ break;
+ }
+ case GST_STATE_CHANGE_PAUSED_TO_READY:{
+ GList *tmp;
+
+ /* Un-wait all waiting pads */
+ GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
+ for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
+ sq = (GstSingleQueue *) tmp->data;
+ sq->flushing = TRUE;
+ g_cond_signal (sq->turn);
+ }
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
+ break;
+ }
+ default:
+ break;
+ }
+
+ result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+
+ switch (transition) {
+ default:
+ break;
+ }
+
+ return result;
+
+
+
+}
+
static gboolean
gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
{
@@ -597,6 +747,8 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
sq->srcresult = GST_FLOW_WRONG_STATE;
gst_data_queue_set_flushing (sq->queue, TRUE);
+ sq->flushing = TRUE;
+
/* wake up non-linked task */
GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
sq->id);
@@ -618,8 +770,18 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
sq->is_eos = FALSE;
sq->nextid = 0;
sq->oldid = 0;
+ sq->last_oldid = G_MAXUINT32;
+ sq->next_time = GST_CLOCK_TIME_NONE;
+ sq->last_time = GST_CLOCK_TIME_NONE;
gst_data_queue_set_flushing (sq->queue, FALSE);
+ /* Reset high time to be recomputed next */
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+ mq->high_time = GST_CLOCK_TIME_NONE;
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+
+ sq->flushing = FALSE;
+
GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
result =
gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop,
@@ -628,6 +790,78 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
return result;
}
+static void
+update_buffering (GstMultiQueue * mq, GstSingleQueue * sq)
+{
+ GstDataQueueSize size;
+ gint percent, tmp;
+ gboolean post = FALSE;
+
+ /* nothing to dowhen we are not in buffering mode */
+ if (!mq->use_buffering)
+ return;
+
+ gst_data_queue_get_level (sq->queue, &size);
+
+ GST_DEBUG_OBJECT (mq,
+ "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
+ G_GUINT64_FORMAT, sq->id, size.visible, sq->max_size.visible,
+ size.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
+
+ /* get bytes and time percentages and take the max */
+ if (sq->is_eos) {
+ percent = 100;
+ } else {
+ percent = 0;
+ if (sq->max_size.time > 0) {
+ tmp = (sq->cur_time * 100) / sq->max_size.time;
+ percent = MAX (percent, tmp);
+ }
+ if (sq->max_size.bytes > 0) {
+ tmp = (size.bytes * 100) / sq->max_size.bytes;
+ percent = MAX (percent, tmp);
+ }
+ }
+
+ if (mq->buffering) {
+ post = TRUE;
+ if (percent >= mq->high_percent) {
+ mq->buffering = FALSE;
+ }
+ /* make sure it increases */
+ percent = MAX (mq->percent, percent);
+
+ if (percent == mq->percent)
+ /* don't post if nothing changed */
+ post = FALSE;
+ else
+ /* else keep last value we posted */
+ mq->percent = percent;
+ } else {
+ if (percent < mq->low_percent) {
+ mq->buffering = TRUE;
+ mq->percent = percent;
+ post = TRUE;
+ }
+ }
+ if (post) {
+ GstMessage *message;
+
+ /* scale to high percent so that it becomes the 100% mark */
+ percent = percent * 100 / mq->high_percent;
+ /* clip */
+ if (percent > 100)
+ percent = 100;
+
+ GST_DEBUG_OBJECT (mq, "buffering %d percent", percent);
+ message = gst_message_new_buffering (GST_OBJECT_CAST (mq), percent);
+
+ gst_element_post_message (GST_ELEMENT_CAST (mq), message);
+ } else {
+ GST_DEBUG_OBJECT (mq, "filled %d percent", percent);
+ }
+}
+
/* calculate the diff between running time on the sink and src of the queue.
* This is the total amount of time in the queue.
* WITH LOCK TAKEN */
sink_time = sq->sinktime =
gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME,
sq->sink_segment.last_stop);
- if (sink_time == GST_CLOCK_TIME_NONE)
- goto beach;
- sq->sink_tainted = FALSE;
+
+ if (G_UNLIKELY (sink_time != GST_CLOCK_TIME_NONE))
+ /* if we have a time, we become untainted and use the time */
+ sq->sink_tainted = FALSE;
} else
sink_time = sq->sinktime;
src_time = sq->srctime =
gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME,
sq->src_segment.last_stop);
- if (src_time == GST_CLOCK_TIME_NONE)
- goto beach;
- sq->src_tainted = FALSE;
+ /* if we have a time, we become untainted and use the time */
+ if (G_UNLIKELY (src_time != GST_CLOCK_TIME_NONE))
+ sq->src_tainted = FALSE;
} else
src_time = sq->srctime;
/* This allows for streams with out of order timestamping - sometimes the
* emerging timestamp is later than the arriving one(s) */
- if (sink_time < src_time)
- goto beach;
+ if (G_LIKELY (sink_time != -1 && src_time != -1 && sink_time > src_time))
+ sq->cur_time = sink_time - src_time;
+ else
+ sq->cur_time = 0;
- sq->cur_time = sink_time - src_time;
- return;
+ /* updating the time level can change the buffering state */
+ update_buffering (mq, sq);
-beach:
- sq->cur_time = 0;
+ return;
}
/* take a NEWSEGMENT event and apply the values to segment, updating the time
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
}
+static GstClockTime
+get_running_time (GstSegment * segment, GstMiniObject * object, gboolean end)
+{
+ GstClockTime time = GST_CLOCK_TIME_NONE;
+
+ if (GST_IS_BUFFER (object)) {
+ GstBuffer *buf = GST_BUFFER_CAST (object);
+
+ if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
+ time = GST_BUFFER_TIMESTAMP (buf);
+ if (end && GST_BUFFER_DURATION_IS_VALID (buf))
+ time += GST_BUFFER_DURATION (buf);
+ if (time > segment->stop)
+ time = segment->stop;
+ time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, time);
+ }
+ } else if (GST_IS_BUFFER_LIST (object)) {
+ GstBufferList *list = GST_BUFFER_LIST_CAST (object);
+ GstBufferListIterator *it = gst_buffer_list_iterate (list);
+ GstBuffer *buf;
+
+ do {
+ while ((buf = gst_buffer_list_iterator_next (it))) {
+ if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
+ time = GST_BUFFER_TIMESTAMP (buf);
+ if (end && GST_BUFFER_DURATION_IS_VALID (buf))
+ time += GST_BUFFER_DURATION (buf);
+ if (time > segment->stop)
+ time = segment->stop;
+ time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, time);
+ if (!end)
+ goto done;
+ } else if (!end) {
+ goto done;
+ }
+ }
+ } while (gst_buffer_list_iterator_next_group (it));
+ } else if (GST_IS_EVENT (object)) {
+ GstEvent *event = GST_EVENT_CAST (object);
+
+ /* For newsegment events return the running time of the start position */
+ if (GST_EVENT_TYPE (event) == GST_EVENT_NEWSEGMENT) {
+ GstSegment new_segment = *segment;
+ gboolean update;
+ gdouble rate, applied_rate;
+ GstFormat format;
+ gint64 start, stop, position;
+
+ gst_event_parse_new_segment_full (event, &update, &rate, &applied_rate,
+ &format, &start, &stop, &position);
+ if (format == GST_FORMAT_TIME) {
+ gst_segment_set_newsegment_full (&new_segment, update, rate,
+ applied_rate, format, start, stop, position);
+
+ time =
+ gst_segment_to_running_time (&new_segment, GST_FORMAT_TIME,
+ new_segment.start);
+ }
+ }
+ }
+
+done:
+ return time;
+}
+
static GstFlowReturn
gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
GstMiniObject * object)
GstMultiQueueItem *item;
GstDataQueueItem *sitem;
GstMultiQueue *mq;
- GstMiniObject *object;
+ GstMiniObject *object = NULL;
guint32 newid;
- guint32 oldid = G_MAXUINT32;
GstFlowReturn result;
+ GstClockTime next_time;
sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
mq = sq->mqueue;
- do {
- GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
+ GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
- /* Get something from the queue, blocking until that happens, or we get
- * flushed */
- if (!(gst_data_queue_pop (sq->queue, &sitem)))
- goto out_flushing;
+ if (sq->flushing)
+ goto out_flushing;
- item = (GstMultiQueueItem *) sitem;
- newid = item->posid;
+ /* Get something from the queue, blocking until that happens, or we get
+ * flushed */
+ if (!(gst_data_queue_pop (sq->queue, &sitem)))
+ goto out_flushing;
- /* steal the object and destroy the item */
- object = gst_multi_queue_item_steal_object (item);
- gst_multi_queue_item_destroy (item);
+ item = (GstMultiQueueItem *) sitem;
+ newid = item->posid;
- GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
- sq->id, newid, oldid);
+ /* steal the object and destroy the item */
+ object = gst_multi_queue_item_steal_object (item);
+ gst_multi_queue_item_destroy (item);
- /* If we're not-linked, we do some extra work because we might need to
- * wait before pushing. If we're linked but there's a gap in the IDs,
- * or it's the first loop, or we just passed the previous highid,
- * we might need to wake some sleeping pad up, so there's extra work
- * there too */
- if (sq->srcresult == GST_FLOW_NOT_LINKED ||
- (oldid == G_MAXUINT32) || (newid != (oldid + 1)) ||
- oldid > mq->highid) {
- GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
- gst_flow_get_name (sq->srcresult));
+ /* Get running time of the item. Events will have GST_CLOCK_TIME_NONE */
+ next_time = get_running_time (&sq->src_segment, object, TRUE);
- GST_MULTI_QUEUE_MUTEX_LOCK (mq);
+ GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
+ sq->id, newid, sq->last_oldid);
- /* Update the nextid so other threads know when to wake us up */
- sq->nextid = newid;
+ /* If we're not-linked, we do some extra work because we might need to
+ * wait before pushing. If we're linked but there's a gap in the IDs,
+ * or it's the first loop, or we just passed the previous highid,
+ * we might need to wake some sleeping pad up, so there's extra work
+ * there too */
+ if (sq->srcresult == GST_FLOW_NOT_LINKED
+ || (sq->last_oldid == G_MAXUINT32) || (newid != (sq->last_oldid + 1))
+ || sq->last_oldid > mq->highid) {
+ GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
+ gst_flow_get_name (sq->srcresult));
- /* Update the oldid (the last ID we output) for highid tracking */
- if (oldid != G_MAXUINT32)
- sq->oldid = oldid;
+ GST_MULTI_QUEUE_MUTEX_LOCK (mq);
- if (sq->srcresult == GST_FLOW_NOT_LINKED) {
- /* Go to sleep until it's time to push this buffer */
+ /* Check again if we're flushing after the lock is taken,
+ * the flush flag might have been changed in the meantime */
+ if (sq->flushing) {
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+ goto out_flushing;
+ }
- /* Recompute the highid */
- compute_high_id (mq);
- while (newid > mq->highid && sq->srcresult == GST_FLOW_NOT_LINKED) {
- GST_DEBUG_OBJECT (mq, "queue %d sleeping for not-linked wakeup with "
- "newid %u and highid %u", sq->id, newid, mq->highid);
+ /* Update the nextid so other threads know when to wake us up */
+ sq->nextid = newid;
+ sq->next_time = next_time;
+ /* Update the oldid (the last ID we output) for highid tracking */
+ if (sq->last_oldid != G_MAXUINT32)
+ sq->oldid = sq->last_oldid;
- /* Wake up all non-linked pads before we sleep */
- wake_up_next_non_linked (mq);
+ if (sq->srcresult == GST_FLOW_NOT_LINKED) {
+ /* Go to sleep until it's time to push this buffer */
- mq->numwaiting++;
- g_cond_wait (sq->turn, mq->qlock);
- mq->numwaiting--;
+ /* Recompute the highid */
+ compute_high_id (mq);
+ /* Recompute the high time */
+ compute_high_time (mq);
- GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
- "wakeup with newid %u and highid %u", sq->id, newid, mq->highid);
- }
+ while (((mq->sync_by_running_time && next_time != GST_CLOCK_TIME_NONE &&
+ (mq->high_time == GST_CLOCK_TIME_NONE
+ || next_time >= mq->high_time))
+ || (!mq->sync_by_running_time && newid > mq->highid))
+ && sq->srcresult == GST_FLOW_NOT_LINKED) {
- /* Re-compute the high_id in case someone else pushed */
- compute_high_id (mq);
- } else {
- compute_high_id (mq);
- /* Wake up all non-linked pads */
+ GST_DEBUG_OBJECT (mq,
+ "queue %d sleeping for not-linked wakeup with "
+ "newid %u, highid %u, next_time %" GST_TIME_FORMAT
+ ", high_time %" GST_TIME_FORMAT, sq->id, newid, mq->highid,
+ GST_TIME_ARGS (next_time), GST_TIME_ARGS (mq->high_time));
+
+ /* Wake up all non-linked pads before we sleep */
wake_up_next_non_linked (mq);
+
+ mq->numwaiting++;
+ g_cond_wait (sq->turn, mq->qlock);
+ mq->numwaiting--;
+
+ if (sq->flushing) {
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+ goto out_flushing;
+ }
+
+ /* Recompute the high time */
+ compute_high_time (mq);
+
+ GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
+ "wakeup with newid %u, highid %u, next_time %" GST_TIME_FORMAT
+ ", high_time %" GST_TIME_FORMAT, sq->id, newid, mq->highid,
+ GST_TIME_ARGS (next_time), GST_TIME_ARGS (mq->high_time));
}
- /* We're done waiting, we can clear the nextid */
- sq->nextid = 0;
- GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+ /* Re-compute the high_id in case someone else pushed */
+ compute_high_id (mq);
+ } else {
+ compute_high_id (mq);
+ /* Wake up all non-linked pads */
+ wake_up_next_non_linked (mq);
}
+ /* We're done waiting, we can clear the nextid and nexttime */
+ sq->nextid = 0;
+ sq->next_time = GST_CLOCK_TIME_NONE;
- GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s",
- gst_flow_get_name (sq->srcresult));
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
+ }
- /* Try to push out the new object */
- result = gst_single_queue_push_one (mq, sq, object);
- sq->srcresult = result;
+ if (sq->flushing)
+ goto out_flushing;
+
+ GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s",
+ gst_flow_get_name (sq->srcresult));
+
+ /* Update time stats */
+ next_time = get_running_time (&sq->src_segment, object, FALSE);
+ if (next_time != GST_CLOCK_TIME_NONE) {
+ if (sq->last_time == GST_CLOCK_TIME_NONE || sq->last_time < next_time)
+ sq->last_time = next_time;
+ if (mq->high_time == GST_CLOCK_TIME_NONE || mq->high_time <= next_time) {
+ /* Wake up all non-linked pads now that we advanced the high time */
+ mq->high_time = next_time;
+ wake_up_next_non_linked (mq);
+ }
+ }
- if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED)
- goto out_flushing;
+ /* Try to push out the new object */
+ result = gst_single_queue_push_one (mq, sq, object);
+ sq->srcresult = result;
+ object = NULL;
- GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s",
- gst_flow_get_name (sq->srcresult));
+ if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED
+ && result != GST_FLOW_UNEXPECTED)
+ goto out_flushing;
- oldid = newid;
- }
- while (TRUE);
+ GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s",
+ gst_flow_get_name (sq->srcresult));
+
+ sq->last_oldid = newid;
+
+ return;
out_flushing:
{
+ if (object)
+ gst_mini_object_unref (object);
+
/* Need to make sure wake up any sleeping pads when we exit */
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
compute_high_id (mq);
/* upstream needs to see fatal result ASAP to shut things down,
* but might be stuck in one of our other full queues;
- * so empty this one and trigger dynamic queue growth */
- if (GST_FLOW_IS_FATAL (sq->srcresult)) {
- gst_data_queue_flush (sq->queue);
- single_queue_underrun_cb (sq->queue, sq);
- }
+ * so empty this one and trigger dynamic queue growth. At
+ * this point the srcresult is not OK, NOT_LINKED
+ * or UNEXPECTED, i.e. a real failure */
+ gst_data_queue_flush (sq->queue);
+ single_queue_underrun_cb (sq->queue, sq);
gst_data_queue_set_flushing (sq->queue, TRUE);
gst_pad_pause_task (sq->srcpad);
GST_CAT_LOG_OBJECT (multi_queue_debug, mq,
* gst_multi_queue_chain:
*
* This is similar to GstQueue's chain function, except:
- * _ we don't have leak behavioures,
+ * _ we don't have leak behaviours,
* _ we push with a unique id (curid)
*/
static GstFlowReturn
GstSingleQueue *sq;
GstMultiQueue *mq;
GstMultiQueueItem *item;
- GstFlowReturn ret = GST_FLOW_OK;
guint32 curid;
GstClockTime timestamp, duration;
sq = gst_pad_get_element_private (pad);
mq = sq->mqueue;
+ /* if eos, we are always full, so avoid hanging incoming indefinitely */
+ if (sq->is_eos)
+ goto was_eos;
+
/* Get a unique incrementing id */
- curid = mq->counter++;
+ curid = G_ATOMIC_INT_ADD ((gint *) & mq->counter, 1);
GST_LOG_OBJECT (mq, "SingleQueue %d : about to enqueue buffer %p with id %d",
sq->id, buffer, curid);
apply_buffer (mq, sq, timestamp, duration, &sq->sink_segment);
done:
- return ret;
+ return sq->srcresult;
/* ERRORS */
flushing:
{
- ret = sq->srcresult;
GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
- sq->id, gst_flow_get_name (ret));
+ sq->id, gst_flow_get_name (sq->srcresult));
gst_multi_queue_item_destroy (item);
goto done;
}
+was_eos:
+ {
+ GST_DEBUG_OBJECT (mq, "we are EOS, dropping buffer, return UNEXPECTED");
+ gst_buffer_unref (buffer);
+ return GST_FLOW_UNEXPECTED;
+ }
}
static gboolean
break;
}
- /* Get an unique incrementing id. protected with the STREAM_LOCK, unserialized
- * events already got pushed and don't end up in the queue. */
- curid = mq->counter++;
+ /* if eos, we are always full, so avoid hanging incoming indefinitely */
+ if (sq->is_eos)
+ goto was_eos;
+
+ /* Get an unique incrementing id. */
+ curid = G_ATOMIC_INT_ADD ((gint *) & mq->counter, 1);
item = gst_multi_queue_event_item_new ((GstMiniObject *) event, curid);
switch (type) {
case GST_EVENT_EOS:
sq->is_eos = TRUE;
+ /* EOS affects the buffering state */
+ update_buffering (mq, sq);
single_queue_overrun_cb (sq->queue, sq);
break;
case GST_EVENT_NEWSEGMENT:
gst_multi_queue_item_destroy (item);
goto done;
}
+was_eos:
+ {
+ GST_DEBUG_OBJECT (mq, "we are EOS, dropping event, return FALSE");
+ gst_event_unref (event);
+ res = FALSE;
+ goto done;
+ }
}
static GstCaps *
return result;
}
+static gboolean
+gst_multi_queue_acceptcaps (GstPad * pad, GstCaps * caps)
+{
+ GstSingleQueue *sq = gst_pad_get_element_private (pad);
+ GstPad *otherpad;
+ gboolean result;
+
+ otherpad = (pad == sq->srcpad) ? sq->sinkpad : sq->srcpad;
+
+ GST_LOG_OBJECT (otherpad, "Accept caps from the peer of this pad");
+
+ result = gst_pad_peer_accept_caps (otherpad, caps);
+
+ return result;
+}
+
static GstFlowReturn
gst_multi_queue_bufferalloc (GstPad * pad, guint64 offset, guint size,
GstCaps * caps, GstBuffer ** buf)
GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
if (sq->srcresult == GST_FLOW_NOT_LINKED) {
- if (sq->nextid != 0 && sq->nextid <= mq->highid) {
+ if ((mq->sync_by_running_time && mq->high_time != GST_CLOCK_TIME_NONE
+ && sq->next_time != GST_CLOCK_TIME_NONE
+ && sq->next_time >= mq->high_time)
+ || (sq->nextid != 0 && sq->nextid <= mq->highid)) {
GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
g_cond_signal (sq->turn);
}
lowest);
}
+/* WITH LOCK TAKEN */
+static void
+compute_high_time (GstMultiQueue * mq)
+{
+ /* The high-id is either the highest id among the linked pads, or if all
+ * pads are not-linked, it's the lowest not-linked pad */
+ GList *tmp;
+ GstClockTime highest = GST_CLOCK_TIME_NONE;
+ GstClockTime lowest = GST_CLOCK_TIME_NONE;
+
+ for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
+ GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
+
+ GST_LOG_OBJECT (mq,
+ "inspecting sq:%d , next_time:%" GST_TIME_FORMAT ", last_time:%"
+ GST_TIME_FORMAT ", srcresult:%s", sq->id, GST_TIME_ARGS (sq->next_time),
+ GST_TIME_ARGS (sq->last_time), gst_flow_get_name (sq->srcresult));
+
+ if (sq->srcresult == GST_FLOW_NOT_LINKED) {
+ /* No need to consider queues which are not waiting */
+ if (sq->next_time == GST_CLOCK_TIME_NONE) {
+ GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id);
+ continue;
+ }
+
+ if (lowest == GST_CLOCK_TIME_NONE || sq->next_time < lowest)
+ lowest = sq->next_time;
+ } else if (sq->srcresult != GST_FLOW_UNEXPECTED) {
+ /* If we don't have a global highid, or the global highid is lower than
+ * this single queue's last outputted id, store the queue's one,
+ * unless the singlequeue is at EOS (srcresult = UNEXPECTED) */
+ if (highest == GST_CLOCK_TIME_NONE || sq->last_time > highest)
+ highest = sq->last_time;
+ }
+ }
+
+ mq->high_time = highest;
+
+ GST_LOG_OBJECT (mq,
+ "High time is now : %" GST_TIME_FORMAT ", lowest non-linked %"
+ GST_TIME_FORMAT, GST_TIME_ARGS (mq->high_time), GST_TIME_ARGS (lowest));
+}
+
#define IS_FILLED(q, format, value) (((q)->max_size.format) != 0 && \
((q)->max_size.format) <= (value))
"Another queue is empty, bumping single queue %d max visible to %d",
sq->id, sq->max_size.visible);
}
- GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
- goto beach;
}
/* check if we reached the hard time/bytes limits */
gst_data_queue_get_level (oq->queue, &ssize);
g_signal_emit (mq, gst_multi_queue_signals[SIGNAL_OVERRUN], 0);
}
-beach:
return;
}
guint64 time, GstSingleQueue * sq)
{
gboolean res;
+ GstMultiQueue *mq = sq->mqueue;
- GST_DEBUG ("queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT
- "/%" G_GUINT64_FORMAT, sq->id, visible, sq->max_size.visible, bytes,
+ GST_DEBUG_OBJECT (mq,
+ "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
+ G_GUINT64_FORMAT, sq->id, visible, sq->max_size.visible, bytes,
sq->max_size.bytes, sq->cur_time, sq->max_size.time);
/* we are always filled on EOS */
if (sq->is_eos)
return TRUE;
- /* we never go past the max visible items */
- if (IS_FILLED (sq, visible, visible))
+ /* we never go past the max visible items unless we are in buffering mode */
+ if (!mq->use_buffering && IS_FILLED (sq, visible, visible))
return TRUE;
/* check time or bytes */
}
static GstSingleQueue *
-gst_single_queue_new (GstMultiQueue * mqueue)
+gst_single_queue_new (GstMultiQueue * mqueue, gint id)
{
GstSingleQueue *sq;
- gchar *tmp;
+ gchar *name;
+ GList *tmp;
+ gint temp_id = (id == -1) ? 0 : id;
+
+ GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
+
+ /* Find an unused queue ID, if possible the passed one */
+ for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
+ GstSingleQueue *sq2 = (GstSingleQueue *) tmp->data;
+ /* This works because the IDs are sorted in ascending order */
+ if (sq2->id == temp_id) {
+ /* If this ID was requested by the caller return NULL,
+ * otherwise just get us the next one */
+ if (id == -1)
+ temp_id = sq2->id + 1;
+ else
+ return NULL;
+ } else if (sq2->id > temp_id) {
+ break;
+ }
+ }
sq = g_new0 (GstSingleQueue, 1);
+ mqueue->nbqueues++;
+ sq->id = temp_id;
- GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
- sq->id = mqueue->nbqueues++;
+ mqueue->queues = g_list_insert_before (mqueue->queues, tmp, sq);
+ mqueue->queues_cookie++;
/* copy over max_size and extra_size so we don't need to take the lock
* any longer when checking if the queue is full. */
sq->extra_size.bytes = mqueue->extra_size.bytes;
sq->extra_size.time = mqueue->extra_size.time;
- GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
-
GST_DEBUG_OBJECT (mqueue, "Creating GstSingleQueue id:%d", sq->id);
sq->mqueue = mqueue;
(GstDataQueueFullCallback) single_queue_overrun_cb,
(GstDataQueueEmptyCallback) single_queue_underrun_cb, sq);
sq->is_eos = FALSE;
+ sq->flushing = FALSE;
gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
sq->nextid = 0;
sq->oldid = 0;
+ sq->next_time = GST_CLOCK_TIME_NONE;
+ sq->last_time = GST_CLOCK_TIME_NONE;
sq->turn = g_cond_new ();
sq->sinktime = GST_CLOCK_TIME_NONE;
sq->sink_tainted = TRUE;
sq->src_tainted = TRUE;
- tmp = g_strdup_printf ("sink%d", sq->id);
- sq->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
- g_free (tmp);
+ name = g_strdup_printf ("sink%d", sq->id);
+ sq->sinkpad = gst_pad_new_from_static_template (&sinktemplate, name);
+ g_free (name);
gst_pad_set_chain_function (sq->sinkpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_chain));
GST_DEBUG_FUNCPTR (gst_multi_queue_sink_event));
gst_pad_set_getcaps_function (sq->sinkpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_getcaps));
+ gst_pad_set_acceptcaps_function (sq->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_multi_queue_acceptcaps));
gst_pad_set_bufferalloc_function (sq->sinkpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_bufferalloc));
gst_pad_set_iterate_internal_links_function (sq->sinkpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_iterate_internal_links));
- tmp = g_strdup_printf ("src%d", sq->id);
- sq->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
- g_free (tmp);
+ name = g_strdup_printf ("src%d", sq->id);
+ sq->srcpad = gst_pad_new_from_static_template (&srctemplate, name);
+ g_free (name);
gst_pad_set_activatepush_function (sq->srcpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_src_activate_push));
gst_pad_set_getcaps_function (sq->srcpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_getcaps));
+ gst_pad_set_acceptcaps_function (sq->srcpad,
+ GST_DEBUG_FUNCPTR (gst_multi_queue_acceptcaps));
gst_pad_set_event_function (sq->srcpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_src_event));
gst_pad_set_query_function (sq->srcpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_src_query));
- gst_pad_set_iterate_internal_links_function (sq->sinkpad,
+ gst_pad_set_iterate_internal_links_function (sq->srcpad,
GST_DEBUG_FUNCPTR (gst_multi_queue_iterate_internal_links));
gst_pad_set_element_private (sq->sinkpad, (gpointer) sq);
gst_pad_set_element_private (sq->srcpad, (gpointer) sq);
+ GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
+
/* only activate the pads when we are not in the NULL state
* and add the pad under the state_lock to prevend state changes
* between activating and adding */