summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: f26d179)
raw | patch | inline | side by side (parent: f26d179)
author | Wim Taymans <wim.taymans@gmail.com> | |
Tue, 3 Jul 2007 16:26:29 +0000 (16:26 +0000) | ||
committer | Wim Taymans <wim.taymans@gmail.com> | |
Tue, 3 Jul 2007 16:26:29 +0000 (16:26 +0000) |
Original commit message from CVS:
* plugins/elements/gsttee.c: (gst_tee_base_init),
(gst_tee_request_new_pad), (gst_tee_release_pad),
(gst_tee_find_buffer_alloc), (gst_tee_buffer_alloc),
(gst_tee_do_push), (clear_pads), (gst_tee_handle_buffer),
(gst_tee_chain):
Be a lot smarter when deciding what srcpad to use for proxying
the buffer_alloc. Also handle pad added/removed when doing so.
Fixes #357959.
Keep track of what pads we already pushed on in case we have pads
added/removed while pushing. Fixes #374639
* tests/check/Makefile.am:
* tests/check/elements/tee.c: (handoff), (GST_START_TEST),
(tee_suite):
Added unit test for pad resync.
* plugins/elements/gsttee.c: (gst_tee_base_init),
(gst_tee_request_new_pad), (gst_tee_release_pad),
(gst_tee_find_buffer_alloc), (gst_tee_buffer_alloc),
(gst_tee_do_push), (clear_pads), (gst_tee_handle_buffer),
(gst_tee_chain):
Be a lot smarter when deciding what srcpad to use for proxying
the buffer_alloc. Also handle pad added/removed when doing so.
Fixes #357959.
Keep track of what pads we already pushed on in case we have pads
added/removed while pushing. Fixes #374639
* tests/check/Makefile.am:
* tests/check/elements/tee.c: (handoff), (GST_START_TEST),
(tee_suite):
Added unit test for pad resync.
ChangeLog | patch | blob | history | |
plugins/elements/gsttee.c | patch | blob | history | |
tests/check/Makefile.am | patch | blob | history | |
tests/check/elements/tee.c | [new file with mode: 0644] | patch | blob |
diff --git a/ChangeLog b/ChangeLog
index 2f607b43eda52d388dbfaa225e47584712e9d7a7..a03eccc4e5f56ddb4826b35d2b9fa7e56f05b18a 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
+2007-07-03 Wim Taymans <wim.taymans@gmail.com>
+
+ * plugins/elements/gsttee.c: (gst_tee_base_init),
+ (gst_tee_request_new_pad), (gst_tee_release_pad),
+ (gst_tee_find_buffer_alloc), (gst_tee_buffer_alloc),
+ (gst_tee_do_push), (clear_pads), (gst_tee_handle_buffer),
+ (gst_tee_chain):
+ Be a lot smarter when deciding what srcpad to use for proxying
+ the buffer_alloc. Also handle pad added/removed when doing so.
+ Fixes #357959.
+ Keep track of what pads we already pushed on in case we have pads
+ added/removed while pushing. Fixes #374639
+
+ * tests/check/Makefile.am:
+ * tests/check/elements/tee.c: (handoff), (GST_START_TEST),
+ (tee_suite):
+ Added unit test for pad resync.
+
2007-07-01 Thomas Vander Stichele <thomas at apestaart dot org>
* po/nl.po:
index f9ba0b746acf3d29c6325d25986f1fbbda6841b5..5b5382e0f3a98828ae945690b4dc825a824bcf70 100644 (file)
/* GStreamer
* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
* 2000,2001,2002,2003,2004,2005 Wim Taymans <wim@fluendo.com>
- *
+ * 2007 Wim Taymans <wim.taymans@gmail.com>
*
* gsttee.c: Tee element, one in N out
*
GST_BOILERPLATE_FULL (GstTee, gst_tee, GstElement, GST_TYPE_ELEMENT, _do_init);
+/* structure and quark to keep track of which pads have been pushed */
+static GQuark push_data;
+
+typedef struct
+{
+ gboolean pushed;
+ GstFlowReturn result;
+} PushData;
+
static GstPad *gst_tee_request_new_pad (GstElement * element,
GstPadTemplate * temp, const gchar * unused);
static void gst_tee_release_pad (GstElement * element, GstPad * pad);
gst_static_pad_template_get (&sinktemplate));
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&tee_src_template));
+
+ push_data = g_quark_from_static_string ("tee-push-data");
}
static void
GstTee *tee;
GstActivateMode mode;
gboolean res;
+ PushData *data;
tee = GST_TEE (element);
+ GST_DEBUG_OBJECT (tee, "requesting pad");
+
GST_OBJECT_LOCK (tee);
name = g_strdup_printf ("src%d", tee->pad_counter++);
srcpad = gst_pad_new_from_template (templ, name);
g_free (name);
- if (tee->allocpad == NULL)
- tee->allocpad = srcpad;
-
mode = tee->sink_mode;
+
+ /* install the data, we automatically free it when the pad is disposed because
+ * of _release_pad or when the element goes away. */
+ data = g_new0 (PushData, 1);
+ data->pushed = FALSE;
+ data->result = GST_FLOW_NOT_LINKED;
+ g_object_set_qdata_full (G_OBJECT (srcpad), push_data, data, g_free);
+
GST_OBJECT_UNLOCK (tee);
switch (mode) {
tee = GST_TEE (element);
+ GST_DEBUG_OBJECT (tee, "releasing pad");
+
GST_OBJECT_LOCK (tee);
if (tee->allocpad == pad)
tee->allocpad = NULL;
GST_OBJECT_UNLOCK (tee);
}
+/* we have no previous source pad we can use to proxy the pad alloc. Loop over
+ * the source pads, try to alloc a buffer on each one of them. Keep a reference
+ * to the first pad that succeeds, we will be using it to alloc more buffers
+ * later. */
+static GstFlowReturn
+gst_tee_find_buffer_alloc (GstTee * tee, guint64 offset, guint size,
+ GstCaps * caps, GstBuffer ** buf)
+{
+ GstFlowReturn res;
+ GList *pads;
+ guint32 cookie;
+
+ res = GST_FLOW_NOT_LINKED;
+
+retry:
+ pads = GST_ELEMENT_CAST (tee)->srcpads;
+ cookie = GST_ELEMENT_CAST (tee)->pads_cookie;
+
+ while (pads) {
+ GstPad *pad;
+
+ pad = GST_PAD_CAST (pads->data);
+ gst_object_ref (pad);
+ GST_DEBUG_OBJECT (tee, "try alloc on pad %s:%s", GST_DEBUG_PAD_NAME (pad));
+ GST_OBJECT_UNLOCK (tee);
+
+ res = gst_pad_alloc_buffer (pad, offset, size, caps, buf);
+
+ GST_DEBUG_OBJECT (tee, "got return value %d", res);
+
+ gst_object_unref (pad);
+
+ GST_OBJECT_LOCK (tee);
+ if (GST_ELEMENT_CAST (tee)->pads_cookie != cookie) {
+ GST_DEBUG_OBJECT (tee, "pad list changed, restart");
+ /* pad list changed, restart. If the pad alloc function returned OK we
+ * need to unref the buffer */
+ if (res == GST_FLOW_OK)
+ gst_buffer_unref (*buf);
+ goto retry;
+ }
+ if (res == GST_FLOW_OK) {
+ GST_DEBUG_OBJECT (tee, "we have a buffer on pad %s:%s",
+ GST_DEBUG_PAD_NAME (pad));
+ /* we have a buffer, keep the pad for later and exit the loop. */
+ tee->allocpad = pad;
+ break;
+ }
+ /* no valid buffer, try another pad */
+ pads = g_list_next (pads);
+ }
+
+ return res;
+}
+
static GstFlowReturn
gst_tee_buffer_alloc (GstPad * pad, guint64 offset, guint size,
GstCaps * caps, GstBuffer ** buf)
tee = GST_TEE (GST_PAD_PARENT (pad));
+ res = GST_FLOW_NOT_LINKED;
+
GST_OBJECT_LOCK (tee);
- if ((allocpad = tee->allocpad))
+ if ((allocpad = tee->allocpad)) {
+ /* if we had a previous pad we used for allocating a buffer, continue using
+ * it. */
+ GST_DEBUG_OBJECT (tee, "using pad %s:%s for alloc",
+ GST_DEBUG_PAD_NAME (allocpad));
gst_object_ref (allocpad);
- GST_OBJECT_UNLOCK (tee);
+ GST_OBJECT_UNLOCK (tee);
- if (allocpad) {
res = gst_pad_alloc_buffer (allocpad, offset, size, caps, buf);
gst_object_unref (allocpad);
- } else {
- res = GST_FLOW_OK;
- *buf = NULL;
+
+ GST_OBJECT_LOCK (tee);
}
+ /* either we failed to alloc on the the previous pad or we did not have a
+ * previous pad. */
+ if (res == GST_FLOW_NOT_LINKED) {
+ /* find a new pad to alloc a buffer on */
+ GST_DEBUG_OBJECT (tee, "finding pad for alloc");
+ res = gst_tee_find_buffer_alloc (tee, offset, size, caps, buf);
+ }
+ GST_OBJECT_UNLOCK (tee);
+
return res;
}
-typedef struct
-{
- GstTee *tee;
- GstBuffer *buffer;
-} PushData;
-
-static gboolean
-gst_tee_do_push (GstPad * pad, GValue * ret, PushData * data)
+static GstFlowReturn
+gst_tee_do_push (GstTee * tee, GstPad * pad, GstBuffer * buffer)
{
GstFlowReturn res;
- GstTee *tee = data->tee;
-
- if (G_UNLIKELY (!data->tee->silent)) {
- GstBuffer *buf = data->buffer;
+ if (G_UNLIKELY (!tee->silent)) {
GST_OBJECT_LOCK (tee);
g_free (tee->last_message);
tee->last_message =
g_strdup_printf ("chain ******* (%s:%s)t (%d bytes, %"
G_GUINT64_FORMAT ") %p", GST_DEBUG_PAD_NAME (pad),
- GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf), buf);
+ GST_BUFFER_SIZE (buffer), GST_BUFFER_TIMESTAMP (buffer), buffer);
GST_OBJECT_UNLOCK (tee);
g_object_notify (G_OBJECT (tee), "last_message");
}
/* Push */
- if (pad == data->tee->pull_pad) {
+ if (pad == tee->pull_pad) {
+ /* don't push on the pad we're pulling from */
res = GST_FLOW_OK;
} else {
- res = gst_pad_push (pad, gst_buffer_ref (data->buffer));
- GST_LOG_OBJECT (tee, "Pushing buffer %p to %" GST_PTR_FORMAT
- " yielded result=%d", data->buffer, pad, res);
+ res = gst_pad_push (pad, gst_buffer_ref (buffer));
}
+ return res;
+}
- /* If it's fatal or OK, or if ret is currently
- * not-linked, we overwrite the previous value */
- if (GST_FLOW_IS_FATAL (res) || (res == GST_FLOW_OK) ||
- (g_value_get_enum (ret) == GST_FLOW_NOT_LINKED)) {
- GST_LOG_OBJECT (tee, "Replacing ret val %d with %d",
- g_value_get_enum (ret), res);
- g_value_set_enum (ret, res);
- }
+static void
+clear_pads (GstPad * pad, GstTee * tee)
+{
+ PushData *data;
- gst_object_unref (pad);
+ data = g_object_get_qdata (G_OBJECT (pad), push_data);
+
+ /* the data must be there or we have a screwed up internal state */
+ g_assert (data != NULL);
- /* Stop iterating if flow return is fatal */
- return (!GST_FLOW_IS_FATAL (res));
+ data->pushed = FALSE;
+ data->result = GST_FLOW_NOT_LINKED;
}
static GstFlowReturn
gst_tee_handle_buffer (GstTee * tee, GstBuffer * buffer)
{
- GstIterator *iter;
- PushData data;
- GValue ret = { 0, };
- GstIteratorResult res;
+ GList *pads;
+ guint32 cookie;
+ GstFlowReturn ret, cret;
tee->offset += GST_BUFFER_SIZE (buffer);
- g_value_init (&ret, GST_TYPE_FLOW_RETURN);
- g_value_set_enum (&ret, GST_FLOW_NOT_LINKED);
- iter = gst_element_iterate_src_pads (GST_ELEMENT (tee));
- data.tee = tee;
- data.buffer = buffer;
-
- GST_LOG_OBJECT (tee, "Starting to push buffer %p", buffer);
- /* FIXME: Not sure how tee would handle RESEND buffer from some of the
- * pads but not from others. */
- res = gst_iterator_fold (iter, (GstIteratorFoldFunction) gst_tee_do_push,
- &ret, &data);
- gst_iterator_free (iter);
-
- GST_LOG_OBJECT (tee, "Pushing buffer %p yielded result=%d", buffer,
- g_value_get_enum (&ret));
+ GST_OBJECT_LOCK (tee);
+ /* mark all pads as 'not pushed on yet' */
+ g_list_foreach (GST_ELEMENT_CAST (tee)->srcpads, (GFunc) clear_pads, tee);
+
+restart:
+ cret = GST_FLOW_NOT_LINKED;
+ pads = GST_ELEMENT_CAST (tee)->srcpads;
+ cookie = GST_ELEMENT_CAST (tee)->pads_cookie;
+
+ while (pads) {
+ GstPad *pad;
+ PushData *data;
+
+ pad = GST_PAD_CAST (pads->data);
+
+ /* get the private data, something is really wrong with the internal state
+ * when it is not there */
+ data = g_object_get_qdata (G_OBJECT (pad), push_data);
+
+ g_assert (data != NULL);
+
+ if (!data->pushed) {
+ /* not yet pushed, release lock and start pushing */
+ gst_object_ref (pad);
+ GST_OBJECT_UNLOCK (tee);
+
+ GST_LOG_OBJECT (tee, "Starting to push buffer %p", buffer);
+
+ ret = gst_tee_do_push (tee, pad, buffer);
+
+ GST_LOG_OBJECT (tee, "Pushing buffer %p yielded result %s", buffer,
+ gst_flow_get_name (ret));
+
+ GST_OBJECT_LOCK (tee);
+ /* keep track of which pad we pushed and the result value. We need to do
+ * this before we release the refcount on the pad, the PushData is
+ * destroyed when the last ref of the pad goes away. */
+ data->pushed = TRUE;
+ data->result = ret;
+ gst_object_unref (pad);
+ } else {
+ /* already pushed, use previous return value */
+ ret = data->result;
+ GST_LOG_OBJECT (tee, "pad already pushed with %s",
+ gst_flow_get_name (ret));
+ }
+ /* stop pushing more buffers when we have a fatal error */
+ if (GST_FLOW_IS_FATAL (ret))
+ goto error;
+
+ /* keep all other return values, overwriting the previous one */
+ GST_LOG_OBJECT (tee, "Replacing ret val %d with %d", cret, ret);
+ if (cret == GST_FLOW_NOT_LINKED)
+ cret = ret;
+
+ if (GST_ELEMENT_CAST (tee)->pads_cookie != cookie) {
+ GST_LOG_OBJECT (tee, "pad list changed");
+ /* the list of pads changed, restart iteration. Pads that we already
+ * pushed on and are still in the new list, will not be pushed on
+ * again. */
+ goto restart;
+ }
+ pads = g_list_next (pads);
+ }
+ GST_OBJECT_UNLOCK (tee);
gst_buffer_unref (buffer);
/* no need to unset gvalue */
- return g_value_get_enum (&ret);
+ return cret;
+
+ /* ERRORS */
+error:
+ {
+ GST_DEBUG_OBJECT (tee, "received error %s", gst_flow_get_name (ret));
+ gst_buffer_unref (buffer);
+ GST_OBJECT_UNLOCK (tee);
+ return ret;
+ }
}
static GstFlowReturn
tee = GST_TEE (gst_pad_get_parent (pad));
+ GST_DEBUG_OBJECT (tee, "received buffer %p", buffer);
+
res = gst_tee_handle_buffer (tee, buffer);
+ GST_DEBUG_OBJECT (tee, "handled buffer %s", gst_flow_get_name (res));
+
gst_object_unref (tee);
return res;
index 34f379ea559bbbc8950691b58dce5f06b9ad3e57..900398ade53abe0334143318daa26b4021dc78fa 100644 (file)
--- a/tests/check/Makefile.am
+++ b/tests/check/Makefile.am
elements/filesrc \
elements/identity \
elements/multiqueue \
+ elements/tee \
libs/basesrc \
libs/controller \
libs/typefindhelper \
diff --git a/tests/check/elements/tee.c b/tests/check/elements/tee.c
--- /dev/null
@@ -0,0 +1,145 @@
+/* GStreamer
+ *
+ * unit test for tee
+ *
+ * Copyright (C) <2007> Wim Taymans <wim dot taymans at gmail dot com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include <gst/check/gstcheck.h>
+
+static gint count1;
+static gint count2;
+
+static void
+handoff (GstElement * fakesink, GstBuffer * buf, GstPad * pad, guint * count)
+{
+ *count = *count + 1;
+}
+
+/* construct fakesrc num-buffers=3 ! tee name=t ! queue ! fakesink t. ! queue !
+ * fakesink. Each fakesink should exactly receive 3 buffers.
+ */
+GST_START_TEST (test_num_buffers)
+{
+ GstElement *pipeline;
+ GstElement *f1, *f2;
+ gchar *desc;
+ GstBus *bus;
+ GstMessage *msg;
+
+ desc = "fakesrc num-buffers=3 ! tee name=t ! queue ! fakesink name=f1 "
+ "t. ! queue ! fakesink name=f2";
+ pipeline = gst_parse_launch (desc, NULL);
+ fail_if (pipeline == NULL);
+
+ f1 = gst_bin_get_by_name (GST_BIN (pipeline), "f1");
+ fail_if (f1 == NULL);
+ f2 = gst_bin_get_by_name (GST_BIN (pipeline), "f2");
+ fail_if (f2 == NULL);
+
+ count1 = 0;
+ count2 = 0;
+
+ g_object_set (G_OBJECT (f1), "signal-handoffs", TRUE, NULL);
+ g_signal_connect (G_OBJECT (f1), "handoff", (GCallback) handoff, &count1);
+ g_object_set (G_OBJECT (f2), "signal-handoffs", TRUE, NULL);
+ g_signal_connect (G_OBJECT (f2), "handoff", (GCallback) handoff, &count2);
+
+ bus = gst_element_get_bus (pipeline);
+ fail_if (bus == NULL);
+ gst_element_set_state (pipeline, GST_STATE_PLAYING);
+
+ msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1);
+ fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS);
+ gst_message_unref (msg);
+
+ fail_if (count1 != 3);
+ fail_if (count2 != 3);
+
+ gst_element_set_state (pipeline, GST_STATE_NULL);
+ gst_object_unref (f1);
+ gst_object_unref (f2);
+ gst_object_unref (bus);
+ gst_object_unref (pipeline);
+}
+
+GST_END_TEST;
+
+/* we use fakesrc ! tee ! fakesink and then randomly request/release and link
+ * some pads from tee. This should happily run without any errors. */
+GST_START_TEST (test_stress)
+{
+ GstElement *pipeline;
+ GstElement *tee;
+ gchar *desc;
+ GstBus *bus;
+ GstMessage *msg;
+ gint i;
+
+ desc = "fakesrc num-buffers=100000 ! tee name=t ! queue ! fakesink";
+ pipeline = gst_parse_launch (desc, NULL);
+ fail_if (pipeline == NULL);
+
+ tee = gst_bin_get_by_name (GST_BIN (pipeline), "t");
+ fail_if (tee == NULL);
+
+ /* bring the pipeline to PLAYING, then start switching */
+ bus = gst_element_get_bus (pipeline);
+ fail_if (bus == NULL);
+ gst_element_set_state (pipeline, GST_STATE_PLAYING);
+
+ for (i = 0; i < 50000; i++) {
+ GstPad *pad;
+
+ pad = gst_element_get_request_pad (tee, "src%d");
+ gst_element_release_request_pad (tee, pad);
+ gst_object_unref (pad);
+ }
+
+ /* now wait for completion or error */
+ msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1);
+ fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS);
+ gst_message_unref (msg);
+
+ gst_element_set_state (pipeline, GST_STATE_NULL);
+ gst_object_unref (tee);
+ gst_object_unref (bus);
+ gst_object_unref (pipeline);
+}
+
+GST_END_TEST;
+
+Suite *
+tee_suite (void)
+{
+ Suite *s = suite_create ("tee");
+ TCase *tc_chain = tcase_create ("general");
+
+ suite_add_tcase (s, tc_chain);
+ tcase_add_test (tc_chain, test_num_buffers);
+ tcase_add_test (tc_chain, test_stress);
+
+ return s;
+}
+
+GST_CHECK_MAIN (tee);