]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - glsdk/gstreamer0-10.git/blob - libs/gst/base/gstcollectpads.c
004508d3216bee415d40764fa19e0d836f811462
[glsdk/gstreamer0-10.git] / libs / gst / base / gstcollectpads.c
1 /* GStreamer
2  * Copyright (C) 2005 Wim Taymans <wim@fluendo.com>
3  *
4  * gstcollectpads.c:
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21 /**
22  * SECTION:gstcollectpads
23  * @short_description: manages a set of pads that operate in collect mode
24  * @see_also:
25  *
26  * Manages a set of pads that operate in collect mode. This means that control
27  * is given to the manager of this object when all pads have data.
28  * <itemizedlist>
29  *   <listitem><para>
30  *     Collectpads are created with gst_collect_pads_new(). A callback should then
31  *     be installed with gst_collect_pads_set_function ().
32  *   </para></listitem>
33  *   <listitem><para>
34  *     Pads are added to the collection with gst_collect_pads_add_pad()/
35  *     gst_collect_pads_remove_pad(). The pad
36  *     has to be a sinkpad. The chain and event functions of the pad are
37  *     overridden. The element_private of the pad is used to store
38  *     private information for the collectpads.
39  *   </para></listitem>
40  *   <listitem><para>
41  *     For each pad, data is queued in the _chain function or by
42  *     performing a pull_range.
43  *   </para></listitem>
44  *   <listitem><para>
45  *     When data is queued on all pads, the callback function is called.
46  *   </para></listitem>
47  *   <listitem><para>
48  *     Data can be dequeued from the pad with the gst_collect_pads_pop() method.
49  *     One can peek at the data with the gst_collect_pads_peek() function.
50  *     These functions will return NULL if the pad received an EOS event. When all
51  *     pads return NULL from a gst_collect_pads_peek(), the element can emit an EOS
52  *     event itself.
53  *   </para></listitem>
54  *   <listitem><para>
55  *     Data can also be dequeued in byte units using the gst_collect_pads_available(),
56  *     gst_collect_pads_read() and gst_collect_pads_flush() calls.
57  *   </para></listitem>
58  *   <listitem><para>
59  *     Elements should call gst_collect_pads_start() and gst_collect_pads_stop() in
60  *     their state change functions to start and stop the processing of the collecpads.
61  *     The gst_collect_pads_stop() call should be called before calling the parent
62  *     element state change function in the PAUSED_TO_READY state change to ensure
63  *     no pad is blocked and the element can finish streaming.
64  *   </para></listitem>
65  *   <listitem><para>
66  *     gst_collect_pads_collect() and gst_collect_pads_collect_range() can be used by
67  *     elements that start a #GstTask to drive the collect_pads. This feature is however
68  *     not yet implemented.
69  *   </para></listitem>
70  * </itemizedlist>
71  *
72  * Last reviewed on 2006-05-10 (0.10.6)
73  */
75 #include "gstcollectpads.h"
77 GST_DEBUG_CATEGORY_STATIC (collect_pads_debug);
78 #define GST_CAT_DEFAULT collect_pads_debug
80 #define GST_COLLECT_PADS_GET_PRIVATE(obj)  \
81   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_COLLECT_PADS, GstCollectPadsPrivate))
83 struct _GstCollectPadsPrivate
84 {
85   GstCollectPadsClipFunction clipfunc;
86   gpointer clipfunc_user_data;
87 };
89 GST_BOILERPLATE (GstCollectPads, gst_collect_pads, GstObject, GST_TYPE_OBJECT);
91 static void gst_collect_pads_clear (GstCollectPads * pads,
92     GstCollectData * data);
93 static GstFlowReturn gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer);
94 static gboolean gst_collect_pads_event (GstPad * pad, GstEvent * event);
95 static void gst_collect_pads_finalize (GObject * object);
96 static void ref_data (GstCollectData * data);
97 static void unref_data (GstCollectData * data);
98 static void gst_collect_pads_check_pads_unlocked (GstCollectPads * pads);
100 static void
101 gst_collect_pads_base_init (gpointer g_class)
103   /* Do nothing here */
106 static void
107 gst_collect_pads_class_init (GstCollectPadsClass * klass)
109   GObjectClass *gobject_class = (GObjectClass *) klass;
111   g_type_class_add_private (klass, sizeof (GstCollectPadsPrivate));
113   GST_DEBUG_CATEGORY_INIT (collect_pads_debug, "collectpads", 0,
114       "GstCollectPads");
116   gobject_class->finalize = gst_collect_pads_finalize;
119 static void
120 gst_collect_pads_init (GstCollectPads * pads, GstCollectPadsClass * g_class)
122   pads->abidata.ABI.priv = GST_COLLECT_PADS_GET_PRIVATE (pads);
124   pads->cond = g_cond_new ();
125   pads->data = NULL;
126   pads->cookie = 0;
127   pads->numpads = 0;
128   pads->queuedpads = 0;
129   pads->eospads = 0;
130   pads->started = FALSE;
132   /* members to manage the pad list */
133   pads->abidata.ABI.pad_lock = g_mutex_new ();
134   pads->abidata.ABI.pad_cookie = 0;
135   pads->abidata.ABI.pad_list = NULL;
138 static void
139 gst_collect_pads_finalize (GObject * object)
141   GSList *collected;
142   GstCollectPads *pads = GST_COLLECT_PADS (object);
144   GST_DEBUG ("finalize");
146   g_cond_free (pads->cond);
147   g_mutex_free (pads->abidata.ABI.pad_lock);
149   /* Remove pads */
150   collected = pads->abidata.ABI.pad_list;
151   for (; collected; collected = g_slist_next (collected)) {
152     GstCollectData *pdata = (GstCollectData *) collected->data;
154     unref_data (pdata);
155   }
156   /* Free pads list */
157   g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
158   g_slist_free (pads->data);
159   g_slist_free (pads->abidata.ABI.pad_list);
161   G_OBJECT_CLASS (parent_class)->finalize (object);
164 /**
165  * gst_collect_pads_new:
166  *
167  * Create a new instance of #GstCollectPads.
168  *
169  * MT safe.
170  *
171  * Returns: (transfer full): a new #GstCollectPads, or NULL in case of an error.
172  */
173 GstCollectPads *
174 gst_collect_pads_new (void)
176   GstCollectPads *newcoll;
178   newcoll = g_object_newv (GST_TYPE_COLLECT_PADS, 0, NULL);
180   return newcoll;
183 /**
184  * gst_collect_pads_set_function:
185  * @pads: the collectspads to use
186  * @func: the function to set
187  * @user_data: (closure): user data passed to the function
188  *
189  * Set the callback function and user data that will be called when
190  * all the pads added to the collection have buffers queued.
191  *
192  * MT safe.
193  */
194 void
195 gst_collect_pads_set_function (GstCollectPads * pads,
196     GstCollectPadsFunction func, gpointer user_data)
198   g_return_if_fail (pads != NULL);
199   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
201   GST_OBJECT_LOCK (pads);
202   pads->func = func;
203   pads->user_data = user_data;
204   GST_OBJECT_UNLOCK (pads);
207 static void
208 ref_data (GstCollectData * data)
210   g_assert (data != NULL);
212   g_atomic_int_inc (&(data->abidata.ABI.refcount));
215 static void
216 unref_data (GstCollectData * data)
218   GstCollectDataDestroyNotify destroy_notify;
220   g_assert (data != NULL);
221   g_assert (data->abidata.ABI.refcount > 0);
223   if (!g_atomic_int_dec_and_test (&(data->abidata.ABI.refcount)))
224     return;
226   /* FIXME: Ugly hack as we can't add more fields to GstCollectData */
227   destroy_notify = (GstCollectDataDestroyNotify)
228       g_object_get_data (G_OBJECT (data->pad),
229       "gst-collect-data-destroy-notify");
231   if (destroy_notify)
232     destroy_notify (data);
234   g_object_unref (data->pad);
235   if (data->buffer) {
236     gst_buffer_unref (data->buffer);
237   }
238   g_free (data);
241 /**
242  * gst_collect_pads_add_pad:
243  * @pads: the collectspads to use
244  * @pad: (transfer none): the pad to add
245  * @size: the size of the returned #GstCollectData structure
246  *
247  * Add a pad to the collection of collect pads. The pad has to be
248  * a sinkpad. The refcount of the pad is incremented. Use
249  * gst_collect_pads_remove_pad() to remove the pad from the collection
250  * again.
251  *
252  * This function will override the chain and event functions of the pad
253  * along with the element_private data, which is used to store private
254  * information for the collectpads.
255  *
256  * You specify a size for the returned #GstCollectData structure
257  * so that you can use it to store additional information.
258  *
259  * The pad will be automatically activated in push mode when @pads is
260  * started.
261  *
262  * This function calls gst_collect_pads_add_pad_full() passing a value of NULL
263  * for destroy_notify.
264  *
265  * MT safe.
266  *
267  * Returns: a new #GstCollectData to identify the new pad. Or NULL
268  *   if wrong parameters are supplied.
269  */
270 GstCollectData *
271 gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size)
273   return gst_collect_pads_add_pad_full (pads, pad, size, NULL);
276 /**
277  * gst_collect_pads_add_pad_full:
278  * @pads: the collectspads to use
279  * @pad: (transfer none): the pad to add
280  * @size: the size of the returned #GstCollectData structure
281  * @destroy_notify: function to be called before the returned #GstCollectData
282  * structure is freed
283  *
284  * Add a pad to the collection of collect pads. The pad has to be
285  * a sinkpad. The refcount of the pad is incremented. Use
286  * gst_collect_pads_remove_pad() to remove the pad from the collection
287  * again.
288  *
289  * You specify a size for the returned #GstCollectData structure
290  * so that you can use it to store additional information.
291  *
292  * You can also specify a #GstCollectDataDestroyNotify that will be called
293  * just before the #GstCollectData structure is freed. It is passed the
294  * pointer to the structure and should free any custom memory and resources
295  * allocated for it.
296  *
297  * The pad will be automatically activated in push mode when @pads is
298  * started.
299  *
300  * MT safe.
301  *
302  * Since: 0.10.12
303  *
304  * Returns: a new #GstCollectData to identify the new pad. Or NULL
305  *   if wrong parameters are supplied.
306  */
307 GstCollectData *
308 gst_collect_pads_add_pad_full (GstCollectPads * pads, GstPad * pad, guint size,
309     GstCollectDataDestroyNotify destroy_notify)
311   GstCollectData *data;
313   g_return_val_if_fail (pads != NULL, NULL);
314   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
315   g_return_val_if_fail (pad != NULL, NULL);
316   g_return_val_if_fail (GST_PAD_IS_SINK (pad), NULL);
317   g_return_val_if_fail (size >= sizeof (GstCollectData), NULL);
319   GST_DEBUG ("adding pad %s:%s", GST_DEBUG_PAD_NAME (pad));
321   data = g_malloc0 (size);
322   data->collect = pads;
323   data->pad = gst_object_ref (pad);
324   data->buffer = NULL;
325   data->pos = 0;
326   gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
327   data->abidata.ABI.flushing = FALSE;
328   data->abidata.ABI.new_segment = FALSE;
329   data->abidata.ABI.eos = FALSE;
330   data->abidata.ABI.refcount = 1;
332   /* FIXME: Ugly hack as we can't add more fields to GstCollectData */
333   g_object_set_data (G_OBJECT (pad), "gst-collect-data-destroy-notify",
334       (void *) destroy_notify);
336   GST_COLLECT_PADS_PAD_LOCK (pads);
337   GST_OBJECT_LOCK (pad);
338   gst_pad_set_element_private (pad, data);
339   GST_OBJECT_UNLOCK (pad);
340   pads->abidata.ABI.pad_list =
341       g_slist_append (pads->abidata.ABI.pad_list, data);
342   gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_chain));
343   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_event));
344   /* activate the pad when needed */
345   if (pads->started)
346     gst_pad_set_active (pad, TRUE);
347   pads->abidata.ABI.pad_cookie++;
348   GST_COLLECT_PADS_PAD_UNLOCK (pads);
350   return data;
353 static gint
354 find_pad (GstCollectData * data, GstPad * pad)
356   if (data->pad == pad)
357     return 0;
358   return 1;
361 /**
362  * gst_collect_pads_set_clip_function:
363  * @pads: the collectspads to use
364  * @clipfunc: clip function to install
365  * @user_data: (closure): user data to pass to @clip_func
366  *
367  * Install a clipping function that is called right after a buffer is received
368  * on a pad managed by @pads. See #GstCollectPadsClipFunction for more info.
369  *
370  * Since: 0.10.26
371  */
372 void
373 gst_collect_pads_set_clip_function (GstCollectPads * pads,
374     GstCollectPadsClipFunction clipfunc, gpointer user_data)
376   GstCollectPadsPrivate *priv;
378   g_return_if_fail (pads != NULL);
379   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
381   priv = pads->abidata.ABI.priv;
383   priv->clipfunc = clipfunc;
384   priv->clipfunc_user_data = user_data;
387 /**
388  * gst_collect_pads_remove_pad:
389  * @pads: the collectspads to use
390  * @pad: (transfer none): the pad to remove
391  *
392  * Remove a pad from the collection of collect pads. This function will also
393  * free the #GstCollectData and all the resources that were allocated with
394  * gst_collect_pads_add_pad().
395  *
396  * The pad will be deactivated automatically when @pads is stopped.
397  *
398  * MT safe.
399  *
400  * Returns: %TRUE if the pad could be removed.
401  */
402 gboolean
403 gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad)
405   GstCollectData *data;
406   GSList *list;
408   g_return_val_if_fail (pads != NULL, FALSE);
409   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
410   g_return_val_if_fail (pad != NULL, FALSE);
411   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
413   GST_DEBUG ("removing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
415   GST_COLLECT_PADS_PAD_LOCK (pads);
416   list =
417       g_slist_find_custom (pads->abidata.ABI.pad_list, pad,
418       (GCompareFunc) find_pad);
419   if (!list)
420     goto unknown_pad;
422   data = (GstCollectData *) list->data;
424   GST_DEBUG ("found pad %s:%s at %p", GST_DEBUG_PAD_NAME (pad), data);
426   /* clear the stuff we configured */
427   gst_pad_set_chain_function (pad, NULL);
428   gst_pad_set_event_function (pad, NULL);
429   GST_OBJECT_LOCK (pad);
430   gst_pad_set_element_private (pad, NULL);
431   GST_OBJECT_UNLOCK (pad);
433   /* backward compat, also remove from data if stopped, note that this function
434    * can only be called when we are stopped because we don't take the LOCK to
435    * protect the pads->data list. */
436   if (!pads->started) {
437     GSList *dlist;
439     dlist = g_slist_find_custom (pads->data, pad, (GCompareFunc) find_pad);
440     if (dlist) {
441       GstCollectData *pdata = dlist->data;
443       pads->data = g_slist_delete_link (pads->data, dlist);
444       unref_data (pdata);
445     }
446   }
447   /* remove from the pad list */
448   pads->abidata.ABI.pad_list =
449       g_slist_delete_link (pads->abidata.ABI.pad_list, list);
450   pads->abidata.ABI.pad_cookie++;
452   /* signal waiters because something changed */
453   GST_COLLECT_PADS_BROADCAST (pads);
455   /* deactivate the pad when needed */
456   if (!pads->started)
457     gst_pad_set_active (pad, FALSE);
459   /* clean and free the collect data */
460   unref_data (data);
462   GST_COLLECT_PADS_PAD_UNLOCK (pads);
464   return TRUE;
466 unknown_pad:
467   {
468     GST_WARNING ("cannot remove unknown pad %s:%s", GST_DEBUG_PAD_NAME (pad));
469     GST_COLLECT_PADS_PAD_UNLOCK (pads);
470     return FALSE;
471   }
474 /**
475  * gst_collect_pads_is_active:
476  * @pads: (transfer none): the collectspads to use
477  * @pad: the pad to check
478  *
479  * Check if a pad is active.
480  *
481  * This function is currently not implemented.
482  *
483  * MT safe.
484  *
485  * Returns: %TRUE if the pad is active.
486  */
487 gboolean
488 gst_collect_pads_is_active (GstCollectPads * pads, GstPad * pad)
490   g_return_val_if_fail (pads != NULL, FALSE);
491   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
492   g_return_val_if_fail (pad != NULL, FALSE);
493   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
495   g_warning ("gst_collect_pads_is_active() is not implemented");
497   return FALSE;
500 /**
501  * gst_collect_pads_collect:
502  * @pads: the collectspads to use
503  *
504  * Collect data on all pads. This function is usually called
505  * from a #GstTask function in an element.
506  *
507  * This function is currently not implemented.
508  *
509  * MT safe.
510  *
511  * Returns: #GstFlowReturn of the operation.
512  */
513 GstFlowReturn
514 gst_collect_pads_collect (GstCollectPads * pads)
516   g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR);
517   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
519   g_warning ("gst_collect_pads_collect() is not implemented");
521   return GST_FLOW_NOT_SUPPORTED;
524 /**
525  * gst_collect_pads_collect_range:
526  * @pads: the collectspads to use
527  * @offset: the offset to collect
528  * @length: the length to collect
529  *
530  * Collect data with @offset and @length on all pads. This function
531  * is typically called in the getrange function of an element.
532  *
533  * This function is currently not implemented.
534  *
535  * MT safe.
536  *
537  * Returns: #GstFlowReturn of the operation.
538  */
539 GstFlowReturn
540 gst_collect_pads_collect_range (GstCollectPads * pads, guint64 offset,
541     guint length)
543   g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR);
544   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
546   g_warning ("gst_collect_pads_collect_range() is not implemented");
548   return GST_FLOW_NOT_SUPPORTED;
551 static gboolean
552 gst_collect_pads_is_flushing (GstCollectPads * pads)
554   GSList *walk = NULL;
555   gboolean res = TRUE;
557   GST_COLLECT_PADS_PAD_LOCK (pads);
559   /* Ensure pads->data state */
560   gst_collect_pads_check_pads_unlocked (pads);
562   GST_DEBUG ("Getting flushing state (pads:%p, pads->data:%p)",
563       pads, pads->data);
565   for (walk = pads->data; walk; walk = g_slist_next (walk)) {
566     GstCollectData *cdata = walk->data;
568     GST_DEBUG_OBJECT (cdata->pad, "flushing:%d", cdata->abidata.ABI.flushing);
570     if (cdata->abidata.ABI.flushing) {
571       goto done;
572     }
573   }
575   res = FALSE;
576 done:
577   GST_COLLECT_PADS_PAD_UNLOCK (pads);
578   return res;
581 /* FIXME, I think this function is used to work around bad behaviour
582  * of elements that add pads to themselves without activating them.
583  *
584  * Must be called with PAD_LOCK.
585  */
586 static void
587 gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads,
588     gboolean flushing)
590   GSList *walk = NULL;
592   GST_DEBUG ("Setting flushing (%d)", flushing);
594   /* Update the pads flushing flag */
595   for (walk = pads->data; walk; walk = g_slist_next (walk)) {
596     GstCollectData *cdata = walk->data;
598     if (GST_IS_PAD (cdata->pad)) {
599       GST_OBJECT_LOCK (cdata->pad);
600       if (flushing)
601         GST_PAD_SET_FLUSHING (cdata->pad);
602       else
603         GST_PAD_UNSET_FLUSHING (cdata->pad);
604       cdata->abidata.ABI.flushing = flushing;
605       gst_collect_pads_clear (pads, cdata);
606       GST_OBJECT_UNLOCK (cdata->pad);
607     }
608   }
609   /* Setting the pads to flushing means that we changed the values which
610    * are 'protected' by the cookie. We therefore update it to force a
611    * recalculation of the current pad status. */
612   pads->abidata.ABI.pad_cookie++;
615 /**
616  * gst_collect_pads_set_flushing:
617  * @pads: the collectspads to use
618  * @flushing: desired state of the pads
619  *
620  * Change the flushing state of all the pads in the collection. No pad
621  * is able to accept anymore data when @flushing is %TRUE. Calling this
622  * function with @flushing %FALSE makes @pads accept data again.
623  *
624  * MT safe.
625  *
626  * Since: 0.10.7.
627  */
628 void
629 gst_collect_pads_set_flushing (GstCollectPads * pads, gboolean flushing)
631   g_return_if_fail (pads != NULL);
632   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
634   GST_COLLECT_PADS_PAD_LOCK (pads);
635   /* Ensure pads->data state */
636   gst_collect_pads_check_pads_unlocked (pads);
637   gst_collect_pads_set_flushing_unlocked (pads, flushing);
638   GST_COLLECT_PADS_PAD_UNLOCK (pads);
641 /**
642  * gst_collect_pads_start:
643  * @pads: the collectspads to use
644  *
645  * Starts the processing of data in the collect_pads.
646  *
647  * MT safe.
648  */
649 void
650 gst_collect_pads_start (GstCollectPads * pads)
652   GSList *collected;
654   g_return_if_fail (pads != NULL);
655   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
657   GST_DEBUG_OBJECT (pads, "starting collect pads");
659   /* make sure stop and collect cannot be called anymore */
660   GST_OBJECT_LOCK (pads);
662   /* make pads streamable */
663   GST_COLLECT_PADS_PAD_LOCK (pads);
665   /* loop over the master pad list and reset the segment */
666   collected = pads->abidata.ABI.pad_list;
667   for (; collected; collected = g_slist_next (collected)) {
668     GstCollectData *data;
670     data = collected->data;
671     gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
672   }
674   gst_collect_pads_set_flushing_unlocked (pads, FALSE);
676   /* Start collect pads */
677   pads->started = TRUE;
678   GST_COLLECT_PADS_PAD_UNLOCK (pads);
679   GST_OBJECT_UNLOCK (pads);
682 /**
683  * gst_collect_pads_stop:
684  * @pads: the collectspads to use
685  *
686  * Stops the processing of data in the collect_pads. this function
687  * will also unblock any blocking operations.
688  *
689  * MT safe.
690  */
691 void
692 gst_collect_pads_stop (GstCollectPads * pads)
694   GSList *collected;
696   g_return_if_fail (pads != NULL);
697   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
699   GST_DEBUG_OBJECT (pads, "stopping collect pads");
701   /* make sure collect and start cannot be called anymore */
702   GST_OBJECT_LOCK (pads);
704   /* make pads not accept data anymore */
705   GST_COLLECT_PADS_PAD_LOCK (pads);
706   gst_collect_pads_set_flushing_unlocked (pads, TRUE);
708   /* Stop collect pads */
709   pads->started = FALSE;
710   pads->eospads = 0;
711   pads->queuedpads = 0;
713   /* loop over the master pad list and flush buffers */
714   collected = pads->abidata.ABI.pad_list;
715   for (; collected; collected = g_slist_next (collected)) {
716     GstCollectData *data;
717     GstBuffer **buffer_p;
719     data = collected->data;
720     if (data->buffer) {
721       buffer_p = &data->buffer;
722       gst_buffer_replace (buffer_p, NULL);
723       data->pos = 0;
724     }
725     data->abidata.ABI.eos = FALSE;
726   }
728   GST_COLLECT_PADS_PAD_UNLOCK (pads);
729   /* Wake them up so they can end the chain functions. */
730   GST_COLLECT_PADS_BROADCAST (pads);
732   GST_OBJECT_UNLOCK (pads);
735 /**
736  * gst_collect_pads_peek:
737  * @pads: the collectspads to peek
738  * @data: the data to use
739  *
740  * Peek at the buffer currently queued in @data. This function
741  * should be called with the @pads LOCK held, such as in the callback
742  * handler.
743  *
744  * MT safe.
745  *
746  * Returns: (transfer full): The buffer in @data or NULL if no buffer is queued.
747  *  should unref the buffer after usage.
748  */
749 GstBuffer *
750 gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data)
752   GstBuffer *result;
754   g_return_val_if_fail (pads != NULL, NULL);
755   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
756   g_return_val_if_fail (data != NULL, NULL);
758   if ((result = data->buffer))
759     gst_buffer_ref (result);
761   GST_DEBUG ("Peeking at pad %s:%s: buffer=%p",
762       GST_DEBUG_PAD_NAME (data->pad), result);
764   return result;
767 /**
768  * gst_collect_pads_pop:
769  * @pads: the collectspads to pop
770  * @data: the data to use
771  *
772  * Pop the buffer currently queued in @data. This function
773  * should be called with the @pads LOCK held, such as in the callback
774  * handler.
775  *
776  * Free-function: gst_buffer_unref
777  *
778  * MT safe.
779  *
780  * Returns: (transfer full): The buffer in @data or NULL if no buffer was
781  *   queued. You should unref the buffer after usage.
782  */
783 GstBuffer *
784 gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data)
786   GstBuffer *result;
788   g_return_val_if_fail (pads != NULL, NULL);
789   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
790   g_return_val_if_fail (data != NULL, NULL);
792   if ((result = data->buffer)) {
793     data->buffer = NULL;
794     data->pos = 0;
795     /* one less pad with queued data now */
796     pads->queuedpads--;
797   }
799   GST_COLLECT_PADS_BROADCAST (pads);
801   GST_DEBUG ("Pop buffer on pad %s:%s: buffer=%p",
802       GST_DEBUG_PAD_NAME (data->pad), result);
804   return result;
807 /* pop and unref the currently queued buffer, should e called with the LOCK
808  * helt. */
809 static void
810 gst_collect_pads_clear (GstCollectPads * pads, GstCollectData * data)
812   GstBuffer *buf;
814   if ((buf = gst_collect_pads_pop (pads, data)))
815     gst_buffer_unref (buf);
818 /**
819  * gst_collect_pads_available:
820  * @pads: the collectspads to query
821  *
822  * Query how much bytes can be read from each queued buffer. This means
823  * that the result of this call is the maximum number of bytes that can
824  * be read from each of the pads.
825  *
826  * This function should be called with @pads LOCK held, such as
827  * in the callback.
828  *
829  * MT safe.
830  *
831  * Returns: The maximum number of bytes queued on all pads. This function
832  * returns 0 if a pad has no queued buffer.
833  */
834 /* FIXME, we can do this in the _chain functions */
835 guint
836 gst_collect_pads_available (GstCollectPads * pads)
838   GSList *collected;
839   guint result = G_MAXUINT;
841   g_return_val_if_fail (pads != NULL, 0);
842   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
844   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
845     GstCollectData *pdata;
846     GstBuffer *buffer;
847     gint size;
849     pdata = (GstCollectData *) collected->data;
851     /* ignore pad with EOS */
852     if (G_UNLIKELY (pdata->abidata.ABI.eos)) {
853       GST_DEBUG ("pad %s:%s is EOS", GST_DEBUG_PAD_NAME (pdata->pad));
854       continue;
855     }
857     /* an empty buffer without EOS is weird when we get here.. */
858     if (G_UNLIKELY ((buffer = pdata->buffer) == NULL)) {
859       GST_WARNING ("pad %s:%s has no buffer", GST_DEBUG_PAD_NAME (pdata->pad));
860       goto not_filled;
861     }
863     /* this is the size left of the buffer */
864     size = GST_BUFFER_SIZE (buffer) - pdata->pos;
865     GST_DEBUG ("pad %s:%s has %d bytes left",
866         GST_DEBUG_PAD_NAME (pdata->pad), size);
868     /* need to return the min of all available data */
869     if (size < result)
870       result = size;
871   }
872   /* nothing changed, all must be EOS then, return 0 */
873   if (G_UNLIKELY (result == G_MAXUINT))
874     result = 0;
876   return result;
878 not_filled:
879   {
880     return 0;
881   }
884 /**
885  * gst_collect_pads_read:
886  * @pads: the collectspads to query
887  * @data: the data to use
888  * @bytes: (out) (transfer none) (array length=size): a pointer to a byte array
889  * @size: the number of bytes to read
890  *
891  * Get a pointer in @bytes where @size bytes can be read from the
892  * given pad @data.
893  *
894  * This function should be called with @pads LOCK held, such as
895  * in the callback.
896  *
897  * MT safe.
898  *
899  * Returns: The number of bytes available for consumption in the
900  * memory pointed to by @bytes. This can be less than @size and
901  * is 0 if the pad is end-of-stream.
902  */
903 guint
904 gst_collect_pads_read (GstCollectPads * pads, GstCollectData * data,
905     guint8 ** bytes, guint size)
907   guint readsize;
908   GstBuffer *buffer;
910   g_return_val_if_fail (pads != NULL, 0);
911   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
912   g_return_val_if_fail (data != NULL, 0);
913   g_return_val_if_fail (bytes != NULL, 0);
915   /* no buffer, must be EOS */
916   if ((buffer = data->buffer) == NULL)
917     return 0;
919   readsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
921   *bytes = GST_BUFFER_DATA (buffer) + data->pos;
923   return readsize;
926 /**
927  * gst_collect_pads_read_buffer:
928  * @pads: the collectspads to query
929  * @data: the data to use
930  * @size: the number of bytes to read
931  *
932  * Get a buffer of @size bytes from the given pad @data.
933  *
934  * This function should be called with @pads LOCK held, such as in the callback.
935  *
936  * Free-function: gst_buffer_unref
937  *
938  * Returns: (transfer full): a #GstBuffer. The size of the buffer can be less
939  *     that requested. A return of NULL signals that the pad is end-of-stream.
940  *     Unref the buffer with gst_buffer_unref() after use.
941  *
942  * MT safe.
943  *
944  * Since: 0.10.18
945  */
946 GstBuffer *
947 gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data,
948     guint size)
950   guint readsize, bufsize;
951   GstBuffer *buffer;
953   g_return_val_if_fail (pads != NULL, NULL);
954   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
955   g_return_val_if_fail (data != NULL, NULL);
957   /* no buffer, must be EOS */
958   if ((buffer = data->buffer) == NULL)
959     return NULL;
961   bufsize = GST_BUFFER_SIZE (buffer);
963   readsize = MIN (size, bufsize - data->pos);
965   if (data->pos == 0 && readsize == bufsize)
966     return gst_buffer_ref (buffer);
967   else
968     return gst_buffer_create_sub (buffer, data->pos, readsize);
971 /**
972  * gst_collect_pads_take_buffer:
973  * @pads: the collectspads to query
974  * @data: the data to use
975  * @size: the number of bytes to read
976  *
977  * Get a buffer of @size bytes from the given pad @data. Flushes the amount
978  * of read bytes.
979  *
980  * This function should be called with @pads LOCK held, such as in the callback.
981  *
982  * Free-function: gst_buffer_unref
983  *
984  * MT safe.
985  *
986  * Returns: (transfer full): a #GstBuffer. The size of the buffer can be less
987  *     that requested. A return of NULL signals that the pad is end-of-stream.
988  *     Unref the buffer after use.
989  *
990  * Since: 0.10.18
991  */
992 GstBuffer *
993 gst_collect_pads_take_buffer (GstCollectPads * pads, GstCollectData * data,
994     guint size)
996   GstBuffer *buffer = gst_collect_pads_read_buffer (pads, data, size);
998   if (buffer) {
999     gst_collect_pads_flush (pads, data, GST_BUFFER_SIZE (buffer));
1000   }
1001   return buffer;
1004 /**
1005  * gst_collect_pads_flush:
1006  * @pads: the collectspads to query
1007  * @data: the data to use
1008  * @size: the number of bytes to flush
1009  *
1010  * Flush @size bytes from the pad @data.
1011  *
1012  * This function should be called with @pads LOCK held, such as
1013  * in the callback.
1014  *
1015  * MT safe.
1016  *
1017  * Returns: The number of bytes flushed. This can be less than @size and
1018  * is 0 if the pad was end-of-stream.
1019  */
1020 guint
1021 gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
1022     guint size)
1024   guint flushsize;
1025   GstBuffer *buffer;
1027   g_return_val_if_fail (pads != NULL, 0);
1028   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
1029   g_return_val_if_fail (data != NULL, 0);
1031   /* no buffer, must be EOS */
1032   if ((buffer = data->buffer) == NULL)
1033     return 0;
1035   /* this is what we can flush at max */
1036   flushsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
1038   data->pos += size;
1040   GST_LOG_OBJECT (pads, "Flushing %d bytes, requested %u", flushsize, size);
1042   if (data->pos >= GST_BUFFER_SIZE (buffer))
1043     /* _clear will also reset data->pos to 0 */
1044     gst_collect_pads_clear (pads, data);
1046   return flushsize;
1049 /* see if pads were added or removed and update our stats. Any pad
1050  * added after releasing the PAD_LOCK will get collected in the next
1051  * round.
1052  *
1053  * We can do a quick check by checking the cookies, that get changed
1054  * whenever the pad list is updated.
1055  *
1056  * Must be called with LOCK.
1057  */
1058 static void
1059 gst_collect_pads_check_pads_unlocked (GstCollectPads * pads)
1061   GST_DEBUG ("stored cookie : %d, used_cookie:%d",
1062       pads->abidata.ABI.pad_cookie, pads->cookie);
1063   if (G_UNLIKELY (pads->abidata.ABI.pad_cookie != pads->cookie)) {
1064     GSList *collected;
1066     /* clear list and stats */
1067     g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
1068     g_slist_free (pads->data);
1069     pads->data = NULL;
1070     pads->numpads = 0;
1071     pads->queuedpads = 0;
1072     pads->eospads = 0;
1074     /* loop over the master pad list */
1075     collected = pads->abidata.ABI.pad_list;
1076     for (; collected; collected = g_slist_next (collected)) {
1077       GstCollectData *data;
1079       /* update the stats */
1080       pads->numpads++;
1081       data = collected->data;
1083       if (G_LIKELY (!data->abidata.ABI.flushing)) {
1084         if (data->buffer)
1085           pads->queuedpads++;
1086         if (data->abidata.ABI.eos)
1087           pads->eospads++;
1088       }
1090       /* add to the list of pads to collect */
1091       ref_data (data);
1092       pads->data = g_slist_prepend (pads->data, data);
1093     }
1094     /* and update the cookie */
1095     pads->cookie = pads->abidata.ABI.pad_cookie;
1096   }
1099 static inline void
1100 gst_collect_pads_check_pads (GstCollectPads * pads)
1102   /* the master list and cookie are protected with the PAD_LOCK */
1103   GST_COLLECT_PADS_PAD_LOCK (pads);
1104   gst_collect_pads_check_pads_unlocked (pads);
1105   GST_COLLECT_PADS_PAD_UNLOCK (pads);
1108 /* checks if all the pads are collected and call the collectfunction
1109  *
1110  * Should be called with LOCK.
1111  *
1112  * Returns: The #GstFlowReturn of collection.
1113  */
1114 static GstFlowReturn
1115 gst_collect_pads_check_collected (GstCollectPads * pads)
1117   GstFlowReturn flow_ret = GST_FLOW_OK;
1119   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
1120   g_return_val_if_fail (pads->func != NULL, GST_FLOW_NOT_SUPPORTED);
1122   /* check for new pads, update stats etc.. */
1123   gst_collect_pads_check_pads (pads);
1125   if (G_UNLIKELY (pads->eospads == pads->numpads)) {
1126     /* If all our pads are EOS just collect once to let the element
1127      * do its final EOS handling. */
1128     GST_DEBUG ("All active pads (%d) are EOS, calling %s",
1129         pads->numpads, GST_DEBUG_FUNCPTR_NAME (pads->func));
1130     flow_ret = pads->func (pads, pads->user_data);
1131   } else {
1132     gboolean collected = FALSE;
1134     /* We call the collected function as long as our condition matches.
1135      * FIXME: should we error out if the collect function did not pop anything ?
1136      * we can get a busy loop here if the element does not pop from the collect
1137      * function
1138      */
1139     while (((pads->queuedpads + pads->eospads) >= pads->numpads)) {
1140       GST_DEBUG ("All active pads (%d + %d >= %d) have data, calling %s",
1141           pads->queuedpads, pads->eospads, pads->numpads,
1142           GST_DEBUG_FUNCPTR_NAME (pads->func));
1143       flow_ret = pads->func (pads, pads->user_data);
1144       collected = TRUE;
1146       /* break on error */
1147       if (flow_ret != GST_FLOW_OK)
1148         break;
1149       /* Don't keep looping after telling the element EOS or flushing */
1150       if (pads->queuedpads == 0)
1151         break;
1152     }
1153     if (!collected)
1154       GST_DEBUG ("Not all active pads (%d) have data, continuing",
1155           pads->numpads);
1156   }
1157   return flow_ret;
1160 static gboolean
1161 gst_collect_pads_event (GstPad * pad, GstEvent * event)
1163   gboolean res;
1164   GstCollectData *data;
1165   GstCollectPads *pads;
1167   /* some magic to get the managing collect_pads */
1168   GST_OBJECT_LOCK (pad);
1169   data = (GstCollectData *) gst_pad_get_element_private (pad);
1170   if (G_UNLIKELY (data == NULL))
1171     goto pad_removed;
1172   ref_data (data);
1173   GST_OBJECT_UNLOCK (pad);
1175   res = TRUE;
1177   pads = data->collect;
1179   GST_DEBUG ("Got %s event on pad %s:%s", GST_EVENT_TYPE_NAME (event),
1180       GST_DEBUG_PAD_NAME (data->pad));
1182   switch (GST_EVENT_TYPE (event)) {
1183     case GST_EVENT_FLUSH_START:
1184     {
1185       /* forward event to unblock check_collected */
1186       gst_pad_event_default (pad, event);
1188       /* now unblock the chain function.
1189        * no cond per pad, so they all unblock,
1190        * non-flushing block again */
1191       GST_OBJECT_LOCK (pads);
1192       data->abidata.ABI.flushing = TRUE;
1193       gst_collect_pads_clear (pads, data);
1194       GST_OBJECT_UNLOCK (pads);
1196       /* event already cleaned up by forwarding */
1197       goto done;
1198     }
1199     case GST_EVENT_FLUSH_STOP:
1200     {
1201       /* flush the 1 buffer queue */
1202       GST_OBJECT_LOCK (pads);
1203       data->abidata.ABI.flushing = FALSE;
1204       gst_collect_pads_clear (pads, data);
1205       /* we need new segment info after the flush */
1206       gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
1207       data->abidata.ABI.new_segment = FALSE;
1208       /* if the pad was EOS, remove the EOS flag and
1209        * decrement the number of eospads */
1210       if (G_UNLIKELY (data->abidata.ABI.eos == TRUE)) {
1211         pads->eospads--;
1212         data->abidata.ABI.eos = FALSE;
1213       }
1215       if (!gst_collect_pads_is_flushing (pads)) {
1216         /* forward event if all pads are no longer flushing */
1217         GST_DEBUG ("No more pads are flushing, forwarding FLUSH_STOP");
1218         GST_OBJECT_UNLOCK (pads);
1219         goto forward;
1220       }
1221       gst_event_unref (event);
1222       GST_OBJECT_UNLOCK (pads);
1223       goto done;
1224     }
1225     case GST_EVENT_EOS:
1226     {
1227       GST_OBJECT_LOCK (pads);
1228       /* if the pad was not EOS, make it EOS and so we
1229        * have one more eospad */
1230       if (G_LIKELY (data->abidata.ABI.eos == FALSE)) {
1231         data->abidata.ABI.eos = TRUE;
1232         pads->eospads++;
1233       }
1234       /* check if we need collecting anything, we ignore the
1235        * result. */
1236       gst_collect_pads_check_collected (pads);
1237       GST_OBJECT_UNLOCK (pads);
1239       /* We eat this event, element should do something
1240        * in the collected callback. */
1241       gst_event_unref (event);
1242       goto done;
1243     }
1244     case GST_EVENT_NEWSEGMENT:
1245     {
1246       gint64 start, stop, time;
1247       gdouble rate, arate;
1248       GstFormat format;
1249       gboolean update;
1251       gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1252           &start, &stop, &time);
1254       GST_DEBUG_OBJECT (data->pad, "got newsegment, start %" GST_TIME_FORMAT
1255           ", stop %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
1256           GST_TIME_ARGS (stop));
1258       gst_segment_set_newsegment_full (&data->segment, update, rate, arate,
1259           format, start, stop, time);
1261       data->abidata.ABI.new_segment = TRUE;
1263       /* we must not forward this event since multiple segments will be
1264        * accumulated and this is certainly not what we want. */
1265       gst_event_unref (event);
1266       /* FIXME: collect-pads based elements need to create their own newsegment
1267        * event (and only one really)
1268        * (a) make the segment part of the GstCollectData structure of each pad,
1269        * so you can just check that once you have a buffer queued on that pad,
1270        * (b) you can override a pad's event function with your own,
1271        * catch the newsegment event and then pass it on to the original
1272        * gstcollectpads event function
1273        * (that's what avimux does for something IIRC)
1274        * see #340060
1275        */
1276       goto done;
1277     }
1278     default:
1279       /* forward other events */
1280       goto forward;
1281   }
1283 forward:
1284   GST_DEBUG_OBJECT (pads, "forward unhandled event: %s",
1285       GST_EVENT_TYPE_NAME (event));
1286   res = gst_pad_event_default (pad, event);
1288 done:
1289   unref_data (data);
1290   return res;
1292   /* ERRORS */
1293 pad_removed:
1294   {
1295     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1296     GST_OBJECT_UNLOCK (pad);
1297     return FALSE;
1298   }
1301 /* For each buffer we receive we check if our collected condition is reached
1302  * and if so we call the collected function. When this is done we check if
1303  * data has been unqueued. If data is still queued we wait holding the stream
1304  * lock to make sure no EOS event can happen while we are ready to be
1305  * collected
1306  */
1307 static GstFlowReturn
1308 gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
1310   GstCollectData *data;
1311   GstCollectPads *pads;
1312   GstCollectPadsPrivate *priv;
1313   GstFlowReturn ret;
1315   GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1317   /* some magic to get the managing collect_pads */
1318   GST_OBJECT_LOCK (pad);
1319   data = (GstCollectData *) gst_pad_get_element_private (pad);
1320   if (G_UNLIKELY (data == NULL))
1321     goto no_data;
1322   ref_data (data);
1323   GST_OBJECT_UNLOCK (pad);
1325   pads = data->collect;
1326   priv = pads->abidata.ABI.priv;
1328   GST_OBJECT_LOCK (pads);
1329   /* if not started, bail out */
1330   if (G_UNLIKELY (!pads->started))
1331     goto not_started;
1332   /* check if this pad is flushing */
1333   if (G_UNLIKELY (data->abidata.ABI.flushing))
1334     goto flushing;
1335   /* pad was EOS, we can refuse this data */
1336   if (G_UNLIKELY (data->abidata.ABI.eos))
1337     goto unexpected;
1339   /* see if we need to clip */
1340   if (priv->clipfunc) {
1341     buffer = priv->clipfunc (pads, data, buffer, priv->clipfunc_user_data);
1343     if (G_UNLIKELY (buffer == NULL))
1344       goto clipped;
1345   }
1347   GST_DEBUG ("Queuing buffer %p for pad %s:%s", buffer,
1348       GST_DEBUG_PAD_NAME (pad));
1350   /* One more pad has data queued */
1351   pads->queuedpads++;
1352   /* take ownership of the buffer */
1353   if (data->buffer)
1354     gst_buffer_unref (data->buffer);
1355   data->buffer = buffer;
1356   buffer = NULL;
1358   /* update segment last position if in TIME */
1359   if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) {
1360     GstClockTime timestamp = GST_BUFFER_TIMESTAMP (data->buffer);
1362     if (GST_CLOCK_TIME_IS_VALID (timestamp))
1363       gst_segment_set_last_stop (&data->segment, GST_FORMAT_TIME, timestamp);
1364   }
1366   /* While we have data queued on this pad try to collect stuff */
1367   do {
1368     GST_DEBUG ("Pad %s:%s checking", GST_DEBUG_PAD_NAME (pad));
1369     /* Check if our collected condition is matched and call the collected function
1370      * if it is */
1371     ret = gst_collect_pads_check_collected (pads);
1372     /* when an error occurs, we want to report this back to the caller ASAP
1373      * without having to block if the buffer was not popped */
1374     if (G_UNLIKELY (ret != GST_FLOW_OK))
1375       goto error;
1377     /* data was consumed, we can exit and accept new data */
1378     if (data->buffer == NULL)
1379       break;
1381     /* Check if we got removed in the mean time, FIXME, this is racy.
1382      * Between this check and the _WAIT, the pad could be removed which will
1383      * makes us hang in the _WAIT. */
1384     GST_OBJECT_LOCK (pad);
1385     if (G_UNLIKELY (gst_pad_get_element_private (pad) == NULL))
1386       goto pad_removed;
1387     GST_OBJECT_UNLOCK (pad);
1389     GST_DEBUG ("Pad %s:%s has a buffer queued, waiting",
1390         GST_DEBUG_PAD_NAME (pad));
1392     /* wait to be collected, this must happen from another thread triggered
1393      * by the _chain function of another pad. We release the lock so we
1394      * can get stopped or flushed as well. We can however not get EOS
1395      * because we still hold the STREAM_LOCK.
1396      */
1397     GST_COLLECT_PADS_WAIT (pads);
1399     GST_DEBUG ("Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad));
1401     /* after a signal, we could be stopped */
1402     if (G_UNLIKELY (!pads->started))
1403       goto not_started;
1404     /* check if this pad is flushing */
1405     if (G_UNLIKELY (data->abidata.ABI.flushing))
1406       goto flushing;
1407   }
1408   while (data->buffer != NULL);
1410 unlock_done:
1411   GST_DEBUG ("Pad %s:%s done", GST_DEBUG_PAD_NAME (pad));
1412   GST_OBJECT_UNLOCK (pads);
1413   unref_data (data);
1414   if (buffer)
1415     gst_buffer_unref (buffer);
1416   return ret;
1418 pad_removed:
1419   {
1420     GST_WARNING ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1421     GST_OBJECT_UNLOCK (pad);
1422     ret = GST_FLOW_NOT_LINKED;
1423     goto unlock_done;
1424   }
1425   /* ERRORS */
1426 no_data:
1427   {
1428     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
1429     GST_OBJECT_UNLOCK (pad);
1430     gst_buffer_unref (buffer);
1431     return GST_FLOW_NOT_LINKED;
1432   }
1433 not_started:
1434   {
1435     GST_DEBUG ("not started");
1436     gst_collect_pads_clear (pads, data);
1437     ret = GST_FLOW_WRONG_STATE;
1438     goto unlock_done;
1439   }
1440 flushing:
1441   {
1442     GST_DEBUG ("pad %s:%s is flushing", GST_DEBUG_PAD_NAME (pad));
1443     gst_collect_pads_clear (pads, data);
1444     ret = GST_FLOW_WRONG_STATE;
1445     goto unlock_done;
1446   }
1447 unexpected:
1448   {
1449     /* we should not post an error for this, just inform upstream that
1450      * we don't expect anything anymore */
1451     GST_DEBUG ("pad %s:%s is eos", GST_DEBUG_PAD_NAME (pad));
1452     ret = GST_FLOW_UNEXPECTED;
1453     goto unlock_done;
1454   }
1455 clipped:
1456   {
1457     GST_DEBUG ("clipped buffer on pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1458     ret = GST_FLOW_OK;
1459     goto unlock_done;
1460   }
1461 error:
1462   {
1463     /* we print the error, the element should post a reasonable error
1464      * message for fatal errors */
1465     GST_DEBUG ("collect failed, reason %d (%s)", ret, gst_flow_get_name (ret));
1466     gst_collect_pads_clear (pads, data);
1467     goto unlock_done;
1468   }