]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - ipc/ipcdev.git/blob - linux/src/transport/TransportRpmsg.c
Add FD_CLOEXEC flag for sockets, /dev/mem and LAD pipes
[ipc/ipcdev.git] / linux / src / transport / TransportRpmsg.c
1 /*
2  * Copyright (c) 2014-2015 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  ======== TransportRpmsg.c ========
34  *  Implementation of functions specified in the IMessageQTransport interface.
35  */
37 /* Socket Headers */
38 #include <sys/socket.h>
39 #include <sys/select.h>
40 #include <sys/eventfd.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <unistd.h>
44 #include <errno.h>
45 #include <string.h>
46 #include <fcntl.h>
47 #include <pthread.h>
50 /* Socket Protocol Family */
51 #include <net/rpmsg.h>
54 /* IPC headers */
55 #include <ti/ipc/Std.h>
56 #include <SocketFxns.h>         /* Socket utils: */
57 #include <ti/ipc/Ipc.h>
58 #include <ti/ipc/MessageQ.h>
59 #include <ti/ipc/MultiProc.h>
60 #include <ti/ipc/transports/TransportRpmsg.h>
61 #include <_MessageQ.h>
62 #include <_lad.h>
64 #if !defined(EFD_SEMAPHORE)
65 #  define EFD_SEMAPHORE (1 << 0)
66 #endif
68 /* More magic rpmsg port numbers: */
69 #define MESSAGEQ_RPMSG_PORT       61
70 #define MESSAGEQ_RPMSG_MAXSIZE   512
72 #define TransportRpmsg_GROWSIZE 32
73 #define INVALIDSOCKET (-1)
75 #define TransportRpmsg_Event_ACK        (1 << 0)
76 #define TransportRpmsg_Event_PAUSE      (1 << 1)
77 #define TransportRpmsg_Event_CONTINUE   (1 << 2)
78 #define TransportRpmsg_Event_SHUTDOWN   (1 << 3)
81 #define _MAX(a,b) (((a)>(b))?(a):(b))
83 /* traces in this file are controlled via _TransportMessageQ_verbose */
84 Bool _TransportMessageQ_verbose = FALSE;
85 #define verbose _TransportMessageQ_verbose
87 Int TransportRpmsg_bind(Void *handle, UInt32 queueId);
88 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId);
89 Bool TransportRpmsg_put(Void *handle, Ptr msg);
91 typedef struct TransportRpmsg_Module {
92     int             sock[MultiProc_MAXPROCESSORS];
93     fd_set          rfds;
94     int             maxFd;
95     struct {
96         int     fd;
97         UInt32  qId;
98     } inFds[1024];
99     int             nInFds;
100     pthread_mutex_t gate;
101     int             unblockEvent;    /* unblock the dispatch thread */
102     int             waitEvent;       /* block the client thread */
103     pthread_t       threadId;        /* ID returned by pthread_create() */
104     Bool            threadStarted;
106     TransportRpmsg_Handle *inst;    /* array of instances */
107 } TransportRpmsg_Module;
109 IMessageQTransport_Fxns TransportRpmsg_fxns = {
110     .bind    = TransportRpmsg_bind,
111     .unbind  = TransportRpmsg_unbind,
112     .put     = TransportRpmsg_put
113 };
115 typedef struct TransportRpmsg_Object {
116     IMessageQTransport_Object base;
117     Int status;
118     UInt16 rprocId;
119     int numQueues;
120     int *qIndexToFd;
121 } TransportRpmsg_Object;
123 TransportRpmsg_Module TransportRpmsg_state = {
124     .sock = {INVALIDSOCKET},
125     .unblockEvent = -1,
126     .waitEvent = -1,
127     .threadStarted = FALSE,
128     .inst = NULL
129 };
130 TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
132 static void *rpmsgThreadFxn(void *arg);
133 static Int transportGet(int sock, MessageQ_Msg *retMsg);
134 static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
135                                Int fd,
136                                UInt16 qIndex);
137 static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
138 static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
140 /* factory functions */
141 Int TransportRpmsg_Factory_create(Void);
142 Void TransportRpmsg_Factory_delete(Void);
143 Int TransportRpmsg_Factory_attach(UInt16 procId);
144 Int TransportRpmsg_Factory_detach(UInt16 procId);
146 Ipc_TransportFactoryFxns TransportRpmsg_Factory = {
147     .createFxn = TransportRpmsg_Factory_create,
148     .deleteFxn = TransportRpmsg_Factory_delete,
149     .attachFxn = TransportRpmsg_Factory_attach,
150     .detachFxn = TransportRpmsg_Factory_detach
151 };
153 /* -------------------------------------------------------------------------- */
155 /* instance convertors */
156 IMessageQTransport_Handle TransportRpmsg_upCast(TransportRpmsg_Handle handle)
158     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
159     return ((IMessageQTransport_Handle)&obj->base);
162 TransportRpmsg_Handle TransportRpmsg_downCast(IMessageQTransport_Handle base)
164     return ((TransportRpmsg_Handle)base);
167 /*
168  *  ======== TransportRpmsg_create ========
169  */
170 TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params)
172     Int status = MessageQ_S_SUCCESS;
173     TransportRpmsg_Object *obj = NULL;
174     int sock;
175     int flags;
176     UInt16 clusterId;
177     int i;
180     clusterId = params->rprocId - MultiProc_getBaseIdOfCluster();
182     /* create socket for sending messages to remote processor */
183     sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
185     if (sock < 0) {
186         status = Ipc_E_FAIL;
187         printf("TransportRpmsg_create: socket failed: %d (%s)\n", errno,
188                 strerror(errno));
189         goto done;
190     }
191     TransportRpmsg_module->sock[clusterId] = sock;
192     PRINTVERBOSE1("attach: created send socket: %d\n", sock)
194     status = ConnectSocket(sock, params->rprocId, MESSAGEQ_RPMSG_PORT);
196     if (status < 0) {
197         status = Ipc_E_FAIL;
198         printf("TransportRpmsg_create: connect failed: %d (%s) procId: %d\n",
199                 errno, strerror(errno), params->rprocId);
200         goto done;
201     }
203     /* make sure socket fd doesn't exist for 'fork() -> exec*()'ed child */
204     flags = fcntl(sock, F_GETFD);
205     if (flags != -1) {
206         fcntl(sock, F_SETFD, flags | FD_CLOEXEC);
207     }
209     /* create the instance object */
210     obj = calloc(1, sizeof(TransportRpmsg_Object));
212     if (obj == NULL) {
213         status = Ipc_E_MEMORY;
214         goto done;
215     }
217     /* initialize the instance */
218     obj->base.base.interfaceType = IMessageQTransport_TypeId;
219     obj->base.fxns = &TransportRpmsg_fxns;
220     obj->rprocId = params->rprocId;
221     obj->numQueues = TransportRpmsg_GROWSIZE;
223     obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof(int));
225     if (obj->qIndexToFd == NULL) {
226         status = Ipc_E_MEMORY;
227         goto done;
228     }
230     /* must initialize array */
231     for (i = 0; i < TransportRpmsg_GROWSIZE; i++) {
232         obj->qIndexToFd[i] = -1;
233     }
235 done:
236     if (status < 0) {
237         TransportRpmsg_delete((TransportRpmsg_Handle *)&obj);
238     }
240     return (TransportRpmsg_Handle)obj;
243 /*
244  *  ======== TransportRpmsg_delete ========
245  */
246 Void TransportRpmsg_delete(TransportRpmsg_Handle *pHandle)
248     TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)pHandle;
249     UInt16 clusterId;
250     int sock;
253     clusterId = obj->rprocId - MultiProc_getBaseIdOfCluster();
255     /* close the socket for the given transport instance */
256     sock = TransportRpmsg_module->sock[clusterId];
257     if (sock != INVALIDSOCKET) {
258         PRINTVERBOSE1("detach: closing socket: %d\n", sock)
259         close(sock);
260     }
261     TransportRpmsg_module->sock[clusterId] = INVALIDSOCKET;
263     if ((obj != NULL) && (obj->qIndexToFd != NULL)) {
264         free(obj->qIndexToFd);
265         obj->qIndexToFd = NULL;
266     }
268     if (obj != NULL) {
269         free(obj);
270         obj = NULL;
271     }
273     *pHandle = NULL;
276 /*
277  *  ======== TransportRpmsg_bind ========
278  */
279 Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
281     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
282     UInt16 queuePort = queueId & 0x0000ffff;
283     int fd;
284     int flags;
285     int err;
286     uint64_t event;
287     UInt16 rprocId;
288     pthread_t tid;
289     Int status = MessageQ_S_SUCCESS;
291     tid = pthread_self();
292     rprocId = obj->rprocId;
294     PRINTVERBOSE3("TransportRpmsg_bind: creating endpoint for rprocId %d "
295             "queuePort 0x%x, tid=0x%x\n", rprocId, queuePort, (unsigned int)tid)
297     pthread_mutex_lock(&TransportRpmsg_module->gate);
299     /*  Check if binding already exists.
300      *
301      *  There is a race condition between a thread calling MessageQ_create
302      *  and another thread calling Ipc_attach. Must make sure we don't bind
303      *  the same queue twice.
304      */
305     if (queueIndexToFd(obj, queueId) != -1) {
306         goto done;
307     }
309     /*  Create the socket to receive messages for this messageQ. */
310     fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
311     if (fd < 0) {
312         printf("TransportRpmsg_bind: socket call failed: %d (%s)\n",
313                 errno, strerror(errno));
314         status = MessageQ_E_OSFAILURE;
315         goto done;
316     }
317     PRINTVERBOSE2("TransportRpmsg_bind: created socket fd %d, tdi=0x%x\n", fd,
318             (unsigned int)tid);
320     err = SocketBindAddr(fd, rprocId, (UInt32)queuePort);
321     if (err < 0) {
322         /* don't hard-printf since this is no longer fatal */
323         PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
324                       errno, strerror(errno));
325         close(fd);
326         status = MessageQ_E_OSFAILURE;
327         goto done;
328     }
330     /* make sure socket fd doesn't exist for 'fork() -> exec*()'ed child */
331     flags = fcntl(fd, F_GETFD);
332     if (flags != -1) {
333         fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
334     }
336     /*  pause the dispatch thread */
337     PRINTVERBOSE1("TransportRpmsg_bind: sending PAUSE event, tid=0x%x\n",
338             (unsigned int)tid);
339     event = TransportRpmsg_Event_PAUSE;
340     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
342     /* wait for ACK event */
343     read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
344     PRINTVERBOSE2("TransportRpmsg_bind: received ACK event (%d), tid=0x%x\n",
345             (int)event, (unsigned int)tid);
347     /* add to our fat fd array and update select() parameters */
348     TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].fd = fd;
349     TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++].qId = queueId;
350     TransportRpmsg_module->maxFd = _MAX(TransportRpmsg_module->maxFd, fd);
351     FD_SET(fd, &TransportRpmsg_module->rfds);
352     bindFdToQueueIndex(obj, fd, queuePort);
354     /* release the dispatch thread */
355     PRINTVERBOSE1("TransportRpmsg_bind: sending CONTINUE event, tid=0x%x\n",
356             (unsigned int)tid);
357     event = TransportRpmsg_Event_CONTINUE;
358     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
360 done:
361     pthread_mutex_unlock(&TransportRpmsg_module->gate);
363     return (status);
366 /*
367  *  ======== TransportRpmsg_unbind ========
368  */
369 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
371     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
372     UInt16 queuePort = queueId & 0x0000ffff;
373     uint64_t event;
374     Int    status = MessageQ_S_SUCCESS;
375     int    maxFd;
376     int    fd;
377     int    i;
378     int    j;
380     pthread_mutex_lock(&TransportRpmsg_module->gate);
382     /*  pause the dispatch thread */
383     event = TransportRpmsg_Event_PAUSE;
384     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
386     /* wait for ACK event */
387     read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
389     /*  Check if binding already deleted.
390      *
391      *  There is a race condition between a thread calling MessageQ_delete
392      *  and another thread calling Ipc_detach. Must make sure we don't unbind
393      *  the same queue twice.
394      */
395     if ((fd = queueIndexToFd(obj, queuePort)) == -1) {
396         goto done;
397     }
398     PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
400     /* guarenteed to work because queueIndexToFd above succeeded */
401     unbindQueueIndex(obj, queuePort);
403     /* remove from input fd array */
404     for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
405         if (TransportRpmsg_module->inFds[i].fd == fd) {
406             TransportRpmsg_module->nInFds--;
408             /* shift subsequent elements down */
409             for (j = i; j < TransportRpmsg_module->nInFds; j++) {
410                 TransportRpmsg_module->inFds[j] =
411                         TransportRpmsg_module->inFds[j + 1];
412             }
413             TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].fd = -1;
414             TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].qId = 0;
415             break;
416         }
417     }
419     /* remove fd from the descriptor set, compute new max value */
420     FD_CLR(fd, &TransportRpmsg_module->rfds);
421     if (fd == TransportRpmsg_module->maxFd) {
422         /* find new max fd */
423         maxFd = TransportRpmsg_module->unblockEvent;
424         for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
425             maxFd = _MAX(TransportRpmsg_module->inFds[i].fd, maxFd);
426         }
427         TransportRpmsg_module->maxFd = maxFd;
428     }
430     close(fd);
432     /* release the dispatch thread */
433     event = TransportRpmsg_Event_CONTINUE;
434     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
436 done:
437     pthread_mutex_unlock(&TransportRpmsg_module->gate);
439     return (status);
442 /*
443  *  ======== TransportRpmsg_put ========
444  */
445 Bool TransportRpmsg_put(Void *handle, Ptr pmsg)
447     MessageQ_Msg msg  = (MessageQ_Msg)pmsg;
448     Int     status    = TRUE;
449     int     sock;
450     int     err;
451     UInt16  clusterId;
453     /*
454      * Retrieve the socket for the AF_SYSLINK protocol associated with this
455      * transport.
456      */
457     clusterId = msg->dstProc - MultiProc_getBaseIdOfCluster();
458     sock = TransportRpmsg_module->sock[clusterId];
459     if (!sock) {
460         return FALSE;
461     }
463     PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
465     err = send(sock, msg, msg->msgSize, 0);
466     if (err < 0) {
467         printf("TransportRpmsg_put: send failed: %d (%s)\n",
468                errno, strerror(errno));
469         status = FALSE;
471         goto exit;
472     }
474     /*
475      * Free the message, as this is a copy transport, we maintain MessageQ
476      * semantics.
477      */
478     MessageQ_free(msg);
480 exit:
481     return status;
484 /*
485  *  ======== TransportRpmsg_control ========
486  */
487 Bool TransportRpmsg_control(Void *handle, UInt cmd, UArg cmdArg)
489     return FALSE;
492 /*
493  *  ======== rpmsgThreadFxn ========
494  */
495 void *rpmsgThreadFxn(void *arg)
497     Int      status = MessageQ_S_SUCCESS;
498     Int      tmpStatus;
499     int      retval;
500     uint64_t event;
501     fd_set   rfds;
502     int      maxFd;
503     int      nfds;
504     MessageQ_Msg     retMsg;
505     MessageQ_QueueId queueId;
506     MessageQ_Handle handle;
507     Bool run = TRUE;
508     int i;
509     int j;
510     int fd;
513     while (run) {
514         maxFd = TransportRpmsg_module->maxFd;
515         rfds = TransportRpmsg_module->rfds;
516         nfds = TransportRpmsg_module->nInFds;
518         PRINTVERBOSE3("rpmsgThreadFxn: maxFd %d rfds[1:0] 0x%08x%08x\n", maxFd,
519                 (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
521         retval = select(maxFd + 1, &rfds, NULL, NULL, NULL);
523         /* if error, try again */
524         if (retval < 0) {
525             printf("Warning: rpmsgThreadFxn: select failed, trying again\n");
526             continue;
527         }
529         /* dispatch all pending messages, do this first */
530         for (i = 0; i < nfds; i++) {
531             fd = TransportRpmsg_module->inFds[i].fd;
533             if (FD_ISSET(fd, &rfds)) {
534                 PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
535                         TransportRpmsg_module->inFds[i].fd);
537                 /* transport input fd was signalled: get the message */
538                 tmpStatus = transportGet(fd, &retMsg);
539                 if (tmpStatus < 0 && tmpStatus != MessageQ_E_SHUTDOWN) {
540                     printf("rpmsgThreadFxn: transportGet failed on fd %d,"
541                            " returned %d\n", fd, tmpStatus);
542                 }
543                 else if (tmpStatus == MessageQ_E_SHUTDOWN) {
544                     printf("rpmsgThreadFxn: transportGet failed on fd %d,"
545                            " returned %d\n", fd, tmpStatus);
547                     pthread_mutex_lock(&TransportRpmsg_module->gate);
549                     /*
550                      * Don't close(fd) at this time since it will get closed
551                      * later when MessageQ_delete() is called in response to
552                      * this failure.  Just remove fd's bit from the select mask
553                      * 'rfds' for now, but don't remove it from inFds[].
554                      */
555                     FD_CLR(fd, &TransportRpmsg_module->rfds);
556                     if (fd == TransportRpmsg_module->maxFd) {
557                         /* find new max fd */
558                         maxFd = TransportRpmsg_module->unblockEvent;
559                         for (j = 0; j < TransportRpmsg_module->nInFds; j++) {
560                             maxFd = _MAX(TransportRpmsg_module->inFds[j].fd,
561                                          maxFd);
562                         }
563                         TransportRpmsg_module->maxFd = maxFd;
564                     }
565                     queueId = TransportRpmsg_module->inFds[i].qId;
567                     pthread_mutex_unlock(&TransportRpmsg_module->gate);
569                     handle = MessageQ_getLocalHandle(queueId);
571                     PRINTVERBOSE2("rpmsgThreadFxn: shutting down MessageQ "
572                                   "%p (queueId 0x%x)...\n", handle, queueId)
574                     if (handle != NULL) {
575                         MessageQ_shutdown(handle);
576                     }
577                     else {
578                         printf("rpmsgThreadFxn: MessageQ_getLocalHandle(0x%x) "
579                                "returned NULL, can't shutdown\n", queueId);
580                     }
581                 }
582                 else {
583                     queueId = MessageQ_getDstQueue(retMsg);
584                     PRINTVERBOSE1("rpmsgThreadFxn: got message, "
585                             "delivering to queueId 0x%x\n", queueId)
586                     MessageQ_put(queueId, retMsg);
587                 }
588             }
589         }
591         /* check for events */
592         if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
594             read(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
596             do {
597                 if (event & TransportRpmsg_Event_SHUTDOWN) {
598                     PRINTVERBOSE0("rpmsgThreadFxn: event SHUTDOWN\n");
599                     run = FALSE;
600                     break; /* highest priority, stop processing events */
601                 }
602                 if (event & TransportRpmsg_Event_CONTINUE) {
603                     PRINTVERBOSE1("rpmsgThreadFxn: event CONTINUE (%d)\n",
604                             (int)event);
605                     event &= ~TransportRpmsg_Event_CONTINUE;
606                 }
607                 if (event & TransportRpmsg_Event_PAUSE) {
608                     /*  Our event was signalled by TransportRpmsg_bind()
609                      *  or TransportRpmsg_unbind() to tell us that the set
610                      *  of file descriptors has changed.
611                      */
612                     PRINTVERBOSE0("rpmsgThreadFxn: event PAUSE\n");
613                     /* send the acknowledgement */
614                     event = TransportRpmsg_Event_ACK;
615                     write(TransportRpmsg_module->waitEvent, &event,
616                             sizeof(event));
617                     /* now wait to be released */
618                     read(TransportRpmsg_module->unblockEvent, &event,
619                             sizeof(event));
620                 }
621             } while (event != 0);
622         }
623     }
625     return (void *)status;
628 /*
629  * ======== transportGet ========
630  *  Retrieve a message waiting in the socket's queue.
631  */
632 static Int transportGet(int sock, MessageQ_Msg *retMsg)
634     Int           status    = MessageQ_S_SUCCESS;
635     MessageQ_Msg  msg;
636     struct sockaddr_rpmsg fromAddr;  /* [Socket address of sender] */
637     unsigned int  len;
638     int           byteCount;
640     /*
641      * We have no way of peeking to see what message size we'll get, so we
642      * allocate a message of max size to receive contents from the rpmsg socket
643      * (currently, a copy transport)
644      */
645     msg = MessageQ_alloc(0, MESSAGEQ_RPMSG_MAXSIZE);
646     if (!msg) {
647         status = MessageQ_E_MEMORY;
648         goto exit;
649     }
651     memset(&fromAddr, 0, sizeof (fromAddr));
652     len = sizeof (fromAddr);
654     byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
655                          (struct sockaddr *)&fromAddr, &len);
656     if (len != sizeof (fromAddr)) {
657         printf("recvfrom: got bad addr len (%d)\n", len);
658         status = MessageQ_E_FAIL;
659         goto freeMsg;
660     }
661     if (byteCount < 0) {
662         printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
663         if (errno == ENOLINK) {
664             status = MessageQ_E_SHUTDOWN;
665         }
666         else {
667             status = MessageQ_E_FAIL;
668         }
669         goto freeMsg;
670     }
671     else {
672          /*
673           * Update the allocated message size (even though this may waste
674           * space when the actual message is smaller than the maximum rpmsg
675           * size, the message will be freed soon anyway, and it avoids an
676           * extra copy).
677           */
678          msg->msgSize = byteCount;
680          /*
681           * If the message received was statically allocated, reset the
682           * heapId, so the app can free it.
683           */
684          if (msg->heapId == MessageQ_STATICMSG)  {
685              msg->heapId = 0;  /* for a copy transport, heap id is 0. */
686          }
687     }
689     PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
690     PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg "
691             "proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
692     PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId,
693             msg->msgSize)
695     *retMsg = msg;
697     goto exit;
699 freeMsg:
700     MessageQ_free(msg);
702 exit:
703     return status;
706 /*
707  *  ======== bindFdToQueueIndex ========
708  *
709  *  Precondition: caller must be inside the module gate
710  */
711 Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 queuePort)
713     Int *queues;
714     Int *oldQueues;
715     UInt oldSize;
716     UInt newCount;
717     UInt queueIndex;
718     int i;
720     /* subtract port offset from queue index */
721     queueIndex = queuePort - MessageQ_PORTOFFSET;
723     if (queueIndex >= obj->numQueues) {
724         newCount = queueIndex + TransportRpmsg_GROWSIZE;
725         PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
726                 newCount);
728         /* allocate larger table */
729         oldSize = obj->numQueues * sizeof(int);
730         queues = calloc(newCount, sizeof(int));
732         /* copy contents from old table int new table */
733         memcpy(queues, obj->qIndexToFd, oldSize);
735         /* initialize remaining entries of new (larger) table */
736         for (i = obj->numQueues; i < newCount; i++) {
737             queues[i] = -1;
738         }
740         /* swap in new table, delete old table */
741         oldQueues = obj->qIndexToFd;
742         obj->qIndexToFd = queues;
743         obj->numQueues = newCount;
744         free(oldQueues);
745     }
747     /* add new entry */
748     obj->qIndexToFd[queueIndex] = fd;
751 /*
752  *  ======== unbindQueueIndex ========
753  *
754  *  Precondition: caller must be inside the module gate
755  */
756 Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 queuePort)
758     UInt queueIndex;
760     /* subtract port offset from queue index */
761     queueIndex = queuePort - MessageQ_PORTOFFSET;
763     /* clear table entry */
764     obj->qIndexToFd[queueIndex] = -1;
767 /*
768  *  ======== queueIndexToFd ========
769  *
770  *  Precondition: caller must be inside the module gate
771  */
772 Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 queuePort)
774     UInt queueIndex;
776     /* subtract port offset from queue index */
777     queueIndex = queuePort - MessageQ_PORTOFFSET;
779     /* return file descriptor */
780     return (obj->qIndexToFd[queueIndex]);
783 /*
784  *  ======== TransportRpmsg_Factory_create ========
785  *  Create the transport instances
786  *
787  *  Attach to all remote processors. For now, must attach to
788  *  at least one to tolerate MessageQ_E_RESOURCE failures.
789  *
790  *  This function implements the IPC Factory interface, so it
791  *  returns Ipc status codes.
792  */
793 Int TransportRpmsg_Factory_create(Void)
795     Int status = Ipc_S_SUCCESS;
796     Int i;
797     UInt16 clusterSize;
798     TransportRpmsg_Handle *inst;
799     int flags;
802     /* needed to enumerate processors in cluster */
803     clusterSize = MultiProc_getNumProcsInCluster();
805     /* allocate the instance array */
806     inst = calloc(clusterSize, sizeof(TransportRpmsg_Handle));
808     if (inst == NULL) {
809         printf("Error: TransportRpmsg_Factory_create failed, no memory\n");
810         status = Ipc_E_MEMORY;
811         goto done;
812     }
814     for (i = 0; i < clusterSize; i++) {
815         inst[i] = NULL;
816     }
818     TransportRpmsg_module->inst = inst;
820     /* counter event object for passing commands to dispatch thread */
821     TransportRpmsg_module->unblockEvent = eventfd(0, 0);
823     if (TransportRpmsg_module->unblockEvent == -1) {
824         printf("create: unblock event failed: %d (%s)\n",
825                errno, strerror(errno));
826         status = Ipc_E_FAIL;
827         goto done;
828     }
830     PRINTVERBOSE1("create: created unblock event %d\n",
831             TransportRpmsg_module->unblockEvent)
833     /* semaphore event object for acknowledging client thread */
834     TransportRpmsg_module->waitEvent = eventfd(0, EFD_SEMAPHORE);
836     if (TransportRpmsg_module->waitEvent == -1) {
837         printf("create: wait event failed: %d (%s)\n", errno, strerror(errno));
838         status = Ipc_E_FAIL;
839         goto done;
840     }
842     PRINTVERBOSE1("create: created wait event %d\n",
843             TransportRpmsg_module->waitEvent)
845     /* make sure eventfds don't exist for 'fork() -> exec*()'ed child */
846     flags = fcntl(TransportRpmsg_module->waitEvent, F_GETFD);
847     if (flags != -1) {
848         fcntl(TransportRpmsg_module->waitEvent, F_SETFD, flags | FD_CLOEXEC);
849     }
850     flags = fcntl(TransportRpmsg_module->unblockEvent, F_GETFD);
851     if (flags != -1) {
852         fcntl(TransportRpmsg_module->unblockEvent, F_SETFD, flags | FD_CLOEXEC);
853     }
855     FD_ZERO(&TransportRpmsg_module->rfds);
856     FD_SET(TransportRpmsg_module->unblockEvent,
857             &TransportRpmsg_module->rfds);
858     TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
859     TransportRpmsg_module->nInFds = 0;
861     pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
863     status = pthread_create(&TransportRpmsg_module->threadId, NULL,
864             &rpmsgThreadFxn, NULL);
866     if (status < 0) {
867         status = Ipc_E_FAIL;
868         printf("attach: failed to spawn thread\n");
869         goto done;
870     }
871     TransportRpmsg_module->threadStarted = TRUE;
873 done:
874     if (status < 0) {
875         TransportRpmsg_Factory_delete();
876     }
878     return (status);
881 /*
882  *  ======== TransportRpmsg_Factory_delete ========
883  *  Finalize the transport instances
884  */
885 Void TransportRpmsg_Factory_delete(Void)
887     uint64_t event;
890     /* shutdown the message dispatch thread */
891     if (TransportRpmsg_module->threadStarted) {
892         event = TransportRpmsg_Event_SHUTDOWN;
893         write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
895         /* wait for dispatch thread to exit */
896         pthread_join(TransportRpmsg_module->threadId, NULL);
897     }
899     /* destroy the mutex object */
900     pthread_mutex_destroy(&TransportRpmsg_module->gate);
902     /* close the client wait event */
903     if (TransportRpmsg_module->waitEvent != -1) {
904         close(TransportRpmsg_module->waitEvent);
905         TransportRpmsg_module->waitEvent = -1;
906     }
908     /* close the dispatch thread unblock event */
909     if (TransportRpmsg_module->unblockEvent != -1) {
910         close(TransportRpmsg_module->unblockEvent);
911         TransportRpmsg_module->unblockEvent = -1;
912     }
914     /* free the instance handle array */
915     if (TransportRpmsg_module->inst != NULL) {
916         free(TransportRpmsg_module->inst);
917         TransportRpmsg_module->inst = NULL;
918     }
920     return;
923 /*
924  *  ======== TransportRpmsg_Factory_attach ========
925  */
926 Int TransportRpmsg_Factory_attach(UInt16 procId)
928     Int status = Ipc_S_SUCCESS;
929     UInt16 clusterId;
930     TransportRpmsg_Params params;
931     TransportRpmsg_Handle transport;
932     IMessageQTransport_Handle iMsgQTrans;
934     /* cannot attach to yourself */
935     if (MultiProc_self() == procId) {
936         status = Ipc_E_INVALIDARG;
937         goto done;
938     }
940     /* processor must be a member of the cluster */
941     clusterId = procId - MultiProc_getBaseIdOfCluster();
943     if (clusterId >= MultiProc_getNumProcsInCluster()) {
944         status = Ipc_E_INVALIDARG;
945         goto done;
946     }
948     /* create transport instance for given processor */
949     params.rprocId = procId;
950     transport = TransportRpmsg_create(&params);
952     if (transport == NULL) {
953         status = Ipc_E_FAIL;
954         goto done;
955     }
957     /* register transport instance with MessageQ */
958     iMsgQTrans = TransportRpmsg_upCast(transport);
959     TransportRpmsg_module->inst[clusterId] = transport;
960     MessageQ_registerTransport(iMsgQTrans, procId, 0);
962 done:
963     return (status);
966 /*
967  *  ======== TransportRpmsg_Factory_detach ========
968  */
969 Int TransportRpmsg_Factory_detach(UInt16 procId)
971     Int status = Ipc_S_SUCCESS;
972     UInt16 clusterId;
974     /* cannot detach from yourself */
975     if (MultiProc_self() == procId) {
976         status = Ipc_E_INVALIDARG;
977         goto done;
978     }
980     /* processor must be a member of the cluster */
981     clusterId = procId - MultiProc_getBaseIdOfCluster();
983     if (clusterId >= MultiProc_getNumProcsInCluster()) {
984         status = Ipc_E_INVALIDARG;
985         goto done;
986     }
988     /* must be attached in order to detach */
989     if (TransportRpmsg_module->inst[clusterId] == NULL) {
990         status = Ipc_E_INVALIDSTATE;
991         goto done;
992     }
994     /* unregister from MessageQ, delete the transport instance */
995     MessageQ_unregisterTransport(procId, 0);
996     TransportRpmsg_delete(&(TransportRpmsg_module->inst[clusterId]));
998 done:
999     return (status);