1 /* GStreamer
2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2003 Colin Walters <cwalters@gnome.org>
4 * 2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5 * 2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6 *
7 * gstqueue2.c:
8 *
9 * This library is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Library General Public
11 * License as published by the Free Software Foundation; either
12 * version 2 of the License, or (at your option) any later version.
13 *
14 * This library is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * Library General Public License for more details.
18 *
19 * You should have received a copy of the GNU Library General Public
20 * License along with this library; if not, write to the
21 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
22 * Boston, MA 02111-1307, USA.
23 */
25 /**
26 * SECTION:element-queue2
27 * @short_description: Asynchronous data queue.
28 *
29 * Data is queued until one of the limits specified by the
30 * #GstQueue22:max-size-buffers, #GstQueue22:max-size-bytes and/or
31 * #GstQueue22:max-size-time properties has been reached. Any attempt to push
32 * more buffers into the queue will block the pushing thread until more space
33 * becomes available.
34 *
35 * The queue will create a new thread on the source pad to decouple the
36 * processing on sink and source pad.
37 *
38 * You can query how many buffers are queued by reading the
39 * #GstQueue22:current-level-buffers property.
40 *
41 * The default queue size limits are 100 buffers, 2MB of data, or
42 * two seconds worth of data, whichever is reached first.
43 *
44 * If you set temp-tmpl to a value such as /tmp/gstreamer-XXXXXX, the element
45 * will allocate a random free filename and buffer data in the file.
46 * By using this, it will buffer the entire stream data on the file independently
47 * of the queue size limits, they will only be used for buffering statistics.
48 *
49 * Since 0.10.24, setting the temp-location property with a filename is deprecated
50 * because it's impossible to securely open a temporary file in this way. The
51 * property will still be used to notify the application of the allocated
52 * filename, though.
53 *
54 * Last reviewed on 2009-07-10 (0.10.24)
55 */
57 #ifdef HAVE_CONFIG_H
58 #include "config.h"
59 #endif
61 #include "gstqueue2.h"
63 #include <glib/gstdio.h>
65 #include "gst/gst-i18n-lib.h"
67 #ifdef G_OS_WIN32
68 #include <io.h> /* lseek, open, close, read */
69 #undef lseek
70 #define lseek _lseeki64
71 #undef off_t
72 #define off_t guint64
73 #else
74 #include <unistd.h>
75 #endif
77 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
78 GST_PAD_SINK,
79 GST_PAD_ALWAYS,
80 GST_STATIC_CAPS_ANY);
82 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
83 GST_PAD_SRC,
84 GST_PAD_ALWAYS,
85 GST_STATIC_CAPS_ANY);
87 GST_DEBUG_CATEGORY_STATIC (queue_debug);
88 #define GST_CAT_DEFAULT (queue_debug)
89 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
91 enum
92 {
93 LAST_SIGNAL
94 };
96 /* default property values */
97 #define DEFAULT_MAX_SIZE_BUFFERS 100 /* 100 buffers */
98 #define DEFAULT_MAX_SIZE_BYTES (2 * 1024 * 1024) /* 2 MB */
99 #define DEFAULT_MAX_SIZE_TIME 2 * GST_SECOND /* 2 seconds */
100 #define DEFAULT_USE_BUFFERING FALSE
101 #define DEFAULT_USE_RATE_ESTIMATE TRUE
102 #define DEFAULT_LOW_PERCENT 10
103 #define DEFAULT_HIGH_PERCENT 99
105 /* other defines */
106 #define DEFAULT_BUFFER_SIZE 4096
107 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_location_set || (queue)->temp_template != NULL)
109 enum
110 {
111 PROP_0,
112 PROP_CUR_LEVEL_BUFFERS,
113 PROP_CUR_LEVEL_BYTES,
114 PROP_CUR_LEVEL_TIME,
115 PROP_MAX_SIZE_BUFFERS,
116 PROP_MAX_SIZE_BYTES,
117 PROP_MAX_SIZE_TIME,
118 PROP_USE_BUFFERING,
119 PROP_USE_RATE_ESTIMATE,
120 PROP_LOW_PERCENT,
121 PROP_HIGH_PERCENT,
122 PROP_TEMP_TEMPLATE,
123 PROP_TEMP_LOCATION
124 };
126 #define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START { \
127 l.buffers = 0; \
128 l.bytes = 0; \
129 l.time = 0; \
130 l.rate_time = 0; \
131 } G_STMT_END
133 #define STATUS(queue, pad, msg) \
134 GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
135 "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
136 "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
137 " ns, %"G_GUINT64_FORMAT" items", \
138 GST_DEBUG_PAD_NAME (pad), \
139 queue->cur_level.buffers, \
140 queue->max_level.buffers, \
141 queue->cur_level.bytes, \
142 queue->max_level.bytes, \
143 queue->cur_level.time, \
144 queue->max_level.time, \
145 (guint64) (QUEUE_IS_USING_TEMP_FILE(queue) ? \
146 queue->writing_pos - queue->max_reading_pos : \
147 queue->queue->length))
149 #define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START { \
150 g_mutex_lock (q->qlock); \
151 } G_STMT_END
153 #define GST_QUEUE2_MUTEX_LOCK_CHECK(q,label) G_STMT_START { \
154 GST_QUEUE2_MUTEX_LOCK (q); \
155 if (q->srcresult != GST_FLOW_OK) \
156 goto label; \
157 } G_STMT_END
159 #define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START { \
160 g_mutex_unlock (q->qlock); \
161 } G_STMT_END
163 #define GST_QUEUE2_WAIT_DEL_CHECK(q, label) G_STMT_START { \
164 STATUS (queue, q->sinkpad, "wait for DEL"); \
165 q->waiting_del = TRUE; \
166 g_cond_wait (q->item_del, queue->qlock); \
167 q->waiting_del = FALSE; \
168 if (q->srcresult != GST_FLOW_OK) { \
169 STATUS (queue, q->srcpad, "received DEL wakeup"); \
170 goto label; \
171 } \
172 STATUS (queue, q->sinkpad, "received DEL"); \
173 } G_STMT_END
175 #define GST_QUEUE2_WAIT_ADD_CHECK(q, label) G_STMT_START { \
176 STATUS (queue, q->srcpad, "wait for ADD"); \
177 q->waiting_add = TRUE; \
178 g_cond_wait (q->item_add, q->qlock); \
179 q->waiting_add = FALSE; \
180 if (q->srcresult != GST_FLOW_OK) { \
181 STATUS (queue, q->srcpad, "received ADD wakeup"); \
182 goto label; \
183 } \
184 STATUS (queue, q->srcpad, "received ADD"); \
185 } G_STMT_END
187 #define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START { \
188 if (q->waiting_del) { \
189 STATUS (q, q->srcpad, "signal DEL"); \
190 g_cond_signal (q->item_del); \
191 } \
192 } G_STMT_END
194 #define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START { \
195 if (q->waiting_add) { \
196 STATUS (q, q->sinkpad, "signal ADD"); \
197 g_cond_signal (q->item_add); \
198 } \
199 } G_STMT_END
201 #define _do_init(bla) \
202 GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
203 GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
204 "dataflow inside the queue element");
206 GST_BOILERPLATE_FULL (GstQueue2, gst_queue2, GstElement, GST_TYPE_ELEMENT,
207 _do_init);
209 static void gst_queue2_finalize (GObject * object);
211 static void gst_queue2_set_property (GObject * object,
212 guint prop_id, const GValue * value, GParamSpec * pspec);
213 static void gst_queue2_get_property (GObject * object,
214 guint prop_id, GValue * value, GParamSpec * pspec);
216 static GstFlowReturn gst_queue2_chain (GstPad * pad, GstBuffer * buffer);
217 static GstFlowReturn gst_queue2_bufferalloc (GstPad * pad, guint64 offset,
218 guint size, GstCaps * caps, GstBuffer ** buf);
219 static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
220 static void gst_queue2_loop (GstPad * pad);
222 static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event);
224 static gboolean gst_queue2_handle_src_event (GstPad * pad, GstEvent * event);
225 static gboolean gst_queue2_handle_src_query (GstPad * pad, GstQuery * query);
227 static GstCaps *gst_queue2_getcaps (GstPad * pad);
228 static gboolean gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps);
230 static GstFlowReturn gst_queue2_get_range (GstPad * pad, guint64 offset,
231 guint length, GstBuffer ** buffer);
232 static gboolean gst_queue2_src_checkgetrange_function (GstPad * pad);
234 static gboolean gst_queue2_src_activate_pull (GstPad * pad, gboolean active);
235 static gboolean gst_queue2_src_activate_push (GstPad * pad, gboolean active);
236 static gboolean gst_queue2_sink_activate_push (GstPad * pad, gboolean active);
237 static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
238 GstStateChange transition);
240 static gboolean gst_queue2_is_empty (GstQueue2 * queue);
241 static gboolean gst_queue2_is_filled (GstQueue2 * queue);
243 /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
245 static void
246 gst_queue2_base_init (gpointer g_class)
247 {
248 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
250 gst_element_class_add_pad_template (gstelement_class,
251 gst_static_pad_template_get (&srctemplate));
252 gst_element_class_add_pad_template (gstelement_class,
253 gst_static_pad_template_get (&sinktemplate));
255 gst_element_class_set_details_simple (gstelement_class, "Queue",
256 "Generic",
257 "Simple data queue",
258 "Erik Walthinsen <omega@cse.ogi.edu>, "
259 "Wim Taymans <wim.taymans@gmail.com>");
260 }
262 static void
263 gst_queue2_class_init (GstQueue2Class * klass)
264 {
265 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
266 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
268 parent_class = g_type_class_peek_parent (klass);
270 gobject_class->set_property = gst_queue2_set_property;
271 gobject_class->get_property = gst_queue2_get_property;
273 /* properties */
274 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
275 g_param_spec_uint ("current-level-bytes", "Current level (kB)",
276 "Current amount of data in the queue (bytes)",
277 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
278 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
279 g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
280 "Current number of buffers in the queue",
281 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
282 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
283 g_param_spec_uint64 ("current-level-time", "Current level (ns)",
284 "Current amount of data in the queue (in ns)",
285 0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
287 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
288 g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
289 "Max. amount of data in the queue (bytes, 0=disable)",
290 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
291 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
292 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
293 g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
294 "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
295 DEFAULT_MAX_SIZE_BUFFERS,
296 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
297 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
298 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
299 "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
300 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
302 g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
303 g_param_spec_boolean ("use-buffering", "Use buffering",
304 "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
305 DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
306 g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
307 g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
308 "Estimate the bitrate of the stream to calculate time level",
309 DEFAULT_USE_RATE_ESTIMATE,
310 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
311 g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
312 g_param_spec_int ("low-percent", "Low percent",
313 "Low threshold for buffering to start", 0, 100, DEFAULT_LOW_PERCENT,
314 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
315 g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
316 g_param_spec_int ("high-percent", "High percent",
317 "High threshold for buffering to finish", 0, 100,
318 DEFAULT_HIGH_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
320 g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
321 g_param_spec_string ("temp-template", "Temporary File Template",
322 "File template to store temporary files in, should contain directory "
323 "and XXXXXX. (NULL == disabled)",
324 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
326 g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
327 g_param_spec_string ("temp-location", "Temporary File Location",
328 "Location to store temporary files in (Deprecated: Only read this "
329 "property, use temp-tmpl to configure the name template)",
330 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
332 /* set several parent class virtual functions */
333 gobject_class->finalize = gst_queue2_finalize;
335 gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
336 }
338 static void
339 gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class)
340 {
341 queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
343 gst_pad_set_chain_function (queue->sinkpad,
344 GST_DEBUG_FUNCPTR (gst_queue2_chain));
345 gst_pad_set_activatepush_function (queue->sinkpad,
346 GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_push));
347 gst_pad_set_event_function (queue->sinkpad,
348 GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
349 gst_pad_set_getcaps_function (queue->sinkpad,
350 GST_DEBUG_FUNCPTR (gst_queue2_getcaps));
351 gst_pad_set_acceptcaps_function (queue->sinkpad,
352 GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps));
353 gst_pad_set_bufferalloc_function (queue->sinkpad,
354 GST_DEBUG_FUNCPTR (gst_queue2_bufferalloc));
355 gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
357 queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
359 gst_pad_set_activatepull_function (queue->srcpad,
360 GST_DEBUG_FUNCPTR (gst_queue2_src_activate_pull));
361 gst_pad_set_activatepush_function (queue->srcpad,
362 GST_DEBUG_FUNCPTR (gst_queue2_src_activate_push));
363 gst_pad_set_getrange_function (queue->srcpad,
364 GST_DEBUG_FUNCPTR (gst_queue2_get_range));
365 gst_pad_set_checkgetrange_function (queue->srcpad,
366 GST_DEBUG_FUNCPTR (gst_queue2_src_checkgetrange_function));
367 gst_pad_set_getcaps_function (queue->srcpad,
368 GST_DEBUG_FUNCPTR (gst_queue2_getcaps));
369 gst_pad_set_acceptcaps_function (queue->srcpad,
370 GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps));
371 gst_pad_set_event_function (queue->srcpad,
372 GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
373 gst_pad_set_query_function (queue->srcpad,
374 GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
375 gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
377 /* levels */
378 GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
379 queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
380 queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
381 queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
382 queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
383 queue->use_buffering = DEFAULT_USE_BUFFERING;
384 queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
385 queue->low_percent = DEFAULT_LOW_PERCENT;
386 queue->high_percent = DEFAULT_HIGH_PERCENT;
388 gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
389 gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
391 queue->srcresult = GST_FLOW_WRONG_STATE;
392 queue->is_eos = FALSE;
393 queue->in_timer = g_timer_new ();
394 queue->out_timer = g_timer_new ();
396 queue->qlock = g_mutex_new ();
397 queue->waiting_add = FALSE;
398 queue->item_add = g_cond_new ();
399 queue->waiting_del = FALSE;
400 queue->item_del = g_cond_new ();
401 queue->queue = g_queue_new ();
403 /* tempfile related */
404 queue->temp_template = NULL;
405 queue->temp_location = NULL;
406 queue->temp_location_set = FALSE;
408 GST_DEBUG_OBJECT (queue,
409 "initialized queue's not_empty & not_full conditions");
410 }
412 /* called only once, as opposed to dispose */
413 static void
414 gst_queue2_finalize (GObject * object)
415 {
416 GstQueue2 *queue = GST_QUEUE2 (object);
418 GST_DEBUG_OBJECT (queue, "finalizing queue");
420 while (!g_queue_is_empty (queue->queue)) {
421 GstMiniObject *data = g_queue_pop_head (queue->queue);
423 gst_mini_object_unref (data);
424 }
426 g_queue_free (queue->queue);
427 g_mutex_free (queue->qlock);
428 g_cond_free (queue->item_add);
429 g_cond_free (queue->item_del);
430 g_timer_destroy (queue->in_timer);
431 g_timer_destroy (queue->out_timer);
433 /* temp_file path cleanup */
434 g_free (queue->temp_template);
435 g_free (queue->temp_location);
437 G_OBJECT_CLASS (parent_class)->finalize (object);
438 }
440 static gboolean
441 gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps)
442 {
443 GstQueue2 *queue;
444 GstPad *otherpad;
445 gboolean result;
447 queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
449 otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
450 result = gst_pad_peer_accept_caps (otherpad, caps);
452 return result;
453 }
455 static GstCaps *
456 gst_queue2_getcaps (GstPad * pad)
457 {
458 GstQueue2 *queue;
459 GstPad *otherpad;
460 GstCaps *result;
462 queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
464 otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
465 result = gst_pad_peer_get_caps (otherpad);
466 if (result == NULL)
467 result = gst_caps_new_any ();
469 return result;
470 }
472 static GstFlowReturn
473 gst_queue2_bufferalloc (GstPad * pad, guint64 offset, guint size,
474 GstCaps * caps, GstBuffer ** buf)
475 {
476 GstQueue2 *queue;
477 GstFlowReturn result;
479 queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
481 /* Forward to src pad, without setting caps on the src pad */
482 result = gst_pad_alloc_buffer (queue->srcpad, offset, size, caps, buf);
484 return result;
485 }
487 /* calculate the diff between running time on the sink and src of the queue.
488 * This is the total amount of time in the queue. */
489 static void
490 update_time_level (GstQueue2 * queue)
491 {
492 gint64 sink_time, src_time;
494 sink_time =
495 gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
496 queue->sink_segment.last_stop);
498 src_time = gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
499 queue->src_segment.last_stop);
501 GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
502 GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
504 if (sink_time >= src_time)
505 queue->cur_level.time = sink_time - src_time;
506 else
507 queue->cur_level.time = 0;
508 }
510 /* take a NEWSEGMENT event and apply the values to segment, updating the time
511 * level of queue. */
512 static void
513 apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment)
514 {
515 gboolean update;
516 GstFormat format;
517 gdouble rate, arate;
518 gint64 start, stop, time;
520 gst_event_parse_new_segment_full (event, &update, &rate, &arate,
521 &format, &start, &stop, &time);
523 GST_DEBUG_OBJECT (queue,
524 "received NEWSEGMENT update %d, rate %lf, applied rate %lf, "
525 "format %d, "
526 "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
527 G_GINT64_FORMAT, update, rate, arate, format, start, stop, time);
529 if (format == GST_FORMAT_BYTES) {
530 }
532 /* now configure the values, we use these to track timestamps on the
533 * sinkpad. */
534 if (format != GST_FORMAT_TIME) {
535 /* non-time format, pretent the current time segment is closed with a
536 * 0 start and unknown stop time. */
537 update = FALSE;
538 format = GST_FORMAT_TIME;
539 start = 0;
540 stop = -1;
541 time = 0;
542 }
543 gst_segment_set_newsegment_full (segment, update,
544 rate, arate, format, start, stop, time);
546 GST_DEBUG_OBJECT (queue,
547 "configured NEWSEGMENT %" GST_SEGMENT_FORMAT, segment);
549 /* segment can update the time level of the queue */
550 update_time_level (queue);
551 }
553 /* take a buffer and update segment, updating the time level of the queue. */
554 static void
555 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment)
556 {
557 GstClockTime duration, timestamp;
559 timestamp = GST_BUFFER_TIMESTAMP (buffer);
560 duration = GST_BUFFER_DURATION (buffer);
562 /* if no timestamp is set, assume it's continuous with the previous
563 * time */
564 if (timestamp == GST_CLOCK_TIME_NONE)
565 timestamp = segment->last_stop;
567 /* add duration */
568 if (duration != GST_CLOCK_TIME_NONE)
569 timestamp += duration;
571 GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
572 GST_TIME_ARGS (timestamp));
574 gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp);
576 /* calc diff with other end */
577 update_time_level (queue);
578 }
580 static void
581 update_buffering (GstQueue2 * queue)
582 {
583 gint percent;
584 gboolean post = FALSE;
586 if (!queue->use_buffering || queue->high_percent <= 0)
587 return;
589 #define GET_PERCENT(format) ((queue->max_level.format) > 0 ? \
590 (queue->cur_level.format) * 100 / (queue->max_level.format) : 0)
592 if (queue->is_eos) {
593 /* on EOS we are always 100% full, we set the var here so that it we can
594 * resue the logic below to stop buffering */
595 percent = 100;
596 } else {
597 /* figure out the percent we are filled, we take the max of all formats. */
598 percent = GET_PERCENT (bytes);
599 percent = MAX (percent, GET_PERCENT (time));
600 percent = MAX (percent, GET_PERCENT (buffers));
602 /* also apply the rate estimate when we need to */
603 if (queue->use_rate_estimate)
604 percent = MAX (percent, GET_PERCENT (rate_time));
605 }
607 if (queue->is_buffering) {
608 post = TRUE;
609 /* if we were buffering see if we reached the high watermark */
610 if (percent >= queue->high_percent)
611 queue->is_buffering = FALSE;
612 } else {
613 /* we were not buffering, check if we need to start buffering if we drop
614 * below the low threshold */
615 if (percent < queue->low_percent) {
616 queue->is_buffering = TRUE;
617 queue->buffering_iteration++;
618 post = TRUE;
619 }
620 }
621 if (post) {
622 GstMessage *message;
623 GstBufferingMode mode;
625 /* scale to high percent so that it becomes the 100% mark */
626 percent = percent * 100 / queue->high_percent;
627 /* clip */
628 if (percent > 100)
629 percent = 100;
631 if (QUEUE_IS_USING_TEMP_FILE (queue))
632 mode = GST_BUFFERING_DOWNLOAD;
633 else
634 mode = GST_BUFFERING_STREAM;
636 GST_DEBUG_OBJECT (queue, "buffering %d percent", percent);
637 message = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent);
638 gst_message_set_buffering_stats (message, mode,
639 queue->byte_in_rate, queue->byte_out_rate, -1);
641 gst_element_post_message (GST_ELEMENT_CAST (queue), message);
643 } else {
644 GST_DEBUG_OBJECT (queue, "filled %d percent", percent);
645 }
647 #undef GET_PERCENT
648 }
650 static void
651 reset_rate_timer (GstQueue2 * queue)
652 {
653 queue->bytes_in = 0;
654 queue->bytes_out = 0;
655 queue->byte_in_rate = 0.0;
656 queue->byte_out_rate = 0.0;
657 queue->last_in_elapsed = 0.0;
658 queue->last_out_elapsed = 0.0;
659 queue->in_timer_started = FALSE;
660 queue->out_timer_started = FALSE;
661 }
663 /* the interval in seconds to recalculate the rate */
664 #define RATE_INTERVAL 0.2
665 /* Tuning for rate estimation. We use a large window for the input rate because
666 * it should be stable when connected to a network. The output rate is less
667 * stable (the elements preroll, queues behind a demuxer fill, ...) and should
668 * therefore adapt more quickly. */
669 #define AVG_IN(avg,val) ((avg) * 15.0 + (val)) / 16.0
670 #define AVG_OUT(avg,val) ((avg) * 3.0 + (val)) / 4.0
672 static void
673 update_in_rates (GstQueue2 * queue)
674 {
675 gdouble elapsed, period;
676 gdouble byte_in_rate;
678 if (!queue->in_timer_started) {
679 queue->in_timer_started = TRUE;
680 g_timer_start (queue->in_timer);
681 return;
682 }
684 elapsed = g_timer_elapsed (queue->in_timer, NULL);
686 /* recalc after each interval. */
687 if (queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
688 period = elapsed - queue->last_in_elapsed;
690 GST_DEBUG_OBJECT (queue,
691 "rates: period %f, in %" G_GUINT64_FORMAT, period, queue->bytes_in);
693 byte_in_rate = queue->bytes_in / period;
695 if (queue->byte_in_rate == 0.0)
696 queue->byte_in_rate = byte_in_rate;
697 else
698 queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate);
700 /* reset the values to calculate rate over the next interval */
701 queue->last_in_elapsed = elapsed;
702 queue->bytes_in = 0;
703 }
705 if (queue->byte_in_rate > 0.0) {
706 queue->cur_level.rate_time =
707 queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
708 }
709 GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
710 queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
711 }
713 static void
714 update_out_rates (GstQueue2 * queue)
715 {
716 gdouble elapsed, period;
717 gdouble byte_out_rate;
719 if (!queue->out_timer_started) {
720 queue->out_timer_started = TRUE;
721 g_timer_start (queue->out_timer);
722 return;
723 }
725 elapsed = g_timer_elapsed (queue->out_timer, NULL);
727 /* recalc after each interval. */
728 if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
729 period = elapsed - queue->last_out_elapsed;
731 GST_DEBUG_OBJECT (queue,
732 "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
734 byte_out_rate = queue->bytes_out / period;
736 if (queue->byte_out_rate == 0.0)
737 queue->byte_out_rate = byte_out_rate;
738 else
739 queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
741 /* reset the values to calculate rate over the next interval */
742 queue->last_out_elapsed = elapsed;
743 queue->bytes_out = 0;
744 }
745 if (queue->byte_in_rate > 0.0) {
746 queue->cur_level.rate_time =
747 queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
748 }
749 GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
750 queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
751 }
753 static void
754 gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
755 {
756 guint size;
757 guint8 *data;
758 int ret;
760 #ifdef HAVE_FSEEKO
761 fseeko (queue->temp_file, (off_t) queue->writing_pos, SEEK_SET);
762 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
763 lseek (fileno (queue->temp_file), (off_t) queue->writing_pos, SEEK_SET);
764 #else
765 fseek (queue->temp_file, queue->writing_pos, SEEK_SET);
766 #endif
768 data = GST_BUFFER_DATA (buffer);
769 size = GST_BUFFER_SIZE (buffer);
771 ret = fwrite (data, 1, size, queue->temp_file);
772 if (ret < size) {
773 /* FIXME do something useful here */
774 GST_ERROR_OBJECT (queue, "fwrite returned error");
775 }
776 queue->writing_pos += size;
778 if (queue->writing_pos > queue->max_reading_pos)
779 queue->cur_level.bytes = queue->writing_pos - queue->max_reading_pos;
780 else
781 queue->cur_level.bytes = 0;
782 }
784 /* see if there is enough data in the file to read a full buffer */
785 static gboolean
786 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
787 {
788 GST_DEBUG_OBJECT (queue,
789 "offset %" G_GUINT64_FORMAT ", len %u, write %" G_GUINT64_FORMAT, offset,
790 length, queue->writing_pos);
791 if (queue->is_eos)
792 return TRUE;
794 if (offset + length < queue->writing_pos)
795 return TRUE;
797 return FALSE;
798 }
800 static GstFlowReturn
801 gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
802 GstBuffer ** buffer)
803 {
804 size_t res;
805 GstBuffer *buf;
807 /* check if we have enough data at @offset. If there is not enough data, we
808 * block and wait. */
809 while (!gst_queue2_have_data (queue, offset, length)) {
810 GST_QUEUE2_WAIT_ADD_CHECK (queue, out_flushing);
811 }
813 #ifdef HAVE_FSEEKO
814 if (fseeko (queue->temp_file, (off_t) offset, SEEK_SET) != 0)
815 goto seek_failed;
816 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
817 if (lseek (fileno (queue->temp_file), (off_t) offset,
818 SEEK_SET) == (off_t) - 1)
819 goto seek_failed;
820 #else
821 if (fseek (queue->temp_file, (long) offset, SEEK_SET) != 0)
822 goto seek_failed;
823 #endif
825 buf = gst_buffer_new_and_alloc (length);
827 /* this should not block */
828 GST_LOG_OBJECT (queue, "Reading %d bytes", length);
829 res = fread (GST_BUFFER_DATA (buf), 1, length, queue->temp_file);
830 GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
832 if (G_UNLIKELY (res == 0)) {
833 /* check for errors or EOF */
834 if (ferror (queue->temp_file))
835 goto could_not_read;
836 if (feof (queue->temp_file) && length > 0)
837 goto eos;
838 }
840 length = res;
842 GST_BUFFER_SIZE (buf) = length;
843 GST_BUFFER_OFFSET (buf) = offset;
844 GST_BUFFER_OFFSET_END (buf) = offset + length;
846 *buffer = buf;
848 queue->reading_pos = offset + length;
849 queue->max_reading_pos = MAX (queue->max_reading_pos, queue->reading_pos);
851 if (queue->writing_pos > queue->max_reading_pos)
852 queue->cur_level.bytes = queue->writing_pos - queue->max_reading_pos;
853 else
854 queue->cur_level.bytes = 0;
856 return GST_FLOW_OK;
858 /* ERRORS */
859 out_flushing:
860 {
861 GST_DEBUG_OBJECT (queue, "we are flushing");
862 return GST_FLOW_WRONG_STATE;
863 }
864 seek_failed:
865 {
866 GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
867 return GST_FLOW_ERROR;
868 }
869 could_not_read:
870 {
871 GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
872 gst_buffer_unref (buf);
873 return GST_FLOW_ERROR;
874 }
875 eos:
876 {
877 GST_DEBUG ("non-regular file hits EOS");
878 gst_buffer_unref (buf);
879 return GST_FLOW_UNEXPECTED;
880 }
881 }
883 /* should be called with QUEUE_LOCK */
884 static GstMiniObject *
885 gst_queue2_read_item_from_file (GstQueue2 * queue)
886 {
887 GstMiniObject *item;
889 if (queue->starting_segment != NULL) {
890 item = GST_MINI_OBJECT_CAST (queue->starting_segment);
891 queue->starting_segment = NULL;
892 } else {
893 GstFlowReturn ret;
894 GstBuffer *buffer;
896 ret =
897 gst_queue2_create_read (queue, queue->reading_pos, DEFAULT_BUFFER_SIZE,
898 &buffer);
899 switch (ret) {
900 case GST_FLOW_OK:
901 item = GST_MINI_OBJECT_CAST (buffer);
902 break;
903 case GST_FLOW_UNEXPECTED:
904 item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
905 break;
906 default:
907 item = NULL;
908 break;
909 }
910 }
911 return item;
912 }
914 static gboolean
915 gst_queue2_open_temp_location_file (GstQueue2 * queue)
916 {
917 gint fd = -1;
918 gchar *name = NULL;
920 GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
922 /* we have two cases:
923 * - temp_location was set to something !NULL (Deprecated). in this case we
924 * open the specified filename.
925 * - temp_template was set, allocate a filename and open that filename
926 */
927 if (!queue->temp_location_set) {
928 /* nothing to do */
929 if (queue->temp_template == NULL)
930 goto no_directory;
932 /* make copy of the template, we don't want to change this */
933 name = g_strdup (queue->temp_template);
934 fd = g_mkstemp (name);
935 if (fd == -1)
936 goto mkstemp_failed;
938 /* open the file for update/writing */
939 queue->temp_file = fdopen (fd, "wb+");
940 /* error creating file */
941 if (queue->temp_file == NULL)
942 goto open_failed;
944 g_free (queue->temp_location);
945 queue->temp_location = name;
947 g_object_notify (G_OBJECT (queue), "temp-location");
948 } else {
949 /* open the file for update/writing, this is deprecated but we still need to
950 * support it for API/ABI compatibility */
951 queue->temp_file = g_fopen (queue->temp_location, "wb+");
952 /* error creating file */
953 if (queue->temp_file == NULL)
954 goto open_failed;
955 }
957 queue->writing_pos = 0;
958 queue->reading_pos = 0;
959 queue->max_reading_pos = 0;
961 return TRUE;
963 /* ERRORS */
964 no_directory:
965 {
966 GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
967 (_("No Temp directory specified.")), (NULL));
968 return FALSE;
969 }
970 mkstemp_failed:
971 {
972 GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
973 (_("Could not create temp file \"%s\"."), queue->temp_template),
974 GST_ERROR_SYSTEM);
975 g_free (name);
976 return FALSE;
977 }
978 open_failed:
979 {
980 GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
981 (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
982 g_free (name);
983 if (fd != -1)
984 close (fd);
985 return FALSE;
986 }
987 }
989 static void
990 gst_queue2_close_temp_location_file (GstQueue2 * queue)
991 {
992 /* nothing to do */
993 if (queue->temp_file == NULL)
994 return;
996 GST_DEBUG_OBJECT (queue, "closing temp file");
998 /* we don't remove the file so that the application can use it as a cache
999 * later on */
1000 fflush (queue->temp_file);
1001 fclose (queue->temp_file);
1002 remove (queue->temp_location);
1003 queue->temp_file = NULL;
1004 }
1006 static void
1007 gst_queue2_flush_temp_file (GstQueue2 * queue)
1008 {
1009 if (queue->temp_file == NULL)
1010 return;
1012 GST_DEBUG_OBJECT (queue, "flushing temp file");
1014 queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
1016 queue->writing_pos = 0;
1017 queue->reading_pos = 0;
1018 queue->max_reading_pos = 0;
1019 }
1021 static void
1022 gst_queue2_locked_flush (GstQueue2 * queue)
1023 {
1024 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1025 gst_queue2_flush_temp_file (queue);
1026 } else {
1027 while (!g_queue_is_empty (queue->queue)) {
1028 GstMiniObject *data = g_queue_pop_head (queue->queue);
1030 /* Then lose another reference because we are supposed to destroy that
1031 data when flushing */
1032 gst_mini_object_unref (data);
1033 }
1034 }
1035 GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1036 gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
1037 gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
1038 if (queue->starting_segment != NULL)
1039 gst_event_unref (queue->starting_segment);
1040 queue->starting_segment = NULL;
1041 queue->segment_event_received = FALSE;
1043 /* we deleted a lot of something */
1044 GST_QUEUE2_SIGNAL_DEL (queue);
1045 }
1047 /* enqueue an item an update the level stats */
1048 static void
1049 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
1050 {
1051 if (GST_IS_BUFFER (item)) {
1052 GstBuffer *buffer;
1053 guint size;
1055 buffer = GST_BUFFER_CAST (item);
1056 size = GST_BUFFER_SIZE (buffer);
1058 /* add buffer to the statistics */
1059 queue->cur_level.buffers++;
1060 queue->cur_level.bytes += size;
1061 queue->bytes_in += size;
1063 /* apply new buffer to segment stats */
1064 apply_buffer (queue, buffer, &queue->sink_segment);
1065 /* update the byterate stats */
1066 update_in_rates (queue);
1068 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1069 gst_queue2_write_buffer_to_file (queue, buffer);
1070 }
1072 } else if (GST_IS_EVENT (item)) {
1073 GstEvent *event;
1075 event = GST_EVENT_CAST (item);
1077 switch (GST_EVENT_TYPE (event)) {
1078 case GST_EVENT_EOS:
1079 /* Zero the thresholds, this makes sure the queue is completely
1080 * filled and we can read all data from the queue. */
1081 queue->is_eos = TRUE;
1082 break;
1083 case GST_EVENT_NEWSEGMENT:
1084 apply_segment (queue, event, &queue->sink_segment);
1085 /* This is our first new segment, we hold it
1086 * as we can't save it on the temp file */
1087 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1088 if (queue->segment_event_received)
1089 goto unexpected_event;
1091 queue->segment_event_received = TRUE;
1092 if (queue->starting_segment != NULL)
1093 gst_event_unref (queue->starting_segment);
1094 queue->starting_segment = event;
1095 item = NULL;
1096 }
1097 /* a new segment allows us to accept more buffers if we got UNEXPECTED
1098 * from downstream */
1099 queue->unexpected = FALSE;
1100 break;
1101 default:
1102 if (QUEUE_IS_USING_TEMP_FILE (queue))
1103 goto unexpected_event;
1104 break;
1105 }
1106 } else {
1107 g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
1108 item, GST_OBJECT_NAME (queue));
1109 /* we can't really unref since we don't know what it is */
1110 item = NULL;
1111 }
1113 if (item) {
1114 /* update the buffering status */
1115 update_buffering (queue);
1117 if (!QUEUE_IS_USING_TEMP_FILE (queue))
1118 g_queue_push_tail (queue->queue, item);
1119 else
1120 gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
1122 GST_QUEUE2_SIGNAL_ADD (queue);
1123 }
1125 return;
1127 /* ERRORS */
1128 unexpected_event:
1129 {
1130 g_warning
1131 ("Unexpected event of kind %s can't be added in temp file of queue %s ",
1132 gst_event_type_get_name (GST_EVENT_TYPE (item)),
1133 GST_OBJECT_NAME (queue));
1134 gst_event_unref (GST_EVENT_CAST (item));
1135 return;
1136 }
1137 }
1139 /* dequeue an item from the queue and update level stats */
1140 static GstMiniObject *
1141 gst_queue2_locked_dequeue (GstQueue2 * queue)
1142 {
1143 GstMiniObject *item;
1145 if (QUEUE_IS_USING_TEMP_FILE (queue))
1146 item = gst_queue2_read_item_from_file (queue);
1147 else
1148 item = g_queue_pop_head (queue->queue);
1150 if (item == NULL)
1151 goto no_item;
1153 if (GST_IS_BUFFER (item)) {
1154 GstBuffer *buffer;
1155 guint size;
1157 buffer = GST_BUFFER_CAST (item);
1158 size = GST_BUFFER_SIZE (buffer);
1160 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1161 "retrieved buffer %p from queue", buffer);
1163 queue->cur_level.buffers--;
1164 queue->cur_level.bytes -= size;
1165 queue->bytes_out += size;
1166 apply_buffer (queue, buffer, &queue->src_segment);
1167 /* update the byterate stats */
1168 update_out_rates (queue);
1169 /* update the buffering */
1170 update_buffering (queue);
1172 } else if (GST_IS_EVENT (item)) {
1173 GstEvent *event = GST_EVENT_CAST (item);
1175 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1176 "retrieved event %p from queue", event);
1178 switch (GST_EVENT_TYPE (event)) {
1179 case GST_EVENT_EOS:
1180 /* queue is empty now that we dequeued the EOS */
1181 GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1182 break;
1183 case GST_EVENT_NEWSEGMENT:
1184 apply_segment (queue, event, &queue->src_segment);
1185 break;
1186 default:
1187 break;
1188 }
1189 } else {
1190 g_warning
1191 ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
1192 item, GST_OBJECT_NAME (queue));
1193 item = NULL;
1194 }
1195 GST_QUEUE2_SIGNAL_DEL (queue);
1197 return item;
1199 /* ERRORS */
1200 no_item:
1201 {
1202 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty");
1203 return NULL;
1204 }
1205 }
1207 static gboolean
1208 gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
1209 {
1210 GstQueue2 *queue;
1212 queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
1214 switch (GST_EVENT_TYPE (event)) {
1215 case GST_EVENT_FLUSH_START:
1216 {
1217 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
1218 /* forward event */
1219 gst_pad_push_event (queue->srcpad, event);
1221 /* now unblock the chain function */
1222 GST_QUEUE2_MUTEX_LOCK (queue);
1223 queue->srcresult = GST_FLOW_WRONG_STATE;
1224 /* unblock the loop and chain functions */
1225 g_cond_signal (queue->item_add);
1226 g_cond_signal (queue->item_del);
1227 GST_QUEUE2_MUTEX_UNLOCK (queue);
1229 /* make sure it pauses, this should happen since we sent
1230 * flush_start downstream. */
1231 gst_pad_pause_task (queue->srcpad);
1232 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
1233 goto done;
1234 }
1235 case GST_EVENT_FLUSH_STOP:
1236 {
1237 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
1238 /* forward event */
1239 gst_pad_push_event (queue->srcpad, event);
1241 GST_QUEUE2_MUTEX_LOCK (queue);
1242 gst_queue2_locked_flush (queue);
1243 queue->srcresult = GST_FLOW_OK;
1244 queue->is_eos = FALSE;
1245 queue->unexpected = FALSE;
1246 /* reset rate counters */
1247 reset_rate_timer (queue);
1248 gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
1249 queue->srcpad);
1250 GST_QUEUE2_MUTEX_UNLOCK (queue);
1251 goto done;
1252 }
1253 default:
1254 if (GST_EVENT_IS_SERIALIZED (event)) {
1255 /* serialized events go in the queue */
1256 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing);
1257 /* refuse more events on EOS */
1258 if (queue->is_eos)
1259 goto out_eos;
1260 gst_queue2_locked_enqueue (queue, event);
1261 GST_QUEUE2_MUTEX_UNLOCK (queue);
1262 } else {
1263 /* non-serialized events are passed upstream. */
1264 gst_pad_push_event (queue->srcpad, event);
1265 }
1266 break;
1267 }
1268 done:
1269 return TRUE;
1271 /* ERRORS */
1272 out_flushing:
1273 {
1274 GST_DEBUG_OBJECT (queue, "refusing event, we are flushing");
1275 GST_QUEUE2_MUTEX_UNLOCK (queue);
1276 gst_event_unref (event);
1277 return FALSE;
1278 }
1279 out_eos:
1280 {
1281 GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
1282 GST_QUEUE2_MUTEX_UNLOCK (queue);
1283 gst_event_unref (event);
1284 return FALSE;
1285 }
1286 }
1288 static gboolean
1289 gst_queue2_is_empty (GstQueue2 * queue)
1290 {
1291 /* never empty on EOS */
1292 if (queue->is_eos)
1293 return FALSE;
1295 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1296 return queue->writing_pos == queue->max_reading_pos;
1297 } else {
1298 if (queue->queue->length == 0)
1299 return TRUE;
1300 }
1302 return FALSE;
1303 }
1305 static gboolean
1306 gst_queue2_is_filled (GstQueue2 * queue)
1307 {
1308 gboolean res;
1310 /* always filled on EOS */
1311 if (queue->is_eos)
1312 return TRUE;
1314 /* if using file, we're never filled if we don't have EOS */
1315 if (QUEUE_IS_USING_TEMP_FILE (queue))
1316 return FALSE;
1318 /* we are never filled when we have no buffers at all */
1319 if (queue->cur_level.buffers == 0)
1320 return FALSE;
1322 #define CHECK_FILLED(format) ((queue->max_level.format) > 0 && \
1323 (queue->cur_level.format) >= (queue->max_level.format))
1325 /* we are filled if one of the current levels exceeds the max */
1326 res = CHECK_FILLED (buffers) || CHECK_FILLED (bytes) || CHECK_FILLED (time);
1328 /* if we need to, use the rate estimate to check against the max time we are
1329 * allowed to queue */
1330 if (queue->use_rate_estimate)
1331 res |= CHECK_FILLED (rate_time);
1333 #undef CHECK_FILLED
1334 return res;
1335 }
1337 static GstFlowReturn
1338 gst_queue2_chain (GstPad * pad, GstBuffer * buffer)
1339 {
1340 GstQueue2 *queue;
1342 queue = GST_QUEUE2 (GST_OBJECT_PARENT (pad));
1344 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1345 "received buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %"
1346 GST_TIME_FORMAT, buffer, GST_BUFFER_SIZE (buffer),
1347 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
1348 GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
1350 /* we have to lock the queue since we span threads */
1351 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing);
1352 /* when we received EOS, we refuse more data */
1353 if (queue->is_eos)
1354 goto out_eos;
1355 /* when we received unexpected from downstream, refuse more buffers */
1356 if (queue->unexpected)
1357 goto out_unexpected;
1359 /* We make space available if we're "full" according to whatever
1360 * the user defined as "full". */
1361 if (gst_queue2_is_filled (queue)) {
1362 gboolean started;
1364 /* pause the timer while we wait. The fact that we are waiting does not mean
1365 * the byterate on the input pad is lower */
1366 if ((started = queue->in_timer_started))
1367 g_timer_stop (queue->in_timer);
1369 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1370 "queue is full, waiting for free space");
1371 do {
1372 /* Wait for space to be available, we could be unlocked because of a flush. */
1373 GST_QUEUE2_WAIT_DEL_CHECK (queue, out_flushing);
1374 }
1375 while (gst_queue2_is_filled (queue));
1377 /* and continue if we were running before */
1378 if (started)
1379 g_timer_continue (queue->in_timer);
1380 }
1382 /* put buffer in queue now */
1383 gst_queue2_locked_enqueue (queue, buffer);
1384 GST_QUEUE2_MUTEX_UNLOCK (queue);
1386 return GST_FLOW_OK;
1388 /* special conditions */
1389 out_flushing:
1390 {
1391 GstFlowReturn ret = queue->srcresult;
1393 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1394 "exit because task paused, reason: %s", gst_flow_get_name (ret));
1395 GST_QUEUE2_MUTEX_UNLOCK (queue);
1396 gst_buffer_unref (buffer);
1398 return ret;
1399 }
1400 out_eos:
1401 {
1402 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
1403 GST_QUEUE2_MUTEX_UNLOCK (queue);
1404 gst_buffer_unref (buffer);
1406 return GST_FLOW_UNEXPECTED;
1407 }
1408 out_unexpected:
1409 {
1410 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1411 "exit because we received UNEXPECTED");
1412 GST_QUEUE2_MUTEX_UNLOCK (queue);
1413 gst_buffer_unref (buffer);
1415 return GST_FLOW_UNEXPECTED;
1416 }
1417 }
1419 /* dequeue an item from the queue an push it downstream. This functions returns
1420 * the result of the push. */
1421 static GstFlowReturn
1422 gst_queue2_push_one (GstQueue2 * queue)
1423 {
1424 GstFlowReturn result = GST_FLOW_OK;
1425 GstMiniObject *data;
1427 data = gst_queue2_locked_dequeue (queue);
1428 if (data == NULL)
1429 goto no_item;
1431 next:
1432 if (GST_IS_BUFFER (data)) {
1433 GstBuffer *buffer;
1434 GstCaps *caps;
1436 buffer = GST_BUFFER_CAST (data);
1437 caps = GST_BUFFER_CAPS (buffer);
1439 GST_QUEUE2_MUTEX_UNLOCK (queue);
1441 /* set caps before pushing the buffer so that core does not try to do
1442 * something fancy to check if this is possible. */
1443 if (caps && caps != GST_PAD_CAPS (queue->srcpad))
1444 gst_pad_set_caps (queue->srcpad, caps);
1446 result = gst_pad_push (queue->srcpad, buffer);
1448 /* need to check for srcresult here as well */
1449 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing);
1450 if (result == GST_FLOW_UNEXPECTED) {
1451 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1452 "got UNEXPECTED from downstream");
1453 /* stop pushing buffers, we dequeue all items until we see an item that we
1454 * can push again, which is EOS or NEWSEGMENT. If there is nothing in the
1455 * queue we can push, we set a flag to make the sinkpad refuse more
1456 * buffers with an UNEXPECTED return value until we receive something
1457 * pushable again or we get flushed. */
1458 while ((data = gst_queue2_locked_dequeue (queue))) {
1459 if (GST_IS_BUFFER (data)) {
1460 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1461 "dropping UNEXPECTED buffer %p", data);
1462 gst_buffer_unref (GST_BUFFER_CAST (data));
1463 } else if (GST_IS_EVENT (data)) {
1464 GstEvent *event = GST_EVENT_CAST (data);
1465 GstEventType type = GST_EVENT_TYPE (event);
1467 if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) {
1468 /* we found a pushable item in the queue, push it out */
1469 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1470 "pushing pushable event %s after UNEXPECTED",
1471 GST_EVENT_TYPE_NAME (event));
1472 goto next;
1473 }
1474 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1475 "dropping UNEXPECTED event %p", event);
1476 gst_event_unref (event);
1477 }
1478 }
1479 /* no more items in the queue. Set the unexpected flag so that upstream
1480 * make us refuse any more buffers on the sinkpad. Since we will still
1481 * accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the
1482 * task function does not shut down. */
1483 queue->unexpected = TRUE;
1484 result = GST_FLOW_OK;
1485 }
1486 } else if (GST_IS_EVENT (data)) {
1487 GstEvent *event = GST_EVENT_CAST (data);
1488 GstEventType type = GST_EVENT_TYPE (event);
1490 GST_QUEUE2_MUTEX_UNLOCK (queue);
1492 gst_pad_push_event (queue->srcpad, event);
1494 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing);
1495 /* if we're EOS, return UNEXPECTED so that the task pauses. */
1496 if (type == GST_EVENT_EOS) {
1497 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1498 "pushed EOS event %p, return UNEXPECTED", event);
1499 result = GST_FLOW_UNEXPECTED;
1500 }
1501 }
1502 return result;
1504 /* ERRORS */
1505 no_item:
1506 {
1507 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1508 "exit because we have no item in the queue");
1509 return GST_FLOW_ERROR;
1510 }
1511 out_flushing:
1512 {
1513 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
1514 return GST_FLOW_WRONG_STATE;
1515 }
1516 }
1518 /* called repeadedly with @pad as the source pad. This function should push out
1519 * data to the peer element. */
1520 static void
1521 gst_queue2_loop (GstPad * pad)
1522 {
1523 GstQueue2 *queue;
1524 GstFlowReturn ret;
1526 queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
1528 /* have to lock for thread-safety */
1529 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing);
1531 if (gst_queue2_is_empty (queue)) {
1532 gboolean started;
1534 /* pause the timer while we wait. The fact that we are waiting does not mean
1535 * the byterate on the output pad is lower */
1536 if ((started = queue->out_timer_started))
1537 g_timer_stop (queue->out_timer);
1539 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1540 "queue is empty, waiting for new data");
1541 do {
1542 /* Wait for data to be available, we could be unlocked because of a flush. */
1543 GST_QUEUE2_WAIT_ADD_CHECK (queue, out_flushing);
1544 }
1545 while (gst_queue2_is_empty (queue));
1547 /* and continue if we were running before */
1548 if (started)
1549 g_timer_continue (queue->out_timer);
1550 }
1551 ret = gst_queue2_push_one (queue);
1552 queue->srcresult = ret;
1553 if (ret != GST_FLOW_OK)
1554 goto out_flushing;
1556 GST_QUEUE2_MUTEX_UNLOCK (queue);
1558 return;
1560 /* ERRORS */
1561 out_flushing:
1562 {
1563 gboolean eos = queue->is_eos;
1564 GstFlowReturn ret = queue->srcresult;
1566 gst_pad_pause_task (queue->srcpad);
1567 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1568 "pause task, reason: %s", gst_flow_get_name (queue->srcresult));
1569 GST_QUEUE2_MUTEX_UNLOCK (queue);
1570 /* let app know about us giving up if upstream is not expected to do so */
1571 /* UNEXPECTED is already taken care of elsewhere */
1572 if (eos && (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) &&
1573 (ret != GST_FLOW_UNEXPECTED)) {
1574 GST_ELEMENT_ERROR (queue, STREAM, FAILED,
1575 (_("Internal data flow error.")),
1576 ("streaming task paused, reason %s (%d)",
1577 gst_flow_get_name (ret), ret));
1578 gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
1579 }
1580 return;
1581 }
1582 }
1584 static gboolean
1585 gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
1586 {
1587 gboolean res = TRUE;
1588 GstQueue2 *queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
1590 #ifndef GST_DISABLE_GST_DEBUG
1591 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
1592 event, GST_EVENT_TYPE_NAME (event));
1593 #endif
1595 if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
1596 /* just forward upstream */
1597 res = gst_pad_push_event (queue->sinkpad, event);
1598 } else {
1599 /* when using a temp file, we unblock the pending read */
1600 res = TRUE;
1601 gst_event_unref (event);
1602 }
1604 return res;
1605 }
1607 static gboolean
1608 gst_queue2_peer_query (GstQueue2 * queue, GstPad * pad, GstQuery * query)
1609 {
1610 gboolean ret = FALSE;
1611 GstPad *peer;
1613 if ((peer = gst_pad_get_peer (pad))) {
1614 ret = gst_pad_query (peer, query);
1615 gst_object_unref (peer);
1616 }
1617 return ret;
1618 }
1620 static gboolean
1621 gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
1622 {
1623 GstQueue2 *queue;
1625 queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
1627 switch (GST_QUERY_TYPE (query)) {
1628 case GST_QUERY_POSITION:
1629 {
1630 gint64 peer_pos;
1631 GstFormat format;
1633 if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
1634 goto peer_failed;
1636 /* get peer position */
1637 gst_query_parse_position (query, &format, &peer_pos);
1639 /* FIXME: this code assumes that there's no discont in the queue */
1640 switch (format) {
1641 case GST_FORMAT_BYTES:
1642 peer_pos -= queue->cur_level.bytes;
1643 break;
1644 case GST_FORMAT_TIME:
1645 peer_pos -= queue->cur_level.time;
1646 break;
1647 default:
1648 GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
1649 "know how to adjust value", gst_format_get_name (format));
1650 return FALSE;
1651 }
1652 /* set updated position */
1653 gst_query_set_position (query, format, peer_pos);
1654 break;
1655 }
1656 case GST_QUERY_DURATION:
1657 {
1658 GST_DEBUG_OBJECT (queue, "doing peer query");
1660 if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
1661 goto peer_failed;
1663 GST_DEBUG_OBJECT (queue, "peer query success");
1664 break;
1665 }
1666 case GST_QUERY_BUFFERING:
1667 {
1668 GstFormat format;
1670 GST_DEBUG_OBJECT (queue, "query buffering");
1672 if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
1673 /* no temp file, just forward to the peer */
1674 if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
1675 goto peer_failed;
1676 GST_DEBUG_OBJECT (queue, "buffering forwarded to peer");
1677 } else {
1678 gint64 start, stop;
1680 gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1682 switch (format) {
1683 case GST_FORMAT_PERCENT:
1684 {
1685 gint64 duration;
1686 GstFormat peer_fmt;
1688 peer_fmt = GST_FORMAT_BYTES;
1690 if (!gst_pad_query_peer_duration (queue->sinkpad, &peer_fmt,
1691 &duration))
1692 goto peer_failed;
1694 GST_DEBUG_OBJECT (queue, "duration %" G_GINT64_FORMAT ", writing %"
1695 G_GINT64_FORMAT, duration, queue->writing_pos);
1697 start = 0;
1698 /* get our available data relative to the duration */
1699 if (duration != -1)
1700 stop = GST_FORMAT_PERCENT_MAX * queue->writing_pos / duration;
1701 else
1702 stop = -1;
1703 break;
1704 }
1705 case GST_FORMAT_BYTES:
1706 start = 0;
1707 stop = queue->writing_pos;
1708 break;
1709 default:
1710 start = -1;
1711 stop = -1;
1712 break;
1713 }
1714 gst_query_set_buffering_percent (query, queue->is_buffering, 100);
1715 gst_query_set_buffering_range (query, format, start, stop, -1);
1716 }
1717 break;
1718 }
1719 default:
1720 /* peer handled other queries */
1721 if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
1722 goto peer_failed;
1723 break;
1724 }
1726 return TRUE;
1728 /* ERRORS */
1729 peer_failed:
1730 {
1731 GST_DEBUG_OBJECT (queue, "failed peer query");
1732 return FALSE;
1733 }
1734 }
1736 static GstFlowReturn
1737 gst_queue2_get_range (GstPad * pad, guint64 offset, guint length,
1738 GstBuffer ** buffer)
1739 {
1740 GstQueue2 *queue;
1741 GstFlowReturn ret;
1743 queue = GST_QUEUE2_CAST (gst_pad_get_parent (pad));
1745 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, out_flushing);
1746 length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
1747 offset = (offset == -1) ? queue->reading_pos : offset;
1749 /* function will block when the range is not yet available */
1750 ret = gst_queue2_create_read (queue, offset, length, buffer);
1751 GST_QUEUE2_MUTEX_UNLOCK (queue);
1753 gst_object_unref (queue);
1755 return ret;
1757 /* ERRORS */
1758 out_flushing:
1759 {
1760 GST_DEBUG_OBJECT (queue, "we are flushing");
1761 GST_QUEUE2_MUTEX_UNLOCK (queue);
1762 return GST_FLOW_WRONG_STATE;
1763 }
1764 }
1766 static gboolean
1767 gst_queue2_src_checkgetrange_function (GstPad * pad)
1768 {
1769 GstQueue2 *queue;
1770 gboolean ret;
1772 queue = GST_QUEUE2 (gst_pad_get_parent (pad));
1774 /* we can operate in pull mode when we are using a tempfile */
1775 ret = QUEUE_IS_USING_TEMP_FILE (queue);
1777 gst_object_unref (GST_OBJECT (queue));
1779 return ret;
1780 }
1782 /* sink currently only operates in push mode */
1783 static gboolean
1784 gst_queue2_sink_activate_push (GstPad * pad, gboolean active)
1785 {
1786 gboolean result = TRUE;
1787 GstQueue2 *queue;
1789 queue = GST_QUEUE2 (gst_pad_get_parent (pad));
1791 if (active) {
1792 GST_QUEUE2_MUTEX_LOCK (queue);
1793 GST_DEBUG_OBJECT (queue, "activating push mode");
1794 queue->srcresult = GST_FLOW_OK;
1795 queue->is_eos = FALSE;
1796 queue->unexpected = FALSE;
1797 reset_rate_timer (queue);
1798 GST_QUEUE2_MUTEX_UNLOCK (queue);
1799 } else {
1800 /* unblock chain function */
1801 GST_QUEUE2_MUTEX_LOCK (queue);
1802 GST_DEBUG_OBJECT (queue, "deactivating push mode");
1803 queue->srcresult = GST_FLOW_WRONG_STATE;
1804 gst_queue2_locked_flush (queue);
1805 GST_QUEUE2_MUTEX_UNLOCK (queue);
1806 }
1808 gst_object_unref (queue);
1810 return result;
1811 }
1813 /* src operating in push mode, we start a task on the source pad that pushes out
1814 * buffers from the queue */
1815 static gboolean
1816 gst_queue2_src_activate_push (GstPad * pad, gboolean active)
1817 {
1818 gboolean result = FALSE;
1819 GstQueue2 *queue;
1821 queue = GST_QUEUE2 (gst_pad_get_parent (pad));
1823 if (active) {
1824 GST_QUEUE2_MUTEX_LOCK (queue);
1825 GST_DEBUG_OBJECT (queue, "activating push mode");
1826 queue->srcresult = GST_FLOW_OK;
1827 queue->is_eos = FALSE;
1828 queue->unexpected = FALSE;
1829 result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad);
1830 GST_QUEUE2_MUTEX_UNLOCK (queue);
1831 } else {
1832 /* unblock loop function */
1833 GST_QUEUE2_MUTEX_LOCK (queue);
1834 GST_DEBUG_OBJECT (queue, "deactivating push mode");
1835 queue->srcresult = GST_FLOW_WRONG_STATE;
1836 /* the item add signal will unblock */
1837 g_cond_signal (queue->item_add);
1838 GST_QUEUE2_MUTEX_UNLOCK (queue);
1840 /* step 2, make sure streaming finishes */
1841 result = gst_pad_stop_task (pad);
1842 }
1844 gst_object_unref (queue);
1846 return result;
1847 }
1849 /* pull mode, downstream will call our getrange function */
1850 static gboolean
1851 gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
1852 {
1853 gboolean result;
1854 GstQueue2 *queue;
1856 queue = GST_QUEUE2 (gst_pad_get_parent (pad));
1858 if (active) {
1859 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1860 GST_QUEUE2_MUTEX_LOCK (queue);
1861 GST_DEBUG_OBJECT (queue, "activating pull mode");
1862 queue->srcresult = GST_FLOW_OK;
1863 queue->is_eos = FALSE;
1864 queue->unexpected = FALSE;
1865 result = TRUE;
1866 GST_QUEUE2_MUTEX_UNLOCK (queue);
1867 } else {
1868 GST_QUEUE2_MUTEX_LOCK (queue);
1869 GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
1870 /* this is not allowed, we cannot operate in pull mode without a temp
1871 * file. */
1872 queue->srcresult = GST_FLOW_WRONG_STATE;
1873 result = FALSE;
1874 GST_QUEUE2_MUTEX_UNLOCK (queue);
1875 }
1876 } else {
1877 GST_QUEUE2_MUTEX_LOCK (queue);
1878 GST_DEBUG_OBJECT (queue, "deactivating pull mode");
1879 queue->srcresult = GST_FLOW_WRONG_STATE;
1880 /* this will unlock getrange */
1881 g_cond_signal (queue->item_add);
1882 result = TRUE;
1883 GST_QUEUE2_MUTEX_UNLOCK (queue);
1884 }
1885 gst_object_unref (queue);
1887 return result;
1888 }
1890 static GstStateChangeReturn
1891 gst_queue2_change_state (GstElement * element, GstStateChange transition)
1892 {
1893 GstQueue2 *queue;
1894 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1896 queue = GST_QUEUE2 (element);
1898 switch (transition) {
1899 case GST_STATE_CHANGE_NULL_TO_READY:
1900 break;
1901 case GST_STATE_CHANGE_READY_TO_PAUSED:
1902 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1903 if (!gst_queue2_open_temp_location_file (queue))
1904 ret = GST_STATE_CHANGE_FAILURE;
1905 }
1906 queue->segment_event_received = FALSE;
1907 queue->starting_segment = NULL;
1908 break;
1909 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1910 break;
1911 default:
1912 break;
1913 }
1915 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1917 switch (transition) {
1918 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1919 break;
1920 case GST_STATE_CHANGE_PAUSED_TO_READY:
1921 if (QUEUE_IS_USING_TEMP_FILE (queue))
1922 gst_queue2_close_temp_location_file (queue);
1923 if (queue->starting_segment != NULL) {
1924 gst_event_unref (queue->starting_segment);
1925 queue->starting_segment = NULL;
1926 }
1927 break;
1928 case GST_STATE_CHANGE_READY_TO_NULL:
1929 break;
1930 default:
1931 break;
1932 }
1934 return ret;
1935 }
1937 /* changing the capacity of the queue must wake up
1938 * the _chain function, it might have more room now
1939 * to store the buffer/event in the queue */
1940 #define QUEUE_CAPACITY_CHANGE(q)\
1941 g_cond_signal (queue->item_del);
1943 /* Changing the minimum required fill level must
1944 * wake up the _loop function as it might now
1945 * be able to preceed.
1946 */
1947 #define QUEUE_THRESHOLD_CHANGE(q)\
1948 g_cond_signal (queue->item_add);
1950 static void
1951 gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
1952 {
1953 GstState state;
1955 /* the element must be stopped in order to do this */
1956 GST_OBJECT_LOCK (queue);
1957 state = GST_STATE (queue);
1958 if (state != GST_STATE_READY && state != GST_STATE_NULL)
1959 goto wrong_state;
1960 GST_OBJECT_UNLOCK (queue);
1962 /* set new location */
1963 g_free (queue->temp_template);
1964 queue->temp_template = g_strdup (template);
1966 return;
1968 /* ERROR */
1969 wrong_state:
1970 {
1971 GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state");
1972 GST_OBJECT_UNLOCK (queue);
1973 }
1974 }
1976 static void
1977 gst_queue2_set_property (GObject * object,
1978 guint prop_id, const GValue * value, GParamSpec * pspec)
1979 {
1980 GstQueue2 *queue = GST_QUEUE2 (object);
1982 /* someone could change levels here, and since this
1983 * affects the get/put funcs, we need to lock for safety. */
1984 GST_QUEUE2_MUTEX_LOCK (queue);
1986 switch (prop_id) {
1987 case PROP_MAX_SIZE_BYTES:
1988 queue->max_level.bytes = g_value_get_uint (value);
1989 QUEUE_CAPACITY_CHANGE (queue);
1990 break;
1991 case PROP_MAX_SIZE_BUFFERS:
1992 queue->max_level.buffers = g_value_get_uint (value);
1993 QUEUE_CAPACITY_CHANGE (queue);
1994 break;
1995 case PROP_MAX_SIZE_TIME:
1996 queue->max_level.time = g_value_get_uint64 (value);
1997 /* set rate_time to the same value. We use an extra field in the level
1998 * structure so that we can easily access and compare it */
1999 queue->max_level.rate_time = queue->max_level.time;
2000 QUEUE_CAPACITY_CHANGE (queue);
2001 break;
2002 case PROP_USE_BUFFERING:
2003 queue->use_buffering = g_value_get_boolean (value);
2004 break;
2005 case PROP_USE_RATE_ESTIMATE:
2006 queue->use_rate_estimate = g_value_get_boolean (value);
2007 break;
2008 case PROP_LOW_PERCENT:
2009 queue->low_percent = g_value_get_int (value);
2010 break;
2011 case PROP_HIGH_PERCENT:
2012 queue->high_percent = g_value_get_int (value);
2013 break;
2014 case PROP_TEMP_TEMPLATE:
2015 gst_queue2_set_temp_template (queue, g_value_get_string (value));
2016 break;
2017 case PROP_TEMP_LOCATION:
2018 g_free (queue->temp_location);
2019 queue->temp_location = g_value_dup_string (value);
2020 /* you can set the property back to NULL to make it use the temp-tmpl
2021 * property. */
2022 queue->temp_location_set = queue->temp_location != NULL;
2023 break;
2024 default:
2025 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2026 break;
2027 }
2029 GST_QUEUE2_MUTEX_UNLOCK (queue);
2030 }
2032 static void
2033 gst_queue2_get_property (GObject * object,
2034 guint prop_id, GValue * value, GParamSpec * pspec)
2035 {
2036 GstQueue2 *queue = GST_QUEUE2 (object);
2038 GST_QUEUE2_MUTEX_LOCK (queue);
2040 switch (prop_id) {
2041 case PROP_CUR_LEVEL_BYTES:
2042 g_value_set_uint (value, queue->cur_level.bytes);
2043 break;
2044 case PROP_CUR_LEVEL_BUFFERS:
2045 g_value_set_uint (value, queue->cur_level.buffers);
2046 break;
2047 case PROP_CUR_LEVEL_TIME:
2048 g_value_set_uint64 (value, queue->cur_level.time);
2049 break;
2050 case PROP_MAX_SIZE_BYTES:
2051 g_value_set_uint (value, queue->max_level.bytes);
2052 break;
2053 case PROP_MAX_SIZE_BUFFERS:
2054 g_value_set_uint (value, queue->max_level.buffers);
2055 break;
2056 case PROP_MAX_SIZE_TIME:
2057 g_value_set_uint64 (value, queue->max_level.time);
2058 break;
2059 case PROP_USE_BUFFERING:
2060 g_value_set_boolean (value, queue->use_buffering);
2061 break;
2062 case PROP_USE_RATE_ESTIMATE:
2063 g_value_set_boolean (value, queue->use_rate_estimate);
2064 break;
2065 case PROP_LOW_PERCENT:
2066 g_value_set_int (value, queue->low_percent);
2067 break;
2068 case PROP_HIGH_PERCENT:
2069 g_value_set_int (value, queue->high_percent);
2070 break;
2071 case PROP_TEMP_TEMPLATE:
2072 g_value_set_string (value, queue->temp_template);
2073 break;
2074 case PROP_TEMP_LOCATION:
2075 g_value_set_string (value, queue->temp_location);
2076 break;
2077 default:
2078 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2079 break;
2080 }
2082 GST_QUEUE2_MUTEX_UNLOCK (queue);
2083 }