1 /* GStreamer
2 * Copyright (C) <2005,2006> Wim Taymans <wim@fluendo.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
13 *
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 * Boston, MA 02111-1307, USA.
18 */
19 /*
20 * Unless otherwise indicated, Source Code is licensed under MIT license.
21 * See further explanation attached in License Statement (distributed in the file
22 * LICENSE).
23 *
24 * Permission is hereby granted, free of charge, to any person obtaining a copy of
25 * this software and associated documentation files (the "Software"), to deal in
26 * the Software without restriction, including without limitation the rights to
27 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
28 * of the Software, and to permit persons to whom the Software is furnished to do
29 * so, subject to the following conditions:
30 *
31 * The above copyright notice and this permission notice shall be included in all
32 * copies or substantial portions of the Software.
33 *
34 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
35 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
36 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
37 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
38 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
39 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
40 * SOFTWARE.
41 */
42 /* Element-Checklist-Version: 5 */
44 /**
45 * SECTION:element-rdtmanager
46 * @see_also: GstRtspSrc
47 *
48 * A simple RTP session manager used internally by rtspsrc.
49 *
50 * Last reviewed on 2006-06-20 (0.10.4)
51 */
53 /* #define HAVE_RTCP */
55 #include "gstrdtbuffer.h"
56 #include "rdtmanager.h"
57 #include "rdtjitterbuffer.h"
59 #include <stdio.h>
/* Debug category owned by this file.  It is initialised in
 * gst_rdt_manager_class_init() and used as the default category for the
 * GST_DEBUG_OBJECT / GST_WARNING_OBJECT calls below. */
GST_DEBUG_CATEGORY_STATIC (rdtmanager_debug);
#define GST_CAT_DEFAULT rdtmanager_debug
/* Signal slots of GstRDTManager.  Each value indexes
 * gst_rdt_manager_signals[]; LAST_SIGNAL is the number of slots. */
enum
{
  /* payload-type mapping */
  SIGNAL_REQUEST_PT_MAP = 0,
  SIGNAL_CLEAR_PT_MAP,

  /* SSRC life-cycle notifications */
  SIGNAL_ON_NEW_SSRC,
  SIGNAL_ON_SSRC_COLLISION,
  SIGNAL_ON_SSRC_VALIDATED,
  SIGNAL_ON_SSRC_ACTIVE,
  SIGNAL_ON_SSRC_SDES,
  SIGNAL_ON_BYE_SSRC,
  SIGNAL_ON_BYE_TIMEOUT,
  SIGNAL_ON_TIMEOUT,

  LAST_SIGNAL
};
/* Default value of the "latency" property, in milliseconds. */
#define DEFAULT_LATENCY_MS 200

/* GObject property ids; PROP_0 is the unused placeholder for id 0. */
enum
{
  PROP_0 = 0,
  PROP_LATENCY
};
89 static GstStaticPadTemplate gst_rdt_manager_recv_rtp_sink_template =
90 GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%d",
91 GST_PAD_SINK,
92 GST_PAD_REQUEST,
93 GST_STATIC_CAPS ("application/x-rdt")
94 );
96 static GstStaticPadTemplate gst_rdt_manager_recv_rtcp_sink_template =
97 GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%d",
98 GST_PAD_SINK,
99 GST_PAD_REQUEST,
100 GST_STATIC_CAPS ("application/x-rtcp")
101 );
103 static GstStaticPadTemplate gst_rdt_manager_recv_rtp_src_template =
104 GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%d_%d_%d",
105 GST_PAD_SRC,
106 GST_PAD_SOMETIMES,
107 GST_STATIC_CAPS ("application/x-rdt")
108 );
110 static GstStaticPadTemplate gst_rdt_manager_rtcp_src_template =
111 GST_STATIC_PAD_TEMPLATE ("rtcp_src_%d",
112 GST_PAD_SRC,
113 GST_PAD_REQUEST,
114 GST_STATIC_CAPS ("application/x-rtcp")
115 );
117 static void gst_rdt_manager_finalize (GObject * object);
118 static void gst_rdt_manager_set_property (GObject * object,
119 guint prop_id, const GValue * value, GParamSpec * pspec);
120 static void gst_rdt_manager_get_property (GObject * object,
121 guint prop_id, GValue * value, GParamSpec * pspec);
123 static gboolean gst_rdt_manager_query_src (GstPad * pad, GstQuery * query);
124 static gboolean gst_rdt_manager_src_activate_push (GstPad * pad,
125 gboolean active);
127 static GstClock *gst_rdt_manager_provide_clock (GstElement * element);
128 static GstStateChangeReturn gst_rdt_manager_change_state (GstElement * element,
129 GstStateChange transition);
130 static GstPad *gst_rdt_manager_request_new_pad (GstElement * element,
131 GstPadTemplate * templ, const gchar * name);
132 static void gst_rdt_manager_release_pad (GstElement * element, GstPad * pad);
134 static gboolean gst_rdt_manager_parse_caps (GstRDTManager * rdtmanager,
135 GstRDTManagerSession * session, GstCaps * caps);
136 static gboolean gst_rdt_manager_setcaps (GstPad * pad, GstCaps * caps);
138 static GstFlowReturn gst_rdt_manager_chain_rdt (GstPad * pad,
139 GstBuffer * buffer);
140 static GstFlowReturn gst_rdt_manager_chain_rtcp (GstPad * pad,
141 GstBuffer * buffer);
142 static void gst_rdt_manager_loop (GstPad * pad);
144 static guint gst_rdt_manager_signals[LAST_SIGNAL] = { 0 };
/* Helpers around the per-session jitterbuffer mutex and condition.
 *
 * Every macro takes an expression yielding a GstRDTManagerSession pointer.
 * The argument is always parenthesized in the expansion so that compound
 * expressions (e.g. "base + 1") expand correctly.  Note that the *_CHECK
 * variants evaluate their argument twice. */

/* Acquire the jitterbuffer lock of @sess. */
#define JBUF_LOCK(sess)   (g_mutex_lock ((sess)->jbuf_lock))

/* Acquire the lock, then jump to @label -- with the lock still held -- when
 * the session's srcresult is anything other than GST_FLOW_OK. */
#define JBUF_LOCK_CHECK(sess,label) G_STMT_START {        \
  JBUF_LOCK (sess);                                       \
  if ((sess)->srcresult != GST_FLOW_OK)                   \
    goto label;                                           \
} G_STMT_END

/* Release the jitterbuffer lock of @sess. */
#define JBUF_UNLOCK(sess) (g_mutex_unlock ((sess)->jbuf_lock))

/* Wait on the session condition; must be called with the lock held. */
#define JBUF_WAIT(sess)   (g_cond_wait ((sess)->jbuf_cond, (sess)->jbuf_lock))

/* Wait on the condition, then jump to @label -- with the lock still held --
 * when the session's srcresult is anything other than GST_FLOW_OK. */
#define JBUF_WAIT_CHECK(sess,label) G_STMT_START {        \
  JBUF_WAIT (sess);                                       \
  if ((sess)->srcresult != GST_FLOW_OK)                   \
    goto label;                                           \
} G_STMT_END

/* Wake up one thread waiting in JBUF_WAIT(). */
#define JBUF_SIGNAL(sess) (g_cond_signal ((sess)->jbuf_cond))
165 /* Manages the receiving end of the packets.
166 *
167 * There is one such structure for each RTP session (audio/video/...).
168 * We get the RTP/RTCP packets and stuff them into the session manager.
169 */
170 struct _GstRDTManagerSession
171 {
172 /* session id */
173 gint id;
174 /* the parent bin */
175 GstRDTManager *dec;
177 gboolean active;
178 /* we only support one ssrc and one pt */
179 guint32 ssrc;
180 guint8 pt;
181 gint clock_rate;
182 GstCaps *caps;
183 gint64 clock_base;
185 GstSegment segment;
187 /* the last seqnum we pushed out */
188 guint32 last_popped_seqnum;
189 /* the next expected seqnum */
190 guint32 next_seqnum;
191 /* last output time */
192 GstClockTime last_out_time;
194 /* the pads of the session */
195 GstPad *recv_rtp_sink;
196 GstPad *recv_rtp_src;
197 GstPad *recv_rtcp_sink;
198 GstPad *rtcp_src;
200 GstFlowReturn srcresult;
201 gboolean blocked;
202 gboolean eos;
203 gboolean waiting;
204 gboolean discont;
205 GstClockID clock_id;
207 /* jitterbuffer, lock and cond */
208 RDTJitterBuffer *jbuf;
209 GMutex *jbuf_lock;
210 GCond *jbuf_cond;
212 /* some accounting */
213 guint64 num_late;
214 guint64 num_duplicates;
215 };
217 /* find a session with the given id */
218 static GstRDTManagerSession *
219 find_session_by_id (GstRDTManager * rdtmanager, gint id)
220 {
221 GSList *walk;
223 for (walk = rdtmanager->sessions; walk; walk = g_slist_next (walk)) {
224 GstRDTManagerSession *sess = (GstRDTManagerSession *) walk->data;
226 if (sess->id == id)
227 return sess;
228 }
229 return NULL;
230 }
232 /* create a session with the given id */
233 static GstRDTManagerSession *
234 create_session (GstRDTManager * rdtmanager, gint id)
235 {
236 GstRDTManagerSession *sess;
238 sess = g_new0 (GstRDTManagerSession, 1);
239 sess->id = id;
240 sess->dec = rdtmanager;
241 sess->jbuf = rdt_jitter_buffer_new ();
242 sess->jbuf_lock = g_mutex_new ();
243 sess->jbuf_cond = g_cond_new ();
244 rdtmanager->sessions = g_slist_prepend (rdtmanager->sessions, sess);
246 return sess;
247 }
249 static gboolean
250 activate_session (GstRDTManager * rdtmanager, GstRDTManagerSession * session,
251 guint32 ssrc, guint8 pt)
252 {
253 GstPadTemplate *templ;
254 GstElementClass *klass;
255 gchar *name;
256 GstCaps *caps;
257 GValue ret = { 0 };
258 GValue args[3] = { {0}
259 , {0}
260 , {0}
261 };
263 GST_DEBUG_OBJECT (rdtmanager, "creating stream");
265 session->ssrc = ssrc;
266 session->pt = pt;
268 /* get pt map */
269 g_value_init (&args[0], GST_TYPE_ELEMENT);
270 g_value_set_object (&args[0], rdtmanager);
271 g_value_init (&args[1], G_TYPE_UINT);
272 g_value_set_uint (&args[1], session->id);
273 g_value_init (&args[2], G_TYPE_UINT);
274 g_value_set_uint (&args[2], pt);
276 g_value_init (&ret, GST_TYPE_CAPS);
277 g_value_set_boxed (&ret, NULL);
279 g_signal_emitv (args, gst_rdt_manager_signals[SIGNAL_REQUEST_PT_MAP], 0,
280 &ret);
282 g_value_unset (&args[0]);
283 g_value_unset (&args[1]);
284 g_value_unset (&args[2]);
285 caps = (GstCaps *) g_value_dup_boxed (&ret);
286 g_value_unset (&ret);
288 if (caps)
289 gst_rdt_manager_parse_caps (rdtmanager, session, caps);
291 name = g_strdup_printf ("recv_rtp_src_%d_%u_%d", session->id, ssrc, pt);
292 klass = GST_ELEMENT_GET_CLASS (rdtmanager);
293 templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%d_%d_%d");
294 session->recv_rtp_src = gst_pad_new_from_template (templ, name);
295 g_free (name);
297 gst_pad_set_caps (session->recv_rtp_src, caps);
298 gst_caps_unref (caps);
300 gst_pad_set_element_private (session->recv_rtp_src, session);
301 gst_pad_set_query_function (session->recv_rtp_src, gst_rdt_manager_query_src);
302 gst_pad_set_activatepush_function (session->recv_rtp_src,
303 gst_rdt_manager_src_activate_push);
305 gst_pad_set_active (session->recv_rtp_src, TRUE);
306 gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->recv_rtp_src);
308 return TRUE;
309 }
311 static void
312 free_session (GstRDTManagerSession * session)
313 {
314 g_object_unref (session->jbuf);
315 g_cond_free (session->jbuf_cond);
316 g_mutex_free (session->jbuf_lock);
317 g_free (session);
318 }
/* Type boilerplate: GstRDTManager derives from GstElement.  The base_init,
 * class_init and init functions referenced by this macro follow below. */
GST_BOILERPLATE (GstRDTManager, gst_rdt_manager, GstElement,
    GST_TYPE_ELEMENT);
322 static void
323 gst_rdt_manager_base_init (gpointer klass)
324 {
325 GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
327 /* sink pads */
328 gst_element_class_add_static_pad_template (element_class,
329 &gst_rdt_manager_recv_rtp_sink_template);
330 gst_element_class_add_static_pad_template (element_class,
331 &gst_rdt_manager_recv_rtcp_sink_template);
332 /* src pads */
333 gst_element_class_add_static_pad_template (element_class,
334 &gst_rdt_manager_recv_rtp_src_template);
335 gst_element_class_add_static_pad_template (element_class,
336 &gst_rdt_manager_rtcp_src_template);
338 gst_element_class_set_details_simple (element_class, "RTP Decoder",
339 "Codec/Parser/Network",
340 "Accepts raw RTP and RTCP packets and sends them forward",
341 "Wim Taymans <wim@fluendo.com>");
342 }
344 /* BOXED:UINT,UINT */
345 #define g_marshal_value_peek_uint(v) g_value_get_uint (v)
347 static void
348 gst_rdt_manager_marshal_BOXED__UINT_UINT (GClosure * closure,
349 GValue * return_value,
350 guint n_param_values,
351 const GValue * param_values,
352 gpointer invocation_hint, gpointer marshal_data)
353 {
354 typedef gpointer (*GMarshalFunc_BOXED__UINT_UINT) (gpointer data1,
355 guint arg_1, guint arg_2, gpointer data2);
356 register GMarshalFunc_BOXED__UINT_UINT callback;
357 register GCClosure *cc = (GCClosure *) closure;
358 register gpointer data1, data2;
359 gpointer v_return;
361 g_return_if_fail (return_value != NULL);
362 g_return_if_fail (n_param_values == 3);
364 if (G_CCLOSURE_SWAP_DATA (closure)) {
365 data1 = closure->data;
366 data2 = g_value_peek_pointer (param_values + 0);
367 } else {
368 data1 = g_value_peek_pointer (param_values + 0);
369 data2 = closure->data;
370 }
371 callback =
372 (GMarshalFunc_BOXED__UINT_UINT) (marshal_data ? marshal_data :
373 cc->callback);
375 v_return = callback (data1,
376 g_marshal_value_peek_uint (param_values + 1),
377 g_marshal_value_peek_uint (param_values + 2), data2);
379 g_value_take_boxed (return_value, v_return);
380 }
382 static void
383 gst_rdt_manager_marshal_VOID__UINT_UINT (GClosure * closure,
384 GValue * return_value,
385 guint n_param_values,
386 const GValue * param_values,
387 gpointer invocation_hint, gpointer marshal_data)
388 {
389 typedef void (*GMarshalFunc_VOID__UINT_UINT) (gpointer data1,
390 guint arg_1, guint arg_2, gpointer data2);
391 register GMarshalFunc_VOID__UINT_UINT callback;
392 register GCClosure *cc = (GCClosure *) closure;
393 register gpointer data1, data2;
395 g_return_if_fail (n_param_values == 3);
397 if (G_CCLOSURE_SWAP_DATA (closure)) {
398 data1 = closure->data;
399 data2 = g_value_peek_pointer (param_values + 0);
400 } else {
401 data1 = g_value_peek_pointer (param_values + 0);
402 data2 = closure->data;
403 }
404 callback =
405 (GMarshalFunc_VOID__UINT_UINT) (marshal_data ? marshal_data :
406 cc->callback);
408 callback (data1,
409 g_marshal_value_peek_uint (param_values + 1),
410 g_marshal_value_peek_uint (param_values + 2), data2);
411 }
413 static void
414 gst_rdt_manager_class_init (GstRDTManagerClass * g_class)
415 {
416 GObjectClass *gobject_class;
417 GstElementClass *gstelement_class;
418 GstRDTManagerClass *klass;
420 klass = (GstRDTManagerClass *) g_class;
421 gobject_class = (GObjectClass *) klass;
422 gstelement_class = (GstElementClass *) klass;
424 gobject_class->finalize = gst_rdt_manager_finalize;
425 gobject_class->set_property = gst_rdt_manager_set_property;
426 gobject_class->get_property = gst_rdt_manager_get_property;
428 g_object_class_install_property (gobject_class, PROP_LATENCY,
429 g_param_spec_uint ("latency", "Buffer latency in ms",
430 "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
431 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
433 /**
434 * GstRDTManager::request-pt-map:
435 * @rdtmanager: the object which received the signal
436 * @session: the session
437 * @pt: the pt
438 *
439 * Request the payload type as #GstCaps for @pt in @session.
440 */
441 gst_rdt_manager_signals[SIGNAL_REQUEST_PT_MAP] =
442 g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
443 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRDTManagerClass, request_pt_map),
444 NULL, NULL, gst_rdt_manager_marshal_BOXED__UINT_UINT, GST_TYPE_CAPS, 2,
445 G_TYPE_UINT, G_TYPE_UINT);
447 /**
448 * GstRDTManager::clear-pt-map:
449 * @rtpbin: the object which received the signal
450 *
451 * Clear all previously cached pt-mapping obtained with
452 * GstRDTManager::request-pt-map.
453 */
454 gst_rdt_manager_signals[SIGNAL_CLEAR_PT_MAP] =
455 g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
456 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRDTManagerClass, clear_pt_map),
457 NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
459 /**
460 * GstRDTManager::on-bye-ssrc:
461 * @rtpbin: the object which received the signal
462 * @session: the session
463 * @ssrc: the SSRC
464 *
465 * Notify of an SSRC that became inactive because of a BYE packet.
466 */
467 gst_rdt_manager_signals[SIGNAL_ON_BYE_SSRC] =
468 g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
469 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRDTManagerClass, on_bye_ssrc),
470 NULL, NULL, gst_rdt_manager_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
471 G_TYPE_UINT, G_TYPE_UINT);
472 /**
473 * GstRDTManager::on-bye-timeout:
474 * @rtpbin: the object which received the signal
475 * @session: the session
476 * @ssrc: the SSRC
477 *
478 * Notify of an SSRC that has timed out because of BYE
479 */
480 gst_rdt_manager_signals[SIGNAL_ON_BYE_TIMEOUT] =
481 g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
482 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRDTManagerClass, on_bye_timeout),
483 NULL, NULL, gst_rdt_manager_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
484 G_TYPE_UINT, G_TYPE_UINT);
485 /**
486 * GstRDTManager::on-timeout:
487 * @rtpbin: the object which received the signal
488 * @session: the session
489 * @ssrc: the SSRC
490 *
491 * Notify of an SSRC that has timed out
492 */
493 gst_rdt_manager_signals[SIGNAL_ON_TIMEOUT] =
494 g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
495 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRDTManagerClass, on_timeout),
496 NULL, NULL, gst_rdt_manager_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
497 G_TYPE_UINT, G_TYPE_UINT);
499 gstelement_class->provide_clock =
500 GST_DEBUG_FUNCPTR (gst_rdt_manager_provide_clock);
501 gstelement_class->change_state =
502 GST_DEBUG_FUNCPTR (gst_rdt_manager_change_state);
503 gstelement_class->request_new_pad =
504 GST_DEBUG_FUNCPTR (gst_rdt_manager_request_new_pad);
505 gstelement_class->release_pad =
506 GST_DEBUG_FUNCPTR (gst_rdt_manager_release_pad);
508 GST_DEBUG_CATEGORY_INIT (rdtmanager_debug, "rdtmanager", 0, "RTP decoder");
509 }
511 static void
512 gst_rdt_manager_init (GstRDTManager * rdtmanager, GstRDTManagerClass * klass)
513 {
514 rdtmanager->provided_clock = gst_system_clock_obtain ();
515 rdtmanager->latency = DEFAULT_LATENCY_MS;
516 }
518 static void
519 gst_rdt_manager_finalize (GObject * object)
520 {
521 GstRDTManager *rdtmanager;
523 rdtmanager = GST_RDT_MANAGER (object);
525 g_slist_foreach (rdtmanager->sessions, (GFunc) free_session, NULL);
526 g_slist_free (rdtmanager->sessions);
528 G_OBJECT_CLASS (parent_class)->finalize (object);
529 }
531 static gboolean
532 gst_rdt_manager_query_src (GstPad * pad, GstQuery * query)
533 {
534 GstRDTManager *rdtmanager;
535 gboolean res;
537 rdtmanager = GST_RDT_MANAGER (GST_PAD_PARENT (pad));
539 switch (GST_QUERY_TYPE (query)) {
540 case GST_QUERY_LATENCY:
541 {
542 GstClockTime latency;
544 latency = rdtmanager->latency * GST_MSECOND;
546 /* we pretend to be live with a 3 second latency */
547 gst_query_set_latency (query, TRUE, latency, -1);
549 GST_DEBUG_OBJECT (rdtmanager, "reporting %" GST_TIME_FORMAT " of latency",
550 GST_TIME_ARGS (latency));
551 res = TRUE;
552 break;
553 }
554 default:
555 res = gst_pad_query_default (pad, query);
556 break;
557 }
558 return res;
559 }
561 static gboolean
562 gst_rdt_manager_src_activate_push (GstPad * pad, gboolean active)
563 {
564 gboolean result = TRUE;
565 GstRDTManager *rdtmanager;
566 GstRDTManagerSession *session;
568 session = gst_pad_get_element_private (pad);
569 rdtmanager = session->dec;
571 if (active) {
572 /* allow data processing */
573 JBUF_LOCK (session);
574 GST_DEBUG_OBJECT (rdtmanager, "Enabling pop on queue");
575 /* Mark as non flushing */
576 session->srcresult = GST_FLOW_OK;
577 gst_segment_init (&session->segment, GST_FORMAT_TIME);
578 session->last_popped_seqnum = -1;
579 session->last_out_time = -1;
580 session->next_seqnum = -1;
581 session->eos = FALSE;
582 JBUF_UNLOCK (session);
584 /* start pushing out buffers */
585 GST_DEBUG_OBJECT (rdtmanager, "Starting task on srcpad");
586 gst_pad_start_task (pad, (GstTaskFunction) gst_rdt_manager_loop, pad);
587 } else {
588 /* make sure all data processing stops ASAP */
589 JBUF_LOCK (session);
590 /* mark ourselves as flushing */
591 session->srcresult = GST_FLOW_WRONG_STATE;
592 GST_DEBUG_OBJECT (rdtmanager, "Disabling pop on queue");
593 /* this unblocks any waiting pops on the src pad task */
594 JBUF_SIGNAL (session);
595 /* unlock clock, we just unschedule, the entry will be released by
596 * the locking streaming thread. */
597 if (session->clock_id)
598 gst_clock_id_unschedule (session->clock_id);
599 JBUF_UNLOCK (session);
601 /* NOTE this will hardlock if the state change is called from the src pad
602 * task thread because we will _join() the thread. */
603 GST_DEBUG_OBJECT (rdtmanager, "Stopping task on srcpad");
604 result = gst_pad_stop_task (pad);
605 }
607 return result;
608 }
610 static GstFlowReturn
611 gst_rdt_manager_handle_data_packet (GstRDTManagerSession * session,
612 GstClockTime timestamp, GstRDTPacket * packet)
613 {
614 GstRDTManager *rdtmanager;
615 guint16 seqnum;
616 gboolean tail;
617 GstFlowReturn res;
618 GstBuffer *buffer;
620 rdtmanager = session->dec;
622 res = GST_FLOW_OK;
624 seqnum = 0;
625 GST_DEBUG_OBJECT (rdtmanager,
626 "Received packet #%d at time %" GST_TIME_FORMAT, seqnum,
627 GST_TIME_ARGS (timestamp));
629 buffer = gst_rdt_packet_to_buffer (packet);
631 JBUF_LOCK_CHECK (session, out_flushing);
633 /* insert the packet into the queue now, FIXME, use seqnum */
634 if (!rdt_jitter_buffer_insert (session->jbuf, buffer, timestamp,
635 session->clock_rate, &tail))
636 goto duplicate;
638 /* signal addition of new buffer when the _loop is waiting. */
639 if (session->waiting)
640 JBUF_SIGNAL (session);
642 finished:
643 JBUF_UNLOCK (session);
645 return res;
647 /* ERRORS */
648 out_flushing:
649 {
650 res = session->srcresult;
651 GST_DEBUG_OBJECT (rdtmanager, "flushing %s", gst_flow_get_name (res));
652 gst_buffer_unref (buffer);
653 goto finished;
654 }
655 duplicate:
656 {
657 GST_WARNING_OBJECT (rdtmanager, "Duplicate packet #%d detected, dropping",
658 seqnum);
659 session->num_duplicates++;
660 gst_buffer_unref (buffer);
661 goto finished;
662 }
663 }
665 static gboolean
666 gst_rdt_manager_parse_caps (GstRDTManager * rdtmanager,
667 GstRDTManagerSession * session, GstCaps * caps)
668 {
669 GstStructure *caps_struct;
670 guint val;
672 /* first parse the caps */
673 caps_struct = gst_caps_get_structure (caps, 0);
675 GST_DEBUG_OBJECT (rdtmanager, "got caps");
677 /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
678 * measure the amount of data in the buffer */
679 if (!gst_structure_get_int (caps_struct, "clock-rate", &session->clock_rate))
680 session->clock_rate = 1000;
682 if (session->clock_rate <= 0)
683 goto wrong_rate;
685 GST_DEBUG_OBJECT (rdtmanager, "got clock-rate %d", session->clock_rate);
687 /* gah, clock-base is uint. If we don't have a base, we will use the first
688 * buffer timestamp as the base time. This will screw up sync but it's better
689 * than nothing. */
690 if (gst_structure_get_uint (caps_struct, "clock-base", &val))
691 session->clock_base = val;
692 else
693 session->clock_base = -1;
695 GST_DEBUG_OBJECT (rdtmanager, "got clock-base %" G_GINT64_FORMAT,
696 session->clock_base);
698 /* first expected seqnum */
699 if (gst_structure_get_uint (caps_struct, "seqnum-base", &val))
700 session->next_seqnum = val;
701 else
702 session->next_seqnum = -1;
704 GST_DEBUG_OBJECT (rdtmanager, "got seqnum-base %d", session->next_seqnum);
706 return TRUE;
708 /* ERRORS */
709 wrong_rate:
710 {
711 GST_DEBUG_OBJECT (rdtmanager, "Invalid clock-rate %d", session->clock_rate);
712 return FALSE;
713 }
714 }
716 static gboolean
717 gst_rdt_manager_setcaps (GstPad * pad, GstCaps * caps)
718 {
719 GstRDTManager *rdtmanager;
720 GstRDTManagerSession *session;
721 gboolean res;
723 rdtmanager = GST_RDT_MANAGER (GST_PAD_PARENT (pad));
724 /* find session */
725 session = gst_pad_get_element_private (pad);
727 res = gst_rdt_manager_parse_caps (rdtmanager, session, caps);
729 return res;
730 }
732 static GstFlowReturn
733 gst_rdt_manager_chain_rdt (GstPad * pad, GstBuffer * buffer)
734 {
735 GstFlowReturn res;
736 GstRDTManager *rdtmanager;
737 GstRDTManagerSession *session;
738 GstClockTime timestamp;
739 GstRDTPacket packet;
740 guint32 ssrc;
741 guint8 pt;
742 gboolean more;
744 rdtmanager = GST_RDT_MANAGER (GST_PAD_PARENT (pad));
746 GST_DEBUG_OBJECT (rdtmanager, "got RDT packet");
748 ssrc = 0;
749 pt = 0;
751 GST_DEBUG_OBJECT (rdtmanager, "SSRC %08x, PT %d", ssrc, pt);
753 /* find session */
754 session = gst_pad_get_element_private (pad);
756 /* see if we have the pad */
757 if (!session->active) {
758 activate_session (rdtmanager, session, ssrc, pt);
759 session->active = TRUE;
760 }
762 if (GST_BUFFER_IS_DISCONT (buffer)) {
763 GST_DEBUG_OBJECT (rdtmanager, "received discont");
764 session->discont = TRUE;
765 }
767 res = GST_FLOW_OK;
769 /* take the timestamp of the buffer. This is the time when the packet was
770 * received and is used to calculate jitter and clock skew. We will adjust
771 * this timestamp with the smoothed value after processing it in the
772 * jitterbuffer. */
773 timestamp = GST_BUFFER_TIMESTAMP (buffer);
774 /* bring to running time */
775 timestamp = gst_segment_to_running_time (&session->segment, GST_FORMAT_TIME,
776 timestamp);
778 more = gst_rdt_buffer_get_first_packet (buffer, &packet);
779 while (more) {
780 GstRDTType type;
782 type = gst_rdt_packet_get_type (&packet);
783 GST_DEBUG_OBJECT (rdtmanager, "Have packet of type %04x", type);
785 if (GST_RDT_IS_DATA_TYPE (type)) {
786 GST_DEBUG_OBJECT (rdtmanager, "We have a data packet");
787 res = gst_rdt_manager_handle_data_packet (session, timestamp, &packet);
788 } else {
789 switch (type) {
790 default:
791 GST_DEBUG_OBJECT (rdtmanager, "Ignoring packet");
792 break;
793 }
794 }
795 if (res != GST_FLOW_OK)
796 break;
798 more = gst_rdt_packet_move_to_next (&packet);
799 }
801 gst_buffer_unref (buffer);
803 return res;
804 }
806 /* push packets from the queue to the downstream demuxer */
807 static void
808 gst_rdt_manager_loop (GstPad * pad)
809 {
810 GstRDTManager *rdtmanager;
811 GstRDTManagerSession *session;
812 GstBuffer *buffer;
813 GstFlowReturn result;
815 rdtmanager = GST_RDT_MANAGER (GST_PAD_PARENT (pad));
817 session = gst_pad_get_element_private (pad);
819 JBUF_LOCK_CHECK (session, flushing);
820 GST_DEBUG_OBJECT (rdtmanager, "Peeking item");
821 while (TRUE) {
822 /* always wait if we are blocked */
823 if (!session->blocked) {
824 /* if we have a packet, we can exit the loop and grab it */
825 if (rdt_jitter_buffer_num_packets (session->jbuf) > 0)
826 break;
827 /* no packets but we are EOS, do eos logic */
828 if (session->eos)
829 goto do_eos;
830 }
831 /* underrun, wait for packets or flushing now */
832 session->waiting = TRUE;
833 JBUF_WAIT_CHECK (session, flushing);
834 session->waiting = FALSE;
835 }
837 buffer = rdt_jitter_buffer_pop (session->jbuf);
839 GST_DEBUG_OBJECT (rdtmanager, "Got item %p", buffer);
841 if (session->discont) {
842 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
843 session->discont = FALSE;
844 }
846 gst_buffer_set_caps (buffer, GST_PAD_CAPS (session->recv_rtp_src));
847 JBUF_UNLOCK (session);
849 result = gst_pad_push (session->recv_rtp_src, buffer);
850 if (result != GST_FLOW_OK)
851 goto pause;
853 return;
855 /* ERRORS */
856 flushing:
857 {
858 GST_DEBUG_OBJECT (rdtmanager, "we are flushing");
859 gst_pad_pause_task (session->recv_rtp_src);
860 JBUF_UNLOCK (session);
861 return;
862 }
863 do_eos:
864 {
865 /* store result, we are flushing now */
866 GST_DEBUG_OBJECT (rdtmanager, "We are EOS, pushing EOS downstream");
867 session->srcresult = GST_FLOW_UNEXPECTED;
868 gst_pad_pause_task (session->recv_rtp_src);
869 gst_pad_push_event (session->recv_rtp_src, gst_event_new_eos ());
870 JBUF_UNLOCK (session);
871 return;
872 }
873 pause:
874 {
875 GST_DEBUG_OBJECT (rdtmanager, "pausing task, reason %s",
876 gst_flow_get_name (result));
878 JBUF_LOCK (session);
879 /* store result */
880 session->srcresult = result;
881 /* we don't post errors or anything because upstream will do that for us
882 * when we pass the return value upstream. */
883 gst_pad_pause_task (session->recv_rtp_src);
884 JBUF_UNLOCK (session);
885 return;
886 }
887 }
889 static GstFlowReturn
890 gst_rdt_manager_chain_rtcp (GstPad * pad, GstBuffer * buffer)
891 {
892 GstRDTManager *src;
894 #ifdef HAVE_RTCP
895 gboolean valid;
896 GstRTCPPacket packet;
897 gboolean more;
898 #endif
900 src = GST_RDT_MANAGER (GST_PAD_PARENT (pad));
902 GST_DEBUG_OBJECT (src, "got rtcp packet");
904 #ifdef HAVE_RTCP
905 valid = gst_rtcp_buffer_validate (buffer);
906 if (!valid)
907 goto bad_packet;
909 /* position on first packet */
910 more = gst_rtcp_buffer_get_first_packet (buffer, &packet);
911 while (more) {
912 switch (gst_rtcp_packet_get_type (&packet)) {
913 case GST_RTCP_TYPE_SR:
914 {
915 guint32 ssrc, rtptime, packet_count, octet_count;
916 guint64 ntptime;
917 guint count, i;
919 gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, &rtptime,
920 &packet_count, &octet_count);
922 GST_DEBUG_OBJECT (src,
923 "got SR packet: SSRC %08x, NTP %" G_GUINT64_FORMAT
924 ", RTP %u, PC %u, OC %u", ssrc, ntptime, rtptime, packet_count,
925 octet_count);
927 count = gst_rtcp_packet_get_rb_count (&packet);
928 for (i = 0; i < count; i++) {
929 guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
930 guint8 fractionlost;
931 gint32 packetslost;
933 gst_rtcp_packet_get_rb (&packet, i, &ssrc, &fractionlost,
934 &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
936 GST_DEBUG_OBJECT (src, "got RB packet %d: SSRC %08x, FL %u"
937 ", PL %u, HS %u, JITTER %u, LSR %u, DLSR %u", ssrc, fractionlost,
938 packetslost, exthighestseq, jitter, lsr, dlsr);
939 }
940 break;
941 }
942 case GST_RTCP_TYPE_RR:
943 {
944 guint32 ssrc;
945 guint count, i;
947 ssrc = gst_rtcp_packet_rr_get_ssrc (&packet);
949 GST_DEBUG_OBJECT (src, "got RR packet: SSRC %08x", ssrc);
951 count = gst_rtcp_packet_get_rb_count (&packet);
952 for (i = 0; i < count; i++) {
953 guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
954 guint8 fractionlost;
955 gint32 packetslost;
957 gst_rtcp_packet_get_rb (&packet, i, &ssrc, &fractionlost,
958 &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
960 GST_DEBUG_OBJECT (src, "got RB packet %d: SSRC %08x, FL %u"
961 ", PL %u, HS %u, JITTER %u, LSR %u, DLSR %u", ssrc, fractionlost,
962 packetslost, exthighestseq, jitter, lsr, dlsr);
963 }
964 break;
965 }
966 case GST_RTCP_TYPE_SDES:
967 {
968 guint chunks, i, j;
969 gboolean more_chunks, more_items;
971 chunks = gst_rtcp_packet_sdes_get_chunk_count (&packet);
972 GST_DEBUG_OBJECT (src, "got SDES packet with %d chunks", chunks);
974 more_chunks = gst_rtcp_packet_sdes_first_chunk (&packet);
975 i = 0;
976 while (more_chunks) {
977 guint32 ssrc;
979 ssrc = gst_rtcp_packet_sdes_get_ssrc (&packet);
981 GST_DEBUG_OBJECT (src, "chunk %d, SSRC %08x", i, ssrc);
983 more_items = gst_rtcp_packet_sdes_first_item (&packet);
984 j = 0;
985 while (more_items) {
986 GstRTCPSDESType type;
987 guint8 len;
988 gchar *data;
990 gst_rtcp_packet_sdes_get_item (&packet, &type, &len, &data);
992 GST_DEBUG_OBJECT (src, "item %d, type %d, len %d, data %s", j,
993 type, len, data);
995 more_items = gst_rtcp_packet_sdes_next_item (&packet);
996 j++;
997 }
998 more_chunks = gst_rtcp_packet_sdes_next_chunk (&packet);
999 i++;
1000 }
1001 break;
1002 }
1003 case GST_RTCP_TYPE_BYE:
1004 {
1005 guint count, i;
1006 gchar *reason;
1008 reason = gst_rtcp_packet_bye_get_reason (&packet);
1009 GST_DEBUG_OBJECT (src, "got BYE packet (reason: %s)",
1010 GST_STR_NULL (reason));
1011 g_free (reason);
1013 count = gst_rtcp_packet_bye_get_ssrc_count (&packet);
1014 for (i = 0; i < count; i++) {
1015 guint32 ssrc;
1018 ssrc = gst_rtcp_packet_bye_get_nth_ssrc (&packet, i);
1020 GST_DEBUG_OBJECT (src, "SSRC: %08x", ssrc);
1021 }
1022 break;
1023 }
1024 case GST_RTCP_TYPE_APP:
1025 GST_DEBUG_OBJECT (src, "got APP packet");
1026 break;
1027 default:
1028 GST_WARNING_OBJECT (src, "got unknown RTCP packet");
1029 break;
1030 }
1031 more = gst_rtcp_packet_move_to_next (&packet);
1032 }
1033 gst_buffer_unref (buffer);
1034 return GST_FLOW_OK;
1036 bad_packet:
1037 {
1038 GST_WARNING_OBJECT (src, "got invalid RTCP packet");
1039 return GST_FLOW_OK;
1040 }
1041 #else
1042 return GST_FLOW_OK;
1043 #endif
1044 }
1046 static void
1047 gst_rdt_manager_set_property (GObject * object, guint prop_id,
1048 const GValue * value, GParamSpec * pspec)
1049 {
1050 GstRDTManager *src;
1052 src = GST_RDT_MANAGER (object);
1054 switch (prop_id) {
1055 case PROP_LATENCY:
1056 src->latency = g_value_get_uint (value);
1057 break;
1058 default:
1059 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1060 break;
1061 }
1062 }
1064 static void
1065 gst_rdt_manager_get_property (GObject * object, guint prop_id, GValue * value,
1066 GParamSpec * pspec)
1067 {
1068 GstRDTManager *src;
1070 src = GST_RDT_MANAGER (object);
1072 switch (prop_id) {
1073 case PROP_LATENCY:
1074 g_value_set_uint (value, src->latency);
1075 break;
1076 default:
1077 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1078 break;
1079 }
1080 }
1082 static GstClock *
1083 gst_rdt_manager_provide_clock (GstElement * element)
1084 {
1085 GstRDTManager *rdtmanager;
1087 rdtmanager = GST_RDT_MANAGER (element);
1089 return GST_CLOCK_CAST (gst_object_ref (rdtmanager->provided_clock));
1090 }
1092 static GstStateChangeReturn
1093 gst_rdt_manager_change_state (GstElement * element, GstStateChange transition)
1094 {
1095 GstStateChangeReturn ret;
1097 switch (transition) {
1098 default:
1099 break;
1100 }
1102 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1104 switch (transition) {
1105 case GST_STATE_CHANGE_READY_TO_PAUSED:
1106 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1107 /* we're NO_PREROLL when going to PAUSED */
1108 ret = GST_STATE_CHANGE_NO_PREROLL;
1109 break;
1110 default:
1111 break;
1112 }
1114 return ret;
1115 }
1117 /* Create a pad for receiving RTP for the session in @name
1118 */
1119 static GstPad *
1120 create_recv_rtp (GstRDTManager * rdtmanager, GstPadTemplate * templ,
1121 const gchar * name)
1122 {
1123 guint sessid;
1124 GstRDTManagerSession *session;
1126 /* first get the session number */
1127 if (name == NULL || sscanf (name, "recv_rtp_sink_%d", &sessid) != 1)
1128 goto no_name;
1130 GST_DEBUG_OBJECT (rdtmanager, "finding session %d", sessid);
1132 /* get or create session */
1133 session = find_session_by_id (rdtmanager, sessid);
1134 if (!session) {
1135 GST_DEBUG_OBJECT (rdtmanager, "creating session %d", sessid);
1136 /* create session now */
1137 session = create_session (rdtmanager, sessid);
1138 if (session == NULL)
1139 goto create_error;
1140 }
1141 /* check if pad was requested */
1142 if (session->recv_rtp_sink != NULL)
1143 goto existed;
1145 GST_DEBUG_OBJECT (rdtmanager, "getting RTP sink pad");
1147 session->recv_rtp_sink = gst_pad_new_from_template (templ, name);
1148 gst_pad_set_element_private (session->recv_rtp_sink, session);
1149 gst_pad_set_setcaps_function (session->recv_rtp_sink,
1150 gst_rdt_manager_setcaps);
1151 gst_pad_set_chain_function (session->recv_rtp_sink,
1152 gst_rdt_manager_chain_rdt);
1153 gst_pad_set_active (session->recv_rtp_sink, TRUE);
1154 gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->recv_rtp_sink);
1156 return session->recv_rtp_sink;
1158 /* ERRORS */
1159 no_name:
1160 {
1161 g_warning ("rdtmanager: invalid name given");
1162 return NULL;
1163 }
1164 create_error:
1165 {
1166 /* create_session already warned */
1167 return NULL;
1168 }
1169 existed:
1170 {
1171 g_warning ("rdtmanager: recv_rtp pad already requested for session %d",
1172 sessid);
1173 return NULL;
1174 }
1175 }
1177 /* Create a pad for receiving RTCP for the session in @name
1178 */
1179 static GstPad *
1180 create_recv_rtcp (GstRDTManager * rdtmanager, GstPadTemplate * templ,
1181 const gchar * name)
1182 {
1183 guint sessid;
1184 GstRDTManagerSession *session;
1186 /* first get the session number */
1187 if (name == NULL || sscanf (name, "recv_rtcp_sink_%d", &sessid) != 1)
1188 goto no_name;
1190 GST_DEBUG_OBJECT (rdtmanager, "finding session %d", sessid);
1192 /* get the session, it must exist or we error */
1193 session = find_session_by_id (rdtmanager, sessid);
1194 if (!session)
1195 goto no_session;
1197 /* check if pad was requested */
1198 if (session->recv_rtcp_sink != NULL)
1199 goto existed;
1201 GST_DEBUG_OBJECT (rdtmanager, "getting RTCP sink pad");
1203 session->recv_rtcp_sink = gst_pad_new_from_template (templ, name);
1204 gst_pad_set_element_private (session->recv_rtp_sink, session);
1205 gst_pad_set_chain_function (session->recv_rtcp_sink,
1206 gst_rdt_manager_chain_rtcp);
1207 gst_pad_set_active (session->recv_rtcp_sink, TRUE);
1208 gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->recv_rtcp_sink);
1210 return session->recv_rtcp_sink;
1212 /* ERRORS */
1213 no_name:
1214 {
1215 g_warning ("rdtmanager: invalid name given");
1216 return NULL;
1217 }
1218 no_session:
1219 {
1220 g_warning ("rdtmanager: no session with id %d", sessid);
1221 return NULL;
1222 }
1223 existed:
1224 {
1225 g_warning ("rdtmanager: recv_rtcp pad already requested for session %d",
1226 sessid);
1227 return NULL;
1228 }
1229 }
1231 /* Create a pad for sending RTCP for the session in @name
1232 */
1233 static GstPad *
1234 create_rtcp (GstRDTManager * rdtmanager, GstPadTemplate * templ,
1235 const gchar * name)
1236 {
1237 guint sessid;
1238 GstRDTManagerSession *session;
1240 /* first get the session number */
1241 if (name == NULL || sscanf (name, "rtcp_src_%d", &sessid) != 1)
1242 goto no_name;
1244 /* get or create session */
1245 session = find_session_by_id (rdtmanager, sessid);
1246 if (!session)
1247 goto no_session;
1249 /* check if pad was requested */
1250 if (session->rtcp_src != NULL)
1251 goto existed;
1253 session->rtcp_src = gst_pad_new_from_template (templ, name);
1254 gst_pad_set_active (session->rtcp_src, TRUE);
1255 gst_element_add_pad (GST_ELEMENT_CAST (rdtmanager), session->rtcp_src);
1257 return session->rtcp_src;
1259 /* ERRORS */
1260 no_name:
1261 {
1262 g_warning ("rdtmanager: invalid name given");
1263 return NULL;
1264 }
1265 no_session:
1266 {
1267 g_warning ("rdtmanager: session with id %d does not exist", sessid);
1268 return NULL;
1269 }
1270 existed:
1271 {
1272 g_warning ("rdtmanager: rtcp_src pad already requested for session %d",
1273 sessid);
1274 return NULL;
1275 }
1276 }
1278 /*
1279 */
1280 static GstPad *
1281 gst_rdt_manager_request_new_pad (GstElement * element,
1282 GstPadTemplate * templ, const gchar * name)
1283 {
1284 GstRDTManager *rdtmanager;
1285 GstElementClass *klass;
1286 GstPad *result;
1288 g_return_val_if_fail (templ != NULL, NULL);
1289 g_return_val_if_fail (GST_IS_RDT_MANAGER (element), NULL);
1291 rdtmanager = GST_RDT_MANAGER (element);
1292 klass = GST_ELEMENT_GET_CLASS (element);
1294 /* figure out the template */
1295 if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink_%d")) {
1296 result = create_recv_rtp (rdtmanager, templ, name);
1297 } else if (templ == gst_element_class_get_pad_template (klass,
1298 "recv_rtcp_sink_%d")) {
1299 result = create_recv_rtcp (rdtmanager, templ, name);
1300 } else if (templ == gst_element_class_get_pad_template (klass, "rtcp_src_%d")) {
1301 result = create_rtcp (rdtmanager, templ, name);
1302 } else
1303 goto wrong_template;
1305 return result;
1307 /* ERRORS */
1308 wrong_template:
1309 {
1310 g_warning ("rdtmanager: this is not our template");
1311 return NULL;
1312 }
1313 }
1315 static void
1316 gst_rdt_manager_release_pad (GstElement * element, GstPad * pad)
1317 {
1318 }
1320 gboolean
1321 gst_rdt_manager_plugin_init (GstPlugin * plugin)
1322 {
1323 return gst_element_register (plugin, "rdtmanager",
1324 GST_RANK_NONE, GST_TYPE_RDT_MANAGER);
1325 }