Linux: Tracing ease of use
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
1 /*
2  * Copyright (c) 2012-2014, Texas Instruments Incorporated
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  @file   MessageQ.c
34  *
35  *  @brief  MessageQ Linux implementation
36  *
37  *  This implementation is geared for use in a "client/server" model, whereby
38  *  system-wide data is maintained in a "server" component and process-
39  *  specific data is handled here.  At the moment, this implementation
40  *  connects and communicates with LAD for the server connection.
41  */
43 /* Standard IPC header */
44 #include <ti/ipc/Std.h>
46 /* Module level headers */
47 #include <ti/ipc/NameServer.h>
48 #include <ti/ipc/MultiProc.h>
49 #include <_MultiProc.h>
50 #include <ti/ipc/MessageQ.h>
51 #include <_MessageQ.h>
53 /* Socket Headers */
54 #include <sys/select.h>
55 #include <sys/time.h>
56 #include <sys/types.h>
57 #include <sys/param.h>
58 #include <sys/eventfd.h>
59 #include <sys/socket.h>
60 #include <errno.h>
61 #include <stdio.h>
62 #include <string.h>
63 #include <stdlib.h>
64 #include <unistd.h>
65 #include <assert.h>
66 #include <pthread.h>
68 /* Socket Protocol Family */
69 #include <net/rpmsg.h>
71 /* Socket utils: */
72 #include <SocketFxns.h>
74 #include <ladclient.h>
75 #include <_lad.h>
77 /* =============================================================================
78  * Macros/Constants
79  * =============================================================================
80  */
82 /*!
83  *  @brief  Name of the reserved NameServer used for MessageQ.
84  */
85 #define MessageQ_NAMESERVER  "MessageQ"
87 /*!
88  *  @brief  Value of an invalid socket ID:
89  */
90 #define Transport_INVALIDSOCKET  (0xFFFFFFFF)
92 /* More magic rpmsg port numbers: */
93 #define MESSAGEQ_RPMSG_PORT       61
94 #define MESSAGEQ_RPMSG_MAXSIZE   512
96 /* Trace flag settings: */
97 #define TRACESHIFT    12
98 #define TRACEMASK     0x1000
100 /* Define BENCHMARK to quiet key MessageQ APIs: */
101 //#define BENCHMARK
103 /* =============================================================================
104  * Structures & Enums
105  * =============================================================================
106  */
108 /* structure for MessageQ module state */
109 typedef struct MessageQ_ModuleObject {
110     Int                 refCount;
111     /*!< Reference count */
112     NameServer_Handle   nameServer;
113     /*!< Handle to the local NameServer used for storing GP objects */
114     pthread_mutex_t     gate;
115     /*!< Handle of gate to be used for local thread safety */
116     MessageQ_Params     defaultInstParams;
117     /*!< Default instance creation parameters */
118     int                 sock[MultiProc_MAXPROCESSORS];
119     /*!< Sockets to for sending to each remote processor */
120     int                 seqNum;
121     /*!< Process-specific sequence number */
122 } MessageQ_ModuleObject;
124 /*!
125  *  @brief  Structure for the Handle for the MessageQ.
126  */
127 typedef struct MessageQ_Object_tag {
128     MessageQ_Params         params;
129     /*! Instance specific creation parameters */
130     MessageQ_QueueId        queue;
131     /* Unique id */
132     int                     fd[MultiProc_MAXPROCESSORS];
133     /* File Descriptor to block on messages from remote processors. */
134     int                     unblockFd;
135     /* Write this fd to unblock the select() call in MessageQ _get() */
136     void                    *serverHandle;
137 } MessageQ_Object;
139 /* traces in this file are controlled via _MessageQ_verbose */
140 Bool _MessageQ_verbose = FALSE;
141 #define verbose _MessageQ_verbose
144 /* =============================================================================
145  *  Globals
146  * =============================================================================
147  */
148 static MessageQ_ModuleObject MessageQ_state =
150     .refCount               = 0,
151     .nameServer             = NULL,
152 };
154 /*!
155  *  @var    MessageQ_module
156  *
157  *  @brief  Pointer to the MessageQ module state.
158  */
159 MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
162 /* =============================================================================
163  * Forward declarations of internal functions
164  * =============================================================================
165  */
167 /* This is a helper function to initialize a message. */
168 static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex);
169 static Int transportCloseEndpoint(int fd);
170 static Int transportGet(int sock, MessageQ_Msg * retMsg);
171 static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId);
173 /* =============================================================================
174  * APIS
175  * =============================================================================
176  */
177 /* Function to get default configuration for the MessageQ module.
178  *
179  */
180 Void MessageQ_getConfig (MessageQ_Config * cfg)
182     Int status;
183     LAD_ClientHandle handle;
184     struct LAD_CommandObj cmd;
185     union LAD_ResponseObj rsp;
187     assert (cfg != NULL);
189     handle = LAD_findHandle();
190     if (handle == LAD_MAXNUMCLIENTS) {
191         PRINTVERBOSE1(
192           "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
193            getpid())
195         return;
196     }
198     cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
199     cmd.clientId = handle;
201     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
202         PRINTVERBOSE1(
203           "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
204         return;
205     }
207     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
208         PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n", status)
209         return;
210     }
211     status = rsp.messageQGetConfig.status;
213     PRINTVERBOSE2(
214       "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
215       handle, status)
217     memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof(*cfg));
219     return;
222 /* Function to setup the MessageQ module. */
223 Int MessageQ_setup(const MessageQ_Config * cfg)
225     Int status;
226     LAD_ClientHandle handle;
227     struct LAD_CommandObj cmd;
228     union LAD_ResponseObj rsp;
229     Int i;
231     handle = LAD_findHandle();
232     if (handle == LAD_MAXNUMCLIENTS) {
233         PRINTVERBOSE1(
234           "MessageQ_setup: can't find connection to daemon for pid %d\n",
235            getpid())
237         return MessageQ_E_RESOURCE;
238     }
240     cmd.cmd = LAD_MESSAGEQ_SETUP;
241     cmd.clientId = handle;
242     memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
244     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
245         PRINTVERBOSE1(
246           "MessageQ_setup: sending LAD command failed, status=%d\n", status)
247         return MessageQ_E_FAIL;
248     }
250     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
251         PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
252         return(status);
253     }
254     status = rsp.setup.status;
256     PRINTVERBOSE2(
257       "MessageQ_setup: got LAD response for client %d, status=%d\n",
258       handle, status)
260     MessageQ_module->nameServer = rsp.setup.nameServerHandle;
261     MessageQ_module->seqNum = 0;
263     /* Create a default local gate. */
264     pthread_mutex_init (&(MessageQ_module->gate), NULL);
266     /* Clear sockets array. */
267     for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
268         MessageQ_module->sock[i] = Transport_INVALIDSOCKET;
269     }
271     return status;
274 /*
275  * Function to destroy the MessageQ module.
276  * Destroys socket/protocol maps;  sockets themselves should have been
277  * destroyed in MessageQ_delete() and MessageQ_detach() calls.
278  */
279 Int MessageQ_destroy (void)
281     Int status;
282     LAD_ClientHandle handle;
283     struct LAD_CommandObj cmd;
284     union LAD_ResponseObj rsp;
286     handle = LAD_findHandle();
287     if (handle == LAD_MAXNUMCLIENTS) {
288         PRINTVERBOSE1(
289           "MessageQ_destroy: can't find connection to daemon for pid %d\n",
290            getpid())
292         return MessageQ_E_RESOURCE;
293     }
295     cmd.cmd = LAD_MESSAGEQ_DESTROY;
296     cmd.clientId = handle;
298     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
299         PRINTVERBOSE1(
300           "MessageQ_destroy: sending LAD command failed, status=%d\n", status)
301         return MessageQ_E_FAIL;
302     }
304     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
305         PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
306         return(status);
307     }
308     status = rsp.status;
310     PRINTVERBOSE2(
311       "MessageQ_destroy: got LAD response for client %d, status=%d\n",
312       handle, status)
314     return status;
317 /* Function to initialize the parameters for the MessageQ instance. */
318 Void MessageQ_Params_init (MessageQ_Params * params)
320     memcpy (params, &(MessageQ_module->defaultInstParams),
321             sizeof (MessageQ_Params));
323     return;
326 /*
327  *   Function to create a MessageQ object for receiving.
328  *
329  *   Create a socket and bind the source address (local ProcId/MessageQ ID) in
330  *   order to get messages dispatched to this messageQ.
331  */
332 MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
334     Int                   status;
335     MessageQ_Object *     obj    = NULL;
336     UInt16                queueIndex = 0u;
337     UInt16                procId;
338     UInt16                rprocId;
339     LAD_ClientHandle      handle;
340     struct LAD_CommandObj cmd;
341     union LAD_ResponseObj rsp;
343     handle = LAD_findHandle();
344     if (handle == LAD_MAXNUMCLIENTS) {
345         PRINTVERBOSE1(
346           "MessageQ_create: can't find connection to daemon for pid %d\n",
347            getpid())
349         return NULL;
350     }
352     cmd.cmd = LAD_MESSAGEQ_CREATE;
353     cmd.clientId = handle;
354     if (name == NULL) {
355         cmd.args.messageQCreate.name[0] = '\0';
356     }
357     else {
358         strncpy(cmd.args.messageQCreate.name, name,
359                 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
360         cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
361     }
363     if (params) {
364         memcpy(&cmd.args.messageQCreate.params, params, sizeof(*params));
365     }
367     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
368         PRINTVERBOSE1(
369           "MessageQ_create: sending LAD command failed, status=%d\n", status)
370         return NULL;
371     }
373     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
374         PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
375         return NULL;
376     }
377     status = rsp.messageQCreate.status;
379     PRINTVERBOSE2(
380       "MessageQ_create: got LAD response for client %d, status=%d\n",
381       handle, status)
383     if (status == -1) {
384        PRINTVERBOSE1(
385           "MessageQ_create: MessageQ server operation failed, status=%d\n",
386           status)
387        return NULL;
388     }
390     /* Create the generic obj */
391     obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
393     if (params != NULL) {
394        /* Populate the params member */
395         memcpy((Ptr) &obj->params, (Ptr)params, sizeof (MessageQ_Params));
396     }
398     procId = MultiProc_self();
399     queueIndex = (MessageQ_QueueIndex)rsp.messageQCreate.queueId;
400     obj->queue = rsp.messageQCreate.queueId;
401     obj->serverHandle = rsp.messageQCreate.serverHandle;
403     /*
404      * Create a set of communication endpoints (one per each remote proc),
405      * and return the socket as target for MessageQ_put() calls, and as
406      * a file descriptor to close during MessageQ_delete().
407      */
408     for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
409         obj->fd[rprocId] = Transport_INVALIDSOCKET;
410         if (procId == rprocId) {
411             /* Skip creating an endpoint for ourself. */
412             continue;
413         }
415         PRINTVERBOSE3("MessageQ_create: creating endpoint for: %s, rprocId: %d, queueIndex: %d\n", name, rprocId, queueIndex)
417         status = transportCreateEndpoint(&obj->fd[rprocId], rprocId,
418                                            queueIndex);
419         if (status < 0) {
420            obj->fd[rprocId] = Transport_INVALIDSOCKET;
421         }
422     }
424     /*
425      * Now, to support MessageQ_unblock() functionality, create an event object.
426      * Writing to this event will unblock the select() call in MessageQ_get().
427      */
428     obj->unblockFd = eventfd(0, 0);
429     if (obj->unblockFd == -1)  {
430         printf ("MessageQ_create: eventfd creation failed: %d, %s\n",
431                    errno, strerror(errno));
432         MessageQ_delete((MessageQ_Handle *)&obj);
433     }
434     else {
435         int endpointFound = 0;
437         for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
438             if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
439                 endpointFound = 1;
440             }
441         }
442         if (!endpointFound) {
443             printf("MessageQ_create: no transport endpoints found, deleting\n");
444             MessageQ_delete((MessageQ_Handle *)&obj);
445         }
446     }
448     return ((MessageQ_Handle) obj);
451 /*
452  * Function to delete a MessageQ object for a specific slave processor.
453  *
454  * Deletes the socket associated with this MessageQ object.
455  */
456 Int MessageQ_delete (MessageQ_Handle * handlePtr)
458     Int               status    = MessageQ_S_SUCCESS;
459     MessageQ_Object * obj       = NULL;
460     UInt16            rprocId;
461     LAD_ClientHandle      handle;
462     struct LAD_CommandObj cmd;
463     union LAD_ResponseObj rsp;
465     handle = LAD_findHandle();
466     if (handle == LAD_MAXNUMCLIENTS) {
467         PRINTVERBOSE1(
468           "MessageQ_delete: can't find connection to daemon for pid %d\n",
469            getpid())
471         return MessageQ_E_FAIL;
472     }
474     obj = (MessageQ_Object *) (*handlePtr);
476     cmd.cmd = LAD_MESSAGEQ_DELETE;
477     cmd.clientId = handle;
478     cmd.args.messageQDelete.serverHandle = obj->serverHandle;
480     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
481         PRINTVERBOSE1(
482           "MessageQ_delete: sending LAD command failed, status=%d\n", status)
483         return MessageQ_E_FAIL;
484     }
486     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
487         PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
488         return MessageQ_E_FAIL;
489     }
490     status = rsp.messageQDelete.status;
492     PRINTVERBOSE2(
493       "MessageQ_delete: got LAD response for client %d, status=%d\n",
494       handle, status)
497     /* Close the event used for MessageQ_unblock(): */
498     close(obj->unblockFd);
500     /* Close the communication endpoint: */
501     for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
502         if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
503             status = transportCloseEndpoint(obj->fd[rprocId]);
504         }
505     }
507     /* Now free the obj */
508     free (obj);
509     *handlePtr = NULL;
511     return (status);
514 /*
515  *  Opens an instance of MessageQ for sending.
516  *
517  *  We need not create a socket here; the sockets for all remote processors
518  *  were created during MessageQ_attach(), and will be
519  *  retrieved during MessageQ_put().
520  */
521 Int MessageQ_open (String name, MessageQ_QueueId * queueId)
523     Int status = MessageQ_S_SUCCESS;
525     status = NameServer_getUInt32 (MessageQ_module->nameServer,
526                                      name, queueId, NULL);
528     if (status == NameServer_E_NOTFOUND) {
529         /* Set return queue ID to invalid. */
530         *queueId = MessageQ_INVALIDMESSAGEQ;
531         status = MessageQ_E_NOTFOUND;
532     }
533     else if (status >= 0) {
534         /* Override with a MessageQ status code. */
535         status = MessageQ_S_SUCCESS;
536     }
537     else {
538         /* Set return queue ID to invalid. */
539         *queueId = MessageQ_INVALIDMESSAGEQ;
540         /* Override with a MessageQ status code. */
541         if (status == NameServer_E_TIMEOUT) {
542             status = MessageQ_E_TIMEOUT;
543         }
544         else {
545             status = MessageQ_E_FAIL;
546         }
547     }
549     return (status);
552 /* Closes previously opened instance of MessageQ module. */
553 Int MessageQ_close (MessageQ_QueueId * queueId)
555     Int32 status = MessageQ_S_SUCCESS;
557     /* Nothing more to be done for closing the MessageQ. */
558     *queueId = MessageQ_INVALIDMESSAGEQ;
560     return (status);
563 /*
564  * Place a message onto a message queue.
565  *
566  * Calls TransportShm_put(), which handles the sending of the message using the
567  * appropriate kernel interface (socket, device ioctl) call for the remote
568  * procId encoded in the queueId argument.
569  *
570  */
571 Int MessageQ_put (MessageQ_QueueId queueId, MessageQ_Msg msg)
573     Int      status;
574     UInt16   dstProcId  = (UInt16)(queueId >> 16);
575     UInt16   queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
577     msg->dstId     = queueIndex;
578     msg->dstProc   = dstProcId;
580     status = transportPut(msg, queueIndex, dstProcId);
582     return (status);
585 /*
586  * Gets a message for a message queue and blocks if the queue is empty.
587  * If a message is present, it returns it.  Otherwise it blocks
588  * waiting for a message to arrive.
589  * When a message is returned, it is owned by the caller.
590  *
591  * We block using select() on the receiving socket's file descriptor, then
592  * get the waiting message via the socket API recvfrom().
593  * We use the socket stored in the messageQ object via a previous call to
594  * MessageQ_create().
595  *
596  */
597 Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
599     static int last = 0;
600     Int     status = MessageQ_S_SUCCESS;
601     Int     tmpStatus;
602     MessageQ_Object * obj = (MessageQ_Object *) handle;
603     int     retval;
604     int     nfds;
605     fd_set  rfds;
606     struct  timeval tv;
607     void    *timevalPtr;
608     UInt16  rprocId;
609     int     maxfd = 0;
610     int     selfId;
611     int     nProcessors;
613     /* Wait (with timeout) and retreive message from socket: */
614     FD_ZERO(&rfds);
615     for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
616         if (rprocId == MultiProc_self() ||
617             obj->fd[rprocId] == Transport_INVALIDSOCKET) {
618             continue;
619         }
620         maxfd = MAX(maxfd, obj->fd[rprocId]);
621         FD_SET(obj->fd[rprocId], &rfds);
622     }
624     /* Wait also on the event fd, which may be written by MessageQ_unblock(): */
625     FD_SET(obj->unblockFd, &rfds);
627     if (timeout == MessageQ_FOREVER) {
628         timevalPtr = NULL;
629     }
630     else {
631         /* Timeout given in msec: convert:  */
632         tv.tv_sec = timeout / 1000;
633         tv.tv_usec = (timeout % 1000) * 1000;
634         timevalPtr = &tv;
635     }
636     /* Add one to last fd created: */
637     nfds = MAX(maxfd, obj->unblockFd) + 1;
639     retval = select(nfds, &rfds, NULL, NULL, timevalPtr);
640     if (retval)  {
641         if (FD_ISSET(obj->unblockFd, &rfds))  {
642             /*
643              * Our event was signalled by MessageQ_unblock().
644              *
645              * This is typically done during a shutdown sequence, where
646              * the intention of the client would be to ignore (i.e. not fetch)
647              * any pending messages in the transport's queue.
648              * Thus, we shall not check for nor return any messages.
649              */
650             *msg = NULL;
651             status = MessageQ_E_UNBLOCKED;
652         }
653         else {
654             /* start where we last left off */
655             rprocId = last;
657             selfId = MultiProc_self();
658             nProcessors = MultiProc_getNumProcessors();
660             do {
661                 if (rprocId != selfId &&
662                     obj->fd[rprocId] != Transport_INVALIDSOCKET) {
664                     if (FD_ISSET(obj->fd[rprocId], &rfds)) {
665                         /* Our transport's fd was signalled: Get the message */
666                         tmpStatus = transportGet(obj->fd[rprocId], msg);
667                         if (tmpStatus < 0) {
668                             printf ("MessageQ_get: tranposrtshm_get failed.");
669                             status = MessageQ_E_FAIL;
670                         }
672                         last = (rprocId + 1) % nProcessors;
673                         break;
674                     }
675                 }
676                 rprocId = (rprocId + 1) % nProcessors;
677             } while (rprocId != last);
678         }
679     }
680     else if (retval == 0) {
681         *msg = NULL;
682         status = MessageQ_E_TIMEOUT;
683     }
685     return (status);
688 /*
689  * Return a count of the number of messages in the queue
690  *
691  * TBD: Implement as a socket ioctl, using getsockopt().  Return -1 for now.
692  */
693 Int MessageQ_count (MessageQ_Handle handle)
695     Int               count = -1;
696 #if 0
697     MessageQ_Object * obj   = (MessageQ_Object *) handle;
698     socklen_t         optlen;
700     /*
701      * TBD: Need to find a way to implement (if anyone uses it!), and
702      * push down into transport..
703      */
705     /*
706      * 2nd arg to getsockopt should be transport independent, but using
707      *  SSKPROTO_SHMFIFO for now:
708      */
709     getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
710                  &count, &optlen);
711 #endif
713     return (count);
716 /* Initializes a message not obtained from MessageQ_alloc. */
717 Void MessageQ_staticMsgInit (MessageQ_Msg msg, UInt32 size)
719     /* Fill in the fields of the message */
720     MessageQ_msgInit (msg);
721     msg->heapId  = MessageQ_STATICMSG;
722     msg->msgSize = size;
725 /*
726  * Allocate a message and initialize the needed fields (note some
727  * of the fields in the header are set via other APIs or in the
728  * MessageQ_put function,
729  */
730 MessageQ_Msg MessageQ_alloc (UInt16 heapId, UInt32 size)
732     MessageQ_Msg msg       = NULL;
734     /*
735      * heapId not used for local alloc (as this is over a copy transport), but
736      * we need to send to other side as heapId is used in BIOS transport:
737      */
738     msg = (MessageQ_Msg)calloc (1, size);
739     MessageQ_msgInit (msg);
740     msg->msgSize = size;
741     msg->heapId  = heapId;
743     return msg;
746 /* Frees the message back to the heap that was used to allocate it. */
747 Int MessageQ_free (MessageQ_Msg msg)
749     UInt32         status = MessageQ_S_SUCCESS;
751     /* Check to ensure this was not allocated by user: */
752     if (msg->heapId == MessageQ_STATICMSG)  {
753         status =  MessageQ_E_CANNOTFREESTATICMSG;
754     }
755     else {
756         free (msg);
757     }
759     return status;
762 /* Register a heap with MessageQ. */
763 Int MessageQ_registerHeap (Ptr heap, UInt16 heapId)
765     Int  status = MessageQ_S_SUCCESS;
767     /* Do nothing, as this uses a copy transport: */
769     return status;
772 /* Unregister a heap with MessageQ. */
773 Int MessageQ_unregisterHeap (UInt16 heapId)
775     Int  status = MessageQ_S_SUCCESS;
777     /* Do nothing, as this uses a copy transport: */
779     return status;
782 /* Unblocks a MessageQ */
783 Void MessageQ_unblock (MessageQ_Handle handle)
785     MessageQ_Object * obj   = (MessageQ_Object *) handle;
786     uint64_t     buf = 1;
787     int          numBytes;
789     /* Write 8 bytes to awaken any threads blocked on this messageQ: */
790     numBytes = write(obj->unblockFd, &buf, sizeof(buf));
793 /* Embeds a source message queue into a message. */
794 Void MessageQ_setReplyQueue (MessageQ_Handle handle, MessageQ_Msg msg)
796     MessageQ_Object * obj   = (MessageQ_Object *) handle;
798     msg->replyId   = (UInt16)(obj->queue);
799     msg->replyProc = (UInt16)(obj->queue >> 16);
802 /* Returns the QueueId associated with the handle. */
803 MessageQ_QueueId MessageQ_getQueueId (MessageQ_Handle handle)
805     MessageQ_Object * obj = (MessageQ_Object *) handle;
806     UInt32            queueId;
808     queueId = (obj->queue);
810     return queueId;
813 /* Sets the tracing of a message */
814 Void MessageQ_setMsgTrace (MessageQ_Msg msg, Bool traceFlag)
816     msg->flags = (msg->flags & ~TRACEMASK) |   (traceFlag << TRACESHIFT);
819 /*
820  *  Returns the amount of shared memory used by one transport instance.
821  *
822  *  The MessageQ module itself does not use any shared memory but the
823  *  underlying transport may use some shared memory.
824  */
825 SizeT MessageQ_sharedMemReq (Ptr sharedAddr)
827     SizeT memReq = 0u;
829     /* Do nothing, as this is a copy transport. */
831     return (memReq);
834 /*
835  *  Create a socket for this remote proc, and attempt to connect.
836  *
837  *  Only creates a socket if one does not already exist for this procId.
838  *
839  *  Note: remoteProcId may be MultiProc_Self() for loopback case.
840  */
841 Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr)
843     Int     status = MessageQ_S_SUCCESS;
844     int     sock;
846     PRINTVERBOSE1("MessageQ_attach: remoteProcId: %d\n", remoteProcId)
848     if (remoteProcId >= MultiProc_MAXPROCESSORS) {
849         status = MessageQ_E_INVALIDPROCID;
850         goto exit;
851     }
853     pthread_mutex_lock (&(MessageQ_module->gate));
855     /* Only create a socket if one doesn't exist: */
856     if (MessageQ_module->sock[remoteProcId] == Transport_INVALIDSOCKET)  {
857         /* Create the socket for sending messages to the remote proc: */
858         sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
859         if (sock < 0) {
860             status = MessageQ_E_FAIL;
861             printf ("MessageQ_attach: socket failed: %d, %s\n",
862                        errno, strerror(errno));
863         }
864         else  {
865             PRINTVERBOSE1("MessageQ_attach: created send socket: %d\n", sock)
866             MessageQ_module->sock[remoteProcId] = sock;
867             /* Attempt to connect: */
868             status = ConnectSocket(sock, remoteProcId, MESSAGEQ_RPMSG_PORT);
869             if (status < 0) {
870                 status = MessageQ_E_RESOURCE;
871                 /* don't hard-printf since this is no longer fatal */
872                 PRINTVERBOSE1("MessageQ_attach: ConnectSocket(remoteProcId:%d) failed\n",
873                        remoteProcId);
874             }
875         }
876     }
877     else {
878         status = MessageQ_E_ALREADYEXISTS;
879     }
881     pthread_mutex_unlock (&(MessageQ_module->gate));
883     if (status == MessageQ_E_RESOURCE) {
884         MessageQ_detach(remoteProcId);
885     }
887 exit:
888     return (status);
891 /*
892  *  Close the socket for this remote proc.
893  *
894  */
895 Int MessageQ_detach (UInt16 remoteProcId)
897     Int status = MessageQ_S_SUCCESS;
898     int sock;
900     if (remoteProcId >= MultiProc_MAXPROCESSORS) {
901         status = MessageQ_E_INVALIDPROCID;
902         goto exit;
903     }
905     pthread_mutex_lock (&(MessageQ_module->gate));
907     sock = MessageQ_module->sock[remoteProcId];
908     if (sock != Transport_INVALIDSOCKET) {
909         if (close(sock)) {
910             status = MessageQ_E_OSFAILURE;
911             printf("MessageQ_detach: close failed: %d, %s\n",
912                    errno, strerror(errno));
913         }
914         else {
915             PRINTVERBOSE1("MessageQ_detach: closed socket: %d\n", sock)
916             MessageQ_module->sock[remoteProcId] = Transport_INVALIDSOCKET;
917         }
918     }
920     pthread_mutex_unlock (&(MessageQ_module->gate));
922 exit:
923     return (status);
926 /*
927  * This is a helper function to initialize a message.
928  */
929 Void MessageQ_msgInit (MessageQ_Msg msg)
931 #if 0
932     Int                 status    = MessageQ_S_SUCCESS;
933     LAD_ClientHandle handle;
934     struct LAD_CommandObj cmd;
935     union LAD_ResponseObj rsp;
937     handle = LAD_findHandle();
938     if (handle == LAD_MAXNUMCLIENTS) {
939         PRINTVERBOSE1(
940           "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
941            getpid())
943         return;
944     }
946     cmd.cmd = LAD_MESSAGEQ_MSGINIT;
947     cmd.clientId = handle;
949     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
950         PRINTVERBOSE1(
951           "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
952         return;
953     }
955     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
956         PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
957         return;
958     }
959     status = rsp.msgInit.status;
961     PRINTVERBOSE2(
962       "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
963       handle, status)
965     memcpy(msg, &rsp.msgInit.msg, sizeof(*msg));
966 #else
967     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
968     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
969     msg->msgId     = MessageQ_INVALIDMSGID;
970     msg->dstId     = (UInt16)MessageQ_INVALIDMESSAGEQ;
971     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
972     msg->srcProc   = MultiProc_self();
974     pthread_mutex_lock(&(MessageQ_module->gate));
975     msg->seqNum  = MessageQ_module->seqNum++;
976     pthread_mutex_unlock(&(MessageQ_module->gate));
977 #endif
980 /*
981  * =============================================================================
982  * Transport: Fxns kept here until need for a transport layer is realized.
983  * =============================================================================
984  */
985 /*
986  * ======== transportCreateEndpoint ========
987  *
988  * Create a communication endpoint to receive messages.
989  */
990 static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex)
992     Int          status    = MessageQ_S_SUCCESS;
993     int         err;
995     /*  Create the socket to receive messages for this messageQ. */
996     *fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
997     if (*fd < 0) {
998         status = MessageQ_E_FAIL;
999         printf ("transportCreateEndpoint: socket call failed: %d, %s\n",
1000                   errno, strerror(errno));
1001         goto exit;
1002     }
1004     PRINTVERBOSE1("transportCreateEndpoint: created socket: fd: %d\n", *fd)
1006     err = SocketBindAddr(*fd, rprocId, (UInt32)queueIndex);
1007     if (err < 0) {
1008         status = MessageQ_E_FAIL;
1009         /* don't hard-printf since this is no longer fatal */
1010         PRINTVERBOSE2("transportCreateEndpoint: bind failed: %d, %s\n",
1011                       errno, strerror(errno));
1012     }
1014 exit:
1015     return (status);
1018 /*
1019  * ======== transportCloseEndpoint ========
1020  *
1021  *  Close the communication endpoint.
1022  */
1023 static Int transportCloseEndpoint(int fd)
1025     Int status = MessageQ_S_SUCCESS;
1027     PRINTVERBOSE1("transportCloseEndpoint: closing socket: %d\n", fd)
1029     /* Stop communication to this socket:  */
1030     close(fd);
1032     return (status);
1035 /*
1036  * ======== transportGet ========
1037  *  Retrieve a message waiting in the socket's queue.
1038 */
1039 static Int transportGet(int sock, MessageQ_Msg * retMsg)
1041     Int           status    = MessageQ_S_SUCCESS;
1042     MessageQ_Msg  msg;
1043     struct sockaddr_rpmsg fromAddr;  // [Socket address of sender]
1044     unsigned int  len;
1045     int           byteCount;
1047     /*
1048      * We have no way of peeking to see what message size we'll get, so we
1049      * allocate a message of max size to receive contents from the rpmsg socket
1050      * (currently, a copy transport)
1051      */
1052     msg = MessageQ_alloc (0, MESSAGEQ_RPMSG_MAXSIZE);
1053     if (!msg)  {
1054         status = MessageQ_E_MEMORY;
1055         goto exit;
1056     }
1058     memset(&fromAddr, 0, sizeof(fromAddr));
1059     len = sizeof(fromAddr);
1061     byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
1062                       (struct sockaddr *)&fromAddr, &len);
1063     if (len != sizeof(fromAddr)) {
1064         printf("recvfrom: got bad addr len (%d)\n", len);
1065         status = MessageQ_E_FAIL;
1066         goto exit;
1067     }
1068     if (byteCount < 0) {
1069         printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
1070         status = MessageQ_E_FAIL;
1071         goto exit;
1072     }
1073     else {
1074          /* Update the allocated message size (even though this may waste space
1075           * when the actual message is smaller than the maximum rpmsg size,
1076           * the message will be freed soon anyway, and it avoids an extra copy).
1077           */
1078          msg->msgSize = byteCount;
1080          /*
1081           * If the message received was statically allocated, reset the
1082           * heapId, so the app can free it.
1083           */
1084          if (msg->heapId == MessageQ_STATICMSG)  {
1085              msg->heapId = 0;  /* for a copy transport, heap id is 0. */
1086          }
1087     }
1089     PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
1090     PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
1091     PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId, msg->msgSize)
1093     *retMsg = msg;
1095 exit:
1096     return (status);
1099 /*
1100  * ======== transportPut ========
1101  *
1102  * Calls the socket API sendto() on the socket associated with
1103  * with this destination procID.
1104  * Currently, both local and remote messages are sent via the Socket ABI, so
1105  * no local object lists are maintained here.
1106 */
1107 static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId)
1109     Int     status    = MessageQ_S_SUCCESS;
1110     int     sock;
1111     int     err;
1113     /*
1114      * Retrieve the socket for the AF_SYSLINK protocol associated with this
1115      * transport.
1116      */
1117     sock = MessageQ_module->sock[dstProcId];
1119     PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
1121     err = send(sock, msg, msg->msgSize, 0);
1122     if (err < 0) {
1123         printf ("transportPut: send failed: %d, %s\n",
1124                   errno, strerror(errno));
1125         status = MessageQ_E_FAIL;
1126         goto exit;
1127     }
1129     /*
1130      * Free the message, as this is a copy transport, we maintain MessageQ
1131      * semantics.
1132      */
1133     MessageQ_free (msg);
1135 exit:
1136     return (status);