SDOCM00113609 Add version support for MessageQ_Params structure
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
1 /*
2  * Copyright (c) 2012-2014, Texas Instruments Incorporated
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  @file   MessageQ.c
34  *
35  *  @brief  MessageQ Linux implementation
36  *
37  *  This implementation is geared for use in a "client/server" model, whereby
38  *  system-wide data is maintained in a "server" component and process-
39  *  specific data is handled here.  At the moment, this implementation
40  *  connects and communicates with LAD for the server connection.
41  */
43 /* Standard IPC header */
44 #include <ti/ipc/Std.h>
46 /* Module level headers */
47 #include <ti/ipc/NameServer.h>
48 #include <ti/ipc/MultiProc.h>
49 #include <_MultiProc.h>
50 #define MessageQ_internal 1     /* must be defined before include file */
51 #include <ti/ipc/MessageQ.h>
52 #include <_MessageQ.h>
53 #include <ITransport.h>
54 #include <IMessageQTransport.h>
55 #include <INetworkTransport.h>
57 /* Socket Headers */
58 #include <sys/select.h>
59 #include <sys/time.h>
60 #include <sys/types.h>
61 #include <sys/param.h>
62 #include <sys/eventfd.h>
63 #include <sys/queue.h>
64 #include <errno.h>
65 #include <stdio.h>
66 #include <string.h>
67 #include <stdlib.h>
68 #include <unistd.h>
69 #include <assert.h>
70 #include <pthread.h>
71 #include <semaphore.h>
73 /* Socket Protocol Family */
74 #include <net/rpmsg.h>
76 #include <ladclient.h>
77 #include <_lad.h>
79 /* =============================================================================
80  * Macros/Constants
81  * =============================================================================
82  */
84 /*!
85  *  @brief  Name of the reserved NameServer used for MessageQ.
86  */
87 #define MessageQ_NAMESERVER  "MessageQ"
89 #define MessageQ_MAXTRANSPORTS 8
91 #define MessageQ_GROWSIZE 32
93 /* Trace flag settings: */
94 #define TRACESHIFT    12
95 #define TRACEMASK     0x1000
97 /* Define BENCHMARK to quiet key MessageQ APIs: */
98 //#define BENCHMARK
100 /* =============================================================================
101  * Structures & Enums
102  * =============================================================================
103  */
105 /* params structure evolution */
106 typedef struct {
107     Void *synchronizer;
108 } MessageQ_Params_Legacy;
110 typedef struct {
111     Int __version;
112     Void *synchronizer;
113     MessageQ_QueueIndex queueIndex;
114 } MessageQ_Params_Version2;
116 /* structure for MessageQ module state */
117 typedef struct MessageQ_ModuleObject {
118     MessageQ_Handle           *queues;
119     Int                       numQueues;
120     Int                       refCount;
121     NameServer_Handle         nameServer;
122     pthread_mutex_t           gate;
123     int                       seqNum;
124     IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
125     INetworkTransport_Handle  transInst[MessageQ_MAXTRANSPORTS];
126     MessageQ_PutHookFxn       putHookFxn;
127 } MessageQ_ModuleObject;
129 typedef struct MessageQ_CIRCLEQ_ENTRY {
130      CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem;
131 } MessageQ_CIRCLEQ_ENTRY;
133 /*!
134  *  @brief  Structure for the Handle for the MessageQ.
135  */
136 typedef struct MessageQ_Object_tag {
137     CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
138     MessageQ_Params              params;
139     MessageQ_QueueId             queue;
140     int                          unblocked;
141     void                         *serverHandle;
142     sem_t                        synchronizer;
143 } MessageQ_Object;
145 /* traces in this file are controlled via _MessageQ_verbose */
146 Bool _MessageQ_verbose = FALSE;
147 #define verbose _MessageQ_verbose
149 /* =============================================================================
150  *  Globals
151  * =============================================================================
152  */
153 static MessageQ_ModuleObject MessageQ_state =
155     .refCount   = 0,
156     .nameServer = NULL,
157     .putHookFxn = NULL
158 };
160 /*!
161  *  @var    MessageQ_module
162  *
163  *  @brief  Pointer to the MessageQ module state.
164  */
165 MessageQ_ModuleObject *MessageQ_module = &MessageQ_state;
167 Void _MessageQ_grow(UInt16 queueIndex);
169 /* =============================================================================
170  * APIS
171  * =============================================================================
172  */
174 Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
175                                 UInt16 rprocId, UInt priority)
177     Int status = FALSE;
179     if (handle == NULL) {
180         printf("MessageQ_registerTransport: invalid handle, must be non-NULL\n"
181               );
183         return status;
184     }
186     if (rprocId >= MultiProc_MAXPROCESSORS) {
187         printf("MessageQ_registerTransport: invalid procId %d\n", rprocId);
189         return status;
190     }
192     if (MessageQ_module->transports[rprocId][priority] == NULL) {
193         MessageQ_module->transports[rprocId][priority] = handle;
195         status = TRUE;
196     }
198     return status;
201 Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
203     if (inst == NULL) {
204         printf("MessageQ_registerTransportId: invalid NULL handle\n");
206         return MessageQ_E_INVALIDARG;
207     }
209     if (tid >= MessageQ_MAXTRANSPORTS) {
210         printf("MessageQ_unregisterNetTransport: invalid transport id %d, must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
212         return MessageQ_E_INVALIDARG;
213     }
215     if (MessageQ_module->transInst[tid] != NULL) {
216         printf("MessageQ_registerTransportId: transport id %d already registered\n", tid);
218         return MessageQ_E_ALREADYEXISTS;
219     }
221     MessageQ_module->transInst[tid] = (INetworkTransport_Handle)inst;
223     return MessageQ_S_SUCCESS;
226 Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
228     if (rprocId >= MultiProc_MAXPROCESSORS) {
229         printf("MessageQ_registerTransport: invalid rprocId %d\n", rprocId);
231         return;
232     }
234     MessageQ_module->transports[rprocId][priority] = NULL;
237 Void MessageQ_unregisterTransportId(UInt tid)
239     if (tid >= MessageQ_MAXTRANSPORTS) {
240         printf("MessageQ_unregisterTransportId: invalid transport id %d, must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
242         return;
243     }
245     MessageQ_module->transInst[tid] = NULL;
248 /*
249  * Function to get default configuration for the MessageQ module.
250  */
251 Void MessageQ_getConfig(MessageQ_Config *cfg)
253     Int status;
254     LAD_ClientHandle handle;
255     struct LAD_CommandObj cmd;
256     union LAD_ResponseObj rsp;
258     assert (cfg != NULL);
260     handle = LAD_findHandle();
261     if (handle == LAD_MAXNUMCLIENTS) {
262         PRINTVERBOSE1(
263           "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
264            getpid())
266         return;
267     }
269     cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
270     cmd.clientId = handle;
272     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
273         PRINTVERBOSE1(
274           "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
275         return;
276     }
278     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
279         PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n",
280                       status)
281         return;
282     }
283     status = rsp.messageQGetConfig.status;
285     PRINTVERBOSE2(
286       "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
287       handle, status)
289     memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg));
291     return;
294 /*
295  *  Function to setup the MessageQ module.
296  */
297 Int MessageQ_setup(const MessageQ_Config *cfg)
299     Int status;
300     LAD_ClientHandle handle;
301     struct LAD_CommandObj cmd;
302     union LAD_ResponseObj rsp;
303     Int pri;
304     Int rprocId;
305     Int tid;
307     pthread_mutex_lock(&MessageQ_module->gate);
309     MessageQ_module->refCount++;
310     if (MessageQ_module->refCount > 1) {
312         pthread_mutex_unlock(&MessageQ_module->gate);
314         PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
315                       MessageQ_module->refCount)
317         return MessageQ_S_ALREADYSETUP;
318     }
320     pthread_mutex_unlock(&MessageQ_module->gate);
322     handle = LAD_findHandle();
323     if (handle == LAD_MAXNUMCLIENTS) {
324         PRINTVERBOSE1(
325           "MessageQ_setup: can't find connection to daemon for pid %d\n",
326            getpid())
328         return MessageQ_E_RESOURCE;
329     }
331     cmd.cmd = LAD_MESSAGEQ_SETUP;
332     cmd.clientId = handle;
333     memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof (*cfg));
335     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
336         PRINTVERBOSE1(
337           "MessageQ_setup: sending LAD command failed, status=%d\n", status)
338         return MessageQ_E_FAIL;
339     }
341     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
342         PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
343         return status;
344     }
345     status = rsp.setup.status;
347     PRINTVERBOSE2(
348       "MessageQ_setup: got LAD response for client %d, status=%d\n",
349       handle, status)
351     MessageQ_module->seqNum = 0;
352     MessageQ_module->nameServer = rsp.setup.nameServerHandle;
353     MessageQ_module->numQueues = cfg->maxRuntimeEntries;
354     MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
355                                      sizeof (MessageQ_Handle));
357     pthread_mutex_init(&MessageQ_module->gate, NULL);
359     for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
360         for (pri = 0; pri < 2; pri++) {
361             MessageQ_module->transports[rprocId][pri] = NULL;
362         }
363     }
364     for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
365         MessageQ_module->transInst[tid] = NULL;
366     }
368     return status;
371 /*
372  *  MessageQ_destroy - destroy the MessageQ module.
373  */
374 Int MessageQ_destroy(void)
376     Int status;
377     LAD_ClientHandle handle;
378     struct LAD_CommandObj cmd;
379     union LAD_ResponseObj rsp;
381     handle = LAD_findHandle();
382     if (handle == LAD_MAXNUMCLIENTS) {
383         PRINTVERBOSE1(
384           "MessageQ_destroy: can't find connection to daemon for pid %d\n",
385            getpid())
387         return MessageQ_E_RESOURCE;
388     }
390     cmd.cmd = LAD_MESSAGEQ_DESTROY;
391     cmd.clientId = handle;
393     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
394         PRINTVERBOSE1(
395           "MessageQ_destroy: sending LAD command failed, status=%d\n", status)
396         return MessageQ_E_FAIL;
397     }
399     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
400         PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
401         return status;
402     }
403     status = rsp.status;
405     PRINTVERBOSE2(
406       "MessageQ_destroy: got LAD response for client %d, status=%d\n",
407       handle, status)
409     return status;
412 /*
413  *  ======== MessageQ_Params_init ========
414  *  Legacy implementation.
415  */
416 Void MessageQ_Params_init(MessageQ_Params *params)
418     ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
421 /*
422  *  ======== MessageQ_Params_init__S ========
423  *  New implementation which is version aware.
424  */
425 Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
427     MessageQ_Params_Version2 *params2;
429     switch (version) {
431         case MessageQ_Params_VERSION_2:
432             params2 = (MessageQ_Params_Version2 *)params;
433             params2->__version = MessageQ_Params_VERSION_2;
434             params2->synchronizer = NULL;
435             params2->queueIndex = MessageQ_ANY;
436             break;
438         default:
439             assert(FALSE);
440             break;
441     }
444 /*
445  *  MessageQ_create - create a MessageQ object for receiving.
446  */
447 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
449     Int                   status;
450     MessageQ_Object      *obj = NULL;
451     IMessageQTransport_Handle transport;
452     INetworkTransport_Handle transInst;
453     UInt16                queueIndex;
454     UInt16                rprocId;
455     Int                   tid;
456     Int                   priority;
457     LAD_ClientHandle      handle;
458     struct LAD_CommandObj cmd;
459     union LAD_ResponseObj rsp;
460     MessageQ_Params ps;
462     MessageQ_Params_init__S(&ps, MessageQ_Params_VERSION);
464     /* copy the given params into the current params structure */
465     if (pp != NULL) {
467         /* snoop the params pointer to see if it's a legacy structure */
468         if ((pp->__version == 0) || (pp->__version > 100)) {
469             ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
470         }
472         /* not legacy structure, use params version field */
473         else if (pp->__version == MessageQ_Params_VERSION_2) {
474             ps.__version = ((MessageQ_Params_Version2 *)pp)->__version;
475             ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
476             ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
477         }
478         else {
479             assert(FALSE);
480         }
481     }
483     handle = LAD_findHandle();
484     if (handle == LAD_MAXNUMCLIENTS) {
485         PRINTVERBOSE1(
486           "MessageQ_create: can't find connection to daemon for pid %d\n",
487            getpid())
489         return NULL;
490     }
492     cmd.cmd = LAD_MESSAGEQ_CREATE;
493     cmd.clientId = handle;
495     if (name == NULL) {
496         cmd.args.messageQCreate.name[0] = '\0';
497     }
498     else {
499         strncpy(cmd.args.messageQCreate.name, name,
500                 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
501         cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
502     }
504     memcpy(&cmd.args.messageQCreate.params, &ps, sizeof(ps));
506     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
507         PRINTVERBOSE1(
508           "MessageQ_create: sending LAD command failed, status=%d\n", status)
509         return NULL;
510     }
512     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
513         PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
514         return NULL;
515     }
516     status = rsp.messageQCreate.status;
518     PRINTVERBOSE2(
519       "MessageQ_create: got LAD response for client %d, status=%d\n",
520       handle, status)
522     if (status == -1) {
523        PRINTVERBOSE1(
524           "MessageQ_create: MessageQ server operation failed, status=%d\n",
525           status)
526        return NULL;
527     }
529     /* Create the generic obj */
530     obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
532    /* Populate the params member */
533     memcpy(&obj->params, &ps, sizeof(ps));
535     queueIndex = (MessageQ_QueueIndex)(rsp.messageQCreate.queueId & 0x0000ffff);
537     obj->queue = rsp.messageQCreate.queueId;
538     obj->serverHandle = rsp.messageQCreate.serverHandle;
539     CIRCLEQ_INIT(&obj->msgList);
540     if (sem_init(&obj->synchronizer, 0, 0) < 0) {
541         PRINTVERBOSE1(
542           "MessageQ_create: failed to create synchronizer (errno %d)\n", errno)
544         MessageQ_delete((MessageQ_Handle *)&obj);
546         return NULL;
547     }
549     PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' queueIndex %d\n", name, queueIndex)
551     for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
552         for (priority = 0; priority < 2; priority++) {
553             transport = MessageQ_module->transports[rprocId][priority];
554             if (transport) {
555                 /* need to check return and do something if error */
556                 IMessageQTransport_bind((Void *)transport, obj->queue);
557             }
558         }
559     }
560     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
561         transInst = MessageQ_module->transInst[tid];
562         if (transInst) {
563             /* need to check return and do something if error */
564             INetworkTransport_bind((Void *)transInst, obj->queue);
565         }
566     }
568     /*
569      * Since LAD's MessageQ_module can grow, we need to be able to grow as well
570      */
571     if (queueIndex >= MessageQ_module->numQueues) {
572         _MessageQ_grow(queueIndex);
573     }
575     /*
576      * No need to "allocate" slot since the queueIndex returned by
577      * LAD is guaranteed to be unique.
578      */
579     MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
581     return (MessageQ_Handle)obj;
584 /*
585  * MessageQ_delete - delete a MessageQ object.
586  */
587 Int MessageQ_delete(MessageQ_Handle *handlePtr)
589     MessageQ_Object *obj;
590     IMessageQTransport_Handle transport;
591     INetworkTransport_Handle transInst;
592     Int              status = MessageQ_S_SUCCESS;
593     UInt16           queueIndex;
594     UInt16                rprocId;
595     Int                   tid;
596     Int                   priority;
597     LAD_ClientHandle handle;
598     struct LAD_CommandObj cmd;
599     union LAD_ResponseObj rsp;
601     handle = LAD_findHandle();
602     if (handle == LAD_MAXNUMCLIENTS) {
603         PRINTVERBOSE1(
604           "MessageQ_delete: can't find connection to daemon for pid %d\n",
605            getpid())
607         return MessageQ_E_FAIL;
608     }
610     obj = (MessageQ_Object *)(*handlePtr);
612     cmd.cmd = LAD_MESSAGEQ_DELETE;
613     cmd.clientId = handle;
614     cmd.args.messageQDelete.serverHandle = obj->serverHandle;
616     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
617         PRINTVERBOSE1(
618           "MessageQ_delete: sending LAD command failed, status=%d\n", status)
619         return MessageQ_E_FAIL;
620     }
622     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
623         PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
624         return MessageQ_E_FAIL;
625     }
626     status = rsp.messageQDelete.status;
628     PRINTVERBOSE2(
629       "MessageQ_delete: got LAD response for client %d, status=%d\n",
630       handle, status)
632     for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
633         for (priority = 0; priority < 2; priority++) {
634             transport = MessageQ_module->transports[rprocId][priority];
635             if (transport) {
636                 IMessageQTransport_unbind((Void *)transport, obj->queue);
637             }
638         }
639     }
640     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
641         transInst = MessageQ_module->transInst[tid];
642         if (transInst) {
643             INetworkTransport_unbind((Void *)transInst, obj->queue);
644         }
645     }
647     queueIndex = (MessageQ_QueueIndex)(obj->queue & 0x0000ffff);
648     MessageQ_module->queues[queueIndex] = NULL;
650     free(obj);
651     *handlePtr = NULL;
653     return status;
656 /*
657  *  MessageQ_open - Opens an instance of MessageQ for sending.
658  */
659 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
661     Int status = MessageQ_S_SUCCESS;
663     status = NameServer_getUInt32(MessageQ_module->nameServer,
664                                   name, queueId, NULL);
666     if (status == NameServer_E_NOTFOUND) {
667         /* Set return queue ID to invalid */
668         *queueId = MessageQ_INVALIDMESSAGEQ;
669         status = MessageQ_E_NOTFOUND;
670     }
671     else if (status >= 0) {
672         /* Override with a MessageQ status code */
673         status = MessageQ_S_SUCCESS;
674     }
675     else {
676         /* Set return queue ID to invalid */
677         *queueId = MessageQ_INVALIDMESSAGEQ;
679         /* Override with a MessageQ status code */
680         if (status == NameServer_E_TIMEOUT) {
681             status = MessageQ_E_TIMEOUT;
682         }
683         else {
684             status = MessageQ_E_FAIL;
685         }
686     }
688     return status;
691 /*
692  *  MessageQ_close - Closes previously opened instance of MessageQ.
693  */
694 Int MessageQ_close(MessageQ_QueueId *queueId)
696     Int32 status = MessageQ_S_SUCCESS;
698     /* Nothing more to be done for closing the MessageQ. */
699     *queueId = MessageQ_INVALIDMESSAGEQ;
701     return status;
704 /*
705  * MessageQ_put - place a message onto a message queue.
706  *
707  * Calls transport's put(), which handles the sending of the message using the
708  * appropriate kernel interface (socket, device ioctl) call for the remote
709  * procId encoded in the queueId argument.
710  *
711  */
712 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
714     MessageQ_Object *obj;
715     UInt16   dstProcId  = (UInt16)(queueId >> 16);
716     UInt16   queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
717     Int      status = MessageQ_S_SUCCESS;
718     ITransport_Handle transport;
719     IMessageQTransport_Handle msgTrans;
720     INetworkTransport_Handle netTrans;
721     Int priority;
722     UInt tid;
724     msg->dstId     = queueIndex;
725     msg->dstProc   = dstProcId;
727     /* invoke put hook function after addressing the message */
728     if (MessageQ_module->putHookFxn != NULL) {
729         MessageQ_module->putHookFxn(queueId, msg);
730     }
732     if (dstProcId != MultiProc_self()) {
733         tid = MessageQ_getTransportId(msg);
734         if (tid == 0) {
735             priority = MessageQ_getMsgPri(msg);
736             msgTrans = MessageQ_module->transports[dstProcId][priority];
738             IMessageQTransport_put(msgTrans, (Ptr)msg);
739         }
740         else {
741             if (tid >= MessageQ_MAXTRANSPORTS) {
742                 printf("MessageQ_put: transport id %d too big, must be < %d\n",
743                        tid, MessageQ_MAXTRANSPORTS);
745                 return MessageQ_E_FAIL;
746             }
748             /* use secondary transport */
749             netTrans = MessageQ_module->transInst[tid];
750             transport = INetworkTransport_upCast(netTrans);
752             /* downcast instance pointer to transport interface */
753             switch (ITransport_itype(transport)) {
754                 case INetworkTransport_TypeId:
755                     INetworkTransport_put(netTrans, (Ptr)msg);
757                     break;
759                 default:
760                     /* error */
761                     printf("MessageQ_put: transport id %d is an unsupported transport type\n", tid);
763                     status = MessageQ_E_FAIL;
765                     break;
766             }
767         }
768     }
769     else {
770         obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
772         pthread_mutex_lock(&MessageQ_module->gate);
774         /* It is a local MessageQ */
775         CIRCLEQ_INSERT_TAIL(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
777         pthread_mutex_unlock(&MessageQ_module->gate);
779         sem_post(&obj->synchronizer);
780     }
782     return status;
785 /*
786  *  MessageQ_get - gets a message for a message queue and blocks if
787  *  the queue is empty.
788  *
789  *  If a message is present, it returns it.  Otherwise it blocks
790  *  waiting for a message to arrive.
791  *  When a message is returned, it is owned by the caller.
792  */
793 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
795     MessageQ_Object * obj = (MessageQ_Object *)handle;
796     Int     status = MessageQ_S_SUCCESS;
797     struct timespec ts;
798     struct timeval tv;
800 #if 0
801 /*
802  * Optimization here to get a message without going in to the sem
803  * operation, but the sem count will not be maintained properly.
804  */
805     pthread_mutex_lock(&MessageQ_module->gate);
807     if (obj->msgList.cqh_first != &obj->msgList) {
808         *msg = (MessageQ_Msg)obj->msglist.cqh_first;
809         CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
811         pthread_mutex_unlock(&MessageQ_module->gate);
812     }
813     else {
814         pthread_mutex_unlock(&MessageQ_module->gate);
815     }
816 #endif
818     if (timeout == MessageQ_FOREVER) {
819         sem_wait(&obj->synchronizer);
820     }
821     else {
822         gettimeofday(&tv, NULL);
823         ts.tv_sec = tv.tv_sec;
824         ts.tv_nsec = (tv.tv_usec + timeout) * 1000;
826         if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
827             if (errno == ETIMEDOUT) {
828                 PRINTVERBOSE0("MessageQ_get: operation timed out\n")
830                 return MessageQ_E_TIMEOUT;
831             }
832         }
833     }
835     if (obj->unblocked) {
836         return MessageQ_E_UNBLOCKED;
837     }
839     pthread_mutex_lock(&MessageQ_module->gate);
841     *msg = (MessageQ_Msg)obj->msgList.cqh_first;
842     CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
844     pthread_mutex_unlock(&MessageQ_module->gate);
846     return status;
849 /*
850  * Return a count of the number of messages in the queue
851  *
852  * TBD: Implement as a socket ioctl, using getsockopt().  Return -1 for now.
853  */
854 Int MessageQ_count(MessageQ_Handle handle)
856     Int               count = -1;
857 #if 0
858     MessageQ_Object * obj   = (MessageQ_Object *) handle;
859     socklen_t         optlen;
861     /*
862      * TBD: Need to find a way to implement (if anyone uses it!), and
863      * push down into transport..
864      */
866     /*
867      * 2nd arg to getsockopt should be transport independent, but using
868      *  SSKPROTO_SHMFIFO for now:
869      */
870     getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
871                  &count, &optlen);
872 #endif
874     return count;
877 /*
878  *  Initializes a message not obtained from MessageQ_alloc.
879  */
880 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
882     /* Fill in the fields of the message */
883     MessageQ_msgInit(msg);
884     msg->heapId = MessageQ_STATICMSG;
885     msg->msgSize = size;
888 /*
889  *  Allocate a message and initialize the needed fields (note some
890  *  of the fields in the header are set via other APIs or in the
891  *  MessageQ_put function,
892  */
893 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
895     MessageQ_Msg msg;
897     /*
898      * heapId not used for local alloc (as this is over a copy transport), but
899      * we need to send to other side as heapId is used in BIOS transport.
900      */
901     msg = (MessageQ_Msg)calloc(1, size);
902     MessageQ_msgInit(msg);
903     msg->msgSize = size;
904     msg->heapId = heapId;
906     return msg;
909 /*
910  *  Frees the message back to the heap that was used to allocate it.
911  */
912 Int MessageQ_free(MessageQ_Msg msg)
914     UInt32 status = MessageQ_S_SUCCESS;
916     /* Check to ensure this was not allocated by user: */
917     if (msg->heapId == MessageQ_STATICMSG) {
918         status = MessageQ_E_CANNOTFREESTATICMSG;
919     }
920     else {
921         free(msg);
922     }
924     return status;
927 /* Register a heap with MessageQ. */
928 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
930     Int status = MessageQ_S_SUCCESS;
932     /* Do nothing, as this uses a copy transport */
934     return status;
937 /* Unregister a heap with MessageQ. */
938 Int MessageQ_unregisterHeap(UInt16 heapId)
940     Int status = MessageQ_S_SUCCESS;
942     /* Do nothing, as this uses a copy transport */
944     return status;
947 /* Unblocks a MessageQ */
948 Void MessageQ_unblock(MessageQ_Handle handle)
950     MessageQ_Object *obj = (MessageQ_Object *)handle;
952     obj->unblocked = TRUE;
953     sem_post(&obj->synchronizer);
956 /* Embeds a source message queue into a message */
957 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
959     MessageQ_Object *obj = (MessageQ_Object *)handle;
961     msg->replyId = (UInt16)(obj->queue);
962     msg->replyProc = (UInt16)(obj->queue >> 16);
965 /* Returns the QueueId associated with the handle. */
966 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
968     MessageQ_Object *obj = (MessageQ_Object *) handle;
969     UInt32 queueId;
971     queueId = (obj->queue);
973     return queueId;
976 /* Sets the tracing of a message */
977 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
979     msg->flags = (msg->flags & ~TRACEMASK) |   (traceFlag << TRACESHIFT);
982 /*
983  *  Returns the amount of shared memory used by one transport instance.
984  *
985  *  The MessageQ module itself does not use any shared memory but the
986  *  underlying transport may use some shared memory.
987  */
988 SizeT MessageQ_sharedMemReq(Ptr sharedAddr)
990     SizeT memReq = 0u;
992     /* Do nothing, as this is a copy transport. */
994     return memReq;
997 /*
998  * This is a helper function to initialize a message.
999  */
1000 Void MessageQ_msgInit(MessageQ_Msg msg)
1002 #if 0
1003     Int                 status    = MessageQ_S_SUCCESS;
1004     LAD_ClientHandle handle;
1005     struct LAD_CommandObj cmd;
1006     union LAD_ResponseObj rsp;
1008     handle = LAD_findHandle();
1009     if (handle == LAD_MAXNUMCLIENTS) {
1010         PRINTVERBOSE1(
1011           "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
1012            getpid())
1014         return;
1015     }
1017     cmd.cmd = LAD_MESSAGEQ_MSGINIT;
1018     cmd.clientId = handle;
1020     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
1021         PRINTVERBOSE1(
1022           "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
1023         return;
1024     }
1026     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
1027         PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
1028         return;
1029     }
1030     status = rsp.msgInit.status;
1032     PRINTVERBOSE2(
1033       "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
1034       handle, status)
1036     memcpy(msg, &rsp.msgInit.msg, sizeof (*msg));
1037 #else
1038     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
1039     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
1040     msg->msgId     = MessageQ_INVALIDMSGID;
1041     msg->dstId     = (UInt16)MessageQ_INVALIDMESSAGEQ;
1042     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
1043     msg->srcProc   = MultiProc_self();
1045     pthread_mutex_lock(&MessageQ_module->gate);
1046     msg->seqNum  = MessageQ_module->seqNum++;
1047     pthread_mutex_unlock(&MessageQ_module->gate);
1048 #endif
1051 /*
1052  * Grow module's queues[] array to accommodate queueIndex from LAD
1053  */
1054 Void _MessageQ_grow(UInt16 queueIndex)
1056     MessageQ_Handle *queues;
1057     MessageQ_Handle *oldQueues;
1058     UInt oldSize;
1060     oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
1062     queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof (MessageQ_Handle));
1063     memcpy(queues, MessageQ_module->queues, oldSize);
1065     oldQueues = MessageQ_module->queues;
1066     MessageQ_module->queues = queues;
1067     MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
1069     free(oldQueues);
1071     return;