]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - ipc/ipcdev.git/blob - linux/src/api/MessageQ.c
2fc61e32a10808d1d848d85e94eb52eb008f6928
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
1 /*
2  * Copyright (c) 2012-2014, Texas Instruments Incorporated
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  @file   MessageQ.c
34  *
35  *  @brief  MessageQ Linux implementation
36  *
37  *  This implementation is geared for use in a "client/server" model, whereby
38  *  system-wide data is maintained in a "server" component and process-
39  *  specific data is handled here.  At the moment, this implementation
40  *  connects and communicates with LAD for the server connection.
41  */
43 /* Standard IPC header */
44 #include <ti/ipc/Std.h>
46 /* Module level headers */
47 #include <ti/ipc/NameServer.h>
48 #include <ti/ipc/MultiProc.h>
49 #include <_MultiProc.h>
50 #include <ti/ipc/MessageQ.h>
51 #include <_MessageQ.h>
52 #include <ITransport.h>
53 #include <IMessageQTransport.h>
54 #include <INetworkTransport.h>
56 /* Socket Headers */
57 #include <sys/select.h>
58 #include <sys/time.h>
59 #include <sys/types.h>
60 #include <sys/param.h>
61 #include <sys/eventfd.h>
62 #include <sys/queue.h>
63 #include <errno.h>
64 #include <stdio.h>
65 #include <string.h>
66 #include <stdlib.h>
67 #include <unistd.h>
68 #include <assert.h>
69 #include <pthread.h>
70 #include <semaphore.h>
72 /* Socket Protocol Family */
73 #include <net/rpmsg.h>
75 #include <ladclient.h>
76 #include <_lad.h>
78 /* =============================================================================
79  * Macros/Constants
80  * =============================================================================
81  */
83 /*!
84  *  @brief  Name of the reserved NameServer used for MessageQ.
85  */
86 #define MessageQ_NAMESERVER  "MessageQ"
88 #define MessageQ_MAXTRANSPORTS 8
90 #define MessageQ_GROWSIZE 32
92 /* Trace flag settings: */
93 #define TRACESHIFT    12
94 #define TRACEMASK     0x1000
96 /* Define BENCHMARK to quiet key MessageQ APIs: */
97 //#define BENCHMARK
99 /* =============================================================================
100  * Structures & Enums
101  * =============================================================================
102  */
104 /* structure for MessageQ module state */
105 typedef struct MessageQ_ModuleObject {
106     MessageQ_Handle           *queues;
107     Int                       numQueues;
108     Int                       refCount;
109     NameServer_Handle         nameServer;
110     pthread_mutex_t           gate;
111     MessageQ_Params           defaultInstParams;
112     int                       seqNum;
113     IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
114     INetworkTransport_Handle  transInst[MessageQ_MAXTRANSPORTS];
115     MessageQ_PutHookFxn       putHookFxn;
116 } MessageQ_ModuleObject;
118 typedef struct MessageQ_CIRCLEQ_ENTRY {
119      CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem;
120 } MessageQ_CIRCLEQ_ENTRY;
122 /*!
123  *  @brief  Structure for the Handle for the MessageQ.
124  */
125 typedef struct MessageQ_Object_tag {
126     CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
127     MessageQ_Params              params;
128     MessageQ_QueueId             queue;
129     int                          unblocked;
130     void                         *serverHandle;
131     sem_t                        synchronizer;
132 } MessageQ_Object;
134 /* traces in this file are controlled via _MessageQ_verbose */
135 Bool _MessageQ_verbose = FALSE;
136 #define verbose _MessageQ_verbose
138 /* =============================================================================
139  *  Globals
140  * =============================================================================
141  */
142 static MessageQ_ModuleObject MessageQ_state =
144     .refCount   = 0,
145     .nameServer = NULL,
146     .putHookFxn = NULL
147 };
149 /*!
150  *  @var    MessageQ_module
151  *
152  *  @brief  Pointer to the MessageQ module state.
153  */
154 MessageQ_ModuleObject *MessageQ_module = &MessageQ_state;
156 Void _MessageQ_grow(UInt16 queueIndex);
158 /* =============================================================================
159  * APIS
160  * =============================================================================
161  */
163 Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
164                                 UInt16 rprocId, UInt priority)
166     Int status = FALSE;
168     if (handle == NULL) {
169         printf("MessageQ_registerTransport: invalid handle, must be non-NULL\n"
170               );
172         return status;
173     }
175     if (rprocId >= MultiProc_MAXPROCESSORS) {
176         printf("MessageQ_registerTransport: invalid procId %d\n", rprocId);
178         return status;
179     }
181     if (MessageQ_module->transports[rprocId][priority] == NULL) {
182         MessageQ_module->transports[rprocId][priority] = handle;
184         status = TRUE;
185     }
187     return status;
190 Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
192     if (inst == NULL) {
193         printf("MessageQ_registerTransportId: invalid NULL handle\n");
195         return MessageQ_E_INVALIDARG;
196     }
198     if (tid >= MessageQ_MAXTRANSPORTS) {
199         printf("MessageQ_unregisterNetTransport: invalid transport id %d, must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
201         return MessageQ_E_INVALIDARG;
202     }
204     if (MessageQ_module->transInst[tid] != NULL) {
205         printf("MessageQ_registerTransportId: transport id %d already registered\n", tid);
207         return MessageQ_E_ALREADYEXISTS;
208     }
210     MessageQ_module->transInst[tid] = (INetworkTransport_Handle)inst;
212     return MessageQ_S_SUCCESS;
215 Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
217     if (rprocId >= MultiProc_MAXPROCESSORS) {
218         printf("MessageQ_registerTransport: invalid rprocId %d\n", rprocId);
220         return;
221     }
223     MessageQ_module->transports[rprocId][priority] = NULL;
226 Void MessageQ_unregisterTransportId(UInt tid)
228     if (tid >= MessageQ_MAXTRANSPORTS) {
229         printf("MessageQ_unregisterTransportId: invalid transport id %d, must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
231         return;
232     }
234     MessageQ_module->transInst[tid] = NULL;
237 /*
238  * Function to get default configuration for the MessageQ module.
239  */
240 Void MessageQ_getConfig(MessageQ_Config *cfg)
242     Int status;
243     LAD_ClientHandle handle;
244     struct LAD_CommandObj cmd;
245     union LAD_ResponseObj rsp;
247     assert (cfg != NULL);
249     handle = LAD_findHandle();
250     if (handle == LAD_MAXNUMCLIENTS) {
251         PRINTVERBOSE1(
252           "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
253            getpid())
255         return;
256     }
258     cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
259     cmd.clientId = handle;
261     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
262         PRINTVERBOSE1(
263           "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
264         return;
265     }
267     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
268         PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n",
269                       status)
270         return;
271     }
272     status = rsp.messageQGetConfig.status;
274     PRINTVERBOSE2(
275       "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
276       handle, status)
278     memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg));
280     return;
283 /*
284  *  Function to setup the MessageQ module.
285  */
286 Int MessageQ_setup(const MessageQ_Config *cfg)
288     Int status;
289     LAD_ClientHandle handle;
290     struct LAD_CommandObj cmd;
291     union LAD_ResponseObj rsp;
292     Int pri;
293     Int rprocId;
294     Int tid;
296     pthread_mutex_lock(&MessageQ_module->gate);
298     MessageQ_module->refCount++;
299     if (MessageQ_module->refCount > 1) {
301         pthread_mutex_unlock(&MessageQ_module->gate);
303         PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
304                       MessageQ_module->refCount)
306         return MessageQ_S_ALREADYSETUP;
307     }
309     pthread_mutex_unlock(&MessageQ_module->gate);
311     handle = LAD_findHandle();
312     if (handle == LAD_MAXNUMCLIENTS) {
313         PRINTVERBOSE1(
314           "MessageQ_setup: can't find connection to daemon for pid %d\n",
315            getpid())
317         return MessageQ_E_RESOURCE;
318     }
320     cmd.cmd = LAD_MESSAGEQ_SETUP;
321     cmd.clientId = handle;
322     memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof (*cfg));
324     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
325         PRINTVERBOSE1(
326           "MessageQ_setup: sending LAD command failed, status=%d\n", status)
327         return MessageQ_E_FAIL;
328     }
330     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
331         PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
332         return status;
333     }
334     status = rsp.setup.status;
336     PRINTVERBOSE2(
337       "MessageQ_setup: got LAD response for client %d, status=%d\n",
338       handle, status)
340     MessageQ_module->seqNum = 0;
341     MessageQ_module->nameServer = rsp.setup.nameServerHandle;
342     MessageQ_module->numQueues = cfg->maxRuntimeEntries;
343     MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
344                                      sizeof (MessageQ_Handle));
346     pthread_mutex_init(&MessageQ_module->gate, NULL);
348     for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
349         for (pri = 0; pri < 2; pri++) {
350             MessageQ_module->transports[rprocId][pri] = NULL;
351         }
352     }
353     for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
354         MessageQ_module->transInst[tid] = NULL;
355     }
357     return status;
360 /*
361  *  MessageQ_destroy - destroy the MessageQ module.
362  */
363 Int MessageQ_destroy(void)
365     Int status;
366     LAD_ClientHandle handle;
367     struct LAD_CommandObj cmd;
368     union LAD_ResponseObj rsp;
370     handle = LAD_findHandle();
371     if (handle == LAD_MAXNUMCLIENTS) {
372         PRINTVERBOSE1(
373           "MessageQ_destroy: can't find connection to daemon for pid %d\n",
374            getpid())
376         return MessageQ_E_RESOURCE;
377     }
379     cmd.cmd = LAD_MESSAGEQ_DESTROY;
380     cmd.clientId = handle;
382     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
383         PRINTVERBOSE1(
384           "MessageQ_destroy: sending LAD command failed, status=%d\n", status)
385         return MessageQ_E_FAIL;
386     }
388     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
389         PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
390         return status;
391     }
392     status = rsp.status;
394     PRINTVERBOSE2(
395       "MessageQ_destroy: got LAD response for client %d, status=%d\n",
396       handle, status)
398     return status;
402 /* Function to initialize the parameters for the MessageQ instance. */
403 Void MessageQ_Params_init(MessageQ_Params *params)
405     memcpy (params, &(MessageQ_module->defaultInstParams),
406             sizeof (MessageQ_Params));
408     return;
411 /*
412  *  MessageQ_create - create a MessageQ object for receiving.
413  */
414 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *params)
416     Int                   status;
417     MessageQ_Object      *obj = NULL;
418     IMessageQTransport_Handle transport;
419     INetworkTransport_Handle transInst;
420     UInt16                queueIndex;
421     UInt16                rprocId;
422     Int                   tid;
423     Int                   priority;
424     LAD_ClientHandle      handle;
425     struct LAD_CommandObj cmd;
426     union LAD_ResponseObj rsp;
428     handle = LAD_findHandle();
429     if (handle == LAD_MAXNUMCLIENTS) {
430         PRINTVERBOSE1(
431           "MessageQ_create: can't find connection to daemon for pid %d\n",
432            getpid())
434         return NULL;
435     }
437     cmd.cmd = LAD_MESSAGEQ_CREATE;
438     cmd.clientId = handle;
439     if (name == NULL) {
440         cmd.args.messageQCreate.name[0] = '\0';
441     }
442     else {
443         strncpy(cmd.args.messageQCreate.name, name,
444                 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
445         cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
446     }
448     if (params) {
449         memcpy(&cmd.args.messageQCreate.params, params, sizeof (*params));
450     }
452     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
453         PRINTVERBOSE1(
454           "MessageQ_create: sending LAD command failed, status=%d\n", status)
455         return NULL;
456     }
458     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
459         PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
460         return NULL;
461     }
462     status = rsp.messageQCreate.status;
464     PRINTVERBOSE2(
465       "MessageQ_create: got LAD response for client %d, status=%d\n",
466       handle, status)
468     if (status == -1) {
469        PRINTVERBOSE1(
470           "MessageQ_create: MessageQ server operation failed, status=%d\n",
471           status)
472        return NULL;
473     }
475     /* Create the generic obj */
476     obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
478     if (params != NULL) {
479        /* Populate the params member */
480         memcpy((Ptr) &obj->params, (Ptr)params, sizeof (MessageQ_Params));
481     }
483     queueIndex = (MessageQ_QueueIndex)(rsp.messageQCreate.queueId & 0x0000ffff);
485     obj->queue = rsp.messageQCreate.queueId;
486     obj->serverHandle = rsp.messageQCreate.serverHandle;
487     CIRCLEQ_INIT(&obj->msgList);
488     if (sem_init(&obj->synchronizer, 0, 0) < 0) {
489         PRINTVERBOSE1(
490           "MessageQ_create: failed to create synchronizer (errno %d)\n", errno)
492         MessageQ_delete((MessageQ_Handle *)&obj);
494         return NULL;
495     }
497     PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' queueIndex %d\n", name, queueIndex)
499     for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
500         for (priority = 0; priority < 2; priority++) {
501             transport = MessageQ_module->transports[rprocId][priority];
502             if (transport) {
503                 /* need to check return and do something if error */
504                 IMessageQTransport_bind((Void *)transport, obj->queue);
505             }
506         }
507     }
508     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
509         transInst = MessageQ_module->transInst[tid];
510         if (transInst) {
511             /* need to check return and do something if error */
512             INetworkTransport_bind((Void *)transInst, obj->queue);
513         }
514     }
516     /*
517      * Since LAD's MessageQ_module can grow, we need to be able to grow as well
518      */
519     if (queueIndex >= MessageQ_module->numQueues) {
520         _MessageQ_grow(queueIndex);
521     }
523     /*
524      * No need to "allocate" slot since the queueIndex returned by
525      * LAD is guaranteed to be unique.
526      */
527     MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
529     return (MessageQ_Handle)obj;
532 /*
533  * MessageQ_delete - delete a MessageQ object.
534  */
535 Int MessageQ_delete(MessageQ_Handle *handlePtr)
537     MessageQ_Object *obj;
538     IMessageQTransport_Handle transport;
539     INetworkTransport_Handle transInst;
540     Int              status = MessageQ_S_SUCCESS;
541     UInt16           queueIndex;
542     UInt16                rprocId;
543     Int                   tid;
544     Int                   priority;
545     LAD_ClientHandle handle;
546     struct LAD_CommandObj cmd;
547     union LAD_ResponseObj rsp;
549     handle = LAD_findHandle();
550     if (handle == LAD_MAXNUMCLIENTS) {
551         PRINTVERBOSE1(
552           "MessageQ_delete: can't find connection to daemon for pid %d\n",
553            getpid())
555         return MessageQ_E_FAIL;
556     }
558     obj = (MessageQ_Object *)(*handlePtr);
560     cmd.cmd = LAD_MESSAGEQ_DELETE;
561     cmd.clientId = handle;
562     cmd.args.messageQDelete.serverHandle = obj->serverHandle;
564     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
565         PRINTVERBOSE1(
566           "MessageQ_delete: sending LAD command failed, status=%d\n", status)
567         return MessageQ_E_FAIL;
568     }
570     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
571         PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
572         return MessageQ_E_FAIL;
573     }
574     status = rsp.messageQDelete.status;
576     PRINTVERBOSE2(
577       "MessageQ_delete: got LAD response for client %d, status=%d\n",
578       handle, status)
580     for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
581         for (priority = 0; priority < 2; priority++) {
582             transport = MessageQ_module->transports[rprocId][priority];
583             if (transport) {
584                 IMessageQTransport_unbind((Void *)transport, obj->queue);
585             }
586         }
587     }
588     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
589         transInst = MessageQ_module->transInst[tid];
590         if (transInst) {
591             INetworkTransport_unbind((Void *)transInst, obj->queue);
592         }
593     }
595     queueIndex = (MessageQ_QueueIndex)(obj->queue & 0x0000ffff);
596     MessageQ_module->queues[queueIndex] = NULL;
598     free(obj);
599     *handlePtr = NULL;
601     return status;
604 /*
605  *  MessageQ_open - Opens an instance of MessageQ for sending.
606  */
607 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
609     Int status = MessageQ_S_SUCCESS;
611     status = NameServer_getUInt32(MessageQ_module->nameServer,
612                                   name, queueId, NULL);
614     if (status == NameServer_E_NOTFOUND) {
615         /* Set return queue ID to invalid */
616         *queueId = MessageQ_INVALIDMESSAGEQ;
617         status = MessageQ_E_NOTFOUND;
618     }
619     else if (status >= 0) {
620         /* Override with a MessageQ status code */
621         status = MessageQ_S_SUCCESS;
622     }
623     else {
624         /* Set return queue ID to invalid */
625         *queueId = MessageQ_INVALIDMESSAGEQ;
627         /* Override with a MessageQ status code */
628         if (status == NameServer_E_TIMEOUT) {
629             status = MessageQ_E_TIMEOUT;
630         }
631         else {
632             status = MessageQ_E_FAIL;
633         }
634     }
636     return status;
639 /*
640  *  MessageQ_close - Closes previously opened instance of MessageQ.
641  */
642 Int MessageQ_close(MessageQ_QueueId *queueId)
644     Int32 status = MessageQ_S_SUCCESS;
646     /* Nothing more to be done for closing the MessageQ. */
647     *queueId = MessageQ_INVALIDMESSAGEQ;
649     return status;
652 /*
653  * MessageQ_put - place a message onto a message queue.
654  *
655  * Calls transport's put(), which handles the sending of the message using the
656  * appropriate kernel interface (socket, device ioctl) call for the remote
657  * procId encoded in the queueId argument.
658  *
659  */
660 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
662     MessageQ_Object *obj;
663     UInt16   dstProcId  = (UInt16)(queueId >> 16);
664     UInt16   queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
665     Int      status = MessageQ_S_SUCCESS;
666     ITransport_Handle transport;
667     IMessageQTransport_Handle msgTrans;
668     INetworkTransport_Handle netTrans;
669     Int priority;
670     UInt tid;
672     msg->dstId     = queueIndex;
673     msg->dstProc   = dstProcId;
675     /* invoke put hook function after addressing the message */
676     if (MessageQ_module->putHookFxn != NULL) {
677         MessageQ_module->putHookFxn(queueId, msg);
678     }
680     if (dstProcId != MultiProc_self()) {
681         tid = MessageQ_getTransportId(msg);
682         if (tid == 0) {
683             priority = MessageQ_getMsgPri(msg);
684             msgTrans = MessageQ_module->transports[dstProcId][priority];
686             IMessageQTransport_put(msgTrans, (Ptr)msg);
687         }
688         else {
689             if (tid >= MessageQ_MAXTRANSPORTS) {
690                 printf("MessageQ_put: transport id %d too big, must be < %d\n",
691                        tid, MessageQ_MAXTRANSPORTS);
693                 return MessageQ_E_FAIL;
694             }
696             /* use secondary transport */
697             netTrans = MessageQ_module->transInst[tid];
698             transport = INetworkTransport_upCast(netTrans);
700             /* downcast instance pointer to transport interface */
701             switch (ITransport_itype(transport)) {
702                 case INetworkTransport_TypeId:
703                     INetworkTransport_put(netTrans, (Ptr)msg);
705                     break;
707                 default:
708                     /* error */
709                     printf("MessageQ_put: transport id %d is an unsupported transport type\n", tid);
711                     status = MessageQ_E_FAIL;
713                     break;
714             }
715         }
716     }
717     else {
718         obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
720         pthread_mutex_lock(&MessageQ_module->gate);
722         /* It is a local MessageQ */
723         CIRCLEQ_INSERT_TAIL(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
725         pthread_mutex_unlock(&MessageQ_module->gate);
727         sem_post(&obj->synchronizer);
728     }
730     return status;
733 /*
734  *  MessageQ_get - gets a message for a message queue and blocks if
735  *  the queue is empty.
736  *
737  *  If a message is present, it returns it.  Otherwise it blocks
738  *  waiting for a message to arrive.
739  *  When a message is returned, it is owned by the caller.
740  */
741 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
743     MessageQ_Object * obj = (MessageQ_Object *)handle;
744     Int     status = MessageQ_S_SUCCESS;
745     struct timespec ts;
746     struct timeval tv;
748 #if 0
749 /*
750  * Optimization here to get a message without going in to the sem
751  * operation, but the sem count will not be maintained properly.
752  */
753     pthread_mutex_lock(&MessageQ_module->gate);
755     if (obj->msgList.cqh_first != &obj->msgList) {
756         *msg = (MessageQ_Msg)obj->msglist.cqh_first;
757         CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
759         pthread_mutex_unlock(&MessageQ_module->gate);
760     }
761     else {
762         pthread_mutex_unlock(&MessageQ_module->gate);
763     }
764 #endif
766     if (timeout == MessageQ_FOREVER) {
767         sem_wait(&obj->synchronizer);
768     }
769     else {
770         gettimeofday(&tv, NULL);
771         ts.tv_sec = tv.tv_sec;
772         ts.tv_nsec = (tv.tv_usec + timeout) * 1000;
774         if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
775             if (errno == ETIMEDOUT) {
776                 PRINTVERBOSE0("MessageQ_get: operation timed out\n")
778                 return MessageQ_E_TIMEOUT;
779             }
780         }
781     }
783     if (obj->unblocked) {
784         return MessageQ_E_UNBLOCKED;
785     }
787     pthread_mutex_lock(&MessageQ_module->gate);
789     *msg = (MessageQ_Msg)obj->msgList.cqh_first;
790     CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
792     pthread_mutex_unlock(&MessageQ_module->gate);
794     return status;
797 /*
798  * Return a count of the number of messages in the queue
799  *
800  * TBD: Implement as a socket ioctl, using getsockopt().  Return -1 for now.
801  */
802 Int MessageQ_count(MessageQ_Handle handle)
804     Int               count = -1;
805 #if 0
806     MessageQ_Object * obj   = (MessageQ_Object *) handle;
807     socklen_t         optlen;
809     /*
810      * TBD: Need to find a way to implement (if anyone uses it!), and
811      * push down into transport..
812      */
814     /*
815      * 2nd arg to getsockopt should be transport independent, but using
816      *  SSKPROTO_SHMFIFO for now:
817      */
818     getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
819                  &count, &optlen);
820 #endif
822     return count;
825 /*
826  *  Initializes a message not obtained from MessageQ_alloc.
827  */
828 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
830     /* Fill in the fields of the message */
831     MessageQ_msgInit(msg);
832     msg->heapId = MessageQ_STATICMSG;
833     msg->msgSize = size;
836 /*
837  *  Allocate a message and initialize the needed fields (note some
838  *  of the fields in the header are set via other APIs or in the
839  *  MessageQ_put function,
840  */
841 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
843     MessageQ_Msg msg;
845     /*
846      * heapId not used for local alloc (as this is over a copy transport), but
847      * we need to send to other side as heapId is used in BIOS transport.
848      */
849     msg = (MessageQ_Msg)calloc(1, size);
850     MessageQ_msgInit(msg);
851     msg->msgSize = size;
852     msg->heapId = heapId;
854     return msg;
857 /*
858  *  Frees the message back to the heap that was used to allocate it.
859  */
860 Int MessageQ_free(MessageQ_Msg msg)
862     UInt32 status = MessageQ_S_SUCCESS;
864     /* Check to ensure this was not allocated by user: */
865     if (msg->heapId == MessageQ_STATICMSG) {
866         status = MessageQ_E_CANNOTFREESTATICMSG;
867     }
868     else {
869         free(msg);
870     }
872     return status;
875 /* Register a heap with MessageQ. */
876 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
878     Int status = MessageQ_S_SUCCESS;
880     /* Do nothing, as this uses a copy transport */
882     return status;
885 /* Unregister a heap with MessageQ. */
886 Int MessageQ_unregisterHeap(UInt16 heapId)
888     Int status = MessageQ_S_SUCCESS;
890     /* Do nothing, as this uses a copy transport */
892     return status;
895 /* Unblocks a MessageQ */
896 Void MessageQ_unblock(MessageQ_Handle handle)
898     MessageQ_Object *obj = (MessageQ_Object *)handle;
900     obj->unblocked = TRUE;
901     sem_post(&obj->synchronizer);
904 /* Embeds a source message queue into a message */
905 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
907     MessageQ_Object *obj = (MessageQ_Object *)handle;
909     msg->replyId = (UInt16)(obj->queue);
910     msg->replyProc = (UInt16)(obj->queue >> 16);
913 /* Returns the QueueId associated with the handle. */
914 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
916     MessageQ_Object *obj = (MessageQ_Object *) handle;
917     UInt32 queueId;
919     queueId = (obj->queue);
921     return queueId;
924 /* Sets the tracing of a message */
925 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
927     msg->flags = (msg->flags & ~TRACEMASK) |   (traceFlag << TRACESHIFT);
930 /*
931  *  Returns the amount of shared memory used by one transport instance.
932  *
933  *  The MessageQ module itself does not use any shared memory but the
934  *  underlying transport may use some shared memory.
935  */
936 SizeT MessageQ_sharedMemReq(Ptr sharedAddr)
938     SizeT memReq = 0u;
940     /* Do nothing, as this is a copy transport. */
942     return memReq;
945 /*
946  * This is a helper function to initialize a message.
947  */
948 Void MessageQ_msgInit(MessageQ_Msg msg)
950 #if 0
951     Int                 status    = MessageQ_S_SUCCESS;
952     LAD_ClientHandle handle;
953     struct LAD_CommandObj cmd;
954     union LAD_ResponseObj rsp;
956     handle = LAD_findHandle();
957     if (handle == LAD_MAXNUMCLIENTS) {
958         PRINTVERBOSE1(
959           "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
960            getpid())
962         return;
963     }
965     cmd.cmd = LAD_MESSAGEQ_MSGINIT;
966     cmd.clientId = handle;
968     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
969         PRINTVERBOSE1(
970           "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
971         return;
972     }
974     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
975         PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
976         return;
977     }
978     status = rsp.msgInit.status;
980     PRINTVERBOSE2(
981       "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
982       handle, status)
984     memcpy(msg, &rsp.msgInit.msg, sizeof (*msg));
985 #else
986     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
987     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
988     msg->msgId     = MessageQ_INVALIDMSGID;
989     msg->dstId     = (UInt16)MessageQ_INVALIDMESSAGEQ;
990     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
991     msg->srcProc   = MultiProc_self();
993     pthread_mutex_lock(&MessageQ_module->gate);
994     msg->seqNum  = MessageQ_module->seqNum++;
995     pthread_mutex_unlock(&MessageQ_module->gate);
996 #endif
999 /*
1000  * Grow module's queues[] array to accommodate queueIndex from LAD
1001  */
1002 Void _MessageQ_grow(UInt16 queueIndex)
1004     MessageQ_Handle *queues;
1005     MessageQ_Handle *oldQueues;
1006     UInt oldSize;
1008     oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
1010     queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof (MessageQ_Handle));
1011     memcpy(queues, MessageQ_module->queues, oldSize);
1013     oldQueues = MessageQ_module->queues;
1014     MessageQ_module->queues = queues;
1015     MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
1017     free(oldQueues);
1019     return;