004508d3216bee415d40764fa19e0d836f811462
1 /* GStreamer
2 * Copyright (C) 2005 Wim Taymans <wim@fluendo.com>
3 *
4 * gstcollectpads.c:
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 02111-1307, USA.
20 */
21 /**
22 * SECTION:gstcollectpads
23 * @short_description: manages a set of pads that operate in collect mode
24 * @see_also:
25 *
26 * Manages a set of pads that operate in collect mode. This means that control
27 * is given to the manager of this object when all pads have data.
28 * <itemizedlist>
29 * <listitem><para>
30 * Collectpads are created with gst_collect_pads_new(). A callback should then
31 * be installed with gst_collect_pads_set_function ().
32 * </para></listitem>
33 * <listitem><para>
34 * Pads are added to the collection with gst_collect_pads_add_pad()/
35 * gst_collect_pads_remove_pad(). The pad
36 * has to be a sinkpad. The chain and event functions of the pad are
37 * overridden. The element_private of the pad is used to store
38 * private information for the collectpads.
39 * </para></listitem>
40 * <listitem><para>
41 * For each pad, data is queued in the _chain function or by
42 * performing a pull_range.
43 * </para></listitem>
44 * <listitem><para>
45 * When data is queued on all pads, the callback function is called.
46 * </para></listitem>
47 * <listitem><para>
48 * Data can be dequeued from the pad with the gst_collect_pads_pop() method.
49 * One can peek at the data with the gst_collect_pads_peek() function.
50 * These functions will return NULL if the pad received an EOS event. When all
51 * pads return NULL from a gst_collect_pads_peek(), the element can emit an EOS
52 * event itself.
53 * </para></listitem>
54 * <listitem><para>
55 * Data can also be dequeued in byte units using the gst_collect_pads_available(),
56 * gst_collect_pads_read() and gst_collect_pads_flush() calls.
57 * </para></listitem>
58 * <listitem><para>
59 * Elements should call gst_collect_pads_start() and gst_collect_pads_stop() in
60 * their state change functions to start and stop the processing of the collecpads.
61 * The gst_collect_pads_stop() call should be called before calling the parent
62 * element state change function in the PAUSED_TO_READY state change to ensure
63 * no pad is blocked and the element can finish streaming.
64 * </para></listitem>
65 * <listitem><para>
66 * gst_collect_pads_collect() and gst_collect_pads_collect_range() can be used by
67 * elements that start a #GstTask to drive the collect_pads. This feature is however
68 * not yet implemented.
69 * </para></listitem>
70 * </itemizedlist>
71 *
72 * Last reviewed on 2006-05-10 (0.10.6)
73 */
75 #include "gstcollectpads.h"
77 GST_DEBUG_CATEGORY_STATIC (collect_pads_debug);
78 #define GST_CAT_DEFAULT collect_pads_debug
80 #define GST_COLLECT_PADS_GET_PRIVATE(obj) \
81 (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_COLLECT_PADS, GstCollectPadsPrivate))
83 struct _GstCollectPadsPrivate
84 {
85 GstCollectPadsClipFunction clipfunc;
86 gpointer clipfunc_user_data;
87 };
89 GST_BOILERPLATE (GstCollectPads, gst_collect_pads, GstObject, GST_TYPE_OBJECT);
91 static void gst_collect_pads_clear (GstCollectPads * pads,
92 GstCollectData * data);
93 static GstFlowReturn gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer);
94 static gboolean gst_collect_pads_event (GstPad * pad, GstEvent * event);
95 static void gst_collect_pads_finalize (GObject * object);
96 static void ref_data (GstCollectData * data);
97 static void unref_data (GstCollectData * data);
98 static void gst_collect_pads_check_pads_unlocked (GstCollectPads * pads);
100 static void
101 gst_collect_pads_base_init (gpointer g_class)
102 {
103 /* Do nothing here */
104 }
106 static void
107 gst_collect_pads_class_init (GstCollectPadsClass * klass)
108 {
109 GObjectClass *gobject_class = (GObjectClass *) klass;
111 g_type_class_add_private (klass, sizeof (GstCollectPadsPrivate));
113 GST_DEBUG_CATEGORY_INIT (collect_pads_debug, "collectpads", 0,
114 "GstCollectPads");
116 gobject_class->finalize = gst_collect_pads_finalize;
117 }
119 static void
120 gst_collect_pads_init (GstCollectPads * pads, GstCollectPadsClass * g_class)
121 {
122 pads->abidata.ABI.priv = GST_COLLECT_PADS_GET_PRIVATE (pads);
124 pads->cond = g_cond_new ();
125 pads->data = NULL;
126 pads->cookie = 0;
127 pads->numpads = 0;
128 pads->queuedpads = 0;
129 pads->eospads = 0;
130 pads->started = FALSE;
132 /* members to manage the pad list */
133 pads->abidata.ABI.pad_lock = g_mutex_new ();
134 pads->abidata.ABI.pad_cookie = 0;
135 pads->abidata.ABI.pad_list = NULL;
136 }
138 static void
139 gst_collect_pads_finalize (GObject * object)
140 {
141 GSList *collected;
142 GstCollectPads *pads = GST_COLLECT_PADS (object);
144 GST_DEBUG ("finalize");
146 g_cond_free (pads->cond);
147 g_mutex_free (pads->abidata.ABI.pad_lock);
149 /* Remove pads */
150 collected = pads->abidata.ABI.pad_list;
151 for (; collected; collected = g_slist_next (collected)) {
152 GstCollectData *pdata = (GstCollectData *) collected->data;
154 unref_data (pdata);
155 }
156 /* Free pads list */
157 g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
158 g_slist_free (pads->data);
159 g_slist_free (pads->abidata.ABI.pad_list);
161 G_OBJECT_CLASS (parent_class)->finalize (object);
162 }
164 /**
165 * gst_collect_pads_new:
166 *
167 * Create a new instance of #GstCollectPads.
168 *
169 * MT safe.
170 *
171 * Returns: (transfer full): a new #GstCollectPads, or NULL in case of an error.
172 */
173 GstCollectPads *
174 gst_collect_pads_new (void)
175 {
176 GstCollectPads *newcoll;
178 newcoll = g_object_newv (GST_TYPE_COLLECT_PADS, 0, NULL);
180 return newcoll;
181 }
183 /**
184 * gst_collect_pads_set_function:
185 * @pads: the collectspads to use
186 * @func: the function to set
187 * @user_data: (closure): user data passed to the function
188 *
189 * Set the callback function and user data that will be called when
190 * all the pads added to the collection have buffers queued.
191 *
192 * MT safe.
193 */
194 void
195 gst_collect_pads_set_function (GstCollectPads * pads,
196 GstCollectPadsFunction func, gpointer user_data)
197 {
198 g_return_if_fail (pads != NULL);
199 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
201 GST_OBJECT_LOCK (pads);
202 pads->func = func;
203 pads->user_data = user_data;
204 GST_OBJECT_UNLOCK (pads);
205 }
207 static void
208 ref_data (GstCollectData * data)
209 {
210 g_assert (data != NULL);
212 g_atomic_int_inc (&(data->abidata.ABI.refcount));
213 }
215 static void
216 unref_data (GstCollectData * data)
217 {
218 GstCollectDataDestroyNotify destroy_notify;
220 g_assert (data != NULL);
221 g_assert (data->abidata.ABI.refcount > 0);
223 if (!g_atomic_int_dec_and_test (&(data->abidata.ABI.refcount)))
224 return;
226 /* FIXME: Ugly hack as we can't add more fields to GstCollectData */
227 destroy_notify = (GstCollectDataDestroyNotify)
228 g_object_get_data (G_OBJECT (data->pad),
229 "gst-collect-data-destroy-notify");
231 if (destroy_notify)
232 destroy_notify (data);
234 g_object_unref (data->pad);
235 if (data->buffer) {
236 gst_buffer_unref (data->buffer);
237 }
238 g_free (data);
239 }
241 /**
242 * gst_collect_pads_add_pad:
243 * @pads: the collectspads to use
244 * @pad: (transfer none): the pad to add
245 * @size: the size of the returned #GstCollectData structure
246 *
247 * Add a pad to the collection of collect pads. The pad has to be
248 * a sinkpad. The refcount of the pad is incremented. Use
249 * gst_collect_pads_remove_pad() to remove the pad from the collection
250 * again.
251 *
252 * This function will override the chain and event functions of the pad
253 * along with the element_private data, which is used to store private
254 * information for the collectpads.
255 *
256 * You specify a size for the returned #GstCollectData structure
257 * so that you can use it to store additional information.
258 *
259 * The pad will be automatically activated in push mode when @pads is
260 * started.
261 *
262 * This function calls gst_collect_pads_add_pad_full() passing a value of NULL
263 * for destroy_notify.
264 *
265 * MT safe.
266 *
267 * Returns: a new #GstCollectData to identify the new pad. Or NULL
268 * if wrong parameters are supplied.
269 */
270 GstCollectData *
271 gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size)
272 {
273 return gst_collect_pads_add_pad_full (pads, pad, size, NULL);
274 }
276 /**
277 * gst_collect_pads_add_pad_full:
278 * @pads: the collectspads to use
279 * @pad: (transfer none): the pad to add
280 * @size: the size of the returned #GstCollectData structure
281 * @destroy_notify: function to be called before the returned #GstCollectData
282 * structure is freed
283 *
284 * Add a pad to the collection of collect pads. The pad has to be
285 * a sinkpad. The refcount of the pad is incremented. Use
286 * gst_collect_pads_remove_pad() to remove the pad from the collection
287 * again.
288 *
289 * You specify a size for the returned #GstCollectData structure
290 * so that you can use it to store additional information.
291 *
292 * You can also specify a #GstCollectDataDestroyNotify that will be called
293 * just before the #GstCollectData structure is freed. It is passed the
294 * pointer to the structure and should free any custom memory and resources
295 * allocated for it.
296 *
297 * The pad will be automatically activated in push mode when @pads is
298 * started.
299 *
300 * MT safe.
301 *
302 * Since: 0.10.12
303 *
304 * Returns: a new #GstCollectData to identify the new pad. Or NULL
305 * if wrong parameters are supplied.
306 */
307 GstCollectData *
308 gst_collect_pads_add_pad_full (GstCollectPads * pads, GstPad * pad, guint size,
309 GstCollectDataDestroyNotify destroy_notify)
310 {
311 GstCollectData *data;
313 g_return_val_if_fail (pads != NULL, NULL);
314 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
315 g_return_val_if_fail (pad != NULL, NULL);
316 g_return_val_if_fail (GST_PAD_IS_SINK (pad), NULL);
317 g_return_val_if_fail (size >= sizeof (GstCollectData), NULL);
319 GST_DEBUG ("adding pad %s:%s", GST_DEBUG_PAD_NAME (pad));
321 data = g_malloc0 (size);
322 data->collect = pads;
323 data->pad = gst_object_ref (pad);
324 data->buffer = NULL;
325 data->pos = 0;
326 gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
327 data->abidata.ABI.flushing = FALSE;
328 data->abidata.ABI.new_segment = FALSE;
329 data->abidata.ABI.eos = FALSE;
330 data->abidata.ABI.refcount = 1;
332 /* FIXME: Ugly hack as we can't add more fields to GstCollectData */
333 g_object_set_data (G_OBJECT (pad), "gst-collect-data-destroy-notify",
334 (void *) destroy_notify);
336 GST_COLLECT_PADS_PAD_LOCK (pads);
337 GST_OBJECT_LOCK (pad);
338 gst_pad_set_element_private (pad, data);
339 GST_OBJECT_UNLOCK (pad);
340 pads->abidata.ABI.pad_list =
341 g_slist_append (pads->abidata.ABI.pad_list, data);
342 gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_chain));
343 gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_event));
344 /* activate the pad when needed */
345 if (pads->started)
346 gst_pad_set_active (pad, TRUE);
347 pads->abidata.ABI.pad_cookie++;
348 GST_COLLECT_PADS_PAD_UNLOCK (pads);
350 return data;
351 }
353 static gint
354 find_pad (GstCollectData * data, GstPad * pad)
355 {
356 if (data->pad == pad)
357 return 0;
358 return 1;
359 }
361 /**
362 * gst_collect_pads_set_clip_function:
363 * @pads: the collectspads to use
364 * @clipfunc: clip function to install
365 * @user_data: (closure): user data to pass to @clip_func
366 *
367 * Install a clipping function that is called right after a buffer is received
368 * on a pad managed by @pads. See #GstCollectPadsClipFunction for more info.
369 *
370 * Since: 0.10.26
371 */
372 void
373 gst_collect_pads_set_clip_function (GstCollectPads * pads,
374 GstCollectPadsClipFunction clipfunc, gpointer user_data)
375 {
376 GstCollectPadsPrivate *priv;
378 g_return_if_fail (pads != NULL);
379 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
381 priv = pads->abidata.ABI.priv;
383 priv->clipfunc = clipfunc;
384 priv->clipfunc_user_data = user_data;
385 }
387 /**
388 * gst_collect_pads_remove_pad:
389 * @pads: the collectspads to use
390 * @pad: (transfer none): the pad to remove
391 *
392 * Remove a pad from the collection of collect pads. This function will also
393 * free the #GstCollectData and all the resources that were allocated with
394 * gst_collect_pads_add_pad().
395 *
396 * The pad will be deactivated automatically when @pads is stopped.
397 *
398 * MT safe.
399 *
400 * Returns: %TRUE if the pad could be removed.
401 */
402 gboolean
403 gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad)
404 {
405 GstCollectData *data;
406 GSList *list;
408 g_return_val_if_fail (pads != NULL, FALSE);
409 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
410 g_return_val_if_fail (pad != NULL, FALSE);
411 g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
413 GST_DEBUG ("removing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
415 GST_COLLECT_PADS_PAD_LOCK (pads);
416 list =
417 g_slist_find_custom (pads->abidata.ABI.pad_list, pad,
418 (GCompareFunc) find_pad);
419 if (!list)
420 goto unknown_pad;
422 data = (GstCollectData *) list->data;
424 GST_DEBUG ("found pad %s:%s at %p", GST_DEBUG_PAD_NAME (pad), data);
426 /* clear the stuff we configured */
427 gst_pad_set_chain_function (pad, NULL);
428 gst_pad_set_event_function (pad, NULL);
429 GST_OBJECT_LOCK (pad);
430 gst_pad_set_element_private (pad, NULL);
431 GST_OBJECT_UNLOCK (pad);
433 /* backward compat, also remove from data if stopped, note that this function
434 * can only be called when we are stopped because we don't take the LOCK to
435 * protect the pads->data list. */
436 if (!pads->started) {
437 GSList *dlist;
439 dlist = g_slist_find_custom (pads->data, pad, (GCompareFunc) find_pad);
440 if (dlist) {
441 GstCollectData *pdata = dlist->data;
443 pads->data = g_slist_delete_link (pads->data, dlist);
444 unref_data (pdata);
445 }
446 }
447 /* remove from the pad list */
448 pads->abidata.ABI.pad_list =
449 g_slist_delete_link (pads->abidata.ABI.pad_list, list);
450 pads->abidata.ABI.pad_cookie++;
452 /* signal waiters because something changed */
453 GST_COLLECT_PADS_BROADCAST (pads);
455 /* deactivate the pad when needed */
456 if (!pads->started)
457 gst_pad_set_active (pad, FALSE);
459 /* clean and free the collect data */
460 unref_data (data);
462 GST_COLLECT_PADS_PAD_UNLOCK (pads);
464 return TRUE;
466 unknown_pad:
467 {
468 GST_WARNING ("cannot remove unknown pad %s:%s", GST_DEBUG_PAD_NAME (pad));
469 GST_COLLECT_PADS_PAD_UNLOCK (pads);
470 return FALSE;
471 }
472 }
474 /**
475 * gst_collect_pads_is_active:
476 * @pads: (transfer none): the collectspads to use
477 * @pad: the pad to check
478 *
479 * Check if a pad is active.
480 *
481 * This function is currently not implemented.
482 *
483 * MT safe.
484 *
485 * Returns: %TRUE if the pad is active.
486 */
487 gboolean
488 gst_collect_pads_is_active (GstCollectPads * pads, GstPad * pad)
489 {
490 g_return_val_if_fail (pads != NULL, FALSE);
491 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
492 g_return_val_if_fail (pad != NULL, FALSE);
493 g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
495 g_warning ("gst_collect_pads_is_active() is not implemented");
497 return FALSE;
498 }
500 /**
501 * gst_collect_pads_collect:
502 * @pads: the collectspads to use
503 *
504 * Collect data on all pads. This function is usually called
505 * from a #GstTask function in an element.
506 *
507 * This function is currently not implemented.
508 *
509 * MT safe.
510 *
511 * Returns: #GstFlowReturn of the operation.
512 */
513 GstFlowReturn
514 gst_collect_pads_collect (GstCollectPads * pads)
515 {
516 g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR);
517 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
519 g_warning ("gst_collect_pads_collect() is not implemented");
521 return GST_FLOW_NOT_SUPPORTED;
522 }
524 /**
525 * gst_collect_pads_collect_range:
526 * @pads: the collectspads to use
527 * @offset: the offset to collect
528 * @length: the length to collect
529 *
530 * Collect data with @offset and @length on all pads. This function
531 * is typically called in the getrange function of an element.
532 *
533 * This function is currently not implemented.
534 *
535 * MT safe.
536 *
537 * Returns: #GstFlowReturn of the operation.
538 */
539 GstFlowReturn
540 gst_collect_pads_collect_range (GstCollectPads * pads, guint64 offset,
541 guint length)
542 {
543 g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR);
544 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
546 g_warning ("gst_collect_pads_collect_range() is not implemented");
548 return GST_FLOW_NOT_SUPPORTED;
549 }
551 static gboolean
552 gst_collect_pads_is_flushing (GstCollectPads * pads)
553 {
554 GSList *walk = NULL;
555 gboolean res = TRUE;
557 GST_COLLECT_PADS_PAD_LOCK (pads);
559 /* Ensure pads->data state */
560 gst_collect_pads_check_pads_unlocked (pads);
562 GST_DEBUG ("Getting flushing state (pads:%p, pads->data:%p)",
563 pads, pads->data);
565 for (walk = pads->data; walk; walk = g_slist_next (walk)) {
566 GstCollectData *cdata = walk->data;
568 GST_DEBUG_OBJECT (cdata->pad, "flushing:%d", cdata->abidata.ABI.flushing);
570 if (cdata->abidata.ABI.flushing) {
571 goto done;
572 }
573 }
575 res = FALSE;
576 done:
577 GST_COLLECT_PADS_PAD_UNLOCK (pads);
578 return res;
579 }
581 /* FIXME, I think this function is used to work around bad behaviour
582 * of elements that add pads to themselves without activating them.
583 *
584 * Must be called with PAD_LOCK.
585 */
586 static void
587 gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads,
588 gboolean flushing)
589 {
590 GSList *walk = NULL;
592 GST_DEBUG ("Setting flushing (%d)", flushing);
594 /* Update the pads flushing flag */
595 for (walk = pads->data; walk; walk = g_slist_next (walk)) {
596 GstCollectData *cdata = walk->data;
598 if (GST_IS_PAD (cdata->pad)) {
599 GST_OBJECT_LOCK (cdata->pad);
600 if (flushing)
601 GST_PAD_SET_FLUSHING (cdata->pad);
602 else
603 GST_PAD_UNSET_FLUSHING (cdata->pad);
604 cdata->abidata.ABI.flushing = flushing;
605 gst_collect_pads_clear (pads, cdata);
606 GST_OBJECT_UNLOCK (cdata->pad);
607 }
608 }
609 /* Setting the pads to flushing means that we changed the values which
610 * are 'protected' by the cookie. We therefore update it to force a
611 * recalculation of the current pad status. */
612 pads->abidata.ABI.pad_cookie++;
613 }
615 /**
616 * gst_collect_pads_set_flushing:
617 * @pads: the collectspads to use
618 * @flushing: desired state of the pads
619 *
620 * Change the flushing state of all the pads in the collection. No pad
621 * is able to accept anymore data when @flushing is %TRUE. Calling this
622 * function with @flushing %FALSE makes @pads accept data again.
623 *
624 * MT safe.
625 *
626 * Since: 0.10.7.
627 */
628 void
629 gst_collect_pads_set_flushing (GstCollectPads * pads, gboolean flushing)
630 {
631 g_return_if_fail (pads != NULL);
632 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
634 GST_COLLECT_PADS_PAD_LOCK (pads);
635 /* Ensure pads->data state */
636 gst_collect_pads_check_pads_unlocked (pads);
637 gst_collect_pads_set_flushing_unlocked (pads, flushing);
638 GST_COLLECT_PADS_PAD_UNLOCK (pads);
639 }
641 /**
642 * gst_collect_pads_start:
643 * @pads: the collectspads to use
644 *
645 * Starts the processing of data in the collect_pads.
646 *
647 * MT safe.
648 */
649 void
650 gst_collect_pads_start (GstCollectPads * pads)
651 {
652 GSList *collected;
654 g_return_if_fail (pads != NULL);
655 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
657 GST_DEBUG_OBJECT (pads, "starting collect pads");
659 /* make sure stop and collect cannot be called anymore */
660 GST_OBJECT_LOCK (pads);
662 /* make pads streamable */
663 GST_COLLECT_PADS_PAD_LOCK (pads);
665 /* loop over the master pad list and reset the segment */
666 collected = pads->abidata.ABI.pad_list;
667 for (; collected; collected = g_slist_next (collected)) {
668 GstCollectData *data;
670 data = collected->data;
671 gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
672 }
674 gst_collect_pads_set_flushing_unlocked (pads, FALSE);
676 /* Start collect pads */
677 pads->started = TRUE;
678 GST_COLLECT_PADS_PAD_UNLOCK (pads);
679 GST_OBJECT_UNLOCK (pads);
680 }
682 /**
683 * gst_collect_pads_stop:
684 * @pads: the collectspads to use
685 *
686 * Stops the processing of data in the collect_pads. this function
687 * will also unblock any blocking operations.
688 *
689 * MT safe.
690 */
691 void
692 gst_collect_pads_stop (GstCollectPads * pads)
693 {
694 GSList *collected;
696 g_return_if_fail (pads != NULL);
697 g_return_if_fail (GST_IS_COLLECT_PADS (pads));
699 GST_DEBUG_OBJECT (pads, "stopping collect pads");
701 /* make sure collect and start cannot be called anymore */
702 GST_OBJECT_LOCK (pads);
704 /* make pads not accept data anymore */
705 GST_COLLECT_PADS_PAD_LOCK (pads);
706 gst_collect_pads_set_flushing_unlocked (pads, TRUE);
708 /* Stop collect pads */
709 pads->started = FALSE;
710 pads->eospads = 0;
711 pads->queuedpads = 0;
713 /* loop over the master pad list and flush buffers */
714 collected = pads->abidata.ABI.pad_list;
715 for (; collected; collected = g_slist_next (collected)) {
716 GstCollectData *data;
717 GstBuffer **buffer_p;
719 data = collected->data;
720 if (data->buffer) {
721 buffer_p = &data->buffer;
722 gst_buffer_replace (buffer_p, NULL);
723 data->pos = 0;
724 }
725 data->abidata.ABI.eos = FALSE;
726 }
728 GST_COLLECT_PADS_PAD_UNLOCK (pads);
729 /* Wake them up so they can end the chain functions. */
730 GST_COLLECT_PADS_BROADCAST (pads);
732 GST_OBJECT_UNLOCK (pads);
733 }
735 /**
736 * gst_collect_pads_peek:
737 * @pads: the collectspads to peek
738 * @data: the data to use
739 *
740 * Peek at the buffer currently queued in @data. This function
741 * should be called with the @pads LOCK held, such as in the callback
742 * handler.
743 *
744 * MT safe.
745 *
746 * Returns: (transfer full): The buffer in @data or NULL if no buffer is queued.
747 * should unref the buffer after usage.
748 */
749 GstBuffer *
750 gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data)
751 {
752 GstBuffer *result;
754 g_return_val_if_fail (pads != NULL, NULL);
755 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
756 g_return_val_if_fail (data != NULL, NULL);
758 if ((result = data->buffer))
759 gst_buffer_ref (result);
761 GST_DEBUG ("Peeking at pad %s:%s: buffer=%p",
762 GST_DEBUG_PAD_NAME (data->pad), result);
764 return result;
765 }
767 /**
768 * gst_collect_pads_pop:
769 * @pads: the collectspads to pop
770 * @data: the data to use
771 *
772 * Pop the buffer currently queued in @data. This function
773 * should be called with the @pads LOCK held, such as in the callback
774 * handler.
775 *
776 * Free-function: gst_buffer_unref
777 *
778 * MT safe.
779 *
780 * Returns: (transfer full): The buffer in @data or NULL if no buffer was
781 * queued. You should unref the buffer after usage.
782 */
783 GstBuffer *
784 gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data)
785 {
786 GstBuffer *result;
788 g_return_val_if_fail (pads != NULL, NULL);
789 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
790 g_return_val_if_fail (data != NULL, NULL);
792 if ((result = data->buffer)) {
793 data->buffer = NULL;
794 data->pos = 0;
795 /* one less pad with queued data now */
796 pads->queuedpads--;
797 }
799 GST_COLLECT_PADS_BROADCAST (pads);
801 GST_DEBUG ("Pop buffer on pad %s:%s: buffer=%p",
802 GST_DEBUG_PAD_NAME (data->pad), result);
804 return result;
805 }
807 /* pop and unref the currently queued buffer, should e called with the LOCK
808 * helt. */
809 static void
810 gst_collect_pads_clear (GstCollectPads * pads, GstCollectData * data)
811 {
812 GstBuffer *buf;
814 if ((buf = gst_collect_pads_pop (pads, data)))
815 gst_buffer_unref (buf);
816 }
818 /**
819 * gst_collect_pads_available:
820 * @pads: the collectspads to query
821 *
822 * Query how much bytes can be read from each queued buffer. This means
823 * that the result of this call is the maximum number of bytes that can
824 * be read from each of the pads.
825 *
826 * This function should be called with @pads LOCK held, such as
827 * in the callback.
828 *
829 * MT safe.
830 *
831 * Returns: The maximum number of bytes queued on all pads. This function
832 * returns 0 if a pad has no queued buffer.
833 */
834 /* FIXME, we can do this in the _chain functions */
835 guint
836 gst_collect_pads_available (GstCollectPads * pads)
837 {
838 GSList *collected;
839 guint result = G_MAXUINT;
841 g_return_val_if_fail (pads != NULL, 0);
842 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
844 for (collected = pads->data; collected; collected = g_slist_next (collected)) {
845 GstCollectData *pdata;
846 GstBuffer *buffer;
847 gint size;
849 pdata = (GstCollectData *) collected->data;
851 /* ignore pad with EOS */
852 if (G_UNLIKELY (pdata->abidata.ABI.eos)) {
853 GST_DEBUG ("pad %s:%s is EOS", GST_DEBUG_PAD_NAME (pdata->pad));
854 continue;
855 }
857 /* an empty buffer without EOS is weird when we get here.. */
858 if (G_UNLIKELY ((buffer = pdata->buffer) == NULL)) {
859 GST_WARNING ("pad %s:%s has no buffer", GST_DEBUG_PAD_NAME (pdata->pad));
860 goto not_filled;
861 }
863 /* this is the size left of the buffer */
864 size = GST_BUFFER_SIZE (buffer) - pdata->pos;
865 GST_DEBUG ("pad %s:%s has %d bytes left",
866 GST_DEBUG_PAD_NAME (pdata->pad), size);
868 /* need to return the min of all available data */
869 if (size < result)
870 result = size;
871 }
872 /* nothing changed, all must be EOS then, return 0 */
873 if (G_UNLIKELY (result == G_MAXUINT))
874 result = 0;
876 return result;
878 not_filled:
879 {
880 return 0;
881 }
882 }
884 /**
885 * gst_collect_pads_read:
886 * @pads: the collectspads to query
887 * @data: the data to use
888 * @bytes: (out) (transfer none) (array length=size): a pointer to a byte array
889 * @size: the number of bytes to read
890 *
891 * Get a pointer in @bytes where @size bytes can be read from the
892 * given pad @data.
893 *
894 * This function should be called with @pads LOCK held, such as
895 * in the callback.
896 *
897 * MT safe.
898 *
899 * Returns: The number of bytes available for consumption in the
900 * memory pointed to by @bytes. This can be less than @size and
901 * is 0 if the pad is end-of-stream.
902 */
903 guint
904 gst_collect_pads_read (GstCollectPads * pads, GstCollectData * data,
905 guint8 ** bytes, guint size)
906 {
907 guint readsize;
908 GstBuffer *buffer;
910 g_return_val_if_fail (pads != NULL, 0);
911 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
912 g_return_val_if_fail (data != NULL, 0);
913 g_return_val_if_fail (bytes != NULL, 0);
915 /* no buffer, must be EOS */
916 if ((buffer = data->buffer) == NULL)
917 return 0;
919 readsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
921 *bytes = GST_BUFFER_DATA (buffer) + data->pos;
923 return readsize;
924 }
926 /**
927 * gst_collect_pads_read_buffer:
928 * @pads: the collectspads to query
929 * @data: the data to use
930 * @size: the number of bytes to read
931 *
932 * Get a buffer of @size bytes from the given pad @data.
933 *
934 * This function should be called with @pads LOCK held, such as in the callback.
935 *
936 * Free-function: gst_buffer_unref
937 *
938 * Returns: (transfer full): a #GstBuffer. The size of the buffer can be less
939 * that requested. A return of NULL signals that the pad is end-of-stream.
940 * Unref the buffer with gst_buffer_unref() after use.
941 *
942 * MT safe.
943 *
944 * Since: 0.10.18
945 */
946 GstBuffer *
947 gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data,
948 guint size)
949 {
950 guint readsize, bufsize;
951 GstBuffer *buffer;
953 g_return_val_if_fail (pads != NULL, NULL);
954 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
955 g_return_val_if_fail (data != NULL, NULL);
957 /* no buffer, must be EOS */
958 if ((buffer = data->buffer) == NULL)
959 return NULL;
961 bufsize = GST_BUFFER_SIZE (buffer);
963 readsize = MIN (size, bufsize - data->pos);
965 if (data->pos == 0 && readsize == bufsize)
966 return gst_buffer_ref (buffer);
967 else
968 return gst_buffer_create_sub (buffer, data->pos, readsize);
969 }
971 /**
972 * gst_collect_pads_take_buffer:
973 * @pads: the collectspads to query
974 * @data: the data to use
975 * @size: the number of bytes to read
976 *
977 * Get a buffer of @size bytes from the given pad @data. Flushes the amount
978 * of read bytes.
979 *
980 * This function should be called with @pads LOCK held, such as in the callback.
981 *
982 * Free-function: gst_buffer_unref
983 *
984 * MT safe.
985 *
986 * Returns: (transfer full): a #GstBuffer. The size of the buffer can be less
987 * that requested. A return of NULL signals that the pad is end-of-stream.
988 * Unref the buffer after use.
989 *
990 * Since: 0.10.18
991 */
992 GstBuffer *
993 gst_collect_pads_take_buffer (GstCollectPads * pads, GstCollectData * data,
994 guint size)
995 {
996 GstBuffer *buffer = gst_collect_pads_read_buffer (pads, data, size);
998 if (buffer) {
999 gst_collect_pads_flush (pads, data, GST_BUFFER_SIZE (buffer));
1000 }
1001 return buffer;
1002 }
1004 /**
1005 * gst_collect_pads_flush:
1006 * @pads: the collectspads to query
1007 * @data: the data to use
1008 * @size: the number of bytes to flush
1009 *
1010 * Flush @size bytes from the pad @data.
1011 *
1012 * This function should be called with @pads LOCK held, such as
1013 * in the callback.
1014 *
1015 * MT safe.
1016 *
1017 * Returns: The number of bytes flushed. This can be less than @size and
1018 * is 0 if the pad was end-of-stream.
1019 */
1020 guint
1021 gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
1022 guint size)
1023 {
1024 guint flushsize;
1025 GstBuffer *buffer;
1027 g_return_val_if_fail (pads != NULL, 0);
1028 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
1029 g_return_val_if_fail (data != NULL, 0);
1031 /* no buffer, must be EOS */
1032 if ((buffer = data->buffer) == NULL)
1033 return 0;
1035 /* this is what we can flush at max */
1036 flushsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
1038 data->pos += size;
1040 GST_LOG_OBJECT (pads, "Flushing %d bytes, requested %u", flushsize, size);
1042 if (data->pos >= GST_BUFFER_SIZE (buffer))
1043 /* _clear will also reset data->pos to 0 */
1044 gst_collect_pads_clear (pads, data);
1046 return flushsize;
1047 }
1049 /* see if pads were added or removed and update our stats. Any pad
1050 * added after releasing the PAD_LOCK will get collected in the next
1051 * round.
1052 *
1053 * We can do a quick check by checking the cookies, that get changed
1054 * whenever the pad list is updated.
1055 *
1056 * Must be called with LOCK.
1057 */
1058 static void
1059 gst_collect_pads_check_pads_unlocked (GstCollectPads * pads)
1060 {
1061 GST_DEBUG ("stored cookie : %d, used_cookie:%d",
1062 pads->abidata.ABI.pad_cookie, pads->cookie);
1063 if (G_UNLIKELY (pads->abidata.ABI.pad_cookie != pads->cookie)) {
1064 GSList *collected;
1066 /* clear list and stats */
1067 g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
1068 g_slist_free (pads->data);
1069 pads->data = NULL;
1070 pads->numpads = 0;
1071 pads->queuedpads = 0;
1072 pads->eospads = 0;
1074 /* loop over the master pad list */
1075 collected = pads->abidata.ABI.pad_list;
1076 for (; collected; collected = g_slist_next (collected)) {
1077 GstCollectData *data;
1079 /* update the stats */
1080 pads->numpads++;
1081 data = collected->data;
1083 if (G_LIKELY (!data->abidata.ABI.flushing)) {
1084 if (data->buffer)
1085 pads->queuedpads++;
1086 if (data->abidata.ABI.eos)
1087 pads->eospads++;
1088 }
1090 /* add to the list of pads to collect */
1091 ref_data (data);
1092 pads->data = g_slist_prepend (pads->data, data);
1093 }
1094 /* and update the cookie */
1095 pads->cookie = pads->abidata.ABI.pad_cookie;
1096 }
1097 }
1099 static inline void
1100 gst_collect_pads_check_pads (GstCollectPads * pads)
1101 {
1102 /* the master list and cookie are protected with the PAD_LOCK */
1103 GST_COLLECT_PADS_PAD_LOCK (pads);
1104 gst_collect_pads_check_pads_unlocked (pads);
1105 GST_COLLECT_PADS_PAD_UNLOCK (pads);
1106 }
1108 /* checks if all the pads are collected and call the collectfunction
1109 *
1110 * Should be called with LOCK.
1111 *
1112 * Returns: The #GstFlowReturn of collection.
1113 */
1114 static GstFlowReturn
1115 gst_collect_pads_check_collected (GstCollectPads * pads)
1116 {
1117 GstFlowReturn flow_ret = GST_FLOW_OK;
1119 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
1120 g_return_val_if_fail (pads->func != NULL, GST_FLOW_NOT_SUPPORTED);
1122 /* check for new pads, update stats etc.. */
1123 gst_collect_pads_check_pads (pads);
1125 if (G_UNLIKELY (pads->eospads == pads->numpads)) {
1126 /* If all our pads are EOS just collect once to let the element
1127 * do its final EOS handling. */
1128 GST_DEBUG ("All active pads (%d) are EOS, calling %s",
1129 pads->numpads, GST_DEBUG_FUNCPTR_NAME (pads->func));
1130 flow_ret = pads->func (pads, pads->user_data);
1131 } else {
1132 gboolean collected = FALSE;
1134 /* We call the collected function as long as our condition matches.
1135 * FIXME: should we error out if the collect function did not pop anything ?
1136 * we can get a busy loop here if the element does not pop from the collect
1137 * function
1138 */
1139 while (((pads->queuedpads + pads->eospads) >= pads->numpads)) {
1140 GST_DEBUG ("All active pads (%d + %d >= %d) have data, calling %s",
1141 pads->queuedpads, pads->eospads, pads->numpads,
1142 GST_DEBUG_FUNCPTR_NAME (pads->func));
1143 flow_ret = pads->func (pads, pads->user_data);
1144 collected = TRUE;
1146 /* break on error */
1147 if (flow_ret != GST_FLOW_OK)
1148 break;
1149 /* Don't keep looping after telling the element EOS or flushing */
1150 if (pads->queuedpads == 0)
1151 break;
1152 }
1153 if (!collected)
1154 GST_DEBUG ("Not all active pads (%d) have data, continuing",
1155 pads->numpads);
1156 }
1157 return flow_ret;
1158 }
1160 static gboolean
1161 gst_collect_pads_event (GstPad * pad, GstEvent * event)
1162 {
1163 gboolean res;
1164 GstCollectData *data;
1165 GstCollectPads *pads;
1167 /* some magic to get the managing collect_pads */
1168 GST_OBJECT_LOCK (pad);
1169 data = (GstCollectData *) gst_pad_get_element_private (pad);
1170 if (G_UNLIKELY (data == NULL))
1171 goto pad_removed;
1172 ref_data (data);
1173 GST_OBJECT_UNLOCK (pad);
1175 res = TRUE;
1177 pads = data->collect;
1179 GST_DEBUG ("Got %s event on pad %s:%s", GST_EVENT_TYPE_NAME (event),
1180 GST_DEBUG_PAD_NAME (data->pad));
1182 switch (GST_EVENT_TYPE (event)) {
1183 case GST_EVENT_FLUSH_START:
1184 {
1185 /* forward event to unblock check_collected */
1186 gst_pad_event_default (pad, event);
1188 /* now unblock the chain function.
1189 * no cond per pad, so they all unblock,
1190 * non-flushing block again */
1191 GST_OBJECT_LOCK (pads);
1192 data->abidata.ABI.flushing = TRUE;
1193 gst_collect_pads_clear (pads, data);
1194 GST_OBJECT_UNLOCK (pads);
1196 /* event already cleaned up by forwarding */
1197 goto done;
1198 }
1199 case GST_EVENT_FLUSH_STOP:
1200 {
1201 /* flush the 1 buffer queue */
1202 GST_OBJECT_LOCK (pads);
1203 data->abidata.ABI.flushing = FALSE;
1204 gst_collect_pads_clear (pads, data);
1205 /* we need new segment info after the flush */
1206 gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
1207 data->abidata.ABI.new_segment = FALSE;
1208 /* if the pad was EOS, remove the EOS flag and
1209 * decrement the number of eospads */
1210 if (G_UNLIKELY (data->abidata.ABI.eos == TRUE)) {
1211 pads->eospads--;
1212 data->abidata.ABI.eos = FALSE;
1213 }
1215 if (!gst_collect_pads_is_flushing (pads)) {
1216 /* forward event if all pads are no longer flushing */
1217 GST_DEBUG ("No more pads are flushing, forwarding FLUSH_STOP");
1218 GST_OBJECT_UNLOCK (pads);
1219 goto forward;
1220 }
1221 gst_event_unref (event);
1222 GST_OBJECT_UNLOCK (pads);
1223 goto done;
1224 }
1225 case GST_EVENT_EOS:
1226 {
1227 GST_OBJECT_LOCK (pads);
1228 /* if the pad was not EOS, make it EOS and so we
1229 * have one more eospad */
1230 if (G_LIKELY (data->abidata.ABI.eos == FALSE)) {
1231 data->abidata.ABI.eos = TRUE;
1232 pads->eospads++;
1233 }
1234 /* check if we need collecting anything, we ignore the
1235 * result. */
1236 gst_collect_pads_check_collected (pads);
1237 GST_OBJECT_UNLOCK (pads);
1239 /* We eat this event, element should do something
1240 * in the collected callback. */
1241 gst_event_unref (event);
1242 goto done;
1243 }
1244 case GST_EVENT_NEWSEGMENT:
1245 {
1246 gint64 start, stop, time;
1247 gdouble rate, arate;
1248 GstFormat format;
1249 gboolean update;
1251 gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1252 &start, &stop, &time);
1254 GST_DEBUG_OBJECT (data->pad, "got newsegment, start %" GST_TIME_FORMAT
1255 ", stop %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
1256 GST_TIME_ARGS (stop));
1258 gst_segment_set_newsegment_full (&data->segment, update, rate, arate,
1259 format, start, stop, time);
1261 data->abidata.ABI.new_segment = TRUE;
1263 /* we must not forward this event since multiple segments will be
1264 * accumulated and this is certainly not what we want. */
1265 gst_event_unref (event);
1266 /* FIXME: collect-pads based elements need to create their own newsegment
1267 * event (and only one really)
1268 * (a) make the segment part of the GstCollectData structure of each pad,
1269 * so you can just check that once you have a buffer queued on that pad,
1270 * (b) you can override a pad's event function with your own,
1271 * catch the newsegment event and then pass it on to the original
1272 * gstcollectpads event function
1273 * (that's what avimux does for something IIRC)
1274 * see #340060
1275 */
1276 goto done;
1277 }
1278 default:
1279 /* forward other events */
1280 goto forward;
1281 }
1283 forward:
1284 GST_DEBUG_OBJECT (pads, "forward unhandled event: %s",
1285 GST_EVENT_TYPE_NAME (event));
1286 res = gst_pad_event_default (pad, event);
1288 done:
1289 unref_data (data);
1290 return res;
1292 /* ERRORS */
1293 pad_removed:
1294 {
1295 GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1296 GST_OBJECT_UNLOCK (pad);
1297 return FALSE;
1298 }
1299 }
1301 /* For each buffer we receive we check if our collected condition is reached
1302 * and if so we call the collected function. When this is done we check if
1303 * data has been unqueued. If data is still queued we wait holding the stream
1304 * lock to make sure no EOS event can happen while we are ready to be
1305 * collected
1306 */
1307 static GstFlowReturn
1308 gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
1309 {
1310 GstCollectData *data;
1311 GstCollectPads *pads;
1312 GstCollectPadsPrivate *priv;
1313 GstFlowReturn ret;
1315 GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1317 /* some magic to get the managing collect_pads */
1318 GST_OBJECT_LOCK (pad);
1319 data = (GstCollectData *) gst_pad_get_element_private (pad);
1320 if (G_UNLIKELY (data == NULL))
1321 goto no_data;
1322 ref_data (data);
1323 GST_OBJECT_UNLOCK (pad);
1325 pads = data->collect;
1326 priv = pads->abidata.ABI.priv;
1328 GST_OBJECT_LOCK (pads);
1329 /* if not started, bail out */
1330 if (G_UNLIKELY (!pads->started))
1331 goto not_started;
1332 /* check if this pad is flushing */
1333 if (G_UNLIKELY (data->abidata.ABI.flushing))
1334 goto flushing;
1335 /* pad was EOS, we can refuse this data */
1336 if (G_UNLIKELY (data->abidata.ABI.eos))
1337 goto unexpected;
1339 /* see if we need to clip */
1340 if (priv->clipfunc) {
1341 buffer = priv->clipfunc (pads, data, buffer, priv->clipfunc_user_data);
1343 if (G_UNLIKELY (buffer == NULL))
1344 goto clipped;
1345 }
1347 GST_DEBUG ("Queuing buffer %p for pad %s:%s", buffer,
1348 GST_DEBUG_PAD_NAME (pad));
1350 /* One more pad has data queued */
1351 pads->queuedpads++;
1352 /* take ownership of the buffer */
1353 if (data->buffer)
1354 gst_buffer_unref (data->buffer);
1355 data->buffer = buffer;
1356 buffer = NULL;
1358 /* update segment last position if in TIME */
1359 if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) {
1360 GstClockTime timestamp = GST_BUFFER_TIMESTAMP (data->buffer);
1362 if (GST_CLOCK_TIME_IS_VALID (timestamp))
1363 gst_segment_set_last_stop (&data->segment, GST_FORMAT_TIME, timestamp);
1364 }
1366 /* While we have data queued on this pad try to collect stuff */
1367 do {
1368 GST_DEBUG ("Pad %s:%s checking", GST_DEBUG_PAD_NAME (pad));
1369 /* Check if our collected condition is matched and call the collected function
1370 * if it is */
1371 ret = gst_collect_pads_check_collected (pads);
1372 /* when an error occurs, we want to report this back to the caller ASAP
1373 * without having to block if the buffer was not popped */
1374 if (G_UNLIKELY (ret != GST_FLOW_OK))
1375 goto error;
1377 /* data was consumed, we can exit and accept new data */
1378 if (data->buffer == NULL)
1379 break;
1381 /* Check if we got removed in the mean time, FIXME, this is racy.
1382 * Between this check and the _WAIT, the pad could be removed which will
1383 * makes us hang in the _WAIT. */
1384 GST_OBJECT_LOCK (pad);
1385 if (G_UNLIKELY (gst_pad_get_element_private (pad) == NULL))
1386 goto pad_removed;
1387 GST_OBJECT_UNLOCK (pad);
1389 GST_DEBUG ("Pad %s:%s has a buffer queued, waiting",
1390 GST_DEBUG_PAD_NAME (pad));
1392 /* wait to be collected, this must happen from another thread triggered
1393 * by the _chain function of another pad. We release the lock so we
1394 * can get stopped or flushed as well. We can however not get EOS
1395 * because we still hold the STREAM_LOCK.
1396 */
1397 GST_COLLECT_PADS_WAIT (pads);
1399 GST_DEBUG ("Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad));
1401 /* after a signal, we could be stopped */
1402 if (G_UNLIKELY (!pads->started))
1403 goto not_started;
1404 /* check if this pad is flushing */
1405 if (G_UNLIKELY (data->abidata.ABI.flushing))
1406 goto flushing;
1407 }
1408 while (data->buffer != NULL);
1410 unlock_done:
1411 GST_DEBUG ("Pad %s:%s done", GST_DEBUG_PAD_NAME (pad));
1412 GST_OBJECT_UNLOCK (pads);
1413 unref_data (data);
1414 if (buffer)
1415 gst_buffer_unref (buffer);
1416 return ret;
1418 pad_removed:
1419 {
1420 GST_WARNING ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1421 GST_OBJECT_UNLOCK (pad);
1422 ret = GST_FLOW_NOT_LINKED;
1423 goto unlock_done;
1424 }
1425 /* ERRORS */
1426 no_data:
1427 {
1428 GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1429 GST_OBJECT_UNLOCK (pad);
1430 gst_buffer_unref (buffer);
1431 return GST_FLOW_NOT_LINKED;
1432 }
1433 not_started:
1434 {
1435 GST_DEBUG ("not started");
1436 gst_collect_pads_clear (pads, data);
1437 ret = GST_FLOW_WRONG_STATE;
1438 goto unlock_done;
1439 }
1440 flushing:
1441 {
1442 GST_DEBUG ("pad %s:%s is flushing", GST_DEBUG_PAD_NAME (pad));
1443 gst_collect_pads_clear (pads, data);
1444 ret = GST_FLOW_WRONG_STATE;
1445 goto unlock_done;
1446 }
1447 unexpected:
1448 {
1449 /* we should not post an error for this, just inform upstream that
1450 * we don't expect anything anymore */
1451 GST_DEBUG ("pad %s:%s is eos", GST_DEBUG_PAD_NAME (pad));
1452 ret = GST_FLOW_UNEXPECTED;
1453 goto unlock_done;
1454 }
1455 clipped:
1456 {
1457 GST_DEBUG ("clipped buffer on pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1458 ret = GST_FLOW_OK;
1459 goto unlock_done;
1460 }
1461 error:
1462 {
1463 /* we print the error, the element should post a reasonable error
1464 * message for fatal errors */
1465 GST_DEBUG ("collect failed, reason %d (%s)", ret, gst_flow_get_name (ret));
1466 gst_collect_pads_clear (pads, data);
1467 goto unlock_done;
1468 }
1469 }