]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - glsdk/gstreamer0-10.git/blobdiff - plugins/elements/gstqueue2.c
queue2: fix refactoring of draining-on-eos, munge flow return to FLOW_OK
[glsdk/gstreamer0-10.git] / plugins / elements / gstqueue2.c
index 957b6b83e5f1b7a429da055c755d881b5315a9d8..244023627fec85466547075a95af6864a94b672a 100644 (file)
@@ -64,6 +64,8 @@
 
 #include "gst/gst-i18n-lib.h"
 
+#include <string.h>
+
 #ifdef G_OS_WIN32
 #include <io.h>                 /* lseek, open, close, read */
 #undef lseek
@@ -96,7 +98,10 @@ enum
 /* other defines */
 #define DEFAULT_BUFFER_SIZE 4096
 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_location_set || (queue)->temp_template != NULL)
-#define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->use_ring_buffer)    /* for consistency with the above macro */
+#define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->ring_buffer_max_size != 0)  /* for consistency with the above macro */
+#define QUEUE_IS_USING_QUEUE(queue) (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue))
+
+#define QUEUE_MAX_BYTES(queue) MIN((queue)->max_level.bytes, (queue)->ring_buffer_max_size)
 
 /* default property values */
 #define DEFAULT_MAX_SIZE_BUFFERS   100  /* 100 buffers */
@@ -107,8 +112,7 @@ enum
 #define DEFAULT_LOW_PERCENT        10
 #define DEFAULT_HIGH_PERCENT       99
 #define DEFAULT_TEMP_REMOVE        TRUE
-#define DEFAULT_USE_RING_BUFFER    FALSE
-#define DEFAULT_RING_BUFFER_MAX_SIZE (1024 * DEFAULT_BUFFER_SIZE)       /* 4 MB */
+#define DEFAULT_RING_BUFFER_MAX_SIZE 0
 
 enum
 {
@@ -126,7 +130,6 @@ enum
   PROP_TEMP_TEMPLATE,
   PROP_TEMP_LOCATION,
   PROP_TEMP_REMOVE,
-  PROP_USE_RING_BUFFER,
   PROP_RING_BUFFER_MAX_SIZE,
   PROP_LAST
 };
@@ -150,9 +153,9 @@ enum
                       queue->max_level.bytes, \
                       queue->cur_level.time, \
                       queue->max_level.time, \
-                      (guint64) (QUEUE_IS_USING_TEMP_FILE(queue) ? \
+                      (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
                         queue->current->writing_pos - queue->current->max_reading_pos : \
-                        queue->queue->length))
+                        queue->queue.length))
 
 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
   g_mutex_lock (q->qlock);                                              \
@@ -222,6 +225,8 @@ static void gst_queue2_get_property (GObject * object,
     guint prop_id, GValue * value, GParamSpec * pspec);
 
 static GstFlowReturn gst_queue2_chain (GstPad * pad, GstBuffer * buffer);
+static GstFlowReturn gst_queue2_chain_list (GstPad * pad,
+    GstBufferList * buffer_list);
 static GstFlowReturn gst_queue2_bufferalloc (GstPad * pad, guint64 offset,
     guint size, GstCaps * caps, GstBuffer ** buf);
 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
@@ -252,6 +257,13 @@ static gboolean gst_queue2_is_filled (GstQueue2 * queue);
 
 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
 
+typedef enum
+{
+  GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
+  GST_QUEUE2_ITEM_TYPE_BUFFER,
+  GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
+  GST_QUEUE2_ITEM_TYPE_EVENT
+} GstQueue2ItemType;
 
 /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
 
@@ -278,8 +290,6 @@ gst_queue2_class_init (GstQueue2Class * klass)
   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
 
-  parent_class = g_type_class_peek_parent (klass);
-
   gobject_class->set_property = gst_queue2_set_property;
   gobject_class->get_property = gst_queue2_get_property;
 
@@ -323,12 +333,14 @@ gst_queue2_class_init (GstQueue2Class * klass)
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
       g_param_spec_int ("low-percent", "Low percent",
-          "Low threshold for buffering to start", 0, 100, DEFAULT_LOW_PERCENT,
+          "Low threshold for buffering to start. Only used if use-buffering is True",
+          0, 100, DEFAULT_LOW_PERCENT,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
       g_param_spec_int ("high-percent", "High percent",
-          "High threshold for buffering to finish", 0, 100,
-          DEFAULT_HIGH_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          "High threshold for buffering to finish. Only used if use-buffering is True",
+          0, 100, DEFAULT_HIGH_PERCENT,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
       g_param_spec_string ("temp-template", "Temporary File Template",
@@ -354,30 +366,19 @@ gst_queue2_class_init (GstQueue2Class * klass)
           "Remove the temp-location after use",
           DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
-  /**
-   * GstQueue2:use-ring-buffer
-   *
-   * When use-ring-buffer is set, buffer data into a ring buffer containing ranges
-   * of source data. Default FALSE.
-   *
-   * Since: 0.10.30
-   */
-  g_object_class_install_property (gobject_class, PROP_USE_RING_BUFFER,
-      g_param_spec_boolean ("use-ring-buffer", "Use a ring buffer",
-          "Use a ring buffer of size ring-buffer-max-size kB",
-          DEFAULT_USE_RING_BUFFER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   /**
    * GstQueue2:ring-buffer-max-size
    *
-   * The maximum size of the ring buffer in kilobytes. If set to 0 kB then the size
-   * is unlimited. Default 16 megabytes.
+   * The maximum size of the ring buffer in bytes. If set to 0, the ring
+   * buffer is disabled. Default 0.
    *
-   * Since: 0.10.30
+   * Since: 0.10.31
    */
   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
-      g_param_spec_uint ("ring-buffer-max-size", "Max. ring buffer size (kB)",
-          "Max. amount of data in the ring buffer (bytes, 0=unlimited)",
-          DEFAULT_BUFFER_SIZE, G_MAXUINT, DEFAULT_RING_BUFFER_MAX_SIZE,
+      g_param_spec_uint64 ("ring-buffer-max-size",
+          "Max. ring buffer size (bytes)",
+          "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
+          0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   /* set several parent class virtual functions */
@@ -394,6 +395,8 @@ gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class)
 
   gst_pad_set_chain_function (queue->sinkpad,
       GST_DEBUG_FUNCPTR (gst_queue2_chain));
+  gst_pad_set_chain_list_function (queue->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
   gst_pad_set_activatepush_function (queue->sinkpad,
       GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_push));
   gst_pad_set_event_function (queue->sinkpad,
@@ -440,6 +443,11 @@ gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class)
   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
 
+  queue->sinktime = GST_CLOCK_TIME_NONE;
+  queue->srctime = GST_CLOCK_TIME_NONE;
+  queue->sink_tainted = TRUE;
+  queue->src_tainted = TRUE;
+
   queue->srcresult = GST_FLOW_WRONG_STATE;
   queue->sinkresult = GST_FLOW_WRONG_STATE;
   queue->is_eos = FALSE;
@@ -451,7 +459,9 @@ gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class)
   queue->item_add = g_cond_new ();
   queue->waiting_del = FALSE;
   queue->item_del = g_cond_new ();
-  queue->queue = g_queue_new ();
+  g_queue_init (&queue->queue);
+
+  queue->buffering_percent = 100;
 
   /* tempfile related */
   queue->temp_template = NULL;
@@ -459,8 +469,9 @@ gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class)
   queue->temp_location_set = FALSE;
   queue->temp_remove = DEFAULT_TEMP_REMOVE;
 
-  queue->use_ring_buffer = DEFAULT_USE_RING_BUFFER;
+  queue->ring_buffer = NULL;
   queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
+
   GST_DEBUG_OBJECT (queue,
       "initialized queue's not_empty & not_full conditions");
 }
@@ -473,13 +484,13 @@ gst_queue2_finalize (GObject * object)
 
   GST_DEBUG_OBJECT (queue, "finalizing queue");
 
-  while (!g_queue_is_empty (queue->queue)) {
-    GstMiniObject *data = g_queue_pop_head (queue->queue);
+  while (!g_queue_is_empty (&queue->queue)) {
+    GstMiniObject *data = g_queue_pop_head (&queue->queue);
 
     gst_mini_object_unref (data);
   }
 
-  g_queue_free (queue->queue);
+  g_queue_clear (&queue->queue);
   g_mutex_free (queue->qlock);
   g_cond_free (queue->item_add);
   g_cond_free (queue->item_del);
@@ -499,8 +510,12 @@ debug_ranges (GstQueue2 * queue)
   GstQueue2Range *walk;
 
   for (walk = queue->ranges; walk; walk = walk->next) {
-    GST_DEBUG_OBJECT (queue, "range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT,
-        walk->offset, walk->writing_pos);
+    GST_DEBUG_OBJECT (queue,
+        "range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "] (rb [%"
+        G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "]), reading %" G_GUINT64_FORMAT
+        " current range? %s", walk->offset, walk->writing_pos, walk->rb_offset,
+        walk->rb_writing_pos, walk->reading_pos,
+        walk == queue->current ? "**y**" : "  n  ");
   }
 }
 
@@ -517,7 +532,7 @@ clean_ranges (GstQueue2 * queue)
 
 /* find a range that contains @offset or NULL when nothing does */
 static GstQueue2Range *
-find_range (GstQueue2 * queue, guint64 offset, guint64 length)
+find_range (GstQueue2 * queue, guint64 offset)
 {
   GstQueue2Range *range = NULL;
   GstQueue2Range *walk;
@@ -530,9 +545,30 @@ find_range (GstQueue2 * queue, guint64 offset, guint64 length)
       break;
     }
   }
+  if (range) {
+    GST_DEBUG_OBJECT (queue,
+        "found range for %" G_GUINT64_FORMAT ": [%" G_GUINT64_FORMAT "-%"
+        G_GUINT64_FORMAT "]", offset, range->offset, range->writing_pos);
+  } else {
+    GST_DEBUG_OBJECT (queue, "no range for %" G_GUINT64_FORMAT, offset);
+  }
   return range;
 }
 
+static void
+update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
+{
+  guint64 max_reading_pos, writing_pos;
+
+  writing_pos = range->writing_pos;
+  max_reading_pos = range->max_reading_pos;
+
+  if (writing_pos > max_reading_pos)
+    queue->cur_level.bytes = writing_pos - max_reading_pos;
+  else
+    queue->cur_level.bytes = 0;
+}
+
 /* make a new range for @offset or reuse an existing range */
 static GstQueue2Range *
 add_range (GstQueue2 * queue, guint64 offset)
@@ -541,7 +577,7 @@ add_range (GstQueue2 * queue, guint64 offset)
 
   GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
 
-  if ((range = find_range (queue, offset, 0))) {
+  if ((range = find_range (queue, offset))) {
     GST_DEBUG_OBJECT (queue,
         "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
         range->writing_pos);
@@ -557,7 +593,6 @@ add_range (GstQueue2 * queue, guint64 offset)
     range->writing_pos = offset;
     range->rb_writing_pos = range->rb_offset;
     range->reading_pos = offset;
-    range->rb_reading_pos = range->rb_offset;
     range->max_reading_pos = offset;
 
     /* insert sorted */
@@ -583,6 +618,9 @@ add_range (GstQueue2 * queue, guint64 offset)
   }
   debug_ranges (queue);
 
+  /* update the stats for this range */
+  update_cur_level (queue, range);
+
   return range;
 }
 
@@ -621,13 +659,17 @@ gst_queue2_getcaps (GstPad * pad)
   GstPad *otherpad;
   GstCaps *result;
 
-  queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
+  queue = GST_QUEUE2 (gst_pad_get_parent (pad));
+  if (G_UNLIKELY (queue == NULL))
+    return gst_caps_new_any ();
 
   otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
   result = gst_pad_peer_get_caps (otherpad);
   if (result == NULL)
     result = gst_caps_new_any ();
 
+  gst_object_unref (queue);
+
   return result;
 }
 
@@ -651,20 +693,27 @@ gst_queue2_bufferalloc (GstPad * pad, guint64 offset, guint size,
 static void
 update_time_level (GstQueue2 * queue)
 {
-  gint64 sink_time, src_time;
-
-  sink_time =
-      gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
-      queue->sink_segment.last_stop);
+  if (queue->sink_tainted) {
+    queue->sinktime =
+        gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
+        queue->sink_segment.last_stop);
+    queue->sink_tainted = FALSE;
+  }
 
-  src_time = gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
-      queue->src_segment.last_stop);
+  if (queue->src_tainted) {
+    queue->srctime =
+        gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
+        queue->src_segment.last_stop);
+    queue->src_tainted = FALSE;
+  }
 
   GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
-      GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
+      GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime));
 
-  if (sink_time >= src_time)
-    queue->cur_level.time = sink_time - src_time;
+  if (queue->sinktime != GST_CLOCK_TIME_NONE
+      && queue->srctime != GST_CLOCK_TIME_NONE
+      && queue->sinktime >= queue->srctime)
+    queue->cur_level.time = queue->sinktime - queue->srctime;
   else
     queue->cur_level.time = 0;
 }
@@ -672,7 +721,8 @@ update_time_level (GstQueue2 * queue)
 /* take a NEWSEGMENT event and apply the values to segment, updating the time
  * level of queue. */
 static void
-apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment)
+apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
+    gboolean is_sink)
 {
   gboolean update;
   GstFormat format;
@@ -714,20 +764,26 @@ apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment)
   GST_DEBUG_OBJECT (queue,
       "configured NEWSEGMENT %" GST_SEGMENT_FORMAT, segment);
 
+  if (is_sink)
+    queue->sink_tainted = TRUE;
+  else
+    queue->src_tainted = TRUE;
+
   /* segment can update the time level of the queue */
   update_time_level (queue);
 }
 
 /* take a buffer and update segment, updating the time level of the queue. */
 static void
-apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment)
+apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
+    gboolean is_sink)
 {
   GstClockTime duration, timestamp;
 
   timestamp = GST_BUFFER_TIMESTAMP (buffer);
   duration = GST_BUFFER_DURATION (buffer);
 
-  /* if no timestamp is set, assume it's continuous with the previous 
+  /* if no timestamp is set, assume it's continuous with the previous
    * time */
   if (timestamp == GST_CLOCK_TIME_NONE)
     timestamp = segment->last_stop;
@@ -741,6 +797,57 @@ apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment)
 
   gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp);
 
+  if (is_sink)
+    queue->sink_tainted = TRUE;
+  else
+    queue->src_tainted = TRUE;
+
+  /* calc diff with other end */
+  update_time_level (queue);
+}
+
+static GstBufferListItem
+buffer_list_apply_time (GstBuffer ** buf, guint group, guint idx, gpointer data)
+{
+  GstClockTime *timestamp = data;
+
+  GST_TRACE ("buffer %u in group %u has ts %" GST_TIME_FORMAT
+      " duration %" GST_TIME_FORMAT, idx, group,
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*buf)),
+      GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
+
+  if (GST_BUFFER_TIMESTAMP_IS_VALID (*buf))
+    *timestamp = GST_BUFFER_TIMESTAMP (*buf);
+
+  if (GST_BUFFER_DURATION_IS_VALID (*buf))
+    *timestamp += GST_BUFFER_DURATION (*buf);
+
+  GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
+  return GST_BUFFER_LIST_CONTINUE;
+}
+
+/* take a buffer list and update segment, updating the time level of the queue */
+static void
+apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
+    GstSegment * segment, gboolean is_sink)
+{
+  GstClockTime timestamp;
+
+  /* if no timestamp is set, assume it's continuous with the previous time */
+  timestamp = segment->last_stop;
+
+  gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &timestamp);
+
+  GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (timestamp));
+
+  gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp);
+
+  if (is_sink)
+    queue->sink_tainted = TRUE;
+  else
+    queue->src_tainted = TRUE;
+
   /* calc diff with other end */
   update_time_level (queue);
 }
@@ -751,25 +858,31 @@ update_buffering (GstQueue2 * queue)
   gint64 percent;
   gboolean post = FALSE;
 
-  if (!queue->use_buffering || queue->high_percent <= 0)
+  if (queue->high_percent <= 0)
     return;
 
-#define GET_PERCENT(format) ((queue->max_level.format) > 0 ? \
-               (queue->cur_level.format) * 100 / (queue->max_level.format) : 0)
+#define GET_PERCENT(format,alt_max) ((queue->max_level.format) > 0 ? (queue->cur_level.format) * 100 / ((alt_max) > 0 ? MIN ((alt_max), (queue->max_level.format)) : (queue->max_level.format)) : 0)
 
   if (queue->is_eos) {
     /* on EOS we are always 100% full, we set the var here so that it we can
      * reuse the logic below to stop buffering */
     percent = 100;
+    GST_LOG_OBJECT (queue, "we are EOS");
   } else {
     /* figure out the percent we are filled, we take the max of all formats. */
-    percent = GET_PERCENT (bytes);
-    percent = MAX (percent, GET_PERCENT (time));
-    percent = MAX (percent, GET_PERCENT (buffers));
+
+    if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
+      percent = GET_PERCENT (bytes, 0);
+    } else {
+      guint64 rb_size = queue->ring_buffer_max_size;
+      percent = GET_PERCENT (bytes, rb_size);
+    }
+    percent = MAX (percent, GET_PERCENT (time, 0));
+    percent = MAX (percent, GET_PERCENT (buffers, 0));
 
     /* also apply the rate estimate when we need to */
     if (queue->use_rate_estimate)
-      percent = MAX (percent, GET_PERCENT (rate_time));
+      percent = MAX (percent, GET_PERCENT (rate_time, 0));
   }
 
   if (queue->is_buffering) {
@@ -797,33 +910,38 @@ update_buffering (GstQueue2 * queue)
     if (percent > 100)
       percent = 100;
 
-    queue->buffering_percent = percent;
+    if (percent != queue->buffering_percent) {
+      queue->buffering_percent = percent;
 
-    if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) {
-      GstFormat fmt = GST_FORMAT_BYTES;
-      gint64 duration;
+      if (!QUEUE_IS_USING_QUEUE (queue)) {
+        GstFormat fmt = GST_FORMAT_BYTES;
+        gint64 duration;
 
-      mode = GST_BUFFERING_DOWNLOAD;
-      if (queue->byte_in_rate > 0) {
-        if (gst_pad_query_peer_duration (queue->sinkpad, &fmt, &duration))
-          buffering_left =
-              (gdouble) ((duration -
-                  queue->current->writing_pos) * 1000) / queue->byte_in_rate;
+        if (QUEUE_IS_USING_RING_BUFFER (queue))
+          mode = GST_BUFFERING_TIMESHIFT;
+        else
+          mode = GST_BUFFERING_DOWNLOAD;
+
+        if (queue->byte_in_rate > 0) {
+          if (gst_pad_query_peer_duration (queue->sinkpad, &fmt, &duration))
+            buffering_left =
+                (gdouble) ((duration -
+                    queue->current->writing_pos) * 1000) / queue->byte_in_rate;
+        } else {
+          buffering_left = G_MAXINT64;
+        }
       } else {
-        buffering_left = G_MAXINT64;
+        mode = GST_BUFFERING_STREAM;
       }
-    } else {
-      mode = GST_BUFFERING_STREAM;
-    }
-
-    GST_DEBUG_OBJECT (queue, "buffering %d percent", (gint) percent);
-    message = gst_message_new_buffering (GST_OBJECT_CAST (queue),
-        (gint) percent);
-    gst_message_set_buffering_stats (message, mode,
-        queue->byte_in_rate, queue->byte_out_rate, buffering_left);
 
-    gst_element_post_message (GST_ELEMENT_CAST (queue), message);
+      GST_DEBUG_OBJECT (queue, "buffering %d percent", (gint) percent);
+      message = gst_message_new_buffering (GST_OBJECT_CAST (queue),
+          (gint) percent);
+      gst_message_set_buffering_stats (message, mode,
+          queue->byte_in_rate, queue->byte_out_rate, buffering_left);
 
+      gst_element_post_message (GST_ELEMENT_CAST (queue), message);
+    }
   } else {
     GST_DEBUG_OBJECT (queue, "filled %d percent", (gint) percent);
   }
@@ -837,6 +955,7 @@ reset_rate_timer (GstQueue2 * queue)
   queue->bytes_in = 0;
   queue->bytes_out = 0;
   queue->byte_in_rate = 0.0;
+  queue->byte_in_period = 0;
   queue->byte_out_rate = 0.0;
   queue->last_in_elapsed = 0.0;
   queue->last_out_elapsed = 0.0;
@@ -849,8 +968,11 @@ reset_rate_timer (GstQueue2 * queue)
 /* Tuning for rate estimation. We use a large window for the input rate because
  * it should be stable when connected to a network. The output rate is less
  * stable (the elements preroll, queues behind a demuxer fill, ...) and should
- * therefore adapt more quickly. */
-#define AVG_IN(avg,val)  ((avg) * 15.0 + (val)) / 16.0
+ * therefore adapt more quickly.
+ * However, initial input rate may be subject to a burst, and should therefore
+ * initially also adapt more quickly to changes, and only later on give higher
+ * weight to previous values. */
+#define AVG_IN(avg,val,w1,w2)  ((avg) * (w1) + (val) * (w2)) / ((w1) + (w2))
 #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
 
 static void
@@ -872,14 +994,20 @@ update_in_rates (GstQueue2 * queue)
     period = elapsed - queue->last_in_elapsed;
 
     GST_DEBUG_OBJECT (queue,
-        "rates: period %f, in %" G_GUINT64_FORMAT, period, queue->bytes_in);
+        "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
+        period, queue->bytes_in, queue->byte_in_period);
 
     byte_in_rate = queue->bytes_in / period;
 
     if (queue->byte_in_rate == 0.0)
       queue->byte_in_rate = byte_in_rate;
     else
-      queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate);
+      queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
+          (double) queue->byte_in_period, period);
+
+    /* another data point, cap at 16 for long time running average */
+    if (queue->byte_in_period < 16 * RATE_INTERVAL)
+      queue->byte_in_period += period;
 
     /* reset the values to calculate rate over the next interval */
     queue->last_in_elapsed = elapsed;
@@ -934,101 +1062,6 @@ update_out_rates (GstQueue2 * queue)
       queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
 }
 
-static void
-update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
-{
-  guint64 max_reading_pos, writing_pos;
-
-  writing_pos = range->writing_pos;
-  max_reading_pos = range->max_reading_pos;
-
-  if (writing_pos > max_reading_pos)
-    queue->cur_level.bytes = writing_pos - max_reading_pos;
-  else
-    queue->cur_level.bytes = 0;
-}
-
-#ifdef HAVE_FSEEKO
-#define FSEEK_FILE(file, offset)  (fseeko (file, (off_t) offset, SEEK_SET))
-#elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
-#define FSEEK_FILE(file, offset)  (lseek (fileno (file), (off_t) offset, SEEK_SET))
-#else
-#define FSEEK_FILE(file, offset)  (fseek (file, offset, SEEK_SET))
-#endif
-
-static gboolean
-gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
-{
-  guint size;
-  guint8 *data;
-  guint64 writing_pos, max_reading_pos;
-  GstQueue2Range *next;
-
-  writing_pos = queue->current->writing_pos;
-  max_reading_pos = queue->current->max_reading_pos;
-
-  FSEEK_FILE (queue->temp_file, writing_pos);
-
-  data = GST_BUFFER_DATA (buffer);
-  size = GST_BUFFER_SIZE (buffer);
-
-  if (fwrite (data, size, 1, queue->temp_file) != 1)
-    goto handle_error;
-
-  writing_pos += size;
-
-  GST_INFO_OBJECT (queue,
-      "writing %" G_GUINT64_FORMAT ", max_reading %" G_GUINT64_FORMAT,
-      writing_pos, max_reading_pos);
-
-  if (writing_pos > max_reading_pos)
-    queue->cur_level.bytes = writing_pos - max_reading_pos;
-  else
-    queue->cur_level.bytes = 0;
-
-  /* try to merge with next range */
-  while ((next = queue->current->next)) {
-    GST_INFO_OBJECT (queue,
-        "checking merge with next range %" G_GUINT64_FORMAT " < %"
-        G_GUINT64_FORMAT, writing_pos, next->offset);
-    if (writing_pos < next->offset)
-      break;
-
-    GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
-        next->writing_pos);
-
-    /* remove the group, we could choose to not read the data in this range
-     * again. This would involve us doing a seek to the current writing position
-     * in the range. FIXME, It would probably make sense to do a seek when there
-     * is a lot of data in the range we merged with to avoid reading it all
-     * again. */
-    queue->current->next = next->next;
-    g_slice_free (GstQueue2Range, next);
-
-    debug_ranges (queue);
-  }
-  queue->current->writing_pos = writing_pos;
-
-  return TRUE;
-
-  /* ERRORS */
-handle_error:
-  {
-    switch (errno) {
-      case ENOSPC:{
-        GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
-        break;
-      }
-      default:{
-        GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
-            (_("Error while writing to download file.")),
-            ("%s", g_strerror (errno)));
-      }
-    }
-    return FALSE;
-  }
-}
-
 static void
 update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
 {
@@ -1039,6 +1072,9 @@ update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
 
   max_reading_pos = MAX (max_reading_pos, reading_pos);
 
+  GST_DEBUG_OBJECT (queue,
+      "updating max_reading_pos from %" G_GUINT64_FORMAT " to %"
+      G_GUINT64_FORMAT, range->max_reading_pos, max_reading_pos);
   range->max_reading_pos = max_reading_pos;
 
   update_cur_level (queue, range);
@@ -1050,6 +1086,8 @@ perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
   GstEvent *event;
   gboolean res;
 
+  GST_QUEUE2_MUTEX_UNLOCK (queue);
+
   GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
 
   event =
@@ -1057,10 +1095,12 @@ perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
       GST_SEEK_TYPE_NONE, -1);
 
-  GST_QUEUE2_MUTEX_UNLOCK (queue);
   res = gst_pad_push_event (queue->sinkpad, event);
   GST_QUEUE2_MUTEX_LOCK (queue);
 
+  if (res)
+    queue->current = add_range (queue, offset);
+
   return res;
 }
 
@@ -1073,36 +1113,45 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
   GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
       offset, length);
 
-  if ((range = find_range (queue, offset, length))) {
+  if ((range = find_range (queue, offset))) {
     if (queue->current != range) {
       GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
       perform_seek_to_offset (queue, range->writing_pos);
     }
 
-    /* update the current reading position in the range */
-    update_cur_pos (queue, queue->current, offset + length);
-
-    GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %u)",
-        queue->cur_level.bytes, MIN (queue->max_level.bytes,
-            queue->ring_buffer_max_size));
+    GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
+        queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
 
     /* we have a range for offset */
     GST_DEBUG_OBJECT (queue,
         "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
         G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
 
-    if (queue->is_eos)
+    if (!QUEUE_IS_USING_RING_BUFFER (queue) && queue->is_eos)
       return TRUE;
 
-    if (offset + length < range->writing_pos)
+    if (offset + length <= range->writing_pos)
       return TRUE;
+    else
+      GST_DEBUG_OBJECT (queue,
+          "Need more data (%" G_GUINT64_FORMAT " bytes more)",
+          (offset + length) - range->writing_pos);
 
   } else {
     GST_INFO_OBJECT (queue, "not found in any range");
     /* we don't have the range, see how far away we are, FIXME, find a good
-     * threshold based on the incomming rate. */
+     * threshold based on the incoming rate. */
     if (!queue->is_eos && queue->current) {
-      if (offset < queue->current->writing_pos + 200000) {
+      if (QUEUE_IS_USING_RING_BUFFER (queue)) {
+        if (offset < queue->current->offset || offset >
+            queue->current->writing_pos + QUEUE_MAX_BYTES (queue) -
+            queue->cur_level.bytes) {
+          perform_seek_to_offset (queue, offset);
+        } else {
+          GST_INFO_OBJECT (queue,
+              "requested data is within range, wait for data");
+        }
+      } else if (offset < queue->current->writing_pos + 200000) {
         update_cur_pos (queue, queue->current, offset + length);
         GST_INFO_OBJECT (queue, "wait for data");
         return FALSE;
@@ -1116,31 +1165,41 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
   return FALSE;
 }
 
-static GstFlowReturn
+#ifdef HAVE_FSEEKO
+#define FSEEK_FILE(file,offset)  (fseeko (file, (off_t) offset, SEEK_SET) != 0)
+#elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
+#define FSEEK_FILE(file,offset)  (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
+#else
+#define FSEEK_FILE(file,offset)  (fseek (file, offset, SEEK_SET) != 0)
+#endif
+
+static gint64
 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
     guint8 * dst)
 {
+  guint8 *ring_buffer;
   size_t res;
 
-#ifdef HAVE_FSEEKO
-  if (fseeko (queue->temp_file, (off_t) offset, SEEK_SET) != 0)
-    goto seek_failed;
-#elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
-  if (lseek (fileno (queue->temp_file), (off_t) offset,
-          SEEK_SET) == (off_t) - 1)
-    goto seek_failed;
-#else
-  if (fseek (queue->temp_file, (long) offset, SEEK_SET) != 0)
+  ring_buffer = queue->ring_buffer;
+
+  if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset))
     goto seek_failed;
-#endif
 
   /* this should not block */
   GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
       length, offset);
-  res = fread (dst, 1, length, queue->temp_file);
+  if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+    res = fread (dst, 1, length, queue->temp_file);
+  } else {
+    memcpy (dst, ring_buffer + offset, length);
+    res = length;
+  }
+
   GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
 
   if (G_UNLIKELY (res < length)) {
+    if (!QUEUE_IS_USING_TEMP_FILE (queue))
+      goto could_not_read;
     /* check for errors or EOF */
     if (ferror (queue->temp_file))
       goto could_not_read;
@@ -1148,11 +1207,11 @@ gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
       goto eos;
   }
 
-  return GST_FLOW_OK;
+  return res;
 
 seek_failed:
   {
-    GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+    GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
     return GST_FLOW_ERROR;
   }
 could_not_read:
@@ -1171,53 +1230,127 @@ static GstFlowReturn
 gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
     GstBuffer ** buffer)
 {
-  GstFlowReturn flow_ret;
   GstBuffer *buf;
   guint8 *data;
   guint64 file_offset;
-  guint block_length;
+  guint block_length, remaining, read_length;
+  gint64 read_return;
+  guint64 rb_size;
+  guint64 rpos;
 
-  /* check if we have enough data at @offset. If there is not enough data, we
-   * block and wait. */
-  while (!gst_queue2_have_data (queue, offset, length)) {
-    GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
-  }
+  /* allocate the output buffer of the requested size */
+  buf = gst_buffer_new_and_alloc (length);
+  data = GST_BUFFER_DATA (buf);
 
-  if (QUEUE_IS_USING_RING_BUFFER (queue)) {
-    file_offset =
-        (queue->current->rb_offset + (offset -
-            queue->current->offset)) % queue->ring_buffer_max_size;
-    if (file_offset + length > queue->ring_buffer_max_size) {
-      block_length = queue->ring_buffer_max_size - file_offset;
+  GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
+      offset);
+
+  rpos = offset;
+  rb_size = queue->ring_buffer_max_size;
+
+  remaining = length;
+  while (remaining > 0) {
+    /* configure how much/whether to read */
+    if (!gst_queue2_have_data (queue, rpos, remaining)) {
+      read_length = 0;
+
+      if (QUEUE_IS_USING_RING_BUFFER (queue)) {
+        guint64 level;
+
+        /* calculate how far away the offset is */
+        if (queue->current->writing_pos > rpos)
+          level = queue->current->writing_pos - rpos;
+        else
+          level = 0;
+
+        GST_DEBUG_OBJECT (queue,
+            "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
+            ", level %" G_GUINT64_FORMAT,
+            rpos, queue->current->writing_pos, level);
+
+        if (level >= rb_size) {
+          /* we don't have the data but if we have a ring buffer that is full, we
+           * need to read */
+          GST_DEBUG_OBJECT (queue,
+              "ring buffer full, reading ring-buffer-max-size %"
+              G_GUINT64_FORMAT " bytes", rb_size);
+          read_length = rb_size;
+        } else if (queue->is_eos) {
+          /* won't get any more data so read any data we have */
+          if (level) {
+            GST_DEBUG_OBJECT (queue,
+                "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
+                level);
+            read_length = level;
+          } else {
+            GST_DEBUG_OBJECT (queue,
+                "EOS hit and we don't have any requested data");
+            gst_buffer_unref (buf);
+            return GST_FLOW_UNEXPECTED;
+          }
+        }
+      }
+
+      if (read_length == 0) {
+        if (QUEUE_IS_USING_RING_BUFFER (queue)
+            && queue->current->max_reading_pos > rpos) {
+          /* protect cached data (data between offset and max_reading_pos)
+           * and update current level */
+          GST_DEBUG_OBJECT (queue,
+              "protecting cached data [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
+              "]", rpos, queue->current->max_reading_pos);
+          queue->current->max_reading_pos = rpos;
+          update_cur_level (queue, queue->current);
+        }
+        GST_DEBUG_OBJECT (queue, "waiting for add");
+        GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
+        continue;
+      }
     } else {
-      block_length = length;
+      /* we have the requested data so read it */
+      read_length = remaining;
     }
-  } else {
-    file_offset = offset;
-    block_length = length;
-  }
 
-  buf = gst_buffer_new_and_alloc (length);
-  data = GST_BUFFER_DATA (buf);
+    /* set range reading_pos to actual reading position for this read */
+    queue->current->reading_pos = rpos;
+
+    /* configure how much and from where to read */
+    if (QUEUE_IS_USING_RING_BUFFER (queue)) {
+      file_offset =
+          (queue->current->rb_offset + (rpos -
+              queue->current->offset)) % rb_size;
+      if (file_offset + read_length > rb_size) {
+        block_length = rb_size - file_offset;
+      } else {
+        block_length = read_length;
+      }
+    } else {
+      file_offset = rpos;
+      block_length = read_length;
+    }
 
-  if ((flow_ret =
+    /* while we still have data to read, we loop */
+    while (read_length > 0) {
+      read_return =
           gst_queue2_read_data_at_offset (queue, file_offset, block_length,
-              data)) != GST_FLOW_OK) {
-    gst_buffer_unref (buf);
-    return flow_ret;
-  }
+          data);
+      if (read_return < 0)
+        goto read_error;
 
-  if (block_length < length) {
-    /* read second block into a second buffer, then merge the two */
-    data += block_length;
-    block_length = length - block_length;
+      file_offset += read_return;
+      if (QUEUE_IS_USING_RING_BUFFER (queue))
+        file_offset %= rb_size;
 
-    if ((flow_ret =
-            gst_queue2_read_data_at_offset (queue, 0, block_length,
-                data)) != GST_FLOW_OK) {
-      gst_buffer_unref (buf);
-      return flow_ret;
+      data += read_return;
+      read_length -= read_return;
+      block_length = read_length;
+      remaining -= read_return;
+
+      rpos = (queue->current->reading_pos += read_return);
+      update_cur_pos (queue, queue->current, queue->current->reading_pos);
     }
+    GST_QUEUE2_SIGNAL_DEL (queue);
+    GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
   }
 
   GST_BUFFER_SIZE (buf) = length;
@@ -1232,8 +1365,15 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
 out_flushing:
   {
     GST_DEBUG_OBJECT (queue, "we are flushing");
+    gst_buffer_unref (buf);
     return GST_FLOW_WRONG_STATE;
   }
+read_error:
+  {
+    GST_DEBUG_OBJECT (queue, "we have a read error");
+    gst_buffer_unref (buf);
+    return read_return;
+  }
 }
 
 /* should be called with QUEUE_LOCK */
@@ -1255,14 +1395,10 @@ gst_queue2_read_item_from_file (GstQueue2 * queue)
     ret =
         gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
         &buffer);
+
     switch (ret) {
       case GST_FLOW_OK:
         item = GST_MINI_OBJECT_CAST (buffer);
-        queue->current->reading_pos += DEFAULT_BUFFER_SIZE;
-        if (QUEUE_IS_USING_RING_BUFFER (queue))
-          queue->current->rb_reading_pos =
-              (queue->current->rb_reading_pos +
-              DEFAULT_BUFFER_SIZE) % queue->ring_buffer_max_size;
         break;
       case GST_FLOW_UNEXPECTED:
         item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
@@ -1275,6 +1411,8 @@ gst_queue2_read_item_from_file (GstQueue2 * queue)
   return item;
 }
 
+/* must be called with MUTEX_LOCK. Will briefly release the lock when notifying
+ * the temp filename. */
 static gboolean
 gst_queue2_open_temp_location_file (GstQueue2 * queue)
 {
@@ -1311,7 +1449,12 @@ gst_queue2_open_temp_location_file (GstQueue2 * queue)
     g_free (queue->temp_location);
     queue->temp_location = name;
 
+    GST_QUEUE2_MUTEX_UNLOCK (queue);
+
+    /* we can't emit the notify with the lock */
     g_object_notify (G_OBJECT (queue), "temp-location");
+
+    GST_QUEUE2_MUTEX_LOCK (queue);
   } else {
     /* open the file for update/writing, this is deprecated but we still need to
      * support it for API/ABI compatibility */
@@ -1322,8 +1465,6 @@ gst_queue2_open_temp_location_file (GstQueue2 * queue)
   }
   GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
 
-  init_ranges (queue);
-
   return TRUE;
 
   /* ERRORS */
@@ -1385,18 +1526,18 @@ gst_queue2_flush_temp_file (GstQueue2 * queue)
   GST_DEBUG_OBJECT (queue, "flushing temp file");
 
   queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
-
-  init_ranges (queue);
 }
 
 static void
 gst_queue2_locked_flush (GstQueue2 * queue)
 {
-  if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) {
-    gst_queue2_flush_temp_file (queue);
+  if (!QUEUE_IS_USING_QUEUE (queue)) {
+    if (QUEUE_IS_USING_TEMP_FILE (queue))
+      gst_queue2_flush_temp_file (queue);
+    init_ranges (queue);
   } else {
-    while (!g_queue_is_empty (queue->queue)) {
-      GstMiniObject *data = g_queue_pop_head (queue->queue);
+    while (!g_queue_is_empty (&queue->queue)) {
+      GstMiniObject *data = g_queue_pop_head (&queue->queue);
 
       /* Then lose another reference because we are supposed to destroy that
          data when flushing */
@@ -1406,6 +1547,8 @@ gst_queue2_locked_flush (GstQueue2 * queue)
   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
+  queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
+  queue->sink_tainted = queue->src_tainted = TRUE;
   if (queue->starting_segment != NULL)
     gst_event_unref (queue->starting_segment);
   queue->starting_segment = NULL;
@@ -1416,177 +1559,279 @@ gst_queue2_locked_flush (GstQueue2 * queue)
 }
 
 static gboolean
-gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
+gst_queue2_wait_free_space (GstQueue2 * queue)
 {
-  GstBuffer *buf, *rem;
-  guint buf_size, rem_size;
-  const guint rb_size = queue->ring_buffer_max_size;
-  guint8 *data;
-  guint64 writing_pos, reading_pos, new_writing_pos;
-  gint64 space;
-  GstQueue2Range *range, *prev;
-
-  writing_pos = queue->current->rb_writing_pos;
-  reading_pos = queue->current->rb_reading_pos;
-
-  rem = buffer;
-
-  /* loop if we can't write the whole buffer at once */
-  do {
-    /* calculate the space in the ring buffer not used by data from the
-     * current range */
-    space =
-        MIN (queue->max_level.bytes,
-        queue->ring_buffer_max_size) - queue->cur_level.bytes;
-
-    rem_size = GST_BUFFER_SIZE (rem);
-    /* don't try to process 0 size buffers */
-    if (!rem_size)
-      break;
-
-    /* calculate if we need to split or if we can write the entire buffer now */
-    if (rem_size > space) {
-      buf_size = space;
-      buf = gst_buffer_create_sub (rem, 0, space);
+  /* We make space available if we're "full" according to whatever
+   * the user defined as "full". */
+  if (gst_queue2_is_filled (queue)) {
+    gboolean started;
 
-      rem_size -= space;
-      rem = gst_buffer_create_sub (rem, space, rem_size);
-      space = 0;
-    } else {
-      buf_size = rem_size;
-      buf = rem;
+    /* pause the timer while we wait. The fact that we are waiting does not mean
+     * the byterate on the input pad is lower */
+    if ((started = queue->in_timer_started))
+      g_timer_stop (queue->in_timer);
 
-      rem_size = 0;
-      rem = NULL;
-      space -= buf_size;
+    GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
+        "queue is full, waiting for free space");
+    do {
+      /* Wait for space to be available, we could be unlocked because of a flush. */
+      GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
     }
+    while (gst_queue2_is_filled (queue));
 
-    data = GST_BUFFER_DATA (buf);
+    /* and continue if we were running before */
+    if (started)
+      g_timer_continue (queue->in_timer);
+  }
+  return TRUE;
 
-    /* the writing position in the ring buffer after writing (part or all of)
-     * the buffer */
-    new_writing_pos = (writing_pos + buf_size) % rb_size;
+  /* ERRORS */
+out_flushing:
+  {
+    GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is flushing");
+    return FALSE;
+  }
+}
 
-    prev = NULL;
-    range = queue->ranges;
+static gboolean
+gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
+{
+  guint8 *data, *ring_buffer;
+  guint size, rb_size;
+  guint64 writing_pos, new_writing_pos;
+  GstQueue2Range *range, *prev, *next;
 
-    /* if we need to overwrite data in the ring buffer, we need to update the
-     * ranges
-     * warning: this code is complicated and includes some simplifications -
-     * pen, paper and diagrams for the cases recommended! */
-    while (range) {
-      guint64 range_data_start, range_data_end;
-      GstQueue2Range *range_to_destroy = NULL;
+  if (QUEUE_IS_USING_RING_BUFFER (queue))
+    writing_pos = queue->current->rb_writing_pos;
+  else
+    writing_pos = queue->current->writing_pos;
+  ring_buffer = queue->ring_buffer;
+  rb_size = queue->ring_buffer_max_size;
 
-      /* we don't edit the current range here */
-      if (range == queue->current)
-        goto next_range;
+  size = GST_BUFFER_SIZE (buffer);
+  data = GST_BUFFER_DATA (buffer);
 
-      range_data_start = range->rb_offset;
-      range_data_end = range->rb_writing_pos;
+  GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
+      GST_BUFFER_OFFSET (buffer));
 
-      if (range_data_end > range_data_start) {
-        if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
-          goto next_range;
+  while (size > 0) {
+    guint to_write;
 
-        if (new_writing_pos > range_data_start) {
-          if (new_writing_pos >= range_data_end) {
-            /* remove range */
-            range_to_destroy = range;
-            if (prev)
-              prev->next = range->next;
+    if (QUEUE_IS_USING_RING_BUFFER (queue)) {
+      gint64 space;
+
+      /* calculate the space in the ring buffer not used by data from
+       * the current range */
+      while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
+        /* wait until there is some free space */
+        GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
+      }
+      /* get the amount of space we have */
+      space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
+
+      /* calculate if we need to split or if we can write the entire
+       * buffer now */
+      to_write = MIN (size, space);
+
+      /* the writing position in the ring buffer after writing (part
+       * or all of) the buffer */
+      new_writing_pos = (writing_pos + to_write) % rb_size;
+
+      prev = NULL;
+      range = queue->ranges;
+
+      /* if we need to overwrite data in the ring buffer, we need to
+       * update the ranges
+       *
+       * warning: this code is complicated and includes some
+       * simplifications - pen, paper and diagrams for the cases
+       * recommended! */
+      while (range) {
+        guint64 range_data_start, range_data_end;
+        GstQueue2Range *range_to_destroy = NULL;
+
+        range_data_start = range->rb_offset;
+        range_data_end = range->rb_writing_pos;
+
+        /* handle the special case where the range has no data in it */
+        if (range->writing_pos == range->offset) {
+          if (range != queue->current) {
+            GST_DEBUG_OBJECT (queue,
+                "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
+                G_GUINT64_FORMAT, range->offset, range->writing_pos);
+            /* remove range */
+            range_to_destroy = range;
+            if (prev)
+              prev->next = range->next;
+          }
+          goto next_range;
+        }
+
+        if (range_data_end > range_data_start) {
+          if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
+            goto next_range;
+
+          if (new_writing_pos > range_data_start) {
+            if (new_writing_pos >= range_data_end) {
+              GST_DEBUG_OBJECT (queue,
+                  "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
+                  G_GUINT64_FORMAT, range->offset, range->writing_pos);
+              /* remove range */
+              range_to_destroy = range;
+              if (prev)
+                prev->next = range->next;
+            } else {
+              GST_DEBUG_OBJECT (queue,
+                  "advancing offsets from %" G_GUINT64_FORMAT " (%"
+                  G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
+                  G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
+                  range->offset + new_writing_pos - range_data_start,
+                  new_writing_pos);
+              range->offset += (new_writing_pos - range_data_start);
+              range->rb_offset = new_writing_pos;
+            }
+          }
+        } else {
+          guint64 new_wpos_virt = writing_pos + to_write;
+
+          if (new_wpos_virt <= range_data_start)
+            goto next_range;
+
+          if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
+            GST_DEBUG_OBJECT (queue,
+                "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
+                G_GUINT64_FORMAT, range->offset, range->writing_pos);
+            /* remove range */
+            range_to_destroy = range;
+            if (prev)
+              prev->next = range->next;
           } else {
-            range->offset += (new_writing_pos - range_data_start);
+            GST_DEBUG_OBJECT (queue,
+                "advancing offsets from %" G_GUINT64_FORMAT " (%"
+                G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
+                G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
+                range->offset + new_writing_pos - range_data_start,
+                new_writing_pos);
+            range->offset += (new_wpos_virt - range_data_start);
             range->rb_offset = new_writing_pos;
           }
         }
-      } else {
-        guint64 new_wpos_virt = writing_pos + buf_size;
 
-        if (new_wpos_virt <= range_data_start)
-          goto next_range;
+      next_range:
+        if (!range_to_destroy)
+          prev = range;
 
-        if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
-          /* remove range */
-          range_to_destroy = range;
-          if (prev)
-            prev->next = range->next;
-        } else {
-          range->offset += (new_wpos_virt - range_data_start);
-          range->rb_offset = new_writing_pos;
+        range = range->next;
+        if (range_to_destroy) {
+          if (range_to_destroy == queue->ranges)
+            queue->ranges = range;
+          g_slice_free (GstQueue2Range, range_to_destroy);
+          range_to_destroy = NULL;
         }
       }
+    } else {
+      to_write = size;
+      new_writing_pos = writing_pos + to_write;
+    }
 
-    next_range:
-      if (!range_to_destroy)
-        prev = range;
+    if (QUEUE_IS_USING_TEMP_FILE (queue)
+        && FSEEK_FILE (queue->temp_file, writing_pos))
+      goto seek_failed;
 
-      range = range->next;
-      if (range_to_destroy) {
-        if (range_to_destroy == queue->ranges)
-          queue->ranges = range;
-        g_slice_free1 (sizeof (GstQueue2Range), range_to_destroy);
-        range_to_destroy = NULL;
+    if (new_writing_pos > writing_pos) {
+      GST_INFO_OBJECT (queue,
+          "writing %u bytes to range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
+          "] (rb wpos %" G_GUINT64_FORMAT ")", to_write, queue->current->offset,
+          queue->current->writing_pos, queue->current->rb_writing_pos);
+      /* either not using ring buffer or no wrapping, just write */
+      if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+        if (fwrite (data, to_write, 1, queue->temp_file) != 1)
+          goto handle_error;
+      } else {
+        memcpy (ring_buffer + writing_pos, data, to_write);
       }
-    }
 
-    FSEEK_FILE (queue->temp_file, writing_pos);
+      if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
+        /* try to merge with next range */
+        while ((next = queue->current->next)) {
+          GST_INFO_OBJECT (queue,
+              "checking merge with next range %" G_GUINT64_FORMAT " < %"
+              G_GUINT64_FORMAT, new_writing_pos, next->offset);
+          if (new_writing_pos < next->offset)
+            break;
 
-    if (new_writing_pos > writing_pos) {
-      /* no wrapping, just write */
-      if (fwrite (data, buf_size, 1, queue->temp_file) != 1)
-        goto handle_error;
+          GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
+              next->writing_pos);
+
+          /* remove the group, we could choose to not read the data in this range
+           * again. This would involve us doing a seek to the current writing position
+           * in the range. FIXME, It would probably make sense to do a seek when there
+           * is a lot of data in the range we merged with to avoid reading it all
+           * again. */
+          queue->current->next = next->next;
+          g_slice_free (GstQueue2Range, next);
+
+          debug_ranges (queue);
+        }
+        goto update_and_signal;
+      }
     } else {
       /* wrapping */
       guint block_one, block_two;
 
       block_one = rb_size - writing_pos;
-      block_two = buf_size - block_one;
-
-      /* write data to end of ring buffer */
-      if (fwrite (data, block_one, 1, queue->temp_file) != 1)
-        goto handle_error;
+      block_two = to_write - block_one;
+
+      if (block_one > 0) {
+        GST_INFO_OBJECT (queue, "writing %u bytes", block_one);
+        /* write data to end of ring buffer */
+        if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+          if (fwrite (data, block_one, 1, queue->temp_file) != 1)
+            goto handle_error;
+        } else {
+          memcpy (ring_buffer + writing_pos, data, block_one);
+        }
+      }
 
-      FSEEK_FILE (queue->temp_file, 0);
+      if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, 0))
+        goto seek_failed;
 
-      data += block_one;
-      if (fwrite (data, block_two, 1, queue->temp_file) != 1)
-        goto handle_error;
+      if (block_two > 0) {
+        GST_INFO_OBJECT (queue, "writing %u bytes", block_two);
+        if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+          if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
+            goto handle_error;
+        } else {
+          memcpy (ring_buffer, data + block_one, block_two);
+        }
+      }
     }
 
+  update_and_signal:
     /* update the writing positions */
-    GST_INFO_OBJECT (queue, "wrote %u bytes to %" G_GUINT64_FORMAT, buf_size,
-        writing_pos);
-    queue->current->writing_pos += buf_size;
-    queue->current->rb_writing_pos = writing_pos = new_writing_pos;
+    size -= to_write;
+    GST_INFO_OBJECT (queue,
+        "wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)",
+        to_write, writing_pos, size);
 
+    if (QUEUE_IS_USING_RING_BUFFER (queue)) {
+      data += to_write;
+      queue->current->writing_pos += to_write;
+      queue->current->rb_writing_pos = writing_pos = new_writing_pos;
+    } else {
+      queue->current->writing_pos = writing_pos = new_writing_pos;
+    }
     update_cur_level (queue, queue->current);
-    GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %u)",
-        queue->cur_level.bytes, MIN (queue->max_level.bytes,
-            queue->ring_buffer_max_size));
-
-    /* if we have a remainder of the buffer data, wait until there's space to
-     * write before looping */
-    if (rem_size) {
-      gboolean started;
-
-      /* pause the timer while we wait. The fact that we are waiting does not mean
-       * the byterate on the input pad is lower */
-      if ((started = queue->in_timer_started))
-        g_timer_stop (queue->in_timer);
-
-      GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
-          "queue is full, waiting for free space");
-      do {
-        /* Wait for space to be available, we could be unlocked because of a flush. */
-        GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
-      } while (gst_queue2_is_filled (queue));
 
-      /* and continue if we were running before */
-      if (started)
-        g_timer_continue (queue->in_timer);
-    }
-  } while (rem_size);
+    /* update the buffering status */
+    if (queue->use_buffering)
+      update_buffering (queue);
+
+    GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
+        queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
+
+    GST_QUEUE2_SIGNAL_ADD (queue);
+  };
 
   return TRUE;
 
@@ -1597,6 +1842,11 @@ out_flushing:
     /* FIXME - GST_FLOW_UNEXPECTED ? */
     return FALSE;
   }
+seek_failed:
+  {
+    GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
+    return FALSE;
+  }
 handle_error:
   {
     switch (errno) {
@@ -1614,11 +1864,41 @@ handle_error:
   }
 }
 
+static GstBufferListItem
+buffer_list_create_write (GstBuffer ** buf, guint group, guint idx, gpointer q)
+{
+  GstQueue2 *queue = q;
+
+  GST_TRACE_OBJECT (queue, "writing buffer %u in group %u of size %u bytes",
+      idx, group, GST_BUFFER_SIZE (*buf));
+
+  if (!gst_queue2_create_write (queue, *buf)) {
+    GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out");
+    return GST_BUFFER_LIST_END;
+  }
+
+  return GST_BUFFER_LIST_CONTINUE;
+}
+
+static GstBufferListItem
+buffer_list_calc_size (GstBuffer ** buf, guint group, guint idx, gpointer data)
+{
+  guint *p_size = data;
+  guint buf_size;
+
+  buf_size = GST_BUFFER_SIZE (*buf);
+  GST_TRACE ("buffer %u in group %u has size %u", idx, group, buf_size);
+  *p_size += buf_size;
+
+  return GST_BUFFER_LIST_CONTINUE;
+}
+
 /* enqueue an item an update the level stats */
 static void
-gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
+gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
+    GstQueue2ItemType item_type)
 {
-  if (GST_IS_BUFFER (item)) {
+  if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
     GstBuffer *buffer;
     guint size;
 
@@ -1626,26 +1906,47 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
     size = GST_BUFFER_SIZE (buffer);
 
     /* add buffer to the statistics */
-    if (!(QUEUE_IS_USING_TEMP_FILE (queue)
-            || QUEUE_IS_USING_RING_BUFFER (queue))) {
+    if (QUEUE_IS_USING_QUEUE (queue)) {
       queue->cur_level.buffers++;
       queue->cur_level.bytes += size;
     }
     queue->bytes_in += size;
 
     /* apply new buffer to segment stats */
-    apply_buffer (queue, buffer, &queue->sink_segment);
+    apply_buffer (queue, buffer, &queue->sink_segment, TRUE);
     /* update the byterate stats */
     update_in_rates (queue);
 
-    /* FIXME - check return values? */
-    if (QUEUE_IS_USING_RING_BUFFER (queue)) {
-      gst_queue2_write_buffer_to_ring_buffer (queue, buffer);
-    } else if (QUEUE_IS_USING_TEMP_FILE (queue)) {
-      gst_queue2_write_buffer_to_file (queue, buffer);
+    if (!QUEUE_IS_USING_QUEUE (queue)) {
+      /* FIXME - check return value? */
+      gst_queue2_create_write (queue, buffer);
     }
+  } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
+    GstBufferList *buffer_list;
+    guint size = 0;
 
-  } else if (GST_IS_EVENT (item)) {
+    buffer_list = GST_BUFFER_LIST_CAST (item);
+
+    gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
+    GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
+
+    /* add buffer to the statistics */
+    if (QUEUE_IS_USING_QUEUE (queue)) {
+      queue->cur_level.buffers++;
+      queue->cur_level.bytes += size;
+    }
+    queue->bytes_in += size;
+
+    /* apply new buffer to segment stats */
+    apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
+
+    /* update the byterate stats */
+    update_in_rates (queue);
+
+    if (!QUEUE_IS_USING_QUEUE (queue)) {
+      gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
+    }
+  } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
     GstEvent *event;
 
     event = GST_EVENT_CAST (item);
@@ -1658,11 +1959,10 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
         queue->is_eos = TRUE;
         break;
       case GST_EVENT_NEWSEGMENT:
-        apply_segment (queue, event, &queue->sink_segment);
+        apply_segment (queue, event, &queue->sink_segment, TRUE);
         /* This is our first new segment, we hold it
          * as we can't save it on the temp file */
-        if (QUEUE_IS_USING_RING_BUFFER (queue)
-            || QUEUE_IS_USING_TEMP_FILE (queue)) {
+        if (!QUEUE_IS_USING_QUEUE (queue)) {
           if (queue->segment_event_received)
             goto unexpected_event;
 
@@ -1677,8 +1977,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
         queue->unexpected = FALSE;
         break;
       default:
-        if (QUEUE_IS_USING_RING_BUFFER (queue)
-            || QUEUE_IS_USING_TEMP_FILE (queue))
+        if (!QUEUE_IS_USING_QUEUE (queue))
           goto unexpected_event;
         break;
     }
@@ -1691,13 +1990,14 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
 
   if (item) {
     /* update the buffering status */
-    update_buffering (queue);
+    if (queue->use_buffering)
+      update_buffering (queue);
 
-    if (!(QUEUE_IS_USING_TEMP_FILE (queue)
-            || QUEUE_IS_USING_RING_BUFFER (queue)))
-      g_queue_push_tail (queue->queue, item);
-    else
+    if (QUEUE_IS_USING_QUEUE (queue)) {
+      g_queue_push_tail (&queue->queue, item);
+    } else {
       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
+    }
 
     GST_QUEUE2_SIGNAL_ADD (queue);
   }
@@ -1718,14 +2018,14 @@ unexpected_event:
 
 /* dequeue an item from the queue and update level stats */
 static GstMiniObject *
-gst_queue2_locked_dequeue (GstQueue2 * queue)
+gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
 {
   GstMiniObject *item;
 
-  if (QUEUE_IS_USING_TEMP_FILE (queue) || QUEUE_IS_USING_RING_BUFFER (queue))
+  if (!QUEUE_IS_USING_QUEUE (queue))
     item = gst_queue2_read_item_from_file (queue);
   else
-    item = g_queue_pop_head (queue->queue);
+    item = g_queue_pop_head (&queue->queue);
 
   if (item == NULL)
     goto no_item;
@@ -1736,26 +2036,29 @@ gst_queue2_locked_dequeue (GstQueue2 * queue)
 
     buffer = GST_BUFFER_CAST (item);
     size = GST_BUFFER_SIZE (buffer);
+    *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER;
 
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "retrieved buffer %p from queue", buffer);
 
-    if (!(QUEUE_IS_USING_TEMP_FILE (queue)
-            || QUEUE_IS_USING_RING_BUFFER (queue))) {
+    if (QUEUE_IS_USING_QUEUE (queue)) {
       queue->cur_level.buffers--;
       queue->cur_level.bytes -= size;
     }
     queue->bytes_out += size;
 
-    apply_buffer (queue, buffer, &queue->src_segment);
+    apply_buffer (queue, buffer, &queue->src_segment, FALSE);
     /* update the byterate stats */
     update_out_rates (queue);
     /* update the buffering */
-    update_buffering (queue);
+    if (queue->use_buffering)
+      update_buffering (queue);
 
   } else if (GST_IS_EVENT (item)) {
     GstEvent *event = GST_EVENT_CAST (item);
 
+    *item_type = GST_QUEUE2_ITEM_TYPE_EVENT;
+
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "retrieved event %p from queue", event);
 
@@ -1765,16 +2068,41 @@ gst_queue2_locked_dequeue (GstQueue2 * queue)
         GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
         break;
       case GST_EVENT_NEWSEGMENT:
-        apply_segment (queue, event, &queue->src_segment);
+        apply_segment (queue, event, &queue->src_segment, FALSE);
         break;
       default:
         break;
     }
+  } else if (GST_IS_BUFFER_LIST (item)) {
+    GstBufferList *buffer_list;
+    guint size = 0;
+
+    buffer_list = GST_BUFFER_LIST_CAST (item);
+    gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size);
+    *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
+
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "retrieved buffer list %p from queue", buffer_list);
+
+    if (QUEUE_IS_USING_QUEUE (queue)) {
+      queue->cur_level.buffers--;
+      queue->cur_level.bytes -= size;
+    }
+    queue->bytes_out += size;
+
+    apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
+    /* update the byterate stats */
+    update_out_rates (queue);
+    /* update the buffering */
+    if (queue->use_buffering)
+      update_buffering (queue);
+
   } else {
     g_warning
         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
         item, GST_OBJECT_NAME (queue));
     item = NULL;
+    *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN;
   }
   GST_QUEUE2_SIGNAL_DEL (queue);
 
@@ -1799,8 +2127,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
     case GST_EVENT_FLUSH_START:
     {
       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
-      if (!(QUEUE_IS_USING_RING_BUFFER (queue)
-              || QUEUE_IS_USING_TEMP_FILE (queue))) {
+      if (QUEUE_IS_USING_QUEUE (queue)) {
         /* forward event */
         gst_pad_push_event (queue->srcpad, event);
 
@@ -1817,6 +2144,14 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
          * flush_start downstream. */
         gst_pad_pause_task (queue->srcpad);
         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
+      } else {
+        GST_QUEUE2_MUTEX_LOCK (queue);
+        /* flush the sink pad */
+        queue->sinkresult = GST_FLOW_WRONG_STATE;
+        GST_QUEUE2_SIGNAL_DEL (queue);
+        GST_QUEUE2_MUTEX_UNLOCK (queue);
+
+        gst_event_unref (event);
       }
       goto done;
     }
@@ -1824,8 +2159,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
     {
       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
 
-      if (!(QUEUE_IS_USING_RING_BUFFER (queue)
-              || QUEUE_IS_USING_TEMP_FILE (queue))) {
+      if (QUEUE_IS_USING_QUEUE (queue)) {
         /* forward event */
         gst_pad_push_event (queue->srcpad, event);
 
@@ -1845,7 +2179,10 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
         queue->segment_event_received = FALSE;
         queue->is_eos = FALSE;
         queue->unexpected = FALSE;
+        queue->sinkresult = GST_FLOW_OK;
         GST_QUEUE2_MUTEX_UNLOCK (queue);
+
+        gst_event_unref (event);
       }
       goto done;
     }
@@ -1856,7 +2193,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
         /* refuse more events on EOS */
         if (queue->is_eos)
           goto out_eos;
-        gst_queue2_locked_enqueue (queue, event);
+        gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
         GST_QUEUE2_MUTEX_UNLOCK (queue);
       } else {
         /* non-serialized events are passed upstream. */
@@ -1891,10 +2228,10 @@ gst_queue2_is_empty (GstQueue2 * queue)
   if (queue->is_eos)
     return FALSE;
 
-  if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) {
+  if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
     return queue->current->writing_pos <= queue->current->max_reading_pos;
   } else {
-    if (queue->queue->length == 0)
+    if (queue->queue.length == 0)
       return TRUE;
   }
 
@@ -1910,16 +2247,18 @@ gst_queue2_is_filled (GstQueue2 * queue)
   if (queue->is_eos)
     return TRUE;
 
+#define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && \
+    (queue->cur_level.format) >= ((alt_max) ? \
+      MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format)))
+
   /* if using a ring buffer we're filled if all ring buffer space is used
    * _by the current range_ */
   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
-    guint max_bytes = queue->max_level.bytes;
     guint64 rb_size = queue->ring_buffer_max_size;
     GST_DEBUG_OBJECT (queue,
-        "max bytes %u, rb size %" G_GUINT64_FORMAT ", cur bytes %u", max_bytes,
-        rb_size, queue->cur_level.bytes);
-    return queue->cur_level.bytes >= (max_bytes ? MIN (max_bytes,
-            rb_size) : rb_size);
+        "max bytes %u, rb size %" G_GUINT64_FORMAT ", cur bytes %u",
+        queue->max_level.bytes, rb_size, queue->cur_level.bytes);
+    return CHECK_FILLED (bytes, rb_size);
   }
 
   /* if using file, we're never filled if we don't have EOS */
@@ -1930,34 +2269,23 @@ gst_queue2_is_filled (GstQueue2 * queue)
   if (queue->cur_level.buffers == 0)
     return FALSE;
 
-#define CHECK_FILLED(format) ((queue->max_level.format) > 0 && \
-               (queue->cur_level.format) >= (queue->max_level.format))
-
   /* we are filled if one of the current levels exceeds the max */
-  res = CHECK_FILLED (buffers) || CHECK_FILLED (bytes) || CHECK_FILLED (time);
+  res = CHECK_FILLED (buffers, 0) || CHECK_FILLED (bytes, 0)
+      || CHECK_FILLED (time, 0);
 
   /* if we need to, use the rate estimate to check against the max time we are
    * allowed to queue */
   if (queue->use_rate_estimate)
-    res |= CHECK_FILLED (rate_time);
+    res |= CHECK_FILLED (rate_time, 0);
 
 #undef CHECK_FILLED
   return res;
 }
 
 static GstFlowReturn
-gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
+gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
+    GstMiniObject * item, GstQueue2ItemType item_type)
 {
-  GstQueue2 *queue;
-
-  queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
-
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-      "received buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %"
-      GST_TIME_FORMAT, buffer, GST_BUFFER_SIZE (buffer),
-      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
-      GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
-
   /* we have to lock the queue since we span threads */
   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
   /* when we received EOS, we refuse more data */
@@ -1967,31 +2295,11 @@ gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
   if (queue->unexpected)
     goto out_unexpected;
 
-  /* We make space available if we're "full" according to whatever
-   * the user defined as "full". */
-  if (gst_queue2_is_filled (queue)) {
-    gboolean started;
-
-    /* pause the timer while we wait. The fact that we are waiting does not mean
-     * the byterate on the input pad is lower */
-    if ((started = queue->in_timer_started))
-      g_timer_stop (queue->in_timer);
-
-    GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
-        "queue is full, waiting for free space");
-    do {
-      /* Wait for space to be available, we could be unlocked because of a flush. */
-      GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
-    }
-    while (gst_queue2_is_filled (queue));
-
-    /* and continue if we were running before */
-    if (started)
-      g_timer_continue (queue->in_timer);
-  }
+  if (!gst_queue2_wait_free_space (queue))
+    goto out_flushing;
 
   /* put buffer in queue now */
-  gst_queue2_locked_enqueue (queue, buffer);
+  gst_queue2_locked_enqueue (queue, item, item_type);
   GST_QUEUE2_MUTEX_UNLOCK (queue);
 
   return GST_FLOW_OK;
@@ -2004,7 +2312,7 @@ out_flushing:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "exit because task paused, reason: %s", gst_flow_get_name (ret));
     GST_QUEUE2_MUTEX_UNLOCK (queue);
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (item);
 
     return ret;
   }
@@ -2012,7 +2320,7 @@ out_eos:
   {
     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
     GST_QUEUE2_MUTEX_UNLOCK (queue);
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (item);
 
     return GST_FLOW_UNEXPECTED;
   }
@@ -2021,12 +2329,89 @@ out_unexpected:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "exit because we received UNEXPECTED");
     GST_QUEUE2_MUTEX_UNLOCK (queue);
-    gst_buffer_unref (buffer);
+    gst_mini_object_unref (item);
 
     return GST_FLOW_UNEXPECTED;
   }
 }
 
+static GstFlowReturn
+gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
+{
+  GstQueue2 *queue;
+
+  queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
+
+  GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+      "received buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %"
+      GST_TIME_FORMAT, buffer, GST_BUFFER_SIZE (buffer),
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
+      GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
+
+  return gst_queue2_chain_buffer_or_buffer_list (queue,
+      GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
+}
+
+static GstFlowReturn
+gst_queue2_chain_list (GstPad * pad, GstBufferList * buffer_list)
+{
+  GstQueue2 *queue;
+
+  queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
+
+  GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+      "received buffer list %p", buffer_list);
+
+  return gst_queue2_chain_buffer_or_buffer_list (queue,
+      GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
+}
+
+static GstMiniObject *
+gst_queue2_dequeue_on_unexpected (GstQueue2 * queue,
+    GstQueue2ItemType * item_type)
+{
+  GstMiniObject *data;
+
+  GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got UNEXPECTED from downstream");
+
+  /* stop pushing buffers, we dequeue all items until we see an item that we
+   * can push again, which is EOS or NEWSEGMENT. If there is nothing in the
+   * queue we can push, we set a flag to make the sinkpad refuse more
+   * buffers with an UNEXPECTED return value until we receive something
+   * pushable again or we get flushed. */
+  while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
+    if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
+      GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+          "dropping UNEXPECTED buffer %p", data);
+      gst_buffer_unref (GST_BUFFER_CAST (data));
+    } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
+      GstEvent *event = GST_EVENT_CAST (data);
+      GstEventType type = GST_EVENT_TYPE (event);
+
+      if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) {
+        /* we found a pushable item in the queue, push it out */
+        GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+            "pushing pushable event %s after UNEXPECTED",
+            GST_EVENT_TYPE_NAME (event));
+        return data;
+      }
+      GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+          "dropping UNEXPECTED event %p", event);
+      gst_event_unref (event);
+    } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
+      GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+          "dropping UNEXPECTED buffer list %p", data);
+      gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
+    }
+  }
+  /* no more items in the queue. Set the unexpected flag so that upstream
+   * make us refuse any more buffers on the sinkpad. Since we will still
+   * accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the
+   * task function does not shut down. */
+  queue->unexpected = TRUE;
+  return NULL;
+}
+
 /* dequeue an item from the queue an push it downstream. This functions returns
  * the result of the push. */
 static GstFlowReturn
@@ -2034,21 +2419,22 @@ gst_queue2_push_one (GstQueue2 * queue)
 {
   GstFlowReturn result = GST_FLOW_OK;
   GstMiniObject *data;
+  GstQueue2ItemType item_type;
 
-  data = gst_queue2_locked_dequeue (queue);
+  data = gst_queue2_locked_dequeue (queue, &item_type);
   if (data == NULL)
     goto no_item;
 
 next:
-  if (GST_IS_BUFFER (data)) {
+  GST_QUEUE2_MUTEX_UNLOCK (queue);
+
+  if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
     GstBuffer *buffer;
     GstCaps *caps;
 
     buffer = GST_BUFFER_CAST (data);
     caps = GST_BUFFER_CAPS (buffer);
 
-    GST_QUEUE2_MUTEX_UNLOCK (queue);
-
     /* set caps before pushing the buffer so that core does not try to do
      * something fancy to check if this is possible. */
     if (caps && caps != GST_PAD_CAPS (queue->srcpad))
@@ -2059,56 +2445,54 @@ next:
     /* need to check for srcresult here as well */
     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
     if (result == GST_FLOW_UNEXPECTED) {
-      GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-          "got UNEXPECTED from downstream");
-      /* stop pushing buffers, we dequeue all items until we see an item that we
-       * can push again, which is EOS or NEWSEGMENT. If there is nothing in the
-       * queue we can push, we set a flag to make the sinkpad refuse more
-       * buffers with an UNEXPECTED return value until we receive something
-       * pushable again or we get flushed. */
-      while ((data = gst_queue2_locked_dequeue (queue))) {
-        if (GST_IS_BUFFER (data)) {
-          GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-              "dropping UNEXPECTED buffer %p", data);
-          gst_buffer_unref (GST_BUFFER_CAST (data));
-        } else if (GST_IS_EVENT (data)) {
-          GstEvent *event = GST_EVENT_CAST (data);
-          GstEventType type = GST_EVENT_TYPE (event);
-
-          if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) {
-            /* we found a pushable item in the queue, push it out */
-            GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-                "pushing pushable event %s after UNEXPECTED",
-                GST_EVENT_TYPE_NAME (event));
-            goto next;
-          }
-          GST_CAT_LOG_OBJECT (queue_dataflow, queue,
-              "dropping UNEXPECTED event %p", event);
-          gst_event_unref (event);
-        }
-      }
-      /* no more items in the queue. Set the unexpected flag so that upstream
-       * make us refuse any more buffers on the sinkpad. Since we will still
-       * accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the
-       * task function does not shut down. */
-      queue->unexpected = TRUE;
+      data = gst_queue2_dequeue_on_unexpected (queue, &item_type);
+      if (data != NULL)
+        goto next;
+      /* Since we will still accept EOS and NEWSEGMENT we return _FLOW_OK
+       * to the caller so that the task function does not shut down */
       result = GST_FLOW_OK;
     }
-  } else if (GST_IS_EVENT (data)) {
+  } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
     GstEvent *event = GST_EVENT_CAST (data);
     GstEventType type = GST_EVENT_TYPE (event);
 
-    GST_QUEUE2_MUTEX_UNLOCK (queue);
-
     gst_pad_push_event (queue->srcpad, event);
 
-    GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
     /* if we're EOS, return UNEXPECTED so that the task pauses. */
     if (type == GST_EVENT_EOS) {
       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
           "pushed EOS event %p, return UNEXPECTED", event);
       result = GST_FLOW_UNEXPECTED;
     }
+
+    GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
+  } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
+    GstBufferList *buffer_list;
+    GstBuffer *first_buf;
+    GstCaps *caps;
+
+    buffer_list = GST_BUFFER_LIST_CAST (data);
+
+    first_buf = gst_buffer_list_get (buffer_list, 0, 0);
+    caps = (first_buf != NULL) ? GST_BUFFER_CAPS (first_buf) : NULL;
+
+    /* set caps before pushing the buffer so that core does not try to do
+     * something fancy to check if this is possible. */
+    if (caps && caps != GST_PAD_CAPS (queue->srcpad))
+      gst_pad_set_caps (queue->srcpad, caps);
+
+    result = gst_pad_push_list (queue->srcpad, buffer_list);
+
+    /* need to check for srcresult here as well */
+    GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
+    if (result == GST_FLOW_UNEXPECTED) {
+      data = gst_queue2_dequeue_on_unexpected (queue, &item_type);
+      if (data != NULL)
+        goto next;
+      /* Since we will still accept EOS and NEWSEGMENT we return _FLOW_OK
+       * to the caller so that the task function does not shut down */
+      result = GST_FLOW_OK;
+    }
   }
   return result;
 
@@ -2126,7 +2510,7 @@ out_flushing:
   }
 }
 
-/* called repeadedly with @pad as the source pad. This function should push out
+/* called repeatedly with @pad as the source pad. This function should push out
  * data to the peer element. */
 static void
 gst_queue2_loop (GstPad * pad)
@@ -2176,13 +2560,12 @@ out_flushing:
     GstFlowReturn ret = queue->srcresult;
 
     gst_pad_pause_task (queue->srcpad);
+    GST_QUEUE2_MUTEX_UNLOCK (queue);
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "pause task, reason:  %s", gst_flow_get_name (queue->srcresult));
-    GST_QUEUE2_MUTEX_UNLOCK (queue);
     /* let app know about us giving up if upstream is not expected to do so */
     /* UNEXPECTED is already taken care of elsewhere */
-    if (eos && (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) &&
-        (ret != GST_FLOW_UNEXPECTED)) {
+    if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_UNEXPECTED)) {
       GST_ELEMENT_ERROR (queue, STREAM, FAILED,
           (_("Internal data flow error.")),
           ("streaming task paused, reason %s (%d)",
@@ -2197,8 +2580,12 @@ static gboolean
 gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
 {
   gboolean res = TRUE;
-  GstQueue2 *queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
+  GstQueue2 *queue = GST_QUEUE2 (gst_pad_get_parent (pad));
 
+  if (G_UNLIKELY (queue == NULL)) {
+    gst_event_unref (event);
+    return FALSE;
+  }
 #ifndef GST_DISABLE_GST_DEBUG
   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
       event, GST_EVENT_TYPE_NAME (event));
@@ -2206,8 +2593,7 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
 
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
-      if (!(QUEUE_IS_USING_RING_BUFFER (queue)
-              || QUEUE_IS_USING_TEMP_FILE (queue))) {
+      if (QUEUE_IS_USING_QUEUE (queue)) {
         /* just forward upstream */
         res = gst_pad_push_event (queue->sinkpad, event);
       } else {
@@ -2224,16 +2610,19 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
       }
       break;
     case GST_EVENT_FLUSH_STOP:
-      if (!(QUEUE_IS_USING_RING_BUFFER (queue)
-              || QUEUE_IS_USING_TEMP_FILE (queue))) {
+      if (QUEUE_IS_USING_QUEUE (queue)) {
         /* just forward upstream */
         res = gst_pad_push_event (queue->sinkpad, event);
       } else {
         /* now unblock the getrange function */
         GST_QUEUE2_MUTEX_LOCK (queue);
         queue->srcresult = GST_FLOW_OK;
-        if (queue->current)
+        if (queue->current) {
+          /* forget the highest read offset, we'll calculate a new one when we
+           * get the next getrange request. We need to do this in order to reset
+           * the buffering percentage */
           queue->current->max_reading_pos = 0;
+        }
         GST_QUEUE2_MUTEX_UNLOCK (queue);
 
         /* when using a temp file, we eat the event */
@@ -2246,6 +2635,7 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
       break;
   }
 
+  gst_object_unref (queue);
   return res;
 }
 
@@ -2267,7 +2657,9 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
 {
   GstQueue2 *queue;
 
-  queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
+  queue = GST_QUEUE2 (gst_pad_get_parent (pad));
+  if (G_UNLIKELY (queue == NULL))
+    return FALSE;
 
   switch (GST_QUERY_TYPE (query)) {
     case GST_QUERY_POSITION:
@@ -2314,14 +2706,14 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
 
       GST_DEBUG_OBJECT (queue, "query buffering");
 
-      if (!(QUEUE_IS_USING_RING_BUFFER (queue)
-              || QUEUE_IS_USING_TEMP_FILE (queue))) {
+      /* FIXME - is this condition correct? what should ring buffer do? */
+      if (QUEUE_IS_USING_QUEUE (queue)) {
         /* no temp file, just forward to the peer */
         if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
           goto peer_failed;
         GST_DEBUG_OBJECT (queue, "buffering forwarded to peer");
       } else {
-        gint64 start, stop;
+        gint64 start, stop, range_start, range_stop;
         guint64 writing_pos;
         gint percent;
         gint64 estimated_total, buffering_left;
@@ -2329,6 +2721,7 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
         gint64 duration;
         gboolean peer_res, is_buffering, is_eos;
         gdouble byte_in_rate, byte_out_rate;
+        GstQueue2Range *queued_ranges;
 
         /* we need a current download region */
         if (queue->current == NULL)
@@ -2391,6 +2784,37 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
             stop = -1;
             break;
         }
+
+        /* fill out the buffered ranges */
+        for (queued_ranges = queue->ranges; queued_ranges;
+            queued_ranges = queued_ranges->next) {
+          switch (format) {
+            case GST_FORMAT_PERCENT:
+              if (duration == -1) {
+                range_start = 0;
+                range_stop = 0;
+                break;
+              }
+              range_start = 100 * queued_ranges->offset / duration;
+              range_stop = 100 * queued_ranges->writing_pos / duration;
+              break;
+            case GST_FORMAT_BYTES:
+              range_start = queued_ranges->offset;
+              range_stop = queued_ranges->writing_pos;
+              break;
+            default:
+              range_start = -1;
+              range_stop = -1;
+              break;
+          }
+          if (range_start == range_stop)
+            continue;
+          GST_DEBUG_OBJECT (queue,
+              "range starting at %" G_GINT64_FORMAT " and finishing at %"
+              G_GINT64_FORMAT, range_start, range_stop);
+          gst_query_add_buffering_range (query, range_start, range_stop);
+        }
+
         gst_query_set_buffering_percent (query, is_buffering, percent);
         gst_query_set_buffering_range (query, format, start, stop,
             estimated_total);
@@ -2406,12 +2830,14 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
       break;
   }
 
+  gst_object_unref (queue);
   return TRUE;
 
   /* ERRORS */
 peer_failed:
   {
     GST_DEBUG_OBJECT (queue, "failed peer query");
+    gst_object_unref (queue);
     return FALSE;
   }
 }
@@ -2423,6 +2849,18 @@ gst_queue2_handle_query (GstElement * element, GstQuery * query)
   return gst_queue2_handle_src_query (GST_QUEUE2_CAST (element)->srcpad, query);
 }
 
+static void
+gst_queue2_update_upstream_size (GstQueue2 * queue)
+{
+  GstFormat fmt = GST_FORMAT_BYTES;
+  gint64 upstream_size = -1;
+
+  if (gst_pad_query_peer_duration (queue->sinkpad, &fmt, &upstream_size)) {
+    GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
+    queue->upstream_size = upstream_size;
+  }
+}
+
 static GstFlowReturn
 gst_queue2_get_range (GstPad * pad, guint64 offset, guint length,
     GstBuffer ** buffer)
@@ -2431,21 +2869,35 @@ gst_queue2_get_range (GstPad * pad, guint64 offset, guint length,
   GstFlowReturn ret;
 
   queue = GST_QUEUE2_CAST (gst_pad_get_parent (pad));
-  if (length > queue->ring_buffer_max_size) {
-    GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT,
-        (_("Buffer is too large to fit in ring buffer")),
-        ("(%u > %" G_GUINT64_FORMAT ")", length, queue->ring_buffer_max_size));
-    return GST_FLOW_ERROR;
-  }
 
-  GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
   length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
+  GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
   offset = (offset == -1) ? queue->current->reading_pos : offset;
 
+  GST_DEBUG_OBJECT (queue,
+      "Getting range: offset %" G_GUINT64_FORMAT ", length %u", offset, length);
+
+  /* catch any reads beyond the size of the file here to make sure queue2
+   * doesn't send seek events beyond the size of the file upstream, since
+   * that would confuse elements such as souphttpsrc and/or http servers.
+   * Demuxers often just loop until EOS at the end of the file to figure out
+   * when they've read all the end-headers or index chunks. */
+  if (G_UNLIKELY (offset >= queue->upstream_size)) {
+    gst_queue2_update_upstream_size (queue);
+    if (queue->upstream_size > 0 && offset >= queue->upstream_size)
+      goto out_unexpected;
+  }
+
+  if (G_UNLIKELY (offset + length > queue->upstream_size)) {
+    gst_queue2_update_upstream_size (queue);
+    if (queue->upstream_size > 0 && offset + length >= queue->upstream_size) {
+      length = queue->upstream_size - offset;
+      GST_DEBUG_OBJECT (queue, "adjusting length downto %d", length);
+    }
+  }
+
   /* FIXME - function will block when the range is not yet available */
   ret = gst_queue2_create_read (queue, offset, length, buffer);
-
-  GST_QUEUE2_SIGNAL_DEL (queue);
   GST_QUEUE2_MUTEX_UNLOCK (queue);
 
   gst_object_unref (queue);
@@ -2459,8 +2911,16 @@ out_flushing:
 
     GST_DEBUG_OBJECT (queue, "we are flushing");
     GST_QUEUE2_MUTEX_UNLOCK (queue);
+    gst_object_unref (queue);
     return ret;
   }
+out_unexpected:
+  {
+    GST_DEBUG_OBJECT (queue, "read beyond end of file");
+    GST_QUEUE2_MUTEX_UNLOCK (queue);
+    gst_object_unref (queue);
+    return GST_FLOW_UNEXPECTED;
+  }
 }
 
 static gboolean
@@ -2472,7 +2932,7 @@ gst_queue2_src_checkgetrange_function (GstPad * pad)
   queue = GST_QUEUE2 (gst_pad_get_parent (pad));
 
   /* we can operate in pull mode when we are using a tempfile */
-  ret = QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue);
+  ret = !QUEUE_IS_USING_QUEUE (queue);
 
   gst_object_unref (GST_OBJECT (queue));
 
@@ -2560,27 +3020,34 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
   queue = GST_QUEUE2 (gst_pad_get_parent (pad));
 
   if (active) {
-    if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) {
-      /* open the temp file now */
-      result = gst_queue2_open_temp_location_file (queue);
+    GST_QUEUE2_MUTEX_LOCK (queue);
+    if (!QUEUE_IS_USING_QUEUE (queue)) {
+      if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+        /* open the temp file now */
+        result = gst_queue2_open_temp_location_file (queue);
+      } else if (!queue->ring_buffer) {
+        queue->ring_buffer = g_malloc (queue->ring_buffer_max_size);
+        result = ! !queue->ring_buffer;
+      } else {
+        result = TRUE;
+      }
 
-      GST_QUEUE2_MUTEX_LOCK (queue);
       GST_DEBUG_OBJECT (queue, "activating pull mode");
+      init_ranges (queue);
       queue->srcresult = GST_FLOW_OK;
       queue->sinkresult = GST_FLOW_OK;
       queue->is_eos = FALSE;
       queue->unexpected = FALSE;
-      GST_QUEUE2_MUTEX_UNLOCK (queue);
+      queue->upstream_size = 0;
     } else {
-      GST_QUEUE2_MUTEX_LOCK (queue);
       GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
       /* this is not allowed, we cannot operate in pull mode without a temp
        * file. */
       queue->srcresult = GST_FLOW_WRONG_STATE;
       queue->sinkresult = GST_FLOW_WRONG_STATE;
       result = FALSE;
-      GST_QUEUE2_MUTEX_UNLOCK (queue);
     }
+    GST_QUEUE2_MUTEX_UNLOCK (queue);
   } else {
     GST_QUEUE2_MUTEX_LOCK (queue);
     GST_DEBUG_OBJECT (queue, "deactivating pull mode");
@@ -2608,13 +3075,24 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition)
     case GST_STATE_CHANGE_NULL_TO_READY:
       break;
     case GST_STATE_CHANGE_READY_TO_PAUSED:
-      if (QUEUE_IS_USING_RING_BUFFER (queue)
-          || QUEUE_IS_USING_TEMP_FILE (queue)) {
-        if (!gst_queue2_open_temp_location_file (queue))
-          ret = GST_STATE_CHANGE_FAILURE;
+      GST_QUEUE2_MUTEX_LOCK (queue);
+      if (!QUEUE_IS_USING_QUEUE (queue)) {
+        if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+          if (!gst_queue2_open_temp_location_file (queue))
+            ret = GST_STATE_CHANGE_FAILURE;
+        } else {
+          if (queue->ring_buffer) {
+            g_free (queue->ring_buffer);
+            queue->ring_buffer = NULL;
+          }
+          if (!(queue->ring_buffer = g_malloc (queue->ring_buffer_max_size)))
+            ret = GST_STATE_CHANGE_FAILURE;
+        }
+        init_ranges (queue);
       }
       queue->segment_event_received = FALSE;
       queue->starting_segment = NULL;
+      GST_QUEUE2_MUTEX_UNLOCK (queue);
       break;
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
       break;
@@ -2634,13 +3112,21 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition)
     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
       break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:
-      if (QUEUE_IS_USING_RING_BUFFER (queue)
-          || QUEUE_IS_USING_TEMP_FILE (queue))
-        gst_queue2_close_temp_location_file (queue);
+      GST_QUEUE2_MUTEX_LOCK (queue);
+      if (!QUEUE_IS_USING_QUEUE (queue)) {
+        if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+          gst_queue2_close_temp_location_file (queue);
+        } else if (queue->ring_buffer) {
+          g_free (queue->ring_buffer);
+          queue->ring_buffer = NULL;
+        }
+        clean_ranges (queue);
+      }
       if (queue->starting_segment != NULL) {
         gst_event_unref (queue->starting_segment);
         queue->starting_segment = NULL;
       }
+      GST_QUEUE2_MUTEX_UNLOCK (queue);
       break;
     case GST_STATE_CHANGE_READY_TO_NULL:
       break;
@@ -2741,11 +3227,8 @@ gst_queue2_set_property (GObject * object,
     case PROP_TEMP_REMOVE:
       queue->temp_remove = g_value_get_boolean (value);
       break;
-    case PROP_USE_RING_BUFFER:
-      queue->use_ring_buffer = g_value_get_boolean (value);
-      break;
     case PROP_RING_BUFFER_MAX_SIZE:
-      queue->ring_buffer_max_size = g_value_get_uint (value);
+      queue->ring_buffer_max_size = g_value_get_uint64 (value);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -2803,11 +3286,8 @@ gst_queue2_get_property (GObject * object,
     case PROP_TEMP_REMOVE:
       g_value_set_boolean (value, queue->temp_remove);
       break;
-    case PROP_USE_RING_BUFFER:
-      g_value_set_boolean (value, queue->use_ring_buffer);
-      break;
     case PROP_RING_BUFFER_MAX_SIZE:
-      g_value_set_uint (value, queue->ring_buffer_max_size);
+      g_value_set_uint64 (value, queue->ring_buffer_max_size);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);