Bind message queue to new remote processor
[ipc/ipcdev.git] / linux / src / transport / TransportRpmsg.c
1 /*
2  * Copyright (c) 2014-2015 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  ======== TransportRpmsg.c ========
34  *  Implementation of functions specified in the IMessageQTransport interface.
35  */
37 /* Socket Headers */
38 #include <sys/socket.h>
39 #include <sys/select.h>
40 #include <sys/eventfd.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <unistd.h>
44 #include <errno.h>
45 #include <string.h>
46 #include <fcntl.h>
47 #include <pthread.h>
50 /* Socket Protocol Family */
51 #include <net/rpmsg.h>
54 /* IPC headers */
55 #include <ti/ipc/Std.h>
56 #include <SocketFxns.h>         /* Socket utils: */
57 #include <ti/ipc/Ipc.h>
58 #include <ti/ipc/MessageQ.h>
59 #include <ti/ipc/MultiProc.h>
60 #include <ti/ipc/transports/TransportRpmsg.h>
61 #include <_MessageQ.h>
62 #include <_lad.h>
64 #if !defined(EFD_SEMAPHORE)
65 #  define EFD_SEMAPHORE (1 << 0)
66 #endif
68 /* More magic rpmsg port numbers: */
69 #define MESSAGEQ_RPMSG_PORT       61
70 #define MESSAGEQ_RPMSG_MAXSIZE   512
72 #define TransportRpmsg_GROWSIZE 32
73 #define INVALIDSOCKET (-1)
75 #define TransportRpmsg_Event_ACK        (1 << 0)
76 #define TransportRpmsg_Event_PAUSE      (1 << 1)
77 #define TransportRpmsg_Event_CONTINUE   (1 << 2)
78 #define TransportRpmsg_Event_SHUTDOWN   (1 << 3)
81 #define _MAX(a,b) (((a)>(b))?(a):(b))
83 /* traces in this file are controlled via _TransportMessageQ_verbose */
84 Bool _TransportMessageQ_verbose = FALSE;
85 #define verbose _TransportMessageQ_verbose
87 Int TransportRpmsg_bind(Void *handle, UInt32 queueId);
88 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId);
89 Bool TransportRpmsg_put(Void *handle, Ptr msg);
91 typedef struct TransportRpmsg_Module {
92     int             sock[MultiProc_MAXPROCESSORS];
93     fd_set          rfds;
94     int             maxFd;
95     struct {
96         int     fd;
97         UInt32  qId;
98     } inFds[1024];
99     int             nInFds;
100     pthread_mutex_t gate;
101     int             unblockEvent;    /* unblock the dispatch thread */
102     int             waitEvent;       /* block the client thread */
103     pthread_t       threadId;        /* ID returned by pthread_create() */
104     Bool            threadStarted;
106     TransportRpmsg_Handle *inst;    /* array of instances */
107 } TransportRpmsg_Module;
109 IMessageQTransport_Fxns TransportRpmsg_fxns = {
110     .bind    = TransportRpmsg_bind,
111     .unbind  = TransportRpmsg_unbind,
112     .put     = TransportRpmsg_put
113 };
115 typedef struct TransportRpmsg_Object {
116     IMessageQTransport_Object base;
117     Int status;
118     UInt16 rprocId;
119     int numQueues;
120     int *qIndexToFd;
121 } TransportRpmsg_Object;
123 TransportRpmsg_Module TransportRpmsg_state = {
124     .sock = {INVALIDSOCKET},
125     .unblockEvent = -1,
126     .waitEvent = -1,
127     .threadStarted = FALSE,
128     .inst = NULL
129 };
130 TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
132 static void *rpmsgThreadFxn(void *arg);
133 static Int transportGet(int sock, MessageQ_Msg *retMsg);
134 static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
135                                Int fd,
136                                UInt16 qIndex);
137 static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
138 static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
140 /* factory functions */
141 Int TransportRpmsg_Factory_create(Void);
142 Void TransportRpmsg_Factory_delete(Void);
143 Int TransportRpmsg_Factory_attach(UInt16 procId);
144 Int TransportRpmsg_Factory_detach(UInt16 procId);
146 Ipc_TransportFactoryFxns TransportRpmsg_Factory = {
147     .createFxn = TransportRpmsg_Factory_create,
148     .deleteFxn = TransportRpmsg_Factory_delete,
149     .attachFxn = TransportRpmsg_Factory_attach,
150     .detachFxn = TransportRpmsg_Factory_detach
151 };
153 /* -------------------------------------------------------------------------- */
155 /* instance convertors */
156 IMessageQTransport_Handle TransportRpmsg_upCast(TransportRpmsg_Handle handle)
158     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
159     return ((IMessageQTransport_Handle)&obj->base);
162 TransportRpmsg_Handle TransportRpmsg_downCast(IMessageQTransport_Handle base)
164     return ((TransportRpmsg_Handle)base);
167 /*
168  *  ======== TransportRpmsg_create ========
169  */
170 TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params)
172     Int status = MessageQ_S_SUCCESS;
173     TransportRpmsg_Object *obj = NULL;
174     int sock;
175     UInt16 clusterId;
176     int i;
179     clusterId = params->rprocId - MultiProc_getBaseIdOfCluster();
181     /* create socket for sending messages to remote processor */
182     sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
184     if (sock < 0) {
185         status = Ipc_E_FAIL;
186         printf("TransportRpmsg_create: socket failed: %d (%s)\n", errno,
187                 strerror(errno));
188         goto done;
189     }
190     TransportRpmsg_module->sock[clusterId] = sock;
191     PRINTVERBOSE1("attach: created send socket: %d\n", sock)
193     status = ConnectSocket(sock, params->rprocId, MESSAGEQ_RPMSG_PORT);
195     if (status < 0) {
196         status = Ipc_E_FAIL;
197         printf("TransportRpmsg_create: connect failed: %d (%s) procId: %d\n",
198                 errno, strerror(errno), params->rprocId);
199         goto done;
200     }
202     /* create the instance object */
203     obj = calloc(1, sizeof(TransportRpmsg_Object));
205     if (obj == NULL) {
206         status = Ipc_E_MEMORY;
207         goto done;
208     }
210     /* initialize the instance */
211     obj->base.base.interfaceType = IMessageQTransport_TypeId;
212     obj->base.fxns = &TransportRpmsg_fxns;
213     obj->rprocId = params->rprocId;
214     obj->numQueues = TransportRpmsg_GROWSIZE;
216     obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof(int));
218     if (obj->qIndexToFd == NULL) {
219         status = Ipc_E_MEMORY;
220         goto done;
221     }
223     /* must initialize array */
224     for (i = 0; i < TransportRpmsg_GROWSIZE; i++) {
225         obj->qIndexToFd[i] = -1;
226     }
228 done:
229     if (status < 0) {
230         TransportRpmsg_delete((TransportRpmsg_Handle *)&obj);
231     }
233     return (TransportRpmsg_Handle)obj;
236 /*
237  *  ======== TransportRpmsg_delete ========
238  */
239 Void TransportRpmsg_delete(TransportRpmsg_Handle *pHandle)
241     TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)pHandle;
242     UInt16 clusterId;
243     int sock;
246     clusterId = obj->rprocId - MultiProc_getBaseIdOfCluster();
248     /* close the socket for the given transport instance */
249     sock = TransportRpmsg_module->sock[clusterId];
250     if (sock != INVALIDSOCKET) {
251         PRINTVERBOSE1("detach: closing socket: %d\n", sock)
252         close(sock);
253     }
254     TransportRpmsg_module->sock[clusterId] = INVALIDSOCKET;
256     if ((obj != NULL) && (obj->qIndexToFd != NULL)) {
257         free(obj->qIndexToFd);
258         obj->qIndexToFd = NULL;
259     }
261     if (obj != NULL) {
262         free(obj);
263         obj = NULL;
264     }
266     *pHandle = NULL;
269 /*
270  *  ======== TransportRpmsg_bind ========
271  */
272 Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
274     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
275     UInt16 queuePort = queueId & 0x0000ffff;
276     int fd;
277     int err;
278     uint64_t event;
279     UInt16 rprocId;
280     pthread_t tid;
281     Int status = MessageQ_S_SUCCESS;
283     tid = pthread_self();
284     rprocId = obj->rprocId;
286     PRINTVERBOSE3("TransportRpmsg_bind: creating endpoint for rprocId %d "
287             "queuePort 0x%x, tid=0x%x\n", rprocId, queuePort, (unsigned int)tid)
289     pthread_mutex_lock(&TransportRpmsg_module->gate);
291     /*  Check if binding already exists.
292      *
293      *  There is a race condition between a thread calling MessageQ_create
294      *  and another thread calling Ipc_attach. Must make sure we don't bind
295      *  the same queue twice.
296      */
297     if (queueIndexToFd(obj, queueId) != -1) {
298         goto done;
299     }
301     /*  Create the socket to receive messages for this messageQ. */
302     fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
303     if (fd < 0) {
304         printf("TransportRpmsg_bind: socket call failed: %d (%s)\n",
305                 errno, strerror(errno));
306         status = MessageQ_E_OSFAILURE;
307         goto done;
308     }
309     PRINTVERBOSE2("TransportRpmsg_bind: created socket fd %d, tdi=0x%x\n", fd,
310             (unsigned int)tid);
312     err = SocketBindAddr(fd, rprocId, (UInt32)queuePort);
313     if (err < 0) {
314         /* don't hard-printf since this is no longer fatal */
315         PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
316                       errno, strerror(errno));
317         close(fd);
318         status = MessageQ_E_OSFAILURE;
319         goto done;
320     }
322     /*  pause the dispatch thread */
323     PRINTVERBOSE1("TransportRpmsg_bind: sending PAUSE event, tid=0x%x\n",
324             (unsigned int)tid);
325     event = TransportRpmsg_Event_PAUSE;
326     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
328     /* wait for ACK event */
329     read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
330     PRINTVERBOSE2("TransportRpmsg_bind: received ACK event (%d), tid=0x%x\n",
331             (int)event, (unsigned int)tid);
333     /* add to our fat fd array and update select() parameters */
334     TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].fd = fd;
335     TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++].qId = queueId;
336     TransportRpmsg_module->maxFd = _MAX(TransportRpmsg_module->maxFd, fd);
337     FD_SET(fd, &TransportRpmsg_module->rfds);
338     bindFdToQueueIndex(obj, fd, queuePort);
340     /* release the dispatch thread */
341     PRINTVERBOSE1("TransportRpmsg_bind: sending CONTINUE event, tid=0x%x\n",
342             (unsigned int)tid);
343     event = TransportRpmsg_Event_CONTINUE;
344     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
346 done:
347     pthread_mutex_unlock(&TransportRpmsg_module->gate);
349     return (status);
352 /*
353  *  ======== TransportRpmsg_unbind ========
354  */
355 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
357     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
358     UInt16 queuePort = queueId & 0x0000ffff;
359     uint64_t event;
360     Int    status = MessageQ_S_SUCCESS;
361     int    maxFd;
362     int    fd;
363     int    i;
364     int    j;
366     pthread_mutex_lock(&TransportRpmsg_module->gate);
368     /*  pause the dispatch thread */
369     event = TransportRpmsg_Event_PAUSE;
370     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
372     /* wait for ACK event */
373     read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
375     /*  Check if binding already deleted.
376      *
377      *  There is a race condition between a thread calling MessageQ_delete
378      *  and another thread calling Ipc_detach. Must make sure we don't unbind
379      *  the same queue twice.
380      */
381     if ((fd = queueIndexToFd(obj, queuePort)) == -1) {
382         goto done;
383     }
384     PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
386     /* guarenteed to work because queueIndexToFd above succeeded */
387     unbindQueueIndex(obj, queuePort);
389     /* remove from input fd array */
390     for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
391         if (TransportRpmsg_module->inFds[i].fd == fd) {
392             TransportRpmsg_module->nInFds--;
394             /* shift subsequent elements down */
395             for (j = i; j < TransportRpmsg_module->nInFds; j++) {
396                 TransportRpmsg_module->inFds[j] =
397                         TransportRpmsg_module->inFds[j + 1];
398             }
399             TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].fd = -1;
400             TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].qId = 0;
401             break;
402         }
403     }
405     /* remove fd from the descriptor set, compute new max value */
406     FD_CLR(fd, &TransportRpmsg_module->rfds);
407     if (fd == TransportRpmsg_module->maxFd) {
408         /* find new max fd */
409         maxFd = TransportRpmsg_module->unblockEvent;
410         for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
411             maxFd = _MAX(TransportRpmsg_module->inFds[i].fd, maxFd);
412         }
413         TransportRpmsg_module->maxFd = maxFd;
414     }
416     close(fd);
418     /* release the dispatch thread */
419     event = TransportRpmsg_Event_CONTINUE;
420     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
422 done:
423     pthread_mutex_unlock(&TransportRpmsg_module->gate);
425     return (status);
428 /*
429  *  ======== TransportRpmsg_put ========
430  */
431 Bool TransportRpmsg_put(Void *handle, Ptr pmsg)
433     MessageQ_Msg msg  = (MessageQ_Msg)pmsg;
434     Int     status    = TRUE;
435     int     sock;
436     int     err;
437     UInt16  clusterId;
439     /*
440      * Retrieve the socket for the AF_SYSLINK protocol associated with this
441      * transport.
442      */
443     clusterId = msg->dstProc - MultiProc_getBaseIdOfCluster();
444     sock = TransportRpmsg_module->sock[clusterId];
445     if (!sock) {
446         return FALSE;
447     }
449     PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
451     err = send(sock, msg, msg->msgSize, 0);
452     if (err < 0) {
453         printf("TransportRpmsg_put: send failed: %d (%s)\n",
454                errno, strerror(errno));
455         status = FALSE;
457         goto exit;
458     }
460     /*
461      * Free the message, as this is a copy transport, we maintain MessageQ
462      * semantics.
463      */
464     MessageQ_free(msg);
466 exit:
467     return status;
470 /*
471  *  ======== TransportRpmsg_control ========
472  */
473 Bool TransportRpmsg_control(Void *handle, UInt cmd, UArg cmdArg)
475     return FALSE;
478 /*
479  *  ======== rpmsgThreadFxn ========
480  */
481 void *rpmsgThreadFxn(void *arg)
483     Int      status = MessageQ_S_SUCCESS;
484     Int      tmpStatus;
485     int      retval;
486     uint64_t event;
487     fd_set   rfds;
488     int      maxFd;
489     int      nfds;
490     MessageQ_Msg     retMsg;
491     MessageQ_QueueId queueId;
492     MessageQ_Handle handle;
493     Bool run = TRUE;
494     int i;
495     int j;
496     int fd;
499     while (run) {
500         maxFd = TransportRpmsg_module->maxFd;
501         rfds = TransportRpmsg_module->rfds;
502         nfds = TransportRpmsg_module->nInFds;
504         PRINTVERBOSE3("rpmsgThreadFxn: maxFd %d rfds[1:0] 0x%08x%08x\n", maxFd,
505                 (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
507         retval = select(maxFd + 1, &rfds, NULL, NULL, NULL);
509         /* if error, try again */
510         if (retval < 0) {
511             printf("Warning: rpmsgThreadFxn: select failed, trying again\n");
512             continue;
513         }
515         /* dispatch all pending messages, do this first */
516         for (i = 0; i < nfds; i++) {
517             fd = TransportRpmsg_module->inFds[i].fd;
519             if (FD_ISSET(fd, &rfds)) {
520                 PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
521                         TransportRpmsg_module->inFds[i].fd);
523                 /* transport input fd was signalled: get the message */
524                 tmpStatus = transportGet(fd, &retMsg);
525                 if (tmpStatus < 0) {
526                     printf("rpmsgThreadFxn: transportGet failed on fd %d,"
527                            " returned %d\n", fd, tmpStatus);
529                     pthread_mutex_lock(&TransportRpmsg_module->gate);
531                     /*
532                      * Don't close(fd) at this time since it will get closed
533                      * later when MessageQ_delete() is called in response to
534                      * this failure.  Just remove fd's bit from the select mask
535                      * 'rfds' for now, but don't remove it from inFds[].
536                      */
537                     FD_CLR(fd, &TransportRpmsg_module->rfds);
538                     if (fd == TransportRpmsg_module->maxFd) {
539                         /* find new max fd */
540                         maxFd = TransportRpmsg_module->unblockEvent;
541                         for (j = 0; j < TransportRpmsg_module->nInFds; j++) {
542                             maxFd = _MAX(TransportRpmsg_module->inFds[j].fd,
543                                          maxFd);
544                         }
545                         TransportRpmsg_module->maxFd = maxFd;
546                     }
547                     queueId = TransportRpmsg_module->inFds[i].qId;
549                     pthread_mutex_unlock(&TransportRpmsg_module->gate);
551                     handle = MessageQ_getLocalHandle(queueId);
553                     PRINTVERBOSE2("rpmsgThreadFxn: shutting down MessageQ "
554                                   "%p (queueId 0x%x)...\n", handle, queueId)
556                     if (handle != NULL) {
557                         MessageQ_shutdown(handle);
558                     }
559                     else {
560                         printf("rpmsgThreadFxn: MessageQ_getLocalHandle(0x%x) "
561                                "returned NULL, can't shutdown\n", queueId);
562                     }
563                 }
564                 else {
565                     queueId = MessageQ_getDstQueue(retMsg);
566                     PRINTVERBOSE1("rpmsgThreadFxn: got message, "
567                             "delivering to queueId 0x%x\n", queueId)
568                     MessageQ_put(queueId, retMsg);
569                 }
570             }
571         }
573         /* check for events */
574         if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
576             read(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
578             do {
579                 if (event & TransportRpmsg_Event_SHUTDOWN) {
580                     PRINTVERBOSE0("rpmsgThreadFxn: event SHUTDOWN\n");
581                     run = FALSE;
582                     break; /* highest priority, stop processing events */
583                 }
584                 if (event & TransportRpmsg_Event_CONTINUE) {
585                     PRINTVERBOSE1("rpmsgThreadFxn: event CONTINUE (%d)\n",
586                             (int)event);
587                     event &= ~TransportRpmsg_Event_CONTINUE;
588                 }
589                 if (event & TransportRpmsg_Event_PAUSE) {
590                     /*  Our event was signalled by TransportRpmsg_bind()
591                      *  or TransportRpmsg_unbind() to tell us that the set
592                      *  of file descriptors has changed.
593                      */
594                     PRINTVERBOSE0("rpmsgThreadFxn: event PAUSE\n");
595                     /* send the acknowledgement */
596                     event = TransportRpmsg_Event_ACK;
597                     write(TransportRpmsg_module->waitEvent, &event,
598                             sizeof(event));
599                     /* now wait to be released */
600                     read(TransportRpmsg_module->unblockEvent, &event,
601                             sizeof(event));
602                 }
603             } while (event != 0);
604         }
605     }
607     return (void *)status;
610 /*
611  * ======== transportGet ========
612  *  Retrieve a message waiting in the socket's queue.
613  */
614 static Int transportGet(int sock, MessageQ_Msg *retMsg)
616     Int           status    = MessageQ_S_SUCCESS;
617     MessageQ_Msg  msg;
618     struct sockaddr_rpmsg fromAddr;  /* [Socket address of sender] */
619     unsigned int  len;
620     int           byteCount;
622     /*
623      * We have no way of peeking to see what message size we'll get, so we
624      * allocate a message of max size to receive contents from the rpmsg socket
625      * (currently, a copy transport)
626      */
627     msg = MessageQ_alloc(0, MESSAGEQ_RPMSG_MAXSIZE);
628     if (!msg) {
629         status = MessageQ_E_MEMORY;
630         goto exit;
631     }
633     memset(&fromAddr, 0, sizeof (fromAddr));
634     len = sizeof (fromAddr);
636     byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
637                          (struct sockaddr *)&fromAddr, &len);
638     if (len != sizeof (fromAddr)) {
639         printf("recvfrom: got bad addr len (%d)\n", len);
640         status = MessageQ_E_FAIL;
641         goto freeMsg;
642     }
643     if (byteCount < 0) {
644         printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
645         if (errno == ESHUTDOWN) {
646             status = MessageQ_E_SHUTDOWN;
647         }
648         else {
649             status = MessageQ_E_FAIL;
650         }
651         goto freeMsg;
652     }
653     else {
654          /*
655           * Update the allocated message size (even though this may waste
656           * space when the actual message is smaller than the maximum rpmsg
657           * size, the message will be freed soon anyway, and it avoids an
658           * extra copy).
659           */
660          msg->msgSize = byteCount;
662          /*
663           * If the message received was statically allocated, reset the
664           * heapId, so the app can free it.
665           */
666          if (msg->heapId == MessageQ_STATICMSG)  {
667              msg->heapId = 0;  /* for a copy transport, heap id is 0. */
668          }
669     }
671     PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
672     PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg "
673             "proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
674     PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId,
675             msg->msgSize)
677     *retMsg = msg;
679     goto exit;
681 freeMsg:
682     MessageQ_free(msg);
684 exit:
685     return status;
688 /*
689  *  ======== bindFdToQueueIndex ========
690  *
691  *  Precondition: caller must be inside the module gate
692  */
693 Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 queuePort)
695     Int *queues;
696     Int *oldQueues;
697     UInt oldSize;
698     UInt newCount;
699     UInt queueIndex;
700     int i;
702     /* subtract port offset from queue index */
703     queueIndex = queuePort - MessageQ_PORTOFFSET;
705     if (queueIndex >= obj->numQueues) {
706         newCount = queueIndex + TransportRpmsg_GROWSIZE;
707         PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
708                 newCount);
710         /* allocate larger table */
711         oldSize = obj->numQueues * sizeof(int);
712         queues = calloc(newCount, sizeof(int));
714         /* copy contents from old table int new table */
715         memcpy(queues, obj->qIndexToFd, oldSize);
717         /* initialize remaining entries of new (larger) table */
718         for (i = obj->numQueues; i < newCount; i++) {
719             queues[i] = -1;
720         }
722         /* swap in new table, delete old table */
723         oldQueues = obj->qIndexToFd;
724         obj->qIndexToFd = queues;
725         obj->numQueues = newCount;
726         free(oldQueues);
727     }
729     /* add new entry */
730     obj->qIndexToFd[queueIndex] = fd;
733 /*
734  *  ======== unbindQueueIndex ========
735  *
736  *  Precondition: caller must be inside the module gate
737  */
738 Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 queuePort)
740     UInt queueIndex;
742     /* subtract port offset from queue index */
743     queueIndex = queuePort - MessageQ_PORTOFFSET;
745     /* clear table entry */
746     obj->qIndexToFd[queueIndex] = -1;
749 /*
750  *  ======== queueIndexToFd ========
751  *
752  *  Precondition: caller must be inside the module gate
753  */
754 Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 queuePort)
756     UInt queueIndex;
758     /* subtract port offset from queue index */
759     queueIndex = queuePort - MessageQ_PORTOFFSET;
761     /* return file descriptor */
762     return (obj->qIndexToFd[queueIndex]);
765 /*
766  *  ======== TransportRpmsg_Factory_create ========
767  *  Create the transport instances
768  *
769  *  Attach to all remote processors. For now, must attach to
770  *  at least one to tolerate MessageQ_E_RESOURCE failures.
771  *
772  *  This function implements the IPC Factory interface, so it
773  *  returns Ipc status codes.
774  */
775 Int TransportRpmsg_Factory_create(Void)
777     Int status = Ipc_S_SUCCESS;
778     Int i;
779     UInt16 clusterSize;
780     TransportRpmsg_Handle *inst;
783     /* needed to enumerate processors in cluster */
784     clusterSize = MultiProc_getNumProcsInCluster();
786     /* allocate the instance array */
787     inst = calloc(clusterSize, sizeof(TransportRpmsg_Handle));
789     if (inst == NULL) {
790         printf("Error: TransportRpmsg_Factory_create failed, no memory\n");
791         status = Ipc_E_MEMORY;
792         goto done;
793     }
795     for (i = 0; i < clusterSize; i++) {
796         inst[i] = NULL;
797     }
799     TransportRpmsg_module->inst = inst;
801     /* counter event object for passing commands to dispatch thread */
802     TransportRpmsg_module->unblockEvent = eventfd(0, 0);
804     if (TransportRpmsg_module->unblockEvent == -1) {
805         printf("create: unblock event failed: %d (%s)\n",
806                errno, strerror(errno));
807         status = Ipc_E_FAIL;
808         goto done;
809     }
811     PRINTVERBOSE1("create: created unblock event %d\n",
812             TransportRpmsg_module->unblockEvent)
814     /* semaphore event object for acknowledging client thread */
815     TransportRpmsg_module->waitEvent = eventfd(0, EFD_SEMAPHORE);
817     if (TransportRpmsg_module->waitEvent == -1) {
818         printf("create: wait event failed: %d (%s)\n", errno, strerror(errno));
819         status = Ipc_E_FAIL;
820         goto done;
821     }
823     PRINTVERBOSE1("create: created wait event %d\n",
824             TransportRpmsg_module->waitEvent)
826     FD_ZERO(&TransportRpmsg_module->rfds);
827     FD_SET(TransportRpmsg_module->unblockEvent,
828             &TransportRpmsg_module->rfds);
829     TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
830     TransportRpmsg_module->nInFds = 0;
832     pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
834     status = pthread_create(&TransportRpmsg_module->threadId, NULL,
835             &rpmsgThreadFxn, NULL);
837     if (status < 0) {
838         status = Ipc_E_FAIL;
839         printf("attach: failed to spawn thread\n");
840         goto done;
841     }
842     TransportRpmsg_module->threadStarted = TRUE;
844 done:
845     if (status < 0) {
846         TransportRpmsg_Factory_delete();
847     }
849     return (status);
852 /*
853  *  ======== TransportRpmsg_Factory_delete ========
854  *  Finalize the transport instances
855  */
856 Void TransportRpmsg_Factory_delete(Void)
858     uint64_t event;
861     /* shutdown the message dispatch thread */
862     if (TransportRpmsg_module->threadStarted) {
863         event = TransportRpmsg_Event_SHUTDOWN;
864         write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
866         /* wait for dispatch thread to exit */
867         pthread_join(TransportRpmsg_module->threadId, NULL);
868     }
870     /* destroy the mutex object */
871     pthread_mutex_destroy(&TransportRpmsg_module->gate);
873     /* close the client wait event */
874     if (TransportRpmsg_module->waitEvent != -1) {
875         close(TransportRpmsg_module->waitEvent);
876         TransportRpmsg_module->waitEvent = -1;
877     }
879     /* close the dispatch thread unblock event */
880     if (TransportRpmsg_module->unblockEvent != -1) {
881         close(TransportRpmsg_module->unblockEvent);
882         TransportRpmsg_module->unblockEvent = -1;
883     }
885     /* free the instance handle array */
886     if (TransportRpmsg_module->inst != NULL) {
887         free(TransportRpmsg_module->inst);
888         TransportRpmsg_module->inst = NULL;
889     }
891     return;
894 /*
895  *  ======== TransportRpmsg_Factory_attach ========
896  */
897 Int TransportRpmsg_Factory_attach(UInt16 procId)
899     Int status = Ipc_S_SUCCESS;
900     UInt16 clusterId;
901     TransportRpmsg_Params params;
902     TransportRpmsg_Handle transport;
903     IMessageQTransport_Handle iMsgQTrans;
905     /* cannot attach to yourself */
906     if (MultiProc_self() == procId) {
907         status = Ipc_E_INVALIDARG;
908         goto done;
909     }
911     /* processor must be a member of the cluster */
912     clusterId = procId - MultiProc_getBaseIdOfCluster();
914     if (clusterId >= MultiProc_getNumProcsInCluster()) {
915         status = Ipc_E_INVALIDARG;
916         goto done;
917     }
919     /* create transport instance for given processor */
920     params.rprocId = procId;
921     transport = TransportRpmsg_create(&params);
923     if (transport == NULL) {
924         status = Ipc_E_FAIL;
925         goto done;
926     }
928     /* register transport instance with MessageQ */
929     iMsgQTrans = TransportRpmsg_upCast(transport);
930     TransportRpmsg_module->inst[clusterId] = transport;
931     MessageQ_registerTransport(iMsgQTrans, procId, 0);
933 done:
934     return (status);
937 /*
938  *  ======== TransportRpmsg_Factory_detach ========
939  */
940 Int TransportRpmsg_Factory_detach(UInt16 procId)
942     Int status = Ipc_S_SUCCESS;
943     UInt16 clusterId;
945     /* cannot detach from yourself */
946     if (MultiProc_self() == procId) {
947         status = Ipc_E_INVALIDARG;
948         goto done;
949     }
951     /* processor must be a member of the cluster */
952     clusterId = procId - MultiProc_getBaseIdOfCluster();
954     if (clusterId >= MultiProc_getNumProcsInCluster()) {
955         status = Ipc_E_INVALIDARG;
956         goto done;
957     }
959     /* must be attached in order to detach */
960     if (TransportRpmsg_module->inst[clusterId] == NULL) {
961         status = Ipc_E_INVALIDSTATE;
962         goto done;
963     }
965     /* unregister from MessageQ, delete the transport instance */
966     MessageQ_unregisterTransport(procId, 0);
967     TransportRpmsg_delete(&(TransportRpmsg_module->inst[clusterId]));
969 done:
970     return (status);