1 /*
2 * Copyright (c) 2014-2015 Texas Instruments Incorporated - http://www.ti.com
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*
33 * ======== TransportRpmsg.c ========
34 * Implementation of functions specified in the IMessageQTransport interface.
35 */
37 /* Socket Headers */
38 #include <sys/socket.h>
39 #include <sys/select.h>
40 #include <sys/eventfd.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <unistd.h>
44 #include <errno.h>
45 #include <string.h>
46 #include <fcntl.h>
47 #include <pthread.h>
50 /* Socket Protocol Family */
51 #include <net/rpmsg.h>
54 /* IPC headers */
55 #include <ti/ipc/Std.h>
56 #include <SocketFxns.h> /* Socket utils: */
57 #include <ti/ipc/Ipc.h>
58 #include <ti/ipc/MessageQ.h>
59 #include <ti/ipc/MultiProc.h>
60 #include <ti/ipc/transports/TransportRpmsg.h>
61 #include <_MessageQ.h>
62 #include <_lad.h>
64 #if !defined(EFD_SEMAPHORE)
65 # define EFD_SEMAPHORE (1 << 0)
66 #endif
68 /* More magic rpmsg port numbers: */
69 #define MESSAGEQ_RPMSG_PORT 61
70 #define MESSAGEQ_RPMSG_MAXSIZE 512
72 #define TransportRpmsg_GROWSIZE 32
73 #define INVALIDSOCKET (-1)
75 #define TransportRpmsg_Event_ACK (1 << 0)
76 #define TransportRpmsg_Event_PAUSE (1 << 1)
77 #define TransportRpmsg_Event_CONTINUE (1 << 2)
78 #define TransportRpmsg_Event_SHUTDOWN (1 << 3)
81 #define _MAX(a,b) (((a)>(b))?(a):(b))
83 /* traces in this file are controlled via _TransportMessageQ_verbose */
84 Bool _TransportMessageQ_verbose = FALSE;
85 #define verbose _TransportMessageQ_verbose
87 Int TransportRpmsg_bind(Void *handle, UInt32 queueId);
88 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId);
89 Bool TransportRpmsg_put(Void *handle, Ptr msg);
91 typedef struct TransportRpmsg_Module {
92 int sock[MultiProc_MAXPROCESSORS];
93 fd_set rfds;
94 int maxFd;
95 struct {
96 int fd;
97 UInt32 qId;
98 } inFds[1024];
99 int nInFds;
100 pthread_mutex_t gate;
101 int unblockEvent; /* unblock the dispatch thread */
102 int waitEvent; /* block the client thread */
103 pthread_t threadId; /* ID returned by pthread_create() */
104 Bool threadStarted;
106 TransportRpmsg_Handle *inst; /* array of instances */
107 } TransportRpmsg_Module;
109 IMessageQTransport_Fxns TransportRpmsg_fxns = {
110 .bind = TransportRpmsg_bind,
111 .unbind = TransportRpmsg_unbind,
112 .put = TransportRpmsg_put
113 };
115 typedef struct TransportRpmsg_Object {
116 IMessageQTransport_Object base;
117 Int status;
118 UInt16 rprocId;
119 int numQueues;
120 int *qIndexToFd;
121 } TransportRpmsg_Object;
123 TransportRpmsg_Module TransportRpmsg_state = {
124 .sock = {INVALIDSOCKET},
125 .unblockEvent = -1,
126 .waitEvent = -1,
127 .threadStarted = FALSE,
128 .inst = NULL
129 };
130 TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
132 static void *rpmsgThreadFxn(void *arg);
133 static Int transportGet(int sock, MessageQ_Msg *retMsg);
134 static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
135 Int fd,
136 UInt16 qIndex);
137 static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
138 static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
140 /* factory functions */
141 Int TransportRpmsg_Factory_create(Void);
142 Void TransportRpmsg_Factory_delete(Void);
143 Int TransportRpmsg_Factory_attach(UInt16 procId);
144 Int TransportRpmsg_Factory_detach(UInt16 procId);
146 Ipc_TransportFactoryFxns TransportRpmsg_Factory = {
147 .createFxn = TransportRpmsg_Factory_create,
148 .deleteFxn = TransportRpmsg_Factory_delete,
149 .attachFxn = TransportRpmsg_Factory_attach,
150 .detachFxn = TransportRpmsg_Factory_detach
151 };
153 /* -------------------------------------------------------------------------- */
155 /* instance convertors */
156 IMessageQTransport_Handle TransportRpmsg_upCast(TransportRpmsg_Handle handle)
157 {
158 TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
159 return ((IMessageQTransport_Handle)&obj->base);
160 }
162 TransportRpmsg_Handle TransportRpmsg_downCast(IMessageQTransport_Handle base)
163 {
164 return ((TransportRpmsg_Handle)base);
165 }
167 /*
168 * ======== TransportRpmsg_create ========
169 */
170 TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params)
171 {
172 Int status = MessageQ_S_SUCCESS;
173 TransportRpmsg_Object *obj = NULL;
174 int sock;
175 int flags;
176 UInt16 clusterId;
177 int i;
180 clusterId = params->rprocId - MultiProc_getBaseIdOfCluster();
182 /* create socket for sending messages to remote processor */
183 sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
185 if (sock < 0) {
186 status = Ipc_E_FAIL;
187 printf("TransportRpmsg_create: socket failed: %d (%s)\n", errno,
188 strerror(errno));
189 goto done;
190 }
191 TransportRpmsg_module->sock[clusterId] = sock;
192 PRINTVERBOSE1("attach: created send socket: %d\n", sock)
194 status = ConnectSocket(sock, params->rprocId, MESSAGEQ_RPMSG_PORT);
196 if (status < 0) {
197 status = Ipc_E_FAIL;
198 printf("TransportRpmsg_create: connect failed: %d (%s) procId: %d\n",
199 errno, strerror(errno), params->rprocId);
200 goto done;
201 }
203 /* make sure socket fd doesn't exist for 'fork() -> exec*()'ed child */
204 flags = fcntl(sock, F_GETFD);
205 if (flags != -1) {
206 fcntl(sock, F_SETFD, flags | FD_CLOEXEC);
207 }
209 /* create the instance object */
210 obj = calloc(1, sizeof(TransportRpmsg_Object));
212 if (obj == NULL) {
213 status = Ipc_E_MEMORY;
214 goto done;
215 }
217 /* initialize the instance */
218 obj->base.base.interfaceType = IMessageQTransport_TypeId;
219 obj->base.fxns = &TransportRpmsg_fxns;
220 obj->rprocId = params->rprocId;
221 obj->numQueues = TransportRpmsg_GROWSIZE;
223 obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof(int));
225 if (obj->qIndexToFd == NULL) {
226 status = Ipc_E_MEMORY;
227 goto done;
228 }
230 /* must initialize array */
231 for (i = 0; i < TransportRpmsg_GROWSIZE; i++) {
232 obj->qIndexToFd[i] = -1;
233 }
235 done:
236 if (status < 0) {
237 TransportRpmsg_delete((TransportRpmsg_Handle *)&obj);
238 }
240 return (TransportRpmsg_Handle)obj;
241 }
243 /*
244 * ======== TransportRpmsg_delete ========
245 */
246 Void TransportRpmsg_delete(TransportRpmsg_Handle *pHandle)
247 {
248 TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)pHandle;
249 UInt16 clusterId;
250 int sock;
253 clusterId = obj->rprocId - MultiProc_getBaseIdOfCluster();
255 /* close the socket for the given transport instance */
256 sock = TransportRpmsg_module->sock[clusterId];
257 if (sock != INVALIDSOCKET) {
258 PRINTVERBOSE1("detach: closing socket: %d\n", sock)
259 close(sock);
260 }
261 TransportRpmsg_module->sock[clusterId] = INVALIDSOCKET;
263 if ((obj != NULL) && (obj->qIndexToFd != NULL)) {
264 free(obj->qIndexToFd);
265 obj->qIndexToFd = NULL;
266 }
268 if (obj != NULL) {
269 free(obj);
270 obj = NULL;
271 }
273 *pHandle = NULL;
274 }
276 /*
277 * ======== TransportRpmsg_bind ========
278 */
279 Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
280 {
281 TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
282 UInt16 queuePort = queueId & 0x0000ffff;
283 int fd;
284 int flags;
285 int err;
286 uint64_t event;
287 UInt16 rprocId;
288 pthread_t tid;
289 Int status = MessageQ_S_SUCCESS;
291 tid = pthread_self();
292 rprocId = obj->rprocId;
294 PRINTVERBOSE3("TransportRpmsg_bind: creating endpoint for rprocId %d "
295 "queuePort 0x%x, tid=0x%x\n", rprocId, queuePort, (unsigned int)tid)
297 pthread_mutex_lock(&TransportRpmsg_module->gate);
299 /* Check if binding already exists.
300 *
301 * There is a race condition between a thread calling MessageQ_create
302 * and another thread calling Ipc_attach. Must make sure we don't bind
303 * the same queue twice.
304 */
305 if (queueIndexToFd(obj, queueId) != -1) {
306 goto done;
307 }
309 /* Create the socket to receive messages for this messageQ. */
310 fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
311 if (fd < 0) {
312 printf("TransportRpmsg_bind: socket call failed: %d (%s)\n",
313 errno, strerror(errno));
314 status = MessageQ_E_OSFAILURE;
315 goto done;
316 }
317 PRINTVERBOSE2("TransportRpmsg_bind: created socket fd %d, tdi=0x%x\n", fd,
318 (unsigned int)tid);
320 err = SocketBindAddr(fd, rprocId, (UInt32)queuePort);
321 if (err < 0) {
322 /* don't hard-printf since this is no longer fatal */
323 PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
324 errno, strerror(errno));
325 close(fd);
326 status = MessageQ_E_OSFAILURE;
327 goto done;
328 }
330 /* make sure socket fd doesn't exist for 'fork() -> exec*()'ed child */
331 flags = fcntl(fd, F_GETFD);
332 if (flags != -1) {
333 fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
334 }
336 /* pause the dispatch thread */
337 PRINTVERBOSE1("TransportRpmsg_bind: sending PAUSE event, tid=0x%x\n",
338 (unsigned int)tid);
339 event = TransportRpmsg_Event_PAUSE;
340 write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
342 /* wait for ACK event */
343 read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
344 PRINTVERBOSE2("TransportRpmsg_bind: received ACK event (%d), tid=0x%x\n",
345 (int)event, (unsigned int)tid);
347 /* add to our fat fd array and update select() parameters */
348 TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].fd = fd;
349 TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++].qId = queueId;
350 TransportRpmsg_module->maxFd = _MAX(TransportRpmsg_module->maxFd, fd);
351 FD_SET(fd, &TransportRpmsg_module->rfds);
352 bindFdToQueueIndex(obj, fd, queuePort);
354 /* release the dispatch thread */
355 PRINTVERBOSE1("TransportRpmsg_bind: sending CONTINUE event, tid=0x%x\n",
356 (unsigned int)tid);
357 event = TransportRpmsg_Event_CONTINUE;
358 write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
360 done:
361 pthread_mutex_unlock(&TransportRpmsg_module->gate);
363 return (status);
364 }
366 /*
367 * ======== TransportRpmsg_unbind ========
368 */
369 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
370 {
371 TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
372 UInt16 queuePort = queueId & 0x0000ffff;
373 uint64_t event;
374 Int status = MessageQ_S_SUCCESS;
375 int maxFd;
376 int fd;
377 int i;
378 int j;
380 pthread_mutex_lock(&TransportRpmsg_module->gate);
382 /* pause the dispatch thread */
383 event = TransportRpmsg_Event_PAUSE;
384 write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
386 /* wait for ACK event */
387 read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
389 /* Check if binding already deleted.
390 *
391 * There is a race condition between a thread calling MessageQ_delete
392 * and another thread calling Ipc_detach. Must make sure we don't unbind
393 * the same queue twice.
394 */
395 if ((fd = queueIndexToFd(obj, queuePort)) == -1) {
396 goto done;
397 }
398 PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
400 /* guarenteed to work because queueIndexToFd above succeeded */
401 unbindQueueIndex(obj, queuePort);
403 /* remove from input fd array */
404 for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
405 if (TransportRpmsg_module->inFds[i].fd == fd) {
406 TransportRpmsg_module->nInFds--;
408 /* shift subsequent elements down */
409 for (j = i; j < TransportRpmsg_module->nInFds; j++) {
410 TransportRpmsg_module->inFds[j] =
411 TransportRpmsg_module->inFds[j + 1];
412 }
413 TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].fd = -1;
414 TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].qId = 0;
415 break;
416 }
417 }
419 /* remove fd from the descriptor set, compute new max value */
420 FD_CLR(fd, &TransportRpmsg_module->rfds);
421 if (fd == TransportRpmsg_module->maxFd) {
422 /* find new max fd */
423 maxFd = TransportRpmsg_module->unblockEvent;
424 for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
425 maxFd = _MAX(TransportRpmsg_module->inFds[i].fd, maxFd);
426 }
427 TransportRpmsg_module->maxFd = maxFd;
428 }
430 close(fd);
432 /* release the dispatch thread */
433 event = TransportRpmsg_Event_CONTINUE;
434 write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
436 done:
437 pthread_mutex_unlock(&TransportRpmsg_module->gate);
439 return (status);
440 }
442 /*
443 * ======== TransportRpmsg_put ========
444 */
445 Bool TransportRpmsg_put(Void *handle, Ptr pmsg)
446 {
447 MessageQ_Msg msg = (MessageQ_Msg)pmsg;
448 Int status = TRUE;
449 int sock;
450 int err;
451 UInt16 clusterId;
453 /*
454 * Retrieve the socket for the AF_SYSLINK protocol associated with this
455 * transport.
456 */
457 clusterId = msg->dstProc - MultiProc_getBaseIdOfCluster();
458 sock = TransportRpmsg_module->sock[clusterId];
459 if (!sock) {
460 return FALSE;
461 }
463 PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
465 err = send(sock, msg, msg->msgSize, 0);
466 if (err < 0) {
467 printf("TransportRpmsg_put: send failed: %d (%s)\n",
468 errno, strerror(errno));
469 status = FALSE;
471 goto exit;
472 }
474 /*
475 * Free the message, as this is a copy transport, we maintain MessageQ
476 * semantics.
477 */
478 MessageQ_free(msg);
480 exit:
481 return status;
482 }
484 /*
485 * ======== TransportRpmsg_control ========
486 */
487 Bool TransportRpmsg_control(Void *handle, UInt cmd, UArg cmdArg)
488 {
489 return FALSE;
490 }
492 /*
493 * ======== rpmsgThreadFxn ========
494 */
495 void *rpmsgThreadFxn(void *arg)
496 {
497 Int status = MessageQ_S_SUCCESS;
498 Int tmpStatus;
499 int retval;
500 uint64_t event;
501 fd_set rfds;
502 int maxFd;
503 int nfds;
504 MessageQ_Msg retMsg;
505 MessageQ_QueueId queueId;
506 MessageQ_Handle handle;
507 Bool run = TRUE;
508 int i;
509 int j;
510 int fd;
513 while (run) {
514 maxFd = TransportRpmsg_module->maxFd;
515 rfds = TransportRpmsg_module->rfds;
516 nfds = TransportRpmsg_module->nInFds;
518 PRINTVERBOSE3("rpmsgThreadFxn: maxFd %d rfds[1:0] 0x%08x%08x\n", maxFd,
519 (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
521 retval = select(maxFd + 1, &rfds, NULL, NULL, NULL);
523 /* if error, try again */
524 if (retval < 0) {
525 printf("Warning: rpmsgThreadFxn: select failed, trying again\n");
526 continue;
527 }
529 /* dispatch all pending messages, do this first */
530 for (i = 0; i < nfds; i++) {
531 fd = TransportRpmsg_module->inFds[i].fd;
533 if (FD_ISSET(fd, &rfds)) {
534 PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
535 TransportRpmsg_module->inFds[i].fd);
537 /* transport input fd was signalled: get the message */
538 tmpStatus = transportGet(fd, &retMsg);
539 if (tmpStatus < 0 && tmpStatus != MessageQ_E_SHUTDOWN) {
540 printf("rpmsgThreadFxn: transportGet failed on fd %d,"
541 " returned %d\n", fd, tmpStatus);
542 }
543 else if (tmpStatus == MessageQ_E_SHUTDOWN) {
544 printf("rpmsgThreadFxn: transportGet failed on fd %d,"
545 " returned %d\n", fd, tmpStatus);
547 pthread_mutex_lock(&TransportRpmsg_module->gate);
549 /*
550 * Don't close(fd) at this time since it will get closed
551 * later when MessageQ_delete() is called in response to
552 * this failure. Just remove fd's bit from the select mask
553 * 'rfds' for now, but don't remove it from inFds[].
554 */
555 FD_CLR(fd, &TransportRpmsg_module->rfds);
556 if (fd == TransportRpmsg_module->maxFd) {
557 /* find new max fd */
558 maxFd = TransportRpmsg_module->unblockEvent;
559 for (j = 0; j < TransportRpmsg_module->nInFds; j++) {
560 maxFd = _MAX(TransportRpmsg_module->inFds[j].fd,
561 maxFd);
562 }
563 TransportRpmsg_module->maxFd = maxFd;
564 }
565 queueId = TransportRpmsg_module->inFds[i].qId;
567 pthread_mutex_unlock(&TransportRpmsg_module->gate);
569 handle = MessageQ_getLocalHandle(queueId);
571 PRINTVERBOSE2("rpmsgThreadFxn: shutting down MessageQ "
572 "%p (queueId 0x%x)...\n", handle, queueId)
574 if (handle != NULL) {
575 MessageQ_shutdown(handle);
576 }
577 else {
578 printf("rpmsgThreadFxn: MessageQ_getLocalHandle(0x%x) "
579 "returned NULL, can't shutdown\n", queueId);
580 }
581 }
582 else {
583 queueId = MessageQ_getDstQueue(retMsg);
584 PRINTVERBOSE1("rpmsgThreadFxn: got message, "
585 "delivering to queueId 0x%x\n", queueId)
586 MessageQ_put(queueId, retMsg);
587 }
588 }
589 }
591 /* check for events */
592 if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
594 read(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
596 do {
597 if (event & TransportRpmsg_Event_SHUTDOWN) {
598 PRINTVERBOSE0("rpmsgThreadFxn: event SHUTDOWN\n");
599 run = FALSE;
600 break; /* highest priority, stop processing events */
601 }
602 if (event & TransportRpmsg_Event_CONTINUE) {
603 PRINTVERBOSE1("rpmsgThreadFxn: event CONTINUE (%d)\n",
604 (int)event);
605 event &= ~TransportRpmsg_Event_CONTINUE;
606 }
607 if (event & TransportRpmsg_Event_PAUSE) {
608 /* Our event was signalled by TransportRpmsg_bind()
609 * or TransportRpmsg_unbind() to tell us that the set
610 * of file descriptors has changed.
611 */
612 PRINTVERBOSE0("rpmsgThreadFxn: event PAUSE\n");
613 /* send the acknowledgement */
614 event = TransportRpmsg_Event_ACK;
615 write(TransportRpmsg_module->waitEvent, &event,
616 sizeof(event));
617 /* now wait to be released */
618 read(TransportRpmsg_module->unblockEvent, &event,
619 sizeof(event));
620 }
621 } while (event != 0);
622 }
623 }
625 return (void *)status;
626 }
628 /*
629 * ======== transportGet ========
630 * Retrieve a message waiting in the socket's queue.
631 */
632 static Int transportGet(int sock, MessageQ_Msg *retMsg)
633 {
634 Int status = MessageQ_S_SUCCESS;
635 MessageQ_Msg msg;
636 struct sockaddr_rpmsg fromAddr; /* [Socket address of sender] */
637 unsigned int len;
638 int byteCount;
640 /*
641 * We have no way of peeking to see what message size we'll get, so we
642 * allocate a message of max size to receive contents from the rpmsg socket
643 * (currently, a copy transport)
644 */
645 msg = MessageQ_alloc(0, MESSAGEQ_RPMSG_MAXSIZE);
646 if (!msg) {
647 status = MessageQ_E_MEMORY;
648 goto exit;
649 }
651 memset(&fromAddr, 0, sizeof (fromAddr));
652 len = sizeof (fromAddr);
654 byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
655 (struct sockaddr *)&fromAddr, &len);
656 if (len != sizeof (fromAddr)) {
657 printf("recvfrom: got bad addr len (%d)\n", len);
658 status = MessageQ_E_FAIL;
659 goto freeMsg;
660 }
661 if (byteCount < 0) {
662 printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
663 if (errno == ENOLINK) {
664 status = MessageQ_E_SHUTDOWN;
665 }
666 else {
667 status = MessageQ_E_FAIL;
668 }
669 goto freeMsg;
670 }
671 else {
672 /*
673 * Update the allocated message size (even though this may waste
674 * space when the actual message is smaller than the maximum rpmsg
675 * size, the message will be freed soon anyway, and it avoids an
676 * extra copy).
677 */
678 msg->msgSize = byteCount;
680 /*
681 * If the message received was statically allocated, reset the
682 * heapId, so the app can free it.
683 */
684 if (msg->heapId == MessageQ_STATICMSG) {
685 msg->heapId = 0; /* for a copy transport, heap id is 0. */
686 }
687 }
689 PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
690 PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg "
691 "proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
692 PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId,
693 msg->msgSize)
695 *retMsg = msg;
697 goto exit;
699 freeMsg:
700 MessageQ_free(msg);
702 exit:
703 return status;
704 }
706 /*
707 * ======== bindFdToQueueIndex ========
708 *
709 * Precondition: caller must be inside the module gate
710 */
711 Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 queuePort)
712 {
713 Int *queues;
714 Int *oldQueues;
715 UInt oldSize;
716 UInt newCount;
717 UInt queueIndex;
718 int i;
720 /* subtract port offset from queue index */
721 queueIndex = queuePort - MessageQ_PORTOFFSET;
723 if (queueIndex >= obj->numQueues) {
724 newCount = queueIndex + TransportRpmsg_GROWSIZE;
725 PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
726 newCount);
728 /* allocate larger table */
729 oldSize = obj->numQueues * sizeof(int);
730 queues = calloc(newCount, sizeof(int));
732 /* copy contents from old table int new table */
733 memcpy(queues, obj->qIndexToFd, oldSize);
735 /* initialize remaining entries of new (larger) table */
736 for (i = obj->numQueues; i < newCount; i++) {
737 queues[i] = -1;
738 }
740 /* swap in new table, delete old table */
741 oldQueues = obj->qIndexToFd;
742 obj->qIndexToFd = queues;
743 obj->numQueues = newCount;
744 free(oldQueues);
745 }
747 /* add new entry */
748 obj->qIndexToFd[queueIndex] = fd;
749 }
751 /*
752 * ======== unbindQueueIndex ========
753 *
754 * Precondition: caller must be inside the module gate
755 */
756 Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 queuePort)
757 {
758 UInt queueIndex;
760 /* subtract port offset from queue index */
761 queueIndex = queuePort - MessageQ_PORTOFFSET;
763 /* clear table entry */
764 obj->qIndexToFd[queueIndex] = -1;
765 }
767 /*
768 * ======== queueIndexToFd ========
769 *
770 * Precondition: caller must be inside the module gate
771 */
772 Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 queuePort)
773 {
774 UInt queueIndex;
776 /* subtract port offset from queue index */
777 queueIndex = queuePort - MessageQ_PORTOFFSET;
779 /* return file descriptor */
780 return (obj->qIndexToFd[queueIndex]);
781 }
783 /*
784 * ======== TransportRpmsg_Factory_create ========
785 * Create the transport instances
786 *
787 * Attach to all remote processors. For now, must attach to
788 * at least one to tolerate MessageQ_E_RESOURCE failures.
789 *
790 * This function implements the IPC Factory interface, so it
791 * returns Ipc status codes.
792 */
793 Int TransportRpmsg_Factory_create(Void)
794 {
795 Int status = Ipc_S_SUCCESS;
796 Int i;
797 UInt16 clusterSize;
798 TransportRpmsg_Handle *inst;
799 int flags;
802 /* needed to enumerate processors in cluster */
803 clusterSize = MultiProc_getNumProcsInCluster();
805 /* allocate the instance array */
806 inst = calloc(clusterSize, sizeof(TransportRpmsg_Handle));
808 if (inst == NULL) {
809 printf("Error: TransportRpmsg_Factory_create failed, no memory\n");
810 status = Ipc_E_MEMORY;
811 goto done;
812 }
814 for (i = 0; i < clusterSize; i++) {
815 inst[i] = NULL;
816 }
818 TransportRpmsg_module->inst = inst;
820 /* counter event object for passing commands to dispatch thread */
821 TransportRpmsg_module->unblockEvent = eventfd(0, 0);
823 if (TransportRpmsg_module->unblockEvent == -1) {
824 printf("create: unblock event failed: %d (%s)\n",
825 errno, strerror(errno));
826 status = Ipc_E_FAIL;
827 goto done;
828 }
830 PRINTVERBOSE1("create: created unblock event %d\n",
831 TransportRpmsg_module->unblockEvent)
833 /* semaphore event object for acknowledging client thread */
834 TransportRpmsg_module->waitEvent = eventfd(0, EFD_SEMAPHORE);
836 if (TransportRpmsg_module->waitEvent == -1) {
837 printf("create: wait event failed: %d (%s)\n", errno, strerror(errno));
838 status = Ipc_E_FAIL;
839 goto done;
840 }
842 PRINTVERBOSE1("create: created wait event %d\n",
843 TransportRpmsg_module->waitEvent)
845 /* make sure eventfds don't exist for 'fork() -> exec*()'ed child */
846 flags = fcntl(TransportRpmsg_module->waitEvent, F_GETFD);
847 if (flags != -1) {
848 fcntl(TransportRpmsg_module->waitEvent, F_SETFD, flags | FD_CLOEXEC);
849 }
850 flags = fcntl(TransportRpmsg_module->unblockEvent, F_GETFD);
851 if (flags != -1) {
852 fcntl(TransportRpmsg_module->unblockEvent, F_SETFD, flags | FD_CLOEXEC);
853 }
855 FD_ZERO(&TransportRpmsg_module->rfds);
856 FD_SET(TransportRpmsg_module->unblockEvent,
857 &TransportRpmsg_module->rfds);
858 TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
859 TransportRpmsg_module->nInFds = 0;
861 pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
863 status = pthread_create(&TransportRpmsg_module->threadId, NULL,
864 &rpmsgThreadFxn, NULL);
866 if (status < 0) {
867 status = Ipc_E_FAIL;
868 printf("attach: failed to spawn thread\n");
869 goto done;
870 }
871 TransportRpmsg_module->threadStarted = TRUE;
873 done:
874 if (status < 0) {
875 TransportRpmsg_Factory_delete();
876 }
878 return (status);
879 }
881 /*
882 * ======== TransportRpmsg_Factory_delete ========
883 * Finalize the transport instances
884 */
885 Void TransportRpmsg_Factory_delete(Void)
886 {
887 uint64_t event;
890 /* shutdown the message dispatch thread */
891 if (TransportRpmsg_module->threadStarted) {
892 event = TransportRpmsg_Event_SHUTDOWN;
893 write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
895 /* wait for dispatch thread to exit */
896 pthread_join(TransportRpmsg_module->threadId, NULL);
897 }
899 /* destroy the mutex object */
900 pthread_mutex_destroy(&TransportRpmsg_module->gate);
902 /* close the client wait event */
903 if (TransportRpmsg_module->waitEvent != -1) {
904 close(TransportRpmsg_module->waitEvent);
905 TransportRpmsg_module->waitEvent = -1;
906 }
908 /* close the dispatch thread unblock event */
909 if (TransportRpmsg_module->unblockEvent != -1) {
910 close(TransportRpmsg_module->unblockEvent);
911 TransportRpmsg_module->unblockEvent = -1;
912 }
914 /* free the instance handle array */
915 if (TransportRpmsg_module->inst != NULL) {
916 free(TransportRpmsg_module->inst);
917 TransportRpmsg_module->inst = NULL;
918 }
920 return;
921 }
923 /*
924 * ======== TransportRpmsg_Factory_attach ========
925 */
926 Int TransportRpmsg_Factory_attach(UInt16 procId)
927 {
928 Int status = Ipc_S_SUCCESS;
929 UInt16 clusterId;
930 TransportRpmsg_Params params;
931 TransportRpmsg_Handle transport;
932 IMessageQTransport_Handle iMsgQTrans;
934 /* cannot attach to yourself */
935 if (MultiProc_self() == procId) {
936 status = Ipc_E_INVALIDARG;
937 goto done;
938 }
940 /* processor must be a member of the cluster */
941 clusterId = procId - MultiProc_getBaseIdOfCluster();
943 if (clusterId >= MultiProc_getNumProcsInCluster()) {
944 status = Ipc_E_INVALIDARG;
945 goto done;
946 }
948 /* create transport instance for given processor */
949 params.rprocId = procId;
950 transport = TransportRpmsg_create(¶ms);
952 if (transport == NULL) {
953 status = Ipc_E_FAIL;
954 goto done;
955 }
957 /* register transport instance with MessageQ */
958 iMsgQTrans = TransportRpmsg_upCast(transport);
959 TransportRpmsg_module->inst[clusterId] = transport;
960 MessageQ_registerTransport(iMsgQTrans, procId, 0);
962 done:
963 return (status);
964 }
966 /*
967 * ======== TransportRpmsg_Factory_detach ========
968 */
969 Int TransportRpmsg_Factory_detach(UInt16 procId)
970 {
971 Int status = Ipc_S_SUCCESS;
972 UInt16 clusterId;
974 /* cannot detach from yourself */
975 if (MultiProc_self() == procId) {
976 status = Ipc_E_INVALIDARG;
977 goto done;
978 }
980 /* processor must be a member of the cluster */
981 clusterId = procId - MultiProc_getBaseIdOfCluster();
983 if (clusterId >= MultiProc_getNumProcsInCluster()) {
984 status = Ipc_E_INVALIDARG;
985 goto done;
986 }
988 /* must be attached in order to detach */
989 if (TransportRpmsg_module->inst[clusterId] == NULL) {
990 status = Ipc_E_INVALIDSTATE;
991 goto done;
992 }
994 /* unregister from MessageQ, delete the transport instance */
995 MessageQ_unregisterTransport(procId, 0);
996 TransportRpmsg_delete(&(TransportRpmsg_module->inst[clusterId]));
998 done:
999 return (status);
1000 }