]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - ipc/ipcdev.git/blob - linux/src/api/MessageQ.c
Linux/Qnx: MessageQ msg cannot be freed if put fails
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
1 /*
2  * Copyright (c) 2012-2014, Texas Instruments Incorporated
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  @file   MessageQ.c
34  *
35  *  @brief  MessageQ module "client" implementation
36  *
37  *  This implementation is geared for use in a "client/server" model, whereby
38  *  system-wide data is maintained in a "server" component and process-
39  *  specific data is handled here.  At the moment, this implementation
40  *  connects and communicates with LAD for the server connection.
41  */
44 /* Standard IPC header */
45 #include <ti/ipc/Std.h>
47 /* Linux specific header files, replacing OSAL: */
48 #include <pthread.h>
50 /* Module level headers */
51 #include <ti/ipc/NameServer.h>
52 #include <ti/ipc/MultiProc.h>
53 #include <_MultiProc.h>
54 #include <ti/ipc/MessageQ.h>
55 #include <_MessageQ.h>
57 /* Socket Headers */
58 #include <sys/select.h>
59 #include <sys/time.h>
60 #include <sys/types.h>
61 #include <sys/param.h>
62 #include <sys/eventfd.h>
63 #include <sys/socket.h>
64 #include <errno.h>
65 #include <stdio.h>
66 #include <string.h>
67 #include <stdlib.h>
68 #include <unistd.h>
69 #include <assert.h>
71 /* Socket Protocol Family */
72 #include <net/rpmsg.h>
74 /* Socket utils: */
75 #include <SocketFxns.h>
77 #include <ladclient.h>
78 #include <_lad.h>
80 /* =============================================================================
81  * Macros/Constants
82  * =============================================================================
83  */
85 /*!
86  *  @brief  Name of the reserved NameServer used for MessageQ.
87  */
88 #define MessageQ_NAMESERVER  "MessageQ"
90 /*!
91  *  @brief  Value of an invalid socket ID:
92  */
93 #define Transport_INVALIDSOCKET  (0xFFFFFFFF)
95 /* More magic rpmsg port numbers: */
96 #define MESSAGEQ_RPMSG_PORT       61
97 #define MESSAGEQ_RPMSG_MAXSIZE   512
99 /* Trace flag settings: */
100 #define TRACESHIFT    12
101 #define TRACEMASK     0x1000
103 /* Define BENCHMARK to quiet key MessageQ APIs: */
104 //#define BENCHMARK
106 /* =============================================================================
107  * Structures & Enums
108  * =============================================================================
109  */
111 /* structure for MessageQ module state */
112 typedef struct MessageQ_ModuleObject {
113     Int                 refCount;
114     /*!< Reference count */
115     NameServer_Handle   nameServer;
116     /*!< Handle to the local NameServer used for storing GP objects */
117     pthread_mutex_t     gate;
118     /*!< Handle of gate to be used for local thread safety */
119     MessageQ_Params     defaultInstParams;
120     /*!< Default instance creation parameters */
121     int                 sock[MultiProc_MAXPROCESSORS];
122     /*!< Sockets to for sending to each remote processor */
123     int                 seqNum;
124     /*!< Process-specific sequence number */
125 } MessageQ_ModuleObject;
127 /*!
128  *  @brief  Structure for the Handle for the MessageQ.
129  */
130 typedef struct MessageQ_Object_tag {
131     MessageQ_Params         params;
132     /*! Instance specific creation parameters */
133     MessageQ_QueueId        queue;
134     /* Unique id */
135     int                     fd[MultiProc_MAXPROCESSORS];
136     /* File Descriptor to block on messages from remote processors. */
137     int                     unblockFd;
138     /* Write this fd to unblock the select() call in MessageQ _get() */
139     void                    *serverHandle;
140 } MessageQ_Object;
142 static Bool verbose = FALSE;
145 /* =============================================================================
146  *  Globals
147  * =============================================================================
148  */
149 static MessageQ_ModuleObject MessageQ_state =
151     .refCount               = 0,
152     .nameServer             = NULL,
153 };
155 /*!
156  *  @var    MessageQ_module
157  *
158  *  @brief  Pointer to the MessageQ module state.
159  */
160 MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
163 /* =============================================================================
164  * Forward declarations of internal functions
165  * =============================================================================
166  */
168 /* This is a helper function to initialize a message. */
169 static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex);
170 static Int transportCloseEndpoint(int fd);
171 static Int transportGet(int sock, MessageQ_Msg * retMsg);
172 static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId);
174 /* =============================================================================
175  * APIS
176  * =============================================================================
177  */
178 /* Function to get default configuration for the MessageQ module.
179  *
180  */
181 Void MessageQ_getConfig (MessageQ_Config * cfg)
183     Int status;
184     LAD_ClientHandle handle;
185     struct LAD_CommandObj cmd;
186     union LAD_ResponseObj rsp;
188     assert (cfg != NULL);
190     handle = LAD_findHandle();
191     if (handle == LAD_MAXNUMCLIENTS) {
192         PRINTVERBOSE1(
193           "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
194            getpid())
196         return;
197     }
199     cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
200     cmd.clientId = handle;
202     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
203         PRINTVERBOSE1(
204           "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
205         return;
206     }
208     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
209         PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n", status)
210         return;
211     }
212     status = rsp.messageQGetConfig.status;
214     PRINTVERBOSE2(
215       "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
216       handle, status)
218     memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof(*cfg));
220     return;
223 /* Function to setup the MessageQ module. */
224 Int MessageQ_setup(const MessageQ_Config * cfg)
226     Int status;
227     LAD_ClientHandle handle;
228     struct LAD_CommandObj cmd;
229     union LAD_ResponseObj rsp;
230     Int i;
232     handle = LAD_findHandle();
233     if (handle == LAD_MAXNUMCLIENTS) {
234         PRINTVERBOSE1(
235           "MessageQ_setup: can't find connection to daemon for pid %d\n",
236            getpid())
238         return MessageQ_E_RESOURCE;
239     }
241     cmd.cmd = LAD_MESSAGEQ_SETUP;
242     cmd.clientId = handle;
243     memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
245     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
246         PRINTVERBOSE1(
247           "MessageQ_setup: sending LAD command failed, status=%d\n", status)
248         return MessageQ_E_FAIL;
249     }
251     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
252         PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
253         return(status);
254     }
255     status = rsp.setup.status;
257     PRINTVERBOSE2(
258       "MessageQ_setup: got LAD response for client %d, status=%d\n",
259       handle, status)
261     MessageQ_module->nameServer = rsp.setup.nameServerHandle;
262     MessageQ_module->seqNum = 0;
264     /* Create a default local gate. */
265     pthread_mutex_init (&(MessageQ_module->gate), NULL);
267     /* Clear sockets array. */
268     for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
269         MessageQ_module->sock[i] = Transport_INVALIDSOCKET;
270     }
272     return status;
275 /*
276  * Function to destroy the MessageQ module.
277  * Destroys socket/protocol maps;  sockets themselves should have been
278  * destroyed in MessageQ_delete() and MessageQ_detach() calls.
279  */
280 Int MessageQ_destroy (void)
282     Int status;
283     LAD_ClientHandle handle;
284     struct LAD_CommandObj cmd;
285     union LAD_ResponseObj rsp;
287     handle = LAD_findHandle();
288     if (handle == LAD_MAXNUMCLIENTS) {
289         PRINTVERBOSE1(
290           "MessageQ_destroy: can't find connection to daemon for pid %d\n",
291            getpid())
293         return MessageQ_E_RESOURCE;
294     }
296     cmd.cmd = LAD_MESSAGEQ_DESTROY;
297     cmd.clientId = handle;
299     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
300         PRINTVERBOSE1(
301           "MessageQ_destroy: sending LAD command failed, status=%d\n", status)
302         return MessageQ_E_FAIL;
303     }
305     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
306         PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
307         return(status);
308     }
309     status = rsp.status;
311     PRINTVERBOSE2(
312       "MessageQ_destroy: got LAD response for client %d, status=%d\n",
313       handle, status)
315     return status;
318 /* Function to initialize the parameters for the MessageQ instance. */
319 Void MessageQ_Params_init (MessageQ_Params * params)
321     memcpy (params, &(MessageQ_module->defaultInstParams),
322             sizeof (MessageQ_Params));
324     return;
327 /*
328  *   Function to create a MessageQ object for receiving.
329  *
330  *   Create a socket and bind the source address (local ProcId/MessageQ ID) in
331  *   order to get messages dispatched to this messageQ.
332  */
333 MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
335     Int                   status;
336     MessageQ_Object *     obj    = NULL;
337     UInt16                queueIndex = 0u;
338     UInt16                procId;
339     UInt16                rprocId;
340     LAD_ClientHandle      handle;
341     struct LAD_CommandObj cmd;
342     union LAD_ResponseObj rsp;
344     handle = LAD_findHandle();
345     if (handle == LAD_MAXNUMCLIENTS) {
346         PRINTVERBOSE1(
347           "MessageQ_create: can't find connection to daemon for pid %d\n",
348            getpid())
350         return NULL;
351     }
353     cmd.cmd = LAD_MESSAGEQ_CREATE;
354     cmd.clientId = handle;
355     if (name == NULL) {
356         cmd.args.messageQCreate.name[0] = '\0';
357     }
358     else {
359         strncpy(cmd.args.messageQCreate.name, name,
360                 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
361         cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
362     }
364     if (params) {
365         memcpy(&cmd.args.messageQCreate.params, params, sizeof(*params));
366     }
368     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
369         PRINTVERBOSE1(
370           "MessageQ_create: sending LAD command failed, status=%d\n", status)
371         return NULL;
372     }
374     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
375         PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
376         return NULL;
377     }
378     status = rsp.messageQCreate.status;
380     PRINTVERBOSE2(
381       "MessageQ_create: got LAD response for client %d, status=%d\n",
382       handle, status)
384     if (status == -1) {
385        PRINTVERBOSE1(
386           "MessageQ_create: MessageQ server operation failed, status=%d\n",
387           status)
388        return NULL;
389     }
391     /* Create the generic obj */
392     obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
394     if (params != NULL) {
395        /* Populate the params member */
396         memcpy((Ptr) &obj->params, (Ptr)params, sizeof (MessageQ_Params));
397     }
399     procId = MultiProc_self();
400     queueIndex = (MessageQ_QueueIndex)rsp.messageQCreate.queueId;
401     obj->queue = rsp.messageQCreate.queueId;
402     obj->serverHandle = rsp.messageQCreate.serverHandle;
404     /*
405      * Create a set of communication endpoints (one per each remote proc),
406      * and return the socket as target for MessageQ_put() calls, and as
407      * a file descriptor to close during MessageQ_delete().
408      */
409     for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
410         obj->fd[rprocId] = Transport_INVALIDSOCKET;
411         if (procId == rprocId) {
412             /* Skip creating an endpoint for ourself. */
413             continue;
414         }
416         PRINTVERBOSE3("MessageQ_create: creating endpoint for: %s, rprocId: %d, queueIndex: %d\n", name, rprocId, queueIndex)
418         status = transportCreateEndpoint(&obj->fd[rprocId], rprocId,
419                                            queueIndex);
420         if (status < 0) {
421            obj->fd[rprocId] = Transport_INVALIDSOCKET;
422         }
423     }
425     /*
426      * Now, to support MessageQ_unblock() functionality, create an event object.
427      * Writing to this event will unblock the select() call in MessageQ_get().
428      */
429     obj->unblockFd = eventfd(0, 0);
430     if (obj->unblockFd == -1)  {
431         printf ("MessageQ_create: eventfd creation failed: %d, %s\n",
432                    errno, strerror(errno));
433         MessageQ_delete((MessageQ_Handle *)&obj);
434     }
435     else {
436         int endpointFound = 0;
438         for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
439             if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
440                 endpointFound = 1;
441             }
442         }
443         if (!endpointFound) {
444             printf("MessageQ_create: no transport endpoints found, deleting\n");
445             MessageQ_delete((MessageQ_Handle *)&obj);
446         }
447     }
449     return ((MessageQ_Handle) obj);
452 /*
453  * Function to delete a MessageQ object for a specific slave processor.
454  *
455  * Deletes the socket associated with this MessageQ object.
456  */
457 Int MessageQ_delete (MessageQ_Handle * handlePtr)
459     Int               status    = MessageQ_S_SUCCESS;
460     MessageQ_Object * obj       = NULL;
461     UInt16            rprocId;
462     LAD_ClientHandle      handle;
463     struct LAD_CommandObj cmd;
464     union LAD_ResponseObj rsp;
466     handle = LAD_findHandle();
467     if (handle == LAD_MAXNUMCLIENTS) {
468         PRINTVERBOSE1(
469           "MessageQ_delete: can't find connection to daemon for pid %d\n",
470            getpid())
472         return MessageQ_E_FAIL;
473     }
475     obj = (MessageQ_Object *) (*handlePtr);
477     cmd.cmd = LAD_MESSAGEQ_DELETE;
478     cmd.clientId = handle;
479     cmd.args.messageQDelete.serverHandle = obj->serverHandle;
481     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
482         PRINTVERBOSE1(
483           "MessageQ_delete: sending LAD command failed, status=%d\n", status)
484         return MessageQ_E_FAIL;
485     }
487     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
488         PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
489         return MessageQ_E_FAIL;
490     }
491     status = rsp.messageQDelete.status;
493     PRINTVERBOSE2(
494       "MessageQ_delete: got LAD response for client %d, status=%d\n",
495       handle, status)
498     /* Close the event used for MessageQ_unblock(): */
499     close(obj->unblockFd);
501     /* Close the communication endpoint: */
502     for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
503         if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
504             status = transportCloseEndpoint(obj->fd[rprocId]);
505         }
506     }
508     /* Now free the obj */
509     free (obj);
510     *handlePtr = NULL;
512     return (status);
515 /*
516  *  Opens an instance of MessageQ for sending.
517  *
518  *  We need not create a socket here; the sockets for all remote processors
519  *  were created during MessageQ_attach(), and will be
520  *  retrieved during MessageQ_put().
521  */
522 Int MessageQ_open (String name, MessageQ_QueueId * queueId)
524     Int status = MessageQ_S_SUCCESS;
526     status = NameServer_getUInt32 (MessageQ_module->nameServer,
527                                      name, queueId, NULL);
529     if (status == NameServer_E_NOTFOUND) {
530         /* Set return queue ID to invalid. */
531         *queueId = MessageQ_INVALIDMESSAGEQ;
532         status = MessageQ_E_NOTFOUND;
533     }
534     else if (status >= 0) {
535         /* Override with a MessageQ status code. */
536         status = MessageQ_S_SUCCESS;
537     }
538     else {
539         /* Set return queue ID to invalid. */
540         *queueId = MessageQ_INVALIDMESSAGEQ;
541         /* Override with a MessageQ status code. */
542         if (status == NameServer_E_TIMEOUT) {
543             status = MessageQ_E_TIMEOUT;
544         }
545         else {
546             status = MessageQ_E_FAIL;
547         }
548     }
550     return (status);
553 /* Closes previously opened instance of MessageQ module. */
554 Int MessageQ_close (MessageQ_QueueId * queueId)
556     Int32 status = MessageQ_S_SUCCESS;
558     /* Nothing more to be done for closing the MessageQ. */
559     *queueId = MessageQ_INVALIDMESSAGEQ;
561     return (status);
564 /*
565  * Place a message onto a message queue.
566  *
567  * Calls TransportShm_put(), which handles the sending of the message using the
568  * appropriate kernel interface (socket, device ioctl) call for the remote
569  * procId encoded in the queueId argument.
570  *
571  */
572 Int MessageQ_put (MessageQ_QueueId queueId, MessageQ_Msg msg)
574     Int      status;
575     UInt16   dstProcId  = (UInt16)(queueId >> 16);
576     UInt16   queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
578     msg->dstId     = queueIndex;
579     msg->dstProc   = dstProcId;
581     status = transportPut(msg, queueIndex, dstProcId);
583     return (status);
586 /*
587  * Gets a message for a message queue and blocks if the queue is empty.
588  * If a message is present, it returns it.  Otherwise it blocks
589  * waiting for a message to arrive.
590  * When a message is returned, it is owned by the caller.
591  *
592  * We block using select() on the receiving socket's file descriptor, then
593  * get the waiting message via the socket API recvfrom().
594  * We use the socket stored in the messageQ object via a previous call to
595  * MessageQ_create().
596  *
597  */
598 Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
600     static int last = 0;
601     Int     status = MessageQ_S_SUCCESS;
602     Int     tmpStatus;
603     MessageQ_Object * obj = (MessageQ_Object *) handle;
604     int     retval;
605     int     nfds;
606     fd_set  rfds;
607     struct  timeval tv;
608     void    *timevalPtr;
609     UInt16  rprocId;
610     int     maxfd = 0;
611     int     selfId;
612     int     nProcessors;
614     /* Wait (with timeout) and retreive message from socket: */
615     FD_ZERO(&rfds);
616     for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
617         if (rprocId == MultiProc_self() ||
618             obj->fd[rprocId] == Transport_INVALIDSOCKET) {
619             continue;
620         }
621         maxfd = MAX(maxfd, obj->fd[rprocId]);
622         FD_SET(obj->fd[rprocId], &rfds);
623     }
625     /* Wait also on the event fd, which may be written by MessageQ_unblock(): */
626     FD_SET(obj->unblockFd, &rfds);
628     if (timeout == MessageQ_FOREVER) {
629         timevalPtr = NULL;
630     }
631     else {
632         /* Timeout given in msec: convert:  */
633         tv.tv_sec = timeout / 1000;
634         tv.tv_usec = (timeout % 1000) * 1000;
635         timevalPtr = &tv;
636     }
637     /* Add one to last fd created: */
638     nfds = MAX(maxfd, obj->unblockFd) + 1;
640     retval = select(nfds, &rfds, NULL, NULL, timevalPtr);
641     if (retval)  {
642         if (FD_ISSET(obj->unblockFd, &rfds))  {
643             /*
644              * Our event was signalled by MessageQ_unblock().
645              *
646              * This is typically done during a shutdown sequence, where
647              * the intention of the client would be to ignore (i.e. not fetch)
648              * any pending messages in the transport's queue.
649              * Thus, we shall not check for nor return any messages.
650              */
651             *msg = NULL;
652             status = MessageQ_E_UNBLOCKED;
653         }
654         else {
655             /* start where we last left off */
656             rprocId = last;
658             selfId = MultiProc_self();
659             nProcessors = MultiProc_getNumProcessors();
661             do {
662                 if (rprocId != selfId &&
663                     obj->fd[rprocId] != Transport_INVALIDSOCKET) {
665                     if (FD_ISSET(obj->fd[rprocId], &rfds)) {
666                         /* Our transport's fd was signalled: Get the message */
667                         tmpStatus = transportGet(obj->fd[rprocId], msg);
668                         if (tmpStatus < 0) {
669                             printf ("MessageQ_get: tranposrtshm_get failed.");
670                             status = MessageQ_E_FAIL;
671                         }
673                         last = (rprocId + 1) % nProcessors;
674                         break;
675                     }
676                 }
677                 rprocId = (rprocId + 1) % nProcessors;
678             } while (rprocId != last);
679         }
680     }
681     else if (retval == 0) {
682         *msg = NULL;
683         status = MessageQ_E_TIMEOUT;
684     }
686     return (status);
689 /*
690  * Return a count of the number of messages in the queue
691  *
692  * TBD: Implement as a socket ioctl, using getsockopt().  Return -1 for now.
693  */
694 Int MessageQ_count (MessageQ_Handle handle)
696     Int               count = -1;
697 #if 0
698     MessageQ_Object * obj   = (MessageQ_Object *) handle;
699     socklen_t         optlen;
701     /*
702      * TBD: Need to find a way to implement (if anyone uses it!), and
703      * push down into transport..
704      */
706     /*
707      * 2nd arg to getsockopt should be transport independent, but using
708      *  SSKPROTO_SHMFIFO for now:
709      */
710     getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
711                  &count, &optlen);
712 #endif
714     return (count);
717 /* Initializes a message not obtained from MessageQ_alloc. */
718 Void MessageQ_staticMsgInit (MessageQ_Msg msg, UInt32 size)
720     /* Fill in the fields of the message */
721     MessageQ_msgInit (msg);
722     msg->heapId  = MessageQ_STATICMSG;
723     msg->msgSize = size;
726 /*
727  * Allocate a message and initialize the needed fields (note some
728  * of the fields in the header are set via other APIs or in the
729  * MessageQ_put function,
730  */
731 MessageQ_Msg MessageQ_alloc (UInt16 heapId, UInt32 size)
733     MessageQ_Msg msg       = NULL;
735     /*
736      * heapId not used for local alloc (as this is over a copy transport), but
737      * we need to send to other side as heapId is used in BIOS transport:
738      */
739     msg = (MessageQ_Msg)calloc (1, size);
740     MessageQ_msgInit (msg);
741     msg->msgSize = size;
742     msg->heapId  = heapId;
744     return msg;
747 /* Frees the message back to the heap that was used to allocate it. */
748 Int MessageQ_free (MessageQ_Msg msg)
750     UInt32         status = MessageQ_S_SUCCESS;
752     /* Check to ensure this was not allocated by user: */
753     if (msg->heapId == MessageQ_STATICMSG)  {
754         status =  MessageQ_E_CANNOTFREESTATICMSG;
755     }
756     else {
757         free (msg);
758     }
760     return status;
763 /* Register a heap with MessageQ. */
764 Int MessageQ_registerHeap (Ptr heap, UInt16 heapId)
766     Int  status = MessageQ_S_SUCCESS;
768     /* Do nothing, as this uses a copy transport: */
770     return status;
773 /* Unregister a heap with MessageQ. */
774 Int MessageQ_unregisterHeap (UInt16 heapId)
776     Int  status = MessageQ_S_SUCCESS;
778     /* Do nothing, as this uses a copy transport: */
780     return status;
783 /* Unblocks a MessageQ */
784 Void MessageQ_unblock (MessageQ_Handle handle)
786     MessageQ_Object * obj   = (MessageQ_Object *) handle;
787     uint64_t     buf = 1;
788     int          numBytes;
790     /* Write 8 bytes to awaken any threads blocked on this messageQ: */
791     numBytes = write(obj->unblockFd, &buf, sizeof(buf));
794 /* Embeds a source message queue into a message. */
795 Void MessageQ_setReplyQueue (MessageQ_Handle handle, MessageQ_Msg msg)
797     MessageQ_Object * obj   = (MessageQ_Object *) handle;
799     msg->replyId   = (UInt16)(obj->queue);
800     msg->replyProc = (UInt16)(obj->queue >> 16);
803 /* Returns the QueueId associated with the handle. */
804 MessageQ_QueueId MessageQ_getQueueId (MessageQ_Handle handle)
806     MessageQ_Object * obj = (MessageQ_Object *) handle;
807     UInt32            queueId;
809     queueId = (obj->queue);
811     return queueId;
814 /* Sets the tracing of a message */
815 Void MessageQ_setMsgTrace (MessageQ_Msg msg, Bool traceFlag)
817     msg->flags = (msg->flags & ~TRACEMASK) |   (traceFlag << TRACESHIFT);
820 /*
821  *  Returns the amount of shared memory used by one transport instance.
822  *
823  *  The MessageQ module itself does not use any shared memory but the
824  *  underlying transport may use some shared memory.
825  */
826 SizeT MessageQ_sharedMemReq (Ptr sharedAddr)
828     SizeT memReq = 0u;
830     /* Do nothing, as this is a copy transport. */
832     return (memReq);
835 /*
836  *  Create a socket for this remote proc, and attempt to connect.
837  *
838  *  Only creates a socket if one does not already exist for this procId.
839  *
840  *  Note: remoteProcId may be MultiProc_Self() for loopback case.
841  */
842 Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr)
844     Int     status = MessageQ_S_SUCCESS;
845     int     sock;
847     PRINTVERBOSE1("MessageQ_attach: remoteProcId: %d\n", remoteProcId)
849     if (remoteProcId >= MultiProc_MAXPROCESSORS) {
850         status = MessageQ_E_INVALIDPROCID;
851         goto exit;
852     }
854     pthread_mutex_lock (&(MessageQ_module->gate));
856     /* Only create a socket if one doesn't exist: */
857     if (MessageQ_module->sock[remoteProcId] == Transport_INVALIDSOCKET)  {
858         /* Create the socket for sending messages to the remote proc: */
859         sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
860         if (sock < 0) {
861             status = MessageQ_E_FAIL;
862             printf ("MessageQ_attach: socket failed: %d, %s\n",
863                        errno, strerror(errno));
864         }
865         else  {
866             PRINTVERBOSE1("MessageQ_attach: created send socket: %d\n", sock)
867             MessageQ_module->sock[remoteProcId] = sock;
868             /* Attempt to connect: */
869             status = ConnectSocket(sock, remoteProcId, MESSAGEQ_RPMSG_PORT);
870             if (status < 0) {
871                 status = MessageQ_E_RESOURCE;
872                 /* don't hard-printf since this is no longer fatal */
873                 PRINTVERBOSE1("MessageQ_attach: ConnectSocket(remoteProcId:%d) failed\n",
874                        remoteProcId);
875             }
876         }
877     }
878     else {
879         status = MessageQ_E_ALREADYEXISTS;
880     }
882     pthread_mutex_unlock (&(MessageQ_module->gate));
884     if (status == MessageQ_E_RESOURCE) {
885         MessageQ_detach(remoteProcId);
886     }
888 exit:
889     return (status);
892 /*
893  *  Close the socket for this remote proc.
894  *
895  */
896 Int MessageQ_detach (UInt16 remoteProcId)
898     Int status = MessageQ_S_SUCCESS;
899     int sock;
901     if (remoteProcId >= MultiProc_MAXPROCESSORS) {
902         status = MessageQ_E_INVALIDPROCID;
903         goto exit;
904     }
906     pthread_mutex_lock (&(MessageQ_module->gate));
908     sock = MessageQ_module->sock[remoteProcId];
909     if (sock != Transport_INVALIDSOCKET) {
910         if (close(sock)) {
911             status = MessageQ_E_OSFAILURE;
912             printf("MessageQ_detach: close failed: %d, %s\n",
913                    errno, strerror(errno));
914         }
915         else {
916             PRINTVERBOSE1("MessageQ_detach: closed socket: %d\n", sock)
917             MessageQ_module->sock[remoteProcId] = Transport_INVALIDSOCKET;
918         }
919     }
921     pthread_mutex_unlock (&(MessageQ_module->gate));
923 exit:
924     return (status);
927 /*
928  * This is a helper function to initialize a message.
929  */
930 Void MessageQ_msgInit (MessageQ_Msg msg)
932 #if 0
933     Int                 status    = MessageQ_S_SUCCESS;
934     LAD_ClientHandle handle;
935     struct LAD_CommandObj cmd;
936     union LAD_ResponseObj rsp;
938     handle = LAD_findHandle();
939     if (handle == LAD_MAXNUMCLIENTS) {
940         PRINTVERBOSE1(
941           "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
942            getpid())
944         return;
945     }
947     cmd.cmd = LAD_MESSAGEQ_MSGINIT;
948     cmd.clientId = handle;
950     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
951         PRINTVERBOSE1(
952           "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
953         return;
954     }
956     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
957         PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
958         return;
959     }
960     status = rsp.msgInit.status;
962     PRINTVERBOSE2(
963       "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
964       handle, status)
966     memcpy(msg, &rsp.msgInit.msg, sizeof(*msg));
967 #else
968     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
969     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
970     msg->msgId     = MessageQ_INVALIDMSGID;
971     msg->dstId     = (UInt16)MessageQ_INVALIDMESSAGEQ;
972     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
973     msg->srcProc   = MultiProc_self();
975     pthread_mutex_lock(&(MessageQ_module->gate));
976     msg->seqNum  = MessageQ_module->seqNum++;
977     pthread_mutex_unlock(&(MessageQ_module->gate));
978 #endif
981 /*
982  * =============================================================================
983  * Transport: Fxns kept here until need for a transport layer is realized.
984  * =============================================================================
985  */
986 /*
987  * ======== transportCreateEndpoint ========
988  *
989  * Create a communication endpoint to receive messages.
990  */
991 static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex)
993     Int          status    = MessageQ_S_SUCCESS;
994     int         err;
996     /*  Create the socket to receive messages for this messageQ. */
997     *fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
998     if (*fd < 0) {
999         status = MessageQ_E_FAIL;
1000         printf ("transportCreateEndpoint: socket call failed: %d, %s\n",
1001                   errno, strerror(errno));
1002         goto exit;
1003     }
1005     PRINTVERBOSE1("transportCreateEndpoint: created socket: fd: %d\n", *fd)
1007     err = SocketBindAddr(*fd, rprocId, (UInt32)queueIndex);
1008     if (err < 0) {
1009         status = MessageQ_E_FAIL;
1010         /* don't hard-printf since this is no longer fatal */
1011         PRINTVERBOSE2("transportCreateEndpoint: bind failed: %d, %s\n",
1012                       errno, strerror(errno));
1013     }
1015 exit:
1016     return (status);
1019 /*
1020  * ======== transportCloseEndpoint ========
1021  *
1022  *  Close the communication endpoint.
1023  */
1024 static Int transportCloseEndpoint(int fd)
1026     Int status = MessageQ_S_SUCCESS;
1028     PRINTVERBOSE1("transportCloseEndpoint: closing socket: %d\n", fd)
1030     /* Stop communication to this socket:  */
1031     close(fd);
1033     return (status);
1036 /*
1037  * ======== transportGet ========
1038  *  Retrieve a message waiting in the socket's queue.
1039 */
1040 static Int transportGet(int sock, MessageQ_Msg * retMsg)
1042     Int           status    = MessageQ_S_SUCCESS;
1043     MessageQ_Msg  msg;
1044     struct sockaddr_rpmsg fromAddr;  // [Socket address of sender]
1045     unsigned int  len;
1046     int           byteCount;
1048     /*
1049      * We have no way of peeking to see what message size we'll get, so we
1050      * allocate a message of max size to receive contents from the rpmsg socket
1051      * (currently, a copy transport)
1052      */
1053     msg = MessageQ_alloc (0, MESSAGEQ_RPMSG_MAXSIZE);
1054     if (!msg)  {
1055         status = MessageQ_E_MEMORY;
1056         goto exit;
1057     }
1059     memset(&fromAddr, 0, sizeof(fromAddr));
1060     len = sizeof(fromAddr);
1062     byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
1063                       (struct sockaddr *)&fromAddr, &len);
1064     if (len != sizeof(fromAddr)) {
1065         printf("recvfrom: got bad addr len (%d)\n", len);
1066         status = MessageQ_E_FAIL;
1067         goto exit;
1068     }
1069     if (byteCount < 0) {
1070         printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
1071         status = MessageQ_E_FAIL;
1072         goto exit;
1073     }
1074     else {
1075          /* Update the allocated message size (even though this may waste space
1076           * when the actual message is smaller than the maximum rpmsg size,
1077           * the message will be freed soon anyway, and it avoids an extra copy).
1078           */
1079          msg->msgSize = byteCount;
1081          /*
1082           * If the message received was statically allocated, reset the
1083           * heapId, so the app can free it.
1084           */
1085          if (msg->heapId == MessageQ_STATICMSG)  {
1086              msg->heapId = 0;  /* for a copy transport, heap id is 0. */
1087          }
1088     }
1090     PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
1091     PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
1092     PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId, msg->msgSize)
1094     *retMsg = msg;
1096 exit:
1097     return (status);
1100 /*
1101  * ======== transportPut ========
1102  *
1103  * Calls the socket API sendto() on the socket associated with
1104  * with this destination procID.
1105  * Currently, both local and remote messages are sent via the Socket ABI, so
1106  * no local object lists are maintained here.
1107 */
1108 static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId)
1110     Int     status    = MessageQ_S_SUCCESS;
1111     int     sock;
1112     int     err;
1114     /*
1115      * Retrieve the socket for the AF_SYSLINK protocol associated with this
1116      * transport.
1117      */
1118     sock = MessageQ_module->sock[dstProcId];
1120     PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
1122     err = send(sock, msg, msg->msgSize, 0);
1123     if (err < 0) {
1124         printf ("transportPut: send failed: %d, %s\n",
1125                   errno, strerror(errno));
1126         status = MessageQ_E_FAIL;
1127         goto exit;
1128     }
1130     /*
1131      * Free the message, as this is a copy transport, we maintain MessageQ
1132      * semantics.
1133      */
1134     MessageQ_free (msg);
1136 exit:
1137     return (status);