802e500365889a755bfadffd73471185bce9ee4d
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
1 /*
2  * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  @file   MessageQ.c
34  *
35  *  @brief  MessageQ Linux implementation
36  *
37  *  This implementation is geared for use in a "client/server" model, whereby
38  *  system-wide data is maintained in a "server" component and process-
39  *  specific data is handled here.  At the moment, this implementation
40  *  connects and communicates with LAD for the server connection.
41  */
43 /* Standard IPC header */
44 #include <ti/ipc/Std.h>
46 /* Module level headers */
47 #include <ti/ipc/NameServer.h>
48 #include <ti/ipc/MultiProc.h>
49 #include <_MultiProc.h>
50 #define MessageQ_internal 1     /* must be defined before include file */
51 #include <ti/ipc/MessageQ.h>
52 #include <_MessageQ.h>
53 #include <ti/ipc/interfaces/ITransport.h>
54 #include <ti/ipc/interfaces/IMessageQTransport.h>
55 #include <ti/ipc/interfaces/INetworkTransport.h>
57 /* Socket Headers */
58 #include <sys/select.h>
59 #include <sys/time.h>
60 #include <sys/types.h>
61 #include <sys/param.h>
62 #include <sys/eventfd.h>
63 #include <sys/queue.h>
64 #include <errno.h>
65 #include <stdio.h>
66 #include <string.h>
67 #include <stdlib.h>
68 #include <unistd.h>
69 #include <assert.h>
70 #include <pthread.h>
71 #include <semaphore.h>
73 #include <ladclient.h>
74 #include <_lad.h>
76 /* =============================================================================
77  * Macros/Constants
78  * =============================================================================
79  */
81 /*!
82  *  @brief  Name of the reserved NameServer used for MessageQ.
83  */
84 #define MessageQ_NAMESERVER  "MessageQ"
86 #define MessageQ_MAXTRANSPORTS 8
88 #define MessageQ_GROWSIZE 32
90 /* Trace flag settings: */
91 #define TRACESHIFT    12
92 #define TRACEMASK     0x1000
94 /* Define BENCHMARK to quiet key MessageQ APIs: */
95 //#define BENCHMARK
97 /* =============================================================================
98  * Structures & Enums
99  * =============================================================================
100  */
102 /* params structure evolution */
103 typedef struct {
104     Void *synchronizer;
105 } MessageQ_Params_Legacy;
107 typedef struct {
108     Int __version;
109     Void *synchronizer;
110     MessageQ_QueueIndex queueIndex;
111 } MessageQ_Params_Version2;
113 /* structure for MessageQ module state */
114 typedef struct MessageQ_ModuleObject {
115     MessageQ_Handle           *queues;
116     Int                       numQueues;
117     Int                       refCount;
118     NameServer_Handle         nameServer;
119     pthread_mutex_t           gate;
120     int                       seqNum;
121     IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
122     ITransport_Handle         transInst[MessageQ_MAXTRANSPORTS];
123     MessageQ_PutHookFxn       putHookFxn;
124 } MessageQ_ModuleObject;
126 typedef struct MessageQ_CIRCLEQ_ENTRY {
127      CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem;
128 } MessageQ_CIRCLEQ_ENTRY;
130 /*!
131  *  @brief  Structure for the Handle for the MessageQ.
132  */
133 typedef struct MessageQ_Object_tag {
134     CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
135     MessageQ_Params              params;
136     MessageQ_QueueId             queue;
137     int                          unblocked;
138     void                         *serverHandle;
139     sem_t                        synchronizer;
140 } MessageQ_Object;
142 /* traces in this file are controlled via _MessageQ_verbose */
143 Bool _MessageQ_verbose = FALSE;
144 #define verbose _MessageQ_verbose
146 /* =============================================================================
147  *  Globals
148  * =============================================================================
149  */
150 static MessageQ_ModuleObject MessageQ_state =
152     .refCount   = 0,
153     .nameServer = NULL,
154     .gate       = PTHREAD_MUTEX_INITIALIZER,
155     .putHookFxn = NULL
156 };
158 /*!
159  *  @var    MessageQ_module
160  *
161  *  @brief  Pointer to the MessageQ module state.
162  */
163 MessageQ_ModuleObject *MessageQ_module = &MessageQ_state;
165 Void _MessageQ_grow(UInt16 queueIndex);
167 /* =============================================================================
168  * APIS
169  * =============================================================================
170  */
172 Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
173                                 UInt16 rprocId, UInt priority)
175     Int status = FALSE;
176     UInt16 clusterId;
178     if (handle == NULL) {
179         printf("MessageQ_registerTransport: invalid handle, must be non-NULL\n"
180               );
182         return status;
183     }
185     /* map procId to clusterId */
186     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
188     if (clusterId >= MultiProc_MAXPROCESSORS) {
189         printf("MessageQ_registerTransport: invalid procId %d\n", rprocId);
191         return status;
192     }
194     if (MessageQ_module->transports[clusterId][priority] == NULL) {
195         MessageQ_module->transports[clusterId][priority] = handle;
197         status = TRUE;
198     }
200     return status;
203 Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
205     if (inst == NULL) {
206         printf("MessageQ_registerTransportId: invalid NULL handle\n");
208         return MessageQ_E_INVALIDARG;
209     }
211     if (tid >= MessageQ_MAXTRANSPORTS) {
212         printf("MessageQ_unregisterNetTransport: invalid transport id %d,"
213                 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
215         return MessageQ_E_INVALIDARG;
216     }
218     if (MessageQ_module->transInst[tid] != NULL) {
219         printf("MessageQ_registerTransportId: transport id %d already "
220                 "registered\n", tid);
222         return MessageQ_E_ALREADYEXISTS;
223     }
225     MessageQ_module->transInst[tid] = inst;
227     return MessageQ_S_SUCCESS;
230 Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
232     UInt16 clusterId;
234     /* map procId to clusterId */
235     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
237     if (clusterId >= MultiProc_MAXPROCESSORS) {
238         printf("MessageQ_unregisterTransport: invalid rprocId %d\n", rprocId);
240         return;
241     }
243     MessageQ_module->transports[clusterId][priority] = NULL;
246 Void MessageQ_unregisterTransportId(UInt tid)
248     if (tid >= MessageQ_MAXTRANSPORTS) {
249         printf("MessageQ_unregisterTransportId: invalid transport id %d, "
250                 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
252         return;
253     }
255     MessageQ_module->transInst[tid] = NULL;
258 /*
259  * Function to get default configuration for the MessageQ module.
260  */
261 Void MessageQ_getConfig(MessageQ_Config *cfg)
263     Int status;
264     LAD_ClientHandle handle;
265     struct LAD_CommandObj cmd;
266     union LAD_ResponseObj rsp;
268     assert (cfg != NULL);
270     handle = LAD_findHandle();
271     if (handle == LAD_MAXNUMCLIENTS) {
272         PRINTVERBOSE1(
273           "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
274            getpid())
276         return;
277     }
279     cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
280     cmd.clientId = handle;
282     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
283         PRINTVERBOSE1(
284           "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
285         return;
286     }
288     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
289         PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n",
290                       status)
291         return;
292     }
293     status = rsp.messageQGetConfig.status;
295     PRINTVERBOSE2(
296       "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
297       handle, status)
299     memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg));
301     return;
304 /*
305  *  Function to setup the MessageQ module.
306  */
307 Int MessageQ_setup(const MessageQ_Config *cfg)
309     Int status = MessageQ_S_SUCCESS;
310     LAD_ClientHandle handle;
311     struct LAD_CommandObj cmd;
312     union LAD_ResponseObj rsp;
313     Int pri;
314     Int i;
315     Int tid;
317     /* this entire function must be serialized */
318     pthread_mutex_lock(&MessageQ_module->gate);
320     /* ensure only first thread performs startup procedure */
321     if (++MessageQ_module->refCount > 1) {
322         PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
323                 MessageQ_module->refCount)
324         status = MessageQ_S_ALREADYSETUP;
325         goto exit;
326     }
328     handle = LAD_findHandle();
329     if (handle == LAD_MAXNUMCLIENTS) {
330         PRINTVERBOSE1("MessageQ_setup: can't find connection to daemon for "
331                 "pid %d\n", getpid())
332         status = MessageQ_E_RESOURCE;
333         goto exit;
334     }
336     cmd.cmd = LAD_MESSAGEQ_SETUP;
337     cmd.clientId = handle;
338     memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
340     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
341         PRINTVERBOSE1("MessageQ_setup: sending LAD command failed, "
342                 "status=%d\n", status)
343         status = MessageQ_E_FAIL;
344         goto exit;
345     }
347     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
348         PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
349         status = MessageQ_E_FAIL;
350         goto exit;
351     }
352     status = rsp.setup.status;
354     PRINTVERBOSE2("MessageQ_setup: LAD response for client %d, status=%d\n",
355             handle, status)
357     MessageQ_module->seqNum = 0;
358     MessageQ_module->nameServer = rsp.setup.nameServerHandle;
359     MessageQ_module->numQueues = cfg->maxRuntimeEntries;
360     MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
361             sizeof(MessageQ_Handle));
363     for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
364         for (pri = 0; pri < 2; pri++) {
365             MessageQ_module->transports[i][pri] = NULL;
366         }
367     }
369     for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
370         MessageQ_module->transInst[tid] = NULL;
371     }
373 exit:
374     /* if error, must decrement reference count */
375     if (status < 0) {
376         MessageQ_module->refCount--;
377     }
379     pthread_mutex_unlock(&MessageQ_module->gate);
381     return (status);
384 /*
385  *  MessageQ_destroy - destroy the MessageQ module.
386  */
387 Int MessageQ_destroy(void)
389     Int status = MessageQ_S_SUCCESS;
390     LAD_ClientHandle handle;
391     struct LAD_CommandObj cmd;
392     union LAD_ResponseObj rsp;
394     /* this entire function must be serialized */
395     pthread_mutex_lock(&MessageQ_module->gate);
397     /* ensure only last thread does the work */
398     if (--MessageQ_module->refCount > 0) {
399         goto exit;
400     }
402     handle = LAD_findHandle();
403     if (handle == LAD_MAXNUMCLIENTS) {
404         PRINTVERBOSE1("MessageQ_destroy: can't find connection to daemon "
405                 "for pid %d\n", getpid())
406         status =  MessageQ_E_RESOURCE;
407         goto exit;
408     }
410     cmd.cmd = LAD_MESSAGEQ_DESTROY;
411     cmd.clientId = handle;
413     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
414         PRINTVERBOSE1("MessageQ_destroy: sending LAD command failed, "
415                 "status=%d\n", status)
416         status = MessageQ_E_FAIL;
417         goto exit;
418     }
420     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
421         PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
422         status = MessageQ_E_FAIL;
423         goto exit;
424     }
425     status = rsp.status;
427     PRINTVERBOSE2("MessageQ_destroy: got LAD response for client %d, "
428             "status=%d\n", handle, status)
430 exit:
431     pthread_mutex_unlock(&MessageQ_module->gate);
433     return (status);
436 /*
437  *  ======== MessageQ_Params_init ========
438  *  Legacy implementation.
439  */
440 Void MessageQ_Params_init(MessageQ_Params *params)
442     ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
445 /*
446  *  ======== MessageQ_Params_init__S ========
447  *  New implementation which is version aware.
448  */
449 Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
451     MessageQ_Params_Version2 *params2;
453     switch (version) {
455         case MessageQ_Params_VERSION_2:
456             params2 = (MessageQ_Params_Version2 *)params;
457             params2->__version = MessageQ_Params_VERSION_2;
458             params2->synchronizer = NULL;
459             params2->queueIndex = MessageQ_ANY;
460             break;
462         default:
463             assert(FALSE);
464             break;
465     }
468 /*
469  *  ======== MessageQ_create ========
470  */
471 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
473     Int                   status;
474     MessageQ_Object      *obj = NULL;
475     IMessageQTransport_Handle transport;
476     INetworkTransport_Handle netTrans;
477     ITransport_Handle     baseTrans;
478     UInt16                queueIndex;
479     UInt16                clusterId;
480     Int                   tid;
481     Int                   priority;
482     LAD_ClientHandle      handle;
483     struct LAD_CommandObj cmd;
484     union LAD_ResponseObj rsp;
485     MessageQ_Params ps;
487     MessageQ_Params_init__S(&ps, MessageQ_Params_VERSION);
489     /* copy the given params into the current params structure */
490     if (pp != NULL) {
492         /* snoop the params pointer to see if it's a legacy structure */
493         if ((pp->__version == 0) || (pp->__version > 100)) {
494             ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
495         }
497         /* not legacy structure, use params version field */
498         else if (pp->__version == MessageQ_Params_VERSION_2) {
499             ps.__version = ((MessageQ_Params_Version2 *)pp)->__version;
500             ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
501             ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
502         }
503         else {
504             assert(FALSE);
505         }
506     }
508     handle = LAD_findHandle();
509     if (handle == LAD_MAXNUMCLIENTS) {
510         PRINTVERBOSE1(
511           "MessageQ_create: can't find connection to daemon for pid %d\n",
512            getpid())
514         return NULL;
515     }
517     cmd.cmd = LAD_MESSAGEQ_CREATE;
518     cmd.clientId = handle;
520     if (name == NULL) {
521         cmd.args.messageQCreate.name[0] = '\0';
522     }
523     else {
524         strncpy(cmd.args.messageQCreate.name, name,
525                 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
526         cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
527     }
529     memcpy(&cmd.args.messageQCreate.params, &ps, sizeof(ps));
531     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
532         PRINTVERBOSE1(
533           "MessageQ_create: sending LAD command failed, status=%d\n", status)
534         return NULL;
535     }
537     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
538         PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
539         return NULL;
540     }
541     status = rsp.messageQCreate.status;
543     PRINTVERBOSE2(
544       "MessageQ_create: got LAD response for client %d, status=%d\n",
545       handle, status)
547     if (status == -1) {
548        PRINTVERBOSE1(
549           "MessageQ_create: MessageQ server operation failed, status=%d\n",
550           status)
551        return NULL;
552     }
554     /* Create the generic obj */
555     obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
557    /* Populate the params member */
558     memcpy(&obj->params, &ps, sizeof(ps));
561     obj->queue = rsp.messageQCreate.queueId;
562     obj->serverHandle = rsp.messageQCreate.serverHandle;
563     CIRCLEQ_INIT(&obj->msgList);
564     if (sem_init(&obj->synchronizer, 0, 0) < 0) {
565         PRINTVERBOSE1(
566           "MessageQ_create: failed to create synchronizer (errno %d)\n", errno)
568         MessageQ_delete((MessageQ_Handle *)&obj);
570         return NULL;
571     }
573     /* lad returns the queue port # (queueIndex + PORT_OFFSET) */
574     queueIndex = MessageQ_getQueueIndex(rsp.messageQCreate.queueId);
576     PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
577             "queueIndex %d\n", (name == NULL) ? "NULL" : name , queueIndex)
579     pthread_mutex_lock(&MessageQ_module->gate);
581     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
582         for (priority = 0; priority < 2; priority++) {
583             transport = MessageQ_module->transports[clusterId][priority];
584             if (transport) {
585                 /* need to check return and do something if error */
586                 IMessageQTransport_bind((Void *)transport, obj->queue);
587             }
588         }
589     }
591     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
592         baseTrans = MessageQ_module->transInst[tid];
594         if (baseTrans != NULL) {
595             switch (ITransport_itype(baseTrans)) {
596                 case INetworkTransport_TypeId:
597                     netTrans = INetworkTransport_downCast(baseTrans);
598                     INetworkTransport_bind((void *)netTrans, obj->queue);
599                     break;
601                 default:
602                     /* error */
603                     printf("MessageQ_create: Error: transport id %d is an "
604                             "unsupported transport type.\n", tid);
605                     break;
606             }
607         }
608     }
610     /* LAD's MessageQ module can grow, we need to grow as well */
611     if (queueIndex >= MessageQ_module->numQueues) {
612         _MessageQ_grow(queueIndex);
613     }
615     /*  No need to "allocate" slot since the queueIndex returned by
616      *  LAD is guaranteed to be unique.
617      */
618     MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
620     pthread_mutex_unlock(&MessageQ_module->gate);
622     return (MessageQ_Handle)obj;
625 /*
626  *  ======== MessageQ_delete ========
627  */
628 Int MessageQ_delete(MessageQ_Handle *handlePtr)
630     MessageQ_Object *obj;
631     IMessageQTransport_Handle transport;
632     INetworkTransport_Handle  netTrans;
633     ITransport_Handle         baseTrans;
634     Int              status = MessageQ_S_SUCCESS;
635     UInt16           queueIndex;
636     UInt16                clusterId;
637     Int                   tid;
638     Int                   priority;
639     LAD_ClientHandle handle;
640     struct LAD_CommandObj cmd;
641     union LAD_ResponseObj rsp;
643     handle = LAD_findHandle();
644     if (handle == LAD_MAXNUMCLIENTS) {
645         PRINTVERBOSE1(
646           "MessageQ_delete: can't find connection to daemon for pid %d\n",
647            getpid())
649         return MessageQ_E_FAIL;
650     }
652     obj = (MessageQ_Object *)(*handlePtr);
654     cmd.cmd = LAD_MESSAGEQ_DELETE;
655     cmd.clientId = handle;
656     cmd.args.messageQDelete.serverHandle = obj->serverHandle;
658     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
659         PRINTVERBOSE1(
660           "MessageQ_delete: sending LAD command failed, status=%d\n", status)
661         return MessageQ_E_FAIL;
662     }
664     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
665         PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
666         return MessageQ_E_FAIL;
667     }
668     status = rsp.messageQDelete.status;
670     PRINTVERBOSE2(
671       "MessageQ_delete: got LAD response for client %d, status=%d\n",
672       handle, status)
674     pthread_mutex_lock(&MessageQ_module->gate);
676     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
677         for (priority = 0; priority < 2; priority++) {
678             transport = MessageQ_module->transports[clusterId][priority];
679             if (transport) {
680                 IMessageQTransport_unbind((Void *)transport, obj->queue);
681             }
682         }
683     }
685     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
686         baseTrans = MessageQ_module->transInst[tid];
688         if (baseTrans != NULL) {
689             switch (ITransport_itype(baseTrans)) {
690                 case INetworkTransport_TypeId:
691                     netTrans = INetworkTransport_downCast(baseTrans);
692                     INetworkTransport_unbind((void *)netTrans, obj->queue);
693                     break;
695                 default:
696                     /* error */
697                     printf("MessageQ_create: Error: transport id %d is an "
698                             "unsupported transport type.\n", tid);
699                     break;
700             }
701         }
702     }
704     /* extract the queue index from the queueId */
705     queueIndex = MessageQ_getQueueIndex(obj->queue);
706     MessageQ_module->queues[queueIndex] = NULL;
708     pthread_mutex_unlock(&MessageQ_module->gate);
710     free(obj);
711     *handlePtr = NULL;
713     return status;
716 /*
717  *  ======== MessageQ_open ========
718  *  Acquire a queueId for use in sending messages to the queue
719  */
720 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
722     Int status = MessageQ_S_SUCCESS;
724     status = NameServer_getUInt32(MessageQ_module->nameServer,
725                                   name, queueId, NULL);
727     if (status == NameServer_E_NOTFOUND) {
728         /* Set return queue ID to invalid */
729         *queueId = MessageQ_INVALIDMESSAGEQ;
730         status = MessageQ_E_NOTFOUND;
731     }
732     else if (status >= 0) {
733         /* Override with a MessageQ status code */
734         status = MessageQ_S_SUCCESS;
735     }
736     else {
737         /* Set return queue ID to invalid */
738         *queueId = MessageQ_INVALIDMESSAGEQ;
740         /* Override with a MessageQ status code */
741         if (status == NameServer_E_TIMEOUT) {
742             status = MessageQ_E_TIMEOUT;
743         }
744         else {
745             status = MessageQ_E_FAIL;
746         }
747     }
749     return status;
752 /*
753  *  ======== MessageQ_openQueueId ========
754  */
755 MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 procId)
757     MessageQ_QueueIndex queuePort;
758     MessageQ_QueueId queueId;
760     /* queue port is embedded in the queueId */
761     queuePort = queueIndex + MessageQ_PORTOFFSET;
762     queueId = ((MessageQ_QueueId)(procId) << 16) | queuePort;
764     return (queueId);
767 /*
768  *  ======== MessageQ_close ========
769  *  Closes previously opened instance of MessageQ
770  */
771 Int MessageQ_close(MessageQ_QueueId *queueId)
773     Int32 status = MessageQ_S_SUCCESS;
775     /* Nothing more to be done for closing the MessageQ. */
776     *queueId = MessageQ_INVALIDMESSAGEQ;
778     return status;
781 /*
782  *  ======== MessageQ_put ========
783  *  Deliver the given message, either locally or to the transport
784  *
785  *  If the destination is a local queue, deliver the message. Otherwise,
786  *  pass the message to a transport for delivery. The transport handles
787  *  the sending of the message using the appropriate interface (socket,
788  *  device ioctl, etc.).
789  */
790 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
792     Int status = MessageQ_S_SUCCESS;
793     MessageQ_Object *obj;
794     UInt16 dstProcId;
795     UInt16 queueIndex;
796     UInt16 queuePort;
797     ITransport_Handle baseTrans;
798     IMessageQTransport_Handle msgTrans;
799     INetworkTransport_Handle netTrans;
800     Int priority;
801     UInt tid;
802     UInt16 clusterId;
803     Bool delivered;
805     /* extract destination address from the given queueId */
806     dstProcId  = (UInt16)(queueId >> 16);
807     queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
809     /* write the destination address into the message header */
810     msg->dstId = queuePort;
811     msg->dstProc= dstProcId;
813     /* invoke the hook function after addressing the message */
814     if (MessageQ_module->putHookFxn != NULL) {
815         MessageQ_module->putHookFxn(queueId, msg);
816     }
818     /*  For an outbound message: If message destination is on this
819      *  processor, then check if the destination queue is in this
820      *  process (thread-to-thread messaging).
821      *
822      *  For an inbound message: Check if destination queue is in this
823      *  process (process-to-process messaging).
824      */
825     if (dstProcId == MultiProc_self()) {
826         queueIndex = queuePort - MessageQ_PORTOFFSET;
828         if (queueIndex < MessageQ_module->numQueues) {
829             obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
831             if (obj != NULL) {
832                 /* deliver message to queue */
833                 pthread_mutex_lock(&MessageQ_module->gate);
834                 CIRCLEQ_INSERT_TAIL(&obj->msgList,
835                         (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
836                 pthread_mutex_unlock(&MessageQ_module->gate);
837                 sem_post(&obj->synchronizer);
838                 goto done;
839             }
840         }
841     }
843     /*  Getting here implies the message is outbound. Must give it to
844      *  either the primary or secondary transport for delivery. Start
845      *  by extracting the transport ID from the message header.
846      */
847     tid = MessageQ_getTransportId(msg);
849     if (tid >= MessageQ_MAXTRANSPORTS) {
850         printf("MessageQ_put: Error: transport id %d too big, must be < %d\n",
851                 tid, MessageQ_MAXTRANSPORTS);
852         status = MessageQ_E_FAIL;
853         goto done;
854     }
856     /* if transportId is set, use secondary transport for message delivery */
857     if (tid != 0) {
858         baseTrans = MessageQ_module->transInst[tid];
860         if (baseTrans == NULL) {
861             printf("MessageQ_put: Error: transport is null\n");
862             status = MessageQ_E_FAIL;
863             goto done;
864         }
866         /* downcast instance pointer to transport interface */
867         switch (ITransport_itype(baseTrans)) {
868             case INetworkTransport_TypeId:
869                 netTrans = INetworkTransport_downCast(baseTrans);
870                 delivered = INetworkTransport_put(netTrans, (Ptr)msg);
871                 status = (delivered ? MessageQ_S_SUCCESS :
872                           (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN :
873                            MessageQ_E_FAIL));
874                 break;
876             default:
877                 /* error */
878                 printf("MessageQ_put: Error: transport id %d is an "
879                         "unsupported transport type\n", tid);
880                 status = MessageQ_E_FAIL;
881                 break;
882         }
883     }
884     else {
885         /* use primary transport for delivery */
886         priority = MessageQ_getMsgPri(msg);
887         clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
889         /* primary transport can only be used for intra-cluster delivery */
890         if (clusterId > MultiProc_getNumProcsInCluster()) {
891             printf("MessageQ_put: Error: destination procId=%d is not "
892                     "in cluster. Must specify a transportId.\n", dstProcId);
893             status =  MessageQ_E_FAIL;
894             goto done;
895         }
897         msgTrans = MessageQ_module->transports[clusterId][priority];
898         delivered = IMessageQTransport_put(msgTrans, (Ptr)msg);
899         status = (delivered ? MessageQ_S_SUCCESS :
900                   (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN : MessageQ_E_FAIL));
901     }
903 done:
904     return (status);
907 /*
908  *  MessageQ_get - gets a message for a message queue and blocks if
909  *  the queue is empty.
910  *
911  *  If a message is present, it returns it.  Otherwise it blocks
912  *  waiting for a message to arrive.
913  *  When a message is returned, it is owned by the caller.
914  */
915 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
917     MessageQ_Object * obj = (MessageQ_Object *)handle;
918     Int     status = MessageQ_S_SUCCESS;
919     struct timespec ts;
920     struct timeval tv;
922 #if 0
923 /*
924  * Optimization here to get a message without going in to the sem
925  * operation, but the sem count will not be maintained properly.
926  */
927     pthread_mutex_lock(&MessageQ_module->gate);
929     if (obj->msgList.cqh_first != &obj->msgList) {
930         *msg = (MessageQ_Msg)obj->msglist.cqh_first;
931         CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
933         pthread_mutex_unlock(&MessageQ_module->gate);
934     }
935     else {
936         pthread_mutex_unlock(&MessageQ_module->gate);
937     }
938 #endif
940     if (timeout == MessageQ_FOREVER) {
941         sem_wait(&obj->synchronizer);
942     }
943     else {
944         /* add timeout (microseconds) to current time of day */
945         gettimeofday(&tv, NULL);
946         tv.tv_sec += timeout / 1000000;
947         tv.tv_usec += timeout % 1000000;
949         if (tv.tv_usec >= 1000000) {
950               tv.tv_sec++;
951               tv.tv_usec -= 1000000;
952         }
954         /* set absolute timeout value */
955         ts.tv_sec = tv.tv_sec;
956         ts.tv_nsec = tv.tv_usec * 1000; /* convert to nanoseconds */
958         if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
959             if (errno == ETIMEDOUT) {
960                 PRINTVERBOSE0("MessageQ_get: operation timed out\n")
961                 return (MessageQ_E_TIMEOUT);
962             }
963             else {
964                 PRINTVERBOSE0("MessageQ_get: sem_timedwait error\n")
965                 return (MessageQ_E_FAIL);
966             }
967         }
968     }
970     if (obj->unblocked) {
971         return obj->unblocked;
972     }
974     pthread_mutex_lock(&MessageQ_module->gate);
976     *msg = (MessageQ_Msg)obj->msgList.cqh_first;
977     CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
979     pthread_mutex_unlock(&MessageQ_module->gate);
981     return status;
984 /*
985  * Return a count of the number of messages in the queue
986  *
987  * TBD: Implement as a socket ioctl, using getsockopt().  Return -1 for now.
988  */
989 Int MessageQ_count(MessageQ_Handle handle)
991     Int               count = -1;
992 #if 0
993     MessageQ_Object * obj   = (MessageQ_Object *) handle;
994     socklen_t         optlen;
996     /*
997      * TBD: Need to find a way to implement (if anyone uses it!), and
998      * push down into transport..
999      */
1001     /*
1002      * 2nd arg to getsockopt should be transport independent, but using
1003      *  SSKPROTO_SHMFIFO for now:
1004      */
1005     getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
1006                  &count, &optlen);
1007 #endif
1009     return count;
1012 /*
1013  *  Initializes a message not obtained from MessageQ_alloc.
1014  */
1015 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
1017     /* Fill in the fields of the message */
1018     MessageQ_msgInit(msg);
1019     msg->heapId = MessageQ_STATICMSG;
1020     msg->msgSize = size;
1023 /*
1024  *  Allocate a message and initialize the needed fields (note some
1025  *  of the fields in the header are set via other APIs or in the
1026  *  MessageQ_put function,
1027  */
1028 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
1030     MessageQ_Msg msg;
1032     /*
1033      * heapId not used for local alloc (as this is over a copy transport), but
1034      * we need to send to other side as heapId is used in BIOS transport.
1035      */
1036     msg = (MessageQ_Msg)calloc(1, size);
1037     MessageQ_msgInit(msg);
1038     msg->msgSize = size;
1039     msg->heapId = heapId;
1041     return msg;
1044 /*
1045  *  Frees the message back to the heap that was used to allocate it.
1046  */
1047 Int MessageQ_free(MessageQ_Msg msg)
1049     UInt32 status = MessageQ_S_SUCCESS;
1051     /* Check to ensure this was not allocated by user: */
1052     if (msg->heapId == MessageQ_STATICMSG) {
1053         status = MessageQ_E_CANNOTFREESTATICMSG;
1054     }
1055     else {
1056         free(msg);
1057     }
1059     return status;
1062 /* Register a heap with MessageQ. */
1063 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
1065     Int status = MessageQ_S_SUCCESS;
1067     /* Do nothing, as this uses a copy transport */
1069     return status;
1072 /* Unregister a heap with MessageQ. */
1073 Int MessageQ_unregisterHeap(UInt16 heapId)
1075     Int status = MessageQ_S_SUCCESS;
1077     /* Do nothing, as this uses a copy transport */
1079     return status;
1082 /* Unblocks a MessageQ */
1083 Void MessageQ_unblock(MessageQ_Handle handle)
1085     MessageQ_Object *obj = (MessageQ_Object *)handle;
1087     obj->unblocked = MessageQ_E_UNBLOCKED;
1088     sem_post(&obj->synchronizer);
1091 /* Unblocks a MessageQ that's been shutdown due to transport failure */
1092 Void MessageQ_shutdown(MessageQ_Handle handle)
1094     MessageQ_Object *obj = (MessageQ_Object *)handle;
1096     obj->unblocked = MessageQ_E_SHUTDOWN;
1097     sem_post(&obj->synchronizer);
1100 /* Embeds a source message queue into a message */
1101 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
1103     MessageQ_Object *obj = (MessageQ_Object *)handle;
1105     msg->replyId = (UInt16)(obj->queue);
1106     msg->replyProc = (UInt16)(obj->queue >> 16);
1109 /* Returns the QueueId associated with the handle. */
1110 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
1112     MessageQ_Object *obj = (MessageQ_Object *) handle;
1113     UInt32 queueId;
1115     queueId = (obj->queue);
1117     return queueId;
1120 /* Returns the local handle associated with queueId. */
1121 MessageQ_Handle MessageQ_getLocalHandle(MessageQ_QueueId queueId)
1123     MessageQ_Object *obj;
1124     MessageQ_QueueIndex queueIndex;
1125     UInt16 procId;
1127     procId = MessageQ_getProcId(queueId);
1128     if (procId != MultiProc_self()) {
1129         return NULL;
1130     }
1132     queueIndex = MessageQ_getQueueIndex(queueId);
1133     obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
1135     return (MessageQ_Handle)obj;
1138 /* Sets the tracing of a message */
1139 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
1141     msg->flags = (msg->flags & ~TRACEMASK) |   (traceFlag << TRACESHIFT);
1144 /*
1145  *  Returns the amount of shared memory used by one transport instance.
1146  *
1147  *  The MessageQ module itself does not use any shared memory but the
1148  *  underlying transport may use some shared memory.
1149  */
1150 SizeT MessageQ_sharedMemReq(Ptr sharedAddr)
1152     SizeT memReq = 0u;
1154     /* Do nothing, as this is a copy transport. */
1156     return memReq;
1159 /*
1160  * This is a helper function to initialize a message.
1161  */
1162 Void MessageQ_msgInit(MessageQ_Msg msg)
1164 #if 0
1165     Int                 status    = MessageQ_S_SUCCESS;
1166     LAD_ClientHandle handle;
1167     struct LAD_CommandObj cmd;
1168     union LAD_ResponseObj rsp;
1170     handle = LAD_findHandle();
1171     if (handle == LAD_MAXNUMCLIENTS) {
1172         PRINTVERBOSE1(
1173           "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
1174            getpid())
1176         return;
1177     }
1179     cmd.cmd = LAD_MESSAGEQ_MSGINIT;
1180     cmd.clientId = handle;
1182     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
1183         PRINTVERBOSE1(
1184           "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
1185         return;
1186     }
1188     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
1189         PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
1190         return;
1191     }
1192     status = rsp.msgInit.status;
1194     PRINTVERBOSE2(
1195       "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
1196       handle, status)
1198     memcpy(msg, &rsp.msgInit.msg, sizeof (*msg));
1199 #else
1200     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
1201     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
1202     msg->msgId     = MessageQ_INVALIDMSGID;
1203     msg->dstId     = (UInt16)MessageQ_INVALIDMESSAGEQ;
1204     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
1205     msg->srcProc   = MultiProc_self();
1207     pthread_mutex_lock(&MessageQ_module->gate);
1208     msg->seqNum  = MessageQ_module->seqNum++;
1209     pthread_mutex_unlock(&MessageQ_module->gate);
1210 #endif
1213 /*
1214  *  ======== _MessageQ_grow ========
1215  *  Increase module's queues array to accommodate queueIndex from LAD
1216  *
1217  *  Note: this function takes the queue index value (i.e. without the
1218  *  port offset).
1219  */
1220 Void _MessageQ_grow(UInt16 queueIndex)
1222     MessageQ_Handle *queues;
1223     MessageQ_Handle *oldQueues;
1224     UInt oldSize;
1226     pthread_mutex_lock(&MessageQ_module->gate);
1228     oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
1230     queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof(MessageQ_Handle));
1231     memcpy(queues, MessageQ_module->queues, oldSize);
1233     oldQueues = MessageQ_module->queues;
1234     MessageQ_module->queues = queues;
1235     MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
1237     pthread_mutex_unlock(&MessageQ_module->gate);
1239     free(oldQueues);
1241     return;
1244 /*
1245  *  ======== MessageQ_bind ========
1246  *  Bind all existing message queues to the given processor
1247  *
1248  *  Note: This function is a hack to work around the driver.
1249  *
1250  *  The Linux rpmsgproto driver requires a socket for each
1251  *  message queue and remote processor tuple.
1252  *
1253  *      socket --> (queue, processor)
1254  *
1255  *  Therefore, each time a new remote processor is started, all
1256  *  existing message queues need to create a socket for the new
1257  *  processor.
1258  *
1259  *  The driver should not have this requirement. One socket per
1260  *  message queue should be sufficient to uniquely identify the
1261  *  endpoint to the driver.
1262  */
1263 Void MessageQ_bind(UInt16 procId)
1265     int q;
1266     int clusterId;
1267     int priority;
1268     MessageQ_Handle handle;
1269     MessageQ_QueueId queue;
1270     IMessageQTransport_Handle transport;
1272     clusterId = procId - MultiProc_getBaseIdOfCluster();
1273     pthread_mutex_lock(&MessageQ_module->gate);
1275     for (q = 0; q < MessageQ_module->numQueues; q++) {
1277         if ((handle = MessageQ_module->queues[q]) == NULL) {
1278             continue;
1279         }
1281         queue = ((MessageQ_Object *)handle)->queue;
1283         for (priority = 0; priority < 2; priority++) {
1284             transport = MessageQ_module->transports[clusterId][priority];
1285             if (transport != NULL) {
1286                 IMessageQTransport_bind((Void *)transport, queue);
1287             }
1288         }
1289     }
1291     pthread_mutex_unlock(&MessageQ_module->gate);
1294 /*
1295  *  ======== MessageQ_unbind ========
1296  *  Unbind all existing message queues from the given processor
1297  *
1298  *  Hack: see MessageQ_bind.
1299  */
1300 Void MessageQ_unbind(UInt16 procId)
1302     int q;
1303     int clusterId;
1304     int priority;
1305     MessageQ_Handle handle;
1306     MessageQ_QueueId queue;
1307     IMessageQTransport_Handle transport;
1309     pthread_mutex_lock(&MessageQ_module->gate);
1311     for (q = 0; q < MessageQ_module->numQueues; q++) {
1313         if ((handle = MessageQ_module->queues[q]) == NULL) {
1314             continue;
1315         }
1317         queue = ((MessageQ_Object *)handle)->queue;
1318         clusterId = procId - MultiProc_getBaseIdOfCluster();
1320         for (priority = 0; priority < 2; priority++) {
1321             transport = MessageQ_module->transports[clusterId][priority];
1322             if (transport != NULL) {
1323                 IMessageQTransport_unbind((Void *)transport, queue);
1324             }
1325         }
1326     }
1328     pthread_mutex_unlock(&MessageQ_module->gate);