Generated new autoreconf makefile
[ipc/ipcdev.git] / linux / src / transport / TransportRpmsg.c
1 /*
2  * Copyright (c) 2014-2015 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  ======== TransportRpmsg.c ========
34  *  Implementation of functions specified in the IMessageQTransport interface.
35  */
37 #include <ti/ipc/Std.h>
39 #include <ti/ipc/Ipc.h>
40 #include <ti/ipc/MessageQ.h>
41 #include <ti/ipc/MultiProc.h>
42 #include <_MessageQ.h>
44 /* Socket Headers */
45 #include <sys/socket.h>
46 #include <sys/select.h>
47 #include <sys/eventfd.h>
48 #include <stdio.h>
49 #include <stdlib.h>
50 #include <unistd.h>
51 #include <errno.h>
52 #include <string.h>
53 #include <fcntl.h>
54 #include <pthread.h>
56 /* Socket Protocol Family */
57 #include <net/rpmsg.h>
59 /* Socket utils: */
60 #include <SocketFxns.h>
62 #include <_lad.h>
64 #include <TransportRpmsg.h>
67 /* More magic rpmsg port numbers: */
68 #define MESSAGEQ_RPMSG_PORT       61
69 #define MESSAGEQ_RPMSG_MAXSIZE   512
71 #define TransportRpmsg_GROWSIZE 32
73 /* traces in this file are controlled via _TransportMessageQ_verbose */
74 Bool _TransportMessageQ_verbose = FALSE;
75 #define verbose _TransportMessageQ_verbose
77 Int TransportRpmsg_bind(Void *handle, UInt32 queueId);
78 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId);
79 Bool TransportRpmsg_put(Void *handle, Ptr msg);
81 typedef struct TransportRpmsg_Module {
82     int             sock[MultiProc_MAXPROCESSORS];
83     fd_set          rfds;
84     int             maxFd;
85     int             inFds[1024];
86     int             nInFds;
87     pthread_mutex_t gate;
88     int             unblockEvent;    /* eventFd for unblocking socket */
89     pthread_t       threadId;        /* ID returned by pthread_create() */
90     Bool            threadStarted;
92     TransportRpmsg_Handle *inst;    /* array of instances */
93 } TransportRpmsg_Module;
95 IMessageQTransport_Fxns TransportRpmsg_fxns = {
96     .bind    = TransportRpmsg_bind,
97     .unbind  = TransportRpmsg_unbind,
98     .put     = TransportRpmsg_put
99 };
101 typedef struct TransportRpmsg_Object {
102     IMessageQTransport_Object base;
103     Int status;
104     UInt16 rprocId;
105     int numQueues;
106     int *qIndexToFd;
107 } TransportRpmsg_Object;
109 TransportRpmsg_Module TransportRpmsg_state = {
110     .sock = {0},
111     .threadStarted = FALSE,
112     .inst = NULL
113 };
114 TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
116 static Int attach(UInt16 rprocId);
117 static Int detach(UInt16 rprocId);
118 static void *rpmsgThreadFxn(void *arg);
119 static Int transportGet(int sock, MessageQ_Msg *retMsg);
120 static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
121                                Int fd,
122                                UInt16 qIndex);
123 static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
124 static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
126 Int TransportRpmsg_Factory_create(Void);
127 Void TransportRpmsg_Factory_delete(Void);
129 Ipc_TransportFactoryFxns TransportRpmsg_Factory = {
130     .createFxn = TransportRpmsg_Factory_create,
131     .deleteFxn = TransportRpmsg_Factory_delete
132 };
134 /* -------------------------------------------------------------------------- */
136 /* instance convertors */
137 IMessageQTransport_Handle TransportRpmsg_upCast(TransportRpmsg_Handle handle)
139     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
140     return ((IMessageQTransport_Handle)&obj->base);
143 TransportRpmsg_Handle TransportRpmsg_downCast(IMessageQTransport_Handle base)
145     return ((TransportRpmsg_Handle)base);
148 TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params,
149                                             Int *attachStatus)
151     TransportRpmsg_Object *obj;
152     Int rv;
154     rv = attach(params->rprocId);
155     if (attachStatus) {
156         *attachStatus = rv;
157     }
159     if (rv != MessageQ_S_SUCCESS) {
160         return NULL;
161     }
163     obj = calloc(1, sizeof (TransportRpmsg_Object));
165     /* structure copy */
166     obj->base.base.interfaceType = IMessageQTransport_TypeId;
167     obj->base.fxns = &TransportRpmsg_fxns;
168     obj->rprocId = params->rprocId;
170     obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof (Int));
171     obj->numQueues = TransportRpmsg_GROWSIZE;
173     return (TransportRpmsg_Handle)obj;
176 Void TransportRpmsg_delete(TransportRpmsg_Handle *handlep)
178     TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)handlep;
180     detach(obj->rprocId);
182     free(obj->qIndexToFd);
183     free(obj);
185     *handlep = NULL;
188 static Int attach(UInt16 rprocId)
190     Int     status = MessageQ_S_SUCCESS;
191     int     sock;
192     UInt16  clusterId;
195     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
197     /* Create the socket for sending messages to the remote proc: */
198     sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
199     if (sock < 0) {
200         status = MessageQ_E_FAIL;
201         printf("attach: socket failed: %d (%s)\n",
202                errno, strerror(errno));
204         goto exit;
205     }
207     PRINTVERBOSE1("attach: created send socket: %d\n", sock)
209     /* Attempt to connect: */
210     status = ConnectSocket(sock, rprocId, MESSAGEQ_RPMSG_PORT);
211     if (status < 0) {
212         /* is it ok to "borrow" this error code from MessageQ? */
213         status = MessageQ_E_RESOURCE;
215         /* don't hard-printf or exit since this is no longer fatal */
216         PRINTVERBOSE1("attach: ConnectSocket(rprocId:%d) failed\n", rprocId)
218         goto exitSock;
219     }
221     TransportRpmsg_module->sock[clusterId] = sock;
223     if (TransportRpmsg_module->threadStarted == FALSE) {
224         /* create a module wide event to unblock the socket select thread */
225         TransportRpmsg_module->unblockEvent = eventfd(0, 0);
226         if (TransportRpmsg_module->unblockEvent == -1) {
227             printf("attach: unblock socket failed: %d (%s)\n",
228                    errno, strerror(errno));
229             status = MessageQ_E_FAIL;
231             goto exitSock;
232         }
234         PRINTVERBOSE1("attach: created unblock event %d\n",
235                       TransportRpmsg_module->unblockEvent)
237         FD_ZERO(&TransportRpmsg_module->rfds);
238         FD_SET(TransportRpmsg_module->unblockEvent,
239                &TransportRpmsg_module->rfds);
240         TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
241         TransportRpmsg_module->nInFds = 0;
243         pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
245         status = pthread_create(&TransportRpmsg_module->threadId, NULL,
246                                 &rpmsgThreadFxn, NULL);
247         if (status < 0) {
248             status = MessageQ_E_FAIL;
249             printf("attach: failed to spawn thread\n");
251             goto exitEvent;
252         }
253         else {
254             TransportRpmsg_module->threadStarted = TRUE;
255         }
256     }
258     goto exit;
260 exitEvent:
261     close(TransportRpmsg_module->unblockEvent);
263     FD_ZERO(&TransportRpmsg_module->rfds);
264     TransportRpmsg_module->maxFd = 0;
266 exitSock:
267     close(sock);
268     TransportRpmsg_module->sock[clusterId] = 0;
270 exit:
271     return status;
274 static Int detach(UInt16 rprocId)
277     Int     status = -1;
278     int     sock;
279     UInt16  clusterId;
281     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
282     sock = TransportRpmsg_module->sock[clusterId];
284     if (sock) {
285         PRINTVERBOSE1("detach: closing socket: %d\n", sock)
287         status = close(sock);
288     }
290     return status;
293 Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
295     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
296     UInt16   queueIndex = queueId & 0x0000ffff;
297     int      fd;
298     int      err;
299     uint64_t buf;
300     UInt16   rprocId;
302     rprocId = obj->rprocId;
304     PRINTVERBOSE2("TransportRpmsg_bind: creating endpoint for rprocId %d "
305             "queueIndex %d\n", rprocId, queueIndex)
307     /*  Create the socket to receive messages for this messageQ. */
308     fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
309     if (fd < 0) {
310         printf("TransportRpmsg_bind: socket call failed: %d (%s)\n",
311                 errno, strerror(errno));
312         goto exitClose;
313     }
315     PRINTVERBOSE1("TransportRpmsg_bind: created socket fd %d\n", fd)
317     err = SocketBindAddr(fd, rprocId, (UInt32)queueIndex);
318     if (err < 0) {
319         /* don't hard-printf since this is no longer fatal */
320         PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
321                       errno, strerror(errno))
323         close(fd);
325         return -1;
326     }
328     pthread_mutex_lock(&TransportRpmsg_module->gate);
330     /* add to our fat fd array and update select() parameters */
331     TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++] = fd;
332     TransportRpmsg_module->maxFd = MAX(TransportRpmsg_module->maxFd, fd);
333     FD_SET(fd, &TransportRpmsg_module->rfds);
335     pthread_mutex_unlock(&TransportRpmsg_module->gate);
337     bindFdToQueueIndex(obj, fd, queueIndex);
339     /*
340      * Even though we use the unblock event as just a signalling event with
341      * no related payload, we need to write some non-zero value.  Might as
342      * well make it the fd (which the reader could decide to use if needed).
343      */
344     buf = fd;
345     write(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
347     goto exit;
349 exitClose:
350     TransportRpmsg_unbind(handle, fd);
351     fd = 0;
353 exit:
354     return fd;
357 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
359     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
360     UInt16 queueIndex = queueId & 0x0000ffff;
361     uint64_t buf;
362     Int    status = MessageQ_S_SUCCESS;
363     int    maxFd;
364     int    fd;
365     int    i;
366     int    j;
368     fd = queueIndexToFd(obj, queueIndex);
369     if (!fd) {
370         PRINTVERBOSE1("TransportRpmsg_unbind: queueId 0x%x not bound\n",
371                       queueId)
373         return -1;
374     }
376     PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
378     pthread_mutex_lock(&TransportRpmsg_module->gate);
380     /* remove from input fd array */
381     for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
382         if (TransportRpmsg_module->inFds[i] == fd) {
383             TransportRpmsg_module->nInFds--;
385             /* shift subsequent elements down */
386             for (j = i; j < TransportRpmsg_module->nInFds; j++) {
387                 TransportRpmsg_module->inFds[j] =
388                     TransportRpmsg_module->inFds[j + 1];
389             }
390             TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds] = 0;
392             FD_CLR(fd, &TransportRpmsg_module->rfds);
393             if (fd == TransportRpmsg_module->maxFd) {
394                 /* find new max fd */
395                 maxFd = TransportRpmsg_module->unblockEvent;
396                 for (j = 0; j < TransportRpmsg_module->nInFds; j++) {
397                     maxFd = MAX(TransportRpmsg_module->inFds[j], maxFd);
398                 }
399                 TransportRpmsg_module->maxFd = maxFd;
400             }
402             /*
403              * Even though we use the unblock event as just a signalling
404              * event with no related payload, we need to write some non-zero
405              * value.  Might as well make it the fd (which the reader could
406              * decide to use if needed).
407              */
408             buf = fd;
409             write(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
411             break;
412         }
414         close(fd);
415     }
417     unbindQueueIndex(obj, queueIndex);
419     pthread_mutex_unlock(&TransportRpmsg_module->gate);
421     return status;
424 Bool TransportRpmsg_put(Void *handle, Ptr pmsg)
426     MessageQ_Msg msg  = (MessageQ_Msg)pmsg;
427     Int     status    = TRUE;
428     int     sock;
429     int     err;
430     UInt16  clusterId;
432     /*
433      * Retrieve the socket for the AF_SYSLINK protocol associated with this
434      * transport.
435      */
436     clusterId = msg->dstProc - MultiProc_getBaseIdOfCluster();
437     sock = TransportRpmsg_module->sock[clusterId];
438     if (!sock) {
439         return FALSE;
440     }
442     PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
444     err = send(sock, msg, msg->msgSize, 0);
445     if (err < 0) {
446         printf("TransportRpmsg_put: send failed: %d (%s)\n",
447                errno, strerror(errno));
448         status = FALSE;
450         goto exit;
451     }
453     /*
454      * Free the message, as this is a copy transport, we maintain MessageQ
455      * semantics.
456      */
457     MessageQ_free(msg);
459 exit:
460     return status;
463 Bool TransportRpmsg_control(Void *handle, UInt cmd, UArg cmdArg)
465     return FALSE;
468 void *rpmsgThreadFxn(void *arg)
470     static int lastFdx = 0;
471     int      curFdx = 0;
472     Int      status = MessageQ_S_SUCCESS;
473     Int      tmpStatus;
474     int      retval;
475     uint64_t buf;
476     fd_set   rfds;
477     int      maxFd;
478     int      nfds;
479     MessageQ_Msg     retMsg;
480     MessageQ_QueueId queueId;
482     while (TRUE) {
483         pthread_mutex_lock(&TransportRpmsg_module->gate);
485         maxFd = TransportRpmsg_module->maxFd;
486         rfds = TransportRpmsg_module->rfds;
487         nfds = TransportRpmsg_module->nInFds;
489         pthread_mutex_unlock(&TransportRpmsg_module->gate);
491         PRINTVERBOSE3("rpmsgThreadFxn: maxFd %d rfds[1:0] 0x%08x%08x\n", maxFd,
492                       (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
494         retval = select(maxFd + 1, &rfds, NULL, NULL, NULL);
495         if (retval) {
496             if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
497                 /*
498                  * Our event was signalled by TransportRpmsg_bind()
499                  * or TransportRpmsg_unbind() to tell us that the set of
500                  * fds has changed.
501                  */
502                 PRINTVERBOSE0("rpmsgThreadFxn: got unblock event\n")
504                 /* we don't need the written value */
505                 read(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
506             }
507             else {
508                 /* start where we last left off */
509                 curFdx = lastFdx;
511                 /*
512                  * The set of fds that's used by select has been recorded
513                  * locally, but the array of fds that are scanned below is
514                  * a changing set (MessageQ_create/delete() can change it).
515                  * While this might present an issue in itself, one key
516                  * takeaway is that 'nfds' must not be zero else the % below
517                  * will cause a divide-by-zero exception.  We won't even get
518                  * here if nfds == 0 since it's a local copy of the module's
519                  * 'nInFds' which has to be > 0 for us to get here.  So, even
520                  * though the module's 'nInFds' might go to 0 during this loop,
521                  * the loop itself will still remain intact.
522                  */
523                 do {
524                     if (FD_ISSET(TransportRpmsg_module->inFds[curFdx], &rfds)) {
526                         PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
527                                       TransportRpmsg_module->inFds[curFdx])
529                         /* transport input fd was signalled: get the message */
530                         tmpStatus = transportGet(
531                             TransportRpmsg_module->inFds[curFdx], &retMsg);
532                         if (tmpStatus < 0) {
533                             printf("rpmsgThreadFxn: transportGet failed.");
534                             status = MessageQ_E_FAIL;
535                         }
536                         else {
537                             queueId = MessageQ_getDstQueue(retMsg);
539                             PRINTVERBOSE1("rpmsgThreadFxn: got message, "
540                                     "delivering to queueId 0x%x\n", queueId)
542                             MessageQ_put(queueId, retMsg);
543                         }
545                         lastFdx = (curFdx + 1) % nfds;
547                         break;
548                     }
550                     curFdx = (curFdx + 1) % nfds;
551                 } while (curFdx != lastFdx);
552             }
553         }
554     }
556     return (void *)status;
559 /*
560  * ======== transportGet ========
561  *  Retrieve a message waiting in the socket's queue.
562 */
563 static Int transportGet(int sock, MessageQ_Msg *retMsg)
565     Int           status    = MessageQ_S_SUCCESS;
566     MessageQ_Msg  msg;
567     struct sockaddr_rpmsg fromAddr;  /* [Socket address of sender] */
568     unsigned int  len;
569     int           byteCount;
571     /*
572      * We have no way of peeking to see what message size we'll get, so we
573      * allocate a message of max size to receive contents from the rpmsg socket
574      * (currently, a copy transport)
575      */
576     msg = MessageQ_alloc(0, MESSAGEQ_RPMSG_MAXSIZE);
577     if (!msg) {
578         status = MessageQ_E_MEMORY;
579         goto exit;
580     }
582     memset(&fromAddr, 0, sizeof (fromAddr));
583     len = sizeof (fromAddr);
585     byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
586                          (struct sockaddr *)&fromAddr, &len);
587     if (len != sizeof (fromAddr)) {
588         printf("recvfrom: got bad addr len (%d)\n", len);
589         status = MessageQ_E_FAIL;
590         goto exit;
591     }
592     if (byteCount < 0) {
593         printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
594         status = MessageQ_E_FAIL;
595         goto exit;
596     }
597     else {
598          /*
599           * Update the allocated message size (even though this may waste
600           * space when the actual message is smaller than the maximum rpmsg
601           * size, the message will be freed soon anyway, and it avoids an
602           * extra copy).
603           */
604          msg->msgSize = byteCount;
606          /*
607           * If the message received was statically allocated, reset the
608           * heapId, so the app can free it.
609           */
610          if (msg->heapId == MessageQ_STATICMSG)  {
611              msg->heapId = 0;  /* for a copy transport, heap id is 0. */
612          }
613     }
615     PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
616     PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg "
617             "proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
618     PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId,
619             msg->msgSize)
621     *retMsg = msg;
623 exit:
624     return status;
627 Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 qIndex)
629     Int *queues;
630     Int *oldQueues;
631     UInt oldSize;
633     if (qIndex >= obj->numQueues) {
634         PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
635                       qIndex + TransportRpmsg_GROWSIZE)
637         oldSize = obj->numQueues * sizeof (Int);
639         queues = calloc(qIndex + TransportRpmsg_GROWSIZE, sizeof (Int));
640         memcpy(queues, obj->qIndexToFd, oldSize);
642         oldQueues = obj->qIndexToFd;
643         obj->qIndexToFd = queues;
644         obj->numQueues = qIndex + TransportRpmsg_GROWSIZE;
646         free(oldQueues);
647     }
649     obj->qIndexToFd[qIndex] = fd;
652 Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex)
654     obj->qIndexToFd[qIndex] = 0;
657 Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex)
659     return obj->qIndexToFd[qIndex];
662 /*
663  *  ======== TransportRpmsg_Factory_create ========
664  *  Create the transport instances
665  *
666  *  Attach to all remote processors. For now, must attach to
667  *  at least one to tolerate MessageQ_E_RESOURCE failures.
668  *
669  *  This function implements the IPC Factory interface, so it
670  *  returns Ipc status codes.
671  */
672 Int TransportRpmsg_Factory_create(Void)
674     Int     status;
675     Int     attachStatus;
676     Int     i;
677     UInt16  procId;
678     Int32   attachedAny;
679     UInt16  clusterSize;
680     UInt16  clusterBase;
682     TransportRpmsg_Handle      *inst;
683     TransportRpmsg_Handle       transport;
684     TransportRpmsg_Params       params;
685     IMessageQTransport_Handle   iMsgQTrans;
688     status = Ipc_S_SUCCESS;
689     attachedAny = FALSE;
691     /* needed to enumerate processors in cluster */
692     clusterSize = MultiProc_getNumProcsInCluster();
693     clusterBase = MultiProc_getBaseIdOfCluster();
695     /* allocate the instance array */
696     inst = calloc(clusterSize, sizeof(TransportRpmsg_Handle));
698     if (inst == NULL) {
699         printf("Error: TransportRpmsg_Factory_create failed, no memory\n");
700         status = Ipc_E_MEMORY;
701         goto exit;
702     }
704     TransportRpmsg_module->inst = inst;
706     /* create transport instance for all processors in cluster */
707     for (i = 0, procId = clusterBase; i < clusterSize; i++, procId++) {
709         if (MultiProc_self() == procId) {
710             continue;
711         }
713         params.rprocId = procId;
714         transport = TransportRpmsg_create(&params, &attachStatus);
716         if (transport != NULL) {
717             iMsgQTrans = TransportRpmsg_upCast(transport);
718             MessageQ_registerTransport(iMsgQTrans, procId, 0);
719             attachedAny = TRUE;
720         }
721         else {
722             if (attachStatus == MessageQ_E_RESOURCE) {
723                 continue;
724             }
725             printf("TransportRpmsg_Factory_create: failed to attach to "
726                     "procId=%d status=%d\n", procId, attachStatus);
727             status = Ipc_E_FAIL;
728             break;
729         }
731         TransportRpmsg_module->inst[i] = transport;
732     }
734     if (!attachedAny) {
735         status = Ipc_E_FAIL;
736     }
738 exit:
739     return (status);
742 /*
743  *  ======== TransportRpmsg_Factory_delete ========
744  *  Finalize the transport instances
745  */
746 Void TransportRpmsg_Factory_delete(Void)
748     Int     i;
749     UInt16  procId;
750     UInt16  clusterSize;
751     UInt16  clusterBase;
753     /* needed to enumerate processors in cluster */
754     clusterSize = MultiProc_getNumProcsInCluster();
755     clusterBase = MultiProc_getBaseIdOfCluster();
757     /* detach from all remote processors, assuming they are up */
758     for (i = 0, procId = clusterBase; i < clusterSize; i++, procId++) {
760         if (MultiProc_self() == procId) {
761             continue;
762         }
764         if (TransportRpmsg_module->inst[i] != NULL) {
765             MessageQ_unregisterTransport(procId, 0);
766             TransportRpmsg_delete(&(TransportRpmsg_module->inst[i]));
767         }
768     }
770     return;