1 /*
2 * Copyright (c) 2014-2018 Texas Instruments Incorporated - http://www.ti.com
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*
33 * ======== TransportRpmsg.c ========
34 * Implementation of functions specified in the IMessageQTransport interface.
35 */
37 /* Socket Headers */
38 #include <sys/socket.h>
39 #include <sys/select.h>
40 #include <sys/eventfd.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <unistd.h>
44 #include <errno.h>
45 #include <string.h>
46 #include <fcntl.h>
47 #include <pthread.h>
50 /* Socket Protocol Family */
51 #include <net/rpmsg.h>
54 /* IPC headers */
55 #include <ti/ipc/Std.h>
56 #include <SocketFxns.h> /* Socket utils: */
57 #include <ti/ipc/Ipc.h>
58 #include <ti/ipc/MessageQ.h>
59 #include <ti/ipc/MultiProc.h>
60 #include <ti/ipc/transports/TransportRpmsg.h>
61 #include <_MessageQ.h>
62 #include <_lad.h>
64 #if !defined(EFD_SEMAPHORE)
65 # define EFD_SEMAPHORE (1 << 0)
66 #endif
68 /* More magic rpmsg port numbers: */
69 #define MESSAGEQ_RPMSG_PORT 61
70 #define MESSAGEQ_RPMSG_MAXSIZE 512
72 #define TransportRpmsg_GROWSIZE 32
73 #define INVALIDSOCKET (-1)
75 #define TransportRpmsg_Event_ACK (1 << 0)
76 #define TransportRpmsg_Event_PAUSE (1 << 1)
77 #define TransportRpmsg_Event_CONTINUE (1 << 2)
78 #define TransportRpmsg_Event_SHUTDOWN (1 << 3)
81 #define _MAX(a,b) (((a)>(b))?(a):(b))
83 /* traces in this file are controlled via _TransportMessageQ_verbose */
84 Bool _TransportMessageQ_verbose = FALSE;
85 #define verbose _TransportMessageQ_verbose
87 Int TransportRpmsg_bind(Void *handle, UInt32 queueId);
88 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId);
89 Bool TransportRpmsg_put(Void *handle, Ptr msg);
/* Module-wide state shared by all transport instances and the dispatch
 * thread. Access to the fd bookkeeping fields is serialized by 'gate'
 * plus the PAUSE/CONTINUE event handshake with the dispatch thread.
 */
typedef struct TransportRpmsg_Module {
    int sock[MultiProc_MAXPROCESSORS];  /* send socket per cluster member */
    fd_set rfds;                        /* master read set for select() */
    int maxFd;                          /* current highest fd in rfds */
    struct {
        int fd;                         /* receive socket */
        UInt32 qId;                     /* MessageQ queueId bound to fd */
    } inFds[1024];                      /* flat table of receive endpoints */
    int nInFds;                         /* number of valid inFds entries */
    pthread_mutex_t gate;               /* protects fd bookkeeping above */
    int unblockEvent;     /* unblock the dispatch thread (eventfd, counter) */
    int waitEvent;        /* block the client thread (eventfd, semaphore) */
    pthread_t threadId;   /* ID returned by pthread_create() */
    Bool threadStarted;   /* TRUE once rpmsgThreadFxn is running */

    TransportRpmsg_Handle *inst; /* array of instances, one per cluster member */
} TransportRpmsg_Module;
/* IMessageQTransport interface function table; registered with MessageQ
 * for each attached processor (see TransportRpmsg_Factory_attach)
 */
IMessageQTransport_Fxns TransportRpmsg_fxns = {
    .bind    = TransportRpmsg_bind,
    .unbind  = TransportRpmsg_unbind,
    .put     = TransportRpmsg_put
};
/* Per-processor transport instance. 'base' must be first so that the
 * up/down casts between TransportRpmsg_Handle and
 * IMessageQTransport_Handle are simple pointer conversions.
 */
typedef struct TransportRpmsg_Object {
    IMessageQTransport_Object base;  /* inherited interface object */
    Int status;                      /* instance status (unused here) */
    UInt16 rprocId;                  /* remote processor id */
    int numQueues;                   /* size of qIndexToFd table */
    int *qIndexToFd;                 /* queue index -> receive fd (-1 = unbound) */
} TransportRpmsg_Object;
/* Static module state; eventfds start invalid until Factory_create runs.
 *
 * NOTE(review): the designated initializer {INVALIDSOCKET} sets only
 * sock[0] to -1; the remaining elements are zero-initialized (fd 0 is a
 * valid descriptor value). TransportRpmsg_create/delete always rewrite
 * the slot for an attached processor, but confirm no path reads an
 * unattached slot expecting INVALIDSOCKET.
 */
TransportRpmsg_Module TransportRpmsg_state = {
    .sock = {INVALIDSOCKET},
    .unblockEvent = -1,
    .waitEvent = -1,
    .threadStarted = FALSE,
    .inst = NULL
};
TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
132 static void *rpmsgThreadFxn(void *arg);
133 static Int transportGet(int sock, MessageQ_Msg *retMsg);
134 static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
135 Int fd,
136 UInt16 qIndex);
137 static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
138 static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
140 /* factory functions */
141 Int TransportRpmsg_Factory_create(Void);
142 Void TransportRpmsg_Factory_delete(Void);
143 Int TransportRpmsg_Factory_attach(UInt16 procId);
144 Int TransportRpmsg_Factory_detach(UInt16 procId);
146 Ipc_TransportFactoryFxns TransportRpmsg_Factory = {
147 .createFxn = TransportRpmsg_Factory_create,
148 .deleteFxn = TransportRpmsg_Factory_delete,
149 .attachFxn = TransportRpmsg_Factory_attach,
150 .detachFxn = TransportRpmsg_Factory_detach
151 };
153 /* -------------------------------------------------------------------------- */
155 /* instance convertors */
156 IMessageQTransport_Handle TransportRpmsg_upCast(TransportRpmsg_Handle handle)
157 {
158 TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
159 return ((IMessageQTransport_Handle)&obj->base);
160 }
162 TransportRpmsg_Handle TransportRpmsg_downCast(IMessageQTransport_Handle base)
163 {
164 return ((TransportRpmsg_Handle)base);
165 }
167 /*
168 * ======== TransportRpmsg_create ========
169 */
170 TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params)
171 {
172 Int status = MessageQ_S_SUCCESS;
173 TransportRpmsg_Object *obj = NULL;
174 int sock;
175 int flags;
176 UInt16 clusterId;
177 int i;
180 clusterId = params->rprocId - MultiProc_getBaseIdOfCluster();
182 /* create socket for sending messages to remote processor */
183 sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
185 if (sock < 0) {
186 status = Ipc_E_FAIL;
187 fprintf(stderr,
188 "TransportRpmsg_create: socket failed: %d (%s)\n", errno,
189 strerror(errno));
190 goto done;
191 }
192 TransportRpmsg_module->sock[clusterId] = sock;
193 PRINTVERBOSE1("attach: created send socket: %d\n", sock)
195 status = ConnectSocket(sock, params->rprocId, MESSAGEQ_RPMSG_PORT);
197 if (status < 0) {
198 status = Ipc_E_FAIL;
199 fprintf(stderr,
200 "TransportRpmsg_create: connect failed: %d (%s) procId: %d\n",
201 errno, strerror(errno), params->rprocId);
202 close(sock);
203 TransportRpmsg_module->sock[clusterId] = INVALIDSOCKET;
204 goto done;
205 }
207 /* make sure socket fd doesn't exist for 'fork() -> exec*()'ed child */
208 flags = fcntl(sock, F_GETFD);
209 if (flags != -1) {
210 fcntl(sock, F_SETFD, flags | FD_CLOEXEC);
211 }
213 /* create the instance object */
214 obj = calloc(1, sizeof(TransportRpmsg_Object));
216 if (obj == NULL) {
217 status = Ipc_E_MEMORY;
218 close(sock);
219 TransportRpmsg_module->sock[clusterId] = INVALIDSOCKET;
220 goto done;
221 }
223 /* initialize the instance */
224 obj->base.base.interfaceType = IMessageQTransport_TypeId;
225 obj->base.fxns = &TransportRpmsg_fxns;
226 obj->rprocId = params->rprocId;
227 obj->numQueues = TransportRpmsg_GROWSIZE;
229 obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof(int));
231 if (obj->qIndexToFd == NULL) {
232 status = Ipc_E_MEMORY;
233 goto done;
234 }
236 /* must initialize array */
237 for (i = 0; i < TransportRpmsg_GROWSIZE; i++) {
238 obj->qIndexToFd[i] = -1;
239 }
241 done:
242 if (status < 0) {
243 TransportRpmsg_delete((TransportRpmsg_Handle *)&obj);
244 }
246 return (TransportRpmsg_Handle)obj;
247 }
249 /*
250 * ======== TransportRpmsg_delete ========
251 */
252 Void TransportRpmsg_delete(TransportRpmsg_Handle *pHandle)
253 {
254 TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)pHandle;
255 UInt16 clusterId;
256 int sock;
258 if (obj == NULL) {
259 goto done;
260 }
262 clusterId = obj->rprocId - MultiProc_getBaseIdOfCluster();
264 /* close the socket for the given transport instance */
265 sock = TransportRpmsg_module->sock[clusterId];
266 if (sock != INVALIDSOCKET) {
267 PRINTVERBOSE1("detach: closing socket: %d\n", sock)
268 close(sock);
269 }
270 TransportRpmsg_module->sock[clusterId] = INVALIDSOCKET;
272 if ((obj != NULL) && (obj->qIndexToFd != NULL)) {
273 free(obj->qIndexToFd);
274 obj->qIndexToFd = NULL;
275 }
277 if (obj != NULL) {
278 free(obj);
279 obj = NULL;
280 }
282 done:
283 *pHandle = NULL;
284 }
286 /*
287 * ======== TransportRpmsg_bind ========
288 */
289 Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
290 {
291 TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
292 UInt16 queuePort = queueId & 0x0000ffff;
293 int fd;
294 int flags;
295 int err;
296 uint64_t event;
297 UInt16 rprocId;
298 pthread_t tid;
299 Int status = MessageQ_S_SUCCESS;
301 tid = pthread_self();
302 rprocId = obj->rprocId;
304 PRINTVERBOSE3("TransportRpmsg_bind: creating endpoint for rprocId %d "
305 "queuePort 0x%x, tid=0x%x\n", rprocId, queuePort, (unsigned int)tid)
307 pthread_mutex_lock(&TransportRpmsg_module->gate);
309 /* Check if binding already exists.
310 *
311 * There is a race condition between a thread calling MessageQ_create
312 * and another thread calling Ipc_attach. Must make sure we don't bind
313 * the same queue twice.
314 */
315 if (queueIndexToFd(obj, queueId) != -1) {
316 goto done;
317 }
319 /* Create the socket to receive messages for this messageQ. */
320 fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
321 if (fd < 0) {
322 fprintf(stderr, "TransportRpmsg_bind: socket call failed: %d (%s)\n",
323 errno, strerror(errno));
324 status = MessageQ_E_OSFAILURE;
325 goto done;
326 }
327 PRINTVERBOSE2("TransportRpmsg_bind: created socket fd %d, tdi=0x%x\n", fd,
328 (unsigned int)tid);
330 err = SocketBindAddr(fd, rprocId, (UInt32)queuePort);
331 if (err < 0) {
332 /* don't hard-printf since this is no longer fatal */
333 PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
334 errno, strerror(errno));
335 close(fd);
336 status = MessageQ_E_OSFAILURE;
337 goto done;
338 }
340 /* make sure socket fd doesn't exist for 'fork() -> exec*()'ed child */
341 flags = fcntl(fd, F_GETFD);
342 if (flags != -1) {
343 fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
344 }
346 /* pause the dispatch thread */
347 PRINTVERBOSE1("TransportRpmsg_bind: sending PAUSE event, tid=0x%x\n",
348 (unsigned int)tid);
349 event = TransportRpmsg_Event_PAUSE;
350 err = write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
351 if (err < 0) {
352 /* don't hard-printf since this is no longer fatal */
353 PRINTVERBOSE2("TransportRpmsg_bind: pause write failed: %d (%s)\n",
354 errno, strerror(errno));
355 close(fd);
356 status = MessageQ_E_OSFAILURE;
357 goto done;
358 }
360 /* wait for ACK event */
361 err = read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
362 if (err < 0) {
363 /* don't hard-printf since this is no longer fatal */
364 PRINTVERBOSE2("TransportRpmsg_bind: ack read failed: %d (%s)\n",
365 errno, strerror(errno));
366 close(fd);
367 status = MessageQ_E_OSFAILURE;
368 goto done;
369 }
371 PRINTVERBOSE2("TransportRpmsg_bind: received ACK event (%d), tid=0x%x\n",
372 (int)event, (unsigned int)tid);
374 /* add to our fat fd array and update select() parameters */
375 TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].fd = fd;
376 TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++].qId = queueId;
377 TransportRpmsg_module->maxFd = _MAX(TransportRpmsg_module->maxFd, fd);
378 FD_SET(fd, &TransportRpmsg_module->rfds);
379 bindFdToQueueIndex(obj, fd, queuePort);
381 /* release the dispatch thread */
382 PRINTVERBOSE1("TransportRpmsg_bind: sending CONTINUE event, tid=0x%x\n",
383 (unsigned int)tid);
384 event = TransportRpmsg_Event_CONTINUE;
385 err = write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
386 if (err < 0) {
387 /* don't hard-printf since this is no longer fatal */
388 PRINTVERBOSE2("TransportRpmsg_bind: dispatch write failed: %d (%s)\n",
389 errno, strerror(errno));
390 close(fd);
391 status = MessageQ_E_OSFAILURE;
392 goto done;
393 }
395 done:
396 pthread_mutex_unlock(&TransportRpmsg_module->gate);
398 return (status);
399 }
401 /*
402 * ======== TransportRpmsg_unbind ========
403 */
404 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
405 {
406 TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
407 UInt16 queuePort = queueId & 0x0000ffff;
408 uint64_t event;
409 Int status = MessageQ_S_SUCCESS;
410 int maxFd;
411 int fd;
412 int i;
413 int j;
414 int err;
416 pthread_mutex_lock(&TransportRpmsg_module->gate);
418 /* pause the dispatch thread */
419 event = TransportRpmsg_Event_PAUSE;
420 err = write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
421 if (err < 0) {
422 /* don't hard-printf since this is no longer fatal */
423 PRINTVERBOSE2("TransportRpmsg_unbind: pause write failed: %d (%s)\n",
424 errno, strerror(errno));
425 }
427 /* wait for ACK event */
428 err = read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
429 if (err < 0) {
430 /* don't hard-printf since this is no longer fatal */
431 PRINTVERBOSE2("TransportRpmsg_unbind: ack read failed: %d (%s)\n",
432 errno, strerror(errno));
433 }
435 /* Check if binding already deleted.
436 *
437 * There is a race condition between a thread calling MessageQ_delete
438 * and another thread calling Ipc_detach. Must make sure we don't unbind
439 * the same queue twice.
440 */
441 if ((fd = queueIndexToFd(obj, queuePort)) == -1) {
442 goto done;
443 }
444 PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
446 /* guarenteed to work because queueIndexToFd above succeeded */
447 unbindQueueIndex(obj, queuePort);
449 /* remove from input fd array */
450 for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
451 if (TransportRpmsg_module->inFds[i].fd == fd) {
452 TransportRpmsg_module->nInFds--;
454 /* shift subsequent elements down */
455 for (j = i; j < TransportRpmsg_module->nInFds; j++) {
456 TransportRpmsg_module->inFds[j] =
457 TransportRpmsg_module->inFds[j + 1];
458 }
459 TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].fd = -1;
460 TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].qId = 0;
461 break;
462 }
463 }
465 /* remove fd from the descriptor set, compute new max value */
466 FD_CLR(fd, &TransportRpmsg_module->rfds);
467 if (fd == TransportRpmsg_module->maxFd) {
468 /* find new max fd */
469 maxFd = TransportRpmsg_module->unblockEvent;
470 for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
471 maxFd = _MAX(TransportRpmsg_module->inFds[i].fd, maxFd);
472 }
473 TransportRpmsg_module->maxFd = maxFd;
474 }
476 close(fd);
478 /* release the dispatch thread */
479 event = TransportRpmsg_Event_CONTINUE;
480 err = write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
481 if (err < 0) {
482 /* don't hard-printf since this is no longer fatal */
483 PRINTVERBOSE2("TransportRpmsg_unbind: dispatch write failed: %d (%s)\n",
484 errno, strerror(errno));
485 }
487 done:
488 pthread_mutex_unlock(&TransportRpmsg_module->gate);
490 return (status);
491 }
493 /*
494 * ======== TransportRpmsg_put ========
495 */
496 Bool TransportRpmsg_put(Void *handle, Ptr pmsg)
497 {
498 MessageQ_Msg msg = (MessageQ_Msg)pmsg;
499 Int status = TRUE;
500 int sock;
501 int err;
502 UInt16 clusterId;
503 (Void)handle;
505 /*
506 * Retrieve the socket for the AF_SYSLINK protocol associated with this
507 * transport.
508 */
509 clusterId = msg->dstProc - MultiProc_getBaseIdOfCluster();
510 sock = TransportRpmsg_module->sock[clusterId];
511 if (sock == INVALIDSOCKET) {
512 return FALSE;
513 }
515 PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
517 err = send(sock, msg, msg->msgSize, 0);
518 if (err < 0) {
519 fprintf(stderr, "TransportRpmsg_put: send failed: %d (%s)\n",
520 errno, strerror(errno));
521 status = FALSE;
523 goto exit;
524 }
526 /*
527 * Free the message, as this is a copy transport, we maintain MessageQ
528 * semantics.
529 */
530 MessageQ_free(msg);
532 exit:
533 return status;
534 }
536 /*
537 * ======== TransportRpmsg_control ========
538 */
539 Bool TransportRpmsg_control(Void *handle, UInt cmd, UArg cmdArg)
540 {
541 (Void)handle;
542 (Void)cmd;
543 (Void)cmdArg;
544 return FALSE;
545 }
547 /*
548 * ======== rpmsgThreadFxn ========
549 */
550 void *rpmsgThreadFxn(void *arg)
551 {
552 Int status = MessageQ_S_SUCCESS;
553 Int tmpStatus;
554 int retval;
555 uint64_t event;
556 fd_set rfds;
557 int maxFd;
558 int nfds;
559 MessageQ_Msg retMsg = NULL;
560 MessageQ_QueueId queueId;
561 MessageQ_Handle handle;
562 Bool run = TRUE;
563 int i;
564 int j;
565 int fd;
566 (Void)arg;
567 int err;
569 while (run) {
570 maxFd = TransportRpmsg_module->maxFd;
571 rfds = TransportRpmsg_module->rfds;
572 nfds = TransportRpmsg_module->nInFds;
574 PRINTVERBOSE3("rpmsgThreadFxn: maxFd %d rfds[1:0] 0x%08x%08x\n", maxFd,
575 (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
577 retval = select(maxFd + 1, &rfds, NULL, NULL, NULL);
579 /* if error, try again */
580 if (retval < 0) {
581 printf("Warning: rpmsgThreadFxn: select failed, trying again\n");
582 continue;
583 }
585 /* dispatch all pending messages, do this first */
586 for (i = 0; i < nfds; i++) {
587 fd = TransportRpmsg_module->inFds[i].fd;
589 if (FD_ISSET(fd, &rfds)) {
590 PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
591 TransportRpmsg_module->inFds[i].fd);
593 /* transport input fd was signalled: get the message */
594 tmpStatus = transportGet(fd, &retMsg);
595 if (tmpStatus < 0 && tmpStatus != MessageQ_E_SHUTDOWN) {
596 fprintf(stderr,
597 "rpmsgThreadFxn: transportGet failed on fd %d, "
598 "returned %d\n", fd, tmpStatus);
599 }
600 else if (tmpStatus == MessageQ_E_SHUTDOWN) {
601 fprintf(stderr,
602 "rpmsgThreadFxn: transportGet failed on fd %d, "
603 "returned %d\n", fd, tmpStatus);
605 retval = pthread_mutex_trylock(&TransportRpmsg_module->gate);
606 if (retval != 0) {
607 printf("Warning: rpmsgThreadFxn: "
608 "unable to get the lock during shutdown, "
609 "will try again\n");
610 continue;
611 }
613 /*
614 * Don't close(fd) at this time since it will get closed
615 * later when MessageQ_delete() is called in response to
616 * this failure. Just remove fd's bit from the select mask
617 * 'rfds' for now, but don't remove it from inFds[].
618 */
619 FD_CLR(fd, &TransportRpmsg_module->rfds);
620 if (fd == TransportRpmsg_module->maxFd) {
621 /* find new max fd */
622 maxFd = TransportRpmsg_module->unblockEvent;
623 for (j = 0; j < TransportRpmsg_module->nInFds; j++) {
624 maxFd = _MAX(TransportRpmsg_module->inFds[j].fd,
625 maxFd);
626 }
627 TransportRpmsg_module->maxFd = maxFd;
628 }
629 queueId = TransportRpmsg_module->inFds[i].qId;
631 pthread_mutex_unlock(&TransportRpmsg_module->gate);
633 handle = MessageQ_getLocalHandle(queueId);
635 PRINTVERBOSE2("rpmsgThreadFxn: shutting down MessageQ "
636 "%p (queueId 0x%x)...\n", handle, queueId)
638 if (handle != NULL) {
639 MessageQ_shutdown(handle);
640 }
641 else {
642 fprintf(stderr,
643 "rpmsgThreadFxn: MessageQ_getLocalHandle(0x%x) "
644 "returned NULL, can't shutdown\n", queueId);
645 }
646 }
647 else {
648 queueId = MessageQ_getDstQueue(retMsg);
649 PRINTVERBOSE1("rpmsgThreadFxn: got message, "
650 "delivering to queueId 0x%x\n", queueId)
651 MessageQ_put(queueId, retMsg);
652 }
653 }
654 }
656 /* check for events */
657 if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
659 err = read(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
660 if (err < 0) {
661 /* don't hard-printf since this is no longer fatal */
662 PRINTVERBOSE2("rpmsgThreadFxn: event read failed: %d (%s)\n",
663 errno, strerror(errno));
664 }
666 do {
667 if (event & TransportRpmsg_Event_SHUTDOWN) {
668 PRINTVERBOSE0("rpmsgThreadFxn: event SHUTDOWN\n");
669 run = FALSE;
670 break; /* highest priority, stop processing events */
671 }
672 if (event & TransportRpmsg_Event_CONTINUE) {
673 PRINTVERBOSE1("rpmsgThreadFxn: event CONTINUE (%d)\n",
674 (int)event);
675 event &= ~TransportRpmsg_Event_CONTINUE;
676 }
677 if (event & TransportRpmsg_Event_PAUSE) {
678 /* Our event was signalled by TransportRpmsg_bind()
679 * or TransportRpmsg_unbind() to tell us that the set
680 * of file descriptors has changed.
681 */
682 PRINTVERBOSE0("rpmsgThreadFxn: event PAUSE\n");
683 /* send the acknowledgement */
684 event = TransportRpmsg_Event_ACK;
685 err = write(TransportRpmsg_module->waitEvent, &event,
686 sizeof(event));
687 if (err < 0) {
688 /* don't hard-printf since this is no longer fatal */
689 PRINTVERBOSE2("rpmsgThreadFxn: ack write failed: %d (%s)\n",
690 errno, strerror(errno));
691 }
693 /* now wait to be released */
694 err = read(TransportRpmsg_module->unblockEvent, &event,
695 sizeof(event));
696 if (err < 0) {
697 /* don't hard-printf since this is no longer fatal */
698 PRINTVERBOSE2("rpmsgThreadFxn: wait read failed: %d (%s)\n",
699 errno, strerror(errno));
700 }
701 }
702 } while (event != 0);
703 }
704 }
706 return (void *)status;
707 }
709 /*
710 * ======== transportGet ========
711 * Retrieve a message waiting in the socket's queue.
712 */
713 static Int transportGet(int sock, MessageQ_Msg *retMsg)
714 {
715 Int status = MessageQ_S_SUCCESS;
716 MessageQ_Msg msg;
717 struct sockaddr_rpmsg fromAddr; /* [Socket address of sender] */
718 socklen_t len;
719 int byteCount;
721 /*
722 * We have no way of peeking to see what message size we'll get, so we
723 * allocate a message of max size to receive contents from the rpmsg socket
724 * (currently, a copy transport)
725 */
726 msg = MessageQ_alloc(0, MESSAGEQ_RPMSG_MAXSIZE);
727 if (!msg) {
728 status = MessageQ_E_MEMORY;
729 goto exit;
730 }
732 memset(&fromAddr, 0, sizeof (fromAddr));
733 len = sizeof (fromAddr);
735 byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
736 (struct sockaddr *)&fromAddr, &len);
737 if (len != sizeof (fromAddr)) {
738 fprintf(stderr, "recvfrom: got bad addr len (%d)\n", len);
739 status = MessageQ_E_FAIL;
740 goto freeMsg;
741 }
742 if (byteCount < 0) {
743 fprintf(stderr, "recvfrom failed: %s (%d)\n", strerror(errno), errno);
744 if (errno == ENOLINK) {
745 status = MessageQ_E_SHUTDOWN;
746 }
747 else {
748 status = MessageQ_E_FAIL;
749 }
750 goto freeMsg;
751 }
752 else {
753 /*
754 * Update the allocated message size (even though this may waste
755 * space when the actual message is smaller than the maximum rpmsg
756 * size, the message will be freed soon anyway, and it avoids an
757 * extra copy).
758 */
759 msg->msgSize = byteCount;
761 /* set the heapId in the message header to match allocation above */
762 msg->heapId = 0;
763 }
765 PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
766 PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg "
767 "proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
768 PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId,
769 msg->msgSize)
771 *retMsg = msg;
773 goto exit;
775 freeMsg:
776 MessageQ_free(msg);
778 exit:
779 return status;
780 }
782 /*
783 * ======== bindFdToQueueIndex ========
784 *
785 * Precondition: caller must be inside the module gate
786 */
787 Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 queuePort)
788 {
789 Int *queues;
790 Int *oldQueues;
791 UInt oldSize;
792 UInt newCount;
793 UInt queueIndex;
794 UInt i;
796 /* subtract port offset from queue index */
797 queueIndex = queuePort - MessageQ_PORTOFFSET;
799 if (queueIndex >= (UInt)obj->numQueues) {
800 newCount = queueIndex + TransportRpmsg_GROWSIZE;
801 PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
802 newCount);
804 /* allocate larger table */
805 oldSize = obj->numQueues * sizeof(int);
806 queues = calloc(newCount, sizeof(int));
808 /* copy contents from old table int new table */
809 memcpy(queues, obj->qIndexToFd, oldSize);
811 /* initialize remaining entries of new (larger) table */
812 for (i = obj->numQueues; i < newCount; i++) {
813 queues[i] = -1;
814 }
816 /* swap in new table, delete old table */
817 oldQueues = obj->qIndexToFd;
818 obj->qIndexToFd = queues;
819 obj->numQueues = newCount;
820 free(oldQueues);
821 }
823 /* add new entry */
824 obj->qIndexToFd[queueIndex] = fd;
825 }
827 /*
828 * ======== unbindQueueIndex ========
829 *
830 * Precondition: caller must be inside the module gate
831 */
832 Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 queuePort)
833 {
834 UInt queueIndex;
836 /* subtract port offset from queue index */
837 queueIndex = queuePort - MessageQ_PORTOFFSET;
839 /* clear table entry */
840 obj->qIndexToFd[queueIndex] = -1;
841 }
843 /*
844 * ======== queueIndexToFd ========
845 *
846 * Precondition: caller must be inside the module gate
847 */
848 Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 queuePort)
849 {
850 UInt queueIndex;
852 /* subtract port offset from queue index */
853 queueIndex = queuePort - MessageQ_PORTOFFSET;
855 /* return file descriptor */
856 return (obj->qIndexToFd[queueIndex]);
857 }
859 /*
860 * ======== TransportRpmsg_Factory_create ========
861 * Create the transport instances
862 *
863 * Attach to all remote processors. For now, must attach to
864 * at least one to tolerate MessageQ_E_RESOURCE failures.
865 *
866 * This function implements the IPC Factory interface, so it
867 * returns Ipc status codes.
868 */
869 Int TransportRpmsg_Factory_create(Void)
870 {
871 Int status = Ipc_S_SUCCESS;
872 Int i;
873 UInt16 clusterSize;
874 TransportRpmsg_Handle *inst;
875 int flags;
878 /* needed to enumerate processors in cluster */
879 clusterSize = MultiProc_getNumProcsInCluster();
881 /* allocate the instance array */
882 inst = calloc(clusterSize, sizeof(TransportRpmsg_Handle));
884 if (inst == NULL) {
885 fprintf(stderr,
886 "Error: TransportRpmsg_Factory_create failed, no memory\n");
887 status = Ipc_E_MEMORY;
888 goto done;
889 }
891 for (i = 0; i < clusterSize; i++) {
892 inst[i] = NULL;
893 }
895 TransportRpmsg_module->inst = inst;
897 /* counter event object for passing commands to dispatch thread */
898 TransportRpmsg_module->unblockEvent = eventfd(0, 0);
900 if (TransportRpmsg_module->unblockEvent == -1) {
901 fprintf(stderr, "create: unblock event failed: %d (%s)\n",
902 errno, strerror(errno));
903 status = Ipc_E_FAIL;
904 goto done;
905 }
907 PRINTVERBOSE1("create: created unblock event %d\n",
908 TransportRpmsg_module->unblockEvent)
910 /* semaphore event object for acknowledging client thread */
911 TransportRpmsg_module->waitEvent = eventfd(0, EFD_SEMAPHORE);
913 if (TransportRpmsg_module->waitEvent == -1) {
914 fprintf(stderr,
915 "create: wait event failed: %d (%s)\n", errno, strerror(errno));
916 status = Ipc_E_FAIL;
917 goto done;
918 }
920 PRINTVERBOSE1("create: created wait event %d\n",
921 TransportRpmsg_module->waitEvent)
923 /* make sure eventfds don't exist for 'fork() -> exec*()'ed child */
924 flags = fcntl(TransportRpmsg_module->waitEvent, F_GETFD);
925 if (flags != -1) {
926 fcntl(TransportRpmsg_module->waitEvent, F_SETFD, flags | FD_CLOEXEC);
927 }
928 flags = fcntl(TransportRpmsg_module->unblockEvent, F_GETFD);
929 if (flags != -1) {
930 fcntl(TransportRpmsg_module->unblockEvent, F_SETFD, flags | FD_CLOEXEC);
931 }
933 FD_ZERO(&TransportRpmsg_module->rfds);
934 FD_SET(TransportRpmsg_module->unblockEvent,
935 &TransportRpmsg_module->rfds);
936 TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
937 TransportRpmsg_module->nInFds = 0;
939 pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
941 status = pthread_create(&TransportRpmsg_module->threadId, NULL,
942 &rpmsgThreadFxn, NULL);
944 if (status < 0) {
945 status = Ipc_E_FAIL;
946 fprintf(stderr, "create: failed to spawn thread\n");
947 goto done;
948 }
949 TransportRpmsg_module->threadStarted = TRUE;
951 done:
952 if (status < 0) {
953 TransportRpmsg_Factory_delete();
954 }
956 return (status);
957 }
959 /*
960 * ======== TransportRpmsg_Factory_delete ========
961 * Finalize the transport instances
962 */
963 Void TransportRpmsg_Factory_delete(Void)
964 {
965 uint64_t event;
968 /* shutdown the message dispatch thread */
969 if (TransportRpmsg_module->threadStarted) {
970 event = TransportRpmsg_Event_SHUTDOWN;
971 write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
973 /* wait for dispatch thread to exit */
974 pthread_join(TransportRpmsg_module->threadId, NULL);
975 }
977 /* destroy the mutex object */
978 pthread_mutex_destroy(&TransportRpmsg_module->gate);
980 /* close the client wait event */
981 if (TransportRpmsg_module->waitEvent != -1) {
982 close(TransportRpmsg_module->waitEvent);
983 TransportRpmsg_module->waitEvent = -1;
984 }
986 /* close the dispatch thread unblock event */
987 if (TransportRpmsg_module->unblockEvent != -1) {
988 close(TransportRpmsg_module->unblockEvent);
989 TransportRpmsg_module->unblockEvent = -1;
990 }
992 /* free the instance handle array */
993 if (TransportRpmsg_module->inst != NULL) {
994 free(TransportRpmsg_module->inst);
995 TransportRpmsg_module->inst = NULL;
996 }
998 return;
999 }
1001 /*
1002 * ======== TransportRpmsg_Factory_attach ========
1003 */
1004 Int TransportRpmsg_Factory_attach(UInt16 procId)
1005 {
1006 Int status = Ipc_S_SUCCESS;
1007 UInt16 clusterId;
1008 TransportRpmsg_Params params;
1009 TransportRpmsg_Handle transport;
1010 IMessageQTransport_Handle iMsgQTrans;
1012 /* cannot attach to yourself */
1013 if (MultiProc_self() == procId) {
1014 status = Ipc_E_INVALIDARG;
1015 goto done;
1016 }
1018 /* processor must be a member of the cluster */
1019 clusterId = procId - MultiProc_getBaseIdOfCluster();
1021 if (clusterId >= MultiProc_getNumProcsInCluster()) {
1022 status = Ipc_E_INVALIDARG;
1023 goto done;
1024 }
1026 /* create transport instance for given processor */
1027 params.rprocId = procId;
1028 transport = TransportRpmsg_create(¶ms);
1030 if (transport == NULL) {
1031 status = Ipc_E_FAIL;
1032 goto done;
1033 }
1035 /* register transport instance with MessageQ */
1036 iMsgQTrans = TransportRpmsg_upCast(transport);
1037 TransportRpmsg_module->inst[clusterId] = transport;
1038 MessageQ_registerTransport(iMsgQTrans, procId, 0);
1040 done:
1041 return (status);
1042 }
1044 /*
1045 * ======== TransportRpmsg_Factory_detach ========
1046 */
1047 Int TransportRpmsg_Factory_detach(UInt16 procId)
1048 {
1049 Int status = Ipc_S_SUCCESS;
1050 UInt16 clusterId;
1052 /* cannot detach from yourself */
1053 if (MultiProc_self() == procId) {
1054 status = Ipc_E_INVALIDARG;
1055 goto done;
1056 }
1058 /* processor must be a member of the cluster */
1059 clusterId = procId - MultiProc_getBaseIdOfCluster();
1061 if (clusterId >= MultiProc_getNumProcsInCluster()) {
1062 status = Ipc_E_INVALIDARG;
1063 goto done;
1064 }
1066 /* must be attached in order to detach */
1067 if (TransportRpmsg_module->inst[clusterId] == NULL) {
1068 status = Ipc_E_INVALIDSTATE;
1069 goto done;
1070 }
1072 /* unregister from MessageQ, delete the transport instance */
1073 MessageQ_unregisterTransport(procId, 0);
1074 TransportRpmsg_delete(&(TransportRpmsg_module->inst[clusterId]));
1076 done:
1077 return (status);
1078 }