1 /*
2 * Copyright (c) 2014-2015 Texas Instruments Incorporated - http://www.ti.com
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*
33 * ======== TransportRpmsg.c ========
34 * Implementation of functions specified in the IMessageQTransport interface.
35 */
37 /* Socket Headers */
38 #include <sys/socket.h>
39 #include <sys/select.h>
40 #include <sys/eventfd.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <unistd.h>
44 #include <errno.h>
45 #include <string.h>
46 #include <fcntl.h>
47 #include <pthread.h>
50 /* Socket Protocol Family */
51 #include <net/rpmsg.h>
54 /* IPC headers */
55 #include <ti/ipc/Std.h>
56 #include <SocketFxns.h> /* Socket utils: */
57 #include <ti/ipc/Ipc.h>
58 #include <ti/ipc/MessageQ.h>
59 #include <ti/ipc/MultiProc.h>
60 #include <ti/ipc/transports/TransportRpmsg.h>
61 #include <_MessageQ.h>
62 #include <_lad.h>
64 #if !defined(EFD_SEMAPHORE)
65 # define EFD_SEMAPHORE (1 << 0)
66 #endif
68 /* More magic rpmsg port numbers: */
69 #define MESSAGEQ_RPMSG_PORT 61
70 #define MESSAGEQ_RPMSG_MAXSIZE 512
72 #define TransportRpmsg_GROWSIZE 32
73 #define INVALIDSOCKET (-1)
75 #define TransportRpmsg_Event_ACK (1 << 0)
76 #define TransportRpmsg_Event_PAUSE (1 << 1)
77 #define TransportRpmsg_Event_CONTINUE (1 << 2)
78 #define TransportRpmsg_Event_SHUTDOWN (1 << 3)
81 #define _MAX(a,b) (((a)>(b))?(a):(b))
83 /* traces in this file are controlled via _TransportMessageQ_verbose */
84 Bool _TransportMessageQ_verbose = FALSE;
85 #define verbose _TransportMessageQ_verbose
87 Int TransportRpmsg_bind(Void *handle, UInt32 queueId);
88 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId);
89 Bool TransportRpmsg_put(Void *handle, Ptr msg);
91 typedef struct TransportRpmsg_Module {
92 int sock[MultiProc_MAXPROCESSORS];
93 fd_set rfds;
94 int maxFd;
95 struct {
96 int fd;
97 UInt32 qId;
98 } inFds[1024];
99 int nInFds;
100 pthread_mutex_t gate;
101 int unblockEvent; /* unblock the dispatch thread */
102 int waitEvent; /* block the client thread */
103 pthread_t threadId; /* ID returned by pthread_create() */
104 Bool threadStarted;
106 TransportRpmsg_Handle *inst; /* array of instances */
107 } TransportRpmsg_Module;
109 IMessageQTransport_Fxns TransportRpmsg_fxns = {
110 .bind = TransportRpmsg_bind,
111 .unbind = TransportRpmsg_unbind,
112 .put = TransportRpmsg_put
113 };
115 typedef struct TransportRpmsg_Object {
116 IMessageQTransport_Object base;
117 Int status;
118 UInt16 rprocId;
119 int numQueues;
120 int *qIndexToFd;
121 } TransportRpmsg_Object;
123 TransportRpmsg_Module TransportRpmsg_state = {
124 .sock = {INVALIDSOCKET},
125 .unblockEvent = -1,
126 .waitEvent = -1,
127 .threadStarted = FALSE,
128 .inst = NULL
129 };
130 TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
132 static void *rpmsgThreadFxn(void *arg);
133 static Int transportGet(int sock, MessageQ_Msg *retMsg);
134 static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
135 Int fd,
136 UInt16 qIndex);
137 static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
138 static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
140 /* factory functions */
141 Int TransportRpmsg_Factory_create(Void);
142 Void TransportRpmsg_Factory_delete(Void);
143 Int TransportRpmsg_Factory_attach(UInt16 procId);
144 Int TransportRpmsg_Factory_detach(UInt16 procId);
146 Ipc_TransportFactoryFxns TransportRpmsg_Factory = {
147 .createFxn = TransportRpmsg_Factory_create,
148 .deleteFxn = TransportRpmsg_Factory_delete,
149 .attachFxn = TransportRpmsg_Factory_attach,
150 .detachFxn = TransportRpmsg_Factory_detach
151 };
153 /* -------------------------------------------------------------------------- */
155 /* instance convertors */
156 IMessageQTransport_Handle TransportRpmsg_upCast(TransportRpmsg_Handle handle)
157 {
158 TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
159 return ((IMessageQTransport_Handle)&obj->base);
160 }
162 TransportRpmsg_Handle TransportRpmsg_downCast(IMessageQTransport_Handle base)
163 {
164 return ((TransportRpmsg_Handle)base);
165 }
167 /*
168 * ======== TransportRpmsg_create ========
169 */
170 TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params)
171 {
172 Int status = MessageQ_S_SUCCESS;
173 TransportRpmsg_Object *obj = NULL;
174 int sock;
175 UInt16 clusterId;
178 clusterId = params->rprocId - MultiProc_getBaseIdOfCluster();
180 /* create socket for sending messages to remote processor */
181 sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
183 if (sock < 0) {
184 status = Ipc_E_FAIL;
185 printf("TransportRpmsg_create: socket failed: %d (%s)\n", errno,
186 strerror(errno));
187 goto done;
188 }
189 TransportRpmsg_module->sock[clusterId] = sock;
190 PRINTVERBOSE1("attach: created send socket: %d\n", sock)
192 status = ConnectSocket(sock, params->rprocId, MESSAGEQ_RPMSG_PORT);
194 if (status < 0) {
195 status = Ipc_E_FAIL;
196 printf("TransportRpmsg_create: connect failed: %d (%s) procId: %d\n",
197 errno, strerror(errno), params->rprocId);
198 goto done;
199 }
201 /* create the instance object */
202 obj = calloc(1, sizeof(TransportRpmsg_Object));
204 if (obj == NULL) {
205 status = Ipc_E_MEMORY;
206 goto done;
207 }
209 /* initialize the instance */
210 obj->base.base.interfaceType = IMessageQTransport_TypeId;
211 obj->base.fxns = &TransportRpmsg_fxns;
212 obj->rprocId = params->rprocId;
213 obj->numQueues = TransportRpmsg_GROWSIZE;
215 obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof(Int));
217 if (obj->qIndexToFd == NULL) {
218 status = Ipc_E_MEMORY;
219 goto done;
220 }
222 done:
223 if (status < 0) {
224 TransportRpmsg_delete((TransportRpmsg_Handle *)&obj);
225 }
227 return (TransportRpmsg_Handle)obj;
228 }
230 /*
231 * ======== TransportRpmsg_delete ========
232 */
233 Void TransportRpmsg_delete(TransportRpmsg_Handle *pHandle)
234 {
235 TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)pHandle;
236 UInt16 clusterId;
237 int sock;
240 clusterId = obj->rprocId - MultiProc_getBaseIdOfCluster();
242 /* close the socket for the given transport instance */
243 sock = TransportRpmsg_module->sock[clusterId];
244 if (sock != INVALIDSOCKET) {
245 PRINTVERBOSE1("detach: closing socket: %d\n", sock)
246 close(sock);
247 }
248 TransportRpmsg_module->sock[clusterId] = INVALIDSOCKET;
250 if ((obj != NULL) && (obj->qIndexToFd != NULL)) {
251 free(obj->qIndexToFd);
252 obj->qIndexToFd = NULL;
253 }
255 if (obj != NULL) {
256 free(obj);
257 obj = NULL;
258 }
260 *pHandle = NULL;
261 }
263 /*
264 * ======== TransportRpmsg_bind ========
265 */
266 Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
267 {
268 TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
269 UInt16 queuePort = queueId & 0x0000ffff;
270 int fd;
271 int err;
272 uint64_t event;
273 UInt16 rprocId;
274 pthread_t tid;
276 tid = pthread_self();
277 rprocId = obj->rprocId;
279 PRINTVERBOSE3("TransportRpmsg_bind: creating endpoint for rprocId %d "
280 "queuePort 0x%x, tid=0x%x\n", rprocId, queuePort, (unsigned int)tid)
282 /* Create the socket to receive messages for this messageQ. */
283 fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
284 if (fd < 0) {
285 printf("TransportRpmsg_bind: socket call failed: %d (%s)\n",
286 errno, strerror(errno));
287 return (MessageQ_E_OSFAILURE);
288 }
289 PRINTVERBOSE2("TransportRpmsg_bind: created socket fd %d, tdi=0x%x\n", fd,
290 (unsigned int)tid);
292 err = SocketBindAddr(fd, rprocId, (UInt32)queuePort);
293 if (err < 0) {
294 /* don't hard-printf since this is no longer fatal */
295 PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
296 errno, strerror(errno));
297 close(fd);
298 return (MessageQ_E_OSFAILURE);
299 }
301 pthread_mutex_lock(&TransportRpmsg_module->gate);
303 /* pause the dispatch thread */
304 PRINTVERBOSE1("TransportRpmsg_bind: sending PAUSE event, tid=0x%x\n",
305 (unsigned int)tid);
306 event = TransportRpmsg_Event_PAUSE;
307 write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
309 /* wait for ACK event */
310 read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
311 PRINTVERBOSE2("TransportRpmsg_bind: received ACK event (%d), tid=0x%x\n",
312 (int)event, (unsigned int)tid);
314 /* add to our fat fd array and update select() parameters */
315 TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].fd = fd;
316 TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++].qId = queueId;
317 TransportRpmsg_module->maxFd = _MAX(TransportRpmsg_module->maxFd, fd);
318 FD_SET(fd, &TransportRpmsg_module->rfds);
319 bindFdToQueueIndex(obj, fd, queuePort);
321 /* release the dispatch thread */
322 PRINTVERBOSE1("TransportRpmsg_bind: sending CONTINUE event, tid=0x%x\n",
323 (unsigned int)tid);
324 event = TransportRpmsg_Event_CONTINUE;
325 write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
327 pthread_mutex_unlock(&TransportRpmsg_module->gate);
329 return (MessageQ_S_SUCCESS);
330 }
332 /*
333 * ======== TransportRpmsg_unbind ========
334 */
335 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
336 {
337 TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
338 UInt16 queuePort = queueId & 0x0000ffff;
339 uint64_t event;
340 Int status = MessageQ_S_SUCCESS;
341 int maxFd;
342 int fd;
343 int i;
344 int j;
346 pthread_mutex_lock(&TransportRpmsg_module->gate);
348 /* pause the dispatch thread */
349 event = TransportRpmsg_Event_PAUSE;
350 write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
352 /* wait for ACK event */
353 read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
355 /* retrieve file descriptor for the given queue port */
356 fd = queueIndexToFd(obj, queuePort);
357 if (!fd) {
358 PRINTVERBOSE1("TransportRpmsg_unbind: queueId 0x%x not bound\n",
359 queueId);
360 status = MessageQ_E_INVALIDARG;
361 goto done;
362 }
363 PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
365 /* guarenteed to work because queueIndexToFd above succeeded */
366 unbindQueueIndex(obj, queuePort);
368 /* remove from input fd array */
369 for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
370 if (TransportRpmsg_module->inFds[i].fd == fd) {
371 TransportRpmsg_module->nInFds--;
373 /* shift subsequent elements down */
374 for (j = i; j < TransportRpmsg_module->nInFds; j++) {
375 TransportRpmsg_module->inFds[j] =
376 TransportRpmsg_module->inFds[j + 1];
377 }
378 TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].fd = -1;
379 TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].qId = 0;
380 break;
381 }
382 }
384 /* remove fd from the descriptor set, compute new max value */
385 FD_CLR(fd, &TransportRpmsg_module->rfds);
386 if (fd == TransportRpmsg_module->maxFd) {
387 /* find new max fd */
388 maxFd = TransportRpmsg_module->unblockEvent;
389 for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
390 maxFd = _MAX(TransportRpmsg_module->inFds[i].fd, maxFd);
391 }
392 TransportRpmsg_module->maxFd = maxFd;
393 }
395 close(fd);
397 /* release the dispatch thread */
398 event = TransportRpmsg_Event_CONTINUE;
399 write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
401 done:
402 pthread_mutex_unlock(&TransportRpmsg_module->gate);
404 return (status);
405 }
407 /*
408 * ======== TransportRpmsg_put ========
409 */
410 Bool TransportRpmsg_put(Void *handle, Ptr pmsg)
411 {
412 MessageQ_Msg msg = (MessageQ_Msg)pmsg;
413 Int status = TRUE;
414 int sock;
415 int err;
416 UInt16 clusterId;
418 /*
419 * Retrieve the socket for the AF_SYSLINK protocol associated with this
420 * transport.
421 */
422 clusterId = msg->dstProc - MultiProc_getBaseIdOfCluster();
423 sock = TransportRpmsg_module->sock[clusterId];
424 if (!sock) {
425 return FALSE;
426 }
428 PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
430 err = send(sock, msg, msg->msgSize, 0);
431 if (err < 0) {
432 printf("TransportRpmsg_put: send failed: %d (%s)\n",
433 errno, strerror(errno));
434 status = FALSE;
436 goto exit;
437 }
439 /*
440 * Free the message, as this is a copy transport, we maintain MessageQ
441 * semantics.
442 */
443 MessageQ_free(msg);
445 exit:
446 return status;
447 }
449 /*
450 * ======== TransportRpmsg_control ========
451 */
452 Bool TransportRpmsg_control(Void *handle, UInt cmd, UArg cmdArg)
453 {
454 return FALSE;
455 }
457 /*
458 * ======== rpmsgThreadFxn ========
459 */
460 void *rpmsgThreadFxn(void *arg)
461 {
462 Int status = MessageQ_S_SUCCESS;
463 Int tmpStatus;
464 int retval;
465 uint64_t event;
466 fd_set rfds;
467 int maxFd;
468 int nfds;
469 MessageQ_Msg retMsg;
470 MessageQ_QueueId queueId;
471 MessageQ_Handle handle;
472 Bool run = TRUE;
473 int i;
474 int j;
475 int fd;
478 while (run) {
479 maxFd = TransportRpmsg_module->maxFd;
480 rfds = TransportRpmsg_module->rfds;
481 nfds = TransportRpmsg_module->nInFds;
483 PRINTVERBOSE3("rpmsgThreadFxn: maxFd %d rfds[1:0] 0x%08x%08x\n", maxFd,
484 (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
486 retval = select(maxFd + 1, &rfds, NULL, NULL, NULL);
488 /* if error, try again */
489 if (retval < 0) {
490 printf("Warning: rpmsgThreadFxn: select failed, trying again\n");
491 continue;
492 }
494 /* dispatch all pending messages, do this first */
495 for (i = 0; i < nfds; i++) {
496 fd = TransportRpmsg_module->inFds[i].fd;
498 if (FD_ISSET(fd, &rfds)) {
499 PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
500 TransportRpmsg_module->inFds[i].fd);
502 /* transport input fd was signalled: get the message */
503 tmpStatus = transportGet(fd, &retMsg);
504 if (tmpStatus < 0) {
505 printf("rpmsgThreadFxn: transportGet failed on fd %d,"
506 " returned %d\n", fd, tmpStatus);
508 pthread_mutex_lock(&TransportRpmsg_module->gate);
510 /*
511 * Don't close(fd) at this time since it will get closed
512 * later when MessageQ_delete() is called in response to
513 * this failure. Just remove fd's bit from the select mask
514 * 'rfds' for now, but don't remove it from inFds[].
515 */
516 FD_CLR(fd, &TransportRpmsg_module->rfds);
517 if (fd == TransportRpmsg_module->maxFd) {
518 /* find new max fd */
519 maxFd = TransportRpmsg_module->unblockEvent;
520 for (j = 0; j < TransportRpmsg_module->nInFds; j++) {
521 maxFd = _MAX(TransportRpmsg_module->inFds[j].fd,
522 maxFd);
523 }
524 TransportRpmsg_module->maxFd = maxFd;
525 }
526 queueId = TransportRpmsg_module->inFds[i].qId;
528 pthread_mutex_unlock(&TransportRpmsg_module->gate);
530 handle = MessageQ_getLocalHandle(queueId);
532 PRINTVERBOSE2("rpmsgThreadFxn: shutting down MessageQ "
533 "%p (queueId 0x%x)...\n", handle, queueId)
535 if (handle != NULL) {
536 MessageQ_shutdown(handle);
537 }
538 else {
539 printf("rpmsgThreadFxn: MessageQ_getLocalHandle(0x%x) "
540 "returned NULL, can't shutdown\n", queueId);
541 }
542 }
543 else {
544 queueId = MessageQ_getDstQueue(retMsg);
545 PRINTVERBOSE1("rpmsgThreadFxn: got message, "
546 "delivering to queueId 0x%x\n", queueId)
547 MessageQ_put(queueId, retMsg);
548 }
549 }
550 }
552 /* check for events */
553 if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
555 read(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
557 do {
558 if (event & TransportRpmsg_Event_SHUTDOWN) {
559 PRINTVERBOSE0("rpmsgThreadFxn: event SHUTDOWN\n");
560 run = FALSE;
561 break; /* highest priority, stop processing events */
562 }
563 if (event & TransportRpmsg_Event_CONTINUE) {
564 PRINTVERBOSE1("rpmsgThreadFxn: event CONTINUE (%d)\n",
565 (int)event);
566 event &= ~TransportRpmsg_Event_CONTINUE;
567 }
568 if (event & TransportRpmsg_Event_PAUSE) {
569 /* Our event was signalled by TransportRpmsg_bind()
570 * or TransportRpmsg_unbind() to tell us that the set
571 * of file descriptors has changed.
572 */
573 PRINTVERBOSE0("rpmsgThreadFxn: event PAUSE\n");
574 /* send the acknowledgement */
575 event = TransportRpmsg_Event_ACK;
576 write(TransportRpmsg_module->waitEvent, &event,
577 sizeof(event));
578 /* now wait to be released */
579 read(TransportRpmsg_module->unblockEvent, &event,
580 sizeof(event));
581 }
582 } while (event != 0);
583 }
584 }
586 return (void *)status;
587 }
589 /*
590 * ======== transportGet ========
591 * Retrieve a message waiting in the socket's queue.
592 */
593 static Int transportGet(int sock, MessageQ_Msg *retMsg)
594 {
595 Int status = MessageQ_S_SUCCESS;
596 MessageQ_Msg msg;
597 struct sockaddr_rpmsg fromAddr; /* [Socket address of sender] */
598 unsigned int len;
599 int byteCount;
601 /*
602 * We have no way of peeking to see what message size we'll get, so we
603 * allocate a message of max size to receive contents from the rpmsg socket
604 * (currently, a copy transport)
605 */
606 msg = MessageQ_alloc(0, MESSAGEQ_RPMSG_MAXSIZE);
607 if (!msg) {
608 status = MessageQ_E_MEMORY;
609 goto exit;
610 }
612 memset(&fromAddr, 0, sizeof (fromAddr));
613 len = sizeof (fromAddr);
615 byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
616 (struct sockaddr *)&fromAddr, &len);
617 if (len != sizeof (fromAddr)) {
618 printf("recvfrom: got bad addr len (%d)\n", len);
619 status = MessageQ_E_FAIL;
620 goto freeMsg;
621 }
622 if (byteCount < 0) {
623 printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
624 if (errno == ESHUTDOWN) {
625 status = MessageQ_E_SHUTDOWN;
626 }
627 else {
628 status = MessageQ_E_FAIL;
629 }
630 goto freeMsg;
631 }
632 else {
633 /*
634 * Update the allocated message size (even though this may waste
635 * space when the actual message is smaller than the maximum rpmsg
636 * size, the message will be freed soon anyway, and it avoids an
637 * extra copy).
638 */
639 msg->msgSize = byteCount;
641 /*
642 * If the message received was statically allocated, reset the
643 * heapId, so the app can free it.
644 */
645 if (msg->heapId == MessageQ_STATICMSG) {
646 msg->heapId = 0; /* for a copy transport, heap id is 0. */
647 }
648 }
650 PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
651 PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg "
652 "proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
653 PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId,
654 msg->msgSize)
656 *retMsg = msg;
658 goto exit;
660 freeMsg:
661 MessageQ_free(msg);
663 exit:
664 return status;
665 }
667 /*
668 * ======== bindFdToQueueIndex ========
669 *
670 * Precondition: caller must be inside the module gate
671 */
672 Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 queuePort)
673 {
674 Int *queues;
675 Int *oldQueues;
676 UInt oldSize;
677 UInt queueIndex;
679 /* subtract port offset from queue index */
680 queueIndex = queuePort - MessageQ_PORTOFFSET;
682 if (queueIndex >= obj->numQueues) {
683 PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
684 queueIndex + TransportRpmsg_GROWSIZE)
686 /* allocate larger table */
687 oldSize = obj->numQueues * sizeof (Int);
688 queues = calloc(queueIndex + TransportRpmsg_GROWSIZE, sizeof(Int));
690 /* copy contents from old table int new table */
691 memcpy(queues, obj->qIndexToFd, oldSize);
693 /* swap in new table, delete old table */
694 oldQueues = obj->qIndexToFd;
695 obj->qIndexToFd = queues;
696 obj->numQueues = queueIndex + TransportRpmsg_GROWSIZE;
697 free(oldQueues);
698 }
700 /* add new entry */
701 obj->qIndexToFd[queueIndex] = fd;
702 }
704 /*
705 * ======== unbindQueueIndex ========
706 *
707 * Precondition: caller must be inside the module gate
708 */
709 Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 queuePort)
710 {
711 UInt queueIndex;
713 /* subtract port offset from queue index */
714 queueIndex = queuePort - MessageQ_PORTOFFSET;
716 /* clear table entry */
717 obj->qIndexToFd[queueIndex] = -1;
718 }
720 /*
721 * ======== queueIndexToFd ========
722 *
723 * Precondition: caller must be inside the module gate
724 */
725 Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 queuePort)
726 {
727 UInt queueIndex;
729 /* subtract port offset from queue index */
730 queueIndex = queuePort - MessageQ_PORTOFFSET;
732 /* return file descriptor */
733 return (obj->qIndexToFd[queueIndex]);
734 }
736 /*
737 * ======== TransportRpmsg_Factory_create ========
738 * Create the transport instances
739 *
740 * Attach to all remote processors. For now, must attach to
741 * at least one to tolerate MessageQ_E_RESOURCE failures.
742 *
743 * This function implements the IPC Factory interface, so it
744 * returns Ipc status codes.
745 */
746 Int TransportRpmsg_Factory_create(Void)
747 {
748 Int status = Ipc_S_SUCCESS;
749 Int i;
750 UInt16 clusterSize;
751 TransportRpmsg_Handle *inst;
754 /* needed to enumerate processors in cluster */
755 clusterSize = MultiProc_getNumProcsInCluster();
757 /* allocate the instance array */
758 inst = calloc(clusterSize, sizeof(TransportRpmsg_Handle));
760 if (inst == NULL) {
761 printf("Error: TransportRpmsg_Factory_create failed, no memory\n");
762 status = Ipc_E_MEMORY;
763 goto done;
764 }
766 for (i = 0; i < clusterSize; i++) {
767 inst[i] = NULL;
768 }
770 TransportRpmsg_module->inst = inst;
772 /* counter event object for passing commands to dispatch thread */
773 TransportRpmsg_module->unblockEvent = eventfd(0, 0);
775 if (TransportRpmsg_module->unblockEvent == -1) {
776 printf("create: unblock event failed: %d (%s)\n",
777 errno, strerror(errno));
778 status = Ipc_E_FAIL;
779 goto done;
780 }
782 PRINTVERBOSE1("create: created unblock event %d\n",
783 TransportRpmsg_module->unblockEvent)
785 /* semaphore event object for acknowledging client thread */
786 TransportRpmsg_module->waitEvent = eventfd(0, EFD_SEMAPHORE);
788 if (TransportRpmsg_module->waitEvent == -1) {
789 printf("create: wait event failed: %d (%s)\n", errno, strerror(errno));
790 status = Ipc_E_FAIL;
791 goto done;
792 }
794 PRINTVERBOSE1("create: created wait event %d\n",
795 TransportRpmsg_module->waitEvent)
797 FD_ZERO(&TransportRpmsg_module->rfds);
798 FD_SET(TransportRpmsg_module->unblockEvent,
799 &TransportRpmsg_module->rfds);
800 TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
801 TransportRpmsg_module->nInFds = 0;
803 pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
805 status = pthread_create(&TransportRpmsg_module->threadId, NULL,
806 &rpmsgThreadFxn, NULL);
808 if (status < 0) {
809 status = Ipc_E_FAIL;
810 printf("attach: failed to spawn thread\n");
811 goto done;
812 }
813 TransportRpmsg_module->threadStarted = TRUE;
815 done:
816 if (status < 0) {
817 TransportRpmsg_Factory_delete();
818 }
820 return (status);
821 }
823 /*
824 * ======== TransportRpmsg_Factory_delete ========
825 * Finalize the transport instances
826 */
827 Void TransportRpmsg_Factory_delete(Void)
828 {
829 uint64_t event;
832 /* shutdown the message dispatch thread */
833 if (TransportRpmsg_module->threadStarted) {
834 event = TransportRpmsg_Event_SHUTDOWN;
835 write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
837 /* wait for dispatch thread to exit */
838 pthread_join(TransportRpmsg_module->threadId, NULL);
839 }
841 /* destroy the mutex object */
842 pthread_mutex_destroy(&TransportRpmsg_module->gate);
844 /* close the client wait event */
845 if (TransportRpmsg_module->waitEvent != -1) {
846 close(TransportRpmsg_module->waitEvent);
847 TransportRpmsg_module->waitEvent = -1;
848 }
850 /* close the dispatch thread unblock event */
851 if (TransportRpmsg_module->unblockEvent != -1) {
852 close(TransportRpmsg_module->unblockEvent);
853 TransportRpmsg_module->unblockEvent = -1;
854 }
856 /* free the instance handle array */
857 if (TransportRpmsg_module->inst != NULL) {
858 free(TransportRpmsg_module->inst);
859 TransportRpmsg_module->inst = NULL;
860 }
862 return;
863 }
865 /*
866 * ======== TransportRpmsg_Factory_attach ========
867 */
868 Int TransportRpmsg_Factory_attach(UInt16 procId)
869 {
870 Int status = Ipc_S_SUCCESS;
871 UInt16 clusterId;
872 TransportRpmsg_Params params;
873 TransportRpmsg_Handle transport;
874 IMessageQTransport_Handle iMsgQTrans;
876 /* cannot attach to yourself */
877 if (MultiProc_self() == procId) {
878 status = Ipc_E_INVALIDARG;
879 goto done;
880 }
882 /* processor must be a member of the cluster */
883 clusterId = procId - MultiProc_getBaseIdOfCluster();
885 if (clusterId >= MultiProc_getNumProcsInCluster()) {
886 status = Ipc_E_INVALIDARG;
887 goto done;
888 }
890 /* create transport instance for given processor */
891 params.rprocId = procId;
892 transport = TransportRpmsg_create(¶ms);
894 if (transport == NULL) {
895 status = Ipc_E_FAIL;
896 goto done;
897 }
899 /* register transport instance with MessageQ */
900 iMsgQTrans = TransportRpmsg_upCast(transport);
901 MessageQ_registerTransport(iMsgQTrans, procId, 0);
902 TransportRpmsg_module->inst[clusterId] = transport;
904 done:
905 return (status);
906 }
908 /*
909 * ======== TransportRpmsg_Factory_detach ========
910 */
911 Int TransportRpmsg_Factory_detach(UInt16 procId)
912 {
913 Int status = Ipc_S_SUCCESS;
914 UInt16 clusterId;
916 /* cannot detach from yourself */
917 if (MultiProc_self() == procId) {
918 status = Ipc_E_INVALIDARG;
919 goto done;
920 }
922 /* processor must be a member of the cluster */
923 clusterId = procId - MultiProc_getBaseIdOfCluster();
925 if (clusterId >= MultiProc_getNumProcsInCluster()) {
926 status = Ipc_E_INVALIDARG;
927 goto done;
928 }
930 /* must be attached in order to detach */
931 if (TransportRpmsg_module->inst[clusterId] == NULL) {
932 status = Ipc_E_INVALIDSTATE;
933 goto done;
934 }
936 /* unregister from MessageQ, delete the transport instance */
937 MessageQ_unregisterTransport(procId, 0);
938 TransportRpmsg_delete(&(TransportRpmsg_module->inst[clusterId]));
940 done:
941 return (status);
942 }