Rework MessageQ_put to prioritize transport if given
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
1 /*
2  * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  @file   MessageQ.c
34  *
35  *  @brief  MessageQ Linux implementation
36  *
37  *  This implementation is geared for use in a "client/server" model, whereby
38  *  system-wide data is maintained in a "server" component and process-
39  *  specific data is handled here.  At the moment, this implementation
40  *  connects and communicates with LAD for the server connection.
41  */
43 /* Standard IPC header */
44 #include <ti/ipc/Std.h>
46 /* Module level headers */
47 #include <ti/ipc/NameServer.h>
48 #include <ti/ipc/MultiProc.h>
49 #include <_MultiProc.h>
50 #define MessageQ_internal 1     /* must be defined before include file */
51 #include <ti/ipc/MessageQ.h>
52 #include <_MessageQ.h>
53 #include <ti/ipc/interfaces/ITransport.h>
54 #include <ti/ipc/interfaces/IMessageQTransport.h>
55 #include <ti/ipc/interfaces/INetworkTransport.h>
57 /* Socket Headers */
58 #include <sys/select.h>
59 #include <sys/time.h>
60 #include <sys/types.h>
61 #include <sys/param.h>
62 #include <sys/eventfd.h>
63 #include <sys/queue.h>
64 #include <errno.h>
65 #include <stdio.h>
66 #include <string.h>
67 #include <stdlib.h>
68 #include <unistd.h>
69 #include <assert.h>
70 #include <pthread.h>
71 #include <semaphore.h>
73 /* Socket Protocol Family */
74 #include <net/rpmsg.h>
76 #include <ladclient.h>
77 #include <_lad.h>
79 /* =============================================================================
80  * Macros/Constants
81  * =============================================================================
82  */
84 /*!
85  *  @brief  Name of the reserved NameServer used for MessageQ.
86  */
87 #define MessageQ_NAMESERVER  "MessageQ"
89 #define MessageQ_MAXTRANSPORTS 8
91 #define MessageQ_GROWSIZE 32
93 /* Trace flag settings: */
94 #define TRACESHIFT    12
95 #define TRACEMASK     0x1000
97 /* Define BENCHMARK to quiet key MessageQ APIs: */
98 //#define BENCHMARK
100 /* =============================================================================
101  * Structures & Enums
102  * =============================================================================
103  */
105 /* params structure evolution */
106 typedef struct {
107     Void *synchronizer;
108 } MessageQ_Params_Legacy;
110 typedef struct {
111     Int __version;
112     Void *synchronizer;
113     MessageQ_QueueIndex queueIndex;
114 } MessageQ_Params_Version2;
116 /* structure for MessageQ module state */
117 typedef struct MessageQ_ModuleObject {
118     MessageQ_Handle           *queues;
119     Int                       numQueues;
120     Int                       refCount;
121     NameServer_Handle         nameServer;
122     pthread_mutex_t           gate;
123     int                       seqNum;
124     IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
125     ITransport_Handle         transInst[MessageQ_MAXTRANSPORTS];
126     MessageQ_PutHookFxn       putHookFxn;
127 } MessageQ_ModuleObject;
129 typedef struct MessageQ_CIRCLEQ_ENTRY {
130      CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem;
131 } MessageQ_CIRCLEQ_ENTRY;
133 /*!
134  *  @brief  Structure for the Handle for the MessageQ.
135  */
136 typedef struct MessageQ_Object_tag {
137     CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
138     MessageQ_Params              params;
139     MessageQ_QueueId             queue;
140     int                          unblocked;
141     void                         *serverHandle;
142     sem_t                        synchronizer;
143 } MessageQ_Object;
145 /* traces in this file are controlled via _MessageQ_verbose */
146 Bool _MessageQ_verbose = FALSE;
147 #define verbose _MessageQ_verbose
149 /* =============================================================================
150  *  Globals
151  * =============================================================================
152  */
153 static MessageQ_ModuleObject MessageQ_state =
155     .refCount   = 0,
156     .nameServer = NULL,
157     .putHookFxn = NULL
158 };
160 /*!
161  *  @var    MessageQ_module
162  *
163  *  @brief  Pointer to the MessageQ module state.
164  */
165 MessageQ_ModuleObject *MessageQ_module = &MessageQ_state;
167 Void _MessageQ_grow(UInt16 queueIndex);
169 /* =============================================================================
170  * APIS
171  * =============================================================================
172  */
174 Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
175                                 UInt16 rprocId, UInt priority)
177     Int status = FALSE;
178     UInt16 clusterId;
180     if (handle == NULL) {
181         printf("MessageQ_registerTransport: invalid handle, must be non-NULL\n"
182               );
184         return status;
185     }
187     /* map procId to clusterId */
188     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
190     if (clusterId >= MultiProc_MAXPROCESSORS) {
191         printf("MessageQ_registerTransport: invalid procId %d\n", rprocId);
193         return status;
194     }
196     if (MessageQ_module->transports[clusterId][priority] == NULL) {
197         MessageQ_module->transports[clusterId][priority] = handle;
199         status = TRUE;
200     }
202     return status;
205 Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
207     if (inst == NULL) {
208         printf("MessageQ_registerTransportId: invalid NULL handle\n");
210         return MessageQ_E_INVALIDARG;
211     }
213     if (tid >= MessageQ_MAXTRANSPORTS) {
214         printf("MessageQ_unregisterNetTransport: invalid transport id %d,"
215                 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
217         return MessageQ_E_INVALIDARG;
218     }
220     if (MessageQ_module->transInst[tid] != NULL) {
221         printf("MessageQ_registerTransportId: transport id %d already "
222                 "registered\n", tid);
224         return MessageQ_E_ALREADYEXISTS;
225     }
227     MessageQ_module->transInst[tid] = inst;
229     return MessageQ_S_SUCCESS;
232 Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
234     UInt16 clusterId;
236     /* map procId to clusterId */
237     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
239     if (clusterId >= MultiProc_MAXPROCESSORS) {
240         printf("MessageQ_unregisterTransport: invalid rprocId %d\n", rprocId);
242         return;
243     }
245     MessageQ_module->transports[clusterId][priority] = NULL;
248 Void MessageQ_unregisterTransportId(UInt tid)
250     if (tid >= MessageQ_MAXTRANSPORTS) {
251         printf("MessageQ_unregisterTransportId: invalid transport id %d, "
252                 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
254         return;
255     }
257     MessageQ_module->transInst[tid] = NULL;
260 /*
261  * Function to get default configuration for the MessageQ module.
262  */
263 Void MessageQ_getConfig(MessageQ_Config *cfg)
265     Int status;
266     LAD_ClientHandle handle;
267     struct LAD_CommandObj cmd;
268     union LAD_ResponseObj rsp;
270     assert (cfg != NULL);
272     handle = LAD_findHandle();
273     if (handle == LAD_MAXNUMCLIENTS) {
274         PRINTVERBOSE1(
275           "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
276            getpid())
278         return;
279     }
281     cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
282     cmd.clientId = handle;
284     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
285         PRINTVERBOSE1(
286           "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
287         return;
288     }
290     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
291         PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n",
292                       status)
293         return;
294     }
295     status = rsp.messageQGetConfig.status;
297     PRINTVERBOSE2(
298       "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
299       handle, status)
301     memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg));
303     return;
306 /*
307  *  Function to setup the MessageQ module.
308  */
309 Int MessageQ_setup(const MessageQ_Config *cfg)
311     Int status;
312     LAD_ClientHandle handle;
313     struct LAD_CommandObj cmd;
314     union LAD_ResponseObj rsp;
315     Int pri;
316     Int i;
317     Int tid;
319     pthread_mutex_lock(&MessageQ_module->gate);
321     MessageQ_module->refCount++;
322     if (MessageQ_module->refCount > 1) {
324         pthread_mutex_unlock(&MessageQ_module->gate);
326         PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
327                       MessageQ_module->refCount)
329         return MessageQ_S_ALREADYSETUP;
330     }
332     pthread_mutex_unlock(&MessageQ_module->gate);
334     handle = LAD_findHandle();
335     if (handle == LAD_MAXNUMCLIENTS) {
336         PRINTVERBOSE1(
337           "MessageQ_setup: can't find connection to daemon for pid %d\n",
338            getpid())
340         return MessageQ_E_RESOURCE;
341     }
343     cmd.cmd = LAD_MESSAGEQ_SETUP;
344     cmd.clientId = handle;
345     memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof (*cfg));
347     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
348         PRINTVERBOSE1(
349           "MessageQ_setup: sending LAD command failed, status=%d\n", status)
350         return MessageQ_E_FAIL;
351     }
353     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
354         PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
355         return status;
356     }
357     status = rsp.setup.status;
359     PRINTVERBOSE2(
360       "MessageQ_setup: got LAD response for client %d, status=%d\n",
361       handle, status)
363     MessageQ_module->seqNum = 0;
364     MessageQ_module->nameServer = rsp.setup.nameServerHandle;
365     MessageQ_module->numQueues = cfg->maxRuntimeEntries;
366     MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
367                                      sizeof (MessageQ_Handle));
369     pthread_mutex_init(&MessageQ_module->gate, NULL);
371     for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
372         for (pri = 0; pri < 2; pri++) {
373             MessageQ_module->transports[i][pri] = NULL;
374         }
375     }
377     for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
378         MessageQ_module->transInst[tid] = NULL;
379     }
381     return status;
384 /*
385  *  MessageQ_destroy - destroy the MessageQ module.
386  */
387 Int MessageQ_destroy(void)
389     Int status;
390     LAD_ClientHandle handle;
391     struct LAD_CommandObj cmd;
392     union LAD_ResponseObj rsp;
394     handle = LAD_findHandle();
395     if (handle == LAD_MAXNUMCLIENTS) {
396         PRINTVERBOSE1(
397           "MessageQ_destroy: can't find connection to daemon for pid %d\n",
398            getpid())
400         return MessageQ_E_RESOURCE;
401     }
403     cmd.cmd = LAD_MESSAGEQ_DESTROY;
404     cmd.clientId = handle;
406     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
407         PRINTVERBOSE1(
408           "MessageQ_destroy: sending LAD command failed, status=%d\n", status)
409         return MessageQ_E_FAIL;
410     }
412     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
413         PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
414         return status;
415     }
416     status = rsp.status;
418     PRINTVERBOSE2(
419       "MessageQ_destroy: got LAD response for client %d, status=%d\n",
420       handle, status)
422     return status;
425 /*
426  *  ======== MessageQ_Params_init ========
427  *  Legacy implementation.
428  */
429 Void MessageQ_Params_init(MessageQ_Params *params)
431     ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
434 /*
435  *  ======== MessageQ_Params_init__S ========
436  *  New implementation which is version aware.
437  */
438 Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
440     MessageQ_Params_Version2 *params2;
442     switch (version) {
444         case MessageQ_Params_VERSION_2:
445             params2 = (MessageQ_Params_Version2 *)params;
446             params2->__version = MessageQ_Params_VERSION_2;
447             params2->synchronizer = NULL;
448             params2->queueIndex = MessageQ_ANY;
449             break;
451         default:
452             assert(FALSE);
453             break;
454     }
457 /*
458  *  MessageQ_create - create a MessageQ object for receiving.
459  */
460 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
462     Int                   status;
463     MessageQ_Object      *obj = NULL;
464     IMessageQTransport_Handle transport;
465     INetworkTransport_Handle netTrans;
466     ITransport_Handle     baseTrans;
467     UInt16                queueIndex;
468     UInt16                clusterId;
469     Int                   tid;
470     Int                   priority;
471     LAD_ClientHandle      handle;
472     struct LAD_CommandObj cmd;
473     union LAD_ResponseObj rsp;
474     MessageQ_Params ps;
476     MessageQ_Params_init__S(&ps, MessageQ_Params_VERSION);
478     /* copy the given params into the current params structure */
479     if (pp != NULL) {
481         /* snoop the params pointer to see if it's a legacy structure */
482         if ((pp->__version == 0) || (pp->__version > 100)) {
483             ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
484         }
486         /* not legacy structure, use params version field */
487         else if (pp->__version == MessageQ_Params_VERSION_2) {
488             ps.__version = ((MessageQ_Params_Version2 *)pp)->__version;
489             ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
490             ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
491         }
492         else {
493             assert(FALSE);
494         }
495     }
497     handle = LAD_findHandle();
498     if (handle == LAD_MAXNUMCLIENTS) {
499         PRINTVERBOSE1(
500           "MessageQ_create: can't find connection to daemon for pid %d\n",
501            getpid())
503         return NULL;
504     }
506     cmd.cmd = LAD_MESSAGEQ_CREATE;
507     cmd.clientId = handle;
509     if (name == NULL) {
510         cmd.args.messageQCreate.name[0] = '\0';
511     }
512     else {
513         strncpy(cmd.args.messageQCreate.name, name,
514                 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
515         cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
516     }
518     memcpy(&cmd.args.messageQCreate.params, &ps, sizeof(ps));
520     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
521         PRINTVERBOSE1(
522           "MessageQ_create: sending LAD command failed, status=%d\n", status)
523         return NULL;
524     }
526     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
527         PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
528         return NULL;
529     }
530     status = rsp.messageQCreate.status;
532     PRINTVERBOSE2(
533       "MessageQ_create: got LAD response for client %d, status=%d\n",
534       handle, status)
536     if (status == -1) {
537        PRINTVERBOSE1(
538           "MessageQ_create: MessageQ server operation failed, status=%d\n",
539           status)
540        return NULL;
541     }
543     /* Create the generic obj */
544     obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
546    /* Populate the params member */
547     memcpy(&obj->params, &ps, sizeof(ps));
549     queueIndex = (MessageQ_QueueIndex)(rsp.messageQCreate.queueId & 0x0000ffff);
551     obj->queue = rsp.messageQCreate.queueId;
552     obj->serverHandle = rsp.messageQCreate.serverHandle;
553     CIRCLEQ_INIT(&obj->msgList);
554     if (sem_init(&obj->synchronizer, 0, 0) < 0) {
555         PRINTVERBOSE1(
556           "MessageQ_create: failed to create synchronizer (errno %d)\n", errno)
558         MessageQ_delete((MessageQ_Handle *)&obj);
560         return NULL;
561     }
563     PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
564             "queueIndex %d\n", name, queueIndex)
566     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
567         for (priority = 0; priority < 2; priority++) {
568             transport = MessageQ_module->transports[clusterId][priority];
569             if (transport) {
570                 /* need to check return and do something if error */
571                 IMessageQTransport_bind((Void *)transport, obj->queue);
572             }
573         }
574     }
576     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
577         baseTrans = MessageQ_module->transInst[tid];
579         if (baseTrans != NULL) {
580             switch (ITransport_itype(baseTrans)) {
581                 case INetworkTransport_TypeId:
582                     netTrans = INetworkTransport_downCast(baseTrans);
583                     INetworkTransport_bind((void *)netTrans, obj->queue);
584                     break;
586                 default:
587                     /* error */
588                     printf("MessageQ_create: Error: transport id %d is an "
589                             "unsupported transport type.\n", tid);
590                     break;
591             }
592         }
593     }
595     /*
596      * Since LAD's MessageQ_module can grow, we need to be able to grow as well
597      */
598     if (queueIndex >= MessageQ_module->numQueues) {
599         _MessageQ_grow(queueIndex);
600     }
602     /*
603      * No need to "allocate" slot since the queueIndex returned by
604      * LAD is guaranteed to be unique.
605      */
606     MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
608     return (MessageQ_Handle)obj;
611 /*
612  * MessageQ_delete - delete a MessageQ object.
613  */
614 Int MessageQ_delete(MessageQ_Handle *handlePtr)
616     MessageQ_Object *obj;
617     IMessageQTransport_Handle transport;
618     INetworkTransport_Handle  netTrans;
619     ITransport_Handle         baseTrans;
620     Int              status = MessageQ_S_SUCCESS;
621     UInt16           queueIndex;
622     UInt16                clusterId;
623     Int                   tid;
624     Int                   priority;
625     LAD_ClientHandle handle;
626     struct LAD_CommandObj cmd;
627     union LAD_ResponseObj rsp;
629     handle = LAD_findHandle();
630     if (handle == LAD_MAXNUMCLIENTS) {
631         PRINTVERBOSE1(
632           "MessageQ_delete: can't find connection to daemon for pid %d\n",
633            getpid())
635         return MessageQ_E_FAIL;
636     }
638     obj = (MessageQ_Object *)(*handlePtr);
640     cmd.cmd = LAD_MESSAGEQ_DELETE;
641     cmd.clientId = handle;
642     cmd.args.messageQDelete.serverHandle = obj->serverHandle;
644     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
645         PRINTVERBOSE1(
646           "MessageQ_delete: sending LAD command failed, status=%d\n", status)
647         return MessageQ_E_FAIL;
648     }
650     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
651         PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
652         return MessageQ_E_FAIL;
653     }
654     status = rsp.messageQDelete.status;
656     PRINTVERBOSE2(
657       "MessageQ_delete: got LAD response for client %d, status=%d\n",
658       handle, status)
660     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
661         for (priority = 0; priority < 2; priority++) {
662             transport = MessageQ_module->transports[clusterId][priority];
663             if (transport) {
664                 IMessageQTransport_unbind((Void *)transport, obj->queue);
665             }
666         }
667     }
669     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
670         baseTrans = MessageQ_module->transInst[tid];
672         if (baseTrans != NULL) {
673             switch (ITransport_itype(baseTrans)) {
674                 case INetworkTransport_TypeId:
675                     netTrans = INetworkTransport_downCast(baseTrans);
676                     INetworkTransport_unbind((void *)netTrans, obj->queue);
677                     break;
679                 default:
680                     /* error */
681                     printf("MessageQ_create: Error: transport id %d is an "
682                             "unsupported transport type.\n", tid);
683                     break;
684             }
685         }
686     }
688     queueIndex = (MessageQ_QueueIndex)(obj->queue & 0x0000ffff);
689     MessageQ_module->queues[queueIndex] = NULL;
691     free(obj);
692     *handlePtr = NULL;
694     return status;
697 /*
698  *  MessageQ_open - Opens an instance of MessageQ for sending.
699  */
700 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
702     Int status = MessageQ_S_SUCCESS;
704     status = NameServer_getUInt32(MessageQ_module->nameServer,
705                                   name, queueId, NULL);
707     if (status == NameServer_E_NOTFOUND) {
708         /* Set return queue ID to invalid */
709         *queueId = MessageQ_INVALIDMESSAGEQ;
710         status = MessageQ_E_NOTFOUND;
711     }
712     else if (status >= 0) {
713         /* Override with a MessageQ status code */
714         status = MessageQ_S_SUCCESS;
715     }
716     else {
717         /* Set return queue ID to invalid */
718         *queueId = MessageQ_INVALIDMESSAGEQ;
720         /* Override with a MessageQ status code */
721         if (status == NameServer_E_TIMEOUT) {
722             status = MessageQ_E_TIMEOUT;
723         }
724         else {
725             status = MessageQ_E_FAIL;
726         }
727     }
729     return status;
732 /*
733  *  MessageQ_close - Closes previously opened instance of MessageQ.
734  */
735 Int MessageQ_close(MessageQ_QueueId *queueId)
737     Int32 status = MessageQ_S_SUCCESS;
739     /* Nothing more to be done for closing the MessageQ. */
740     *queueId = MessageQ_INVALIDMESSAGEQ;
742     return status;
745 /*
746  * MessageQ_put - place a message onto a message queue.
747  *
748  * Calls transport's put(), which handles the sending of the message using the
749  * appropriate kernel interface (socket, device ioctl) call for the remote
750  * procId encoded in the queueId argument.
751  *
752  */
753 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
755     MessageQ_Object *obj;
756     UInt16   dstProcId  = (UInt16)(queueId >> 16);
757     UInt16   queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
758     Int      status = MessageQ_S_SUCCESS;
759     ITransport_Handle baseTrans;
760     IMessageQTransport_Handle msgTrans;
761     INetworkTransport_Handle netTrans;
762     Int priority;
763     UInt tid;
764     UInt16 clusterId;
766     msg->dstId     = queueIndex;
767     msg->dstProc   = dstProcId;
769     /* invoke put hook function after addressing the message */
770     if (MessageQ_module->putHookFxn != NULL) {
771         MessageQ_module->putHookFxn(queueId, msg);
772     }
774     /* extract the transport ID from the message header */
775     tid = MessageQ_getTransportId(msg);
777     if (tid >= MessageQ_MAXTRANSPORTS) {
778         printf("MessageQ_put: Error: transport id %d too big, must be < %d\n",
779                 tid, MessageQ_MAXTRANSPORTS);
780         return (MessageQ_E_FAIL);
781     }
783     /* if tid is set, use secondary transport regardless of destination */
784     if (tid != 0) {
785         baseTrans = MessageQ_module->transInst[tid];
787         if (baseTrans == NULL) {
788             printf("MessageQ_put: Error: transport is null\n");
789             return (MessageQ_E_FAIL);
790         }
792         /* downcast instance pointer to transport interface */
793         switch (ITransport_itype(baseTrans)) {
794             case INetworkTransport_TypeId:
795                 netTrans = INetworkTransport_downCast(baseTrans);
796                 INetworkTransport_put(netTrans, (Ptr)msg);
797                 break;
799             default:
800                 /* error */
801                 printf("MessageQ_put: Error: transport id %d is an "
802                         "unsupported transport type\n", tid);
803                 status = MessageQ_E_FAIL;
804                 break;
805         }
806     }
807     else {
808         /* if destination on another processor, use primary transport */
809         if (dstProcId != MultiProc_self()) {
810             priority = MessageQ_getMsgPri(msg);
811             clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
813             /* primary transport can only be used for intra-cluster delivery */
814             if (clusterId > MultiProc_getNumProcsInCluster()) {
815                 printf("MessageQ_put: Error: destination procId=%d is not "
816                         "in cluster. Must specify a transportId.\n", dstProcId);
817                 return (MessageQ_E_FAIL);
818             }
820             msgTrans = MessageQ_module->transports[clusterId][priority];
822             IMessageQTransport_put(msgTrans, (Ptr)msg);
823         }
824         else {
825             /* check if destination queue is in this process */
826             if (queueIndex >= MessageQ_module->numQueues) {
827                 printf("MessageQ_put: Error: unable to deliver message, "
828                         "queueIndex too large or transportId missing.\n");
829                 return (MessageQ_E_FAIL);
830             }
832             obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
834             if (obj == NULL) {
835                 printf("MessageQ_put: Error: unable to deliver message, "
836                         "destination queue not in this process.\n");
837                 return (MessageQ_E_FAIL);
838             }
840             /* deliver message to process local queue */
841             pthread_mutex_lock(&MessageQ_module->gate);
842             CIRCLEQ_INSERT_TAIL(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)msg,
843                     elem);
844             pthread_mutex_unlock(&MessageQ_module->gate);
845             sem_post(&obj->synchronizer);
846         }
847     }
849     return (status);
852 /*
853  *  MessageQ_get - gets a message for a message queue and blocks if
854  *  the queue is empty.
855  *
856  *  If a message is present, it returns it.  Otherwise it blocks
857  *  waiting for a message to arrive.
858  *  When a message is returned, it is owned by the caller.
859  */
860 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
862     MessageQ_Object * obj = (MessageQ_Object *)handle;
863     Int     status = MessageQ_S_SUCCESS;
864     struct timespec ts;
865     struct timeval tv;
867 #if 0
868 /*
869  * Optimization here to get a message without going in to the sem
870  * operation, but the sem count will not be maintained properly.
871  */
872     pthread_mutex_lock(&MessageQ_module->gate);
874     if (obj->msgList.cqh_first != &obj->msgList) {
875         *msg = (MessageQ_Msg)obj->msglist.cqh_first;
876         CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
878         pthread_mutex_unlock(&MessageQ_module->gate);
879     }
880     else {
881         pthread_mutex_unlock(&MessageQ_module->gate);
882     }
883 #endif
885     if (timeout == MessageQ_FOREVER) {
886         sem_wait(&obj->synchronizer);
887     }
888     else {
889         gettimeofday(&tv, NULL);
890         ts.tv_sec = tv.tv_sec;
891         ts.tv_nsec = (tv.tv_usec + timeout) * 1000;
893         if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
894             if (errno == ETIMEDOUT) {
895                 PRINTVERBOSE0("MessageQ_get: operation timed out\n")
897                 return MessageQ_E_TIMEOUT;
898             }
899         }
900     }
902     if (obj->unblocked) {
903         return MessageQ_E_UNBLOCKED;
904     }
906     pthread_mutex_lock(&MessageQ_module->gate);
908     *msg = (MessageQ_Msg)obj->msgList.cqh_first;
909     CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
911     pthread_mutex_unlock(&MessageQ_module->gate);
913     return status;
916 /*
917  * Return a count of the number of messages in the queue
918  *
919  * TBD: Implement as a socket ioctl, using getsockopt().  Return -1 for now.
920  */
921 Int MessageQ_count(MessageQ_Handle handle)
923     Int               count = -1;
924 #if 0
925     MessageQ_Object * obj   = (MessageQ_Object *) handle;
926     socklen_t         optlen;
928     /*
929      * TBD: Need to find a way to implement (if anyone uses it!), and
930      * push down into transport..
931      */
933     /*
934      * 2nd arg to getsockopt should be transport independent, but using
935      *  SSKPROTO_SHMFIFO for now:
936      */
937     getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
938                  &count, &optlen);
939 #endif
941     return count;
944 /*
945  *  Initializes a message not obtained from MessageQ_alloc.
946  */
947 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
949     /* Fill in the fields of the message */
950     MessageQ_msgInit(msg);
951     msg->heapId = MessageQ_STATICMSG;
952     msg->msgSize = size;
955 /*
956  *  Allocate a message and initialize the needed fields (note some
957  *  of the fields in the header are set via other APIs or in the
958  *  MessageQ_put function,
959  */
960 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
962     MessageQ_Msg msg;
964     /*
965      * heapId not used for local alloc (as this is over a copy transport), but
966      * we need to send to other side as heapId is used in BIOS transport.
967      */
968     msg = (MessageQ_Msg)calloc(1, size);
969     MessageQ_msgInit(msg);
970     msg->msgSize = size;
971     msg->heapId = heapId;
973     return msg;
976 /*
977  *  Frees the message back to the heap that was used to allocate it.
978  */
979 Int MessageQ_free(MessageQ_Msg msg)
981     UInt32 status = MessageQ_S_SUCCESS;
983     /* Check to ensure this was not allocated by user: */
984     if (msg->heapId == MessageQ_STATICMSG) {
985         status = MessageQ_E_CANNOTFREESTATICMSG;
986     }
987     else {
988         free(msg);
989     }
991     return status;
994 /* Register a heap with MessageQ. */
995 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
997     Int status = MessageQ_S_SUCCESS;
999     /* Do nothing, as this uses a copy transport */
1001     return status;
1004 /* Unregister a heap with MessageQ. */
1005 Int MessageQ_unregisterHeap(UInt16 heapId)
1007     Int status = MessageQ_S_SUCCESS;
1009     /* Do nothing, as this uses a copy transport */
1011     return status;
1014 /* Unblocks a MessageQ */
1015 Void MessageQ_unblock(MessageQ_Handle handle)
1017     MessageQ_Object *obj = (MessageQ_Object *)handle;
1019     obj->unblocked = TRUE;
1020     sem_post(&obj->synchronizer);
1023 /* Embeds a source message queue into a message */
1024 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
1026     MessageQ_Object *obj = (MessageQ_Object *)handle;
1028     msg->replyId = (UInt16)(obj->queue);
1029     msg->replyProc = (UInt16)(obj->queue >> 16);
1032 /* Returns the QueueId associated with the handle. */
1033 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
1035     MessageQ_Object *obj = (MessageQ_Object *) handle;
1036     UInt32 queueId;
1038     queueId = (obj->queue);
1040     return queueId;
1043 /* Sets the tracing of a message */
1044 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
1046     msg->flags = (msg->flags & ~TRACEMASK) |   (traceFlag << TRACESHIFT);
1049 /*
1050  *  Returns the amount of shared memory used by one transport instance.
1051  *
1052  *  The MessageQ module itself does not use any shared memory but the
1053  *  underlying transport may use some shared memory.
1054  */
1055 SizeT MessageQ_sharedMemReq(Ptr sharedAddr)
1057     SizeT memReq = 0u;
1059     /* Do nothing, as this is a copy transport. */
1061     return memReq;
1064 /*
1065  * This is a helper function to initialize a message.
1066  */
1067 Void MessageQ_msgInit(MessageQ_Msg msg)
1069 #if 0
1070     Int                 status    = MessageQ_S_SUCCESS;
1071     LAD_ClientHandle handle;
1072     struct LAD_CommandObj cmd;
1073     union LAD_ResponseObj rsp;
1075     handle = LAD_findHandle();
1076     if (handle == LAD_MAXNUMCLIENTS) {
1077         PRINTVERBOSE1(
1078           "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
1079            getpid())
1081         return;
1082     }
1084     cmd.cmd = LAD_MESSAGEQ_MSGINIT;
1085     cmd.clientId = handle;
1087     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
1088         PRINTVERBOSE1(
1089           "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
1090         return;
1091     }
1093     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
1094         PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
1095         return;
1096     }
1097     status = rsp.msgInit.status;
1099     PRINTVERBOSE2(
1100       "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
1101       handle, status)
1103     memcpy(msg, &rsp.msgInit.msg, sizeof (*msg));
1104 #else
1105     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
1106     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
1107     msg->msgId     = MessageQ_INVALIDMSGID;
1108     msg->dstId     = (UInt16)MessageQ_INVALIDMESSAGEQ;
1109     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
1110     msg->srcProc   = MultiProc_self();
1112     pthread_mutex_lock(&MessageQ_module->gate);
1113     msg->seqNum  = MessageQ_module->seqNum++;
1114     pthread_mutex_unlock(&MessageQ_module->gate);
1115 #endif
1118 /*
1119  * Grow module's queues[] array to accommodate queueIndex from LAD
1120  */
1121 Void _MessageQ_grow(UInt16 queueIndex)
1123     MessageQ_Handle *queues;
1124     MessageQ_Handle *oldQueues;
1125     UInt oldSize;
1127     oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
1129     queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof (MessageQ_Handle));
1130     memcpy(queues, MessageQ_module->queues, oldSize);
1132     oldQueues = MessageQ_module->queues;
1133     MessageQ_module->queues = queues;
1134     MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
1136     free(oldQueues);
1138     return;