1 /* GStreamer
2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2003 Colin Walters <cwalters@gnome.org>
4 * 2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5 * 2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6 * SA 2010 ST-Ericsson <benjamin.gaignard@stericsson.com>
7 *
8 * gstqueue2.c:
9 *
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Library General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
14 *
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Library General Public License for more details.
19 *
20 * You should have received a copy of the GNU Library General Public
21 * License along with this library; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23 * Boston, MA 02111-1307, USA.
24 */
26 /**
27 * SECTION:element-queue2
28 *
29 * Data is queued until one of the limits specified by the
30 * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
31 * #GstQueue2:max-size-time properties has been reached. Any attempt to push
32 * more buffers into the queue will block the pushing thread until more space
33 * becomes available.
34 *
35 * The queue will create a new thread on the source pad to decouple the
36 * processing on sink and source pad.
37 *
38 * You can query how many buffers are queued by reading the
39 * #GstQueue2:current-level-buffers property.
40 *
41 * The default queue size limits are 100 buffers, 2MB of data, or
42 * two seconds worth of data, whichever is reached first.
43 *
44 * If you set temp-tmpl to a value such as /tmp/gstreamer-XXXXXX, the element
45 * will allocate a random free filename and buffer data in the file.
46 * By using this, it will buffer the entire stream data on the file independently
47 * of the queue size limits, they will only be used for buffering statistics.
48 *
49 * Since 0.10.24, setting the temp-location property with a filename is deprecated
50 * because it's impossible to securely open a temporary file in this way. The
51 * property will still be used to notify the application of the allocated
52 * filename, though.
53 *
54 * Last reviewed on 2009-07-10 (0.10.24)
55 */
57 #ifdef HAVE_CONFIG_H
58 #include "config.h"
59 #endif
61 #include "gstqueue2.h"
63 #include <glib/gstdio.h>
65 #include "gst/gst-i18n-lib.h"
67 #ifdef G_OS_WIN32
68 #include <io.h> /* lseek, open, close, read */
69 #undef lseek
70 #define lseek _lseeki64
71 #undef off_t
72 #define off_t guint64
73 #else
74 #include <unistd.h>
75 #endif
77 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
78 GST_PAD_SINK,
79 GST_PAD_ALWAYS,
80 GST_STATIC_CAPS_ANY);
82 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
83 GST_PAD_SRC,
84 GST_PAD_ALWAYS,
85 GST_STATIC_CAPS_ANY);
87 GST_DEBUG_CATEGORY_STATIC (queue_debug);
88 #define GST_CAT_DEFAULT (queue_debug)
89 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
91 enum
92 {
93 LAST_SIGNAL
94 };
96 /* other defines */
97 #define DEFAULT_BUFFER_SIZE 4096
98 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_location_set || (queue)->temp_template != NULL)
99 #define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->use_ring_buffer) /* for consistency with the above macro */
101 /* default property values */
102 #define DEFAULT_MAX_SIZE_BUFFERS 100 /* 100 buffers */
103 #define DEFAULT_MAX_SIZE_BYTES (2 * 1024 * 1024) /* 2 MB */
104 #define DEFAULT_MAX_SIZE_TIME 2 * GST_SECOND /* 2 seconds */
105 #define DEFAULT_USE_BUFFERING FALSE
106 #define DEFAULT_USE_RATE_ESTIMATE TRUE
107 #define DEFAULT_LOW_PERCENT 10
108 #define DEFAULT_HIGH_PERCENT 99
109 #define DEFAULT_TEMP_REMOVE TRUE
110 #define DEFAULT_USE_RING_BUFFER FALSE
111 #define DEFAULT_RING_BUFFER_MAX_SIZE (1024 * DEFAULT_BUFFER_SIZE) /* 4 MB */
113 enum
114 {
115 PROP_0,
116 PROP_CUR_LEVEL_BUFFERS,
117 PROP_CUR_LEVEL_BYTES,
118 PROP_CUR_LEVEL_TIME,
119 PROP_MAX_SIZE_BUFFERS,
120 PROP_MAX_SIZE_BYTES,
121 PROP_MAX_SIZE_TIME,
122 PROP_USE_BUFFERING,
123 PROP_USE_RATE_ESTIMATE,
124 PROP_LOW_PERCENT,
125 PROP_HIGH_PERCENT,
126 PROP_TEMP_TEMPLATE,
127 PROP_TEMP_LOCATION,
128 PROP_TEMP_REMOVE,
129 PROP_USE_RING_BUFFER,
130 PROP_RING_BUFFER_MAX_SIZE,
131 PROP_LAST
132 };
134 #define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START { \
135 l.buffers = 0; \
136 l.bytes = 0; \
137 l.time = 0; \
138 l.rate_time = 0; \
139 } G_STMT_END
141 #define STATUS(queue, pad, msg) \
142 GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
143 "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
144 "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
145 " ns, %"G_GUINT64_FORMAT" items", \
146 GST_DEBUG_PAD_NAME (pad), \
147 queue->cur_level.buffers, \
148 queue->max_level.buffers, \
149 queue->cur_level.bytes, \
150 queue->max_level.bytes, \
151 queue->cur_level.time, \
152 queue->max_level.time, \
153 (guint64) (QUEUE_IS_USING_TEMP_FILE(queue) ? \
154 queue->current->writing_pos - queue->current->max_reading_pos : \
155 queue->queue->length))
157 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START { \
158 g_mutex_lock (q->qlock); \
159 } G_STMT_END
161 #define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START { \
162 GST_QUEUE2_MUTEX_LOCK (q); \
163 if (res != GST_FLOW_OK) \
164 goto label; \
165 } G_STMT_END
167 #define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START { \
168 g_mutex_unlock (q->qlock); \
169 } G_STMT_END
171 #define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START { \
172 STATUS (queue, q->sinkpad, "wait for DEL"); \
173 q->waiting_del = TRUE; \
174 g_cond_wait (q->item_del, queue->qlock); \
175 q->waiting_del = FALSE; \
176 if (res != GST_FLOW_OK) { \
177 STATUS (queue, q->srcpad, "received DEL wakeup"); \
178 goto label; \
179 } \
180 STATUS (queue, q->sinkpad, "received DEL"); \
181 } G_STMT_END
183 #define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START { \
184 STATUS (queue, q->srcpad, "wait for ADD"); \
185 q->waiting_add = TRUE; \
186 g_cond_wait (q->item_add, q->qlock); \
187 q->waiting_add = FALSE; \
188 if (res != GST_FLOW_OK) { \
189 STATUS (queue, q->srcpad, "received ADD wakeup"); \
190 goto label; \
191 } \
192 STATUS (queue, q->srcpad, "received ADD"); \
193 } G_STMT_END
195 #define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START { \
196 if (q->waiting_del) { \
197 STATUS (q, q->srcpad, "signal DEL"); \
198 g_cond_signal (q->item_del); \
199 } \
200 } G_STMT_END
202 #define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START { \
203 if (q->waiting_add) { \
204 STATUS (q, q->sinkpad, "signal ADD"); \
205 g_cond_signal (q->item_add); \
206 } \
207 } G_STMT_END
209 #define _do_init(bla) \
210 GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
211 GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
212 "dataflow inside the queue element");
214 GST_BOILERPLATE_FULL (GstQueue2, gst_queue2, GstElement, GST_TYPE_ELEMENT,
215 _do_init);
217 static void gst_queue2_finalize (GObject * object);
219 static void gst_queue2_set_property (GObject * object,
220 guint prop_id, const GValue * value, GParamSpec * pspec);
221 static void gst_queue2_get_property (GObject * object,
222 guint prop_id, GValue * value, GParamSpec * pspec);
224 static GstFlowReturn gst_queue2_chain (GstPad * pad, GstBuffer * buffer);
225 static GstFlowReturn gst_queue2_bufferalloc (GstPad * pad, guint64 offset,
226 guint size, GstCaps * caps, GstBuffer ** buf);
227 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
228 static void gst_queue2_loop (GstPad * pad);
230 static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event);
232 static gboolean gst_queue2_handle_src_event (GstPad * pad, GstEvent * event);
233 static gboolean gst_queue2_handle_src_query (GstPad * pad, GstQuery * query);
234 static gboolean gst_queue2_handle_query (GstElement * element,
235 GstQuery * query);
237 static GstCaps *gst_queue2_getcaps (GstPad * pad);
238 static gboolean gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps);
240 static GstFlowReturn gst_queue2_get_range (GstPad * pad, guint64 offset,
241 guint length, GstBuffer ** buffer);
242 static gboolean gst_queue2_src_checkgetrange_function (GstPad * pad);
244 static gboolean gst_queue2_src_activate_pull (GstPad * pad, gboolean active);
245 static gboolean gst_queue2_src_activate_push (GstPad * pad, gboolean active);
246 static gboolean gst_queue2_sink_activate_push (GstPad * pad, gboolean active);
247 static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
248 GstStateChange transition);
250 static gboolean gst_queue2_is_empty (GstQueue2 * queue);
251 static gboolean gst_queue2_is_filled (GstQueue2 * queue);
253 static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
256 /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
258 static void
259 gst_queue2_base_init (gpointer g_class)
260 {
261 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
263 gst_element_class_add_pad_template (gstelement_class,
264 gst_static_pad_template_get (&srctemplate));
265 gst_element_class_add_pad_template (gstelement_class,
266 gst_static_pad_template_get (&sinktemplate));
268 gst_element_class_set_details_simple (gstelement_class, "Queue 2",
269 "Generic",
270 "Simple data queue",
271 "Erik Walthinsen <omega@cse.ogi.edu>, "
272 "Wim Taymans <wim.taymans@gmail.com>");
273 }
275 static void
276 gst_queue2_class_init (GstQueue2Class * klass)
277 {
278 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
279 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
281 parent_class = g_type_class_peek_parent (klass);
283 gobject_class->set_property = gst_queue2_set_property;
284 gobject_class->get_property = gst_queue2_get_property;
286 /* properties */
287 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
288 g_param_spec_uint ("current-level-bytes", "Current level (kB)",
289 "Current amount of data in the queue (bytes)",
290 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
291 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
292 g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
293 "Current number of buffers in the queue",
294 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
295 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
296 g_param_spec_uint64 ("current-level-time", "Current level (ns)",
297 "Current amount of data in the queue (in ns)",
298 0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
300 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
301 g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
302 "Max. amount of data in the queue (bytes, 0=disable)",
303 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
304 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
305 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
306 g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
307 "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
308 DEFAULT_MAX_SIZE_BUFFERS,
309 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
310 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
311 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
312 "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
313 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
315 g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
316 g_param_spec_boolean ("use-buffering", "Use buffering",
317 "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
318 DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
319 g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
320 g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
321 "Estimate the bitrate of the stream to calculate time level",
322 DEFAULT_USE_RATE_ESTIMATE,
323 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
324 g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
325 g_param_spec_int ("low-percent", "Low percent",
326 "Low threshold for buffering to start", 0, 100, DEFAULT_LOW_PERCENT,
327 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
328 g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
329 g_param_spec_int ("high-percent", "High percent",
330 "High threshold for buffering to finish", 0, 100,
331 DEFAULT_HIGH_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
333 g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
334 g_param_spec_string ("temp-template", "Temporary File Template",
335 "File template to store temporary files in, should contain directory "
336 "and XXXXXX. (NULL == disabled)",
337 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
339 g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
340 g_param_spec_string ("temp-location", "Temporary File Location",
341 "Location to store temporary files in (Deprecated: Only read this "
342 "property, use temp-template to configure the name template)",
343 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
345 /**
346 * GstQueue2:temp-remove
347 *
348 * When temp-template is set, remove the temporary file when going to READY.
349 *
350 * Since: 0.10.26
351 */
352 g_object_class_install_property (gobject_class, PROP_TEMP_REMOVE,
353 g_param_spec_boolean ("temp-remove", "Remove the Temporary File",
354 "Remove the temp-location after use",
355 DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
357 /**
358 * GstQueue2:use-ring-buffer
359 *
360 * When use-ring-buffer is set, buffer data into a ring buffer containing ranges
361 * of source data. Default FALSE.
362 *
363 * Since: 0.10.30
364 */
365 g_object_class_install_property (gobject_class, PROP_USE_RING_BUFFER,
366 g_param_spec_boolean ("use-ring-buffer", "Use a ring buffer",
367 "Use a ring buffer of size ring-buffer-max-size kB",
368 DEFAULT_USE_RING_BUFFER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
369 /**
370 * GstQueue2:ring-buffer-max-size
371 *
372 * The maximum size of the ring buffer in kilobytes. If set to 0 kB then the size
373 * is unlimited. Default 16 megabytes.
374 *
375 * Since: 0.10.30
376 */
377 g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
378 g_param_spec_uint ("ring-buffer-max-size", "Max. ring buffer size (kB)",
379 "Max. amount of data in the ring buffer (bytes, 0=unlimited)",
380 DEFAULT_BUFFER_SIZE, G_MAXUINT, DEFAULT_RING_BUFFER_MAX_SIZE,
381 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
383 /* set several parent class virtual functions */
384 gobject_class->finalize = gst_queue2_finalize;
386 gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
387 gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
388 }
390 static void
391 gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class)
392 {
393 queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
395 gst_pad_set_chain_function (queue->sinkpad,
396 GST_DEBUG_FUNCPTR (gst_queue2_chain));
397 gst_pad_set_activatepush_function (queue->sinkpad,
398 GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_push));
399 gst_pad_set_event_function (queue->sinkpad,
400 GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
401 gst_pad_set_getcaps_function (queue->sinkpad,
402 GST_DEBUG_FUNCPTR (gst_queue2_getcaps));
403 gst_pad_set_acceptcaps_function (queue->sinkpad,
404 GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps));
405 gst_pad_set_bufferalloc_function (queue->sinkpad,
406 GST_DEBUG_FUNCPTR (gst_queue2_bufferalloc));
407 gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
409 queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
411 gst_pad_set_activatepull_function (queue->srcpad,
412 GST_DEBUG_FUNCPTR (gst_queue2_src_activate_pull));
413 gst_pad_set_activatepush_function (queue->srcpad,
414 GST_DEBUG_FUNCPTR (gst_queue2_src_activate_push));
415 gst_pad_set_getrange_function (queue->srcpad,
416 GST_DEBUG_FUNCPTR (gst_queue2_get_range));
417 gst_pad_set_checkgetrange_function (queue->srcpad,
418 GST_DEBUG_FUNCPTR (gst_queue2_src_checkgetrange_function));
419 gst_pad_set_getcaps_function (queue->srcpad,
420 GST_DEBUG_FUNCPTR (gst_queue2_getcaps));
421 gst_pad_set_acceptcaps_function (queue->srcpad,
422 GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps));
423 gst_pad_set_event_function (queue->srcpad,
424 GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
425 gst_pad_set_query_function (queue->srcpad,
426 GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
427 gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
429 /* levels */
430 GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
431 queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
432 queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
433 queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
434 queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
435 queue->use_buffering = DEFAULT_USE_BUFFERING;
436 queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
437 queue->low_percent = DEFAULT_LOW_PERCENT;
438 queue->high_percent = DEFAULT_HIGH_PERCENT;
440 gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
441 gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
443 queue->srcresult = GST_FLOW_WRONG_STATE;
444 queue->sinkresult = GST_FLOW_WRONG_STATE;
445 queue->is_eos = FALSE;
446 queue->in_timer = g_timer_new ();
447 queue->out_timer = g_timer_new ();
449 queue->qlock = g_mutex_new ();
450 queue->waiting_add = FALSE;
451 queue->item_add = g_cond_new ();
452 queue->waiting_del = FALSE;
453 queue->item_del = g_cond_new ();
454 queue->queue = g_queue_new ();
456 /* tempfile related */
457 queue->temp_template = NULL;
458 queue->temp_location = NULL;
459 queue->temp_location_set = FALSE;
460 queue->temp_remove = DEFAULT_TEMP_REMOVE;
462 queue->use_ring_buffer = DEFAULT_USE_RING_BUFFER;
463 queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
464 GST_DEBUG_OBJECT (queue,
465 "initialized queue's not_empty & not_full conditions");
466 }
468 /* called only once, as opposed to dispose */
469 static void
470 gst_queue2_finalize (GObject * object)
471 {
472 GstQueue2 *queue = GST_QUEUE2 (object);
474 GST_DEBUG_OBJECT (queue, "finalizing queue");
476 while (!g_queue_is_empty (queue->queue)) {
477 GstMiniObject *data = g_queue_pop_head (queue->queue);
479 gst_mini_object_unref (data);
480 }
482 g_queue_free (queue->queue);
483 g_mutex_free (queue->qlock);
484 g_cond_free (queue->item_add);
485 g_cond_free (queue->item_del);
486 g_timer_destroy (queue->in_timer);
487 g_timer_destroy (queue->out_timer);
489 /* temp_file path cleanup */
490 g_free (queue->temp_template);
491 g_free (queue->temp_location);
493 G_OBJECT_CLASS (parent_class)->finalize (object);
494 }
496 static void
497 debug_ranges (GstQueue2 * queue)
498 {
499 GstQueue2Range *walk;
501 for (walk = queue->ranges; walk; walk = walk->next) {
502 GST_DEBUG_OBJECT (queue, "range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT,
503 walk->offset, walk->writing_pos);
504 }
505 }
507 /* clear all the downloaded ranges */
508 static void
509 clean_ranges (GstQueue2 * queue)
510 {
511 GST_DEBUG_OBJECT (queue, "clean queue ranges");
513 g_slice_free_chain (GstQueue2Range, queue->ranges, next);
514 queue->ranges = NULL;
515 queue->current = NULL;
516 }
518 /* find a range that contains @offset or NULL when nothing does */
519 static GstQueue2Range *
520 find_range (GstQueue2 * queue, guint64 offset, guint64 length)
521 {
522 GstQueue2Range *range = NULL;
523 GstQueue2Range *walk;
525 /* first do a quick check for the current range */
526 for (walk = queue->ranges; walk; walk = walk->next) {
527 if (offset >= walk->offset && offset <= walk->writing_pos) {
528 /* we can reuse an existing range */
529 range = walk;
530 break;
531 }
532 }
533 return range;
534 }
536 /* make a new range for @offset or reuse an existing range */
537 static GstQueue2Range *
538 add_range (GstQueue2 * queue, guint64 offset)
539 {
540 GstQueue2Range *range, *prev, *next;
542 GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
544 if ((range = find_range (queue, offset, 0))) {
545 GST_DEBUG_OBJECT (queue,
546 "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
547 range->writing_pos);
548 range->writing_pos = offset;
549 } else {
550 GST_DEBUG_OBJECT (queue,
551 "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
553 range = g_slice_new0 (GstQueue2Range);
554 range->offset = offset;
555 /* we want to write to the next location in the ring buffer */
556 range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
557 range->writing_pos = offset;
558 range->rb_writing_pos = range->rb_offset;
559 range->reading_pos = offset;
560 range->rb_reading_pos = range->rb_offset;
561 range->max_reading_pos = offset;
563 /* insert sorted */
564 prev = NULL;
565 next = queue->ranges;
566 while (next) {
567 if (next->offset > offset) {
568 /* insert before next */
569 GST_DEBUG_OBJECT (queue,
570 "insert before range %p, offset %" G_GUINT64_FORMAT, next,
571 next->offset);
572 break;
573 }
574 /* try next */
575 prev = next;
576 next = next->next;
577 }
578 range->next = next;
579 if (prev)
580 prev->next = range;
581 else
582 queue->ranges = range;
583 }
584 debug_ranges (queue);
586 return range;
587 }
590 /* clear and init the download ranges for offset 0 */
591 static void
592 init_ranges (GstQueue2 * queue)
593 {
594 GST_DEBUG_OBJECT (queue, "init queue ranges");
596 /* get rid of all the current ranges */
597 clean_ranges (queue);
598 /* make a range for offset 0 */
599 queue->current = add_range (queue, 0);
600 }
602 static gboolean
603 gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps)
604 {
605 GstQueue2 *queue;
606 GstPad *otherpad;
607 gboolean result;
609 queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
611 otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
612 result = gst_pad_peer_accept_caps (otherpad, caps);
614 return result;
615 }
617 static GstCaps *
618 gst_queue2_getcaps (GstPad * pad)
619 {
620 GstQueue2 *queue;
621 GstPad *otherpad;
622 GstCaps *result;
624 queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
626 otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
627 result = gst_pad_peer_get_caps (otherpad);
628 if (result == NULL)
629 result = gst_caps_new_any ();
631 return result;
632 }
634 static GstFlowReturn
635 gst_queue2_bufferalloc (GstPad * pad, guint64 offset, guint size,
636 GstCaps * caps, GstBuffer ** buf)
637 {
638 GstQueue2 *queue;
639 GstFlowReturn result;
641 queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
643 /* Forward to src pad, without setting caps on the src pad */
644 result = gst_pad_alloc_buffer (queue->srcpad, offset, size, caps, buf);
646 return result;
647 }
649 /* calculate the diff between running time on the sink and src of the queue.
650 * This is the total amount of time in the queue. */
651 static void
652 update_time_level (GstQueue2 * queue)
653 {
654 gint64 sink_time, src_time;
656 sink_time =
657 gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
658 queue->sink_segment.last_stop);
660 src_time = gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
661 queue->src_segment.last_stop);
663 GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
664 GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
666 if (sink_time >= src_time)
667 queue->cur_level.time = sink_time - src_time;
668 else
669 queue->cur_level.time = 0;
670 }
672 /* take a NEWSEGMENT event and apply the values to segment, updating the time
673 * level of queue. */
674 static void
675 apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment)
676 {
677 gboolean update;
678 GstFormat format;
679 gdouble rate, arate;
680 gint64 start, stop, time;
682 gst_event_parse_new_segment_full (event, &update, &rate, &arate,
683 &format, &start, &stop, &time);
685 GST_DEBUG_OBJECT (queue,
686 "received NEWSEGMENT update %d, rate %lf, applied rate %lf, "
687 "format %d, "
688 "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
689 G_GINT64_FORMAT, update, rate, arate, format, start, stop, time);
691 if (format == GST_FORMAT_BYTES) {
692 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
693 /* start is where we'll be getting from and as such writing next */
694 queue->current = add_range (queue, start);
695 /* update the stats for this range */
696 update_cur_level (queue, queue->current);
697 }
698 }
700 /* now configure the values, we use these to track timestamps on the
701 * sinkpad. */
702 if (format != GST_FORMAT_TIME) {
703 /* non-time format, pretent the current time segment is closed with a
704 * 0 start and unknown stop time. */
705 update = FALSE;
706 format = GST_FORMAT_TIME;
707 start = 0;
708 stop = -1;
709 time = 0;
710 }
711 gst_segment_set_newsegment_full (segment, update,
712 rate, arate, format, start, stop, time);
714 GST_DEBUG_OBJECT (queue,
715 "configured NEWSEGMENT %" GST_SEGMENT_FORMAT, segment);
717 /* segment can update the time level of the queue */
718 update_time_level (queue);
719 }
721 /* take a buffer and update segment, updating the time level of the queue. */
722 static void
723 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment)
724 {
725 GstClockTime duration, timestamp;
727 timestamp = GST_BUFFER_TIMESTAMP (buffer);
728 duration = GST_BUFFER_DURATION (buffer);
730 /* if no timestamp is set, assume it's continuous with the previous
731 * time */
732 if (timestamp == GST_CLOCK_TIME_NONE)
733 timestamp = segment->last_stop;
735 /* add duration */
736 if (duration != GST_CLOCK_TIME_NONE)
737 timestamp += duration;
739 GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
740 GST_TIME_ARGS (timestamp));
742 gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp);
744 /* calc diff with other end */
745 update_time_level (queue);
746 }
748 static void
749 update_buffering (GstQueue2 * queue)
750 {
751 gint64 percent;
752 gboolean post = FALSE;
754 if (!queue->use_buffering || queue->high_percent <= 0)
755 return;
757 #define GET_PERCENT(format) ((queue->max_level.format) > 0 ? \
758 (queue->cur_level.format) * 100 / (queue->max_level.format) : 0)
760 if (queue->is_eos) {
761 /* on EOS we are always 100% full, we set the var here so that it we can
762 * reuse the logic below to stop buffering */
763 percent = 100;
764 } else {
765 /* figure out the percent we are filled, we take the max of all formats. */
766 percent = GET_PERCENT (bytes);
767 percent = MAX (percent, GET_PERCENT (time));
768 percent = MAX (percent, GET_PERCENT (buffers));
770 /* also apply the rate estimate when we need to */
771 if (queue->use_rate_estimate)
772 percent = MAX (percent, GET_PERCENT (rate_time));
773 }
775 if (queue->is_buffering) {
776 post = TRUE;
777 /* if we were buffering see if we reached the high watermark */
778 if (percent >= queue->high_percent)
779 queue->is_buffering = FALSE;
780 } else {
781 /* we were not buffering, check if we need to start buffering if we drop
782 * below the low threshold */
783 if (percent < queue->low_percent) {
784 queue->is_buffering = TRUE;
785 queue->buffering_iteration++;
786 post = TRUE;
787 }
788 }
789 if (post) {
790 GstMessage *message;
791 GstBufferingMode mode;
792 gint64 buffering_left = -1;
794 /* scale to high percent so that it becomes the 100% mark */
795 percent = percent * 100 / queue->high_percent;
796 /* clip */
797 if (percent > 100)
798 percent = 100;
800 queue->buffering_percent = percent;
802 if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) {
803 GstFormat fmt = GST_FORMAT_BYTES;
804 gint64 duration;
806 mode = GST_BUFFERING_DOWNLOAD;
807 if (queue->byte_in_rate > 0) {
808 if (gst_pad_query_peer_duration (queue->sinkpad, &fmt, &duration))
809 buffering_left =
810 (gdouble) ((duration -
811 queue->current->writing_pos) * 1000) / queue->byte_in_rate;
812 } else {
813 buffering_left = G_MAXINT64;
814 }
815 } else {
816 mode = GST_BUFFERING_STREAM;
817 }
819 GST_DEBUG_OBJECT (queue, "buffering %d percent", (gint) percent);
820 message = gst_message_new_buffering (GST_OBJECT_CAST (queue),
821 (gint) percent);
822 gst_message_set_buffering_stats (message, mode,
823 queue->byte_in_rate, queue->byte_out_rate, buffering_left);
825 gst_element_post_message (GST_ELEMENT_CAST (queue), message);
827 } else {
828 GST_DEBUG_OBJECT (queue, "filled %d percent", (gint) percent);
829 }
831 #undef GET_PERCENT
832 }
834 static void
835 reset_rate_timer (GstQueue2 * queue)
836 {
837 queue->bytes_in = 0;
838 queue->bytes_out = 0;
839 queue->byte_in_rate = 0.0;
840 queue->byte_out_rate = 0.0;
841 queue->last_in_elapsed = 0.0;
842 queue->last_out_elapsed = 0.0;
843 queue->in_timer_started = FALSE;
844 queue->out_timer_started = FALSE;
845 }
847 /* the interval in seconds to recalculate the rate */
848 #define RATE_INTERVAL 0.2
849 /* Tuning for rate estimation. We use a large window for the input rate because
850 * it should be stable when connected to a network. The output rate is less
851 * stable (the elements preroll, queues behind a demuxer fill, ...) and should
852 * therefore adapt more quickly. */
853 #define AVG_IN(avg,val) ((avg) * 15.0 + (val)) / 16.0
854 #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
856 static void
857 update_in_rates (GstQueue2 * queue)
858 {
859 gdouble elapsed, period;
860 gdouble byte_in_rate;
862 if (!queue->in_timer_started) {
863 queue->in_timer_started = TRUE;
864 g_timer_start (queue->in_timer);
865 return;
866 }
868 elapsed = g_timer_elapsed (queue->in_timer, NULL);
870 /* recalc after each interval. */
871 if (queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
872 period = elapsed - queue->last_in_elapsed;
874 GST_DEBUG_OBJECT (queue,
875 "rates: period %f, in %" G_GUINT64_FORMAT, period, queue->bytes_in);
877 byte_in_rate = queue->bytes_in / period;
879 if (queue->byte_in_rate == 0.0)
880 queue->byte_in_rate = byte_in_rate;
881 else
882 queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate);
884 /* reset the values to calculate rate over the next interval */
885 queue->last_in_elapsed = elapsed;
886 queue->bytes_in = 0;
887 }
889 if (queue->byte_in_rate > 0.0) {
890 queue->cur_level.rate_time =
891 queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
892 }
893 GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
894 queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
895 }
897 static void
898 update_out_rates (GstQueue2 * queue)
899 {
900 gdouble elapsed, period;
901 gdouble byte_out_rate;
903 if (!queue->out_timer_started) {
904 queue->out_timer_started = TRUE;
905 g_timer_start (queue->out_timer);
906 return;
907 }
909 elapsed = g_timer_elapsed (queue->out_timer, NULL);
911 /* recalc after each interval. */
912 if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
913 period = elapsed - queue->last_out_elapsed;
915 GST_DEBUG_OBJECT (queue,
916 "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
918 byte_out_rate = queue->bytes_out / period;
920 if (queue->byte_out_rate == 0.0)
921 queue->byte_out_rate = byte_out_rate;
922 else
923 queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
925 /* reset the values to calculate rate over the next interval */
926 queue->last_out_elapsed = elapsed;
927 queue->bytes_out = 0;
928 }
929 if (queue->byte_in_rate > 0.0) {
930 queue->cur_level.rate_time =
931 queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
932 }
933 GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
934 queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
935 }
937 static void
938 update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
939 {
940 guint64 max_reading_pos, writing_pos;
942 writing_pos = range->writing_pos;
943 max_reading_pos = range->max_reading_pos;
945 if (writing_pos > max_reading_pos)
946 queue->cur_level.bytes = writing_pos - max_reading_pos;
947 else
948 queue->cur_level.bytes = 0;
949 }
951 #ifdef HAVE_FSEEKO
952 #define FSEEK_FILE(file, offset) (fseeko (file, (off_t) offset, SEEK_SET))
953 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
954 #define FSEEK_FILE(file, offset) (lseek (fileno (file), (off_t) offset, SEEK_SET))
955 #else
956 #define FSEEK_FILE(file, offset) (fseek (file, offset, SEEK_SET))
957 #endif
959 static gboolean
960 gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
961 {
962 guint size;
963 guint8 *data;
964 guint64 writing_pos, max_reading_pos;
965 GstQueue2Range *next;
967 writing_pos = queue->current->writing_pos;
968 max_reading_pos = queue->current->max_reading_pos;
970 FSEEK_FILE (queue->temp_file, writing_pos);
972 data = GST_BUFFER_DATA (buffer);
973 size = GST_BUFFER_SIZE (buffer);
975 if (fwrite (data, size, 1, queue->temp_file) != 1)
976 goto handle_error;
978 writing_pos += size;
980 GST_INFO_OBJECT (queue,
981 "writing %" G_GUINT64_FORMAT ", max_reading %" G_GUINT64_FORMAT,
982 writing_pos, max_reading_pos);
984 if (writing_pos > max_reading_pos)
985 queue->cur_level.bytes = writing_pos - max_reading_pos;
986 else
987 queue->cur_level.bytes = 0;
989 /* try to merge with next range */
990 while ((next = queue->current->next)) {
991 GST_INFO_OBJECT (queue,
992 "checking merge with next range %" G_GUINT64_FORMAT " < %"
993 G_GUINT64_FORMAT, writing_pos, next->offset);
994 if (writing_pos < next->offset)
995 break;
997 GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
998 next->writing_pos);
1000 /* remove the group, we could choose to not read the data in this range
1001 * again. This would involve us doing a seek to the current writing position
1002 * in the range. FIXME, It would probably make sense to do a seek when there
1003 * is a lot of data in the range we merged with to avoid reading it all
1004 * again. */
1005 queue->current->next = next->next;
1006 g_slice_free (GstQueue2Range, next);
1008 debug_ranges (queue);
1009 }
1010 queue->current->writing_pos = writing_pos;
1012 return TRUE;
1014 /* ERRORS */
1015 handle_error:
1016 {
1017 switch (errno) {
1018 case ENOSPC:{
1019 GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
1020 break;
1021 }
1022 default:{
1023 GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
1024 (_("Error while writing to download file.")),
1025 ("%s", g_strerror (errno)));
1026 }
1027 }
1028 return FALSE;
1029 }
1030 }
1032 static void
1033 update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
1034 {
1035 guint64 reading_pos, max_reading_pos;
1037 reading_pos = pos;
1038 max_reading_pos = range->max_reading_pos;
1040 max_reading_pos = MAX (max_reading_pos, reading_pos);
1042 range->max_reading_pos = max_reading_pos;
1044 update_cur_level (queue, range);
1045 }
1047 static gboolean
1048 perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
1049 {
1050 GstEvent *event;
1051 gboolean res;
1053 GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
1055 event =
1056 gst_event_new_seek (1.0, GST_FORMAT_BYTES,
1057 GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
1058 GST_SEEK_TYPE_NONE, -1);
1060 GST_QUEUE2_MUTEX_UNLOCK (queue);
1061 res = gst_pad_push_event (queue->sinkpad, event);
1062 GST_QUEUE2_MUTEX_LOCK (queue);
1064 return res;
1065 }
1067 /* see if there is enough data in the file to read a full buffer */
1068 static gboolean
1069 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
1070 {
1071 GstQueue2Range *range;
1073 GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
1074 offset, length);
1076 if ((range = find_range (queue, offset, length))) {
1077 if (queue->current != range) {
1078 GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
1079 perform_seek_to_offset (queue, range->writing_pos);
1080 }
1082 /* update the current reading position in the range */
1083 update_cur_pos (queue, queue->current, offset + length);
1085 GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %u)",
1086 queue->cur_level.bytes, MIN (queue->max_level.bytes,
1087 queue->ring_buffer_max_size));
1089 /* we have a range for offset */
1090 GST_DEBUG_OBJECT (queue,
1091 "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
1092 G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
1094 if (queue->is_eos)
1095 return TRUE;
1097 if (offset + length < range->writing_pos)
1098 return TRUE;
1100 } else {
1101 GST_INFO_OBJECT (queue, "not found in any range");
1102 /* we don't have the range, see how far away we are, FIXME, find a good
1103 * threshold based on the incomming rate. */
1104 if (!queue->is_eos && queue->current) {
1105 if (offset < queue->current->writing_pos + 200000) {
1106 update_cur_pos (queue, queue->current, offset + length);
1107 GST_INFO_OBJECT (queue, "wait for data");
1108 return FALSE;
1109 }
1110 }
1112 /* too far away, do a seek */
1113 perform_seek_to_offset (queue, offset);
1114 }
1116 return FALSE;
1117 }
1119 static GstFlowReturn
1120 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
1121 guint8 * dst)
1122 {
1123 size_t res;
1125 #ifdef HAVE_FSEEKO
1126 if (fseeko (queue->temp_file, (off_t) offset, SEEK_SET) != 0)
1127 goto seek_failed;
1128 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
1129 if (lseek (fileno (queue->temp_file), (off_t) offset,
1130 SEEK_SET) == (off_t) - 1)
1131 goto seek_failed;
1132 #else
1133 if (fseek (queue->temp_file, (long) offset, SEEK_SET) != 0)
1134 goto seek_failed;
1135 #endif
1137 /* this should not block */
1138 GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
1139 length, offset);
1140 res = fread (dst, 1, length, queue->temp_file);
1141 GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
1143 if (G_UNLIKELY (res < length)) {
1144 /* check for errors or EOF */
1145 if (ferror (queue->temp_file))
1146 goto could_not_read;
1147 if (feof (queue->temp_file) && length > 0)
1148 goto eos;
1149 }
1151 return GST_FLOW_OK;
1153 seek_failed:
1154 {
1155 GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
1156 return GST_FLOW_ERROR;
1157 }
1158 could_not_read:
1159 {
1160 GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
1161 return GST_FLOW_ERROR;
1162 }
1163 eos:
1164 {
1165 GST_DEBUG ("non-regular file hits EOS");
1166 return GST_FLOW_UNEXPECTED;
1167 }
1168 }
1170 static GstFlowReturn
1171 gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
1172 GstBuffer ** buffer)
1173 {
1174 GstFlowReturn flow_ret;
1175 GstBuffer *buf;
1176 guint8 *data;
1177 guint64 file_offset;
1178 guint block_length;
1180 /* check if we have enough data at @offset. If there is not enough data, we
1181 * block and wait. */
1182 while (!gst_queue2_have_data (queue, offset, length)) {
1183 GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
1184 }
1186 if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1187 file_offset =
1188 (queue->current->rb_offset + (offset -
1189 queue->current->offset)) % queue->ring_buffer_max_size;
1190 if (file_offset + length > queue->ring_buffer_max_size) {
1191 block_length = queue->ring_buffer_max_size - file_offset;
1192 } else {
1193 block_length = length;
1194 }
1195 } else {
1196 file_offset = offset;
1197 block_length = length;
1198 }
1200 buf = gst_buffer_new_and_alloc (length);
1201 data = GST_BUFFER_DATA (buf);
1203 if ((flow_ret =
1204 gst_queue2_read_data_at_offset (queue, file_offset, block_length,
1205 data)) != GST_FLOW_OK) {
1206 gst_buffer_unref (buf);
1207 return flow_ret;
1208 }
1210 if (block_length < length) {
1211 /* read second block into a second buffer, then merge the two */
1212 data += block_length;
1213 block_length = length - block_length;
1215 if ((flow_ret =
1216 gst_queue2_read_data_at_offset (queue, 0, block_length,
1217 data)) != GST_FLOW_OK) {
1218 gst_buffer_unref (buf);
1219 return flow_ret;
1220 }
1221 }
1223 GST_BUFFER_SIZE (buf) = length;
1224 GST_BUFFER_OFFSET (buf) = offset;
1225 GST_BUFFER_OFFSET_END (buf) = offset + length;
1227 *buffer = buf;
1229 return GST_FLOW_OK;
1231 /* ERRORS */
1232 out_flushing:
1233 {
1234 GST_DEBUG_OBJECT (queue, "we are flushing");
1235 return GST_FLOW_WRONG_STATE;
1236 }
1237 }
1239 /* should be called with QUEUE_LOCK */
1240 static GstMiniObject *
1241 gst_queue2_read_item_from_file (GstQueue2 * queue)
1242 {
1243 GstMiniObject *item;
1245 if (queue->starting_segment != NULL) {
1246 item = GST_MINI_OBJECT_CAST (queue->starting_segment);
1247 queue->starting_segment = NULL;
1248 } else {
1249 GstFlowReturn ret;
1250 GstBuffer *buffer;
1251 guint64 reading_pos;
1253 reading_pos = queue->current->reading_pos;
1255 ret =
1256 gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
1257 &buffer);
1258 switch (ret) {
1259 case GST_FLOW_OK:
1260 item = GST_MINI_OBJECT_CAST (buffer);
1261 queue->current->reading_pos += DEFAULT_BUFFER_SIZE;
1262 if (QUEUE_IS_USING_RING_BUFFER (queue))
1263 queue->current->rb_reading_pos =
1264 (queue->current->rb_reading_pos +
1265 DEFAULT_BUFFER_SIZE) % queue->ring_buffer_max_size;
1266 break;
1267 case GST_FLOW_UNEXPECTED:
1268 item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
1269 break;
1270 default:
1271 item = NULL;
1272 break;
1273 }
1274 }
1275 return item;
1276 }
1278 static gboolean
1279 gst_queue2_open_temp_location_file (GstQueue2 * queue)
1280 {
1281 gint fd = -1;
1282 gchar *name = NULL;
1284 if (queue->temp_file)
1285 goto already_opened;
1287 GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
1289 /* we have two cases:
1290 * - temp_location was set to something !NULL (Deprecated). in this case we
1291 * open the specified filename.
1292 * - temp_template was set, allocate a filename and open that filename
1293 */
1294 if (!queue->temp_location_set) {
1295 /* nothing to do */
1296 if (queue->temp_template == NULL)
1297 goto no_directory;
1299 /* make copy of the template, we don't want to change this */
1300 name = g_strdup (queue->temp_template);
1301 fd = g_mkstemp (name);
1302 if (fd == -1)
1303 goto mkstemp_failed;
1305 /* open the file for update/writing */
1306 queue->temp_file = fdopen (fd, "wb+");
1307 /* error creating file */
1308 if (queue->temp_file == NULL)
1309 goto open_failed;
1311 g_free (queue->temp_location);
1312 queue->temp_location = name;
1314 g_object_notify (G_OBJECT (queue), "temp-location");
1315 } else {
1316 /* open the file for update/writing, this is deprecated but we still need to
1317 * support it for API/ABI compatibility */
1318 queue->temp_file = g_fopen (queue->temp_location, "wb+");
1319 /* error creating file */
1320 if (queue->temp_file == NULL)
1321 goto open_failed;
1322 }
1323 GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
1325 init_ranges (queue);
1327 return TRUE;
1329 /* ERRORS */
1330 already_opened:
1331 {
1332 GST_DEBUG_OBJECT (queue, "temp file was already open");
1333 return TRUE;
1334 }
1335 no_directory:
1336 {
1337 GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
1338 (_("No Temp directory specified.")), (NULL));
1339 return FALSE;
1340 }
1341 mkstemp_failed:
1342 {
1343 GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1344 (_("Could not create temp file \"%s\"."), queue->temp_template),
1345 GST_ERROR_SYSTEM);
1346 g_free (name);
1347 return FALSE;
1348 }
1349 open_failed:
1350 {
1351 GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1352 (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
1353 g_free (name);
1354 if (fd != -1)
1355 close (fd);
1356 return FALSE;
1357 }
1358 }
1360 static void
1361 gst_queue2_close_temp_location_file (GstQueue2 * queue)
1362 {
1363 /* nothing to do */
1364 if (queue->temp_file == NULL)
1365 return;
1367 GST_DEBUG_OBJECT (queue, "closing temp file");
1369 fflush (queue->temp_file);
1370 fclose (queue->temp_file);
1372 if (queue->temp_remove)
1373 remove (queue->temp_location);
1375 queue->temp_file = NULL;
1376 clean_ranges (queue);
1377 }
1379 static void
1380 gst_queue2_flush_temp_file (GstQueue2 * queue)
1381 {
1382 if (queue->temp_file == NULL)
1383 return;
1385 GST_DEBUG_OBJECT (queue, "flushing temp file");
1387 queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
1389 init_ranges (queue);
1390 }
1392 static void
1393 gst_queue2_locked_flush (GstQueue2 * queue)
1394 {
1395 if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) {
1396 gst_queue2_flush_temp_file (queue);
1397 } else {
1398 while (!g_queue_is_empty (queue->queue)) {
1399 GstMiniObject *data = g_queue_pop_head (queue->queue);
1401 /* Then lose another reference because we are supposed to destroy that
1402 data when flushing */
1403 gst_mini_object_unref (data);
1404 }
1405 }
1406 GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1407 gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
1408 gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
1409 if (queue->starting_segment != NULL)
1410 gst_event_unref (queue->starting_segment);
1411 queue->starting_segment = NULL;
1412 queue->segment_event_received = FALSE;
1414 /* we deleted a lot of something */
1415 GST_QUEUE2_SIGNAL_DEL (queue);
1416 }
1418 static gboolean
1419 gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
1420 {
1421 GstBuffer *buf, *rem;
1422 guint buf_size, rem_size;
1423 const guint rb_size = queue->ring_buffer_max_size;
1424 guint8 *data;
1425 guint64 writing_pos, reading_pos, new_writing_pos;
1426 gint64 space;
1427 GstQueue2Range *range, *prev;
1429 writing_pos = queue->current->rb_writing_pos;
1430 reading_pos = queue->current->rb_reading_pos;
1432 rem = buffer;
1434 /* loop if we can't write the whole buffer at once */
1435 do {
1436 /* calculate the space in the ring buffer not used by data from the
1437 * current range */
1438 space =
1439 MIN (queue->max_level.bytes,
1440 queue->ring_buffer_max_size) - queue->cur_level.bytes;
1442 rem_size = GST_BUFFER_SIZE (rem);
1443 /* don't try to process 0 size buffers */
1444 if (!rem_size)
1445 break;
1447 /* calculate if we need to split or if we can write the entire buffer now */
1448 if (rem_size > space) {
1449 buf_size = space;
1450 buf = gst_buffer_create_sub (rem, 0, space);
1452 rem_size -= space;
1453 rem = gst_buffer_create_sub (rem, space, rem_size);
1454 space = 0;
1455 } else {
1456 buf_size = rem_size;
1457 buf = rem;
1459 rem_size = 0;
1460 rem = NULL;
1461 space -= buf_size;
1462 }
1464 data = GST_BUFFER_DATA (buf);
1466 /* the writing position in the ring buffer after writing (part or all of)
1467 * the buffer */
1468 new_writing_pos = (writing_pos + buf_size) % rb_size;
1470 prev = NULL;
1471 range = queue->ranges;
1473 /* if we need to overwrite data in the ring buffer, we need to update the
1474 * ranges
1475 * warning: this code is complicated and includes some simplifications -
1476 * pen, paper and diagrams for the cases recommended! */
1477 while (range) {
1478 guint64 range_data_start, range_data_end;
1479 GstQueue2Range *range_to_destroy = NULL;
1481 /* we don't edit the current range here */
1482 if (range == queue->current)
1483 goto next_range;
1485 range_data_start = range->rb_offset;
1486 range_data_end = range->rb_writing_pos;
1488 if (range_data_end > range_data_start) {
1489 if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
1490 goto next_range;
1492 if (new_writing_pos > range_data_start) {
1493 if (new_writing_pos >= range_data_end) {
1494 /* remove range */
1495 range_to_destroy = range;
1496 if (prev)
1497 prev->next = range->next;
1498 } else {
1499 range->offset += (new_writing_pos - range_data_start);
1500 range->rb_offset = new_writing_pos;
1501 }
1502 }
1503 } else {
1504 guint64 new_wpos_virt = writing_pos + buf_size;
1506 if (new_wpos_virt <= range_data_start)
1507 goto next_range;
1509 if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
1510 /* remove range */
1511 range_to_destroy = range;
1512 if (prev)
1513 prev->next = range->next;
1514 } else {
1515 range->offset += (new_wpos_virt - range_data_start);
1516 range->rb_offset = new_writing_pos;
1517 }
1518 }
1520 next_range:
1521 if (!range_to_destroy)
1522 prev = range;
1524 range = range->next;
1525 if (range_to_destroy) {
1526 if (range_to_destroy == queue->ranges)
1527 queue->ranges = range;
1528 g_slice_free1 (sizeof (GstQueue2Range), range_to_destroy);
1529 range_to_destroy = NULL;
1530 }
1531 }
1533 FSEEK_FILE (queue->temp_file, writing_pos);
1535 if (new_writing_pos > writing_pos) {
1536 /* no wrapping, just write */
1537 if (fwrite (data, buf_size, 1, queue->temp_file) != 1)
1538 goto handle_error;
1539 } else {
1540 /* wrapping */
1541 guint block_one, block_two;
1543 block_one = rb_size - writing_pos;
1544 block_two = buf_size - block_one;
1546 /* write data to end of ring buffer */
1547 if (fwrite (data, block_one, 1, queue->temp_file) != 1)
1548 goto handle_error;
1550 FSEEK_FILE (queue->temp_file, 0);
1552 data += block_one;
1553 if (fwrite (data, block_two, 1, queue->temp_file) != 1)
1554 goto handle_error;
1555 }
1557 /* update the writing positions */
1558 GST_INFO_OBJECT (queue, "wrote %u bytes to %" G_GUINT64_FORMAT, buf_size,
1559 writing_pos);
1560 queue->current->writing_pos += buf_size;
1561 queue->current->rb_writing_pos = writing_pos = new_writing_pos;
1563 update_cur_level (queue, queue->current);
1564 GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %u)",
1565 queue->cur_level.bytes, MIN (queue->max_level.bytes,
1566 queue->ring_buffer_max_size));
1568 /* if we have a remainder of the buffer data, wait until there's space to
1569 * write before looping */
1570 if (rem_size) {
1571 gboolean started;
1573 /* pause the timer while we wait. The fact that we are waiting does not mean
1574 * the byterate on the input pad is lower */
1575 if ((started = queue->in_timer_started))
1576 g_timer_stop (queue->in_timer);
1578 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1579 "queue is full, waiting for free space");
1580 while (gst_queue2_is_filled (queue)) {
1581 /* Wait for space to be available, we could be unlocked because of a flush. */
1582 GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1583 }
1585 /* and continue if we were running before */
1586 if (started)
1587 g_timer_continue (queue->in_timer);
1588 }
1589 } while (rem_size);
1591 return TRUE;
1593 /* ERRORS */
1594 out_flushing:
1595 {
1596 GST_DEBUG_OBJECT (queue, "we are flushing");
1597 /* FIXME - GST_FLOW_UNEXPECTED ? */
1598 return FALSE;
1599 }
1600 handle_error:
1601 {
1602 switch (errno) {
1603 case ENOSPC:{
1604 GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
1605 break;
1606 }
1607 default:{
1608 GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
1609 (_("Error while writing to download file.")),
1610 ("%s", g_strerror (errno)));
1611 }
1612 }
1613 return FALSE;
1614 }
1615 }
1617 /* enqueue an item an update the level stats */
1618 static void
1619 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
1620 {
1621 if (GST_IS_BUFFER (item)) {
1622 GstBuffer *buffer;
1623 guint size;
1625 buffer = GST_BUFFER_CAST (item);
1626 size = GST_BUFFER_SIZE (buffer);
1628 /* add buffer to the statistics */
1629 if (!(QUEUE_IS_USING_TEMP_FILE (queue)
1630 || QUEUE_IS_USING_RING_BUFFER (queue))) {
1631 queue->cur_level.buffers++;
1632 queue->cur_level.bytes += size;
1633 }
1634 queue->bytes_in += size;
1636 /* apply new buffer to segment stats */
1637 apply_buffer (queue, buffer, &queue->sink_segment);
1638 /* update the byterate stats */
1639 update_in_rates (queue);
1641 /* FIXME - check return values? */
1642 if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1643 gst_queue2_write_buffer_to_ring_buffer (queue, buffer);
1644 } else if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1645 gst_queue2_write_buffer_to_file (queue, buffer);
1646 }
1648 } else if (GST_IS_EVENT (item)) {
1649 GstEvent *event;
1651 event = GST_EVENT_CAST (item);
1653 switch (GST_EVENT_TYPE (event)) {
1654 case GST_EVENT_EOS:
1655 /* Zero the thresholds, this makes sure the queue is completely
1656 * filled and we can read all data from the queue. */
1657 GST_DEBUG_OBJECT (queue, "we have EOS");
1658 queue->is_eos = TRUE;
1659 break;
1660 case GST_EVENT_NEWSEGMENT:
1661 apply_segment (queue, event, &queue->sink_segment);
1662 /* This is our first new segment, we hold it
1663 * as we can't save it on the temp file */
1664 if (QUEUE_IS_USING_RING_BUFFER (queue)
1665 || QUEUE_IS_USING_TEMP_FILE (queue)) {
1666 if (queue->segment_event_received)
1667 goto unexpected_event;
1669 queue->segment_event_received = TRUE;
1670 if (queue->starting_segment != NULL)
1671 gst_event_unref (queue->starting_segment);
1672 queue->starting_segment = event;
1673 item = NULL;
1674 }
1675 /* a new segment allows us to accept more buffers if we got UNEXPECTED
1676 * from downstream */
1677 queue->unexpected = FALSE;
1678 break;
1679 default:
1680 if (QUEUE_IS_USING_RING_BUFFER (queue)
1681 || QUEUE_IS_USING_TEMP_FILE (queue))
1682 goto unexpected_event;
1683 break;
1684 }
1685 } else {
1686 g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
1687 item, GST_OBJECT_NAME (queue));
1688 /* we can't really unref since we don't know what it is */
1689 item = NULL;
1690 }
1692 if (item) {
1693 /* update the buffering status */
1694 update_buffering (queue);
1696 if (!(QUEUE_IS_USING_TEMP_FILE (queue)
1697 || QUEUE_IS_USING_RING_BUFFER (queue)))
1698 g_queue_push_tail (queue->queue, item);
1699 else
1700 gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
1702 GST_QUEUE2_SIGNAL_ADD (queue);
1703 }
1705 return;
1707 /* ERRORS */
1708 unexpected_event:
1709 {
1710 g_warning
1711 ("Unexpected event of kind %s can't be added in temp file of queue %s ",
1712 gst_event_type_get_name (GST_EVENT_TYPE (item)),
1713 GST_OBJECT_NAME (queue));
1714 gst_event_unref (GST_EVENT_CAST (item));
1715 return;
1716 }
1717 }
1719 /* dequeue an item from the queue and update level stats */
1720 static GstMiniObject *
1721 gst_queue2_locked_dequeue (GstQueue2 * queue)
1722 {
1723 GstMiniObject *item;
1725 if (QUEUE_IS_USING_TEMP_FILE (queue) || QUEUE_IS_USING_RING_BUFFER (queue))
1726 item = gst_queue2_read_item_from_file (queue);
1727 else
1728 item = g_queue_pop_head (queue->queue);
1730 if (item == NULL)
1731 goto no_item;
1733 if (GST_IS_BUFFER (item)) {
1734 GstBuffer *buffer;
1735 guint size;
1737 buffer = GST_BUFFER_CAST (item);
1738 size = GST_BUFFER_SIZE (buffer);
1740 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1741 "retrieved buffer %p from queue", buffer);
1743 if (!(QUEUE_IS_USING_TEMP_FILE (queue)
1744 || QUEUE_IS_USING_RING_BUFFER (queue))) {
1745 queue->cur_level.buffers--;
1746 queue->cur_level.bytes -= size;
1747 }
1748 queue->bytes_out += size;
1750 apply_buffer (queue, buffer, &queue->src_segment);
1751 /* update the byterate stats */
1752 update_out_rates (queue);
1753 /* update the buffering */
1754 update_buffering (queue);
1756 } else if (GST_IS_EVENT (item)) {
1757 GstEvent *event = GST_EVENT_CAST (item);
1759 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1760 "retrieved event %p from queue", event);
1762 switch (GST_EVENT_TYPE (event)) {
1763 case GST_EVENT_EOS:
1764 /* queue is empty now that we dequeued the EOS */
1765 GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1766 break;
1767 case GST_EVENT_NEWSEGMENT:
1768 apply_segment (queue, event, &queue->src_segment);
1769 break;
1770 default:
1771 break;
1772 }
1773 } else {
1774 g_warning
1775 ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
1776 item, GST_OBJECT_NAME (queue));
1777 item = NULL;
1778 }
1779 GST_QUEUE2_SIGNAL_DEL (queue);
1781 return item;
1783 /* ERRORS */
1784 no_item:
1785 {
1786 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty");
1787 return NULL;
1788 }
1789 }
1791 static gboolean
1792 gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
1793 {
1794 GstQueue2 *queue;
1796 queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
1798 switch (GST_EVENT_TYPE (event)) {
1799 case GST_EVENT_FLUSH_START:
1800 {
1801 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
1802 if (!(QUEUE_IS_USING_RING_BUFFER (queue)
1803 || QUEUE_IS_USING_TEMP_FILE (queue))) {
1804 /* forward event */
1805 gst_pad_push_event (queue->srcpad, event);
1807 /* now unblock the chain function */
1808 GST_QUEUE2_MUTEX_LOCK (queue);
1809 queue->srcresult = GST_FLOW_WRONG_STATE;
1810 queue->sinkresult = GST_FLOW_WRONG_STATE;
1811 /* unblock the loop and chain functions */
1812 GST_QUEUE2_SIGNAL_ADD (queue);
1813 GST_QUEUE2_SIGNAL_DEL (queue);
1814 GST_QUEUE2_MUTEX_UNLOCK (queue);
1816 /* make sure it pauses, this should happen since we sent
1817 * flush_start downstream. */
1818 gst_pad_pause_task (queue->srcpad);
1819 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
1820 }
1821 goto done;
1822 }
1823 case GST_EVENT_FLUSH_STOP:
1824 {
1825 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
1827 if (!(QUEUE_IS_USING_RING_BUFFER (queue)
1828 || QUEUE_IS_USING_TEMP_FILE (queue))) {
1829 /* forward event */
1830 gst_pad_push_event (queue->srcpad, event);
1832 GST_QUEUE2_MUTEX_LOCK (queue);
1833 gst_queue2_locked_flush (queue);
1834 queue->srcresult = GST_FLOW_OK;
1835 queue->sinkresult = GST_FLOW_OK;
1836 queue->is_eos = FALSE;
1837 queue->unexpected = FALSE;
1838 /* reset rate counters */
1839 reset_rate_timer (queue);
1840 gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
1841 queue->srcpad);
1842 GST_QUEUE2_MUTEX_UNLOCK (queue);
1843 } else {
1844 GST_QUEUE2_MUTEX_LOCK (queue);
1845 queue->segment_event_received = FALSE;
1846 queue->is_eos = FALSE;
1847 queue->unexpected = FALSE;
1848 GST_QUEUE2_MUTEX_UNLOCK (queue);
1849 }
1850 goto done;
1851 }
1852 default:
1853 if (GST_EVENT_IS_SERIALIZED (event)) {
1854 /* serialized events go in the queue */
1855 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
1856 /* refuse more events on EOS */
1857 if (queue->is_eos)
1858 goto out_eos;
1859 gst_queue2_locked_enqueue (queue, event);
1860 GST_QUEUE2_MUTEX_UNLOCK (queue);
1861 } else {
1862 /* non-serialized events are passed upstream. */
1863 gst_pad_push_event (queue->srcpad, event);
1864 }
1865 break;
1866 }
1867 done:
1868 return TRUE;
1870 /* ERRORS */
1871 out_flushing:
1872 {
1873 GST_DEBUG_OBJECT (queue, "refusing event, we are flushing");
1874 GST_QUEUE2_MUTEX_UNLOCK (queue);
1875 gst_event_unref (event);
1876 return FALSE;
1877 }
1878 out_eos:
1879 {
1880 GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
1881 GST_QUEUE2_MUTEX_UNLOCK (queue);
1882 gst_event_unref (event);
1883 return FALSE;
1884 }
1885 }
1887 static gboolean
1888 gst_queue2_is_empty (GstQueue2 * queue)
1889 {
1890 /* never empty on EOS */
1891 if (queue->is_eos)
1892 return FALSE;
1894 if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) {
1895 return queue->current->writing_pos <= queue->current->max_reading_pos;
1896 } else {
1897 if (queue->queue->length == 0)
1898 return TRUE;
1899 }
1901 return FALSE;
1902 }
1904 static gboolean
1905 gst_queue2_is_filled (GstQueue2 * queue)
1906 {
1907 gboolean res;
1909 /* always filled on EOS */
1910 if (queue->is_eos)
1911 return TRUE;
1913 /* if using a ring buffer we're filled if all ring buffer space is used
1914 * _by the current range_ */
1915 if (QUEUE_IS_USING_RING_BUFFER (queue))
1916 return queue->cur_level.bytes >= MIN (queue->max_level.bytes,
1917 queue->ring_buffer_max_size);
1919 /* if using file, we're never filled if we don't have EOS */
1920 if (QUEUE_IS_USING_TEMP_FILE (queue))
1921 return FALSE;
1923 /* we are never filled when we have no buffers at all */
1924 if (queue->cur_level.buffers == 0)
1925 return FALSE;
1927 #define CHECK_FILLED(format) ((queue->max_level.format) > 0 && \
1928 (queue->cur_level.format) >= (queue->max_level.format))
1930 /* we are filled if one of the current levels exceeds the max */
1931 res = CHECK_FILLED (buffers) || CHECK_FILLED (bytes) || CHECK_FILLED (time);
1933 /* if we need to, use the rate estimate to check against the max time we are
1934 * allowed to queue */
1935 if (queue->use_rate_estimate)
1936 res |= CHECK_FILLED (rate_time);
1938 #undef CHECK_FILLED
1939 return res;
1940 }
1942 static GstFlowReturn
1943 gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
1944 {
1945 GstQueue2 *queue;
1947 queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
1949 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1950 "received buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %"
1951 GST_TIME_FORMAT, buffer, GST_BUFFER_SIZE (buffer),
1952 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
1953 GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
1955 /* we have to lock the queue since we span threads */
1956 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
1957 /* when we received EOS, we refuse more data */
1958 if (queue->is_eos)
1959 goto out_eos;
1960 /* when we received unexpected from downstream, refuse more buffers */
1961 if (queue->unexpected)
1962 goto out_unexpected;
1964 /* We make space available if we're "full" according to whatever
1965 * the user defined as "full". */
1966 if (gst_queue2_is_filled (queue)) {
1967 gboolean started;
1969 /* pause the timer while we wait. The fact that we are waiting does not mean
1970 * the byterate on the input pad is lower */
1971 if ((started = queue->in_timer_started))
1972 g_timer_stop (queue->in_timer);
1974 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1975 "queue is full, waiting for free space");
1976 do {
1977 /* Wait for space to be available, we could be unlocked because of a flush. */
1978 GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1979 }
1980 while (gst_queue2_is_filled (queue));
1982 /* and continue if we were running before */
1983 if (started)
1984 g_timer_continue (queue->in_timer);
1985 }
1987 /* put buffer in queue now */
1988 gst_queue2_locked_enqueue (queue, buffer);
1989 GST_QUEUE2_MUTEX_UNLOCK (queue);
1991 return GST_FLOW_OK;
1993 /* special conditions */
1994 out_flushing:
1995 {
1996 GstFlowReturn ret = queue->sinkresult;
1998 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1999 "exit because task paused, reason: %s", gst_flow_get_name (ret));
2000 GST_QUEUE2_MUTEX_UNLOCK (queue);
2001 gst_buffer_unref (buffer);
2003 return ret;
2004 }
2005 out_eos:
2006 {
2007 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2008 GST_QUEUE2_MUTEX_UNLOCK (queue);
2009 gst_buffer_unref (buffer);
2011 return GST_FLOW_UNEXPECTED;
2012 }
2013 out_unexpected:
2014 {
2015 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2016 "exit because we received UNEXPECTED");
2017 GST_QUEUE2_MUTEX_UNLOCK (queue);
2018 gst_buffer_unref (buffer);
2020 return GST_FLOW_UNEXPECTED;
2021 }
2022 }
2024 /* dequeue an item from the queue an push it downstream. This functions returns
2025 * the result of the push. */
2026 static GstFlowReturn
2027 gst_queue2_push_one (GstQueue2 * queue)
2028 {
2029 GstFlowReturn result = GST_FLOW_OK;
2030 GstMiniObject *data;
2032 data = gst_queue2_locked_dequeue (queue);
2033 if (data == NULL)
2034 goto no_item;
2036 next:
2037 if (GST_IS_BUFFER (data)) {
2038 GstBuffer *buffer;
2039 GstCaps *caps;
2041 buffer = GST_BUFFER_CAST (data);
2042 caps = GST_BUFFER_CAPS (buffer);
2044 GST_QUEUE2_MUTEX_UNLOCK (queue);
2046 /* set caps before pushing the buffer so that core does not try to do
2047 * something fancy to check if this is possible. */
2048 if (caps && caps != GST_PAD_CAPS (queue->srcpad))
2049 gst_pad_set_caps (queue->srcpad, caps);
2051 result = gst_pad_push (queue->srcpad, buffer);
2053 /* need to check for srcresult here as well */
2054 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2055 if (result == GST_FLOW_UNEXPECTED) {
2056 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2057 "got UNEXPECTED from downstream");
2058 /* stop pushing buffers, we dequeue all items until we see an item that we
2059 * can push again, which is EOS or NEWSEGMENT. If there is nothing in the
2060 * queue we can push, we set a flag to make the sinkpad refuse more
2061 * buffers with an UNEXPECTED return value until we receive something
2062 * pushable again or we get flushed. */
2063 while ((data = gst_queue2_locked_dequeue (queue))) {
2064 if (GST_IS_BUFFER (data)) {
2065 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2066 "dropping UNEXPECTED buffer %p", data);
2067 gst_buffer_unref (GST_BUFFER_CAST (data));
2068 } else if (GST_IS_EVENT (data)) {
2069 GstEvent *event = GST_EVENT_CAST (data);
2070 GstEventType type = GST_EVENT_TYPE (event);
2072 if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) {
2073 /* we found a pushable item in the queue, push it out */
2074 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2075 "pushing pushable event %s after UNEXPECTED",
2076 GST_EVENT_TYPE_NAME (event));
2077 goto next;
2078 }
2079 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2080 "dropping UNEXPECTED event %p", event);
2081 gst_event_unref (event);
2082 }
2083 }
2084 /* no more items in the queue. Set the unexpected flag so that upstream
2085 * make us refuse any more buffers on the sinkpad. Since we will still
2086 * accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the
2087 * task function does not shut down. */
2088 queue->unexpected = TRUE;
2089 result = GST_FLOW_OK;
2090 }
2091 } else if (GST_IS_EVENT (data)) {
2092 GstEvent *event = GST_EVENT_CAST (data);
2093 GstEventType type = GST_EVENT_TYPE (event);
2095 GST_QUEUE2_MUTEX_UNLOCK (queue);
2097 gst_pad_push_event (queue->srcpad, event);
2099 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2100 /* if we're EOS, return UNEXPECTED so that the task pauses. */
2101 if (type == GST_EVENT_EOS) {
2102 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2103 "pushed EOS event %p, return UNEXPECTED", event);
2104 result = GST_FLOW_UNEXPECTED;
2105 }
2106 }
2107 return result;
2109 /* ERRORS */
2110 no_item:
2111 {
2112 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2113 "exit because we have no item in the queue");
2114 return GST_FLOW_ERROR;
2115 }
2116 out_flushing:
2117 {
2118 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
2119 return GST_FLOW_WRONG_STATE;
2120 }
2121 }
2123 /* called repeadedly with @pad as the source pad. This function should push out
2124 * data to the peer element. */
2125 static void
2126 gst_queue2_loop (GstPad * pad)
2127 {
2128 GstQueue2 *queue;
2129 GstFlowReturn ret;
2131 queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
2133 /* have to lock for thread-safety */
2134 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2136 if (gst_queue2_is_empty (queue)) {
2137 gboolean started;
2139 /* pause the timer while we wait. The fact that we are waiting does not mean
2140 * the byterate on the output pad is lower */
2141 if ((started = queue->out_timer_started))
2142 g_timer_stop (queue->out_timer);
2144 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
2145 "queue is empty, waiting for new data");
2146 do {
2147 /* Wait for data to be available, we could be unlocked because of a flush. */
2148 GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
2149 }
2150 while (gst_queue2_is_empty (queue));
2152 /* and continue if we were running before */
2153 if (started)
2154 g_timer_continue (queue->out_timer);
2155 }
2156 ret = gst_queue2_push_one (queue);
2157 queue->srcresult = ret;
2158 queue->sinkresult = ret;
2159 if (ret != GST_FLOW_OK)
2160 goto out_flushing;
2162 GST_QUEUE2_MUTEX_UNLOCK (queue);
2164 return;
2166 /* ERRORS */
2167 out_flushing:
2168 {
2169 gboolean eos = queue->is_eos;
2170 GstFlowReturn ret = queue->srcresult;
2172 gst_pad_pause_task (queue->srcpad);
2173 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2174 "pause task, reason: %s", gst_flow_get_name (queue->srcresult));
2175 GST_QUEUE2_MUTEX_UNLOCK (queue);
2176 /* let app know about us giving up if upstream is not expected to do so */
2177 /* UNEXPECTED is already taken care of elsewhere */
2178 if (eos && (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) &&
2179 (ret != GST_FLOW_UNEXPECTED)) {
2180 GST_ELEMENT_ERROR (queue, STREAM, FAILED,
2181 (_("Internal data flow error.")),
2182 ("streaming task paused, reason %s (%d)",
2183 gst_flow_get_name (ret), ret));
2184 gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
2185 }
2186 return;
2187 }
2188 }
2190 static gboolean
2191 gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
2192 {
2193 gboolean res = TRUE;
2194 GstQueue2 *queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
2196 #ifndef GST_DISABLE_GST_DEBUG
2197 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
2198 event, GST_EVENT_TYPE_NAME (event));
2199 #endif
2201 switch (GST_EVENT_TYPE (event)) {
2202 case GST_EVENT_FLUSH_START:
2203 if (!(QUEUE_IS_USING_RING_BUFFER (queue)
2204 || QUEUE_IS_USING_TEMP_FILE (queue))) {
2205 /* just forward upstream */
2206 res = gst_pad_push_event (queue->sinkpad, event);
2207 } else {
2208 /* now unblock the getrange function */
2209 GST_QUEUE2_MUTEX_LOCK (queue);
2210 GST_DEBUG_OBJECT (queue, "flushing");
2211 queue->srcresult = GST_FLOW_WRONG_STATE;
2212 GST_QUEUE2_SIGNAL_ADD (queue);
2213 GST_QUEUE2_MUTEX_UNLOCK (queue);
2215 /* when using a temp file, we eat the event */
2216 res = TRUE;
2217 gst_event_unref (event);
2218 }
2219 break;
2220 case GST_EVENT_FLUSH_STOP:
2221 if (!(QUEUE_IS_USING_RING_BUFFER (queue)
2222 || QUEUE_IS_USING_TEMP_FILE (queue))) {
2223 /* just forward upstream */
2224 res = gst_pad_push_event (queue->sinkpad, event);
2225 } else {
2226 /* now unblock the getrange function */
2227 GST_QUEUE2_MUTEX_LOCK (queue);
2228 queue->srcresult = GST_FLOW_OK;
2229 if (queue->current)
2230 queue->current->max_reading_pos = 0;
2231 GST_QUEUE2_MUTEX_UNLOCK (queue);
2233 /* when using a temp file, we eat the event */
2234 res = TRUE;
2235 gst_event_unref (event);
2236 }
2237 break;
2238 default:
2239 res = gst_pad_push_event (queue->sinkpad, event);
2240 break;
2241 }
2243 return res;
2244 }
2246 static gboolean
2247 gst_queue2_peer_query (GstQueue2 * queue, GstPad * pad, GstQuery * query)
2248 {
2249 gboolean ret = FALSE;
2250 GstPad *peer;
2252 if ((peer = gst_pad_get_peer (pad))) {
2253 ret = gst_pad_query (peer, query);
2254 gst_object_unref (peer);
2255 }
2256 return ret;
2257 }
2259 static gboolean
2260 gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
2261 {
2262 GstQueue2 *queue;
2264 queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
2266 switch (GST_QUERY_TYPE (query)) {
2267 case GST_QUERY_POSITION:
2268 {
2269 gint64 peer_pos;
2270 GstFormat format;
2272 if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
2273 goto peer_failed;
2275 /* get peer position */
2276 gst_query_parse_position (query, &format, &peer_pos);
2278 /* FIXME: this code assumes that there's no discont in the queue */
2279 switch (format) {
2280 case GST_FORMAT_BYTES:
2281 peer_pos -= queue->cur_level.bytes;
2282 break;
2283 case GST_FORMAT_TIME:
2284 peer_pos -= queue->cur_level.time;
2285 break;
2286 default:
2287 GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
2288 "know how to adjust value", gst_format_get_name (format));
2289 return FALSE;
2290 }
2291 /* set updated position */
2292 gst_query_set_position (query, format, peer_pos);
2293 break;
2294 }
2295 case GST_QUERY_DURATION:
2296 {
2297 GST_DEBUG_OBJECT (queue, "doing peer query");
2299 if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
2300 goto peer_failed;
2302 GST_DEBUG_OBJECT (queue, "peer query success");
2303 break;
2304 }
2305 case GST_QUERY_BUFFERING:
2306 {
2307 GstFormat format;
2309 GST_DEBUG_OBJECT (queue, "query buffering");
2311 if (!(QUEUE_IS_USING_RING_BUFFER (queue)
2312 || QUEUE_IS_USING_TEMP_FILE (queue))) {
2313 /* no temp file, just forward to the peer */
2314 if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
2315 goto peer_failed;
2316 GST_DEBUG_OBJECT (queue, "buffering forwarded to peer");
2317 } else {
2318 gint64 start, stop;
2319 guint64 writing_pos;
2320 gint percent;
2321 gint64 estimated_total, buffering_left;
2322 GstFormat peer_fmt;
2323 gint64 duration;
2324 gboolean peer_res, is_buffering, is_eos;
2325 gdouble byte_in_rate, byte_out_rate;
2327 /* we need a current download region */
2328 if (queue->current == NULL)
2329 return FALSE;
2331 writing_pos = queue->current->writing_pos;
2332 byte_in_rate = queue->byte_in_rate;
2333 byte_out_rate = queue->byte_out_rate;
2334 is_buffering = queue->is_buffering;
2335 is_eos = queue->is_eos;
2336 percent = queue->buffering_percent;
2338 if (is_eos) {
2339 /* we're EOS, we know the duration in bytes now */
2340 peer_res = TRUE;
2341 duration = writing_pos;
2342 } else {
2343 /* get duration of upstream in bytes */
2344 peer_fmt = GST_FORMAT_BYTES;
2345 peer_res = gst_pad_query_peer_duration (queue->sinkpad, &peer_fmt,
2346 &duration);
2347 }
2349 /* calculate remaining and total download time */
2350 if (peer_res && byte_in_rate > 0.0) {
2351 estimated_total = (duration * 1000) / byte_in_rate;
2352 buffering_left = ((duration - writing_pos) * 1000) / byte_in_rate;
2353 } else {
2354 estimated_total = -1;
2355 buffering_left = -1;
2356 }
2357 GST_DEBUG_OBJECT (queue, "estimated %" G_GINT64_FORMAT ", left %"
2358 G_GINT64_FORMAT, estimated_total, buffering_left);
2360 gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
2362 switch (format) {
2363 case GST_FORMAT_PERCENT:
2364 /* we need duration */
2365 if (!peer_res)
2366 goto peer_failed;
2368 GST_DEBUG_OBJECT (queue,
2369 "duration %" G_GINT64_FORMAT ", writing %" G_GINT64_FORMAT,
2370 duration, writing_pos);
2372 start = 0;
2373 /* get our available data relative to the duration */
2374 if (duration != -1)
2375 stop = GST_FORMAT_PERCENT_MAX * writing_pos / duration;
2376 else
2377 stop = -1;
2378 break;
2379 case GST_FORMAT_BYTES:
2380 start = 0;
2381 stop = writing_pos;
2382 break;
2383 default:
2384 start = -1;
2385 stop = -1;
2386 break;
2387 }
2388 gst_query_set_buffering_percent (query, is_buffering, percent);
2389 gst_query_set_buffering_range (query, format, start, stop,
2390 estimated_total);
2391 gst_query_set_buffering_stats (query, GST_BUFFERING_DOWNLOAD,
2392 byte_in_rate, byte_out_rate, buffering_left);
2393 }
2394 break;
2395 }
2396 default:
2397 /* peer handled other queries */
2398 if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
2399 goto peer_failed;
2400 break;
2401 }
2403 return TRUE;
2405 /* ERRORS */
2406 peer_failed:
2407 {
2408 GST_DEBUG_OBJECT (queue, "failed peer query");
2409 return FALSE;
2410 }
2411 }
2413 static gboolean
2414 gst_queue2_handle_query (GstElement * element, GstQuery * query)
2415 {
2416 /* simply forward to the srcpad query function */
2417 return gst_queue2_handle_src_query (GST_QUEUE2_CAST (element)->srcpad, query);
2418 }
2420 static GstFlowReturn
2421 gst_queue2_get_range (GstPad * pad, guint64 offset, guint length,
2422 GstBuffer ** buffer)
2423 {
2424 GstQueue2 *queue;
2425 GstFlowReturn ret;
2427 queue = GST_QUEUE2_CAST (gst_pad_get_parent (pad));
2428 if (length > queue->ring_buffer_max_size) {
2429 GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT,
2430 (_("Buffer is too large to fit in ring buffer")),
2431 ("(%u > %" G_GUINT64_FORMAT ")", length, queue->ring_buffer_max_size));
2432 return GST_FLOW_ERROR;
2433 }
2435 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
2436 length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
2437 offset = (offset == -1) ? queue->current->reading_pos : offset;
2439 /* FIXME - function will block when the range is not yet available */
2440 ret = gst_queue2_create_read (queue, offset, length, buffer);
2441 GST_QUEUE2_MUTEX_UNLOCK (queue);
2443 gst_object_unref (queue);
2445 return ret;
2447 /* ERRORS */
2448 out_flushing:
2449 {
2450 ret = queue->srcresult;
2452 GST_DEBUG_OBJECT (queue, "we are flushing");
2453 GST_QUEUE2_MUTEX_UNLOCK (queue);
2454 return ret;
2455 }
2456 }
2458 static gboolean
2459 gst_queue2_src_checkgetrange_function (GstPad * pad)
2460 {
2461 GstQueue2 *queue;
2462 gboolean ret;
2464 queue = GST_QUEUE2 (gst_pad_get_parent (pad));
2466 /* we can operate in pull mode when we are using a tempfile */
2467 ret = QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue);
2469 gst_object_unref (GST_OBJECT (queue));
2471 return ret;
2472 }
2474 /* sink currently only operates in push mode */
2475 static gboolean
2476 gst_queue2_sink_activate_push (GstPad * pad, gboolean active)
2477 {
2478 gboolean result = TRUE;
2479 GstQueue2 *queue;
2481 queue = GST_QUEUE2 (gst_pad_get_parent (pad));
2483 if (active) {
2484 GST_QUEUE2_MUTEX_LOCK (queue);
2485 GST_DEBUG_OBJECT (queue, "activating push mode");
2486 queue->srcresult = GST_FLOW_OK;
2487 queue->sinkresult = GST_FLOW_OK;
2488 queue->is_eos = FALSE;
2489 queue->unexpected = FALSE;
2490 reset_rate_timer (queue);
2491 GST_QUEUE2_MUTEX_UNLOCK (queue);
2492 } else {
2493 /* unblock chain function */
2494 GST_QUEUE2_MUTEX_LOCK (queue);
2495 GST_DEBUG_OBJECT (queue, "deactivating push mode");
2496 queue->srcresult = GST_FLOW_WRONG_STATE;
2497 queue->sinkresult = GST_FLOW_WRONG_STATE;
2498 gst_queue2_locked_flush (queue);
2499 GST_QUEUE2_MUTEX_UNLOCK (queue);
2500 }
2502 gst_object_unref (queue);
2504 return result;
2505 }
2507 /* src operating in push mode, we start a task on the source pad that pushes out
2508 * buffers from the queue */
2509 static gboolean
2510 gst_queue2_src_activate_push (GstPad * pad, gboolean active)
2511 {
2512 gboolean result = FALSE;
2513 GstQueue2 *queue;
2515 queue = GST_QUEUE2 (gst_pad_get_parent (pad));
2517 if (active) {
2518 GST_QUEUE2_MUTEX_LOCK (queue);
2519 GST_DEBUG_OBJECT (queue, "activating push mode");
2520 queue->srcresult = GST_FLOW_OK;
2521 queue->sinkresult = GST_FLOW_OK;
2522 queue->is_eos = FALSE;
2523 queue->unexpected = FALSE;
2524 result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad);
2525 GST_QUEUE2_MUTEX_UNLOCK (queue);
2526 } else {
2527 /* unblock loop function */
2528 GST_QUEUE2_MUTEX_LOCK (queue);
2529 GST_DEBUG_OBJECT (queue, "deactivating push mode");
2530 queue->srcresult = GST_FLOW_WRONG_STATE;
2531 queue->sinkresult = GST_FLOW_WRONG_STATE;
2532 /* the item add signal will unblock */
2533 GST_QUEUE2_SIGNAL_ADD (queue);
2534 GST_QUEUE2_MUTEX_UNLOCK (queue);
2536 /* step 2, make sure streaming finishes */
2537 result = gst_pad_stop_task (pad);
2538 }
2540 gst_object_unref (queue);
2542 return result;
2543 }
2545 /* pull mode, downstream will call our getrange function */
2546 static gboolean
2547 gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
2548 {
2549 gboolean result;
2550 GstQueue2 *queue;
2552 queue = GST_QUEUE2 (gst_pad_get_parent (pad));
2554 if (active) {
2555 if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) {
2556 /* open the temp file now */
2557 result = gst_queue2_open_temp_location_file (queue);
2559 GST_QUEUE2_MUTEX_LOCK (queue);
2560 GST_DEBUG_OBJECT (queue, "activating pull mode");
2561 queue->srcresult = GST_FLOW_OK;
2562 queue->sinkresult = GST_FLOW_OK;
2563 queue->is_eos = FALSE;
2564 queue->unexpected = FALSE;
2565 GST_QUEUE2_MUTEX_UNLOCK (queue);
2566 } else {
2567 GST_QUEUE2_MUTEX_LOCK (queue);
2568 GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
2569 /* this is not allowed, we cannot operate in pull mode without a temp
2570 * file. */
2571 queue->srcresult = GST_FLOW_WRONG_STATE;
2572 queue->sinkresult = GST_FLOW_WRONG_STATE;
2573 result = FALSE;
2574 GST_QUEUE2_MUTEX_UNLOCK (queue);
2575 }
2576 } else {
2577 GST_QUEUE2_MUTEX_LOCK (queue);
2578 GST_DEBUG_OBJECT (queue, "deactivating pull mode");
2579 queue->srcresult = GST_FLOW_WRONG_STATE;
2580 queue->sinkresult = GST_FLOW_WRONG_STATE;
2581 /* this will unlock getrange */
2582 GST_QUEUE2_SIGNAL_ADD (queue);
2583 result = TRUE;
2584 GST_QUEUE2_MUTEX_UNLOCK (queue);
2585 }
2586 gst_object_unref (queue);
2588 return result;
2589 }
2591 static GstStateChangeReturn
2592 gst_queue2_change_state (GstElement * element, GstStateChange transition)
2593 {
2594 GstQueue2 *queue;
2595 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
2597 queue = GST_QUEUE2 (element);
2599 switch (transition) {
2600 case GST_STATE_CHANGE_NULL_TO_READY:
2601 break;
2602 case GST_STATE_CHANGE_READY_TO_PAUSED:
2603 if (QUEUE_IS_USING_RING_BUFFER (queue)
2604 || QUEUE_IS_USING_TEMP_FILE (queue)) {
2605 if (!gst_queue2_open_temp_location_file (queue))
2606 ret = GST_STATE_CHANGE_FAILURE;
2607 }
2608 queue->segment_event_received = FALSE;
2609 queue->starting_segment = NULL;
2610 break;
2611 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
2612 break;
2613 default:
2614 break;
2615 }
2617 if (ret == GST_STATE_CHANGE_FAILURE)
2618 return ret;
2620 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2622 if (ret == GST_STATE_CHANGE_FAILURE)
2623 return ret;
2625 switch (transition) {
2626 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2627 break;
2628 case GST_STATE_CHANGE_PAUSED_TO_READY:
2629 if (QUEUE_IS_USING_RING_BUFFER (queue)
2630 || QUEUE_IS_USING_TEMP_FILE (queue))
2631 gst_queue2_close_temp_location_file (queue);
2632 if (queue->starting_segment != NULL) {
2633 gst_event_unref (queue->starting_segment);
2634 queue->starting_segment = NULL;
2635 }
2636 break;
2637 case GST_STATE_CHANGE_READY_TO_NULL:
2638 break;
2639 default:
2640 break;
2641 }
2643 return ret;
2644 }
2646 /* changing the capacity of the queue must wake up
2647 * the _chain function, it might have more room now
2648 * to store the buffer/event in the queue */
2649 #define QUEUE_CAPACITY_CHANGE(q)\
2650 GST_QUEUE2_SIGNAL_DEL (queue);
2652 /* Changing the minimum required fill level must
2653 * wake up the _loop function as it might now
2654 * be able to preceed.
2655 */
2656 #define QUEUE_THRESHOLD_CHANGE(q)\
2657 GST_QUEUE2_SIGNAL_ADD (queue);
2659 static void
2660 gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
2661 {
2662 GstState state;
2664 /* the element must be stopped in order to do this */
2665 GST_OBJECT_LOCK (queue);
2666 state = GST_STATE (queue);
2667 if (state != GST_STATE_READY && state != GST_STATE_NULL)
2668 goto wrong_state;
2669 GST_OBJECT_UNLOCK (queue);
2671 /* set new location */
2672 g_free (queue->temp_template);
2673 queue->temp_template = g_strdup (template);
2675 return;
2677 /* ERROR */
2678 wrong_state:
2679 {
2680 GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state");
2681 GST_OBJECT_UNLOCK (queue);
2682 }
2683 }
2685 static void
2686 gst_queue2_set_property (GObject * object,
2687 guint prop_id, const GValue * value, GParamSpec * pspec)
2688 {
2689 GstQueue2 *queue = GST_QUEUE2 (object);
2691 /* someone could change levels here, and since this
2692 * affects the get/put funcs, we need to lock for safety. */
2693 GST_QUEUE2_MUTEX_LOCK (queue);
2695 switch (prop_id) {
2696 case PROP_MAX_SIZE_BYTES:
2697 queue->max_level.bytes = g_value_get_uint (value);
2698 QUEUE_CAPACITY_CHANGE (queue);
2699 break;
2700 case PROP_MAX_SIZE_BUFFERS:
2701 queue->max_level.buffers = g_value_get_uint (value);
2702 QUEUE_CAPACITY_CHANGE (queue);
2703 break;
2704 case PROP_MAX_SIZE_TIME:
2705 queue->max_level.time = g_value_get_uint64 (value);
2706 /* set rate_time to the same value. We use an extra field in the level
2707 * structure so that we can easily access and compare it */
2708 queue->max_level.rate_time = queue->max_level.time;
2709 QUEUE_CAPACITY_CHANGE (queue);
2710 break;
2711 case PROP_USE_BUFFERING:
2712 queue->use_buffering = g_value_get_boolean (value);
2713 break;
2714 case PROP_USE_RATE_ESTIMATE:
2715 queue->use_rate_estimate = g_value_get_boolean (value);
2716 break;
2717 case PROP_LOW_PERCENT:
2718 queue->low_percent = g_value_get_int (value);
2719 break;
2720 case PROP_HIGH_PERCENT:
2721 queue->high_percent = g_value_get_int (value);
2722 break;
2723 case PROP_TEMP_TEMPLATE:
2724 gst_queue2_set_temp_template (queue, g_value_get_string (value));
2725 break;
2726 case PROP_TEMP_LOCATION:
2727 g_free (queue->temp_location);
2728 queue->temp_location = g_value_dup_string (value);
2729 /* you can set the property back to NULL to make it use the temp-tmpl
2730 * property. */
2731 queue->temp_location_set = queue->temp_location != NULL;
2732 break;
2733 case PROP_TEMP_REMOVE:
2734 queue->temp_remove = g_value_get_boolean (value);
2735 break;
2736 case PROP_USE_RING_BUFFER:
2737 queue->use_ring_buffer = g_value_get_boolean (value);
2738 break;
2739 case PROP_RING_BUFFER_MAX_SIZE:
2740 queue->ring_buffer_max_size = g_value_get_uint (value);
2741 break;
2742 default:
2743 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2744 break;
2745 }
2747 GST_QUEUE2_MUTEX_UNLOCK (queue);
2748 }
2750 static void
2751 gst_queue2_get_property (GObject * object,
2752 guint prop_id, GValue * value, GParamSpec * pspec)
2753 {
2754 GstQueue2 *queue = GST_QUEUE2 (object);
2756 GST_QUEUE2_MUTEX_LOCK (queue);
2758 switch (prop_id) {
2759 case PROP_CUR_LEVEL_BYTES:
2760 g_value_set_uint (value, queue->cur_level.bytes);
2761 break;
2762 case PROP_CUR_LEVEL_BUFFERS:
2763 g_value_set_uint (value, queue->cur_level.buffers);
2764 break;
2765 case PROP_CUR_LEVEL_TIME:
2766 g_value_set_uint64 (value, queue->cur_level.time);
2767 break;
2768 case PROP_MAX_SIZE_BYTES:
2769 g_value_set_uint (value, queue->max_level.bytes);
2770 break;
2771 case PROP_MAX_SIZE_BUFFERS:
2772 g_value_set_uint (value, queue->max_level.buffers);
2773 break;
2774 case PROP_MAX_SIZE_TIME:
2775 g_value_set_uint64 (value, queue->max_level.time);
2776 break;
2777 case PROP_USE_BUFFERING:
2778 g_value_set_boolean (value, queue->use_buffering);
2779 break;
2780 case PROP_USE_RATE_ESTIMATE:
2781 g_value_set_boolean (value, queue->use_rate_estimate);
2782 break;
2783 case PROP_LOW_PERCENT:
2784 g_value_set_int (value, queue->low_percent);
2785 break;
2786 case PROP_HIGH_PERCENT:
2787 g_value_set_int (value, queue->high_percent);
2788 break;
2789 case PROP_TEMP_TEMPLATE:
2790 g_value_set_string (value, queue->temp_template);
2791 break;
2792 case PROP_TEMP_LOCATION:
2793 g_value_set_string (value, queue->temp_location);
2794 break;
2795 case PROP_TEMP_REMOVE:
2796 g_value_set_boolean (value, queue->temp_remove);
2797 break;
2798 case PROP_USE_RING_BUFFER:
2799 g_value_set_boolean (value, queue->use_ring_buffer);
2800 break;
2801 case PROP_RING_BUFFER_MAX_SIZE:
2802 g_value_set_uint (value, queue->ring_buffer_max_size);
2803 break;
2804 default:
2805 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2806 break;
2807 }
2809 GST_QUEUE2_MUTEX_UNLOCK (queue);
2810 }