]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - ipc/ipcdev.git/blob - linux/src/transport/TransportRpmsg.c
Linux/TransportRpmsg: Shutdown only on MessageQ_E_SHUTDOWN
[ipc/ipcdev.git] / linux / src / transport / TransportRpmsg.c
1 /*
2  * Copyright (c) 2014-2015 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  ======== TransportRpmsg.c ========
34  *  Implementation of functions specified in the IMessageQTransport interface.
35  */
37 /* Socket Headers */
38 #include <sys/socket.h>
39 #include <sys/select.h>
40 #include <sys/eventfd.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <unistd.h>
44 #include <errno.h>
45 #include <string.h>
46 #include <fcntl.h>
47 #include <pthread.h>
50 /* Socket Protocol Family */
51 #include <net/rpmsg.h>
54 /* IPC headers */
55 #include <ti/ipc/Std.h>
56 #include <SocketFxns.h>         /* Socket utils: */
57 #include <ti/ipc/Ipc.h>
58 #include <ti/ipc/MessageQ.h>
59 #include <ti/ipc/MultiProc.h>
60 #include <ti/ipc/transports/TransportRpmsg.h>
61 #include <_MessageQ.h>
62 #include <_lad.h>
64 #if !defined(EFD_SEMAPHORE)
65 #  define EFD_SEMAPHORE (1 << 0)
66 #endif
68 /* More magic rpmsg port numbers: */
69 #define MESSAGEQ_RPMSG_PORT       61
70 #define MESSAGEQ_RPMSG_MAXSIZE   512
72 #define TransportRpmsg_GROWSIZE 32
73 #define INVALIDSOCKET (-1)
75 #define TransportRpmsg_Event_ACK        (1 << 0)
76 #define TransportRpmsg_Event_PAUSE      (1 << 1)
77 #define TransportRpmsg_Event_CONTINUE   (1 << 2)
78 #define TransportRpmsg_Event_SHUTDOWN   (1 << 3)
81 #define _MAX(a,b) (((a)>(b))?(a):(b))
83 /* traces in this file are controlled via _TransportMessageQ_verbose */
84 Bool _TransportMessageQ_verbose = FALSE;
85 #define verbose _TransportMessageQ_verbose
87 Int TransportRpmsg_bind(Void *handle, UInt32 queueId);
88 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId);
89 Bool TransportRpmsg_put(Void *handle, Ptr msg);
91 typedef struct TransportRpmsg_Module {
92     int             sock[MultiProc_MAXPROCESSORS];
93     fd_set          rfds;
94     int             maxFd;
95     struct {
96         int     fd;
97         UInt32  qId;
98     } inFds[1024];
99     int             nInFds;
100     pthread_mutex_t gate;
101     int             unblockEvent;    /* unblock the dispatch thread */
102     int             waitEvent;       /* block the client thread */
103     pthread_t       threadId;        /* ID returned by pthread_create() */
104     Bool            threadStarted;
106     TransportRpmsg_Handle *inst;    /* array of instances */
107 } TransportRpmsg_Module;
109 IMessageQTransport_Fxns TransportRpmsg_fxns = {
110     .bind    = TransportRpmsg_bind,
111     .unbind  = TransportRpmsg_unbind,
112     .put     = TransportRpmsg_put
113 };
115 typedef struct TransportRpmsg_Object {
116     IMessageQTransport_Object base;
117     Int status;
118     UInt16 rprocId;
119     int numQueues;
120     int *qIndexToFd;
121 } TransportRpmsg_Object;
123 TransportRpmsg_Module TransportRpmsg_state = {
124     .sock = {INVALIDSOCKET},
125     .unblockEvent = -1,
126     .waitEvent = -1,
127     .threadStarted = FALSE,
128     .inst = NULL
129 };
130 TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
132 static void *rpmsgThreadFxn(void *arg);
133 static Int transportGet(int sock, MessageQ_Msg *retMsg);
134 static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
135                                Int fd,
136                                UInt16 qIndex);
137 static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
138 static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
140 /* factory functions */
141 Int TransportRpmsg_Factory_create(Void);
142 Void TransportRpmsg_Factory_delete(Void);
143 Int TransportRpmsg_Factory_attach(UInt16 procId);
144 Int TransportRpmsg_Factory_detach(UInt16 procId);
146 Ipc_TransportFactoryFxns TransportRpmsg_Factory = {
147     .createFxn = TransportRpmsg_Factory_create,
148     .deleteFxn = TransportRpmsg_Factory_delete,
149     .attachFxn = TransportRpmsg_Factory_attach,
150     .detachFxn = TransportRpmsg_Factory_detach
151 };
153 /* -------------------------------------------------------------------------- */
155 /* instance convertors */
156 IMessageQTransport_Handle TransportRpmsg_upCast(TransportRpmsg_Handle handle)
158     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
159     return ((IMessageQTransport_Handle)&obj->base);
162 TransportRpmsg_Handle TransportRpmsg_downCast(IMessageQTransport_Handle base)
164     return ((TransportRpmsg_Handle)base);
167 /*
168  *  ======== TransportRpmsg_create ========
169  */
170 TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params)
172     Int status = MessageQ_S_SUCCESS;
173     TransportRpmsg_Object *obj = NULL;
174     int sock;
175     UInt16 clusterId;
176     int i;
179     clusterId = params->rprocId - MultiProc_getBaseIdOfCluster();
181     /* create socket for sending messages to remote processor */
182     sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
184     if (sock < 0) {
185         status = Ipc_E_FAIL;
186         printf("TransportRpmsg_create: socket failed: %d (%s)\n", errno,
187                 strerror(errno));
188         goto done;
189     }
190     TransportRpmsg_module->sock[clusterId] = sock;
191     PRINTVERBOSE1("attach: created send socket: %d\n", sock)
193     status = ConnectSocket(sock, params->rprocId, MESSAGEQ_RPMSG_PORT);
195     if (status < 0) {
196         status = Ipc_E_FAIL;
197         printf("TransportRpmsg_create: connect failed: %d (%s) procId: %d\n",
198                 errno, strerror(errno), params->rprocId);
199         goto done;
200     }
202     /* create the instance object */
203     obj = calloc(1, sizeof(TransportRpmsg_Object));
205     if (obj == NULL) {
206         status = Ipc_E_MEMORY;
207         goto done;
208     }
210     /* initialize the instance */
211     obj->base.base.interfaceType = IMessageQTransport_TypeId;
212     obj->base.fxns = &TransportRpmsg_fxns;
213     obj->rprocId = params->rprocId;
214     obj->numQueues = TransportRpmsg_GROWSIZE;
216     obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof(int));
218     if (obj->qIndexToFd == NULL) {
219         status = Ipc_E_MEMORY;
220         goto done;
221     }
223     /* must initialize array */
224     for (i = 0; i < TransportRpmsg_GROWSIZE; i++) {
225         obj->qIndexToFd[i] = -1;
226     }
228 done:
229     if (status < 0) {
230         TransportRpmsg_delete((TransportRpmsg_Handle *)&obj);
231     }
233     return (TransportRpmsg_Handle)obj;
236 /*
237  *  ======== TransportRpmsg_delete ========
238  */
239 Void TransportRpmsg_delete(TransportRpmsg_Handle *pHandle)
241     TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)pHandle;
242     UInt16 clusterId;
243     int sock;
246     clusterId = obj->rprocId - MultiProc_getBaseIdOfCluster();
248     /* close the socket for the given transport instance */
249     sock = TransportRpmsg_module->sock[clusterId];
250     if (sock != INVALIDSOCKET) {
251         PRINTVERBOSE1("detach: closing socket: %d\n", sock)
252         close(sock);
253     }
254     TransportRpmsg_module->sock[clusterId] = INVALIDSOCKET;
256     if ((obj != NULL) && (obj->qIndexToFd != NULL)) {
257         free(obj->qIndexToFd);
258         obj->qIndexToFd = NULL;
259     }
261     if (obj != NULL) {
262         free(obj);
263         obj = NULL;
264     }
266     *pHandle = NULL;
269 /*
270  *  ======== TransportRpmsg_bind ========
271  */
272 Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
274     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
275     UInt16 queuePort = queueId & 0x0000ffff;
276     int fd;
277     int err;
278     uint64_t event;
279     UInt16 rprocId;
280     pthread_t tid;
281     Int status = MessageQ_S_SUCCESS;
283     tid = pthread_self();
284     rprocId = obj->rprocId;
286     PRINTVERBOSE3("TransportRpmsg_bind: creating endpoint for rprocId %d "
287             "queuePort 0x%x, tid=0x%x\n", rprocId, queuePort, (unsigned int)tid)
289     pthread_mutex_lock(&TransportRpmsg_module->gate);
291     /*  Check if binding already exists.
292      *
293      *  There is a race condition between a thread calling MessageQ_create
294      *  and another thread calling Ipc_attach. Must make sure we don't bind
295      *  the same queue twice.
296      */
297     if (queueIndexToFd(obj, queueId) != -1) {
298         goto done;
299     }
301     /*  Create the socket to receive messages for this messageQ. */
302     fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
303     if (fd < 0) {
304         printf("TransportRpmsg_bind: socket call failed: %d (%s)\n",
305                 errno, strerror(errno));
306         status = MessageQ_E_OSFAILURE;
307         goto done;
308     }
309     PRINTVERBOSE2("TransportRpmsg_bind: created socket fd %d, tdi=0x%x\n", fd,
310             (unsigned int)tid);
312     err = SocketBindAddr(fd, rprocId, (UInt32)queuePort);
313     if (err < 0) {
314         /* don't hard-printf since this is no longer fatal */
315         PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
316                       errno, strerror(errno));
317         close(fd);
318         status = MessageQ_E_OSFAILURE;
319         goto done;
320     }
322     /*  pause the dispatch thread */
323     PRINTVERBOSE1("TransportRpmsg_bind: sending PAUSE event, tid=0x%x\n",
324             (unsigned int)tid);
325     event = TransportRpmsg_Event_PAUSE;
326     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
328     /* wait for ACK event */
329     read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
330     PRINTVERBOSE2("TransportRpmsg_bind: received ACK event (%d), tid=0x%x\n",
331             (int)event, (unsigned int)tid);
333     /* add to our fat fd array and update select() parameters */
334     TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].fd = fd;
335     TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++].qId = queueId;
336     TransportRpmsg_module->maxFd = _MAX(TransportRpmsg_module->maxFd, fd);
337     FD_SET(fd, &TransportRpmsg_module->rfds);
338     bindFdToQueueIndex(obj, fd, queuePort);
340     /* release the dispatch thread */
341     PRINTVERBOSE1("TransportRpmsg_bind: sending CONTINUE event, tid=0x%x\n",
342             (unsigned int)tid);
343     event = TransportRpmsg_Event_CONTINUE;
344     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
346 done:
347     pthread_mutex_unlock(&TransportRpmsg_module->gate);
349     return (status);
352 /*
353  *  ======== TransportRpmsg_unbind ========
354  */
355 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
357     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
358     UInt16 queuePort = queueId & 0x0000ffff;
359     uint64_t event;
360     Int    status = MessageQ_S_SUCCESS;
361     int    maxFd;
362     int    fd;
363     int    i;
364     int    j;
366     pthread_mutex_lock(&TransportRpmsg_module->gate);
368     /*  pause the dispatch thread */
369     event = TransportRpmsg_Event_PAUSE;
370     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
372     /* wait for ACK event */
373     read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
375     /*  Check if binding already deleted.
376      *
377      *  There is a race condition between a thread calling MessageQ_delete
378      *  and another thread calling Ipc_detach. Must make sure we don't unbind
379      *  the same queue twice.
380      */
381     if ((fd = queueIndexToFd(obj, queuePort)) == -1) {
382         goto done;
383     }
384     PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
386     /* guarenteed to work because queueIndexToFd above succeeded */
387     unbindQueueIndex(obj, queuePort);
389     /* remove from input fd array */
390     for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
391         if (TransportRpmsg_module->inFds[i].fd == fd) {
392             TransportRpmsg_module->nInFds--;
394             /* shift subsequent elements down */
395             for (j = i; j < TransportRpmsg_module->nInFds; j++) {
396                 TransportRpmsg_module->inFds[j] =
397                         TransportRpmsg_module->inFds[j + 1];
398             }
399             TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].fd = -1;
400             TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].qId = 0;
401             break;
402         }
403     }
405     /* remove fd from the descriptor set, compute new max value */
406     FD_CLR(fd, &TransportRpmsg_module->rfds);
407     if (fd == TransportRpmsg_module->maxFd) {
408         /* find new max fd */
409         maxFd = TransportRpmsg_module->unblockEvent;
410         for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
411             maxFd = _MAX(TransportRpmsg_module->inFds[i].fd, maxFd);
412         }
413         TransportRpmsg_module->maxFd = maxFd;
414     }
416     close(fd);
418     /* release the dispatch thread */
419     event = TransportRpmsg_Event_CONTINUE;
420     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
422 done:
423     pthread_mutex_unlock(&TransportRpmsg_module->gate);
425     return (status);
428 /*
429  *  ======== TransportRpmsg_put ========
430  */
431 Bool TransportRpmsg_put(Void *handle, Ptr pmsg)
433     MessageQ_Msg msg  = (MessageQ_Msg)pmsg;
434     Int     status    = TRUE;
435     int     sock;
436     int     err;
437     UInt16  clusterId;
439     /*
440      * Retrieve the socket for the AF_SYSLINK protocol associated with this
441      * transport.
442      */
443     clusterId = msg->dstProc - MultiProc_getBaseIdOfCluster();
444     sock = TransportRpmsg_module->sock[clusterId];
445     if (!sock) {
446         return FALSE;
447     }
449     PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
451     err = send(sock, msg, msg->msgSize, 0);
452     if (err < 0) {
453         printf("TransportRpmsg_put: send failed: %d (%s)\n",
454                errno, strerror(errno));
455         status = FALSE;
457         goto exit;
458     }
460     /*
461      * Free the message, as this is a copy transport, we maintain MessageQ
462      * semantics.
463      */
464     MessageQ_free(msg);
466 exit:
467     return status;
470 /*
471  *  ======== TransportRpmsg_control ========
472  */
473 Bool TransportRpmsg_control(Void *handle, UInt cmd, UArg cmdArg)
475     return FALSE;
478 /*
479  *  ======== rpmsgThreadFxn ========
480  */
481 void *rpmsgThreadFxn(void *arg)
483     Int      status = MessageQ_S_SUCCESS;
484     Int      tmpStatus;
485     int      retval;
486     uint64_t event;
487     fd_set   rfds;
488     int      maxFd;
489     int      nfds;
490     MessageQ_Msg     retMsg;
491     MessageQ_QueueId queueId;
492     MessageQ_Handle handle;
493     Bool run = TRUE;
494     int i;
495     int j;
496     int fd;
499     while (run) {
500         maxFd = TransportRpmsg_module->maxFd;
501         rfds = TransportRpmsg_module->rfds;
502         nfds = TransportRpmsg_module->nInFds;
504         PRINTVERBOSE3("rpmsgThreadFxn: maxFd %d rfds[1:0] 0x%08x%08x\n", maxFd,
505                 (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
507         retval = select(maxFd + 1, &rfds, NULL, NULL, NULL);
509         /* if error, try again */
510         if (retval < 0) {
511             printf("Warning: rpmsgThreadFxn: select failed, trying again\n");
512             continue;
513         }
515         /* dispatch all pending messages, do this first */
516         for (i = 0; i < nfds; i++) {
517             fd = TransportRpmsg_module->inFds[i].fd;
519             if (FD_ISSET(fd, &rfds)) {
520                 PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
521                         TransportRpmsg_module->inFds[i].fd);
523                 /* transport input fd was signalled: get the message */
524                 tmpStatus = transportGet(fd, &retMsg);
525                 if (tmpStatus < 0 && tmpStatus != MessageQ_E_SHUTDOWN) {
526                     printf("rpmsgThreadFxn: transportGet failed on fd %d,"
527                            " returned %d\n", fd, tmpStatus);
528                 }
529                 else if (tmpStatus == MessageQ_E_SHUTDOWN) {
530                     printf("rpmsgThreadFxn: transportGet failed on fd %d,"
531                            " returned %d\n", fd, tmpStatus);
533                     pthread_mutex_lock(&TransportRpmsg_module->gate);
535                     /*
536                      * Don't close(fd) at this time since it will get closed
537                      * later when MessageQ_delete() is called in response to
538                      * this failure.  Just remove fd's bit from the select mask
539                      * 'rfds' for now, but don't remove it from inFds[].
540                      */
541                     FD_CLR(fd, &TransportRpmsg_module->rfds);
542                     if (fd == TransportRpmsg_module->maxFd) {
543                         /* find new max fd */
544                         maxFd = TransportRpmsg_module->unblockEvent;
545                         for (j = 0; j < TransportRpmsg_module->nInFds; j++) {
546                             maxFd = _MAX(TransportRpmsg_module->inFds[j].fd,
547                                          maxFd);
548                         }
549                         TransportRpmsg_module->maxFd = maxFd;
550                     }
551                     queueId = TransportRpmsg_module->inFds[i].qId;
553                     pthread_mutex_unlock(&TransportRpmsg_module->gate);
555                     handle = MessageQ_getLocalHandle(queueId);
557                     PRINTVERBOSE2("rpmsgThreadFxn: shutting down MessageQ "
558                                   "%p (queueId 0x%x)...\n", handle, queueId)
560                     if (handle != NULL) {
561                         MessageQ_shutdown(handle);
562                     }
563                     else {
564                         printf("rpmsgThreadFxn: MessageQ_getLocalHandle(0x%x) "
565                                "returned NULL, can't shutdown\n", queueId);
566                     }
567                 }
568                 else {
569                     queueId = MessageQ_getDstQueue(retMsg);
570                     PRINTVERBOSE1("rpmsgThreadFxn: got message, "
571                             "delivering to queueId 0x%x\n", queueId)
572                     MessageQ_put(queueId, retMsg);
573                 }
574             }
575         }
577         /* check for events */
578         if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
580             read(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
582             do {
583                 if (event & TransportRpmsg_Event_SHUTDOWN) {
584                     PRINTVERBOSE0("rpmsgThreadFxn: event SHUTDOWN\n");
585                     run = FALSE;
586                     break; /* highest priority, stop processing events */
587                 }
588                 if (event & TransportRpmsg_Event_CONTINUE) {
589                     PRINTVERBOSE1("rpmsgThreadFxn: event CONTINUE (%d)\n",
590                             (int)event);
591                     event &= ~TransportRpmsg_Event_CONTINUE;
592                 }
593                 if (event & TransportRpmsg_Event_PAUSE) {
594                     /*  Our event was signalled by TransportRpmsg_bind()
595                      *  or TransportRpmsg_unbind() to tell us that the set
596                      *  of file descriptors has changed.
597                      */
598                     PRINTVERBOSE0("rpmsgThreadFxn: event PAUSE\n");
599                     /* send the acknowledgement */
600                     event = TransportRpmsg_Event_ACK;
601                     write(TransportRpmsg_module->waitEvent, &event,
602                             sizeof(event));
603                     /* now wait to be released */
604                     read(TransportRpmsg_module->unblockEvent, &event,
605                             sizeof(event));
606                 }
607             } while (event != 0);
608         }
609     }
611     return (void *)status;
614 /*
615  * ======== transportGet ========
616  *  Retrieve a message waiting in the socket's queue.
617  */
618 static Int transportGet(int sock, MessageQ_Msg *retMsg)
620     Int           status    = MessageQ_S_SUCCESS;
621     MessageQ_Msg  msg;
622     struct sockaddr_rpmsg fromAddr;  /* [Socket address of sender] */
623     unsigned int  len;
624     int           byteCount;
626     /*
627      * We have no way of peeking to see what message size we'll get, so we
628      * allocate a message of max size to receive contents from the rpmsg socket
629      * (currently, a copy transport)
630      */
631     msg = MessageQ_alloc(0, MESSAGEQ_RPMSG_MAXSIZE);
632     if (!msg) {
633         status = MessageQ_E_MEMORY;
634         goto exit;
635     }
637     memset(&fromAddr, 0, sizeof (fromAddr));
638     len = sizeof (fromAddr);
640     byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
641                          (struct sockaddr *)&fromAddr, &len);
642     if (len != sizeof (fromAddr)) {
643         printf("recvfrom: got bad addr len (%d)\n", len);
644         status = MessageQ_E_FAIL;
645         goto freeMsg;
646     }
647     if (byteCount < 0) {
648         printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
649         if (errno == ENOLINK) {
650             status = MessageQ_E_SHUTDOWN;
651         }
652         else {
653             status = MessageQ_E_FAIL;
654         }
655         goto freeMsg;
656     }
657     else {
658          /*
659           * Update the allocated message size (even though this may waste
660           * space when the actual message is smaller than the maximum rpmsg
661           * size, the message will be freed soon anyway, and it avoids an
662           * extra copy).
663           */
664          msg->msgSize = byteCount;
666          /*
667           * If the message received was statically allocated, reset the
668           * heapId, so the app can free it.
669           */
670          if (msg->heapId == MessageQ_STATICMSG)  {
671              msg->heapId = 0;  /* for a copy transport, heap id is 0. */
672          }
673     }
675     PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
676     PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg "
677             "proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
678     PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId,
679             msg->msgSize)
681     *retMsg = msg;
683     goto exit;
685 freeMsg:
686     MessageQ_free(msg);
688 exit:
689     return status;
692 /*
693  *  ======== bindFdToQueueIndex ========
694  *
695  *  Precondition: caller must be inside the module gate
696  */
697 Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 queuePort)
699     Int *queues;
700     Int *oldQueues;
701     UInt oldSize;
702     UInt newCount;
703     UInt queueIndex;
704     int i;
706     /* subtract port offset from queue index */
707     queueIndex = queuePort - MessageQ_PORTOFFSET;
709     if (queueIndex >= obj->numQueues) {
710         newCount = queueIndex + TransportRpmsg_GROWSIZE;
711         PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
712                 newCount);
714         /* allocate larger table */
715         oldSize = obj->numQueues * sizeof(int);
716         queues = calloc(newCount, sizeof(int));
718         /* copy contents from old table int new table */
719         memcpy(queues, obj->qIndexToFd, oldSize);
721         /* initialize remaining entries of new (larger) table */
722         for (i = obj->numQueues; i < newCount; i++) {
723             queues[i] = -1;
724         }
726         /* swap in new table, delete old table */
727         oldQueues = obj->qIndexToFd;
728         obj->qIndexToFd = queues;
729         obj->numQueues = newCount;
730         free(oldQueues);
731     }
733     /* add new entry */
734     obj->qIndexToFd[queueIndex] = fd;
737 /*
738  *  ======== unbindQueueIndex ========
739  *
740  *  Precondition: caller must be inside the module gate
741  */
742 Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 queuePort)
744     UInt queueIndex;
746     /* subtract port offset from queue index */
747     queueIndex = queuePort - MessageQ_PORTOFFSET;
749     /* clear table entry */
750     obj->qIndexToFd[queueIndex] = -1;
753 /*
754  *  ======== queueIndexToFd ========
755  *
756  *  Precondition: caller must be inside the module gate
757  */
758 Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 queuePort)
760     UInt queueIndex;
762     /* subtract port offset from queue index */
763     queueIndex = queuePort - MessageQ_PORTOFFSET;
765     /* return file descriptor */
766     return (obj->qIndexToFd[queueIndex]);
769 /*
770  *  ======== TransportRpmsg_Factory_create ========
771  *  Create the transport instances
772  *
773  *  Attach to all remote processors. For now, must attach to
774  *  at least one to tolerate MessageQ_E_RESOURCE failures.
775  *
776  *  This function implements the IPC Factory interface, so it
777  *  returns Ipc status codes.
778  */
779 Int TransportRpmsg_Factory_create(Void)
781     Int status = Ipc_S_SUCCESS;
782     Int i;
783     UInt16 clusterSize;
784     TransportRpmsg_Handle *inst;
787     /* needed to enumerate processors in cluster */
788     clusterSize = MultiProc_getNumProcsInCluster();
790     /* allocate the instance array */
791     inst = calloc(clusterSize, sizeof(TransportRpmsg_Handle));
793     if (inst == NULL) {
794         printf("Error: TransportRpmsg_Factory_create failed, no memory\n");
795         status = Ipc_E_MEMORY;
796         goto done;
797     }
799     for (i = 0; i < clusterSize; i++) {
800         inst[i] = NULL;
801     }
803     TransportRpmsg_module->inst = inst;
805     /* counter event object for passing commands to dispatch thread */
806     TransportRpmsg_module->unblockEvent = eventfd(0, 0);
808     if (TransportRpmsg_module->unblockEvent == -1) {
809         printf("create: unblock event failed: %d (%s)\n",
810                errno, strerror(errno));
811         status = Ipc_E_FAIL;
812         goto done;
813     }
815     PRINTVERBOSE1("create: created unblock event %d\n",
816             TransportRpmsg_module->unblockEvent)
818     /* semaphore event object for acknowledging client thread */
819     TransportRpmsg_module->waitEvent = eventfd(0, EFD_SEMAPHORE);
821     if (TransportRpmsg_module->waitEvent == -1) {
822         printf("create: wait event failed: %d (%s)\n", errno, strerror(errno));
823         status = Ipc_E_FAIL;
824         goto done;
825     }
827     PRINTVERBOSE1("create: created wait event %d\n",
828             TransportRpmsg_module->waitEvent)
830     FD_ZERO(&TransportRpmsg_module->rfds);
831     FD_SET(TransportRpmsg_module->unblockEvent,
832             &TransportRpmsg_module->rfds);
833     TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
834     TransportRpmsg_module->nInFds = 0;
836     pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
838     status = pthread_create(&TransportRpmsg_module->threadId, NULL,
839             &rpmsgThreadFxn, NULL);
841     if (status < 0) {
842         status = Ipc_E_FAIL;
843         printf("attach: failed to spawn thread\n");
844         goto done;
845     }
846     TransportRpmsg_module->threadStarted = TRUE;
848 done:
849     if (status < 0) {
850         TransportRpmsg_Factory_delete();
851     }
853     return (status);
856 /*
857  *  ======== TransportRpmsg_Factory_delete ========
858  *  Finalize the transport instances
859  */
860 Void TransportRpmsg_Factory_delete(Void)
862     uint64_t event;
865     /* shutdown the message dispatch thread */
866     if (TransportRpmsg_module->threadStarted) {
867         event = TransportRpmsg_Event_SHUTDOWN;
868         write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
870         /* wait for dispatch thread to exit */
871         pthread_join(TransportRpmsg_module->threadId, NULL);
872     }
874     /* destroy the mutex object */
875     pthread_mutex_destroy(&TransportRpmsg_module->gate);
877     /* close the client wait event */
878     if (TransportRpmsg_module->waitEvent != -1) {
879         close(TransportRpmsg_module->waitEvent);
880         TransportRpmsg_module->waitEvent = -1;
881     }
883     /* close the dispatch thread unblock event */
884     if (TransportRpmsg_module->unblockEvent != -1) {
885         close(TransportRpmsg_module->unblockEvent);
886         TransportRpmsg_module->unblockEvent = -1;
887     }
889     /* free the instance handle array */
890     if (TransportRpmsg_module->inst != NULL) {
891         free(TransportRpmsg_module->inst);
892         TransportRpmsg_module->inst = NULL;
893     }
895     return;
898 /*
899  *  ======== TransportRpmsg_Factory_attach ========
900  */
901 Int TransportRpmsg_Factory_attach(UInt16 procId)
903     Int status = Ipc_S_SUCCESS;
904     UInt16 clusterId;
905     TransportRpmsg_Params params;
906     TransportRpmsg_Handle transport;
907     IMessageQTransport_Handle iMsgQTrans;
909     /* cannot attach to yourself */
910     if (MultiProc_self() == procId) {
911         status = Ipc_E_INVALIDARG;
912         goto done;
913     }
915     /* processor must be a member of the cluster */
916     clusterId = procId - MultiProc_getBaseIdOfCluster();
918     if (clusterId >= MultiProc_getNumProcsInCluster()) {
919         status = Ipc_E_INVALIDARG;
920         goto done;
921     }
923     /* create transport instance for given processor */
924     params.rprocId = procId;
925     transport = TransportRpmsg_create(&params);
927     if (transport == NULL) {
928         status = Ipc_E_FAIL;
929         goto done;
930     }
932     /* register transport instance with MessageQ */
933     iMsgQTrans = TransportRpmsg_upCast(transport);
934     TransportRpmsg_module->inst[clusterId] = transport;
935     MessageQ_registerTransport(iMsgQTrans, procId, 0);
937 done:
938     return (status);
941 /*
942  *  ======== TransportRpmsg_Factory_detach ========
943  */
944 Int TransportRpmsg_Factory_detach(UInt16 procId)
946     Int status = Ipc_S_SUCCESS;
947     UInt16 clusterId;
949     /* cannot detach from yourself */
950     if (MultiProc_self() == procId) {
951         status = Ipc_E_INVALIDARG;
952         goto done;
953     }
955     /* processor must be a member of the cluster */
956     clusterId = procId - MultiProc_getBaseIdOfCluster();
958     if (clusterId >= MultiProc_getNumProcsInCluster()) {
959         status = Ipc_E_INVALIDARG;
960         goto done;
961     }
963     /* must be attached in order to detach */
964     if (TransportRpmsg_module->inst[clusterId] == NULL) {
965         status = Ipc_E_INVALIDSTATE;
966         goto done;
967     }
969     /* unregister from MessageQ, delete the transport instance */
970     MessageQ_unregisterTransport(procId, 0);
971     TransportRpmsg_delete(&(TransportRpmsg_module->inst[clusterId]));
973 done:
974     return (status);