]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - glsdk/gstreamer0-10.git/blob - plugins/elements/gstqueue2.c
queue2: ring buffer work in progress
[glsdk/gstreamer0-10.git] / plugins / elements / gstqueue2.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2003 Colin Walters <cwalters@gnome.org>
4  *                    2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5  *                    2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6  *                 SA 2010 ST-Ericsson <benjamin.gaignard@stericsson.com>
7  *
8  * gstqueue2.c:
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  */
26 /**
27  * SECTION:element-queue2
28  *
29  * Data is queued until one of the limits specified by the
30  * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
31  * #GstQueue2:max-size-time properties has been reached. Any attempt to push
32  * more buffers into the queue will block the pushing thread until more space
33  * becomes available.
34  *
35  * The queue will create a new thread on the source pad to decouple the
36  * processing on sink and source pad.
37  *
38  * You can query how many buffers are queued by reading the
39  * #GstQueue2:current-level-buffers property.
40  *
41  * The default queue size limits are 100 buffers, 2MB of data, or
42  * two seconds worth of data, whichever is reached first.
43  *
44  * If you set temp-tmpl to a value such as /tmp/gstreamer-XXXXXX, the element
45  * will allocate a random free filename and buffer data in the file.
46  * By using this, it will buffer the entire stream data on the file independently
47  * of the queue size limits, they will only be used for buffering statistics.
48  *
49  * Since 0.10.24, setting the temp-location property with a filename is deprecated
50  * because it's impossible to securely open a temporary file in this way. The
51  * property will still be used to notify the application of the allocated
52  * filename, though.
53  *
54  * Last reviewed on 2009-07-10 (0.10.24)
55  */
57 #ifdef HAVE_CONFIG_H
58 #include "config.h"
59 #endif
61 #include "gstqueue2.h"
63 #include <glib/gstdio.h>
65 #include "gst/gst-i18n-lib.h"
67 #ifdef G_OS_WIN32
68 #include <io.h>                 /* lseek, open, close, read */
69 #undef lseek
70 #define lseek _lseeki64
71 #undef off_t
72 #define off_t guint64
73 #else
74 #include <unistd.h>
75 #endif
77 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
78     GST_PAD_SINK,
79     GST_PAD_ALWAYS,
80     GST_STATIC_CAPS_ANY);
82 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
83     GST_PAD_SRC,
84     GST_PAD_ALWAYS,
85     GST_STATIC_CAPS_ANY);
87 GST_DEBUG_CATEGORY_STATIC (queue_debug);
88 #define GST_CAT_DEFAULT (queue_debug)
89 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
91 enum
92 {
93   LAST_SIGNAL
94 };
96 /* other defines */
97 #define DEFAULT_BUFFER_SIZE 4096
98 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_location_set || (queue)->temp_template != NULL)
99 #define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->use_ring_buffer)    /* for consistency with the above macro */
101 /* default property values */
102 #define DEFAULT_MAX_SIZE_BUFFERS   100  /* 100 buffers */
103 #define DEFAULT_MAX_SIZE_BYTES     (2 * 1024 * 1024)    /* 2 MB */
104 #define DEFAULT_MAX_SIZE_TIME      2 * GST_SECOND       /* 2 seconds */
105 #define DEFAULT_USE_BUFFERING      FALSE
106 #define DEFAULT_USE_RATE_ESTIMATE  TRUE
107 #define DEFAULT_LOW_PERCENT        10
108 #define DEFAULT_HIGH_PERCENT       99
109 #define DEFAULT_TEMP_REMOVE        TRUE
110 #define DEFAULT_USE_RING_BUFFER    FALSE
111 #define DEFAULT_RING_BUFFER_MAX_SIZE (1024 * DEFAULT_BUFFER_SIZE)       /* 4 MB */
113 enum
115   PROP_0,
116   PROP_CUR_LEVEL_BUFFERS,
117   PROP_CUR_LEVEL_BYTES,
118   PROP_CUR_LEVEL_TIME,
119   PROP_MAX_SIZE_BUFFERS,
120   PROP_MAX_SIZE_BYTES,
121   PROP_MAX_SIZE_TIME,
122   PROP_USE_BUFFERING,
123   PROP_USE_RATE_ESTIMATE,
124   PROP_LOW_PERCENT,
125   PROP_HIGH_PERCENT,
126   PROP_TEMP_TEMPLATE,
127   PROP_TEMP_LOCATION,
128   PROP_TEMP_REMOVE,
129   PROP_USE_RING_BUFFER,
130   PROP_RING_BUFFER_MAX_SIZE,
131   PROP_LAST
132 };
134 #define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START {         \
135   l.buffers = 0;                                        \
136   l.bytes = 0;                                          \
137   l.time = 0;                                           \
138   l.rate_time = 0;                                      \
139 } G_STMT_END
141 #define STATUS(queue, pad, msg) \
142   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
143                       "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
144                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
145                       " ns, %"G_GUINT64_FORMAT" items", \
146                       GST_DEBUG_PAD_NAME (pad), \
147                       queue->cur_level.buffers, \
148                       queue->max_level.buffers, \
149                       queue->cur_level.bytes, \
150                       queue->max_level.bytes, \
151                       queue->cur_level.time, \
152                       queue->max_level.time, \
153                       (guint64) (QUEUE_IS_USING_TEMP_FILE(queue) ? \
154                         queue->current->writing_pos - queue->current->max_reading_pos : \
155                         queue->queue->length))
157 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
158   g_mutex_lock (q->qlock);                                              \
159 } G_STMT_END
161 #define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START {         \
162   GST_QUEUE2_MUTEX_LOCK (q);                                            \
163   if (res != GST_FLOW_OK)                                               \
164     goto label;                                                         \
165 } G_STMT_END
167 #define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START {                        \
168   g_mutex_unlock (q->qlock);                                            \
169 } G_STMT_END
171 #define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START {         \
172   STATUS (queue, q->sinkpad, "wait for DEL");                           \
173   q->waiting_del = TRUE;                                                \
174   g_cond_wait (q->item_del, queue->qlock);                              \
175   q->waiting_del = FALSE;                                               \
176   if (res != GST_FLOW_OK) {                                             \
177     STATUS (queue, q->srcpad, "received DEL wakeup");                   \
178     goto label;                                                         \
179   }                                                                     \
180   STATUS (queue, q->sinkpad, "received DEL");                           \
181 } G_STMT_END
183 #define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START {         \
184   STATUS (queue, q->srcpad, "wait for ADD");                            \
185   q->waiting_add = TRUE;                                                \
186   g_cond_wait (q->item_add, q->qlock);                                  \
187   q->waiting_add = FALSE;                                               \
188   if (res != GST_FLOW_OK) {                                             \
189     STATUS (queue, q->srcpad, "received ADD wakeup");                   \
190     goto label;                                                         \
191   }                                                                     \
192   STATUS (queue, q->srcpad, "received ADD");                            \
193 } G_STMT_END
195 #define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START {                          \
196   if (q->waiting_del) {                                                 \
197     STATUS (q, q->srcpad, "signal DEL");                                \
198     g_cond_signal (q->item_del);                                        \
199   }                                                                     \
200 } G_STMT_END
202 #define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START {                          \
203   if (q->waiting_add) {                                                 \
204     STATUS (q, q->sinkpad, "signal ADD");                               \
205     g_cond_signal (q->item_add);                                        \
206   }                                                                     \
207 } G_STMT_END
209 #define _do_init(bla) \
210     GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
211     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
212         "dataflow inside the queue element");
214 GST_BOILERPLATE_FULL (GstQueue2, gst_queue2, GstElement, GST_TYPE_ELEMENT,
215     _do_init);
217 static void gst_queue2_finalize (GObject * object);
219 static void gst_queue2_set_property (GObject * object,
220     guint prop_id, const GValue * value, GParamSpec * pspec);
221 static void gst_queue2_get_property (GObject * object,
222     guint prop_id, GValue * value, GParamSpec * pspec);
224 static GstFlowReturn gst_queue2_chain (GstPad * pad, GstBuffer * buffer);
225 static GstFlowReturn gst_queue2_bufferalloc (GstPad * pad, guint64 offset,
226     guint size, GstCaps * caps, GstBuffer ** buf);
227 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
228 static void gst_queue2_loop (GstPad * pad);
230 static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event);
232 static gboolean gst_queue2_handle_src_event (GstPad * pad, GstEvent * event);
233 static gboolean gst_queue2_handle_src_query (GstPad * pad, GstQuery * query);
234 static gboolean gst_queue2_handle_query (GstElement * element,
235     GstQuery * query);
237 static GstCaps *gst_queue2_getcaps (GstPad * pad);
238 static gboolean gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps);
240 static GstFlowReturn gst_queue2_get_range (GstPad * pad, guint64 offset,
241     guint length, GstBuffer ** buffer);
242 static gboolean gst_queue2_src_checkgetrange_function (GstPad * pad);
244 static gboolean gst_queue2_src_activate_pull (GstPad * pad, gboolean active);
245 static gboolean gst_queue2_src_activate_push (GstPad * pad, gboolean active);
246 static gboolean gst_queue2_sink_activate_push (GstPad * pad, gboolean active);
247 static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
248     GstStateChange transition);
250 static gboolean gst_queue2_is_empty (GstQueue2 * queue);
251 static gboolean gst_queue2_is_filled (GstQueue2 * queue);
253 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
256 /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
258 static void
259 gst_queue2_base_init (gpointer g_class)
261   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
263   gst_element_class_add_pad_template (gstelement_class,
264       gst_static_pad_template_get (&srctemplate));
265   gst_element_class_add_pad_template (gstelement_class,
266       gst_static_pad_template_get (&sinktemplate));
268   gst_element_class_set_details_simple (gstelement_class, "Queue 2",
269       "Generic",
270       "Simple data queue",
271       "Erik Walthinsen <omega@cse.ogi.edu>, "
272       "Wim Taymans <wim.taymans@gmail.com>");
275 static void
276 gst_queue2_class_init (GstQueue2Class * klass)
278   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
279   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
281   parent_class = g_type_class_peek_parent (klass);
283   gobject_class->set_property = gst_queue2_set_property;
284   gobject_class->get_property = gst_queue2_get_property;
286   /* properties */
287   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
288       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
289           "Current amount of data in the queue (bytes)",
290           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
291   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
292       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
293           "Current number of buffers in the queue",
294           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
295   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
296       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
297           "Current amount of data in the queue (in ns)",
298           0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
300   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
301       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
302           "Max. amount of data in the queue (bytes, 0=disable)",
303           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
304           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
305   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
306       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
307           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
308           DEFAULT_MAX_SIZE_BUFFERS,
309           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
310   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
311       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
312           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
313           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
315   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
316       g_param_spec_boolean ("use-buffering", "Use buffering",
317           "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
318           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
319   g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
320       g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
321           "Estimate the bitrate of the stream to calculate time level",
322           DEFAULT_USE_RATE_ESTIMATE,
323           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
324   g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
325       g_param_spec_int ("low-percent", "Low percent",
326           "Low threshold for buffering to start", 0, 100, DEFAULT_LOW_PERCENT,
327           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
328   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
329       g_param_spec_int ("high-percent", "High percent",
330           "High threshold for buffering to finish", 0, 100,
331           DEFAULT_HIGH_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
333   g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
334       g_param_spec_string ("temp-template", "Temporary File Template",
335           "File template to store temporary files in, should contain directory "
336           "and XXXXXX. (NULL == disabled)",
337           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
339   g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
340       g_param_spec_string ("temp-location", "Temporary File Location",
341           "Location to store temporary files in (Deprecated: Only read this "
342           "property, use temp-template to configure the name template)",
343           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
345   /**
346    * GstQueue2:temp-remove
347    *
348    * When temp-template is set, remove the temporary file when going to READY.
349    *
350    * Since: 0.10.26
351    */
352   g_object_class_install_property (gobject_class, PROP_TEMP_REMOVE,
353       g_param_spec_boolean ("temp-remove", "Remove the Temporary File",
354           "Remove the temp-location after use",
355           DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
357   /**
358    * GstQueue2:use-ring-buffer
359    *
360    * When use-ring-buffer is set, buffer data into a ring buffer containing ranges
361    * of source data. Default FALSE.
362    *
363    * Since: 0.10.30
364    */
365   g_object_class_install_property (gobject_class, PROP_USE_RING_BUFFER,
366       g_param_spec_boolean ("use-ring-buffer", "Use a ring buffer",
367           "Use a ring buffer of size ring-buffer-max-size kB",
368           DEFAULT_USE_RING_BUFFER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
369   /**
370    * GstQueue2:ring-buffer-max-size
371    *
372    * The maximum size of the ring buffer in kilobytes. If set to 0 kB then the size
373    * is unlimited. Default 16 megabytes.
374    *
375    * Since: 0.10.30
376    */
377   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
378       g_param_spec_uint ("ring-buffer-max-size", "Max. ring buffer size (kB)",
379           "Max. amount of data in the ring buffer (bytes, 0=unlimited)",
380           DEFAULT_BUFFER_SIZE, G_MAXUINT, DEFAULT_RING_BUFFER_MAX_SIZE,
381           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
383   /* set several parent class virtual functions */
384   gobject_class->finalize = gst_queue2_finalize;
386   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
387   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
390 static void
391 gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class)
393   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
395   gst_pad_set_chain_function (queue->sinkpad,
396       GST_DEBUG_FUNCPTR (gst_queue2_chain));
397   gst_pad_set_activatepush_function (queue->sinkpad,
398       GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_push));
399   gst_pad_set_event_function (queue->sinkpad,
400       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
401   gst_pad_set_getcaps_function (queue->sinkpad,
402       GST_DEBUG_FUNCPTR (gst_queue2_getcaps));
403   gst_pad_set_acceptcaps_function (queue->sinkpad,
404       GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps));
405   gst_pad_set_bufferalloc_function (queue->sinkpad,
406       GST_DEBUG_FUNCPTR (gst_queue2_bufferalloc));
407   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
409   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
411   gst_pad_set_activatepull_function (queue->srcpad,
412       GST_DEBUG_FUNCPTR (gst_queue2_src_activate_pull));
413   gst_pad_set_activatepush_function (queue->srcpad,
414       GST_DEBUG_FUNCPTR (gst_queue2_src_activate_push));
415   gst_pad_set_getrange_function (queue->srcpad,
416       GST_DEBUG_FUNCPTR (gst_queue2_get_range));
417   gst_pad_set_checkgetrange_function (queue->srcpad,
418       GST_DEBUG_FUNCPTR (gst_queue2_src_checkgetrange_function));
419   gst_pad_set_getcaps_function (queue->srcpad,
420       GST_DEBUG_FUNCPTR (gst_queue2_getcaps));
421   gst_pad_set_acceptcaps_function (queue->srcpad,
422       GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps));
423   gst_pad_set_event_function (queue->srcpad,
424       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
425   gst_pad_set_query_function (queue->srcpad,
426       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
427   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
429   /* levels */
430   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
431   queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
432   queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
433   queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
434   queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
435   queue->use_buffering = DEFAULT_USE_BUFFERING;
436   queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
437   queue->low_percent = DEFAULT_LOW_PERCENT;
438   queue->high_percent = DEFAULT_HIGH_PERCENT;
440   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
441   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
443   queue->srcresult = GST_FLOW_WRONG_STATE;
444   queue->sinkresult = GST_FLOW_WRONG_STATE;
445   queue->is_eos = FALSE;
446   queue->in_timer = g_timer_new ();
447   queue->out_timer = g_timer_new ();
449   queue->qlock = g_mutex_new ();
450   queue->waiting_add = FALSE;
451   queue->item_add = g_cond_new ();
452   queue->waiting_del = FALSE;
453   queue->item_del = g_cond_new ();
454   queue->queue = g_queue_new ();
456   /* tempfile related */
457   queue->temp_template = NULL;
458   queue->temp_location = NULL;
459   queue->temp_location_set = FALSE;
460   queue->temp_remove = DEFAULT_TEMP_REMOVE;
462   queue->use_ring_buffer = DEFAULT_USE_RING_BUFFER;
463   queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
464   GST_DEBUG_OBJECT (queue,
465       "initialized queue's not_empty & not_full conditions");
468 /* called only once, as opposed to dispose */
469 static void
470 gst_queue2_finalize (GObject * object)
472   GstQueue2 *queue = GST_QUEUE2 (object);
474   GST_DEBUG_OBJECT (queue, "finalizing queue");
476   while (!g_queue_is_empty (queue->queue)) {
477     GstMiniObject *data = g_queue_pop_head (queue->queue);
479     gst_mini_object_unref (data);
480   }
482   g_queue_free (queue->queue);
483   g_mutex_free (queue->qlock);
484   g_cond_free (queue->item_add);
485   g_cond_free (queue->item_del);
486   g_timer_destroy (queue->in_timer);
487   g_timer_destroy (queue->out_timer);
489   /* temp_file path cleanup  */
490   g_free (queue->temp_template);
491   g_free (queue->temp_location);
493   G_OBJECT_CLASS (parent_class)->finalize (object);
496 static void
497 debug_ranges (GstQueue2 * queue)
499   GstQueue2Range *walk;
501   for (walk = queue->ranges; walk; walk = walk->next) {
502     GST_DEBUG_OBJECT (queue, "range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT,
503         walk->offset, walk->writing_pos);
504   }
507 /* clear all the downloaded ranges */
508 static void
509 clean_ranges (GstQueue2 * queue)
511   GST_DEBUG_OBJECT (queue, "clean queue ranges");
513   g_slice_free_chain (GstQueue2Range, queue->ranges, next);
514   queue->ranges = NULL;
515   queue->current = NULL;
518 /* find a range that contains @offset or NULL when nothing does */
519 static GstQueue2Range *
520 find_range (GstQueue2 * queue, guint64 offset, guint64 length)
522   GstQueue2Range *range = NULL;
523   GstQueue2Range *walk;
525   /* first do a quick check for the current range */
526   for (walk = queue->ranges; walk; walk = walk->next) {
527     if (offset >= walk->offset && offset <= walk->writing_pos) {
528       /* we can reuse an existing range */
529       range = walk;
530       break;
531     }
532   }
533   return range;
536 /* make a new range for @offset or reuse an existing range */
537 static GstQueue2Range *
538 add_range (GstQueue2 * queue, guint64 offset)
540   GstQueue2Range *range, *prev, *next;
542   GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
544   if ((range = find_range (queue, offset, 0))) {
545     GST_DEBUG_OBJECT (queue,
546         "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
547         range->writing_pos);
548     range->writing_pos = offset;
549   } else {
550     GST_DEBUG_OBJECT (queue,
551         "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
553     range = g_slice_new0 (GstQueue2Range);
554     range->offset = offset;
555     /* we want to write to the next location in the ring buffer */
556     range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
557     range->writing_pos = offset;
558     range->rb_writing_pos = range->rb_offset;
559     range->reading_pos = offset;
560     range->rb_reading_pos = range->rb_offset;
561     range->max_reading_pos = offset;
563     /* insert sorted */
564     prev = NULL;
565     next = queue->ranges;
566     while (next) {
567       if (next->offset > offset) {
568         /* insert before next */
569         GST_DEBUG_OBJECT (queue,
570             "insert before range %p, offset %" G_GUINT64_FORMAT, next,
571             next->offset);
572         break;
573       }
574       /* try next */
575       prev = next;
576       next = next->next;
577     }
578     range->next = next;
579     if (prev)
580       prev->next = range;
581     else
582       queue->ranges = range;
583   }
584   debug_ranges (queue);
586   return range;
590 /* clear and init the download ranges for offset 0 */
591 static void
592 init_ranges (GstQueue2 * queue)
594   GST_DEBUG_OBJECT (queue, "init queue ranges");
596   /* get rid of all the current ranges */
597   clean_ranges (queue);
598   /* make a range for offset 0 */
599   queue->current = add_range (queue, 0);
602 static gboolean
603 gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps)
605   GstQueue2 *queue;
606   GstPad *otherpad;
607   gboolean result;
609   queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
611   otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
612   result = gst_pad_peer_accept_caps (otherpad, caps);
614   return result;
617 static GstCaps *
618 gst_queue2_getcaps (GstPad * pad)
620   GstQueue2 *queue;
621   GstPad *otherpad;
622   GstCaps *result;
624   queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
626   otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
627   result = gst_pad_peer_get_caps (otherpad);
628   if (result == NULL)
629     result = gst_caps_new_any ();
631   return result;
634 static GstFlowReturn
635 gst_queue2_bufferalloc (GstPad * pad, guint64 offset, guint size,
636     GstCaps * caps, GstBuffer ** buf)
638   GstQueue2 *queue;
639   GstFlowReturn result;
641   queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
643   /* Forward to src pad, without setting caps on the src pad */
644   result = gst_pad_alloc_buffer (queue->srcpad, offset, size, caps, buf);
646   return result;
649 /* calculate the diff between running time on the sink and src of the queue.
650  * This is the total amount of time in the queue. */
651 static void
652 update_time_level (GstQueue2 * queue)
654   gint64 sink_time, src_time;
656   sink_time =
657       gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
658       queue->sink_segment.last_stop);
660   src_time = gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
661       queue->src_segment.last_stop);
663   GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
664       GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
666   if (sink_time >= src_time)
667     queue->cur_level.time = sink_time - src_time;
668   else
669     queue->cur_level.time = 0;
672 /* take a NEWSEGMENT event and apply the values to segment, updating the time
673  * level of queue. */
674 static void
675 apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment)
677   gboolean update;
678   GstFormat format;
679   gdouble rate, arate;
680   gint64 start, stop, time;
682   gst_event_parse_new_segment_full (event, &update, &rate, &arate,
683       &format, &start, &stop, &time);
685   GST_DEBUG_OBJECT (queue,
686       "received NEWSEGMENT update %d, rate %lf, applied rate %lf, "
687       "format %d, "
688       "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
689       G_GINT64_FORMAT, update, rate, arate, format, start, stop, time);
691   if (format == GST_FORMAT_BYTES) {
692     if (QUEUE_IS_USING_TEMP_FILE (queue)) {
693       /* start is where we'll be getting from and as such writing next */
694       queue->current = add_range (queue, start);
695       /* update the stats for this range */
696       update_cur_level (queue, queue->current);
697     }
698   }
700   /* now configure the values, we use these to track timestamps on the
701    * sinkpad. */
702   if (format != GST_FORMAT_TIME) {
703     /* non-time format, pretent the current time segment is closed with a
704      * 0 start and unknown stop time. */
705     update = FALSE;
706     format = GST_FORMAT_TIME;
707     start = 0;
708     stop = -1;
709     time = 0;
710   }
711   gst_segment_set_newsegment_full (segment, update,
712       rate, arate, format, start, stop, time);
714   GST_DEBUG_OBJECT (queue,
715       "configured NEWSEGMENT %" GST_SEGMENT_FORMAT, segment);
717   /* segment can update the time level of the queue */
718   update_time_level (queue);
721 /* take a buffer and update segment, updating the time level of the queue. */
722 static void
723 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment)
725   GstClockTime duration, timestamp;
727   timestamp = GST_BUFFER_TIMESTAMP (buffer);
728   duration = GST_BUFFER_DURATION (buffer);
730   /* if no timestamp is set, assume it's continuous with the previous 
731    * time */
732   if (timestamp == GST_CLOCK_TIME_NONE)
733     timestamp = segment->last_stop;
735   /* add duration */
736   if (duration != GST_CLOCK_TIME_NONE)
737     timestamp += duration;
739   GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
740       GST_TIME_ARGS (timestamp));
742   gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp);
744   /* calc diff with other end */
745   update_time_level (queue);
748 static void
749 update_buffering (GstQueue2 * queue)
751   gint64 percent;
752   gboolean post = FALSE;
754   if (!queue->use_buffering || queue->high_percent <= 0)
755     return;
757 #define GET_PERCENT(format) ((queue->max_level.format) > 0 ? \
758                 (queue->cur_level.format) * 100 / (queue->max_level.format) : 0)
760   if (queue->is_eos) {
761     /* on EOS we are always 100% full, we set the var here so that it we can
762      * reuse the logic below to stop buffering */
763     percent = 100;
764   } else {
765     /* figure out the percent we are filled, we take the max of all formats. */
766     percent = GET_PERCENT (bytes);
767     percent = MAX (percent, GET_PERCENT (time));
768     percent = MAX (percent, GET_PERCENT (buffers));
770     /* also apply the rate estimate when we need to */
771     if (queue->use_rate_estimate)
772       percent = MAX (percent, GET_PERCENT (rate_time));
773   }
775   if (queue->is_buffering) {
776     post = TRUE;
777     /* if we were buffering see if we reached the high watermark */
778     if (percent >= queue->high_percent)
779       queue->is_buffering = FALSE;
780   } else {
781     /* we were not buffering, check if we need to start buffering if we drop
782      * below the low threshold */
783     if (percent < queue->low_percent) {
784       queue->is_buffering = TRUE;
785       queue->buffering_iteration++;
786       post = TRUE;
787     }
788   }
789   if (post) {
790     GstMessage *message;
791     GstBufferingMode mode;
792     gint64 buffering_left = -1;
794     /* scale to high percent so that it becomes the 100% mark */
795     percent = percent * 100 / queue->high_percent;
796     /* clip */
797     if (percent > 100)
798       percent = 100;
800     queue->buffering_percent = percent;
802     if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) {
803       GstFormat fmt = GST_FORMAT_BYTES;
804       gint64 duration;
806       mode = GST_BUFFERING_DOWNLOAD;
807       if (queue->byte_in_rate > 0) {
808         if (gst_pad_query_peer_duration (queue->sinkpad, &fmt, &duration))
809           buffering_left =
810               (gdouble) ((duration -
811                   queue->current->writing_pos) * 1000) / queue->byte_in_rate;
812       } else {
813         buffering_left = G_MAXINT64;
814       }
815     } else {
816       mode = GST_BUFFERING_STREAM;
817     }
819     GST_DEBUG_OBJECT (queue, "buffering %d percent", (gint) percent);
820     message = gst_message_new_buffering (GST_OBJECT_CAST (queue),
821         (gint) percent);
822     gst_message_set_buffering_stats (message, mode,
823         queue->byte_in_rate, queue->byte_out_rate, buffering_left);
825     gst_element_post_message (GST_ELEMENT_CAST (queue), message);
827   } else {
828     GST_DEBUG_OBJECT (queue, "filled %d percent", (gint) percent);
829   }
831 #undef GET_PERCENT
834 static void
835 reset_rate_timer (GstQueue2 * queue)
837   queue->bytes_in = 0;
838   queue->bytes_out = 0;
839   queue->byte_in_rate = 0.0;
840   queue->byte_out_rate = 0.0;
841   queue->last_in_elapsed = 0.0;
842   queue->last_out_elapsed = 0.0;
843   queue->in_timer_started = FALSE;
844   queue->out_timer_started = FALSE;
847 /* the interval in seconds to recalculate the rate */
848 #define RATE_INTERVAL    0.2
849 /* Tuning for rate estimation. We use a large window for the input rate because
850  * it should be stable when connected to a network. The output rate is less
851  * stable (the elements preroll, queues behind a demuxer fill, ...) and should
852  * therefore adapt more quickly. */
853 #define AVG_IN(avg,val)  ((avg) * 15.0 + (val)) / 16.0
854 #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
856 static void
857 update_in_rates (GstQueue2 * queue)
859   gdouble elapsed, period;
860   gdouble byte_in_rate;
862   if (!queue->in_timer_started) {
863     queue->in_timer_started = TRUE;
864     g_timer_start (queue->in_timer);
865     return;
866   }
868   elapsed = g_timer_elapsed (queue->in_timer, NULL);
870   /* recalc after each interval. */
871   if (queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
872     period = elapsed - queue->last_in_elapsed;
874     GST_DEBUG_OBJECT (queue,
875         "rates: period %f, in %" G_GUINT64_FORMAT, period, queue->bytes_in);
877     byte_in_rate = queue->bytes_in / period;
879     if (queue->byte_in_rate == 0.0)
880       queue->byte_in_rate = byte_in_rate;
881     else
882       queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate);
884     /* reset the values to calculate rate over the next interval */
885     queue->last_in_elapsed = elapsed;
886     queue->bytes_in = 0;
887   }
889   if (queue->byte_in_rate > 0.0) {
890     queue->cur_level.rate_time =
891         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
892   }
893   GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
894       queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
897 static void
898 update_out_rates (GstQueue2 * queue)
900   gdouble elapsed, period;
901   gdouble byte_out_rate;
903   if (!queue->out_timer_started) {
904     queue->out_timer_started = TRUE;
905     g_timer_start (queue->out_timer);
906     return;
907   }
909   elapsed = g_timer_elapsed (queue->out_timer, NULL);
911   /* recalc after each interval. */
912   if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
913     period = elapsed - queue->last_out_elapsed;
915     GST_DEBUG_OBJECT (queue,
916         "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
918     byte_out_rate = queue->bytes_out / period;
920     if (queue->byte_out_rate == 0.0)
921       queue->byte_out_rate = byte_out_rate;
922     else
923       queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
925     /* reset the values to calculate rate over the next interval */
926     queue->last_out_elapsed = elapsed;
927     queue->bytes_out = 0;
928   }
929   if (queue->byte_in_rate > 0.0) {
930     queue->cur_level.rate_time =
931         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
932   }
933   GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
934       queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
937 static void
938 update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
940   guint64 max_reading_pos, writing_pos;
942   writing_pos = range->writing_pos;
943   max_reading_pos = range->max_reading_pos;
945   if (writing_pos > max_reading_pos)
946     queue->cur_level.bytes = writing_pos - max_reading_pos;
947   else
948     queue->cur_level.bytes = 0;
951 #ifdef HAVE_FSEEKO
952 #define FSEEK_FILE(file, offset)  (fseeko (file, (off_t) offset, SEEK_SET))
953 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
954 #define FSEEK_FILE(file, offset)  (lseek (fileno (file), (off_t) offset, SEEK_SET))
955 #else
956 #define FSEEK_FILE(file, offset)  (fseek (file, offset, SEEK_SET))
957 #endif
959 static gboolean
960 gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
962   guint size;
963   guint8 *data;
964   guint64 writing_pos, max_reading_pos;
965   GstQueue2Range *next;
967   writing_pos = queue->current->writing_pos;
968   max_reading_pos = queue->current->max_reading_pos;
970   FSEEK_FILE (queue->temp_file, writing_pos);
972   data = GST_BUFFER_DATA (buffer);
973   size = GST_BUFFER_SIZE (buffer);
975   if (fwrite (data, size, 1, queue->temp_file) != 1)
976     goto handle_error;
978   writing_pos += size;
980   GST_INFO_OBJECT (queue,
981       "writing %" G_GUINT64_FORMAT ", max_reading %" G_GUINT64_FORMAT,
982       writing_pos, max_reading_pos);
984   if (writing_pos > max_reading_pos)
985     queue->cur_level.bytes = writing_pos - max_reading_pos;
986   else
987     queue->cur_level.bytes = 0;
989   /* try to merge with next range */
990   while ((next = queue->current->next)) {
991     GST_INFO_OBJECT (queue,
992         "checking merge with next range %" G_GUINT64_FORMAT " < %"
993         G_GUINT64_FORMAT, writing_pos, next->offset);
994     if (writing_pos < next->offset)
995       break;
997     GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
998         next->writing_pos);
1000     /* remove the group, we could choose to not read the data in this range
1001      * again. This would involve us doing a seek to the current writing position
1002      * in the range. FIXME, It would probably make sense to do a seek when there
1003      * is a lot of data in the range we merged with to avoid reading it all
1004      * again. */
1005     queue->current->next = next->next;
1006     g_slice_free (GstQueue2Range, next);
1008     debug_ranges (queue);
1009   }
1010   queue->current->writing_pos = writing_pos;
1012   return TRUE;
1014   /* ERRORS */
1015 handle_error:
1016   {
1017     switch (errno) {
1018       case ENOSPC:{
1019         GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
1020         break;
1021       }
1022       default:{
1023         GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
1024             (_("Error while writing to download file.")),
1025             ("%s", g_strerror (errno)));
1026       }
1027     }
1028     return FALSE;
1029   }
1032 static void
1033 update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
1035   guint64 reading_pos, max_reading_pos;
1037   reading_pos = pos;
1038   max_reading_pos = range->max_reading_pos;
1040   max_reading_pos = MAX (max_reading_pos, reading_pos);
1042   range->max_reading_pos = max_reading_pos;
1044   update_cur_level (queue, range);
1047 static gboolean
1048 perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
1050   GstEvent *event;
1051   gboolean res;
1053   GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
1055   event =
1056       gst_event_new_seek (1.0, GST_FORMAT_BYTES,
1057       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
1058       GST_SEEK_TYPE_NONE, -1);
1060   GST_QUEUE2_MUTEX_UNLOCK (queue);
1061   res = gst_pad_push_event (queue->sinkpad, event);
1062   GST_QUEUE2_MUTEX_LOCK (queue);
1064   return res;
1067 /* see if there is enough data in the file to read a full buffer */
1068 static gboolean
1069 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
1071   GstQueue2Range *range;
1073   GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
1074       offset, length);
1076   if ((range = find_range (queue, offset, length))) {
1077     if (queue->current != range) {
1078       GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
1079       perform_seek_to_offset (queue, range->writing_pos);
1080     }
1082     /* update the current reading position in the range */
1083     update_cur_pos (queue, queue->current, offset + length);
1085     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %u)",
1086         queue->cur_level.bytes, MIN (queue->max_level.bytes,
1087             queue->ring_buffer_max_size));
1089     /* we have a range for offset */
1090     GST_DEBUG_OBJECT (queue,
1091         "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
1092         G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
1094     if (queue->is_eos)
1095       return TRUE;
1097     if (offset + length < range->writing_pos)
1098       return TRUE;
1100   } else {
1101     GST_INFO_OBJECT (queue, "not found in any range");
1102     /* we don't have the range, see how far away we are, FIXME, find a good
1103      * threshold based on the incomming rate. */
1104     if (!queue->is_eos && queue->current) {
1105       if (offset < queue->current->writing_pos + 200000) {
1106         update_cur_pos (queue, queue->current, offset + length);
1107         GST_INFO_OBJECT (queue, "wait for data");
1108         return FALSE;
1109       }
1110     }
1112     /* too far away, do a seek */
1113     perform_seek_to_offset (queue, offset);
1114   }
1116   return FALSE;
1119 static GstFlowReturn
1120 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
1121     guint8 * dst)
1123   size_t res;
1125 #ifdef HAVE_FSEEKO
1126   if (fseeko (queue->temp_file, (off_t) offset, SEEK_SET) != 0)
1127     goto seek_failed;
1128 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
1129   if (lseek (fileno (queue->temp_file), (off_t) offset,
1130           SEEK_SET) == (off_t) - 1)
1131     goto seek_failed;
1132 #else
1133   if (fseek (queue->temp_file, (long) offset, SEEK_SET) != 0)
1134     goto seek_failed;
1135 #endif
1137   /* this should not block */
1138   GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
1139       length, offset);
1140   res = fread (dst, 1, length, queue->temp_file);
1141   GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
1143   if (G_UNLIKELY (res < length)) {
1144     /* check for errors or EOF */
1145     if (ferror (queue->temp_file))
1146       goto could_not_read;
1147     if (feof (queue->temp_file) && length > 0)
1148       goto eos;
1149   }
1151   return GST_FLOW_OK;
1153 seek_failed:
1154   {
1155     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
1156     return GST_FLOW_ERROR;
1157   }
1158 could_not_read:
1159   {
1160     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
1161     return GST_FLOW_ERROR;
1162   }
1163 eos:
1164   {
1165     GST_DEBUG ("non-regular file hits EOS");
1166     return GST_FLOW_UNEXPECTED;
1167   }
1170 static GstFlowReturn
1171 gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
1172     GstBuffer ** buffer)
1174   GstFlowReturn flow_ret;
1175   GstBuffer *buf;
1176   guint8 *data;
1177   guint64 file_offset;
1178   guint block_length;
1180   /* check if we have enough data at @offset. If there is not enough data, we
1181    * block and wait. */
1182   while (!gst_queue2_have_data (queue, offset, length)) {
1183     GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
1184   }
1186   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1187     file_offset =
1188         (queue->current->rb_offset + (offset -
1189             queue->current->offset)) % queue->ring_buffer_max_size;
1190     if (file_offset + length > queue->ring_buffer_max_size) {
1191       block_length = queue->ring_buffer_max_size - file_offset;
1192     } else {
1193       block_length = length;
1194     }
1195   } else {
1196     file_offset = offset;
1197     block_length = length;
1198   }
1200   buf = gst_buffer_new_and_alloc (length);
1201   data = GST_BUFFER_DATA (buf);
1203   if ((flow_ret =
1204           gst_queue2_read_data_at_offset (queue, file_offset, block_length,
1205               data)) != GST_FLOW_OK) {
1206     gst_buffer_unref (buf);
1207     return flow_ret;
1208   }
1210   if (block_length < length) {
1211     /* read second block into a second buffer, then merge the two */
1212     data += block_length;
1213     block_length = length - block_length;
1215     if ((flow_ret =
1216             gst_queue2_read_data_at_offset (queue, 0, block_length,
1217                 data)) != GST_FLOW_OK) {
1218       gst_buffer_unref (buf);
1219       return flow_ret;
1220     }
1221   }
1223   GST_BUFFER_SIZE (buf) = length;
1224   GST_BUFFER_OFFSET (buf) = offset;
1225   GST_BUFFER_OFFSET_END (buf) = offset + length;
1227   *buffer = buf;
1229   return GST_FLOW_OK;
1231   /* ERRORS */
1232 out_flushing:
1233   {
1234     GST_DEBUG_OBJECT (queue, "we are flushing");
1235     return GST_FLOW_WRONG_STATE;
1236   }
1239 /* should be called with QUEUE_LOCK */
1240 static GstMiniObject *
1241 gst_queue2_read_item_from_file (GstQueue2 * queue)
1243   GstMiniObject *item;
1245   if (queue->starting_segment != NULL) {
1246     item = GST_MINI_OBJECT_CAST (queue->starting_segment);
1247     queue->starting_segment = NULL;
1248   } else {
1249     GstFlowReturn ret;
1250     GstBuffer *buffer;
1251     guint64 reading_pos;
1253     reading_pos = queue->current->reading_pos;
1255     ret =
1256         gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
1257         &buffer);
1258     switch (ret) {
1259       case GST_FLOW_OK:
1260         item = GST_MINI_OBJECT_CAST (buffer);
1261         queue->current->reading_pos += DEFAULT_BUFFER_SIZE;
1262         if (QUEUE_IS_USING_RING_BUFFER (queue))
1263           queue->current->rb_reading_pos =
1264               (queue->current->rb_reading_pos +
1265               DEFAULT_BUFFER_SIZE) % queue->ring_buffer_max_size;
1266         break;
1267       case GST_FLOW_UNEXPECTED:
1268         item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
1269         break;
1270       default:
1271         item = NULL;
1272         break;
1273     }
1274   }
1275   return item;
1278 static gboolean
1279 gst_queue2_open_temp_location_file (GstQueue2 * queue)
1281   gint fd = -1;
1282   gchar *name = NULL;
1284   if (queue->temp_file)
1285     goto already_opened;
1287   GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
1289   /* we have two cases:
1290    * - temp_location was set to something !NULL (Deprecated). in this case we
1291    *   open the specified filename.
1292    * - temp_template was set, allocate a filename and open that filename
1293    */
1294   if (!queue->temp_location_set) {
1295     /* nothing to do */
1296     if (queue->temp_template == NULL)
1297       goto no_directory;
1299     /* make copy of the template, we don't want to change this */
1300     name = g_strdup (queue->temp_template);
1301     fd = g_mkstemp (name);
1302     if (fd == -1)
1303       goto mkstemp_failed;
1305     /* open the file for update/writing */
1306     queue->temp_file = fdopen (fd, "wb+");
1307     /* error creating file */
1308     if (queue->temp_file == NULL)
1309       goto open_failed;
1311     g_free (queue->temp_location);
1312     queue->temp_location = name;
1314     g_object_notify (G_OBJECT (queue), "temp-location");
1315   } else {
1316     /* open the file for update/writing, this is deprecated but we still need to
1317      * support it for API/ABI compatibility */
1318     queue->temp_file = g_fopen (queue->temp_location, "wb+");
1319     /* error creating file */
1320     if (queue->temp_file == NULL)
1321       goto open_failed;
1322   }
1323   GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
1325   init_ranges (queue);
1327   return TRUE;
1329   /* ERRORS */
1330 already_opened:
1331   {
1332     GST_DEBUG_OBJECT (queue, "temp file was already open");
1333     return TRUE;
1334   }
1335 no_directory:
1336   {
1337     GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
1338         (_("No Temp directory specified.")), (NULL));
1339     return FALSE;
1340   }
1341 mkstemp_failed:
1342   {
1343     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1344         (_("Could not create temp file \"%s\"."), queue->temp_template),
1345         GST_ERROR_SYSTEM);
1346     g_free (name);
1347     return FALSE;
1348   }
1349 open_failed:
1350   {
1351     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1352         (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
1353     g_free (name);
1354     if (fd != -1)
1355       close (fd);
1356     return FALSE;
1357   }
1360 static void
1361 gst_queue2_close_temp_location_file (GstQueue2 * queue)
1363   /* nothing to do */
1364   if (queue->temp_file == NULL)
1365     return;
1367   GST_DEBUG_OBJECT (queue, "closing temp file");
1369   fflush (queue->temp_file);
1370   fclose (queue->temp_file);
1372   if (queue->temp_remove)
1373     remove (queue->temp_location);
1375   queue->temp_file = NULL;
1376   clean_ranges (queue);
1379 static void
1380 gst_queue2_flush_temp_file (GstQueue2 * queue)
1382   if (queue->temp_file == NULL)
1383     return;
1385   GST_DEBUG_OBJECT (queue, "flushing temp file");
1387   queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
1389   init_ranges (queue);
1392 static void
1393 gst_queue2_locked_flush (GstQueue2 * queue)
1395   if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) {
1396     gst_queue2_flush_temp_file (queue);
1397   } else {
1398     while (!g_queue_is_empty (queue->queue)) {
1399       GstMiniObject *data = g_queue_pop_head (queue->queue);
1401       /* Then lose another reference because we are supposed to destroy that
1402          data when flushing */
1403       gst_mini_object_unref (data);
1404     }
1405   }
1406   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1407   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
1408   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
1409   if (queue->starting_segment != NULL)
1410     gst_event_unref (queue->starting_segment);
1411   queue->starting_segment = NULL;
1412   queue->segment_event_received = FALSE;
1414   /* we deleted a lot of something */
1415   GST_QUEUE2_SIGNAL_DEL (queue);
1418 static gboolean
1419 gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
1421   GstBuffer *buf, *rem;
1422   guint buf_size, rem_size;
1423   const guint rb_size = queue->ring_buffer_max_size;
1424   guint8 *data;
1425   guint64 writing_pos, reading_pos, new_writing_pos;
1426   gint64 space;
1427   GstQueue2Range *range, *prev;
1429   writing_pos = queue->current->rb_writing_pos;
1430   reading_pos = queue->current->rb_reading_pos;
1432   rem = buffer;
1434   /* loop if we can't write the whole buffer at once */
1435   do {
1436     /* calculate the space in the ring buffer not used by data from the
1437      * current range */
1438     space =
1439         MIN (queue->max_level.bytes,
1440         queue->ring_buffer_max_size) - queue->cur_level.bytes;
1442     rem_size = GST_BUFFER_SIZE (rem);
1443     /* don't try to process 0 size buffers */
1444     if (!rem_size)
1445       break;
1447     /* calculate if we need to split or if we can write the entire buffer now */
1448     if (rem_size > space) {
1449       buf_size = space;
1450       buf = gst_buffer_create_sub (rem, 0, space);
1452       rem_size -= space;
1453       rem = gst_buffer_create_sub (rem, space, rem_size);
1454       space = 0;
1455     } else {
1456       buf_size = rem_size;
1457       buf = rem;
1459       rem_size = 0;
1460       rem = NULL;
1461       space -= buf_size;
1462     }
1464     data = GST_BUFFER_DATA (buf);
1466     /* the writing position in the ring buffer after writing (part or all of)
1467      * the buffer */
1468     new_writing_pos = (writing_pos + buf_size) % rb_size;
1470     prev = NULL;
1471     range = queue->ranges;
1473     /* if we need to overwrite data in the ring buffer, we need to update the
1474      * ranges
1475      * warning: this code is complicated and includes some simplifications -
1476      * pen, paper and diagrams for the cases recommended! */
1477     while (range) {
1478       guint64 range_data_start, range_data_end;
1479       GstQueue2Range *range_to_destroy = NULL;
1481       /* we don't edit the current range here */
1482       if (range == queue->current)
1483         goto next_range;
1485       range_data_start = range->rb_offset;
1486       range_data_end = range->rb_writing_pos;
1488       if (range_data_end > range_data_start) {
1489         if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
1490           goto next_range;
1492         if (new_writing_pos > range_data_start) {
1493           if (new_writing_pos >= range_data_end) {
1494             /* remove range */
1495             range_to_destroy = range;
1496             if (prev)
1497               prev->next = range->next;
1498           } else {
1499             range->offset += (new_writing_pos - range_data_start);
1500             range->rb_offset = new_writing_pos;
1501           }
1502         }
1503       } else {
1504         guint64 new_wpos_virt = writing_pos + buf_size;
1506         if (new_wpos_virt <= range_data_start)
1507           goto next_range;
1509         if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
1510           /* remove range */
1511           range_to_destroy = range;
1512           if (prev)
1513             prev->next = range->next;
1514         } else {
1515           range->offset += (new_wpos_virt - range_data_start);
1516           range->rb_offset = new_writing_pos;
1517         }
1518       }
1520     next_range:
1521       if (!range_to_destroy)
1522         prev = range;
1524       range = range->next;
1525       if (range_to_destroy) {
1526         if (range_to_destroy == queue->ranges)
1527           queue->ranges = range;
1528         g_slice_free1 (sizeof (GstQueue2Range), range_to_destroy);
1529         range_to_destroy = NULL;
1530       }
1531     }
1533     FSEEK_FILE (queue->temp_file, writing_pos);
1535     if (new_writing_pos > writing_pos) {
1536       /* no wrapping, just write */
1537       if (fwrite (data, buf_size, 1, queue->temp_file) != 1)
1538         goto handle_error;
1539     } else {
1540       /* wrapping */
1541       guint block_one, block_two;
1543       block_one = rb_size - writing_pos;
1544       block_two = buf_size - block_one;
1546       /* write data to end of ring buffer */
1547       if (fwrite (data, block_one, 1, queue->temp_file) != 1)
1548         goto handle_error;
1550       FSEEK_FILE (queue->temp_file, 0);
1552       data += block_one;
1553       if (fwrite (data, block_two, 1, queue->temp_file) != 1)
1554         goto handle_error;
1555     }
1557     /* update the writing positions */
1558     GST_INFO_OBJECT (queue, "wrote %u bytes to %" G_GUINT64_FORMAT, buf_size,
1559         writing_pos);
1560     queue->current->writing_pos += buf_size;
1561     queue->current->rb_writing_pos = writing_pos = new_writing_pos;
1563     update_cur_level (queue, queue->current);
1564     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %u)",
1565         queue->cur_level.bytes, MIN (queue->max_level.bytes,
1566             queue->ring_buffer_max_size));
1568     /* if we have a remainder of the buffer data, wait until there's space to
1569      * write before looping */
1570     if (rem_size) {
1571       gboolean started;
1573       /* pause the timer while we wait. The fact that we are waiting does not mean
1574        * the byterate on the input pad is lower */
1575       if ((started = queue->in_timer_started))
1576         g_timer_stop (queue->in_timer);
1578       GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1579           "queue is full, waiting for free space");
1580       while (gst_queue2_is_filled (queue)) {
1581         /* Wait for space to be available, we could be unlocked because of a flush. */
1582         GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1583       }
1585       /* and continue if we were running before */
1586       if (started)
1587         g_timer_continue (queue->in_timer);
1588     }
1589   } while (rem_size);
1591   return TRUE;
1593   /* ERRORS */
1594 out_flushing:
1595   {
1596     GST_DEBUG_OBJECT (queue, "we are flushing");
1597     /* FIXME - GST_FLOW_UNEXPECTED ? */
1598     return FALSE;
1599   }
1600 handle_error:
1601   {
1602     switch (errno) {
1603       case ENOSPC:{
1604         GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
1605         break;
1606       }
1607       default:{
1608         GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
1609             (_("Error while writing to download file.")),
1610             ("%s", g_strerror (errno)));
1611       }
1612     }
1613     return FALSE;
1614   }
1617 /* enqueue an item an update the level stats */
1618 static void
1619 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
1621   if (GST_IS_BUFFER (item)) {
1622     GstBuffer *buffer;
1623     guint size;
1625     buffer = GST_BUFFER_CAST (item);
1626     size = GST_BUFFER_SIZE (buffer);
1628     /* add buffer to the statistics */
1629     if (!(QUEUE_IS_USING_TEMP_FILE (queue)
1630             || QUEUE_IS_USING_RING_BUFFER (queue))) {
1631       queue->cur_level.buffers++;
1632       queue->cur_level.bytes += size;
1633     }
1634     queue->bytes_in += size;
1636     /* apply new buffer to segment stats */
1637     apply_buffer (queue, buffer, &queue->sink_segment);
1638     /* update the byterate stats */
1639     update_in_rates (queue);
1641     /* FIXME - check return values? */
1642     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1643       gst_queue2_write_buffer_to_ring_buffer (queue, buffer);
1644     } else if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1645       gst_queue2_write_buffer_to_file (queue, buffer);
1646     }
1648   } else if (GST_IS_EVENT (item)) {
1649     GstEvent *event;
1651     event = GST_EVENT_CAST (item);
1653     switch (GST_EVENT_TYPE (event)) {
1654       case GST_EVENT_EOS:
1655         /* Zero the thresholds, this makes sure the queue is completely
1656          * filled and we can read all data from the queue. */
1657         GST_DEBUG_OBJECT (queue, "we have EOS");
1658         queue->is_eos = TRUE;
1659         break;
1660       case GST_EVENT_NEWSEGMENT:
1661         apply_segment (queue, event, &queue->sink_segment);
1662         /* This is our first new segment, we hold it
1663          * as we can't save it on the temp file */
1664         if (QUEUE_IS_USING_RING_BUFFER (queue)
1665             || QUEUE_IS_USING_TEMP_FILE (queue)) {
1666           if (queue->segment_event_received)
1667             goto unexpected_event;
1669           queue->segment_event_received = TRUE;
1670           if (queue->starting_segment != NULL)
1671             gst_event_unref (queue->starting_segment);
1672           queue->starting_segment = event;
1673           item = NULL;
1674         }
1675         /* a new segment allows us to accept more buffers if we got UNEXPECTED
1676          * from downstream */
1677         queue->unexpected = FALSE;
1678         break;
1679       default:
1680         if (QUEUE_IS_USING_RING_BUFFER (queue)
1681             || QUEUE_IS_USING_TEMP_FILE (queue))
1682           goto unexpected_event;
1683         break;
1684     }
1685   } else {
1686     g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
1687         item, GST_OBJECT_NAME (queue));
1688     /* we can't really unref since we don't know what it is */
1689     item = NULL;
1690   }
1692   if (item) {
1693     /* update the buffering status */
1694     update_buffering (queue);
1696     if (!(QUEUE_IS_USING_TEMP_FILE (queue)
1697             || QUEUE_IS_USING_RING_BUFFER (queue)))
1698       g_queue_push_tail (queue->queue, item);
1699     else
1700       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
1702     GST_QUEUE2_SIGNAL_ADD (queue);
1703   }
1705   return;
1707   /* ERRORS */
1708 unexpected_event:
1709   {
1710     g_warning
1711         ("Unexpected event of kind %s can't be added in temp file of queue %s ",
1712         gst_event_type_get_name (GST_EVENT_TYPE (item)),
1713         GST_OBJECT_NAME (queue));
1714     gst_event_unref (GST_EVENT_CAST (item));
1715     return;
1716   }
1719 /* dequeue an item from the queue and update level stats */
1720 static GstMiniObject *
1721 gst_queue2_locked_dequeue (GstQueue2 * queue)
1723   GstMiniObject *item;
1725   if (QUEUE_IS_USING_TEMP_FILE (queue) || QUEUE_IS_USING_RING_BUFFER (queue))
1726     item = gst_queue2_read_item_from_file (queue);
1727   else
1728     item = g_queue_pop_head (queue->queue);
1730   if (item == NULL)
1731     goto no_item;
1733   if (GST_IS_BUFFER (item)) {
1734     GstBuffer *buffer;
1735     guint size;
1737     buffer = GST_BUFFER_CAST (item);
1738     size = GST_BUFFER_SIZE (buffer);
1740     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1741         "retrieved buffer %p from queue", buffer);
1743     if (!(QUEUE_IS_USING_TEMP_FILE (queue)
1744             || QUEUE_IS_USING_RING_BUFFER (queue))) {
1745       queue->cur_level.buffers--;
1746       queue->cur_level.bytes -= size;
1747     }
1748     queue->bytes_out += size;
1750     apply_buffer (queue, buffer, &queue->src_segment);
1751     /* update the byterate stats */
1752     update_out_rates (queue);
1753     /* update the buffering */
1754     update_buffering (queue);
1756   } else if (GST_IS_EVENT (item)) {
1757     GstEvent *event = GST_EVENT_CAST (item);
1759     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1760         "retrieved event %p from queue", event);
1762     switch (GST_EVENT_TYPE (event)) {
1763       case GST_EVENT_EOS:
1764         /* queue is empty now that we dequeued the EOS */
1765         GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1766         break;
1767       case GST_EVENT_NEWSEGMENT:
1768         apply_segment (queue, event, &queue->src_segment);
1769         break;
1770       default:
1771         break;
1772     }
1773   } else {
1774     g_warning
1775         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
1776         item, GST_OBJECT_NAME (queue));
1777     item = NULL;
1778   }
1779   GST_QUEUE2_SIGNAL_DEL (queue);
1781   return item;
1783   /* ERRORS */
1784 no_item:
1785   {
1786     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty");
1787     return NULL;
1788   }
1791 static gboolean
1792 gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
1794   GstQueue2 *queue;
1796   queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
1798   switch (GST_EVENT_TYPE (event)) {
1799     case GST_EVENT_FLUSH_START:
1800     {
1801       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
1802       if (!(QUEUE_IS_USING_RING_BUFFER (queue)
1803               || QUEUE_IS_USING_TEMP_FILE (queue))) {
1804         /* forward event */
1805         gst_pad_push_event (queue->srcpad, event);
1807         /* now unblock the chain function */
1808         GST_QUEUE2_MUTEX_LOCK (queue);
1809         queue->srcresult = GST_FLOW_WRONG_STATE;
1810         queue->sinkresult = GST_FLOW_WRONG_STATE;
1811         /* unblock the loop and chain functions */
1812         GST_QUEUE2_SIGNAL_ADD (queue);
1813         GST_QUEUE2_SIGNAL_DEL (queue);
1814         GST_QUEUE2_MUTEX_UNLOCK (queue);
1816         /* make sure it pauses, this should happen since we sent
1817          * flush_start downstream. */
1818         gst_pad_pause_task (queue->srcpad);
1819         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
1820       }
1821       goto done;
1822     }
1823     case GST_EVENT_FLUSH_STOP:
1824     {
1825       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
1827       if (!(QUEUE_IS_USING_RING_BUFFER (queue)
1828               || QUEUE_IS_USING_TEMP_FILE (queue))) {
1829         /* forward event */
1830         gst_pad_push_event (queue->srcpad, event);
1832         GST_QUEUE2_MUTEX_LOCK (queue);
1833         gst_queue2_locked_flush (queue);
1834         queue->srcresult = GST_FLOW_OK;
1835         queue->sinkresult = GST_FLOW_OK;
1836         queue->is_eos = FALSE;
1837         queue->unexpected = FALSE;
1838         /* reset rate counters */
1839         reset_rate_timer (queue);
1840         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
1841             queue->srcpad);
1842         GST_QUEUE2_MUTEX_UNLOCK (queue);
1843       } else {
1844         GST_QUEUE2_MUTEX_LOCK (queue);
1845         queue->segment_event_received = FALSE;
1846         queue->is_eos = FALSE;
1847         queue->unexpected = FALSE;
1848         GST_QUEUE2_MUTEX_UNLOCK (queue);
1849       }
1850       goto done;
1851     }
1852     default:
1853       if (GST_EVENT_IS_SERIALIZED (event)) {
1854         /* serialized events go in the queue */
1855         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
1856         /* refuse more events on EOS */
1857         if (queue->is_eos)
1858           goto out_eos;
1859         gst_queue2_locked_enqueue (queue, event);
1860         GST_QUEUE2_MUTEX_UNLOCK (queue);
1861       } else {
1862         /* non-serialized events are passed upstream. */
1863         gst_pad_push_event (queue->srcpad, event);
1864       }
1865       break;
1866   }
1867 done:
1868   return TRUE;
1870   /* ERRORS */
1871 out_flushing:
1872   {
1873     GST_DEBUG_OBJECT (queue, "refusing event, we are flushing");
1874     GST_QUEUE2_MUTEX_UNLOCK (queue);
1875     gst_event_unref (event);
1876     return FALSE;
1877   }
1878 out_eos:
1879   {
1880     GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
1881     GST_QUEUE2_MUTEX_UNLOCK (queue);
1882     gst_event_unref (event);
1883     return FALSE;
1884   }
1887 static gboolean
1888 gst_queue2_is_empty (GstQueue2 * queue)
1890   /* never empty on EOS */
1891   if (queue->is_eos)
1892     return FALSE;
1894   if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) {
1895     return queue->current->writing_pos <= queue->current->max_reading_pos;
1896   } else {
1897     if (queue->queue->length == 0)
1898       return TRUE;
1899   }
1901   return FALSE;
1904 static gboolean
1905 gst_queue2_is_filled (GstQueue2 * queue)
1907   gboolean res;
1909   /* always filled on EOS */
1910   if (queue->is_eos)
1911     return TRUE;
1913   /* if using a ring buffer we're filled if all ring buffer space is used
1914    * _by the current range_ */
1915   if (QUEUE_IS_USING_RING_BUFFER (queue))
1916     return queue->cur_level.bytes >= MIN (queue->max_level.bytes,
1917         queue->ring_buffer_max_size);
1919   /* if using file, we're never filled if we don't have EOS */
1920   if (QUEUE_IS_USING_TEMP_FILE (queue))
1921     return FALSE;
1923   /* we are never filled when we have no buffers at all */
1924   if (queue->cur_level.buffers == 0)
1925     return FALSE;
1927 #define CHECK_FILLED(format) ((queue->max_level.format) > 0 && \
1928                 (queue->cur_level.format) >= (queue->max_level.format))
1930   /* we are filled if one of the current levels exceeds the max */
1931   res = CHECK_FILLED (buffers) || CHECK_FILLED (bytes) || CHECK_FILLED (time);
1933   /* if we need to, use the rate estimate to check against the max time we are
1934    * allowed to queue */
1935   if (queue->use_rate_estimate)
1936     res |= CHECK_FILLED (rate_time);
1938 #undef CHECK_FILLED
1939   return res;
1942 static GstFlowReturn
1943 gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
1945   GstQueue2 *queue;
1947   queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
1949   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1950       "received buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %"
1951       GST_TIME_FORMAT, buffer, GST_BUFFER_SIZE (buffer),
1952       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
1953       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
1955   /* we have to lock the queue since we span threads */
1956   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
1957   /* when we received EOS, we refuse more data */
1958   if (queue->is_eos)
1959     goto out_eos;
1960   /* when we received unexpected from downstream, refuse more buffers */
1961   if (queue->unexpected)
1962     goto out_unexpected;
1964   /* We make space available if we're "full" according to whatever
1965    * the user defined as "full". */
1966   if (gst_queue2_is_filled (queue)) {
1967     gboolean started;
1969     /* pause the timer while we wait. The fact that we are waiting does not mean
1970      * the byterate on the input pad is lower */
1971     if ((started = queue->in_timer_started))
1972       g_timer_stop (queue->in_timer);
1974     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1975         "queue is full, waiting for free space");
1976     do {
1977       /* Wait for space to be available, we could be unlocked because of a flush. */
1978       GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1979     }
1980     while (gst_queue2_is_filled (queue));
1982     /* and continue if we were running before */
1983     if (started)
1984       g_timer_continue (queue->in_timer);
1985   }
1987   /* put buffer in queue now */
1988   gst_queue2_locked_enqueue (queue, buffer);
1989   GST_QUEUE2_MUTEX_UNLOCK (queue);
1991   return GST_FLOW_OK;
1993   /* special conditions */
1994 out_flushing:
1995   {
1996     GstFlowReturn ret = queue->sinkresult;
1998     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1999         "exit because task paused, reason: %s", gst_flow_get_name (ret));
2000     GST_QUEUE2_MUTEX_UNLOCK (queue);
2001     gst_buffer_unref (buffer);
2003     return ret;
2004   }
2005 out_eos:
2006   {
2007     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2008     GST_QUEUE2_MUTEX_UNLOCK (queue);
2009     gst_buffer_unref (buffer);
2011     return GST_FLOW_UNEXPECTED;
2012   }
2013 out_unexpected:
2014   {
2015     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2016         "exit because we received UNEXPECTED");
2017     GST_QUEUE2_MUTEX_UNLOCK (queue);
2018     gst_buffer_unref (buffer);
2020     return GST_FLOW_UNEXPECTED;
2021   }
2024 /* dequeue an item from the queue an push it downstream. This functions returns
2025  * the result of the push. */
2026 static GstFlowReturn
2027 gst_queue2_push_one (GstQueue2 * queue)
2029   GstFlowReturn result = GST_FLOW_OK;
2030   GstMiniObject *data;
2032   data = gst_queue2_locked_dequeue (queue);
2033   if (data == NULL)
2034     goto no_item;
2036 next:
2037   if (GST_IS_BUFFER (data)) {
2038     GstBuffer *buffer;
2039     GstCaps *caps;
2041     buffer = GST_BUFFER_CAST (data);
2042     caps = GST_BUFFER_CAPS (buffer);
2044     GST_QUEUE2_MUTEX_UNLOCK (queue);
2046     /* set caps before pushing the buffer so that core does not try to do
2047      * something fancy to check if this is possible. */
2048     if (caps && caps != GST_PAD_CAPS (queue->srcpad))
2049       gst_pad_set_caps (queue->srcpad, caps);
2051     result = gst_pad_push (queue->srcpad, buffer);
2053     /* need to check for srcresult here as well */
2054     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2055     if (result == GST_FLOW_UNEXPECTED) {
2056       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2057           "got UNEXPECTED from downstream");
2058       /* stop pushing buffers, we dequeue all items until we see an item that we
2059        * can push again, which is EOS or NEWSEGMENT. If there is nothing in the
2060        * queue we can push, we set a flag to make the sinkpad refuse more
2061        * buffers with an UNEXPECTED return value until we receive something
2062        * pushable again or we get flushed. */
2063       while ((data = gst_queue2_locked_dequeue (queue))) {
2064         if (GST_IS_BUFFER (data)) {
2065           GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2066               "dropping UNEXPECTED buffer %p", data);
2067           gst_buffer_unref (GST_BUFFER_CAST (data));
2068         } else if (GST_IS_EVENT (data)) {
2069           GstEvent *event = GST_EVENT_CAST (data);
2070           GstEventType type = GST_EVENT_TYPE (event);
2072           if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) {
2073             /* we found a pushable item in the queue, push it out */
2074             GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2075                 "pushing pushable event %s after UNEXPECTED",
2076                 GST_EVENT_TYPE_NAME (event));
2077             goto next;
2078           }
2079           GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2080               "dropping UNEXPECTED event %p", event);
2081           gst_event_unref (event);
2082         }
2083       }
2084       /* no more items in the queue. Set the unexpected flag so that upstream
2085        * make us refuse any more buffers on the sinkpad. Since we will still
2086        * accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the
2087        * task function does not shut down. */
2088       queue->unexpected = TRUE;
2089       result = GST_FLOW_OK;
2090     }
2091   } else if (GST_IS_EVENT (data)) {
2092     GstEvent *event = GST_EVENT_CAST (data);
2093     GstEventType type = GST_EVENT_TYPE (event);
2095     GST_QUEUE2_MUTEX_UNLOCK (queue);
2097     gst_pad_push_event (queue->srcpad, event);
2099     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2100     /* if we're EOS, return UNEXPECTED so that the task pauses. */
2101     if (type == GST_EVENT_EOS) {
2102       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2103           "pushed EOS event %p, return UNEXPECTED", event);
2104       result = GST_FLOW_UNEXPECTED;
2105     }
2106   }
2107   return result;
2109   /* ERRORS */
2110 no_item:
2111   {
2112     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2113         "exit because we have no item in the queue");
2114     return GST_FLOW_ERROR;
2115   }
2116 out_flushing:
2117   {
2118     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
2119     return GST_FLOW_WRONG_STATE;
2120   }
2123 /* called repeadedly with @pad as the source pad. This function should push out
2124  * data to the peer element. */
2125 static void
2126 gst_queue2_loop (GstPad * pad)
2128   GstQueue2 *queue;
2129   GstFlowReturn ret;
2131   queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
2133   /* have to lock for thread-safety */
2134   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2136   if (gst_queue2_is_empty (queue)) {
2137     gboolean started;
2139     /* pause the timer while we wait. The fact that we are waiting does not mean
2140      * the byterate on the output pad is lower */
2141     if ((started = queue->out_timer_started))
2142       g_timer_stop (queue->out_timer);
2144     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
2145         "queue is empty, waiting for new data");
2146     do {
2147       /* Wait for data to be available, we could be unlocked because of a flush. */
2148       GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
2149     }
2150     while (gst_queue2_is_empty (queue));
2152     /* and continue if we were running before */
2153     if (started)
2154       g_timer_continue (queue->out_timer);
2155   }
2156   ret = gst_queue2_push_one (queue);
2157   queue->srcresult = ret;
2158   queue->sinkresult = ret;
2159   if (ret != GST_FLOW_OK)
2160     goto out_flushing;
2162   GST_QUEUE2_MUTEX_UNLOCK (queue);
2164   return;
2166   /* ERRORS */
2167 out_flushing:
2168   {
2169     gboolean eos = queue->is_eos;
2170     GstFlowReturn ret = queue->srcresult;
2172     gst_pad_pause_task (queue->srcpad);
2173     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2174         "pause task, reason:  %s", gst_flow_get_name (queue->srcresult));
2175     GST_QUEUE2_MUTEX_UNLOCK (queue);
2176     /* let app know about us giving up if upstream is not expected to do so */
2177     /* UNEXPECTED is already taken care of elsewhere */
2178     if (eos && (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) &&
2179         (ret != GST_FLOW_UNEXPECTED)) {
2180       GST_ELEMENT_ERROR (queue, STREAM, FAILED,
2181           (_("Internal data flow error.")),
2182           ("streaming task paused, reason %s (%d)",
2183               gst_flow_get_name (ret), ret));
2184       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
2185     }
2186     return;
2187   }
2190 static gboolean
2191 gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
2193   gboolean res = TRUE;
2194   GstQueue2 *queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
2196 #ifndef GST_DISABLE_GST_DEBUG
2197   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
2198       event, GST_EVENT_TYPE_NAME (event));
2199 #endif
2201   switch (GST_EVENT_TYPE (event)) {
2202     case GST_EVENT_FLUSH_START:
2203       if (!(QUEUE_IS_USING_RING_BUFFER (queue)
2204               || QUEUE_IS_USING_TEMP_FILE (queue))) {
2205         /* just forward upstream */
2206         res = gst_pad_push_event (queue->sinkpad, event);
2207       } else {
2208         /* now unblock the getrange function */
2209         GST_QUEUE2_MUTEX_LOCK (queue);
2210         GST_DEBUG_OBJECT (queue, "flushing");
2211         queue->srcresult = GST_FLOW_WRONG_STATE;
2212         GST_QUEUE2_SIGNAL_ADD (queue);
2213         GST_QUEUE2_MUTEX_UNLOCK (queue);
2215         /* when using a temp file, we eat the event */
2216         res = TRUE;
2217         gst_event_unref (event);
2218       }
2219       break;
2220     case GST_EVENT_FLUSH_STOP:
2221       if (!(QUEUE_IS_USING_RING_BUFFER (queue)
2222               || QUEUE_IS_USING_TEMP_FILE (queue))) {
2223         /* just forward upstream */
2224         res = gst_pad_push_event (queue->sinkpad, event);
2225       } else {
2226         /* now unblock the getrange function */
2227         GST_QUEUE2_MUTEX_LOCK (queue);
2228         queue->srcresult = GST_FLOW_OK;
2229         if (queue->current)
2230           queue->current->max_reading_pos = 0;
2231         GST_QUEUE2_MUTEX_UNLOCK (queue);
2233         /* when using a temp file, we eat the event */
2234         res = TRUE;
2235         gst_event_unref (event);
2236       }
2237       break;
2238     default:
2239       res = gst_pad_push_event (queue->sinkpad, event);
2240       break;
2241   }
2243   return res;
2246 static gboolean
2247 gst_queue2_peer_query (GstQueue2 * queue, GstPad * pad, GstQuery * query)
2249   gboolean ret = FALSE;
2250   GstPad *peer;
2252   if ((peer = gst_pad_get_peer (pad))) {
2253     ret = gst_pad_query (peer, query);
2254     gst_object_unref (peer);
2255   }
2256   return ret;
2259 static gboolean
2260 gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
2262   GstQueue2 *queue;
2264   queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
2266   switch (GST_QUERY_TYPE (query)) {
2267     case GST_QUERY_POSITION:
2268     {
2269       gint64 peer_pos;
2270       GstFormat format;
2272       if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
2273         goto peer_failed;
2275       /* get peer position */
2276       gst_query_parse_position (query, &format, &peer_pos);
2278       /* FIXME: this code assumes that there's no discont in the queue */
2279       switch (format) {
2280         case GST_FORMAT_BYTES:
2281           peer_pos -= queue->cur_level.bytes;
2282           break;
2283         case GST_FORMAT_TIME:
2284           peer_pos -= queue->cur_level.time;
2285           break;
2286         default:
2287           GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
2288               "know how to adjust value", gst_format_get_name (format));
2289           return FALSE;
2290       }
2291       /* set updated position */
2292       gst_query_set_position (query, format, peer_pos);
2293       break;
2294     }
2295     case GST_QUERY_DURATION:
2296     {
2297       GST_DEBUG_OBJECT (queue, "doing peer query");
2299       if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
2300         goto peer_failed;
2302       GST_DEBUG_OBJECT (queue, "peer query success");
2303       break;
2304     }
2305     case GST_QUERY_BUFFERING:
2306     {
2307       GstFormat format;
2309       GST_DEBUG_OBJECT (queue, "query buffering");
2311       if (!(QUEUE_IS_USING_RING_BUFFER (queue)
2312               || QUEUE_IS_USING_TEMP_FILE (queue))) {
2313         /* no temp file, just forward to the peer */
2314         if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
2315           goto peer_failed;
2316         GST_DEBUG_OBJECT (queue, "buffering forwarded to peer");
2317       } else {
2318         gint64 start, stop;
2319         guint64 writing_pos;
2320         gint percent;
2321         gint64 estimated_total, buffering_left;
2322         GstFormat peer_fmt;
2323         gint64 duration;
2324         gboolean peer_res, is_buffering, is_eos;
2325         gdouble byte_in_rate, byte_out_rate;
2327         /* we need a current download region */
2328         if (queue->current == NULL)
2329           return FALSE;
2331         writing_pos = queue->current->writing_pos;
2332         byte_in_rate = queue->byte_in_rate;
2333         byte_out_rate = queue->byte_out_rate;
2334         is_buffering = queue->is_buffering;
2335         is_eos = queue->is_eos;
2336         percent = queue->buffering_percent;
2338         if (is_eos) {
2339           /* we're EOS, we know the duration in bytes now */
2340           peer_res = TRUE;
2341           duration = writing_pos;
2342         } else {
2343           /* get duration of upstream in bytes */
2344           peer_fmt = GST_FORMAT_BYTES;
2345           peer_res = gst_pad_query_peer_duration (queue->sinkpad, &peer_fmt,
2346               &duration);
2347         }
2349         /* calculate remaining and total download time */
2350         if (peer_res && byte_in_rate > 0.0) {
2351           estimated_total = (duration * 1000) / byte_in_rate;
2352           buffering_left = ((duration - writing_pos) * 1000) / byte_in_rate;
2353         } else {
2354           estimated_total = -1;
2355           buffering_left = -1;
2356         }
2357         GST_DEBUG_OBJECT (queue, "estimated %" G_GINT64_FORMAT ", left %"
2358             G_GINT64_FORMAT, estimated_total, buffering_left);
2360         gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
2362         switch (format) {
2363           case GST_FORMAT_PERCENT:
2364             /* we need duration */
2365             if (!peer_res)
2366               goto peer_failed;
2368             GST_DEBUG_OBJECT (queue,
2369                 "duration %" G_GINT64_FORMAT ", writing %" G_GINT64_FORMAT,
2370                 duration, writing_pos);
2372             start = 0;
2373             /* get our available data relative to the duration */
2374             if (duration != -1)
2375               stop = GST_FORMAT_PERCENT_MAX * writing_pos / duration;
2376             else
2377               stop = -1;
2378             break;
2379           case GST_FORMAT_BYTES:
2380             start = 0;
2381             stop = writing_pos;
2382             break;
2383           default:
2384             start = -1;
2385             stop = -1;
2386             break;
2387         }
2388         gst_query_set_buffering_percent (query, is_buffering, percent);
2389         gst_query_set_buffering_range (query, format, start, stop,
2390             estimated_total);
2391         gst_query_set_buffering_stats (query, GST_BUFFERING_DOWNLOAD,
2392             byte_in_rate, byte_out_rate, buffering_left);
2393       }
2394       break;
2395     }
2396     default:
2397       /* peer handled other queries */
2398       if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
2399         goto peer_failed;
2400       break;
2401   }
2403   return TRUE;
2405   /* ERRORS */
2406 peer_failed:
2407   {
2408     GST_DEBUG_OBJECT (queue, "failed peer query");
2409     return FALSE;
2410   }
2413 static gboolean
2414 gst_queue2_handle_query (GstElement * element, GstQuery * query)
2416   /* simply forward to the srcpad query function */
2417   return gst_queue2_handle_src_query (GST_QUEUE2_CAST (element)->srcpad, query);
2420 static GstFlowReturn
2421 gst_queue2_get_range (GstPad * pad, guint64 offset, guint length,
2422     GstBuffer ** buffer)
2424   GstQueue2 *queue;
2425   GstFlowReturn ret;
2427   queue = GST_QUEUE2_CAST (gst_pad_get_parent (pad));
2428   if (length > queue->ring_buffer_max_size) {
2429     GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT,
2430         (_("Buffer is too large to fit in ring buffer")),
2431         ("(%u > %" G_GUINT64_FORMAT ")", length, queue->ring_buffer_max_size));
2432     return GST_FLOW_ERROR;
2433   }
2435   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2436   length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
2437   offset = (offset == -1) ? queue->current->reading_pos : offset;
2439   /* FIXME - function will block when the range is not yet available */
2440   ret = gst_queue2_create_read (queue, offset, length, buffer);
2441   GST_QUEUE2_MUTEX_UNLOCK (queue);
2443   gst_object_unref (queue);
2445   return ret;
2447   /* ERRORS */
2448 out_flushing:
2449   {
2450     ret = queue->srcresult;
2452     GST_DEBUG_OBJECT (queue, "we are flushing");
2453     GST_QUEUE2_MUTEX_UNLOCK (queue);
2454     return ret;
2455   }
2458 static gboolean
2459 gst_queue2_src_checkgetrange_function (GstPad * pad)
2461   GstQueue2 *queue;
2462   gboolean ret;
2464   queue = GST_QUEUE2 (gst_pad_get_parent (pad));
2466   /* we can operate in pull mode when we are using a tempfile */
2467   ret = QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue);
2469   gst_object_unref (GST_OBJECT (queue));
2471   return ret;
2474 /* sink currently only operates in push mode */
2475 static gboolean
2476 gst_queue2_sink_activate_push (GstPad * pad, gboolean active)
2478   gboolean result = TRUE;
2479   GstQueue2 *queue;
2481   queue = GST_QUEUE2 (gst_pad_get_parent (pad));
2483   if (active) {
2484     GST_QUEUE2_MUTEX_LOCK (queue);
2485     GST_DEBUG_OBJECT (queue, "activating push mode");
2486     queue->srcresult = GST_FLOW_OK;
2487     queue->sinkresult = GST_FLOW_OK;
2488     queue->is_eos = FALSE;
2489     queue->unexpected = FALSE;
2490     reset_rate_timer (queue);
2491     GST_QUEUE2_MUTEX_UNLOCK (queue);
2492   } else {
2493     /* unblock chain function */
2494     GST_QUEUE2_MUTEX_LOCK (queue);
2495     GST_DEBUG_OBJECT (queue, "deactivating push mode");
2496     queue->srcresult = GST_FLOW_WRONG_STATE;
2497     queue->sinkresult = GST_FLOW_WRONG_STATE;
2498     gst_queue2_locked_flush (queue);
2499     GST_QUEUE2_MUTEX_UNLOCK (queue);
2500   }
2502   gst_object_unref (queue);
2504   return result;
2507 /* src operating in push mode, we start a task on the source pad that pushes out
2508  * buffers from the queue */
2509 static gboolean
2510 gst_queue2_src_activate_push (GstPad * pad, gboolean active)
2512   gboolean result = FALSE;
2513   GstQueue2 *queue;
2515   queue = GST_QUEUE2 (gst_pad_get_parent (pad));
2517   if (active) {
2518     GST_QUEUE2_MUTEX_LOCK (queue);
2519     GST_DEBUG_OBJECT (queue, "activating push mode");
2520     queue->srcresult = GST_FLOW_OK;
2521     queue->sinkresult = GST_FLOW_OK;
2522     queue->is_eos = FALSE;
2523     queue->unexpected = FALSE;
2524     result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad);
2525     GST_QUEUE2_MUTEX_UNLOCK (queue);
2526   } else {
2527     /* unblock loop function */
2528     GST_QUEUE2_MUTEX_LOCK (queue);
2529     GST_DEBUG_OBJECT (queue, "deactivating push mode");
2530     queue->srcresult = GST_FLOW_WRONG_STATE;
2531     queue->sinkresult = GST_FLOW_WRONG_STATE;
2532     /* the item add signal will unblock */
2533     GST_QUEUE2_SIGNAL_ADD (queue);
2534     GST_QUEUE2_MUTEX_UNLOCK (queue);
2536     /* step 2, make sure streaming finishes */
2537     result = gst_pad_stop_task (pad);
2538   }
2540   gst_object_unref (queue);
2542   return result;
2545 /* pull mode, downstream will call our getrange function */
2546 static gboolean
2547 gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
2549   gboolean result;
2550   GstQueue2 *queue;
2552   queue = GST_QUEUE2 (gst_pad_get_parent (pad));
2554   if (active) {
2555     if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) {
2556       /* open the temp file now */
2557       result = gst_queue2_open_temp_location_file (queue);
2559       GST_QUEUE2_MUTEX_LOCK (queue);
2560       GST_DEBUG_OBJECT (queue, "activating pull mode");
2561       queue->srcresult = GST_FLOW_OK;
2562       queue->sinkresult = GST_FLOW_OK;
2563       queue->is_eos = FALSE;
2564       queue->unexpected = FALSE;
2565       GST_QUEUE2_MUTEX_UNLOCK (queue);
2566     } else {
2567       GST_QUEUE2_MUTEX_LOCK (queue);
2568       GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
2569       /* this is not allowed, we cannot operate in pull mode without a temp
2570        * file. */
2571       queue->srcresult = GST_FLOW_WRONG_STATE;
2572       queue->sinkresult = GST_FLOW_WRONG_STATE;
2573       result = FALSE;
2574       GST_QUEUE2_MUTEX_UNLOCK (queue);
2575     }
2576   } else {
2577     GST_QUEUE2_MUTEX_LOCK (queue);
2578     GST_DEBUG_OBJECT (queue, "deactivating pull mode");
2579     queue->srcresult = GST_FLOW_WRONG_STATE;
2580     queue->sinkresult = GST_FLOW_WRONG_STATE;
2581     /* this will unlock getrange */
2582     GST_QUEUE2_SIGNAL_ADD (queue);
2583     result = TRUE;
2584     GST_QUEUE2_MUTEX_UNLOCK (queue);
2585   }
2586   gst_object_unref (queue);
2588   return result;
2591 static GstStateChangeReturn
2592 gst_queue2_change_state (GstElement * element, GstStateChange transition)
2594   GstQueue2 *queue;
2595   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
2597   queue = GST_QUEUE2 (element);
2599   switch (transition) {
2600     case GST_STATE_CHANGE_NULL_TO_READY:
2601       break;
2602     case GST_STATE_CHANGE_READY_TO_PAUSED:
2603       if (QUEUE_IS_USING_RING_BUFFER (queue)
2604           || QUEUE_IS_USING_TEMP_FILE (queue)) {
2605         if (!gst_queue2_open_temp_location_file (queue))
2606           ret = GST_STATE_CHANGE_FAILURE;
2607       }
2608       queue->segment_event_received = FALSE;
2609       queue->starting_segment = NULL;
2610       break;
2611     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
2612       break;
2613     default:
2614       break;
2615   }
2617   if (ret == GST_STATE_CHANGE_FAILURE)
2618     return ret;
2620   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2622   if (ret == GST_STATE_CHANGE_FAILURE)
2623     return ret;
2625   switch (transition) {
2626     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2627       break;
2628     case GST_STATE_CHANGE_PAUSED_TO_READY:
2629       if (QUEUE_IS_USING_RING_BUFFER (queue)
2630           || QUEUE_IS_USING_TEMP_FILE (queue))
2631         gst_queue2_close_temp_location_file (queue);
2632       if (queue->starting_segment != NULL) {
2633         gst_event_unref (queue->starting_segment);
2634         queue->starting_segment = NULL;
2635       }
2636       break;
2637     case GST_STATE_CHANGE_READY_TO_NULL:
2638       break;
2639     default:
2640       break;
2641   }
2643   return ret;
2646 /* changing the capacity of the queue must wake up
2647  * the _chain function, it might have more room now
2648  * to store the buffer/event in the queue */
2649 #define QUEUE_CAPACITY_CHANGE(q)\
2650   GST_QUEUE2_SIGNAL_DEL (queue);
2652 /* Changing the minimum required fill level must
2653  * wake up the _loop function as it might now
2654  * be able to preceed.
2655  */
2656 #define QUEUE_THRESHOLD_CHANGE(q)\
2657   GST_QUEUE2_SIGNAL_ADD (queue);
2659 static void
2660 gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
2662   GstState state;
2664   /* the element must be stopped in order to do this */
2665   GST_OBJECT_LOCK (queue);
2666   state = GST_STATE (queue);
2667   if (state != GST_STATE_READY && state != GST_STATE_NULL)
2668     goto wrong_state;
2669   GST_OBJECT_UNLOCK (queue);
2671   /* set new location */
2672   g_free (queue->temp_template);
2673   queue->temp_template = g_strdup (template);
2675   return;
2677 /* ERROR */
2678 wrong_state:
2679   {
2680     GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state");
2681     GST_OBJECT_UNLOCK (queue);
2682   }
2685 static void
2686 gst_queue2_set_property (GObject * object,
2687     guint prop_id, const GValue * value, GParamSpec * pspec)
2689   GstQueue2 *queue = GST_QUEUE2 (object);
2691   /* someone could change levels here, and since this
2692    * affects the get/put funcs, we need to lock for safety. */
2693   GST_QUEUE2_MUTEX_LOCK (queue);
2695   switch (prop_id) {
2696     case PROP_MAX_SIZE_BYTES:
2697       queue->max_level.bytes = g_value_get_uint (value);
2698       QUEUE_CAPACITY_CHANGE (queue);
2699       break;
2700     case PROP_MAX_SIZE_BUFFERS:
2701       queue->max_level.buffers = g_value_get_uint (value);
2702       QUEUE_CAPACITY_CHANGE (queue);
2703       break;
2704     case PROP_MAX_SIZE_TIME:
2705       queue->max_level.time = g_value_get_uint64 (value);
2706       /* set rate_time to the same value. We use an extra field in the level
2707        * structure so that we can easily access and compare it */
2708       queue->max_level.rate_time = queue->max_level.time;
2709       QUEUE_CAPACITY_CHANGE (queue);
2710       break;
2711     case PROP_USE_BUFFERING:
2712       queue->use_buffering = g_value_get_boolean (value);
2713       break;
2714     case PROP_USE_RATE_ESTIMATE:
2715       queue->use_rate_estimate = g_value_get_boolean (value);
2716       break;
2717     case PROP_LOW_PERCENT:
2718       queue->low_percent = g_value_get_int (value);
2719       break;
2720     case PROP_HIGH_PERCENT:
2721       queue->high_percent = g_value_get_int (value);
2722       break;
2723     case PROP_TEMP_TEMPLATE:
2724       gst_queue2_set_temp_template (queue, g_value_get_string (value));
2725       break;
2726     case PROP_TEMP_LOCATION:
2727       g_free (queue->temp_location);
2728       queue->temp_location = g_value_dup_string (value);
2729       /* you can set the property back to NULL to make it use the temp-tmpl
2730        * property. */
2731       queue->temp_location_set = queue->temp_location != NULL;
2732       break;
2733     case PROP_TEMP_REMOVE:
2734       queue->temp_remove = g_value_get_boolean (value);
2735       break;
2736     case PROP_USE_RING_BUFFER:
2737       queue->use_ring_buffer = g_value_get_boolean (value);
2738       break;
2739     case PROP_RING_BUFFER_MAX_SIZE:
2740       queue->ring_buffer_max_size = g_value_get_uint (value);
2741       break;
2742     default:
2743       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2744       break;
2745   }
2747   GST_QUEUE2_MUTEX_UNLOCK (queue);
2750 static void
2751 gst_queue2_get_property (GObject * object,
2752     guint prop_id, GValue * value, GParamSpec * pspec)
2754   GstQueue2 *queue = GST_QUEUE2 (object);
2756   GST_QUEUE2_MUTEX_LOCK (queue);
2758   switch (prop_id) {
2759     case PROP_CUR_LEVEL_BYTES:
2760       g_value_set_uint (value, queue->cur_level.bytes);
2761       break;
2762     case PROP_CUR_LEVEL_BUFFERS:
2763       g_value_set_uint (value, queue->cur_level.buffers);
2764       break;
2765     case PROP_CUR_LEVEL_TIME:
2766       g_value_set_uint64 (value, queue->cur_level.time);
2767       break;
2768     case PROP_MAX_SIZE_BYTES:
2769       g_value_set_uint (value, queue->max_level.bytes);
2770       break;
2771     case PROP_MAX_SIZE_BUFFERS:
2772       g_value_set_uint (value, queue->max_level.buffers);
2773       break;
2774     case PROP_MAX_SIZE_TIME:
2775       g_value_set_uint64 (value, queue->max_level.time);
2776       break;
2777     case PROP_USE_BUFFERING:
2778       g_value_set_boolean (value, queue->use_buffering);
2779       break;
2780     case PROP_USE_RATE_ESTIMATE:
2781       g_value_set_boolean (value, queue->use_rate_estimate);
2782       break;
2783     case PROP_LOW_PERCENT:
2784       g_value_set_int (value, queue->low_percent);
2785       break;
2786     case PROP_HIGH_PERCENT:
2787       g_value_set_int (value, queue->high_percent);
2788       break;
2789     case PROP_TEMP_TEMPLATE:
2790       g_value_set_string (value, queue->temp_template);
2791       break;
2792     case PROP_TEMP_LOCATION:
2793       g_value_set_string (value, queue->temp_location);
2794       break;
2795     case PROP_TEMP_REMOVE:
2796       g_value_set_boolean (value, queue->temp_remove);
2797       break;
2798     case PROP_USE_RING_BUFFER:
2799       g_value_set_boolean (value, queue->use_ring_buffer);
2800       break;
2801     case PROP_RING_BUFFER_MAX_SIZE:
2802       g_value_set_uint (value, queue->ring_buffer_max_size);
2803       break;
2804     default:
2805       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2806       break;
2807   }
2809   GST_QUEUE2_MUTEX_UNLOCK (queue);