Remove MAX #define from Linux Std.h
[ipc/ipcdev.git] / linux / src / transport / TransportRpmsg.c
1 /*
2  * Copyright (c) 2014-2015 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  ======== TransportRpmsg.c ========
34  *  Implementation of functions specified in the IMessageQTransport interface.
35  */
37 /* Socket Headers */
38 #include <sys/socket.h>
39 #include <sys/select.h>
40 #include <sys/eventfd.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <unistd.h>
44 #include <errno.h>
45 #include <string.h>
46 #include <fcntl.h>
47 #include <pthread.h>
49 /* Socket Protocol Family */
50 #include <net/rpmsg.h>
53 /* IPC headers */
54 #include <ti/ipc/Std.h>
55 #include <SocketFxns.h>         /* Socket utils: */
56 #include <ti/ipc/Ipc.h>
57 #include <ti/ipc/MessageQ.h>
58 #include <ti/ipc/MultiProc.h>
59 #include <ti/ipc/transports/TransportRpmsg.h>
60 #include <_MessageQ.h>
61 #include <_lad.h>
63 /* More magic rpmsg port numbers: */
64 #define MESSAGEQ_RPMSG_PORT       61
65 #define MESSAGEQ_RPMSG_MAXSIZE   512
67 #define TransportRpmsg_GROWSIZE 32
69 #define _MAX(a,b) (((a)>(b))?(a):(b))
71 /* traces in this file are controlled via _TransportMessageQ_verbose */
72 Bool _TransportMessageQ_verbose = FALSE;
73 #define verbose _TransportMessageQ_verbose
75 Int TransportRpmsg_bind(Void *handle, UInt32 queueId);
76 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId);
77 Bool TransportRpmsg_put(Void *handle, Ptr msg);
79 typedef struct TransportRpmsg_Module {
80     int             sock[MultiProc_MAXPROCESSORS];
81     fd_set          rfds;
82     int             maxFd;
83     int             inFds[1024];
84     int             nInFds;
85     pthread_mutex_t gate;
86     int             unblockEvent;    /* eventFd for unblocking socket */
87     pthread_t       threadId;        /* ID returned by pthread_create() */
88     Bool            threadStarted;
90     TransportRpmsg_Handle *inst;    /* array of instances */
91 } TransportRpmsg_Module;
93 IMessageQTransport_Fxns TransportRpmsg_fxns = {
94     .bind    = TransportRpmsg_bind,
95     .unbind  = TransportRpmsg_unbind,
96     .put     = TransportRpmsg_put
97 };
99 typedef struct TransportRpmsg_Object {
100     IMessageQTransport_Object base;
101     Int status;
102     UInt16 rprocId;
103     int numQueues;
104     int *qIndexToFd;
105 } TransportRpmsg_Object;
107 TransportRpmsg_Module TransportRpmsg_state = {
108     .sock = {0},
109     .threadStarted = FALSE,
110     .inst = NULL
111 };
112 TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
114 static Int attach(UInt16 rprocId);
115 static Int detach(UInt16 rprocId);
116 static void *rpmsgThreadFxn(void *arg);
117 static Int transportGet(int sock, MessageQ_Msg *retMsg);
118 static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
119                                Int fd,
120                                UInt16 qIndex);
121 static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
122 static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
124 Int TransportRpmsg_Factory_create(Void);
125 Void TransportRpmsg_Factory_delete(Void);
127 Ipc_TransportFactoryFxns TransportRpmsg_Factory = {
128     .createFxn = TransportRpmsg_Factory_create,
129     .deleteFxn = TransportRpmsg_Factory_delete
130 };
132 /* -------------------------------------------------------------------------- */
134 /* instance convertors */
135 IMessageQTransport_Handle TransportRpmsg_upCast(TransportRpmsg_Handle handle)
137     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
138     return ((IMessageQTransport_Handle)&obj->base);
141 TransportRpmsg_Handle TransportRpmsg_downCast(IMessageQTransport_Handle base)
143     return ((TransportRpmsg_Handle)base);
146 TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params,
147                                             Int *attachStatus)
149     TransportRpmsg_Object *obj;
150     Int rv;
152     rv = attach(params->rprocId);
153     if (attachStatus) {
154         *attachStatus = rv;
155     }
157     if (rv != MessageQ_S_SUCCESS) {
158         return NULL;
159     }
161     obj = calloc(1, sizeof (TransportRpmsg_Object));
163     /* structure copy */
164     obj->base.base.interfaceType = IMessageQTransport_TypeId;
165     obj->base.fxns = &TransportRpmsg_fxns;
166     obj->rprocId = params->rprocId;
168     obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof (Int));
169     obj->numQueues = TransportRpmsg_GROWSIZE;
171     return (TransportRpmsg_Handle)obj;
174 Void TransportRpmsg_delete(TransportRpmsg_Handle *handlep)
176     TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)handlep;
178     detach(obj->rprocId);
180     free(obj->qIndexToFd);
181     free(obj);
183     *handlep = NULL;
186 static Int attach(UInt16 rprocId)
188     Int     status = MessageQ_S_SUCCESS;
189     int     sock;
190     UInt16  clusterId;
193     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
195     /* Create the socket for sending messages to the remote proc: */
196     sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
197     if (sock < 0) {
198         status = MessageQ_E_FAIL;
199         printf("attach: socket failed: %d (%s)\n",
200                errno, strerror(errno));
202         goto exit;
203     }
205     PRINTVERBOSE1("attach: created send socket: %d\n", sock)
207     /* Attempt to connect: */
208     status = ConnectSocket(sock, rprocId, MESSAGEQ_RPMSG_PORT);
209     if (status < 0) {
210         /* is it ok to "borrow" this error code from MessageQ? */
211         status = MessageQ_E_RESOURCE;
213         /* don't hard-printf or exit since this is no longer fatal */
214         PRINTVERBOSE1("attach: ConnectSocket(rprocId:%d) failed\n", rprocId)
216         goto exitSock;
217     }
219     TransportRpmsg_module->sock[clusterId] = sock;
221     if (TransportRpmsg_module->threadStarted == FALSE) {
222         /* create a module wide event to unblock the socket select thread */
223         TransportRpmsg_module->unblockEvent = eventfd(0, 0);
224         if (TransportRpmsg_module->unblockEvent == -1) {
225             printf("attach: unblock socket failed: %d (%s)\n",
226                    errno, strerror(errno));
227             status = MessageQ_E_FAIL;
229             goto exitSock;
230         }
232         PRINTVERBOSE1("attach: created unblock event %d\n",
233                       TransportRpmsg_module->unblockEvent)
235         FD_ZERO(&TransportRpmsg_module->rfds);
236         FD_SET(TransportRpmsg_module->unblockEvent,
237                &TransportRpmsg_module->rfds);
238         TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
239         TransportRpmsg_module->nInFds = 0;
241         pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
243         status = pthread_create(&TransportRpmsg_module->threadId, NULL,
244                                 &rpmsgThreadFxn, NULL);
245         if (status < 0) {
246             status = MessageQ_E_FAIL;
247             printf("attach: failed to spawn thread\n");
249             goto exitEvent;
250         }
251         else {
252             TransportRpmsg_module->threadStarted = TRUE;
253         }
254     }
256     goto exit;
258 exitEvent:
259     close(TransportRpmsg_module->unblockEvent);
261     FD_ZERO(&TransportRpmsg_module->rfds);
262     TransportRpmsg_module->maxFd = 0;
264 exitSock:
265     close(sock);
266     TransportRpmsg_module->sock[clusterId] = 0;
268 exit:
269     return status;
272 static Int detach(UInt16 rprocId)
275     Int     status = -1;
276     int     sock;
277     UInt16  clusterId;
279     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
280     sock = TransportRpmsg_module->sock[clusterId];
282     if (sock) {
283         PRINTVERBOSE1("detach: closing socket: %d\n", sock)
285         status = close(sock);
286     }
288     return status;
291 Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
293     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
294     UInt16   queuePort = queueId & 0x0000ffff;
295     int      fd;
296     int      err;
297     uint64_t buf;
298     UInt16   rprocId;
300     rprocId = obj->rprocId;
302     PRINTVERBOSE2("TransportRpmsg_bind: creating endpoint for rprocId %d "
303             "queuePort 0x%x\n", rprocId, queuePort)
305     /*  Create the socket to receive messages for this messageQ. */
306     fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
307     if (fd < 0) {
308         printf("TransportRpmsg_bind: socket call failed: %d (%s)\n",
309                 errno, strerror(errno));
310         goto exitClose;
311     }
313     PRINTVERBOSE1("TransportRpmsg_bind: created socket fd %d\n", fd)
315     err = SocketBindAddr(fd, rprocId, (UInt32)queuePort);
316     if (err < 0) {
317         /* don't hard-printf since this is no longer fatal */
318         PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
319                       errno, strerror(errno))
321         close(fd);
323         return -1;
324     }
326     pthread_mutex_lock(&TransportRpmsg_module->gate);
328     /* add to our fat fd array and update select() parameters */
329     TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++] = fd;
330     TransportRpmsg_module->maxFd = _MAX(TransportRpmsg_module->maxFd, fd);
331     FD_SET(fd, &TransportRpmsg_module->rfds);
333     pthread_mutex_unlock(&TransportRpmsg_module->gate);
335     bindFdToQueueIndex(obj, fd, queuePort);
337     /*
338      * Even though we use the unblock event as just a signalling event with
339      * no related payload, we need to write some non-zero value.  Might as
340      * well make it the fd (which the reader could decide to use if needed).
341      */
342     buf = fd;
343     write(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
345     goto exit;
347 exitClose:
348     TransportRpmsg_unbind(handle, fd);
349     fd = 0;
351 exit:
352     return fd;
355 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
357     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
358     UInt16 queuePort = queueId & 0x0000ffff;
359     uint64_t buf;
360     Int    status = MessageQ_S_SUCCESS;
361     int    maxFd;
362     int    fd;
363     int    i;
364     int    j;
366     fd = queueIndexToFd(obj, queuePort);
367     if (!fd) {
368         PRINTVERBOSE1("TransportRpmsg_unbind: queueId 0x%x not bound\n",
369                       queueId)
371         return -1;
372     }
374     PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
376     pthread_mutex_lock(&TransportRpmsg_module->gate);
378     /* remove from input fd array */
379     for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
380         if (TransportRpmsg_module->inFds[i] == fd) {
381             TransportRpmsg_module->nInFds--;
383             /* shift subsequent elements down */
384             for (j = i; j < TransportRpmsg_module->nInFds; j++) {
385                 TransportRpmsg_module->inFds[j] =
386                     TransportRpmsg_module->inFds[j + 1];
387             }
388             TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds] = 0;
390             FD_CLR(fd, &TransportRpmsg_module->rfds);
391             if (fd == TransportRpmsg_module->maxFd) {
392                 /* find new max fd */
393                 maxFd = TransportRpmsg_module->unblockEvent;
394                 for (j = 0; j < TransportRpmsg_module->nInFds; j++) {
395                     maxFd = _MAX(TransportRpmsg_module->inFds[j], maxFd);
396                 }
397                 TransportRpmsg_module->maxFd = maxFd;
398             }
400             /*
401              * Even though we use the unblock event as just a signalling
402              * event with no related payload, we need to write some non-zero
403              * value.  Might as well make it the fd (which the reader could
404              * decide to use if needed).
405              */
406             buf = fd;
407             write(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
409             break;
410         }
412         close(fd);
413     }
415     unbindQueueIndex(obj, queuePort);
417     pthread_mutex_unlock(&TransportRpmsg_module->gate);
419     return status;
422 Bool TransportRpmsg_put(Void *handle, Ptr pmsg)
424     MessageQ_Msg msg  = (MessageQ_Msg)pmsg;
425     Int     status    = TRUE;
426     int     sock;
427     int     err;
428     UInt16  clusterId;
430     /*
431      * Retrieve the socket for the AF_SYSLINK protocol associated with this
432      * transport.
433      */
434     clusterId = msg->dstProc - MultiProc_getBaseIdOfCluster();
435     sock = TransportRpmsg_module->sock[clusterId];
436     if (!sock) {
437         return FALSE;
438     }
440     PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
442     err = send(sock, msg, msg->msgSize, 0);
443     if (err < 0) {
444         printf("TransportRpmsg_put: send failed: %d (%s)\n",
445                errno, strerror(errno));
446         status = FALSE;
448         goto exit;
449     }
451     /*
452      * Free the message, as this is a copy transport, we maintain MessageQ
453      * semantics.
454      */
455     MessageQ_free(msg);
457 exit:
458     return status;
461 Bool TransportRpmsg_control(Void *handle, UInt cmd, UArg cmdArg)
463     return FALSE;
466 void *rpmsgThreadFxn(void *arg)
468     static int lastFdx = 0;
469     int      curFdx = 0;
470     Int      status = MessageQ_S_SUCCESS;
471     Int      tmpStatus;
472     int      retval;
473     uint64_t buf;
474     fd_set   rfds;
475     int      maxFd;
476     int      nfds;
477     MessageQ_Msg     retMsg;
478     MessageQ_QueueId queueId;
480     while (TRUE) {
481         pthread_mutex_lock(&TransportRpmsg_module->gate);
483         maxFd = TransportRpmsg_module->maxFd;
484         rfds = TransportRpmsg_module->rfds;
485         nfds = TransportRpmsg_module->nInFds;
487         pthread_mutex_unlock(&TransportRpmsg_module->gate);
489         PRINTVERBOSE3("rpmsgThreadFxn: maxFd %d rfds[1:0] 0x%08x%08x\n", maxFd,
490                       (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
492         retval = select(maxFd + 1, &rfds, NULL, NULL, NULL);
493         if (retval) {
494             if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
495                 /*
496                  * Our event was signalled by TransportRpmsg_bind()
497                  * or TransportRpmsg_unbind() to tell us that the set of
498                  * fds has changed.
499                  */
500                 PRINTVERBOSE0("rpmsgThreadFxn: got unblock event\n")
502                 /* we don't need the written value */
503                 read(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
504             }
505             else {
506                 /* start where we last left off */
507                 curFdx = lastFdx;
509                 /*
510                  * The set of fds that's used by select has been recorded
511                  * locally, but the array of fds that are scanned below is
512                  * a changing set (MessageQ_create/delete() can change it).
513                  * While this might present an issue in itself, one key
514                  * takeaway is that 'nfds' must not be zero else the % below
515                  * will cause a divide-by-zero exception.  We won't even get
516                  * here if nfds == 0 since it's a local copy of the module's
517                  * 'nInFds' which has to be > 0 for us to get here.  So, even
518                  * though the module's 'nInFds' might go to 0 during this loop,
519                  * the loop itself will still remain intact.
520                  */
521                 do {
522                     if (FD_ISSET(TransportRpmsg_module->inFds[curFdx], &rfds)) {
524                         PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
525                                       TransportRpmsg_module->inFds[curFdx])
527                         /* transport input fd was signalled: get the message */
528                         tmpStatus = transportGet(
529                             TransportRpmsg_module->inFds[curFdx], &retMsg);
530                         if (tmpStatus < 0) {
531                             printf("rpmsgThreadFxn: transportGet failed.");
532                             status = MessageQ_E_FAIL;
533                         }
534                         else {
535                             queueId = MessageQ_getDstQueue(retMsg);
537                             PRINTVERBOSE1("rpmsgThreadFxn: got message, "
538                                     "delivering to queueId 0x%x\n", queueId)
540                             MessageQ_put(queueId, retMsg);
541                         }
543                         lastFdx = (curFdx + 1) % nfds;
545                         break;
546                     }
548                     curFdx = (curFdx + 1) % nfds;
549                 } while (curFdx != lastFdx);
550             }
551         }
552     }
554     return (void *)status;
557 /*
558  * ======== transportGet ========
559  *  Retrieve a message waiting in the socket's queue.
560 */
561 static Int transportGet(int sock, MessageQ_Msg *retMsg)
563     Int           status    = MessageQ_S_SUCCESS;
564     MessageQ_Msg  msg;
565     struct sockaddr_rpmsg fromAddr;  /* [Socket address of sender] */
566     unsigned int  len;
567     int           byteCount;
569     /*
570      * We have no way of peeking to see what message size we'll get, so we
571      * allocate a message of max size to receive contents from the rpmsg socket
572      * (currently, a copy transport)
573      */
574     msg = MessageQ_alloc(0, MESSAGEQ_RPMSG_MAXSIZE);
575     if (!msg) {
576         status = MessageQ_E_MEMORY;
577         goto exit;
578     }
580     memset(&fromAddr, 0, sizeof (fromAddr));
581     len = sizeof (fromAddr);
583     byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
584                          (struct sockaddr *)&fromAddr, &len);
585     if (len != sizeof (fromAddr)) {
586         printf("recvfrom: got bad addr len (%d)\n", len);
587         status = MessageQ_E_FAIL;
588         goto exit;
589     }
590     if (byteCount < 0) {
591         printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
592         status = MessageQ_E_FAIL;
593         goto exit;
594     }
595     else {
596          /*
597           * Update the allocated message size (even though this may waste
598           * space when the actual message is smaller than the maximum rpmsg
599           * size, the message will be freed soon anyway, and it avoids an
600           * extra copy).
601           */
602          msg->msgSize = byteCount;
604          /*
605           * If the message received was statically allocated, reset the
606           * heapId, so the app can free it.
607           */
608          if (msg->heapId == MessageQ_STATICMSG)  {
609              msg->heapId = 0;  /* for a copy transport, heap id is 0. */
610          }
611     }
613     PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
614     PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg "
615             "proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
616     PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId,
617             msg->msgSize)
619     *retMsg = msg;
621 exit:
622     return status;
625 Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 queuePort)
627     Int *queues;
628     Int *oldQueues;
629     UInt oldSize;
630     UInt queueIndex;
632     /* subtract port offset from queue index */
633     queueIndex = queuePort - MessageQ_PORTOFFSET;
635     if (queueIndex >= obj->numQueues) {
636         PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
637                 queueIndex + TransportRpmsg_GROWSIZE)
639         /* allocate larget table */
640         oldSize = obj->numQueues * sizeof (Int);
641         queues = calloc(queueIndex + TransportRpmsg_GROWSIZE, sizeof(Int));
643         /* copy contents from old table int new table */
644         memcpy(queues, obj->qIndexToFd, oldSize);
646         /* swap in new table, delete old table */
647         oldQueues = obj->qIndexToFd;
648         obj->qIndexToFd = queues;
649         obj->numQueues = queueIndex + TransportRpmsg_GROWSIZE;
650         free(oldQueues);
651     }
653     /* add new entry */
654     obj->qIndexToFd[queueIndex] = fd;
657 Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 queuePort)
659     UInt queueIndex;
661     /* subtract port offset from queue index */
662     queueIndex = queuePort - MessageQ_PORTOFFSET;
664     /* clear table entry */
665     obj->qIndexToFd[queueIndex] = 0;
668 Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 queuePort)
670     UInt queueIndex;
672     /* subtract port offset from queue index */
673     queueIndex = queuePort - MessageQ_PORTOFFSET;
675     /* return file descriptor */
676     return (obj->qIndexToFd[queueIndex]);
679 /*
680  *  ======== TransportRpmsg_Factory_create ========
681  *  Create the transport instances
682  *
683  *  Attach to all remote processors. For now, must attach to
684  *  at least one to tolerate MessageQ_E_RESOURCE failures.
685  *
686  *  This function implements the IPC Factory interface, so it
687  *  returns Ipc status codes.
688  */
689 Int TransportRpmsg_Factory_create(Void)
691     Int     status;
692     Int     attachStatus;
693     Int     i;
694     UInt16  procId;
695     Int32   attachedAny;
696     UInt16  clusterSize;
697     UInt16  clusterBase;
699     TransportRpmsg_Handle      *inst;
700     TransportRpmsg_Handle       transport;
701     TransportRpmsg_Params       params;
702     IMessageQTransport_Handle   iMsgQTrans;
705     status = Ipc_S_SUCCESS;
706     attachedAny = FALSE;
708     /* needed to enumerate processors in cluster */
709     clusterSize = MultiProc_getNumProcsInCluster();
710     clusterBase = MultiProc_getBaseIdOfCluster();
712     /* allocate the instance array */
713     inst = calloc(clusterSize, sizeof(TransportRpmsg_Handle));
715     if (inst == NULL) {
716         printf("Error: TransportRpmsg_Factory_create failed, no memory\n");
717         status = Ipc_E_MEMORY;
718         goto exit;
719     }
721     TransportRpmsg_module->inst = inst;
723     /* create transport instance for all processors in cluster */
724     for (i = 0, procId = clusterBase; i < clusterSize; i++, procId++) {
726         if (MultiProc_self() == procId) {
727             continue;
728         }
730         params.rprocId = procId;
731         transport = TransportRpmsg_create(&params, &attachStatus);
733         if (transport != NULL) {
734             iMsgQTrans = TransportRpmsg_upCast(transport);
735             MessageQ_registerTransport(iMsgQTrans, procId, 0);
736             attachedAny = TRUE;
737         }
738         else {
739             if (attachStatus == MessageQ_E_RESOURCE) {
740                 continue;
741             }
742             printf("TransportRpmsg_Factory_create: failed to attach to "
743                     "procId=%d status=%d\n", procId, attachStatus);
744             status = Ipc_E_FAIL;
745             break;
746         }
748         TransportRpmsg_module->inst[i] = transport;
749     }
751     if (!attachedAny) {
752         status = Ipc_E_FAIL;
753     }
755 exit:
756     return (status);
759 /*
760  *  ======== TransportRpmsg_Factory_delete ========
761  *  Finalize the transport instances
762  */
763 Void TransportRpmsg_Factory_delete(Void)
765     Int     i;
766     UInt16  procId;
767     UInt16  clusterSize;
768     UInt16  clusterBase;
770     /* needed to enumerate processors in cluster */
771     clusterSize = MultiProc_getNumProcsInCluster();
772     clusterBase = MultiProc_getBaseIdOfCluster();
774     /* detach from all remote processors, assuming they are up */
775     for (i = 0, procId = clusterBase; i < clusterSize; i++, procId++) {
777         if (MultiProc_self() == procId) {
778             continue;
779         }
781         if (TransportRpmsg_module->inst[i] != NULL) {
782             MessageQ_unregisterTransport(procId, 0);
783             TransportRpmsg_delete(&(TransportRpmsg_module->inst[i]));
784         }
785     }
787     return;