1 /* GStreamer
2 * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
3 *
4 * gstmultiqueue.c:
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 02111-1307, USA.
20 */
22 #ifdef HAVE_CONFIG_H
23 # include "config.h"
24 #endif
26 #include <gst/gst.h>
27 #include "gstmultiqueue.h"
29 /**
30 * GstSingleQueue:
31 * @sinkpad: associated sink #GstPad
32 * @srcpad: associated source #GstPad
33 *
34 * Structure containing all information and properties about
35 * a single queue.
36 */
38 typedef struct _GstSingleQueue GstSingleQueue;
40 struct _GstSingleQueue
41 {
42 /* unique identifier of the queue */
43 guint id;
45 GstMultiQueue *mqueue;
47 GstPad *sinkpad;
48 GstPad *srcpad;
50 /* flowreturn of previous srcpad push */
51 GstFlowReturn srcresult;
53 /* queue of data */
54 GstDataQueue *queue;
55 GstDataQueueSize max_size, extra_size;
56 gboolean inextra; /* TRUE if the queue is currently in extradata mode */
58 /* Protected by global lock */
59 guint32 nextid; /* ID of the next object waiting to be pushed */
60 guint32 oldid; /* ID of the last object pushed (last in a series) */
61 GCond *turn; /* SingleQueue turn waiting conditional */
62 };
65 /* Extension of GstDataQueueItem structure for our usage */
66 typedef struct _GstMultiQueueItem GstMultiQueueItem;
68 struct _GstMultiQueueItem
69 {
70 GstMiniObject *object;
71 guint size;
72 guint64 duration;
73 gboolean visible;
75 GDestroyNotify destroy;
76 guint32 posid;
77 };
79 static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue);
81 static void wake_up_next_non_linked (GstMultiQueue * mq);
82 static void compute_next_non_linked (GstMultiQueue * mq);
84 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink%d",
85 GST_PAD_SINK,
86 GST_PAD_REQUEST,
87 GST_STATIC_CAPS_ANY);
89 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src%d",
90 GST_PAD_SRC,
91 GST_PAD_SOMETIMES,
92 GST_STATIC_CAPS_ANY);
94 GST_DEBUG_CATEGORY_STATIC (multi_queue_debug);
95 #define GST_CAT_DEFAULT (multi_queue_debug)
97 static const GstElementDetails gst_multi_queue_details =
98 GST_ELEMENT_DETAILS ("MultiQueue",
99 "Generic",
100 "Multiple data queue",
101 "Edward Hervey <edward@fluendo.com>");
/* Default limits applied to every single queue.
 *
 * The byte sizes are parenthesized so the macros expand as a single value
 * whatever operator surrounds them (e.g. "x % DEFAULT_MAX_SIZE_BYTES"). */
#define DEFAULT_MAX_SIZE_BYTES (10 * 1024 * 1024)       /* 10 MB */
#define DEFAULT_MAX_SIZE_BUFFERS 200
#define DEFAULT_MAX_SIZE_TIME GST_SECOND

/* Default amount a queue may grow beyond its limits while in extra mode */
#define DEFAULT_EXTRA_SIZE_BYTES (10 * 1024 * 1024)     /* 10 MB */
#define DEFAULT_EXTRA_SIZE_BUFFERS 200
#define DEFAULT_EXTRA_SIZE_TIME GST_SECOND
/* Signals and args */

/* Indices into gst_multi_queue_signals[] */
enum
{
  SIGNAL_UNDERRUN = 0,
  SIGNAL_OVERRUN,
  LAST_SIGNAL
};

/* GObject property ids; 0 is reserved and never installed */
enum
{
  ARG_0 = 0,
  ARG_EXTRA_SIZE_BYTES,
  ARG_EXTRA_SIZE_BUFFERS,
  ARG_EXTRA_SIZE_TIME,
  ARG_MAX_SIZE_BYTES,
  ARG_MAX_SIZE_BUFFERS,
  ARG_MAX_SIZE_TIME
};
/* Take the multiqueue-wide lock, logging before and after so that lock
 * contention can be followed in the debug log.
 * The argument is parenthesized so any pointer expression may be passed. */
#define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START {                    \
  GST_CAT_LOG_OBJECT (multi_queue_debug, (q),                           \
      "locking qlock from thread %p",                                   \
      g_thread_self ());                                                \
  g_mutex_lock ((q)->qlock);                                            \
  GST_CAT_LOG_OBJECT (multi_queue_debug, (q),                           \
      "locked qlock from thread %p",                                    \
      g_thread_self ());                                                \
} G_STMT_END

/* Release the multiqueue-wide lock taken with GST_MULTI_QUEUE_MUTEX_LOCK */
#define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                  \
  GST_CAT_LOG_OBJECT (multi_queue_debug, (q),                           \
      "unlocking qlock from thread %p",                                 \
      g_thread_self ());                                                \
  g_mutex_unlock ((q)->qlock);                                          \
} G_STMT_END
147 static void gst_multi_queue_finalize (GObject * object);
148 static void gst_multi_queue_set_property (GObject * object,
149 guint prop_id, const GValue * value, GParamSpec * pspec);
150 static void gst_multi_queue_get_property (GObject * object,
151 guint prop_id, GValue * value, GParamSpec * pspec);
153 static GstPad *gst_multi_queue_request_new_pad (GstElement * element,
154 GstPadTemplate * temp, const gchar * name);
155 static void gst_multi_queue_release_pad (GstElement * element, GstPad * pad);
157 #define _do_init(bla) \
158 GST_DEBUG_CATEGORY_INIT (multi_queue_debug, "multiqueue", 0, "multiqueue element");
160 GST_BOILERPLATE_FULL (GstMultiQueue, gst_multi_queue, GstElement,
161 GST_TYPE_ELEMENT, _do_init);
163 static guint gst_multi_queue_signals[LAST_SIGNAL] = { 0 };
165 static void
166 gst_multi_queue_base_init (gpointer g_class)
167 {
168 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
170 gst_element_class_add_pad_template (gstelement_class,
171 gst_static_pad_template_get (&sinktemplate));
172 gst_element_class_add_pad_template (gstelement_class,
173 gst_static_pad_template_get (&srctemplate));
174 gst_element_class_set_details (gstelement_class, &gst_multi_queue_details);
175 }
177 static void
178 gst_multi_queue_class_init (GstMultiQueueClass * klass)
179 {
180 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
181 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
183 gobject_class->set_property =
184 GST_DEBUG_FUNCPTR (gst_multi_queue_set_property);
185 gobject_class->get_property =
186 GST_DEBUG_FUNCPTR (gst_multi_queue_get_property);
188 /* SIGNALS */
189 gst_multi_queue_signals[SIGNAL_UNDERRUN] =
190 g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
191 G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL,
192 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
194 gst_multi_queue_signals[SIGNAL_OVERRUN] =
195 g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
196 G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL,
197 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
199 /* PROPERTIES */
201 g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
202 g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
203 "Max. amount of data in the queue (bytes, 0=disable)",
204 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE));
205 g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
206 g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
207 "Max. number of buffers in the queue (0=disable)",
208 0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, G_PARAM_READWRITE));
209 g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
210 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
211 "Max. amount of data in the queue (in ns, 0=disable)",
212 0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE));
214 g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BYTES,
215 g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)",
216 "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)",
217 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES, G_PARAM_READWRITE));
218 g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BUFFERS,
219 g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)",
220 "Amount of buffers the queues can grow if one of them is empty (0=disable)",
221 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS, G_PARAM_READWRITE));
222 g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_TIME,
223 g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)",
224 "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)",
225 0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME, G_PARAM_READWRITE));
227 gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_multi_queue_finalize);
229 gstelement_class->request_new_pad =
230 GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad);
231 gstelement_class->release_pad =
232 GST_DEBUG_FUNCPTR (gst_multi_queue_release_pad);
233 }
235 static void
236 gst_multi_queue_init (GstMultiQueue * mqueue, GstMultiQueueClass * klass)
237 {
239 mqueue->nbqueues = 0;
240 mqueue->queues = NULL;
242 mqueue->max_size.bytes = DEFAULT_MAX_SIZE_BYTES;
243 mqueue->max_size.visible = DEFAULT_MAX_SIZE_BUFFERS;
244 mqueue->max_size.time = DEFAULT_MAX_SIZE_TIME;
246 mqueue->extra_size.bytes = DEFAULT_EXTRA_SIZE_BYTES;
247 mqueue->extra_size.visible = DEFAULT_EXTRA_SIZE_BUFFERS;
248 mqueue->extra_size.time = DEFAULT_EXTRA_SIZE_TIME;
250 mqueue->counter = 0;
251 mqueue->highid = -1;
252 mqueue->nextnotlinked = -1;
254 mqueue->qlock = g_mutex_new ();
256 /* FILLME ? */
257 }
259 static void
260 gst_multi_queue_finalize (GObject * object)
261 {
262 GstMultiQueue *mqueue = GST_MULTI_QUEUE (object);
263 GList *tmp = mqueue->queues;
265 /* FILLME ? */
266 /* DRAIN QUEUES */
267 while (tmp) {
268 GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
270 gst_data_queue_flush (sq->queue);
271 g_object_unref (G_OBJECT (sq->queue));
273 tmp = g_list_next (tmp);
274 }
275 g_list_free (mqueue->queues);
277 /* free/unref instance data */
278 g_mutex_free (mqueue->qlock);
280 if (G_OBJECT_CLASS (parent_class)->finalize)
281 G_OBJECT_CLASS (parent_class)->finalize (object);
282 }
/* Sets property @name to @value on the data queue of every single queue
 * of @mq.
 * Arguments are parenthesized and the locals carry a prefix so the macro
 * neither mis-expands nor captures a caller variable named "tmp" or "q". */
#define SET_CHILD_PROPERTY(mq,name,value) G_STMT_START {                    \
    GList *child_walk;                                                      \
    for (child_walk = (mq)->queues; child_walk;                             \
        child_walk = g_list_next (child_walk)) {                            \
      GstSingleQueue *child_sq = (GstSingleQueue *) child_walk->data;       \
      g_object_set_property ((GObject *) child_sq->queue, (name), (value)); \
    }                                                                       \
} G_STMT_END
293 static void
294 gst_multi_queue_set_property (GObject * object, guint prop_id,
295 const GValue * value, GParamSpec * pspec)
296 {
297 GstMultiQueue *mq = GST_MULTI_QUEUE (object);
299 switch (prop_id) {
300 case ARG_MAX_SIZE_BYTES:
301 mq->max_size.bytes = g_value_get_uint (value);
302 SET_CHILD_PROPERTY (mq, "max-size-bytes", value);
303 break;
304 case ARG_MAX_SIZE_BUFFERS:
305 mq->max_size.visible = g_value_get_uint (value);
306 SET_CHILD_PROPERTY (mq, "max-size-visible", value);
307 break;
308 case ARG_MAX_SIZE_TIME:
309 mq->max_size.time = g_value_get_uint64 (value);
310 SET_CHILD_PROPERTY (mq, "max-size-time", value);
311 break;
312 case ARG_EXTRA_SIZE_BYTES:
313 mq->extra_size.bytes = g_value_get_uint (value);
314 break;
315 case ARG_EXTRA_SIZE_BUFFERS:
316 mq->extra_size.visible = g_value_get_uint (value);
317 break;
318 case ARG_EXTRA_SIZE_TIME:
319 mq->extra_size.time = g_value_get_uint64 (value);
320 break;
321 default:
322 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
323 break;
324 }
326 }
328 static void
329 gst_multi_queue_get_property (GObject * object, guint prop_id,
330 GValue * value, GParamSpec * pspec)
331 {
332 GstMultiQueue *mq = GST_MULTI_QUEUE (object);
334 GST_MULTI_QUEUE_MUTEX_LOCK (mq);
336 switch (prop_id) {
337 case ARG_EXTRA_SIZE_BYTES:
338 g_value_set_uint (value, mq->extra_size.bytes);
339 break;
340 case ARG_EXTRA_SIZE_BUFFERS:
341 g_value_set_uint (value, mq->extra_size.visible);
342 break;
343 case ARG_EXTRA_SIZE_TIME:
344 g_value_set_uint64 (value, mq->extra_size.time);
345 break;
346 case ARG_MAX_SIZE_BYTES:
347 g_value_set_uint (value, mq->max_size.bytes);
348 break;
349 case ARG_MAX_SIZE_BUFFERS:
350 g_value_set_uint (value, mq->max_size.visible);
351 break;
352 case ARG_MAX_SIZE_TIME:
353 g_value_set_uint64 (value, mq->max_size.time);
354 break;
355 default:
356 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
357 break;
358 }
360 GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
361 }
364 /*
365 * GstElement methods
366 */
368 static GstPad *
369 gst_multi_queue_request_new_pad (GstElement * element, GstPadTemplate * temp,
370 const gchar * name)
371 {
372 GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
373 GstSingleQueue *squeue;
375 GST_LOG_OBJECT (element, "name : %s", name);
377 /* Create a new single queue, add the sink and source pad and return the sink pad */
378 squeue = gst_single_queue_new (mqueue);
380 GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
381 mqueue->queues = g_list_append (mqueue->queues, squeue);
382 GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
384 GST_DEBUG_OBJECT (mqueue, "Returning pad %s:%s",
385 GST_DEBUG_PAD_NAME (squeue->sinkpad));
387 return squeue->sinkpad;
388 }
390 static void
391 gst_multi_queue_release_pad (GstElement * element, GstPad * pad)
392 {
393 GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
394 GstSingleQueue *sq = NULL;
395 GList *tmp;
397 GST_LOG_OBJECT (element, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
399 GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
400 /* Find which single queue it belongs to, knowing that it should be a sinkpad */
401 for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
402 sq = (GstSingleQueue *) tmp->data;
404 if (sq->sinkpad == pad)
405 break;
406 }
408 if (!tmp) {
409 GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???");
410 return;
411 }
413 /* remove it from the list */
414 mqueue->queues = g_list_delete_link (mqueue->queues, tmp);
416 /* delete SingleQueue */
417 gst_data_queue_set_flushing (sq->queue, TRUE);
418 gst_data_queue_flush (sq->queue);
420 g_object_unref (G_OBJECT (sq->queue));
422 gst_element_remove_pad (element, sq->srcpad);
423 gst_element_remove_pad (element, sq->sinkpad);
425 /* FIXME : recompute next-non-linked */
426 GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
427 }
429 static gboolean
430 gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
431 GstMiniObject * object)
432 {
433 if (GST_IS_BUFFER (object)) {
434 sq->srcresult = gst_pad_push (sq->srcpad, GST_BUFFER (object));
436 if ((sq->srcresult != GST_FLOW_OK)
437 && (sq->srcresult != GST_FLOW_NOT_LINKED)) {
438 GST_DEBUG_OBJECT (mq, "GstSingleQueue %d : pausing queue, reason %s",
439 sq->id, gst_flow_get_name (sq->srcresult));
440 gst_data_queue_set_flushing (sq->queue, TRUE);
441 gst_pad_pause_task (sq->srcpad);
442 }
443 } else if (GST_IS_EVENT (object)) {
444 if (GST_EVENT_TYPE (object) == GST_EVENT_EOS) {
445 sq->srcresult = GST_FLOW_UNEXPECTED;
447 GST_DEBUG_OBJECT (mq, "GstSingleQueue %d : pausing queue, got EOS",
448 sq->id);
449 gst_data_queue_set_flushing (sq->queue, TRUE);
450 gst_pad_pause_task (sq->srcpad);
451 }
452 gst_pad_push_event (sq->srcpad, GST_EVENT (object));
453 } else {
454 g_warning ("Unexpected object in singlequeue %d (refcounting problem?)",
455 sq->id);
456 }
458 return FALSE;
459 }
461 static void
462 gst_multi_queue_item_destroy (GstMultiQueueItem * item)
463 {
464 gst_mini_object_unref (item->object);
465 g_free (item);
466 }
468 static GstMultiQueueItem *
469 gst_multi_queue_item_new (GstMiniObject * object)
470 {
471 GstMultiQueueItem *item;
473 item = g_new0 (GstMultiQueueItem, 1);
474 item->object = gst_mini_object_ref (object);
475 item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
477 if (GST_IS_BUFFER (object)) {
478 item->size = GST_BUFFER_SIZE (object);
479 item->duration = GST_BUFFER_DURATION (object);
480 if (item->duration == GST_CLOCK_TIME_NONE)
481 item->duration = 0;
482 item->visible = TRUE;
483 }
485 return item;
486 }
488 static void
489 gst_multi_queue_loop (GstPad * pad)
490 {
491 GstSingleQueue *sq;
492 GstMultiQueueItem *item;
493 GstDataQueueItem *sitem;
494 GstMultiQueue *mq;
495 GstMiniObject *object;
496 guint32 newid;
497 guint32 oldid = -1;
499 sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
500 mq = sq->mqueue;
502 restart:
503 GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
504 if (!(gst_data_queue_pop (sq->queue, &sitem))) {
505 /* QUEUE FLUSHING */
506 if (sq->srcresult != GST_FLOW_OK)
507 goto out_flushing;
508 else
509 GST_WARNING_OBJECT (mq,
510 "data_queue_pop() returned FALSE, but srcresult == GST_FLOW_OK !");
511 } else {
512 item = (GstMultiQueueItem *) sitem;
513 newid = item->posid;
514 object = item->object;
516 GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
517 sq->id, newid, oldid);
519 /* 1. Only check turn if :
520 * _ We haven't pushed anything yet
521 * _ OR the new id isn't the follower of the previous one (continuous segment) */
522 if ((oldid == -1) || (newid != (oldid + 1))) {
523 GST_MULTI_QUEUE_MUTEX_LOCK (mq);
525 GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
526 gst_flow_get_name (sq->srcresult));
528 /* preamble : if we're not linked, set the newid as the next one we want */
529 if (sq->srcresult == GST_FLOW_NOT_LINKED)
530 sq->nextid = newid;
532 /* store the last id we outputted */
533 if (oldid != -1)
534 sq->oldid = oldid;
536 /* 2. If there's a queue waiting to push, wake it up. If it's us the */
537 /* check below (3.) will avoid us waiting. */
538 wake_up_next_non_linked (mq);
540 /* 3. If we're not linked AND our nextid is higher than the highest oldid outputted
541 * _ Update global next-not-linked
542 * _ Wait on our conditional
543 */
544 while ((sq->srcresult == GST_FLOW_NOT_LINKED)
545 && (mq->nextnotlinked != sq->id)) {
546 compute_next_non_linked (mq);
547 g_cond_wait (sq->turn, mq->qlock);
548 }
550 GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
552 /* 4. Check again status, maybe we're flushing */
553 if ((sq->srcresult != GST_FLOW_OK)
554 && (sq->srcresult != GST_FLOW_NOT_LINKED)) {
555 gst_multi_queue_item_destroy (item);
556 goto out_flushing;
557 }
558 }
560 GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s",
561 gst_flow_get_name (sq->srcresult));
563 /* 4. Try to push out the new object */
564 gst_single_queue_push_one (mq, sq, object);
566 GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s",
567 gst_flow_get_name (sq->srcresult));
569 gst_multi_queue_item_destroy (item);
570 oldid = newid;
572 /* 5. if GstFlowReturn is non-fatal, goto restart */
573 if ((sq->srcresult == GST_FLOW_OK)
574 || (sq->srcresult == GST_FLOW_NOT_LINKED))
575 goto restart;
576 }
579 beach:
580 return;
582 out_flushing:
583 {
584 gst_pad_pause_task (sq->srcpad);
585 GST_CAT_LOG_OBJECT (multi_queue_debug, mq,
586 "SingleQueue[%d] task paused, reason:%s",
587 sq->id, gst_flow_get_name (sq->srcresult));
588 goto beach;
589 }
590 }
593 /**
594 * gst_multi_queue_chain:
595 *
596 * This is similar to GstQueue's chain function, except:
597 * _ we don't have leak behavioures,
598 * _ we push with a unique id (curid)
599 */
601 static GstFlowReturn
602 gst_multi_queue_chain (GstPad * pad, GstBuffer * buffer)
603 {
604 GstSingleQueue *sq;
605 GstMultiQueue *mq;
606 GstMultiQueueItem *item;
607 GstFlowReturn ret = GST_FLOW_OK;
608 guint32 curid;
610 sq = gst_pad_get_element_private (pad);
611 mq = (GstMultiQueue *) gst_pad_get_parent (pad);
613 /* Get an unique incrementing id */
614 GST_MULTI_QUEUE_MUTEX_LOCK (mq);
615 curid = mq->counter++;
616 GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
618 GST_LOG_OBJECT (mq, "SingleQueue %d : about to push buffer with id %d",
619 sq->id, curid);
621 item = gst_multi_queue_item_new ((GstMiniObject *) buffer);
622 item->posid = curid;
624 if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item))) {
625 GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
626 sq->id, gst_flow_get_name (sq->srcresult));
627 gst_multi_queue_item_destroy (item);
628 gst_buffer_unref (buffer);
629 ret = sq->srcresult;
630 }
632 gst_object_unref (mq);
633 return ret;
634 }
636 static gboolean
637 gst_multi_queue_sink_activate_push (GstPad * pad, gboolean active)
638 {
639 GstSingleQueue *sq;
641 sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
643 if (active)
644 sq->srcresult = GST_FLOW_OK;
645 else {
646 sq->srcresult = GST_FLOW_WRONG_STATE;
647 gst_data_queue_flush (sq->queue);
648 }
650 return TRUE;
651 }
653 static gboolean
654 gst_multi_queue_sink_event (GstPad * pad, GstEvent * event)
655 {
656 GstSingleQueue *sq;
657 GstMultiQueue *mq;
658 guint32 curid;
659 GstMultiQueueItem *item;
661 sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
662 mq = (GstMultiQueue *) gst_pad_get_parent (pad);
664 switch (GST_EVENT_TYPE (event)) {
665 case GST_EVENT_FLUSH_START:
666 GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush start event",
667 sq->id);
669 gst_pad_push_event (sq->srcpad, event);
671 sq->srcresult = GST_FLOW_WRONG_STATE;
672 gst_data_queue_set_flushing (sq->queue, TRUE);
674 /* wake up non-linked task */
675 GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
676 sq->id);
677 GST_MULTI_QUEUE_MUTEX_LOCK (mq);
678 g_cond_signal (sq->turn);
679 GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
681 gst_pad_pause_task (sq->srcpad);
682 goto done;
684 case GST_EVENT_FLUSH_STOP:
685 GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush stop event",
686 sq->id);
688 gst_pad_push_event (sq->srcpad, event);
690 gst_data_queue_flush (sq->queue);
691 gst_data_queue_set_flushing (sq->queue, FALSE);
692 sq->srcresult = GST_FLOW_OK;
693 sq->nextid = -1;
694 sq->oldid = -1;
696 GST_DEBUG_OBJECT (mq, "SingleQueue %d : restarting task", sq->id);
697 gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop,
698 sq->srcpad);
699 goto done;
701 default:
702 if (!(GST_EVENT_IS_SERIALIZED (event))) {
703 gst_pad_push_event (sq->srcpad, event);
704 goto done;
705 }
706 break;
707 }
709 /* Get an unique incrementing id */
710 GST_MULTI_QUEUE_MUTEX_LOCK (mq);
711 curid = mq->counter++;
712 GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
714 item = gst_multi_queue_item_new ((GstMiniObject *) event);
715 item->posid = curid;
717 GST_DEBUG_OBJECT (mq,
718 "SingleQueue %d : Adding event %p of type %s with id %d", sq->id, event,
719 GST_EVENT_TYPE_NAME (event), curid);
721 if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item))) {
722 GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
723 sq->id, gst_flow_get_name (sq->srcresult));
724 gst_multi_queue_item_destroy (item);
725 gst_event_unref (event);
726 }
728 done:
729 gst_object_unref (mq);
730 return TRUE;
731 }
733 static GstCaps *
734 gst_multi_queue_getcaps (GstPad * pad)
735 {
736 GstSingleQueue *sq = gst_pad_get_element_private (pad);
737 GstPad *otherpad;
738 GstCaps *result;
740 GST_LOG_OBJECT (pad, "...");
742 otherpad = (pad == sq->srcpad) ? sq->sinkpad : sq->srcpad;
744 GST_LOG_OBJECT (otherpad, "Getting caps from the peer of this pad");
746 result = gst_pad_peer_get_caps (otherpad);
747 if (result == NULL)
748 result = gst_caps_new_any ();
750 return result;
751 }
753 static GstFlowReturn
754 gst_multi_queue_bufferalloc (GstPad * pad, guint64 offset, guint size,
755 GstCaps * caps, GstBuffer ** buf)
756 {
757 GstSingleQueue *sq = gst_pad_get_element_private (pad);
759 return gst_pad_alloc_buffer (sq->srcpad, offset, size, caps, buf);
760 }
762 static gboolean
763 gst_multi_queue_src_activate_push (GstPad * pad, gboolean active)
764 {
765 GstMultiQueue *mq;
766 GstSingleQueue *sq;
767 gboolean result = FALSE;
769 sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
770 mq = sq->mqueue;
772 GST_LOG ("SingleQueue %d", sq->id);
774 if (active) {
775 sq->srcresult = GST_FLOW_OK;
776 gst_data_queue_set_flushing (sq->queue, FALSE);
777 result = gst_pad_start_task (pad, (GstTaskFunction) gst_multi_queue_loop,
778 pad);
779 } else {
780 /* 1. unblock loop function */
781 sq->srcresult = GST_FLOW_WRONG_STATE;
782 gst_data_queue_set_flushing (sq->queue, TRUE);
784 /* 2. unblock potentially non-linked pad */
785 GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
786 sq->id);
787 GST_MULTI_QUEUE_MUTEX_LOCK (mq);
788 g_cond_signal (sq->turn);
789 GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
791 /* 3. make sure streaming finishes */
792 result = gst_pad_stop_task (pad);
793 gst_data_queue_set_flushing (sq->queue, FALSE);
794 }
796 return result;
797 }
799 static gboolean
800 gst_multi_queue_acceptcaps (GstPad * pad, GstCaps * caps)
801 {
802 return TRUE;
803 }
805 static gboolean
806 gst_multi_queue_src_event (GstPad * pad, GstEvent * event)
807 {
808 GstSingleQueue *sq = gst_pad_get_element_private (pad);
810 return gst_pad_push_event (sq->sinkpad, event);
811 }
813 static gboolean
814 gst_multi_queue_src_query (GstPad * pad, GstQuery * query)
815 {
816 GstSingleQueue *sq = gst_pad_get_element_private (pad);
817 GstPad *peerpad;
818 gboolean res;
820 /* FILLME */
821 /* Handle position offset depending on queue size */
823 /* default handling */
824 if (!(peerpad = gst_pad_get_peer (sq->sinkpad)))
825 goto no_peer;
827 res = gst_pad_query (peerpad, query);
829 gst_object_unref (peerpad);
831 return res;
833 no_peer:
834 {
835 GST_LOG_OBJECT (sq->sinkpad, "Couldn't send query because we have no peer");
836 return FALSE;
837 }
838 }
841 /*
842 * Next-non-linked functions
843 */
845 /* WITH LOCK TAKEN */
846 static void
847 wake_up_next_non_linked (GstMultiQueue * mq)
848 {
849 GList *tmp;
851 GST_LOG ("mq->nextnotlinked:%d", mq->nextnotlinked);
853 /* maybe no-one is waiting */
854 if (mq->nextnotlinked == -1)
855 return;
857 /* Else figure out which singlequeue it is and wake it up */
858 for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
859 GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
861 if (sq->srcresult == GST_FLOW_NOT_LINKED)
862 if (sq->id == mq->nextnotlinked) {
863 GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
864 g_cond_signal (sq->turn);
865 return;
866 }
867 }
868 }
870 /* WITH LOCK TAKEN */
871 static void
872 compute_next_non_linked (GstMultiQueue * mq)
873 {
874 GList *tmp;
875 guint32 lowest = G_MAXUINT32;
876 gint nextid = -1;
878 for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
879 GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
881 GST_LOG ("inspecting sq:%d , nextid:%d, oldid:%d, srcresult:%s",
882 sq->id, sq->nextid, sq->oldid, gst_flow_get_name (sq->srcresult));
884 if (sq->srcresult == GST_FLOW_NOT_LINKED)
885 if (lowest > sq->nextid) {
886 lowest = sq->nextid;
887 nextid = sq->id;
888 }
890 /* If we don't have a global highid, or the global highid is lower than */
891 /* this single queue's last outputted id, store the queue's one */
892 if ((mq->highid == -1) || (mq->highid < sq->oldid))
893 mq->highid = sq->oldid;
894 }
896 mq->nextnotlinked = nextid;
897 GST_LOG_OBJECT (mq,
898 "Next-non-linked is sq #%d with nextid : %d. Highid is now : %d", nextid,
899 lowest, mq->highid);
900 }
902 /*
903 * GstSingleQueue functions
904 */
906 static void
907 single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
908 {
909 GstMultiQueue *mq = sq->mqueue;
910 GList *tmp;
912 GST_LOG_OBJECT (sq->mqueue, "Single Queue %d is full", sq->id);
914 if (!sq->inextra) {
915 /* Check if at least one other queue is empty... */
916 GST_MULTI_QUEUE_MUTEX_LOCK (mq);
917 for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
918 GstSingleQueue *ssq = (GstSingleQueue *) tmp->data;
920 if (gst_data_queue_is_empty (ssq->queue)) {
921 /* ... if so set sq->inextra to TRUE and don't emit overrun signal */
922 GST_DEBUG_OBJECT (mq,
923 "Another queue is empty, bumping single queue into extra data mode");
924 sq->inextra = TRUE;
925 GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
926 goto beach;
927 }
928 }
929 GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
930 }
932 /* Overrun is always forwarded, since this is blocking the upstream element */
933 g_signal_emit (G_OBJECT (sq->mqueue), gst_multi_queue_signals[SIGNAL_OVERRUN],
934 0);
936 beach:
937 return;
938 }
940 static void
941 single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
942 {
943 gboolean empty = TRUE;
944 GstMultiQueue *mq = sq->mqueue;
945 GList *tmp;
947 GST_LOG_OBJECT (mq,
948 "Single Queue %d is empty, Checking if all single queues are empty",
949 sq->id);
951 GST_MULTI_QUEUE_MUTEX_LOCK (mq);
952 for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
953 GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
955 if (!gst_data_queue_is_empty (sq->queue)) {
956 empty = FALSE;
957 break;
958 }
959 }
960 GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
962 if (empty) {
963 GST_DEBUG_OBJECT (mq, "All queues are empty, signalling it");
964 g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_UNDERRUN], 0);
965 }
966 }
968 static gboolean
969 single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes,
970 guint64 time, GstSingleQueue * sq)
971 {
972 gboolean res;
974 /* In all cases (extra mode or not), we check how the queue current level
975 * compares to max_size. */
976 res = (((sq->max_size.visible != 0) &&
977 sq->max_size.visible < visible) ||
978 ((sq->max_size.bytes != 0) &&
979 sq->max_size.bytes < bytes) ||
980 ((sq->max_size.time != 0) && sq->max_size.time < time));
982 if (G_UNLIKELY (sq->inextra)) {
983 /* If we're in extra mode, one of two things can happen to check for
984 * fullness: */
986 if (!res)
987 /* #1 : Either we are not full against normal max_size levels, in which
988 * case we can go out of extra mode. */
989 sq->inextra = FALSE;
990 else
991 /* #2 : Or else, the check should be done against max_size + extra_size */
992 res = (((sq->max_size.visible != 0) &&
993 (sq->max_size.visible + sq->extra_size.visible) < visible) ||
994 ((sq->max_size.bytes != 0) &&
995 (sq->max_size.bytes + sq->extra_size.bytes) < bytes) ||
996 ((sq->max_size.time != 0) &&
997 (sq->max_size.time + sq->extra_size.time) < time));
999 }
1000 return res;
1001 }
1003 static GstSingleQueue *
1004 gst_single_queue_new (GstMultiQueue * mqueue)
1005 {
1006 GstSingleQueue *sq;
1007 gchar *tmp;
1009 sq = g_new0 (GstSingleQueue, 1);
1011 GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
1012 sq->id = mqueue->nbqueues++;
1014 /* copy over max_size and extra_size so we don't need to take the lock
1015 * any longer when checking if the queue is full. */
1016 /* FIXME : We can't modify those values once the single queue is created
1017 * since we don't have any lock protecting those values. */
1018 sq->max_size.visible = mqueue->max_size.visible;
1019 sq->max_size.bytes = mqueue->max_size.bytes;
1020 sq->max_size.time = mqueue->max_size.time;
1022 sq->extra_size.visible = mqueue->extra_size.visible;
1023 sq->extra_size.bytes = mqueue->extra_size.bytes;
1024 sq->extra_size.time = mqueue->extra_size.time;
1026 GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
1028 GST_DEBUG_OBJECT (mqueue, "Creating GstSingleQueue id:%d", sq->id);
1030 sq->mqueue = mqueue;
1031 sq->srcresult = GST_FLOW_OK;
1032 sq->queue = gst_data_queue_new ((GstDataQueueCheckFullFunction)
1033 single_queue_check_full, sq);
1035 sq->nextid = -1;
1036 sq->oldid = -1;
1037 sq->turn = g_cond_new ();
1039 /* FIXME : attach to underrun/overrun signals to handle non-starvation
1040 * OR should this be handled when we check if the queue is full/empty before pushing/popping ? */
1042 g_signal_connect (G_OBJECT (sq->queue), "full",
1043 G_CALLBACK (single_queue_overrun_cb), sq);
1045 g_signal_connect (G_OBJECT (sq->queue), "empty",
1046 G_CALLBACK (single_queue_underrun_cb), sq);
1048 tmp = g_strdup_printf ("sink%d", sq->id);
1049 sq->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
1050 g_free (tmp);
1052 gst_pad_set_chain_function (sq->sinkpad,
1053 GST_DEBUG_FUNCPTR (gst_multi_queue_chain));
1054 gst_pad_set_activatepush_function (sq->sinkpad,
1055 GST_DEBUG_FUNCPTR (gst_multi_queue_sink_activate_push));
1056 gst_pad_set_event_function (sq->sinkpad,
1057 GST_DEBUG_FUNCPTR (gst_multi_queue_sink_event));
1058 gst_pad_set_getcaps_function (sq->sinkpad,
1059 GST_DEBUG_FUNCPTR (gst_multi_queue_getcaps));
1060 gst_pad_set_bufferalloc_function (sq->sinkpad,
1061 GST_DEBUG_FUNCPTR (gst_multi_queue_bufferalloc));
1063 tmp = g_strdup_printf ("src%d", sq->id);
1064 sq->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
1065 g_free (tmp);
1067 gst_pad_set_activatepush_function (sq->srcpad,
1068 GST_DEBUG_FUNCPTR (gst_multi_queue_src_activate_push));
1069 gst_pad_set_acceptcaps_function (sq->srcpad,
1070 GST_DEBUG_FUNCPTR (gst_multi_queue_acceptcaps));
1071 gst_pad_set_getcaps_function (sq->srcpad,
1072 GST_DEBUG_FUNCPTR (gst_multi_queue_getcaps));
1073 gst_pad_set_event_function (sq->srcpad,
1074 GST_DEBUG_FUNCPTR (gst_multi_queue_src_event));
1075 gst_pad_set_query_function (sq->srcpad,
1076 GST_DEBUG_FUNCPTR (gst_multi_queue_src_query));
1078 gst_pad_set_element_private (sq->sinkpad, (gpointer) sq);
1079 gst_pad_set_element_private (sq->srcpad, (gpointer) sq);
1081 gst_pad_set_active (sq->srcpad, TRUE);
1082 gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad);
1084 gst_pad_set_active (sq->sinkpad, TRUE);
1085 gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad);
1087 GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added",
1088 sq->id);
1090 return sq;
1091 }