]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - glsdk/gstreamer0-10.git/commitdiff
plugins/elements/gstqueue.*: Refactor an cleanup queue a bit.
authorWim Taymans <wim.taymans@gmail.com>
Sat, 12 May 2007 15:35:40 +0000 (15:35 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Sat, 12 May 2007 15:35:40 +0000 (15:35 +0000)
Original commit message from CVS:
* plugins/elements/gstqueue.c: (gst_queue_init),
(gst_queue_finalize), (update_time_level), (apply_segment),
(apply_buffer), (gst_queue_locked_flush),
(gst_queue_locked_enqueue), (gst_queue_locked_dequeue),
(gst_queue_handle_sink_event), (gst_queue_chain),
(gst_queue_push_one), (gst_queue_loop):
* plugins/elements/gstqueue.h:
Refactor an cleanup queue a bit.
Do better time level calculations that also work when the srcpad is not
yet running.
Remove some unneeded debug lines.
* tests/check/elements/queue.c: (GST_START_TEST), (queue_suite):
Added testcase for time level measurement.
Try to make some stuff more racefree.

ChangeLog
plugins/elements/gstqueue.c
plugins/elements/gstqueue.h
tests/check/elements/queue.c

index db4f08f16b612810d0abefe7bdc4e1232bdfd44e..5166ee16703bc4b66dc519825cb8d0f76fa699fe 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,21 @@
+2007-05-12  Wim Taymans  <wim@fluendo.com>
+
+       * plugins/elements/gstqueue.c: (gst_queue_init),
+       (gst_queue_finalize), (update_time_level), (apply_segment),
+       (apply_buffer), (gst_queue_locked_flush),
+       (gst_queue_locked_enqueue), (gst_queue_locked_dequeue),
+       (gst_queue_handle_sink_event), (gst_queue_chain),
+       (gst_queue_push_one), (gst_queue_loop):
+       * plugins/elements/gstqueue.h:
+       Refactor an cleanup queue a bit.
+       Do better time level calculations that also work when the srcpad is not
+       yet running.
+       Remove some unneeded debug lines.
+
+       * tests/check/elements/queue.c: (GST_START_TEST), (queue_suite):
+       Added testcase for time level measurement.
+       Try to make some stuff more racefree.
+
 2007-05-11  Tim-Philipp Müller  <tim at centricular dot net>
 
        * gst/gsturi.c: (gst_element_make_from_uri):
index e19b5b82dec268997f201b658aa97a7cb6f5e0ad..558eac64ca30659d1ae2dc1dd8d2e574e042be6b 100644 (file)
@@ -81,7 +81,7 @@ GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
                       "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
-                      "-%" G_GUINT64_FORMAT " ns, %u elements", \
+                      "-%" G_GUINT64_FORMAT " ns, %u items", \
                       GST_DEBUG_PAD_NAME (pad), \
                       queue->cur_level.buffers, \
                       queue->min_threshold.buffers, \
@@ -99,7 +99,6 @@ static const GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue",
     "Simple data queue",
     "Erik Walthinsen <omega@cse.ogi.edu>");
 
-
 /* Queue signals and args */
 enum
 {
@@ -134,13 +133,7 @@ enum
 #define DEFAULT_MAX_SIZE_TIME     GST_SECOND    /* 1 second    */
 
 #define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START {                          \
-  GST_CAT_LOG_OBJECT (queue_dataflow, q,                                \
-      "locking qlock from thread %p",                                   \
-      g_thread_self ());                                                \
   g_mutex_lock (q->qlock);                                              \
-  GST_CAT_LOG_OBJECT (queue_dataflow, q,                                \
-      "locked qlock from thread %p",                                    \
-      g_thread_self ());                                                \
 } G_STMT_END
 
 #define GST_QUEUE_MUTEX_LOCK_CHECK(q,label) G_STMT_START {              \
@@ -150,12 +143,39 @@ enum
 } G_STMT_END
 
 #define GST_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                        \
-  GST_CAT_LOG_OBJECT (queue_dataflow, q,                                \
-      "unlocking qlock from thread %p",                                 \
-      g_thread_self ());                                                \
   g_mutex_unlock (q->qlock);                                            \
 } G_STMT_END
 
+#define GST_QUEUE_WAIT_DEL_CHECK(q, label) G_STMT_START {               \
+  STATUS (queue, q->sinkpad, "wait for DEL");                           \
+  g_cond_wait (q->item_del, queue->qlock);                              \
+  if (q->srcresult != GST_FLOW_OK) {                                    \
+    STATUS (queue, q->srcpad, "received DEL wakeup");                   \
+    goto label;                                                         \
+  }                                                                     \
+  STATUS (queue, q->sinkpad, "received DEL");                           \
+} G_STMT_END
+
+#define GST_QUEUE_WAIT_ADD_CHECK(q, label) G_STMT_START {               \
+  STATUS (queue, q->srcpad, "wait for ADD");                            \
+  g_cond_wait (q->item_add, q->qlock);                                  \
+  if (q->srcresult != GST_FLOW_OK) {                                    \
+    STATUS (queue, q->srcpad, "received ADD wakeup");                   \
+    goto label;                                                         \
+  }                                                                     \
+  STATUS (queue, q->srcpad, "received ADD");                            \
+} G_STMT_END
+
+#define GST_QUEUE_SIGNAL_DEL(q) G_STMT_START {                          \
+  STATUS (q, q->srcpad, "signal DEL");                                  \
+  g_cond_signal (q->item_del);                                          \
+} G_STMT_END
+
+#define GST_QUEUE_SIGNAL_ADD(q) G_STMT_START {                          \
+  STATUS (q, q->sinkpad, "signal ADD");                                 \
+  g_cond_signal (q->item_add);                                          \
+} G_STMT_END
+
 #define _do_init(bla) \
     GST_DEBUG_CATEGORY_INIT (queue_debug, "queue", 0, "queue element"); \
     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue_dataflow", 0, \
@@ -376,18 +396,14 @@ gst_queue_init (GstQueue * queue, GstQueueClass * g_class)
       GST_DEBUG_FUNCPTR (gst_queue_handle_src_query));
   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
 
-  queue->cur_level.buffers = 0; /* no content */
-  queue->cur_level.bytes = 0;   /* no content */
-  queue->cur_level.time = 0;    /* no content */
+  GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
   queue->max_size.buffers = DEFAULT_MAX_SIZE_BUFFERS;
   queue->max_size.bytes = DEFAULT_MAX_SIZE_BYTES;
   queue->max_size.time = DEFAULT_MAX_SIZE_TIME;
-  queue->min_threshold.buffers = 0;     /* no threshold */
-  queue->min_threshold.bytes = 0;       /* no threshold */
-  queue->min_threshold.time = 0;        /* no threshold */
-  queue->orig_min_threshold.buffers = 0;
-  queue->orig_min_threshold.bytes = 0;
-  queue->orig_min_threshold.time = 0;
+  GST_QUEUE_CLEAR_LEVEL (queue->min_threshold);
+  GST_QUEUE_CLEAR_LEVEL (queue->orig_min_threshold);
+  gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
+  gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
 
   queue->leaky = GST_QUEUE_NO_LEAK;
   queue->srcresult = GST_FLOW_WRONG_STATE;
@@ -415,14 +431,11 @@ gst_queue_finalize (GObject * object)
     gst_mini_object_unref (data);
   }
   g_queue_free (queue->queue);
-  GST_DEBUG_OBJECT (queue, "free mutex");
   g_mutex_free (queue->qlock);
-  GST_DEBUG_OBJECT (queue, "done free mutex");
   g_cond_free (queue->item_add);
   g_cond_free (queue->item_del);
 
-  if (G_OBJECT_CLASS (parent_class)->finalize)
-    G_OBJECT_CLASS (parent_class)->finalize (object);
+  G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
 static GstCaps *
@@ -501,6 +514,8 @@ gst_queue_acceptcaps (GstPad * pad, GstCaps * caps)
   return TRUE;
 }
 
+/* calculate the diff between running time on the sink and src of the queue.
+ * This is the total amount of time in the queue. */
 static void
 update_time_level (GstQueue * queue)
 {
@@ -513,12 +528,73 @@ update_time_level (GstQueue * queue)
   src_time = gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
       queue->src_segment.last_stop);
 
+  GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
+
   if (sink_time >= src_time)
     queue->cur_level.time = sink_time - src_time;
   else
     queue->cur_level.time = 0;
 }
 
+/* take a NEWSEGMENT event and apply the values to segment, updating the time
+ * level of queue. */
+static void
+apply_segment (GstQueue * queue, GstEvent * event, GstSegment * segment)
+{
+  gboolean update;
+  GstFormat format;
+  gdouble rate, arate;
+  gint64 start, stop, time;
+
+  gst_event_parse_new_segment_full (event, &update, &rate, &arate,
+      &format, &start, &stop, &time);
+
+  /* now configure the values, we use these to track timestamps on the
+   * sinkpad. */
+  if (format != GST_FORMAT_TIME) {
+    /* non-time format, pretent the current time segment is closed with a
+     * 0 start and unknown stop time. */
+    update = FALSE;
+    format = GST_FORMAT_TIME;
+    start = 0;
+    stop = -1;
+    time = 0;
+  }
+  gst_segment_set_newsegment_full (segment, update,
+      rate, arate, format, start, stop, time);
+
+  /* segment can update the time level of the queue */
+  update_time_level (queue);
+}
+
+/* take a buffer and update segment, updating the time level of the queue. */
+static void
+apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment)
+{
+  GstClockTime duration, timestamp;
+
+  timestamp = GST_BUFFER_TIMESTAMP (buffer);
+  duration = GST_BUFFER_DURATION (buffer);
+
+  /* if no timestamp is set, assume it's continuous with the previous 
+   * time */
+  if (timestamp == GST_CLOCK_TIME_NONE)
+    timestamp = segment->last_stop;
+
+  /* add duration */
+  if (duration != GST_CLOCK_TIME_NONE)
+    timestamp += duration;
+
+  GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (timestamp));
+
+  gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp);
+
+  /* calc diff with other end */
+  update_time_level (queue);
+}
+
 static void
 gst_queue_locked_flush (GstQueue * queue)
 {
@@ -529,29 +605,121 @@ gst_queue_locked_flush (GstQueue * queue)
        data when flushing */
     gst_mini_object_unref (data);
   }
-  queue->cur_level.buffers = 0;
-  queue->cur_level.bytes = 0;
-  queue->cur_level.time = 0;
+  GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
   queue->min_threshold.buffers = queue->orig_min_threshold.buffers;
   queue->min_threshold.bytes = queue->orig_min_threshold.bytes;
   queue->min_threshold.time = queue->orig_min_threshold.time;
-  gst_segment_init (&queue->sink_segment, GST_FORMAT_UNDEFINED);
-  gst_segment_init (&queue->src_segment, GST_FORMAT_UNDEFINED);
+  gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
+  gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
 
-  /* we deleted something... */
-  g_cond_signal (queue->item_del);
+  /* we deleted a lot of something */
+  GST_QUEUE_SIGNAL_DEL (queue);
+}
+
+/* enqueue an item an update the level stats */
+static void
+gst_queue_locked_enqueue (GstQueue * queue, gpointer item)
+{
+  if (GST_IS_BUFFER (item)) {
+    GstBuffer *buffer = GST_BUFFER_CAST (item);
+
+    /* add buffer to the statistics */
+    queue->cur_level.buffers++;
+    queue->cur_level.bytes += GST_BUFFER_SIZE (buffer);
+    apply_buffer (queue, buffer, &queue->sink_segment);
+
+  } else if (GST_IS_EVENT (item)) {
+    GstEvent *event = GST_EVENT_CAST (item);
+
+    switch (GST_EVENT_TYPE (event)) {
+      case GST_EVENT_EOS:
+        /* Zero the thresholds, this makes sure the queue is completely
+         * filled and we can read all data from the queue. */
+        GST_QUEUE_CLEAR_LEVEL (queue->min_threshold);
+        break;
+      case GST_EVENT_NEWSEGMENT:
+        apply_segment (queue, event, &queue->sink_segment);
+        break;
+      default:
+        break;
+    }
+  } else {
+    g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
+        item, GST_OBJECT_NAME (queue));
+    /* we can't really unref since we don't know what it is */
+    item = NULL;
+  }
+
+  if (item)
+    g_queue_push_tail (queue->queue, item);
+  GST_QUEUE_SIGNAL_ADD (queue);
+}
+
+/* dequeue an item from the queue and update level stats */
+static GstMiniObject *
+gst_queue_locked_dequeue (GstQueue * queue)
+{
+  GstMiniObject *item;
+
+  item = g_queue_pop_head (queue->queue);
+  if (item == NULL)
+    goto no_item;
+
+  if (GST_IS_BUFFER (item)) {
+    GstBuffer *buffer = GST_BUFFER_CAST (item);
+
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "retrieved buffer %p from queue", buffer);
+
+    queue->cur_level.buffers--;
+    queue->cur_level.bytes -= GST_BUFFER_SIZE (buffer);
+    apply_buffer (queue, buffer, &queue->src_segment);
+
+  } else if (GST_IS_EVENT (item)) {
+    GstEvent *event = GST_EVENT_CAST (item);
+
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "retrieved event %p from queue", event);
+
+    switch (GST_EVENT_TYPE (event)) {
+      case GST_EVENT_EOS:
+        /* queue is empty now that we dequeued the EOS */
+        GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
+        break;
+      case GST_EVENT_NEWSEGMENT:
+        apply_segment (queue, event, &queue->src_segment);
+        break;
+      default:
+        break;
+    }
+  } else {
+    g_warning
+        ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
+        item, GST_OBJECT_NAME (queue));
+    item = NULL;
+  }
+  GST_QUEUE_SIGNAL_DEL (queue);
+
+  return item;
+
+  /* ERRORS */
+no_item:
+  {
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty");
+    return NULL;
+  }
 }
 
 static gboolean
 gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
 {
   GstQueue *queue;
-  gboolean have_eos = FALSE;
 
   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
 
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
+    {
       STATUS (queue, pad, "received flush start event");
       /* forward event */
       gst_pad_push_event (queue->srcpad, event);
@@ -569,7 +737,9 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
       gst_pad_pause_task (queue->srcpad);
       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
       goto done;
+    }
     case GST_EVENT_FLUSH_STOP:
+    {
       STATUS (queue, pad, "received flush stop event");
       /* forward event */
       gst_pad_push_event (queue->srcpad, event);
@@ -587,55 +757,28 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
 
       STATUS (queue, pad, "after flush");
       goto done;
-    case GST_EVENT_EOS:
-      STATUS (queue, pad, "received EOS");
-      have_eos = TRUE;
-      break;
-    case GST_EVENT_NEWSEGMENT:
-    {
-      gboolean update;
-      GstFormat format;
-      gdouble rate, arate;
-      gint64 start, stop, time;
-
-      gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
-          &start, &stop, &time);
-
-      GST_DEBUG_OBJECT (queue, "received NEWSEGMENT in %s",
-          gst_format_get_name (format));
-
-      /* now configure the values */
-      gst_segment_set_newsegment_full (&queue->sink_segment, update,
-          rate, arate, format, start, stop, time);
-      break;
     }
     default:
       if (GST_EVENT_IS_SERIALIZED (event)) {
-        /* we put the event in the queue, we don't have to act ourselves */
-        GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-            "adding event %p of type %d", event, GST_EVENT_TYPE (event));
+        /* serialized events go in the queue */
+        GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+        gst_queue_locked_enqueue (queue, event);
+        GST_QUEUE_MUTEX_UNLOCK (queue);
       } else {
+        /* non-serialized events are passed upstream. */
         gst_pad_push_event (queue->srcpad, event);
-        goto done;
       }
       break;
   }
-
-  GST_QUEUE_MUTEX_LOCK (queue);
-  if (have_eos) {
-    /* Zero the thresholds, this makes sure the queue is completely
-     * filled and we can read all data from the queue. */
-    queue->min_threshold.buffers = 0;
-    queue->min_threshold.bytes = 0;
-    queue->min_threshold.time = 0;
-  }
-  g_queue_push_tail (queue->queue, event);
-  g_cond_signal (queue->item_add);
-  GST_QUEUE_MUTEX_UNLOCK (queue);
-
 done:
-
   return TRUE;
+
+  /* ERRORS */
+out_flushing:
+  {
+    gst_buffer_unref (event);
+    return FALSE;
+  }
 }
 
 static gboolean
@@ -676,7 +819,7 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
   duration = GST_BUFFER_DURATION (buffer);
 
   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-      "adding buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %"
+      "received buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %"
       GST_TIME_FORMAT, buffer, GST_BUFFER_SIZE (buffer),
       GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
 
@@ -688,141 +831,60 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0);
     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
 
+    /* we recheck, the signal could have changed the thresholds */
+    if (!gst_queue_is_filled (queue))
+      break;
+
     /* how are we going to make space for this buffer? */
     switch (queue->leaky) {
-        /* leak current buffer */
       case GST_QUEUE_LEAK_UPSTREAM:
+        /* leak current buffer */
         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
             "queue is full, leaking buffer on upstream end");
         /* now we can clean up and exit right away */
         goto out_unref;
+      case GST_QUEUE_LEAK_DOWNSTREAM:
+      {
+        /* for as long as the queue is filled, dequeue an item and discard 
+         * it. */
+        do {
+          GstMiniObject *leak;
 
-        /* leak first buffer in the queue */
-      case GST_QUEUE_LEAK_DOWNSTREAM:{
-        /* this is a bit hacky. We'll manually iterate the list
-         * and find the first buffer from the head on. We'll
-         * unref that and "fix up" the GQueue object... */
-        GList *item;
-        GstMiniObject *leak = NULL;
+          leak = gst_queue_locked_dequeue (queue);
+          /* there is nothing to dequeue and the queue is still filled.. This
+           * should not happen. */
+          g_assert (leak != NULL);
 
-        if (!gst_queue_is_filled (queue)) {
-          /* Queue was emptied while we sent out the signal, so no need to drop */
           GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
-              "queue emptied while emitting signal, not leaking buffer");
-          break;
-        }
-
-        GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
-            "queue is full, leaking buffer on downstream end");
-
-        for (item = g_queue_peek_head_link (queue->queue); item;
-            item = item->next) {
-          if (GST_IS_BUFFER (item->data)) {
-            leak = item->data;
-            break;
-          }
-        }
-
-        /* if we didn't find anything, it means we have no buffers
-         * in here. That cannot happen, since we had >= 1 bufs */
-        g_assert (leak);
-
-        /* Now remove the link from the queue */
-        g_queue_delete_link (queue->queue, item);
-
-        /* and unref the buffer at the end. Twice, because we keep a ref
-         * to make things read-only. Also keep our list uptodate. */
-        queue->cur_level.bytes -= GST_BUFFER_SIZE (leak);
-        queue->cur_level.buffers--;
-
-        timestamp = GST_BUFFER_TIMESTAMP (buffer);
-        duration = GST_BUFFER_DURATION (buffer);
-
-        /* update start time in queue */
-        if (queue->src_segment.format == GST_FORMAT_TIME) {
-          gint64 last_stop;
-
-          if (timestamp != GST_CLOCK_TIME_NONE)
-            last_stop = timestamp;
-          else
-            last_stop = queue->src_segment.last_stop;
-
-          gst_segment_set_last_stop (&queue->src_segment, GST_FORMAT_TIME,
-              last_stop);
-
-          update_time_level (queue);
-        } else if (duration != GST_CLOCK_TIME_NONE) {
-          if (queue->cur_level.time > duration)
-            queue->cur_level.time -= duration;
-          else
-            queue->cur_level.time = 0;
-        }
-        gst_buffer_unref (leak);
+              "queue is full, leaking item %p on downstream end", leak);
+          gst_buffer_unref (leak);
+        } while (gst_queue_is_filled (queue));
         break;
       }
-
       default:
         g_warning ("Unknown leaky type, using default");
         /* fall-through */
-
-        /* don't leak. Instead, wait for space to be available */
       case GST_QUEUE_NO_LEAK:
-        STATUS (queue, pad, "pre-full wait");
-
-        /* we recheck, the signal could have changed the thresholds */
-        while (gst_queue_is_filled (queue)) {
-          STATUS (queue, pad,
-              "waiting for item_del signal from thread using qlock");
-          g_cond_wait (queue->item_del, queue->qlock);
-
-          if (queue->srcresult != GST_FLOW_OK)
-            goto out_flushing;
+      {
+        GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
+            "queue is full, waiting for free space");
 
-          /* if there's a pending state change for this queue
-           * or its manager, switch back to iterator so bottom
-           * half of state change executes */
-          STATUS (queue, pad,
-              "received item_del signal from thread using qlock");
-        }
+        /* don't leak. Instead, wait for space to be available */
+        do {
+          /* for as long as the queue is filled, wait till an item was deleted. */
+          GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing);
+        } while (gst_queue_is_filled (queue));
 
-        STATUS (queue, pad, "post-full wait");
         GST_QUEUE_MUTEX_UNLOCK (queue);
         g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
         GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
-
         break;
+      }
     }
   }
 
-  g_queue_push_tail (queue->queue, buffer);
-
-  /* add buffer to the statistics */
-  queue->cur_level.buffers++;
-  queue->cur_level.bytes += GST_BUFFER_SIZE (buffer);
-
-  /* update start time in queue */
-  if (queue->sink_segment.format == GST_FORMAT_TIME) {
-    gint64 last_stop;
-
-    if (timestamp != GST_CLOCK_TIME_NONE)
-      last_stop = timestamp;
-    else
-      last_stop = queue->sink_segment.last_stop;
-
-    if (duration != GST_CLOCK_TIME_NONE)
-      last_stop += duration;
-
-    gst_segment_set_last_stop (&queue->sink_segment, GST_FORMAT_TIME,
-        last_stop);
-
-    update_time_level (queue);
-  } else if (duration != GST_CLOCK_TIME_NONE) {
-    queue->cur_level.time += duration;
-  }
-  STATUS (queue, pad, "+ level");
-
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
-  g_cond_signal (queue->item_add);
+  /* put buffer in queue now */
+  gst_queue_locked_enqueue (queue, buffer);
   GST_QUEUE_MUTEX_UNLOCK (queue);
 
   return GST_FLOW_OK;
@@ -843,193 +905,110 @@ out_flushing:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "exit because task paused, reason: %s", gst_flow_get_name (ret));
     GST_QUEUE_MUTEX_UNLOCK (queue);
-
     gst_buffer_unref (buffer);
 
     return ret;
   }
 }
 
-static gboolean
+/* dequeue an item from the queue an push it downstream. This functions returns
+ * the result of the push. */
+static GstFlowReturn
 gst_queue_push_one (GstQueue * queue)
 {
-  gboolean restart = TRUE;
+  GstFlowReturn result = GST_FLOW_OK;
   GstMiniObject *data;
 
-  /* There's something in the list now, whatever it is */
-  data = g_queue_pop_head (queue->queue);
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-      "retrieved data %p from queue", data);
+  data = gst_queue_locked_dequeue (queue);
+  if (data == NULL)
+    goto no_item;
 
   if (GST_IS_BUFFER (data)) {
-    GstFlowReturn result;
-    GstClockTime timestamp, duration;
-    GstBuffer *buffer = GST_BUFFER (data);
-
-    /* Update statistics */
-    queue->cur_level.buffers--;
-    queue->cur_level.bytes -= GST_BUFFER_SIZE (buffer);
-
-    timestamp = GST_BUFFER_TIMESTAMP (buffer);
-    duration = GST_BUFFER_DURATION (buffer);
-
-    /* update start time in queue */
-    if (queue->src_segment.format == GST_FORMAT_TIME) {
-      gint64 last_stop;
-
-      if (timestamp != GST_CLOCK_TIME_NONE)
-        last_stop = timestamp;
-      else
-        last_stop = queue->src_segment.last_stop;
-
-      gst_segment_set_last_stop (&queue->src_segment, GST_FORMAT_TIME,
-          last_stop);
-
-      update_time_level (queue);
-    } else if (duration != GST_CLOCK_TIME_NONE) {
-      if (queue->cur_level.time > duration)
-        queue->cur_level.time -= duration;
-      else
-        queue->cur_level.time = 0;
-    }
+    GstBuffer *buffer = GST_BUFFER_CAST (data);
 
     GST_QUEUE_MUTEX_UNLOCK (queue);
+
     result = gst_pad_push (queue->srcpad, buffer);
+
     /* need to check for srcresult here as well */
     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
-    /* else result of push indicates what happens */
-    if (result != GST_FLOW_OK) {
-      const gchar *flowname;
-
-      flowname = gst_flow_get_name (result);
-
-      queue->srcresult = result;
-
-      GST_DEBUG_OBJECT (queue, "pausing queue, reason %s", flowname);
-      gst_pad_pause_task (queue->srcpad);
-    }
   } else if (GST_IS_EVENT (data)) {
-    GstEvent *event = GST_EVENT (data);
+    GstEvent *event = GST_EVENT_CAST (data);
+    GstEventType type = GST_EVENT_TYPE (event);
 
-    switch (GST_EVENT_TYPE (event)) {
-      case GST_EVENT_EOS:
-        queue->cur_level.buffers = 0;
-        queue->cur_level.bytes = 0;
-        queue->cur_level.time = 0;
-        /* all incomming data is now unexpected */
-        queue->srcresult = GST_FLOW_UNEXPECTED;
-        /* and we don't need to process anymore */
-        GST_DEBUG_OBJECT (queue, "pausing queue, we're EOS now");
-        gst_pad_pause_task (queue->srcpad);
-        restart = FALSE;
-        break;
-      case GST_EVENT_NEWSEGMENT:
-      {
-        gboolean update;
-        GstFormat format;
-        gdouble rate, arate;
-        gint64 start, stop, time;
-
-        gst_event_parse_new_segment_full (event, &update, &rate, &arate,
-            &format, &start, &stop, &time);
-
-        /* now configure the values */
-        gst_segment_set_newsegment_full (&queue->src_segment, update,
-            rate, arate, format, start, stop, time);
-        break;
-      }
-      default:
-        break;
-    }
     GST_QUEUE_MUTEX_UNLOCK (queue);
+
     gst_pad_push_event (queue->srcpad, event);
+
     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
-    if (restart == TRUE)
-      return TRUE;
-  } else {
-    g_warning ("Unexpected object in queue %s (refcounting problem?)",
-        GST_OBJECT_NAME (queue));
+    /* if we're EOS, return UNEXPECTED so that the task pauses. */
+    if (type == GST_EVENT_EOS)
+      result = GST_FLOW_UNEXPECTED;
   }
+  return result;
 
-  STATUS (queue, queue->srcpad, "after _get()");
-
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
-  g_cond_signal (queue->item_del);
-
-  return FALSE;
-
+  /* ERRORS */
+no_item:
+  {
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "exit because we have no item in the queue");
+    return GST_FLOW_ERROR;
+  }
 out_flushing:
-  gst_pad_pause_task (queue->srcpad);
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-      "exit because task paused, reason:  %s",
-      gst_flow_get_name (queue->srcresult));
-
-  return FALSE;                 /* FALSE == no restart */
+  {
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
+    return GST_FLOW_WRONG_STATE;
+  }
 }
 
 static void
 gst_queue_loop (GstPad * pad)
 {
   GstQueue *queue;
+  GstFlowReturn ret;
 
   queue = GST_QUEUE (GST_PAD_PARENT (pad));
 
   /* have to lock for thread-safety */
   GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
 
-restart:
   while (gst_queue_is_empty (queue)) {
     GST_QUEUE_MUTEX_UNLOCK (queue);
     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
 
-    STATUS (queue, pad, "pre-empty wait");
     /* we recheck, the signal could have changed the thresholds */
     while (gst_queue_is_empty (queue)) {
-      STATUS (queue, pad, "waiting for item_add");
-
-      GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p",
-          g_thread_self ());
-      g_cond_wait (queue->item_add, queue->qlock);
-
-      /* we released the lock in the g_cond above so we might be
-       * flushing now */
-      if (queue->srcresult != GST_FLOW_OK)
-        goto out_flushing;
-
-      GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p",
-          g_thread_self ());
-      STATUS (queue, pad, "got item_add signal");
+      GST_QUEUE_WAIT_ADD_CHECK (queue, out_flushing);
     }
-
-    STATUS (queue, pad, "post-empty wait");
     GST_QUEUE_MUTEX_UNLOCK (queue);
+
     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_PUSHING], 0);
+
     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
   }
 
-  if (gst_queue_push_one (queue))
-    goto restart;
+  ret = gst_queue_push_one (queue);
+  queue->srcresult = ret;
+  if (ret != GST_FLOW_OK)
+    goto out_flushing;
 
   GST_QUEUE_MUTEX_UNLOCK (queue);
 
   return;
 
+  /* ERRORS */
 out_flushing:
   {
     gst_pad_pause_task (queue->srcpad);
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-        "exit because task paused, reason:  %s",
-        gst_flow_get_name (queue->srcresult));
-
+        "pause task, reason:  %s", gst_flow_get_name (queue->srcresult));
     GST_QUEUE_MUTEX_UNLOCK (queue);
-
     return;
   }
 }
 
-
 static gboolean
 gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
 {
index 05ca904a0e6b2d3bc15cbcb81f4e0e8d54ae3bdf..524a0fb41f4d5c27856a267b1c3872f5c8854e88 100644 (file)
@@ -63,6 +63,12 @@ struct _GstQueueSize {
     guint64 time;
 };
 
+#define GST_QUEUE_CLEAR_LEVEL(l) G_STMT_START {         \
+  l.buffers = 0;                                        \
+  l.bytes = 0;                                          \
+  l.time = 0;                                           \
+} G_STMT_END
+
 /**
  * GstQueue:
  *
index d422834ea9423758533a2821e004ce4794c69165..3072bb009c8fb772c8b2607e63a4902f86b33893 100644 (file)
@@ -100,11 +100,10 @@ GST_START_TEST (test_non_leaky_underrun)
 
   GST_DEBUG ("starting");
 
+  g_mutex_lock (check_mutex);
   fail_unless (gst_element_set_state (queue,
           GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
       "could not set to playing");
-
-  g_mutex_lock (check_mutex);
   g_cond_wait (check_cond, check_mutex);
   g_mutex_unlock (check_mutex);
 
@@ -231,10 +230,9 @@ GST_START_TEST (test_leaky_upstream)
   /* pushing gives away my reference ... */
   gst_pad_push (mysrcpad, gst_buffer_ref (buffer3));
 
+  g_mutex_lock (check_mutex);
   /* start the src-task briefly leak buffer3 */
   gst_pad_set_active (mysinkpad, TRUE);
-
-  g_mutex_lock (check_mutex);
   g_cond_wait (check_cond, check_mutex);
   g_mutex_unlock (check_mutex);
 
@@ -301,10 +299,9 @@ GST_START_TEST (test_leaky_downstream)
   /* pushing gives away my reference ... */
   gst_pad_push (mysrcpad, buffer3);
 
+  g_mutex_lock (check_mutex);
   /* start the src-task briefly and leak buffer1 */
   gst_pad_set_active (mysinkpad, TRUE);
-
-  g_mutex_lock (check_mutex);
   g_cond_wait (check_cond, check_mutex);
   g_mutex_unlock (check_mutex);
 
@@ -330,6 +327,110 @@ GST_START_TEST (test_leaky_downstream)
 
 GST_END_TEST;
 
+/* set queue size to 5 buffers
+ * pull 1 buffer
+ * check over/underuns
+ */
+GST_START_TEST (test_time_level)
+{
+  GstElement *queue;
+  GstBuffer *buffer = NULL;
+  GstClockTime time;
+
+  queue = setup_queue ();
+  mysrcpad = gst_check_setup_src_pad (queue, &srctemplate, NULL);
+  mysinkpad = gst_check_setup_sink_pad (queue, &sinktemplate, NULL);
+  g_object_set (G_OBJECT (queue), "max-size-buffers", 6, NULL);
+  g_object_set (G_OBJECT (queue), "max-size-time", 10 * GST_SECOND, NULL);
+  gst_pad_set_active (mysrcpad, TRUE);
+  gst_pad_set_active (mysinkpad, TRUE);
+
+  GST_DEBUG ("starting");
+
+  fail_unless (gst_element_set_state (queue,
+          GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
+      "could not set to playing");
+
+  /* push buffer without duration */
+  buffer = gst_buffer_new_and_alloc (4);
+  GST_BUFFER_TIMESTAMP (buffer) = GST_SECOND;
+  ASSERT_BUFFER_REFCOUNT (buffer, "buffer", 1);
+  /* pushing gives away my reference ... */
+  gst_pad_push (mysrcpad, buffer);
+
+  /* level should be 1 seconds because buffer has no duration and starts at 1
+   * SECOND (sparse stream). */
+  g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
+  fail_if (time != GST_SECOND);
+
+  /* second push should set the level to 2 second */
+  buffer = gst_buffer_new_and_alloc (4);
+  GST_BUFFER_TIMESTAMP (buffer) = 2 * GST_SECOND;
+  ASSERT_BUFFER_REFCOUNT (buffer, "buffer", 1);
+  /* pushing gives away my reference ... */
+  gst_pad_push (mysrcpad, buffer);
+
+  g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
+  fail_if (time != 2 * GST_SECOND);
+
+  /* third push should set the level to 4 seconds, the 1 second diff with the
+   * previous buffer (without duration) and the 1 second duration of this
+   * buffer. */
+  buffer = gst_buffer_new_and_alloc (4);
+  GST_BUFFER_TIMESTAMP (buffer) = 3 * GST_SECOND;
+  GST_BUFFER_DURATION (buffer) = 1 * GST_SECOND;
+  ASSERT_BUFFER_REFCOUNT (buffer, "buffer", 1);
+  /* pushing gives away my reference ... */
+  gst_pad_push (mysrcpad, buffer);
+
+  g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
+  fail_if (time != 4 * GST_SECOND);
+
+  /* fourth push should set the level to 6 seconds, the 2 second diff with the
+   * previous buffer, same duration. */
+  buffer = gst_buffer_new_and_alloc (4);
+  GST_BUFFER_TIMESTAMP (buffer) = 5 * GST_SECOND;
+  GST_BUFFER_DURATION (buffer) = 1 * GST_SECOND;
+  ASSERT_BUFFER_REFCOUNT (buffer, "buffer", 1);
+  /* pushing gives away my reference ... */
+  gst_pad_push (mysrcpad, buffer);
+
+  g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
+  fail_if (time != 6 * GST_SECOND);
+
+  /* fifth push should not adjust the level, the timestamp and duration are the
+   * same, meaning the previous buffer did not really have a duration. */
+  buffer = gst_buffer_new_and_alloc (4);
+  GST_BUFFER_TIMESTAMP (buffer) = 5 * GST_SECOND;
+  GST_BUFFER_DURATION (buffer) = 1 * GST_SECOND;
+  ASSERT_BUFFER_REFCOUNT (buffer, "buffer", 1);
+  /* pushing gives away my reference ... */
+  gst_pad_push (mysrcpad, buffer);
+
+  g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
+  fail_if (time != 6 * GST_SECOND);
+
+  /* sixth push should adjust the level with 1 second, we now know the
+   * previous buffer actually had a duration of 2 SECONDS */
+  buffer = gst_buffer_new_and_alloc (4);
+  GST_BUFFER_TIMESTAMP (buffer) = 7 * GST_SECOND;
+  ASSERT_BUFFER_REFCOUNT (buffer, "buffer", 1);
+  /* pushing gives away my reference ... */
+  gst_pad_push (mysrcpad, buffer);
+
+  g_object_get (G_OBJECT (queue), "current-level-time", &time, NULL);
+  fail_if (time != 7 * GST_SECOND);
+
+  GST_DEBUG ("stopping");
+
+  /* cleanup */
+  gst_pad_set_active (mysinkpad, FALSE);
+  gst_check_teardown_sink_pad (queue);
+  cleanup_queue (queue);
+}
+
+GST_END_TEST;
+
 Suite *
 queue_suite (void)
 {
@@ -341,6 +442,7 @@ queue_suite (void)
   tcase_add_test (tc_chain, test_non_leaky_overrun);
   tcase_add_test (tc_chain, test_leaky_upstream);
   tcase_add_test (tc_chain, test_leaky_downstream);
+  tcase_add_test (tc_chain, test_time_level);
 
   return s;
 }