ae44f7526d04e2c07cc4c82a06e1d6adcaf7aec7
1 /*
2 * Copyright (c) 2014-2015 Texas Instruments Incorporated - http://www.ti.com
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*
33 * ======== TransportRpmsg.c ========
34 * Implementation of functions specified in the IMessageQTransport interface.
35 */
37 /* Socket Headers */
38 #include <sys/socket.h>
39 #include <sys/select.h>
40 #include <sys/eventfd.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <unistd.h>
44 #include <errno.h>
45 #include <string.h>
46 #include <fcntl.h>
47 #include <pthread.h>
49 /* Socket Protocol Family */
50 #include <net/rpmsg.h>
53 /* IPC headers */
54 #include <ti/ipc/Std.h>
55 #include <SocketFxns.h> /* Socket utils: */
56 #include <ti/ipc/Ipc.h>
57 #include <ti/ipc/MessageQ.h>
58 #include <ti/ipc/MultiProc.h>
59 #include <ti/ipc/transports/TransportRpmsg.h>
60 #include <_MessageQ.h>
61 #include <_lad.h>
/* rpmsg port passed to ConnectSocket() when attaching to a remote processor */
#define MESSAGEQ_RPMSG_PORT         61

/* size, in bytes, of the buffer allocated for each message received */
#define MESSAGEQ_RPMSG_MAXSIZE      512

/* initial size of, and growth padding for, the queue-index-to-fd table */
#define TransportRpmsg_GROWSIZE     32

/* larger of two values; note that an argument may be evaluated twice */
#define _MAX(a, b)                  (((a) > (b)) ? (a) : (b))
71 /* traces in this file are controlled via _TransportMessageQ_verbose */
72 Bool _TransportMessageQ_verbose = FALSE;
73 #define verbose _TransportMessageQ_verbose
75 Int TransportRpmsg_bind(Void *handle, UInt32 queueId);
76 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId);
77 Bool TransportRpmsg_put(Void *handle, Ptr msg);
79 typedef struct TransportRpmsg_Module {
80 int sock[MultiProc_MAXPROCESSORS];
81 fd_set rfds;
82 int maxFd;
83 int inFds[1024];
84 int nInFds;
85 pthread_mutex_t gate;
86 int unblockEvent; /* eventFd for unblocking socket */
87 pthread_t threadId; /* ID returned by pthread_create() */
88 Bool threadStarted;
90 TransportRpmsg_Handle *inst; /* array of instances */
91 } TransportRpmsg_Module;
93 IMessageQTransport_Fxns TransportRpmsg_fxns = {
94 .bind = TransportRpmsg_bind,
95 .unbind = TransportRpmsg_unbind,
96 .put = TransportRpmsg_put
97 };
99 typedef struct TransportRpmsg_Object {
100 IMessageQTransport_Object base;
101 Int status;
102 UInt16 rprocId;
103 int numQueues;
104 int *qIndexToFd;
105 } TransportRpmsg_Object;
107 TransportRpmsg_Module TransportRpmsg_state = {
108 .sock = {0},
109 .threadStarted = FALSE,
110 .inst = NULL
111 };
112 TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
114 static Int attach(UInt16 rprocId);
115 static Int detach(UInt16 rprocId);
116 static void *rpmsgThreadFxn(void *arg);
117 static Int transportGet(int sock, MessageQ_Msg *retMsg);
118 static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
119 Int fd,
120 UInt16 qIndex);
121 static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
122 static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
124 Int TransportRpmsg_Factory_create(Void);
125 Void TransportRpmsg_Factory_delete(Void);
127 Ipc_TransportFactoryFxns TransportRpmsg_Factory = {
128 .createFxn = TransportRpmsg_Factory_create,
129 .deleteFxn = TransportRpmsg_Factory_delete
130 };
132 /* -------------------------------------------------------------------------- */
134 /* instance convertors */
135 IMessageQTransport_Handle TransportRpmsg_upCast(TransportRpmsg_Handle handle)
136 {
137 TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
138 return ((IMessageQTransport_Handle)&obj->base);
139 }
141 TransportRpmsg_Handle TransportRpmsg_downCast(IMessageQTransport_Handle base)
142 {
143 return ((TransportRpmsg_Handle)base);
144 }
146 TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params,
147 Int *attachStatus)
148 {
149 TransportRpmsg_Object *obj;
150 Int rv;
152 rv = attach(params->rprocId);
153 if (attachStatus) {
154 *attachStatus = rv;
155 }
157 if (rv != MessageQ_S_SUCCESS) {
158 return NULL;
159 }
161 obj = calloc(1, sizeof (TransportRpmsg_Object));
163 /* structure copy */
164 obj->base.base.interfaceType = IMessageQTransport_TypeId;
165 obj->base.fxns = &TransportRpmsg_fxns;
166 obj->rprocId = params->rprocId;
168 obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof (Int));
169 obj->numQueues = TransportRpmsg_GROWSIZE;
171 return (TransportRpmsg_Handle)obj;
172 }
174 Void TransportRpmsg_delete(TransportRpmsg_Handle *handlep)
175 {
176 TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)handlep;
178 detach(obj->rprocId);
180 free(obj->qIndexToFd);
181 free(obj);
183 *handlep = NULL;
184 }
186 static Int attach(UInt16 rprocId)
187 {
188 Int status = MessageQ_S_SUCCESS;
189 int sock;
190 UInt16 clusterId;
193 clusterId = rprocId - MultiProc_getBaseIdOfCluster();
195 /* Create the socket for sending messages to the remote proc: */
196 sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
197 if (sock < 0) {
198 status = MessageQ_E_FAIL;
199 printf("attach: socket failed: %d (%s)\n",
200 errno, strerror(errno));
202 goto exit;
203 }
205 PRINTVERBOSE1("attach: created send socket: %d\n", sock)
207 /* Attempt to connect: */
208 status = ConnectSocket(sock, rprocId, MESSAGEQ_RPMSG_PORT);
209 if (status < 0) {
210 /* is it ok to "borrow" this error code from MessageQ? */
211 status = MessageQ_E_RESOURCE;
213 /* don't hard-printf or exit since this is no longer fatal */
214 PRINTVERBOSE1("attach: ConnectSocket(rprocId:%d) failed\n", rprocId)
216 goto exitSock;
217 }
219 TransportRpmsg_module->sock[clusterId] = sock;
221 if (TransportRpmsg_module->threadStarted == FALSE) {
222 /* create a module wide event to unblock the socket select thread */
223 TransportRpmsg_module->unblockEvent = eventfd(0, 0);
224 if (TransportRpmsg_module->unblockEvent == -1) {
225 printf("attach: unblock socket failed: %d (%s)\n",
226 errno, strerror(errno));
227 status = MessageQ_E_FAIL;
229 goto exitSock;
230 }
232 PRINTVERBOSE1("attach: created unblock event %d\n",
233 TransportRpmsg_module->unblockEvent)
235 FD_ZERO(&TransportRpmsg_module->rfds);
236 FD_SET(TransportRpmsg_module->unblockEvent,
237 &TransportRpmsg_module->rfds);
238 TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
239 TransportRpmsg_module->nInFds = 0;
241 pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
243 status = pthread_create(&TransportRpmsg_module->threadId, NULL,
244 &rpmsgThreadFxn, NULL);
245 if (status < 0) {
246 status = MessageQ_E_FAIL;
247 printf("attach: failed to spawn thread\n");
249 goto exitEvent;
250 }
251 else {
252 TransportRpmsg_module->threadStarted = TRUE;
253 }
254 }
256 goto exit;
258 exitEvent:
259 close(TransportRpmsg_module->unblockEvent);
261 FD_ZERO(&TransportRpmsg_module->rfds);
262 TransportRpmsg_module->maxFd = 0;
264 exitSock:
265 close(sock);
266 TransportRpmsg_module->sock[clusterId] = 0;
268 exit:
269 return status;
270 }
272 static Int detach(UInt16 rprocId)
273 {
275 Int status = -1;
276 int sock;
277 UInt16 clusterId;
279 clusterId = rprocId - MultiProc_getBaseIdOfCluster();
280 sock = TransportRpmsg_module->sock[clusterId];
282 if (sock) {
283 PRINTVERBOSE1("detach: closing socket: %d\n", sock)
285 status = close(sock);
286 }
288 return status;
289 }
291 Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
292 {
293 TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
294 UInt16 queuePort = queueId & 0x0000ffff;
295 int fd;
296 int err;
297 uint64_t buf;
298 UInt16 rprocId;
300 rprocId = obj->rprocId;
302 PRINTVERBOSE2("TransportRpmsg_bind: creating endpoint for rprocId %d "
303 "queuePort 0x%x\n", rprocId, queuePort)
305 /* Create the socket to receive messages for this messageQ. */
306 fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
307 if (fd < 0) {
308 printf("TransportRpmsg_bind: socket call failed: %d (%s)\n",
309 errno, strerror(errno));
310 goto exitClose;
311 }
313 PRINTVERBOSE1("TransportRpmsg_bind: created socket fd %d\n", fd)
315 err = SocketBindAddr(fd, rprocId, (UInt32)queuePort);
316 if (err < 0) {
317 /* don't hard-printf since this is no longer fatal */
318 PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
319 errno, strerror(errno))
321 close(fd);
323 return -1;
324 }
326 pthread_mutex_lock(&TransportRpmsg_module->gate);
328 /* add to our fat fd array and update select() parameters */
329 TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++] = fd;
330 TransportRpmsg_module->maxFd = _MAX(TransportRpmsg_module->maxFd, fd);
331 FD_SET(fd, &TransportRpmsg_module->rfds);
333 pthread_mutex_unlock(&TransportRpmsg_module->gate);
335 bindFdToQueueIndex(obj, fd, queuePort);
337 /*
338 * Even though we use the unblock event as just a signalling event with
339 * no related payload, we need to write some non-zero value. Might as
340 * well make it the fd (which the reader could decide to use if needed).
341 */
342 buf = fd;
343 write(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
345 goto exit;
347 exitClose:
348 TransportRpmsg_unbind(handle, fd);
349 fd = 0;
351 exit:
352 return fd;
353 }
355 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
356 {
357 TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
358 UInt16 queuePort = queueId & 0x0000ffff;
359 uint64_t buf;
360 Int status = MessageQ_S_SUCCESS;
361 int maxFd;
362 int fd;
363 int i;
364 int j;
366 fd = queueIndexToFd(obj, queuePort);
367 if (!fd) {
368 PRINTVERBOSE1("TransportRpmsg_unbind: queueId 0x%x not bound\n",
369 queueId)
371 return -1;
372 }
374 PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
376 pthread_mutex_lock(&TransportRpmsg_module->gate);
378 /* remove from input fd array */
379 for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
380 if (TransportRpmsg_module->inFds[i] == fd) {
381 TransportRpmsg_module->nInFds--;
383 /* shift subsequent elements down */
384 for (j = i; j < TransportRpmsg_module->nInFds; j++) {
385 TransportRpmsg_module->inFds[j] =
386 TransportRpmsg_module->inFds[j + 1];
387 }
388 TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds] = 0;
390 FD_CLR(fd, &TransportRpmsg_module->rfds);
391 if (fd == TransportRpmsg_module->maxFd) {
392 /* find new max fd */
393 maxFd = TransportRpmsg_module->unblockEvent;
394 for (j = 0; j < TransportRpmsg_module->nInFds; j++) {
395 maxFd = _MAX(TransportRpmsg_module->inFds[j], maxFd);
396 }
397 TransportRpmsg_module->maxFd = maxFd;
398 }
400 /*
401 * Even though we use the unblock event as just a signalling
402 * event with no related payload, we need to write some non-zero
403 * value. Might as well make it the fd (which the reader could
404 * decide to use if needed).
405 */
406 buf = fd;
407 write(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
409 break;
410 }
412 close(fd);
413 }
415 unbindQueueIndex(obj, queuePort);
417 pthread_mutex_unlock(&TransportRpmsg_module->gate);
419 return status;
420 }
422 Bool TransportRpmsg_put(Void *handle, Ptr pmsg)
423 {
424 MessageQ_Msg msg = (MessageQ_Msg)pmsg;
425 Int status = TRUE;
426 int sock;
427 int err;
428 UInt16 clusterId;
430 /*
431 * Retrieve the socket for the AF_SYSLINK protocol associated with this
432 * transport.
433 */
434 clusterId = msg->dstProc - MultiProc_getBaseIdOfCluster();
435 sock = TransportRpmsg_module->sock[clusterId];
436 if (!sock) {
437 return FALSE;
438 }
440 PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
442 err = send(sock, msg, msg->msgSize, 0);
443 if (err < 0) {
444 printf("TransportRpmsg_put: send failed: %d (%s)\n",
445 errno, strerror(errno));
446 status = FALSE;
448 goto exit;
449 }
451 /*
452 * Free the message, as this is a copy transport, we maintain MessageQ
453 * semantics.
454 */
455 MessageQ_free(msg);
457 exit:
458 return status;
459 }
461 Bool TransportRpmsg_control(Void *handle, UInt cmd, UArg cmdArg)
462 {
463 return FALSE;
464 }
466 void *rpmsgThreadFxn(void *arg)
467 {
468 static int lastFdx = 0;
469 int curFdx = 0;
470 Int status = MessageQ_S_SUCCESS;
471 Int tmpStatus;
472 int retval;
473 uint64_t buf;
474 fd_set rfds;
475 int maxFd;
476 int nfds;
477 MessageQ_Msg retMsg;
478 MessageQ_QueueId queueId;
480 while (TRUE) {
481 pthread_mutex_lock(&TransportRpmsg_module->gate);
483 maxFd = TransportRpmsg_module->maxFd;
484 rfds = TransportRpmsg_module->rfds;
485 nfds = TransportRpmsg_module->nInFds;
487 pthread_mutex_unlock(&TransportRpmsg_module->gate);
489 PRINTVERBOSE3("rpmsgThreadFxn: maxFd %d rfds[1:0] 0x%08x%08x\n", maxFd,
490 (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
492 retval = select(maxFd + 1, &rfds, NULL, NULL, NULL);
493 if (retval) {
494 if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
495 /*
496 * Our event was signalled by TransportRpmsg_bind()
497 * or TransportRpmsg_unbind() to tell us that the set of
498 * fds has changed.
499 */
500 PRINTVERBOSE0("rpmsgThreadFxn: got unblock event\n")
502 /* we don't need the written value */
503 read(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
504 }
505 else {
506 /* start where we last left off */
507 curFdx = lastFdx;
509 /*
510 * The set of fds that's used by select has been recorded
511 * locally, but the array of fds that are scanned below is
512 * a changing set (MessageQ_create/delete() can change it).
513 * While this might present an issue in itself, one key
514 * takeaway is that 'nfds' must not be zero else the % below
515 * will cause a divide-by-zero exception. We won't even get
516 * here if nfds == 0 since it's a local copy of the module's
517 * 'nInFds' which has to be > 0 for us to get here. So, even
518 * though the module's 'nInFds' might go to 0 during this loop,
519 * the loop itself will still remain intact.
520 */
521 do {
522 if (FD_ISSET(TransportRpmsg_module->inFds[curFdx], &rfds)) {
524 PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
525 TransportRpmsg_module->inFds[curFdx])
527 /* transport input fd was signalled: get the message */
528 tmpStatus = transportGet(
529 TransportRpmsg_module->inFds[curFdx], &retMsg);
530 if (tmpStatus < 0) {
531 printf("rpmsgThreadFxn: transportGet failed.");
532 status = MessageQ_E_FAIL;
533 }
534 else {
535 queueId = MessageQ_getDstQueue(retMsg);
537 PRINTVERBOSE1("rpmsgThreadFxn: got message, "
538 "delivering to queueId 0x%x\n", queueId)
540 MessageQ_put(queueId, retMsg);
541 }
543 lastFdx = (curFdx + 1) % nfds;
545 break;
546 }
548 curFdx = (curFdx + 1) % nfds;
549 } while (curFdx != lastFdx);
550 }
551 }
552 }
554 return (void *)status;
555 }
557 /*
558 * ======== transportGet ========
559 * Retrieve a message waiting in the socket's queue.
560 */
561 static Int transportGet(int sock, MessageQ_Msg *retMsg)
562 {
563 Int status = MessageQ_S_SUCCESS;
564 MessageQ_Msg msg;
565 struct sockaddr_rpmsg fromAddr; /* [Socket address of sender] */
566 unsigned int len;
567 int byteCount;
569 /*
570 * We have no way of peeking to see what message size we'll get, so we
571 * allocate a message of max size to receive contents from the rpmsg socket
572 * (currently, a copy transport)
573 */
574 msg = MessageQ_alloc(0, MESSAGEQ_RPMSG_MAXSIZE);
575 if (!msg) {
576 status = MessageQ_E_MEMORY;
577 goto exit;
578 }
580 memset(&fromAddr, 0, sizeof (fromAddr));
581 len = sizeof (fromAddr);
583 byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
584 (struct sockaddr *)&fromAddr, &len);
585 if (len != sizeof (fromAddr)) {
586 printf("recvfrom: got bad addr len (%d)\n", len);
587 status = MessageQ_E_FAIL;
588 goto exit;
589 }
590 if (byteCount < 0) {
591 printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
592 status = MessageQ_E_FAIL;
593 goto exit;
594 }
595 else {
596 /*
597 * Update the allocated message size (even though this may waste
598 * space when the actual message is smaller than the maximum rpmsg
599 * size, the message will be freed soon anyway, and it avoids an
600 * extra copy).
601 */
602 msg->msgSize = byteCount;
604 /*
605 * If the message received was statically allocated, reset the
606 * heapId, so the app can free it.
607 */
608 if (msg->heapId == MessageQ_STATICMSG) {
609 msg->heapId = 0; /* for a copy transport, heap id is 0. */
610 }
611 }
613 PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
614 PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg "
615 "proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
616 PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId,
617 msg->msgSize)
619 *retMsg = msg;
621 exit:
622 return status;
623 }
625 Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 queuePort)
626 {
627 Int *queues;
628 Int *oldQueues;
629 UInt oldSize;
630 UInt queueIndex;
632 /* subtract port offset from queue index */
633 queueIndex = queuePort - MessageQ_PORTOFFSET;
635 if (queueIndex >= obj->numQueues) {
636 PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
637 queueIndex + TransportRpmsg_GROWSIZE)
639 /* allocate larget table */
640 oldSize = obj->numQueues * sizeof (Int);
641 queues = calloc(queueIndex + TransportRpmsg_GROWSIZE, sizeof(Int));
643 /* copy contents from old table int new table */
644 memcpy(queues, obj->qIndexToFd, oldSize);
646 /* swap in new table, delete old table */
647 oldQueues = obj->qIndexToFd;
648 obj->qIndexToFd = queues;
649 obj->numQueues = queueIndex + TransportRpmsg_GROWSIZE;
650 free(oldQueues);
651 }
653 /* add new entry */
654 obj->qIndexToFd[queueIndex] = fd;
655 }
657 Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 queuePort)
658 {
659 UInt queueIndex;
661 /* subtract port offset from queue index */
662 queueIndex = queuePort - MessageQ_PORTOFFSET;
664 /* clear table entry */
665 obj->qIndexToFd[queueIndex] = 0;
666 }
668 Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 queuePort)
669 {
670 UInt queueIndex;
672 /* subtract port offset from queue index */
673 queueIndex = queuePort - MessageQ_PORTOFFSET;
675 /* return file descriptor */
676 return (obj->qIndexToFd[queueIndex]);
677 }
679 /*
680 * ======== TransportRpmsg_Factory_create ========
681 * Create the transport instances
682 *
683 * Attach to all remote processors. For now, must attach to
684 * at least one to tolerate MessageQ_E_RESOURCE failures.
685 *
686 * This function implements the IPC Factory interface, so it
687 * returns Ipc status codes.
688 */
689 Int TransportRpmsg_Factory_create(Void)
690 {
691 Int status;
692 Int attachStatus;
693 Int i;
694 UInt16 procId;
695 Int32 attachedAny;
696 UInt16 clusterSize;
697 UInt16 clusterBase;
699 TransportRpmsg_Handle *inst;
700 TransportRpmsg_Handle transport;
701 TransportRpmsg_Params params;
702 IMessageQTransport_Handle iMsgQTrans;
705 status = Ipc_S_SUCCESS;
706 attachedAny = FALSE;
708 /* needed to enumerate processors in cluster */
709 clusterSize = MultiProc_getNumProcsInCluster();
710 clusterBase = MultiProc_getBaseIdOfCluster();
712 /* allocate the instance array */
713 inst = calloc(clusterSize, sizeof(TransportRpmsg_Handle));
715 if (inst == NULL) {
716 printf("Error: TransportRpmsg_Factory_create failed, no memory\n");
717 status = Ipc_E_MEMORY;
718 goto exit;
719 }
721 TransportRpmsg_module->inst = inst;
723 /* create transport instance for all processors in cluster */
724 for (i = 0, procId = clusterBase; i < clusterSize; i++, procId++) {
726 if (MultiProc_self() == procId) {
727 continue;
728 }
730 params.rprocId = procId;
731 transport = TransportRpmsg_create(¶ms, &attachStatus);
733 if (transport != NULL) {
734 iMsgQTrans = TransportRpmsg_upCast(transport);
735 MessageQ_registerTransport(iMsgQTrans, procId, 0);
736 attachedAny = TRUE;
737 }
738 else {
739 if (attachStatus == MessageQ_E_RESOURCE) {
740 continue;
741 }
742 printf("TransportRpmsg_Factory_create: failed to attach to "
743 "procId=%d status=%d\n", procId, attachStatus);
744 status = Ipc_E_FAIL;
745 break;
746 }
748 TransportRpmsg_module->inst[i] = transport;
749 }
751 if (!attachedAny) {
752 status = Ipc_E_FAIL;
753 }
755 exit:
756 return (status);
757 }
759 /*
760 * ======== TransportRpmsg_Factory_delete ========
761 * Finalize the transport instances
762 */
763 Void TransportRpmsg_Factory_delete(Void)
764 {
765 Int i;
766 UInt16 procId;
767 UInt16 clusterSize;
768 UInt16 clusterBase;
770 /* needed to enumerate processors in cluster */
771 clusterSize = MultiProc_getNumProcsInCluster();
772 clusterBase = MultiProc_getBaseIdOfCluster();
774 /* detach from all remote processors, assuming they are up */
775 for (i = 0, procId = clusterBase; i < clusterSize; i++, procId++) {
777 if (MultiProc_self() == procId) {
778 continue;
779 }
781 if (TransportRpmsg_module->inst[i] != NULL) {
782 MessageQ_unregisterTransport(procId, 0);
783 TransportRpmsg_delete(&(TransportRpmsg_module->inst[i]));
784 }
785 }
787 return;
788 }