Android: Added a definition for EFD_SEMAPHORE
[ipc/ipcdev.git] / linux / src / transport / TransportRpmsg.c
1 /*
2  * Copyright (c) 2014-2015 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  ======== TransportRpmsg.c ========
34  *  Implementation of functions specified in the IMessageQTransport interface.
35  */
37 /* Socket Headers */
38 #include <sys/socket.h>
39 #include <sys/select.h>
40 #include <sys/eventfd.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <unistd.h>
44 #include <errno.h>
45 #include <string.h>
46 #include <fcntl.h>
47 #include <pthread.h>
50 /* Socket Protocol Family */
51 #include <net/rpmsg.h>
54 /* IPC headers */
55 #include <ti/ipc/Std.h>
56 #include <SocketFxns.h>         /* Socket utils: */
57 #include <ti/ipc/Ipc.h>
58 #include <ti/ipc/MessageQ.h>
59 #include <ti/ipc/MultiProc.h>
60 #include <ti/ipc/transports/TransportRpmsg.h>
61 #include <_MessageQ.h>
62 #include <_lad.h>
64 #if !defined(EFD_SEMAPHORE)
65 #  define EFD_SEMAPHORE (1 << 0)
66 #endif
68 /* More magic rpmsg port numbers: */
69 #define MESSAGEQ_RPMSG_PORT       61
70 #define MESSAGEQ_RPMSG_MAXSIZE   512
72 #define TransportRpmsg_GROWSIZE 32
73 #define INVALIDSOCKET (-1)
75 #define TransportRpmsg_Event_ACK        (1 << 0)
76 #define TransportRpmsg_Event_PAUSE      (1 << 1)
77 #define TransportRpmsg_Event_CONTINUE   (1 << 2)
78 #define TransportRpmsg_Event_SHUTDOWN   (1 << 3)
81 #define _MAX(a,b) (((a)>(b))?(a):(b))
83 /* traces in this file are controlled via _TransportMessageQ_verbose */
84 Bool _TransportMessageQ_verbose = FALSE;
85 #define verbose _TransportMessageQ_verbose
87 Int TransportRpmsg_bind(Void *handle, UInt32 queueId);
88 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId);
89 Bool TransportRpmsg_put(Void *handle, Ptr msg);
91 typedef struct TransportRpmsg_Module {
92     int             sock[MultiProc_MAXPROCESSORS];
93     fd_set          rfds;
94     int             maxFd;
95     int             inFds[1024];
96     int             nInFds;
97     pthread_mutex_t gate;
98     int             unblockEvent;    /* unblock the dispatch thread */
99     int             waitEvent;       /* block the client thread */
100     pthread_t       threadId;        /* ID returned by pthread_create() */
101     Bool            threadStarted;
103     TransportRpmsg_Handle *inst;    /* array of instances */
104 } TransportRpmsg_Module;
106 IMessageQTransport_Fxns TransportRpmsg_fxns = {
107     .bind    = TransportRpmsg_bind,
108     .unbind  = TransportRpmsg_unbind,
109     .put     = TransportRpmsg_put
110 };
112 typedef struct TransportRpmsg_Object {
113     IMessageQTransport_Object base;
114     Int status;
115     UInt16 rprocId;
116     int numQueues;
117     int *qIndexToFd;
118 } TransportRpmsg_Object;
120 TransportRpmsg_Module TransportRpmsg_state = {
121     .sock = {INVALIDSOCKET},
122     .unblockEvent = -1,
123     .waitEvent = -1,
124     .threadStarted = FALSE,
125     .inst = NULL
126 };
127 TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
129 static void *rpmsgThreadFxn(void *arg);
130 static Int transportGet(int sock, MessageQ_Msg *retMsg);
131 static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
132                                Int fd,
133                                UInt16 qIndex);
134 static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
135 static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
137 /* factory functions */
138 Int TransportRpmsg_Factory_create(Void);
139 Void TransportRpmsg_Factory_delete(Void);
140 Int TransportRpmsg_Factory_attach(UInt16 procId);
141 Int TransportRpmsg_Factory_detach(UInt16 procId);
143 Ipc_TransportFactoryFxns TransportRpmsg_Factory = {
144     .createFxn = TransportRpmsg_Factory_create,
145     .deleteFxn = TransportRpmsg_Factory_delete,
146     .attachFxn = TransportRpmsg_Factory_attach,
147     .detachFxn = TransportRpmsg_Factory_detach
148 };
150 /* -------------------------------------------------------------------------- */
152 /* instance convertors */
153 IMessageQTransport_Handle TransportRpmsg_upCast(TransportRpmsg_Handle handle)
155     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
156     return ((IMessageQTransport_Handle)&obj->base);
159 TransportRpmsg_Handle TransportRpmsg_downCast(IMessageQTransport_Handle base)
161     return ((TransportRpmsg_Handle)base);
164 /*
165  *  ======== TransportRpmsg_create ========
166  */
167 TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params)
169     Int status = MessageQ_S_SUCCESS;
170     TransportRpmsg_Object *obj = NULL;
171     int sock;
172     UInt16 clusterId;
175     clusterId = params->rprocId - MultiProc_getBaseIdOfCluster();
177     /* create socket for sending messages to remote processor */
178     sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
180     if (sock < 0) {
181         status = Ipc_E_FAIL;
182         printf("TransportRpmsg_create: socket failed: %d (%s)\n", errno,
183                 strerror(errno));
184         goto done;
185     }
186     TransportRpmsg_module->sock[clusterId] = sock;
187     PRINTVERBOSE1("attach: created send socket: %d\n", sock)
189     status = ConnectSocket(sock, params->rprocId, MESSAGEQ_RPMSG_PORT);
191     if (status < 0) {
192         status = Ipc_E_FAIL;
193         printf("TransportRpmsg_create: connect failed: %d (%s) procId: %d\n",
194                 errno, strerror(errno), params->rprocId);
195         goto done;
196     }
198     /* create the instance object */
199     obj = calloc(1, sizeof(TransportRpmsg_Object));
201     if (obj == NULL) {
202         status = Ipc_E_MEMORY;
203         goto done;
204     }
206     /* initialize the instance */
207     obj->base.base.interfaceType = IMessageQTransport_TypeId;
208     obj->base.fxns = &TransportRpmsg_fxns;
209     obj->rprocId = params->rprocId;
210     obj->numQueues = TransportRpmsg_GROWSIZE;
212     obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof(Int));
214     if (obj->qIndexToFd == NULL) {
215         status = Ipc_E_MEMORY;
216         goto done;
217     }
219 done:
220     if (status < 0) {
221         TransportRpmsg_delete((TransportRpmsg_Handle *)&obj);
222     }
224     return (TransportRpmsg_Handle)obj;
227 /*
228  *  ======== TransportRpmsg_delete ========
229  */
230 Void TransportRpmsg_delete(TransportRpmsg_Handle *pHandle)
232     TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)pHandle;
233     UInt16 clusterId;
234     int sock;
237     clusterId = obj->rprocId - MultiProc_getBaseIdOfCluster();
239     /* close the socket for the given transport instance */
240     sock = TransportRpmsg_module->sock[clusterId];
241     if (sock != INVALIDSOCKET) {
242         PRINTVERBOSE1("detach: closing socket: %d\n", sock)
243         close(sock);
244     }
245     TransportRpmsg_module->sock[clusterId] = INVALIDSOCKET;
247     if ((obj != NULL) && (obj->qIndexToFd != NULL)) {
248         free(obj->qIndexToFd);
249         obj->qIndexToFd = NULL;
250     }
252     if (obj != NULL) {
253         free(obj);
254         obj = NULL;
255     }
257     *pHandle = NULL;
260 /*
261  *  ======== TransportRpmsg_bind ========
262  */
263 Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
265     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
266     UInt16 queuePort = queueId & 0x0000ffff;
267     int fd;
268     int err;
269     uint64_t event;
270     UInt16 rprocId;
271     pthread_t tid;
273     tid = pthread_self();
274     rprocId = obj->rprocId;
276     PRINTVERBOSE3("TransportRpmsg_bind: creating endpoint for rprocId %d "
277             "queuePort 0x%x, tid=0x%x\n", rprocId, queuePort, (unsigned int)tid)
279     /*  Create the socket to receive messages for this messageQ. */
280     fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
281     if (fd < 0) {
282         printf("TransportRpmsg_bind: socket call failed: %d (%s)\n",
283                 errno, strerror(errno));
284         return (MessageQ_E_OSFAILURE);
285     }
286     PRINTVERBOSE2("TransportRpmsg_bind: created socket fd %d, tdi=0x%x\n", fd,
287             (unsigned int)tid);
289     err = SocketBindAddr(fd, rprocId, (UInt32)queuePort);
290     if (err < 0) {
291         /* don't hard-printf since this is no longer fatal */
292         PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
293                       errno, strerror(errno));
294         close(fd);
295         return (MessageQ_E_OSFAILURE);
296     }
298     pthread_mutex_lock(&TransportRpmsg_module->gate);
300     /*  pause the dispatch thread */
301     PRINTVERBOSE1("TransportRpmsg_bind: sending PAUSE event, tid=0x%x\n",
302             (unsigned int)tid);
303     event = TransportRpmsg_Event_PAUSE;
304     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
306     /* wait for ACK event */
307     read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
308     PRINTVERBOSE2("TransportRpmsg_bind: received ACK event (%d), tid=0x%x\n",
309             (int)event, (unsigned int)tid);
311     /* add to our fat fd array and update select() parameters */
312     TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++] = fd;
313     TransportRpmsg_module->maxFd = _MAX(TransportRpmsg_module->maxFd, fd);
314     FD_SET(fd, &TransportRpmsg_module->rfds);
315     bindFdToQueueIndex(obj, fd, queuePort);
317     /* release the dispatch thread */
318     PRINTVERBOSE1("TransportRpmsg_bind: sending CONTINUE event, tid=0x%x\n",
319             (unsigned int)tid);
320     event = TransportRpmsg_Event_CONTINUE;
321     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
323     pthread_mutex_unlock(&TransportRpmsg_module->gate);
325     return (MessageQ_S_SUCCESS);
328 /*
329  *  ======== TransportRpmsg_unbind ========
330  */
331 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
333     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
334     UInt16 queuePort = queueId & 0x0000ffff;
335     uint64_t event;
336     Int    status = MessageQ_S_SUCCESS;
337     int    maxFd;
338     int    fd;
339     int    i;
340     int    j;
342     pthread_mutex_lock(&TransportRpmsg_module->gate);
344     /*  pause the dispatch thread */
345     event = TransportRpmsg_Event_PAUSE;
346     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
348     /* wait for ACK event */
349     read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
351     /* retrieve file descriptor for the given queue port */
352     fd = queueIndexToFd(obj, queuePort);
353     if (!fd) {
354         PRINTVERBOSE1("TransportRpmsg_unbind: queueId 0x%x not bound\n",
355                 queueId);
356         status = MessageQ_E_INVALIDARG;
357         goto done;
358     }
359     PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
361     /* guarenteed to work because queueIndexToFd above succeeded */
362     unbindQueueIndex(obj, queuePort);
364     /* remove from input fd array */
365     for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
366         if (TransportRpmsg_module->inFds[i] == fd) {
367             TransportRpmsg_module->nInFds--;
369             /* shift subsequent elements down */
370             for (j = i; j < TransportRpmsg_module->nInFds; j++) {
371                 TransportRpmsg_module->inFds[j] =
372                         TransportRpmsg_module->inFds[j + 1];
373             }
374             TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds] = -1;
375             break;
376         }
377     }
379     /* remove fd from the descriptor set, compute new max value */
380     FD_CLR(fd, &TransportRpmsg_module->rfds);
381     if (fd == TransportRpmsg_module->maxFd) {
382         /* find new max fd */
383         maxFd = TransportRpmsg_module->unblockEvent;
384         for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
385             maxFd = _MAX(TransportRpmsg_module->inFds[i], maxFd);
386         }
387         TransportRpmsg_module->maxFd = maxFd;
388     }
390     close(fd);
392     /* release the dispatch thread */
393     event = TransportRpmsg_Event_CONTINUE;
394     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
396 done:
397     pthread_mutex_unlock(&TransportRpmsg_module->gate);
399     return (status);
402 /*
403  *  ======== TransportRpmsg_put ========
404  */
405 Bool TransportRpmsg_put(Void *handle, Ptr pmsg)
407     MessageQ_Msg msg  = (MessageQ_Msg)pmsg;
408     Int     status    = TRUE;
409     int     sock;
410     int     err;
411     UInt16  clusterId;
413     /*
414      * Retrieve the socket for the AF_SYSLINK protocol associated with this
415      * transport.
416      */
417     clusterId = msg->dstProc - MultiProc_getBaseIdOfCluster();
418     sock = TransportRpmsg_module->sock[clusterId];
419     if (!sock) {
420         return FALSE;
421     }
423     PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
425     err = send(sock, msg, msg->msgSize, 0);
426     if (err < 0) {
427         printf("TransportRpmsg_put: send failed: %d (%s)\n",
428                errno, strerror(errno));
429         status = FALSE;
431         goto exit;
432     }
434     /*
435      * Free the message, as this is a copy transport, we maintain MessageQ
436      * semantics.
437      */
438     MessageQ_free(msg);
440 exit:
441     return status;
444 /*
445  *  ======== TransportRpmsg_control ========
446  */
447 Bool TransportRpmsg_control(Void *handle, UInt cmd, UArg cmdArg)
449     return FALSE;
452 /*
453  *  ======== rpmsgThreadFxn ========
454  */
455 void *rpmsgThreadFxn(void *arg)
457     Int      status = MessageQ_S_SUCCESS;
458     Int      tmpStatus;
459     int      retval;
460     uint64_t event;
461     fd_set   rfds;
462     int      maxFd;
463     int      nfds;
464     MessageQ_Msg     retMsg;
465     MessageQ_QueueId queueId;
466     Bool run = TRUE;
467     int i;
468     int fd;
471     while (run) {
472         maxFd = TransportRpmsg_module->maxFd;
473         rfds = TransportRpmsg_module->rfds;
474         nfds = TransportRpmsg_module->nInFds;
476         PRINTVERBOSE3("rpmsgThreadFxn: maxFd %d rfds[1:0] 0x%08x%08x\n", maxFd,
477                 (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
479         retval = select(maxFd + 1, &rfds, NULL, NULL, NULL);
481         /* if error, try again */
482         if (retval < 0) {
483             printf("Warning: rpmsgThreadFxn: select failed, trying again\n");
484             continue;
485         }
487         /* dispatch all pending messages, do this first */
488         for (i = 0; i < nfds; i++) {
489             fd = TransportRpmsg_module->inFds[i];
491             if (FD_ISSET(fd, &rfds)) {
492                 PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
493                         TransportRpmsg_module->inFds[i]);
495                 /* transport input fd was signalled: get the message */
496                 tmpStatus = transportGet(fd, &retMsg);
497                 if (tmpStatus < 0) {
498                     printf("rpmsgThreadFxn: transportGet failed\n");
499                 }
500                 else {
501                     queueId = MessageQ_getDstQueue(retMsg);
502                     PRINTVERBOSE1("rpmsgThreadFxn: got message, "
503                             "delivering to queueId 0x%x\n", queueId)
504                     MessageQ_put(queueId, retMsg);
505                 }
506             }
507         }
509         /* check for events */
510         if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
512             read(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
514             do {
515                 if (event & TransportRpmsg_Event_SHUTDOWN) {
516                     PRINTVERBOSE0("rpmsgThreadFxn: event SHUTDOWN\n");
517                     run = FALSE;
518                     break; /* highest priority, stop processing events */
519                 }
520                 if (event & TransportRpmsg_Event_CONTINUE) {
521                     PRINTVERBOSE1("rpmsgThreadFxn: event CONTINUE (%d)\n",
522                             (int)event);
523                     event &= ~TransportRpmsg_Event_CONTINUE;
524                 }
525                 if (event & TransportRpmsg_Event_PAUSE) {
526                     /*  Our event was signalled by TransportRpmsg_bind()
527                      *  or TransportRpmsg_unbind() to tell us that the set
528                      *  of file descriptors has changed.
529                      */
530                     PRINTVERBOSE0("rpmsgThreadFxn: event PAUSE\n");
531                     /* send the acknowledgement */
532                     event = TransportRpmsg_Event_ACK;
533                     write(TransportRpmsg_module->waitEvent, &event,
534                             sizeof(event));
535                     /* now wait to be released */
536                     read(TransportRpmsg_module->unblockEvent, &event,
537                             sizeof(event));
538                 }
539             } while (event != 0);
540         }
541     }
543     return (void *)status;
546 /*
547  * ======== transportGet ========
548  *  Retrieve a message waiting in the socket's queue.
549  */
550 static Int transportGet(int sock, MessageQ_Msg *retMsg)
552     Int           status    = MessageQ_S_SUCCESS;
553     MessageQ_Msg  msg;
554     struct sockaddr_rpmsg fromAddr;  /* [Socket address of sender] */
555     unsigned int  len;
556     int           byteCount;
558     /*
559      * We have no way of peeking to see what message size we'll get, so we
560      * allocate a message of max size to receive contents from the rpmsg socket
561      * (currently, a copy transport)
562      */
563     msg = MessageQ_alloc(0, MESSAGEQ_RPMSG_MAXSIZE);
564     if (!msg) {
565         status = MessageQ_E_MEMORY;
566         goto exit;
567     }
569     memset(&fromAddr, 0, sizeof (fromAddr));
570     len = sizeof (fromAddr);
572     byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
573                          (struct sockaddr *)&fromAddr, &len);
574     if (len != sizeof (fromAddr)) {
575         printf("recvfrom: got bad addr len (%d)\n", len);
576         status = MessageQ_E_FAIL;
577         goto exit;
578     }
579     if (byteCount < 0) {
580         printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
581         status = MessageQ_E_FAIL;
582         goto exit;
583     }
584     else {
585          /*
586           * Update the allocated message size (even though this may waste
587           * space when the actual message is smaller than the maximum rpmsg
588           * size, the message will be freed soon anyway, and it avoids an
589           * extra copy).
590           */
591          msg->msgSize = byteCount;
593          /*
594           * If the message received was statically allocated, reset the
595           * heapId, so the app can free it.
596           */
597          if (msg->heapId == MessageQ_STATICMSG)  {
598              msg->heapId = 0;  /* for a copy transport, heap id is 0. */
599          }
600     }
602     PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
603     PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg "
604             "proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
605     PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId,
606             msg->msgSize)
608     *retMsg = msg;
610 exit:
611     return status;
614 /*
615  *  ======== bindFdToQueueIndex ========
616  *
617  *  Precondition: caller must be inside the module gate
618  */
619 Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 queuePort)
621     Int *queues;
622     Int *oldQueues;
623     UInt oldSize;
624     UInt queueIndex;
626     /* subtract port offset from queue index */
627     queueIndex = queuePort - MessageQ_PORTOFFSET;
629     if (queueIndex >= obj->numQueues) {
630         PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
631                 queueIndex + TransportRpmsg_GROWSIZE)
633         /* allocate larger table */
634         oldSize = obj->numQueues * sizeof (Int);
635         queues = calloc(queueIndex + TransportRpmsg_GROWSIZE, sizeof(Int));
637         /* copy contents from old table int new table */
638         memcpy(queues, obj->qIndexToFd, oldSize);
640         /* swap in new table, delete old table */
641         oldQueues = obj->qIndexToFd;
642         obj->qIndexToFd = queues;
643         obj->numQueues = queueIndex + TransportRpmsg_GROWSIZE;
644         free(oldQueues);
645     }
647     /* add new entry */
648     obj->qIndexToFd[queueIndex] = fd;
651 /*
652  *  ======== unbindQueueIndex ========
653  *
654  *  Precondition: caller must be inside the module gate
655  */
656 Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 queuePort)
658     UInt queueIndex;
660     /* subtract port offset from queue index */
661     queueIndex = queuePort - MessageQ_PORTOFFSET;
663     /* clear table entry */
664     obj->qIndexToFd[queueIndex] = -1;
667 /*
668  *  ======== queueIndexToFd ========
669  *
670  *  Precondition: caller must be inside the module gate
671  */
672 Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 queuePort)
674     UInt queueIndex;
676     /* subtract port offset from queue index */
677     queueIndex = queuePort - MessageQ_PORTOFFSET;
679     /* return file descriptor */
680     return (obj->qIndexToFd[queueIndex]);
683 /*
684  *  ======== TransportRpmsg_Factory_create ========
685  *  Create the transport instances
686  *
687  *  Attach to all remote processors. For now, must attach to
688  *  at least one to tolerate MessageQ_E_RESOURCE failures.
689  *
690  *  This function implements the IPC Factory interface, so it
691  *  returns Ipc status codes.
692  */
693 Int TransportRpmsg_Factory_create(Void)
695     Int status = Ipc_S_SUCCESS;
696     Int i;
697     UInt16 clusterSize;
698     TransportRpmsg_Handle *inst;
701     /* needed to enumerate processors in cluster */
702     clusterSize = MultiProc_getNumProcsInCluster();
704     /* allocate the instance array */
705     inst = calloc(clusterSize, sizeof(TransportRpmsg_Handle));
707     if (inst == NULL) {
708         printf("Error: TransportRpmsg_Factory_create failed, no memory\n");
709         status = Ipc_E_MEMORY;
710         goto done;
711     }
713     for (i = 0; i < clusterSize; i++) {
714         inst[i] = NULL;
715     }
717     TransportRpmsg_module->inst = inst;
719     /* counter event object for passing commands to dispatch thread */
720     TransportRpmsg_module->unblockEvent = eventfd(0, 0);
722     if (TransportRpmsg_module->unblockEvent == -1) {
723         printf("create: unblock event failed: %d (%s)\n",
724                errno, strerror(errno));
725         status = Ipc_E_FAIL;
726         goto done;
727     }
729     PRINTVERBOSE1("create: created unblock event %d\n",
730             TransportRpmsg_module->unblockEvent)
732     /* semaphore event object for acknowledging client thread */
733     TransportRpmsg_module->waitEvent = eventfd(0, EFD_SEMAPHORE);
735     if (TransportRpmsg_module->waitEvent == -1) {
736         printf("create: wait event failed: %d (%s)\n", errno, strerror(errno));
737         status = Ipc_E_FAIL;
738         goto done;
739     }
741     PRINTVERBOSE1("create: created wait event %d\n",
742             TransportRpmsg_module->waitEvent)
744     FD_ZERO(&TransportRpmsg_module->rfds);
745     FD_SET(TransportRpmsg_module->unblockEvent,
746             &TransportRpmsg_module->rfds);
747     TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
748     TransportRpmsg_module->nInFds = 0;
750     pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
752     status = pthread_create(&TransportRpmsg_module->threadId, NULL,
753             &rpmsgThreadFxn, NULL);
755     if (status < 0) {
756         status = Ipc_E_FAIL;
757         printf("attach: failed to spawn thread\n");
758         goto done;
759     }
760     TransportRpmsg_module->threadStarted = TRUE;
762 done:
763     if (status < 0) {
764         TransportRpmsg_Factory_delete();
765     }
767     return (status);
770 /*
771  *  ======== TransportRpmsg_Factory_delete ========
772  *  Finalize the transport instances
773  */
774 Void TransportRpmsg_Factory_delete(Void)
776     uint64_t event;
779     /* shutdown the message dispatch thread */
780     if (TransportRpmsg_module->threadStarted) {
781         event = TransportRpmsg_Event_SHUTDOWN;
782         write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
784         /* wait for dispatch thread to exit */
785         pthread_join(TransportRpmsg_module->threadId, NULL);
786     }
788     /* destroy the mutex object */
789     pthread_mutex_destroy(&TransportRpmsg_module->gate);
791     /* close the client wait event */
792     if (TransportRpmsg_module->waitEvent != -1) {
793         close(TransportRpmsg_module->waitEvent);
794         TransportRpmsg_module->waitEvent = -1;
795     }
797     /* close the dispatch thread unblock event */
798     if (TransportRpmsg_module->unblockEvent != -1) {
799         close(TransportRpmsg_module->unblockEvent);
800         TransportRpmsg_module->unblockEvent = -1;
801     }
803     /* free the instance handle array */
804     if (TransportRpmsg_module->inst != NULL) {
805         free(TransportRpmsg_module->inst);
806         TransportRpmsg_module->inst = NULL;
807     }
809     return;
812 /*
813  *  ======== TransportRpmsg_Factory_attach ========
814  */
815 Int TransportRpmsg_Factory_attach(UInt16 procId)
817     Int status = Ipc_S_SUCCESS;
818     UInt16 clusterId;
819     TransportRpmsg_Params params;
820     TransportRpmsg_Handle transport;
821     IMessageQTransport_Handle iMsgQTrans;
823     /* cannot attach to yourself */
824     if (MultiProc_self() == procId) {
825         status = Ipc_E_INVALIDARG;
826         goto done;
827     }
829     /* processor must be a member of the cluster */
830     clusterId = procId - MultiProc_getBaseIdOfCluster();
832     if (clusterId >= MultiProc_getNumProcsInCluster()) {
833         status = Ipc_E_INVALIDARG;
834         goto done;
835     }
837     /* create transport instance for given processor */
838     params.rprocId = procId;
839     transport = TransportRpmsg_create(&params);
841     if (transport == NULL) {
842         status = Ipc_E_FAIL;
843         goto done;
844     }
846     /* register transport instance with MessageQ */
847     iMsgQTrans = TransportRpmsg_upCast(transport);
848     MessageQ_registerTransport(iMsgQTrans, procId, 0);
849     TransportRpmsg_module->inst[clusterId] = transport;
851 done:
852     return (status);
855 /*
856  *  ======== TransportRpmsg_Factory_detach ========
857  */
858 Int TransportRpmsg_Factory_detach(UInt16 procId)
860     Int status = Ipc_S_SUCCESS;
861     UInt16 clusterId;
863     /* cannot detach from yourself */
864     if (MultiProc_self() == procId) {
865         status = Ipc_E_INVALIDARG;
866         goto done;
867     }
869     /* processor must be a member of the cluster */
870     clusterId = procId - MultiProc_getBaseIdOfCluster();
872     if (clusterId >= MultiProc_getNumProcsInCluster()) {
873         status = Ipc_E_INVALIDARG;
874         goto done;
875     }
877     /* must be attached in order to detach */
878     if (TransportRpmsg_module->inst[clusterId] == NULL) {
879         status = Ipc_E_INVALIDSTATE;
880         goto done;
881     }
883     /* unregister from MessageQ, delete the transport instance */
884     MessageQ_unregisterTransport(procId, 0);
885     TransportRpmsg_delete(&(TransportRpmsg_module->inst[clusterId]));
887 done:
888     return (status);