SDOCM00115373 NameServer local get methods are missing on Linux
[ipc/ipcdev.git] / linux / src / transport / TransportRpmsg.c
1 /*
2  * Copyright (c) 2014-2015 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  ======== TransportRpmsg.c ========
34  *  Implementation of functions specified in the IMessageQTransport interface.
35  */
37 /* Socket Headers */
38 #include <sys/socket.h>
39 #include <sys/select.h>
40 #include <sys/eventfd.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <unistd.h>
44 #include <errno.h>
45 #include <string.h>
46 #include <fcntl.h>
47 #include <pthread.h>
49 /* Socket Protocol Family */
50 #include <net/rpmsg.h>
53 /* IPC headers */
54 #include <ti/ipc/Std.h>
55 #include <SocketFxns.h>         /* Socket utils: */
56 #include <ti/ipc/Ipc.h>
57 #include <ti/ipc/MessageQ.h>
58 #include <ti/ipc/MultiProc.h>
59 #include <ti/ipc/transports/TransportRpmsg.h>
60 #include <_MessageQ.h>
61 #include <_lad.h>
63 /* More magic rpmsg port numbers: */
64 #define MESSAGEQ_RPMSG_PORT       61
65 #define MESSAGEQ_RPMSG_MAXSIZE   512
67 #define TransportRpmsg_GROWSIZE 32
68 #define INVALIDSOCKET (-1)
70 #define TransportRpmsg_Event_ACK        (1 << 0)
71 #define TransportRpmsg_Event_PAUSE      (1 << 1)
72 #define TransportRpmsg_Event_CONTINUE   (1 << 2)
73 #define TransportRpmsg_Event_SHUTDOWN   (1 << 3)
76 #define _MAX(a,b) (((a)>(b))?(a):(b))
78 /* traces in this file are controlled via _TransportMessageQ_verbose */
79 Bool _TransportMessageQ_verbose = FALSE;
80 #define verbose _TransportMessageQ_verbose
82 Int TransportRpmsg_bind(Void *handle, UInt32 queueId);
83 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId);
84 Bool TransportRpmsg_put(Void *handle, Ptr msg);
86 typedef struct TransportRpmsg_Module {
87     int             sock[MultiProc_MAXPROCESSORS];
88     fd_set          rfds;
89     int             maxFd;
90     int             inFds[1024];
91     int             nInFds;
92     pthread_mutex_t gate;
93     int             unblockEvent;    /* unblock the dispatch thread */
94     int             waitEvent;       /* block the client thread */
95     pthread_t       threadId;        /* ID returned by pthread_create() */
96     Bool            threadStarted;
98     TransportRpmsg_Handle *inst;    /* array of instances */
99 } TransportRpmsg_Module;
101 IMessageQTransport_Fxns TransportRpmsg_fxns = {
102     .bind    = TransportRpmsg_bind,
103     .unbind  = TransportRpmsg_unbind,
104     .put     = TransportRpmsg_put
105 };
107 typedef struct TransportRpmsg_Object {
108     IMessageQTransport_Object base;
109     Int status;
110     UInt16 rprocId;
111     int numQueues;
112     int *qIndexToFd;
113 } TransportRpmsg_Object;
115 TransportRpmsg_Module TransportRpmsg_state = {
116     .sock = {INVALIDSOCKET},
117     .unblockEvent = -1,
118     .waitEvent = -1,
119     .threadStarted = FALSE,
120     .inst = NULL
121 };
122 TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
124 static void *rpmsgThreadFxn(void *arg);
125 static Int transportGet(int sock, MessageQ_Msg *retMsg);
126 static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
127                                Int fd,
128                                UInt16 qIndex);
129 static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
130 static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
132 /* factory functions */
133 Int TransportRpmsg_Factory_create(Void);
134 Void TransportRpmsg_Factory_delete(Void);
135 Int TransportRpmsg_Factory_attach(UInt16 procId);
136 Int TransportRpmsg_Factory_detach(UInt16 procId);
138 Ipc_TransportFactoryFxns TransportRpmsg_Factory = {
139     .createFxn = TransportRpmsg_Factory_create,
140     .deleteFxn = TransportRpmsg_Factory_delete,
141     .attachFxn = TransportRpmsg_Factory_attach,
142     .detachFxn = TransportRpmsg_Factory_detach
143 };
145 /* -------------------------------------------------------------------------- */
147 /* instance convertors */
148 IMessageQTransport_Handle TransportRpmsg_upCast(TransportRpmsg_Handle handle)
150     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
151     return ((IMessageQTransport_Handle)&obj->base);
154 TransportRpmsg_Handle TransportRpmsg_downCast(IMessageQTransport_Handle base)
156     return ((TransportRpmsg_Handle)base);
159 /*
160  *  ======== TransportRpmsg_create ========
161  */
162 TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params)
164     Int status = MessageQ_S_SUCCESS;
165     TransportRpmsg_Object *obj = NULL;
166     int sock;
167     UInt16 clusterId;
170     clusterId = params->rprocId - MultiProc_getBaseIdOfCluster();
172     /* create socket for sending messages to remote processor */
173     sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
175     if (sock < 0) {
176         status = Ipc_E_FAIL;
177         printf("TransportRpmsg_create: socket failed: %d (%s)\n", errno,
178                 strerror(errno));
179         goto done;
180     }
181     TransportRpmsg_module->sock[clusterId] = sock;
182     PRINTVERBOSE1("attach: created send socket: %d\n", sock)
184     status = ConnectSocket(sock, params->rprocId, MESSAGEQ_RPMSG_PORT);
186     if (status < 0) {
187         status = Ipc_E_FAIL;
188         printf("TransportRpmsg_create: connect failed: %d (%s) procId: %d\n",
189                 errno, strerror(errno), params->rprocId);
190         goto done;
191     }
193     /* create the instance object */
194     obj = calloc(1, sizeof(TransportRpmsg_Object));
196     if (obj == NULL) {
197         status = Ipc_E_MEMORY;
198         goto done;
199     }
201     /* initialize the instance */
202     obj->base.base.interfaceType = IMessageQTransport_TypeId;
203     obj->base.fxns = &TransportRpmsg_fxns;
204     obj->rprocId = params->rprocId;
205     obj->numQueues = TransportRpmsg_GROWSIZE;
207     obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof(Int));
209     if (obj->qIndexToFd == NULL) {
210         status = Ipc_E_MEMORY;
211         goto done;
212     }
214 done:
215     if (status < 0) {
216         TransportRpmsg_delete((TransportRpmsg_Handle *)&obj);
217     }
219     return (TransportRpmsg_Handle)obj;
222 /*
223  *  ======== TransportRpmsg_delete ========
224  */
225 Void TransportRpmsg_delete(TransportRpmsg_Handle *pHandle)
227     TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)pHandle;
228     UInt16 clusterId;
229     int sock;
232     clusterId = obj->rprocId - MultiProc_getBaseIdOfCluster();
234     /* close the socket for the given transport instance */
235     sock = TransportRpmsg_module->sock[clusterId];
236     if (sock != INVALIDSOCKET) {
237         PRINTVERBOSE1("detach: closing socket: %d\n", sock)
238         close(sock);
239     }
240     TransportRpmsg_module->sock[clusterId] = INVALIDSOCKET;
242     if ((obj != NULL) && (obj->qIndexToFd != NULL)) {
243         free(obj->qIndexToFd);
244         obj->qIndexToFd = NULL;
245     }
247     if (obj != NULL) {
248         free(obj);
249         obj = NULL;
250     }
252     *pHandle = NULL;
255 /*
256  *  ======== TransportRpmsg_bind ========
257  */
258 Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
260     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
261     UInt16 queuePort = queueId & 0x0000ffff;
262     int fd;
263     int err;
264     uint64_t event;
265     UInt16 rprocId;
266     pthread_t tid;
268     tid = pthread_self();
269     rprocId = obj->rprocId;
271     PRINTVERBOSE3("TransportRpmsg_bind: creating endpoint for rprocId %d "
272             "queuePort 0x%x, tid=0x%x\n", rprocId, queuePort, (unsigned int)tid)
274     /*  Create the socket to receive messages for this messageQ. */
275     fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
276     if (fd < 0) {
277         printf("TransportRpmsg_bind: socket call failed: %d (%s)\n",
278                 errno, strerror(errno));
279         return (MessageQ_E_OSFAILURE);
280     }
281     PRINTVERBOSE2("TransportRpmsg_bind: created socket fd %d, tdi=0x%x\n", fd,
282             (unsigned int)tid);
284     err = SocketBindAddr(fd, rprocId, (UInt32)queuePort);
285     if (err < 0) {
286         /* don't hard-printf since this is no longer fatal */
287         PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
288                       errno, strerror(errno));
289         close(fd);
290         return (MessageQ_E_OSFAILURE);
291     }
293     pthread_mutex_lock(&TransportRpmsg_module->gate);
295     /*  pause the dispatch thread */
296     PRINTVERBOSE1("TransportRpmsg_bind: sending PAUSE event, tid=0x%x\n",
297             (unsigned int)tid);
298     event = TransportRpmsg_Event_PAUSE;
299     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
301     /* wait for ACK event */
302     read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
303     PRINTVERBOSE2("TransportRpmsg_bind: received ACK event (%d), tid=0x%x\n",
304             (int)event, (unsigned int)tid);
306     /* add to our fat fd array and update select() parameters */
307     TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++] = fd;
308     TransportRpmsg_module->maxFd = _MAX(TransportRpmsg_module->maxFd, fd);
309     FD_SET(fd, &TransportRpmsg_module->rfds);
310     bindFdToQueueIndex(obj, fd, queuePort);
312     /* release the dispatch thread */
313     PRINTVERBOSE1("TransportRpmsg_bind: sending CONTINUE event, tid=0x%x\n",
314             (unsigned int)tid);
315     event = TransportRpmsg_Event_CONTINUE;
316     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
318     pthread_mutex_unlock(&TransportRpmsg_module->gate);
320     return (MessageQ_S_SUCCESS);
323 /*
324  *  ======== TransportRpmsg_unbind ========
325  */
326 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
328     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
329     UInt16 queuePort = queueId & 0x0000ffff;
330     uint64_t event;
331     Int    status = MessageQ_S_SUCCESS;
332     int    maxFd;
333     int    fd;
334     int    i;
335     int    j;
337     pthread_mutex_lock(&TransportRpmsg_module->gate);
339     /*  pause the dispatch thread */
340     event = TransportRpmsg_Event_PAUSE;
341     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
343     /* wait for ACK event */
344     read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
346     /* retrieve file descriptor for the given queue port */
347     fd = queueIndexToFd(obj, queuePort);
348     if (!fd) {
349         PRINTVERBOSE1("TransportRpmsg_unbind: queueId 0x%x not bound\n",
350                 queueId);
351         status = MessageQ_E_INVALIDARG;
352         goto done;
353     }
354     PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
356     /* guarenteed to work because queueIndexToFd above succeeded */
357     unbindQueueIndex(obj, queuePort);
359     /* remove from input fd array */
360     for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
361         if (TransportRpmsg_module->inFds[i] == fd) {
362             TransportRpmsg_module->nInFds--;
364             /* shift subsequent elements down */
365             for (j = i; j < TransportRpmsg_module->nInFds; j++) {
366                 TransportRpmsg_module->inFds[j] =
367                         TransportRpmsg_module->inFds[j + 1];
368             }
369             TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds] = -1;
370             break;
371         }
372     }
374     /* remove fd from the descriptor set, compute new max value */
375     FD_CLR(fd, &TransportRpmsg_module->rfds);
376     if (fd == TransportRpmsg_module->maxFd) {
377         /* find new max fd */
378         maxFd = TransportRpmsg_module->unblockEvent;
379         for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
380             maxFd = _MAX(TransportRpmsg_module->inFds[i], maxFd);
381         }
382         TransportRpmsg_module->maxFd = maxFd;
383     }
385     close(fd);
387     /* release the dispatch thread */
388     event = TransportRpmsg_Event_CONTINUE;
389     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
391 done:
392     pthread_mutex_unlock(&TransportRpmsg_module->gate);
394     return (status);
397 /*
398  *  ======== TransportRpmsg_put ========
399  */
400 Bool TransportRpmsg_put(Void *handle, Ptr pmsg)
402     MessageQ_Msg msg  = (MessageQ_Msg)pmsg;
403     Int     status    = TRUE;
404     int     sock;
405     int     err;
406     UInt16  clusterId;
408     /*
409      * Retrieve the socket for the AF_SYSLINK protocol associated with this
410      * transport.
411      */
412     clusterId = msg->dstProc - MultiProc_getBaseIdOfCluster();
413     sock = TransportRpmsg_module->sock[clusterId];
414     if (!sock) {
415         return FALSE;
416     }
418     PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
420     err = send(sock, msg, msg->msgSize, 0);
421     if (err < 0) {
422         printf("TransportRpmsg_put: send failed: %d (%s)\n",
423                errno, strerror(errno));
424         status = FALSE;
426         goto exit;
427     }
429     /*
430      * Free the message, as this is a copy transport, we maintain MessageQ
431      * semantics.
432      */
433     MessageQ_free(msg);
435 exit:
436     return status;
439 /*
440  *  ======== TransportRpmsg_control ========
441  */
442 Bool TransportRpmsg_control(Void *handle, UInt cmd, UArg cmdArg)
444     return FALSE;
447 /*
448  *  ======== rpmsgThreadFxn ========
449  */
450 void *rpmsgThreadFxn(void *arg)
452     Int      status = MessageQ_S_SUCCESS;
453     Int      tmpStatus;
454     int      retval;
455     uint64_t event;
456     fd_set   rfds;
457     int      maxFd;
458     int      nfds;
459     MessageQ_Msg     retMsg;
460     MessageQ_QueueId queueId;
461     Bool run = TRUE;
462     int i;
463     int fd;
466     while (run) {
467         maxFd = TransportRpmsg_module->maxFd;
468         rfds = TransportRpmsg_module->rfds;
469         nfds = TransportRpmsg_module->nInFds;
471         PRINTVERBOSE3("rpmsgThreadFxn: maxFd %d rfds[1:0] 0x%08x%08x\n", maxFd,
472                 (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
474         retval = select(maxFd + 1, &rfds, NULL, NULL, NULL);
476         /* if error, try again */
477         if (retval < 0) {
478             printf("Warning: rpmsgThreadFxn: select failed, trying again\n");
479             continue;
480         }
482         /* dispatch all pending messages, do this first */
483         for (i = 0; i < nfds; i++) {
484             fd = TransportRpmsg_module->inFds[i];
486             if (FD_ISSET(fd, &rfds)) {
487                 PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
488                         TransportRpmsg_module->inFds[i]);
490                 /* transport input fd was signalled: get the message */
491                 tmpStatus = transportGet(fd, &retMsg);
492                 if (tmpStatus < 0) {
493                     printf("rpmsgThreadFxn: transportGet failed\n");
494                 }
495                 else {
496                     queueId = MessageQ_getDstQueue(retMsg);
497                     PRINTVERBOSE1("rpmsgThreadFxn: got message, "
498                             "delivering to queueId 0x%x\n", queueId)
499                     MessageQ_put(queueId, retMsg);
500                 }
501             }
502         }
504         /* check for events */
505         if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
507             read(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
509             do {
510                 if (event & TransportRpmsg_Event_SHUTDOWN) {
511                     PRINTVERBOSE0("rpmsgThreadFxn: event SHUTDOWN\n");
512                     run = FALSE;
513                     break; /* highest priority, stop processing events */
514                 }
515                 if (event & TransportRpmsg_Event_CONTINUE) {
516                     PRINTVERBOSE1("rpmsgThreadFxn: event CONTINUE (%d)\n",
517                             (int)event);
518                     event &= ~TransportRpmsg_Event_CONTINUE;
519                 }
520                 if (event & TransportRpmsg_Event_PAUSE) {
521                     /*  Our event was signalled by TransportRpmsg_bind()
522                      *  or TransportRpmsg_unbind() to tell us that the set
523                      *  of file descriptors has changed.
524                      */
525                     PRINTVERBOSE0("rpmsgThreadFxn: event PAUSE\n");
526                     /* send the acknowledgement */
527                     event = TransportRpmsg_Event_ACK;
528                     write(TransportRpmsg_module->waitEvent, &event,
529                             sizeof(event));
530                     /* now wait to be released */
531                     read(TransportRpmsg_module->unblockEvent, &event,
532                             sizeof(event));
533                 }
534             } while (event != 0);
535         }
536     }
538     return (void *)status;
541 /*
542  * ======== transportGet ========
543  *  Retrieve a message waiting in the socket's queue.
544  */
545 static Int transportGet(int sock, MessageQ_Msg *retMsg)
547     Int           status    = MessageQ_S_SUCCESS;
548     MessageQ_Msg  msg;
549     struct sockaddr_rpmsg fromAddr;  /* [Socket address of sender] */
550     unsigned int  len;
551     int           byteCount;
553     /*
554      * We have no way of peeking to see what message size we'll get, so we
555      * allocate a message of max size to receive contents from the rpmsg socket
556      * (currently, a copy transport)
557      */
558     msg = MessageQ_alloc(0, MESSAGEQ_RPMSG_MAXSIZE);
559     if (!msg) {
560         status = MessageQ_E_MEMORY;
561         goto exit;
562     }
564     memset(&fromAddr, 0, sizeof (fromAddr));
565     len = sizeof (fromAddr);
567     byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
568                          (struct sockaddr *)&fromAddr, &len);
569     if (len != sizeof (fromAddr)) {
570         printf("recvfrom: got bad addr len (%d)\n", len);
571         status = MessageQ_E_FAIL;
572         goto exit;
573     }
574     if (byteCount < 0) {
575         printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
576         status = MessageQ_E_FAIL;
577         goto exit;
578     }
579     else {
580          /*
581           * Update the allocated message size (even though this may waste
582           * space when the actual message is smaller than the maximum rpmsg
583           * size, the message will be freed soon anyway, and it avoids an
584           * extra copy).
585           */
586          msg->msgSize = byteCount;
588          /*
589           * If the message received was statically allocated, reset the
590           * heapId, so the app can free it.
591           */
592          if (msg->heapId == MessageQ_STATICMSG)  {
593              msg->heapId = 0;  /* for a copy transport, heap id is 0. */
594          }
595     }
597     PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
598     PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg "
599             "proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
600     PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId,
601             msg->msgSize)
603     *retMsg = msg;
605 exit:
606     return status;
609 /*
610  *  ======== bindFdToQueueIndex ========
611  *
612  *  Precondition: caller must be inside the module gate
613  */
614 Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 queuePort)
616     Int *queues;
617     Int *oldQueues;
618     UInt oldSize;
619     UInt queueIndex;
621     /* subtract port offset from queue index */
622     queueIndex = queuePort - MessageQ_PORTOFFSET;
624     if (queueIndex >= obj->numQueues) {
625         PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
626                 queueIndex + TransportRpmsg_GROWSIZE)
628         /* allocate larger table */
629         oldSize = obj->numQueues * sizeof (Int);
630         queues = calloc(queueIndex + TransportRpmsg_GROWSIZE, sizeof(Int));
632         /* copy contents from old table int new table */
633         memcpy(queues, obj->qIndexToFd, oldSize);
635         /* swap in new table, delete old table */
636         oldQueues = obj->qIndexToFd;
637         obj->qIndexToFd = queues;
638         obj->numQueues = queueIndex + TransportRpmsg_GROWSIZE;
639         free(oldQueues);
640     }
642     /* add new entry */
643     obj->qIndexToFd[queueIndex] = fd;
646 /*
647  *  ======== unbindQueueIndex ========
648  *
649  *  Precondition: caller must be inside the module gate
650  */
651 Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 queuePort)
653     UInt queueIndex;
655     /* subtract port offset from queue index */
656     queueIndex = queuePort - MessageQ_PORTOFFSET;
658     /* clear table entry */
659     obj->qIndexToFd[queueIndex] = -1;
662 /*
663  *  ======== queueIndexToFd ========
664  *
665  *  Precondition: caller must be inside the module gate
666  */
667 Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 queuePort)
669     UInt queueIndex;
671     /* subtract port offset from queue index */
672     queueIndex = queuePort - MessageQ_PORTOFFSET;
674     /* return file descriptor */
675     return (obj->qIndexToFd[queueIndex]);
678 /*
679  *  ======== TransportRpmsg_Factory_create ========
680  *  Create the transport instances
681  *
682  *  Attach to all remote processors. For now, must attach to
683  *  at least one to tolerate MessageQ_E_RESOURCE failures.
684  *
685  *  This function implements the IPC Factory interface, so it
686  *  returns Ipc status codes.
687  */
688 Int TransportRpmsg_Factory_create(Void)
690     Int status = Ipc_S_SUCCESS;
691     Int i;
692     UInt16 clusterSize;
693     TransportRpmsg_Handle *inst;
696     /* needed to enumerate processors in cluster */
697     clusterSize = MultiProc_getNumProcsInCluster();
699     /* allocate the instance array */
700     inst = calloc(clusterSize, sizeof(TransportRpmsg_Handle));
702     if (inst == NULL) {
703         printf("Error: TransportRpmsg_Factory_create failed, no memory\n");
704         status = Ipc_E_MEMORY;
705         goto done;
706     }
708     for (i = 0; i < clusterSize; i++) {
709         inst[i] = NULL;
710     }
712     TransportRpmsg_module->inst = inst;
714     /* counter event object for passing commands to dispatch thread */
715     TransportRpmsg_module->unblockEvent = eventfd(0, 0);
717     if (TransportRpmsg_module->unblockEvent == -1) {
718         printf("create: unblock event failed: %d (%s)\n",
719                errno, strerror(errno));
720         status = Ipc_E_FAIL;
721         goto done;
722     }
724     PRINTVERBOSE1("create: created unblock event %d\n",
725             TransportRpmsg_module->unblockEvent)
727     /* semaphore event object for acknowledging client thread */
728     TransportRpmsg_module->waitEvent = eventfd(0, EFD_SEMAPHORE);
730     if (TransportRpmsg_module->waitEvent == -1) {
731         printf("create: wait event failed: %d (%s)\n", errno, strerror(errno));
732         status = Ipc_E_FAIL;
733         goto done;
734     }
736     PRINTVERBOSE1("create: created wait event %d\n",
737             TransportRpmsg_module->waitEvent)
739     FD_ZERO(&TransportRpmsg_module->rfds);
740     FD_SET(TransportRpmsg_module->unblockEvent,
741             &TransportRpmsg_module->rfds);
742     TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
743     TransportRpmsg_module->nInFds = 0;
745     pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
747     status = pthread_create(&TransportRpmsg_module->threadId, NULL,
748             &rpmsgThreadFxn, NULL);
750     if (status < 0) {
751         status = Ipc_E_FAIL;
752         printf("attach: failed to spawn thread\n");
753         goto done;
754     }
755     TransportRpmsg_module->threadStarted = TRUE;
757 done:
758     if (status < 0) {
759         TransportRpmsg_Factory_delete();
760     }
762     return (status);
765 /*
766  *  ======== TransportRpmsg_Factory_delete ========
767  *  Finalize the transport instances
768  */
769 Void TransportRpmsg_Factory_delete(Void)
771     uint64_t event;
774     /* shutdown the message dispatch thread */
775     if (TransportRpmsg_module->threadStarted) {
776         event = TransportRpmsg_Event_SHUTDOWN;
777         write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
779         /* wait for dispatch thread to exit */
780         pthread_join(TransportRpmsg_module->threadId, NULL);
781     }
783     /* destroy the mutex object */
784     pthread_mutex_destroy(&TransportRpmsg_module->gate);
786     /* close the client wait event */
787     if (TransportRpmsg_module->waitEvent != -1) {
788         close(TransportRpmsg_module->waitEvent);
789         TransportRpmsg_module->waitEvent = -1;
790     }
792     /* close the dispatch thread unblock event */
793     if (TransportRpmsg_module->unblockEvent != -1) {
794         close(TransportRpmsg_module->unblockEvent);
795         TransportRpmsg_module->unblockEvent = -1;
796     }
798     /* free the instance handle array */
799     if (TransportRpmsg_module->inst != NULL) {
800         free(TransportRpmsg_module->inst);
801         TransportRpmsg_module->inst = NULL;
802     }
804     return;
807 /*
808  *  ======== TransportRpmsg_Factory_attach ========
809  */
810 Int TransportRpmsg_Factory_attach(UInt16 procId)
812     Int status = Ipc_S_SUCCESS;
813     UInt16 clusterId;
814     TransportRpmsg_Params params;
815     TransportRpmsg_Handle transport;
816     IMessageQTransport_Handle iMsgQTrans;
818     /* cannot attach to yourself */
819     if (MultiProc_self() == procId) {
820         status = Ipc_E_INVALIDARG;
821         goto done;
822     }
824     /* processor must be a member of the cluster */
825     clusterId = procId - MultiProc_getBaseIdOfCluster();
827     if (clusterId >= MultiProc_getNumProcsInCluster()) {
828         status = Ipc_E_INVALIDARG;
829         goto done;
830     }
832     /* create transport instance for given processor */
833     params.rprocId = procId;
834     transport = TransportRpmsg_create(&params);
836     if (transport == NULL) {
837         status = Ipc_E_FAIL;
838         goto done;
839     }
841     /* register transport instance with MessageQ */
842     iMsgQTrans = TransportRpmsg_upCast(transport);
843     MessageQ_registerTransport(iMsgQTrans, procId, 0);
844     TransportRpmsg_module->inst[clusterId] = transport;
846 done:
847     return (status);
850 /*
851  *  ======== TransportRpmsg_Factory_detach ========
852  */
853 Int TransportRpmsg_Factory_detach(UInt16 procId)
855     Int status = Ipc_S_SUCCESS;
856     UInt16 clusterId;
858     /* cannot detach from yourself */
859     if (MultiProc_self() == procId) {
860         status = Ipc_E_INVALIDARG;
861         goto done;
862     }
864     /* processor must be a member of the cluster */
865     clusterId = procId - MultiProc_getBaseIdOfCluster();
867     if (clusterId >= MultiProc_getNumProcsInCluster()) {
868         status = Ipc_E_INVALIDARG;
869         goto done;
870     }
872     /* must be attached in order to detach */
873     if (TransportRpmsg_module->inst[clusterId] == NULL) {
874         status = Ipc_E_INVALIDSTATE;
875         goto done;
876     }
878     /* unregister from MessageQ, delete the transport instance */
879     MessageQ_unregisterTransport(procId, 0);
880     TransportRpmsg_delete(&(TransportRpmsg_module->inst[clusterId]));
882 done:
883     return (status);