Create specialized gates to prevent deadlock on general gate
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
1 /*
2  * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  @file   MessageQ.c
34  *
35  *  @brief  MessageQ Linux implementation
36  *
37  *  This implementation is geared for use in a "client/server" model, whereby
38  *  system-wide data is maintained in a "server" component and process-
39  *  specific data is handled here.  At the moment, this implementation
40  *  connects and communicates with LAD for the server connection.
41  */
43 /* Standard IPC header */
44 #include <ti/ipc/Std.h>
46 /* Module level headers */
47 #include <ti/ipc/NameServer.h>
48 #include <ti/ipc/MultiProc.h>
49 #include <_MultiProc.h>
50 #define MessageQ_internal 1     /* must be defined before include file */
51 #include <ti/ipc/MessageQ.h>
52 #include <_MessageQ.h>
53 #include <ti/ipc/interfaces/ITransport.h>
54 #include <ti/ipc/interfaces/IMessageQTransport.h>
55 #include <ti/ipc/interfaces/INetworkTransport.h>
57 /* Socket Headers */
58 #include <sys/select.h>
59 #include <sys/time.h>
60 #include <sys/types.h>
61 #include <sys/param.h>
62 #include <sys/eventfd.h>
63 #include <sys/queue.h>
64 #include <errno.h>
65 #include <stdio.h>
66 #include <string.h>
67 #include <stdlib.h>
68 #include <unistd.h>
69 #include <assert.h>
70 #include <pthread.h>
71 #include <semaphore.h>
73 #include <ladclient.h>
74 #include <_lad.h>
76 /* =============================================================================
77  * Macros/Constants
78  * =============================================================================
79  */
81 /*!
82  *  @brief  Name of the reserved NameServer used for MessageQ.
83  */
84 #define MessageQ_NAMESERVER  "MessageQ"
86 #define MessageQ_MAXTRANSPORTS 8
88 #define MessageQ_GROWSIZE 32
90 /* Trace flag settings: */
91 #define TRACESHIFT    12
92 #define TRACEMASK     0x1000
94 /* Define BENCHMARK to quiet key MessageQ APIs: */
95 //#define BENCHMARK
97 /* =============================================================================
98  * Structures & Enums
99  * =============================================================================
100  */
102 /* params structure evolution */
103 typedef struct {
104     Void *synchronizer;
105 } MessageQ_Params_Legacy;
107 typedef struct {
108     Int __version;
109     Void *synchronizer;
110     MessageQ_QueueIndex queueIndex;
111 } MessageQ_Params_Version2;
113 /* structure for MessageQ module state */
114 typedef struct MessageQ_ModuleObject {
115     MessageQ_Handle           *queues;
116     Int                       numQueues;
117     Int                       refCount;
118     NameServer_Handle         nameServer;
119     pthread_mutex_t           gate;
120     int                       seqNum;
121     pthread_mutex_t           seqNumGate;
122     IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
123     ITransport_Handle         transInst[MessageQ_MAXTRANSPORTS];
124     MessageQ_PutHookFxn       putHookFxn;
125 } MessageQ_ModuleObject;
127 typedef struct MessageQ_CIRCLEQ_ENTRY {
128      CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem;
129 } MessageQ_CIRCLEQ_ENTRY;
131 /*!
132  *  @brief  Structure for the Handle for the MessageQ.
133  */
134 typedef struct MessageQ_Object_tag {
135     CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
136     pthread_mutex_t              msgListGate;
137     MessageQ_Params              params;
138     MessageQ_QueueId             queue;
139     int                          unblocked;
140     void                         *serverHandle;
141     sem_t                        synchronizer;
142 } MessageQ_Object;
144 /* traces in this file are controlled via _MessageQ_verbose */
145 Bool _MessageQ_verbose = FALSE;
146 #define verbose _MessageQ_verbose
148 /* =============================================================================
149  *  Globals
150  * =============================================================================
151  */
152 static MessageQ_ModuleObject MessageQ_state =
154     .refCount   = 0,
155     .nameServer = NULL,
156     .gate       = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
157     .seqNumGate = PTHREAD_MUTEX_INITIALIZER,
158     .putHookFxn = NULL
159 };
161 /*!
162  *  @var    MessageQ_module
163  *
164  *  @brief  Pointer to the MessageQ module state.
165  */
166 MessageQ_ModuleObject *MessageQ_module = &MessageQ_state;
168 Void _MessageQ_grow(UInt16 queueIndex);
170 /* =============================================================================
171  * APIS
172  * =============================================================================
173  */
175 Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
176                                 UInt16 rprocId, UInt priority)
178     Int status = FALSE;
179     UInt16 clusterId;
181     if (handle == NULL) {
182         printf("MessageQ_registerTransport: invalid handle, must be non-NULL\n"
183               );
185         return status;
186     }
188     /* map procId to clusterId */
189     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
191     if (clusterId >= MultiProc_MAXPROCESSORS) {
192         printf("MessageQ_registerTransport: invalid procId %d\n", rprocId);
194         return status;
195     }
197     if (MessageQ_module->transports[clusterId][priority] == NULL) {
198         MessageQ_module->transports[clusterId][priority] = handle;
200         status = TRUE;
201     }
203     return status;
206 Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
208     if (inst == NULL) {
209         printf("MessageQ_registerTransportId: invalid NULL handle\n");
211         return MessageQ_E_INVALIDARG;
212     }
214     if (tid >= MessageQ_MAXTRANSPORTS) {
215         printf("MessageQ_unregisterNetTransport: invalid transport id %d,"
216                 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
218         return MessageQ_E_INVALIDARG;
219     }
221     if (MessageQ_module->transInst[tid] != NULL) {
222         printf("MessageQ_registerTransportId: transport id %d already "
223                 "registered\n", tid);
225         return MessageQ_E_ALREADYEXISTS;
226     }
228     MessageQ_module->transInst[tid] = inst;
230     return MessageQ_S_SUCCESS;
233 Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
235     UInt16 clusterId;
237     /* map procId to clusterId */
238     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
240     if (clusterId >= MultiProc_MAXPROCESSORS) {
241         printf("MessageQ_unregisterTransport: invalid rprocId %d\n", rprocId);
243         return;
244     }
246     MessageQ_module->transports[clusterId][priority] = NULL;
249 Void MessageQ_unregisterTransportId(UInt tid)
251     if (tid >= MessageQ_MAXTRANSPORTS) {
252         printf("MessageQ_unregisterTransportId: invalid transport id %d, "
253                 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
255         return;
256     }
258     MessageQ_module->transInst[tid] = NULL;
261 /*
262  * Function to get default configuration for the MessageQ module.
263  */
264 Void MessageQ_getConfig(MessageQ_Config *cfg)
266     Int status;
267     LAD_ClientHandle handle;
268     struct LAD_CommandObj cmd;
269     union LAD_ResponseObj rsp;
271     assert (cfg != NULL);
273     handle = LAD_findHandle();
274     if (handle == LAD_MAXNUMCLIENTS) {
275         PRINTVERBOSE1(
276           "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
277            getpid())
279         return;
280     }
282     cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
283     cmd.clientId = handle;
285     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
286         PRINTVERBOSE1(
287           "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
288         return;
289     }
291     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
292         PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n",
293                       status)
294         return;
295     }
296     status = rsp.messageQGetConfig.status;
298     PRINTVERBOSE2(
299       "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
300       handle, status)
302     memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg));
304     return;
307 /*
308  *  Function to setup the MessageQ module.
309  */
310 Int MessageQ_setup(const MessageQ_Config *cfg)
312     Int status = MessageQ_S_SUCCESS;
313     LAD_ClientHandle handle;
314     struct LAD_CommandObj cmd;
315     union LAD_ResponseObj rsp;
316     Int pri;
317     Int i;
318     Int tid;
320     /* this entire function must be serialized */
321     pthread_mutex_lock(&MessageQ_module->gate);
323     /* ensure only first thread performs startup procedure */
324     if (++MessageQ_module->refCount > 1) {
325         PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
326                 MessageQ_module->refCount)
327         status = MessageQ_S_ALREADYSETUP;
328         goto exit;
329     }
331     handle = LAD_findHandle();
332     if (handle == LAD_MAXNUMCLIENTS) {
333         PRINTVERBOSE1("MessageQ_setup: can't find connection to daemon for "
334                 "pid %d\n", getpid())
335         status = MessageQ_E_RESOURCE;
336         goto exit;
337     }
339     cmd.cmd = LAD_MESSAGEQ_SETUP;
340     cmd.clientId = handle;
341     memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
343     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
344         PRINTVERBOSE1("MessageQ_setup: sending LAD command failed, "
345                 "status=%d\n", status)
346         status = MessageQ_E_FAIL;
347         goto exit;
348     }
350     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
351         PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
352         status = MessageQ_E_FAIL;
353         goto exit;
354     }
355     status = rsp.setup.status;
357     PRINTVERBOSE2("MessageQ_setup: LAD response for client %d, status=%d\n",
358             handle, status)
360     MessageQ_module->seqNum = 0;
361     MessageQ_module->nameServer = rsp.setup.nameServerHandle;
362     MessageQ_module->numQueues = cfg->maxRuntimeEntries;
363     MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
364             sizeof(MessageQ_Handle));
366     for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
367         for (pri = 0; pri < 2; pri++) {
368             MessageQ_module->transports[i][pri] = NULL;
369         }
370     }
372     for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
373         MessageQ_module->transInst[tid] = NULL;
374     }
376 exit:
377     /* if error, must decrement reference count */
378     if (status < 0) {
379         MessageQ_module->refCount--;
380     }
382     pthread_mutex_unlock(&MessageQ_module->gate);
384     return (status);
387 /*
388  *  MessageQ_destroy - destroy the MessageQ module.
389  */
390 Int MessageQ_destroy(void)
392     Int status = MessageQ_S_SUCCESS;
393     LAD_ClientHandle handle;
394     struct LAD_CommandObj cmd;
395     union LAD_ResponseObj rsp;
397     /* this entire function must be serialized */
398     pthread_mutex_lock(&MessageQ_module->gate);
400     /* ensure only last thread does the work */
401     if (--MessageQ_module->refCount > 0) {
402         goto exit;
403     }
405     handle = LAD_findHandle();
406     if (handle == LAD_MAXNUMCLIENTS) {
407         PRINTVERBOSE1("MessageQ_destroy: can't find connection to daemon "
408                 "for pid %d\n", getpid())
409         status =  MessageQ_E_RESOURCE;
410         goto exit;
411     }
413     cmd.cmd = LAD_MESSAGEQ_DESTROY;
414     cmd.clientId = handle;
416     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
417         PRINTVERBOSE1("MessageQ_destroy: sending LAD command failed, "
418                 "status=%d\n", status)
419         status = MessageQ_E_FAIL;
420         goto exit;
421     }
423     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
424         PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
425         status = MessageQ_E_FAIL;
426         goto exit;
427     }
428     status = rsp.status;
430     PRINTVERBOSE2("MessageQ_destroy: got LAD response for client %d, "
431             "status=%d\n", handle, status)
433 exit:
434     pthread_mutex_unlock(&MessageQ_module->gate);
436     return (status);
439 /*
440  *  ======== MessageQ_Params_init ========
441  *  Legacy implementation.
442  */
443 Void MessageQ_Params_init(MessageQ_Params *params)
445     ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
448 /*
449  *  ======== MessageQ_Params_init__S ========
450  *  New implementation which is version aware.
451  */
452 Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
454     MessageQ_Params_Version2 *params2;
456     switch (version) {
458         case MessageQ_Params_VERSION_2:
459             params2 = (MessageQ_Params_Version2 *)params;
460             params2->__version = MessageQ_Params_VERSION_2;
461             params2->synchronizer = NULL;
462             params2->queueIndex = MessageQ_ANY;
463             break;
465         default:
466             assert(FALSE);
467             break;
468     }
471 /*
472  *  ======== MessageQ_create ========
473  */
474 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
476     Int                   status;
477     MessageQ_Object      *obj = NULL;
478     IMessageQTransport_Handle transport;
479     INetworkTransport_Handle netTrans;
480     ITransport_Handle     baseTrans;
481     UInt16                queueIndex;
482     UInt16                clusterId;
483     Int                   tid;
484     Int                   priority;
485     LAD_ClientHandle      handle;
486     struct LAD_CommandObj cmd;
487     union LAD_ResponseObj rsp;
488     MessageQ_Params ps;
490     MessageQ_Params_init__S(&ps, MessageQ_Params_VERSION);
492     /* copy the given params into the current params structure */
493     if (pp != NULL) {
495         /* snoop the params pointer to see if it's a legacy structure */
496         if ((pp->__version == 0) || (pp->__version > 100)) {
497             ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
498         }
500         /* not legacy structure, use params version field */
501         else if (pp->__version == MessageQ_Params_VERSION_2) {
502             ps.__version = ((MessageQ_Params_Version2 *)pp)->__version;
503             ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
504             ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
505         }
506         else {
507             assert(FALSE);
508         }
509     }
511     handle = LAD_findHandle();
512     if (handle == LAD_MAXNUMCLIENTS) {
513         PRINTVERBOSE1(
514           "MessageQ_create: can't find connection to daemon for pid %d\n",
515            getpid())
517         return NULL;
518     }
520     cmd.cmd = LAD_MESSAGEQ_CREATE;
521     cmd.clientId = handle;
523     if (name == NULL) {
524         cmd.args.messageQCreate.name[0] = '\0';
525     }
526     else {
527         strncpy(cmd.args.messageQCreate.name, name,
528                 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
529         cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
530     }
532     memcpy(&cmd.args.messageQCreate.params, &ps, sizeof(ps));
534     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
535         PRINTVERBOSE1(
536           "MessageQ_create: sending LAD command failed, status=%d\n", status)
537         return NULL;
538     }
540     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
541         PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
542         return NULL;
543     }
544     status = rsp.messageQCreate.status;
546     PRINTVERBOSE2(
547       "MessageQ_create: got LAD response for client %d, status=%d\n",
548       handle, status)
550     if (status == -1) {
551        PRINTVERBOSE1(
552           "MessageQ_create: MessageQ server operation failed, status=%d\n",
553           status)
554        return NULL;
555     }
557     /* Create the generic obj */
558     obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
560    /* Populate the params member */
561     memcpy(&obj->params, &ps, sizeof(ps));
564     obj->queue = rsp.messageQCreate.queueId;
565     obj->serverHandle = rsp.messageQCreate.serverHandle;
566     pthread_mutex_init(&obj->msgListGate, NULL);
567     CIRCLEQ_INIT(&obj->msgList);
568     if (sem_init(&obj->synchronizer, 0, 0) < 0) {
569         PRINTVERBOSE1(
570           "MessageQ_create: failed to create synchronizer (errno %d)\n", errno)
572         MessageQ_delete((MessageQ_Handle *)&obj);
574         return NULL;
575     }
577     /* lad returns the queue port # (queueIndex + PORT_OFFSET) */
578     queueIndex = MessageQ_getQueueIndex(rsp.messageQCreate.queueId);
580     PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
581             "queueIndex %d\n", (name == NULL) ? "NULL" : name , queueIndex)
583     pthread_mutex_lock(&MessageQ_module->gate);
585     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
586         for (priority = 0; priority < 2; priority++) {
587             transport = MessageQ_module->transports[clusterId][priority];
588             if (transport) {
589                 /* need to check return and do something if error */
590                 IMessageQTransport_bind((Void *)transport, obj->queue);
591             }
592         }
593     }
595     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
596         baseTrans = MessageQ_module->transInst[tid];
598         if (baseTrans != NULL) {
599             switch (ITransport_itype(baseTrans)) {
600                 case INetworkTransport_TypeId:
601                     netTrans = INetworkTransport_downCast(baseTrans);
602                     INetworkTransport_bind((void *)netTrans, obj->queue);
603                     break;
605                 default:
606                     /* error */
607                     printf("MessageQ_create: Error: transport id %d is an "
608                             "unsupported transport type.\n", tid);
609                     break;
610             }
611         }
612     }
614     /* LAD's MessageQ module can grow, we need to grow as well */
615     if (queueIndex >= MessageQ_module->numQueues) {
616         _MessageQ_grow(queueIndex);
617     }
619     /*  No need to "allocate" slot since the queueIndex returned by
620      *  LAD is guaranteed to be unique.
621      */
622     MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
624     pthread_mutex_unlock(&MessageQ_module->gate);
626     return (MessageQ_Handle)obj;
629 /*
630  *  ======== MessageQ_delete ========
631  */
632 Int MessageQ_delete(MessageQ_Handle *handlePtr)
634     MessageQ_Object *obj;
635     IMessageQTransport_Handle transport;
636     INetworkTransport_Handle  netTrans;
637     ITransport_Handle         baseTrans;
638     Int              status = MessageQ_S_SUCCESS;
639     UInt16           queueIndex;
640     UInt16                clusterId;
641     Int                   tid;
642     Int                   priority;
643     LAD_ClientHandle handle;
644     struct LAD_CommandObj cmd;
645     union LAD_ResponseObj rsp;
647     handle = LAD_findHandle();
648     if (handle == LAD_MAXNUMCLIENTS) {
649         PRINTVERBOSE1(
650           "MessageQ_delete: can't find connection to daemon for pid %d\n",
651            getpid())
653         return MessageQ_E_FAIL;
654     }
656     obj = (MessageQ_Object *)(*handlePtr);
658     cmd.cmd = LAD_MESSAGEQ_DELETE;
659     cmd.clientId = handle;
660     cmd.args.messageQDelete.serverHandle = obj->serverHandle;
662     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
663         PRINTVERBOSE1(
664           "MessageQ_delete: sending LAD command failed, status=%d\n", status)
665         return MessageQ_E_FAIL;
666     }
668     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
669         PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
670         return MessageQ_E_FAIL;
671     }
672     status = rsp.messageQDelete.status;
674     PRINTVERBOSE2(
675       "MessageQ_delete: got LAD response for client %d, status=%d\n",
676       handle, status)
678     pthread_mutex_lock(&MessageQ_module->gate);
680     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
681         for (priority = 0; priority < 2; priority++) {
682             transport = MessageQ_module->transports[clusterId][priority];
683             if (transport) {
684                 IMessageQTransport_unbind((Void *)transport, obj->queue);
685             }
686         }
687     }
689     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
690         baseTrans = MessageQ_module->transInst[tid];
692         if (baseTrans != NULL) {
693             switch (ITransport_itype(baseTrans)) {
694                 case INetworkTransport_TypeId:
695                     netTrans = INetworkTransport_downCast(baseTrans);
696                     INetworkTransport_unbind((void *)netTrans, obj->queue);
697                     break;
699                 default:
700                     /* error */
701                     printf("MessageQ_create: Error: transport id %d is an "
702                             "unsupported transport type.\n", tid);
703                     break;
704             }
705         }
706     }
708     /* extract the queue index from the queueId */
709     queueIndex = MessageQ_getQueueIndex(obj->queue);
710     MessageQ_module->queues[queueIndex] = NULL;
712     pthread_mutex_unlock(&MessageQ_module->gate);
714     free(obj);
715     *handlePtr = NULL;
717     return status;
720 /*
721  *  ======== MessageQ_open ========
722  *  Acquire a queueId for use in sending messages to the queue
723  */
724 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
726     Int status = MessageQ_S_SUCCESS;
728     status = NameServer_getUInt32(MessageQ_module->nameServer,
729                                   name, queueId, NULL);
731     if (status == NameServer_E_NOTFOUND) {
732         /* Set return queue ID to invalid */
733         *queueId = MessageQ_INVALIDMESSAGEQ;
734         status = MessageQ_E_NOTFOUND;
735     }
736     else if (status >= 0) {
737         /* Override with a MessageQ status code */
738         status = MessageQ_S_SUCCESS;
739     }
740     else {
741         /* Set return queue ID to invalid */
742         *queueId = MessageQ_INVALIDMESSAGEQ;
744         /* Override with a MessageQ status code */
745         if (status == NameServer_E_TIMEOUT) {
746             status = MessageQ_E_TIMEOUT;
747         }
748         else {
749             status = MessageQ_E_FAIL;
750         }
751     }
753     return status;
756 /*
757  *  ======== MessageQ_openQueueId ========
758  */
759 MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 procId)
761     MessageQ_QueueIndex queuePort;
762     MessageQ_QueueId queueId;
764     /* queue port is embedded in the queueId */
765     queuePort = queueIndex + MessageQ_PORTOFFSET;
766     queueId = ((MessageQ_QueueId)(procId) << 16) | queuePort;
768     return (queueId);
771 /*
772  *  ======== MessageQ_close ========
773  *  Closes previously opened instance of MessageQ
774  */
775 Int MessageQ_close(MessageQ_QueueId *queueId)
777     Int32 status = MessageQ_S_SUCCESS;
779     /* Nothing more to be done for closing the MessageQ. */
780     *queueId = MessageQ_INVALIDMESSAGEQ;
782     return status;
785 /*
786  *  ======== MessageQ_put ========
787  *  Deliver the given message, either locally or to the transport
788  *
789  *  If the destination is a local queue, deliver the message. Otherwise,
790  *  pass the message to a transport for delivery. The transport handles
791  *  the sending of the message using the appropriate interface (socket,
792  *  device ioctl, etc.).
793  */
794 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
796     Int status = MessageQ_S_SUCCESS;
797     MessageQ_Object *obj;
798     UInt16 dstProcId;
799     UInt16 queueIndex;
800     UInt16 queuePort;
801     ITransport_Handle baseTrans;
802     IMessageQTransport_Handle msgTrans;
803     INetworkTransport_Handle netTrans;
804     Int priority;
805     UInt tid;
806     UInt16 clusterId;
807     Bool delivered;
809     /* extract destination address from the given queueId */
810     dstProcId  = (UInt16)(queueId >> 16);
811     queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
813     /* write the destination address into the message header */
814     msg->dstId = queuePort;
815     msg->dstProc= dstProcId;
817     /* invoke the hook function after addressing the message */
818     if (MessageQ_module->putHookFxn != NULL) {
819         MessageQ_module->putHookFxn(queueId, msg);
820     }
822     /*  For an outbound message: If message destination is on this
823      *  processor, then check if the destination queue is in this
824      *  process (thread-to-thread messaging).
825      *
826      *  For an inbound message: Check if destination queue is in this
827      *  process (process-to-process messaging).
828      */
829     if (dstProcId == MultiProc_self()) {
830         queueIndex = queuePort - MessageQ_PORTOFFSET;
832         if (queueIndex < MessageQ_module->numQueues) {
833             obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
835             if (obj != NULL) {
836                 /* deliver message to queue */
837                 pthread_mutex_lock(&obj->msgListGate);
838                 CIRCLEQ_INSERT_TAIL(&obj->msgList,
839                         (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
840                 pthread_mutex_unlock(&obj->msgListGate);
841                 sem_post(&obj->synchronizer);
842                 goto done;
843             }
844         }
845     }
847     /*  Getting here implies the message is outbound. Must give it to
848      *  either the primary or secondary transport for delivery. Start
849      *  by extracting the transport ID from the message header.
850      */
851     tid = MessageQ_getTransportId(msg);
853     if (tid >= MessageQ_MAXTRANSPORTS) {
854         printf("MessageQ_put: Error: transport id %d too big, must be < %d\n",
855                 tid, MessageQ_MAXTRANSPORTS);
856         status = MessageQ_E_FAIL;
857         goto done;
858     }
860     /* if transportId is set, use secondary transport for message delivery */
861     if (tid != 0) {
862         baseTrans = MessageQ_module->transInst[tid];
864         if (baseTrans == NULL) {
865             printf("MessageQ_put: Error: transport is null\n");
866             status = MessageQ_E_FAIL;
867             goto done;
868         }
870         /* downcast instance pointer to transport interface */
871         switch (ITransport_itype(baseTrans)) {
872             case INetworkTransport_TypeId:
873                 netTrans = INetworkTransport_downCast(baseTrans);
874                 delivered = INetworkTransport_put(netTrans, (Ptr)msg);
875                 status = (delivered ? MessageQ_S_SUCCESS :
876                           (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN :
877                            MessageQ_E_FAIL));
878                 break;
880             default:
881                 /* error */
882                 printf("MessageQ_put: Error: transport id %d is an "
883                         "unsupported transport type\n", tid);
884                 status = MessageQ_E_FAIL;
885                 break;
886         }
887     }
888     else {
889         /* use primary transport for delivery */
890         priority = MessageQ_getMsgPri(msg);
891         clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
893         /* primary transport can only be used for intra-cluster delivery */
894         if (clusterId > MultiProc_getNumProcsInCluster()) {
895             printf("MessageQ_put: Error: destination procId=%d is not "
896                     "in cluster. Must specify a transportId.\n", dstProcId);
897             status =  MessageQ_E_FAIL;
898             goto done;
899         }
901         msgTrans = MessageQ_module->transports[clusterId][priority];
902         delivered = IMessageQTransport_put(msgTrans, (Ptr)msg);
903         status = (delivered ? MessageQ_S_SUCCESS :
904                   (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN : MessageQ_E_FAIL));
905     }
907 done:
908     return (status);
911 /*
912  *  MessageQ_get - gets a message for a message queue and blocks if
913  *  the queue is empty.
914  *
915  *  If a message is present, it returns it.  Otherwise it blocks
916  *  waiting for a message to arrive.
917  *  When a message is returned, it is owned by the caller.
918  */
919 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
921     MessageQ_Object * obj = (MessageQ_Object *)handle;
922     Int     status = MessageQ_S_SUCCESS;
923     struct timespec ts;
924     struct timeval tv;
926 #if 0
927 /*
928  * Optimization here to get a message without going in to the sem
929  * operation, but the sem count will not be maintained properly.
930  */
931     pthread_mutex_lock(&obj->msgListGate);
933     if (obj->msgList.cqh_first != &obj->msgList) {
934         *msg = (MessageQ_Msg)obj->msglist.cqh_first;
935         CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
937         pthread_mutex_unlock(&obj->msgListGate);
938     }
939     else {
940         pthread_mutex_unlock(&obj->msgListGate);
941     }
942 #endif
944     if (timeout == MessageQ_FOREVER) {
945         sem_wait(&obj->synchronizer);
946     }
947     else {
948         /* add timeout (microseconds) to current time of day */
949         gettimeofday(&tv, NULL);
950         tv.tv_sec += timeout / 1000000;
951         tv.tv_usec += timeout % 1000000;
953         if (tv.tv_usec >= 1000000) {
954               tv.tv_sec++;
955               tv.tv_usec -= 1000000;
956         }
958         /* set absolute timeout value */
959         ts.tv_sec = tv.tv_sec;
960         ts.tv_nsec = tv.tv_usec * 1000; /* convert to nanoseconds */
962         if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
963             if (errno == ETIMEDOUT) {
964                 PRINTVERBOSE0("MessageQ_get: operation timed out\n")
965                 return (MessageQ_E_TIMEOUT);
966             }
967             else {
968                 PRINTVERBOSE0("MessageQ_get: sem_timedwait error\n")
969                 return (MessageQ_E_FAIL);
970             }
971         }
972     }
974     if (obj->unblocked) {
975         return obj->unblocked;
976     }
978     pthread_mutex_lock(&obj->msgListGate);
980     *msg = (MessageQ_Msg)obj->msgList.cqh_first;
981     CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
983     pthread_mutex_unlock(&obj->msgListGate);
985     return status;
988 /*
989  * Return a count of the number of messages in the queue
990  *
991  * TBD: Implement as a socket ioctl, using getsockopt().  Return -1 for now.
992  */
993 Int MessageQ_count(MessageQ_Handle handle)
995     Int               count = -1;
996 #if 0
997     MessageQ_Object * obj   = (MessageQ_Object *) handle;
998     socklen_t         optlen;
1000     /*
1001      * TBD: Need to find a way to implement (if anyone uses it!), and
1002      * push down into transport..
1003      */
1005     /*
1006      * 2nd arg to getsockopt should be transport independent, but using
1007      *  SSKPROTO_SHMFIFO for now:
1008      */
1009     getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
1010                  &count, &optlen);
1011 #endif
1013     return count;
1016 /*
1017  *  Initializes a message not obtained from MessageQ_alloc.
1018  */
1019 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
1021     /* Fill in the fields of the message */
1022     MessageQ_msgInit(msg);
1023     msg->heapId = MessageQ_STATICMSG;
1024     msg->msgSize = size;
1027 /*
1028  *  Allocate a message and initialize the needed fields (note some
1029  *  of the fields in the header are set via other APIs or in the
1030  *  MessageQ_put function,
1031  */
1032 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
1034     MessageQ_Msg msg;
1036     /*
1037      * heapId not used for local alloc (as this is over a copy transport), but
1038      * we need to send to other side as heapId is used in BIOS transport.
1039      */
1040     msg = (MessageQ_Msg)calloc(1, size);
1041     MessageQ_msgInit(msg);
1042     msg->msgSize = size;
1043     msg->heapId = heapId;
1045     return msg;
1048 /*
1049  *  Frees the message back to the heap that was used to allocate it.
1050  */
1051 Int MessageQ_free(MessageQ_Msg msg)
1053     UInt32 status = MessageQ_S_SUCCESS;
1055     /* Check to ensure this was not allocated by user: */
1056     if (msg->heapId == MessageQ_STATICMSG) {
1057         status = MessageQ_E_CANNOTFREESTATICMSG;
1058     }
1059     else {
1060         free(msg);
1061     }
1063     return status;
1066 /* Register a heap with MessageQ. */
1067 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
1069     Int status = MessageQ_S_SUCCESS;
1071     /* Do nothing, as this uses a copy transport */
1073     return status;
1076 /* Unregister a heap with MessageQ. */
1077 Int MessageQ_unregisterHeap(UInt16 heapId)
1079     Int status = MessageQ_S_SUCCESS;
1081     /* Do nothing, as this uses a copy transport */
1083     return status;
1086 /* Unblocks a MessageQ */
1087 Void MessageQ_unblock(MessageQ_Handle handle)
1089     MessageQ_Object *obj = (MessageQ_Object *)handle;
1091     obj->unblocked = MessageQ_E_UNBLOCKED;
1092     sem_post(&obj->synchronizer);
1095 /* Unblocks a MessageQ that's been shutdown due to transport failure */
1096 Void MessageQ_shutdown(MessageQ_Handle handle)
1098     MessageQ_Object *obj = (MessageQ_Object *)handle;
1100     obj->unblocked = MessageQ_E_SHUTDOWN;
1101     sem_post(&obj->synchronizer);
1104 /* Embeds a source message queue into a message */
1105 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
1107     MessageQ_Object *obj = (MessageQ_Object *)handle;
1109     msg->replyId = (UInt16)(obj->queue);
1110     msg->replyProc = (UInt16)(obj->queue >> 16);
1113 /* Returns the QueueId associated with the handle. */
1114 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
1116     MessageQ_Object *obj = (MessageQ_Object *) handle;
1117     UInt32 queueId;
1119     queueId = (obj->queue);
1121     return queueId;
1124 /* Returns the local handle associated with queueId. */
1125 MessageQ_Handle MessageQ_getLocalHandle(MessageQ_QueueId queueId)
1127     MessageQ_Object *obj;
1128     MessageQ_QueueIndex queueIndex;
1129     UInt16 procId;
1131     procId = MessageQ_getProcId(queueId);
1132     if (procId != MultiProc_self()) {
1133         return NULL;
1134     }
1136     queueIndex = MessageQ_getQueueIndex(queueId);
1137     obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
1139     return (MessageQ_Handle)obj;
1142 /* Sets the tracing of a message */
1143 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
1145     msg->flags = (msg->flags & ~TRACEMASK) |   (traceFlag << TRACESHIFT);
1148 /*
1149  *  Returns the amount of shared memory used by one transport instance.
1150  *
1151  *  The MessageQ module itself does not use any shared memory but the
1152  *  underlying transport may use some shared memory.
1153  */
1154 SizeT MessageQ_sharedMemReq(Ptr sharedAddr)
1156     SizeT memReq = 0u;
1158     /* Do nothing, as this is a copy transport. */
1160     return memReq;
1163 /*
1164  * This is a helper function to initialize a message.
1165  */
1166 Void MessageQ_msgInit(MessageQ_Msg msg)
1168 #if 0
1169     Int                 status    = MessageQ_S_SUCCESS;
1170     LAD_ClientHandle handle;
1171     struct LAD_CommandObj cmd;
1172     union LAD_ResponseObj rsp;
1174     handle = LAD_findHandle();
1175     if (handle == LAD_MAXNUMCLIENTS) {
1176         PRINTVERBOSE1(
1177           "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
1178            getpid())
1180         return;
1181     }
1183     cmd.cmd = LAD_MESSAGEQ_MSGINIT;
1184     cmd.clientId = handle;
1186     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
1187         PRINTVERBOSE1(
1188           "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
1189         return;
1190     }
1192     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
1193         PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
1194         return;
1195     }
1196     status = rsp.msgInit.status;
1198     PRINTVERBOSE2(
1199       "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
1200       handle, status)
1202     memcpy(msg, &rsp.msgInit.msg, sizeof (*msg));
1203 #else
1204     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
1205     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
1206     msg->msgId     = MessageQ_INVALIDMSGID;
1207     msg->dstId     = (UInt16)MessageQ_INVALIDMESSAGEQ;
1208     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
1209     msg->srcProc   = MultiProc_self();
1211     pthread_mutex_lock(&MessageQ_module->seqNumGate);
1212     msg->seqNum  = MessageQ_module->seqNum++;
1213     pthread_mutex_unlock(&MessageQ_module->seqNumGate);
1214 #endif
1217 /*
1218  *  ======== _MessageQ_grow ========
1219  *  Increase module's queues array to accommodate queueIndex from LAD
1220  *
1221  *  Note: this function takes the queue index value (i.e. without the
1222  *  port offset).
1223  */
1224 Void _MessageQ_grow(UInt16 queueIndex)
1226     MessageQ_Handle *queues;
1227     MessageQ_Handle *oldQueues;
1228     UInt oldSize;
1230     pthread_mutex_lock(&MessageQ_module->gate);
1232     oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
1234     queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof(MessageQ_Handle));
1235     memcpy(queues, MessageQ_module->queues, oldSize);
1237     oldQueues = MessageQ_module->queues;
1238     MessageQ_module->queues = queues;
1239     MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
1241     pthread_mutex_unlock(&MessageQ_module->gate);
1243     free(oldQueues);
1245     return;
1248 /*
1249  *  ======== MessageQ_bind ========
1250  *  Bind all existing message queues to the given processor
1251  *
1252  *  Note: This function is a hack to work around the driver.
1253  *
1254  *  The Linux rpmsgproto driver requires a socket for each
1255  *  message queue and remote processor tuple.
1256  *
1257  *      socket --> (queue, processor)
1258  *
1259  *  Therefore, each time a new remote processor is started, all
1260  *  existing message queues need to create a socket for the new
1261  *  processor.
1262  *
1263  *  The driver should not have this requirement. One socket per
1264  *  message queue should be sufficient to uniquely identify the
1265  *  endpoint to the driver.
1266  */
1267 Void MessageQ_bind(UInt16 procId)
1269     int q;
1270     int clusterId;
1271     int priority;
1272     MessageQ_Handle handle;
1273     MessageQ_QueueId queue;
1274     IMessageQTransport_Handle transport;
1276     clusterId = procId - MultiProc_getBaseIdOfCluster();
1277     pthread_mutex_lock(&MessageQ_module->gate);
1279     for (q = 0; q < MessageQ_module->numQueues; q++) {
1281         if ((handle = MessageQ_module->queues[q]) == NULL) {
1282             continue;
1283         }
1285         queue = ((MessageQ_Object *)handle)->queue;
1287         for (priority = 0; priority < 2; priority++) {
1288             transport = MessageQ_module->transports[clusterId][priority];
1289             if (transport != NULL) {
1290                 IMessageQTransport_bind((Void *)transport, queue);
1291             }
1292         }
1293     }
1295     pthread_mutex_unlock(&MessageQ_module->gate);
1298 /*
1299  *  ======== MessageQ_unbind ========
1300  *  Unbind all existing message queues from the given processor
1301  *
1302  *  Hack: see MessageQ_bind.
1303  */
1304 Void MessageQ_unbind(UInt16 procId)
1306     int q;
1307     int clusterId;
1308     int priority;
1309     MessageQ_Handle handle;
1310     MessageQ_QueueId queue;
1311     IMessageQTransport_Handle transport;
1313     pthread_mutex_lock(&MessageQ_module->gate);
1315     for (q = 0; q < MessageQ_module->numQueues; q++) {
1317         if ((handle = MessageQ_module->queues[q]) == NULL) {
1318             continue;
1319         }
1321         queue = ((MessageQ_Object *)handle)->queue;
1322         clusterId = procId - MultiProc_getBaseIdOfCluster();
1324         for (priority = 0; priority < 2; priority++) {
1325             transport = MessageQ_module->transports[clusterId][priority];
1326             if (transport != NULL) {
1327                 IMessageQTransport_unbind((Void *)transport, queue);
1328             }
1329         }
1330     }
1332     pthread_mutex_unlock(&MessageQ_module->gate);