1 /* GStreamer
2 * Copyright (C) 1999 Erik Walthinsen <omega@cse.ogi.edu>
3 * Copyright (C) 2004 Wim Taymans <wim.taymans@gmail.com>
4 * Copyright (C) 2007 Peter Kjellerstedt <pkj@axis.com>
5 * Copyright (C) 2008 Ole André Vadla Ravnås <ole.andre.ravnas@tandberg.com>
6 *
7 * gstpoll.c: File descriptor set
8 *
9 * This library is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Library General Public
11 * License as published by the Free Software Foundation; either
12 * version 2 of the License, or (at your option) any later version.
13 *
14 * This library is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * Library General Public License for more details.
18 *
19 * You should have received a copy of the GNU Library General Public
20 * License along with this library; if not, write to the
21 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
22 * Boston, MA 02111-1307, USA.
23 */
24 /**
25 * SECTION:gstpoll
26 * @short_description: Keep track of file descriptors and make it possible
27 * to wait on them in a cancelable way
28 *
29 * A #GstPoll keeps track of file descriptors much like fd_set (used with
30 * select()) or a struct pollfd array (used with poll()). Once created with
31 * gst_poll_new(), the set can be used to wait for file descriptors to be
32 * readable and/or writeable. It is possible to make this wait be controlled
33 * by specifying %TRUE for the @controllable flag when creating the set (or
34 * later calling gst_poll_set_controllable()).
35 *
 * New file descriptors are added to the set using gst_poll_add_fd(), and
 * removed using gst_poll_remove_fd(). Controlling which file descriptors
 * should be waited for to become readable and/or writeable is done using
 * gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write().
40 *
41 * Use gst_poll_wait() to wait for the file descriptors to actually become
42 * readable and/or writeable, or to timeout if no file descriptor is available
43 * in time. The wait can be controlled by calling gst_poll_restart() and
44 * gst_poll_set_flushing().
45 *
46 * Once the file descriptor set has been waited for, one can use
47 * gst_poll_fd_has_closed() to see if the file descriptor has been closed,
48 * gst_poll_fd_has_error() to see if it has generated an error,
49 * gst_poll_fd_can_read() to see if it is possible to read from the file
50 * descriptor, and gst_poll_fd_can_write() to see if it is possible to
51 * write to it.
52 *
53 */
55 #ifdef HAVE_CONFIG_H
56 #include "config.h"
57 #endif
59 #include "gst_private.h"
60 #include "glib-compat-private.h"
62 #include <sys/types.h>
64 #ifdef HAVE_UNISTD_H
65 #include <unistd.h>
66 #endif
68 #include <errno.h>
69 #include <fcntl.h>
71 #include <glib.h>
73 #ifdef G_OS_WIN32
74 #include <winsock2.h>
75 #define EINPROGRESS WSAEINPROGRESS
76 #else
77 #define _GNU_SOURCE 1
78 #ifdef HAVE_SYS_POLL_H
79 #include <sys/poll.h>
80 #endif
81 #ifdef HAVE_POLL_H
82 #include <poll.h>
83 #endif
84 #include <sys/time.h>
85 #include <sys/socket.h>
86 #endif
88 /* OS/X needs this because of bad headers */
89 #include <string.h>
91 /* The poll() emulation on OS/X doesn't handle fds=NULL, nfds=0,
92 * so we prefer our own poll emulation.
93 */
94 #if defined(BROKEN_POLL)
95 #undef HAVE_POLL
96 #endif
98 #include "gstpoll.h"
100 #define GST_CAT_DEFAULT GST_CAT_POLL
102 #ifdef G_OS_WIN32
/* Per-socket bookkeeping for the Windows (Winsock) backend. One entry is
 * stored in the fds / active_fds arrays for every socket in the set. */
typedef struct _WinsockFd WinsockFd;

struct _WinsockFd
{
  /* the socket being watched */
  gint fd;
  /* FD_* network events requested for this socket; handed to
   * WSAEventSelect() when the active set is prepared */
  glong event_mask;
  /* filled in by WSAEnumNetworkEvents() when the socket's event is signaled,
   * zeroed otherwise */
  WSANETWORKEVENTS events;
  /* non-zero while the fd is marked as ignored; such fds are left out of the
   * active set on the next wait */
  glong ignored_event_mask;
};
112 #endif
/* The wait primitive used by a set. */
typedef enum
{
  /* pick a primitive for every wait, see choose_mode() */
  GST_POLL_MODE_AUTO,
  GST_POLL_MODE_SELECT,
  GST_POLL_MODE_PSELECT,
  GST_POLL_MODE_POLL,
  GST_POLL_MODE_PPOLL,
  /* assigned by gst_poll_new() on Windows builds */
  GST_POLL_MODE_WINDOWS
} GstPollMode;
/* State of a file descriptor set. */
struct _GstPoll
{
  /* wait primitive to use; GST_POLL_MODE_AUTO defers the choice to each wait */
  GstPollMode mode;

  /* protects fds and (for readers outside the waiting thread) active_fds */
  GMutex *lock;
  /* array of fds, always written to and read from with lock */
  GArray *fds;
  /* array of active fds, only written to from the waiting thread with the
   * lock and read from with the lock or without the lock from the waiting
   * thread */
  GArray *active_fds;

#ifndef G_OS_WIN32
  /* one byte scratch buffer the control socket is read into */
  gchar buf[1];
  /* the two ends of the control socket pair created in gst_poll_new() */
  GstPollFD control_read_fd;
  GstPollFD control_write_fd;
#else
  /* pointers to the entries of fds that are currently ignored */
  GArray *active_fds_ignored;
  /* one event HANDLE per entry of fds, kept at the same index */
  GArray *events;
  /* wakeup_event followed by the events of the active fds */
  GArray *active_events;

  /* event used to wake up a wait */
  HANDLE wakeup_event;
#endif

  /* the controllable flag passed to gst_poll_new() */
  gboolean controllable;
  /* number of threads currently inside gst_poll_wait(), accessed atomically */
  volatile gint waiting;
  /* number of raised but not yet released wakeups, accessed atomically */
  volatile gint control_pending;
  /* flushing flag, accessed atomically through IS_FLUSHING/SET_FLUSHING */
  volatile gint flushing;
  /* TRUE for sets created with gst_poll_new_timer() */
  gboolean timer;
  /* set to 1 when fds changed and active_fds must be rebuilt before the
   * next wait, accessed atomically */
  volatile gint rebuild;
};
/* forward declarations, both are used by gst_poll_new() before they are
 * defined */
static gboolean gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd,
    gboolean active);
static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);

/* atomic access to the flushing flag */
#define IS_FLUSHING(s)      (g_atomic_int_get(&(s)->flushing))
#define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))

/* atomic bookkeeping of the number of waiting threads; the callers in this
 * file use the result of G_ATOMIC_INT_ADD as the value before the addition
 * (NOTE(review): the macro is defined outside this file, confirm there) */
#define INC_WAITING(s)      (G_ATOMIC_INT_ADD(&(s)->waiting, 1))
#define DEC_WAITING(s)      (G_ATOMIC_INT_ADD(&(s)->waiting, -1))
#define GET_WAITING(s)      (g_atomic_int_get(&(s)->waiting))

/* TEST_REBUILD atomically clears the rebuild flag and evaluates to TRUE when
 * it was set; MARK_REBUILD sets it */
#define TEST_REBUILD(s)     (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
#define MARK_REBUILD(s)     (g_atomic_int_set(&(s)->rebuild, 1))

/* WAKE_EVENT signals the control channel, RELEASE_EVENT clears it again.
 * Both evaluate to non-zero on success. */
#ifndef G_OS_WIN32
/* write / read a single byte on the control socket pair */
#define WAKE_EVENT(s)       (write ((s)->control_write_fd.fd, "W", 1) == 1)
#define RELEASE_EVENT(s)    (read ((s)->control_read_fd.fd, (s)->buf, 1) == 1)
#else
/* set / reset the wakeup event; WAKE_EVENT also maps GetLastError() to errno */
#define WAKE_EVENT(s)       (SetEvent ((s)->wakeup_event), errno = GetLastError () == NO_ERROR ? 0 : EACCES, errno == 0 ? 1 : 0)
#define RELEASE_EVENT(s)    (ResetEvent ((s)->wakeup_event))
#endif
178 /* the poll/select call is also performed on a control socket, that way
179 * we can send special commands to control it */
180 static inline gboolean
181 raise_wakeup (GstPoll * set)
182 {
183 gboolean result = TRUE;
185 if (G_ATOMIC_INT_ADD (&set->control_pending, 1) == 0) {
186 /* raise when nothing pending */
187 result = WAKE_EVENT (set);
188 }
189 return result;
190 }
192 /* note how bad things can happen when the 2 threads both raise and release the
193 * wakeup. This is however not a problem because you must always pair a raise
194 * with a release */
195 static inline gboolean
196 release_wakeup (GstPoll * set)
197 {
198 gboolean result = TRUE;
200 if (g_atomic_int_dec_and_test (&set->control_pending)) {
201 result = RELEASE_EVENT (set);
202 }
203 return result;
204 }
/* Drops all pending wakeups at once and clears the control channel.
 *
 * Returns the value of control_pending that was read in the last iteration
 * of the loop, which is 0 when nothing was pending. */
static inline gint
release_all_wakeup (GstPoll * set)
{
  gint old;

  while (TRUE) {
    if (!(old = g_atomic_int_get (&set->control_pending)))
      /* nothing pending, just exit */
      break;

    /* try to remove all pending control messages; the exchange fails when
     * the counter changed since it was read, in which case we loop and read
     * it again */
    if (g_atomic_int_compare_and_exchange (&set->control_pending, old, 0)) {
      /* we managed to remove all messages, read the control socket */
      if (RELEASE_EVENT (set))
        break;
      else
        /* retry again until we read it successfully; put one pending wakeup
         * back so that the next iteration attempts the release again */
        G_ATOMIC_INT_ADD (&set->control_pending, 1);
    }
  }
  return old;
}
229 static gint
230 find_index (GArray * array, GstPollFD * fd)
231 {
232 #ifndef G_OS_WIN32
233 struct pollfd *ifd;
234 #else
235 WinsockFd *ifd;
236 #endif
237 guint i;
239 /* start by assuming the index found in the fd is still valid */
240 if (fd->idx >= 0 && fd->idx < array->len) {
241 #ifndef G_OS_WIN32
242 ifd = &g_array_index (array, struct pollfd, fd->idx);
243 #else
244 ifd = &g_array_index (array, WinsockFd, fd->idx);
245 #endif
247 if (ifd->fd == fd->fd) {
248 return fd->idx;
249 }
250 }
252 /* the pollfd array has changed and we need to lookup the fd again */
253 for (i = 0; i < array->len; i++) {
254 #ifndef G_OS_WIN32
255 ifd = &g_array_index (array, struct pollfd, i);
256 #else
257 ifd = &g_array_index (array, WinsockFd, i);
258 #endif
260 if (ifd->fd == fd->fd) {
261 fd->idx = (gint) i;
262 return fd->idx;
263 }
264 }
266 fd->idx = -1;
267 return fd->idx;
268 }
270 #if !defined(HAVE_PPOLL) && defined(HAVE_POLL)
271 /* check if all file descriptors will fit in an fd_set */
272 static gboolean
273 selectable_fds (const GstPoll * set)
274 {
275 guint i;
277 g_mutex_lock (set->lock);
278 for (i = 0; i < set->fds->len; i++) {
279 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
281 if (pfd->fd >= FD_SETSIZE)
282 goto too_many;
283 }
284 g_mutex_unlock (set->lock);
286 return TRUE;
288 too_many:
289 {
290 g_mutex_unlock (set->lock);
291 return FALSE;
292 }
293 }
295 /* check if the timeout will convert to a timeout value used for poll()
296 * without a loss of precision
297 */
298 static gboolean
299 pollable_timeout (GstClockTime timeout)
300 {
301 if (timeout == GST_CLOCK_TIME_NONE)
302 return TRUE;
304 /* not a nice multiple of milliseconds */
305 if (timeout % 1000000)
306 return FALSE;
308 return TRUE;
309 }
310 #endif
312 static GstPollMode
313 choose_mode (const GstPoll * set, GstClockTime timeout)
314 {
315 GstPollMode mode;
317 if (set->mode == GST_POLL_MODE_AUTO) {
318 #ifdef HAVE_PPOLL
319 mode = GST_POLL_MODE_PPOLL;
320 #elif defined(HAVE_POLL)
321 if (!selectable_fds (set) || pollable_timeout (timeout)) {
322 mode = GST_POLL_MODE_POLL;
323 } else {
324 #ifdef HAVE_PSELECT
325 mode = GST_POLL_MODE_PSELECT;
326 #else
327 mode = GST_POLL_MODE_SELECT;
328 #endif
329 }
330 #elif defined(HAVE_PSELECT)
331 mode = GST_POLL_MODE_PSELECT;
332 #else
333 mode = GST_POLL_MODE_SELECT;
334 #endif
335 } else {
336 mode = set->mode;
337 }
338 return mode;
339 }
341 #ifndef G_OS_WIN32
342 static gint
343 pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds,
344 fd_set * errorfds)
345 {
346 gint max_fd = -1;
347 guint i;
349 FD_ZERO (readfds);
350 FD_ZERO (writefds);
351 FD_ZERO (errorfds);
353 g_mutex_lock (set->lock);
355 for (i = 0; i < set->active_fds->len; i++) {
356 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
358 if (pfd->fd < FD_SETSIZE) {
359 if (pfd->events & POLLIN)
360 FD_SET (pfd->fd, readfds);
361 if (pfd->events & POLLOUT)
362 FD_SET (pfd->fd, writefds);
363 if (pfd->events)
364 FD_SET (pfd->fd, errorfds);
365 if (pfd->fd > max_fd && (pfd->events & (POLLIN | POLLOUT)))
366 max_fd = pfd->fd;
367 }
368 }
370 g_mutex_unlock (set->lock);
372 return max_fd;
373 }
375 static void
376 fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds,
377 fd_set * errorfds)
378 {
379 guint i;
381 g_mutex_lock (set->lock);
383 for (i = 0; i < set->active_fds->len; i++) {
384 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
386 if (pfd->fd < FD_SETSIZE) {
387 pfd->revents = 0;
388 if (FD_ISSET (pfd->fd, readfds))
389 pfd->revents |= POLLIN;
390 if (FD_ISSET (pfd->fd, writefds))
391 pfd->revents |= POLLOUT;
392 if (FD_ISSET (pfd->fd, errorfds))
393 pfd->revents |= POLLERR;
394 }
395 }
397 g_mutex_unlock (set->lock);
398 }
399 #else /* G_OS_WIN32 */
400 /*
401 * Translate errors thrown by the Winsock API used by GstPoll:
402 * WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents
403 */
404 static gint
405 gst_poll_winsock_error_to_errno (DWORD last_error)
406 {
407 switch (last_error) {
408 case WSA_INVALID_HANDLE:
409 case WSAEINVAL:
410 case WSAENOTSOCK:
411 return EBADF;
413 case WSA_NOT_ENOUGH_MEMORY:
414 return ENOMEM;
416 /*
417 * Anything else, including:
418 * WSA_INVALID_PARAMETER, WSAEFAULT, WSAEINPROGRESS, WSAENETDOWN,
419 * WSANOTINITIALISED
420 */
421 default:
422 return EINVAL;
423 }
424 }
426 static void
427 gst_poll_free_winsock_event (GstPoll * set, gint idx)
428 {
429 WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
430 HANDLE event = g_array_index (set->events, HANDLE, idx);
432 WSAEventSelect (wfd->fd, event, 0);
433 CloseHandle (event);
434 }
436 static void
437 gst_poll_update_winsock_event_mask (GstPoll * set, gint idx, glong flags,
438 gboolean active)
439 {
440 WinsockFd *wfd;
442 wfd = &g_array_index (set->fds, WinsockFd, idx);
444 if (active)
445 wfd->event_mask |= flags;
446 else
447 wfd->event_mask &= ~flags;
449 /* reset ignored state if the new mask doesn't overlap at all */
450 if ((wfd->ignored_event_mask & wfd->event_mask) == 0)
451 wfd->ignored_event_mask = 0;
452 }
/* Rebuilds active_fds, active_fds_ignored and active_events from fds and
 * events, and registers the event mask of every non-ignored socket with
 * WSAEventSelect().
 *
 * Returns FALSE, with errno set, as soon as WSAEventSelect() fails; the
 * active arrays are left partially filled in that case. Returns TRUE
 * otherwise. */
static gboolean
gst_poll_prepare_winsock_active_sets (GstPoll * set)
{
  guint i;

  g_array_set_size (set->active_fds, 0);
  g_array_set_size (set->active_fds_ignored, 0);
  g_array_set_size (set->active_events, 0);
  /* the wakeup event always sits at index 0 of active_events, so the event
   * of active_fds[n] ends up at active_events[n + 1] */
  g_array_append_val (set->active_events, set->wakeup_event);

  for (i = 0; i < set->fds->len; i++) {
    WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, i);
    HANDLE event = g_array_index (set->events, HANDLE, i);

    if (wfd->ignored_event_mask == 0) {
      gint ret;

      /* active_fds receives a copy of the entry */
      g_array_append_val (set->active_fds, *wfd);
      g_array_append_val (set->active_events, event);

      ret = WSAEventSelect (wfd->fd, event, wfd->event_mask);
      if (G_UNLIKELY (ret != 0)) {
        errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
        return FALSE;
      }
    } else {
      /* ignored fds are remembered by pointer into set->fds, so that their
       * ignored state can be reset after the wait */
      g_array_append_val (set->active_fds_ignored, wfd);
    }
  }

  return TRUE;
}
/* Gathers the network events of all active sockets after a wait.
 *
 * Returns the number of active sockets whose event was signaled plus the
 * number of ignored fds, or -1 with errno set when WSAEnumNetworkEvents()
 * fails. On success the ignored state of all ignored fds is reset. */
static gint
gst_poll_collect_winsock_events (GstPoll * set)
{
  gint res, i;

  /*
   * We need to check which events are signaled, and call
   * WSAEnumNetworkEvents for those that are, which resets
   * the event and clears the internal network event records.
   */
  res = 0;
  for (i = 0; i < set->active_fds->len; i++) {
    WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, i);
    /* + 1 because index 0 of active_events holds the wakeup event */
    HANDLE event = g_array_index (set->active_events, HANDLE, i + 1);
    DWORD wait_ret;

    /* zero timeout: only test whether the event is signaled */
    wait_ret = WaitForSingleObject (event, 0);
    if (wait_ret == WAIT_OBJECT_0) {
      gint enum_ret = WSAEnumNetworkEvents (wfd->fd, event, &wfd->events);

      if (G_UNLIKELY (enum_ret != 0)) {
        res = -1;
        errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
        break;
      }

      res++;
    } else {
      /* clear any previously stored result */
      memset (&wfd->events, 0, sizeof (wfd->events));
    }
  }

  /* If all went well we also need to reset the ignored fds. */
  if (res >= 0) {
    /* every ignored fd is counted as having activity */
    res += set->active_fds_ignored->len;

    for (i = 0; i < set->active_fds_ignored->len; i++) {
      WinsockFd *wfd = g_array_index (set->active_fds_ignored, WinsockFd *, i);

      wfd->ignored_event_mask = 0;
    }

    g_array_set_size (set->active_fds_ignored, 0);
  }

  return res;
}
535 #endif
/**
 * gst_poll_new:
 * @controllable: whether it should be possible to control a wait.
 *
 * Create a new file descriptor set. If @controllable, it
 * is possible to restart or flush a call to gst_poll_wait() with
 * gst_poll_restart() and gst_poll_set_flushing() respectively.
 *
 * Free-function: gst_poll_free
 *
 * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
 *     Free with gst_poll_free().
 *
 * Since: 0.10.18
 */
GstPoll *
gst_poll_new (gboolean controllable)
{
  GstPoll *nset;

  GST_DEBUG ("controllable : %d", controllable);

  /* zero-filled allocation: every field not assigned below starts as 0 */
  nset = g_slice_new0 (GstPoll);
  nset->lock = g_mutex_new ();
#ifndef G_OS_WIN32
  nset->mode = GST_POLL_MODE_AUTO;
  nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
  nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
  /* -1 marks the control fds as not open, gst_poll_free() relies on this
   * when it is called from the error path below */
  nset->control_read_fd.fd = -1;
  nset->control_write_fd.fd = -1;
  {
    gint control_sock[2];

    if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
      goto no_socket_pair;

    /* NOTE(review): the results of fcntl() are not checked */
    fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
    fcntl (control_sock[1], F_SETFL, O_NONBLOCK);

    nset->control_read_fd.fd = control_sock[0];
    nset->control_write_fd.fd = control_sock[1];

    /* the read end of the control socket is part of the set itself and is
     * watched for readability */
    gst_poll_add_fd_unlocked (nset, &nset->control_read_fd);
    gst_poll_fd_ctl_read_unlocked (nset, &nset->control_read_fd, TRUE);
  }
#else
  nset->mode = GST_POLL_MODE_WINDOWS;
  nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
  nset->active_fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
  nset->active_fds_ignored = g_array_new (FALSE, FALSE, sizeof (WinsockFd *));
  nset->events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
  nset->active_events = g_array_new (FALSE, FALSE, sizeof (HANDLE));

  /* NOTE(review): the result of CreateEvent() is not checked */
  nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL);
#endif

  /* ensure (re)build, though already sneakily set in non-windows case */
  MARK_REBUILD (nset);

  nset->controllable = controllable;

  return nset;

  /* ERRORS */
#ifndef G_OS_WIN32
no_socket_pair:
  {
    GST_WARNING ("%p: can't create socket pair !", nset);
    gst_poll_free (nset);
    return NULL;
  }
#endif
}
611 /**
612 * gst_poll_new_timer:
613 *
614 * Create a new poll object that can be used for scheduling cancellable
615 * timeouts.
616 *
617 * A timeout is performed with gst_poll_wait(). Multiple timeouts can be
618 * performed from different threads.
619 *
620 * Free-function: gst_poll_free
621 *
622 * Returns: (transfer full): a new #GstPoll, or %NULL in case of an error.
623 * Free with gst_poll_free().
624 *
625 * Since: 0.10.23
626 */
627 GstPoll *
628 gst_poll_new_timer (void)
629 {
630 GstPoll *poll;
632 /* make a new controllable poll set */
633 if (!(poll = gst_poll_new (TRUE)))
634 goto done;
636 /* we are a timer */
637 poll->timer = TRUE;
639 done:
640 return poll;
641 }
643 /**
644 * gst_poll_free:
645 * @set: (transfer full): a file descriptor set.
646 *
647 * Free a file descriptor set.
648 *
649 * Since: 0.10.18
650 */
651 void
652 gst_poll_free (GstPoll * set)
653 {
654 g_return_if_fail (set != NULL);
656 GST_DEBUG ("%p: freeing", set);
658 #ifndef G_OS_WIN32
659 if (set->control_write_fd.fd >= 0)
660 close (set->control_write_fd.fd);
661 if (set->control_read_fd.fd >= 0)
662 close (set->control_read_fd.fd);
663 #else
664 CloseHandle (set->wakeup_event);
666 {
667 guint i;
669 for (i = 0; i < set->events->len; i++)
670 gst_poll_free_winsock_event (set, i);
671 }
673 g_array_free (set->active_events, TRUE);
674 g_array_free (set->events, TRUE);
675 g_array_free (set->active_fds_ignored, TRUE);
676 #endif
678 g_array_free (set->active_fds, TRUE);
679 g_array_free (set->fds, TRUE);
680 g_mutex_free (set->lock);
681 g_slice_free (GstPoll, set);
682 }
684 /**
685 * gst_poll_get_read_gpollfd:
686 * @set: a #GstPoll
687 * @fd: a #GPollFD
688 *
689 * Get a GPollFD for the reading part of the control socket. This is useful when
690 * integrating with a GSource and GMainLoop.
691 *
692 * Since: 0.10.32
693 */
694 void
695 gst_poll_get_read_gpollfd (GstPoll * set, GPollFD * fd)
696 {
697 g_return_if_fail (set != NULL);
698 g_return_if_fail (fd != NULL);
700 #ifndef G_OS_WIN32
701 fd->fd = set->control_read_fd.fd;
702 #else
703 #if GLIB_SIZEOF_VOID_P == 8
704 fd->fd = (gint64) set->wakeup_event;
705 #else
706 fd->fd = (gint) set->wakeup_event;
707 #endif
708 #endif
709 fd->events = G_IO_IN | G_IO_HUP | G_IO_ERR;
710 fd->revents = 0;
711 }
713 /**
714 * gst_poll_fd_init:
715 * @fd: a #GstPollFD
716 *
717 * Initializes @fd. Alternatively you can initialize it with
718 * #GST_POLL_FD_INIT.
719 *
720 * Since: 0.10.18
721 */
722 void
723 gst_poll_fd_init (GstPollFD * fd)
724 {
725 g_return_if_fail (fd != NULL);
727 fd->fd = -1;
728 fd->idx = -1;
729 }
731 static gboolean
732 gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
733 {
734 gint idx;
736 GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
738 idx = find_index (set->fds, fd);
739 if (idx < 0) {
740 #ifndef G_OS_WIN32
741 struct pollfd nfd;
743 nfd.fd = fd->fd;
744 nfd.events = POLLERR | POLLNVAL | POLLHUP;
745 nfd.revents = 0;
747 g_array_append_val (set->fds, nfd);
749 fd->idx = set->fds->len - 1;
750 #else
751 WinsockFd wfd;
752 HANDLE event;
754 wfd.fd = fd->fd;
755 wfd.event_mask = FD_CLOSE;
756 memset (&wfd.events, 0, sizeof (wfd.events));
757 wfd.ignored_event_mask = 0;
758 event = WSACreateEvent ();
760 g_array_append_val (set->fds, wfd);
761 g_array_append_val (set->events, event);
763 fd->idx = set->fds->len - 1;
764 #endif
765 MARK_REBUILD (set);
766 } else {
767 GST_WARNING ("%p: couldn't find fd !", set);
768 }
770 return TRUE;
771 }
773 /**
774 * gst_poll_add_fd:
775 * @set: a file descriptor set.
776 * @fd: a file descriptor.
777 *
778 * Add a file descriptor to the file descriptor set.
779 *
780 * Returns: %TRUE if the file descriptor was successfully added to the set.
781 *
782 * Since: 0.10.18
783 */
784 gboolean
785 gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
786 {
787 gboolean ret;
789 g_return_val_if_fail (set != NULL, FALSE);
790 g_return_val_if_fail (fd != NULL, FALSE);
791 g_return_val_if_fail (fd->fd >= 0, FALSE);
793 g_mutex_lock (set->lock);
795 ret = gst_poll_add_fd_unlocked (set, fd);
797 g_mutex_unlock (set->lock);
799 return ret;
800 }
802 /**
803 * gst_poll_remove_fd:
804 * @set: a file descriptor set.
805 * @fd: a file descriptor.
806 *
807 * Remove a file descriptor from the file descriptor set.
808 *
809 * Returns: %TRUE if the file descriptor was successfully removed from the set.
810 *
811 * Since: 0.10.18
812 */
813 gboolean
814 gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
815 {
816 gint idx;
818 g_return_val_if_fail (set != NULL, FALSE);
819 g_return_val_if_fail (fd != NULL, FALSE);
820 g_return_val_if_fail (fd->fd >= 0, FALSE);
823 GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
825 g_mutex_lock (set->lock);
827 /* get the index, -1 is an fd that is not added */
828 idx = find_index (set->fds, fd);
829 if (idx >= 0) {
830 #ifdef G_OS_WIN32
831 gst_poll_free_winsock_event (set, idx);
832 g_array_remove_index_fast (set->events, idx);
833 #endif
835 /* remove the fd at index, we use _remove_index_fast, which copies the last
836 * element of the array to the freed index */
837 g_array_remove_index_fast (set->fds, idx);
839 /* mark fd as removed by setting the index to -1 */
840 fd->idx = -1;
841 MARK_REBUILD (set);
842 } else {
843 GST_WARNING ("%p: couldn't find fd !", set);
844 }
846 g_mutex_unlock (set->lock);
848 return idx >= 0;
849 }
851 /**
852 * gst_poll_fd_ctl_write:
853 * @set: a file descriptor set.
854 * @fd: a file descriptor.
855 * @active: a new status.
856 *
857 * Control whether the descriptor @fd in @set will be monitored for
858 * writability.
859 *
860 * Returns: %TRUE if the descriptor was successfully updated.
861 *
862 * Since: 0.10.18
863 */
864 gboolean
865 gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
866 {
867 gint idx;
869 g_return_val_if_fail (set != NULL, FALSE);
870 g_return_val_if_fail (fd != NULL, FALSE);
871 g_return_val_if_fail (fd->fd >= 0, FALSE);
873 GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
874 fd->fd, fd->idx, active);
876 g_mutex_lock (set->lock);
878 idx = find_index (set->fds, fd);
879 if (idx >= 0) {
880 #ifndef G_OS_WIN32
881 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
883 if (active)
884 pfd->events |= POLLOUT;
885 else
886 pfd->events &= ~POLLOUT;
888 GST_LOG ("pfd->events now %d (POLLOUT:%d)", pfd->events, POLLOUT);
889 #else
890 gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
891 active);
892 #endif
893 MARK_REBUILD (set);
894 } else {
895 GST_WARNING ("%p: couldn't find fd !", set);
896 }
898 g_mutex_unlock (set->lock);
900 return idx >= 0;
901 }
903 static gboolean
904 gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
905 {
906 gint idx;
908 GST_DEBUG ("%p: fd (fd:%d, idx:%d), active : %d", set,
909 fd->fd, fd->idx, active);
911 idx = find_index (set->fds, fd);
913 if (idx >= 0) {
914 #ifndef G_OS_WIN32
915 struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
917 if (active)
918 pfd->events |= (POLLIN | POLLPRI);
919 else
920 pfd->events &= ~(POLLIN | POLLPRI);
921 #else
922 gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
923 #endif
924 MARK_REBUILD (set);
925 } else {
926 GST_WARNING ("%p: couldn't find fd !", set);
927 }
929 return idx >= 0;
930 }
932 /**
933 * gst_poll_fd_ctl_read:
934 * @set: a file descriptor set.
935 * @fd: a file descriptor.
936 * @active: a new status.
937 *
938 * Control whether the descriptor @fd in @set will be monitored for
939 * readability.
940 *
941 * Returns: %TRUE if the descriptor was successfully updated.
942 *
943 * Since: 0.10.18
944 */
945 gboolean
946 gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
947 {
948 gboolean ret;
950 g_return_val_if_fail (set != NULL, FALSE);
951 g_return_val_if_fail (fd != NULL, FALSE);
952 g_return_val_if_fail (fd->fd >= 0, FALSE);
954 g_mutex_lock (set->lock);
956 ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
958 g_mutex_unlock (set->lock);
960 return ret;
961 }
963 /**
964 * gst_poll_fd_ignored:
965 * @set: a file descriptor set.
966 * @fd: a file descriptor.
967 *
968 * Mark @fd as ignored so that the next call to gst_poll_wait() will yield
969 * the same result for @fd as last time. This function must be called if no
970 * operation (read/write/recv/send/etc.) will be performed on @fd before
971 * the next call to gst_poll_wait().
972 *
973 * The reason why this is needed is because the underlying implementation
974 * might not allow querying the fd more than once between calls to one of
975 * the re-enabling operations.
976 *
977 * Since: 0.10.18
978 */
979 void
980 gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
981 {
982 #ifdef G_OS_WIN32
983 gint idx;
985 g_return_if_fail (set != NULL);
986 g_return_if_fail (fd != NULL);
987 g_return_if_fail (fd->fd >= 0);
989 g_mutex_lock (set->lock);
991 idx = find_index (set->fds, fd);
992 if (idx >= 0) {
993 WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
995 wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
996 MARK_REBUILD (set);
997 }
999 g_mutex_unlock (set->lock);
1000 #endif
1001 }
1003 /**
1004 * gst_poll_fd_has_closed:
1005 * @set: a file descriptor set.
1006 * @fd: a file descriptor.
1007 *
1008 * Check if @fd in @set has closed the connection.
1009 *
1010 * Returns: %TRUE if the connection was closed.
1011 *
1012 * Since: 0.10.18
1013 */
1014 gboolean
1015 gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
1016 {
1017 gboolean res = FALSE;
1018 gint idx;
1020 g_return_val_if_fail (set != NULL, FALSE);
1021 g_return_val_if_fail (fd != NULL, FALSE);
1022 g_return_val_if_fail (fd->fd >= 0, FALSE);
1024 GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1026 g_mutex_lock (set->lock);
1028 idx = find_index (set->active_fds, fd);
1029 if (idx >= 0) {
1030 #ifndef G_OS_WIN32
1031 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1033 res = (pfd->revents & POLLHUP) != 0;
1034 #else
1035 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1037 res = (wfd->events.lNetworkEvents & FD_CLOSE) != 0;
1038 #endif
1039 } else {
1040 GST_WARNING ("%p: couldn't find fd !", set);
1041 }
1043 g_mutex_unlock (set->lock);
1045 return res;
1046 }
1048 /**
1049 * gst_poll_fd_has_error:
1050 * @set: a file descriptor set.
1051 * @fd: a file descriptor.
1052 *
1053 * Check if @fd in @set has an error.
1054 *
1055 * Returns: %TRUE if the descriptor has an error.
1056 *
1057 * Since: 0.10.18
1058 */
1059 gboolean
1060 gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
1061 {
1062 gboolean res = FALSE;
1063 gint idx;
1065 g_return_val_if_fail (set != NULL, FALSE);
1066 g_return_val_if_fail (fd != NULL, FALSE);
1067 g_return_val_if_fail (fd->fd >= 0, FALSE);
1069 GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1071 g_mutex_lock (set->lock);
1073 idx = find_index (set->active_fds, fd);
1074 if (idx >= 0) {
1075 #ifndef G_OS_WIN32
1076 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1078 res = (pfd->revents & (POLLERR | POLLNVAL)) != 0;
1079 #else
1080 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1082 res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
1083 (wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
1084 (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
1085 (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0) ||
1086 (wfd->events.iErrorCode[FD_CONNECT_BIT] != 0);
1087 #endif
1088 } else {
1089 GST_WARNING ("%p: couldn't find fd !", set);
1090 }
1092 g_mutex_unlock (set->lock);
1094 return res;
1095 }
1097 static gboolean
1098 gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
1099 {
1100 gboolean res = FALSE;
1101 gint idx;
1103 GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1105 idx = find_index (set->active_fds, fd);
1106 if (idx >= 0) {
1107 #ifndef G_OS_WIN32
1108 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1110 res = (pfd->revents & (POLLIN | POLLPRI)) != 0;
1111 #else
1112 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1114 res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0;
1115 #endif
1116 } else {
1117 GST_WARNING ("%p: couldn't find fd !", set);
1118 }
1120 return res;
1121 }
1123 /**
1124 * gst_poll_fd_can_read:
1125 * @set: a file descriptor set.
1126 * @fd: a file descriptor.
1127 *
1128 * Check if @fd in @set has data to be read.
1129 *
1130 * Returns: %TRUE if the descriptor has data to be read.
1131 *
1132 * Since: 0.10.18
1133 */
1134 gboolean
1135 gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
1136 {
1137 gboolean res = FALSE;
1139 g_return_val_if_fail (set != NULL, FALSE);
1140 g_return_val_if_fail (fd != NULL, FALSE);
1141 g_return_val_if_fail (fd->fd >= 0, FALSE);
1143 g_mutex_lock (set->lock);
1145 res = gst_poll_fd_can_read_unlocked (set, fd);
1147 g_mutex_unlock (set->lock);
1149 return res;
1150 }
1152 /**
1153 * gst_poll_fd_can_write:
1154 * @set: a file descriptor set.
1155 * @fd: a file descriptor.
1156 *
1157 * Check if @fd in @set can be used for writing.
1158 *
1159 * Returns: %TRUE if the descriptor can be used for writing.
1160 *
1161 * Since: 0.10.18
1162 */
1163 gboolean
1164 gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
1165 {
1166 gboolean res = FALSE;
1167 gint idx;
1169 g_return_val_if_fail (set != NULL, FALSE);
1170 g_return_val_if_fail (fd != NULL, FALSE);
1171 g_return_val_if_fail (fd->fd >= 0, FALSE);
1173 GST_DEBUG ("%p: fd (fd:%d, idx:%d)", set, fd->fd, fd->idx);
1175 g_mutex_lock (set->lock);
1177 idx = find_index (set->active_fds, fd);
1178 if (idx >= 0) {
1179 #ifndef G_OS_WIN32
1180 struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
1182 res = (pfd->revents & POLLOUT) != 0;
1183 #else
1184 WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
1186 res = (wfd->events.lNetworkEvents & FD_WRITE) != 0;
1187 #endif
1188 } else {
1189 GST_WARNING ("%p: couldn't find fd !", set);
1190 }
1192 g_mutex_unlock (set->lock);
1194 return res;
1195 }
1197 /**
1198 * gst_poll_wait:
1199 * @set: a #GstPoll.
1200 * @timeout: a timeout in nanoseconds.
1201 *
1202 * Wait for activity on the file descriptors in @set. This function waits up to
1203 * the specified @timeout. A timeout of #GST_CLOCK_TIME_NONE waits forever.
1204 *
1205 * For #GstPoll objects created with gst_poll_new(), this function can only be
1206 * called from a single thread at a time. If called from multiple threads,
1207 * -1 will be returned with errno set to EPERM.
1208 *
1209 * This is not true for timer #GstPoll objects created with
1210 * gst_poll_new_timer(), where it is allowed to have multiple threads waiting
1211 * simultaneously.
1212 *
1213 * Returns: The number of #GstPollFD in @set that have activity or 0 when no
1214 * activity was detected after @timeout. If an error occurs, -1 is returned
1215 * and errno is set.
1216 *
1217 * Since: 0.10.18
1218 */
1219 gint
1220 gst_poll_wait (GstPoll * set, GstClockTime timeout)
1221 {
1222 gboolean restarting;
1223 gboolean is_timer;
1224 int res;
1225 gint old_waiting;
1227 g_return_val_if_fail (set != NULL, -1);
1229 GST_DEBUG ("timeout :%" GST_TIME_FORMAT, GST_TIME_ARGS (timeout));
1231 is_timer = set->timer;
1233 /* add one more waiter */
1234 old_waiting = INC_WAITING (set);
1236 /* we cannot wait from multiple threads unless we are a timer */
1237 if (G_UNLIKELY (old_waiting > 0 && !is_timer))
1238 goto already_waiting;
1240 /* flushing, exit immediately */
1241 if (G_UNLIKELY (IS_FLUSHING (set)))
1242 goto flushing;
1244 do {
1245 GstPollMode mode;
1247 res = -1;
1248 restarting = FALSE;
1250 mode = choose_mode (set, timeout);
1252 if (TEST_REBUILD (set)) {
1253 g_mutex_lock (set->lock);
1254 #ifndef G_OS_WIN32
1255 g_array_set_size (set->active_fds, set->fds->len);
1256 memcpy (set->active_fds->data, set->fds->data,
1257 set->fds->len * sizeof (struct pollfd));
1258 #else
1259 if (!gst_poll_prepare_winsock_active_sets (set))
1260 goto winsock_error;
1261 #endif
1262 g_mutex_unlock (set->lock);
1263 }
1265 switch (mode) {
1266 case GST_POLL_MODE_AUTO:
1267 g_assert_not_reached ();
1268 break;
1269 case GST_POLL_MODE_PPOLL:
1270 {
1271 #ifdef HAVE_PPOLL
1272 struct timespec ts;
1273 struct timespec *tsptr;
1275 if (timeout != GST_CLOCK_TIME_NONE) {
1276 GST_TIME_TO_TIMESPEC (timeout, ts);
1277 tsptr = &ts;
1278 } else {
1279 tsptr = NULL;
1280 }
1282 res =
1283 ppoll ((struct pollfd *) set->active_fds->data,
1284 set->active_fds->len, tsptr, NULL);
1285 #else
1286 g_assert_not_reached ();
1287 errno = ENOSYS;
1288 #endif
1289 break;
1290 }
1291 case GST_POLL_MODE_POLL:
1292 {
1293 #ifdef HAVE_POLL
1294 gint t;
1296 if (timeout != GST_CLOCK_TIME_NONE) {
1297 t = GST_TIME_AS_MSECONDS (timeout);
1298 } else {
1299 t = -1;
1300 }
1302 res =
1303 poll ((struct pollfd *) set->active_fds->data,
1304 set->active_fds->len, t);
1305 #else
1306 g_assert_not_reached ();
1307 errno = ENOSYS;
1308 #endif
1309 break;
1310 }
1311 case GST_POLL_MODE_PSELECT:
1312 #ifndef HAVE_PSELECT
1313 {
1314 g_assert_not_reached ();
1315 errno = ENOSYS;
1316 break;
1317 }
1318 #endif
1319 case GST_POLL_MODE_SELECT:
1320 {
1321 #ifndef G_OS_WIN32
1322 fd_set readfds;
1323 fd_set writefds;
1324 fd_set errorfds;
1325 gint max_fd;
1327 max_fd = pollfd_to_fd_set (set, &readfds, &writefds, &errorfds);
1329 if (mode == GST_POLL_MODE_SELECT) {
1330 struct timeval tv;
1331 struct timeval *tvptr;
1333 if (timeout != GST_CLOCK_TIME_NONE) {
1334 GST_TIME_TO_TIMEVAL (timeout, tv);
1335 tvptr = &tv;
1336 } else {
1337 tvptr = NULL;
1338 }
1340 GST_DEBUG ("Calling select");
1341 res = select (max_fd + 1, &readfds, &writefds, &errorfds, tvptr);
1342 GST_DEBUG ("After select, res:%d", res);
1343 } else {
1344 #ifdef HAVE_PSELECT
1345 struct timespec ts;
1346 struct timespec *tsptr;
1348 if (timeout != GST_CLOCK_TIME_NONE) {
1349 GST_TIME_TO_TIMESPEC (timeout, ts);
1350 tsptr = &ts;
1351 } else {
1352 tsptr = NULL;
1353 }
1355 GST_DEBUG ("Calling pselect");
1356 res =
1357 pselect (max_fd + 1, &readfds, &writefds, &errorfds, tsptr, NULL);
1358 GST_DEBUG ("After pselect, res:%d", res);
1359 #endif
1360 }
1362 if (res >= 0) {
1363 fd_set_to_pollfd (set, &readfds, &writefds, &errorfds);
1364 }
1365 #else /* G_OS_WIN32 */
1366 g_assert_not_reached ();
1367 errno = ENOSYS;
1368 #endif
1369 break;
1370 }
1371 case GST_POLL_MODE_WINDOWS:
1372 {
1373 #ifdef G_OS_WIN32
1374 gint ignore_count = set->active_fds_ignored->len;
1375 DWORD t, wait_ret;
1377 if (G_LIKELY (ignore_count == 0)) {
1378 if (timeout != GST_CLOCK_TIME_NONE)
1379 t = GST_TIME_AS_MSECONDS (timeout);
1380 else
1381 t = INFINITE;
1382 } else {
1383 /* already one or more ignored fds, so we quickly sweep the others */
1384 t = 0;
1385 }
1387 if (set->active_events->len != 0) {
1388 wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
1389 (HANDLE *) set->active_events->data, FALSE, t, FALSE);
1390 } else {
1391 wait_ret = WSA_WAIT_FAILED;
1392 WSASetLastError (WSA_INVALID_PARAMETER);
1393 }
1395 if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) {
1396 res = 0;
1397 } else if (wait_ret == WSA_WAIT_FAILED) {
1398 res = -1;
1399 errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
1400 } else {
1401 /* the first entry is the wakeup event */
1402 if (wait_ret - WSA_WAIT_EVENT_0 >= 1) {
1403 res = gst_poll_collect_winsock_events (set);
1404 } else {
1405 res = 1; /* wakeup event */
1406 }
1407 }
1408 #else
1409 g_assert_not_reached ();
1410 errno = ENOSYS;
1411 #endif
1412 break;
1413 }
1414 }
1416 if (!is_timer) {
1417 /* Applications needs to clear the control socket themselves for timer
1418 * polls.
1419 * For other polls, we need to clear the control socket. If there was only
1420 * one socket with activity and it was the control socket, we need to
1421 * restart */
1422 if (release_all_wakeup (set) > 0 && res == 1)
1423 restarting = TRUE;
1424 }
1426 if (G_UNLIKELY (IS_FLUSHING (set))) {
1427 /* we got woken up and we are flushing, we need to stop */
1428 errno = EBUSY;
1429 res = -1;
1430 break;
1431 }
1432 } while (G_UNLIKELY (restarting));
1434 DEC_WAITING (set);
1436 return res;
1438 /* ERRORS */
1439 already_waiting:
1440 {
1441 DEC_WAITING (set);
1442 errno = EPERM;
1443 return -1;
1444 }
1445 flushing:
1446 {
1447 DEC_WAITING (set);
1448 errno = EBUSY;
1449 return -1;
1450 }
1451 #ifdef G_OS_WIN32
1452 winsock_error:
1453 {
1454 g_mutex_unlock (set->lock);
1455 DEC_WAITING (set);
1456 return -1;
1457 }
1458 #endif
1459 }
1461 /**
1462 * gst_poll_set_controllable:
1463 * @set: a #GstPoll.
1464 * @controllable: new controllable state.
1465 *
1466 * When @controllable is %TRUE, this function ensures that future calls to
1467 * gst_poll_wait() will be affected by gst_poll_restart() and
1468 * gst_poll_set_flushing().
1469 *
1470 * Returns: %TRUE if the controllability of @set could be updated.
1471 *
1472 * Since: 0.10.18
1473 */
1474 gboolean
1475 gst_poll_set_controllable (GstPoll * set, gboolean controllable)
1476 {
1477 g_return_val_if_fail (set != NULL, FALSE);
1479 GST_LOG ("%p: controllable : %d", set, controllable);
1481 set->controllable = controllable;
1483 return TRUE;
1484 }
1486 /**
1487 * gst_poll_restart:
1488 * @set: a #GstPoll.
1489 *
1490 * Restart any gst_poll_wait() that is in progress. This function is typically
1491 * used after adding or removing descriptors to @set.
1492 *
1493 * If @set is not controllable, then this call will have no effect.
1494 *
1495 * Since: 0.10.18
1496 */
1497 void
1498 gst_poll_restart (GstPoll * set)
1499 {
1500 g_return_if_fail (set != NULL);
1502 if (set->controllable && GET_WAITING (set) > 0) {
1503 /* we are controllable and waiting, wake up the waiter. The socket will be
1504 * cleared by the _wait() thread and the poll will be restarted */
1505 raise_wakeup (set);
1506 }
1507 }
1509 /**
1510 * gst_poll_set_flushing:
1511 * @set: a #GstPoll.
1512 * @flushing: new flushing state.
1513 *
1514 * When @flushing is %TRUE, this function ensures that current and future calls
1515 * to gst_poll_wait() will return -1, with errno set to EBUSY.
1516 *
1517 * Unsetting the flushing state will restore normal operation of @set.
1518 *
1519 * Since: 0.10.18
1520 */
1521 void
1522 gst_poll_set_flushing (GstPoll * set, gboolean flushing)
1523 {
1524 g_return_if_fail (set != NULL);
1526 /* update the new state first */
1527 SET_FLUSHING (set, flushing);
1529 if (flushing && set->controllable && GET_WAITING (set) > 0) {
1530 /* we are flushing, controllable and waiting, wake up the waiter. When we
1531 * stop the flushing operation we don't clear the wakeup fd here, this will
1532 * happen in the _wait() thread. */
1533 raise_wakeup (set);
1534 }
1535 }
1537 /**
1538 * gst_poll_write_control:
1539 * @set: a #GstPoll.
1540 *
1541 * Write a byte to the control socket of the controllable @set.
1542 * This function is mostly useful for timer #GstPoll objects created with
1543 * gst_poll_new_timer().
1544 *
1545 * It will make any current and future gst_poll_wait() function return with
1546 * 1, meaning the control socket is set. After an equal amount of calls to
1547 * gst_poll_read_control() have been performed, calls to gst_poll_wait() will
1548 * block again until their timeout expired.
1549 *
1550 * Returns: %TRUE on success. %FALSE when @set is not controllable or when the
1551 * byte could not be written.
1552 *
1553 * Since: 0.10.23
1554 */
1555 gboolean
1556 gst_poll_write_control (GstPoll * set)
1557 {
1558 gboolean res;
1560 g_return_val_if_fail (set != NULL, FALSE);
1561 g_return_val_if_fail (set->timer, FALSE);
1563 res = raise_wakeup (set);
1565 return res;
1566 }
1568 /**
1569 * gst_poll_read_control:
1570 * @set: a #GstPoll.
1571 *
1572 * Read a byte from the control socket of the controllable @set.
1573 * This function is mostly useful for timer #GstPoll objects created with
1574 * gst_poll_new_timer().
1575 *
1576 * Returns: %TRUE on success. %FALSE when @set is not controllable or when there
1577 * was no byte to read.
1578 *
1579 * Since: 0.10.23
1580 */
1581 gboolean
1582 gst_poll_read_control (GstPoll * set)
1583 {
1584 gboolean res;
1586 g_return_val_if_fail (set != NULL, FALSE);
1587 g_return_val_if_fail (set->timer, FALSE);
1589 res = release_wakeup (set);
1591 return res;
1592 }