]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - ipc/ipcdev.git/blob - linux/src/transport/TransportRpmsg.c
transport: Add error check for read and write function calls
[ipc/ipcdev.git] / linux / src / transport / TransportRpmsg.c
1 /*
2  * Copyright (c) 2014-2018 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  ======== TransportRpmsg.c ========
34  *  Implementation of functions specified in the IMessageQTransport interface.
35  */
37 /* Socket Headers */
38 #include <sys/socket.h>
39 #include <sys/select.h>
40 #include <sys/eventfd.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <unistd.h>
44 #include <errno.h>
45 #include <string.h>
46 #include <fcntl.h>
47 #include <pthread.h>
50 /* Socket Protocol Family */
51 #include <net/rpmsg.h>
54 /* IPC headers */
55 #include <ti/ipc/Std.h>
56 #include <SocketFxns.h>         /* Socket utils: */
57 #include <ti/ipc/Ipc.h>
58 #include <ti/ipc/MessageQ.h>
59 #include <ti/ipc/MultiProc.h>
60 #include <ti/ipc/transports/TransportRpmsg.h>
61 #include <_MessageQ.h>
62 #include <_lad.h>
64 #if !defined(EFD_SEMAPHORE)
65 #  define EFD_SEMAPHORE (1 << 0)
66 #endif
68 /* More magic rpmsg port numbers: */
69 #define MESSAGEQ_RPMSG_PORT       61
70 #define MESSAGEQ_RPMSG_MAXSIZE   512
72 #define TransportRpmsg_GROWSIZE 32
73 #define INVALIDSOCKET (-1)
75 #define TransportRpmsg_Event_ACK        (1 << 0)
76 #define TransportRpmsg_Event_PAUSE      (1 << 1)
77 #define TransportRpmsg_Event_CONTINUE   (1 << 2)
78 #define TransportRpmsg_Event_SHUTDOWN   (1 << 3)
81 #define _MAX(a,b) (((a)>(b))?(a):(b))
83 /* traces in this file are controlled via _TransportMessageQ_verbose */
84 Bool _TransportMessageQ_verbose = FALSE;
85 #define verbose _TransportMessageQ_verbose
87 Int TransportRpmsg_bind(Void *handle, UInt32 queueId);
88 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId);
89 Bool TransportRpmsg_put(Void *handle, Ptr msg);
91 typedef struct TransportRpmsg_Module {
92     int             sock[MultiProc_MAXPROCESSORS];
93     fd_set          rfds;
94     int             maxFd;
95     struct {
96         int     fd;
97         UInt32  qId;
98     } inFds[1024];
99     int             nInFds;
100     pthread_mutex_t gate;
101     int             unblockEvent;    /* unblock the dispatch thread */
102     int             waitEvent;       /* block the client thread */
103     pthread_t       threadId;        /* ID returned by pthread_create() */
104     Bool            threadStarted;
106     TransportRpmsg_Handle *inst;    /* array of instances */
107 } TransportRpmsg_Module;
109 IMessageQTransport_Fxns TransportRpmsg_fxns = {
110     .bind    = TransportRpmsg_bind,
111     .unbind  = TransportRpmsg_unbind,
112     .put     = TransportRpmsg_put
113 };
115 typedef struct TransportRpmsg_Object {
116     IMessageQTransport_Object base;
117     Int status;
118     UInt16 rprocId;
119     int numQueues;
120     int *qIndexToFd;
121 } TransportRpmsg_Object;
123 TransportRpmsg_Module TransportRpmsg_state = {
124     .sock = {INVALIDSOCKET},
125     .unblockEvent = -1,
126     .waitEvent = -1,
127     .threadStarted = FALSE,
128     .inst = NULL
129 };
130 TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
132 static void *rpmsgThreadFxn(void *arg);
133 static Int transportGet(int sock, MessageQ_Msg *retMsg);
134 static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
135                                Int fd,
136                                UInt16 qIndex);
137 static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
138 static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
140 /* factory functions */
141 Int TransportRpmsg_Factory_create(Void);
142 Void TransportRpmsg_Factory_delete(Void);
143 Int TransportRpmsg_Factory_attach(UInt16 procId);
144 Int TransportRpmsg_Factory_detach(UInt16 procId);
146 Ipc_TransportFactoryFxns TransportRpmsg_Factory = {
147     .createFxn = TransportRpmsg_Factory_create,
148     .deleteFxn = TransportRpmsg_Factory_delete,
149     .attachFxn = TransportRpmsg_Factory_attach,
150     .detachFxn = TransportRpmsg_Factory_detach
151 };
153 /* -------------------------------------------------------------------------- */
155 /* instance convertors */
156 IMessageQTransport_Handle TransportRpmsg_upCast(TransportRpmsg_Handle handle)
158     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
159     return ((IMessageQTransport_Handle)&obj->base);
162 TransportRpmsg_Handle TransportRpmsg_downCast(IMessageQTransport_Handle base)
164     return ((TransportRpmsg_Handle)base);
167 /*
168  *  ======== TransportRpmsg_create ========
169  */
170 TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params)
172     Int status = MessageQ_S_SUCCESS;
173     TransportRpmsg_Object *obj = NULL;
174     int sock;
175     int flags;
176     UInt16 clusterId;
177     int i;
180     clusterId = params->rprocId - MultiProc_getBaseIdOfCluster();
182     /* create socket for sending messages to remote processor */
183     sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
185     if (sock < 0) {
186         status = Ipc_E_FAIL;
187         fprintf(stderr,
188                 "TransportRpmsg_create: socket failed: %d (%s)\n", errno,
189                 strerror(errno));
190         goto done;
191     }
192     TransportRpmsg_module->sock[clusterId] = sock;
193     PRINTVERBOSE1("attach: created send socket: %d\n", sock)
195     status = ConnectSocket(sock, params->rprocId, MESSAGEQ_RPMSG_PORT);
197     if (status < 0) {
198         status = Ipc_E_FAIL;
199         fprintf(stderr,
200                 "TransportRpmsg_create: connect failed: %d (%s) procId: %d\n",
201                 errno, strerror(errno), params->rprocId);
202         close(sock);
203         TransportRpmsg_module->sock[clusterId] = INVALIDSOCKET;
204         goto done;
205     }
207     /* make sure socket fd doesn't exist for 'fork() -> exec*()'ed child */
208     flags = fcntl(sock, F_GETFD);
209     if (flags != -1) {
210         fcntl(sock, F_SETFD, flags | FD_CLOEXEC);
211     }
213     /* create the instance object */
214     obj = calloc(1, sizeof(TransportRpmsg_Object));
216     if (obj == NULL) {
217         status = Ipc_E_MEMORY;
218         close(sock);
219         TransportRpmsg_module->sock[clusterId] = INVALIDSOCKET;
220         goto done;
221     }
223     /* initialize the instance */
224     obj->base.base.interfaceType = IMessageQTransport_TypeId;
225     obj->base.fxns = &TransportRpmsg_fxns;
226     obj->rprocId = params->rprocId;
227     obj->numQueues = TransportRpmsg_GROWSIZE;
229     obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof(int));
231     if (obj->qIndexToFd == NULL) {
232         status = Ipc_E_MEMORY;
233         goto done;
234     }
236     /* must initialize array */
237     for (i = 0; i < TransportRpmsg_GROWSIZE; i++) {
238         obj->qIndexToFd[i] = -1;
239     }
241 done:
242     if (status < 0) {
243         TransportRpmsg_delete((TransportRpmsg_Handle *)&obj);
244     }
246     return (TransportRpmsg_Handle)obj;
249 /*
250  *  ======== TransportRpmsg_delete ========
251  */
252 Void TransportRpmsg_delete(TransportRpmsg_Handle *pHandle)
254     TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)pHandle;
255     UInt16 clusterId;
256     int sock;
258     if (obj == NULL) {
259         goto done;
260     }
262     clusterId = obj->rprocId - MultiProc_getBaseIdOfCluster();
264     /* close the socket for the given transport instance */
265     sock = TransportRpmsg_module->sock[clusterId];
266     if (sock != INVALIDSOCKET) {
267         PRINTVERBOSE1("detach: closing socket: %d\n", sock)
268         close(sock);
269     }
270     TransportRpmsg_module->sock[clusterId] = INVALIDSOCKET;
272     if ((obj != NULL) && (obj->qIndexToFd != NULL)) {
273         free(obj->qIndexToFd);
274         obj->qIndexToFd = NULL;
275     }
277     if (obj != NULL) {
278         free(obj);
279         obj = NULL;
280     }
282 done:
283     *pHandle = NULL;
286 /*
287  *  ======== TransportRpmsg_bind ========
288  */
289 Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
291     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
292     UInt16 queuePort = queueId & 0x0000ffff;
293     int fd;
294     int flags;
295     int err;
296     uint64_t event;
297     UInt16 rprocId;
298     pthread_t tid;
299     Int status = MessageQ_S_SUCCESS;
301     tid = pthread_self();
302     rprocId = obj->rprocId;
304     PRINTVERBOSE3("TransportRpmsg_bind: creating endpoint for rprocId %d "
305             "queuePort 0x%x, tid=0x%x\n", rprocId, queuePort, (unsigned int)tid)
307     pthread_mutex_lock(&TransportRpmsg_module->gate);
309     /*  Check if binding already exists.
310      *
311      *  There is a race condition between a thread calling MessageQ_create
312      *  and another thread calling Ipc_attach. Must make sure we don't bind
313      *  the same queue twice.
314      */
315     if (queueIndexToFd(obj, queueId) != -1) {
316         goto done;
317     }
319     /*  Create the socket to receive messages for this messageQ. */
320     fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
321     if (fd < 0) {
322         fprintf(stderr, "TransportRpmsg_bind: socket call failed: %d (%s)\n",
323                 errno, strerror(errno));
324         status = MessageQ_E_OSFAILURE;
325         goto done;
326     }
327     PRINTVERBOSE2("TransportRpmsg_bind: created socket fd %d, tdi=0x%x\n", fd,
328             (unsigned int)tid);
330     err = SocketBindAddr(fd, rprocId, (UInt32)queuePort);
331     if (err < 0) {
332         /* don't hard-printf since this is no longer fatal */
333         PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
334                       errno, strerror(errno));
335         close(fd);
336         status = MessageQ_E_OSFAILURE;
337         goto done;
338     }
340     /* make sure socket fd doesn't exist for 'fork() -> exec*()'ed child */
341     flags = fcntl(fd, F_GETFD);
342     if (flags != -1) {
343         fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
344     }
346     /*  pause the dispatch thread */
347     PRINTVERBOSE1("TransportRpmsg_bind: sending PAUSE event, tid=0x%x\n",
348             (unsigned int)tid);
349     event = TransportRpmsg_Event_PAUSE;
350     err = write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
351     if (err < 0) {
352        /* don't hard-printf since this is no longer fatal */
353         PRINTVERBOSE2("TransportRpmsg_bind: pause write failed: %d (%s)\n",
354                       errno, strerror(errno));
355         close(fd);
356         status = MessageQ_E_OSFAILURE;
357         goto done;
358     }
360     /* wait for ACK event */
361     err = read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
362     if (err < 0) {
363        /* don't hard-printf since this is no longer fatal */
364         PRINTVERBOSE2("TransportRpmsg_bind: ack read failed: %d (%s)\n",
365                       errno, strerror(errno));
366         close(fd);
367         status = MessageQ_E_OSFAILURE;
368         goto done;
369     }
371     PRINTVERBOSE2("TransportRpmsg_bind: received ACK event (%d), tid=0x%x\n",
372             (int)event, (unsigned int)tid);
374     /* add to our fat fd array and update select() parameters */
375     TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].fd = fd;
376     TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++].qId = queueId;
377     TransportRpmsg_module->maxFd = _MAX(TransportRpmsg_module->maxFd, fd);
378     FD_SET(fd, &TransportRpmsg_module->rfds);
379     bindFdToQueueIndex(obj, fd, queuePort);
381     /* release the dispatch thread */
382     PRINTVERBOSE1("TransportRpmsg_bind: sending CONTINUE event, tid=0x%x\n",
383             (unsigned int)tid);
384     event = TransportRpmsg_Event_CONTINUE;
385     err = write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
386     if (err < 0) {
387        /* don't hard-printf since this is no longer fatal */
388         PRINTVERBOSE2("TransportRpmsg_bind: dispatch write failed: %d (%s)\n",
389                       errno, strerror(errno));
390         close(fd);
391         status = MessageQ_E_OSFAILURE;
392         goto done;
393     }
395 done:
396     pthread_mutex_unlock(&TransportRpmsg_module->gate);
398     return (status);
401 /*
402  *  ======== TransportRpmsg_unbind ========
403  */
404 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
406     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
407     UInt16 queuePort = queueId & 0x0000ffff;
408     uint64_t event;
409     Int    status = MessageQ_S_SUCCESS;
410     int    maxFd;
411     int    fd;
412     int    i;
413     int    j;
414     int     err;
416     pthread_mutex_lock(&TransportRpmsg_module->gate);
418     /*  pause the dispatch thread */
419     event = TransportRpmsg_Event_PAUSE;
420     err = write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
421     if (err < 0) {
422        /* don't hard-printf since this is no longer fatal */
423         PRINTVERBOSE2("TransportRpmsg_unbind: pause write failed: %d (%s)\n",
424                       errno, strerror(errno));
425     }
427     /* wait for ACK event */
428     err = read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
429     if (err < 0) {
430        /* don't hard-printf since this is no longer fatal */
431         PRINTVERBOSE2("TransportRpmsg_unbind: ack read failed: %d (%s)\n",
432                       errno, strerror(errno));
433     }
435     /*  Check if binding already deleted.
436      *
437      *  There is a race condition between a thread calling MessageQ_delete
438      *  and another thread calling Ipc_detach. Must make sure we don't unbind
439      *  the same queue twice.
440      */
441     if ((fd = queueIndexToFd(obj, queuePort)) == -1) {
442         goto done;
443     }
444     PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
446     /* guarenteed to work because queueIndexToFd above succeeded */
447     unbindQueueIndex(obj, queuePort);
449     /* remove from input fd array */
450     for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
451         if (TransportRpmsg_module->inFds[i].fd == fd) {
452             TransportRpmsg_module->nInFds--;
454             /* shift subsequent elements down */
455             for (j = i; j < TransportRpmsg_module->nInFds; j++) {
456                 TransportRpmsg_module->inFds[j] =
457                         TransportRpmsg_module->inFds[j + 1];
458             }
459             TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].fd = -1;
460             TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].qId = 0;
461             break;
462         }
463     }
465     /* remove fd from the descriptor set, compute new max value */
466     FD_CLR(fd, &TransportRpmsg_module->rfds);
467     if (fd == TransportRpmsg_module->maxFd) {
468         /* find new max fd */
469         maxFd = TransportRpmsg_module->unblockEvent;
470         for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
471             maxFd = _MAX(TransportRpmsg_module->inFds[i].fd, maxFd);
472         }
473         TransportRpmsg_module->maxFd = maxFd;
474     }
476     close(fd);
478     /* release the dispatch thread */
479     event = TransportRpmsg_Event_CONTINUE;
480     err = write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
481     if (err < 0) {
482        /* don't hard-printf since this is no longer fatal */
483         PRINTVERBOSE2("TransportRpmsg_unbind: dispatch write failed: %d (%s)\n",
484                       errno, strerror(errno));
485     }
487 done:
488     pthread_mutex_unlock(&TransportRpmsg_module->gate);
490     return (status);
493 /*
494  *  ======== TransportRpmsg_put ========
495  */
496 Bool TransportRpmsg_put(Void *handle, Ptr pmsg)
498     MessageQ_Msg msg  = (MessageQ_Msg)pmsg;
499     Int     status    = TRUE;
500     int     sock;
501     int     err;
502     UInt16  clusterId;
503     (Void)handle;
505     /*
506      * Retrieve the socket for the AF_SYSLINK protocol associated with this
507      * transport.
508      */
509     clusterId = msg->dstProc - MultiProc_getBaseIdOfCluster();
510     sock = TransportRpmsg_module->sock[clusterId];
511     if (sock == INVALIDSOCKET) {
512         return FALSE;
513     }
515     PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
517     err = send(sock, msg, msg->msgSize, 0);
518     if (err < 0) {
519         fprintf(stderr, "TransportRpmsg_put: send failed: %d (%s)\n",
520                 errno, strerror(errno));
521         status = FALSE;
523         goto exit;
524     }
526     /*
527      * Free the message, as this is a copy transport, we maintain MessageQ
528      * semantics.
529      */
530     MessageQ_free(msg);
532 exit:
533     return status;
536 /*
537  *  ======== TransportRpmsg_control ========
538  */
539 Bool TransportRpmsg_control(Void *handle, UInt cmd, UArg cmdArg)
541     (Void)handle;
542     (Void)cmd;
543     (Void)cmdArg;
544     return FALSE;
547 /*
548  *  ======== rpmsgThreadFxn ========
549  */
550 void *rpmsgThreadFxn(void *arg)
552     Int      status = MessageQ_S_SUCCESS;
553     Int      tmpStatus;
554     int      retval;
555     uint64_t event;
556     fd_set   rfds;
557     int      maxFd;
558     int      nfds;
559     MessageQ_Msg     retMsg = NULL;
560     MessageQ_QueueId queueId;
561     MessageQ_Handle handle;
562     Bool run = TRUE;
563     int i;
564     int j;
565     int fd;
566     (Void)arg;
567     int err;
569     while (run) {
570         maxFd = TransportRpmsg_module->maxFd;
571         rfds = TransportRpmsg_module->rfds;
572         nfds = TransportRpmsg_module->nInFds;
574         PRINTVERBOSE3("rpmsgThreadFxn: maxFd %d rfds[1:0] 0x%08x%08x\n", maxFd,
575                 (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
577         retval = select(maxFd + 1, &rfds, NULL, NULL, NULL);
579         /* if error, try again */
580         if (retval < 0) {
581             printf("Warning: rpmsgThreadFxn: select failed, trying again\n");
582             continue;
583         }
585         /* dispatch all pending messages, do this first */
586         for (i = 0; i < nfds; i++) {
587             fd = TransportRpmsg_module->inFds[i].fd;
589             if (FD_ISSET(fd, &rfds)) {
590                 PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
591                         TransportRpmsg_module->inFds[i].fd);
593                 /* transport input fd was signalled: get the message */
594                 tmpStatus = transportGet(fd, &retMsg);
595                 if (tmpStatus < 0 && tmpStatus != MessageQ_E_SHUTDOWN) {
596                     fprintf(stderr,
597                             "rpmsgThreadFxn: transportGet failed on fd %d, "
598                             "returned %d\n", fd, tmpStatus);
599                 }
600                 else if (tmpStatus == MessageQ_E_SHUTDOWN) {
601                     fprintf(stderr,
602                             "rpmsgThreadFxn: transportGet failed on fd %d, "
603                             "returned %d\n", fd, tmpStatus);
605                     retval = pthread_mutex_trylock(&TransportRpmsg_module->gate);
606                     if (retval != 0) {
607                         printf("Warning: rpmsgThreadFxn: "
608                                "unable to get the lock during shutdown, "
609                                "will try again\n");
610                         continue;
611                     }
613                     /*
614                      * Don't close(fd) at this time since it will get closed
615                      * later when MessageQ_delete() is called in response to
616                      * this failure.  Just remove fd's bit from the select mask
617                      * 'rfds' for now, but don't remove it from inFds[].
618                      */
619                     FD_CLR(fd, &TransportRpmsg_module->rfds);
620                     if (fd == TransportRpmsg_module->maxFd) {
621                         /* find new max fd */
622                         maxFd = TransportRpmsg_module->unblockEvent;
623                         for (j = 0; j < TransportRpmsg_module->nInFds; j++) {
624                             maxFd = _MAX(TransportRpmsg_module->inFds[j].fd,
625                                          maxFd);
626                         }
627                         TransportRpmsg_module->maxFd = maxFd;
628                     }
629                     queueId = TransportRpmsg_module->inFds[i].qId;
631                     pthread_mutex_unlock(&TransportRpmsg_module->gate);
633                     handle = MessageQ_getLocalHandle(queueId);
635                     PRINTVERBOSE2("rpmsgThreadFxn: shutting down MessageQ "
636                                   "%p (queueId 0x%x)...\n", handle, queueId)
638                     if (handle != NULL) {
639                         MessageQ_shutdown(handle);
640                     }
641                     else {
642                         fprintf(stderr,
643                                 "rpmsgThreadFxn: MessageQ_getLocalHandle(0x%x) "
644                                 "returned NULL, can't shutdown\n", queueId);
645                     }
646                 }
647                 else {
648                     queueId = MessageQ_getDstQueue(retMsg);
649                     PRINTVERBOSE1("rpmsgThreadFxn: got message, "
650                             "delivering to queueId 0x%x\n", queueId)
651                     MessageQ_put(queueId, retMsg);
652                 }
653             }
654         }
656         /* check for events */
657         if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
659             err = read(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
660             if (err < 0) {
661                 /* don't hard-printf since this is no longer fatal */
662                 PRINTVERBOSE2("rpmsgThreadFxn: event read failed: %d (%s)\n",
663                               errno, strerror(errno));
664             }
666             do {
667                 if (event & TransportRpmsg_Event_SHUTDOWN) {
668                     PRINTVERBOSE0("rpmsgThreadFxn: event SHUTDOWN\n");
669                     run = FALSE;
670                     break; /* highest priority, stop processing events */
671                 }
672                 if (event & TransportRpmsg_Event_CONTINUE) {
673                     PRINTVERBOSE1("rpmsgThreadFxn: event CONTINUE (%d)\n",
674                             (int)event);
675                     event &= ~TransportRpmsg_Event_CONTINUE;
676                 }
677                 if (event & TransportRpmsg_Event_PAUSE) {
678                     /*  Our event was signalled by TransportRpmsg_bind()
679                      *  or TransportRpmsg_unbind() to tell us that the set
680                      *  of file descriptors has changed.
681                      */
682                     PRINTVERBOSE0("rpmsgThreadFxn: event PAUSE\n");
683                     /* send the acknowledgement */
684                     event = TransportRpmsg_Event_ACK;
685                     err = write(TransportRpmsg_module->waitEvent, &event,
686                             sizeof(event));
687                     if (err < 0) {
688                         /* don't hard-printf since this is no longer fatal */
689                         PRINTVERBOSE2("rpmsgThreadFxn: ack write failed: %d (%s)\n",
690                                       errno, strerror(errno));
691                     }
693                     /* now wait to be released */
694                     err = read(TransportRpmsg_module->unblockEvent, &event,
695                             sizeof(event));
696                    if (err < 0) {
697                         /* don't hard-printf since this is no longer fatal */
698                         PRINTVERBOSE2("rpmsgThreadFxn: wait read failed: %d (%s)\n",
699                                       errno, strerror(errno));
700                     }
701                 }
702             } while (event != 0);
703         }
704     }
706     return (void *)status;
709 /*
710  * ======== transportGet ========
711  *  Retrieve a message waiting in the socket's queue.
712  */
713 static Int transportGet(int sock, MessageQ_Msg *retMsg)
715     Int           status    = MessageQ_S_SUCCESS;
716     MessageQ_Msg  msg;
717     struct sockaddr_rpmsg fromAddr;  /* [Socket address of sender] */
718     socklen_t     len;
719     int           byteCount;
721     /*
722      * We have no way of peeking to see what message size we'll get, so we
723      * allocate a message of max size to receive contents from the rpmsg socket
724      * (currently, a copy transport)
725      */
726     msg = MessageQ_alloc(0, MESSAGEQ_RPMSG_MAXSIZE);
727     if (!msg) {
728         status = MessageQ_E_MEMORY;
729         goto exit;
730     }
732     memset(&fromAddr, 0, sizeof (fromAddr));
733     len = sizeof (fromAddr);
735     byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
736                          (struct sockaddr *)&fromAddr, &len);
737     if (len != sizeof (fromAddr)) {
738         fprintf(stderr, "recvfrom: got bad addr len (%d)\n", len);
739         status = MessageQ_E_FAIL;
740         goto freeMsg;
741     }
742     if (byteCount < 0) {
743         fprintf(stderr, "recvfrom failed: %s (%d)\n", strerror(errno), errno);
744         if (errno == ENOLINK) {
745             status = MessageQ_E_SHUTDOWN;
746         }
747         else {
748             status = MessageQ_E_FAIL;
749         }
750         goto freeMsg;
751     }
752     else {
753          /*
754           * Update the allocated message size (even though this may waste
755           * space when the actual message is smaller than the maximum rpmsg
756           * size, the message will be freed soon anyway, and it avoids an
757           * extra copy).
758           */
759          msg->msgSize = byteCount;
761          /* set the heapId in the message header to match allocation above */
762          msg->heapId = 0;
763     }
765     PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
766     PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg "
767             "proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
768     PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId,
769             msg->msgSize)
771     *retMsg = msg;
773     goto exit;
775 freeMsg:
776     MessageQ_free(msg);
778 exit:
779     return status;
782 /*
783  *  ======== bindFdToQueueIndex ========
784  *
785  *  Precondition: caller must be inside the module gate
786  */
787 Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 queuePort)
789     Int *queues;
790     Int *oldQueues;
791     UInt oldSize;
792     UInt newCount;
793     UInt queueIndex;
794     UInt i;
796     /* subtract port offset from queue index */
797     queueIndex = queuePort - MessageQ_PORTOFFSET;
799     if (queueIndex >= (UInt)obj->numQueues) {
800         newCount = queueIndex + TransportRpmsg_GROWSIZE;
801         PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
802                 newCount);
804         /* allocate larger table */
805         oldSize = obj->numQueues * sizeof(int);
806         queues = calloc(newCount, sizeof(int));
808         /* copy contents from old table int new table */
809         memcpy(queues, obj->qIndexToFd, oldSize);
811         /* initialize remaining entries of new (larger) table */
812         for (i = obj->numQueues; i < newCount; i++) {
813             queues[i] = -1;
814         }
816         /* swap in new table, delete old table */
817         oldQueues = obj->qIndexToFd;
818         obj->qIndexToFd = queues;
819         obj->numQueues = newCount;
820         free(oldQueues);
821     }
823     /* add new entry */
824     obj->qIndexToFd[queueIndex] = fd;
827 /*
828  *  ======== unbindQueueIndex ========
829  *
830  *  Precondition: caller must be inside the module gate
831  */
832 Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 queuePort)
834     UInt queueIndex;
836     /* subtract port offset from queue index */
837     queueIndex = queuePort - MessageQ_PORTOFFSET;
839     /* clear table entry */
840     obj->qIndexToFd[queueIndex] = -1;
843 /*
844  *  ======== queueIndexToFd ========
845  *
846  *  Precondition: caller must be inside the module gate
847  */
848 Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 queuePort)
850     UInt queueIndex;
852     /* subtract port offset from queue index */
853     queueIndex = queuePort - MessageQ_PORTOFFSET;
855     /* return file descriptor */
856     return (obj->qIndexToFd[queueIndex]);
859 /*
860  *  ======== TransportRpmsg_Factory_create ========
861  *  Create the transport instances
862  *
863  *  Attach to all remote processors. For now, must attach to
864  *  at least one to tolerate MessageQ_E_RESOURCE failures.
865  *
866  *  This function implements the IPC Factory interface, so it
867  *  returns Ipc status codes.
868  */
869 Int TransportRpmsg_Factory_create(Void)
871     Int status = Ipc_S_SUCCESS;
872     Int i;
873     UInt16 clusterSize;
874     TransportRpmsg_Handle *inst;
875     int flags;
878     /* needed to enumerate processors in cluster */
879     clusterSize = MultiProc_getNumProcsInCluster();
881     /* allocate the instance array */
882     inst = calloc(clusterSize, sizeof(TransportRpmsg_Handle));
884     if (inst == NULL) {
885         fprintf(stderr,
886                 "Error: TransportRpmsg_Factory_create failed, no memory\n");
887         status = Ipc_E_MEMORY;
888         goto done;
889     }
891     for (i = 0; i < clusterSize; i++) {
892         inst[i] = NULL;
893     }
895     TransportRpmsg_module->inst = inst;
897     /* counter event object for passing commands to dispatch thread */
898     TransportRpmsg_module->unblockEvent = eventfd(0, 0);
900     if (TransportRpmsg_module->unblockEvent == -1) {
901         fprintf(stderr, "create: unblock event failed: %d (%s)\n",
902                 errno, strerror(errno));
903         status = Ipc_E_FAIL;
904         goto done;
905     }
907     PRINTVERBOSE1("create: created unblock event %d\n",
908             TransportRpmsg_module->unblockEvent)
910     /* semaphore event object for acknowledging client thread */
911     TransportRpmsg_module->waitEvent = eventfd(0, EFD_SEMAPHORE);
913     if (TransportRpmsg_module->waitEvent == -1) {
914         fprintf(stderr,
915                 "create: wait event failed: %d (%s)\n", errno, strerror(errno));
916         status = Ipc_E_FAIL;
917         goto done;
918     }
920     PRINTVERBOSE1("create: created wait event %d\n",
921             TransportRpmsg_module->waitEvent)
923     /* make sure eventfds don't exist for 'fork() -> exec*()'ed child */
924     flags = fcntl(TransportRpmsg_module->waitEvent, F_GETFD);
925     if (flags != -1) {
926         fcntl(TransportRpmsg_module->waitEvent, F_SETFD, flags | FD_CLOEXEC);
927     }
928     flags = fcntl(TransportRpmsg_module->unblockEvent, F_GETFD);
929     if (flags != -1) {
930         fcntl(TransportRpmsg_module->unblockEvent, F_SETFD, flags | FD_CLOEXEC);
931     }
933     FD_ZERO(&TransportRpmsg_module->rfds);
934     FD_SET(TransportRpmsg_module->unblockEvent,
935             &TransportRpmsg_module->rfds);
936     TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
937     TransportRpmsg_module->nInFds = 0;
939     pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
941     status = pthread_create(&TransportRpmsg_module->threadId, NULL,
942             &rpmsgThreadFxn, NULL);
944     if (status < 0) {
945         status = Ipc_E_FAIL;
946         fprintf(stderr, "create: failed to spawn thread\n");
947         goto done;
948     }
949     TransportRpmsg_module->threadStarted = TRUE;
951 done:
952     if (status < 0) {
953         TransportRpmsg_Factory_delete();
954     }
956     return (status);
959 /*
960  *  ======== TransportRpmsg_Factory_delete ========
961  *  Finalize the transport instances
962  */
963 Void TransportRpmsg_Factory_delete(Void)
965     uint64_t event;
968     /* shutdown the message dispatch thread */
969     if (TransportRpmsg_module->threadStarted) {
970         event = TransportRpmsg_Event_SHUTDOWN;
971         write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
973         /* wait for dispatch thread to exit */
974         pthread_join(TransportRpmsg_module->threadId, NULL);
975     }
977     /* destroy the mutex object */
978     pthread_mutex_destroy(&TransportRpmsg_module->gate);
980     /* close the client wait event */
981     if (TransportRpmsg_module->waitEvent != -1) {
982         close(TransportRpmsg_module->waitEvent);
983         TransportRpmsg_module->waitEvent = -1;
984     }
986     /* close the dispatch thread unblock event */
987     if (TransportRpmsg_module->unblockEvent != -1) {
988         close(TransportRpmsg_module->unblockEvent);
989         TransportRpmsg_module->unblockEvent = -1;
990     }
992     /* free the instance handle array */
993     if (TransportRpmsg_module->inst != NULL) {
994         free(TransportRpmsg_module->inst);
995         TransportRpmsg_module->inst = NULL;
996     }
998     return;
1001 /*
1002  *  ======== TransportRpmsg_Factory_attach ========
1003  */
1004 Int TransportRpmsg_Factory_attach(UInt16 procId)
1006     Int status = Ipc_S_SUCCESS;
1007     UInt16 clusterId;
1008     TransportRpmsg_Params params;
1009     TransportRpmsg_Handle transport;
1010     IMessageQTransport_Handle iMsgQTrans;
1012     /* cannot attach to yourself */
1013     if (MultiProc_self() == procId) {
1014         status = Ipc_E_INVALIDARG;
1015         goto done;
1016     }
1018     /* processor must be a member of the cluster */
1019     clusterId = procId - MultiProc_getBaseIdOfCluster();
1021     if (clusterId >= MultiProc_getNumProcsInCluster()) {
1022         status = Ipc_E_INVALIDARG;
1023         goto done;
1024     }
1026     /* create transport instance for given processor */
1027     params.rprocId = procId;
1028     transport = TransportRpmsg_create(&params);
1030     if (transport == NULL) {
1031         status = Ipc_E_FAIL;
1032         goto done;
1033     }
1035     /* register transport instance with MessageQ */
1036     iMsgQTrans = TransportRpmsg_upCast(transport);
1037     TransportRpmsg_module->inst[clusterId] = transport;
1038     MessageQ_registerTransport(iMsgQTrans, procId, 0);
1040 done:
1041     return (status);
1044 /*
1045  *  ======== TransportRpmsg_Factory_detach ========
1046  */
1047 Int TransportRpmsg_Factory_detach(UInt16 procId)
1049     Int status = Ipc_S_SUCCESS;
1050     UInt16 clusterId;
1052     /* cannot detach from yourself */
1053     if (MultiProc_self() == procId) {
1054         status = Ipc_E_INVALIDARG;
1055         goto done;
1056     }
1058     /* processor must be a member of the cluster */
1059     clusterId = procId - MultiProc_getBaseIdOfCluster();
1061     if (clusterId >= MultiProc_getNumProcsInCluster()) {
1062         status = Ipc_E_INVALIDARG;
1063         goto done;
1064     }
1066     /* must be attached in order to detach */
1067     if (TransportRpmsg_module->inst[clusterId] == NULL) {
1068         status = Ipc_E_INVALIDSTATE;
1069         goto done;
1070     }
1072     /* unregister from MessageQ, delete the transport instance */
1073     MessageQ_unregisterTransport(procId, 0);
1074     TransportRpmsg_delete(&(TransportRpmsg_module->inst[clusterId]));
1076 done:
1077     return (status);