f6b11177cb360903340875caee2fbfb43607c937
1 /* GStreamer
2 * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
3 *
4 * gstdataqueue.c:
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 02111-1307, USA.
20 */
22 /**
23 * SECTION:gstdataqueue
24 * @short_description: Threadsafe queueing object
25 *
26 * #GstDataQueue is an object that handles threadsafe queueing of objects. It
27 * also provides size-related functionality. This object should be used for
28 * any #GstElement that wishes to provide some sort of queueing functionality.
29 *
30 * Since: 0.10.11
31 */
33 #include <gst/gst.h>
34 #include "string.h"
35 #include "gstdataqueue.h"
37 GST_DEBUG_CATEGORY_STATIC (data_queue_debug);
38 #define GST_CAT_DEFAULT (data_queue_debug)
39 GST_DEBUG_CATEGORY_STATIC (data_queue_dataflow);
42 /* Queue signals and args */
43 enum
44 {
45 SIGNAL_EMPTY,
46 SIGNAL_FULL,
47 LAST_SIGNAL
48 };
50 enum
51 {
52 ARG_0,
53 ARG_CUR_LEVEL_VISIBLE,
54 ARG_CUR_LEVEL_BYTES,
55 ARG_CUR_LEVEL_TIME
56 /* FILL ME */
57 };
59 #define GST_DATA_QUEUE_MUTEX_LOCK(q) G_STMT_START { \
60 GST_CAT_LOG (data_queue_dataflow, \
61 "locking qlock from thread %p", \
62 g_thread_self ()); \
63 g_mutex_lock (q->qlock); \
64 GST_CAT_LOG (data_queue_dataflow, \
65 "locked qlock from thread %p", \
66 g_thread_self ()); \
67 } G_STMT_END
69 #define GST_DATA_QUEUE_MUTEX_LOCK_CHECK(q, label) G_STMT_START { \
70 GST_DATA_QUEUE_MUTEX_LOCK (q); \
71 if (q->flushing) \
72 goto label; \
73 } G_STMT_END
75 #define GST_DATA_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \
76 GST_CAT_LOG (data_queue_dataflow, \
77 "unlocking qlock from thread %p", \
78 g_thread_self ()); \
79 g_mutex_unlock (q->qlock); \
80 } G_STMT_END
82 #define STATUS(q, msg) \
83 GST_CAT_LOG (data_queue_dataflow, \
84 "queue:%p " msg ": %u visible items, %u " \
85 "bytes, %"G_GUINT64_FORMAT \
86 " ns, %u elements", \
87 queue, \
88 q->cur_level.visible, \
89 q->cur_level.bytes, \
90 q->cur_level.time, \
91 q->queue->length)
93 static void gst_data_queue_finalize (GObject * object);
95 static void gst_data_queue_set_property (GObject * object,
96 guint prop_id, const GValue * value, GParamSpec * pspec);
97 static void gst_data_queue_get_property (GObject * object,
98 guint prop_id, GValue * value, GParamSpec * pspec);
100 static GObjectClass *parent_class = NULL;
101 static guint gst_data_queue_signals[LAST_SIGNAL] = { 0 };
103 #define _do_init \
104 { \
105 GST_DEBUG_CATEGORY_INIT (data_queue_debug, "dataqueue", 0, \
106 "data queue object"); \
107 GST_DEBUG_CATEGORY_INIT (data_queue_dataflow, "data_queue_dataflow", 0, \
108 "dataflow inside the data queue object"); \
109 }
112 G_DEFINE_TYPE_WITH_CODE (GstDataQueue, gst_data_queue, G_TYPE_OBJECT, _do_init);
114 static void
115 gst_data_queue_class_init (GstDataQueueClass * klass)
116 {
117 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
119 parent_class = g_type_class_peek_parent (klass);
121 gobject_class->set_property = gst_data_queue_set_property;
122 gobject_class->get_property = gst_data_queue_get_property;
124 /* signals */
125 /**
126 * GstDataQueue::empty:
127 * @queue: the queue instance
128 *
129 * Reports that the queue became empty (empty).
130 * A queue is empty if the total amount of visible items inside it (num-visible, time,
131 * size) is lower than the boundary values which can be set through the GObject
132 * properties.
133 */
134 gst_data_queue_signals[SIGNAL_EMPTY] =
135 g_signal_new ("empty", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
136 G_STRUCT_OFFSET (GstDataQueueClass, empty), NULL, NULL,
137 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
139 /**
140 * GstDataQueue::full:
141 * @queue: the queue instance
142 *
143 * Reports that the queue became full (full).
144 * A queue is full if the total amount of data inside it (num-visible, time,
145 * size) is higher than the boundary values which can be set through the GObject
146 * properties.
147 */
148 gst_data_queue_signals[SIGNAL_FULL] =
149 g_signal_new ("full", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
150 G_STRUCT_OFFSET (GstDataQueueClass, full), NULL, NULL,
151 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
153 /* properties */
154 g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
155 g_param_spec_uint ("current-level-bytes", "Current level (kB)",
156 "Current amount of data in the queue (bytes)",
157 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
158 g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_VISIBLE,
159 g_param_spec_uint ("current-level-visible",
160 "Current level (visible items)",
161 "Current number of visible items in the queue", 0, G_MAXUINT, 0,
162 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
163 g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME,
164 g_param_spec_uint64 ("current-level-time", "Current level (ns)",
165 "Current amount of data in the queue (in ns)", 0, G_MAXUINT64, 0,
166 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
168 gobject_class->finalize = gst_data_queue_finalize;
169 }
171 static void
172 gst_data_queue_init (GstDataQueue * queue)
173 {
174 queue->cur_level.visible = 0; /* no content */
175 queue->cur_level.bytes = 0; /* no content */
176 queue->cur_level.time = 0; /* no content */
178 queue->checkfull = NULL;
180 queue->qlock = g_mutex_new ();
181 queue->item_add = g_cond_new ();
182 queue->item_del = g_cond_new ();
183 queue->queue = g_queue_new ();
185 GST_DEBUG ("initialized queue's not_empty & not_full conditions");
186 }
188 /**
189 * gst_data_queue_new_full:
190 * @checkfull: the callback used to tell if the element considers the queue full
191 * or not.
192 * @fullcallback: the callback which will be called when the queue is considered full.
193 * @emptycallback: the callback which will be called when the queue is considered empty.
194 * @checkdata: a #gpointer that will be given in the @checkfull callback.
195 *
196 * Creates a new #GstDataQueue. The difference with @gst_data_queue_new is that it will
197 * not emit the 'full' and 'empty' signals, but instead calling directly @fullcallback
198 * or @emptycallback.
199 *
200 * Returns: a new #GstDataQueue.
201 *
202 * Since: 0.10.26
203 */
205 GstDataQueue *
206 gst_data_queue_new_full (GstDataQueueCheckFullFunction checkfull,
207 GstDataQueueFullCallback fullcallback,
208 GstDataQueueEmptyCallback emptycallback, gpointer checkdata)
209 {
210 GstDataQueue *ret;
212 g_return_val_if_fail (checkfull != NULL, NULL);
214 ret = g_object_newv (GST_TYPE_DATA_QUEUE, 0, NULL);
215 ret->checkfull = checkfull;
216 ret->checkdata = checkdata;
217 ret->fullcallback = fullcallback;
218 ret->emptycallback = emptycallback;
220 return ret;
221 }
223 /**
224 * gst_data_queue_new:
225 * @checkfull: the callback used to tell if the element considers the queue full
226 * or not.
227 * @checkdata: a #gpointer that will be given in the @checkfull callback.
228 *
229 * Returns: a new #GstDataQueue.
230 */
232 GstDataQueue *
233 gst_data_queue_new (GstDataQueueCheckFullFunction checkfull, gpointer checkdata)
234 {
235 return gst_data_queue_new_full (checkfull, NULL, NULL, checkdata);
236 }
238 static void
239 gst_data_queue_cleanup (GstDataQueue * queue)
240 {
241 while (!g_queue_is_empty (queue->queue)) {
242 GstDataQueueItem *item = g_queue_pop_head (queue->queue);
244 /* Just call the destroy notify on the item */
245 item->destroy (item);
246 }
247 queue->cur_level.visible = 0;
248 queue->cur_level.bytes = 0;
249 queue->cur_level.time = 0;
250 }
252 /* called only once, as opposed to dispose */
253 static void
254 gst_data_queue_finalize (GObject * object)
255 {
256 GstDataQueue *queue = GST_DATA_QUEUE (object);
258 GST_DEBUG ("finalizing queue");
260 gst_data_queue_cleanup (queue);
261 g_queue_free (queue->queue);
263 GST_DEBUG ("free mutex");
264 g_mutex_free (queue->qlock);
265 GST_DEBUG ("done free mutex");
267 g_cond_free (queue->item_add);
268 g_cond_free (queue->item_del);
270 G_OBJECT_CLASS (parent_class)->finalize (object);
271 }
273 static inline void
274 gst_data_queue_locked_flush (GstDataQueue * queue)
275 {
276 STATUS (queue, "before flushing");
277 gst_data_queue_cleanup (queue);
278 STATUS (queue, "after flushing");
279 /* we deleted something... */
280 if (queue->abidata.ABI.waiting_del)
281 g_cond_signal (queue->item_del);
282 }
284 static inline gboolean
285 gst_data_queue_locked_is_empty (GstDataQueue * queue)
286 {
287 return (queue->queue->length == 0);
288 }
290 static inline gboolean
291 gst_data_queue_locked_is_full (GstDataQueue * queue)
292 {
293 return queue->checkfull (queue, queue->cur_level.visible,
294 queue->cur_level.bytes, queue->cur_level.time, queue->checkdata);
295 }
297 /**
298 * gst_data_queue_flush:
299 * @queue: a #GstDataQueue.
300 *
301 * Flushes all the contents of the @queue. Any call to #gst_data_queue_push and
302 * #gst_data_queue_pop will be released.
303 * MT safe.
304 *
305 * Since: 0.10.11
306 */
307 void
308 gst_data_queue_flush (GstDataQueue * queue)
309 {
310 GST_DEBUG ("queue:%p", queue);
311 GST_DATA_QUEUE_MUTEX_LOCK (queue);
312 gst_data_queue_locked_flush (queue);
313 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
314 }
316 /**
317 * gst_data_queue_is_empty:
318 * @queue: a #GstDataQueue.
319 *
320 * Queries if there are any items in the @queue.
321 * MT safe.
322 *
323 * Returns: #TRUE if @queue is empty.
324 *
325 * Since: 0.10.11
326 */
327 gboolean
328 gst_data_queue_is_empty (GstDataQueue * queue)
329 {
330 gboolean res;
332 GST_DATA_QUEUE_MUTEX_LOCK (queue);
333 res = gst_data_queue_locked_is_empty (queue);
334 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
336 return res;
337 }
339 /**
340 * gst_data_queue_is_full:
341 * @queue: a #GstDataQueue.
342 *
343 * Queries if @queue is full. This check will be done using the
344 * #GstDataQueueCheckFullFunction registered with @queue.
345 * MT safe.
346 *
347 * Returns: #TRUE if @queue is full.
348 *
349 * Since: 0.10.11
350 */
351 gboolean
352 gst_data_queue_is_full (GstDataQueue * queue)
353 {
354 gboolean res;
356 GST_DATA_QUEUE_MUTEX_LOCK (queue);
357 res = gst_data_queue_locked_is_full (queue);
358 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
360 return res;
361 }
363 /**
364 * gst_data_queue_set_flushing:
365 * @queue: a #GstDataQueue.
366 * @flushing: a #gboolean stating if the queue will be flushing or not.
367 *
368 * Sets the queue to flushing state if @flushing is #TRUE. If set to flushing
369 * state, any incoming data on the @queue will be discarded. Any call currently
370 * blocking on #gst_data_queue_push or #gst_data_queue_pop will return straight
371 * away with a return value of #FALSE. While the @queue is in flushing state,
372 * all calls to those two functions will return #FALSE.
373 *
374 * MT Safe.
375 *
376 * Since: 0.10.11
377 */
378 void
379 gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing)
380 {
381 GST_DEBUG ("queue:%p , flushing:%d", queue, flushing);
383 GST_DATA_QUEUE_MUTEX_LOCK (queue);
384 queue->flushing = flushing;
385 if (flushing) {
386 /* release push/pop functions */
387 if (queue->abidata.ABI.waiting_add)
388 g_cond_signal (queue->item_add);
389 if (queue->abidata.ABI.waiting_del)
390 g_cond_signal (queue->item_del);
391 }
392 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
393 }
395 /**
396 * gst_data_queue_push:
397 * @queue: a #GstDataQueue.
398 * @item: a #GstDataQueueItem.
399 *
400 * Pushes a #GstDataQueueItem (or a structure that begins with the same fields)
401 * on the @queue. If the @queue is full, the call will block until space is
402 * available, OR the @queue is set to flushing state.
403 * MT safe.
404 *
405 * Note that this function has slightly different semantics than gst_pad_push()
406 * and gst_pad_push_event(): this function only takes ownership of @item and
407 * the #GstMiniObject contained in @item if the push was successful. If FALSE
408 * is returned, the caller is responsible for freeing @item and its contents.
409 *
410 * Returns: #TRUE if the @item was successfully pushed on the @queue.
411 *
412 * Since: 0.10.11
413 */
414 gboolean
415 gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item)
416 {
417 g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
418 g_return_val_if_fail (item != NULL, FALSE);
420 GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
422 STATUS (queue, "before pushing");
424 /* We ALWAYS need to check for queue fillness */
425 if (gst_data_queue_locked_is_full (queue)) {
426 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
427 if (G_LIKELY (queue->fullcallback))
428 queue->fullcallback (queue, queue->checkdata);
429 else
430 g_signal_emit (queue, gst_data_queue_signals[SIGNAL_FULL], 0);
431 GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
433 /* signal might have removed some items */
434 while (gst_data_queue_locked_is_full (queue)) {
435 queue->abidata.ABI.waiting_del = TRUE;
436 g_cond_wait (queue->item_del, queue->qlock);
437 queue->abidata.ABI.waiting_del = FALSE;
438 if (queue->flushing)
439 goto flushing;
440 }
441 }
443 g_queue_push_tail (queue->queue, item);
445 if (item->visible)
446 queue->cur_level.visible++;
447 queue->cur_level.bytes += item->size;
448 queue->cur_level.time += item->duration;
450 STATUS (queue, "after pushing");
451 if (queue->abidata.ABI.waiting_add)
452 g_cond_signal (queue->item_add);
454 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
456 return TRUE;
458 /* ERRORS */
459 flushing:
460 {
461 GST_DEBUG ("queue:%p, we are flushing", queue);
462 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
463 return FALSE;
464 }
465 }
467 /**
468 * gst_data_queue_pop:
469 * @queue: a #GstDataQueue.
470 * @item: pointer to store the returned #GstDataQueueItem.
471 *
472 * Retrieves the first @item available on the @queue. If the queue is currently
473 * empty, the call will block until at least one item is available, OR the
474 * @queue is set to the flushing state.
475 * MT safe.
476 *
477 * Returns: #TRUE if an @item was successfully retrieved from the @queue.
478 *
479 * Since: 0.10.11
480 */
481 gboolean
482 gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item)
483 {
484 g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
485 g_return_val_if_fail (item != NULL, FALSE);
487 GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
489 STATUS (queue, "before popping");
491 if (gst_data_queue_locked_is_empty (queue)) {
492 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
493 if (G_LIKELY (queue->emptycallback))
494 queue->emptycallback (queue, queue->checkdata);
495 else
496 g_signal_emit (queue, gst_data_queue_signals[SIGNAL_EMPTY], 0);
497 GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
499 while (gst_data_queue_locked_is_empty (queue)) {
500 queue->abidata.ABI.waiting_add = TRUE;
501 g_cond_wait (queue->item_add, queue->qlock);
502 queue->abidata.ABI.waiting_add = FALSE;
503 if (queue->flushing)
504 goto flushing;
505 }
506 }
508 /* Get the item from the GQueue */
509 *item = g_queue_pop_head (queue->queue);
511 /* update current level counter */
512 if ((*item)->visible)
513 queue->cur_level.visible--;
514 queue->cur_level.bytes -= (*item)->size;
515 queue->cur_level.time -= (*item)->duration;
517 STATUS (queue, "after popping");
518 if (queue->abidata.ABI.waiting_del)
519 g_cond_signal (queue->item_del);
521 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
523 return TRUE;
525 /* ERRORS */
526 flushing:
527 {
528 GST_DEBUG ("queue:%p, we are flushing", queue);
529 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
530 return FALSE;
531 }
532 }
534 /**
535 * gst_data_queue_drop_head:
536 * @queue: The #GstDataQueue to drop an item from.
537 * @type: The #GType of the item to drop.
538 *
539 * Pop and unref the head-most #GstMiniObject with the given #GType.
540 *
541 * Returns: TRUE if an element was removed.
542 *
543 * Since: 0.10.11
544 */
545 gboolean
546 gst_data_queue_drop_head (GstDataQueue * queue, GType type)
547 {
548 gboolean res = FALSE;
549 GList *item;
550 GstDataQueueItem *leak = NULL;
552 g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
554 GST_DEBUG ("queue:%p", queue);
556 GST_DATA_QUEUE_MUTEX_LOCK (queue);
557 for (item = g_queue_peek_head_link (queue->queue); item; item = item->next) {
558 GstDataQueueItem *tmp = (GstDataQueueItem *) item->data;
560 if (G_TYPE_CHECK_INSTANCE_TYPE (tmp->object, type)) {
561 leak = tmp;
562 break;
563 }
564 }
566 if (!leak)
567 goto done;
569 g_queue_delete_link (queue->queue, item);
571 if (leak->visible)
572 queue->cur_level.visible--;
573 queue->cur_level.bytes -= leak->size;
574 queue->cur_level.time -= leak->duration;
576 leak->destroy (leak);
578 res = TRUE;
580 done:
581 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
583 GST_DEBUG ("queue:%p , res:%d", queue, res);
585 return res;
586 }
588 /**
589 * gst_data_queue_limits_changed:
590 * @queue: The #GstDataQueue
591 *
592 * Inform the queue that the limits for the fullness check have changed and that
593 * any blocking gst_data_queue_push() should be unblocked to recheck the limts.
594 *
595 * Since: 0.10.11
596 */
597 void
598 gst_data_queue_limits_changed (GstDataQueue * queue)
599 {
600 g_return_if_fail (GST_IS_DATA_QUEUE (queue));
602 GST_DATA_QUEUE_MUTEX_LOCK (queue);
603 if (queue->abidata.ABI.waiting_del) {
604 GST_DEBUG ("signal del");
605 g_cond_signal (queue->item_del);
606 }
607 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
608 }
610 /**
611 * gst_data_queue_get_level:
612 * @queue: The #GstDataQueue
613 * @level: the location to store the result
614 *
615 * Get the current level of the queue.
616 *
617 * Since: 0.10.11
618 */
619 void
620 gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize * level)
621 {
622 memcpy (level, (&queue->cur_level), sizeof (GstDataQueueSize));
623 }
625 static void
626 gst_data_queue_set_property (GObject * object,
627 guint prop_id, const GValue * value, GParamSpec * pspec)
628 {
629 switch (prop_id) {
630 default:
631 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
632 break;
633 }
634 }
636 static void
637 gst_data_queue_get_property (GObject * object,
638 guint prop_id, GValue * value, GParamSpec * pspec)
639 {
640 GstDataQueue *queue = GST_DATA_QUEUE (object);
642 GST_DATA_QUEUE_MUTEX_LOCK (queue);
644 switch (prop_id) {
645 case ARG_CUR_LEVEL_BYTES:
646 g_value_set_uint (value, queue->cur_level.bytes);
647 break;
648 case ARG_CUR_LEVEL_VISIBLE:
649 g_value_set_uint (value, queue->cur_level.visible);
650 break;
651 case ARG_CUR_LEVEL_TIME:
652 g_value_set_uint64 (value, queue->cur_level.time);
653 break;
654 default:
655 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
656 break;
657 }
659 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
660 }