Add FD_CLOEXEC flag for sockets, /dev/mem and LAD pipes
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
1 /*
2  * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  @file   MessageQ.c
34  *
35  *  @brief  MessageQ Linux implementation
36  *
37  *  This implementation is geared for use in a "client/server" model, whereby
38  *  system-wide data is maintained in a "server" component and process-
39  *  specific data is handled here.  At the moment, this implementation
40  *  connects and communicates with LAD for the server connection.
41  */
43 /* Standard IPC header */
44 #include <ti/ipc/Std.h>
46 /* Module level headers */
47 #include <ti/ipc/NameServer.h>
48 #include <ti/ipc/MultiProc.h>
49 #include <_MultiProc.h>
50 #define MessageQ_internal 1     /* must be defined before include file */
51 #include <ti/ipc/MessageQ.h>
52 #include <_MessageQ.h>
53 #include <ti/ipc/interfaces/ITransport.h>
54 #include <ti/ipc/interfaces/IMessageQTransport.h>
55 #include <ti/ipc/interfaces/INetworkTransport.h>
57 /* Socket Headers */
58 #include <sys/select.h>
59 #include <sys/time.h>
60 #include <sys/types.h>
61 #include <sys/param.h>
62 #include <sys/eventfd.h>
63 #include <sys/queue.h>
64 #include <errno.h>
65 #include <stdio.h>
66 #include <string.h>
67 #include <stdlib.h>
68 #include <unistd.h>
69 #include <assert.h>
70 #include <pthread.h>
71 #include <semaphore.h>
73 #include <ladclient.h>
74 #include <_lad.h>
76 /* =============================================================================
77  * Macros/Constants
78  * =============================================================================
79  */
81 /*!
82  *  @brief  Name of the reserved NameServer used for MessageQ.
83  */
84 #define MessageQ_NAMESERVER  "MessageQ"
86 #define MessageQ_MAXTRANSPORTS 8
88 #define MessageQ_GROWSIZE 32
90 /* Trace flag settings: */
91 #define TRACESHIFT    12
92 #define TRACEMASK     0x1000
94 /* Define BENCHMARK to quiet key MessageQ APIs: */
95 //#define BENCHMARK
97 /* =============================================================================
98  * Structures & Enums
99  * =============================================================================
100  */
102 /* params structure evolution */
103 typedef struct {
104     Void *synchronizer;
105 } MessageQ_Params_Legacy;
107 typedef struct {
108     Int __version;
109     Void *synchronizer;
110     MessageQ_QueueIndex queueIndex;
111 } MessageQ_Params_Version2;
113 /* structure for MessageQ module state */
114 typedef struct MessageQ_ModuleObject {
115     MessageQ_Handle           *queues;
116     Int                       numQueues;
117     Int                       refCount;
118     NameServer_Handle         nameServer;
119     pthread_mutex_t           gate;
120     int                       seqNum;
121     pthread_mutex_t           seqNumGate;
122     IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
123     ITransport_Handle         transInst[MessageQ_MAXTRANSPORTS];
124     MessageQ_PutHookFxn       putHookFxn;
125 } MessageQ_ModuleObject;
127 typedef struct MessageQ_CIRCLEQ_ENTRY {
128      CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem;
129 } MessageQ_CIRCLEQ_ENTRY;
131 /*!
132  *  @brief  Structure for the Handle for the MessageQ.
133  */
134 typedef struct MessageQ_Object_tag {
135     CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
136     pthread_mutex_t              msgListGate;
137     MessageQ_Params              params;
138     MessageQ_QueueId             queue;
139     int                          unblocked;
140     void                         *serverHandle;
141     sem_t                        synchronizer;
142 } MessageQ_Object;
144 /* traces in this file are controlled via _MessageQ_verbose */
145 Bool _MessageQ_verbose = FALSE;
146 #define verbose _MessageQ_verbose
148 /* =============================================================================
149  *  Globals
150  * =============================================================================
151  */
152 static MessageQ_ModuleObject MessageQ_state =
154     .refCount   = 0,
155     .nameServer = NULL,
156 #if defined(IPC_BUILDOS_ANDROID)
157     .gate       = PTHREAD_RECURSIVE_MUTEX_INITIALIZER,
158 #else
159     .gate       = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
160 #endif
161     .seqNumGate = PTHREAD_MUTEX_INITIALIZER,
162     .putHookFxn = NULL
163 };
165 /*!
166  *  @var    MessageQ_module
167  *
168  *  @brief  Pointer to the MessageQ module state.
169  */
170 MessageQ_ModuleObject *MessageQ_module = &MessageQ_state;
172 Void _MessageQ_grow(UInt16 queueIndex);
174 /* =============================================================================
175  * APIS
176  * =============================================================================
177  */
179 Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
180                                 UInt16 rprocId, UInt priority)
182     Int status = FALSE;
183     UInt16 clusterId;
185     if (handle == NULL) {
186         printf("MessageQ_registerTransport: invalid handle, must be non-NULL\n"
187               );
189         return status;
190     }
192     /* map procId to clusterId */
193     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
195     if (clusterId >= MultiProc_MAXPROCESSORS) {
196         printf("MessageQ_registerTransport: invalid procId %d\n", rprocId);
198         return status;
199     }
201     if (MessageQ_module->transports[clusterId][priority] == NULL) {
202         MessageQ_module->transports[clusterId][priority] = handle;
204         status = TRUE;
205     }
207     return status;
210 Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
212     if (inst == NULL) {
213         printf("MessageQ_registerTransportId: invalid NULL handle\n");
215         return MessageQ_E_INVALIDARG;
216     }
218     if (tid >= MessageQ_MAXTRANSPORTS) {
219         printf("MessageQ_unregisterNetTransport: invalid transport id %d,"
220                 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
222         return MessageQ_E_INVALIDARG;
223     }
225     if (MessageQ_module->transInst[tid] != NULL) {
226         printf("MessageQ_registerTransportId: transport id %d already "
227                 "registered\n", tid);
229         return MessageQ_E_ALREADYEXISTS;
230     }
232     MessageQ_module->transInst[tid] = inst;
234     return MessageQ_S_SUCCESS;
237 Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
239     UInt16 clusterId;
241     /* map procId to clusterId */
242     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
244     if (clusterId >= MultiProc_MAXPROCESSORS) {
245         printf("MessageQ_unregisterTransport: invalid rprocId %d\n", rprocId);
247         return;
248     }
250     MessageQ_module->transports[clusterId][priority] = NULL;
253 Void MessageQ_unregisterTransportId(UInt tid)
255     if (tid >= MessageQ_MAXTRANSPORTS) {
256         printf("MessageQ_unregisterTransportId: invalid transport id %d, "
257                 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
259         return;
260     }
262     MessageQ_module->transInst[tid] = NULL;
265 /*
266  * Function to get default configuration for the MessageQ module.
267  */
268 Void MessageQ_getConfig(MessageQ_Config *cfg)
270     Int status;
271     LAD_ClientHandle handle;
272     struct LAD_CommandObj cmd;
273     union LAD_ResponseObj rsp;
275     assert (cfg != NULL);
277     handle = LAD_findHandle();
278     if (handle == LAD_MAXNUMCLIENTS) {
279         PRINTVERBOSE1(
280           "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
281            getpid())
283         return;
284     }
286     cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
287     cmd.clientId = handle;
289     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
290         PRINTVERBOSE1(
291           "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
292         return;
293     }
295     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
296         PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n",
297                       status)
298         return;
299     }
300     status = rsp.messageQGetConfig.status;
302     PRINTVERBOSE2(
303       "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
304       handle, status)
306     memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg));
308     return;
311 /*
312  *  Function to setup the MessageQ module.
313  */
314 Int MessageQ_setup(const MessageQ_Config *cfg)
316     Int status = MessageQ_S_SUCCESS;
317     LAD_ClientHandle handle;
318     struct LAD_CommandObj cmd;
319     union LAD_ResponseObj rsp;
320     Int pri;
321     Int i;
322     Int tid;
324     /* this entire function must be serialized */
325     pthread_mutex_lock(&MessageQ_module->gate);
327     /* ensure only first thread performs startup procedure */
328     if (++MessageQ_module->refCount > 1) {
329         PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
330                 MessageQ_module->refCount)
331         status = MessageQ_S_ALREADYSETUP;
332         goto exit;
333     }
335     handle = LAD_findHandle();
336     if (handle == LAD_MAXNUMCLIENTS) {
337         PRINTVERBOSE1("MessageQ_setup: can't find connection to daemon for "
338                 "pid %d\n", getpid())
339         status = MessageQ_E_RESOURCE;
340         goto exit;
341     }
343     cmd.cmd = LAD_MESSAGEQ_SETUP;
344     cmd.clientId = handle;
345     memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
347     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
348         PRINTVERBOSE1("MessageQ_setup: sending LAD command failed, "
349                 "status=%d\n", status)
350         status = MessageQ_E_FAIL;
351         goto exit;
352     }
354     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
355         PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
356         status = MessageQ_E_FAIL;
357         goto exit;
358     }
359     status = rsp.setup.status;
361     PRINTVERBOSE2("MessageQ_setup: LAD response for client %d, status=%d\n",
362             handle, status)
364     MessageQ_module->seqNum = 0;
365     MessageQ_module->nameServer = rsp.setup.nameServerHandle;
366     MessageQ_module->numQueues = cfg->maxRuntimeEntries;
367     MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
368             sizeof(MessageQ_Handle));
370     for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
371         for (pri = 0; pri < 2; pri++) {
372             MessageQ_module->transports[i][pri] = NULL;
373         }
374     }
376     for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
377         MessageQ_module->transInst[tid] = NULL;
378     }
380 exit:
381     /* if error, must decrement reference count */
382     if (status < 0) {
383         MessageQ_module->refCount--;
384     }
386     pthread_mutex_unlock(&MessageQ_module->gate);
388     return (status);
391 /*
392  *  MessageQ_destroy - destroy the MessageQ module.
393  */
394 Int MessageQ_destroy(void)
396     Int status = MessageQ_S_SUCCESS;
397     LAD_ClientHandle handle;
398     struct LAD_CommandObj cmd;
399     union LAD_ResponseObj rsp;
401     /* this entire function must be serialized */
402     pthread_mutex_lock(&MessageQ_module->gate);
404     /* ensure only last thread does the work */
405     if (--MessageQ_module->refCount > 0) {
406         goto exit;
407     }
409     handle = LAD_findHandle();
410     if (handle == LAD_MAXNUMCLIENTS) {
411         PRINTVERBOSE1("MessageQ_destroy: can't find connection to daemon "
412                 "for pid %d\n", getpid())
413         status =  MessageQ_E_RESOURCE;
414         goto exit;
415     }
417     cmd.cmd = LAD_MESSAGEQ_DESTROY;
418     cmd.clientId = handle;
420     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
421         PRINTVERBOSE1("MessageQ_destroy: sending LAD command failed, "
422                 "status=%d\n", status)
423         status = MessageQ_E_FAIL;
424         goto exit;
425     }
427     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
428         PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
429         status = MessageQ_E_FAIL;
430         goto exit;
431     }
432     status = rsp.status;
434     PRINTVERBOSE2("MessageQ_destroy: got LAD response for client %d, "
435             "status=%d\n", handle, status)
437 exit:
438     pthread_mutex_unlock(&MessageQ_module->gate);
440     return (status);
443 /*
444  *  ======== MessageQ_Params_init ========
445  *  Legacy implementation.
446  */
447 Void MessageQ_Params_init(MessageQ_Params *params)
449     ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
452 /*
453  *  ======== MessageQ_Params_init__S ========
454  *  New implementation which is version aware.
455  */
456 Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
458     MessageQ_Params_Version2 *params2;
460     switch (version) {
462         case MessageQ_Params_VERSION_2:
463             params2 = (MessageQ_Params_Version2 *)params;
464             params2->__version = MessageQ_Params_VERSION_2;
465             params2->synchronizer = NULL;
466             params2->queueIndex = MessageQ_ANY;
467             break;
469         default:
470             assert(FALSE);
471             break;
472     }
475 /*
476  *  ======== MessageQ_create ========
477  */
478 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
480     Int                   status;
481     MessageQ_Object      *obj = NULL;
482     IMessageQTransport_Handle transport;
483     INetworkTransport_Handle netTrans;
484     ITransport_Handle     baseTrans;
485     UInt16                queueIndex;
486     UInt16                clusterId;
487     Int                   tid;
488     Int                   priority;
489     LAD_ClientHandle      handle;
490     struct LAD_CommandObj cmd;
491     union LAD_ResponseObj rsp;
492     MessageQ_Params ps;
494     MessageQ_Params_init__S(&ps, MessageQ_Params_VERSION);
496     /* copy the given params into the current params structure */
497     if (pp != NULL) {
499         /* snoop the params pointer to see if it's a legacy structure */
500         if ((pp->__version == 0) || (pp->__version > 100)) {
501             ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
502         }
504         /* not legacy structure, use params version field */
505         else if (pp->__version == MessageQ_Params_VERSION_2) {
506             ps.__version = ((MessageQ_Params_Version2 *)pp)->__version;
507             ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
508             ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
509         }
510         else {
511             assert(FALSE);
512         }
513     }
515     handle = LAD_findHandle();
516     if (handle == LAD_MAXNUMCLIENTS) {
517         PRINTVERBOSE1(
518           "MessageQ_create: can't find connection to daemon for pid %d\n",
519            getpid())
521         return NULL;
522     }
524     cmd.cmd = LAD_MESSAGEQ_CREATE;
525     cmd.clientId = handle;
527     if (name == NULL) {
528         cmd.args.messageQCreate.name[0] = '\0';
529     }
530     else {
531         strncpy(cmd.args.messageQCreate.name, name,
532                 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
533         cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
534     }
536     memcpy(&cmd.args.messageQCreate.params, &ps, sizeof(ps));
538     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
539         PRINTVERBOSE1(
540           "MessageQ_create: sending LAD command failed, status=%d\n", status)
541         return NULL;
542     }
544     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
545         PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
546         return NULL;
547     }
548     status = rsp.messageQCreate.status;
550     PRINTVERBOSE2(
551       "MessageQ_create: got LAD response for client %d, status=%d\n",
552       handle, status)
554     if (status == -1) {
555        PRINTVERBOSE1(
556           "MessageQ_create: MessageQ server operation failed, status=%d\n",
557           status)
558        return NULL;
559     }
561     /* Create the generic obj */
562     obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
564    /* Populate the params member */
565     memcpy(&obj->params, &ps, sizeof(ps));
568     obj->queue = rsp.messageQCreate.queueId;
569     obj->serverHandle = rsp.messageQCreate.serverHandle;
570     pthread_mutex_init(&obj->msgListGate, NULL);
571     CIRCLEQ_INIT(&obj->msgList);
572     if (sem_init(&obj->synchronizer, 0, 0) < 0) {
573         PRINTVERBOSE1(
574           "MessageQ_create: failed to create synchronizer (errno %d)\n", errno)
576         MessageQ_delete((MessageQ_Handle *)&obj);
578         return NULL;
579     }
581     /* lad returns the queue port # (queueIndex + PORT_OFFSET) */
582     queueIndex = MessageQ_getQueueIndex(rsp.messageQCreate.queueId);
584     PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
585             "queueIndex %d\n", (name == NULL) ? "NULL" : name , queueIndex)
587     pthread_mutex_lock(&MessageQ_module->gate);
589     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
590         for (priority = 0; priority < 2; priority++) {
591             transport = MessageQ_module->transports[clusterId][priority];
592             if (transport) {
593                 /* need to check return and do something if error */
594                 IMessageQTransport_bind((Void *)transport, obj->queue);
595             }
596         }
597     }
599     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
600         baseTrans = MessageQ_module->transInst[tid];
602         if (baseTrans != NULL) {
603             switch (ITransport_itype(baseTrans)) {
604                 case INetworkTransport_TypeId:
605                     netTrans = INetworkTransport_downCast(baseTrans);
606                     INetworkTransport_bind((void *)netTrans, obj->queue);
607                     break;
609                 default:
610                     /* error */
611                     printf("MessageQ_create: Error: transport id %d is an "
612                             "unsupported transport type.\n", tid);
613                     break;
614             }
615         }
616     }
618     /* LAD's MessageQ module can grow, we need to grow as well */
619     if (queueIndex >= MessageQ_module->numQueues) {
620         _MessageQ_grow(queueIndex);
621     }
623     /*  No need to "allocate" slot since the queueIndex returned by
624      *  LAD is guaranteed to be unique.
625      */
626     MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
628     pthread_mutex_unlock(&MessageQ_module->gate);
630     return (MessageQ_Handle)obj;
633 /*
634  *  ======== MessageQ_delete ========
635  */
636 Int MessageQ_delete(MessageQ_Handle *handlePtr)
638     MessageQ_Object *obj;
639     IMessageQTransport_Handle transport;
640     INetworkTransport_Handle  netTrans;
641     ITransport_Handle         baseTrans;
642     Int              status = MessageQ_S_SUCCESS;
643     UInt16           queueIndex;
644     UInt16                clusterId;
645     Int                   tid;
646     Int                   priority;
647     LAD_ClientHandle handle;
648     struct LAD_CommandObj cmd;
649     union LAD_ResponseObj rsp;
651     handle = LAD_findHandle();
652     if (handle == LAD_MAXNUMCLIENTS) {
653         PRINTVERBOSE1(
654           "MessageQ_delete: can't find connection to daemon for pid %d\n",
655            getpid())
657         return MessageQ_E_FAIL;
658     }
660     obj = (MessageQ_Object *)(*handlePtr);
662     cmd.cmd = LAD_MESSAGEQ_DELETE;
663     cmd.clientId = handle;
664     cmd.args.messageQDelete.serverHandle = obj->serverHandle;
666     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
667         PRINTVERBOSE1(
668           "MessageQ_delete: sending LAD command failed, status=%d\n", status)
669         return MessageQ_E_FAIL;
670     }
672     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
673         PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
674         return MessageQ_E_FAIL;
675     }
676     status = rsp.messageQDelete.status;
678     PRINTVERBOSE2(
679       "MessageQ_delete: got LAD response for client %d, status=%d\n",
680       handle, status)
682     pthread_mutex_lock(&MessageQ_module->gate);
684     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
685         for (priority = 0; priority < 2; priority++) {
686             transport = MessageQ_module->transports[clusterId][priority];
687             if (transport) {
688                 IMessageQTransport_unbind((Void *)transport, obj->queue);
689             }
690         }
691     }
693     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
694         baseTrans = MessageQ_module->transInst[tid];
696         if (baseTrans != NULL) {
697             switch (ITransport_itype(baseTrans)) {
698                 case INetworkTransport_TypeId:
699                     netTrans = INetworkTransport_downCast(baseTrans);
700                     INetworkTransport_unbind((void *)netTrans, obj->queue);
701                     break;
703                 default:
704                     /* error */
705                     printf("MessageQ_create: Error: transport id %d is an "
706                             "unsupported transport type.\n", tid);
707                     break;
708             }
709         }
710     }
712     /* extract the queue index from the queueId */
713     queueIndex = MessageQ_getQueueIndex(obj->queue);
714     MessageQ_module->queues[queueIndex] = NULL;
716     pthread_mutex_unlock(&MessageQ_module->gate);
718     free(obj);
719     *handlePtr = NULL;
721     return status;
724 /*
725  *  ======== MessageQ_open ========
726  *  Acquire a queueId for use in sending messages to the queue
727  */
728 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
730     Int status = MessageQ_S_SUCCESS;
732     status = NameServer_getUInt32(MessageQ_module->nameServer,
733                                   name, queueId, NULL);
735     if (status == NameServer_E_NOTFOUND) {
736         /* Set return queue ID to invalid */
737         *queueId = MessageQ_INVALIDMESSAGEQ;
738         status = MessageQ_E_NOTFOUND;
739     }
740     else if (status >= 0) {
741         /* Override with a MessageQ status code */
742         status = MessageQ_S_SUCCESS;
743     }
744     else {
745         /* Set return queue ID to invalid */
746         *queueId = MessageQ_INVALIDMESSAGEQ;
748         /* Override with a MessageQ status code */
749         if (status == NameServer_E_TIMEOUT) {
750             status = MessageQ_E_TIMEOUT;
751         }
752         else {
753             status = MessageQ_E_FAIL;
754         }
755     }
757     return status;
760 /*
761  *  ======== MessageQ_openQueueId ========
762  */
763 MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 procId)
765     MessageQ_QueueIndex queuePort;
766     MessageQ_QueueId queueId;
768     /* queue port is embedded in the queueId */
769     queuePort = queueIndex + MessageQ_PORTOFFSET;
770     queueId = ((MessageQ_QueueId)(procId) << 16) | queuePort;
772     return (queueId);
775 /*
776  *  ======== MessageQ_close ========
777  *  Closes previously opened instance of MessageQ
778  */
779 Int MessageQ_close(MessageQ_QueueId *queueId)
781     Int32 status = MessageQ_S_SUCCESS;
783     /* Nothing more to be done for closing the MessageQ. */
784     *queueId = MessageQ_INVALIDMESSAGEQ;
786     return status;
789 /*
790  *  ======== MessageQ_put ========
791  *  Deliver the given message, either locally or to the transport
792  *
793  *  If the destination is a local queue, deliver the message. Otherwise,
794  *  pass the message to a transport for delivery. The transport handles
795  *  the sending of the message using the appropriate interface (socket,
796  *  device ioctl, etc.).
797  */
798 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
800     Int status = MessageQ_S_SUCCESS;
801     MessageQ_Object *obj;
802     UInt16 dstProcId;
803     UInt16 queueIndex;
804     UInt16 queuePort;
805     ITransport_Handle baseTrans;
806     IMessageQTransport_Handle msgTrans;
807     INetworkTransport_Handle netTrans;
808     Int priority;
809     UInt tid;
810     UInt16 clusterId;
811     Bool delivered;
813     /* extract destination address from the given queueId */
814     dstProcId  = (UInt16)(queueId >> 16);
815     queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
817     /* write the destination address into the message header */
818     msg->dstId = queuePort;
819     msg->dstProc= dstProcId;
821     /* invoke the hook function after addressing the message */
822     if (MessageQ_module->putHookFxn != NULL) {
823         MessageQ_module->putHookFxn(queueId, msg);
824     }
826     /*  For an outbound message: If message destination is on this
827      *  processor, then check if the destination queue is in this
828      *  process (thread-to-thread messaging).
829      *
830      *  For an inbound message: Check if destination queue is in this
831      *  process (process-to-process messaging).
832      */
833     if (dstProcId == MultiProc_self()) {
834         queueIndex = queuePort - MessageQ_PORTOFFSET;
836         if (queueIndex < MessageQ_module->numQueues) {
837             obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
839             if (obj != NULL) {
840                 /* deliver message to queue */
841                 pthread_mutex_lock(&obj->msgListGate);
842                 CIRCLEQ_INSERT_TAIL(&obj->msgList,
843                         (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
844                 pthread_mutex_unlock(&obj->msgListGate);
845                 sem_post(&obj->synchronizer);
846                 goto done;
847             }
848         }
849     }
851     /*  Getting here implies the message is outbound. Must give it to
852      *  either the primary or secondary transport for delivery. Start
853      *  by extracting the transport ID from the message header.
854      */
855     tid = MessageQ_getTransportId(msg);
857     if (tid >= MessageQ_MAXTRANSPORTS) {
858         printf("MessageQ_put: Error: transport id %d too big, must be < %d\n",
859                 tid, MessageQ_MAXTRANSPORTS);
860         status = MessageQ_E_FAIL;
861         goto done;
862     }
864     /* if transportId is set, use secondary transport for message delivery */
865     if (tid != 0) {
866         baseTrans = MessageQ_module->transInst[tid];
868         if (baseTrans == NULL) {
869             printf("MessageQ_put: Error: transport is null\n");
870             status = MessageQ_E_FAIL;
871             goto done;
872         }
874         /* downcast instance pointer to transport interface */
875         switch (ITransport_itype(baseTrans)) {
876             case INetworkTransport_TypeId:
877                 netTrans = INetworkTransport_downCast(baseTrans);
878                 delivered = INetworkTransport_put(netTrans, (Ptr)msg);
879                 status = (delivered ? MessageQ_S_SUCCESS :
880                           (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN :
881                            MessageQ_E_FAIL));
882                 break;
884             default:
885                 /* error */
886                 printf("MessageQ_put: Error: transport id %d is an "
887                         "unsupported transport type\n", tid);
888                 status = MessageQ_E_FAIL;
889                 break;
890         }
891     }
892     else {
893         /* use primary transport for delivery */
894         priority = MessageQ_getMsgPri(msg);
895         clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
897         /* primary transport can only be used for intra-cluster delivery */
898         if (clusterId > MultiProc_getNumProcsInCluster()) {
899             printf("MessageQ_put: Error: destination procId=%d is not "
900                     "in cluster. Must specify a transportId.\n", dstProcId);
901             status =  MessageQ_E_FAIL;
902             goto done;
903         }
905         msgTrans = MessageQ_module->transports[clusterId][priority];
906         delivered = IMessageQTransport_put(msgTrans, (Ptr)msg);
907         status = (delivered ? MessageQ_S_SUCCESS :
908                   (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN : MessageQ_E_FAIL));
909     }
911 done:
912     return (status);
915 /*
916  *  MessageQ_get - gets a message for a message queue and blocks if
917  *  the queue is empty.
918  *
919  *  If a message is present, it returns it.  Otherwise it blocks
920  *  waiting for a message to arrive.
921  *  When a message is returned, it is owned by the caller.
922  */
923 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
925     MessageQ_Object * obj = (MessageQ_Object *)handle;
926     Int     status = MessageQ_S_SUCCESS;
927     struct timespec ts;
928     struct timeval tv;
930 #if 0
931 /*
932  * Optimization here to get a message without going in to the sem
933  * operation, but the sem count will not be maintained properly.
934  */
935     pthread_mutex_lock(&obj->msgListGate);
937     if (obj->msgList.cqh_first != &obj->msgList) {
938         *msg = (MessageQ_Msg)obj->msglist.cqh_first;
939         CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
941         pthread_mutex_unlock(&obj->msgListGate);
942     }
943     else {
944         pthread_mutex_unlock(&obj->msgListGate);
945     }
946 #endif
948     if (timeout == MessageQ_FOREVER) {
949         sem_wait(&obj->synchronizer);
950     }
951     else {
952         /* add timeout (microseconds) to current time of day */
953         gettimeofday(&tv, NULL);
954         tv.tv_sec += timeout / 1000000;
955         tv.tv_usec += timeout % 1000000;
957         if (tv.tv_usec >= 1000000) {
958               tv.tv_sec++;
959               tv.tv_usec -= 1000000;
960         }
962         /* set absolute timeout value */
963         ts.tv_sec = tv.tv_sec;
964         ts.tv_nsec = tv.tv_usec * 1000; /* convert to nanoseconds */
966         if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
967             if (errno == ETIMEDOUT) {
968                 PRINTVERBOSE0("MessageQ_get: operation timed out\n")
969                 return (MessageQ_E_TIMEOUT);
970             }
971             else {
972                 PRINTVERBOSE0("MessageQ_get: sem_timedwait error\n")
973                 return (MessageQ_E_FAIL);
974             }
975         }
976     }
978     if (obj->unblocked) {
979         return obj->unblocked;
980     }
982     pthread_mutex_lock(&obj->msgListGate);
984     *msg = (MessageQ_Msg)obj->msgList.cqh_first;
985     CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
987     pthread_mutex_unlock(&obj->msgListGate);
989     return status;
992 /*
993  * Return a count of the number of messages in the queue
994  *
995  * TBD: Implement as a socket ioctl, using getsockopt().  Return -1 for now.
996  */
997 Int MessageQ_count(MessageQ_Handle handle)
999     Int               count = -1;
1000 #if 0
1001     MessageQ_Object * obj   = (MessageQ_Object *) handle;
1002     socklen_t         optlen;
1004     /*
1005      * TBD: Need to find a way to implement (if anyone uses it!), and
1006      * push down into transport..
1007      */
1009     /*
1010      * 2nd arg to getsockopt should be transport independent, but using
1011      *  SSKPROTO_SHMFIFO for now:
1012      */
1013     getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
1014                  &count, &optlen);
1015 #endif
1017     return count;
1020 /*
1021  *  Initializes a message not obtained from MessageQ_alloc.
1022  */
1023 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
1025     /* Fill in the fields of the message */
1026     MessageQ_msgInit(msg);
1027     msg->heapId = MessageQ_STATICMSG;
1028     msg->msgSize = size;
1031 /*
1032  *  Allocate a message and initialize the needed fields (note some
1033  *  of the fields in the header are set via other APIs or in the
1034  *  MessageQ_put function,
1035  */
1036 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
1038     MessageQ_Msg msg;
1040     /*
1041      * heapId not used for local alloc (as this is over a copy transport), but
1042      * we need to send to other side as heapId is used in BIOS transport.
1043      */
1044     msg = (MessageQ_Msg)calloc(1, size);
1045     MessageQ_msgInit(msg);
1046     msg->msgSize = size;
1047     msg->heapId = heapId;
1049     return msg;
1052 /*
1053  *  Frees the message back to the heap that was used to allocate it.
1054  */
1055 Int MessageQ_free(MessageQ_Msg msg)
1057     UInt32 status = MessageQ_S_SUCCESS;
1059     /* Check to ensure this was not allocated by user: */
1060     if (msg->heapId == MessageQ_STATICMSG) {
1061         status = MessageQ_E_CANNOTFREESTATICMSG;
1062     }
1063     else {
1064         free(msg);
1065     }
1067     return status;
1070 /* Register a heap with MessageQ. */
1071 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
1073     Int status = MessageQ_S_SUCCESS;
1075     /* Do nothing, as this uses a copy transport */
1077     return status;
1080 /* Unregister a heap with MessageQ. */
1081 Int MessageQ_unregisterHeap(UInt16 heapId)
1083     Int status = MessageQ_S_SUCCESS;
1085     /* Do nothing, as this uses a copy transport */
1087     return status;
1090 /* Unblocks a MessageQ */
1091 Void MessageQ_unblock(MessageQ_Handle handle)
1093     MessageQ_Object *obj = (MessageQ_Object *)handle;
1095     obj->unblocked = MessageQ_E_UNBLOCKED;
1096     sem_post(&obj->synchronizer);
1099 /* Unblocks a MessageQ that's been shutdown due to transport failure */
1100 Void MessageQ_shutdown(MessageQ_Handle handle)
1102     MessageQ_Object *obj = (MessageQ_Object *)handle;
1104     obj->unblocked = MessageQ_E_SHUTDOWN;
1105     sem_post(&obj->synchronizer);
1108 /* Embeds a source message queue into a message */
1109 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
1111     MessageQ_Object *obj = (MessageQ_Object *)handle;
1113     msg->replyId = (UInt16)(obj->queue);
1114     msg->replyProc = (UInt16)(obj->queue >> 16);
1117 /* Returns the QueueId associated with the handle. */
1118 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
1120     MessageQ_Object *obj = (MessageQ_Object *) handle;
1121     UInt32 queueId;
1123     queueId = (obj->queue);
1125     return queueId;
1128 /* Returns the local handle associated with queueId. */
1129 MessageQ_Handle MessageQ_getLocalHandle(MessageQ_QueueId queueId)
1131     MessageQ_Object *obj;
1132     MessageQ_QueueIndex queueIndex;
1133     UInt16 procId;
1135     procId = MessageQ_getProcId(queueId);
1136     if (procId != MultiProc_self()) {
1137         return NULL;
1138     }
1140     queueIndex = MessageQ_getQueueIndex(queueId);
1141     obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
1143     return (MessageQ_Handle)obj;
1146 /* Sets the tracing of a message */
1147 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
1149     msg->flags = (msg->flags & ~TRACEMASK) |   (traceFlag << TRACESHIFT);
1152 /*
1153  *  Returns the amount of shared memory used by one transport instance.
1154  *
1155  *  The MessageQ module itself does not use any shared memory but the
1156  *  underlying transport may use some shared memory.
1157  */
1158 SizeT MessageQ_sharedMemReq(Ptr sharedAddr)
1160     SizeT memReq = 0u;
1162     /* Do nothing, as this is a copy transport. */
1164     return memReq;
1167 /*
1168  * This is a helper function to initialize a message.
1169  */
1170 Void MessageQ_msgInit(MessageQ_Msg msg)
1172 #if 0
1173     Int                 status    = MessageQ_S_SUCCESS;
1174     LAD_ClientHandle handle;
1175     struct LAD_CommandObj cmd;
1176     union LAD_ResponseObj rsp;
1178     handle = LAD_findHandle();
1179     if (handle == LAD_MAXNUMCLIENTS) {
1180         PRINTVERBOSE1(
1181           "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
1182            getpid())
1184         return;
1185     }
1187     cmd.cmd = LAD_MESSAGEQ_MSGINIT;
1188     cmd.clientId = handle;
1190     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
1191         PRINTVERBOSE1(
1192           "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
1193         return;
1194     }
1196     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
1197         PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
1198         return;
1199     }
1200     status = rsp.msgInit.status;
1202     PRINTVERBOSE2(
1203       "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
1204       handle, status)
1206     memcpy(msg, &rsp.msgInit.msg, sizeof (*msg));
1207 #else
1208     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
1209     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
1210     msg->msgId     = MessageQ_INVALIDMSGID;
1211     msg->dstId     = (UInt16)MessageQ_INVALIDMESSAGEQ;
1212     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
1213     msg->srcProc   = MultiProc_self();
1215     pthread_mutex_lock(&MessageQ_module->seqNumGate);
1216     msg->seqNum  = MessageQ_module->seqNum++;
1217     pthread_mutex_unlock(&MessageQ_module->seqNumGate);
1218 #endif
1221 /*
1222  *  ======== _MessageQ_grow ========
1223  *  Increase module's queues array to accommodate queueIndex from LAD
1224  *
1225  *  Note: this function takes the queue index value (i.e. without the
1226  *  port offset).
1227  */
1228 Void _MessageQ_grow(UInt16 queueIndex)
1230     MessageQ_Handle *queues;
1231     MessageQ_Handle *oldQueues;
1232     UInt oldSize;
1234     pthread_mutex_lock(&MessageQ_module->gate);
1236     oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
1238     queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof(MessageQ_Handle));
1239     memcpy(queues, MessageQ_module->queues, oldSize);
1241     oldQueues = MessageQ_module->queues;
1242     MessageQ_module->queues = queues;
1243     MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
1245     pthread_mutex_unlock(&MessageQ_module->gate);
1247     free(oldQueues);
1249     return;
1252 /*
1253  *  ======== MessageQ_bind ========
1254  *  Bind all existing message queues to the given processor
1255  *
1256  *  Note: This function is a hack to work around the driver.
1257  *
1258  *  The Linux rpmsgproto driver requires a socket for each
1259  *  message queue and remote processor tuple.
1260  *
1261  *      socket --> (queue, processor)
1262  *
1263  *  Therefore, each time a new remote processor is started, all
1264  *  existing message queues need to create a socket for the new
1265  *  processor.
1266  *
1267  *  The driver should not have this requirement. One socket per
1268  *  message queue should be sufficient to uniquely identify the
1269  *  endpoint to the driver.
1270  */
1271 Void MessageQ_bind(UInt16 procId)
1273     int q;
1274     int clusterId;
1275     int priority;
1276     MessageQ_Handle handle;
1277     MessageQ_QueueId queue;
1278     IMessageQTransport_Handle transport;
1280     clusterId = procId - MultiProc_getBaseIdOfCluster();
1281     pthread_mutex_lock(&MessageQ_module->gate);
1283     for (q = 0; q < MessageQ_module->numQueues; q++) {
1285         if ((handle = MessageQ_module->queues[q]) == NULL) {
1286             continue;
1287         }
1289         queue = ((MessageQ_Object *)handle)->queue;
1291         for (priority = 0; priority < 2; priority++) {
1292             transport = MessageQ_module->transports[clusterId][priority];
1293             if (transport != NULL) {
1294                 IMessageQTransport_bind((Void *)transport, queue);
1295             }
1296         }
1297     }
1299     pthread_mutex_unlock(&MessageQ_module->gate);
1302 /*
1303  *  ======== MessageQ_unbind ========
1304  *  Unbind all existing message queues from the given processor
1305  *
1306  *  Hack: see MessageQ_bind.
1307  */
1308 Void MessageQ_unbind(UInt16 procId)
1310     int q;
1311     int clusterId;
1312     int priority;
1313     MessageQ_Handle handle;
1314     MessageQ_QueueId queue;
1315     IMessageQTransport_Handle transport;
1317     pthread_mutex_lock(&MessageQ_module->gate);
1319     for (q = 0; q < MessageQ_module->numQueues; q++) {
1321         if ((handle = MessageQ_module->queues[q]) == NULL) {
1322             continue;
1323         }
1325         queue = ((MessageQ_Object *)handle)->queue;
1326         clusterId = procId - MultiProc_getBaseIdOfCluster();
1328         for (priority = 0; priority < 2; priority++) {
1329             transport = MessageQ_module->transports[clusterId][priority];
1330             if (transport != NULL) {
1331                 IMessageQTransport_unbind((Void *)transport, queue);
1332             }
1333         }
1334     }
1336     pthread_mutex_unlock(&MessageQ_module->gate);