]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - ipc/ipcdev.git/blob - linux/src/transport/TransportRpmsg.c
Generated makefiles to follow up on the previous commit.
[ipc/ipcdev.git] / linux / src / transport / TransportRpmsg.c
1 /*
2  * Copyright (c) 2014-2015 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  ======== TransportRpmsg.c ========
34  *  Implementation of functions specified in the IMessageQTransport interface.
35  */
37 /* Socket Headers */
38 #include <sys/socket.h>
39 #include <sys/select.h>
40 #include <sys/eventfd.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <unistd.h>
44 #include <errno.h>
45 #include <string.h>
46 #include <fcntl.h>
47 #include <pthread.h>
49 /* Socket Protocol Family */
50 #include <net/rpmsg.h>
53 /* IPC headers */
54 #include <ti/ipc/Std.h>
55 #include <SocketFxns.h>         /* Socket utils: */
56 #include <ti/ipc/Ipc.h>
57 #include <ti/ipc/MessageQ.h>
58 #include <ti/ipc/MultiProc.h>
59 #include <ti/ipc/transports/TransportRpmsg.h>
60 #include <_MessageQ.h>
61 #include <_lad.h>
63 /* More magic rpmsg port numbers: */
64 #define MESSAGEQ_RPMSG_PORT       61
65 #define MESSAGEQ_RPMSG_MAXSIZE   512
67 #define TransportRpmsg_GROWSIZE 32
69 /* traces in this file are controlled via _TransportMessageQ_verbose */
70 Bool _TransportMessageQ_verbose = FALSE;
71 #define verbose _TransportMessageQ_verbose
73 Int TransportRpmsg_bind(Void *handle, UInt32 queueId);
74 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId);
75 Bool TransportRpmsg_put(Void *handle, Ptr msg);
77 typedef struct TransportRpmsg_Module {
78     int             sock[MultiProc_MAXPROCESSORS];
79     fd_set          rfds;
80     int             maxFd;
81     int             inFds[1024];
82     int             nInFds;
83     pthread_mutex_t gate;
84     int             unblockEvent;    /* eventFd for unblocking socket */
85     pthread_t       threadId;        /* ID returned by pthread_create() */
86     Bool            threadStarted;
88     TransportRpmsg_Handle *inst;    /* array of instances */
89 } TransportRpmsg_Module;
91 IMessageQTransport_Fxns TransportRpmsg_fxns = {
92     .bind    = TransportRpmsg_bind,
93     .unbind  = TransportRpmsg_unbind,
94     .put     = TransportRpmsg_put
95 };
97 typedef struct TransportRpmsg_Object {
98     IMessageQTransport_Object base;
99     Int status;
100     UInt16 rprocId;
101     int numQueues;
102     int *qIndexToFd;
103 } TransportRpmsg_Object;
105 TransportRpmsg_Module TransportRpmsg_state = {
106     .sock = {0},
107     .threadStarted = FALSE,
108     .inst = NULL
109 };
110 TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
112 static Int attach(UInt16 rprocId);
113 static Int detach(UInt16 rprocId);
114 static void *rpmsgThreadFxn(void *arg);
115 static Int transportGet(int sock, MessageQ_Msg *retMsg);
116 static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
117                                Int fd,
118                                UInt16 qIndex);
119 static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
120 static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
122 Int TransportRpmsg_Factory_create(Void);
123 Void TransportRpmsg_Factory_delete(Void);
125 Ipc_TransportFactoryFxns TransportRpmsg_Factory = {
126     .createFxn = TransportRpmsg_Factory_create,
127     .deleteFxn = TransportRpmsg_Factory_delete
128 };
130 /* -------------------------------------------------------------------------- */
132 /* instance convertors */
133 IMessageQTransport_Handle TransportRpmsg_upCast(TransportRpmsg_Handle handle)
135     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
136     return ((IMessageQTransport_Handle)&obj->base);
139 TransportRpmsg_Handle TransportRpmsg_downCast(IMessageQTransport_Handle base)
141     return ((TransportRpmsg_Handle)base);
144 TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params,
145                                             Int *attachStatus)
147     TransportRpmsg_Object *obj;
148     Int rv;
150     rv = attach(params->rprocId);
151     if (attachStatus) {
152         *attachStatus = rv;
153     }
155     if (rv != MessageQ_S_SUCCESS) {
156         return NULL;
157     }
159     obj = calloc(1, sizeof (TransportRpmsg_Object));
161     /* structure copy */
162     obj->base.base.interfaceType = IMessageQTransport_TypeId;
163     obj->base.fxns = &TransportRpmsg_fxns;
164     obj->rprocId = params->rprocId;
166     obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof (Int));
167     obj->numQueues = TransportRpmsg_GROWSIZE;
169     return (TransportRpmsg_Handle)obj;
172 Void TransportRpmsg_delete(TransportRpmsg_Handle *handlep)
174     TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)handlep;
176     detach(obj->rprocId);
178     free(obj->qIndexToFd);
179     free(obj);
181     *handlep = NULL;
184 static Int attach(UInt16 rprocId)
186     Int     status = MessageQ_S_SUCCESS;
187     int     sock;
188     UInt16  clusterId;
191     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
193     /* Create the socket for sending messages to the remote proc: */
194     sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
195     if (sock < 0) {
196         status = MessageQ_E_FAIL;
197         printf("attach: socket failed: %d (%s)\n",
198                errno, strerror(errno));
200         goto exit;
201     }
203     PRINTVERBOSE1("attach: created send socket: %d\n", sock)
205     /* Attempt to connect: */
206     status = ConnectSocket(sock, rprocId, MESSAGEQ_RPMSG_PORT);
207     if (status < 0) {
208         /* is it ok to "borrow" this error code from MessageQ? */
209         status = MessageQ_E_RESOURCE;
211         /* don't hard-printf or exit since this is no longer fatal */
212         PRINTVERBOSE1("attach: ConnectSocket(rprocId:%d) failed\n", rprocId)
214         goto exitSock;
215     }
217     TransportRpmsg_module->sock[clusterId] = sock;
219     if (TransportRpmsg_module->threadStarted == FALSE) {
220         /* create a module wide event to unblock the socket select thread */
221         TransportRpmsg_module->unblockEvent = eventfd(0, 0);
222         if (TransportRpmsg_module->unblockEvent == -1) {
223             printf("attach: unblock socket failed: %d (%s)\n",
224                    errno, strerror(errno));
225             status = MessageQ_E_FAIL;
227             goto exitSock;
228         }
230         PRINTVERBOSE1("attach: created unblock event %d\n",
231                       TransportRpmsg_module->unblockEvent)
233         FD_ZERO(&TransportRpmsg_module->rfds);
234         FD_SET(TransportRpmsg_module->unblockEvent,
235                &TransportRpmsg_module->rfds);
236         TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
237         TransportRpmsg_module->nInFds = 0;
239         pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
241         status = pthread_create(&TransportRpmsg_module->threadId, NULL,
242                                 &rpmsgThreadFxn, NULL);
243         if (status < 0) {
244             status = MessageQ_E_FAIL;
245             printf("attach: failed to spawn thread\n");
247             goto exitEvent;
248         }
249         else {
250             TransportRpmsg_module->threadStarted = TRUE;
251         }
252     }
254     goto exit;
256 exitEvent:
257     close(TransportRpmsg_module->unblockEvent);
259     FD_ZERO(&TransportRpmsg_module->rfds);
260     TransportRpmsg_module->maxFd = 0;
262 exitSock:
263     close(sock);
264     TransportRpmsg_module->sock[clusterId] = 0;
266 exit:
267     return status;
270 static Int detach(UInt16 rprocId)
273     Int     status = -1;
274     int     sock;
275     UInt16  clusterId;
277     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
278     sock = TransportRpmsg_module->sock[clusterId];
280     if (sock) {
281         PRINTVERBOSE1("detach: closing socket: %d\n", sock)
283         status = close(sock);
284     }
286     return status;
289 Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
291     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
292     UInt16   queueIndex = queueId & 0x0000ffff;
293     int      fd;
294     int      err;
295     uint64_t buf;
296     UInt16   rprocId;
298     rprocId = obj->rprocId;
300     PRINTVERBOSE2("TransportRpmsg_bind: creating endpoint for rprocId %d "
301             "queueIndex %d\n", rprocId, queueIndex)
303     /*  Create the socket to receive messages for this messageQ. */
304     fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
305     if (fd < 0) {
306         printf("TransportRpmsg_bind: socket call failed: %d (%s)\n",
307                 errno, strerror(errno));
308         goto exitClose;
309     }
311     PRINTVERBOSE1("TransportRpmsg_bind: created socket fd %d\n", fd)
313     err = SocketBindAddr(fd, rprocId, (UInt32)queueIndex);
314     if (err < 0) {
315         /* don't hard-printf since this is no longer fatal */
316         PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
317                       errno, strerror(errno))
319         close(fd);
321         return -1;
322     }
324     pthread_mutex_lock(&TransportRpmsg_module->gate);
326     /* add to our fat fd array and update select() parameters */
327     TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++] = fd;
328     TransportRpmsg_module->maxFd = MAX(TransportRpmsg_module->maxFd, fd);
329     FD_SET(fd, &TransportRpmsg_module->rfds);
331     pthread_mutex_unlock(&TransportRpmsg_module->gate);
333     bindFdToQueueIndex(obj, fd, queueIndex);
335     /*
336      * Even though we use the unblock event as just a signalling event with
337      * no related payload, we need to write some non-zero value.  Might as
338      * well make it the fd (which the reader could decide to use if needed).
339      */
340     buf = fd;
341     write(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
343     goto exit;
345 exitClose:
346     TransportRpmsg_unbind(handle, fd);
347     fd = 0;
349 exit:
350     return fd;
353 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
355     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
356     UInt16 queueIndex = queueId & 0x0000ffff;
357     uint64_t buf;
358     Int    status = MessageQ_S_SUCCESS;
359     int    maxFd;
360     int    fd;
361     int    i;
362     int    j;
364     fd = queueIndexToFd(obj, queueIndex);
365     if (!fd) {
366         PRINTVERBOSE1("TransportRpmsg_unbind: queueId 0x%x not bound\n",
367                       queueId)
369         return -1;
370     }
372     PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
374     pthread_mutex_lock(&TransportRpmsg_module->gate);
376     /* remove from input fd array */
377     for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
378         if (TransportRpmsg_module->inFds[i] == fd) {
379             TransportRpmsg_module->nInFds--;
381             /* shift subsequent elements down */
382             for (j = i; j < TransportRpmsg_module->nInFds; j++) {
383                 TransportRpmsg_module->inFds[j] =
384                     TransportRpmsg_module->inFds[j + 1];
385             }
386             TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds] = 0;
388             FD_CLR(fd, &TransportRpmsg_module->rfds);
389             if (fd == TransportRpmsg_module->maxFd) {
390                 /* find new max fd */
391                 maxFd = TransportRpmsg_module->unblockEvent;
392                 for (j = 0; j < TransportRpmsg_module->nInFds; j++) {
393                     maxFd = MAX(TransportRpmsg_module->inFds[j], maxFd);
394                 }
395                 TransportRpmsg_module->maxFd = maxFd;
396             }
398             /*
399              * Even though we use the unblock event as just a signalling
400              * event with no related payload, we need to write some non-zero
401              * value.  Might as well make it the fd (which the reader could
402              * decide to use if needed).
403              */
404             buf = fd;
405             write(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
407             break;
408         }
410         close(fd);
411     }
413     unbindQueueIndex(obj, queueIndex);
415     pthread_mutex_unlock(&TransportRpmsg_module->gate);
417     return status;
420 Bool TransportRpmsg_put(Void *handle, Ptr pmsg)
422     MessageQ_Msg msg  = (MessageQ_Msg)pmsg;
423     Int     status    = TRUE;
424     int     sock;
425     int     err;
426     UInt16  clusterId;
428     /*
429      * Retrieve the socket for the AF_SYSLINK protocol associated with this
430      * transport.
431      */
432     clusterId = msg->dstProc - MultiProc_getBaseIdOfCluster();
433     sock = TransportRpmsg_module->sock[clusterId];
434     if (!sock) {
435         return FALSE;
436     }
438     PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
440     err = send(sock, msg, msg->msgSize, 0);
441     if (err < 0) {
442         printf("TransportRpmsg_put: send failed: %d (%s)\n",
443                errno, strerror(errno));
444         status = FALSE;
446         goto exit;
447     }
449     /*
450      * Free the message, as this is a copy transport, we maintain MessageQ
451      * semantics.
452      */
453     MessageQ_free(msg);
455 exit:
456     return status;
459 Bool TransportRpmsg_control(Void *handle, UInt cmd, UArg cmdArg)
461     return FALSE;
464 void *rpmsgThreadFxn(void *arg)
466     static int lastFdx = 0;
467     int      curFdx = 0;
468     Int      status = MessageQ_S_SUCCESS;
469     Int      tmpStatus;
470     int      retval;
471     uint64_t buf;
472     fd_set   rfds;
473     int      maxFd;
474     int      nfds;
475     MessageQ_Msg     retMsg;
476     MessageQ_QueueId queueId;
478     while (TRUE) {
479         pthread_mutex_lock(&TransportRpmsg_module->gate);
481         maxFd = TransportRpmsg_module->maxFd;
482         rfds = TransportRpmsg_module->rfds;
483         nfds = TransportRpmsg_module->nInFds;
485         pthread_mutex_unlock(&TransportRpmsg_module->gate);
487         PRINTVERBOSE3("rpmsgThreadFxn: maxFd %d rfds[1:0] 0x%08x%08x\n", maxFd,
488                       (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
490         retval = select(maxFd + 1, &rfds, NULL, NULL, NULL);
491         if (retval) {
492             if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
493                 /*
494                  * Our event was signalled by TransportRpmsg_bind()
495                  * or TransportRpmsg_unbind() to tell us that the set of
496                  * fds has changed.
497                  */
498                 PRINTVERBOSE0("rpmsgThreadFxn: got unblock event\n")
500                 /* we don't need the written value */
501                 read(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
502             }
503             else {
504                 /* start where we last left off */
505                 curFdx = lastFdx;
507                 /*
508                  * The set of fds that's used by select has been recorded
509                  * locally, but the array of fds that are scanned below is
510                  * a changing set (MessageQ_create/delete() can change it).
511                  * While this might present an issue in itself, one key
512                  * takeaway is that 'nfds' must not be zero else the % below
513                  * will cause a divide-by-zero exception.  We won't even get
514                  * here if nfds == 0 since it's a local copy of the module's
515                  * 'nInFds' which has to be > 0 for us to get here.  So, even
516                  * though the module's 'nInFds' might go to 0 during this loop,
517                  * the loop itself will still remain intact.
518                  */
519                 do {
520                     if (FD_ISSET(TransportRpmsg_module->inFds[curFdx], &rfds)) {
522                         PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
523                                       TransportRpmsg_module->inFds[curFdx])
525                         /* transport input fd was signalled: get the message */
526                         tmpStatus = transportGet(
527                             TransportRpmsg_module->inFds[curFdx], &retMsg);
528                         if (tmpStatus < 0) {
529                             printf("rpmsgThreadFxn: transportGet failed.");
530                             status = MessageQ_E_FAIL;
531                         }
532                         else {
533                             queueId = MessageQ_getDstQueue(retMsg);
535                             PRINTVERBOSE1("rpmsgThreadFxn: got message, "
536                                     "delivering to queueId 0x%x\n", queueId)
538                             MessageQ_put(queueId, retMsg);
539                         }
541                         lastFdx = (curFdx + 1) % nfds;
543                         break;
544                     }
546                     curFdx = (curFdx + 1) % nfds;
547                 } while (curFdx != lastFdx);
548             }
549         }
550     }
552     return (void *)status;
555 /*
556  * ======== transportGet ========
557  *  Retrieve a message waiting in the socket's queue.
558 */
559 static Int transportGet(int sock, MessageQ_Msg *retMsg)
561     Int           status    = MessageQ_S_SUCCESS;
562     MessageQ_Msg  msg;
563     struct sockaddr_rpmsg fromAddr;  /* [Socket address of sender] */
564     unsigned int  len;
565     int           byteCount;
567     /*
568      * We have no way of peeking to see what message size we'll get, so we
569      * allocate a message of max size to receive contents from the rpmsg socket
570      * (currently, a copy transport)
571      */
572     msg = MessageQ_alloc(0, MESSAGEQ_RPMSG_MAXSIZE);
573     if (!msg) {
574         status = MessageQ_E_MEMORY;
575         goto exit;
576     }
578     memset(&fromAddr, 0, sizeof (fromAddr));
579     len = sizeof (fromAddr);
581     byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
582                          (struct sockaddr *)&fromAddr, &len);
583     if (len != sizeof (fromAddr)) {
584         printf("recvfrom: got bad addr len (%d)\n", len);
585         status = MessageQ_E_FAIL;
586         goto exit;
587     }
588     if (byteCount < 0) {
589         printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
590         status = MessageQ_E_FAIL;
591         goto exit;
592     }
593     else {
594          /*
595           * Update the allocated message size (even though this may waste
596           * space when the actual message is smaller than the maximum rpmsg
597           * size, the message will be freed soon anyway, and it avoids an
598           * extra copy).
599           */
600          msg->msgSize = byteCount;
602          /*
603           * If the message received was statically allocated, reset the
604           * heapId, so the app can free it.
605           */
606          if (msg->heapId == MessageQ_STATICMSG)  {
607              msg->heapId = 0;  /* for a copy transport, heap id is 0. */
608          }
609     }
611     PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
612     PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg "
613             "proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
614     PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId,
615             msg->msgSize)
617     *retMsg = msg;
619 exit:
620     return status;
623 Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 qIndex)
625     Int *queues;
626     Int *oldQueues;
627     UInt oldSize;
629     if (qIndex >= obj->numQueues) {
630         PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
631                       qIndex + TransportRpmsg_GROWSIZE)
633         oldSize = obj->numQueues * sizeof (Int);
635         queues = calloc(qIndex + TransportRpmsg_GROWSIZE, sizeof (Int));
636         memcpy(queues, obj->qIndexToFd, oldSize);
638         oldQueues = obj->qIndexToFd;
639         obj->qIndexToFd = queues;
640         obj->numQueues = qIndex + TransportRpmsg_GROWSIZE;
642         free(oldQueues);
643     }
645     obj->qIndexToFd[qIndex] = fd;
648 Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex)
650     obj->qIndexToFd[qIndex] = 0;
653 Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex)
655     return obj->qIndexToFd[qIndex];
658 /*
659  *  ======== TransportRpmsg_Factory_create ========
660  *  Create the transport instances
661  *
662  *  Attach to all remote processors. For now, must attach to
663  *  at least one to tolerate MessageQ_E_RESOURCE failures.
664  *
665  *  This function implements the IPC Factory interface, so it
666  *  returns Ipc status codes.
667  */
668 Int TransportRpmsg_Factory_create(Void)
670     Int     status;
671     Int     attachStatus;
672     Int     i;
673     UInt16  procId;
674     Int32   attachedAny;
675     UInt16  clusterSize;
676     UInt16  clusterBase;
678     TransportRpmsg_Handle      *inst;
679     TransportRpmsg_Handle       transport;
680     TransportRpmsg_Params       params;
681     IMessageQTransport_Handle   iMsgQTrans;
684     status = Ipc_S_SUCCESS;
685     attachedAny = FALSE;
687     /* needed to enumerate processors in cluster */
688     clusterSize = MultiProc_getNumProcsInCluster();
689     clusterBase = MultiProc_getBaseIdOfCluster();
691     /* allocate the instance array */
692     inst = calloc(clusterSize, sizeof(TransportRpmsg_Handle));
694     if (inst == NULL) {
695         printf("Error: TransportRpmsg_Factory_create failed, no memory\n");
696         status = Ipc_E_MEMORY;
697         goto exit;
698     }
700     TransportRpmsg_module->inst = inst;
702     /* create transport instance for all processors in cluster */
703     for (i = 0, procId = clusterBase; i < clusterSize; i++, procId++) {
705         if (MultiProc_self() == procId) {
706             continue;
707         }
709         params.rprocId = procId;
710         transport = TransportRpmsg_create(&params, &attachStatus);
712         if (transport != NULL) {
713             iMsgQTrans = TransportRpmsg_upCast(transport);
714             MessageQ_registerTransport(iMsgQTrans, procId, 0);
715             attachedAny = TRUE;
716         }
717         else {
718             if (attachStatus == MessageQ_E_RESOURCE) {
719                 continue;
720             }
721             printf("TransportRpmsg_Factory_create: failed to attach to "
722                     "procId=%d status=%d\n", procId, attachStatus);
723             status = Ipc_E_FAIL;
724             break;
725         }
727         TransportRpmsg_module->inst[i] = transport;
728     }
730     if (!attachedAny) {
731         status = Ipc_E_FAIL;
732     }
734 exit:
735     return (status);
738 /*
739  *  ======== TransportRpmsg_Factory_delete ========
740  *  Finalize the transport instances
741  */
742 Void TransportRpmsg_Factory_delete(Void)
744     Int     i;
745     UInt16  procId;
746     UInt16  clusterSize;
747     UInt16  clusterBase;
749     /* needed to enumerate processors in cluster */
750     clusterSize = MultiProc_getNumProcsInCluster();
751     clusterBase = MultiProc_getBaseIdOfCluster();
753     /* detach from all remote processors, assuming they are up */
754     for (i = 0, procId = clusterBase; i < clusterSize; i++, procId++) {
756         if (MultiProc_self() == procId) {
757             continue;
758         }
760         if (TransportRpmsg_module->inst[i] != NULL) {
761             MessageQ_unregisterTransport(procId, 0);
762             TransportRpmsg_delete(&(TransportRpmsg_module->inst[i]));
763         }
764     }
766     return;