7a20a7821aa629b0af73ccc8283ad38e6a3744a7
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
1 /*
2  * Copyright (c) 2012-2014, Texas Instruments Incorporated
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  @file   MessageQ.c
34  *
35  *  @brief  MessageQ Linux implementation
36  *
37  *  This implementation is geared for use in a "client/server" model, whereby
38  *  system-wide data is maintained in a "server" component and process-
39  *  specific data is handled here.  At the moment, this implementation
40  *  connects and communicates with LAD for the server connection.
41  */
43 /* Standard IPC header */
44 #include <ti/ipc/Std.h>
46 /* Module level headers */
47 #include <ti/ipc/NameServer.h>
48 #include <ti/ipc/MultiProc.h>
49 #include <_MultiProc.h>
50 #include <ti/ipc/MessageQ.h>
51 #include <_MessageQ.h>
52 #include <ITransport.h>
53 #include <IMessageQTransport.h>
54 #include <INetworkTransport.h>
56 /* Socket Headers */
57 #include <sys/select.h>
58 #include <sys/time.h>
59 #include <sys/types.h>
60 #include <sys/param.h>
61 #include <sys/eventfd.h>
62 #include <sys/queue.h>
63 #include <errno.h>
64 #include <stdio.h>
65 #include <string.h>
66 #include <stdlib.h>
67 #include <unistd.h>
68 #include <assert.h>
69 #include <pthread.h>
70 #include <semaphore.h>
72 /* Socket Protocol Family */
73 #include <net/rpmsg.h>
75 #include <ladclient.h>
76 #include <_lad.h>
78 /* =============================================================================
79  * Macros/Constants
80  * =============================================================================
81  */
83 /*!
84  *  @brief  Name of the reserved NameServer used for MessageQ.
85  */
86 #define MessageQ_NAMESERVER  "MessageQ"
88 #define MessageQ_MAXTRANSPORTS 8
90 #define MessageQ_GROWSIZE 32
92 /* Trace flag settings: */
93 #define TRACESHIFT    12
94 #define TRACEMASK     0x1000
96 /* Define BENCHMARK to quiet key MessageQ APIs: */
97 //#define BENCHMARK
99 /* =============================================================================
100  * Structures & Enums
101  * =============================================================================
102  */
104 /* structure for MessageQ module state */
105 typedef struct MessageQ_ModuleObject {
106     MessageQ_Handle           *queues;
107     Int                       numQueues;
108     Int                       refCount;
109     NameServer_Handle         nameServer;
110     pthread_mutex_t           gate;
111     MessageQ_Params           defaultInstParams;
112     int                       seqNum;
113     IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
114     INetworkTransport_Handle  transInst[MessageQ_MAXTRANSPORTS];
115 } MessageQ_ModuleObject;
117 typedef struct MessageQ_CIRCLEQ_ENTRY {
118      CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem;
119 } MessageQ_CIRCLEQ_ENTRY;
121 /*!
122  *  @brief  Structure for the Handle for the MessageQ.
123  */
124 typedef struct MessageQ_Object_tag {
125     CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
126     MessageQ_Params              params;
127     MessageQ_QueueId             queue;
128     int                          unblocked;
129     void                         *serverHandle;
130     sem_t                        synchronizer;
131 } MessageQ_Object;
133 /* traces in this file are controlled via _MessageQ_verbose */
134 Bool _MessageQ_verbose = FALSE;
135 #define verbose _MessageQ_verbose
137 /* =============================================================================
138  *  Globals
139  * =============================================================================
140  */
141 static MessageQ_ModuleObject MessageQ_state =
143     .refCount   = 0,
144     .nameServer = NULL,
145 };
147 /*!
148  *  @var    MessageQ_module
149  *
150  *  @brief  Pointer to the MessageQ module state.
151  */
152 MessageQ_ModuleObject *MessageQ_module = &MessageQ_state;
154 Void _MessageQ_grow(UInt16 queueIndex);
156 /* =============================================================================
157  * APIS
158  * =============================================================================
159  */
161 Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
162                                 UInt16 rprocId, UInt priority)
164     Int status = FALSE;
166     if (handle == NULL) {
167         printf("MessageQ_registerTransport: invalid handle, must be non-NULL\n"
168               );
170         return status;
171     }
173     if (rprocId >= MultiProc_MAXPROCESSORS) {
174         printf("MessageQ_registerTransport: invalid procId %d\n", rprocId);
176         return status;
177     }
179     if (MessageQ_module->transports[rprocId][priority] == NULL) {
180         MessageQ_module->transports[rprocId][priority] = handle;
182         status = TRUE;
183     }
185     return status;
188 Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
190     if (inst == NULL) {
191         printf("MessageQ_registerTransportId: invalid NULL handle\n");
193         return MessageQ_E_INVALIDARG;
194     }
196     if (tid >= MessageQ_MAXTRANSPORTS) {
197         printf("MessageQ_unregisterNetTransport: invalid transport id %d, must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
199         return MessageQ_E_INVALIDARG;
200     }
202     if (MessageQ_module->transInst[tid] != NULL) {
203         printf("MessageQ_registerTransportId: transport id %d already registered\n", tid);
205         return MessageQ_E_ALREADYEXISTS;
206     }
208     MessageQ_module->transInst[tid] = (INetworkTransport_Handle)inst;
210     return MessageQ_S_SUCCESS;
213 Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
215     if (rprocId >= MultiProc_MAXPROCESSORS) {
216         printf("MessageQ_registerTransport: invalid rprocId %d\n", rprocId);
218         return;
219     }
221     MessageQ_module->transports[rprocId][priority] = NULL;
224 Void MessageQ_unregisterTransportId(UInt tid)
226     if (tid >= MessageQ_MAXTRANSPORTS) {
227         printf("MessageQ_unregisterTransportId: invalid transport id %d, must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
229         return;
230     }
232     MessageQ_module->transInst[tid] = NULL;
235 /*
236  * Function to get default configuration for the MessageQ module.
237  */
238 Void MessageQ_getConfig(MessageQ_Config *cfg)
240     Int status;
241     LAD_ClientHandle handle;
242     struct LAD_CommandObj cmd;
243     union LAD_ResponseObj rsp;
245     assert (cfg != NULL);
247     handle = LAD_findHandle();
248     if (handle == LAD_MAXNUMCLIENTS) {
249         PRINTVERBOSE1(
250           "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
251            getpid())
253         return;
254     }
256     cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
257     cmd.clientId = handle;
259     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
260         PRINTVERBOSE1(
261           "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
262         return;
263     }
265     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
266         PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n",
267                       status)
268         return;
269     }
270     status = rsp.messageQGetConfig.status;
272     PRINTVERBOSE2(
273       "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
274       handle, status)
276     memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg));
278     return;
281 /*
282  *  Function to setup the MessageQ module.
283  */
284 Int MessageQ_setup(const MessageQ_Config *cfg)
286     Int status;
287     LAD_ClientHandle handle;
288     struct LAD_CommandObj cmd;
289     union LAD_ResponseObj rsp;
290     Int pri;
291     Int rprocId;
292     Int tid;
294     pthread_mutex_lock(&MessageQ_module->gate);
296     MessageQ_module->refCount++;
297     if (MessageQ_module->refCount > 1) {
299         pthread_mutex_unlock(&MessageQ_module->gate);
301         PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
302                       MessageQ_module->refCount)
304         return MessageQ_S_ALREADYSETUP;
305     }
307     pthread_mutex_unlock(&MessageQ_module->gate);
309     handle = LAD_findHandle();
310     if (handle == LAD_MAXNUMCLIENTS) {
311         PRINTVERBOSE1(
312           "MessageQ_setup: can't find connection to daemon for pid %d\n",
313            getpid())
315         return MessageQ_E_RESOURCE;
316     }
318     cmd.cmd = LAD_MESSAGEQ_SETUP;
319     cmd.clientId = handle;
320     memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof (*cfg));
322     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
323         PRINTVERBOSE1(
324           "MessageQ_setup: sending LAD command failed, status=%d\n", status)
325         return MessageQ_E_FAIL;
326     }
328     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
329         PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
330         return status;
331     }
332     status = rsp.setup.status;
334     PRINTVERBOSE2(
335       "MessageQ_setup: got LAD response for client %d, status=%d\n",
336       handle, status)
338     MessageQ_module->seqNum = 0;
339     MessageQ_module->nameServer = rsp.setup.nameServerHandle;
340     MessageQ_module->numQueues = cfg->maxRuntimeEntries;
341     MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
342                                      sizeof (MessageQ_Handle));
344     pthread_mutex_init(&MessageQ_module->gate, NULL);
346     for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
347         for (pri = 0; pri < 2; pri++) {
348             MessageQ_module->transports[rprocId][pri] = NULL;
349         }
350     }
351     for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
352         MessageQ_module->transInst[tid] = NULL;
353     }
355     return status;
358 /*
359  *  MessageQ_destroy - destroy the MessageQ module.
360  */
361 Int MessageQ_destroy(void)
363     Int status;
364     LAD_ClientHandle handle;
365     struct LAD_CommandObj cmd;
366     union LAD_ResponseObj rsp;
368     handle = LAD_findHandle();
369     if (handle == LAD_MAXNUMCLIENTS) {
370         PRINTVERBOSE1(
371           "MessageQ_destroy: can't find connection to daemon for pid %d\n",
372            getpid())
374         return MessageQ_E_RESOURCE;
375     }
377     cmd.cmd = LAD_MESSAGEQ_DESTROY;
378     cmd.clientId = handle;
380     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
381         PRINTVERBOSE1(
382           "MessageQ_destroy: sending LAD command failed, status=%d\n", status)
383         return MessageQ_E_FAIL;
384     }
386     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
387         PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
388         return status;
389     }
390     status = rsp.status;
392     PRINTVERBOSE2(
393       "MessageQ_destroy: got LAD response for client %d, status=%d\n",
394       handle, status)
396     return status;
400 /* Function to initialize the parameters for the MessageQ instance. */
401 Void MessageQ_Params_init(MessageQ_Params *params)
403     memcpy (params, &(MessageQ_module->defaultInstParams),
404             sizeof (MessageQ_Params));
406     return;
409 /*
410  *  MessageQ_create - create a MessageQ object for receiving.
411  */
412 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *params)
414     Int                   status;
415     MessageQ_Object      *obj = NULL;
416     IMessageQTransport_Handle transport;
417     INetworkTransport_Handle transInst;
418     UInt16                queueIndex;
419     UInt16                rprocId;
420     Int                   tid;
421     Int                   priority;
422     LAD_ClientHandle      handle;
423     struct LAD_CommandObj cmd;
424     union LAD_ResponseObj rsp;
426     handle = LAD_findHandle();
427     if (handle == LAD_MAXNUMCLIENTS) {
428         PRINTVERBOSE1(
429           "MessageQ_create: can't find connection to daemon for pid %d\n",
430            getpid())
432         return NULL;
433     }
435     cmd.cmd = LAD_MESSAGEQ_CREATE;
436     cmd.clientId = handle;
437     if (name == NULL) {
438         cmd.args.messageQCreate.name[0] = '\0';
439     }
440     else {
441         strncpy(cmd.args.messageQCreate.name, name,
442                 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
443         cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
444     }
446     if (params) {
447         memcpy(&cmd.args.messageQCreate.params, params, sizeof (*params));
448     }
450     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
451         PRINTVERBOSE1(
452           "MessageQ_create: sending LAD command failed, status=%d\n", status)
453         return NULL;
454     }
456     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
457         PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
458         return NULL;
459     }
460     status = rsp.messageQCreate.status;
462     PRINTVERBOSE2(
463       "MessageQ_create: got LAD response for client %d, status=%d\n",
464       handle, status)
466     if (status == -1) {
467        PRINTVERBOSE1(
468           "MessageQ_create: MessageQ server operation failed, status=%d\n",
469           status)
470        return NULL;
471     }
473     /* Create the generic obj */
474     obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
476     if (params != NULL) {
477        /* Populate the params member */
478         memcpy((Ptr) &obj->params, (Ptr)params, sizeof (MessageQ_Params));
479     }
481     queueIndex = (MessageQ_QueueIndex)(rsp.messageQCreate.queueId & 0x0000ffff);
483     obj->queue = rsp.messageQCreate.queueId;
484     obj->serverHandle = rsp.messageQCreate.serverHandle;
485     CIRCLEQ_INIT(&obj->msgList);
486     if (sem_init(&obj->synchronizer, 0, 0) < 0) {
487         PRINTVERBOSE1(
488           "MessageQ_create: failed to create synchronizer (errno %d)\n", errno)
490         MessageQ_delete((MessageQ_Handle *)&obj);
492         return NULL;
493     }
495     PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' queueIndex %d\n", name, queueIndex)
497     for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
498         for (priority = 0; priority < 2; priority++) {
499             transport = MessageQ_module->transports[rprocId][priority];
500             if (transport) {
501                 /* need to check return and do something if error */
502                 IMessageQTransport_bind((Void *)transport, obj->queue);
503             }
504         }
505     }
506     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
507         transInst = MessageQ_module->transInst[tid];
508         if (transInst) {
509             /* need to check return and do something if error */
510             INetworkTransport_bind((Void *)transInst, obj->queue);
511         }
512     }
514     /*
515      * Since LAD's MessageQ_module can grow, we need to be able to grow as well
516      */
517     if (queueIndex >= MessageQ_module->numQueues) {
518         _MessageQ_grow(queueIndex);
519     }
521     /*
522      * No need to "allocate" slot since the queueIndex returned by
523      * LAD is guaranteed to be unique.
524      */
525     MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
527     return (MessageQ_Handle)obj;
530 /*
531  * MessageQ_delete - delete a MessageQ object.
532  */
533 Int MessageQ_delete(MessageQ_Handle *handlePtr)
535     MessageQ_Object *obj;
536     IMessageQTransport_Handle transport;
537     INetworkTransport_Handle transInst;
538     Int              status = MessageQ_S_SUCCESS;
539     UInt16           queueIndex;
540     UInt16                rprocId;
541     Int                   tid;
542     Int                   priority;
543     LAD_ClientHandle handle;
544     struct LAD_CommandObj cmd;
545     union LAD_ResponseObj rsp;
547     handle = LAD_findHandle();
548     if (handle == LAD_MAXNUMCLIENTS) {
549         PRINTVERBOSE1(
550           "MessageQ_delete: can't find connection to daemon for pid %d\n",
551            getpid())
553         return MessageQ_E_FAIL;
554     }
556     obj = (MessageQ_Object *)(*handlePtr);
558     cmd.cmd = LAD_MESSAGEQ_DELETE;
559     cmd.clientId = handle;
560     cmd.args.messageQDelete.serverHandle = obj->serverHandle;
562     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
563         PRINTVERBOSE1(
564           "MessageQ_delete: sending LAD command failed, status=%d\n", status)
565         return MessageQ_E_FAIL;
566     }
568     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
569         PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
570         return MessageQ_E_FAIL;
571     }
572     status = rsp.messageQDelete.status;
574     PRINTVERBOSE2(
575       "MessageQ_delete: got LAD response for client %d, status=%d\n",
576       handle, status)
578     for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
579         for (priority = 0; priority < 2; priority++) {
580             transport = MessageQ_module->transports[rprocId][priority];
581             if (transport) {
582                 IMessageQTransport_unbind((Void *)transport, obj->queue);
583             }
584         }
585     }
586     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
587         transInst = MessageQ_module->transInst[tid];
588         if (transInst) {
589             INetworkTransport_unbind((Void *)transInst, obj->queue);
590         }
591     }
593     queueIndex = (MessageQ_QueueIndex)(obj->queue & 0x0000ffff);
594     MessageQ_module->queues[queueIndex] = NULL;
596     free(obj);
597     *handlePtr = NULL;
599     return status;
602 /*
603  *  MessageQ_open - Opens an instance of MessageQ for sending.
604  */
605 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
607     Int status = MessageQ_S_SUCCESS;
609     status = NameServer_getUInt32(MessageQ_module->nameServer,
610                                   name, queueId, NULL);
612     if (status == NameServer_E_NOTFOUND) {
613         /* Set return queue ID to invalid */
614         *queueId = MessageQ_INVALIDMESSAGEQ;
615         status = MessageQ_E_NOTFOUND;
616     }
617     else if (status >= 0) {
618         /* Override with a MessageQ status code */
619         status = MessageQ_S_SUCCESS;
620     }
621     else {
622         /* Set return queue ID to invalid */
623         *queueId = MessageQ_INVALIDMESSAGEQ;
625         /* Override with a MessageQ status code */
626         if (status == NameServer_E_TIMEOUT) {
627             status = MessageQ_E_TIMEOUT;
628         }
629         else {
630             status = MessageQ_E_FAIL;
631         }
632     }
634     return status;
637 /*
638  *  MessageQ_close - Closes previously opened instance of MessageQ.
639  */
640 Int MessageQ_close(MessageQ_QueueId *queueId)
642     Int32 status = MessageQ_S_SUCCESS;
644     /* Nothing more to be done for closing the MessageQ. */
645     *queueId = MessageQ_INVALIDMESSAGEQ;
647     return status;
650 /*
651  * MessageQ_put - place a message onto a message queue.
652  *
653  * Calls transport's put(), which handles the sending of the message using the
654  * appropriate kernel interface (socket, device ioctl) call for the remote
655  * procId encoded in the queueId argument.
656  *
657  */
658 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
660     MessageQ_Object *obj;
661     UInt16   dstProcId  = (UInt16)(queueId >> 16);
662     UInt16   queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
663     Int      status = MessageQ_S_SUCCESS;
664     ITransport_Handle transport;
665     IMessageQTransport_Handle msgTrans;
666     INetworkTransport_Handle netTrans;
667     Int priority;
668     UInt tid;
670     msg->dstId     = queueIndex;
671     msg->dstProc   = dstProcId;
673     if (dstProcId != MultiProc_self()) {
674         tid = MessageQ_getTransportId(msg);
675         if (tid == 0) {
676             priority = MessageQ_getMsgPri(msg);
677             msgTrans = MessageQ_module->transports[dstProcId][priority];
679             IMessageQTransport_put(msgTrans, (Ptr)msg);
680         }
681         else {
682             if (tid >= MessageQ_MAXTRANSPORTS) {
683                 printf("MessageQ_put: transport id %d too big, must be < %d\n",
684                        tid, MessageQ_MAXTRANSPORTS);
686                 return MessageQ_E_FAIL;
687             }
689             /* use secondary transport */
690             netTrans = MessageQ_module->transInst[tid];
691             transport = INetworkTransport_upCast(netTrans);
693             /* downcast instance pointer to transport interface */
694             switch (ITransport_itype(transport)) {
695                 case INetworkTransport_TypeId:
696                     INetworkTransport_put(netTrans, (Ptr)msg);
698                     break;
700                 default:
701                     /* error */
702                     printf("MessageQ_put: transport id %d is an unsupported transport type\n", tid);
704                     status = MessageQ_E_FAIL;
706                     break;
707             }
708         }
709     }
710     else {
711         obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
713         pthread_mutex_lock(&MessageQ_module->gate);
715         /* It is a local MessageQ */
716         CIRCLEQ_INSERT_TAIL(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
718         pthread_mutex_unlock(&MessageQ_module->gate);
720         sem_post(&obj->synchronizer);
721     }
723     return status;
726 /*
727  *  MessageQ_get - gets a message for a message queue and blocks if
728  *  the queue is empty.
729  *
730  *  If a message is present, it returns it.  Otherwise it blocks
731  *  waiting for a message to arrive.
732  *  When a message is returned, it is owned by the caller.
733  */
734 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
736     MessageQ_Object * obj = (MessageQ_Object *)handle;
737     Int     status = MessageQ_S_SUCCESS;
738     struct timespec ts;
739     struct timeval tv;
741 #if 0
742 /*
743  * Optimization here to get a message without going in to the sem
744  * operation, but the sem count will not be maintained properly.
745  */
746     pthread_mutex_lock(&MessageQ_module->gate);
748     if (obj->msgList.cqh_first != &obj->msgList) {
749         *msg = (MessageQ_Msg)obj->msglist.cqh_first;
750         CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
752         pthread_mutex_unlock(&MessageQ_module->gate);
753     }
754     else {
755         pthread_mutex_unlock(&MessageQ_module->gate);
756     }
757 #endif
759     if (timeout == MessageQ_FOREVER) {
760         sem_wait(&obj->synchronizer);
761     }
762     else {
763         gettimeofday(&tv, NULL);
764         ts.tv_sec = tv.tv_sec;
765         ts.tv_nsec = (tv.tv_usec + timeout) * 1000;
767         if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
768             if (errno == ETIMEDOUT) {
769                 PRINTVERBOSE0("MessageQ_get: operation timed out\n")
771                 return MessageQ_E_TIMEOUT;
772             }
773         }
774     }
776     if (obj->unblocked) {
777         return MessageQ_E_UNBLOCKED;
778     }
780     pthread_mutex_lock(&MessageQ_module->gate);
782     *msg = (MessageQ_Msg)obj->msgList.cqh_first;
783     CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
785     pthread_mutex_unlock(&MessageQ_module->gate);
787     return status;
790 /*
791  * Return a count of the number of messages in the queue
792  *
793  * TBD: Implement as a socket ioctl, using getsockopt().  Return -1 for now.
794  */
795 Int MessageQ_count(MessageQ_Handle handle)
797     Int               count = -1;
798 #if 0
799     MessageQ_Object * obj   = (MessageQ_Object *) handle;
800     socklen_t         optlen;
802     /*
803      * TBD: Need to find a way to implement (if anyone uses it!), and
804      * push down into transport..
805      */
807     /*
808      * 2nd arg to getsockopt should be transport independent, but using
809      *  SSKPROTO_SHMFIFO for now:
810      */
811     getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
812                  &count, &optlen);
813 #endif
815     return count;
818 /*
819  *  Initializes a message not obtained from MessageQ_alloc.
820  */
821 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
823     /* Fill in the fields of the message */
824     MessageQ_msgInit(msg);
825     msg->heapId = MessageQ_STATICMSG;
826     msg->msgSize = size;
829 /*
830  *  Allocate a message and initialize the needed fields (note some
831  *  of the fields in the header are set via other APIs or in the
832  *  MessageQ_put function,
833  */
834 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
836     MessageQ_Msg msg;
838     /*
839      * heapId not used for local alloc (as this is over a copy transport), but
840      * we need to send to other side as heapId is used in BIOS transport.
841      */
842     msg = (MessageQ_Msg)calloc(1, size);
843     MessageQ_msgInit(msg);
844     msg->msgSize = size;
845     msg->heapId = heapId;
847     return msg;
850 /*
851  *  Frees the message back to the heap that was used to allocate it.
852  */
853 Int MessageQ_free(MessageQ_Msg msg)
855     UInt32 status = MessageQ_S_SUCCESS;
857     /* Check to ensure this was not allocated by user: */
858     if (msg->heapId == MessageQ_STATICMSG) {
859         status = MessageQ_E_CANNOTFREESTATICMSG;
860     }
861     else {
862         free(msg);
863     }
865     return status;
868 /* Register a heap with MessageQ. */
869 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
871     Int status = MessageQ_S_SUCCESS;
873     /* Do nothing, as this uses a copy transport */
875     return status;
878 /* Unregister a heap with MessageQ. */
879 Int MessageQ_unregisterHeap(UInt16 heapId)
881     Int status = MessageQ_S_SUCCESS;
883     /* Do nothing, as this uses a copy transport */
885     return status;
888 /* Unblocks a MessageQ */
889 Void MessageQ_unblock(MessageQ_Handle handle)
891     MessageQ_Object *obj = (MessageQ_Object *)handle;
893     obj->unblocked = TRUE;
894     sem_post(&obj->synchronizer);
897 /* Embeds a source message queue into a message */
898 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
900     MessageQ_Object *obj = (MessageQ_Object *)handle;
902     msg->replyId = (UInt16)(obj->queue);
903     msg->replyProc = (UInt16)(obj->queue >> 16);
906 /* Returns the QueueId associated with the handle. */
907 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
909     MessageQ_Object *obj = (MessageQ_Object *) handle;
910     UInt32 queueId;
912     queueId = (obj->queue);
914     return queueId;
917 /* Sets the tracing of a message */
918 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
920     msg->flags = (msg->flags & ~TRACEMASK) |   (traceFlag << TRACESHIFT);
923 /*
924  *  Returns the amount of shared memory used by one transport instance.
925  *
926  *  The MessageQ module itself does not use any shared memory but the
927  *  underlying transport may use some shared memory.
928  */
929 SizeT MessageQ_sharedMemReq(Ptr sharedAddr)
931     SizeT memReq = 0u;
933     /* Do nothing, as this is a copy transport. */
935     return memReq;
938 /*
939  * This is a helper function to initialize a message.
940  */
941 Void MessageQ_msgInit(MessageQ_Msg msg)
943 #if 0
944     Int                 status    = MessageQ_S_SUCCESS;
945     LAD_ClientHandle handle;
946     struct LAD_CommandObj cmd;
947     union LAD_ResponseObj rsp;
949     handle = LAD_findHandle();
950     if (handle == LAD_MAXNUMCLIENTS) {
951         PRINTVERBOSE1(
952           "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
953            getpid())
955         return;
956     }
958     cmd.cmd = LAD_MESSAGEQ_MSGINIT;
959     cmd.clientId = handle;
961     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
962         PRINTVERBOSE1(
963           "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
964         return;
965     }
967     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
968         PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
969         return;
970     }
971     status = rsp.msgInit.status;
973     PRINTVERBOSE2(
974       "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
975       handle, status)
977     memcpy(msg, &rsp.msgInit.msg, sizeof (*msg));
978 #else
979     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
980     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
981     msg->msgId     = MessageQ_INVALIDMSGID;
982     msg->dstId     = (UInt16)MessageQ_INVALIDMESSAGEQ;
983     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
984     msg->srcProc   = MultiProc_self();
986     pthread_mutex_lock(&MessageQ_module->gate);
987     msg->seqNum  = MessageQ_module->seqNum++;
988     pthread_mutex_unlock(&MessageQ_module->gate);
989 #endif
992 /*
993  * Grow module's queues[] array to accommodate queueIndex from LAD
994  */
995 Void _MessageQ_grow(UInt16 queueIndex)
997     MessageQ_Handle *queues;
998     MessageQ_Handle *oldQueues;
999     UInt oldSize;
1001     oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
1003     queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof (MessageQ_Handle));
1004     memcpy(queues, MessageQ_module->queues, oldSize);
1006     oldQueues = MessageQ_module->queues;
1007     MessageQ_module->queues = queues;
1008     MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
1010     free(oldQueues);
1012     return;