SDOCM00114391 IPC cluster support is missing on Linux
[ipc/ipcdev.git] / linux / src / transport / TransportRpmsg.c
1 /*
2  * Copyright (c) 2014-2015 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  ======== TransportRpmsg.c ========
34  *  Implementation of functions specified in the IMessageQTransport interface.
35  */
37 #include <ti/ipc/Std.h>
39 #include <ti/ipc/MessageQ.h>
40 #include <ti/ipc/MultiProc.h>
41 #include <_MessageQ.h>
43 /* Socket Headers */
44 #include <sys/socket.h>
45 #include <sys/select.h>
46 #include <sys/eventfd.h>
47 #include <stdio.h>
48 #include <stdlib.h>
49 #include <unistd.h>
50 #include <errno.h>
51 #include <string.h>
52 #include <fcntl.h>
53 #include <pthread.h>
55 /* Socket Protocol Family */
56 #include <net/rpmsg.h>
58 /* Socket utils: */
59 #include <SocketFxns.h>
61 #include <_lad.h>
63 #include <TransportRpmsg.h>
66 /* More magic rpmsg port numbers: */
67 #define MESSAGEQ_RPMSG_PORT       61
68 #define MESSAGEQ_RPMSG_MAXSIZE   512
70 #define TransportRpmsg_GROWSIZE 32
72 /* traces in this file are controlled via _TransportMessageQ_verbose */
73 Bool _TransportMessageQ_verbose = FALSE;
74 #define verbose _TransportMessageQ_verbose
76 Int TransportRpmsg_bind(Void *handle, UInt32 queueId);
77 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId);
78 Bool TransportRpmsg_put(Void *handle, Ptr msg);
80 typedef struct TransportRpmsg_Module {
81     int             sock[MultiProc_MAXPROCESSORS];
82     fd_set          rfds;
83     int             maxFd;
84     int             inFds[1024];
85     int             nInFds;
86     pthread_mutex_t gate;
87     int             unblockEvent;    /* eventFd for unblocking socket */
88     pthread_t       threadId;        /* ID returned by pthread_create() */
89     Bool            threadStarted;
90 } TransportRpmsg_Module;
92 IMessageQTransport_Fxns TransportRpmsg_fxns = {
93     .bind    = TransportRpmsg_bind,
94     .unbind  = TransportRpmsg_unbind,
95     .put     = TransportRpmsg_put,
96 };
98 typedef struct TransportRpmsg_Object {
99     IMessageQTransport_Object base;
100     Int status;
101     UInt16 rprocId;
102     int numQueues;
103     int *qIndexToFd;
104 } TransportRpmsg_Object;
106 TransportRpmsg_Module TransportRpmsg_state = {
107     .sock = {0},
108     .threadStarted = FALSE,
109 };
110 TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
112 static Int attach(UInt16 rprocId);
113 static Int detach(UInt16 rprocId);
114 static void *rpmsgThreadFxn(void *arg);
115 static Int transportGet(int sock, MessageQ_Msg *retMsg);
116 static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
117                                Int fd,
118                                UInt16 qIndex);
119 static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
120 static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
122 /* -------------------------------------------------------------------------- */
124 /* instance convertors */
125 IMessageQTransport_Handle TransportRpmsg_upCast(TransportRpmsg_Handle handle)
127     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
128     return ((IMessageQTransport_Handle)&obj->base);
131 TransportRpmsg_Handle TransportRpmsg_downCast(IMessageQTransport_Handle base)
133     return ((TransportRpmsg_Handle)base);
136 TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params,
137                                             Int *attachStatus)
139     TransportRpmsg_Object *obj;
140     Int rv;
142     rv = attach(params->rprocId);
143     if (attachStatus) {
144         *attachStatus = rv;
145     }
147     if (rv != MessageQ_S_SUCCESS) {
148         return NULL;
149     }
151     obj = calloc(1, sizeof (TransportRpmsg_Object));
153     /* structure copy */
154     obj->base.base.interfaceType = IMessageQTransport_TypeId;
155     obj->base.fxns = &TransportRpmsg_fxns;
156     obj->rprocId = params->rprocId;
158     obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof (Int));
159     obj->numQueues = TransportRpmsg_GROWSIZE;
161     return (TransportRpmsg_Handle)obj;
164 Void TransportRpmsg_delete(TransportRpmsg_Handle *handlep)
166     TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)handlep;
168     detach(obj->rprocId);
170     free(obj->qIndexToFd);
171     free(obj);
173     *handlep = NULL;
176 static Int attach(UInt16 rprocId)
178     Int     status = MessageQ_S_SUCCESS;
179     int     sock;
180     UInt16  clusterId;
183     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
185     /* Create the socket for sending messages to the remote proc: */
186     sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
187     if (sock < 0) {
188         status = MessageQ_E_FAIL;
189         printf("attach: socket failed: %d (%s)\n",
190                errno, strerror(errno));
192         goto exit;
193     }
195     PRINTVERBOSE1("attach: created send socket: %d\n", sock)
197     /* Attempt to connect: */
198     status = ConnectSocket(sock, rprocId, MESSAGEQ_RPMSG_PORT);
199     if (status < 0) {
200         /* is it ok to "borrow" this error code from MessageQ? */
201         status = MessageQ_E_RESOURCE;
203         /* don't hard-printf or exit since this is no longer fatal */
204         PRINTVERBOSE1("attach: ConnectSocket(rprocId:%d) failed\n", rprocId)
206         goto exitSock;
207     }
209     TransportRpmsg_module->sock[clusterId] = sock;
211     if (TransportRpmsg_module->threadStarted == FALSE) {
212         /* create a module wide event to unblock the socket select thread */
213         TransportRpmsg_module->unblockEvent = eventfd(0, 0);
214         if (TransportRpmsg_module->unblockEvent == -1) {
215             printf("attach: unblock socket failed: %d (%s)\n",
216                    errno, strerror(errno));
217             status = MessageQ_E_FAIL;
219             goto exitSock;
220         }
222         PRINTVERBOSE1("attach: created unblock event %d\n",
223                       TransportRpmsg_module->unblockEvent)
225         FD_ZERO(&TransportRpmsg_module->rfds);
226         FD_SET(TransportRpmsg_module->unblockEvent,
227                &TransportRpmsg_module->rfds);
228         TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
229         TransportRpmsg_module->nInFds = 0;
231         pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
233         status = pthread_create(&TransportRpmsg_module->threadId, NULL,
234                                 &rpmsgThreadFxn, NULL);
235         if (status < 0) {
236             status = MessageQ_E_FAIL;
237             printf("attach: failed to spawn thread\n");
239             goto exitEvent;
240         }
241         else {
242             TransportRpmsg_module->threadStarted = TRUE;
243         }
244     }
246     goto exit;
248 exitEvent:
249     close(TransportRpmsg_module->unblockEvent);
251     FD_ZERO(&TransportRpmsg_module->rfds);
252     TransportRpmsg_module->maxFd = 0;
254 exitSock:
255     close(sock);
256     TransportRpmsg_module->sock[clusterId] = 0;
258 exit:
259     return status;
262 static Int detach(UInt16 rprocId)
265     Int     status = -1;
266     int     sock;
267     UInt16  clusterId;
269     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
270     sock = TransportRpmsg_module->sock[clusterId];
272     if (sock) {
273         PRINTVERBOSE1("detach: closing socket: %d\n", sock)
275         status = close(sock);
276     }
278     return status;
281 Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
283     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
284     UInt16   queueIndex = queueId & 0x0000ffff;
285     int      fd;
286     int      err;
287     uint64_t buf;
288     UInt16   rprocId;
290     rprocId = obj->rprocId;
292     PRINTVERBOSE2("TransportRpmsg_bind: creating endpoint for rprocId %d "
293             "queueIndex %d\n", rprocId, queueIndex)
295     /*  Create the socket to receive messages for this messageQ. */
296     fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
297     if (fd < 0) {
298         printf("TransportRpmsg_bind: socket call failed: %d (%s)\n",
299                 errno, strerror(errno));
300         goto exitClose;
301     }
303     PRINTVERBOSE1("TransportRpmsg_bind: created socket fd %d\n", fd)
305     err = SocketBindAddr(fd, rprocId, (UInt32)queueIndex);
306     if (err < 0) {
307         /* don't hard-printf since this is no longer fatal */
308         PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
309                       errno, strerror(errno))
311         close(fd);
313         return -1;
314     }
316     pthread_mutex_lock(&TransportRpmsg_module->gate);
318     /* add to our fat fd array and update select() parameters */
319     TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++] = fd;
320     TransportRpmsg_module->maxFd = MAX(TransportRpmsg_module->maxFd, fd);
321     FD_SET(fd, &TransportRpmsg_module->rfds);
323     pthread_mutex_unlock(&TransportRpmsg_module->gate);
325     bindFdToQueueIndex(obj, fd, queueIndex);
327     /*
328      * Even though we use the unblock event as just a signalling event with
329      * no related payload, we need to write some non-zero value.  Might as
330      * well make it the fd (which the reader could decide to use if needed).
331      */
332     buf = fd;
333     write(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
335     goto exit;
337 exitClose:
338     TransportRpmsg_unbind(handle, fd);
339     fd = 0;
341 exit:
342     return fd;
345 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
347     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
348     UInt16 queueIndex = queueId & 0x0000ffff;
349     uint64_t buf;
350     Int    status = MessageQ_S_SUCCESS;
351     int    maxFd;
352     int    fd;
353     int    i;
354     int    j;
356     fd = queueIndexToFd(obj, queueIndex);
357     if (!fd) {
358         PRINTVERBOSE1("TransportRpmsg_unbind: queueId 0x%x not bound\n",
359                       queueId)
361         return -1;
362     }
364     PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
366     pthread_mutex_lock(&TransportRpmsg_module->gate);
368     /* remove from input fd array */
369     for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
370         if (TransportRpmsg_module->inFds[i] == fd) {
371             TransportRpmsg_module->nInFds--;
373             /* shift subsequent elements down */
374             for (j = i; j < TransportRpmsg_module->nInFds; j++) {
375                 TransportRpmsg_module->inFds[j] =
376                     TransportRpmsg_module->inFds[j + 1];
377             }
378             TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds] = 0;
380             FD_CLR(fd, &TransportRpmsg_module->rfds);
381             if (fd == TransportRpmsg_module->maxFd) {
382                 /* find new max fd */
383                 maxFd = TransportRpmsg_module->unblockEvent;
384                 for (j = 0; j < TransportRpmsg_module->nInFds; j++) {
385                     maxFd = MAX(TransportRpmsg_module->inFds[j], maxFd);
386                 }
387                 TransportRpmsg_module->maxFd = maxFd;
388             }
390             /*
391              * Even though we use the unblock event as just a signalling
392              * event with no related payload, we need to write some non-zero
393              * value.  Might as well make it the fd (which the reader could
394              * decide to use if needed).
395              */
396             buf = fd;
397             write(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
399             break;
400         }
402         close(fd);
403     }
405     unbindQueueIndex(obj, queueIndex);
407     pthread_mutex_unlock(&TransportRpmsg_module->gate);
409     return status;
412 Bool TransportRpmsg_put(Void *handle, Ptr pmsg)
414     MessageQ_Msg msg  = (MessageQ_Msg)pmsg;
415     Int     status    = TRUE;
416     int     sock;
417     int     err;
418     UInt16  clusterId;
420     /*
421      * Retrieve the socket for the AF_SYSLINK protocol associated with this
422      * transport.
423      */
424     clusterId = msg->dstProc - MultiProc_getBaseIdOfCluster();
425     sock = TransportRpmsg_module->sock[clusterId];
426     if (!sock) {
427         return FALSE;
428     }
430     PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
432     err = send(sock, msg, msg->msgSize, 0);
433     if (err < 0) {
434         printf("TransportRpmsg_put: send failed: %d (%s)\n",
435                errno, strerror(errno));
436         status = FALSE;
438         goto exit;
439     }
441     /*
442      * Free the message, as this is a copy transport, we maintain MessageQ
443      * semantics.
444      */
445     MessageQ_free(msg);
447 exit:
448     return status;
451 Bool TransportRpmsg_control(Void *handle, UInt cmd, UArg cmdArg)
453     return FALSE;
456 void *rpmsgThreadFxn(void *arg)
458     static int lastFdx = 0;
459     int      curFdx = 0;
460     Int      status = MessageQ_S_SUCCESS;
461     Int      tmpStatus;
462     int      retval;
463     uint64_t buf;
464     fd_set   rfds;
465     int      maxFd;
466     int      nfds;
467     MessageQ_Msg     retMsg;
468     MessageQ_QueueId queueId;
470     while (TRUE) {
471         pthread_mutex_lock(&TransportRpmsg_module->gate);
473         maxFd = TransportRpmsg_module->maxFd;
474         rfds = TransportRpmsg_module->rfds;
475         nfds = TransportRpmsg_module->nInFds;
477         pthread_mutex_unlock(&TransportRpmsg_module->gate);
479         PRINTVERBOSE3("rpmsgThreadFxn: maxFd %d rfds[1:0] 0x%08x%08x\n", maxFd,
480                       (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
482         retval = select(maxFd + 1, &rfds, NULL, NULL, NULL);
483         if (retval) {
484             if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
485                 /*
486                  * Our event was signalled by TransportRpmsg_bind()
487                  * or TransportRpmsg_unbind() to tell us that the set of
488                  * fds has changed.
489                  */
490                 PRINTVERBOSE0("rpmsgThreadFxn: got unblock event\n")
492                 /* we don't need the written value */
493                 read(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
494             }
495             else {
496                 /* start where we last left off */
497                 curFdx = lastFdx;
499                 /*
500                  * The set of fds that's used by select has been recorded
501                  * locally, but the array of fds that are scanned below is
502                  * a changing set (MessageQ_create/delete() can change it).
503                  * While this might present an issue in itself, one key
504                  * takeaway is that 'nfds' must not be zero else the % below
505                  * will cause a divide-by-zero exception.  We won't even get
506                  * here if nfds == 0 since it's a local copy of the module's
507                  * 'nInFds' which has to be > 0 for us to get here.  So, even
508                  * though the module's 'nInFds' might go to 0 during this loop,
509                  * the loop itself will still remain intact.
510                  */
511                 do {
512                     if (FD_ISSET(TransportRpmsg_module->inFds[curFdx], &rfds)) {
514                         PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
515                                       TransportRpmsg_module->inFds[curFdx])
517                         /* transport input fd was signalled: get the message */
518                         tmpStatus = transportGet(
519                             TransportRpmsg_module->inFds[curFdx], &retMsg);
520                         if (tmpStatus < 0) {
521                             printf("rpmsgThreadFxn: transportGet failed.");
522                             status = MessageQ_E_FAIL;
523                         }
524                         else {
525                             queueId = MessageQ_getDstQueue(retMsg);
527                             PRINTVERBOSE1("rpmsgThreadFxn: got message, "
528                                     "delivering to queueId 0x%x\n", queueId)
530                             MessageQ_put(queueId, retMsg);
531                         }
533                         lastFdx = (curFdx + 1) % nfds;
535                         break;
536                     }
538                     curFdx = (curFdx + 1) % nfds;
539                 } while (curFdx != lastFdx);
540             }
541         }
542     }
544     return (void *)status;
547 /*
548  * ======== transportGet ========
549  *  Retrieve a message waiting in the socket's queue.
550 */
551 static Int transportGet(int sock, MessageQ_Msg *retMsg)
553     Int           status    = MessageQ_S_SUCCESS;
554     MessageQ_Msg  msg;
555     struct sockaddr_rpmsg fromAddr;  /* [Socket address of sender] */
556     unsigned int  len;
557     int           byteCount;
559     /*
560      * We have no way of peeking to see what message size we'll get, so we
561      * allocate a message of max size to receive contents from the rpmsg socket
562      * (currently, a copy transport)
563      */
564     msg = MessageQ_alloc(0, MESSAGEQ_RPMSG_MAXSIZE);
565     if (!msg) {
566         status = MessageQ_E_MEMORY;
567         goto exit;
568     }
570     memset(&fromAddr, 0, sizeof (fromAddr));
571     len = sizeof (fromAddr);
573     byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
574                          (struct sockaddr *)&fromAddr, &len);
575     if (len != sizeof (fromAddr)) {
576         printf("recvfrom: got bad addr len (%d)\n", len);
577         status = MessageQ_E_FAIL;
578         goto exit;
579     }
580     if (byteCount < 0) {
581         printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
582         status = MessageQ_E_FAIL;
583         goto exit;
584     }
585     else {
586          /*
587           * Update the allocated message size (even though this may waste
588           * space when the actual message is smaller than the maximum rpmsg
589           * size, the message will be freed soon anyway, and it avoids an
590           * extra copy).
591           */
592          msg->msgSize = byteCount;
594          /*
595           * If the message received was statically allocated, reset the
596           * heapId, so the app can free it.
597           */
598          if (msg->heapId == MessageQ_STATICMSG)  {
599              msg->heapId = 0;  /* for a copy transport, heap id is 0. */
600          }
601     }
603     PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
604     PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg "
605             "proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
606     PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId,
607             msg->msgSize)
609     *retMsg = msg;
611 exit:
612     return status;
615 Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 qIndex)
617     Int *queues;
618     Int *oldQueues;
619     UInt oldSize;
621     if (qIndex >= obj->numQueues) {
622         PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
623                       qIndex + TransportRpmsg_GROWSIZE)
625         oldSize = obj->numQueues * sizeof (Int);
627         queues = calloc(qIndex + TransportRpmsg_GROWSIZE, sizeof (Int));
628         memcpy(queues, obj->qIndexToFd, oldSize);
630         oldQueues = obj->qIndexToFd;
631         obj->qIndexToFd = queues;
632         obj->numQueues = qIndex + TransportRpmsg_GROWSIZE;
634         free(oldQueues);
635     }
637     obj->qIndexToFd[qIndex] = fd;
640 Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex)
642     obj->qIndexToFd[qIndex] = 0;
645 Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex)
647     return obj->qIndexToFd[qIndex];