b236277175613807ba1cb5d12711dbb34f07006e
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
1 /*
2  * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  @file   MessageQ.c
34  *
35  *  @brief  MessageQ Linux implementation
36  *
37  *  This implementation is geared for use in a "client/server" model, whereby
38  *  system-wide data is maintained in a "server" component and process-
39  *  specific data is handled here.  At the moment, this implementation
40  *  connects and communicates with LAD for the server connection.
41  */
43 /* Standard IPC header */
44 #include <ti/ipc/Std.h>
46 /* Module level headers */
47 #include <ti/ipc/NameServer.h>
48 #include <ti/ipc/MultiProc.h>
49 #include <_MultiProc.h>
50 #define MessageQ_internal 1     /* must be defined before include file */
51 #include <ti/ipc/MessageQ.h>
52 #include <_MessageQ.h>
53 #include <ti/ipc/interfaces/ITransport.h>
54 #include <ti/ipc/interfaces/IMessageQTransport.h>
55 #include <ti/ipc/interfaces/INetworkTransport.h>
57 /* Socket Headers */
58 #include <sys/select.h>
59 #include <sys/time.h>
60 #include <sys/types.h>
61 #include <sys/param.h>
62 #include <sys/eventfd.h>
63 #include <sys/queue.h>
64 #include <errno.h>
65 #include <stdio.h>
66 #include <string.h>
67 #include <stdlib.h>
68 #include <unistd.h>
69 #include <assert.h>
70 #include <pthread.h>
71 #include <semaphore.h>
73 /* Socket Protocol Family */
74 #include <net/rpmsg.h>
76 #include <ladclient.h>
77 #include <_lad.h>
79 /* =============================================================================
80  * Macros/Constants
81  * =============================================================================
82  */
84 /*!
85  *  @brief  Name of the reserved NameServer used for MessageQ.
86  */
87 #define MessageQ_NAMESERVER  "MessageQ"
89 #define MessageQ_MAXTRANSPORTS 8
91 #define MessageQ_GROWSIZE 32
93 /* Trace flag settings: */
94 #define TRACESHIFT    12
95 #define TRACEMASK     0x1000
97 /* Define BENCHMARK to quiet key MessageQ APIs: */
98 //#define BENCHMARK
100 /* =============================================================================
101  * Structures & Enums
102  * =============================================================================
103  */
105 /* params structure evolution */
106 typedef struct {
107     Void *synchronizer;
108 } MessageQ_Params_Legacy;
110 typedef struct {
111     Int __version;
112     Void *synchronizer;
113     MessageQ_QueueIndex queueIndex;
114 } MessageQ_Params_Version2;
116 /* structure for MessageQ module state */
117 typedef struct MessageQ_ModuleObject {
118     MessageQ_Handle           *queues;
119     Int                       numQueues;
120     Int                       refCount;
121     NameServer_Handle         nameServer;
122     pthread_mutex_t           gate;
123     int                       seqNum;
124     IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
125     INetworkTransport_Handle  transInst[MessageQ_MAXTRANSPORTS];
126     MessageQ_PutHookFxn       putHookFxn;
127 } MessageQ_ModuleObject;
129 typedef struct MessageQ_CIRCLEQ_ENTRY {
130      CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem;
131 } MessageQ_CIRCLEQ_ENTRY;
133 /*!
134  *  @brief  Structure for the Handle for the MessageQ.
135  */
136 typedef struct MessageQ_Object_tag {
137     CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
138     MessageQ_Params              params;
139     MessageQ_QueueId             queue;
140     int                          unblocked;
141     void                         *serverHandle;
142     sem_t                        synchronizer;
143 } MessageQ_Object;
145 /* traces in this file are controlled via _MessageQ_verbose */
146 Bool _MessageQ_verbose = FALSE;
147 #define verbose _MessageQ_verbose
149 /* =============================================================================
150  *  Globals
151  * =============================================================================
152  */
153 static MessageQ_ModuleObject MessageQ_state =
155     .refCount   = 0,
156     .nameServer = NULL,
157     .putHookFxn = NULL
158 };
160 /*!
161  *  @var    MessageQ_module
162  *
163  *  @brief  Pointer to the MessageQ module state.
164  */
165 MessageQ_ModuleObject *MessageQ_module = &MessageQ_state;
167 Void _MessageQ_grow(UInt16 queueIndex);
169 /* =============================================================================
170  * APIS
171  * =============================================================================
172  */
174 Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
175                                 UInt16 rprocId, UInt priority)
177     Int status = FALSE;
178     UInt16 clusterId;
180     if (handle == NULL) {
181         printf("MessageQ_registerTransport: invalid handle, must be non-NULL\n"
182               );
184         return status;
185     }
187     /* map procId to clusterId */
188     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
190     if (clusterId >= MultiProc_MAXPROCESSORS) {
191         printf("MessageQ_registerTransport: invalid procId %d\n", rprocId);
193         return status;
194     }
196     if (MessageQ_module->transports[clusterId][priority] == NULL) {
197         MessageQ_module->transports[clusterId][priority] = handle;
199         status = TRUE;
200     }
202     return status;
205 Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
207     if (inst == NULL) {
208         printf("MessageQ_registerTransportId: invalid NULL handle\n");
210         return MessageQ_E_INVALIDARG;
211     }
213     if (tid >= MessageQ_MAXTRANSPORTS) {
214         printf("MessageQ_unregisterNetTransport: invalid transport id %d,"
215                 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
217         return MessageQ_E_INVALIDARG;
218     }
220     if (MessageQ_module->transInst[tid] != NULL) {
221         printf("MessageQ_registerTransportId: transport id %d already "
222                 "registered\n", tid);
224         return MessageQ_E_ALREADYEXISTS;
225     }
227     MessageQ_module->transInst[tid] = (INetworkTransport_Handle)inst;
229     return MessageQ_S_SUCCESS;
232 Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
234     UInt16 clusterId;
236     /* map procId to clusterId */
237     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
239     if (clusterId >= MultiProc_MAXPROCESSORS) {
240         printf("MessageQ_unregisterTransport: invalid rprocId %d\n", rprocId);
242         return;
243     }
245     MessageQ_module->transports[clusterId][priority] = NULL;
248 Void MessageQ_unregisterTransportId(UInt tid)
250     if (tid >= MessageQ_MAXTRANSPORTS) {
251         printf("MessageQ_unregisterTransportId: invalid transport id %d, "
252                 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
254         return;
255     }
257     MessageQ_module->transInst[tid] = NULL;
260 /*
261  * Function to get default configuration for the MessageQ module.
262  */
263 Void MessageQ_getConfig(MessageQ_Config *cfg)
265     Int status;
266     LAD_ClientHandle handle;
267     struct LAD_CommandObj cmd;
268     union LAD_ResponseObj rsp;
270     assert (cfg != NULL);
272     handle = LAD_findHandle();
273     if (handle == LAD_MAXNUMCLIENTS) {
274         PRINTVERBOSE1(
275           "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
276            getpid())
278         return;
279     }
281     cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
282     cmd.clientId = handle;
284     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
285         PRINTVERBOSE1(
286           "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
287         return;
288     }
290     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
291         PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n",
292                       status)
293         return;
294     }
295     status = rsp.messageQGetConfig.status;
297     PRINTVERBOSE2(
298       "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
299       handle, status)
301     memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg));
303     return;
306 /*
307  *  Function to setup the MessageQ module.
308  */
309 Int MessageQ_setup(const MessageQ_Config *cfg)
311     Int status;
312     LAD_ClientHandle handle;
313     struct LAD_CommandObj cmd;
314     union LAD_ResponseObj rsp;
315     Int pri;
316     Int i;
317     Int tid;
319     pthread_mutex_lock(&MessageQ_module->gate);
321     MessageQ_module->refCount++;
322     if (MessageQ_module->refCount > 1) {
324         pthread_mutex_unlock(&MessageQ_module->gate);
326         PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
327                       MessageQ_module->refCount)
329         return MessageQ_S_ALREADYSETUP;
330     }
332     pthread_mutex_unlock(&MessageQ_module->gate);
334     handle = LAD_findHandle();
335     if (handle == LAD_MAXNUMCLIENTS) {
336         PRINTVERBOSE1(
337           "MessageQ_setup: can't find connection to daemon for pid %d\n",
338            getpid())
340         return MessageQ_E_RESOURCE;
341     }
343     cmd.cmd = LAD_MESSAGEQ_SETUP;
344     cmd.clientId = handle;
345     memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof (*cfg));
347     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
348         PRINTVERBOSE1(
349           "MessageQ_setup: sending LAD command failed, status=%d\n", status)
350         return MessageQ_E_FAIL;
351     }
353     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
354         PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
355         return status;
356     }
357     status = rsp.setup.status;
359     PRINTVERBOSE2(
360       "MessageQ_setup: got LAD response for client %d, status=%d\n",
361       handle, status)
363     MessageQ_module->seqNum = 0;
364     MessageQ_module->nameServer = rsp.setup.nameServerHandle;
365     MessageQ_module->numQueues = cfg->maxRuntimeEntries;
366     MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
367                                      sizeof (MessageQ_Handle));
369     pthread_mutex_init(&MessageQ_module->gate, NULL);
371     for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
372         for (pri = 0; pri < 2; pri++) {
373             MessageQ_module->transports[i][pri] = NULL;
374         }
375     }
377     for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
378         MessageQ_module->transInst[tid] = NULL;
379     }
381     return status;
384 /*
385  *  MessageQ_destroy - destroy the MessageQ module.
386  */
387 Int MessageQ_destroy(void)
389     Int status;
390     LAD_ClientHandle handle;
391     struct LAD_CommandObj cmd;
392     union LAD_ResponseObj rsp;
394     handle = LAD_findHandle();
395     if (handle == LAD_MAXNUMCLIENTS) {
396         PRINTVERBOSE1(
397           "MessageQ_destroy: can't find connection to daemon for pid %d\n",
398            getpid())
400         return MessageQ_E_RESOURCE;
401     }
403     cmd.cmd = LAD_MESSAGEQ_DESTROY;
404     cmd.clientId = handle;
406     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
407         PRINTVERBOSE1(
408           "MessageQ_destroy: sending LAD command failed, status=%d\n", status)
409         return MessageQ_E_FAIL;
410     }
412     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
413         PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
414         return status;
415     }
416     status = rsp.status;
418     PRINTVERBOSE2(
419       "MessageQ_destroy: got LAD response for client %d, status=%d\n",
420       handle, status)
422     return status;
425 /*
426  *  ======== MessageQ_Params_init ========
427  *  Legacy implementation.
428  */
429 Void MessageQ_Params_init(MessageQ_Params *params)
431     ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
434 /*
435  *  ======== MessageQ_Params_init__S ========
436  *  New implementation which is version aware.
437  */
438 Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
440     MessageQ_Params_Version2 *params2;
442     switch (version) {
444         case MessageQ_Params_VERSION_2:
445             params2 = (MessageQ_Params_Version2 *)params;
446             params2->__version = MessageQ_Params_VERSION_2;
447             params2->synchronizer = NULL;
448             params2->queueIndex = MessageQ_ANY;
449             break;
451         default:
452             assert(FALSE);
453             break;
454     }
457 /*
458  *  MessageQ_create - create a MessageQ object for receiving.
459  */
460 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
462     Int                   status;
463     MessageQ_Object      *obj = NULL;
464     IMessageQTransport_Handle transport;
465     INetworkTransport_Handle transInst;
466     UInt16                queueIndex;
467     UInt16                clusterId;
468     Int                   tid;
469     Int                   priority;
470     LAD_ClientHandle      handle;
471     struct LAD_CommandObj cmd;
472     union LAD_ResponseObj rsp;
473     MessageQ_Params ps;
475     MessageQ_Params_init__S(&ps, MessageQ_Params_VERSION);
477     /* copy the given params into the current params structure */
478     if (pp != NULL) {
480         /* snoop the params pointer to see if it's a legacy structure */
481         if ((pp->__version == 0) || (pp->__version > 100)) {
482             ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
483         }
485         /* not legacy structure, use params version field */
486         else if (pp->__version == MessageQ_Params_VERSION_2) {
487             ps.__version = ((MessageQ_Params_Version2 *)pp)->__version;
488             ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
489             ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
490         }
491         else {
492             assert(FALSE);
493         }
494     }
496     handle = LAD_findHandle();
497     if (handle == LAD_MAXNUMCLIENTS) {
498         PRINTVERBOSE1(
499           "MessageQ_create: can't find connection to daemon for pid %d\n",
500            getpid())
502         return NULL;
503     }
505     cmd.cmd = LAD_MESSAGEQ_CREATE;
506     cmd.clientId = handle;
508     if (name == NULL) {
509         cmd.args.messageQCreate.name[0] = '\0';
510     }
511     else {
512         strncpy(cmd.args.messageQCreate.name, name,
513                 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
514         cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
515     }
517     memcpy(&cmd.args.messageQCreate.params, &ps, sizeof(ps));
519     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
520         PRINTVERBOSE1(
521           "MessageQ_create: sending LAD command failed, status=%d\n", status)
522         return NULL;
523     }
525     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
526         PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
527         return NULL;
528     }
529     status = rsp.messageQCreate.status;
531     PRINTVERBOSE2(
532       "MessageQ_create: got LAD response for client %d, status=%d\n",
533       handle, status)
535     if (status == -1) {
536        PRINTVERBOSE1(
537           "MessageQ_create: MessageQ server operation failed, status=%d\n",
538           status)
539        return NULL;
540     }
542     /* Create the generic obj */
543     obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
545    /* Populate the params member */
546     memcpy(&obj->params, &ps, sizeof(ps));
548     queueIndex = (MessageQ_QueueIndex)(rsp.messageQCreate.queueId & 0x0000ffff);
550     obj->queue = rsp.messageQCreate.queueId;
551     obj->serverHandle = rsp.messageQCreate.serverHandle;
552     CIRCLEQ_INIT(&obj->msgList);
553     if (sem_init(&obj->synchronizer, 0, 0) < 0) {
554         PRINTVERBOSE1(
555           "MessageQ_create: failed to create synchronizer (errno %d)\n", errno)
557         MessageQ_delete((MessageQ_Handle *)&obj);
559         return NULL;
560     }
562     PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
563             "queueIndex %d\n", name, queueIndex)
565     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
566         for (priority = 0; priority < 2; priority++) {
567             transport = MessageQ_module->transports[clusterId][priority];
568             if (transport) {
569                 /* need to check return and do something if error */
570                 IMessageQTransport_bind((Void *)transport, obj->queue);
571             }
572         }
573     }
575     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
576         transInst = MessageQ_module->transInst[tid];
577         if (transInst) {
578             /* need to check return and do something if error */
579             INetworkTransport_bind((Void *)transInst, obj->queue);
580         }
581     }
583     /*
584      * Since LAD's MessageQ_module can grow, we need to be able to grow as well
585      */
586     if (queueIndex >= MessageQ_module->numQueues) {
587         _MessageQ_grow(queueIndex);
588     }
590     /*
591      * No need to "allocate" slot since the queueIndex returned by
592      * LAD is guaranteed to be unique.
593      */
594     MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
596     return (MessageQ_Handle)obj;
599 /*
600  * MessageQ_delete - delete a MessageQ object.
601  */
602 Int MessageQ_delete(MessageQ_Handle *handlePtr)
604     MessageQ_Object *obj;
605     IMessageQTransport_Handle transport;
606     INetworkTransport_Handle transInst;
607     Int              status = MessageQ_S_SUCCESS;
608     UInt16           queueIndex;
609     UInt16                clusterId;
610     Int                   tid;
611     Int                   priority;
612     LAD_ClientHandle handle;
613     struct LAD_CommandObj cmd;
614     union LAD_ResponseObj rsp;
616     handle = LAD_findHandle();
617     if (handle == LAD_MAXNUMCLIENTS) {
618         PRINTVERBOSE1(
619           "MessageQ_delete: can't find connection to daemon for pid %d\n",
620            getpid())
622         return MessageQ_E_FAIL;
623     }
625     obj = (MessageQ_Object *)(*handlePtr);
627     cmd.cmd = LAD_MESSAGEQ_DELETE;
628     cmd.clientId = handle;
629     cmd.args.messageQDelete.serverHandle = obj->serverHandle;
631     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
632         PRINTVERBOSE1(
633           "MessageQ_delete: sending LAD command failed, status=%d\n", status)
634         return MessageQ_E_FAIL;
635     }
637     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
638         PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
639         return MessageQ_E_FAIL;
640     }
641     status = rsp.messageQDelete.status;
643     PRINTVERBOSE2(
644       "MessageQ_delete: got LAD response for client %d, status=%d\n",
645       handle, status)
647     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
648         for (priority = 0; priority < 2; priority++) {
649             transport = MessageQ_module->transports[clusterId][priority];
650             if (transport) {
651                 IMessageQTransport_unbind((Void *)transport, obj->queue);
652             }
653         }
654     }
656     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
657         transInst = MessageQ_module->transInst[tid];
658         if (transInst) {
659             INetworkTransport_unbind((Void *)transInst, obj->queue);
660         }
661     }
663     queueIndex = (MessageQ_QueueIndex)(obj->queue & 0x0000ffff);
664     MessageQ_module->queues[queueIndex] = NULL;
666     free(obj);
667     *handlePtr = NULL;
669     return status;
672 /*
673  *  MessageQ_open - Opens an instance of MessageQ for sending.
674  */
675 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
677     Int status = MessageQ_S_SUCCESS;
679     status = NameServer_getUInt32(MessageQ_module->nameServer,
680                                   name, queueId, NULL);
682     if (status == NameServer_E_NOTFOUND) {
683         /* Set return queue ID to invalid */
684         *queueId = MessageQ_INVALIDMESSAGEQ;
685         status = MessageQ_E_NOTFOUND;
686     }
687     else if (status >= 0) {
688         /* Override with a MessageQ status code */
689         status = MessageQ_S_SUCCESS;
690     }
691     else {
692         /* Set return queue ID to invalid */
693         *queueId = MessageQ_INVALIDMESSAGEQ;
695         /* Override with a MessageQ status code */
696         if (status == NameServer_E_TIMEOUT) {
697             status = MessageQ_E_TIMEOUT;
698         }
699         else {
700             status = MessageQ_E_FAIL;
701         }
702     }
704     return status;
707 /*
708  *  MessageQ_close - Closes previously opened instance of MessageQ.
709  */
710 Int MessageQ_close(MessageQ_QueueId *queueId)
712     Int32 status = MessageQ_S_SUCCESS;
714     /* Nothing more to be done for closing the MessageQ. */
715     *queueId = MessageQ_INVALIDMESSAGEQ;
717     return status;
720 /*
721  * MessageQ_put - place a message onto a message queue.
722  *
723  * Calls transport's put(), which handles the sending of the message using the
724  * appropriate kernel interface (socket, device ioctl) call for the remote
725  * procId encoded in the queueId argument.
726  *
727  */
728 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
730     MessageQ_Object *obj;
731     UInt16   dstProcId  = (UInt16)(queueId >> 16);
732     UInt16   queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
733     Int      status = MessageQ_S_SUCCESS;
734     ITransport_Handle transport;
735     IMessageQTransport_Handle msgTrans;
736     INetworkTransport_Handle netTrans;
737     Int priority;
738     UInt tid;
739     UInt16 clusterId;
741     msg->dstId     = queueIndex;
742     msg->dstProc   = dstProcId;
744     /* invoke put hook function after addressing the message */
745     if (MessageQ_module->putHookFxn != NULL) {
746         MessageQ_module->putHookFxn(queueId, msg);
747     }
749     if (dstProcId != MultiProc_self()) {
750         tid = MessageQ_getTransportId(msg);
751         if (tid == 0) {
752             priority = MessageQ_getMsgPri(msg);
753             clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
755             /* primary transport can only be used for intra-cluster delivery */
756             if (clusterId > MultiProc_getNumProcsInCluster()) {
757                 printf("MessageQ_put: Error: destination procId=%d is not "
758                         "in cluster. Must specify a transportId.\n", dstProcId);
759                 return MessageQ_E_FAIL;
760             }
762             msgTrans = MessageQ_module->transports[clusterId][priority];
764             IMessageQTransport_put(msgTrans, (Ptr)msg);
765         }
766         else {
767             if (tid >= MessageQ_MAXTRANSPORTS) {
768                 printf("MessageQ_put: transport id %d too big, must be < %d\n",
769                        tid, MessageQ_MAXTRANSPORTS);
770                 return MessageQ_E_FAIL;
771             }
773             /* use secondary transport */
774             netTrans = MessageQ_module->transInst[tid];
775             transport = INetworkTransport_upCast(netTrans);
777             /* downcast instance pointer to transport interface */
778             switch (ITransport_itype(transport)) {
779                 case INetworkTransport_TypeId:
780                     INetworkTransport_put(netTrans, (Ptr)msg);
781                     break;
783                 default:
784                     /* error */
785                     printf("MessageQ_put: Error: transport id %d is an "
786                             "unsupported transport type\n", tid);
787                     status = MessageQ_E_FAIL;
788                     break;
789             }
790         }
791     }
792     else {
793         obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
795         pthread_mutex_lock(&MessageQ_module->gate);
797         /* It is a local MessageQ */
798         CIRCLEQ_INSERT_TAIL(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
800         pthread_mutex_unlock(&MessageQ_module->gate);
802         sem_post(&obj->synchronizer);
803     }
805     return status;
808 /*
809  *  MessageQ_get - gets a message for a message queue and blocks if
810  *  the queue is empty.
811  *
812  *  If a message is present, it returns it.  Otherwise it blocks
813  *  waiting for a message to arrive.
814  *  When a message is returned, it is owned by the caller.
815  */
816 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
818     MessageQ_Object * obj = (MessageQ_Object *)handle;
819     Int     status = MessageQ_S_SUCCESS;
820     struct timespec ts;
821     struct timeval tv;
823 #if 0
824 /*
825  * Optimization here to get a message without going in to the sem
826  * operation, but the sem count will not be maintained properly.
827  */
828     pthread_mutex_lock(&MessageQ_module->gate);
830     if (obj->msgList.cqh_first != &obj->msgList) {
831         *msg = (MessageQ_Msg)obj->msglist.cqh_first;
832         CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
834         pthread_mutex_unlock(&MessageQ_module->gate);
835     }
836     else {
837         pthread_mutex_unlock(&MessageQ_module->gate);
838     }
839 #endif
841     if (timeout == MessageQ_FOREVER) {
842         sem_wait(&obj->synchronizer);
843     }
844     else {
845         gettimeofday(&tv, NULL);
846         ts.tv_sec = tv.tv_sec;
847         ts.tv_nsec = (tv.tv_usec + timeout) * 1000;
849         if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
850             if (errno == ETIMEDOUT) {
851                 PRINTVERBOSE0("MessageQ_get: operation timed out\n")
853                 return MessageQ_E_TIMEOUT;
854             }
855         }
856     }
858     if (obj->unblocked) {
859         return MessageQ_E_UNBLOCKED;
860     }
862     pthread_mutex_lock(&MessageQ_module->gate);
864     *msg = (MessageQ_Msg)obj->msgList.cqh_first;
865     CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
867     pthread_mutex_unlock(&MessageQ_module->gate);
869     return status;
872 /*
873  * Return a count of the number of messages in the queue
874  *
875  * TBD: Implement as a socket ioctl, using getsockopt().  Return -1 for now.
876  */
877 Int MessageQ_count(MessageQ_Handle handle)
879     Int               count = -1;
880 #if 0
881     MessageQ_Object * obj   = (MessageQ_Object *) handle;
882     socklen_t         optlen;
884     /*
885      * TBD: Need to find a way to implement (if anyone uses it!), and
886      * push down into transport..
887      */
889     /*
890      * 2nd arg to getsockopt should be transport independent, but using
891      *  SSKPROTO_SHMFIFO for now:
892      */
893     getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
894                  &count, &optlen);
895 #endif
897     return count;
900 /*
901  *  Initializes a message not obtained from MessageQ_alloc.
902  */
903 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
905     /* Fill in the fields of the message */
906     MessageQ_msgInit(msg);
907     msg->heapId = MessageQ_STATICMSG;
908     msg->msgSize = size;
911 /*
912  *  Allocate a message and initialize the needed fields (note some
913  *  of the fields in the header are set via other APIs or in the
914  *  MessageQ_put function,
915  */
916 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
918     MessageQ_Msg msg;
920     /*
921      * heapId not used for local alloc (as this is over a copy transport), but
922      * we need to send to other side as heapId is used in BIOS transport.
923      */
924     msg = (MessageQ_Msg)calloc(1, size);
925     MessageQ_msgInit(msg);
926     msg->msgSize = size;
927     msg->heapId = heapId;
929     return msg;
932 /*
933  *  Frees the message back to the heap that was used to allocate it.
934  */
935 Int MessageQ_free(MessageQ_Msg msg)
937     UInt32 status = MessageQ_S_SUCCESS;
939     /* Check to ensure this was not allocated by user: */
940     if (msg->heapId == MessageQ_STATICMSG) {
941         status = MessageQ_E_CANNOTFREESTATICMSG;
942     }
943     else {
944         free(msg);
945     }
947     return status;
950 /* Register a heap with MessageQ. */
951 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
953     Int status = MessageQ_S_SUCCESS;
955     /* Do nothing, as this uses a copy transport */
957     return status;
960 /* Unregister a heap with MessageQ. */
961 Int MessageQ_unregisterHeap(UInt16 heapId)
963     Int status = MessageQ_S_SUCCESS;
965     /* Do nothing, as this uses a copy transport */
967     return status;
970 /* Unblocks a MessageQ */
971 Void MessageQ_unblock(MessageQ_Handle handle)
973     MessageQ_Object *obj = (MessageQ_Object *)handle;
975     obj->unblocked = TRUE;
976     sem_post(&obj->synchronizer);
979 /* Embeds a source message queue into a message */
980 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
982     MessageQ_Object *obj = (MessageQ_Object *)handle;
984     msg->replyId = (UInt16)(obj->queue);
985     msg->replyProc = (UInt16)(obj->queue >> 16);
988 /* Returns the QueueId associated with the handle. */
989 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
991     MessageQ_Object *obj = (MessageQ_Object *) handle;
992     UInt32 queueId;
994     queueId = (obj->queue);
996     return queueId;
999 /* Sets the tracing of a message */
1000 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
1002     msg->flags = (msg->flags & ~TRACEMASK) |   (traceFlag << TRACESHIFT);
1005 /*
1006  *  Returns the amount of shared memory used by one transport instance.
1007  *
1008  *  The MessageQ module itself does not use any shared memory but the
1009  *  underlying transport may use some shared memory.
1010  */
1011 SizeT MessageQ_sharedMemReq(Ptr sharedAddr)
1013     SizeT memReq = 0u;
1015     /* Do nothing, as this is a copy transport. */
1017     return memReq;
1020 /*
1021  * This is a helper function to initialize a message.
1022  */
1023 Void MessageQ_msgInit(MessageQ_Msg msg)
1025 #if 0
1026     Int                 status    = MessageQ_S_SUCCESS;
1027     LAD_ClientHandle handle;
1028     struct LAD_CommandObj cmd;
1029     union LAD_ResponseObj rsp;
1031     handle = LAD_findHandle();
1032     if (handle == LAD_MAXNUMCLIENTS) {
1033         PRINTVERBOSE1(
1034           "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
1035            getpid())
1037         return;
1038     }
1040     cmd.cmd = LAD_MESSAGEQ_MSGINIT;
1041     cmd.clientId = handle;
1043     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
1044         PRINTVERBOSE1(
1045           "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
1046         return;
1047     }
1049     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
1050         PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
1051         return;
1052     }
1053     status = rsp.msgInit.status;
1055     PRINTVERBOSE2(
1056       "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
1057       handle, status)
1059     memcpy(msg, &rsp.msgInit.msg, sizeof (*msg));
1060 #else
1061     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
1062     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
1063     msg->msgId     = MessageQ_INVALIDMSGID;
1064     msg->dstId     = (UInt16)MessageQ_INVALIDMESSAGEQ;
1065     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
1066     msg->srcProc   = MultiProc_self();
1068     pthread_mutex_lock(&MessageQ_module->gate);
1069     msg->seqNum  = MessageQ_module->seqNum++;
1070     pthread_mutex_unlock(&MessageQ_module->gate);
1071 #endif
1074 /*
1075  * Grow module's queues[] array to accommodate queueIndex from LAD
1076  */
1077 Void _MessageQ_grow(UInt16 queueIndex)
1079     MessageQ_Handle *queues;
1080     MessageQ_Handle *oldQueues;
1081     UInt oldSize;
1083     oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
1085     queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof (MessageQ_Handle));
1086     memcpy(queues, MessageQ_module->queues, oldSize);
1088     oldQueues = MessageQ_module->queues;
1089     MessageQ_module->queues = queues;
1090     MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
1092     free(oldQueues);
1094     return;