index 9f6248885664472339ae2757c59cc84242f7eeb5..244023627fec85466547075a95af6864a94b672a 100644 (file)
queue->max_level.time, \
(guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
queue->current->writing_pos - queue->current->max_reading_pos : \
- queue->queue->length))
+ queue->queue.length))
#define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START { \
g_mutex_lock (q->qlock); \
guint prop_id, GValue * value, GParamSpec * pspec);
static GstFlowReturn gst_queue2_chain (GstPad * pad, GstBuffer * buffer);
+static GstFlowReturn gst_queue2_chain_list (GstPad * pad,
+ GstBufferList * buffer_list);
static GstFlowReturn gst_queue2_bufferalloc (GstPad * pad, guint64 offset,
guint size, GstCaps * caps, GstBuffer ** buf);
static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
+typedef enum
+{
+ GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
+ GST_QUEUE2_ITEM_TYPE_BUFFER,
+ GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
+ GST_QUEUE2_ITEM_TYPE_EVENT
+} GstQueue2ItemType;
/* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
g_param_spec_uint64 ("ring-buffer-max-size",
"Max. ring buffer size (bytes)",
- "Max. amount of data in the ring buffer (bytes, 0 = disabled",
+ "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gst_pad_set_chain_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue2_chain));
+ gst_pad_set_chain_list_function (queue->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
gst_pad_set_activatepush_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_push));
gst_pad_set_event_function (queue->sinkpad,
queue->item_add = g_cond_new ();
queue->waiting_del = FALSE;
queue->item_del = g_cond_new ();
- queue->queue = g_queue_new ();
+ g_queue_init (&queue->queue);
queue->buffering_percent = 100;
GST_DEBUG_OBJECT (queue, "finalizing queue");
- while (!g_queue_is_empty (queue->queue)) {
- GstMiniObject *data = g_queue_pop_head (queue->queue);
+ while (!g_queue_is_empty (&queue->queue)) {
+ GstMiniObject *data = g_queue_pop_head (&queue->queue);
gst_mini_object_unref (data);
}
- g_queue_free (queue->queue);
+ g_queue_clear (&queue->queue);
g_mutex_free (queue->qlock);
g_cond_free (queue->item_add);
g_cond_free (queue->item_del);
update_time_level (queue);
}
+static GstBufferListItem
+buffer_list_apply_time (GstBuffer ** buf, guint group, guint idx, gpointer data)
+{
+ GstClockTime *timestamp = data;
+
+ GST_TRACE ("buffer %u in group %u has ts %" GST_TIME_FORMAT
+ " duration %" GST_TIME_FORMAT, idx, group,
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*buf)),
+ GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
+
+ if (GST_BUFFER_TIMESTAMP_IS_VALID (*buf))
+ *timestamp = GST_BUFFER_TIMESTAMP (*buf);
+
+ if (GST_BUFFER_DURATION_IS_VALID (*buf))
+ *timestamp += GST_BUFFER_DURATION (*buf);
+
+ GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
+ return GST_BUFFER_LIST_CONTINUE;
+}
+
+/* take a buffer list and update segment, updating the time level of the queue */
+static void
+apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
+ GstSegment * segment, gboolean is_sink)
+{
+ GstClockTime timestamp;
+
+ /* if no timestamp is set, assume it's continuous with the previous time */
+ timestamp = segment->last_stop;
+
+ gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, ×tamp);
+
+ GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (timestamp));
+
+ gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp);
+
+ if (is_sink)
+ queue->sink_tainted = TRUE;
+ else
+ queue->src_tainted = TRUE;
+
+ /* calc diff with other end */
+ update_time_level (queue);
+}
+
static void
update_buffering (GstQueue2 * queue)
{
queue->bytes_in = 0;
queue->bytes_out = 0;
queue->byte_in_rate = 0.0;
+ queue->byte_in_period = 0;
queue->byte_out_rate = 0.0;
queue->last_in_elapsed = 0.0;
queue->last_out_elapsed = 0.0;
/* Tuning for rate estimation. We use a large window for the input rate because
* it should be stable when connected to a network. The output rate is less
* stable (the elements preroll, queues behind a demuxer fill, ...) and should
- * therefore adapt more quickly. */
-#define AVG_IN(avg,val) ((avg) * 15.0 + (val)) / 16.0
+ * therefore adapt more quickly.
+ * However, initial input rate may be subject to a burst, and should therefore
+ * initially also adapt more quickly to changes, and only later on give higher
+ * weight to previous values. */
+#define AVG_IN(avg,val,w1,w2) ((avg) * (w1) + (val) * (w2)) / ((w1) + (w2))
#define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
static void
period = elapsed - queue->last_in_elapsed;
GST_DEBUG_OBJECT (queue,
- "rates: period %f, in %" G_GUINT64_FORMAT, period, queue->bytes_in);
+ "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
+ period, queue->bytes_in, queue->byte_in_period);
byte_in_rate = queue->bytes_in / period;
if (queue->byte_in_rate == 0.0)
queue->byte_in_rate = byte_in_rate;
else
- queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate);
+ queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
+ (double) queue->byte_in_period, period);
+
+ /* another data point, cap at 16 for long time running average */
+ if (queue->byte_in_period < 16 * RATE_INTERVAL)
+ queue->byte_in_period += period;
/* reset the values to calculate rate over the next interval */
queue->last_in_elapsed = elapsed;
} else {
GST_INFO_OBJECT (queue, "not found in any range");
/* we don't have the range, see how far away we are, FIXME, find a good
- * threshold based on the incomming rate. */
+ * threshold based on the incoming rate. */
if (!queue->is_eos && queue->current) {
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
if (offset < queue->current->offset || offset >
/* set range reading_pos to actual reading position for this read */
queue->current->reading_pos = rpos;
- /* congfigure how much and from where to read */
+ /* configure how much and from where to read */
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
file_offset =
(queue->current->rb_offset + (rpos -
gst_queue2_flush_temp_file (queue);
init_ranges (queue);
} else {
- while (!g_queue_is_empty (queue->queue)) {
- GstMiniObject *data = g_queue_pop_head (queue->queue);
+ while (!g_queue_is_empty (&queue->queue)) {
+ GstMiniObject *data = g_queue_pop_head (&queue->queue);
/* Then lose another reference because we are supposed to destroy that
data when flushing */
}
}
+static GstBufferListItem
+buffer_list_create_write (GstBuffer ** buf, guint group, guint idx, gpointer q)
+{
+ GstQueue2 *queue = q;
+
+ GST_TRACE_OBJECT (queue, "writing buffer %u in group %u of size %u bytes",
+ idx, group, GST_BUFFER_SIZE (*buf));
+
+ if (!gst_queue2_create_write (queue, *buf)) {
+ GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out");
+ return GST_BUFFER_LIST_END;
+ }
+
+ return GST_BUFFER_LIST_CONTINUE;
+}
+
+static GstBufferListItem
+buffer_list_calc_size (GstBuffer ** buf, guint group, guint idx, gpointer data)
+{
+ guint *p_size = data;
+ guint buf_size;
+
+ buf_size = GST_BUFFER_SIZE (*buf);
+ GST_TRACE ("buffer %u in group %u has size %u", idx, group, buf_size);
+ *p_size += buf_size;
+
+ return GST_BUFFER_LIST_CONTINUE;
+}
+
/* enqueue an item an update the level stats */
static void
-gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, gboolean isbuffer)
+gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
+ GstQueue2ItemType item_type)
{
- if (isbuffer) {
+ if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
GstBuffer *buffer;
guint size;
@@ -1824,7 +1921,32 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, gboolean isbuffer)
/* FIXME - check return value? */
gst_queue2_create_write (queue, buffer);
}
- } else if (GST_IS_EVENT (item)) {
+ } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
+ GstBufferList *buffer_list;
+ guint size = 0;
+
+ buffer_list = GST_BUFFER_LIST_CAST (item);
+
+ gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
+ GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
+
+ /* add buffer to the statistics */
+ if (QUEUE_IS_USING_QUEUE (queue)) {
+ queue->cur_level.buffers++;
+ queue->cur_level.bytes += size;
+ }
+ queue->bytes_in += size;
+
+ /* apply new buffer to segment stats */
+ apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
+
+ /* update the byterate stats */
+ update_in_rates (queue);
+
+ if (!QUEUE_IS_USING_QUEUE (queue)) {
+ gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
+ }
+ } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
GstEvent *event;
event = GST_EVENT_CAST (item);
@@ -1872,7 +1994,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, gboolean isbuffer)
update_buffering (queue);
if (QUEUE_IS_USING_QUEUE (queue)) {
- g_queue_push_tail (queue->queue, item);
+ g_queue_push_tail (&queue->queue, item);
} else {
gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
}
/* dequeue an item from the queue and update level stats */
static GstMiniObject *
-gst_queue2_locked_dequeue (GstQueue2 * queue, gboolean * is_buffer)
+gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
{
GstMiniObject *item;
if (!QUEUE_IS_USING_QUEUE (queue))
item = gst_queue2_read_item_from_file (queue);
else
- item = g_queue_pop_head (queue->queue);
+ item = g_queue_pop_head (&queue->queue);
if (item == NULL)
goto no_item;
buffer = GST_BUFFER_CAST (item);
size = GST_BUFFER_SIZE (buffer);
- *is_buffer = TRUE;
+ *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER;
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"retrieved buffer %p from queue", buffer);
} else if (GST_IS_EVENT (item)) {
GstEvent *event = GST_EVENT_CAST (item);
- *is_buffer = FALSE;
+ *item_type = GST_QUEUE2_ITEM_TYPE_EVENT;
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"retrieved event %p from queue", event);
default:
break;
}
+ } else if (GST_IS_BUFFER_LIST (item)) {
+ GstBufferList *buffer_list;
+ guint size = 0;
+
+ buffer_list = GST_BUFFER_LIST_CAST (item);
+ gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
+ *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
+
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "retrieved buffer list %p from queue", buffer_list);
+
+ if (QUEUE_IS_USING_QUEUE (queue)) {
+ queue->cur_level.buffers--;
+ queue->cur_level.bytes -= size;
+ }
+ queue->bytes_out += size;
+
+ apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
+ /* update the byterate stats */
+ update_out_rates (queue);
+ /* update the buffering */
+ if (queue->use_buffering)
+ update_buffering (queue);
+
} else {
g_warning
("Unexpected item %p dequeued from queue %s (refcounting problem?)",
item, GST_OBJECT_NAME (queue));
item = NULL;
+ *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN;
}
GST_QUEUE2_SIGNAL_DEL (queue);
/* refuse more events on EOS */
if (queue->is_eos)
goto out_eos;
- gst_queue2_locked_enqueue (queue, event, FALSE);
+ gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
GST_QUEUE2_MUTEX_UNLOCK (queue);
} else {
/* non-serialized events are passed upstream. */
if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
return queue->current->writing_pos <= queue->current->max_reading_pos;
} else {
- if (queue->queue->length == 0)
+ if (queue->queue.length == 0)
return TRUE;
}
}
static GstFlowReturn
-gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
+gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
+ GstMiniObject * item, GstQueue2ItemType item_type)
{
- GstQueue2 *queue;
-
- queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
-
- GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "received buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %"
- GST_TIME_FORMAT, buffer, GST_BUFFER_SIZE (buffer),
- GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
- GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
-
/* we have to lock the queue since we span threads */
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
/* when we received EOS, we refuse more data */
goto out_flushing;
/* put buffer in queue now */
- gst_queue2_locked_enqueue (queue, buffer, TRUE);
+ gst_queue2_locked_enqueue (queue, item, item_type);
GST_QUEUE2_MUTEX_UNLOCK (queue);
return GST_FLOW_OK;
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"exit because task paused, reason: %s", gst_flow_get_name (ret));
GST_QUEUE2_MUTEX_UNLOCK (queue);
- gst_buffer_unref (buffer);
+ gst_mini_object_unref (item);
return ret;
}
{
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
GST_QUEUE2_MUTEX_UNLOCK (queue);
- gst_buffer_unref (buffer);
+ gst_mini_object_unref (item);
return GST_FLOW_UNEXPECTED;
}
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"exit because we received UNEXPECTED");
GST_QUEUE2_MUTEX_UNLOCK (queue);
- gst_buffer_unref (buffer);
+ gst_mini_object_unref (item);
return GST_FLOW_UNEXPECTED;
}
}
+static GstFlowReturn
+gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
+{
+ GstQueue2 *queue;
+
+ queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
+
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "received buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %"
+ GST_TIME_FORMAT, buffer, GST_BUFFER_SIZE (buffer),
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
+ GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
+
+ return gst_queue2_chain_buffer_or_buffer_list (queue,
+ GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
+}
+
+static GstFlowReturn
+gst_queue2_chain_list (GstPad * pad, GstBufferList * buffer_list)
+{
+ GstQueue2 *queue;
+
+ queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
+
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "received buffer list %p", buffer_list);
+
+ return gst_queue2_chain_buffer_or_buffer_list (queue,
+ GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
+}
+
+static GstMiniObject *
+gst_queue2_dequeue_on_unexpected (GstQueue2 * queue,
+ GstQueue2ItemType * item_type)
+{
+ GstMiniObject *data;
+
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got UNEXPECTED from downstream");
+
+ /* stop pushing buffers, we dequeue all items until we see an item that we
+ * can push again, which is EOS or NEWSEGMENT. If there is nothing in the
+ * queue we can push, we set a flag to make the sinkpad refuse more
+ * buffers with an UNEXPECTED return value until we receive something
+ * pushable again or we get flushed. */
+ while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
+ if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "dropping UNEXPECTED buffer %p", data);
+ gst_buffer_unref (GST_BUFFER_CAST (data));
+ } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
+ GstEvent *event = GST_EVENT_CAST (data);
+ GstEventType type = GST_EVENT_TYPE (event);
+
+ if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) {
+ /* we found a pushable item in the queue, push it out */
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "pushing pushable event %s after UNEXPECTED",
+ GST_EVENT_TYPE_NAME (event));
+ return data;
+ }
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "dropping UNEXPECTED event %p", event);
+ gst_event_unref (event);
+ } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "dropping UNEXPECTED buffer list %p", data);
+ gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
+ }
+ }
+ /* no more items in the queue. Set the unexpected flag so that upstream
+ * make us refuse any more buffers on the sinkpad. Since we will still
+ * accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the
+ * task function does not shut down. */
+ queue->unexpected = TRUE;
+ return NULL;
+}
+
/* dequeue an item from the queue an push it downstream. This functions returns
* the result of the push. */
static GstFlowReturn
{
GstFlowReturn result = GST_FLOW_OK;
GstMiniObject *data;
- gboolean is_buffer = FALSE;
+ GstQueue2ItemType item_type;
- data = gst_queue2_locked_dequeue (queue, &is_buffer);
+ data = gst_queue2_locked_dequeue (queue, &item_type);
if (data == NULL)
goto no_item;
next:
GST_QUEUE2_MUTEX_UNLOCK (queue);
- if (is_buffer) {
+ if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
GstBuffer *buffer;
GstCaps *caps;
/* need to check for srcresult here as well */
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
if (result == GST_FLOW_UNEXPECTED) {
- GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "got UNEXPECTED from downstream");
- /* stop pushing buffers, we dequeue all items until we see an item that we
- * can push again, which is EOS or NEWSEGMENT. If there is nothing in the
- * queue we can push, we set a flag to make the sinkpad refuse more
- * buffers with an UNEXPECTED return value until we receive something
- * pushable again or we get flushed. */
- while ((data = gst_queue2_locked_dequeue (queue, &is_buffer))) {
- if (is_buffer) {
- GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "dropping UNEXPECTED buffer %p", data);
- gst_buffer_unref (GST_BUFFER_CAST (data));
- } else if (GST_IS_EVENT (data)) {
- GstEvent *event = GST_EVENT_CAST (data);
- GstEventType type = GST_EVENT_TYPE (event);
-
- if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) {
- /* we found a pushable item in the queue, push it out */
- GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "pushing pushable event %s after UNEXPECTED",
- GST_EVENT_TYPE_NAME (event));
- goto next;
- }
- GST_CAT_LOG_OBJECT (queue_dataflow, queue,
- "dropping UNEXPECTED event %p", event);
- gst_event_unref (event);
- }
- }
- /* no more items in the queue. Set the unexpected flag so that upstream
- * make us refuse any more buffers on the sinkpad. Since we will still
- * accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the
- * task function does not shut down. */
- queue->unexpected = TRUE;
+ data = gst_queue2_dequeue_on_unexpected (queue, &item_type);
+ if (data != NULL)
+ goto next;
+ /* Since we will still accept EOS and NEWSEGMENT we return _FLOW_OK
+ * to the caller so that the task function does not shut down */
result = GST_FLOW_OK;
}
- } else if (GST_IS_EVENT (data)) {
+ } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
GstEvent *event = GST_EVENT_CAST (data);
GstEventType type = GST_EVENT_TYPE (event);
}
GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
+ } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
+ GstBufferList *buffer_list;
+ GstBuffer *first_buf;
+ GstCaps *caps;
+
+ buffer_list = GST_BUFFER_LIST_CAST (data);
+
+ first_buf = gst_buffer_list_get (buffer_list, 0, 0);
+ caps = (first_buf != NULL) ? GST_BUFFER_CAPS (first_buf) : NULL;
+
+ /* set caps before pushing the buffer so that core does not try to do
+ * something fancy to check if this is possible. */
+ if (caps && caps != GST_PAD_CAPS (queue->srcpad))
+ gst_pad_set_caps (queue->srcpad, caps);
+
+ result = gst_pad_push_list (queue->srcpad, buffer_list);
+
+ /* need to check for srcresult here as well */
+ GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
+ if (result == GST_FLOW_UNEXPECTED) {
+ data = gst_queue2_dequeue_on_unexpected (queue, &item_type);
+ if (data != NULL)
+ goto next;
+ /* Since we will still accept EOS and NEWSEGMENT we return _FLOW_OK
+ * to the caller so that the task function does not shut down */
+ result = GST_FLOW_OK;
+ }
}
return result;
}
}
-/* called repeadedly with @pad as the source pad. This function should push out
+/* called repeatedly with @pad as the source pad. This function should push out
* data to the peer element. */
static void
gst_queue2_loop (GstPad * pad)
queue->is_eos = FALSE;
queue->unexpected = FALSE;
queue->upstream_size = 0;
- GST_QUEUE2_MUTEX_UNLOCK (queue);
} else {
- GST_QUEUE2_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
/* this is not allowed, we cannot operate in pull mode without a temp
* file. */
queue->srcresult = GST_FLOW_WRONG_STATE;
queue->sinkresult = GST_FLOW_WRONG_STATE;
result = FALSE;
- GST_QUEUE2_MUTEX_UNLOCK (queue);
}
+ GST_QUEUE2_MUTEX_UNLOCK (queue);
} else {
GST_QUEUE2_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "deactivating pull mode");