Re-work MessageQ_put to eliminate transport recursion
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
1 /*
2  * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  @file   MessageQ.c
34  *
35  *  @brief  MessageQ Linux implementation
36  *
37  *  This implementation is geared for use in a "client/server" model, whereby
38  *  system-wide data is maintained in a "server" component and process-
39  *  specific data is handled here.  At the moment, this implementation
40  *  connects and communicates with LAD for the server connection.
41  */
43 /* Standard IPC header */
44 #include <ti/ipc/Std.h>
46 /* Module level headers */
47 #include <ti/ipc/NameServer.h>
48 #include <ti/ipc/MultiProc.h>
49 #include <_MultiProc.h>
50 #define MessageQ_internal 1     /* must be defined before include file */
51 #include <ti/ipc/MessageQ.h>
52 #include <_MessageQ.h>
53 #include <ti/ipc/interfaces/ITransport.h>
54 #include <ti/ipc/interfaces/IMessageQTransport.h>
55 #include <ti/ipc/interfaces/INetworkTransport.h>
57 /* Socket Headers */
58 #include <sys/select.h>
59 #include <sys/time.h>
60 #include <sys/types.h>
61 #include <sys/param.h>
62 #include <sys/eventfd.h>
63 #include <sys/queue.h>
64 #include <errno.h>
65 #include <stdio.h>
66 #include <string.h>
67 #include <stdlib.h>
68 #include <unistd.h>
69 #include <assert.h>
70 #include <pthread.h>
71 #include <semaphore.h>
73 #include <ladclient.h>
74 #include <_lad.h>
76 /* =============================================================================
77  * Macros/Constants
78  * =============================================================================
79  */
81 /*!
82  *  @brief  Name of the reserved NameServer used for MessageQ.
83  */
84 #define MessageQ_NAMESERVER  "MessageQ"
86 #define MessageQ_MAXTRANSPORTS 8
88 #define MessageQ_GROWSIZE 32
90 /* Trace flag settings: */
91 #define TRACESHIFT    12
92 #define TRACEMASK     0x1000
94 /* Define BENCHMARK to quiet key MessageQ APIs: */
95 //#define BENCHMARK
97 /* =============================================================================
98  * Structures & Enums
99  * =============================================================================
100  */
102 /* params structure evolution */
103 typedef struct {
104     Void *synchronizer;
105 } MessageQ_Params_Legacy;
107 typedef struct {
108     Int __version;
109     Void *synchronizer;
110     MessageQ_QueueIndex queueIndex;
111 } MessageQ_Params_Version2;
113 /* structure for MessageQ module state */
114 typedef struct MessageQ_ModuleObject {
115     MessageQ_Handle           *queues;
116     Int                       numQueues;
117     Int                       refCount;
118     NameServer_Handle         nameServer;
119     pthread_mutex_t           gate;
120     int                       seqNum;
121     IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
122     ITransport_Handle         transInst[MessageQ_MAXTRANSPORTS];
123     MessageQ_PutHookFxn       putHookFxn;
124 } MessageQ_ModuleObject;
126 typedef struct MessageQ_CIRCLEQ_ENTRY {
127      CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem;
128 } MessageQ_CIRCLEQ_ENTRY;
130 /*!
131  *  @brief  Structure for the Handle for the MessageQ.
132  */
133 typedef struct MessageQ_Object_tag {
134     CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
135     MessageQ_Params              params;
136     MessageQ_QueueId             queue;
137     int                          unblocked;
138     void                         *serverHandle;
139     sem_t                        synchronizer;
140 } MessageQ_Object;
142 /* traces in this file are controlled via _MessageQ_verbose */
143 Bool _MessageQ_verbose = FALSE;
144 #define verbose _MessageQ_verbose
146 /* =============================================================================
147  *  Globals
148  * =============================================================================
149  */
150 static MessageQ_ModuleObject MessageQ_state =
152     .refCount   = 0,
153     .nameServer = NULL,
154     .gate       = PTHREAD_MUTEX_INITIALIZER,
155     .putHookFxn = NULL
156 };
158 /*!
159  *  @var    MessageQ_module
160  *
161  *  @brief  Pointer to the MessageQ module state.
162  */
163 MessageQ_ModuleObject *MessageQ_module = &MessageQ_state;
165 Void _MessageQ_grow(UInt16 queueIndex);
167 /* =============================================================================
168  * APIS
169  * =============================================================================
170  */
172 Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
173                                 UInt16 rprocId, UInt priority)
175     Int status = FALSE;
176     UInt16 clusterId;
178     if (handle == NULL) {
179         printf("MessageQ_registerTransport: invalid handle, must be non-NULL\n"
180               );
182         return status;
183     }
185     /* map procId to clusterId */
186     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
188     if (clusterId >= MultiProc_MAXPROCESSORS) {
189         printf("MessageQ_registerTransport: invalid procId %d\n", rprocId);
191         return status;
192     }
194     if (MessageQ_module->transports[clusterId][priority] == NULL) {
195         MessageQ_module->transports[clusterId][priority] = handle;
197         status = TRUE;
198     }
200     return status;
203 Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
205     if (inst == NULL) {
206         printf("MessageQ_registerTransportId: invalid NULL handle\n");
208         return MessageQ_E_INVALIDARG;
209     }
211     if (tid >= MessageQ_MAXTRANSPORTS) {
212         printf("MessageQ_unregisterNetTransport: invalid transport id %d,"
213                 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
215         return MessageQ_E_INVALIDARG;
216     }
218     if (MessageQ_module->transInst[tid] != NULL) {
219         printf("MessageQ_registerTransportId: transport id %d already "
220                 "registered\n", tid);
222         return MessageQ_E_ALREADYEXISTS;
223     }
225     MessageQ_module->transInst[tid] = inst;
227     return MessageQ_S_SUCCESS;
230 Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
232     UInt16 clusterId;
234     /* map procId to clusterId */
235     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
237     if (clusterId >= MultiProc_MAXPROCESSORS) {
238         printf("MessageQ_unregisterTransport: invalid rprocId %d\n", rprocId);
240         return;
241     }
243     MessageQ_module->transports[clusterId][priority] = NULL;
246 Void MessageQ_unregisterTransportId(UInt tid)
248     if (tid >= MessageQ_MAXTRANSPORTS) {
249         printf("MessageQ_unregisterTransportId: invalid transport id %d, "
250                 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
252         return;
253     }
255     MessageQ_module->transInst[tid] = NULL;
258 /*
259  * Function to get default configuration for the MessageQ module.
260  */
261 Void MessageQ_getConfig(MessageQ_Config *cfg)
263     Int status;
264     LAD_ClientHandle handle;
265     struct LAD_CommandObj cmd;
266     union LAD_ResponseObj rsp;
268     assert (cfg != NULL);
270     handle = LAD_findHandle();
271     if (handle == LAD_MAXNUMCLIENTS) {
272         PRINTVERBOSE1(
273           "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
274            getpid())
276         return;
277     }
279     cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
280     cmd.clientId = handle;
282     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
283         PRINTVERBOSE1(
284           "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
285         return;
286     }
288     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
289         PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n",
290                       status)
291         return;
292     }
293     status = rsp.messageQGetConfig.status;
295     PRINTVERBOSE2(
296       "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
297       handle, status)
299     memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg));
301     return;
304 /*
305  *  Function to setup the MessageQ module.
306  */
307 Int MessageQ_setup(const MessageQ_Config *cfg)
309     Int status = MessageQ_S_SUCCESS;
310     LAD_ClientHandle handle;
311     struct LAD_CommandObj cmd;
312     union LAD_ResponseObj rsp;
313     Int pri;
314     Int i;
315     Int tid;
317     /* this entire function must be serialized */
318     pthread_mutex_lock(&MessageQ_module->gate);
320     /* ensure only first thread performs startup procedure */
321     if (++MessageQ_module->refCount > 1) {
322         PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
323                 MessageQ_module->refCount)
324         status = MessageQ_S_ALREADYSETUP;
325         goto exit;
326     }
328     handle = LAD_findHandle();
329     if (handle == LAD_MAXNUMCLIENTS) {
330         PRINTVERBOSE1("MessageQ_setup: can't find connection to daemon for "
331                 "pid %d\n", getpid())
332         status = MessageQ_E_RESOURCE;
333         goto exit;
334     }
336     cmd.cmd = LAD_MESSAGEQ_SETUP;
337     cmd.clientId = handle;
338     memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
340     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
341         PRINTVERBOSE1("MessageQ_setup: sending LAD command failed, "
342                 "status=%d\n", status)
343         status = MessageQ_E_FAIL;
344         goto exit;
345     }
347     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
348         PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
349         status = MessageQ_E_FAIL;
350         goto exit;
351     }
352     status = rsp.setup.status;
354     PRINTVERBOSE2("MessageQ_setup: LAD response for client %d, status=%d\n",
355             handle, status)
357     MessageQ_module->seqNum = 0;
358     MessageQ_module->nameServer = rsp.setup.nameServerHandle;
359     MessageQ_module->numQueues = cfg->maxRuntimeEntries;
360     MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
361             sizeof(MessageQ_Handle));
363     for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
364         for (pri = 0; pri < 2; pri++) {
365             MessageQ_module->transports[i][pri] = NULL;
366         }
367     }
369     for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
370         MessageQ_module->transInst[tid] = NULL;
371     }
373 exit:
374     /* if error, must decrement reference count */
375     if (status < 0) {
376         MessageQ_module->refCount--;
377     }
379     pthread_mutex_unlock(&MessageQ_module->gate);
381     return (status);
384 /*
385  *  MessageQ_destroy - destroy the MessageQ module.
386  */
387 Int MessageQ_destroy(void)
389     Int status = MessageQ_S_SUCCESS;
390     LAD_ClientHandle handle;
391     struct LAD_CommandObj cmd;
392     union LAD_ResponseObj rsp;
394     /* this entire function must be serialized */
395     pthread_mutex_lock(&MessageQ_module->gate);
397     /* ensure only last thread does the work */
398     if (--MessageQ_module->refCount > 0) {
399         goto exit;
400     }
402     handle = LAD_findHandle();
403     if (handle == LAD_MAXNUMCLIENTS) {
404         PRINTVERBOSE1("MessageQ_destroy: can't find connection to daemon "
405                 "for pid %d\n", getpid())
406         status =  MessageQ_E_RESOURCE;
407         goto exit;
408     }
410     cmd.cmd = LAD_MESSAGEQ_DESTROY;
411     cmd.clientId = handle;
413     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
414         PRINTVERBOSE1("MessageQ_destroy: sending LAD command failed, "
415                 "status=%d\n", status)
416         status = MessageQ_E_FAIL;
417         goto exit;
418     }
420     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
421         PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
422         status = MessageQ_E_FAIL;
423         goto exit;
424     }
425     status = rsp.status;
427     PRINTVERBOSE2("MessageQ_destroy: got LAD response for client %d, "
428             "status=%d\n", handle, status)
430 exit:
431     pthread_mutex_unlock(&MessageQ_module->gate);
433     return (status);
436 /*
437  *  ======== MessageQ_Params_init ========
438  *  Legacy implementation.
439  */
440 Void MessageQ_Params_init(MessageQ_Params *params)
442     ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
445 /*
446  *  ======== MessageQ_Params_init__S ========
447  *  New implementation which is version aware.
448  */
449 Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
451     MessageQ_Params_Version2 *params2;
453     switch (version) {
455         case MessageQ_Params_VERSION_2:
456             params2 = (MessageQ_Params_Version2 *)params;
457             params2->__version = MessageQ_Params_VERSION_2;
458             params2->synchronizer = NULL;
459             params2->queueIndex = MessageQ_ANY;
460             break;
462         default:
463             assert(FALSE);
464             break;
465     }
468 /*
469  *  ======== MessageQ_create ========
470  */
471 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
473     Int                   status;
474     MessageQ_Object      *obj = NULL;
475     IMessageQTransport_Handle transport;
476     INetworkTransport_Handle netTrans;
477     ITransport_Handle     baseTrans;
478     UInt16                queueIndex;
479     UInt16                clusterId;
480     Int                   tid;
481     Int                   priority;
482     LAD_ClientHandle      handle;
483     struct LAD_CommandObj cmd;
484     union LAD_ResponseObj rsp;
485     MessageQ_Params ps;
487     MessageQ_Params_init__S(&ps, MessageQ_Params_VERSION);
489     /* copy the given params into the current params structure */
490     if (pp != NULL) {
492         /* snoop the params pointer to see if it's a legacy structure */
493         if ((pp->__version == 0) || (pp->__version > 100)) {
494             ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
495         }
497         /* not legacy structure, use params version field */
498         else if (pp->__version == MessageQ_Params_VERSION_2) {
499             ps.__version = ((MessageQ_Params_Version2 *)pp)->__version;
500             ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
501             ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
502         }
503         else {
504             assert(FALSE);
505         }
506     }
508     handle = LAD_findHandle();
509     if (handle == LAD_MAXNUMCLIENTS) {
510         PRINTVERBOSE1(
511           "MessageQ_create: can't find connection to daemon for pid %d\n",
512            getpid())
514         return NULL;
515     }
517     cmd.cmd = LAD_MESSAGEQ_CREATE;
518     cmd.clientId = handle;
520     if (name == NULL) {
521         cmd.args.messageQCreate.name[0] = '\0';
522     }
523     else {
524         strncpy(cmd.args.messageQCreate.name, name,
525                 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
526         cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
527     }
529     memcpy(&cmd.args.messageQCreate.params, &ps, sizeof(ps));
531     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
532         PRINTVERBOSE1(
533           "MessageQ_create: sending LAD command failed, status=%d\n", status)
534         return NULL;
535     }
537     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
538         PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
539         return NULL;
540     }
541     status = rsp.messageQCreate.status;
543     PRINTVERBOSE2(
544       "MessageQ_create: got LAD response for client %d, status=%d\n",
545       handle, status)
547     if (status == -1) {
548        PRINTVERBOSE1(
549           "MessageQ_create: MessageQ server operation failed, status=%d\n",
550           status)
551        return NULL;
552     }
554     /* Create the generic obj */
555     obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
557    /* Populate the params member */
558     memcpy(&obj->params, &ps, sizeof(ps));
561     obj->queue = rsp.messageQCreate.queueId;
562     obj->serverHandle = rsp.messageQCreate.serverHandle;
563     CIRCLEQ_INIT(&obj->msgList);
564     if (sem_init(&obj->synchronizer, 0, 0) < 0) {
565         PRINTVERBOSE1(
566           "MessageQ_create: failed to create synchronizer (errno %d)\n", errno)
568         MessageQ_delete((MessageQ_Handle *)&obj);
570         return NULL;
571     }
573     /* lad returns the queue port # (queueIndex + PORT_OFFSET) */
574     queueIndex = MessageQ_getQueueIndex(rsp.messageQCreate.queueId);
576     PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
577             "queueIndex %d\n", (name == NULL) ? "NULL" : name , queueIndex)
579     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
580         for (priority = 0; priority < 2; priority++) {
581             transport = MessageQ_module->transports[clusterId][priority];
582             if (transport) {
583                 /* need to check return and do something if error */
584                 IMessageQTransport_bind((Void *)transport, obj->queue);
585             }
586         }
587     }
589     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
590         baseTrans = MessageQ_module->transInst[tid];
592         if (baseTrans != NULL) {
593             switch (ITransport_itype(baseTrans)) {
594                 case INetworkTransport_TypeId:
595                     netTrans = INetworkTransport_downCast(baseTrans);
596                     INetworkTransport_bind((void *)netTrans, obj->queue);
597                     break;
599                 default:
600                     /* error */
601                     printf("MessageQ_create: Error: transport id %d is an "
602                             "unsupported transport type.\n", tid);
603                     break;
604             }
605         }
606     }
608     /* LAD's MessageQ module can grow, we need to grow as well */
609     if (queueIndex >= MessageQ_module->numQueues) {
610         _MessageQ_grow(queueIndex);
611     }
613     /*  No need to "allocate" slot since the queueIndex returned by
614      *  LAD is guaranteed to be unique.
615      */
616     MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
618     return (MessageQ_Handle)obj;
621 /*
622  *  ======== MessageQ_delete ========
623  */
624 Int MessageQ_delete(MessageQ_Handle *handlePtr)
626     MessageQ_Object *obj;
627     IMessageQTransport_Handle transport;
628     INetworkTransport_Handle  netTrans;
629     ITransport_Handle         baseTrans;
630     Int              status = MessageQ_S_SUCCESS;
631     UInt16           queueIndex;
632     UInt16                clusterId;
633     Int                   tid;
634     Int                   priority;
635     LAD_ClientHandle handle;
636     struct LAD_CommandObj cmd;
637     union LAD_ResponseObj rsp;
639     handle = LAD_findHandle();
640     if (handle == LAD_MAXNUMCLIENTS) {
641         PRINTVERBOSE1(
642           "MessageQ_delete: can't find connection to daemon for pid %d\n",
643            getpid())
645         return MessageQ_E_FAIL;
646     }
648     obj = (MessageQ_Object *)(*handlePtr);
650     cmd.cmd = LAD_MESSAGEQ_DELETE;
651     cmd.clientId = handle;
652     cmd.args.messageQDelete.serverHandle = obj->serverHandle;
654     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
655         PRINTVERBOSE1(
656           "MessageQ_delete: sending LAD command failed, status=%d\n", status)
657         return MessageQ_E_FAIL;
658     }
660     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
661         PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
662         return MessageQ_E_FAIL;
663     }
664     status = rsp.messageQDelete.status;
666     PRINTVERBOSE2(
667       "MessageQ_delete: got LAD response for client %d, status=%d\n",
668       handle, status)
670     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
671         for (priority = 0; priority < 2; priority++) {
672             transport = MessageQ_module->transports[clusterId][priority];
673             if (transport) {
674                 IMessageQTransport_unbind((Void *)transport, obj->queue);
675             }
676         }
677     }
679     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
680         baseTrans = MessageQ_module->transInst[tid];
682         if (baseTrans != NULL) {
683             switch (ITransport_itype(baseTrans)) {
684                 case INetworkTransport_TypeId:
685                     netTrans = INetworkTransport_downCast(baseTrans);
686                     INetworkTransport_unbind((void *)netTrans, obj->queue);
687                     break;
689                 default:
690                     /* error */
691                     printf("MessageQ_create: Error: transport id %d is an "
692                             "unsupported transport type.\n", tid);
693                     break;
694             }
695         }
696     }
698     /* extract the queue index from the queueId */
699     queueIndex = MessageQ_getQueueIndex(obj->queue);
700     MessageQ_module->queues[queueIndex] = NULL;
702     free(obj);
703     *handlePtr = NULL;
705     return status;
708 /*
709  *  ======== MessageQ_open ========
710  *  Acquire a queueId for use in sending messages to the queue
711  */
712 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
714     Int status = MessageQ_S_SUCCESS;
716     status = NameServer_getUInt32(MessageQ_module->nameServer,
717                                   name, queueId, NULL);
719     if (status == NameServer_E_NOTFOUND) {
720         /* Set return queue ID to invalid */
721         *queueId = MessageQ_INVALIDMESSAGEQ;
722         status = MessageQ_E_NOTFOUND;
723     }
724     else if (status >= 0) {
725         /* Override with a MessageQ status code */
726         status = MessageQ_S_SUCCESS;
727     }
728     else {
729         /* Set return queue ID to invalid */
730         *queueId = MessageQ_INVALIDMESSAGEQ;
732         /* Override with a MessageQ status code */
733         if (status == NameServer_E_TIMEOUT) {
734             status = MessageQ_E_TIMEOUT;
735         }
736         else {
737             status = MessageQ_E_FAIL;
738         }
739     }
741     return status;
744 /*
745  *  ======== MessageQ_openQueueId ========
746  */
747 MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 procId)
749     MessageQ_QueueIndex queuePort;
750     MessageQ_QueueId queueId;
752     /* queue port is embedded in the queueId */
753     queuePort = queueIndex + MessageQ_PORTOFFSET;
754     queueId = ((MessageQ_QueueId)(procId) << 16) | queuePort;
756     return (queueId);
759 /*
760  *  ======== MessageQ_close ========
761  *  Closes previously opened instance of MessageQ
762  */
763 Int MessageQ_close(MessageQ_QueueId *queueId)
765     Int32 status = MessageQ_S_SUCCESS;
767     /* Nothing more to be done for closing the MessageQ. */
768     *queueId = MessageQ_INVALIDMESSAGEQ;
770     return status;
773 /*
774  *  ======== MessageQ_put ========
775  *  Deliver the given message, either locally or to the transport
776  *
777  *  If the destination is a local queue, deliver the message. Otherwise,
778  *  pass the message to a transport for delivery. The transport handles
779  *  the sending of the message using the appropriate interface (socket,
780  *  device ioctl, etc.).
781  */
782 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
784     Int status = MessageQ_S_SUCCESS;
785     MessageQ_Object *obj;
786     UInt16 dstProcId;
787     UInt16 queueIndex;
788     UInt16 queuePort;
789     ITransport_Handle baseTrans;
790     IMessageQTransport_Handle msgTrans;
791     INetworkTransport_Handle netTrans;
792     Int priority;
793     UInt tid;
794     UInt16 clusterId;
795     Bool delivered;
797     /* extract destination address from the given queueId */
798     dstProcId  = (UInt16)(queueId >> 16);
799     queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
801     /* write the destination address into the message header */
802     msg->dstId = queuePort;
803     msg->dstProc= dstProcId;
805     /* invoke the hook function after addressing the message */
806     if (MessageQ_module->putHookFxn != NULL) {
807         MessageQ_module->putHookFxn(queueId, msg);
808     }
810     /*  For an outbound message: If message destination is on this
811      *  processor, then check if the destination queue is in this
812      *  process (thread-to-thread messaging).
813      *
814      *  For an inbound message: Check if destination queue is in this
815      *  process (process-to-process messaging).
816      */
817     if (dstProcId == MultiProc_self()) {
818         queueIndex = queuePort - MessageQ_PORTOFFSET;
820         if (queueIndex < MessageQ_module->numQueues) {
821             obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
823             if (obj != NULL) {
824                 /* deliver message to queue */
825                 pthread_mutex_lock(&MessageQ_module->gate);
826                 CIRCLEQ_INSERT_TAIL(&obj->msgList,
827                         (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
828                 pthread_mutex_unlock(&MessageQ_module->gate);
829                 sem_post(&obj->synchronizer);
830                 goto done;
831             }
832         }
833     }
835     /*  Getting here implies the message is outbound. Must give it to
836      *  either the primary or secondary transport for delivery. Start
837      *  by extracting the transport ID from the message header.
838      */
839     tid = MessageQ_getTransportId(msg);
841     if (tid >= MessageQ_MAXTRANSPORTS) {
842         printf("MessageQ_put: Error: transport id %d too big, must be < %d\n",
843                 tid, MessageQ_MAXTRANSPORTS);
844         status = MessageQ_E_FAIL;
845         goto done;
846     }
848     /* if transportId is set, use secondary transport for message delivery */
849     if (tid != 0) {
850         baseTrans = MessageQ_module->transInst[tid];
852         if (baseTrans == NULL) {
853             printf("MessageQ_put: Error: transport is null\n");
854             status = MessageQ_E_FAIL;
855             goto done;
856         }
858         /* downcast instance pointer to transport interface */
859         switch (ITransport_itype(baseTrans)) {
860             case INetworkTransport_TypeId:
861                 netTrans = INetworkTransport_downCast(baseTrans);
862                 delivered = INetworkTransport_put(netTrans, (Ptr)msg);
863                 status = (delivered ? MessageQ_S_SUCCESS : MessageQ_E_FAIL);
864                 break;
866             default:
867                 /* error */
868                 printf("MessageQ_put: Error: transport id %d is an "
869                         "unsupported transport type\n", tid);
870                 status = MessageQ_E_FAIL;
871                 break;
872         }
873     }
874     else {
875         /* use primary transport for delivery */
876         priority = MessageQ_getMsgPri(msg);
877         clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
879         /* primary transport can only be used for intra-cluster delivery */
880         if (clusterId > MultiProc_getNumProcsInCluster()) {
881             printf("MessageQ_put: Error: destination procId=%d is not "
882                     "in cluster. Must specify a transportId.\n", dstProcId);
883             status =  MessageQ_E_FAIL;
884             goto done;
885         }
887         msgTrans = MessageQ_module->transports[clusterId][priority];
888         delivered = IMessageQTransport_put(msgTrans, (Ptr)msg);
889         status = (delivered ? MessageQ_S_SUCCESS : MessageQ_E_FAIL);
890     }
892 done:
893     return (status);
896 /*
897  *  MessageQ_get - gets a message for a message queue and blocks if
898  *  the queue is empty.
899  *
900  *  If a message is present, it returns it.  Otherwise it blocks
901  *  waiting for a message to arrive.
902  *  When a message is returned, it is owned by the caller.
903  */
904 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
906     MessageQ_Object * obj = (MessageQ_Object *)handle;
907     Int     status = MessageQ_S_SUCCESS;
908     struct timespec ts;
909     struct timeval tv;
911 #if 0
912 /*
913  * Optimization here to get a message without going in to the sem
914  * operation, but the sem count will not be maintained properly.
915  */
916     pthread_mutex_lock(&MessageQ_module->gate);
918     if (obj->msgList.cqh_first != &obj->msgList) {
919         *msg = (MessageQ_Msg)obj->msglist.cqh_first;
920         CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
922         pthread_mutex_unlock(&MessageQ_module->gate);
923     }
924     else {
925         pthread_mutex_unlock(&MessageQ_module->gate);
926     }
927 #endif
929     if (timeout == MessageQ_FOREVER) {
930         sem_wait(&obj->synchronizer);
931     }
932     else {
933         gettimeofday(&tv, NULL);
934         ts.tv_sec = tv.tv_sec;
935         ts.tv_nsec = (tv.tv_usec + timeout) * 1000;
937         if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
938             if (errno == ETIMEDOUT) {
939                 PRINTVERBOSE0("MessageQ_get: operation timed out\n")
941                 return MessageQ_E_TIMEOUT;
942             }
943         }
944     }
946     if (obj->unblocked) {
947         return MessageQ_E_UNBLOCKED;
948     }
950     pthread_mutex_lock(&MessageQ_module->gate);
952     *msg = (MessageQ_Msg)obj->msgList.cqh_first;
953     CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
955     pthread_mutex_unlock(&MessageQ_module->gate);
957     return status;
960 /*
961  * Return a count of the number of messages in the queue
962  *
963  * TBD: Implement as a socket ioctl, using getsockopt().  Return -1 for now.
964  */
965 Int MessageQ_count(MessageQ_Handle handle)
967     Int               count = -1;
968 #if 0
969     MessageQ_Object * obj   = (MessageQ_Object *) handle;
970     socklen_t         optlen;
972     /*
973      * TBD: Need to find a way to implement (if anyone uses it!), and
974      * push down into transport..
975      */
977     /*
978      * 2nd arg to getsockopt should be transport independent, but using
979      *  SSKPROTO_SHMFIFO for now:
980      */
981     getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
982                  &count, &optlen);
983 #endif
985     return count;
988 /*
989  *  Initializes a message not obtained from MessageQ_alloc.
990  */
991 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
993     /* Fill in the fields of the message */
994     MessageQ_msgInit(msg);
995     msg->heapId = MessageQ_STATICMSG;
996     msg->msgSize = size;
999 /*
1000  *  Allocate a message and initialize the needed fields (note some
1001  *  of the fields in the header are set via other APIs or in the
1002  *  MessageQ_put function,
1003  */
1004 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
1006     MessageQ_Msg msg;
1008     /*
1009      * heapId not used for local alloc (as this is over a copy transport), but
1010      * we need to send to other side as heapId is used in BIOS transport.
1011      */
1012     msg = (MessageQ_Msg)calloc(1, size);
1013     MessageQ_msgInit(msg);
1014     msg->msgSize = size;
1015     msg->heapId = heapId;
1017     return msg;
1020 /*
1021  *  Frees the message back to the heap that was used to allocate it.
1022  */
1023 Int MessageQ_free(MessageQ_Msg msg)
1025     UInt32 status = MessageQ_S_SUCCESS;
1027     /* Check to ensure this was not allocated by user: */
1028     if (msg->heapId == MessageQ_STATICMSG) {
1029         status = MessageQ_E_CANNOTFREESTATICMSG;
1030     }
1031     else {
1032         free(msg);
1033     }
1035     return status;
1038 /* Register a heap with MessageQ. */
1039 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
1041     Int status = MessageQ_S_SUCCESS;
1043     /* Do nothing, as this uses a copy transport */
1045     return status;
1048 /* Unregister a heap with MessageQ. */
1049 Int MessageQ_unregisterHeap(UInt16 heapId)
1051     Int status = MessageQ_S_SUCCESS;
1053     /* Do nothing, as this uses a copy transport */
1055     return status;
1058 /* Unblocks a MessageQ */
1059 Void MessageQ_unblock(MessageQ_Handle handle)
1061     MessageQ_Object *obj = (MessageQ_Object *)handle;
1063     obj->unblocked = TRUE;
1064     sem_post(&obj->synchronizer);
1067 /* Embeds a source message queue into a message */
1068 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
1070     MessageQ_Object *obj = (MessageQ_Object *)handle;
1072     msg->replyId = (UInt16)(obj->queue);
1073     msg->replyProc = (UInt16)(obj->queue >> 16);
1076 /* Returns the QueueId associated with the handle. */
1077 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
1079     MessageQ_Object *obj = (MessageQ_Object *) handle;
1080     UInt32 queueId;
1082     queueId = (obj->queue);
1084     return queueId;
1087 /* Sets the tracing of a message */
1088 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
1090     msg->flags = (msg->flags & ~TRACEMASK) |   (traceFlag << TRACESHIFT);
1093 /*
1094  *  Returns the amount of shared memory used by one transport instance.
1095  *
1096  *  The MessageQ module itself does not use any shared memory but the
1097  *  underlying transport may use some shared memory.
1098  */
1099 SizeT MessageQ_sharedMemReq(Ptr sharedAddr)
1101     SizeT memReq = 0u;
1103     /* Do nothing, as this is a copy transport. */
1105     return memReq;
1108 /*
1109  * This is a helper function to initialize a message.
1110  */
1111 Void MessageQ_msgInit(MessageQ_Msg msg)
1113 #if 0
1114     Int                 status    = MessageQ_S_SUCCESS;
1115     LAD_ClientHandle handle;
1116     struct LAD_CommandObj cmd;
1117     union LAD_ResponseObj rsp;
1119     handle = LAD_findHandle();
1120     if (handle == LAD_MAXNUMCLIENTS) {
1121         PRINTVERBOSE1(
1122           "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
1123            getpid())
1125         return;
1126     }
1128     cmd.cmd = LAD_MESSAGEQ_MSGINIT;
1129     cmd.clientId = handle;
1131     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
1132         PRINTVERBOSE1(
1133           "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
1134         return;
1135     }
1137     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
1138         PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
1139         return;
1140     }
1141     status = rsp.msgInit.status;
1143     PRINTVERBOSE2(
1144       "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
1145       handle, status)
1147     memcpy(msg, &rsp.msgInit.msg, sizeof (*msg));
1148 #else
1149     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
1150     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
1151     msg->msgId     = MessageQ_INVALIDMSGID;
1152     msg->dstId     = (UInt16)MessageQ_INVALIDMESSAGEQ;
1153     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
1154     msg->srcProc   = MultiProc_self();
1156     pthread_mutex_lock(&MessageQ_module->gate);
1157     msg->seqNum  = MessageQ_module->seqNum++;
1158     pthread_mutex_unlock(&MessageQ_module->gate);
1159 #endif
1162 /*
1163  *  ======== _MessageQ_grow ========
1164  *  Increase module's queues array to accommodate queueIndex from LAD
1165  *
1166  *  Note: this function takes the queue index value (i.e. without the
1167  *  port offset).
1168  */
1169 Void _MessageQ_grow(UInt16 queueIndex)
1171     MessageQ_Handle *queues;
1172     MessageQ_Handle *oldQueues;
1173     UInt oldSize;
1175     oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
1177     queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof(MessageQ_Handle));
1178     memcpy(queues, MessageQ_module->queues, oldSize);
1180     oldQueues = MessageQ_module->queues;
1181     MessageQ_module->queues = queues;
1182     MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
1184     free(oldQueues);
1186     return;