d83705940db6f4a80742e6425a0507eb2f3b98ef
1 /* GStreamer
2 * Copyright (C) 1999 Erik Walthinsen <omega@cse.ogi.edu>
3 * Copyright (C) 2004 Wim Taymans <wim.taymans@gmail.com>
4 * Copyright (C) 2007 Peter Kjellerstedt <pkj@axis.com>
5 * Copyright (C) 2008 Ole André Vadla Ravnås <ole.andre.ravnas@tandberg.com>
6 *
7 * gstpoll.c: File descriptor set
8 *
9 * This library is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Library General Public
11 * License as published by the Free Software Foundation; either
12 * version 2 of the License, or (at your option) any later version.
13 *
14 * This library is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * Library General Public License for more details.
18 *
19 * You should have received a copy of the GNU Library General Public
20 * License along with this library; if not, write to the
21 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
22 * Boston, MA 02111-1307, USA.
23 */
24 /**
25 * SECTION:gstpoll
26 * @short_description: Keep track of file descriptors and make it possible
27 * to wait on them in a cancelable way
28 *
29 * A #GstPoll keeps track of file descriptors much like fd_set (used with
30 * select()) or a struct pollfd array (used with poll()). Once created with
31 * gst_poll_new(), the set can be used to wait for file descriptors to be
32 * readable and/or writeable. It is possible to make this wait be controlled
33 * by specifying %TRUE for the @controllable flag when creating the set (or
34 * later calling gst_poll_set_controllable()).
35 *
36 * New file descriptors are added to the set using gst_poll_add_fd(), and
37 * removed using gst_poll_remove_fd(). Controlling which file descriptors
 * should be waited for to become readable and/or writeable is done using
39 * gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write().
40 *
41 * Use gst_poll_wait() to wait for the file descriptors to actually become
42 * readable and/or writeable, or to timeout if no file descriptor is available
43 * in time. The wait can be controlled by calling gst_poll_restart() and
44 * gst_poll_set_flushing().
45 *
46 * Once the file descriptor set has been waited for, one can use
47 * gst_poll_fd_has_closed() to see if the file descriptor has been closed,
48 * gst_poll_fd_has_error() to see if it has generated an error,
49 * gst_poll_fd_can_read() to see if it is possible to read from the file
50 * descriptor, and gst_poll_fd_can_write() to see if it is possible to
51 * write to it.
52 *
53 */
55 #ifdef HAVE_CONFIG_H
56 #include "config.h"
57 #endif
59 #include "gst_private.h"
60 #include "glib-compat-private.h"
62 #include <sys/types.h>
64 #ifdef HAVE_UNISTD_H
65 #include <unistd.h>
66 #endif
68 #include <errno.h>
69 #include <fcntl.h>
71 #include <glib.h>
73 #ifdef G_OS_WIN32
74 #include <winsock2.h>
75 #define EINPROGRESS WSAEINPROGRESS
76 #else
77 #define _GNU_SOURCE 1
78 #include <sys/poll.h>
79 #include <sys/time.h>
80 #include <sys/socket.h>
81 #endif
83 /* OS/X needs this because of bad headers */
84 #include <string.h>
86 /* The poll() emulation on OS/X doesn't handle fds=NULL, nfds=0,
87 * so we prefer our own poll emulation.
88 */
89 #if defined(BROKEN_POLL)
90 #undef HAVE_POLL
91 #endif
93 #include "gstpoll.h"
95 #define GST_CAT_DEFAULT GST_CAT_POLL
#ifdef G_OS_WIN32
/* Bookkeeping kept for every socket handled by the Windows backend. */
typedef struct _WinsockFd
{
  /* the socket handed to WSAEventSelect() / WSAEnumNetworkEvents() */
  gint fd;
  /* FD_* network events currently requested for this socket */
  glong event_mask;
  /* events recorded by the most recent WSAEnumNetworkEvents() call */
  WSANETWORKEVENTS events;
  /* non-zero while the socket is flagged through gst_poll_fd_ignored() */
  glong ignored_event_mask;
} WinsockFd;
#endif
/* Backend used by gst_poll_wait() to perform the actual wait. */
typedef enum
{
  /* let choose_mode() pick a backend for each wait */
  GST_POLL_MODE_AUTO,
  /* select() */
  GST_POLL_MODE_SELECT,
  /* pselect() */
  GST_POLL_MODE_PSELECT,
  /* poll() */
  GST_POLL_MODE_POLL,
  /* ppoll() */
  GST_POLL_MODE_PPOLL,
  /* Winsock event objects */
  GST_POLL_MODE_WINDOWS
} GstPollMode;
119 struct _GstPoll
120 {
121 GstPollMode mode;
123 GMutex *lock;
124 /* array of fds, always written to and read from with lock */
125 GArray *fds;
126 /* array of active fds, only written to from the waiting thread with the
127 * lock and read from with the lock or without the lock from the waiting
128 * thread */
129 GArray *active_fds;
131 #ifndef G_OS_WIN32
132 gchar buf[1];
133 GstPollFD control_read_fd;
134 GstPollFD control_write_fd;
135 #else
136 GArray *active_fds_ignored;
137 GArray *events;
138 GArray *active_events;
140 HANDLE wakeup_event;
141 #endif
143 gboolean controllable;
144 volatile gint waiting;
145 volatile gint control_pending;
146 volatile gint flushing;
147 gboolean timer;
148 volatile gint rebuild;
149 };
151 static gboolean gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd,
152 gboolean active);
153 static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);
155 #define IS_FLUSHING(s) (g_atomic_int_get(&(s)->flushing))
156 #define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))
158 #define INC_WAITING(s) (G_ATOMIC_INT_ADD(&(s)->waiting, 1))
159 #define DEC_WAITING(s) (G_ATOMIC_INT_ADD(&(s)->waiting, -1))
160 #define GET_WAITING(s) (g_atomic_int_get(&(s)->waiting))
162 #define TEST_REBUILD(s) (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
163 #define MARK_REBUILD(s) (g_atomic_int_set(&(s)->rebuild, 1))
#ifndef G_OS_WIN32
/* Signal the control socket by writing one byte to its write end; evaluates
 * to non-zero when exactly one byte was written. */
#define WAKE_EVENT(s)       (write ((s)->control_write_fd.fd, "W", 1) == 1)
/* Take one byte back out of the control socket; evaluates to non-zero when
 * exactly one byte was read. */
#define RELEASE_EVENT(s)    (read ((s)->control_read_fd.fd, (s)->buf, 1) == 1)
#else
/* Signal the wakeup event. Success is decided by the return value of
 * SetEvent(): the thread's last-error code is only meaningful after a failed
 * call, so consulting GetLastError() unconditionally would turn a stale error
 * left behind by some earlier API call into a bogus wakeup failure.
 * Evaluates to 1 with errno set to 0 on success, and to 0 with errno set to
 * EACCES on failure. */
#define WAKE_EVENT(s)       (SetEvent ((s)->wakeup_event) ? (errno = 0, 1) : (errno = EACCES, 0))
/* Put the wakeup event back into the non-signalled state. */
#define RELEASE_EVENT(s)    (ResetEvent ((s)->wakeup_event))
#endif
173 /* the poll/select call is also performed on a control socket, that way
174 * we can send special commands to control it */
175 static inline gboolean
176 raise_wakeup (GstPoll * set)
177 {
178 gboolean result = TRUE;
180 if (G_ATOMIC_INT_ADD (&set->control_pending, 1) == 0) {
181 /* raise when nothing pending */
182 result = WAKE_EVENT (set);
183 }
184 return result;
185 }
187 /* note how bad things can happen when the 2 threads both raise and release the
188 * wakeup. This is however not a problem because you must always pair a raise
189 * with a release */
190 static inline gboolean
191 release_wakeup (GstPoll * set)
192 {
193 gboolean result = TRUE;
195 if (g_atomic_int_dec_and_test (&set->control_pending)) {
196 result = RELEASE_EVENT (set);
197 }
198 return result;
199 }
201 static inline gint
202 release_all_wakeup (GstPoll * set)
203 {
204 gint old;
206 while (TRUE) {
207 if (!(old = g_atomic_int_get (&set->control_pending)))
208 /* nothing pending, just exit */
209 break;
211 /* try to remove all pending control messages */
212 if (g_atomic_int_compare_and_exchange (&set->control_pending, old, 0)) {
213 /* we managed to remove all messages, read the control socket */
214 if (RELEASE_EVENT (set))
215 break;
216 else
217 /* retry again until we read it successfully */
218 G_ATOMIC_INT_ADD (&set->control_pending, 1);
219 }
220 }
221 return old;
222 }
224 static gint
225 find_index (GArray * array, GstPollFD * fd)
226 {
227 #ifndef G_OS_WIN32
228 struct pollfd *ifd;
229 #else
230 WinsockFd *ifd;
231 #endif
232 guint i;
234 /* start by assuming the index found in the fd is still valid */
235 if (fd->idx >= 0 && fd->idx < array->len) {
236 #ifndef G_OS_WIN32
237 ifd = &g_array_index (array, struct pollfd, fd->idx);
238 #else
239 ifd = &g_array_index (array, WinsockFd, fd->idx);
240 #endif
242 if (ifd->fd == fd->fd) {
243 return fd->idx;
244 }
245 }
247 /* the pollfd array has changed and we need to lookup the fd again */
248 for (i = 0; i < array->len; i++) {
249 #ifndef G_OS_WIN32
250 ifd = &g_array_index (array, struct pollfd, i);
251 #else
252 ifd = &g_array_index (array, WinsockFd, i);
253 #endif
255 if (ifd->fd == fd->fd) {
256 fd->idx = (gint) i;
257 return fd->idx;
258 }
259 }
261 fd->idx = -1;
262 return fd->idx;
263 }
265 #if !defined(HAVE_PPOLL) && defined(HAVE_POLL)
266 /* check if all file descriptors will fit in an fd_set */
267 static gboolean
268 selectable_fds (const GstPoll * set)
269 {
270 guint i;
272 g_mutex_lock (set->lock);
273 for (i = 0; i < set->fds->len; i++) {
274 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
276 if (pfd->fd >= FD_SETSIZE)
277 goto too_many;
278 }
279 g_mutex_unlock (set->lock);
281 return TRUE;
283 too_many:
284 {
285 g_mutex_unlock (set->lock);
286 return FALSE;
287 }
288 }
290 /* check if the timeout will convert to a timeout value used for poll()
291 * without a loss of precision
292 */
293 static gboolean
294 pollable_timeout (GstClockTime timeout)
295 {
296 if (timeout == GST_CLOCK_TIME_NONE)
297 return TRUE;
299 /* not a nice multiple of milliseconds */
300 if (timeout % 1000000)
301 return FALSE;
303 return TRUE;
304 }
305 #endif
307 static GstPollMode
308 choose_mode (const GstPoll * set, GstClockTime timeout)
309 {
310 GstPollMode mode;
312 if (set->mode == GST_POLL_MODE_AUTO) {
313 #ifdef HAVE_PPOLL
314 mode = GST_POLL_MODE_PPOLL;
315 #elif defined(HAVE_POLL)
316 if (!selectable_fds (set) || pollable_timeout (timeout)) {
317 mode = GST_POLL_MODE_POLL;
318 } else {
319 #ifdef HAVE_PSELECT
320 mode = GST_POLL_MODE_PSELECT;
321 #else
322 mode = GST_POLL_MODE_SELECT;
323 #endif
324 }
325 #elif defined(HAVE_PSELECT)
326 mode = GST_POLL_MODE_PSELECT;
327 #else
328 mode = GST_POLL_MODE_SELECT;
329 #endif
330 } else {
331 mode = set->mode;
332 }
333 return mode;
334 }
336 #ifndef G_OS_WIN32
337 static gint
338 pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds,
339 fd_set * errorfds)
340 {
341 gint max_fd = -1;
342 guint i;
344 FD_ZERO (readfds);
345 FD_ZERO (writefds);
346 FD_ZERO (errorfds);
348 g_mutex_lock (set->lock);
350 for (i = 0; i < set->active_fds->len; i++) {
351 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
353 if (pfd->fd < FD_SETSIZE) {
354 if (pfd->events & POLLIN)
355 FD_SET (pfd->fd, readfds);
356 if (pfd->events & POLLOUT)
357 FD_SET (pfd->fd, writefds);
358 if (pfd->events)
359 FD_SET (pfd->fd, errorfds);
360 if (pfd->fd > max_fd && (pfd->events & (POLLIN | POLLOUT)))
361 max_fd = pfd->fd;
362 }
363 }
365 g_mutex_unlock (set->lock);
367 return max_fd;
368 }
370 static void
371 fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds,
372 fd_set * errorfds)
373 {
374 guint i;
376 g_mutex_lock (set->lock);
378 for (i = 0; i < set->active_fds->len; i++) {
379 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
381 if (pfd->fd < FD_SETSIZE) {
382 pfd->revents = 0;
383 if (FD_ISSET (pfd->fd, readfds))
384 pfd->revents |= POLLIN;
385 if (FD_ISSET (pfd->fd, writefds))
386 pfd->revents |= POLLOUT;
387 if (FD_ISSET (pfd->fd, errorfds))
388 pfd->revents |= POLLERR;
389 }
390 }
392 g_mutex_unlock (set->lock);
393 }
394 #else /* G_OS_WIN32 */
395 /*
396 * Translate errors thrown by the Winsock API used by GstPoll:
397 * WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents
398 */
399 static gint
400 gst_poll_winsock_error_to_errno (DWORD last_error)
401 {
402 switch (last_error) {
403 case WSA_INVALID_HANDLE:
404 case WSAEINVAL:
405 case WSAENOTSOCK:
406 return EBADF;
408 case WSA_NOT_ENOUGH_MEMORY:
409 return ENOMEM;
411 /*
412 * Anything else, including:
413 * WSA_INVALID_PARAMETER, WSAEFAULT, WSAEINPROGRESS, WSAENETDOWN,
414 * WSANOTINITIALISED
415 */
416 default:
417 return EINVAL;
418 }
419 }
421 static void
422 gst_poll_free_winsock_event (GstPoll * set, gint idx)
423 {
424 WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
425 HANDLE event = g_array_index (set->events, HANDLE, idx);
427 WSAEventSelect (wfd->fd, event, 0);
428 CloseHandle (event);
429 }
431 static void
432 gst_poll_update_winsock_event_mask (GstPoll * set, gint idx, glong flags,
433 gboolean active)
434 {
435 WinsockFd *wfd;
437 wfd = &g_array_index (set->fds, WinsockFd, idx);
439 if (active)
440 wfd->event_mask |= flags;
441 else
442 wfd->event_mask &= ~flags;
444 /* reset ignored state if the new mask doesn't overlap at all */
445 if ((wfd->ignored_event_mask & wfd->event_mask) == 0)
446 wfd->ignored_event_mask = 0;
447 }
449 static gboolean
450 gst_poll_prepare_winsock_active_sets (GstPoll * set)
451 {
452 guint i;
454 g_array_set_size (set->active_fds, 0);
455 g_array_set_size (set->active_fds_ignored, 0);
456 g_array_set_size (set->active_events, 0);
457 g_array_append_val (set->active_events, set->wakeup_event);
459 for (i = 0; i < set->fds->len; i++) {
460 WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, i);
461 HANDLE event = g_array_index (set->events, HANDLE, i);
463 if (wfd->ignored_event_mask == 0) {
464 gint ret;
466 g_array_append_val (set->active_fds, *wfd);
467 g_array_append_val (set->active_events, event);
469 ret = WSAEventSelect (wfd->fd, event, wfd->event_mask);
470 if (G_UNLIKELY (ret != 0)) {
471 errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
472 return FALSE;
473 }
474 } else {
475 g_array_append_val (set->active_fds_ignored, wfd);
476 }
477 }
479 return TRUE;
480 }
482 static gint
483 gst_poll_collect_winsock_events (GstPoll * set)
484 {
485 gint res, i;
487 /*
488 * We need to check which events are signaled, and call
489 * WSAEnumNetworkEvents for those that are, which resets
490 * the event and clears the internal network event records.
491 */
492 res = 0;
493 for (i = 0; i < set->active_fds->len; i++) {
494 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, i);
495 HANDLE event = g_array_index (set->active_events, HANDLE, i + 1);
496 DWORD wait_ret;
498 wait_ret = WaitForSingleObject (event, 0);
499 if (wait_ret == WAIT_OBJECT_0) {
500 gint enum_ret = WSAEnumNetworkEvents (wfd->fd, event, &wfd->events);
502 if (G_UNLIKELY (enum_ret != 0)) {
503 res = -1;
504 errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
505 break;
506 }
508 res++;
509 } else {
510 /* clear any previously stored result */
511 memset (&wfd->events, 0, sizeof (wfd->events));
512 }
513 }
515 /* If all went well we also need to reset the ignored fds. */
516 if (res >= 0) {
517 res += set->active_fds_ignored->len;
519 for (i = 0; i < set->active_fds_ignored->len; i++) {
520 WinsockFd *wfd = g_array_index (set->active_fds_ignored, WinsockFd *, i);
522 wfd->ignored_event_mask = 0;
523 }
525 g_array_set_size (set->active_fds_ignored, 0);
526 }
528 return res;
529 }
530 #endif
532 /**
533 * gst_poll_new:
534 * @controllable: whether it should be possible to control a wait.
535 *
536 * Create a new file descriptor set. If @controllable, it
537 * is possible to restart or flush a call to gst_poll_wait() with
538 * gst_poll_restart() and gst_poll_set_flushing() respectively.
539 *
540 * Free-function: gst_poll_free
541 *
542 * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
543 * Free with gst_poll_free().
544 *
545 * Since: 0.10.18
546 */
547 GstPoll *
548 gst_poll_new (gboolean controllable)
549 {
550 GstPoll *nset;
552 GST_DEBUG ("controllable : %d", controllable);
554 nset = g_slice_new0 (GstPoll);
555 nset->lock = g_mutex_new ();
556 #ifndef G_OS_WIN32
557 nset->mode = GST_POLL_MODE_AUTO;
558 nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
559 nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
560 nset->control_read_fd.fd = -1;
561 nset->control_write_fd.fd = -1;
562 {
563 gint control_sock[2];
565 if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
566 goto no_socket_pair;
568 fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
569 fcntl (control_sock[1], F_SETFL, O_NONBLOCK);
571 nset->control_read_fd.fd = control_sock[0];
572 nset->control_write_fd.fd = control_sock[1];
574 gst_poll_add_fd_unlocked (nset, &nset->control_read_fd);
575 gst_poll_fd_ctl_read_unlocked (nset, &nset->control_read_fd, TRUE);
576 }
577 #else
578 nset->mode = GST_POLL_MODE_WINDOWS;
579 nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
580 nset->active_fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
581 nset->active_fds_ignored = g_array_new (FALSE, FALSE, sizeof (WinsockFd *));
582 nset->events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
583 nset->active_events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
585 nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL);
586 #endif
588 /* ensure (re)build, though already sneakily set in non-windows case */
589 MARK_REBUILD (nset);
591 nset->controllable = controllable;
593 return nset;
595 /* ERRORS */
596 #ifndef G_OS_WIN32
597 no_socket_pair:
598 {
599 GST_WARNING ("%p: can't create socket pair !", nset);
600 gst_poll_free (nset);
601 return NULL;
602 }
603 #endif
604 }
606 /**
607 * gst_poll_new_timer:
608 *
609 * Create a new poll object that can be used for scheduling cancellable
610 * timeouts.
611 *
612 * A timeout is performed with gst_poll_wait(). Multiple timeouts can be
613 * performed from different threads.
614 *
615 * Free-function: gst_poll_free
616 *
617 * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
618 * Free with gst_poll_free().
619 *
620 * Since: 0.10.23
621 */
622 GstPoll *
623 gst_poll_new_timer (void)
624 {
625 GstPoll *poll;
627 /* make a new controllable poll set */
628 if (!(poll = gst_poll_new (TRUE)))
629 goto done;
631 /* we are a timer */
632 poll->timer = TRUE;
634 done:
635 return poll;
636 }
638 /**
639 * gst_poll_free:
640 * @set: (transfer full): a file descriptor set.
641 *
642 * Free a file descriptor set.
643 *
644 * Since: 0.10.18
645 */
646 void
647 gst_poll_free (GstPoll * set)
648 {
649 g_return_if_fail (set != NULL);
651 GST_DEBUG ("%p: freeing", set);
653 #ifndef G_OS_WIN32
654 if (set->control_write_fd.fd >= 0)
655 close (set->control_write_fd.fd);
656 if (set->control_read_fd.fd >= 0)
657 close (set->control_read_fd.fd);
658 #else
659 CloseHandle (set->wakeup_event);
661 {
662 guint i;
664 for (i = 0; i < set->events->len; i++)
665 gst_poll_free_winsock_event (set, i);
666 }
668 g_array_free (set->active_events, TRUE);
669 g_array_free (set->events, TRUE);
670 g_array_free (set->active_fds_ignored, TRUE);
671 #endif
673 g_array_free (set->active_fds, TRUE);
674 g_array_free (set->fds, TRUE);
675 g_mutex_free (set->lock);
676 g_slice_free (GstPoll, set);
677 }
679 /**
680 * gst_poll_get_read_gpollfd:
681 * @set: a #GstPoll
682 * @fd: a #GPollFD
683 *
684 * Get a GPollFD for the reading part of the control socket. This is useful when
685 * integrating with a GSource and GMainLoop.
686 *
687 * Since: 0.10.32
688 */
689 void
690 gst_poll_get_read_gpollfd (GstPoll * set, GPollFD * fd)
691 {
692 g_return_if_fail (set != NULL);
693 g_return_if_fail (fd != NULL);
695 #ifndef G_OS_WIN32
696 fd->fd = set->control_read_fd.fd;
697 #else
698 #if GLIB_SIZEOF_VOID_P == 8
699 fd->fd = (gint64) set->wakeup_event;
700 #else
701 fd->fd = (gint) set->wakeup_event;
702 #endif
703 #endif
704 fd->events = G_IO_IN | G_IO_HUP | G_IO_ERR;
705 fd->revents = 0;
706 }
708 /**
709 * gst_poll_fd_init:
710 * @fd: a #GstPollFD
711 *
712 * Initializes @fd. Alternatively you can initialize it with
713 * #GST_POLL_FD_INIT.
714 *
715 * Since: 0.10.18
716 */
717 void
718 gst_poll_fd_init (GstPollFD * fd)
719 {
720 g_return_if_fail (fd != NULL);
722 fd->fd = -1;
723 fd->idx = -1;
724 }
726 static gboolean
727 gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
728 {
729 gint idx;
731 GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
733 idx = find_index (set->fds, fd);
734 if (idx < 0) {
735 #ifndef G_OS_WIN32
736 struct pollfd nfd;
738 nfd.fd = fd->fd;
739 nfd.events = POLLERR | POLLNVAL | POLLHUP;
740 nfd.revents = 0;
742 g_array_append_val (set->fds, nfd);
744 fd->idx = set->fds->len - 1;
745 #else
746 WinsockFd wfd;
747 HANDLE event;
749 wfd.fd = fd->fd;
750 wfd.event_mask = FD_CLOSE;
751 memset (&wfd.events, 0, sizeof (wfd.events));
752 wfd.ignored_event_mask = 0;
753 event = WSACreateEvent ();
755 g_array_append_val (set->fds, wfd);
756 g_array_append_val (set->events, event);
758 fd->idx = set->fds->len - 1;
759 #endif
760 MARK_REBUILD (set);
761 } else {
762 GST_WARNING ("%p: couldn't find fd !", set);
763 }
765 return TRUE;
766 }
768 /**
769 * gst_poll_add_fd:
770 * @set: a file descriptor set.
771 * @fd: a file descriptor.
772 *
773 * Add a file descriptor to the file descriptor set.
774 *
775 * Returns: %TRUE if the file descriptor was successfully added to the set.
776 *
777 * Since: 0.10.18
778 */
779 gboolean
780 gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
781 {
782 gboolean ret;
784 g_return_val_if_fail (set != NULL, FALSE);
785 g_return_val_if_fail (fd != NULL, FALSE);
786 g_return_val_if_fail (fd->fd >= 0, FALSE);
788 g_mutex_lock (set->lock);
790 ret = gst_poll_add_fd_unlocked (set, fd);
792 g_mutex_unlock (set->lock);
794 return ret;
795 }
797 /**
798 * gst_poll_remove_fd:
799 * @set: a file descriptor set.
800 * @fd: a file descriptor.
801 *
802 * Remove a file descriptor from the file descriptor set.
803 *
804 * Returns: %TRUE if the file descriptor was successfully removed from the set.
805 *
806 * Since: 0.10.18
807 */
808 gboolean
809 gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
810 {
811 gint idx;
813 g_return_val_if_fail (set != NULL, FALSE);
814 g_return_val_if_fail (fd != NULL, FALSE);
815 g_return_val_if_fail (fd->fd >= 0, FALSE);
818 GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
820 g_mutex_lock (set->lock);
822 /* get the index, -1 is an fd that is not added */
823 idx = find_index (set->fds, fd);
824 if (idx >= 0) {
825 #ifdef G_OS_WIN32
826 gst_poll_free_winsock_event (set, idx);
827 g_array_remove_index_fast (set->events, idx);
828 #endif
830 /* remove the fd at index, we use _remove_index_fast, which copies the last
831 * element of the array to the freed index */
832 g_array_remove_index_fast (set->fds, idx);
834 /* mark fd as removed by setting the index to -1 */
835 fd->idx = -1;
836 MARK_REBUILD (set);
837 } else {
838 GST_WARNING ("%p: couldn't find fd !", set);
839 }
841 g_mutex_unlock (set->lock);
843 return idx >= 0;
844 }
846 /**
847 * gst_poll_fd_ctl_write:
848 * @set: a file descriptor set.
849 * @fd: a file descriptor.
850 * @active: a new status.
851 *
852 * Control whether the descriptor @fd in @set will be monitored for
853 * writability.
854 *
855 * Returns: %TRUE if the descriptor was successfully updated.
856 *
857 * Since: 0.10.18
858 */
859 gboolean
860 gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
861 {
862 gint idx;
864 g_return_val_if_fail (set != NULL, FALSE);
865 g_return_val_if_fail (fd != NULL, FALSE);
866 g_return_val_if_fail (fd->fd >= 0, FALSE);
868 GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
869 fd->fd, fd->idx, active);
871 g_mutex_lock (set->lock);
873 idx = find_index (set->fds, fd);
874 if (idx >= 0) {
875 #ifndef G_OS_WIN32
876 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
878 if (active)
879 pfd->events |= POLLOUT;
880 else
881 pfd->events &= ~POLLOUT;
883 GST_LOG ("pfd->events now %d (POLLOUT:%d)", pfd->events, POLLOUT);
884 #else
885 gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
886 active);
887 #endif
888 MARK_REBUILD (set);
889 } else {
890 GST_WARNING ("%p: couldn't find fd !", set);
891 }
893 g_mutex_unlock (set->lock);
895 return idx >= 0;
896 }
898 static gboolean
899 gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
900 {
901 gint idx;
903 GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
904 fd->fd, fd->idx, active);
906 idx = find_index (set->fds, fd);
908 if (idx >= 0) {
909 #ifndef G_OS_WIN32
910 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
912 if (active)
913 pfd->events |= (POLLIN | POLLPRI);
914 else
915 pfd->events &= ~(POLLIN | POLLPRI);
916 #else
917 gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
918 #endif
919 MARK_REBUILD (set);
920 } else {
921 GST_WARNING ("%p: couldn't find fd !", set);
922 }
924 return idx >= 0;
925 }
927 /**
928 * gst_poll_fd_ctl_read:
929 * @set: a file descriptor set.
930 * @fd: a file descriptor.
931 * @active: a new status.
932 *
933 * Control whether the descriptor @fd in @set will be monitored for
934 * readability.
935 *
936 * Returns: %TRUE if the descriptor was successfully updated.
937 *
938 * Since: 0.10.18
939 */
940 gboolean
941 gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
942 {
943 gboolean ret;
945 g_return_val_if_fail (set != NULL, FALSE);
946 g_return_val_if_fail (fd != NULL, FALSE);
947 g_return_val_if_fail (fd->fd >= 0, FALSE);
949 g_mutex_lock (set->lock);
951 ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
953 g_mutex_unlock (set->lock);
955 return ret;
956 }
958 /**
959 * gst_poll_fd_ignored:
960 * @set: a file descriptor set.
961 * @fd: a file descriptor.
962 *
963 * Mark @fd as ignored so that the next call to gst_poll_wait() will yield
964 * the same result for @fd as last time. This function must be called if no
965 * operation (read/write/recv/send/etc.) will be performed on @fd before
966 * the next call to gst_poll_wait().
967 *
968 * The reason why this is needed is because the underlying implementation
969 * might not allow querying the fd more than once between calls to one of
970 * the re-enabling operations.
971 *
972 * Since: 0.10.18
973 */
974 void
975 gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
976 {
977 #ifdef G_OS_WIN32
978 gint idx;
980 g_return_if_fail (set != NULL);
981 g_return_if_fail (fd != NULL);
982 g_return_if_fail (fd->fd >= 0);
984 g_mutex_lock (set->lock);
986 idx = find_index (set->fds, fd);
987 if (idx >= 0) {
988 WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
990 wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
991 MARK_REBUILD (set);
992 }
994 g_mutex_unlock (set->lock);
995 #endif
996 }
998 /**
999 * gst_poll_fd_has_closed:
1000 * @set: a file descriptor set.
1001 * @fd: a file descriptor.
1002 *
1003 * Check if @fd in @set has closed the connection.
1004 *
1005 * Returns: %TRUE if the connection was closed.
1006 *
1007 * Since: 0.10.18
1008 */
1009 gboolean
1010 gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
1011 {
1012 gboolean res = FALSE;
1013 gint idx;
1015 g_return_val_if_fail (set != NULL, FALSE);
1016 g_return_val_if_fail (fd != NULL, FALSE);
1017 g_return_val_if_fail (fd->fd >= 0, FALSE);
1019 GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1021 g_mutex_lock (set->lock);
1023 idx = find_index (set->active_fds, fd);
1024 if (idx >= 0) {
1025 #ifndef G_OS_WIN32
1026 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1028 res = (pfd->revents & POLLHUP) != 0;
1029 #else
1030 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1032 res = (wfd->events.lNetworkEvents & FD_CLOSE) != 0;
1033 #endif
1034 } else {
1035 GST_WARNING ("%p: couldn't find fd !", set);
1036 }
1038 g_mutex_unlock (set->lock);
1040 return res;
1041 }
1043 /**
1044 * gst_poll_fd_has_error:
1045 * @set: a file descriptor set.
1046 * @fd: a file descriptor.
1047 *
1048 * Check if @fd in @set has an error.
1049 *
1050 * Returns: %TRUE if the descriptor has an error.
1051 *
1052 * Since: 0.10.18
1053 */
1054 gboolean
1055 gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
1056 {
1057 gboolean res = FALSE;
1058 gint idx;
1060 g_return_val_if_fail (set != NULL, FALSE);
1061 g_return_val_if_fail (fd != NULL, FALSE);
1062 g_return_val_if_fail (fd->fd >= 0, FALSE);
1064 GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1066 g_mutex_lock (set->lock);
1068 idx = find_index (set->active_fds, fd);
1069 if (idx >= 0) {
1070 #ifndef G_OS_WIN32
1071 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1073 res = (pfd->revents & (POLLERR | POLLNVAL)) != 0;
1074 #else
1075 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1077 res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
1078 (wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
1079 (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
1080 (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0) ||
1081 (wfd->events.iErrorCode[FD_CONNECT_BIT] != 0);
1082 #endif
1083 } else {
1084 GST_WARNING ("%p: couldn't find fd !", set);
1085 }
1087 g_mutex_unlock (set->lock);
1089 return res;
1090 }
1092 static gboolean
1093 gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
1094 {
1095 gboolean res = FALSE;
1096 gint idx;
1098 GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1100 idx = find_index (set->active_fds, fd);
1101 if (idx >= 0) {
1102 #ifndef G_OS_WIN32
1103 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1105 res = (pfd->revents & (POLLIN | POLLPRI)) != 0;
1106 #else
1107 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1109 res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0;
1110 #endif
1111 } else {
1112 GST_WARNING ("%p: couldn't find fd !", set);
1113 }
1115 return res;
1116 }
1118 /**
1119 * gst_poll_fd_can_read:
1120 * @set: a file descriptor set.
1121 * @fd: a file descriptor.
1122 *
1123 * Check if @fd in @set has data to be read.
1124 *
1125 * Returns: %TRUE if the descriptor has data to be read.
1126 *
1127 * Since: 0.10.18
1128 */
1129 gboolean
1130 gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
1131 {
1132 gboolean res = FALSE;
1134 g_return_val_if_fail (set != NULL, FALSE);
1135 g_return_val_if_fail (fd != NULL, FALSE);
1136 g_return_val_if_fail (fd->fd >= 0, FALSE);
1138 g_mutex_lock (set->lock);
1140 res = gst_poll_fd_can_read_unlocked (set, fd);
1142 g_mutex_unlock (set->lock);
1144 return res;
1145 }
1147 /**
1148 * gst_poll_fd_can_write:
1149 * @set: a file descriptor set.
1150 * @fd: a file descriptor.
1151 *
1152 * Check if @fd in @set can be used for writing.
1153 *
1154 * Returns: %TRUE if the descriptor can be used for writing.
1155 *
1156 * Since: 0.10.18
1157 */
1158 gboolean
1159 gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
1160 {
1161 gboolean res = FALSE;
1162 gint idx;
1164 g_return_val_if_fail (set != NULL, FALSE);
1165 g_return_val_if_fail (fd != NULL, FALSE);
1166 g_return_val_if_fail (fd->fd >= 0, FALSE);
1168 GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1170 g_mutex_lock (set->lock);
1172 idx = find_index (set->active_fds, fd);
1173 if (idx >= 0) {
1174 #ifndef G_OS_WIN32
1175 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1177 res = (pfd->revents & POLLOUT) != 0;
1178 #else
1179 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1181 res = (wfd->events.lNetworkEvents & FD_WRITE) != 0;
1182 #endif
1183 } else {
1184 GST_WARNING ("%p: couldn't find fd !", set);
1185 }
1187 g_mutex_unlock (set->lock);
1189 return res;
1190 }
1192 /**
1193 * gst_poll_wait:
1194 * @set: a #GstPoll.
1195 * @timeout: a timeout in nanoseconds.
1196 *
1197 * Wait for activity on the file descriptors in @set. This function waits up to
1198 * the specified @timeout. A timeout of #GST_CLOCK_TIME_NONE waits forever.
1199 *
1200 * For #GstPoll objects created with gst_poll_new(), this function can only be
1201 * called from a single thread at a time. If called from multiple threads,
1202 * -1 will be returned with errno set to EPERM.
1203 *
1204 * This is not true for timer #GstPoll objects created with
1205 * gst_poll_new_timer(), where it is allowed to have multiple threads waiting
1206 * simultaneously.
1207 *
1208 * Returns: The number of #GstPollFD in @set that have activity or 0 when no
1209 * activity was detected after @timeout. If an error occurs, -1 is returned
1210 * and errno is set.
1211 *
1212 * Since: 0.10.18
1213 */
1214 gint
1215 gst_poll_wait (GstPoll * set, GstClockTime timeout)
1216 {
1217 gboolean restarting;
1218 gboolean is_timer;
1219 int res;
1220 gint old_waiting;
1222 g_return_val_if_fail (set != NULL, -1);
1224 GST_DEBUG ("timeout :%" GST_TIME_FORMAT, GST_TIME_ARGS (timeout));
1226 is_timer = set->timer;
1228 /* add one more waiter */
1229 old_waiting = INC_WAITING (set);
1231 /* we cannot wait from multiple threads unless we are a timer */
1232 if (G_UNLIKELY (old_waiting > 0 && !is_timer))
1233 goto already_waiting;
1235 /* flushing, exit immediatly */
1236 if (G_UNLIKELY (IS_FLUSHING (set)))
1237 goto flushing;
1239 do {
1240 GstPollMode mode;
1242 res = -1;
1243 restarting = FALSE;
1245 mode = choose_mode (set, timeout);
1247 if (TEST_REBUILD (set)) {
1248 g_mutex_lock (set->lock);
1249 #ifndef G_OS_WIN32
1250 g_array_set_size (set->active_fds, set->fds->len);
1251 memcpy (set->active_fds->data, set->fds->data,
1252 set->fds->len * sizeof (struct pollfd));
1253 #else
1254 if (!gst_poll_prepare_winsock_active_sets (set))
1255 goto winsock_error;
1256 #endif
1257 g_mutex_unlock (set->lock);
1258 }
1260 switch (mode) {
1261 case GST_POLL_MODE_AUTO:
1262 g_assert_not_reached ();
1263 break;
1264 case GST_POLL_MODE_PPOLL:
1265 {
1266 #ifdef HAVE_PPOLL
1267 struct timespec ts;
1268 struct timespec *tsptr;
1270 if (timeout != GST_CLOCK_TIME_NONE) {
1271 GST_TIME_TO_TIMESPEC (timeout, ts);
1272 tsptr = &ts;
1273 } else {
1274 tsptr = NULL;
1275 }
1277 res =
1278 ppoll ((struct pollfd *) set->active_fds->data,
1279 set->active_fds->len, tsptr, NULL);
1280 #else
1281 g_assert_not_reached ();
1282 errno = ENOSYS;
1283 #endif
1284 break;
1285 }
1286 case GST_POLL_MODE_POLL:
1287 {
1288 #ifdef HAVE_POLL
1289 gint t;
1291 if (timeout != GST_CLOCK_TIME_NONE) {
1292 t = GST_TIME_AS_MSECONDS (timeout);
1293 } else {
1294 t = -1;
1295 }
1297 res =
1298 poll ((struct pollfd *) set->active_fds->data,
1299 set->active_fds->len, t);
1300 #else
1301 g_assert_not_reached ();
1302 errno = ENOSYS;
1303 #endif
1304 break;
1305 }
1306 case GST_POLL_MODE_PSELECT:
1307 #ifndef HAVE_PSELECT
1308 {
1309 g_assert_not_reached ();
1310 errno = ENOSYS;
1311 break;
1312 }
1313 #endif
1314 case GST_POLL_MODE_SELECT:
1315 {
1316 #ifndef G_OS_WIN32
1317 fd_set readfds;
1318 fd_set writefds;
1319 fd_set errorfds;
1320 gint max_fd;
1322 max_fd = pollfd_to_fd_set (set, &readfds, &writefds, &errorfds);
1324 if (mode == GST_POLL_MODE_SELECT) {
1325 struct timeval tv;
1326 struct timeval *tvptr;
1328 if (timeout != GST_CLOCK_TIME_NONE) {
1329 GST_TIME_TO_TIMEVAL (timeout, tv);
1330 tvptr = &tv;
1331 } else {
1332 tvptr = NULL;
1333 }
1335 GST_DEBUG ("Calling select");
1336 res = select (max_fd + 1, &readfds, &writefds, &errorfds, tvptr);
1337 GST_DEBUG ("After select, res:%d", res);
1338 } else {
1339 #ifdef HAVE_PSELECT
1340 struct timespec ts;
1341 struct timespec *tsptr;
1343 if (timeout != GST_CLOCK_TIME_NONE) {
1344 GST_TIME_TO_TIMESPEC (timeout, ts);
1345 tsptr = &ts;
1346 } else {
1347 tsptr = NULL;
1348 }
1350 GST_DEBUG ("Calling pselect");
1351 res =
1352 pselect (max_fd + 1, &readfds, &writefds, &errorfds, tsptr, NULL);
1353 GST_DEBUG ("After pselect, res:%d", res);
1354 #endif
1355 }
1357 if (res >= 0) {
1358 fd_set_to_pollfd (set, &readfds, &writefds, &errorfds);
1359 }
1360 #else /* G_OS_WIN32 */
1361 g_assert_not_reached ();
1362 errno = ENOSYS;
1363 #endif
1364 break;
1365 }
1366 case GST_POLL_MODE_WINDOWS:
1367 {
1368 #ifdef G_OS_WIN32
1369 gint ignore_count = set->active_fds_ignored->len;
1370 DWORD t, wait_ret;
1372 if (G_LIKELY (ignore_count == 0)) {
1373 if (timeout != GST_CLOCK_TIME_NONE)
1374 t = GST_TIME_AS_MSECONDS (timeout);
1375 else
1376 t = INFINITE;
1377 } else {
1378 /* already one or more ignored fds, so we quickly sweep the others */
1379 t = 0;
1380 }
1382 if (set->active_events->len != 0) {
1383 wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
1384 (HANDLE *) set->active_events->data, FALSE, t, FALSE);
1385 } else {
1386 wait_ret = WSA_WAIT_FAILED;
1387 WSASetLastError (WSA_INVALID_PARAMETER);
1388 }
1390 if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) {
1391 res = 0;
1392 } else if (wait_ret == WSA_WAIT_FAILED) {
1393 res = -1;
1394 errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
1395 } else {
1396 /* the first entry is the wakeup event */
1397 if (wait_ret - WSA_WAIT_EVENT_0 >= 1) {
1398 res = gst_poll_collect_winsock_events (set);
1399 } else {
1400 res = 1; /* wakeup event */
1401 }
1402 }
1403 #else
1404 g_assert_not_reached ();
1405 errno = ENOSYS;
1406 #endif
1407 break;
1408 }
1409 }
1411 if (!is_timer) {
1412 /* Applications needs to clear the control socket themselves for timer
1413 * polls.
1414 * For other polls, we need to clear the control socket. If there was only
1415 * one socket with activity and it was the control socket, we need to
1416 * restart */
1417 if (release_all_wakeup (set) > 0 && res == 1)
1418 restarting = TRUE;
1419 }
1421 if (G_UNLIKELY (IS_FLUSHING (set))) {
1422 /* we got woken up and we are flushing, we need to stop */
1423 errno = EBUSY;
1424 res = -1;
1425 break;
1426 }
1427 } while (G_UNLIKELY (restarting));
1429 DEC_WAITING (set);
1431 return res;
1433 /* ERRORS */
1434 already_waiting:
1435 {
1436 DEC_WAITING (set);
1437 errno = EPERM;
1438 return -1;
1439 }
1440 flushing:
1441 {
1442 DEC_WAITING (set);
1443 errno = EBUSY;
1444 return -1;
1445 }
1446 #ifdef G_OS_WIN32
1447 winsock_error:
1448 {
1449 g_mutex_unlock (set->lock);
1450 DEC_WAITING (set);
1451 return -1;
1452 }
1453 #endif
1454 }
1456 /**
1457 * gst_poll_set_controllable:
1458 * @set: a #GstPoll.
1459 * @controllable: new controllable state.
1460 *
1461 * When @controllable is %TRUE, this function ensures that future calls to
1462 * gst_poll_wait() will be affected by gst_poll_restart() and
1463 * gst_poll_set_flushing().
1464 *
1465 * Returns: %TRUE if the controllability of @set could be updated.
1466 *
1467 * Since: 0.10.18
1468 */
1469 gboolean
1470 gst_poll_set_controllable (GstPoll * set, gboolean controllable)
1471 {
1472 g_return_val_if_fail (set != NULL, FALSE);
1474 GST_LOG ("%p: controllable : %d", set, controllable);
1476 set->controllable = controllable;
1478 return TRUE;
1479 }
1481 /**
1482 * gst_poll_restart:
1483 * @set: a #GstPoll.
1484 *
1485 * Restart any gst_poll_wait() that is in progress. This function is typically
1486 * used after adding or removing descriptors to @set.
1487 *
1488 * If @set is not controllable, then this call will have no effect.
1489 *
1490 * Since: 0.10.18
1491 */
1492 void
1493 gst_poll_restart (GstPoll * set)
1494 {
1495 g_return_if_fail (set != NULL);
1497 if (set->controllable && GET_WAITING (set) > 0) {
1498 /* we are controllable and waiting, wake up the waiter. The socket will be
1499 * cleared by the _wait() thread and the poll will be restarted */
1500 raise_wakeup (set);
1501 }
1502 }
1504 /**
1505 * gst_poll_set_flushing:
1506 * @set: a #GstPoll.
1507 * @flushing: new flushing state.
1508 *
1509 * When @flushing is %TRUE, this function ensures that current and future calls
1510 * to gst_poll_wait() will return -1, with errno set to EBUSY.
1511 *
1512 * Unsetting the flushing state will restore normal operation of @set.
1513 *
1514 * Since: 0.10.18
1515 */
1516 void
1517 gst_poll_set_flushing (GstPoll * set, gboolean flushing)
1518 {
1519 g_return_if_fail (set != NULL);
1521 /* update the new state first */
1522 SET_FLUSHING (set, flushing);
1524 if (flushing && set->controllable && GET_WAITING (set) > 0) {
1525 /* we are flushing, controllable and waiting, wake up the waiter. When we
1526 * stop the flushing operation we don't clear the wakeup fd here, this will
1527 * happen in the _wait() thread. */
1528 raise_wakeup (set);
1529 }
1530 }
1532 /**
1533 * gst_poll_write_control:
1534 * @set: a #GstPoll.
1535 *
1536 * Write a byte to the control socket of the controllable @set.
1537 * This function is mostly useful for timer #GstPoll objects created with
1538 * gst_poll_new_timer().
1539 *
1540 * It will make any current and future gst_poll_wait() function return with
1541 * 1, meaning the control socket is set. After an equal amount of calls to
1542 * gst_poll_read_control() have been performed, calls to gst_poll_wait() will
1543 * block again until their timeout expired.
1544 *
1545 * Returns: %TRUE on success. %FALSE when @set is not controllable or when the
1546 * byte could not be written.
1547 *
1548 * Since: 0.10.23
1549 */
1550 gboolean
1551 gst_poll_write_control (GstPoll * set)
1552 {
1553 gboolean res;
1555 g_return_val_if_fail (set != NULL, FALSE);
1556 g_return_val_if_fail (set->timer, FALSE);
1558 res = raise_wakeup (set);
1560 return res;
1561 }
1563 /**
1564 * gst_poll_read_control:
1565 * @set: a #GstPoll.
1566 *
1567 * Read a byte from the control socket of the controllable @set.
1568 * This function is mostly useful for timer #GstPoll objects created with
1569 * gst_poll_new_timer().
1570 *
1571 * Returns: %TRUE on success. %FALSE when @set is not controllable or when there
1572 * was no byte to read.
1573 *
1574 * Since: 0.10.23
1575 */
1576 gboolean
1577 gst_poll_read_control (GstPoll * set)
1578 {
1579 gboolean res;
1581 g_return_val_if_fail (set != NULL, FALSE);
1582 g_return_val_if_fail (set->timer, FALSE);
1584 res = release_wakeup (set);
1586 return res;
1587 }