1 /* GStreamer
2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2003 Colin Walters <cwalters@gnome.org>
4 * 2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5 * 2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6 * SA 2010 ST-Ericsson <benjamin.gaignard@stericsson.com>
7 *
8 * gstqueue2.c:
9 *
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Library General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
14 *
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Library General Public License for more details.
19 *
20 * You should have received a copy of the GNU Library General Public
21 * License along with this library; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23 * Boston, MA 02111-1307, USA.
24 */
26 /**
27 * SECTION:element-queue2
28 *
29 * Data is queued until one of the limits specified by the
30 * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
31 * #GstQueue2:max-size-time properties has been reached. Any attempt to push
32 * more buffers into the queue will block the pushing thread until more space
33 * becomes available.
34 *
35 * The queue will create a new thread on the source pad to decouple the
36 * processing on sink and source pad.
37 *
38 * You can query how many buffers are queued by reading the
39 * #GstQueue2:current-level-buffers property.
40 *
41 * The default queue size limits are 100 buffers, 2MB of data, or
42 * two seconds worth of data, whichever is reached first.
43 *
 * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the
 * element will allocate a random free filename and buffer data in the file.
 * When a temporary file is used, the entire stream is buffered in the file
 * independently of the queue size limits; the limits are then only used for
 * buffering statistics.
48 *
49 * Since 0.10.24, setting the temp-location property with a filename is deprecated
50 * because it's impossible to securely open a temporary file in this way. The
51 * property will still be used to notify the application of the allocated
52 * filename, though.
53 *
54 * Last reviewed on 2009-07-10 (0.10.24)
55 */
57 #ifdef HAVE_CONFIG_H
58 #include "config.h"
59 #endif
61 #include "gstqueue2.h"
63 #include <glib/gstdio.h>
65 #include "gst/gst-i18n-lib.h"
67 #include <string.h>
69 #ifdef G_OS_WIN32
70 #include <io.h> /* lseek, open, close, read */
71 #undef lseek
72 #define lseek _lseeki64
73 #undef off_t
74 #define off_t guint64
75 #else
76 #include <unistd.h>
77 #endif
79 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
80 GST_PAD_SINK,
81 GST_PAD_ALWAYS,
82 GST_STATIC_CAPS_ANY);
84 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
85 GST_PAD_SRC,
86 GST_PAD_ALWAYS,
87 GST_STATIC_CAPS_ANY);
89 GST_DEBUG_CATEGORY_STATIC (queue_debug);
90 #define GST_CAT_DEFAULT (queue_debug)
91 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
/* Signal ids; the element currently defines no signals, so only the
 * terminating count is present. */
enum
{
  LAST_SIGNAL = 0
};
/* other defines */
#define DEFAULT_BUFFER_SIZE 4096

/* Storage-mode predicates: a temp file is in use when either the deprecated
 * temp-location was set or a temp-template is configured; the plain in-memory
 * GQueue is used only when neither the temp file nor the ring buffer is. */
#define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_location_set || (queue)->temp_template != NULL)
#define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->use_ring_buffer)    /* for consistency with the above macro */
#define QUEUE_IS_USING_QUEUE(queue) (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue))

/* Smaller of the configured byte limit and the ring buffer size */
#define QUEUE_MAX_BYTES(queue) MIN((queue)->max_level.bytes, (queue)->ring_buffer_max_size)

/* default property values */
#define DEFAULT_MAX_SIZE_BUFFERS       100      /* 100 buffers */
#define DEFAULT_MAX_SIZE_BYTES         (2 * 1024 * 1024)        /* 2 MB */
/* parenthesized so the product stays a single operand wherever it expands */
#define DEFAULT_MAX_SIZE_TIME          (2 * GST_SECOND) /* 2 seconds */
#define DEFAULT_USE_BUFFERING          FALSE
#define DEFAULT_USE_RATE_ESTIMATE      TRUE
#define DEFAULT_LOW_PERCENT            10
#define DEFAULT_HIGH_PERCENT           99
#define DEFAULT_TEMP_REMOVE            TRUE
#define DEFAULT_RING_BUFFER_MAX_SIZE   0
/* GObject property ids; PROP_0 is the unused zero id and PROP_LAST is the
 * number of ids. */
enum
{
  PROP_0 = 0,
  /* current fill levels */
  PROP_CUR_LEVEL_BUFFERS,
  PROP_CUR_LEVEL_BYTES,
  PROP_CUR_LEVEL_TIME,
  /* configured limits */
  PROP_MAX_SIZE_BUFFERS,
  PROP_MAX_SIZE_BYTES,
  PROP_MAX_SIZE_TIME,
  /* buffering behaviour */
  PROP_USE_BUFFERING,
  PROP_USE_RATE_ESTIMATE,
  PROP_LOW_PERCENT,
  PROP_HIGH_PERCENT,
  /* temporary file and ring buffer storage */
  PROP_TEMP_TEMPLATE,
  PROP_TEMP_LOCATION,
  PROP_TEMP_REMOVE,
  PROP_RING_BUFFER_MAX_SIZE,
  PROP_LAST
};
/* Reset every counter of a level structure to zero.
 * @l: an lvalue with buffers, bytes, time and rate_time members. The
 * argument is parenthesized so expressions such as *ptr work as well. */
#define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START {         \
  (l).buffers = 0;                                       \
  (l).bytes = 0;                                         \
  (l).time = 0;                                          \
  (l).rate_time = 0;                                     \
} G_STMT_END
/* Log the current and maximum levels of the queue in the dataflow category.
 * @q:   the GstQueue2 to report on
 * @pad: pad whose name prefixes the message
 * @msg: string literal inserted into the format string
 *
 * The last value is the number of queued items: the distance between the
 * writing and max reading position of the current range when a temp file or
 * ring buffer is used, otherwise the length of the GQueue.
 *
 * The parameter is deliberately not named "queue": the preprocessor would
 * then also rewrite the member name in "->queue->length", which only works
 * when the caller's variable happens to be called "queue" too. */
#define STATUS(q, pad, msg) \
  GST_CAT_LOG_OBJECT (queue_dataflow, q, \
                      "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
                      "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
                      " ns, %" G_GUINT64_FORMAT " items", \
                      GST_DEBUG_PAD_NAME (pad), \
                      (q)->cur_level.buffers, \
                      (q)->max_level.buffers, \
                      (q)->cur_level.bytes, \
                      (q)->max_level.bytes, \
                      (q)->cur_level.time, \
                      (q)->max_level.time, \
                      (guint64) (!QUEUE_IS_USING_QUEUE(q) ? \
                        (q)->current->writing_pos - (q)->current->max_reading_pos : \
                        (q)->queue->length))
/* Take the queue lock. */
#define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
  g_mutex_lock ((q)->qlock);                                             \
} G_STMT_END

/* Take the queue lock, then jump to @label when @res is not GST_FLOW_OK.
 * @res is expanded textually, so it is evaluated after the lock is held. */
#define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START {          \
  GST_QUEUE2_MUTEX_LOCK (q);                                             \
  if ((res) != GST_FLOW_OK)                                              \
    goto label;                                                          \
} G_STMT_END

/* Release the queue lock. */
#define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START {                        \
  g_mutex_unlock ((q)->qlock);                                           \
} G_STMT_END

/* Wait on the item_del condition with the queue lock; after waking up,
 * jump to @label when @res (evaluated after the wait) is not GST_FLOW_OK.
 * Only the macro parameter @q is referenced, so the macro does not depend
 * on the caller having a variable named "queue". */
#define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START {          \
  STATUS (q, (q)->sinkpad, "wait for DEL");                              \
  (q)->waiting_del = TRUE;                                               \
  g_cond_wait ((q)->item_del, (q)->qlock);                               \
  (q)->waiting_del = FALSE;                                              \
  if ((res) != GST_FLOW_OK) {                                            \
    STATUS (q, (q)->sinkpad, "received DEL wakeup");                     \
    goto label;                                                          \
  }                                                                      \
  STATUS (q, (q)->sinkpad, "received DEL");                              \
} G_STMT_END

/* Wait on the item_add condition with the queue lock; after waking up,
 * jump to @label when @res (evaluated after the wait) is not GST_FLOW_OK. */
#define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START {          \
  STATUS (q, (q)->srcpad, "wait for ADD");                               \
  (q)->waiting_add = TRUE;                                               \
  g_cond_wait ((q)->item_add, (q)->qlock);                               \
  (q)->waiting_add = FALSE;                                              \
  if ((res) != GST_FLOW_OK) {                                            \
    STATUS (q, (q)->srcpad, "received ADD wakeup");                      \
    goto label;                                                          \
  }                                                                      \
  STATUS (q, (q)->srcpad, "received ADD");                               \
} G_STMT_END

/* Signal item_del, but only when a thread flagged that it waits for it. */
#define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START {                          \
  if ((q)->waiting_del) {                                                \
    STATUS (q, (q)->srcpad, "signal DEL");                               \
    g_cond_signal ((q)->item_del);                                       \
  }                                                                      \
} G_STMT_END

/* Signal item_add, but only when a thread flagged that it waits for it. */
#define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START {                          \
  if ((q)->waiting_add) {                                                \
    STATUS (q, (q)->sinkpad, "signal ADD");                              \
    g_cond_signal ((q)->item_add);                                       \
  }                                                                      \
} G_STMT_END
/* Additional initialisation handed to GST_BOILERPLATE_FULL below: sets up
 * the two debug categories of this file. The argument is not used. */
#define _do_init(type) \
    GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
    GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
        "dataflow inside the queue element");

/* Type boilerplate for GstQueue2, derived from GstElement */
GST_BOILERPLATE_FULL (GstQueue2, gst_queue2, GstElement, GST_TYPE_ELEMENT,
    _do_init);
220 static void gst_queue2_finalize (GObject * object);
222 static void gst_queue2_set_property (GObject * object,
223 guint prop_id, const GValue * value, GParamSpec * pspec);
224 static void gst_queue2_get_property (GObject * object,
225 guint prop_id, GValue * value, GParamSpec * pspec);
227 static GstFlowReturn gst_queue2_chain (GstPad * pad, GstBuffer * buffer);
228 static GstFlowReturn gst_queue2_bufferalloc (GstPad * pad, guint64 offset,
229 guint size, GstCaps * caps, GstBuffer ** buf);
230 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
231 static void gst_queue2_loop (GstPad * pad);
233 static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event);
235 static gboolean gst_queue2_handle_src_event (GstPad * pad, GstEvent * event);
236 static gboolean gst_queue2_handle_src_query (GstPad * pad, GstQuery * query);
237 static gboolean gst_queue2_handle_query (GstElement * element,
238 GstQuery * query);
240 static GstCaps *gst_queue2_getcaps (GstPad * pad);
241 static gboolean gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps);
243 static GstFlowReturn gst_queue2_get_range (GstPad * pad, guint64 offset,
244 guint length, GstBuffer ** buffer);
245 static gboolean gst_queue2_src_checkgetrange_function (GstPad * pad);
247 static gboolean gst_queue2_src_activate_pull (GstPad * pad, gboolean active);
248 static gboolean gst_queue2_src_activate_push (GstPad * pad, gboolean active);
249 static gboolean gst_queue2_sink_activate_push (GstPad * pad, gboolean active);
250 static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
251 GstStateChange transition);
253 static gboolean gst_queue2_is_empty (GstQueue2 * queue);
254 static gboolean gst_queue2_is_filled (GstQueue2 * queue);
256 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
259 /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
261 static void
262 gst_queue2_base_init (gpointer g_class)
263 {
264 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
266 gst_element_class_add_pad_template (gstelement_class,
267 gst_static_pad_template_get (&srctemplate));
268 gst_element_class_add_pad_template (gstelement_class,
269 gst_static_pad_template_get (&sinktemplate));
271 gst_element_class_set_details_simple (gstelement_class, "Queue 2",
272 "Generic",
273 "Simple data queue",
274 "Erik Walthinsen <omega@cse.ogi.edu>, "
275 "Wim Taymans <wim.taymans@gmail.com>");
276 }
278 static void
279 gst_queue2_class_init (GstQueue2Class * klass)
280 {
281 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
282 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
284 parent_class = g_type_class_peek_parent (klass);
286 gobject_class->set_property = gst_queue2_set_property;
287 gobject_class->get_property = gst_queue2_get_property;
289 /* properties */
290 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
291 g_param_spec_uint ("current-level-bytes", "Current level (kB)",
292 "Current amount of data in the queue (bytes)",
293 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
294 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
295 g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
296 "Current number of buffers in the queue",
297 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
298 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
299 g_param_spec_uint64 ("current-level-time", "Current level (ns)",
300 "Current amount of data in the queue (in ns)",
301 0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
303 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
304 g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
305 "Max. amount of data in the queue (bytes, 0=disable)",
306 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
307 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
308 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
309 g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
310 "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
311 DEFAULT_MAX_SIZE_BUFFERS,
312 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
313 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
314 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
315 "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
316 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
318 g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
319 g_param_spec_boolean ("use-buffering", "Use buffering",
320 "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
321 DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
322 g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
323 g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
324 "Estimate the bitrate of the stream to calculate time level",
325 DEFAULT_USE_RATE_ESTIMATE,
326 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
327 g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
328 g_param_spec_int ("low-percent", "Low percent",
329 "Low threshold for buffering to start", 0, 100, DEFAULT_LOW_PERCENT,
330 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
331 g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
332 g_param_spec_int ("high-percent", "High percent",
333 "High threshold for buffering to finish", 0, 100,
334 DEFAULT_HIGH_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
336 g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
337 g_param_spec_string ("temp-template", "Temporary File Template",
338 "File template to store temporary files in, should contain directory "
339 "and XXXXXX. (NULL == disabled)",
340 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
342 g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
343 g_param_spec_string ("temp-location", "Temporary File Location",
344 "Location to store temporary files in (Deprecated: Only read this "
345 "property, use temp-template to configure the name template)",
346 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
348 /**
349 * GstQueue2:temp-remove
350 *
351 * When temp-template is set, remove the temporary file when going to READY.
352 *
353 * Since: 0.10.26
354 */
355 g_object_class_install_property (gobject_class, PROP_TEMP_REMOVE,
356 g_param_spec_boolean ("temp-remove", "Remove the Temporary File",
357 "Remove the temp-location after use",
358 DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
360 /**
361 * GstQueue2:ring-buffer-max-size
362 *
363 * The maximum size of the ring buffer in bytes. If set to 0, the ring
364 * buffer is disabled. Default 0.
365 *
366 * Since: 0.10.30
367 */
368 g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
369 g_param_spec_uint64 ("ring-buffer-max-size",
370 "Max. ring buffer size (bytes)",
371 "Max. amount of data in the ring buffer (bytes, 0 = disabled",
372 0, G_MAXUINT, DEFAULT_RING_BUFFER_MAX_SIZE,
373 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
375 /* set several parent class virtual functions */
376 gobject_class->finalize = gst_queue2_finalize;
378 gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
379 gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
380 }
382 static void
383 gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class)
384 {
385 queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
387 gst_pad_set_chain_function (queue->sinkpad,
388 GST_DEBUG_FUNCPTR (gst_queue2_chain));
389 gst_pad_set_activatepush_function (queue->sinkpad,
390 GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_push));
391 gst_pad_set_event_function (queue->sinkpad,
392 GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
393 gst_pad_set_getcaps_function (queue->sinkpad,
394 GST_DEBUG_FUNCPTR (gst_queue2_getcaps));
395 gst_pad_set_acceptcaps_function (queue->sinkpad,
396 GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps));
397 gst_pad_set_bufferalloc_function (queue->sinkpad,
398 GST_DEBUG_FUNCPTR (gst_queue2_bufferalloc));
399 gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
401 queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
403 gst_pad_set_activatepull_function (queue->srcpad,
404 GST_DEBUG_FUNCPTR (gst_queue2_src_activate_pull));
405 gst_pad_set_activatepush_function (queue->srcpad,
406 GST_DEBUG_FUNCPTR (gst_queue2_src_activate_push));
407 gst_pad_set_getrange_function (queue->srcpad,
408 GST_DEBUG_FUNCPTR (gst_queue2_get_range));
409 gst_pad_set_checkgetrange_function (queue->srcpad,
410 GST_DEBUG_FUNCPTR (gst_queue2_src_checkgetrange_function));
411 gst_pad_set_getcaps_function (queue->srcpad,
412 GST_DEBUG_FUNCPTR (gst_queue2_getcaps));
413 gst_pad_set_acceptcaps_function (queue->srcpad,
414 GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps));
415 gst_pad_set_event_function (queue->srcpad,
416 GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
417 gst_pad_set_query_function (queue->srcpad,
418 GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
419 gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
421 /* levels */
422 GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
423 queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
424 queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
425 queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
426 queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
427 queue->use_buffering = DEFAULT_USE_BUFFERING;
428 queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
429 queue->low_percent = DEFAULT_LOW_PERCENT;
430 queue->high_percent = DEFAULT_HIGH_PERCENT;
432 gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
433 gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
435 queue->srcresult = GST_FLOW_WRONG_STATE;
436 queue->sinkresult = GST_FLOW_WRONG_STATE;
437 queue->is_eos = FALSE;
438 queue->in_timer = g_timer_new ();
439 queue->out_timer = g_timer_new ();
441 queue->qlock = g_mutex_new ();
442 queue->waiting_add = FALSE;
443 queue->item_add = g_cond_new ();
444 queue->waiting_del = FALSE;
445 queue->item_del = g_cond_new ();
446 queue->queue = g_queue_new ();
448 queue->buffering_percent = 100;
450 /* tempfile related */
451 queue->temp_template = NULL;
452 queue->temp_location = NULL;
453 queue->temp_location_set = FALSE;
454 queue->temp_remove = DEFAULT_TEMP_REMOVE;
456 queue->use_ring_buffer = FALSE;
457 queue->ring_buffer = NULL;
458 queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
460 GST_DEBUG_OBJECT (queue,
461 "initialized queue's not_empty & not_full conditions");
462 }
464 /* called only once, as opposed to dispose */
465 static void
466 gst_queue2_finalize (GObject * object)
467 {
468 GstQueue2 *queue = GST_QUEUE2 (object);
470 GST_DEBUG_OBJECT (queue, "finalizing queue");
472 while (!g_queue_is_empty (queue->queue)) {
473 GstMiniObject *data = g_queue_pop_head (queue->queue);
475 gst_mini_object_unref (data);
476 }
478 g_queue_free (queue->queue);
479 g_mutex_free (queue->qlock);
480 g_cond_free (queue->item_add);
481 g_cond_free (queue->item_del);
482 g_timer_destroy (queue->in_timer);
483 g_timer_destroy (queue->out_timer);
485 /* temp_file path cleanup */
486 g_free (queue->temp_template);
487 g_free (queue->temp_location);
489 G_OBJECT_CLASS (parent_class)->finalize (object);
490 }
492 static void
493 debug_ranges (GstQueue2 * queue)
494 {
495 GstQueue2Range *walk;
497 for (walk = queue->ranges; walk; walk = walk->next) {
498 GST_DEBUG_OBJECT (queue,
499 "range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "] (rb [%"
500 G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "]), reading %" G_GUINT64_FORMAT
501 " current range? %s", walk->offset, walk->writing_pos, walk->rb_offset,
502 walk->rb_writing_pos, walk->reading_pos,
503 walk == queue->current ? "**y**" : " n ");
504 }
505 }
507 /* clear all the downloaded ranges */
508 static void
509 clean_ranges (GstQueue2 * queue)
510 {
511 GST_DEBUG_OBJECT (queue, "clean queue ranges");
513 g_slice_free_chain (GstQueue2Range, queue->ranges, next);
514 queue->ranges = NULL;
515 queue->current = NULL;
516 }
518 /* find a range that contains @offset or NULL when nothing does */
519 static GstQueue2Range *
520 find_range (GstQueue2 * queue, guint64 offset, guint64 length)
521 {
522 GstQueue2Range *range = NULL;
523 GstQueue2Range *walk;
525 /* first do a quick check for the current range */
526 for (walk = queue->ranges; walk; walk = walk->next) {
527 if (offset >= walk->offset && offset <= walk->writing_pos) {
528 /* we can reuse an existing range */
529 range = walk;
530 break;
531 }
532 }
533 if (range) {
534 GST_DEBUG_OBJECT (queue,
535 "found range for %" G_GUINT64_FORMAT ": [%" G_GUINT64_FORMAT "-%"
536 G_GUINT64_FORMAT "]", offset, range->offset, range->writing_pos);
537 } else {
538 GST_DEBUG_OBJECT (queue, "no range for %" G_GUINT64_FORMAT, offset);
539 }
540 return range;
541 }
543 static void
544 update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
545 {
546 guint64 max_reading_pos, writing_pos;
548 writing_pos = range->writing_pos;
549 max_reading_pos = range->max_reading_pos;
551 if (writing_pos > max_reading_pos)
552 queue->cur_level.bytes = writing_pos - max_reading_pos;
553 else
554 queue->cur_level.bytes = 0;
555 }
557 /* make a new range for @offset or reuse an existing range */
558 static GstQueue2Range *
559 add_range (GstQueue2 * queue, guint64 offset)
560 {
561 GstQueue2Range *range, *prev, *next;
563 GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
565 if ((range = find_range (queue, offset, 0))) {
566 GST_DEBUG_OBJECT (queue,
567 "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
568 range->writing_pos);
569 range->writing_pos = offset;
570 } else {
571 GST_DEBUG_OBJECT (queue,
572 "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
574 range = g_slice_new0 (GstQueue2Range);
575 range->offset = offset;
576 /* we want to write to the next location in the ring buffer */
577 range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
578 range->writing_pos = offset;
579 range->rb_writing_pos = range->rb_offset;
580 range->reading_pos = offset;
581 range->max_reading_pos = offset;
583 /* insert sorted */
584 prev = NULL;
585 next = queue->ranges;
586 while (next) {
587 if (next->offset > offset) {
588 /* insert before next */
589 GST_DEBUG_OBJECT (queue,
590 "insert before range %p, offset %" G_GUINT64_FORMAT, next,
591 next->offset);
592 break;
593 }
594 /* try next */
595 prev = next;
596 next = next->next;
597 }
598 range->next = next;
599 if (prev)
600 prev->next = range;
601 else
602 queue->ranges = range;
603 }
604 debug_ranges (queue);
606 /* update the stats for this range */
607 update_cur_level (queue, range);
609 return range;
610 }
613 /* clear and init the download ranges for offset 0 */
614 static void
615 init_ranges (GstQueue2 * queue)
616 {
617 GST_DEBUG_OBJECT (queue, "init queue ranges");
619 /* get rid of all the current ranges */
620 clean_ranges (queue);
621 /* make a range for offset 0 */
622 queue->current = add_range (queue, 0);
623 }
625 static gboolean
626 gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps)
627 {
628 GstQueue2 *queue;
629 GstPad *otherpad;
630 gboolean result;
632 queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
634 otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
635 result = gst_pad_peer_accept_caps (otherpad, caps);
637 return result;
638 }
640 static GstCaps *
641 gst_queue2_getcaps (GstPad * pad)
642 {
643 GstQueue2 *queue;
644 GstPad *otherpad;
645 GstCaps *result;
647 queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
649 otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
650 result = gst_pad_peer_get_caps (otherpad);
651 if (result == NULL)
652 result = gst_caps_new_any ();
654 return result;
655 }
657 static GstFlowReturn
658 gst_queue2_bufferalloc (GstPad * pad, guint64 offset, guint size,
659 GstCaps * caps, GstBuffer ** buf)
660 {
661 GstQueue2 *queue;
662 GstFlowReturn result;
664 queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
666 /* Forward to src pad, without setting caps on the src pad */
667 result = gst_pad_alloc_buffer (queue->srcpad, offset, size, caps, buf);
669 return result;
670 }
672 /* calculate the diff between running time on the sink and src of the queue.
673 * This is the total amount of time in the queue. */
674 static void
675 update_time_level (GstQueue2 * queue)
676 {
677 gint64 sink_time, src_time;
679 sink_time =
680 gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
681 queue->sink_segment.last_stop);
683 src_time = gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
684 queue->src_segment.last_stop);
686 GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
687 GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
689 if (sink_time >= src_time)
690 queue->cur_level.time = sink_time - src_time;
691 else
692 queue->cur_level.time = 0;
693 }
695 /* take a NEWSEGMENT event and apply the values to segment, updating the time
696 * level of queue. */
697 static void
698 apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment)
699 {
700 gboolean update;
701 GstFormat format;
702 gdouble rate, arate;
703 gint64 start, stop, time;
705 gst_event_parse_new_segment_full (event, &update, &rate, &arate,
706 &format, &start, &stop, &time);
708 GST_DEBUG_OBJECT (queue,
709 "received NEWSEGMENT update %d, rate %lf, applied rate %lf, "
710 "format %d, "
711 "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
712 G_GINT64_FORMAT, update, rate, arate, format, start, stop, time);
714 if (format == GST_FORMAT_BYTES) {
715 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
716 /* start is where we'll be getting from and as such writing next */
717 queue->current = add_range (queue, start);
718 /* update the stats for this range */
719 update_cur_level (queue, queue->current);
720 }
721 }
723 /* now configure the values, we use these to track timestamps on the
724 * sinkpad. */
725 if (format != GST_FORMAT_TIME) {
726 /* non-time format, pretent the current time segment is closed with a
727 * 0 start and unknown stop time. */
728 update = FALSE;
729 format = GST_FORMAT_TIME;
730 start = 0;
731 stop = -1;
732 time = 0;
733 }
734 gst_segment_set_newsegment_full (segment, update,
735 rate, arate, format, start, stop, time);
737 GST_DEBUG_OBJECT (queue,
738 "configured NEWSEGMENT %" GST_SEGMENT_FORMAT, segment);
740 /* segment can update the time level of the queue */
741 update_time_level (queue);
742 }
744 /* take a buffer and update segment, updating the time level of the queue. */
745 static void
746 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment)
747 {
748 GstClockTime duration, timestamp;
750 timestamp = GST_BUFFER_TIMESTAMP (buffer);
751 duration = GST_BUFFER_DURATION (buffer);
753 /* if no timestamp is set, assume it's continuous with the previous
754 * time */
755 if (timestamp == GST_CLOCK_TIME_NONE)
756 timestamp = segment->last_stop;
758 /* add duration */
759 if (duration != GST_CLOCK_TIME_NONE)
760 timestamp += duration;
762 GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
763 GST_TIME_ARGS (timestamp));
765 gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp);
767 /* calc diff with other end */
768 update_time_level (queue);
769 }
771 static void
772 update_buffering (GstQueue2 * queue)
773 {
774 gint64 percent;
775 gboolean post = FALSE;
777 if (!queue->use_buffering || queue->high_percent <= 0)
778 return;
780 #define GET_PERCENT(format,alt_max) ((queue->max_level.format) > 0 ? (queue->cur_level.format) * 100 / ((alt_max) > 0 ? MIN ((alt_max), (queue->max_level.format)) : (queue->max_level.format)) : 0)
782 if (queue->is_eos) {
783 /* on EOS we are always 100% full, we set the var here so that it we can
784 * reuse the logic below to stop buffering */
785 percent = 100;
786 GST_LOG_OBJECT (queue, "we are EOS");
787 } else {
788 /* figure out the percent we are filled, we take the max of all formats. */
790 if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
791 percent = GET_PERCENT (bytes, 0);
792 } else {
793 guint64 rb_size = queue->ring_buffer_max_size;
794 percent = GET_PERCENT (bytes, rb_size);
795 }
796 percent = MAX (percent, GET_PERCENT (time, 0));
797 percent = MAX (percent, GET_PERCENT (buffers, 0));
799 /* also apply the rate estimate when we need to */
800 if (queue->use_rate_estimate)
801 percent = MAX (percent, GET_PERCENT (rate_time, 0));
802 }
804 if (queue->is_buffering) {
805 post = TRUE;
806 /* if we were buffering see if we reached the high watermark */
807 if (percent >= queue->high_percent)
808 queue->is_buffering = FALSE;
809 } else {
810 /* we were not buffering, check if we need to start buffering if we drop
811 * below the low threshold */
812 if (percent < queue->low_percent) {
813 queue->is_buffering = TRUE;
814 queue->buffering_iteration++;
815 post = TRUE;
816 }
817 }
818 if (post) {
819 GstMessage *message;
820 GstBufferingMode mode;
821 gint64 buffering_left = -1;
823 /* scale to high percent so that it becomes the 100% mark */
824 percent = percent * 100 / queue->high_percent;
825 /* clip */
826 if (percent > 100)
827 percent = 100;
829 if (percent != queue->buffering_percent) {
830 queue->buffering_percent = percent;
832 if (!QUEUE_IS_USING_QUEUE (queue)) {
833 GstFormat fmt = GST_FORMAT_BYTES;
834 gint64 duration;
836 if (QUEUE_IS_USING_RING_BUFFER (queue))
837 mode = GST_BUFFERING_TIMESHIFT;
838 else
839 mode = GST_BUFFERING_DOWNLOAD;
841 if (queue->byte_in_rate > 0) {
842 if (gst_pad_query_peer_duration (queue->sinkpad, &fmt, &duration))
843 buffering_left =
844 (gdouble) ((duration -
845 queue->current->writing_pos) * 1000) / queue->byte_in_rate;
846 } else {
847 buffering_left = G_MAXINT64;
848 }
849 } else {
850 mode = GST_BUFFERING_STREAM;
851 }
853 GST_DEBUG_OBJECT (queue, "buffering %d percent", (gint) percent);
854 message = gst_message_new_buffering (GST_OBJECT_CAST (queue),
855 (gint) percent);
856 gst_message_set_buffering_stats (message, mode,
857 queue->byte_in_rate, queue->byte_out_rate, buffering_left);
859 gst_element_post_message (GST_ELEMENT_CAST (queue), message);
860 }
861 } else {
862 GST_DEBUG_OBJECT (queue, "filled %d percent", (gint) percent);
863 }
865 #undef GET_PERCENT
866 }
868 static void
869 reset_rate_timer (GstQueue2 * queue)
870 {
871 queue->bytes_in = 0;
872 queue->bytes_out = 0;
873 queue->byte_in_rate = 0.0;
874 queue->byte_out_rate = 0.0;
875 queue->last_in_elapsed = 0.0;
876 queue->last_out_elapsed = 0.0;
877 queue->in_timer_started = FALSE;
878 queue->out_timer_started = FALSE;
879 }
/* the interval in seconds to recalculate the rate */
#define RATE_INTERVAL    0.2
/* Tuning for rate estimation. We use a large window for the input rate because
 * it should be stable when connected to a network. The output rate is less
 * stable (the elements preroll, queues behind a demuxer fill, ...) and should
 * therefore adapt more quickly.
 * Both macros compute a weighted average of the previous average and the new
 * value; the whole expansion is parenthesized so it stays a single operand
 * inside larger expressions. */
#define AVG_IN(avg,val)  (((avg) * 15.0 + (val)) / 16.0)
#define AVG_OUT(avg,val) (((avg) * 3.0 + (val)) / 4.0)
890 static void
891 update_in_rates (GstQueue2 * queue)
892 {
893 gdouble elapsed, period;
894 gdouble byte_in_rate;
896 if (!queue->in_timer_started) {
897 queue->in_timer_started = TRUE;
898 g_timer_start (queue->in_timer);
899 return;
900 }
902 elapsed = g_timer_elapsed (queue->in_timer, NULL);
904 /* recalc after each interval. */
905 if (queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
906 period = elapsed - queue->last_in_elapsed;
908 GST_DEBUG_OBJECT (queue,
909 "rates: period %f, in %" G_GUINT64_FORMAT, period, queue->bytes_in);
911 byte_in_rate = queue->bytes_in / period;
913 if (queue->byte_in_rate == 0.0)
914 queue->byte_in_rate = byte_in_rate;
915 else
916 queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate);
918 /* reset the values to calculate rate over the next interval */
919 queue->last_in_elapsed = elapsed;
920 queue->bytes_in = 0;
921 }
923 if (queue->byte_in_rate > 0.0) {
924 queue->cur_level.rate_time =
925 queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
926 }
927 GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
928 queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
929 }
931 static void
932 update_out_rates (GstQueue2 * queue)
933 {
934 gdouble elapsed, period;
935 gdouble byte_out_rate;
937 if (!queue->out_timer_started) {
938 queue->out_timer_started = TRUE;
939 g_timer_start (queue->out_timer);
940 return;
941 }
943 elapsed = g_timer_elapsed (queue->out_timer, NULL);
945 /* recalc after each interval. */
946 if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
947 period = elapsed - queue->last_out_elapsed;
949 GST_DEBUG_OBJECT (queue,
950 "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
952 byte_out_rate = queue->bytes_out / period;
954 if (queue->byte_out_rate == 0.0)
955 queue->byte_out_rate = byte_out_rate;
956 else
957 queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
959 /* reset the values to calculate rate over the next interval */
960 queue->last_out_elapsed = elapsed;
961 queue->bytes_out = 0;
962 }
963 if (queue->byte_in_rate > 0.0) {
964 queue->cur_level.rate_time =
965 queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
966 }
967 GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
968 queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
969 }
971 static void
972 update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
973 {
974 guint64 reading_pos, max_reading_pos;
976 reading_pos = pos;
977 max_reading_pos = range->max_reading_pos;
979 max_reading_pos = MAX (max_reading_pos, reading_pos);
981 GST_DEBUG_OBJECT (queue,
982 "updating max_reading_pos from %" G_GUINT64_FORMAT " to %"
983 G_GUINT64_FORMAT, range->max_reading_pos, max_reading_pos);
984 range->max_reading_pos = max_reading_pos;
986 update_cur_level (queue, range);
987 }
989 static gboolean
990 perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
991 {
992 GstEvent *event;
993 gboolean res;
995 GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
997 event =
998 gst_event_new_seek (1.0, GST_FORMAT_BYTES,
999 GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
1000 GST_SEEK_TYPE_NONE, -1);
1002 GST_QUEUE2_MUTEX_UNLOCK (queue);
1003 res = gst_pad_push_event (queue->sinkpad, event);
1004 GST_QUEUE2_MUTEX_LOCK (queue);
1006 if (res)
1007 queue->current = add_range (queue, offset);
1009 return res;
1010 }
1012 /* see if there is enough data in the file to read a full buffer */
1013 static gboolean
1014 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
1015 {
1016 GstQueue2Range *range;
1018 GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
1019 offset, length);
1021 if ((range = find_range (queue, offset, length))) {
1022 if (queue->current != range) {
1023 GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
1024 perform_seek_to_offset (queue, range->writing_pos);
1025 }
1027 GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1028 queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1030 /* we have a range for offset */
1031 GST_DEBUG_OBJECT (queue,
1032 "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
1033 G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
1035 if (!QUEUE_IS_USING_RING_BUFFER (queue) && queue->is_eos)
1036 return TRUE;
1038 if (offset + length <= range->writing_pos)
1039 return TRUE;
1040 else
1041 GST_DEBUG_OBJECT (queue,
1042 "Need more data (%" G_GUINT64_FORMAT " bytes more)",
1043 (offset + length) - range->writing_pos);
1045 } else {
1046 GST_INFO_OBJECT (queue, "not found in any range");
1047 /* we don't have the range, see how far away we are, FIXME, find a good
1048 * threshold based on the incomming rate. */
1049 if (!queue->is_eos && queue->current) {
1050 if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1051 if (offset < queue->current->offset || offset >
1052 queue->current->writing_pos + QUEUE_MAX_BYTES (queue) -
1053 queue->cur_level.bytes) {
1054 perform_seek_to_offset (queue, offset);
1055 } else {
1056 GST_INFO_OBJECT (queue,
1057 "requested data is within range, wait for data");
1058 }
1059 } else if (offset < queue->current->writing_pos + 200000) {
1060 update_cur_pos (queue, queue->current, offset + length);
1061 GST_INFO_OBJECT (queue, "wait for data");
1062 return FALSE;
1063 }
1064 }
1066 /* too far away, do a seek */
1067 perform_seek_to_offset (queue, offset);
1068 }
1070 return FALSE;
1071 }
/* FSEEK_FILE (file, offset): move @file to the absolute byte position @offset.
 *
 * Evaluates to non-zero when the seek FAILED and to zero on success. The
 * arguments are parenthesized so that expressions (e.g. "pos + len") can be
 * passed safely; without that the (off_t) cast would only apply to the first
 * operand. */
#ifdef HAVE_FSEEKO
#define FSEEK_FILE(file,offset)  (fseeko ((file), (off_t) (offset), SEEK_SET) != 0)
#elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
#define FSEEK_FILE(file,offset)  (lseek (fileno (file), (off_t) (offset), SEEK_SET) == (off_t) -1)
#else
#define FSEEK_FILE(file,offset)  (fseek ((file), (offset), SEEK_SET) != 0)
#endif
1081 static gint64
1082 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
1083 guint8 * dst)
1084 {
1085 guint8 *ring_buffer;
1086 size_t res;
1088 ring_buffer = queue->ring_buffer;
1090 if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset))
1091 goto seek_failed;
1093 /* this should not block */
1094 GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
1095 length, offset);
1096 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1097 res = fread (dst, 1, length, queue->temp_file);
1098 } else {
1099 memcpy (dst, ring_buffer + offset, length);
1100 res = length;
1101 }
1103 GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
1105 if (G_UNLIKELY (res < length)) {
1106 if (!QUEUE_IS_USING_TEMP_FILE (queue))
1107 goto could_not_read;
1108 /* check for errors or EOF */
1109 if (ferror (queue->temp_file))
1110 goto could_not_read;
1111 if (feof (queue->temp_file) && length > 0)
1112 goto eos;
1113 }
1115 return res;
1117 seek_failed:
1118 {
1119 GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1120 return GST_FLOW_ERROR;
1121 }
1122 could_not_read:
1123 {
1124 GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
1125 return GST_FLOW_ERROR;
1126 }
1127 eos:
1128 {
1129 GST_DEBUG ("non-regular file hits EOS");
1130 return GST_FLOW_UNEXPECTED;
1131 }
1132 }
1134 static GstFlowReturn
1135 gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
1136 GstBuffer ** buffer)
1137 {
1138 GstBuffer *buf;
1139 guint8 *data;
1140 guint64 file_offset;
1141 guint block_length, remaining, read_length;
1142 gint64 read_return;
1143 guint64 rb_size;
1144 guint64 rpos;
1146 /* allocate the output buffer of the requested size */
1147 buf = gst_buffer_new_and_alloc (length);
1148 data = GST_BUFFER_DATA (buf);
1150 GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
1151 offset);
1153 rpos = offset;
1154 rb_size = queue->ring_buffer_max_size;
1156 remaining = length;
1157 while (remaining > 0) {
1158 /* configure how much/whether to read */
1159 if (!gst_queue2_have_data (queue, rpos, remaining)) {
1160 read_length = 0;
1162 if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1163 guint64 level;
1165 /* calculate how far away the offset is */
1166 if (queue->current->writing_pos > rpos)
1167 level = queue->current->writing_pos - rpos;
1168 else
1169 level = 0;
1171 GST_DEBUG_OBJECT (queue,
1172 "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
1173 ", level %" G_GUINT64_FORMAT,
1174 rpos, queue->current->writing_pos, level);
1176 if (level >= rb_size) {
1177 /* we don't have the data but if we have a ring buffer that is full, we
1178 * need to read */
1179 GST_DEBUG_OBJECT (queue,
1180 "ring buffer full, reading ring-buffer-max-size %"
1181 G_GUINT64_FORMAT " bytes", rb_size);
1182 read_length = rb_size;
1183 } else if (queue->is_eos) {
1184 /* won't get any more data so read any data we have */
1185 if (level) {
1186 GST_DEBUG_OBJECT (queue,
1187 "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
1188 level);
1189 read_length = level;
1190 } else {
1191 GST_DEBUG_OBJECT (queue,
1192 "EOS hit and we don't have any requested data");
1193 return GST_FLOW_UNEXPECTED;
1194 }
1195 }
1196 }
1198 if (read_length == 0) {
1199 if (QUEUE_IS_USING_RING_BUFFER (queue)
1200 && queue->current->max_reading_pos > rpos) {
1201 /* protect cached data (data between offset and max_reading_pos)
1202 * and update current level */
1203 GST_DEBUG_OBJECT (queue,
1204 "protecting cached data [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
1205 "]", rpos, queue->current->max_reading_pos);
1206 queue->current->max_reading_pos = rpos;
1207 update_cur_level (queue, queue->current);
1208 }
1209 GST_DEBUG_OBJECT (queue, "waiting for add");
1210 GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
1211 continue;
1212 }
1213 } else {
1214 /* we have the requested data so read it */
1215 read_length = remaining;
1216 }
1218 /* set range reading_pos to actual reading position for this read */
1219 queue->current->reading_pos = rpos;
1221 /* congfigure how much and from where to read */
1222 if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1223 file_offset =
1224 (queue->current->rb_offset + (rpos -
1225 queue->current->offset)) % rb_size;
1226 if (file_offset + read_length > rb_size) {
1227 block_length = rb_size - file_offset;
1228 } else {
1229 block_length = read_length;
1230 }
1231 } else {
1232 file_offset = rpos;
1233 block_length = read_length;
1234 }
1236 /* while we still have data to read, we loop */
1237 while (read_length > 0) {
1238 read_return =
1239 gst_queue2_read_data_at_offset (queue, file_offset, block_length,
1240 data);
1241 if (read_return < 0)
1242 goto read_error;
1244 file_offset += read_return;
1245 if (QUEUE_IS_USING_RING_BUFFER (queue))
1246 file_offset %= rb_size;
1248 data += read_return;
1249 read_length -= read_return;
1250 block_length = read_length;
1251 remaining -= read_return;
1253 rpos = (queue->current->reading_pos += read_return);
1254 update_cur_pos (queue, queue->current, queue->current->reading_pos);
1255 }
1256 GST_QUEUE2_SIGNAL_DEL (queue);
1257 GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
1258 }
1260 GST_BUFFER_SIZE (buf) = length;
1261 GST_BUFFER_OFFSET (buf) = offset;
1262 GST_BUFFER_OFFSET_END (buf) = offset + length;
1264 *buffer = buf;
1266 return GST_FLOW_OK;
1268 /* ERRORS */
1269 out_flushing:
1270 {
1271 GST_DEBUG_OBJECT (queue, "we are flushing");
1272 return GST_FLOW_WRONG_STATE;
1273 }
1274 read_error:
1275 {
1276 GST_DEBUG_OBJECT (queue, "we have a read error");
1277 gst_buffer_unref (buf);
1278 return read_return;
1279 }
1280 }
1282 /* should be called with QUEUE_LOCK */
1283 static GstMiniObject *
1284 gst_queue2_read_item_from_file (GstQueue2 * queue)
1285 {
1286 GstMiniObject *item;
1288 if (queue->starting_segment != NULL) {
1289 item = GST_MINI_OBJECT_CAST (queue->starting_segment);
1290 queue->starting_segment = NULL;
1291 } else {
1292 GstFlowReturn ret;
1293 GstBuffer *buffer;
1294 guint64 reading_pos;
1296 reading_pos = queue->current->reading_pos;
1298 ret =
1299 gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
1300 &buffer);
1302 switch (ret) {
1303 case GST_FLOW_OK:
1304 item = GST_MINI_OBJECT_CAST (buffer);
1305 break;
1306 case GST_FLOW_UNEXPECTED:
1307 item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
1308 break;
1309 default:
1310 item = NULL;
1311 break;
1312 }
1313 }
1314 return item;
1315 }
1317 /* must be called with MUTEX_LOCK. Will briefly release the lock when notifying
1318 * the temp filename. */
1319 static gboolean
1320 gst_queue2_open_temp_location_file (GstQueue2 * queue)
1321 {
1322 gint fd = -1;
1323 gchar *name = NULL;
1325 if (queue->temp_file)
1326 goto already_opened;
1328 GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
1330 /* we have two cases:
1331 * - temp_location was set to something !NULL (Deprecated). in this case we
1332 * open the specified filename.
1333 * - temp_template was set, allocate a filename and open that filename
1334 */
1335 if (!queue->temp_location_set) {
1336 /* nothing to do */
1337 if (queue->temp_template == NULL)
1338 goto no_directory;
1340 /* make copy of the template, we don't want to change this */
1341 name = g_strdup (queue->temp_template);
1342 fd = g_mkstemp (name);
1343 if (fd == -1)
1344 goto mkstemp_failed;
1346 /* open the file for update/writing */
1347 queue->temp_file = fdopen (fd, "wb+");
1348 /* error creating file */
1349 if (queue->temp_file == NULL)
1350 goto open_failed;
1352 g_free (queue->temp_location);
1353 queue->temp_location = name;
1355 GST_QUEUE2_MUTEX_UNLOCK (queue);
1357 /* we can't emit the notify with the lock */
1358 g_object_notify (G_OBJECT (queue), "temp-location");
1360 GST_QUEUE2_MUTEX_LOCK (queue);
1361 } else {
1362 /* open the file for update/writing, this is deprecated but we still need to
1363 * support it for API/ABI compatibility */
1364 queue->temp_file = g_fopen (queue->temp_location, "wb+");
1365 /* error creating file */
1366 if (queue->temp_file == NULL)
1367 goto open_failed;
1368 }
1369 GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
1371 return TRUE;
1373 /* ERRORS */
1374 already_opened:
1375 {
1376 GST_DEBUG_OBJECT (queue, "temp file was already open");
1377 return TRUE;
1378 }
1379 no_directory:
1380 {
1381 GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
1382 (_("No Temp directory specified.")), (NULL));
1383 return FALSE;
1384 }
1385 mkstemp_failed:
1386 {
1387 GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1388 (_("Could not create temp file \"%s\"."), queue->temp_template),
1389 GST_ERROR_SYSTEM);
1390 g_free (name);
1391 return FALSE;
1392 }
1393 open_failed:
1394 {
1395 GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1396 (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
1397 g_free (name);
1398 if (fd != -1)
1399 close (fd);
1400 return FALSE;
1401 }
1402 }
1404 static void
1405 gst_queue2_close_temp_location_file (GstQueue2 * queue)
1406 {
1407 /* nothing to do */
1408 if (queue->temp_file == NULL)
1409 return;
1411 GST_DEBUG_OBJECT (queue, "closing temp file");
1413 fflush (queue->temp_file);
1414 fclose (queue->temp_file);
1416 if (queue->temp_remove)
1417 remove (queue->temp_location);
1419 queue->temp_file = NULL;
1420 clean_ranges (queue);
1421 }
1423 static void
1424 gst_queue2_flush_temp_file (GstQueue2 * queue)
1425 {
1426 if (queue->temp_file == NULL)
1427 return;
1429 GST_DEBUG_OBJECT (queue, "flushing temp file");
1431 queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
1432 }
1434 static void
1435 gst_queue2_locked_flush (GstQueue2 * queue)
1436 {
1437 if (!QUEUE_IS_USING_QUEUE (queue)) {
1438 if (QUEUE_IS_USING_TEMP_FILE (queue))
1439 gst_queue2_flush_temp_file (queue);
1440 init_ranges (queue);
1441 } else {
1442 while (!g_queue_is_empty (queue->queue)) {
1443 GstMiniObject *data = g_queue_pop_head (queue->queue);
1445 /* Then lose another reference because we are supposed to destroy that
1446 data when flushing */
1447 gst_mini_object_unref (data);
1448 }
1449 }
1450 GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1451 gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
1452 gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
1453 if (queue->starting_segment != NULL)
1454 gst_event_unref (queue->starting_segment);
1455 queue->starting_segment = NULL;
1456 queue->segment_event_received = FALSE;
1458 /* we deleted a lot of something */
1459 GST_QUEUE2_SIGNAL_DEL (queue);
1460 }
1462 static gboolean
1463 gst_queue2_wait_free_space (GstQueue2 * queue)
1464 {
1465 /* We make space available if we're "full" according to whatever
1466 * the user defined as "full". */
1467 if (gst_queue2_is_filled (queue)) {
1468 gboolean started;
1470 /* pause the timer while we wait. The fact that we are waiting does not mean
1471 * the byterate on the input pad is lower */
1472 if ((started = queue->in_timer_started))
1473 g_timer_stop (queue->in_timer);
1475 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1476 "queue is full, waiting for free space");
1477 do {
1478 /* Wait for space to be available, we could be unlocked because of a flush. */
1479 GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1480 }
1481 while (gst_queue2_is_filled (queue));
1483 /* and continue if we were running before */
1484 if (started)
1485 g_timer_continue (queue->in_timer);
1486 }
1487 return TRUE;
1489 /* ERRORS */
1490 out_flushing:
1491 {
1492 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is flushing");
1493 return FALSE;
1494 }
1495 }
1497 static gboolean
1498 gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
1499 {
1500 guint8 *data, *ring_buffer;
1501 guint size, rb_size;
1502 guint64 writing_pos, new_writing_pos, max_reading_pos;
1503 gint64 space;
1504 GstQueue2Range *range, *prev, *next;
1506 if (QUEUE_IS_USING_RING_BUFFER (queue))
1507 writing_pos = queue->current->rb_writing_pos;
1508 else
1509 writing_pos = queue->current->writing_pos;
1510 max_reading_pos = queue->current->max_reading_pos;
1511 ring_buffer = queue->ring_buffer;
1512 rb_size = queue->ring_buffer_max_size;
1514 size = GST_BUFFER_SIZE (buffer);
1515 data = GST_BUFFER_DATA (buffer);
1517 GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
1518 GST_BUFFER_OFFSET (buffer));
1520 while (size > 0) {
1521 guint to_write;
1523 if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1524 /* calculate the space in the ring buffer not used by data from
1525 * the current range */
1526 while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
1527 /* wait until there is some free space */
1528 GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1529 }
1530 /* get the amount of space we have */
1531 space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
1533 /* calculate if we need to split or if we can write the entire
1534 * buffer now */
1535 to_write = MIN (size, space);
1537 /* the writing position in the ring buffer after writing (part
1538 * or all of) the buffer */
1539 new_writing_pos = (writing_pos + to_write) % rb_size;
1541 prev = NULL;
1542 range = queue->ranges;
1544 /* if we need to overwrite data in the ring buffer, we need to
1545 * update the ranges
1546 *
1547 * warning: this code is complicated and includes some
1548 * simplifications - pen, paper and diagrams for the cases
1549 * recommended! */
1550 while (range) {
1551 guint64 range_data_start, range_data_end;
1552 GstQueue2Range *range_to_destroy = NULL;
1554 range_data_start = range->rb_offset;
1555 range_data_end = range->rb_writing_pos;
1557 /* handle the special case where the range has no data in it */
1558 if (range->writing_pos == range->offset) {
1559 if (range != queue->current) {
1560 GST_DEBUG_OBJECT (queue,
1561 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1562 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1563 /* remove range */
1564 range_to_destroy = range;
1565 if (prev)
1566 prev->next = range->next;
1567 }
1568 goto next_range;
1569 }
1571 if (range_data_end > range_data_start) {
1572 if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
1573 goto next_range;
1575 if (new_writing_pos > range_data_start) {
1576 if (new_writing_pos >= range_data_end) {
1577 GST_DEBUG_OBJECT (queue,
1578 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1579 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1580 /* remove range */
1581 range_to_destroy = range;
1582 if (prev)
1583 prev->next = range->next;
1584 } else {
1585 GST_DEBUG_OBJECT (queue,
1586 "advancing offsets from %" G_GUINT64_FORMAT " (%"
1587 G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1588 G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1589 range->offset + new_writing_pos - range_data_start,
1590 new_writing_pos);
1591 range->offset += (new_writing_pos - range_data_start);
1592 range->rb_offset = new_writing_pos;
1593 }
1594 }
1595 } else {
1596 guint64 new_wpos_virt = writing_pos + to_write;
1598 if (new_wpos_virt <= range_data_start)
1599 goto next_range;
1601 if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
1602 GST_DEBUG_OBJECT (queue,
1603 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
1604 G_GUINT64_FORMAT, range->offset, range->writing_pos);
1605 /* remove range */
1606 range_to_destroy = range;
1607 if (prev)
1608 prev->next = range->next;
1609 } else {
1610 GST_DEBUG_OBJECT (queue,
1611 "advancing offsets from %" G_GUINT64_FORMAT " (%"
1612 G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
1613 G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
1614 range->offset + new_writing_pos - range_data_start,
1615 new_writing_pos);
1616 range->offset += (new_wpos_virt - range_data_start);
1617 range->rb_offset = new_writing_pos;
1618 }
1619 }
1621 next_range:
1622 if (!range_to_destroy)
1623 prev = range;
1625 range = range->next;
1626 if (range_to_destroy) {
1627 if (range_to_destroy == queue->ranges)
1628 queue->ranges = range;
1629 g_slice_free (GstQueue2Range, range_to_destroy);
1630 range_to_destroy = NULL;
1631 }
1632 }
1633 } else {
1634 space = to_write = size;
1635 new_writing_pos = writing_pos + to_write;
1636 }
1638 if (QUEUE_IS_USING_TEMP_FILE (queue)
1639 && FSEEK_FILE (queue->temp_file, writing_pos))
1640 goto seek_failed;
1642 if (new_writing_pos > writing_pos) {
1643 GST_INFO_OBJECT (queue,
1644 "writing %u bytes to range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
1645 "] (rb wpos %" G_GUINT64_FORMAT ")", to_write, queue->current->offset,
1646 queue->current->writing_pos, queue->current->rb_writing_pos);
1647 /* either not using ring buffer or no wrapping, just write */
1648 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1649 if (fwrite (data, to_write, 1, queue->temp_file) != 1)
1650 goto handle_error;
1651 } else {
1652 memcpy (ring_buffer + writing_pos, data, to_write);
1653 }
1655 if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
1656 /* try to merge with next range */
1657 while ((next = queue->current->next)) {
1658 GST_INFO_OBJECT (queue,
1659 "checking merge with next range %" G_GUINT64_FORMAT " < %"
1660 G_GUINT64_FORMAT, new_writing_pos, next->offset);
1661 if (new_writing_pos < next->offset)
1662 break;
1664 GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
1665 next->writing_pos);
1667 /* remove the group, we could choose to not read the data in this range
1668 * again. This would involve us doing a seek to the current writing position
1669 * in the range. FIXME, It would probably make sense to do a seek when there
1670 * is a lot of data in the range we merged with to avoid reading it all
1671 * again. */
1672 queue->current->next = next->next;
1673 g_slice_free (GstQueue2Range, next);
1675 debug_ranges (queue);
1676 }
1677 goto update_and_signal;
1678 }
1679 } else {
1680 /* wrapping */
1681 guint block_one, block_two;
1683 block_one = rb_size - writing_pos;
1684 block_two = to_write - block_one;
1686 if (block_one > 0) {
1687 GST_INFO_OBJECT (queue, "writing %u bytes", block_one);
1688 /* write data to end of ring buffer */
1689 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1690 if (fwrite (data, block_one, 1, queue->temp_file) != 1)
1691 goto handle_error;
1692 } else {
1693 memcpy (ring_buffer + writing_pos, data, block_one);
1694 }
1695 }
1697 if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, 0))
1698 goto seek_failed;
1700 if (block_two > 0) {
1701 GST_INFO_OBJECT (queue, "writing %u bytes", block_two);
1702 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1703 if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
1704 goto handle_error;
1705 } else {
1706 memcpy (ring_buffer, data + block_one, block_two);
1707 }
1708 }
1709 }
1711 update_and_signal:
1712 /* update the writing positions */
1713 size -= to_write;
1714 GST_INFO_OBJECT (queue,
1715 "wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)",
1716 to_write, writing_pos, size);
1718 if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1719 data += to_write;
1720 queue->current->writing_pos += to_write;
1721 queue->current->rb_writing_pos = writing_pos = new_writing_pos;
1722 } else {
1723 queue->current->writing_pos = writing_pos = new_writing_pos;
1724 }
1725 update_cur_level (queue, queue->current);
1727 /* update the buffering status */
1728 update_buffering (queue);
1730 GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1731 queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1733 GST_QUEUE2_SIGNAL_ADD (queue);
1734 };
1736 return TRUE;
1738 /* ERRORS */
1739 out_flushing:
1740 {
1741 GST_DEBUG_OBJECT (queue, "we are flushing");
1742 /* FIXME - GST_FLOW_UNEXPECTED ? */
1743 return FALSE;
1744 }
1745 seek_failed:
1746 {
1747 GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1748 return FALSE;
1749 }
1750 handle_error:
1751 {
1752 switch (errno) {
1753 case ENOSPC:{
1754 GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
1755 break;
1756 }
1757 default:{
1758 GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
1759 (_("Error while writing to download file.")),
1760 ("%s", g_strerror (errno)));
1761 }
1762 }
1763 return FALSE;
1764 }
1765 }
1767 /* enqueue an item an update the level stats */
1768 static void
1769 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
1770 {
1771 if (GST_IS_BUFFER (item)) {
1772 GstBuffer *buffer;
1773 guint size;
1775 buffer = GST_BUFFER_CAST (item);
1776 size = GST_BUFFER_SIZE (buffer);
1778 /* add buffer to the statistics */
1779 if (QUEUE_IS_USING_QUEUE (queue)) {
1780 queue->cur_level.buffers++;
1781 queue->cur_level.bytes += size;
1782 }
1783 queue->bytes_in += size;
1785 /* apply new buffer to segment stats */
1786 apply_buffer (queue, buffer, &queue->sink_segment);
1787 /* update the byterate stats */
1788 update_in_rates (queue);
1790 if (!QUEUE_IS_USING_QUEUE (queue)) {
1791 /* FIXME - check return value? */
1792 gst_queue2_create_write (queue, buffer);
1793 }
1794 } else if (GST_IS_EVENT (item)) {
1795 GstEvent *event;
1797 event = GST_EVENT_CAST (item);
1799 switch (GST_EVENT_TYPE (event)) {
1800 case GST_EVENT_EOS:
1801 /* Zero the thresholds, this makes sure the queue is completely
1802 * filled and we can read all data from the queue. */
1803 GST_DEBUG_OBJECT (queue, "we have EOS");
1804 queue->is_eos = TRUE;
1805 break;
1806 case GST_EVENT_NEWSEGMENT:
1807 apply_segment (queue, event, &queue->sink_segment);
1808 /* This is our first new segment, we hold it
1809 * as we can't save it on the temp file */
1810 if (!QUEUE_IS_USING_QUEUE (queue)) {
1811 if (queue->segment_event_received)
1812 goto unexpected_event;
1814 queue->segment_event_received = TRUE;
1815 if (queue->starting_segment != NULL)
1816 gst_event_unref (queue->starting_segment);
1817 queue->starting_segment = event;
1818 item = NULL;
1819 }
1820 /* a new segment allows us to accept more buffers if we got UNEXPECTED
1821 * from downstream */
1822 queue->unexpected = FALSE;
1823 break;
1824 default:
1825 if (!QUEUE_IS_USING_QUEUE (queue))
1826 goto unexpected_event;
1827 break;
1828 }
1829 } else {
1830 g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
1831 item, GST_OBJECT_NAME (queue));
1832 /* we can't really unref since we don't know what it is */
1833 item = NULL;
1834 }
1836 if (item) {
1837 /* update the buffering status */
1838 update_buffering (queue);
1840 if (QUEUE_IS_USING_QUEUE (queue)) {
1841 g_queue_push_tail (queue->queue, item);
1842 } else {
1843 gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
1844 }
1846 GST_QUEUE2_SIGNAL_ADD (queue);
1847 }
1849 return;
1851 /* ERRORS */
1852 unexpected_event:
1853 {
1854 g_warning
1855 ("Unexpected event of kind %s can't be added in temp file of queue %s ",
1856 gst_event_type_get_name (GST_EVENT_TYPE (item)),
1857 GST_OBJECT_NAME (queue));
1858 gst_event_unref (GST_EVENT_CAST (item));
1859 return;
1860 }
1861 }
1863 /* dequeue an item from the queue and update level stats */
1864 static GstMiniObject *
1865 gst_queue2_locked_dequeue (GstQueue2 * queue)
1866 {
1867 GstMiniObject *item;
1869 if (!QUEUE_IS_USING_QUEUE (queue))
1870 item = gst_queue2_read_item_from_file (queue);
1871 else
1872 item = g_queue_pop_head (queue->queue);
1874 if (item == NULL)
1875 goto no_item;
1877 if (GST_IS_BUFFER (item)) {
1878 GstBuffer *buffer;
1879 guint size;
1881 buffer = GST_BUFFER_CAST (item);
1882 size = GST_BUFFER_SIZE (buffer);
1884 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1885 "retrieved buffer %p from queue", buffer);
1887 if (QUEUE_IS_USING_QUEUE (queue)) {
1888 queue->cur_level.buffers--;
1889 queue->cur_level.bytes -= size;
1890 }
1891 queue->bytes_out += size;
1893 apply_buffer (queue, buffer, &queue->src_segment);
1894 /* update the byterate stats */
1895 update_out_rates (queue);
1896 /* update the buffering */
1897 update_buffering (queue);
1899 } else if (GST_IS_EVENT (item)) {
1900 GstEvent *event = GST_EVENT_CAST (item);
1902 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1903 "retrieved event %p from queue", event);
1905 switch (GST_EVENT_TYPE (event)) {
1906 case GST_EVENT_EOS:
1907 /* queue is empty now that we dequeued the EOS */
1908 GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1909 break;
1910 case GST_EVENT_NEWSEGMENT:
1911 apply_segment (queue, event, &queue->src_segment);
1912 break;
1913 default:
1914 break;
1915 }
1916 } else {
1917 g_warning
1918 ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
1919 item, GST_OBJECT_NAME (queue));
1920 item = NULL;
1921 }
1922 GST_QUEUE2_SIGNAL_DEL (queue);
1924 return item;
1926 /* ERRORS */
1927 no_item:
1928 {
1929 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty");
1930 return NULL;
1931 }
1932 }
1934 static gboolean
1935 gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
1936 {
1937 GstQueue2 *queue;
1939 queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
1941 switch (GST_EVENT_TYPE (event)) {
1942 case GST_EVENT_FLUSH_START:
1943 {
1944 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
1945 if (QUEUE_IS_USING_QUEUE (queue)) {
1946 /* forward event */
1947 gst_pad_push_event (queue->srcpad, event);
1949 /* now unblock the chain function */
1950 GST_QUEUE2_MUTEX_LOCK (queue);
1951 queue->srcresult = GST_FLOW_WRONG_STATE;
1952 queue->sinkresult = GST_FLOW_WRONG_STATE;
1953 /* unblock the loop and chain functions */
1954 GST_QUEUE2_SIGNAL_ADD (queue);
1955 GST_QUEUE2_SIGNAL_DEL (queue);
1956 GST_QUEUE2_MUTEX_UNLOCK (queue);
1958 /* make sure it pauses, this should happen since we sent
1959 * flush_start downstream. */
1960 gst_pad_pause_task (queue->srcpad);
1961 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
1962 } else {
1963 GST_QUEUE2_MUTEX_LOCK (queue);
1964 /* flush the sink pad */
1965 queue->sinkresult = GST_FLOW_WRONG_STATE;
1966 GST_QUEUE2_SIGNAL_DEL (queue);
1967 GST_QUEUE2_MUTEX_UNLOCK (queue);
1969 gst_event_unref (event);
1970 }
1971 goto done;
1972 }
1973 case GST_EVENT_FLUSH_STOP:
1974 {
1975 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
1977 if (QUEUE_IS_USING_QUEUE (queue)) {
1978 /* forward event */
1979 gst_pad_push_event (queue->srcpad, event);
1981 GST_QUEUE2_MUTEX_LOCK (queue);
1982 gst_queue2_locked_flush (queue);
1983 queue->srcresult = GST_FLOW_OK;
1984 queue->sinkresult = GST_FLOW_OK;
1985 queue->is_eos = FALSE;
1986 queue->unexpected = FALSE;
1987 /* reset rate counters */
1988 reset_rate_timer (queue);
1989 gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
1990 queue->srcpad);
1991 GST_QUEUE2_MUTEX_UNLOCK (queue);
1992 } else {
1993 GST_QUEUE2_MUTEX_LOCK (queue);
1994 queue->segment_event_received = FALSE;
1995 queue->is_eos = FALSE;
1996 queue->unexpected = FALSE;
1997 queue->sinkresult = GST_FLOW_OK;
1998 GST_QUEUE2_MUTEX_UNLOCK (queue);
2000 gst_event_unref (event);
2001 }
2002 goto done;
2003 }
2004 default:
2005 if (GST_EVENT_IS_SERIALIZED (event)) {
2006 /* serialized events go in the queue */
2007 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2008 /* refuse more events on EOS */
2009 if (queue->is_eos)
2010 goto out_eos;
2011 gst_queue2_locked_enqueue (queue, event);
2012 GST_QUEUE2_MUTEX_UNLOCK (queue);
2013 } else {
2014 /* non-serialized events are passed upstream. */
2015 gst_pad_push_event (queue->srcpad, event);
2016 }
2017 break;
2018 }
2019 done:
2020 return TRUE;
2022 /* ERRORS */
2023 out_flushing:
2024 {
2025 GST_DEBUG_OBJECT (queue, "refusing event, we are flushing");
2026 GST_QUEUE2_MUTEX_UNLOCK (queue);
2027 gst_event_unref (event);
2028 return FALSE;
2029 }
2030 out_eos:
2031 {
2032 GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
2033 GST_QUEUE2_MUTEX_UNLOCK (queue);
2034 gst_event_unref (event);
2035 return FALSE;
2036 }
2037 }
2039 static gboolean
2040 gst_queue2_is_empty (GstQueue2 * queue)
2041 {
2042 /* never empty on EOS */
2043 if (queue->is_eos)
2044 return FALSE;
2046 if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
2047 return queue->current->writing_pos <= queue->current->max_reading_pos;
2048 } else {
2049 if (queue->queue->length == 0)
2050 return TRUE;
2051 }
2053 return FALSE;
2054 }
2056 static gboolean
2057 gst_queue2_is_filled (GstQueue2 * queue)
2058 {
2059 gboolean res;
2061 /* always filled on EOS */
2062 if (queue->is_eos)
2063 return TRUE;
2065 #define CHECK_FILLED(format,alt_max) ((queue->max_level.format) > 0 && \
2066 (queue->cur_level.format) >= ((alt_max) ? \
2067 MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format)))
2069 /* if using a ring buffer we're filled if all ring buffer space is used
2070 * _by the current range_ */
2071 if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2072 guint64 rb_size = queue->ring_buffer_max_size;
2073 GST_DEBUG_OBJECT (queue,
2074 "max bytes %u, rb size %" G_GUINT64_FORMAT ", cur bytes %u",
2075 queue->max_level.bytes, rb_size, queue->cur_level.bytes);
2076 return CHECK_FILLED (bytes, rb_size);
2077 }
2079 /* if using file, we're never filled if we don't have EOS */
2080 if (QUEUE_IS_USING_TEMP_FILE (queue))
2081 return FALSE;
2083 /* we are never filled when we have no buffers at all */
2084 if (queue->cur_level.buffers == 0)
2085 return FALSE;
2087 /* we are filled if one of the current levels exceeds the max */
2088 res = CHECK_FILLED (buffers, 0) || CHECK_FILLED (bytes, 0)
2089 || CHECK_FILLED (time, 0);
2091 /* if we need to, use the rate estimate to check against the max time we are
2092 * allowed to queue */
2093 if (queue->use_rate_estimate)
2094 res |= CHECK_FILLED (rate_time, 0);
2096 #undef CHECK_FILLED
2097 return res;
2098 }
2100 static GstFlowReturn
2101 gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
2102 {
2103 GstQueue2 *queue;
2105 queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
2107 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2108 "received buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %"
2109 GST_TIME_FORMAT, buffer, GST_BUFFER_SIZE (buffer),
2110 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
2111 GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
2113 /* we have to lock the queue since we span threads */
2114 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2115 /* when we received EOS, we refuse more data */
2116 if (queue->is_eos)
2117 goto out_eos;
2118 /* when we received unexpected from downstream, refuse more buffers */
2119 if (queue->unexpected)
2120 goto out_unexpected;
2122 if (!gst_queue2_wait_free_space (queue))
2123 goto out_flushing;
2125 /* put buffer in queue now */
2126 gst_queue2_locked_enqueue (queue, buffer);
2127 GST_QUEUE2_MUTEX_UNLOCK (queue);
2129 return GST_FLOW_OK;
2131 /* special conditions */
2132 out_flushing:
2133 {
2134 GstFlowReturn ret = queue->sinkresult;
2136 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2137 "exit because task paused, reason: %s", gst_flow_get_name (ret));
2138 GST_QUEUE2_MUTEX_UNLOCK (queue);
2139 gst_buffer_unref (buffer);
2141 return ret;
2142 }
2143 out_eos:
2144 {
2145 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2146 GST_QUEUE2_MUTEX_UNLOCK (queue);
2147 gst_buffer_unref (buffer);
2149 return GST_FLOW_UNEXPECTED;
2150 }
2151 out_unexpected:
2152 {
2153 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2154 "exit because we received UNEXPECTED");
2155 GST_QUEUE2_MUTEX_UNLOCK (queue);
2156 gst_buffer_unref (buffer);
2158 return GST_FLOW_UNEXPECTED;
2159 }
2160 }
/* Dequeue one item from the queue and push it downstream. This function
 * returns the result of the push.
 *
 * gst_queue2_loop() calls this while holding the queue mutex. The mutex is
 * released around the actual push on the source pad and taken again
 * afterwards; every return path leaves this function with the mutex in the
 * same state the caller expects (presumably locked, since
 * GST_QUEUE2_MUTEX_LOCK_CHECK is used to re-acquire it -- confirm against the
 * macro definition).
 *
 * Returns:
 *  - GST_FLOW_ERROR when nothing could be dequeued,
 *  - GST_FLOW_WRONG_STATE when the source side started flushing during the
 *    push,
 *  - GST_FLOW_UNEXPECTED after an EOS event was pushed,
 *  - GST_FLOW_OK when downstream returned UNEXPECTED and the queue was drained
 *    without finding an EOS/NEWSEGMENT event (queue->unexpected is set),
 *  - otherwise the result of gst_pad_push(), or GST_FLOW_OK for events. */
static GstFlowReturn
gst_queue2_push_one (GstQueue2 * queue)
{
  GstFlowReturn result = GST_FLOW_OK;
  GstMiniObject *data;

  data = gst_queue2_locked_dequeue (queue);
  if (data == NULL)
    goto no_item;

  /* re-entered from the drain loop below with @data set to the EOS or
   * NEWSEGMENT event that was found after an UNEXPECTED result */
next:
  if (GST_IS_BUFFER (data)) {
    GstBuffer *buffer;
    GstCaps *caps;

    buffer = GST_BUFFER_CAST (data);
    caps = GST_BUFFER_CAPS (buffer);

    /* do not hold the queue mutex while calling downstream */
    GST_QUEUE2_MUTEX_UNLOCK (queue);

    /* set caps before pushing the buffer so that core does not try to do
     * something fancy to check if this is possible. */
    if (caps && caps != GST_PAD_CAPS (queue->srcpad))
      gst_pad_set_caps (queue->srcpad, caps);

    result = gst_pad_push (queue->srcpad, buffer);

    /* need to check for srcresult here as well */
    GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
    if (result == GST_FLOW_UNEXPECTED) {
      GST_CAT_LOG_OBJECT (queue_dataflow, queue,
          "got UNEXPECTED from downstream");
      /* stop pushing buffers, we dequeue all items until we see an item that we
       * can push again, which is EOS or NEWSEGMENT. If there is nothing in the
       * queue we can push, we set a flag to make the sinkpad refuse more
       * buffers with an UNEXPECTED return value until we receive something
       * pushable again or we get flushed. */
      while ((data = gst_queue2_locked_dequeue (queue))) {
        if (GST_IS_BUFFER (data)) {
          GST_CAT_LOG_OBJECT (queue_dataflow, queue,
              "dropping UNEXPECTED buffer %p", data);
          gst_buffer_unref (GST_BUFFER_CAST (data));
        } else if (GST_IS_EVENT (data)) {
          GstEvent *event = GST_EVENT_CAST (data);
          GstEventType type = GST_EVENT_TYPE (event);

          if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) {
            /* we found a pushable item in the queue, push it out */
            GST_CAT_LOG_OBJECT (queue_dataflow, queue,
                "pushing pushable event %s after UNEXPECTED",
                GST_EVENT_TYPE_NAME (event));
            goto next;
          }
          /* every other event is discarded together with the buffers */
          GST_CAT_LOG_OBJECT (queue_dataflow, queue,
              "dropping UNEXPECTED event %p", event);
          gst_event_unref (event);
        }
      }
      /* no more items in the queue. Set the unexpected flag so that upstream
       * make us refuse any more buffers on the sinkpad. Since we will still
       * accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the
       * task function does not shut down. */
      queue->unexpected = TRUE;
      result = GST_FLOW_OK;
    }
  } else if (GST_IS_EVENT (data)) {
    GstEvent *event = GST_EVENT_CAST (data);
    /* remember the type now, the event is handed to the push below and is
     * only used as a pointer value for logging afterwards */
    GstEventType type = GST_EVENT_TYPE (event);

    /* do not hold the queue mutex while calling downstream */
    GST_QUEUE2_MUTEX_UNLOCK (queue);

    gst_pad_push_event (queue->srcpad, event);

    GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
    /* if we're EOS, return UNEXPECTED so that the task pauses. */
    if (type == GST_EVENT_EOS) {
      GST_CAT_LOG_OBJECT (queue_dataflow, queue,
          "pushed EOS event %p, return UNEXPECTED", event);
      result = GST_FLOW_UNEXPECTED;
    }
  }
  return result;

  /* ERRORS */
no_item:
  {
    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
        "exit because we have no item in the queue");
    return GST_FLOW_ERROR;
  }
out_flushing:
  {
    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
    return GST_FLOW_WRONG_STATE;
  }
}
2261 /* called repeadedly with @pad as the source pad. This function should push out
2262 * data to the peer element. */
2263 static void
2264 gst_queue2_loop (GstPad * pad)
2265 {
2266 GstQueue2 *queue;
2267 GstFlowReturn ret;
2269 queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
2271 /* have to lock for thread-safety */
2272 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2274 if (gst_queue2_is_empty (queue)) {
2275 gboolean started;
2277 /* pause the timer while we wait. The fact that we are waiting does not mean
2278 * the byterate on the output pad is lower */
2279 if ((started = queue->out_timer_started))
2280 g_timer_stop (queue->out_timer);
2282 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
2283 "queue is empty, waiting for new data");
2284 do {
2285 /* Wait for data to be available, we could be unlocked because of a flush. */
2286 GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
2287 }
2288 while (gst_queue2_is_empty (queue));
2290 /* and continue if we were running before */
2291 if (started)
2292 g_timer_continue (queue->out_timer);
2293 }
2294 ret = gst_queue2_push_one (queue);
2295 queue->srcresult = ret;
2296 queue->sinkresult = ret;
2297 if (ret != GST_FLOW_OK)
2298 goto out_flushing;
2300 GST_QUEUE2_MUTEX_UNLOCK (queue);
2302 return;
2304 /* ERRORS */
2305 out_flushing:
2306 {
2307 gboolean eos = queue->is_eos;
2308 GstFlowReturn ret = queue->srcresult;
2310 gst_pad_pause_task (queue->srcpad);
2311 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2312 "pause task, reason: %s", gst_flow_get_name (queue->srcresult));
2313 GST_QUEUE2_MUTEX_UNLOCK (queue);
2314 /* let app know about us giving up if upstream is not expected to do so */
2315 /* UNEXPECTED is already taken care of elsewhere */
2316 if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_UNEXPECTED)) {
2317 GST_ELEMENT_ERROR (queue, STREAM, FAILED,
2318 (_("Internal data flow error.")),
2319 ("streaming task paused, reason %s (%d)",
2320 gst_flow_get_name (ret), ret));
2321 gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
2322 }
2323 return;
2324 }
2325 }
2327 static gboolean
2328 gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
2329 {
2330 gboolean res = TRUE;
2331 GstQueue2 *queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
2333 #ifndef GST_DISABLE_GST_DEBUG
2334 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
2335 event, GST_EVENT_TYPE_NAME (event));
2336 #endif
2338 switch (GST_EVENT_TYPE (event)) {
2339 case GST_EVENT_FLUSH_START:
2340 if (QUEUE_IS_USING_QUEUE (queue)) {
2341 /* just forward upstream */
2342 res = gst_pad_push_event (queue->sinkpad, event);
2343 } else {
2344 /* now unblock the getrange function */
2345 GST_QUEUE2_MUTEX_LOCK (queue);
2346 GST_DEBUG_OBJECT (queue, "flushing");
2347 queue->srcresult = GST_FLOW_WRONG_STATE;
2348 GST_QUEUE2_SIGNAL_ADD (queue);
2349 GST_QUEUE2_MUTEX_UNLOCK (queue);
2351 /* when using a temp file, we eat the event */
2352 res = TRUE;
2353 gst_event_unref (event);
2354 }
2355 break;
2356 case GST_EVENT_FLUSH_STOP:
2357 if (QUEUE_IS_USING_QUEUE (queue)) {
2358 /* just forward upstream */
2359 res = gst_pad_push_event (queue->sinkpad, event);
2360 } else {
2361 /* now unblock the getrange function */
2362 GST_QUEUE2_MUTEX_LOCK (queue);
2363 queue->srcresult = GST_FLOW_OK;
2364 if (queue->current) {
2365 /* forget the highest read offset, we'll calculate a new one when we
2366 * get the next getrange request. We need to do this in order to reset
2367 * the buffering percentage */
2368 queue->current->max_reading_pos = 0;
2369 }
2370 GST_QUEUE2_MUTEX_UNLOCK (queue);
2372 /* when using a temp file, we eat the event */
2373 res = TRUE;
2374 gst_event_unref (event);
2375 }
2376 break;
2377 default:
2378 res = gst_pad_push_event (queue->sinkpad, event);
2379 break;
2380 }
2382 return res;
2383 }
2385 static gboolean
2386 gst_queue2_peer_query (GstQueue2 * queue, GstPad * pad, GstQuery * query)
2387 {
2388 gboolean ret = FALSE;
2389 GstPad *peer;
2391 if ((peer = gst_pad_get_peer (pad))) {
2392 ret = gst_pad_query (peer, query);
2393 gst_object_unref (peer);
2394 }
2395 return ret;
2396 }
2398 static gboolean
2399 gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
2400 {
2401 GstQueue2 *queue;
2403 queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
2405 switch (GST_QUERY_TYPE (query)) {
2406 case GST_QUERY_POSITION:
2407 {
2408 gint64 peer_pos;
2409 GstFormat format;
2411 if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
2412 goto peer_failed;
2414 /* get peer position */
2415 gst_query_parse_position (query, &format, &peer_pos);
2417 /* FIXME: this code assumes that there's no discont in the queue */
2418 switch (format) {
2419 case GST_FORMAT_BYTES:
2420 peer_pos -= queue->cur_level.bytes;
2421 break;
2422 case GST_FORMAT_TIME:
2423 peer_pos -= queue->cur_level.time;
2424 break;
2425 default:
2426 GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
2427 "know how to adjust value", gst_format_get_name (format));
2428 return FALSE;
2429 }
2430 /* set updated position */
2431 gst_query_set_position (query, format, peer_pos);
2432 break;
2433 }
2434 case GST_QUERY_DURATION:
2435 {
2436 GST_DEBUG_OBJECT (queue, "doing peer query");
2438 if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
2439 goto peer_failed;
2441 GST_DEBUG_OBJECT (queue, "peer query success");
2442 break;
2443 }
2444 case GST_QUERY_BUFFERING:
2445 {
2446 GstFormat format;
2448 GST_DEBUG_OBJECT (queue, "query buffering");
2450 /* FIXME - is this condition correct? what should ring buffer do? */
2451 if (QUEUE_IS_USING_QUEUE (queue)) {
2452 /* no temp file, just forward to the peer */
2453 if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
2454 goto peer_failed;
2455 GST_DEBUG_OBJECT (queue, "buffering forwarded to peer");
2456 } else {
2457 gint64 start, stop, range_start, range_stop;
2458 guint64 writing_pos;
2459 gint percent;
2460 gint64 estimated_total, buffering_left;
2461 GstFormat peer_fmt;
2462 gint64 duration;
2463 gboolean peer_res, is_buffering, is_eos;
2464 gdouble byte_in_rate, byte_out_rate;
2465 GstQueue2Range *queued_ranges;
2467 /* we need a current download region */
2468 if (queue->current == NULL)
2469 return FALSE;
2471 writing_pos = queue->current->writing_pos;
2472 byte_in_rate = queue->byte_in_rate;
2473 byte_out_rate = queue->byte_out_rate;
2474 is_buffering = queue->is_buffering;
2475 is_eos = queue->is_eos;
2476 percent = queue->buffering_percent;
2478 if (is_eos) {
2479 /* we're EOS, we know the duration in bytes now */
2480 peer_res = TRUE;
2481 duration = writing_pos;
2482 } else {
2483 /* get duration of upstream in bytes */
2484 peer_fmt = GST_FORMAT_BYTES;
2485 peer_res = gst_pad_query_peer_duration (queue->sinkpad, &peer_fmt,
2486 &duration);
2487 }
2489 /* calculate remaining and total download time */
2490 if (peer_res && byte_in_rate > 0.0) {
2491 estimated_total = (duration * 1000) / byte_in_rate;
2492 buffering_left = ((duration - writing_pos) * 1000) / byte_in_rate;
2493 } else {
2494 estimated_total = -1;
2495 buffering_left = -1;
2496 }
2497 GST_DEBUG_OBJECT (queue, "estimated %" G_GINT64_FORMAT ", left %"
2498 G_GINT64_FORMAT, estimated_total, buffering_left);
2500 gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
2502 switch (format) {
2503 case GST_FORMAT_PERCENT:
2504 /* we need duration */
2505 if (!peer_res)
2506 goto peer_failed;
2508 GST_DEBUG_OBJECT (queue,
2509 "duration %" G_GINT64_FORMAT ", writing %" G_GINT64_FORMAT,
2510 duration, writing_pos);
2512 start = 0;
2513 /* get our available data relative to the duration */
2514 if (duration != -1)
2515 stop = GST_FORMAT_PERCENT_MAX * writing_pos / duration;
2516 else
2517 stop = -1;
2518 break;
2519 case GST_FORMAT_BYTES:
2520 start = 0;
2521 stop = writing_pos;
2522 break;
2523 default:
2524 start = -1;
2525 stop = -1;
2526 break;
2527 }
2529 /* fill out the buffered ranges */
2530 for (queued_ranges = queue->ranges; queued_ranges;
2531 queued_ranges = queued_ranges->next) {
2532 switch (format) {
2533 case GST_FORMAT_PERCENT:
2534 if (duration == -1) {
2535 range_start = 0;
2536 range_stop = 0;
2537 break;
2538 }
2539 range_start = 100 * queued_ranges->offset / duration;
2540 range_stop = 100 * queued_ranges->writing_pos / duration;
2541 break;
2542 case GST_FORMAT_BYTES:
2543 range_start = queued_ranges->offset;
2544 range_stop = queued_ranges->writing_pos;
2545 break;
2546 default:
2547 range_start = -1;
2548 range_stop = -1;
2549 break;
2550 }
2551 if (range_start == range_stop)
2552 continue;
2553 GST_DEBUG_OBJECT (queue,
2554 "range starting at %" G_GINT64_FORMAT " and finishing at %"
2555 G_GINT64_FORMAT, range_start, range_stop);
2556 gst_query_add_buffering_range (query, range_start, range_stop);
2557 }
2559 gst_query_set_buffering_percent (query, is_buffering, percent);
2560 gst_query_set_buffering_range (query, format, start, stop,
2561 estimated_total);
2562 gst_query_set_buffering_stats (query, GST_BUFFERING_DOWNLOAD,
2563 byte_in_rate, byte_out_rate, buffering_left);
2564 }
2565 break;
2566 }
2567 default:
2568 /* peer handled other queries */
2569 if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
2570 goto peer_failed;
2571 break;
2572 }
2574 return TRUE;
2576 /* ERRORS */
2577 peer_failed:
2578 {
2579 GST_DEBUG_OBJECT (queue, "failed peer query");
2580 return FALSE;
2581 }
2582 }
2584 static gboolean
2585 gst_queue2_handle_query (GstElement * element, GstQuery * query)
2586 {
2587 /* simply forward to the srcpad query function */
2588 return gst_queue2_handle_src_query (GST_QUEUE2_CAST (element)->srcpad, query);
2589 }
2591 static GstFlowReturn
2592 gst_queue2_get_range (GstPad * pad, guint64 offset, guint length,
2593 GstBuffer ** buffer)
2594 {
2595 GstQueue2 *queue;
2596 GstFlowReturn ret;
2598 queue = GST_QUEUE2_CAST (gst_pad_get_parent (pad));
2600 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2601 length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
2602 offset = (offset == -1) ? queue->current->reading_pos : offset;
2604 GST_DEBUG_OBJECT (queue,
2605 "Getting range: offset %" G_GUINT64_FORMAT ", length %u", offset, length);
2606 /* FIXME - function will block when the range is not yet available */
2607 ret = gst_queue2_create_read (queue, offset, length, buffer);
2608 GST_QUEUE2_MUTEX_UNLOCK (queue);
2610 gst_object_unref (queue);
2612 return ret;
2614 /* ERRORS */
2615 out_flushing:
2616 {
2617 ret = queue->srcresult;
2619 GST_DEBUG_OBJECT (queue, "we are flushing");
2620 GST_QUEUE2_MUTEX_UNLOCK (queue);
2621 return ret;
2622 }
2623 }
2625 static gboolean
2626 gst_queue2_src_checkgetrange_function (GstPad * pad)
2627 {
2628 GstQueue2 *queue;
2629 gboolean ret;
2631 queue = GST_QUEUE2 (gst_pad_get_parent (pad));
2633 /* we can operate in pull mode when we are using a tempfile */
2634 ret = !QUEUE_IS_USING_QUEUE (queue);
2636 gst_object_unref (GST_OBJECT (queue));
2638 return ret;
2639 }
2641 /* sink currently only operates in push mode */
2642 static gboolean
2643 gst_queue2_sink_activate_push (GstPad * pad, gboolean active)
2644 {
2645 gboolean result = TRUE;
2646 GstQueue2 *queue;
2648 queue = GST_QUEUE2 (gst_pad_get_parent (pad));
2650 if (active) {
2651 GST_QUEUE2_MUTEX_LOCK (queue);
2652 GST_DEBUG_OBJECT (queue, "activating push mode");
2653 queue->srcresult = GST_FLOW_OK;
2654 queue->sinkresult = GST_FLOW_OK;
2655 queue->is_eos = FALSE;
2656 queue->unexpected = FALSE;
2657 reset_rate_timer (queue);
2658 GST_QUEUE2_MUTEX_UNLOCK (queue);
2659 } else {
2660 /* unblock chain function */
2661 GST_QUEUE2_MUTEX_LOCK (queue);
2662 GST_DEBUG_OBJECT (queue, "deactivating push mode");
2663 queue->srcresult = GST_FLOW_WRONG_STATE;
2664 queue->sinkresult = GST_FLOW_WRONG_STATE;
2665 gst_queue2_locked_flush (queue);
2666 GST_QUEUE2_MUTEX_UNLOCK (queue);
2667 }
2669 gst_object_unref (queue);
2671 return result;
2672 }
2674 /* src operating in push mode, we start a task on the source pad that pushes out
2675 * buffers from the queue */
2676 static gboolean
2677 gst_queue2_src_activate_push (GstPad * pad, gboolean active)
2678 {
2679 gboolean result = FALSE;
2680 GstQueue2 *queue;
2682 queue = GST_QUEUE2 (gst_pad_get_parent (pad));
2684 if (active) {
2685 GST_QUEUE2_MUTEX_LOCK (queue);
2686 GST_DEBUG_OBJECT (queue, "activating push mode");
2687 queue->srcresult = GST_FLOW_OK;
2688 queue->sinkresult = GST_FLOW_OK;
2689 queue->is_eos = FALSE;
2690 queue->unexpected = FALSE;
2691 result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad);
2692 GST_QUEUE2_MUTEX_UNLOCK (queue);
2693 } else {
2694 /* unblock loop function */
2695 GST_QUEUE2_MUTEX_LOCK (queue);
2696 GST_DEBUG_OBJECT (queue, "deactivating push mode");
2697 queue->srcresult = GST_FLOW_WRONG_STATE;
2698 queue->sinkresult = GST_FLOW_WRONG_STATE;
2699 /* the item add signal will unblock */
2700 GST_QUEUE2_SIGNAL_ADD (queue);
2701 GST_QUEUE2_MUTEX_UNLOCK (queue);
2703 /* step 2, make sure streaming finishes */
2704 result = gst_pad_stop_task (pad);
2705 }
2707 gst_object_unref (queue);
2709 return result;
2710 }
2712 /* pull mode, downstream will call our getrange function */
2713 static gboolean
2714 gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
2715 {
2716 gboolean result;
2717 GstQueue2 *queue;
2719 queue = GST_QUEUE2 (gst_pad_get_parent (pad));
2721 if (active) {
2722 GST_QUEUE2_MUTEX_LOCK (queue);
2723 if (!QUEUE_IS_USING_QUEUE (queue)) {
2724 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2725 /* open the temp file now */
2726 result = gst_queue2_open_temp_location_file (queue);
2727 } else if (!queue->ring_buffer) {
2728 queue->ring_buffer = g_malloc (queue->ring_buffer_max_size);
2729 result = !!queue->ring_buffer;
2730 } else {
2731 result = TRUE;
2732 }
2734 GST_DEBUG_OBJECT (queue, "activating pull mode");
2735 init_ranges (queue);
2736 queue->srcresult = GST_FLOW_OK;
2737 queue->sinkresult = GST_FLOW_OK;
2738 queue->is_eos = FALSE;
2739 queue->unexpected = FALSE;
2740 GST_QUEUE2_MUTEX_UNLOCK (queue);
2741 } else {
2742 GST_QUEUE2_MUTEX_LOCK (queue);
2743 GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
2744 /* this is not allowed, we cannot operate in pull mode without a temp
2745 * file. */
2746 queue->srcresult = GST_FLOW_WRONG_STATE;
2747 queue->sinkresult = GST_FLOW_WRONG_STATE;
2748 result = FALSE;
2749 GST_QUEUE2_MUTEX_UNLOCK (queue);
2750 }
2751 } else {
2752 GST_QUEUE2_MUTEX_LOCK (queue);
2753 GST_DEBUG_OBJECT (queue, "deactivating pull mode");
2754 queue->srcresult = GST_FLOW_WRONG_STATE;
2755 queue->sinkresult = GST_FLOW_WRONG_STATE;
2756 /* this will unlock getrange */
2757 GST_QUEUE2_SIGNAL_ADD (queue);
2758 result = TRUE;
2759 GST_QUEUE2_MUTEX_UNLOCK (queue);
2760 }
2761 gst_object_unref (queue);
2763 return result;
2764 }
2766 static GstStateChangeReturn
2767 gst_queue2_change_state (GstElement * element, GstStateChange transition)
2768 {
2769 GstQueue2 *queue;
2770 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
2772 queue = GST_QUEUE2 (element);
2774 switch (transition) {
2775 case GST_STATE_CHANGE_NULL_TO_READY:
2776 break;
2777 case GST_STATE_CHANGE_READY_TO_PAUSED:
2778 GST_QUEUE2_MUTEX_LOCK (queue);
2779 if (!QUEUE_IS_USING_QUEUE (queue)) {
2780 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2781 if (!gst_queue2_open_temp_location_file (queue))
2782 ret = GST_STATE_CHANGE_FAILURE;
2783 } else {
2784 if (queue->ring_buffer) {
2785 g_free (queue->ring_buffer);
2786 queue->ring_buffer = NULL;
2787 }
2788 if (!(queue->ring_buffer = g_malloc (queue->ring_buffer_max_size)))
2789 ret = GST_STATE_CHANGE_FAILURE;
2790 }
2791 init_ranges (queue);
2792 }
2793 queue->segment_event_received = FALSE;
2794 queue->starting_segment = NULL;
2795 GST_QUEUE2_MUTEX_UNLOCK (queue);
2796 break;
2797 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
2798 break;
2799 default:
2800 break;
2801 }
2803 if (ret == GST_STATE_CHANGE_FAILURE)
2804 return ret;
2806 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2808 if (ret == GST_STATE_CHANGE_FAILURE)
2809 return ret;
2811 switch (transition) {
2812 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2813 break;
2814 case GST_STATE_CHANGE_PAUSED_TO_READY:
2815 GST_QUEUE2_MUTEX_LOCK (queue);
2816 if (!QUEUE_IS_USING_QUEUE (queue)) {
2817 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2818 gst_queue2_close_temp_location_file (queue);
2819 } else if (queue->ring_buffer) {
2820 g_free (queue->ring_buffer);
2821 queue->ring_buffer = NULL;
2822 }
2823 }
2824 if (queue->starting_segment != NULL) {
2825 gst_event_unref (queue->starting_segment);
2826 queue->starting_segment = NULL;
2827 }
2828 GST_QUEUE2_MUTEX_UNLOCK (queue);
2829 break;
2830 case GST_STATE_CHANGE_READY_TO_NULL:
2831 break;
2832 default:
2833 break;
2834 }
2836 return ret;
2837 }
/* Changing the capacity of the queue must wake up
 * the _chain function, it might have more room now
 * to store the buffer/event in the queue.
 *
 * @q is the queue to signal. The macro expands to a single expression
 * statement without a trailing semicolon, the caller supplies it. */
#define QUEUE_CAPACITY_CHANGE(q)\
  GST_QUEUE2_SIGNAL_DEL (q)

/* Changing the minimum required fill level must
 * wake up the _loop function as it might now
 * be able to proceed.
 *
 * @q is the queue to signal; as above, the caller supplies the semicolon.
 */
#define QUEUE_THRESHOLD_CHANGE(q)\
  GST_QUEUE2_SIGNAL_ADD (q)
2852 static void
2853 gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
2854 {
2855 GstState state;
2857 /* the element must be stopped in order to do this */
2858 GST_OBJECT_LOCK (queue);
2859 state = GST_STATE (queue);
2860 if (state != GST_STATE_READY && state != GST_STATE_NULL)
2861 goto wrong_state;
2862 GST_OBJECT_UNLOCK (queue);
2864 /* set new location */
2865 g_free (queue->temp_template);
2866 queue->temp_template = g_strdup (template);
2868 return;
2870 /* ERROR */
2871 wrong_state:
2872 {
2873 GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state");
2874 GST_OBJECT_UNLOCK (queue);
2875 }
2876 }
2878 static void
2879 gst_queue2_set_property (GObject * object,
2880 guint prop_id, const GValue * value, GParamSpec * pspec)
2881 {
2882 GstQueue2 *queue = GST_QUEUE2 (object);
2884 /* someone could change levels here, and since this
2885 * affects the get/put funcs, we need to lock for safety. */
2886 GST_QUEUE2_MUTEX_LOCK (queue);
2888 switch (prop_id) {
2889 case PROP_MAX_SIZE_BYTES:
2890 queue->max_level.bytes = g_value_get_uint (value);
2891 QUEUE_CAPACITY_CHANGE (queue);
2892 break;
2893 case PROP_MAX_SIZE_BUFFERS:
2894 queue->max_level.buffers = g_value_get_uint (value);
2895 QUEUE_CAPACITY_CHANGE (queue);
2896 break;
2897 case PROP_MAX_SIZE_TIME:
2898 queue->max_level.time = g_value_get_uint64 (value);
2899 /* set rate_time to the same value. We use an extra field in the level
2900 * structure so that we can easily access and compare it */
2901 queue->max_level.rate_time = queue->max_level.time;
2902 QUEUE_CAPACITY_CHANGE (queue);
2903 break;
2904 case PROP_USE_BUFFERING:
2905 queue->use_buffering = g_value_get_boolean (value);
2906 break;
2907 case PROP_USE_RATE_ESTIMATE:
2908 queue->use_rate_estimate = g_value_get_boolean (value);
2909 break;
2910 case PROP_LOW_PERCENT:
2911 queue->low_percent = g_value_get_int (value);
2912 break;
2913 case PROP_HIGH_PERCENT:
2914 queue->high_percent = g_value_get_int (value);
2915 break;
2916 case PROP_TEMP_TEMPLATE:
2917 gst_queue2_set_temp_template (queue, g_value_get_string (value));
2918 break;
2919 case PROP_TEMP_LOCATION:
2920 g_free (queue->temp_location);
2921 queue->temp_location = g_value_dup_string (value);
2922 /* you can set the property back to NULL to make it use the temp-tmpl
2923 * property. */
2924 queue->temp_location_set = queue->temp_location != NULL;
2925 break;
2926 case PROP_TEMP_REMOVE:
2927 queue->temp_remove = g_value_get_boolean (value);
2928 break;
2929 case PROP_RING_BUFFER_MAX_SIZE:
2930 queue->ring_buffer_max_size = g_value_get_uint64 (value);
2931 queue->use_ring_buffer = !!queue->ring_buffer_max_size;
2932 break;
2933 default:
2934 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2935 break;
2936 }
2938 GST_QUEUE2_MUTEX_UNLOCK (queue);
2939 }
2941 static void
2942 gst_queue2_get_property (GObject * object,
2943 guint prop_id, GValue * value, GParamSpec * pspec)
2944 {
2945 GstQueue2 *queue = GST_QUEUE2 (object);
2947 GST_QUEUE2_MUTEX_LOCK (queue);
2949 switch (prop_id) {
2950 case PROP_CUR_LEVEL_BYTES:
2951 g_value_set_uint (value, queue->cur_level.bytes);
2952 break;
2953 case PROP_CUR_LEVEL_BUFFERS:
2954 g_value_set_uint (value, queue->cur_level.buffers);
2955 break;
2956 case PROP_CUR_LEVEL_TIME:
2957 g_value_set_uint64 (value, queue->cur_level.time);
2958 break;
2959 case PROP_MAX_SIZE_BYTES:
2960 g_value_set_uint (value, queue->max_level.bytes);
2961 break;
2962 case PROP_MAX_SIZE_BUFFERS:
2963 g_value_set_uint (value, queue->max_level.buffers);
2964 break;
2965 case PROP_MAX_SIZE_TIME:
2966 g_value_set_uint64 (value, queue->max_level.time);
2967 break;
2968 case PROP_USE_BUFFERING:
2969 g_value_set_boolean (value, queue->use_buffering);
2970 break;
2971 case PROP_USE_RATE_ESTIMATE:
2972 g_value_set_boolean (value, queue->use_rate_estimate);
2973 break;
2974 case PROP_LOW_PERCENT:
2975 g_value_set_int (value, queue->low_percent);
2976 break;
2977 case PROP_HIGH_PERCENT:
2978 g_value_set_int (value, queue->high_percent);
2979 break;
2980 case PROP_TEMP_TEMPLATE:
2981 g_value_set_string (value, queue->temp_template);
2982 break;
2983 case PROP_TEMP_LOCATION:
2984 g_value_set_string (value, queue->temp_location);
2985 break;
2986 case PROP_TEMP_REMOVE:
2987 g_value_set_boolean (value, queue->temp_remove);
2988 break;
2989 case PROP_RING_BUFFER_MAX_SIZE:
2990 g_value_set_uint64 (value, queue->ring_buffer_max_size);
2991 break;
2992 default:
2993 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2994 break;
2995 }
2997 GST_QUEUE2_MUTEX_UNLOCK (queue);
2998 }