1 /* GStreamer
2 * Copyright (C) 2005 Andy Wingo <wingo@pobox.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
13 *
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 * Boston, MA 02111-1307, USA.
18 */
19 /**
20 * SECTION:gstnettimeprovider
 * @short_description: Special object that exposes the time of a clock
 *                     on the network.
23 * @see_also: #GstClock, #GstNetClientClock, #GstPipeline
24 *
25 * This object exposes the time of a #GstClock on the network.
26 *
27 * A #GstNetTimeProvider is created with gst_net_time_provider_new() which
28 * takes a #GstClock, an address and a port number as arguments.
29 *
30 * After creating the object, a client clock such as #GstNetClientClock can
31 * query the exposed clock over the network for its values.
32 *
33 * The #GstNetTimeProvider typically wraps the clock used by a #GstPipeline.
34 *
35 * Last reviewed on 2005-11-23 (0.9.5)
36 */
38 #ifdef HAVE_CONFIG_H
39 #include "config.h"
40 #endif
42 #include "gstnettimeprovider.h"
43 #include "gstnettimepacket.h"
45 #include <unistd.h>
46 #include <sys/ioctl.h>
48 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
49 #include <sys/filio.h>
50 #endif
/* Debug category backing the GST_* logging macros used in this file;
 * it is initialised from _do_init() with the name "nettime". */
GST_DEBUG_CATEGORY (ntp_debug);
#define GST_CAT_DEFAULT ntp_debug
/* The select call is also performed on the control sockets; that way
 * we can send special commands to unblock or restart the select call. */
#define CONTROL_RESTART        'R'      /* restart the select call */
#define CONTROL_STOP           'S'      /* stop the select call */

/* Accessors for the control socket pair stored in the object. Every macro
 * parameter is parenthesized so that arbitrary pointer expressions (for
 * example "array + 1") expand correctly. The accessors remain lvalues. */
#define CONTROL_SOCKETS(self)   ((self)->control_sock)
#define WRITE_SOCKET(self)      ((self)->control_sock[1])
#define READ_SOCKET(self)       ((self)->control_sock[0])

/* Write the one-byte @command to the write end of the control socket pair.
 * The result of write() is not checked. */
#define SEND_COMMAND(self, command)                     \
G_STMT_START {                                          \
  unsigned char c; c = (command);                       \
  write (WRITE_SOCKET (self), &c, 1);                   \
} G_STMT_END

/* Read one byte from the read end of the control socket pair into @command
 * and store the return value of read() in @res. */
#define READ_COMMAND(self, command, res)                \
G_STMT_START {                                          \
  (res) = read (READ_SOCKET (self), &(command), 1);     \
} G_STMT_END

/* Defaults for the "address" and "port" properties. */
#define DEFAULT_ADDRESS         "0.0.0.0"
#define DEFAULT_PORT            5637

/* Atomically read the "active" flag of the provider. */
#define IS_ACTIVE(self) (g_atomic_int_get (&((self)->active.active)))
/* Identifiers of the object properties handled by set_property() and
 * get_property(). PROP_0 is a placeholder so that real ids start at 1. */
enum
{
  PROP_0 = 0,
  PROP_PORT,
  PROP_ADDRESS,
  PROP_CLOCK,
  PROP_ACTIVE
      /* FILL ME */
};
89 static gboolean gst_net_time_provider_start (GstNetTimeProvider * bself);
90 static void gst_net_time_provider_stop (GstNetTimeProvider * bself);
92 static gpointer gst_net_time_provider_thread (gpointer data);
94 static void gst_net_time_provider_finalize (GObject * object);
95 static void gst_net_time_provider_set_property (GObject * object, guint prop_id,
96 const GValue * value, GParamSpec * pspec);
97 static void gst_net_time_provider_get_property (GObject * object, guint prop_id,
98 GValue * value, GParamSpec * pspec);
/* Extra initialisation handed to the type boilerplate below: sets up the
 * "nettime" debug category used by this file. */
#define _do_init(type) \
  GST_DEBUG_CATEGORY_INIT (ntp_debug, "nettime", 0, "Network time provider");

GST_BOILERPLATE_FULL (GstNetTimeProvider, gst_net_time_provider, GstObject,
    GST_TYPE_OBJECT, _do_init);
106 static void
107 gst_net_time_provider_base_init (gpointer g_class)
108 {
109 g_assert (sizeof (GstClockTime) == 8);
110 }
112 static void
113 gst_net_time_provider_class_init (GstNetTimeProviderClass * klass)
114 {
115 GObjectClass *gobject_class;
117 gobject_class = (GObjectClass *) klass;
119 gobject_class->finalize = gst_net_time_provider_finalize;
120 gobject_class->set_property = gst_net_time_provider_set_property;
121 gobject_class->get_property = gst_net_time_provider_get_property;
123 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PORT,
124 g_param_spec_int ("port", "port",
125 "The port to receive the packets from, 0=allocate", 0, G_MAXUINT16,
126 DEFAULT_PORT, G_PARAM_READWRITE));
127 g_object_class_install_property (gobject_class, PROP_ADDRESS,
128 g_param_spec_string ("address", "address",
129 "The address to bind on, as a dotted quad (x.x.x.x)",
130 DEFAULT_ADDRESS, G_PARAM_READWRITE));
131 g_object_class_install_property (gobject_class, PROP_CLOCK,
132 g_param_spec_object ("clock", "Clock",
133 "The clock to export over the network", GST_TYPE_CLOCK,
134 G_PARAM_READWRITE));
135 g_object_class_install_property (gobject_class, PROP_ACTIVE,
136 g_param_spec_boolean ("active", "Active",
137 "TRUE if the clock will respond to queries over the network", TRUE,
138 G_PARAM_READWRITE));
139 }
141 static void
142 gst_net_time_provider_init (GstNetTimeProvider * self,
143 GstNetTimeProviderClass * g_class)
144 {
145 self->port = DEFAULT_PORT;
146 self->sock = -1;
147 self->address = g_strdup (DEFAULT_ADDRESS);
148 self->thread = NULL;
149 self->active.active = TRUE;
151 READ_SOCKET (self) = -1;
152 WRITE_SOCKET (self) = -1;
153 }
155 static void
156 gst_net_time_provider_finalize (GObject * object)
157 {
158 GstNetTimeProvider *self = GST_NET_TIME_PROVIDER (object);
160 if (self->thread) {
161 gst_net_time_provider_stop (self);
162 g_assert (self->thread == NULL);
163 }
165 if (READ_SOCKET (self) != -1) {
166 close (READ_SOCKET (self));
167 close (WRITE_SOCKET (self));
168 READ_SOCKET (self) = -1;
169 WRITE_SOCKET (self) = -1;
170 }
172 g_free (self->address);
173 self->address = NULL;
175 if (self->clock)
176 gst_object_unref (self->clock);
177 self->clock = NULL;
179 G_OBJECT_CLASS (parent_class)->finalize (object);
180 }
182 static gpointer
183 gst_net_time_provider_thread (gpointer data)
184 {
185 GstNetTimeProvider *self = data;
186 struct sockaddr_in tmpaddr;
187 socklen_t len;
188 fd_set read_fds;
189 guint max_sock;
190 GstNetTimePacket *packet;
191 gint ret;
193 while (TRUE) {
194 FD_ZERO (&read_fds);
195 FD_SET (self->sock, &read_fds);
196 FD_SET (READ_SOCKET (self), &read_fds);
197 max_sock = MAX (self->sock, READ_SOCKET (self));
199 GST_LOG_OBJECT (self, "doing select");
200 ret = select (max_sock + 1, &read_fds, NULL, NULL, NULL);
201 GST_LOG_OBJECT (self, "select returned %d", ret);
203 if (ret <= 0) {
204 if (errno != EAGAIN && errno != EINTR)
205 goto select_error;
206 else
207 continue;
208 } else if (FD_ISSET (READ_SOCKET (self), &read_fds)) {
209 /* got control message */
210 while (TRUE) {
211 gchar command;
212 int res;
214 READ_COMMAND (self, command, res);
215 if (res < 0) {
216 GST_LOG_OBJECT (self, "no more commands");
217 break;
218 }
220 switch (command) {
221 case CONTROL_STOP:
222 /* break out of the select loop */
223 GST_LOG_OBJECT (self, "stop");
224 goto stopped;
225 default:
226 GST_WARNING_OBJECT (self, "unkown");
227 g_warning ("nettimeprovider: unknown control message received");
228 continue;
229 }
231 g_assert_not_reached ();
232 }
234 continue;
235 } else {
236 /* got data in */
237 len = sizeof (struct sockaddr);
239 packet = gst_net_time_packet_receive (self->sock,
240 (struct sockaddr *) &tmpaddr, &len);
242 if (!packet)
243 goto receive_error;
245 if (IS_ACTIVE (self)) {
246 /* do what we were asked to and send the packet back */
247 packet->remote_time = gst_clock_get_time (self->clock);
249 /* ignore errors */
250 gst_net_time_packet_send (packet, self->sock,
251 (struct sockaddr *) &tmpaddr, len);
252 }
254 g_free (packet);
256 continue;
257 }
259 g_assert_not_reached ();
261 /* log errors and keep going */
262 select_error:
263 {
264 GST_DEBUG_OBJECT (self, "select error %d: %s (%d)", ret,
265 g_strerror (errno), errno);
266 continue;
267 }
268 stopped:
269 {
270 GST_DEBUG_OBJECT (self, "shutting down");
271 /* close socket */
272 return NULL;
273 }
274 receive_error:
275 {
276 GST_DEBUG_OBJECT (self, "receive error");
277 continue;
278 }
280 g_assert_not_reached ();
282 }
284 g_assert_not_reached ();
286 return NULL;
287 }
289 static void
290 gst_net_time_provider_set_property (GObject * object, guint prop_id,
291 const GValue * value, GParamSpec * pspec)
292 {
293 GstNetTimeProvider *self = GST_NET_TIME_PROVIDER (object);
294 GstClock **clock_p = &self->clock;
296 switch (prop_id) {
297 case PROP_PORT:
298 self->port = g_value_get_int (value);
299 break;
300 case PROP_ADDRESS:
301 g_free (self->address);
302 if (g_value_get_string (value) == NULL)
303 self->address = g_strdup (DEFAULT_ADDRESS);
304 else
305 self->address = g_strdup (g_value_get_string (value));
306 break;
307 case PROP_CLOCK:
308 gst_object_replace ((GstObject **) clock_p,
309 (GstObject *) g_value_get_object (value));
310 break;
311 case PROP_ACTIVE:
312 gst_atomic_int_set (&self->active.active, g_value_get_boolean (value));
313 break;
314 default:
315 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
316 break;
317 }
318 }
320 static void
321 gst_net_time_provider_get_property (GObject * object, guint prop_id,
322 GValue * value, GParamSpec * pspec)
323 {
324 GstNetTimeProvider *self = GST_NET_TIME_PROVIDER (object);
326 switch (prop_id) {
327 case PROP_PORT:
328 g_value_set_int (value, self->port);
329 break;
330 case PROP_ADDRESS:
331 g_value_set_string (value, self->address);
332 break;
333 case PROP_CLOCK:
334 g_value_set_object (value, self->clock);
335 break;
336 case PROP_ACTIVE:
337 g_value_set_boolean (value, IS_ACTIVE (self));
338 break;
339 default:
340 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
341 break;
342 }
343 }
345 static gboolean
346 gst_net_time_provider_start (GstNetTimeProvider * self)
347 {
348 gint ru;
349 struct sockaddr_in my_addr;
350 guint len;
351 int port;
352 gint ret;
353 GError *error;
355 if ((ret = socket (AF_INET, SOCK_DGRAM, 0)) < 0)
356 goto no_socket;
358 self->sock = ret;
360 ru = 1;
361 ret = setsockopt (self->sock, SOL_SOCKET, SO_REUSEADDR, &ru, sizeof (ru));
362 if (ret < 0)
363 goto setsockopt_error;
365 memset (&my_addr, 0, sizeof (my_addr));
366 my_addr.sin_family = AF_INET; /* host byte order */
367 my_addr.sin_port = htons ((gint16) self->port); /* short, network byte order */
368 my_addr.sin_addr.s_addr = INADDR_ANY;
369 if (self->address)
370 inet_aton (self->address, &my_addr.sin_addr);
372 GST_DEBUG_OBJECT (self, "binding on port %d", self->port);
373 ret = bind (self->sock, (struct sockaddr *) &my_addr, sizeof (my_addr));
374 if (ret < 0)
375 goto bind_error;
377 len = sizeof (my_addr);
378 ret = getsockname (self->sock, (struct sockaddr *) &my_addr, &len);
379 if (ret < 0)
380 goto getsockname_error;
382 port = ntohs (my_addr.sin_port);
383 GST_DEBUG_OBJECT (self, "bound, on port %d", port);
385 if (port != self->port) {
386 self->port = port;
387 GST_DEBUG_OBJECT (self, "notifying %d", port);
388 g_object_notify (G_OBJECT (self), "port");
389 }
391 self->thread = g_thread_create (gst_net_time_provider_thread, self, TRUE,
392 &error);
393 if (!self->thread)
394 goto no_thread;
396 return TRUE;
398 /* ERRORS */
399 no_socket:
400 {
401 GST_ERROR_OBJECT (self, "socket failed %d: %s (%d)", ret,
402 g_strerror (errno), errno);
403 return FALSE;
404 }
405 setsockopt_error:
406 {
407 close (self->sock);
408 self->sock = -1;
409 GST_ERROR_OBJECT (self, "setsockopt failed %d: %s (%d)", ret,
410 g_strerror (errno), errno);
411 return FALSE;
412 }
413 bind_error:
414 {
415 close (self->sock);
416 self->sock = -1;
417 GST_ERROR_OBJECT (self, "bind failed %d: %s (%d)", ret,
418 g_strerror (errno), errno);
419 return FALSE;
420 }
421 getsockname_error:
422 {
423 close (self->sock);
424 self->sock = -1;
425 GST_ERROR_OBJECT (self, "getsockname failed %d: %s (%d)", ret,
426 g_strerror (errno), errno);
427 return FALSE;
428 }
429 no_thread:
430 {
431 close (self->sock);
432 self->sock = -1;
433 GST_ERROR_OBJECT (self, "could not create thread: %s", error->message);
434 g_error_free (error);
435 return FALSE;
436 }
437 }
439 static void
440 gst_net_time_provider_stop (GstNetTimeProvider * self)
441 {
442 SEND_COMMAND (self, CONTROL_STOP);
443 g_thread_join (self->thread);
444 self->thread = NULL;
446 if (self->sock != -1) {
447 close (self->sock);
448 self->sock = -1;
449 }
450 }
452 /**
453 * gst_net_time_provider_new:
454 * @clock: a #GstClock to export over the network
455 * @address: an address to bind on as a dotted quad (xxx.xxx.xxx.xxx), or NULL
456 * to bind to all addresses
457 * @port: a port to bind on, or 0 to let the kernel choose
458 *
459 * Allows network clients to get the current time of @clock.
460 *
461 * Returns: the new #GstNetTimeProvider, or NULL on error
462 */
463 GstNetTimeProvider *
464 gst_net_time_provider_new (GstClock * clock, const gchar * address, gint port)
465 {
466 GstNetTimeProvider *ret;
467 gint iret;
469 g_return_val_if_fail (clock && GST_IS_CLOCK (clock), NULL);
470 g_return_val_if_fail (port >= 0 && port <= G_MAXUINT16, NULL);
472 ret = g_object_new (GST_TYPE_NET_TIME_PROVIDER, "clock", clock, "address",
473 address, "port", port, NULL);
475 GST_DEBUG_OBJECT (ret, "creating socket pair");
476 if ((iret = socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (ret))) < 0)
477 goto no_socket_pair;
479 fcntl (READ_SOCKET (ret), F_SETFL, O_NONBLOCK);
480 fcntl (WRITE_SOCKET (ret), F_SETFL, O_NONBLOCK);
482 if (!gst_net_time_provider_start (ret))
483 goto failed_start;
485 /* all systems go, cap'n */
486 return ret;
488 no_socket_pair:
489 {
490 GST_ERROR_OBJECT (ret, "no socket pair %d: %s (%d)", iret,
491 g_strerror (errno), errno);
492 gst_object_unref (ret);
493 return NULL;
494 }
495 failed_start:
496 {
497 /* already printed a nice error */
498 gst_object_unref (ret);
499 return NULL;
500 }
502 }