9ebf0063332aec489d697b1d8519aabcc4948947
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
1 /*
2  * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  @file   MessageQ.c
34  *
35  *  @brief  MessageQ Linux implementation
36  *
37  *  This implementation is geared for use in a "client/server" model, whereby
38  *  system-wide data is maintained in a "server" component and process-
39  *  specific data is handled here.  At the moment, this implementation
40  *  connects and communicates with LAD for the server connection.
41  */
43 /* Standard IPC header */
44 #include <ti/ipc/Std.h>
46 /* Module level headers */
47 #include <ti/ipc/NameServer.h>
48 #include <ti/ipc/MultiProc.h>
49 #include <_MultiProc.h>
50 #define MessageQ_internal 1     /* must be defined before include file */
51 #include <ti/ipc/MessageQ.h>
52 #include <_MessageQ.h>
53 #include <ti/ipc/interfaces/ITransport.h>
54 #include <ti/ipc/interfaces/IMessageQTransport.h>
55 #include <ti/ipc/interfaces/INetworkTransport.h>
57 /* Socket Headers */
58 #include <sys/select.h>
59 #include <sys/time.h>
60 #include <sys/types.h>
61 #include <sys/param.h>
62 #include <sys/eventfd.h>
63 #include <sys/queue.h>
64 #include <errno.h>
65 #include <stdio.h>
66 #include <string.h>
67 #include <stdlib.h>
68 #include <unistd.h>
69 #include <assert.h>
70 #include <pthread.h>
71 #include <semaphore.h>
73 #include <ladclient.h>
74 #include <_lad.h>
76 /* =============================================================================
77  * Macros/Constants
78  * =============================================================================
79  */
81 /*!
82  *  @brief  Name of the reserved NameServer used for MessageQ.
83  */
84 #define MessageQ_NAMESERVER  "MessageQ"
86 #define MessageQ_MAXTRANSPORTS 8
88 #define MessageQ_GROWSIZE 32
90 /* Trace flag settings: */
91 #define TRACESHIFT    12
92 #define TRACEMASK     0x1000
94 /* Define BENCHMARK to quiet key MessageQ APIs: */
95 //#define BENCHMARK
97 /* =============================================================================
98  * Structures & Enums
99  * =============================================================================
100  */
102 /* params structure evolution */
103 typedef struct {
104     Void *synchronizer;
105 } MessageQ_Params_Legacy;
107 typedef struct {
108     Int __version;
109     Void *synchronizer;
110     MessageQ_QueueIndex queueIndex;
111 } MessageQ_Params_Version2;
113 /* structure for MessageQ module state */
114 typedef struct MessageQ_ModuleObject {
115     MessageQ_Handle           *queues;
116     Int                       numQueues;
117     Int                       refCount;
118     NameServer_Handle         nameServer;
119     pthread_mutex_t           gate;
120     int                       seqNum;
121     IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
122     ITransport_Handle         transInst[MessageQ_MAXTRANSPORTS];
123     MessageQ_PutHookFxn       putHookFxn;
124 } MessageQ_ModuleObject;
126 typedef struct MessageQ_CIRCLEQ_ENTRY {
127      CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem;
128 } MessageQ_CIRCLEQ_ENTRY;
130 /*!
131  *  @brief  Structure for the Handle for the MessageQ.
132  */
133 typedef struct MessageQ_Object_tag {
134     CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
135     MessageQ_Params              params;
136     MessageQ_QueueId             queue;
137     int                          unblocked;
138     void                         *serverHandle;
139     sem_t                        synchronizer;
140 } MessageQ_Object;
142 /* traces in this file are controlled via _MessageQ_verbose */
143 Bool _MessageQ_verbose = FALSE;
144 #define verbose _MessageQ_verbose
146 /* =============================================================================
147  *  Globals
148  * =============================================================================
149  */
150 static MessageQ_ModuleObject MessageQ_state =
152     .refCount   = 0,
153     .nameServer = NULL,
154     .gate       = PTHREAD_MUTEX_INITIALIZER,
155     .putHookFxn = NULL
156 };
158 /*!
159  *  @var    MessageQ_module
160  *
161  *  @brief  Pointer to the MessageQ module state.
162  */
163 MessageQ_ModuleObject *MessageQ_module = &MessageQ_state;
165 Void _MessageQ_grow(UInt16 queueIndex);
167 /* =============================================================================
168  * APIS
169  * =============================================================================
170  */
172 Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
173                                 UInt16 rprocId, UInt priority)
175     Int status = FALSE;
176     UInt16 clusterId;
178     if (handle == NULL) {
179         printf("MessageQ_registerTransport: invalid handle, must be non-NULL\n"
180               );
182         return status;
183     }
185     /* map procId to clusterId */
186     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
188     if (clusterId >= MultiProc_MAXPROCESSORS) {
189         printf("MessageQ_registerTransport: invalid procId %d\n", rprocId);
191         return status;
192     }
194     if (MessageQ_module->transports[clusterId][priority] == NULL) {
195         MessageQ_module->transports[clusterId][priority] = handle;
197         status = TRUE;
198     }
200     return status;
203 Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
205     if (inst == NULL) {
206         printf("MessageQ_registerTransportId: invalid NULL handle\n");
208         return MessageQ_E_INVALIDARG;
209     }
211     if (tid >= MessageQ_MAXTRANSPORTS) {
212         printf("MessageQ_unregisterNetTransport: invalid transport id %d,"
213                 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
215         return MessageQ_E_INVALIDARG;
216     }
218     if (MessageQ_module->transInst[tid] != NULL) {
219         printf("MessageQ_registerTransportId: transport id %d already "
220                 "registered\n", tid);
222         return MessageQ_E_ALREADYEXISTS;
223     }
225     MessageQ_module->transInst[tid] = inst;
227     return MessageQ_S_SUCCESS;
230 Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
232     UInt16 clusterId;
234     /* map procId to clusterId */
235     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
237     if (clusterId >= MultiProc_MAXPROCESSORS) {
238         printf("MessageQ_unregisterTransport: invalid rprocId %d\n", rprocId);
240         return;
241     }
243     MessageQ_module->transports[clusterId][priority] = NULL;
246 Void MessageQ_unregisterTransportId(UInt tid)
248     if (tid >= MessageQ_MAXTRANSPORTS) {
249         printf("MessageQ_unregisterTransportId: invalid transport id %d, "
250                 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
252         return;
253     }
255     MessageQ_module->transInst[tid] = NULL;
258 /*
259  * Function to get default configuration for the MessageQ module.
260  */
261 Void MessageQ_getConfig(MessageQ_Config *cfg)
263     Int status;
264     LAD_ClientHandle handle;
265     struct LAD_CommandObj cmd;
266     union LAD_ResponseObj rsp;
268     assert (cfg != NULL);
270     handle = LAD_findHandle();
271     if (handle == LAD_MAXNUMCLIENTS) {
272         PRINTVERBOSE1(
273           "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
274            getpid())
276         return;
277     }
279     cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
280     cmd.clientId = handle;
282     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
283         PRINTVERBOSE1(
284           "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
285         return;
286     }
288     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
289         PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n",
290                       status)
291         return;
292     }
293     status = rsp.messageQGetConfig.status;
295     PRINTVERBOSE2(
296       "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
297       handle, status)
299     memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg));
301     return;
304 /*
305  *  Function to setup the MessageQ module.
306  */
307 Int MessageQ_setup(const MessageQ_Config *cfg)
309     Int status = MessageQ_S_SUCCESS;
310     LAD_ClientHandle handle;
311     struct LAD_CommandObj cmd;
312     union LAD_ResponseObj rsp;
313     Int pri;
314     Int i;
315     Int tid;
317     /* this entire function must be serialized */
318     pthread_mutex_lock(&MessageQ_module->gate);
320     /* ensure only first thread performs startup procedure */
321     if (++MessageQ_module->refCount > 1) {
322         PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
323                 MessageQ_module->refCount)
324         status = MessageQ_S_ALREADYSETUP;
325         goto exit;
326     }
328     handle = LAD_findHandle();
329     if (handle == LAD_MAXNUMCLIENTS) {
330         PRINTVERBOSE1("MessageQ_setup: can't find connection to daemon for "
331                 "pid %d\n", getpid())
332         status = MessageQ_E_RESOURCE;
333         goto exit;
334     }
336     cmd.cmd = LAD_MESSAGEQ_SETUP;
337     cmd.clientId = handle;
338     memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
340     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
341         PRINTVERBOSE1("MessageQ_setup: sending LAD command failed, "
342                 "status=%d\n", status)
343         status = MessageQ_E_FAIL;
344         goto exit;
345     }
347     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
348         PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
349         status = MessageQ_E_FAIL;
350         goto exit;
351     }
352     status = rsp.setup.status;
354     PRINTVERBOSE2("MessageQ_setup: LAD response for client %d, status=%d\n",
355             handle, status)
357     MessageQ_module->seqNum = 0;
358     MessageQ_module->nameServer = rsp.setup.nameServerHandle;
359     MessageQ_module->numQueues = cfg->maxRuntimeEntries;
360     MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
361             sizeof(MessageQ_Handle));
363     for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
364         for (pri = 0; pri < 2; pri++) {
365             MessageQ_module->transports[i][pri] = NULL;
366         }
367     }
369     for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
370         MessageQ_module->transInst[tid] = NULL;
371     }
373 exit:
374     /* if error, must decrement reference count */
375     if (status < 0) {
376         MessageQ_module->refCount--;
377     }
379     pthread_mutex_unlock(&MessageQ_module->gate);
381     return (status);
384 /*
385  *  MessageQ_destroy - destroy the MessageQ module.
386  */
387 Int MessageQ_destroy(void)
389     Int status = MessageQ_S_SUCCESS;
390     LAD_ClientHandle handle;
391     struct LAD_CommandObj cmd;
392     union LAD_ResponseObj rsp;
394     /* this entire function must be serialized */
395     pthread_mutex_lock(&MessageQ_module->gate);
397     /* ensure only last thread does the work */
398     if (--MessageQ_module->refCount > 0) {
399         goto exit;
400     }
402     handle = LAD_findHandle();
403     if (handle == LAD_MAXNUMCLIENTS) {
404         PRINTVERBOSE1("MessageQ_destroy: can't find connection to daemon "
405                 "for pid %d\n", getpid())
406         status =  MessageQ_E_RESOURCE;
407         goto exit;
408     }
410     cmd.cmd = LAD_MESSAGEQ_DESTROY;
411     cmd.clientId = handle;
413     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
414         PRINTVERBOSE1("MessageQ_destroy: sending LAD command failed, "
415                 "status=%d\n", status)
416         status = MessageQ_E_FAIL;
417         goto exit;
418     }
420     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
421         PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
422         status = MessageQ_E_FAIL;
423         goto exit;
424     }
425     status = rsp.status;
427     PRINTVERBOSE2("MessageQ_destroy: got LAD response for client %d, "
428             "status=%d\n", handle, status)
430 exit:
431     pthread_mutex_unlock(&MessageQ_module->gate);
433     return (status);
436 /*
437  *  ======== MessageQ_Params_init ========
438  *  Legacy implementation.
439  */
440 Void MessageQ_Params_init(MessageQ_Params *params)
442     ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
445 /*
446  *  ======== MessageQ_Params_init__S ========
447  *  New implementation which is version aware.
448  */
449 Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
451     MessageQ_Params_Version2 *params2;
453     switch (version) {
455         case MessageQ_Params_VERSION_2:
456             params2 = (MessageQ_Params_Version2 *)params;
457             params2->__version = MessageQ_Params_VERSION_2;
458             params2->synchronizer = NULL;
459             params2->queueIndex = MessageQ_ANY;
460             break;
462         default:
463             assert(FALSE);
464             break;
465     }
468 /*
469  *  ======== MessageQ_create ========
470  */
471 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
473     Int                   status;
474     MessageQ_Object      *obj = NULL;
475     IMessageQTransport_Handle transport;
476     INetworkTransport_Handle netTrans;
477     ITransport_Handle     baseTrans;
478     UInt16                queueIndex;
479     UInt16                clusterId;
480     Int                   tid;
481     Int                   priority;
482     LAD_ClientHandle      handle;
483     struct LAD_CommandObj cmd;
484     union LAD_ResponseObj rsp;
485     MessageQ_Params ps;
487     MessageQ_Params_init__S(&ps, MessageQ_Params_VERSION);
489     /* copy the given params into the current params structure */
490     if (pp != NULL) {
492         /* snoop the params pointer to see if it's a legacy structure */
493         if ((pp->__version == 0) || (pp->__version > 100)) {
494             ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
495         }
497         /* not legacy structure, use params version field */
498         else if (pp->__version == MessageQ_Params_VERSION_2) {
499             ps.__version = ((MessageQ_Params_Version2 *)pp)->__version;
500             ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
501             ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
502         }
503         else {
504             assert(FALSE);
505         }
506     }
508     handle = LAD_findHandle();
509     if (handle == LAD_MAXNUMCLIENTS) {
510         PRINTVERBOSE1(
511           "MessageQ_create: can't find connection to daemon for pid %d\n",
512            getpid())
514         return NULL;
515     }
517     cmd.cmd = LAD_MESSAGEQ_CREATE;
518     cmd.clientId = handle;
520     if (name == NULL) {
521         cmd.args.messageQCreate.name[0] = '\0';
522     }
523     else {
524         strncpy(cmd.args.messageQCreate.name, name,
525                 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
526         cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
527     }
529     memcpy(&cmd.args.messageQCreate.params, &ps, sizeof(ps));
531     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
532         PRINTVERBOSE1(
533           "MessageQ_create: sending LAD command failed, status=%d\n", status)
534         return NULL;
535     }
537     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
538         PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
539         return NULL;
540     }
541     status = rsp.messageQCreate.status;
543     PRINTVERBOSE2(
544       "MessageQ_create: got LAD response for client %d, status=%d\n",
545       handle, status)
547     if (status == -1) {
548        PRINTVERBOSE1(
549           "MessageQ_create: MessageQ server operation failed, status=%d\n",
550           status)
551        return NULL;
552     }
554     /* Create the generic obj */
555     obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
557    /* Populate the params member */
558     memcpy(&obj->params, &ps, sizeof(ps));
561     obj->queue = rsp.messageQCreate.queueId;
562     obj->serverHandle = rsp.messageQCreate.serverHandle;
563     CIRCLEQ_INIT(&obj->msgList);
564     if (sem_init(&obj->synchronizer, 0, 0) < 0) {
565         PRINTVERBOSE1(
566           "MessageQ_create: failed to create synchronizer (errno %d)\n", errno)
568         MessageQ_delete((MessageQ_Handle *)&obj);
570         return NULL;
571     }
573     /* lad returns the queue port # (queueIndex + PORT_OFFSET) */
574     queueIndex = MessageQ_getQueueIndex(rsp.messageQCreate.queueId);
576     PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
577             "queueIndex %d\n", (name == NULL) ? "NULL" : name , queueIndex)
579     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
580         for (priority = 0; priority < 2; priority++) {
581             transport = MessageQ_module->transports[clusterId][priority];
582             if (transport) {
583                 /* need to check return and do something if error */
584                 IMessageQTransport_bind((Void *)transport, obj->queue);
585             }
586         }
587     }
589     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
590         baseTrans = MessageQ_module->transInst[tid];
592         if (baseTrans != NULL) {
593             switch (ITransport_itype(baseTrans)) {
594                 case INetworkTransport_TypeId:
595                     netTrans = INetworkTransport_downCast(baseTrans);
596                     INetworkTransport_bind((void *)netTrans, obj->queue);
597                     break;
599                 default:
600                     /* error */
601                     printf("MessageQ_create: Error: transport id %d is an "
602                             "unsupported transport type.\n", tid);
603                     break;
604             }
605         }
606     }
608     /* LAD's MessageQ module can grow, we need to grow as well */
609     if (queueIndex >= MessageQ_module->numQueues) {
610         _MessageQ_grow(queueIndex);
611     }
613     /*  No need to "allocate" slot since the queueIndex returned by
614      *  LAD is guaranteed to be unique.
615      */
616     MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
618     return (MessageQ_Handle)obj;
621 /*
622  *  ======== MessageQ_delete ========
623  */
624 Int MessageQ_delete(MessageQ_Handle *handlePtr)
626     MessageQ_Object *obj;
627     IMessageQTransport_Handle transport;
628     INetworkTransport_Handle  netTrans;
629     ITransport_Handle         baseTrans;
630     Int              status = MessageQ_S_SUCCESS;
631     UInt16           queueIndex;
632     UInt16                clusterId;
633     Int                   tid;
634     Int                   priority;
635     LAD_ClientHandle handle;
636     struct LAD_CommandObj cmd;
637     union LAD_ResponseObj rsp;
639     handle = LAD_findHandle();
640     if (handle == LAD_MAXNUMCLIENTS) {
641         PRINTVERBOSE1(
642           "MessageQ_delete: can't find connection to daemon for pid %d\n",
643            getpid())
645         return MessageQ_E_FAIL;
646     }
648     obj = (MessageQ_Object *)(*handlePtr);
650     cmd.cmd = LAD_MESSAGEQ_DELETE;
651     cmd.clientId = handle;
652     cmd.args.messageQDelete.serverHandle = obj->serverHandle;
654     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
655         PRINTVERBOSE1(
656           "MessageQ_delete: sending LAD command failed, status=%d\n", status)
657         return MessageQ_E_FAIL;
658     }
660     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
661         PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
662         return MessageQ_E_FAIL;
663     }
664     status = rsp.messageQDelete.status;
666     PRINTVERBOSE2(
667       "MessageQ_delete: got LAD response for client %d, status=%d\n",
668       handle, status)
670     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
671         for (priority = 0; priority < 2; priority++) {
672             transport = MessageQ_module->transports[clusterId][priority];
673             if (transport) {
674                 IMessageQTransport_unbind((Void *)transport, obj->queue);
675             }
676         }
677     }
679     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
680         baseTrans = MessageQ_module->transInst[tid];
682         if (baseTrans != NULL) {
683             switch (ITransport_itype(baseTrans)) {
684                 case INetworkTransport_TypeId:
685                     netTrans = INetworkTransport_downCast(baseTrans);
686                     INetworkTransport_unbind((void *)netTrans, obj->queue);
687                     break;
689                 default:
690                     /* error */
691                     printf("MessageQ_create: Error: transport id %d is an "
692                             "unsupported transport type.\n", tid);
693                     break;
694             }
695         }
696     }
698     /* extract the queue index from the queueId */
699     queueIndex = MessageQ_getQueueIndex(obj->queue);
700     MessageQ_module->queues[queueIndex] = NULL;
702     free(obj);
703     *handlePtr = NULL;
705     return status;
708 /*
709  *  ======== MessageQ_open ========
710  *  Acquire a queueId for use in sending messages to the queue
711  */
712 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
714     Int status = MessageQ_S_SUCCESS;
716     status = NameServer_getUInt32(MessageQ_module->nameServer,
717                                   name, queueId, NULL);
719     if (status == NameServer_E_NOTFOUND) {
720         /* Set return queue ID to invalid */
721         *queueId = MessageQ_INVALIDMESSAGEQ;
722         status = MessageQ_E_NOTFOUND;
723     }
724     else if (status >= 0) {
725         /* Override with a MessageQ status code */
726         status = MessageQ_S_SUCCESS;
727     }
728     else {
729         /* Set return queue ID to invalid */
730         *queueId = MessageQ_INVALIDMESSAGEQ;
732         /* Override with a MessageQ status code */
733         if (status == NameServer_E_TIMEOUT) {
734             status = MessageQ_E_TIMEOUT;
735         }
736         else {
737             status = MessageQ_E_FAIL;
738         }
739     }
741     return status;
744 /*
745  *  ======== MessageQ_openQueueId ========
746  */
747 MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 procId)
749     MessageQ_QueueIndex queuePort;
750     MessageQ_QueueId queueId;
752     /* queue port is embedded in the queueId */
753     queuePort = queueIndex + MessageQ_PORTOFFSET;
754     queueId = ((MessageQ_QueueId)(procId) << 16) | queuePort;
756     return (queueId);
759 /*
760  *  ======== MessageQ_close ========
761  *  Closes previously opened instance of MessageQ
762  */
763 Int MessageQ_close(MessageQ_QueueId *queueId)
765     Int32 status = MessageQ_S_SUCCESS;
767     /* Nothing more to be done for closing the MessageQ. */
768     *queueId = MessageQ_INVALIDMESSAGEQ;
770     return status;
773 /*
774  *  ======== MessageQ_put ========
775  *  Place a message onto a message queue
776  *
777  *  Calls transport's put(), which handles the sending of the message
778  *  using the appropriate kernel interface (socket, device ioctl) call
779  *  for the remote procId encoded in the queueId argument.
780  */
781 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
783     MessageQ_Object *obj;
784     UInt16   dstProcId  = (UInt16)(queueId >> 16);
785     UInt16   queueIndex;
786     UInt16   queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
787     Int      status = MessageQ_S_SUCCESS;
788     ITransport_Handle baseTrans;
789     IMessageQTransport_Handle msgTrans;
790     INetworkTransport_Handle netTrans;
791     Int priority;
792     UInt tid;
793     UInt16 clusterId;
795     /* use the queue port # for destination address */
796     msg->dstId = queuePort;
797     msg->dstProc= dstProcId;
799     /* invoke put hook function after addressing the message */
800     if (MessageQ_module->putHookFxn != NULL) {
801         MessageQ_module->putHookFxn(queueId, msg);
802     }
804     /* extract the transport ID from the message header */
805     tid = MessageQ_getTransportId(msg);
807     if (tid >= MessageQ_MAXTRANSPORTS) {
808         printf("MessageQ_put: Error: transport id %d too big, must be < %d\n",
809                 tid, MessageQ_MAXTRANSPORTS);
810         return (MessageQ_E_FAIL);
811     }
813     /* if tid is set, use secondary transport regardless of destination */
814     if (tid != 0) {
815         baseTrans = MessageQ_module->transInst[tid];
817         if (baseTrans == NULL) {
818             printf("MessageQ_put: Error: transport is null\n");
819             return (MessageQ_E_FAIL);
820         }
822         /* downcast instance pointer to transport interface */
823         switch (ITransport_itype(baseTrans)) {
824             case INetworkTransport_TypeId:
825                 netTrans = INetworkTransport_downCast(baseTrans);
826                 INetworkTransport_put(netTrans, (Ptr)msg);
827                 break;
829             default:
830                 /* error */
831                 printf("MessageQ_put: Error: transport id %d is an "
832                         "unsupported transport type\n", tid);
833                 status = MessageQ_E_FAIL;
834                 break;
835         }
836     }
837     else {
838         /* if destination on another processor, use primary transport */
839         if (dstProcId != MultiProc_self()) {
840             priority = MessageQ_getMsgPri(msg);
841             clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
843             /* primary transport can only be used for intra-cluster delivery */
844             if (clusterId > MultiProc_getNumProcsInCluster()) {
845                 printf("MessageQ_put: Error: destination procId=%d is not "
846                         "in cluster. Must specify a transportId.\n", dstProcId);
847                 return (MessageQ_E_FAIL);
848             }
850             msgTrans = MessageQ_module->transports[clusterId][priority];
852             IMessageQTransport_put(msgTrans, (Ptr)msg);
853         }
854         else {
855             /* check if destination queue is in this process */
856             queueIndex = queuePort - MessageQ_PORTOFFSET;
858             if (queueIndex >= MessageQ_module->numQueues) {
859                 printf("MessageQ_put: Error: unable to deliver message, "
860                         "queueIndex too large or transportId missing.\n");
861                 return (MessageQ_E_FAIL);
862             }
864             obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
866             if (obj == NULL) {
867                 printf("MessageQ_put: Error: unable to deliver message, "
868                         "destination queue not in this process.\n");
869                 return (MessageQ_E_FAIL);
870             }
872             /* deliver message to process local queue */
873             pthread_mutex_lock(&MessageQ_module->gate);
874             CIRCLEQ_INSERT_TAIL(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)msg,
875                     elem);
876             pthread_mutex_unlock(&MessageQ_module->gate);
877             sem_post(&obj->synchronizer);
878         }
879     }
881     return (status);
884 /*
885  *  MessageQ_get - gets a message for a message queue and blocks if
886  *  the queue is empty.
887  *
888  *  If a message is present, it returns it.  Otherwise it blocks
889  *  waiting for a message to arrive.
890  *  When a message is returned, it is owned by the caller.
891  */
892 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
894     MessageQ_Object * obj = (MessageQ_Object *)handle;
895     Int     status = MessageQ_S_SUCCESS;
896     struct timespec ts;
897     struct timeval tv;
899 #if 0
900 /*
901  * Optimization here to get a message without going in to the sem
902  * operation, but the sem count will not be maintained properly.
903  */
904     pthread_mutex_lock(&MessageQ_module->gate);
906     if (obj->msgList.cqh_first != &obj->msgList) {
907         *msg = (MessageQ_Msg)obj->msglist.cqh_first;
908         CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
910         pthread_mutex_unlock(&MessageQ_module->gate);
911     }
912     else {
913         pthread_mutex_unlock(&MessageQ_module->gate);
914     }
915 #endif
917     if (timeout == MessageQ_FOREVER) {
918         sem_wait(&obj->synchronizer);
919     }
920     else {
921         gettimeofday(&tv, NULL);
922         ts.tv_sec = tv.tv_sec;
923         ts.tv_nsec = (tv.tv_usec + timeout) * 1000;
925         if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
926             if (errno == ETIMEDOUT) {
927                 PRINTVERBOSE0("MessageQ_get: operation timed out\n")
929                 return MessageQ_E_TIMEOUT;
930             }
931         }
932     }
934     if (obj->unblocked) {
935         return MessageQ_E_UNBLOCKED;
936     }
938     pthread_mutex_lock(&MessageQ_module->gate);
940     *msg = (MessageQ_Msg)obj->msgList.cqh_first;
941     CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
943     pthread_mutex_unlock(&MessageQ_module->gate);
945     return status;
948 /*
949  * Return a count of the number of messages in the queue
950  *
951  * TBD: Implement as a socket ioctl, using getsockopt().  Return -1 for now.
952  */
953 Int MessageQ_count(MessageQ_Handle handle)
955     Int               count = -1;
956 #if 0
957     MessageQ_Object * obj   = (MessageQ_Object *) handle;
958     socklen_t         optlen;
960     /*
961      * TBD: Need to find a way to implement (if anyone uses it!), and
962      * push down into transport..
963      */
965     /*
966      * 2nd arg to getsockopt should be transport independent, but using
967      *  SSKPROTO_SHMFIFO for now:
968      */
969     getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
970                  &count, &optlen);
971 #endif
973     return count;
976 /*
977  *  Initializes a message not obtained from MessageQ_alloc.
978  */
979 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
981     /* Fill in the fields of the message */
982     MessageQ_msgInit(msg);
983     msg->heapId = MessageQ_STATICMSG;
984     msg->msgSize = size;
987 /*
988  *  Allocate a message and initialize the needed fields (note some
989  *  of the fields in the header are set via other APIs or in the
990  *  MessageQ_put function,
991  */
992 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
994     MessageQ_Msg msg;
996     /*
997      * heapId not used for local alloc (as this is over a copy transport), but
998      * we need to send to other side as heapId is used in BIOS transport.
999      */
1000     msg = (MessageQ_Msg)calloc(1, size);
1001     MessageQ_msgInit(msg);
1002     msg->msgSize = size;
1003     msg->heapId = heapId;
1005     return msg;
1008 /*
1009  *  Frees the message back to the heap that was used to allocate it.
1010  */
1011 Int MessageQ_free(MessageQ_Msg msg)
1013     UInt32 status = MessageQ_S_SUCCESS;
1015     /* Check to ensure this was not allocated by user: */
1016     if (msg->heapId == MessageQ_STATICMSG) {
1017         status = MessageQ_E_CANNOTFREESTATICMSG;
1018     }
1019     else {
1020         free(msg);
1021     }
1023     return status;
1026 /* Register a heap with MessageQ. */
1027 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
1029     Int status = MessageQ_S_SUCCESS;
1031     /* Do nothing, as this uses a copy transport */
1033     return status;
1036 /* Unregister a heap with MessageQ. */
1037 Int MessageQ_unregisterHeap(UInt16 heapId)
1039     Int status = MessageQ_S_SUCCESS;
1041     /* Do nothing, as this uses a copy transport */
1043     return status;
1046 /* Unblocks a MessageQ */
1047 Void MessageQ_unblock(MessageQ_Handle handle)
1049     MessageQ_Object *obj = (MessageQ_Object *)handle;
1051     obj->unblocked = TRUE;
1052     sem_post(&obj->synchronizer);
1055 /* Embeds a source message queue into a message */
1056 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
1058     MessageQ_Object *obj = (MessageQ_Object *)handle;
1060     msg->replyId = (UInt16)(obj->queue);
1061     msg->replyProc = (UInt16)(obj->queue >> 16);
1064 /* Returns the QueueId associated with the handle. */
1065 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
1067     MessageQ_Object *obj = (MessageQ_Object *) handle;
1068     UInt32 queueId;
1070     queueId = (obj->queue);
1072     return queueId;
1075 /* Sets the tracing of a message */
1076 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
1078     msg->flags = (msg->flags & ~TRACEMASK) |   (traceFlag << TRACESHIFT);
1081 /*
1082  *  Returns the amount of shared memory used by one transport instance.
1083  *
1084  *  The MessageQ module itself does not use any shared memory but the
1085  *  underlying transport may use some shared memory.
1086  */
1087 SizeT MessageQ_sharedMemReq(Ptr sharedAddr)
1089     SizeT memReq = 0u;
1091     /* Do nothing, as this is a copy transport. */
1093     return memReq;
1096 /*
1097  * This is a helper function to initialize a message.
1098  */
1099 Void MessageQ_msgInit(MessageQ_Msg msg)
1101 #if 0
1102     Int                 status    = MessageQ_S_SUCCESS;
1103     LAD_ClientHandle handle;
1104     struct LAD_CommandObj cmd;
1105     union LAD_ResponseObj rsp;
1107     handle = LAD_findHandle();
1108     if (handle == LAD_MAXNUMCLIENTS) {
1109         PRINTVERBOSE1(
1110           "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
1111            getpid())
1113         return;
1114     }
1116     cmd.cmd = LAD_MESSAGEQ_MSGINIT;
1117     cmd.clientId = handle;
1119     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
1120         PRINTVERBOSE1(
1121           "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
1122         return;
1123     }
1125     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
1126         PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
1127         return;
1128     }
1129     status = rsp.msgInit.status;
1131     PRINTVERBOSE2(
1132       "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
1133       handle, status)
1135     memcpy(msg, &rsp.msgInit.msg, sizeof (*msg));
1136 #else
1137     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
1138     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
1139     msg->msgId     = MessageQ_INVALIDMSGID;
1140     msg->dstId     = (UInt16)MessageQ_INVALIDMESSAGEQ;
1141     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
1142     msg->srcProc   = MultiProc_self();
1144     pthread_mutex_lock(&MessageQ_module->gate);
1145     msg->seqNum  = MessageQ_module->seqNum++;
1146     pthread_mutex_unlock(&MessageQ_module->gate);
1147 #endif
1150 /*
1151  *  ======== _MessageQ_grow ========
1152  *  Increase module's queues array to accommodate queueIndex from LAD
1153  *
1154  *  Note: this function takes the queue index value (i.e. without the
1155  *  port offset).
1156  */
1157 Void _MessageQ_grow(UInt16 queueIndex)
1159     MessageQ_Handle *queues;
1160     MessageQ_Handle *oldQueues;
1161     UInt oldSize;
1163     oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
1165     queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof(MessageQ_Handle));
1166     memcpy(queues, MessageQ_module->queues, oldSize);
1168     oldQueues = MessageQ_module->queues;
1169     MessageQ_module->queues = queues;
1170     MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
1172     free(oldQueues);
1174     return;