Add fault tolerance for TransportRpmsg socket failure
[ipc/ipcdev.git] / linux / src / transport / TransportRpmsg.c
1 /*
2  * Copyright (c) 2014-2015 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  ======== TransportRpmsg.c ========
34  *  Implementation of functions specified in the IMessageQTransport interface.
35  */
37 /* Socket Headers */
38 #include <sys/socket.h>
39 #include <sys/select.h>
40 #include <sys/eventfd.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <unistd.h>
44 #include <errno.h>
45 #include <string.h>
46 #include <fcntl.h>
47 #include <pthread.h>
50 /* Socket Protocol Family */
51 #include <net/rpmsg.h>
54 /* IPC headers */
55 #include <ti/ipc/Std.h>
56 #include <SocketFxns.h>         /* Socket utils: */
57 #include <ti/ipc/Ipc.h>
58 #include <ti/ipc/MessageQ.h>
59 #include <ti/ipc/MultiProc.h>
60 #include <ti/ipc/transports/TransportRpmsg.h>
61 #include <_MessageQ.h>
62 #include <_lad.h>
64 #if !defined(EFD_SEMAPHORE)
65 #  define EFD_SEMAPHORE (1 << 0)
66 #endif
68 /* More magic rpmsg port numbers: */
69 #define MESSAGEQ_RPMSG_PORT       61
70 #define MESSAGEQ_RPMSG_MAXSIZE   512
72 #define TransportRpmsg_GROWSIZE 32
73 #define INVALIDSOCKET (-1)
75 #define TransportRpmsg_Event_ACK        (1 << 0)
76 #define TransportRpmsg_Event_PAUSE      (1 << 1)
77 #define TransportRpmsg_Event_CONTINUE   (1 << 2)
78 #define TransportRpmsg_Event_SHUTDOWN   (1 << 3)
81 #define _MAX(a,b) (((a)>(b))?(a):(b))
83 /* traces in this file are controlled via _TransportMessageQ_verbose */
84 Bool _TransportMessageQ_verbose = FALSE;
85 #define verbose _TransportMessageQ_verbose
87 Int TransportRpmsg_bind(Void *handle, UInt32 queueId);
88 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId);
89 Bool TransportRpmsg_put(Void *handle, Ptr msg);
91 typedef struct TransportRpmsg_Module {
92     int             sock[MultiProc_MAXPROCESSORS];
93     fd_set          rfds;
94     int             maxFd;
95     struct {
96         int     fd;
97         UInt32  qId;
98     } inFds[1024];
99     int             nInFds;
100     pthread_mutex_t gate;
101     int             unblockEvent;    /* unblock the dispatch thread */
102     int             waitEvent;       /* block the client thread */
103     pthread_t       threadId;        /* ID returned by pthread_create() */
104     Bool            threadStarted;
106     TransportRpmsg_Handle *inst;    /* array of instances */
107 } TransportRpmsg_Module;
109 IMessageQTransport_Fxns TransportRpmsg_fxns = {
110     .bind    = TransportRpmsg_bind,
111     .unbind  = TransportRpmsg_unbind,
112     .put     = TransportRpmsg_put
113 };
115 typedef struct TransportRpmsg_Object {
116     IMessageQTransport_Object base;
117     Int status;
118     UInt16 rprocId;
119     int numQueues;
120     int *qIndexToFd;
121 } TransportRpmsg_Object;
123 TransportRpmsg_Module TransportRpmsg_state = {
124     .sock = {INVALIDSOCKET},
125     .unblockEvent = -1,
126     .waitEvent = -1,
127     .threadStarted = FALSE,
128     .inst = NULL
129 };
130 TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
132 static void *rpmsgThreadFxn(void *arg);
133 static Int transportGet(int sock, MessageQ_Msg *retMsg);
134 static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
135                                Int fd,
136                                UInt16 qIndex);
137 static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
138 static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
140 /* factory functions */
141 Int TransportRpmsg_Factory_create(Void);
142 Void TransportRpmsg_Factory_delete(Void);
143 Int TransportRpmsg_Factory_attach(UInt16 procId);
144 Int TransportRpmsg_Factory_detach(UInt16 procId);
146 Ipc_TransportFactoryFxns TransportRpmsg_Factory = {
147     .createFxn = TransportRpmsg_Factory_create,
148     .deleteFxn = TransportRpmsg_Factory_delete,
149     .attachFxn = TransportRpmsg_Factory_attach,
150     .detachFxn = TransportRpmsg_Factory_detach
151 };
153 /* -------------------------------------------------------------------------- */
155 /* instance convertors */
156 IMessageQTransport_Handle TransportRpmsg_upCast(TransportRpmsg_Handle handle)
158     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
159     return ((IMessageQTransport_Handle)&obj->base);
162 TransportRpmsg_Handle TransportRpmsg_downCast(IMessageQTransport_Handle base)
164     return ((TransportRpmsg_Handle)base);
167 /*
168  *  ======== TransportRpmsg_create ========
169  */
170 TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params)
172     Int status = MessageQ_S_SUCCESS;
173     TransportRpmsg_Object *obj = NULL;
174     int sock;
175     UInt16 clusterId;
178     clusterId = params->rprocId - MultiProc_getBaseIdOfCluster();
180     /* create socket for sending messages to remote processor */
181     sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
183     if (sock < 0) {
184         status = Ipc_E_FAIL;
185         printf("TransportRpmsg_create: socket failed: %d (%s)\n", errno,
186                 strerror(errno));
187         goto done;
188     }
189     TransportRpmsg_module->sock[clusterId] = sock;
190     PRINTVERBOSE1("attach: created send socket: %d\n", sock)
192     status = ConnectSocket(sock, params->rprocId, MESSAGEQ_RPMSG_PORT);
194     if (status < 0) {
195         status = Ipc_E_FAIL;
196         printf("TransportRpmsg_create: connect failed: %d (%s) procId: %d\n",
197                 errno, strerror(errno), params->rprocId);
198         goto done;
199     }
201     /* create the instance object */
202     obj = calloc(1, sizeof(TransportRpmsg_Object));
204     if (obj == NULL) {
205         status = Ipc_E_MEMORY;
206         goto done;
207     }
209     /* initialize the instance */
210     obj->base.base.interfaceType = IMessageQTransport_TypeId;
211     obj->base.fxns = &TransportRpmsg_fxns;
212     obj->rprocId = params->rprocId;
213     obj->numQueues = TransportRpmsg_GROWSIZE;
215     obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof(Int));
217     if (obj->qIndexToFd == NULL) {
218         status = Ipc_E_MEMORY;
219         goto done;
220     }
222 done:
223     if (status < 0) {
224         TransportRpmsg_delete((TransportRpmsg_Handle *)&obj);
225     }
227     return (TransportRpmsg_Handle)obj;
230 /*
231  *  ======== TransportRpmsg_delete ========
232  */
233 Void TransportRpmsg_delete(TransportRpmsg_Handle *pHandle)
235     TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)pHandle;
236     UInt16 clusterId;
237     int sock;
240     clusterId = obj->rprocId - MultiProc_getBaseIdOfCluster();
242     /* close the socket for the given transport instance */
243     sock = TransportRpmsg_module->sock[clusterId];
244     if (sock != INVALIDSOCKET) {
245         PRINTVERBOSE1("detach: closing socket: %d\n", sock)
246         close(sock);
247     }
248     TransportRpmsg_module->sock[clusterId] = INVALIDSOCKET;
250     if ((obj != NULL) && (obj->qIndexToFd != NULL)) {
251         free(obj->qIndexToFd);
252         obj->qIndexToFd = NULL;
253     }
255     if (obj != NULL) {
256         free(obj);
257         obj = NULL;
258     }
260     *pHandle = NULL;
263 /*
264  *  ======== TransportRpmsg_bind ========
265  */
266 Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
268     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
269     UInt16 queuePort = queueId & 0x0000ffff;
270     int fd;
271     int err;
272     uint64_t event;
273     UInt16 rprocId;
274     pthread_t tid;
276     tid = pthread_self();
277     rprocId = obj->rprocId;
279     PRINTVERBOSE3("TransportRpmsg_bind: creating endpoint for rprocId %d "
280             "queuePort 0x%x, tid=0x%x\n", rprocId, queuePort, (unsigned int)tid)
282     /*  Create the socket to receive messages for this messageQ. */
283     fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
284     if (fd < 0) {
285         printf("TransportRpmsg_bind: socket call failed: %d (%s)\n",
286                 errno, strerror(errno));
287         return (MessageQ_E_OSFAILURE);
288     }
289     PRINTVERBOSE2("TransportRpmsg_bind: created socket fd %d, tdi=0x%x\n", fd,
290             (unsigned int)tid);
292     err = SocketBindAddr(fd, rprocId, (UInt32)queuePort);
293     if (err < 0) {
294         /* don't hard-printf since this is no longer fatal */
295         PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
296                       errno, strerror(errno));
297         close(fd);
298         return (MessageQ_E_OSFAILURE);
299     }
301     pthread_mutex_lock(&TransportRpmsg_module->gate);
303     /*  pause the dispatch thread */
304     PRINTVERBOSE1("TransportRpmsg_bind: sending PAUSE event, tid=0x%x\n",
305             (unsigned int)tid);
306     event = TransportRpmsg_Event_PAUSE;
307     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
309     /* wait for ACK event */
310     read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
311     PRINTVERBOSE2("TransportRpmsg_bind: received ACK event (%d), tid=0x%x\n",
312             (int)event, (unsigned int)tid);
314     /* add to our fat fd array and update select() parameters */
315     TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].fd = fd;
316     TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++].qId = queueId;
317     TransportRpmsg_module->maxFd = _MAX(TransportRpmsg_module->maxFd, fd);
318     FD_SET(fd, &TransportRpmsg_module->rfds);
319     bindFdToQueueIndex(obj, fd, queuePort);
321     /* release the dispatch thread */
322     PRINTVERBOSE1("TransportRpmsg_bind: sending CONTINUE event, tid=0x%x\n",
323             (unsigned int)tid);
324     event = TransportRpmsg_Event_CONTINUE;
325     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
327     pthread_mutex_unlock(&TransportRpmsg_module->gate);
329     return (MessageQ_S_SUCCESS);
332 /*
333  *  ======== TransportRpmsg_unbind ========
334  */
335 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
337     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
338     UInt16 queuePort = queueId & 0x0000ffff;
339     uint64_t event;
340     Int    status = MessageQ_S_SUCCESS;
341     int    maxFd;
342     int    fd;
343     int    i;
344     int    j;
346     pthread_mutex_lock(&TransportRpmsg_module->gate);
348     /*  pause the dispatch thread */
349     event = TransportRpmsg_Event_PAUSE;
350     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
352     /* wait for ACK event */
353     read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
355     /* retrieve file descriptor for the given queue port */
356     fd = queueIndexToFd(obj, queuePort);
357     if (!fd) {
358         PRINTVERBOSE1("TransportRpmsg_unbind: queueId 0x%x not bound\n",
359                 queueId);
360         status = MessageQ_E_INVALIDARG;
361         goto done;
362     }
363     PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
365     /* guarenteed to work because queueIndexToFd above succeeded */
366     unbindQueueIndex(obj, queuePort);
368     /* remove from input fd array */
369     for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
370         if (TransportRpmsg_module->inFds[i].fd == fd) {
371             TransportRpmsg_module->nInFds--;
373             /* shift subsequent elements down */
374             for (j = i; j < TransportRpmsg_module->nInFds; j++) {
375                 TransportRpmsg_module->inFds[j] =
376                         TransportRpmsg_module->inFds[j + 1];
377             }
378             TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].fd = -1;
379             TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].qId = 0;
380             break;
381         }
382     }
384     /* remove fd from the descriptor set, compute new max value */
385     FD_CLR(fd, &TransportRpmsg_module->rfds);
386     if (fd == TransportRpmsg_module->maxFd) {
387         /* find new max fd */
388         maxFd = TransportRpmsg_module->unblockEvent;
389         for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
390             maxFd = _MAX(TransportRpmsg_module->inFds[i].fd, maxFd);
391         }
392         TransportRpmsg_module->maxFd = maxFd;
393     }
395     close(fd);
397     /* release the dispatch thread */
398     event = TransportRpmsg_Event_CONTINUE;
399     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
401 done:
402     pthread_mutex_unlock(&TransportRpmsg_module->gate);
404     return (status);
407 /*
408  *  ======== TransportRpmsg_put ========
409  */
410 Bool TransportRpmsg_put(Void *handle, Ptr pmsg)
412     MessageQ_Msg msg  = (MessageQ_Msg)pmsg;
413     Int     status    = TRUE;
414     int     sock;
415     int     err;
416     UInt16  clusterId;
418     /*
419      * Retrieve the socket for the AF_SYSLINK protocol associated with this
420      * transport.
421      */
422     clusterId = msg->dstProc - MultiProc_getBaseIdOfCluster();
423     sock = TransportRpmsg_module->sock[clusterId];
424     if (!sock) {
425         return FALSE;
426     }
428     PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
430     err = send(sock, msg, msg->msgSize, 0);
431     if (err < 0) {
432         printf("TransportRpmsg_put: send failed: %d (%s)\n",
433                errno, strerror(errno));
434         status = FALSE;
436         goto exit;
437     }
439     /*
440      * Free the message, as this is a copy transport, we maintain MessageQ
441      * semantics.
442      */
443     MessageQ_free(msg);
445 exit:
446     return status;
449 /*
450  *  ======== TransportRpmsg_control ========
451  */
452 Bool TransportRpmsg_control(Void *handle, UInt cmd, UArg cmdArg)
454     return FALSE;
457 /*
458  *  ======== rpmsgThreadFxn ========
459  */
460 void *rpmsgThreadFxn(void *arg)
462     Int      status = MessageQ_S_SUCCESS;
463     Int      tmpStatus;
464     int      retval;
465     uint64_t event;
466     fd_set   rfds;
467     int      maxFd;
468     int      nfds;
469     MessageQ_Msg     retMsg;
470     MessageQ_QueueId queueId;
471     MessageQ_Handle handle;
472     Bool run = TRUE;
473     int i;
474     int j;
475     int fd;
478     while (run) {
479         maxFd = TransportRpmsg_module->maxFd;
480         rfds = TransportRpmsg_module->rfds;
481         nfds = TransportRpmsg_module->nInFds;
483         PRINTVERBOSE3("rpmsgThreadFxn: maxFd %d rfds[1:0] 0x%08x%08x\n", maxFd,
484                 (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
486         retval = select(maxFd + 1, &rfds, NULL, NULL, NULL);
488         /* if error, try again */
489         if (retval < 0) {
490             printf("Warning: rpmsgThreadFxn: select failed, trying again\n");
491             continue;
492         }
494         /* dispatch all pending messages, do this first */
495         for (i = 0; i < nfds; i++) {
496             fd = TransportRpmsg_module->inFds[i].fd;
498             if (FD_ISSET(fd, &rfds)) {
499                 PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
500                         TransportRpmsg_module->inFds[i].fd);
502                 /* transport input fd was signalled: get the message */
503                 tmpStatus = transportGet(fd, &retMsg);
504                 if (tmpStatus < 0) {
505                     printf("rpmsgThreadFxn: transportGet failed on fd %d,"
506                            " returned %d\n", fd, tmpStatus);
508                     pthread_mutex_lock(&TransportRpmsg_module->gate);
510                     /*
511                      * Don't close(fd) at this time since it will get closed
512                      * later when MessageQ_delete() is called in response to
513                      * this failure.  Just remove fd's bit from the select mask
514                      * 'rfds' for now, but don't remove it from inFds[].
515                      */
516                     FD_CLR(fd, &TransportRpmsg_module->rfds);
517                     if (fd == TransportRpmsg_module->maxFd) {
518                         /* find new max fd */
519                         maxFd = TransportRpmsg_module->unblockEvent;
520                         for (j = 0; j < TransportRpmsg_module->nInFds; j++) {
521                             maxFd = _MAX(TransportRpmsg_module->inFds[j].fd,
522                                          maxFd);
523                         }
524                         TransportRpmsg_module->maxFd = maxFd;
525                     }
526                     queueId = TransportRpmsg_module->inFds[i].qId;
528                     pthread_mutex_unlock(&TransportRpmsg_module->gate);
530                     handle = MessageQ_getLocalHandle(queueId);
532                     PRINTVERBOSE2("rpmsgThreadFxn: shutting down MessageQ "
533                                   "%p (queueId 0x%x)...\n", handle, queueId)
535                     if (handle != NULL) {
536                         MessageQ_shutdown(handle);
537                     }
538                     else {
539                         printf("rpmsgThreadFxn: MessageQ_getLocalHandle(0x%x) "
540                                "returned NULL, can't shutdown\n", queueId);
541                     }
542                 }
543                 else {
544                     queueId = MessageQ_getDstQueue(retMsg);
545                     PRINTVERBOSE1("rpmsgThreadFxn: got message, "
546                             "delivering to queueId 0x%x\n", queueId)
547                     MessageQ_put(queueId, retMsg);
548                 }
549             }
550         }
552         /* check for events */
553         if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
555             read(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
557             do {
558                 if (event & TransportRpmsg_Event_SHUTDOWN) {
559                     PRINTVERBOSE0("rpmsgThreadFxn: event SHUTDOWN\n");
560                     run = FALSE;
561                     break; /* highest priority, stop processing events */
562                 }
563                 if (event & TransportRpmsg_Event_CONTINUE) {
564                     PRINTVERBOSE1("rpmsgThreadFxn: event CONTINUE (%d)\n",
565                             (int)event);
566                     event &= ~TransportRpmsg_Event_CONTINUE;
567                 }
568                 if (event & TransportRpmsg_Event_PAUSE) {
569                     /*  Our event was signalled by TransportRpmsg_bind()
570                      *  or TransportRpmsg_unbind() to tell us that the set
571                      *  of file descriptors has changed.
572                      */
573                     PRINTVERBOSE0("rpmsgThreadFxn: event PAUSE\n");
574                     /* send the acknowledgement */
575                     event = TransportRpmsg_Event_ACK;
576                     write(TransportRpmsg_module->waitEvent, &event,
577                             sizeof(event));
578                     /* now wait to be released */
579                     read(TransportRpmsg_module->unblockEvent, &event,
580                             sizeof(event));
581                 }
582             } while (event != 0);
583         }
584     }
586     return (void *)status;
589 /*
590  * ======== transportGet ========
591  *  Retrieve a message waiting in the socket's queue.
592  */
593 static Int transportGet(int sock, MessageQ_Msg *retMsg)
595     Int           status    = MessageQ_S_SUCCESS;
596     MessageQ_Msg  msg;
597     struct sockaddr_rpmsg fromAddr;  /* [Socket address of sender] */
598     unsigned int  len;
599     int           byteCount;
601     /*
602      * We have no way of peeking to see what message size we'll get, so we
603      * allocate a message of max size to receive contents from the rpmsg socket
604      * (currently, a copy transport)
605      */
606     msg = MessageQ_alloc(0, MESSAGEQ_RPMSG_MAXSIZE);
607     if (!msg) {
608         status = MessageQ_E_MEMORY;
609         goto exit;
610     }
612     memset(&fromAddr, 0, sizeof (fromAddr));
613     len = sizeof (fromAddr);
615     byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
616                          (struct sockaddr *)&fromAddr, &len);
617     if (len != sizeof (fromAddr)) {
618         printf("recvfrom: got bad addr len (%d)\n", len);
619         status = MessageQ_E_FAIL;
620         goto freeMsg;
621     }
622     if (byteCount < 0) {
623         printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
624         if (errno == ESHUTDOWN) {
625             status = MessageQ_E_SHUTDOWN;
626         }
627         else {
628             status = MessageQ_E_FAIL;
629         }
630         goto freeMsg;
631     }
632     else {
633          /*
634           * Update the allocated message size (even though this may waste
635           * space when the actual message is smaller than the maximum rpmsg
636           * size, the message will be freed soon anyway, and it avoids an
637           * extra copy).
638           */
639          msg->msgSize = byteCount;
641          /*
642           * If the message received was statically allocated, reset the
643           * heapId, so the app can free it.
644           */
645          if (msg->heapId == MessageQ_STATICMSG)  {
646              msg->heapId = 0;  /* for a copy transport, heap id is 0. */
647          }
648     }
650     PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
651     PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg "
652             "proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
653     PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId,
654             msg->msgSize)
656     *retMsg = msg;
658     goto exit;
660 freeMsg:
661     MessageQ_free(msg);
663 exit:
664     return status;
667 /*
668  *  ======== bindFdToQueueIndex ========
669  *
670  *  Precondition: caller must be inside the module gate
671  */
672 Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 queuePort)
674     Int *queues;
675     Int *oldQueues;
676     UInt oldSize;
677     UInt queueIndex;
679     /* subtract port offset from queue index */
680     queueIndex = queuePort - MessageQ_PORTOFFSET;
682     if (queueIndex >= obj->numQueues) {
683         PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
684                 queueIndex + TransportRpmsg_GROWSIZE)
686         /* allocate larger table */
687         oldSize = obj->numQueues * sizeof (Int);
688         queues = calloc(queueIndex + TransportRpmsg_GROWSIZE, sizeof(Int));
690         /* copy contents from old table int new table */
691         memcpy(queues, obj->qIndexToFd, oldSize);
693         /* swap in new table, delete old table */
694         oldQueues = obj->qIndexToFd;
695         obj->qIndexToFd = queues;
696         obj->numQueues = queueIndex + TransportRpmsg_GROWSIZE;
697         free(oldQueues);
698     }
700     /* add new entry */
701     obj->qIndexToFd[queueIndex] = fd;
704 /*
705  *  ======== unbindQueueIndex ========
706  *
707  *  Precondition: caller must be inside the module gate
708  */
709 Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 queuePort)
711     UInt queueIndex;
713     /* subtract port offset from queue index */
714     queueIndex = queuePort - MessageQ_PORTOFFSET;
716     /* clear table entry */
717     obj->qIndexToFd[queueIndex] = -1;
720 /*
721  *  ======== queueIndexToFd ========
722  *
723  *  Precondition: caller must be inside the module gate
724  */
725 Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 queuePort)
727     UInt queueIndex;
729     /* subtract port offset from queue index */
730     queueIndex = queuePort - MessageQ_PORTOFFSET;
732     /* return file descriptor */
733     return (obj->qIndexToFd[queueIndex]);
736 /*
737  *  ======== TransportRpmsg_Factory_create ========
738  *  Create the transport instances
739  *
740  *  Attach to all remote processors. For now, must attach to
741  *  at least one to tolerate MessageQ_E_RESOURCE failures.
742  *
743  *  This function implements the IPC Factory interface, so it
744  *  returns Ipc status codes.
745  */
746 Int TransportRpmsg_Factory_create(Void)
748     Int status = Ipc_S_SUCCESS;
749     Int i;
750     UInt16 clusterSize;
751     TransportRpmsg_Handle *inst;
754     /* needed to enumerate processors in cluster */
755     clusterSize = MultiProc_getNumProcsInCluster();
757     /* allocate the instance array */
758     inst = calloc(clusterSize, sizeof(TransportRpmsg_Handle));
760     if (inst == NULL) {
761         printf("Error: TransportRpmsg_Factory_create failed, no memory\n");
762         status = Ipc_E_MEMORY;
763         goto done;
764     }
766     for (i = 0; i < clusterSize; i++) {
767         inst[i] = NULL;
768     }
770     TransportRpmsg_module->inst = inst;
772     /* counter event object for passing commands to dispatch thread */
773     TransportRpmsg_module->unblockEvent = eventfd(0, 0);
775     if (TransportRpmsg_module->unblockEvent == -1) {
776         printf("create: unblock event failed: %d (%s)\n",
777                errno, strerror(errno));
778         status = Ipc_E_FAIL;
779         goto done;
780     }
782     PRINTVERBOSE1("create: created unblock event %d\n",
783             TransportRpmsg_module->unblockEvent)
785     /* semaphore event object for acknowledging client thread */
786     TransportRpmsg_module->waitEvent = eventfd(0, EFD_SEMAPHORE);
788     if (TransportRpmsg_module->waitEvent == -1) {
789         printf("create: wait event failed: %d (%s)\n", errno, strerror(errno));
790         status = Ipc_E_FAIL;
791         goto done;
792     }
794     PRINTVERBOSE1("create: created wait event %d\n",
795             TransportRpmsg_module->waitEvent)
797     FD_ZERO(&TransportRpmsg_module->rfds);
798     FD_SET(TransportRpmsg_module->unblockEvent,
799             &TransportRpmsg_module->rfds);
800     TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
801     TransportRpmsg_module->nInFds = 0;
803     pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
805     status = pthread_create(&TransportRpmsg_module->threadId, NULL,
806             &rpmsgThreadFxn, NULL);
808     if (status < 0) {
809         status = Ipc_E_FAIL;
810         printf("attach: failed to spawn thread\n");
811         goto done;
812     }
813     TransportRpmsg_module->threadStarted = TRUE;
815 done:
816     if (status < 0) {
817         TransportRpmsg_Factory_delete();
818     }
820     return (status);
823 /*
824  *  ======== TransportRpmsg_Factory_delete ========
825  *  Finalize the transport instances
826  */
827 Void TransportRpmsg_Factory_delete(Void)
829     uint64_t event;
832     /* shutdown the message dispatch thread */
833     if (TransportRpmsg_module->threadStarted) {
834         event = TransportRpmsg_Event_SHUTDOWN;
835         write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
837         /* wait for dispatch thread to exit */
838         pthread_join(TransportRpmsg_module->threadId, NULL);
839     }
841     /* destroy the mutex object */
842     pthread_mutex_destroy(&TransportRpmsg_module->gate);
844     /* close the client wait event */
845     if (TransportRpmsg_module->waitEvent != -1) {
846         close(TransportRpmsg_module->waitEvent);
847         TransportRpmsg_module->waitEvent = -1;
848     }
850     /* close the dispatch thread unblock event */
851     if (TransportRpmsg_module->unblockEvent != -1) {
852         close(TransportRpmsg_module->unblockEvent);
853         TransportRpmsg_module->unblockEvent = -1;
854     }
856     /* free the instance handle array */
857     if (TransportRpmsg_module->inst != NULL) {
858         free(TransportRpmsg_module->inst);
859         TransportRpmsg_module->inst = NULL;
860     }
862     return;
865 /*
866  *  ======== TransportRpmsg_Factory_attach ========
867  */
868 Int TransportRpmsg_Factory_attach(UInt16 procId)
870     Int status = Ipc_S_SUCCESS;
871     UInt16 clusterId;
872     TransportRpmsg_Params params;
873     TransportRpmsg_Handle transport;
874     IMessageQTransport_Handle iMsgQTrans;
876     /* cannot attach to yourself */
877     if (MultiProc_self() == procId) {
878         status = Ipc_E_INVALIDARG;
879         goto done;
880     }
882     /* processor must be a member of the cluster */
883     clusterId = procId - MultiProc_getBaseIdOfCluster();
885     if (clusterId >= MultiProc_getNumProcsInCluster()) {
886         status = Ipc_E_INVALIDARG;
887         goto done;
888     }
890     /* create transport instance for given processor */
891     params.rprocId = procId;
892     transport = TransportRpmsg_create(&params);
894     if (transport == NULL) {
895         status = Ipc_E_FAIL;
896         goto done;
897     }
899     /* register transport instance with MessageQ */
900     iMsgQTrans = TransportRpmsg_upCast(transport);
901     MessageQ_registerTransport(iMsgQTrans, procId, 0);
902     TransportRpmsg_module->inst[clusterId] = transport;
904 done:
905     return (status);
908 /*
909  *  ======== TransportRpmsg_Factory_detach ========
910  */
911 Int TransportRpmsg_Factory_detach(UInt16 procId)
913     Int status = Ipc_S_SUCCESS;
914     UInt16 clusterId;
916     /* cannot detach from yourself */
917     if (MultiProc_self() == procId) {
918         status = Ipc_E_INVALIDARG;
919         goto done;
920     }
922     /* processor must be a member of the cluster */
923     clusterId = procId - MultiProc_getBaseIdOfCluster();
925     if (clusterId >= MultiProc_getNumProcsInCluster()) {
926         status = Ipc_E_INVALIDARG;
927         goto done;
928     }
930     /* must be attached in order to detach */
931     if (TransportRpmsg_module->inst[clusterId] == NULL) {
932         status = Ipc_E_INVALIDSTATE;
933         goto done;
934     }
936     /* unregister from MessageQ, delete the transport instance */
937     MessageQ_unregisterTransport(procId, 0);
938     TransportRpmsg_delete(&(TransportRpmsg_module->inst[clusterId]));
940 done:
941     return (status);