]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - ipc/ipcdev.git/blob - linux/src/api/MessageQ.c
SDOCM00114730 Reserved message queues on Linux/Keystone II
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
1 /*
2  * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  @file   MessageQ.c
34  *
35  *  @brief  MessageQ Linux implementation
36  *
37  *  This implementation is geared for use in a "client/server" model, whereby
38  *  system-wide data is maintained in a "server" component and process-
39  *  specific data is handled here.  At the moment, this implementation
40  *  connects and communicates with LAD for the server connection.
41  */
43 /* Standard IPC header */
44 #include <ti/ipc/Std.h>
46 /* Module level headers */
47 #include <ti/ipc/NameServer.h>
48 #include <ti/ipc/MultiProc.h>
49 #include <_MultiProc.h>
50 #define MessageQ_internal 1     /* must be defined before include file */
51 #include <ti/ipc/MessageQ.h>
52 #include <_MessageQ.h>
53 #include <ti/ipc/interfaces/ITransport.h>
54 #include <ti/ipc/interfaces/IMessageQTransport.h>
55 #include <ti/ipc/interfaces/INetworkTransport.h>
57 /* Socket Headers */
58 #include <sys/select.h>
59 #include <sys/time.h>
60 #include <sys/types.h>
61 #include <sys/param.h>
62 #include <sys/eventfd.h>
63 #include <sys/queue.h>
64 #include <errno.h>
65 #include <stdio.h>
66 #include <string.h>
67 #include <stdlib.h>
68 #include <unistd.h>
69 #include <assert.h>
70 #include <pthread.h>
71 #include <semaphore.h>
73 #include <ladclient.h>
74 #include <_lad.h>
76 /* =============================================================================
77  * Macros/Constants
78  * =============================================================================
79  */
81 /*!
82  *  @brief  Name of the reserved NameServer used for MessageQ.
83  */
84 #define MessageQ_NAMESERVER  "MessageQ"
86 #define MessageQ_MAXTRANSPORTS 8
88 #define MessageQ_GROWSIZE 32
90 /* Trace flag settings: */
91 #define TRACESHIFT    12
92 #define TRACEMASK     0x1000
94 /* Define BENCHMARK to quiet key MessageQ APIs: */
95 //#define BENCHMARK
97 /* =============================================================================
98  * Structures & Enums
99  * =============================================================================
100  */
102 /* params structure evolution */
103 typedef struct {
104     Void *synchronizer;
105 } MessageQ_Params_Legacy;
107 typedef struct {
108     Int __version;
109     Void *synchronizer;
110     MessageQ_QueueIndex queueIndex;
111 } MessageQ_Params_Version2;
113 /* structure for MessageQ module state */
114 typedef struct MessageQ_ModuleObject {
115     MessageQ_Handle           *queues;
116     Int                       numQueues;
117     Int                       refCount;
118     NameServer_Handle         nameServer;
119     pthread_mutex_t           gate;
120     int                       seqNum;
121     IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
122     ITransport_Handle         transInst[MessageQ_MAXTRANSPORTS];
123     MessageQ_PutHookFxn       putHookFxn;
124 } MessageQ_ModuleObject;
126 typedef struct MessageQ_CIRCLEQ_ENTRY {
127      CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem;
128 } MessageQ_CIRCLEQ_ENTRY;
130 /*!
131  *  @brief  Structure for the Handle for the MessageQ.
132  */
133 typedef struct MessageQ_Object_tag {
134     CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
135     MessageQ_Params              params;
136     MessageQ_QueueId             queue;
137     int                          unblocked;
138     void                         *serverHandle;
139     sem_t                        synchronizer;
140 } MessageQ_Object;
142 /* traces in this file are controlled via _MessageQ_verbose */
143 Bool _MessageQ_verbose = FALSE;
144 #define verbose _MessageQ_verbose
146 /* =============================================================================
147  *  Globals
148  * =============================================================================
149  */
150 static MessageQ_ModuleObject MessageQ_state =
152     .refCount   = 0,
153     .nameServer = NULL,
154     .putHookFxn = NULL
155 };
157 /*!
158  *  @var    MessageQ_module
159  *
160  *  @brief  Pointer to the MessageQ module state.
161  */
162 MessageQ_ModuleObject *MessageQ_module = &MessageQ_state;
164 Void _MessageQ_grow(UInt16 queueIndex);
166 /* =============================================================================
167  * APIS
168  * =============================================================================
169  */
171 Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
172                                 UInt16 rprocId, UInt priority)
174     Int status = FALSE;
175     UInt16 clusterId;
177     if (handle == NULL) {
178         printf("MessageQ_registerTransport: invalid handle, must be non-NULL\n"
179               );
181         return status;
182     }
184     /* map procId to clusterId */
185     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
187     if (clusterId >= MultiProc_MAXPROCESSORS) {
188         printf("MessageQ_registerTransport: invalid procId %d\n", rprocId);
190         return status;
191     }
193     if (MessageQ_module->transports[clusterId][priority] == NULL) {
194         MessageQ_module->transports[clusterId][priority] = handle;
196         status = TRUE;
197     }
199     return status;
202 Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
204     if (inst == NULL) {
205         printf("MessageQ_registerTransportId: invalid NULL handle\n");
207         return MessageQ_E_INVALIDARG;
208     }
210     if (tid >= MessageQ_MAXTRANSPORTS) {
211         printf("MessageQ_unregisterNetTransport: invalid transport id %d,"
212                 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
214         return MessageQ_E_INVALIDARG;
215     }
217     if (MessageQ_module->transInst[tid] != NULL) {
218         printf("MessageQ_registerTransportId: transport id %d already "
219                 "registered\n", tid);
221         return MessageQ_E_ALREADYEXISTS;
222     }
224     MessageQ_module->transInst[tid] = inst;
226     return MessageQ_S_SUCCESS;
229 Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
231     UInt16 clusterId;
233     /* map procId to clusterId */
234     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
236     if (clusterId >= MultiProc_MAXPROCESSORS) {
237         printf("MessageQ_unregisterTransport: invalid rprocId %d\n", rprocId);
239         return;
240     }
242     MessageQ_module->transports[clusterId][priority] = NULL;
245 Void MessageQ_unregisterTransportId(UInt tid)
247     if (tid >= MessageQ_MAXTRANSPORTS) {
248         printf("MessageQ_unregisterTransportId: invalid transport id %d, "
249                 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
251         return;
252     }
254     MessageQ_module->transInst[tid] = NULL;
257 /*
258  * Function to get default configuration for the MessageQ module.
259  */
260 Void MessageQ_getConfig(MessageQ_Config *cfg)
262     Int status;
263     LAD_ClientHandle handle;
264     struct LAD_CommandObj cmd;
265     union LAD_ResponseObj rsp;
267     assert (cfg != NULL);
269     handle = LAD_findHandle();
270     if (handle == LAD_MAXNUMCLIENTS) {
271         PRINTVERBOSE1(
272           "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
273            getpid())
275         return;
276     }
278     cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
279     cmd.clientId = handle;
281     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
282         PRINTVERBOSE1(
283           "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
284         return;
285     }
287     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
288         PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n",
289                       status)
290         return;
291     }
292     status = rsp.messageQGetConfig.status;
294     PRINTVERBOSE2(
295       "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
296       handle, status)
298     memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg));
300     return;
303 /*
304  *  Function to setup the MessageQ module.
305  */
306 Int MessageQ_setup(const MessageQ_Config *cfg)
308     Int status;
309     LAD_ClientHandle handle;
310     struct LAD_CommandObj cmd;
311     union LAD_ResponseObj rsp;
312     Int pri;
313     Int i;
314     Int tid;
316     pthread_mutex_lock(&MessageQ_module->gate);
318     MessageQ_module->refCount++;
319     if (MessageQ_module->refCount > 1) {
321         pthread_mutex_unlock(&MessageQ_module->gate);
323         PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
324                       MessageQ_module->refCount)
326         return MessageQ_S_ALREADYSETUP;
327     }
329     pthread_mutex_unlock(&MessageQ_module->gate);
331     handle = LAD_findHandle();
332     if (handle == LAD_MAXNUMCLIENTS) {
333         PRINTVERBOSE1(
334           "MessageQ_setup: can't find connection to daemon for pid %d\n",
335            getpid())
337         return MessageQ_E_RESOURCE;
338     }
340     cmd.cmd = LAD_MESSAGEQ_SETUP;
341     cmd.clientId = handle;
342     memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof (*cfg));
344     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
345         PRINTVERBOSE1(
346           "MessageQ_setup: sending LAD command failed, status=%d\n", status)
347         return MessageQ_E_FAIL;
348     }
350     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
351         PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
352         return status;
353     }
354     status = rsp.setup.status;
356     PRINTVERBOSE2(
357       "MessageQ_setup: got LAD response for client %d, status=%d\n",
358       handle, status)
360     MessageQ_module->seqNum = 0;
361     MessageQ_module->nameServer = rsp.setup.nameServerHandle;
362     MessageQ_module->numQueues = cfg->maxRuntimeEntries;
363     MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
364                                      sizeof (MessageQ_Handle));
366     pthread_mutex_init(&MessageQ_module->gate, NULL);
368     for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
369         for (pri = 0; pri < 2; pri++) {
370             MessageQ_module->transports[i][pri] = NULL;
371         }
372     }
374     for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
375         MessageQ_module->transInst[tid] = NULL;
376     }
378     return status;
381 /*
382  *  MessageQ_destroy - destroy the MessageQ module.
383  */
384 Int MessageQ_destroy(void)
386     Int status;
387     LAD_ClientHandle handle;
388     struct LAD_CommandObj cmd;
389     union LAD_ResponseObj rsp;
391     handle = LAD_findHandle();
392     if (handle == LAD_MAXNUMCLIENTS) {
393         PRINTVERBOSE1(
394           "MessageQ_destroy: can't find connection to daemon for pid %d\n",
395            getpid())
397         return MessageQ_E_RESOURCE;
398     }
400     cmd.cmd = LAD_MESSAGEQ_DESTROY;
401     cmd.clientId = handle;
403     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
404         PRINTVERBOSE1(
405           "MessageQ_destroy: sending LAD command failed, status=%d\n", status)
406         return MessageQ_E_FAIL;
407     }
409     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
410         PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
411         return status;
412     }
413     status = rsp.status;
415     PRINTVERBOSE2(
416       "MessageQ_destroy: got LAD response for client %d, status=%d\n",
417       handle, status)
419     return status;
422 /*
423  *  ======== MessageQ_Params_init ========
424  *  Legacy implementation.
425  */
426 Void MessageQ_Params_init(MessageQ_Params *params)
428     ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
431 /*
432  *  ======== MessageQ_Params_init__S ========
433  *  New implementation which is version aware.
434  */
435 Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
437     MessageQ_Params_Version2 *params2;
439     switch (version) {
441         case MessageQ_Params_VERSION_2:
442             params2 = (MessageQ_Params_Version2 *)params;
443             params2->__version = MessageQ_Params_VERSION_2;
444             params2->synchronizer = NULL;
445             params2->queueIndex = MessageQ_ANY;
446             break;
448         default:
449             assert(FALSE);
450             break;
451     }
454 /*
455  *  ======== MessageQ_create ========
456  */
457 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
459     Int                   status;
460     MessageQ_Object      *obj = NULL;
461     IMessageQTransport_Handle transport;
462     INetworkTransport_Handle netTrans;
463     ITransport_Handle     baseTrans;
464     UInt16                queueIndex;
465     UInt16                clusterId;
466     Int                   tid;
467     Int                   priority;
468     LAD_ClientHandle      handle;
469     struct LAD_CommandObj cmd;
470     union LAD_ResponseObj rsp;
471     MessageQ_Params ps;
473     MessageQ_Params_init__S(&ps, MessageQ_Params_VERSION);
475     /* copy the given params into the current params structure */
476     if (pp != NULL) {
478         /* snoop the params pointer to see if it's a legacy structure */
479         if ((pp->__version == 0) || (pp->__version > 100)) {
480             ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
481         }
483         /* not legacy structure, use params version field */
484         else if (pp->__version == MessageQ_Params_VERSION_2) {
485             ps.__version = ((MessageQ_Params_Version2 *)pp)->__version;
486             ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
487             ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
488         }
489         else {
490             assert(FALSE);
491         }
492     }
494     handle = LAD_findHandle();
495     if (handle == LAD_MAXNUMCLIENTS) {
496         PRINTVERBOSE1(
497           "MessageQ_create: can't find connection to daemon for pid %d\n",
498            getpid())
500         return NULL;
501     }
503     cmd.cmd = LAD_MESSAGEQ_CREATE;
504     cmd.clientId = handle;
506     if (name == NULL) {
507         cmd.args.messageQCreate.name[0] = '\0';
508     }
509     else {
510         strncpy(cmd.args.messageQCreate.name, name,
511                 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
512         cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
513     }
515     memcpy(&cmd.args.messageQCreate.params, &ps, sizeof(ps));
517     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
518         PRINTVERBOSE1(
519           "MessageQ_create: sending LAD command failed, status=%d\n", status)
520         return NULL;
521     }
523     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
524         PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
525         return NULL;
526     }
527     status = rsp.messageQCreate.status;
529     PRINTVERBOSE2(
530       "MessageQ_create: got LAD response for client %d, status=%d\n",
531       handle, status)
533     if (status == -1) {
534        PRINTVERBOSE1(
535           "MessageQ_create: MessageQ server operation failed, status=%d\n",
536           status)
537        return NULL;
538     }
540     /* Create the generic obj */
541     obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
543    /* Populate the params member */
544     memcpy(&obj->params, &ps, sizeof(ps));
547     obj->queue = rsp.messageQCreate.queueId;
548     obj->serverHandle = rsp.messageQCreate.serverHandle;
549     CIRCLEQ_INIT(&obj->msgList);
550     if (sem_init(&obj->synchronizer, 0, 0) < 0) {
551         PRINTVERBOSE1(
552           "MessageQ_create: failed to create synchronizer (errno %d)\n", errno)
554         MessageQ_delete((MessageQ_Handle *)&obj);
556         return NULL;
557     }
559     /* lad returns the queue port # (queueIndex + PORT_OFFSET) */
560     queueIndex = MessageQ_getQueueIndex(rsp.messageQCreate.queueId);
562     PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
563             "queueIndex %d\n", (name == NULL) ? "NULL" : name , queueIndex)
565     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
566         for (priority = 0; priority < 2; priority++) {
567             transport = MessageQ_module->transports[clusterId][priority];
568             if (transport) {
569                 /* need to check return and do something if error */
570                 IMessageQTransport_bind((Void *)transport, obj->queue);
571             }
572         }
573     }
575     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
576         baseTrans = MessageQ_module->transInst[tid];
578         if (baseTrans != NULL) {
579             switch (ITransport_itype(baseTrans)) {
580                 case INetworkTransport_TypeId:
581                     netTrans = INetworkTransport_downCast(baseTrans);
582                     INetworkTransport_bind((void *)netTrans, obj->queue);
583                     break;
585                 default:
586                     /* error */
587                     printf("MessageQ_create: Error: transport id %d is an "
588                             "unsupported transport type.\n", tid);
589                     break;
590             }
591         }
592     }
594     /* LAD's MessageQ module can grow, we need to grow as well */
595     if (queueIndex >= MessageQ_module->numQueues) {
596         _MessageQ_grow(queueIndex);
597     }
599     /*  No need to "allocate" slot since the queueIndex returned by
600      *  LAD is guaranteed to be unique.
601      */
602     MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
604     return (MessageQ_Handle)obj;
607 /*
608  *  ======== MessageQ_delete ========
609  */
610 Int MessageQ_delete(MessageQ_Handle *handlePtr)
612     MessageQ_Object *obj;
613     IMessageQTransport_Handle transport;
614     INetworkTransport_Handle  netTrans;
615     ITransport_Handle         baseTrans;
616     Int              status = MessageQ_S_SUCCESS;
617     UInt16           queueIndex;
618     UInt16                clusterId;
619     Int                   tid;
620     Int                   priority;
621     LAD_ClientHandle handle;
622     struct LAD_CommandObj cmd;
623     union LAD_ResponseObj rsp;
625     handle = LAD_findHandle();
626     if (handle == LAD_MAXNUMCLIENTS) {
627         PRINTVERBOSE1(
628           "MessageQ_delete: can't find connection to daemon for pid %d\n",
629            getpid())
631         return MessageQ_E_FAIL;
632     }
634     obj = (MessageQ_Object *)(*handlePtr);
636     cmd.cmd = LAD_MESSAGEQ_DELETE;
637     cmd.clientId = handle;
638     cmd.args.messageQDelete.serverHandle = obj->serverHandle;
640     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
641         PRINTVERBOSE1(
642           "MessageQ_delete: sending LAD command failed, status=%d\n", status)
643         return MessageQ_E_FAIL;
644     }
646     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
647         PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
648         return MessageQ_E_FAIL;
649     }
650     status = rsp.messageQDelete.status;
652     PRINTVERBOSE2(
653       "MessageQ_delete: got LAD response for client %d, status=%d\n",
654       handle, status)
656     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
657         for (priority = 0; priority < 2; priority++) {
658             transport = MessageQ_module->transports[clusterId][priority];
659             if (transport) {
660                 IMessageQTransport_unbind((Void *)transport, obj->queue);
661             }
662         }
663     }
665     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
666         baseTrans = MessageQ_module->transInst[tid];
668         if (baseTrans != NULL) {
669             switch (ITransport_itype(baseTrans)) {
670                 case INetworkTransport_TypeId:
671                     netTrans = INetworkTransport_downCast(baseTrans);
672                     INetworkTransport_unbind((void *)netTrans, obj->queue);
673                     break;
675                 default:
676                     /* error */
677                     printf("MessageQ_create: Error: transport id %d is an "
678                             "unsupported transport type.\n", tid);
679                     break;
680             }
681         }
682     }
684     /* extract the queue index from the queueId */
685     queueIndex = MessageQ_getQueueIndex(obj->queue);
686     MessageQ_module->queues[queueIndex] = NULL;
688     free(obj);
689     *handlePtr = NULL;
691     return status;
694 /*
695  *  ======== MessageQ_open ========
696  *  Acquire a queueId for use in sending messages to the queue
697  */
698 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
700     Int status = MessageQ_S_SUCCESS;
702     status = NameServer_getUInt32(MessageQ_module->nameServer,
703                                   name, queueId, NULL);
705     if (status == NameServer_E_NOTFOUND) {
706         /* Set return queue ID to invalid */
707         *queueId = MessageQ_INVALIDMESSAGEQ;
708         status = MessageQ_E_NOTFOUND;
709     }
710     else if (status >= 0) {
711         /* Override with a MessageQ status code */
712         status = MessageQ_S_SUCCESS;
713     }
714     else {
715         /* Set return queue ID to invalid */
716         *queueId = MessageQ_INVALIDMESSAGEQ;
718         /* Override with a MessageQ status code */
719         if (status == NameServer_E_TIMEOUT) {
720             status = MessageQ_E_TIMEOUT;
721         }
722         else {
723             status = MessageQ_E_FAIL;
724         }
725     }
727     return status;
730 /*
731  *  ======== MessageQ_openQueueId ========
732  */
733 MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 procId)
735     MessageQ_QueueIndex queuePort;
736     MessageQ_QueueId queueId;
738     /* queue port is embedded in the queueId */
739     queuePort = queueIndex + MessageQ_PORTOFFSET;
740     queueId = ((MessageQ_QueueId)(procId) << 16) | queuePort;
742     return (queueId);
745 /*
746  *  ======== MessageQ_close ========
747  *  Closes previously opened instance of MessageQ
748  */
749 Int MessageQ_close(MessageQ_QueueId *queueId)
751     Int32 status = MessageQ_S_SUCCESS;
753     /* Nothing more to be done for closing the MessageQ. */
754     *queueId = MessageQ_INVALIDMESSAGEQ;
756     return status;
759 /*
760  *  ======== MessageQ_put ========
761  *  Place a message onto a message queue
762  *
763  *  Calls transport's put(), which handles the sending of the message
764  *  using the appropriate kernel interface (socket, device ioctl) call
765  *  for the remote procId encoded in the queueId argument.
766  */
767 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
769     MessageQ_Object *obj;
770     UInt16   dstProcId  = (UInt16)(queueId >> 16);
771     UInt16   queueIndex;
772     UInt16   queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
773     Int      status = MessageQ_S_SUCCESS;
774     ITransport_Handle baseTrans;
775     IMessageQTransport_Handle msgTrans;
776     INetworkTransport_Handle netTrans;
777     Int priority;
778     UInt tid;
779     UInt16 clusterId;
781     /* use the queue port # for destination address */
782     msg->dstId = queuePort;
783     msg->dstProc= dstProcId;
785     /* invoke put hook function after addressing the message */
786     if (MessageQ_module->putHookFxn != NULL) {
787         MessageQ_module->putHookFxn(queueId, msg);
788     }
790     /* extract the transport ID from the message header */
791     tid = MessageQ_getTransportId(msg);
793     if (tid >= MessageQ_MAXTRANSPORTS) {
794         printf("MessageQ_put: Error: transport id %d too big, must be < %d\n",
795                 tid, MessageQ_MAXTRANSPORTS);
796         return (MessageQ_E_FAIL);
797     }
799     /* if tid is set, use secondary transport regardless of destination */
800     if (tid != 0) {
801         baseTrans = MessageQ_module->transInst[tid];
803         if (baseTrans == NULL) {
804             printf("MessageQ_put: Error: transport is null\n");
805             return (MessageQ_E_FAIL);
806         }
808         /* downcast instance pointer to transport interface */
809         switch (ITransport_itype(baseTrans)) {
810             case INetworkTransport_TypeId:
811                 netTrans = INetworkTransport_downCast(baseTrans);
812                 INetworkTransport_put(netTrans, (Ptr)msg);
813                 break;
815             default:
816                 /* error */
817                 printf("MessageQ_put: Error: transport id %d is an "
818                         "unsupported transport type\n", tid);
819                 status = MessageQ_E_FAIL;
820                 break;
821         }
822     }
823     else {
824         /* if destination on another processor, use primary transport */
825         if (dstProcId != MultiProc_self()) {
826             priority = MessageQ_getMsgPri(msg);
827             clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
829             /* primary transport can only be used for intra-cluster delivery */
830             if (clusterId > MultiProc_getNumProcsInCluster()) {
831                 printf("MessageQ_put: Error: destination procId=%d is not "
832                         "in cluster. Must specify a transportId.\n", dstProcId);
833                 return (MessageQ_E_FAIL);
834             }
836             msgTrans = MessageQ_module->transports[clusterId][priority];
838             IMessageQTransport_put(msgTrans, (Ptr)msg);
839         }
840         else {
841             /* check if destination queue is in this process */
842             queueIndex = queuePort - MessageQ_PORTOFFSET;
844             if (queueIndex >= MessageQ_module->numQueues) {
845                 printf("MessageQ_put: Error: unable to deliver message, "
846                         "queueIndex too large or transportId missing.\n");
847                 return (MessageQ_E_FAIL);
848             }
850             obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
852             if (obj == NULL) {
853                 printf("MessageQ_put: Error: unable to deliver message, "
854                         "destination queue not in this process.\n");
855                 return (MessageQ_E_FAIL);
856             }
858             /* deliver message to process local queue */
859             pthread_mutex_lock(&MessageQ_module->gate);
860             CIRCLEQ_INSERT_TAIL(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)msg,
861                     elem);
862             pthread_mutex_unlock(&MessageQ_module->gate);
863             sem_post(&obj->synchronizer);
864         }
865     }
867     return (status);
870 /*
871  *  MessageQ_get - gets a message for a message queue and blocks if
872  *  the queue is empty.
873  *
874  *  If a message is present, it returns it.  Otherwise it blocks
875  *  waiting for a message to arrive.
876  *  When a message is returned, it is owned by the caller.
877  */
878 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
880     MessageQ_Object * obj = (MessageQ_Object *)handle;
881     Int     status = MessageQ_S_SUCCESS;
882     struct timespec ts;
883     struct timeval tv;
885 #if 0
886 /*
887  * Optimization here to get a message without going in to the sem
888  * operation, but the sem count will not be maintained properly.
889  */
890     pthread_mutex_lock(&MessageQ_module->gate);
892     if (obj->msgList.cqh_first != &obj->msgList) {
893         *msg = (MessageQ_Msg)obj->msglist.cqh_first;
894         CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
896         pthread_mutex_unlock(&MessageQ_module->gate);
897     }
898     else {
899         pthread_mutex_unlock(&MessageQ_module->gate);
900     }
901 #endif
903     if (timeout == MessageQ_FOREVER) {
904         sem_wait(&obj->synchronizer);
905     }
906     else {
907         gettimeofday(&tv, NULL);
908         ts.tv_sec = tv.tv_sec;
909         ts.tv_nsec = (tv.tv_usec + timeout) * 1000;
911         if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
912             if (errno == ETIMEDOUT) {
913                 PRINTVERBOSE0("MessageQ_get: operation timed out\n")
915                 return MessageQ_E_TIMEOUT;
916             }
917         }
918     }
920     if (obj->unblocked) {
921         return MessageQ_E_UNBLOCKED;
922     }
924     pthread_mutex_lock(&MessageQ_module->gate);
926     *msg = (MessageQ_Msg)obj->msgList.cqh_first;
927     CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
929     pthread_mutex_unlock(&MessageQ_module->gate);
931     return status;
934 /*
935  * Return a count of the number of messages in the queue
936  *
937  * TBD: Implement as a socket ioctl, using getsockopt().  Return -1 for now.
938  */
939 Int MessageQ_count(MessageQ_Handle handle)
941     Int               count = -1;
942 #if 0
943     MessageQ_Object * obj   = (MessageQ_Object *) handle;
944     socklen_t         optlen;
946     /*
947      * TBD: Need to find a way to implement (if anyone uses it!), and
948      * push down into transport..
949      */
951     /*
952      * 2nd arg to getsockopt should be transport independent, but using
953      *  SSKPROTO_SHMFIFO for now:
954      */
955     getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
956                  &count, &optlen);
957 #endif
959     return count;
962 /*
963  *  Initializes a message not obtained from MessageQ_alloc.
964  */
965 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
967     /* Fill in the fields of the message */
968     MessageQ_msgInit(msg);
969     msg->heapId = MessageQ_STATICMSG;
970     msg->msgSize = size;
973 /*
974  *  Allocate a message and initialize the needed fields (note some
975  *  of the fields in the header are set via other APIs or in the
976  *  MessageQ_put function,
977  */
978 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
980     MessageQ_Msg msg;
982     /*
983      * heapId not used for local alloc (as this is over a copy transport), but
984      * we need to send to other side as heapId is used in BIOS transport.
985      */
986     msg = (MessageQ_Msg)calloc(1, size);
987     MessageQ_msgInit(msg);
988     msg->msgSize = size;
989     msg->heapId = heapId;
991     return msg;
994 /*
995  *  Frees the message back to the heap that was used to allocate it.
996  */
997 Int MessageQ_free(MessageQ_Msg msg)
999     UInt32 status = MessageQ_S_SUCCESS;
1001     /* Check to ensure this was not allocated by user: */
1002     if (msg->heapId == MessageQ_STATICMSG) {
1003         status = MessageQ_E_CANNOTFREESTATICMSG;
1004     }
1005     else {
1006         free(msg);
1007     }
1009     return status;
1012 /* Register a heap with MessageQ. */
1013 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
1015     Int status = MessageQ_S_SUCCESS;
1017     /* Do nothing, as this uses a copy transport */
1019     return status;
1022 /* Unregister a heap with MessageQ. */
1023 Int MessageQ_unregisterHeap(UInt16 heapId)
1025     Int status = MessageQ_S_SUCCESS;
1027     /* Do nothing, as this uses a copy transport */
1029     return status;
1032 /* Unblocks a MessageQ */
1033 Void MessageQ_unblock(MessageQ_Handle handle)
1035     MessageQ_Object *obj = (MessageQ_Object *)handle;
1037     obj->unblocked = TRUE;
1038     sem_post(&obj->synchronizer);
1041 /* Embeds a source message queue into a message */
1042 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
1044     MessageQ_Object *obj = (MessageQ_Object *)handle;
1046     msg->replyId = (UInt16)(obj->queue);
1047     msg->replyProc = (UInt16)(obj->queue >> 16);
1050 /* Returns the QueueId associated with the handle. */
1051 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
1053     MessageQ_Object *obj = (MessageQ_Object *) handle;
1054     UInt32 queueId;
1056     queueId = (obj->queue);
1058     return queueId;
1061 /* Sets the tracing of a message */
1062 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
1064     msg->flags = (msg->flags & ~TRACEMASK) |   (traceFlag << TRACESHIFT);
1067 /*
1068  *  Returns the amount of shared memory used by one transport instance.
1069  *
1070  *  The MessageQ module itself does not use any shared memory but the
1071  *  underlying transport may use some shared memory.
1072  */
1073 SizeT MessageQ_sharedMemReq(Ptr sharedAddr)
1075     SizeT memReq = 0u;
1077     /* Do nothing, as this is a copy transport. */
1079     return memReq;
1082 /*
1083  * This is a helper function to initialize a message.
1084  */
1085 Void MessageQ_msgInit(MessageQ_Msg msg)
1087 #if 0
1088     Int                 status    = MessageQ_S_SUCCESS;
1089     LAD_ClientHandle handle;
1090     struct LAD_CommandObj cmd;
1091     union LAD_ResponseObj rsp;
1093     handle = LAD_findHandle();
1094     if (handle == LAD_MAXNUMCLIENTS) {
1095         PRINTVERBOSE1(
1096           "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
1097            getpid())
1099         return;
1100     }
1102     cmd.cmd = LAD_MESSAGEQ_MSGINIT;
1103     cmd.clientId = handle;
1105     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
1106         PRINTVERBOSE1(
1107           "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
1108         return;
1109     }
1111     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
1112         PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
1113         return;
1114     }
1115     status = rsp.msgInit.status;
1117     PRINTVERBOSE2(
1118       "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
1119       handle, status)
1121     memcpy(msg, &rsp.msgInit.msg, sizeof (*msg));
1122 #else
1123     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
1124     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
1125     msg->msgId     = MessageQ_INVALIDMSGID;
1126     msg->dstId     = (UInt16)MessageQ_INVALIDMESSAGEQ;
1127     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
1128     msg->srcProc   = MultiProc_self();
1130     pthread_mutex_lock(&MessageQ_module->gate);
1131     msg->seqNum  = MessageQ_module->seqNum++;
1132     pthread_mutex_unlock(&MessageQ_module->gate);
1133 #endif
1136 /*
1137  *  ======== _MessageQ_grow ========
1138  *  Increase module's queues array to accommodate queueIndex from LAD
1139  *
1140  *  Note: this function takes the queue index value (i.e. without the
1141  *  port offset).
1142  */
1143 Void _MessageQ_grow(UInt16 queueIndex)
1145     MessageQ_Handle *queues;
1146     MessageQ_Handle *oldQueues;
1147     UInt oldSize;
1149     oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
1151     queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof(MessageQ_Handle));
1152     memcpy(queues, MessageQ_module->queues, oldSize);
1154     oldQueues = MessageQ_module->queues;
1155     MessageQ_module->queues = queues;
1156     MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
1158     free(oldQueues);
1160     return;