1 /*
2 * Copyright (c) 2014-2015 Texas Instruments Incorporated - http://www.ti.com
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*
33 * ======== TransportRpmsg.c ========
34 * Implementation of functions specified in the IMessageQTransport interface.
35 */
37 /* Socket Headers */
38 #include <sys/socket.h>
39 #include <sys/select.h>
40 #include <sys/eventfd.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <unistd.h>
44 #include <errno.h>
45 #include <string.h>
46 #include <fcntl.h>
47 #include <pthread.h>
50 /* Socket Protocol Family */
51 #include <net/rpmsg.h>
54 /* IPC headers */
55 #include <ti/ipc/Std.h>
56 #include <SocketFxns.h> /* Socket utils: */
57 #include <ti/ipc/Ipc.h>
58 #include <ti/ipc/MessageQ.h>
59 #include <ti/ipc/MultiProc.h>
60 #include <ti/ipc/transports/TransportRpmsg.h>
61 #include <_MessageQ.h>
62 #include <_lad.h>
/* Fallback definition, presumably for C library headers that lack it. */
#if !defined(EFD_SEMAPHORE)
# define EFD_SEMAPHORE (1 << 0)
#endif

/* More magic rpmsg port numbers: */
/* port the send socket is connected to in TransportRpmsg_create() */
#define MESSAGEQ_RPMSG_PORT 61
/* size of the buffer allocated for every received message (transportGet) */
#define MESSAGEQ_RPMSG_MAXSIZE 512

/* initial size, and growth increment, of an instance's qIndexToFd table */
#define TransportRpmsg_GROWSIZE 32
/* value stored in the module sock[] table when no socket is open */
#define INVALIDSOCKET (-1)

/*
 *  Event bits passed through the module's eventfd objects:
 *    ACK       written to waitEvent by the dispatch thread
 *    PAUSE     written to unblockEvent by bind/unbind
 *    CONTINUE  written to unblockEvent by bind/unbind
 *    SHUTDOWN  written to unblockEvent by TransportRpmsg_Factory_delete()
 */
#define TransportRpmsg_Event_ACK (1 << 0)
#define TransportRpmsg_Event_PAUSE (1 << 1)
#define TransportRpmsg_Event_CONTINUE (1 << 2)
#define TransportRpmsg_Event_SHUTDOWN (1 << 3)

/* NOTE: each argument is evaluated more than once; avoid side effects */
#define _MAX(a,b) (((a)>(b))?(a):(b))

/* traces in this file are controlled via _TransportMessageQ_verbose */
Bool _TransportMessageQ_verbose = FALSE;
#define verbose _TransportMessageQ_verbose

/* IMessageQTransport interface functions, referenced by TransportRpmsg_fxns */
Int TransportRpmsg_bind(Void *handle, UInt32 queueId);
Int TransportRpmsg_unbind(Void *handle, UInt32 queueId);
Bool TransportRpmsg_put(Void *handle, Ptr msg);
/*
 *  Module-wide state shared by all transport instances and by the
 *  dispatch thread (rpmsgThreadFxn).
 */
typedef struct TransportRpmsg_Module {
    /* send socket per remote processor, indexed by cluster-relative id */
    int sock[MultiProc_MAXPROCESSORS];
    /* descriptor set the dispatch thread copies and hands to select() */
    fd_set rfds;
    /* highest descriptor in rfds; select() is called with maxFd + 1 */
    int maxFd;
    /* receive sockets created by bind, with the queue id each one serves */
    struct {
        int fd;
        UInt32 qId;
    } inFds[1024];
    /* number of entries of inFds[] currently in use */
    int nInFds;
    /* taken by bind/unbind and by the dispatch thread's shutdown handling */
    pthread_mutex_t gate;
    int unblockEvent; /* unblock the dispatch thread */
    int waitEvent; /* block the client thread */
    pthread_t threadId; /* ID returned by pthread_create() */
    /* TRUE once the dispatch thread has been created */
    Bool threadStarted;

    TransportRpmsg_Handle *inst; /* array of instances */
} TransportRpmsg_Module;
/*
 *  Function table installed in every instance (obj->base.fxns) by
 *  TransportRpmsg_create().
 */
IMessageQTransport_Fxns TransportRpmsg_fxns = {
    .bind = TransportRpmsg_bind,
    .unbind = TransportRpmsg_unbind,
    .put = TransportRpmsg_put
};
/*
 *  Per-remote-processor transport instance.
 */
typedef struct TransportRpmsg_Object {
    /* interface part; TransportRpmsg_upCast() returns its address */
    IMessageQTransport_Object base;
    /* not referenced by the code visible in this file */
    Int status;
    /* id of the remote processor this instance talks to */
    UInt16 rprocId;
    /* number of entries in qIndexToFd */
    int numQueues;
    /* receive socket per queue index, -1 when the queue is not bound */
    int *qIndexToFd;
} TransportRpmsg_Object;
/*
 *  The single module state object and the pointer through which the rest
 *  of this file accesses it.
 *
 *  NOTE: the initializer {INVALIDSOCKET} sets only sock[0]; the remaining
 *  elements of sock[] are zero-initialised, not INVALIDSOCKET.
 */
TransportRpmsg_Module TransportRpmsg_state = {
    .sock = {INVALIDSOCKET},
    .unblockEvent = -1,
    .waitEvent = -1,
    .threadStarted = FALSE,
    .inst = NULL
};
TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
/* dispatch thread entry point, started by TransportRpmsg_Factory_create() */
static void *rpmsgThreadFxn(void *arg);
/* receive one message from a bound socket */
static Int transportGet(int sock, MessageQ_Msg *retMsg);
/* helpers maintaining an instance's qIndexToFd table */
static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
                               Int fd,
                               UInt16 qIndex);
static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
/* factory functions */
Int TransportRpmsg_Factory_create(Void);
Void TransportRpmsg_Factory_delete(Void);
Int TransportRpmsg_Factory_attach(UInt16 procId);
Int TransportRpmsg_Factory_detach(UInt16 procId);

/*
 *  Factory function table exposing the create/delete/attach/detach entry
 *  points of this transport.
 */
Ipc_TransportFactoryFxns TransportRpmsg_Factory = {
    .createFxn = TransportRpmsg_Factory_create,
    .deleteFxn = TransportRpmsg_Factory_delete,
    .attachFxn = TransportRpmsg_Factory_attach,
    .detachFxn = TransportRpmsg_Factory_detach
};
153 /* -------------------------------------------------------------------------- */
155 /* instance convertors */
156 IMessageQTransport_Handle TransportRpmsg_upCast(TransportRpmsg_Handle handle)
157 {
158 TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
159 return ((IMessageQTransport_Handle)&obj->base);
160 }
162 TransportRpmsg_Handle TransportRpmsg_downCast(IMessageQTransport_Handle base)
163 {
164 return ((TransportRpmsg_Handle)base);
165 }
167 /*
168 * ======== TransportRpmsg_create ========
169 */
170 TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params)
171 {
172 Int status = MessageQ_S_SUCCESS;
173 TransportRpmsg_Object *obj = NULL;
174 int sock;
175 UInt16 clusterId;
176 int i;
179 clusterId = params->rprocId - MultiProc_getBaseIdOfCluster();
181 /* create socket for sending messages to remote processor */
182 sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
184 if (sock < 0) {
185 status = Ipc_E_FAIL;
186 printf("TransportRpmsg_create: socket failed: %d (%s)\n", errno,
187 strerror(errno));
188 goto done;
189 }
190 TransportRpmsg_module->sock[clusterId] = sock;
191 PRINTVERBOSE1("attach: created send socket: %d\n", sock)
193 status = ConnectSocket(sock, params->rprocId, MESSAGEQ_RPMSG_PORT);
195 if (status < 0) {
196 status = Ipc_E_FAIL;
197 printf("TransportRpmsg_create: connect failed: %d (%s) procId: %d\n",
198 errno, strerror(errno), params->rprocId);
199 goto done;
200 }
202 /* create the instance object */
203 obj = calloc(1, sizeof(TransportRpmsg_Object));
205 if (obj == NULL) {
206 status = Ipc_E_MEMORY;
207 goto done;
208 }
210 /* initialize the instance */
211 obj->base.base.interfaceType = IMessageQTransport_TypeId;
212 obj->base.fxns = &TransportRpmsg_fxns;
213 obj->rprocId = params->rprocId;
214 obj->numQueues = TransportRpmsg_GROWSIZE;
216 obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof(int));
218 if (obj->qIndexToFd == NULL) {
219 status = Ipc_E_MEMORY;
220 goto done;
221 }
223 /* must initialize array */
224 for (i = 0; i < TransportRpmsg_GROWSIZE; i++) {
225 obj->qIndexToFd[i] = -1;
226 }
228 done:
229 if (status < 0) {
230 TransportRpmsg_delete((TransportRpmsg_Handle *)&obj);
231 }
233 return (TransportRpmsg_Handle)obj;
234 }
236 /*
237 * ======== TransportRpmsg_delete ========
238 */
239 Void TransportRpmsg_delete(TransportRpmsg_Handle *pHandle)
240 {
241 TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)pHandle;
242 UInt16 clusterId;
243 int sock;
246 clusterId = obj->rprocId - MultiProc_getBaseIdOfCluster();
248 /* close the socket for the given transport instance */
249 sock = TransportRpmsg_module->sock[clusterId];
250 if (sock != INVALIDSOCKET) {
251 PRINTVERBOSE1("detach: closing socket: %d\n", sock)
252 close(sock);
253 }
254 TransportRpmsg_module->sock[clusterId] = INVALIDSOCKET;
256 if ((obj != NULL) && (obj->qIndexToFd != NULL)) {
257 free(obj->qIndexToFd);
258 obj->qIndexToFd = NULL;
259 }
261 if (obj != NULL) {
262 free(obj);
263 obj = NULL;
264 }
266 *pHandle = NULL;
267 }
269 /*
270 * ======== TransportRpmsg_bind ========
271 */
272 Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
273 {
274 TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
275 UInt16 queuePort = queueId & 0x0000ffff;
276 int fd;
277 int err;
278 uint64_t event;
279 UInt16 rprocId;
280 pthread_t tid;
281 Int status = MessageQ_S_SUCCESS;
283 tid = pthread_self();
284 rprocId = obj->rprocId;
286 PRINTVERBOSE3("TransportRpmsg_bind: creating endpoint for rprocId %d "
287 "queuePort 0x%x, tid=0x%x\n", rprocId, queuePort, (unsigned int)tid)
289 pthread_mutex_lock(&TransportRpmsg_module->gate);
291 /* Check if binding already exists.
292 *
293 * There is a race condition between a thread calling MessageQ_create
294 * and another thread calling Ipc_attach. Must make sure we don't bind
295 * the same queue twice.
296 */
297 if (queueIndexToFd(obj, queueId) != -1) {
298 goto done;
299 }
301 /* Create the socket to receive messages for this messageQ. */
302 fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
303 if (fd < 0) {
304 printf("TransportRpmsg_bind: socket call failed: %d (%s)\n",
305 errno, strerror(errno));
306 status = MessageQ_E_OSFAILURE;
307 goto done;
308 }
309 PRINTVERBOSE2("TransportRpmsg_bind: created socket fd %d, tdi=0x%x\n", fd,
310 (unsigned int)tid);
312 err = SocketBindAddr(fd, rprocId, (UInt32)queuePort);
313 if (err < 0) {
314 /* don't hard-printf since this is no longer fatal */
315 PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
316 errno, strerror(errno));
317 close(fd);
318 status = MessageQ_E_OSFAILURE;
319 goto done;
320 }
322 /* pause the dispatch thread */
323 PRINTVERBOSE1("TransportRpmsg_bind: sending PAUSE event, tid=0x%x\n",
324 (unsigned int)tid);
325 event = TransportRpmsg_Event_PAUSE;
326 write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
328 /* wait for ACK event */
329 read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
330 PRINTVERBOSE2("TransportRpmsg_bind: received ACK event (%d), tid=0x%x\n",
331 (int)event, (unsigned int)tid);
333 /* add to our fat fd array and update select() parameters */
334 TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].fd = fd;
335 TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++].qId = queueId;
336 TransportRpmsg_module->maxFd = _MAX(TransportRpmsg_module->maxFd, fd);
337 FD_SET(fd, &TransportRpmsg_module->rfds);
338 bindFdToQueueIndex(obj, fd, queuePort);
340 /* release the dispatch thread */
341 PRINTVERBOSE1("TransportRpmsg_bind: sending CONTINUE event, tid=0x%x\n",
342 (unsigned int)tid);
343 event = TransportRpmsg_Event_CONTINUE;
344 write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
346 done:
347 pthread_mutex_unlock(&TransportRpmsg_module->gate);
349 return (status);
350 }
352 /*
353 * ======== TransportRpmsg_unbind ========
354 */
355 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
356 {
357 TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
358 UInt16 queuePort = queueId & 0x0000ffff;
359 uint64_t event;
360 Int status = MessageQ_S_SUCCESS;
361 int maxFd;
362 int fd;
363 int i;
364 int j;
366 pthread_mutex_lock(&TransportRpmsg_module->gate);
368 /* pause the dispatch thread */
369 event = TransportRpmsg_Event_PAUSE;
370 write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
372 /* wait for ACK event */
373 read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
375 /* Check if binding already deleted.
376 *
377 * There is a race condition between a thread calling MessageQ_delete
378 * and another thread calling Ipc_detach. Must make sure we don't unbind
379 * the same queue twice.
380 */
381 if ((fd = queueIndexToFd(obj, queuePort)) == -1) {
382 goto done;
383 }
384 PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
386 /* guarenteed to work because queueIndexToFd above succeeded */
387 unbindQueueIndex(obj, queuePort);
389 /* remove from input fd array */
390 for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
391 if (TransportRpmsg_module->inFds[i].fd == fd) {
392 TransportRpmsg_module->nInFds--;
394 /* shift subsequent elements down */
395 for (j = i; j < TransportRpmsg_module->nInFds; j++) {
396 TransportRpmsg_module->inFds[j] =
397 TransportRpmsg_module->inFds[j + 1];
398 }
399 TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].fd = -1;
400 TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].qId = 0;
401 break;
402 }
403 }
405 /* remove fd from the descriptor set, compute new max value */
406 FD_CLR(fd, &TransportRpmsg_module->rfds);
407 if (fd == TransportRpmsg_module->maxFd) {
408 /* find new max fd */
409 maxFd = TransportRpmsg_module->unblockEvent;
410 for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
411 maxFd = _MAX(TransportRpmsg_module->inFds[i].fd, maxFd);
412 }
413 TransportRpmsg_module->maxFd = maxFd;
414 }
416 close(fd);
418 /* release the dispatch thread */
419 event = TransportRpmsg_Event_CONTINUE;
420 write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
422 done:
423 pthread_mutex_unlock(&TransportRpmsg_module->gate);
425 return (status);
426 }
428 /*
429 * ======== TransportRpmsg_put ========
430 */
431 Bool TransportRpmsg_put(Void *handle, Ptr pmsg)
432 {
433 MessageQ_Msg msg = (MessageQ_Msg)pmsg;
434 Int status = TRUE;
435 int sock;
436 int err;
437 UInt16 clusterId;
439 /*
440 * Retrieve the socket for the AF_SYSLINK protocol associated with this
441 * transport.
442 */
443 clusterId = msg->dstProc - MultiProc_getBaseIdOfCluster();
444 sock = TransportRpmsg_module->sock[clusterId];
445 if (!sock) {
446 return FALSE;
447 }
449 PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
451 err = send(sock, msg, msg->msgSize, 0);
452 if (err < 0) {
453 printf("TransportRpmsg_put: send failed: %d (%s)\n",
454 errno, strerror(errno));
455 status = FALSE;
457 goto exit;
458 }
460 /*
461 * Free the message, as this is a copy transport, we maintain MessageQ
462 * semantics.
463 */
464 MessageQ_free(msg);
466 exit:
467 return status;
468 }
470 /*
471 * ======== TransportRpmsg_control ========
472 */
473 Bool TransportRpmsg_control(Void *handle, UInt cmd, UArg cmdArg)
474 {
475 return FALSE;
476 }
/*
 *  ======== rpmsgThreadFxn ========
 *  Dispatch thread: waits in select() on all bound receive sockets plus
 *  the module's unblockEvent, delivers received messages to their
 *  destination queues and services PAUSE / CONTINUE / SHUTDOWN events.
 *
 *  arg is not used. The function returns when a SHUTDOWN event is seen;
 *  the return value is the (never modified) initial status cast to a
 *  pointer.
 */
void *rpmsgThreadFxn(void *arg)
{
    Int status = MessageQ_S_SUCCESS;
    Int tmpStatus;
    int retval;
    uint64_t event;
    fd_set rfds;
    int maxFd;
    int nfds;
    MessageQ_Msg retMsg;
    MessageQ_QueueId queueId;
    MessageQ_Handle handle;
    Bool run = TRUE;
    int i;
    int j;
    int fd;


    while (run) {
        /*
         *  Take a private copy of the select() parameters. The gate is not
         *  held here; bind/unbind only change these after the PAUSE/ACK
         *  handshake handled at the bottom of this loop.
         */
        maxFd = TransportRpmsg_module->maxFd;
        rfds = TransportRpmsg_module->rfds;
        nfds = TransportRpmsg_module->nInFds;

        PRINTVERBOSE3("rpmsgThreadFxn: maxFd %d rfds[1:0] 0x%08x%08x\n", maxFd,
                (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])

        /* block without timeout until a socket or the event fd is readable */
        retval = select(maxFd + 1, &rfds, NULL, NULL, NULL);

        /* if error, try again */
        if (retval < 0) {
            printf("Warning: rpmsgThreadFxn: select failed, trying again\n");
            continue;
        }

        /* dispatch all pending messages, do this first */
        for (i = 0; i < nfds; i++) {
            fd = TransportRpmsg_module->inFds[i].fd;

            if (FD_ISSET(fd, &rfds)) {
                PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
                        TransportRpmsg_module->inFds[i].fd);

                /* transport input fd was signalled: get the message */
                tmpStatus = transportGet(fd, &retMsg);
                if (tmpStatus < 0 && tmpStatus != MessageQ_E_SHUTDOWN) {
                    /* any other failure is only reported */
                    printf("rpmsgThreadFxn: transportGet failed on fd %d,"
                            " returned %d\n", fd, tmpStatus);
                }
                else if (tmpStatus == MessageQ_E_SHUTDOWN) {
                    printf("rpmsgThreadFxn: transportGet failed on fd %d,"
                            " returned %d\n", fd, tmpStatus);

                    pthread_mutex_lock(&TransportRpmsg_module->gate);

                    /*
                     *  Don't close(fd) at this time since it will get closed
                     *  later when MessageQ_delete() is called in response to
                     *  this failure. Just remove fd's bit from the select mask
                     *  'rfds' for now, but don't remove it from inFds[].
                     */
                    FD_CLR(fd, &TransportRpmsg_module->rfds);
                    if (fd == TransportRpmsg_module->maxFd) {
                        /* find new max fd */
                        /* NOTE(review): fd is still in inFds[], so this
                         * scan includes it again */
                        maxFd = TransportRpmsg_module->unblockEvent;
                        for (j = 0; j < TransportRpmsg_module->nInFds; j++) {
                            maxFd = _MAX(TransportRpmsg_module->inFds[j].fd,
                                    maxFd);
                        }
                        TransportRpmsg_module->maxFd = maxFd;
                    }
                    queueId = TransportRpmsg_module->inFds[i].qId;

                    pthread_mutex_unlock(&TransportRpmsg_module->gate);

                    /* shut down the local queue served by this socket */
                    handle = MessageQ_getLocalHandle(queueId);

                    PRINTVERBOSE2("rpmsgThreadFxn: shutting down MessageQ "
                            "%p (queueId 0x%x)...\n", handle, queueId)

                    if (handle != NULL) {
                        MessageQ_shutdown(handle);
                    }
                    else {
                        printf("rpmsgThreadFxn: MessageQ_getLocalHandle(0x%x) "
                                "returned NULL, can't shutdown\n", queueId);
                    }
                }
                else {
                    /* success: hand the message to its destination queue */
                    queueId = MessageQ_getDstQueue(retMsg);
                    PRINTVERBOSE1("rpmsgThreadFxn: got message, "
                            "delivering to queueId 0x%x\n", queueId)
                    MessageQ_put(queueId, retMsg);
                }
            }
        }

        /* check for events */
        if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {

            read(TransportRpmsg_module->unblockEvent, &event, sizeof(event));

            /* process event bits until none remain or SHUTDOWN is seen */
            do {
                if (event & TransportRpmsg_Event_SHUTDOWN) {
                    PRINTVERBOSE0("rpmsgThreadFxn: event SHUTDOWN\n");
                    run = FALSE;
                    break; /* highest priority, stop processing events */
                }
                if (event & TransportRpmsg_Event_CONTINUE) {
                    PRINTVERBOSE1("rpmsgThreadFxn: event CONTINUE (%d)\n",
                            (int)event);
                    event &= ~TransportRpmsg_Event_CONTINUE;
                }
                if (event & TransportRpmsg_Event_PAUSE) {
                    /*  Our event was signalled by TransportRpmsg_bind()
                     *  or TransportRpmsg_unbind() to tell us that the set
                     *  of file descriptors has changed.
                     */
                    PRINTVERBOSE0("rpmsgThreadFxn: event PAUSE\n");
                    /* send the acknowledgement */
                    event = TransportRpmsg_Event_ACK;
                    write(TransportRpmsg_module->waitEvent, &event,
                            sizeof(event));
                    /* now wait to be released */
                    /* the value read here is examined by the next pass */
                    read(TransportRpmsg_module->unblockEvent, &event,
                            sizeof(event));
                }
            } while (event != 0);
        }
    }

    return (void *)status;
}
614 /*
615 * ======== transportGet ========
616 * Retrieve a message waiting in the socket's queue.
617 */
618 static Int transportGet(int sock, MessageQ_Msg *retMsg)
619 {
620 Int status = MessageQ_S_SUCCESS;
621 MessageQ_Msg msg;
622 struct sockaddr_rpmsg fromAddr; /* [Socket address of sender] */
623 unsigned int len;
624 int byteCount;
626 /*
627 * We have no way of peeking to see what message size we'll get, so we
628 * allocate a message of max size to receive contents from the rpmsg socket
629 * (currently, a copy transport)
630 */
631 msg = MessageQ_alloc(0, MESSAGEQ_RPMSG_MAXSIZE);
632 if (!msg) {
633 status = MessageQ_E_MEMORY;
634 goto exit;
635 }
637 memset(&fromAddr, 0, sizeof (fromAddr));
638 len = sizeof (fromAddr);
640 byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
641 (struct sockaddr *)&fromAddr, &len);
642 if (len != sizeof (fromAddr)) {
643 printf("recvfrom: got bad addr len (%d)\n", len);
644 status = MessageQ_E_FAIL;
645 goto freeMsg;
646 }
647 if (byteCount < 0) {
648 printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
649 if (errno == ENOLINK) {
650 status = MessageQ_E_SHUTDOWN;
651 }
652 else {
653 status = MessageQ_E_FAIL;
654 }
655 goto freeMsg;
656 }
657 else {
658 /*
659 * Update the allocated message size (even though this may waste
660 * space when the actual message is smaller than the maximum rpmsg
661 * size, the message will be freed soon anyway, and it avoids an
662 * extra copy).
663 */
664 msg->msgSize = byteCount;
666 /*
667 * If the message received was statically allocated, reset the
668 * heapId, so the app can free it.
669 */
670 if (msg->heapId == MessageQ_STATICMSG) {
671 msg->heapId = 0; /* for a copy transport, heap id is 0. */
672 }
673 }
675 PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
676 PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg "
677 "proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
678 PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId,
679 msg->msgSize)
681 *retMsg = msg;
683 goto exit;
685 freeMsg:
686 MessageQ_free(msg);
688 exit:
689 return status;
690 }
692 /*
693 * ======== bindFdToQueueIndex ========
694 *
695 * Precondition: caller must be inside the module gate
696 */
697 Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 queuePort)
698 {
699 Int *queues;
700 Int *oldQueues;
701 UInt oldSize;
702 UInt newCount;
703 UInt queueIndex;
704 int i;
706 /* subtract port offset from queue index */
707 queueIndex = queuePort - MessageQ_PORTOFFSET;
709 if (queueIndex >= obj->numQueues) {
710 newCount = queueIndex + TransportRpmsg_GROWSIZE;
711 PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
712 newCount);
714 /* allocate larger table */
715 oldSize = obj->numQueues * sizeof(int);
716 queues = calloc(newCount, sizeof(int));
718 /* copy contents from old table int new table */
719 memcpy(queues, obj->qIndexToFd, oldSize);
721 /* initialize remaining entries of new (larger) table */
722 for (i = obj->numQueues; i < newCount; i++) {
723 queues[i] = -1;
724 }
726 /* swap in new table, delete old table */
727 oldQueues = obj->qIndexToFd;
728 obj->qIndexToFd = queues;
729 obj->numQueues = newCount;
730 free(oldQueues);
731 }
733 /* add new entry */
734 obj->qIndexToFd[queueIndex] = fd;
735 }
737 /*
738 * ======== unbindQueueIndex ========
739 *
740 * Precondition: caller must be inside the module gate
741 */
742 Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 queuePort)
743 {
744 UInt queueIndex;
746 /* subtract port offset from queue index */
747 queueIndex = queuePort - MessageQ_PORTOFFSET;
749 /* clear table entry */
750 obj->qIndexToFd[queueIndex] = -1;
751 }
753 /*
754 * ======== queueIndexToFd ========
755 *
756 * Precondition: caller must be inside the module gate
757 */
758 Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 queuePort)
759 {
760 UInt queueIndex;
762 /* subtract port offset from queue index */
763 queueIndex = queuePort - MessageQ_PORTOFFSET;
765 /* return file descriptor */
766 return (obj->qIndexToFd[queueIndex]);
767 }
769 /*
770 * ======== TransportRpmsg_Factory_create ========
771 * Create the transport instances
772 *
773 * Attach to all remote processors. For now, must attach to
774 * at least one to tolerate MessageQ_E_RESOURCE failures.
775 *
776 * This function implements the IPC Factory interface, so it
777 * returns Ipc status codes.
778 */
779 Int TransportRpmsg_Factory_create(Void)
780 {
781 Int status = Ipc_S_SUCCESS;
782 Int i;
783 UInt16 clusterSize;
784 TransportRpmsg_Handle *inst;
787 /* needed to enumerate processors in cluster */
788 clusterSize = MultiProc_getNumProcsInCluster();
790 /* allocate the instance array */
791 inst = calloc(clusterSize, sizeof(TransportRpmsg_Handle));
793 if (inst == NULL) {
794 printf("Error: TransportRpmsg_Factory_create failed, no memory\n");
795 status = Ipc_E_MEMORY;
796 goto done;
797 }
799 for (i = 0; i < clusterSize; i++) {
800 inst[i] = NULL;
801 }
803 TransportRpmsg_module->inst = inst;
805 /* counter event object for passing commands to dispatch thread */
806 TransportRpmsg_module->unblockEvent = eventfd(0, 0);
808 if (TransportRpmsg_module->unblockEvent == -1) {
809 printf("create: unblock event failed: %d (%s)\n",
810 errno, strerror(errno));
811 status = Ipc_E_FAIL;
812 goto done;
813 }
815 PRINTVERBOSE1("create: created unblock event %d\n",
816 TransportRpmsg_module->unblockEvent)
818 /* semaphore event object for acknowledging client thread */
819 TransportRpmsg_module->waitEvent = eventfd(0, EFD_SEMAPHORE);
821 if (TransportRpmsg_module->waitEvent == -1) {
822 printf("create: wait event failed: %d (%s)\n", errno, strerror(errno));
823 status = Ipc_E_FAIL;
824 goto done;
825 }
827 PRINTVERBOSE1("create: created wait event %d\n",
828 TransportRpmsg_module->waitEvent)
830 FD_ZERO(&TransportRpmsg_module->rfds);
831 FD_SET(TransportRpmsg_module->unblockEvent,
832 &TransportRpmsg_module->rfds);
833 TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
834 TransportRpmsg_module->nInFds = 0;
836 pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
838 status = pthread_create(&TransportRpmsg_module->threadId, NULL,
839 &rpmsgThreadFxn, NULL);
841 if (status < 0) {
842 status = Ipc_E_FAIL;
843 printf("attach: failed to spawn thread\n");
844 goto done;
845 }
846 TransportRpmsg_module->threadStarted = TRUE;
848 done:
849 if (status < 0) {
850 TransportRpmsg_Factory_delete();
851 }
853 return (status);
854 }
856 /*
857 * ======== TransportRpmsg_Factory_delete ========
858 * Finalize the transport instances
859 */
860 Void TransportRpmsg_Factory_delete(Void)
861 {
862 uint64_t event;
865 /* shutdown the message dispatch thread */
866 if (TransportRpmsg_module->threadStarted) {
867 event = TransportRpmsg_Event_SHUTDOWN;
868 write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
870 /* wait for dispatch thread to exit */
871 pthread_join(TransportRpmsg_module->threadId, NULL);
872 }
874 /* destroy the mutex object */
875 pthread_mutex_destroy(&TransportRpmsg_module->gate);
877 /* close the client wait event */
878 if (TransportRpmsg_module->waitEvent != -1) {
879 close(TransportRpmsg_module->waitEvent);
880 TransportRpmsg_module->waitEvent = -1;
881 }
883 /* close the dispatch thread unblock event */
884 if (TransportRpmsg_module->unblockEvent != -1) {
885 close(TransportRpmsg_module->unblockEvent);
886 TransportRpmsg_module->unblockEvent = -1;
887 }
889 /* free the instance handle array */
890 if (TransportRpmsg_module->inst != NULL) {
891 free(TransportRpmsg_module->inst);
892 TransportRpmsg_module->inst = NULL;
893 }
895 return;
896 }
898 /*
899 * ======== TransportRpmsg_Factory_attach ========
900 */
901 Int TransportRpmsg_Factory_attach(UInt16 procId)
902 {
903 Int status = Ipc_S_SUCCESS;
904 UInt16 clusterId;
905 TransportRpmsg_Params params;
906 TransportRpmsg_Handle transport;
907 IMessageQTransport_Handle iMsgQTrans;
909 /* cannot attach to yourself */
910 if (MultiProc_self() == procId) {
911 status = Ipc_E_INVALIDARG;
912 goto done;
913 }
915 /* processor must be a member of the cluster */
916 clusterId = procId - MultiProc_getBaseIdOfCluster();
918 if (clusterId >= MultiProc_getNumProcsInCluster()) {
919 status = Ipc_E_INVALIDARG;
920 goto done;
921 }
923 /* create transport instance for given processor */
924 params.rprocId = procId;
925 transport = TransportRpmsg_create(¶ms);
927 if (transport == NULL) {
928 status = Ipc_E_FAIL;
929 goto done;
930 }
932 /* register transport instance with MessageQ */
933 iMsgQTrans = TransportRpmsg_upCast(transport);
934 TransportRpmsg_module->inst[clusterId] = transport;
935 MessageQ_registerTransport(iMsgQTrans, procId, 0);
937 done:
938 return (status);
939 }
941 /*
942 * ======== TransportRpmsg_Factory_detach ========
943 */
944 Int TransportRpmsg_Factory_detach(UInt16 procId)
945 {
946 Int status = Ipc_S_SUCCESS;
947 UInt16 clusterId;
949 /* cannot detach from yourself */
950 if (MultiProc_self() == procId) {
951 status = Ipc_E_INVALIDARG;
952 goto done;
953 }
955 /* processor must be a member of the cluster */
956 clusterId = procId - MultiProc_getBaseIdOfCluster();
958 if (clusterId >= MultiProc_getNumProcsInCluster()) {
959 status = Ipc_E_INVALIDARG;
960 goto done;
961 }
963 /* must be attached in order to detach */
964 if (TransportRpmsg_module->inst[clusterId] == NULL) {
965 status = Ipc_E_INVALIDSTATE;
966 goto done;
967 }
969 /* unregister from MessageQ, delete the transport instance */
970 MessageQ_unregisterTransport(procId, 0);
971 TransportRpmsg_delete(&(TransportRpmsg_module->inst[clusterId]));
973 done:
974 return (status);
975 }