Tests: ping_rpmsg: Update Test with Proper Socket Usage
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
1 /*
2  * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  @file   MessageQ.c
34  *
35  *  @brief  MessageQ Linux implementation
36  *
37  *  This implementation is geared for use in a "client/server" model, whereby
38  *  system-wide data is maintained in a "server" component and process-
39  *  specific data is handled here.  At the moment, this implementation
40  *  connects and communicates with LAD for the server connection.
41  */
43 /* Standard IPC header */
44 #include <ti/ipc/Std.h>
46 /* Module level headers */
47 #include <ti/ipc/NameServer.h>
48 #include <ti/ipc/MultiProc.h>
49 #include <_MultiProc.h>
50 #define MessageQ_internal 1     /* must be defined before include file */
51 #include <ti/ipc/MessageQ.h>
52 #include <_MessageQ.h>
53 #include <ti/ipc/interfaces/IHeap.h>
54 #include <ti/ipc/interfaces/ITransport.h>
55 #include <ti/ipc/interfaces/IMessageQTransport.h>
56 #include <ti/ipc/interfaces/INetworkTransport.h>
58 /* Socket Headers */
59 #include <sys/select.h>
60 #include <sys/time.h>
61 #include <sys/types.h>
62 #include <sys/param.h>
63 #include <sys/eventfd.h>
64 #include <sys/queue.h>
65 #include <errno.h>
66 #include <stdio.h>
67 #include <string.h>
68 #include <stdlib.h>
69 #include <unistd.h>
70 #include <assert.h>
71 #include <pthread.h>
72 #include <semaphore.h>
74 #include <ladclient.h>
75 #include <_lad.h>
77 /* =============================================================================
78  * Macros/Constants
79  * =============================================================================
80  */
82 /*!
83  *  @brief  Name of the reserved NameServer used for MessageQ.
84  */
85 #define MessageQ_NAMESERVER  "MessageQ"
87 #define MessageQ_MAXTRANSPORTS 8
89 #define MessageQ_GROWSIZE 32
91 /* Trace flag settings: */
92 #define TRACESHIFT    12
93 #define TRACEMASK     0x1000
95 /* Define BENCHMARK to quiet key MessageQ APIs: */
96 //#define BENCHMARK
98 /* =============================================================================
99  * Structures & Enums
100  * =============================================================================
101  */
103 /* params structure evolution */
104 typedef struct {
105     Void *synchronizer;
106 } MessageQ_Params_Legacy;
108 typedef struct {
109     Int __version;
110     Void *synchronizer;
111     MessageQ_QueueIndex queueIndex;
112 } MessageQ_Params_Version2;
114 /* structure for MessageQ module state */
115 typedef struct MessageQ_ModuleObject {
116     MessageQ_Handle           *queues;
117     Int                       numQueues;
118     Int                       refCount;
119     NameServer_Handle         nameServer;
120     pthread_mutex_t           gate;
121     int                       seqNum;
122     pthread_mutex_t           seqNumGate;
123     IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
124     ITransport_Handle         transInst[MessageQ_MAXTRANSPORTS];
125     MessageQ_PutHookFxn       putHookFxn;
126     Ptr                      *heaps;
127     Int                       numHeaps;
128 } MessageQ_ModuleObject;
130 typedef struct MessageQ_CIRCLEQ_ENTRY {
131      CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem;
132 } MessageQ_CIRCLEQ_ENTRY;
134 /*!
135  *  @brief  Structure for the Handle for the MessageQ.
136  */
137 typedef struct MessageQ_Object_tag {
138     CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
139     pthread_mutex_t              msgListGate;
140     MessageQ_Params              params;
141     MessageQ_QueueId             queue;
142     int                          unblocked;
143     void                         *serverHandle;
144     sem_t                        synchronizer;
145 } MessageQ_Object;
147 /* traces in this file are controlled via _MessageQ_verbose */
148 Bool _MessageQ_verbose = FALSE;
149 #define verbose _MessageQ_verbose
151 /* =============================================================================
152  *  Globals
153  * =============================================================================
154  */
155 static MessageQ_ModuleObject MessageQ_state =
157     .refCount   = 0,
158     .nameServer = NULL,
159 #if defined(IPC_BUILDOS_ANDROID)
160     .gate       = PTHREAD_RECURSIVE_MUTEX_INITIALIZER,
161 #else
162     .gate       = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
163 #endif
164     .seqNumGate = PTHREAD_MUTEX_INITIALIZER,
165     .putHookFxn = NULL,
166     .heaps      = NULL,
167     .numHeaps   = 0
168 };
170 /*!
171  *  @var    MessageQ_module
172  *
173  *  @brief  Pointer to the MessageQ module state.
174  */
175 MessageQ_ModuleObject *MessageQ_module = &MessageQ_state;
177 Void _MessageQ_grow(UInt16 queueIndex);
179 /* =============================================================================
180  * APIS
181  * =============================================================================
182  */
184 Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
185                                 UInt16 rprocId, UInt priority)
187     Int status = FALSE;
188     UInt16 clusterId;
190     if (handle == NULL) {
191         fprintf(stderr,
192                 "MessageQ_registerTransport: invalid handle, must be non-NULL\n"
193                );
195         return status;
196     }
198     /* map procId to clusterId */
199     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
201     if (clusterId >= MultiProc_MAXPROCESSORS) {
202         fprintf(stderr,
203                 "MessageQ_registerTransport: invalid procId %d\n", rprocId);
205         return status;
206     }
208     if (MessageQ_module->transports[clusterId][priority] == NULL) {
209         MessageQ_module->transports[clusterId][priority] = handle;
211         status = TRUE;
212     }
214     return status;
217 Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
219     if (inst == NULL) {
220         fprintf(stderr, "MessageQ_registerTransportId: invalid NULL handle\n");
222         return MessageQ_E_INVALIDARG;
223     }
225     if (tid >= MessageQ_MAXTRANSPORTS) {
226         fprintf(stderr,
227                 "MessageQ_unregisterNetTransport: invalid transport id %d, "
228                 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
230         return MessageQ_E_INVALIDARG;
231     }
233     if (MessageQ_module->transInst[tid] != NULL) {
234         fprintf(stderr,
235                 "MessageQ_registerTransportId: transport id %d already "
236                 "registered\n", tid);
238         return MessageQ_E_ALREADYEXISTS;
239     }
241     MessageQ_module->transInst[tid] = inst;
243     return MessageQ_S_SUCCESS;
246 Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
248     UInt16 clusterId;
250     /* map procId to clusterId */
251     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
253     if (clusterId >= MultiProc_MAXPROCESSORS) {
254         fprintf(stderr, "MessageQ_unregisterTransport: invalid rprocId %d\n",
255                 rprocId);
257         return;
258     }
260     MessageQ_module->transports[clusterId][priority] = NULL;
263 Void MessageQ_unregisterTransportId(UInt tid)
265     if (tid >= MessageQ_MAXTRANSPORTS) {
266         fprintf(stderr,
267                 "MessageQ_unregisterTransportId: invalid transport id %d, "
268                 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
270         return;
271     }
273     MessageQ_module->transInst[tid] = NULL;
276 /*
277  * Function to get default configuration for the MessageQ module.
278  */
279 Void MessageQ_getConfig(MessageQ_Config *cfg)
281     Int status;
282     LAD_ClientHandle handle;
283     struct LAD_CommandObj cmd;
284     union LAD_ResponseObj rsp;
286     assert (cfg != NULL);
288     handle = LAD_findHandle();
289     if (handle == LAD_MAXNUMCLIENTS) {
290         PRINTVERBOSE1(
291           "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
292            getpid())
294         return;
295     }
297     cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
298     cmd.clientId = handle;
300     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
301         PRINTVERBOSE1(
302           "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
303         return;
304     }
306     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
307         PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n",
308                       status)
309         return;
310     }
311     status = rsp.messageQGetConfig.status;
313     PRINTVERBOSE2(
314       "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
315       handle, status)
317     memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg));
319     return;
322 /*
323  *  Function to setup the MessageQ module.
324  */
325 Int MessageQ_setup(const MessageQ_Config *cfg)
327     Int status = MessageQ_S_SUCCESS;
328     LAD_ClientHandle handle;
329     struct LAD_CommandObj cmd;
330     union LAD_ResponseObj rsp;
331     Int pri;
332     Int i;
333     Int tid;
335     /* this entire function must be serialized */
336     pthread_mutex_lock(&MessageQ_module->gate);
338     /* ensure only first thread performs startup procedure */
339     if (++MessageQ_module->refCount > 1) {
340         PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
341                 MessageQ_module->refCount)
342         status = MessageQ_S_ALREADYSETUP;
343         goto exit;
344     }
346     handle = LAD_findHandle();
347     if (handle == LAD_MAXNUMCLIENTS) {
348         PRINTVERBOSE1("MessageQ_setup: can't find connection to daemon for "
349                 "pid %d\n", getpid())
350         status = MessageQ_E_RESOURCE;
351         goto exit;
352     }
354     cmd.cmd = LAD_MESSAGEQ_SETUP;
355     cmd.clientId = handle;
356     memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
358     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
359         PRINTVERBOSE1("MessageQ_setup: sending LAD command failed, "
360                 "status=%d\n", status)
361         status = MessageQ_E_FAIL;
362         goto exit;
363     }
365     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
366         PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
367         status = MessageQ_E_FAIL;
368         goto exit;
369     }
370     status = rsp.setup.status;
372     PRINTVERBOSE2("MessageQ_setup: LAD response for client %d, status=%d\n",
373             handle, status)
375     MessageQ_module->seqNum = 0;
376     MessageQ_module->nameServer = rsp.setup.nameServerHandle;
377     MessageQ_module->numQueues = cfg->maxRuntimeEntries;
378     MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
379             sizeof(MessageQ_Handle));
380     MessageQ_module->numHeaps = cfg->numHeaps;
381     MessageQ_module->heaps = calloc(cfg->numHeaps, sizeof(Ptr));
383     for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
384         for (pri = 0; pri < 2; pri++) {
385             MessageQ_module->transports[i][pri] = NULL;
386         }
387     }
389     for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
390         MessageQ_module->transInst[tid] = NULL;
391     }
393 exit:
394     /* if error, must decrement reference count */
395     if (status < 0) {
396         MessageQ_module->refCount--;
397     }
399     pthread_mutex_unlock(&MessageQ_module->gate);
401     return (status);
404 /*
405  *  MessageQ_destroy - destroy the MessageQ module.
406  */
407 Int MessageQ_destroy(void)
409     Int status = MessageQ_S_SUCCESS;
410     LAD_ClientHandle handle;
411     struct LAD_CommandObj cmd;
412     union LAD_ResponseObj rsp;
413     int i;
415     /* this entire function must be serialized */
416     pthread_mutex_lock(&MessageQ_module->gate);
418     /* ensure only last thread does the work */
419     if (--MessageQ_module->refCount > 0) {
420         goto exit;
421     }
423     /* ensure all registered heaps have been unregistered */
424     for (i = 0; i < MessageQ_module->numHeaps; i++) {
425         if (MessageQ_module->heaps[i] != NULL) {
426             PRINTVERBOSE1("MessageQ_destroy: Warning: found heapId=%d", i);
427         }
428     }
429     free(MessageQ_module->heaps);
430     MessageQ_module->heaps = NULL;
432     handle = LAD_findHandle();
433     if (handle == LAD_MAXNUMCLIENTS) {
434         PRINTVERBOSE1("MessageQ_destroy: can't find connection to daemon "
435                 "for pid %d\n", getpid())
436         status =  MessageQ_E_RESOURCE;
437         goto exit;
438     }
440     cmd.cmd = LAD_MESSAGEQ_DESTROY;
441     cmd.clientId = handle;
443     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
444         PRINTVERBOSE1("MessageQ_destroy: sending LAD command failed, "
445                 "status=%d\n", status)
446         status = MessageQ_E_FAIL;
447         goto exit;
448     }
450     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
451         PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
452         status = MessageQ_E_FAIL;
453         goto exit;
454     }
455     status = rsp.status;
457     PRINTVERBOSE2("MessageQ_destroy: got LAD response for client %d, "
458             "status=%d\n", handle, status)
460 exit:
461     pthread_mutex_unlock(&MessageQ_module->gate);
463     return (status);
466 /*
467  *  ======== MessageQ_Params_init ========
468  *  Legacy implementation.
469  */
470 Void MessageQ_Params_init(MessageQ_Params *params)
472     ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
475 /*
476  *  ======== MessageQ_Params_init__S ========
477  *  New implementation which is version aware.
478  */
479 Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
481     MessageQ_Params_Version2 *params2;
483     switch (version) {
485         case MessageQ_Params_VERSION_2:
486             params2 = (MessageQ_Params_Version2 *)params;
487             params2->__version = MessageQ_Params_VERSION_2;
488             params2->synchronizer = NULL;
489             params2->queueIndex = MessageQ_ANY;
490             break;
492         default:
493             assert(FALSE);
494             break;
495     }
498 /*
499  *  ======== MessageQ_create ========
500  */
501 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
503     Int                   status;
504     MessageQ_Object      *obj = NULL;
505     IMessageQTransport_Handle transport;
506     INetworkTransport_Handle netTrans;
507     ITransport_Handle     baseTrans;
508     UInt16                queueIndex;
509     UInt16                clusterId;
510     Int                   tid;
511     Int                   priority;
512     LAD_ClientHandle      handle;
513     struct LAD_CommandObj cmd;
514     union LAD_ResponseObj rsp;
515     MessageQ_Params ps;
517     MessageQ_Params_init__S(&ps, MessageQ_Params_VERSION);
519     /* copy the given params into the current params structure */
520     if (pp != NULL) {
522         /* snoop the params pointer to see if it's a legacy structure */
523         if ((pp->__version == 0) || (pp->__version > 100)) {
524             ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
525         }
527         /* not legacy structure, use params version field */
528         else if (pp->__version == MessageQ_Params_VERSION_2) {
529             ps.__version = ((MessageQ_Params_Version2 *)pp)->__version;
530             ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
531             ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
532         }
533         else {
534             assert(FALSE);
535         }
536     }
538     handle = LAD_findHandle();
539     if (handle == LAD_MAXNUMCLIENTS) {
540         PRINTVERBOSE1(
541           "MessageQ_create: can't find connection to daemon for pid %d\n",
542            getpid())
544         return NULL;
545     }
547     cmd.cmd = LAD_MESSAGEQ_CREATE;
548     cmd.clientId = handle;
550     if (name == NULL) {
551         cmd.args.messageQCreate.name[0] = '\0';
552     }
553     else {
554         strncpy(cmd.args.messageQCreate.name, name,
555                 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
556         cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
557     }
559     memcpy(&cmd.args.messageQCreate.params, &ps, sizeof(ps));
561     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
562         PRINTVERBOSE1(
563           "MessageQ_create: sending LAD command failed, status=%d\n", status)
564         return NULL;
565     }
567     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
568         PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
569         return NULL;
570     }
571     status = rsp.messageQCreate.status;
573     PRINTVERBOSE2(
574       "MessageQ_create: got LAD response for client %d, status=%d\n",
575       handle, status)
577     if (status == -1) {
578        PRINTVERBOSE1(
579           "MessageQ_create: MessageQ server operation failed, status=%d\n",
580           status)
581        return NULL;
582     }
584     /* Create the generic obj */
585     obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
587    /* Populate the params member */
588     memcpy(&obj->params, &ps, sizeof(ps));
591     obj->queue = rsp.messageQCreate.queueId;
592     obj->serverHandle = rsp.messageQCreate.serverHandle;
593     pthread_mutex_init(&obj->msgListGate, NULL);
594     CIRCLEQ_INIT(&obj->msgList);
595     if (sem_init(&obj->synchronizer, 0, 0) < 0) {
596         PRINTVERBOSE1(
597           "MessageQ_create: failed to create synchronizer (errno %d)\n", errno)
599         MessageQ_delete((MessageQ_Handle *)&obj);
601         return NULL;
602     }
604     /* lad returns the queue port # (queueIndex + PORT_OFFSET) */
605     queueIndex = MessageQ_getQueueIndex(rsp.messageQCreate.queueId);
607     PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
608             "queueIndex %d\n", (name == NULL) ? "NULL" : name , queueIndex)
610     pthread_mutex_lock(&MessageQ_module->gate);
612     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
613         for (priority = 0; priority < 2; priority++) {
614             transport = MessageQ_module->transports[clusterId][priority];
615             if (transport) {
616                 /* need to check return and do something if error */
617                 IMessageQTransport_bind((Void *)transport, obj->queue);
618             }
619         }
620     }
622     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
623         baseTrans = MessageQ_module->transInst[tid];
625         if (baseTrans != NULL) {
626             switch (ITransport_itype(baseTrans)) {
627                 case INetworkTransport_TypeId:
628                     netTrans = INetworkTransport_downCast(baseTrans);
629                     INetworkTransport_bind((void *)netTrans, obj->queue);
630                     break;
632                 default:
633                     /* error */
634                     fprintf(stderr,
635                             "MessageQ_create: Error: transport id %d is an "
636                             "unsupported transport type.\n", tid);
637                     break;
638             }
639         }
640     }
642     /* LAD's MessageQ module can grow, we need to grow as well */
643     if (queueIndex >= MessageQ_module->numQueues) {
644         _MessageQ_grow(queueIndex);
645     }
647     /*  No need to "allocate" slot since the queueIndex returned by
648      *  LAD is guaranteed to be unique.
649      */
650     MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
652     pthread_mutex_unlock(&MessageQ_module->gate);
654     return (MessageQ_Handle)obj;
657 /*
658  *  ======== MessageQ_delete ========
659  */
660 Int MessageQ_delete(MessageQ_Handle *handlePtr)
662     MessageQ_Object *obj;
663     IMessageQTransport_Handle transport;
664     INetworkTransport_Handle  netTrans;
665     ITransport_Handle         baseTrans;
666     Int              status = MessageQ_S_SUCCESS;
667     UInt16           queueIndex;
668     UInt16                clusterId;
669     Int                   tid;
670     Int                   priority;
671     LAD_ClientHandle handle;
672     struct LAD_CommandObj cmd;
673     union LAD_ResponseObj rsp;
675     handle = LAD_findHandle();
676     if (handle == LAD_MAXNUMCLIENTS) {
677         PRINTVERBOSE1(
678           "MessageQ_delete: can't find connection to daemon for pid %d\n",
679            getpid())
681         return MessageQ_E_FAIL;
682     }
684     obj = (MessageQ_Object *)(*handlePtr);
686     cmd.cmd = LAD_MESSAGEQ_DELETE;
687     cmd.clientId = handle;
688     cmd.args.messageQDelete.serverHandle = obj->serverHandle;
690     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
691         PRINTVERBOSE1(
692           "MessageQ_delete: sending LAD command failed, status=%d\n", status)
693         return MessageQ_E_FAIL;
694     }
696     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
697         PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
698         return MessageQ_E_FAIL;
699     }
700     status = rsp.messageQDelete.status;
702     PRINTVERBOSE2(
703       "MessageQ_delete: got LAD response for client %d, status=%d\n",
704       handle, status)
706     pthread_mutex_lock(&MessageQ_module->gate);
708     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
709         for (priority = 0; priority < 2; priority++) {
710             transport = MessageQ_module->transports[clusterId][priority];
711             if (transport) {
712                 IMessageQTransport_unbind((Void *)transport, obj->queue);
713             }
714         }
715     }
717     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
718         baseTrans = MessageQ_module->transInst[tid];
720         if (baseTrans != NULL) {
721             switch (ITransport_itype(baseTrans)) {
722                 case INetworkTransport_TypeId:
723                     netTrans = INetworkTransport_downCast(baseTrans);
724                     INetworkTransport_unbind((void *)netTrans, obj->queue);
725                     break;
727                 default:
728                     /* error */
729                     fprintf(stderr,
730                             "MessageQ_create: Error: transport id %d is an "
731                             "unsupported transport type.\n", tid);
732                     break;
733             }
734         }
735     }
737     /* extract the queue index from the queueId */
738     queueIndex = MessageQ_getQueueIndex(obj->queue);
739     MessageQ_module->queues[queueIndex] = NULL;
741     pthread_mutex_unlock(&MessageQ_module->gate);
743     free(obj);
744     *handlePtr = NULL;
746     return status;
749 /*
750  *  ======== MessageQ_open ========
751  *  Acquire a queueId for use in sending messages to the queue
752  */
753 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
755     Int status = MessageQ_S_SUCCESS;
757     status = NameServer_getUInt32(MessageQ_module->nameServer,
758                                   name, queueId, NULL);
760     if (status == NameServer_E_NOTFOUND) {
761         /* Set return queue ID to invalid */
762         *queueId = MessageQ_INVALIDMESSAGEQ;
763         status = MessageQ_E_NOTFOUND;
764     }
765     else if (status >= 0) {
766         /* Override with a MessageQ status code */
767         status = MessageQ_S_SUCCESS;
768     }
769     else {
770         /* Set return queue ID to invalid */
771         *queueId = MessageQ_INVALIDMESSAGEQ;
773         /* Override with a MessageQ status code */
774         if (status == NameServer_E_TIMEOUT) {
775             status = MessageQ_E_TIMEOUT;
776         }
777         else {
778             status = MessageQ_E_FAIL;
779         }
780     }
782     return status;
785 /*
786  *  ======== MessageQ_openQueueId ========
787  */
788 MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 procId)
790     MessageQ_QueueIndex queuePort;
791     MessageQ_QueueId queueId;
793     /* queue port is embedded in the queueId */
794     queuePort = queueIndex + MessageQ_PORTOFFSET;
795     queueId = ((MessageQ_QueueId)(procId) << 16) | queuePort;
797     return (queueId);
800 /*
801  *  ======== MessageQ_close ========
802  *  Closes previously opened instance of MessageQ
803  */
804 Int MessageQ_close(MessageQ_QueueId *queueId)
806     Int32 status = MessageQ_S_SUCCESS;
808     /* Nothing more to be done for closing the MessageQ. */
809     *queueId = MessageQ_INVALIDMESSAGEQ;
811     return status;
814 /*
815  *  ======== MessageQ_put ========
816  *  Deliver the given message, either locally or to the transport
817  *
818  *  If the destination is a local queue, deliver the message. Otherwise,
819  *  pass the message to a transport for delivery. The transport handles
820  *  the sending of the message using the appropriate interface (socket,
821  *  device ioctl, etc.).
822  */
823 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
825     Int status = MessageQ_S_SUCCESS;
826     MessageQ_Object *obj;
827     UInt16 dstProcId;
828     UInt16 queueIndex;
829     UInt16 queuePort;
830     ITransport_Handle baseTrans;
831     IMessageQTransport_Handle msgTrans;
832     INetworkTransport_Handle netTrans;
833     Int priority;
834     UInt tid;
835     UInt16 clusterId;
836     Bool delivered;
838     /* extract destination address from the given queueId */
839     dstProcId  = (UInt16)(queueId >> 16);
840     queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
842     /* write the destination address into the message header */
843     msg->dstId = queuePort;
844     msg->dstProc= dstProcId;
846     /* invoke the hook function after addressing the message */
847     if (MessageQ_module->putHookFxn != NULL) {
848         MessageQ_module->putHookFxn(queueId, msg);
849     }
851     /*  For an outbound message: If message destination is on this
852      *  processor, then check if the destination queue is in this
853      *  process (thread-to-thread messaging).
854      *
855      *  For an inbound message: Check if destination queue is in this
856      *  process (process-to-process messaging).
857      */
858     if (dstProcId == MultiProc_self()) {
859         queueIndex = queuePort - MessageQ_PORTOFFSET;
861         if (queueIndex < MessageQ_module->numQueues) {
862             obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
864             if (obj != NULL) {
865                 /* deliver message to queue */
866                 pthread_mutex_lock(&obj->msgListGate);
867                 CIRCLEQ_INSERT_TAIL(&obj->msgList,
868                         (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
869                 pthread_mutex_unlock(&obj->msgListGate);
870                 sem_post(&obj->synchronizer);
871                 goto done;
872             }
873         }
874     }
876     /*  Getting here implies the message is outbound. Must give it to
877      *  either the primary or secondary transport for delivery. Start
878      *  by extracting the transport ID from the message header.
879      */
880     tid = MessageQ_getTransportId(msg);
882     if (tid >= MessageQ_MAXTRANSPORTS) {
883         fprintf(stderr,
884                 "MessageQ_put: Error: transport id %d too big, must be < %d\n",
885                 tid, MessageQ_MAXTRANSPORTS);
886         status = MessageQ_E_FAIL;
887         goto done;
888     }
890     /* if transportId is set, use secondary transport for message delivery */
891     if (tid != 0) {
892         baseTrans = MessageQ_module->transInst[tid];
894         if (baseTrans == NULL) {
895             fprintf(stderr, "MessageQ_put: Error: transport is null\n");
896             status = MessageQ_E_FAIL;
897             goto done;
898         }
900         /* downcast instance pointer to transport interface */
901         switch (ITransport_itype(baseTrans)) {
902             case INetworkTransport_TypeId:
903                 netTrans = INetworkTransport_downCast(baseTrans);
904                 delivered = INetworkTransport_put(netTrans, (Ptr)msg);
905                 status = (delivered ? MessageQ_S_SUCCESS :
906                           (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN :
907                            MessageQ_E_FAIL));
908                 break;
910             default:
911                 /* error */
912                 fprintf(stderr, "MessageQ_put: Error: transport id %d is an "
913                         "unsupported transport type\n", tid);
914                 status = MessageQ_E_FAIL;
915                 break;
916         }
917     }
918     else {
919         /* use primary transport for delivery */
920         priority = MessageQ_getMsgPri(msg);
921         clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
923         /* primary transport can only be used for intra-cluster delivery */
924         if (clusterId > MultiProc_getNumProcsInCluster()) {
925             fprintf(stderr,
926                     "MessageQ_put: Error: destination procId=%d is not "
927                     "in cluster. Must specify a transportId.\n", dstProcId);
928             status =  MessageQ_E_FAIL;
929             goto done;
930         }
932         msgTrans = MessageQ_module->transports[clusterId][priority];
933         delivered = IMessageQTransport_put(msgTrans, (Ptr)msg);
934         status = (delivered ? MessageQ_S_SUCCESS :
935                   (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN : MessageQ_E_FAIL));
936     }
938 done:
939     return (status);
942 /*
943  *  MessageQ_get - gets a message for a message queue and blocks if
944  *  the queue is empty.
945  *
946  *  If a message is present, it returns it.  Otherwise it blocks
947  *  waiting for a message to arrive.
948  *  When a message is returned, it is owned by the caller.
949  */
950 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
952     MessageQ_Object * obj = (MessageQ_Object *)handle;
953     Int     status = MessageQ_S_SUCCESS;
954     struct timespec ts;
955     struct timeval tv;
957 #if 0
958 /*
959  * Optimization here to get a message without going in to the sem
960  * operation, but the sem count will not be maintained properly.
961  */
962     pthread_mutex_lock(&obj->msgListGate);
964     if (obj->msgList.cqh_first != &obj->msgList) {
965         *msg = (MessageQ_Msg)obj->msglist.cqh_first;
966         CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
968         pthread_mutex_unlock(&obj->msgListGate);
969     }
970     else {
971         pthread_mutex_unlock(&obj->msgListGate);
972     }
973 #endif
975     if (timeout == MessageQ_FOREVER) {
976         sem_wait(&obj->synchronizer);
977     }
978     else {
979         /* add timeout (microseconds) to current time of day */
980         gettimeofday(&tv, NULL);
981         tv.tv_sec += timeout / 1000000;
982         tv.tv_usec += timeout % 1000000;
984         if (tv.tv_usec >= 1000000) {
985               tv.tv_sec++;
986               tv.tv_usec -= 1000000;
987         }
989         /* set absolute timeout value */
990         ts.tv_sec = tv.tv_sec;
991         ts.tv_nsec = tv.tv_usec * 1000; /* convert to nanoseconds */
993         if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
994             if (errno == ETIMEDOUT) {
995                 PRINTVERBOSE0("MessageQ_get: operation timed out\n")
996                 return (MessageQ_E_TIMEOUT);
997             }
998             else {
999                 PRINTVERBOSE0("MessageQ_get: sem_timedwait error\n")
1000                 return (MessageQ_E_FAIL);
1001             }
1002         }
1003     }
1005     if (obj->unblocked) {
1006         return obj->unblocked;
1007     }
1009     pthread_mutex_lock(&obj->msgListGate);
1011     *msg = (MessageQ_Msg)obj->msgList.cqh_first;
1012     CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
1014     pthread_mutex_unlock(&obj->msgListGate);
1016     return status;
1019 /*
1020  * Return a count of the number of messages in the queue
1021  *
1022  * TBD: Implement as a socket ioctl, using getsockopt().  Return -1 for now.
1023  */
1024 Int MessageQ_count(MessageQ_Handle handle)
1026     Int               count = -1;
1027 #if 0
1028     MessageQ_Object * obj   = (MessageQ_Object *) handle;
1029     socklen_t         optlen;
1031     /*
1032      * TBD: Need to find a way to implement (if anyone uses it!), and
1033      * push down into transport..
1034      */
1036     /*
1037      * 2nd arg to getsockopt should be transport independent, but using
1038      *  SSKPROTO_SHMFIFO for now:
1039      */
1040     getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
1041                  &count, &optlen);
1042 #endif
1044     return count;
1047 /*
1048  *  Initializes a message not obtained from MessageQ_alloc.
1049  */
1050 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
1052     /* Fill in the fields of the message */
1053     MessageQ_msgInit(msg);
1054     msg->heapId = MessageQ_STATICMSG;
1055     msg->msgSize = size;
1058 /*
1059  *  Allocate a message and initialize the needed fields (note some
1060  *  of the fields in the header are set via other APIs or in the
1061  *  MessageQ_put function,
1062  */
1063 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
1065     IHeap_Handle heap;
1066     MessageQ_Msg msg;
1068     if (heapId > (MessageQ_module->numHeaps - 1)) {
1069         PRINTVERBOSE1("MessageQ_alloc: Error: heapId (%d) too large", heapId);
1070         return (NULL);
1071     }
1072     else if (MessageQ_module->heaps[heapId] == NULL) {
1073         PRINTVERBOSE1("MessageQ_alloc: Error: heapId (%d) not registered",
1074                 heapId);
1075         return (NULL);
1076     }
1077     else {
1078         heap = (IHeap_Handle)MessageQ_module->heaps[heapId];
1079     }
1081     msg = IHeap_alloc(heap, size);
1083     if (msg == NULL) {
1084         return (NULL);
1085     }
1087     MessageQ_msgInit(msg);
1088     msg->msgSize = size;
1089     msg->heapId = heapId;
1091     return (msg);
1094 /*
1095  *  Frees the message back to the heap that was used to allocate it.
1096  */
1097 Int MessageQ_free(MessageQ_Msg msg)
1099     UInt32 status = MessageQ_S_SUCCESS;
1100     IHeap_Handle heap;
1102     /* ensure this was not allocated by user */
1103     if (msg->heapId == MessageQ_STATICMSG) {
1104         status = MessageQ_E_CANNOTFREESTATICMSG;
1105     }
1106     else if (msg->heapId > (MessageQ_module->numHeaps - 1)) {
1107         status = MessageQ_E_INVALIDARG;
1108     }
1109     else if (MessageQ_module->heaps[msg->heapId] == NULL) {
1110         status = MessageQ_E_NOTFOUND;
1111     }
1112     else {
1113         heap = (IHeap_Handle)MessageQ_module->heaps[msg->heapId];
1114     }
1116     IHeap_free(heap, (void *)msg);
1118     return (status);
1121 /*
1122  *  ======== MessageQ_registerHeap ========
1123  */
1124 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
1126     Int status = MessageQ_S_SUCCESS;
1128     pthread_mutex_lock(&MessageQ_module->gate);
1130     if (heapId > (MessageQ_module->numHeaps - 1)) {
1131         status = MessageQ_E_INVALIDARG;
1132     }
1133     else if (MessageQ_module->heaps[heapId] != NULL) {
1134         status = MessageQ_E_ALREADYEXISTS;
1135     }
1136     else {
1137         MessageQ_module->heaps[heapId] = heap;
1138     }
1140     pthread_mutex_unlock(&MessageQ_module->gate);
1142     return (status);
1145 /*
1146  *  ======== MessageQ_unregisterHeap ========
1147  */
1148 Int MessageQ_unregisterHeap(UInt16 heapId)
1150     Int status = MessageQ_S_SUCCESS;
1152     pthread_mutex_lock(&MessageQ_module->gate);
1154     if (heapId > (MessageQ_module->numHeaps - 1)) {
1155         status = MessageQ_E_INVALIDARG;
1156     }
1157     else if (MessageQ_module->heaps[heapId] == NULL) {
1158         status = MessageQ_E_NOTFOUND;
1159     }
1160     else {
1161         MessageQ_module->heaps[heapId] = NULL;
1162     }
1164     pthread_mutex_unlock(&MessageQ_module->gate);
1166     return (status);
1169 /* Unblocks a MessageQ */
1170 Void MessageQ_unblock(MessageQ_Handle handle)
1172     MessageQ_Object *obj = (MessageQ_Object *)handle;
1174     obj->unblocked = MessageQ_E_UNBLOCKED;
1175     sem_post(&obj->synchronizer);
1178 /* Unblocks a MessageQ that's been shutdown due to transport failure */
1179 Void MessageQ_shutdown(MessageQ_Handle handle)
1181     MessageQ_Object *obj = (MessageQ_Object *)handle;
1183     obj->unblocked = MessageQ_E_SHUTDOWN;
1184     sem_post(&obj->synchronizer);
1187 /* Embeds a source message queue into a message */
1188 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
1190     MessageQ_Object *obj = (MessageQ_Object *)handle;
1192     msg->replyId = (UInt16)(obj->queue);
1193     msg->replyProc = (UInt16)(obj->queue >> 16);
1196 /* Returns the QueueId associated with the handle. */
1197 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
1199     MessageQ_Object *obj = (MessageQ_Object *) handle;
1200     UInt32 queueId;
1202     queueId = (obj->queue);
1204     return queueId;
1207 /* Returns the local handle associated with queueId. */
1208 MessageQ_Handle MessageQ_getLocalHandle(MessageQ_QueueId queueId)
1210     MessageQ_Object *obj;
1211     MessageQ_QueueIndex queueIndex;
1212     UInt16 procId;
1214     procId = MessageQ_getProcId(queueId);
1215     if (procId != MultiProc_self()) {
1216         return NULL;
1217     }
1219     queueIndex = MessageQ_getQueueIndex(queueId);
1220     obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
1222     return (MessageQ_Handle)obj;
1225 /* Sets the tracing of a message */
1226 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
1228     msg->flags = (msg->flags & ~TRACEMASK) |   (traceFlag << TRACESHIFT);
1231 /*
1232  *  Returns the amount of shared memory used by one transport instance.
1233  *
1234  *  The MessageQ module itself does not use any shared memory but the
1235  *  underlying transport may use some shared memory.
1236  */
1237 SizeT MessageQ_sharedMemReq(Ptr sharedAddr)
1239     SizeT memReq = 0u;
1241     /* Do nothing, as this is a copy transport. */
1243     return memReq;
1246 /*
1247  * This is a helper function to initialize a message.
1248  */
1249 Void MessageQ_msgInit(MessageQ_Msg msg)
1251 #if 0
1252     Int                 status    = MessageQ_S_SUCCESS;
1253     LAD_ClientHandle handle;
1254     struct LAD_CommandObj cmd;
1255     union LAD_ResponseObj rsp;
1257     handle = LAD_findHandle();
1258     if (handle == LAD_MAXNUMCLIENTS) {
1259         PRINTVERBOSE1(
1260           "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
1261            getpid())
1263         return;
1264     }
1266     cmd.cmd = LAD_MESSAGEQ_MSGINIT;
1267     cmd.clientId = handle;
1269     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
1270         PRINTVERBOSE1(
1271           "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
1272         return;
1273     }
1275     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
1276         PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
1277         return;
1278     }
1279     status = rsp.msgInit.status;
1281     PRINTVERBOSE2(
1282       "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
1283       handle, status)
1285     memcpy(msg, &rsp.msgInit.msg, sizeof (*msg));
1286 #else
1287     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
1288     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
1289     msg->msgId     = MessageQ_INVALIDMSGID;
1290     msg->dstId     = (UInt16)MessageQ_INVALIDMESSAGEQ;
1291     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
1292     msg->srcProc   = MultiProc_self();
1294     pthread_mutex_lock(&MessageQ_module->seqNumGate);
1295     msg->seqNum  = MessageQ_module->seqNum++;
1296     pthread_mutex_unlock(&MessageQ_module->seqNumGate);
1297 #endif
1300 /*
1301  *  ======== _MessageQ_grow ========
1302  *  Increase module's queues array to accommodate queueIndex from LAD
1303  *
1304  *  Note: this function takes the queue index value (i.e. without the
1305  *  port offset).
1306  */
1307 Void _MessageQ_grow(UInt16 queueIndex)
1309     MessageQ_Handle *queues;
1310     MessageQ_Handle *oldQueues;
1311     UInt oldSize;
1313     pthread_mutex_lock(&MessageQ_module->gate);
1315     oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
1317     queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof(MessageQ_Handle));
1318     memcpy(queues, MessageQ_module->queues, oldSize);
1320     oldQueues = MessageQ_module->queues;
1321     MessageQ_module->queues = queues;
1322     MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
1324     pthread_mutex_unlock(&MessageQ_module->gate);
1326     free(oldQueues);
1328     return;
1331 /*
1332  *  ======== MessageQ_bind ========
1333  *  Bind all existing message queues to the given processor
1334  *
1335  *  Note: This function is a hack to work around the driver.
1336  *
1337  *  The Linux rpmsgproto driver requires a socket for each
1338  *  message queue and remote processor tuple.
1339  *
1340  *      socket --> (queue, processor)
1341  *
1342  *  Therefore, each time a new remote processor is started, all
1343  *  existing message queues need to create a socket for the new
1344  *  processor.
1345  *
1346  *  The driver should not have this requirement. One socket per
1347  *  message queue should be sufficient to uniquely identify the
1348  *  endpoint to the driver.
1349  */
1350 Void MessageQ_bind(UInt16 procId)
1352     int q;
1353     int clusterId;
1354     int priority;
1355     MessageQ_Handle handle;
1356     MessageQ_QueueId queue;
1357     IMessageQTransport_Handle transport;
1359     clusterId = procId - MultiProc_getBaseIdOfCluster();
1360     pthread_mutex_lock(&MessageQ_module->gate);
1362     for (q = 0; q < MessageQ_module->numQueues; q++) {
1364         if ((handle = MessageQ_module->queues[q]) == NULL) {
1365             continue;
1366         }
1368         queue = ((MessageQ_Object *)handle)->queue;
1370         for (priority = 0; priority < 2; priority++) {
1371             transport = MessageQ_module->transports[clusterId][priority];
1372             if (transport != NULL) {
1373                 IMessageQTransport_bind((Void *)transport, queue);
1374             }
1375         }
1376     }
1378     pthread_mutex_unlock(&MessageQ_module->gate);
1381 /*
1382  *  ======== MessageQ_unbind ========
1383  *  Unbind all existing message queues from the given processor
1384  *
1385  *  Hack: see MessageQ_bind.
1386  */
1387 Void MessageQ_unbind(UInt16 procId)
1389     int q;
1390     int clusterId;
1391     int priority;
1392     MessageQ_Handle handle;
1393     MessageQ_QueueId queue;
1394     IMessageQTransport_Handle transport;
1396     pthread_mutex_lock(&MessageQ_module->gate);
1398     for (q = 0; q < MessageQ_module->numQueues; q++) {
1400         if ((handle = MessageQ_module->queues[q]) == NULL) {
1401             continue;
1402         }
1404         queue = ((MessageQ_Object *)handle)->queue;
1405         clusterId = procId - MultiProc_getBaseIdOfCluster();
1407         for (priority = 0; priority < 2; priority++) {
1408             transport = MessageQ_module->transports[clusterId][priority];
1409             if (transport != NULL) {
1410                 IMessageQTransport_unbind((Void *)transport, queue);
1411             }
1412         }
1413     }
1415     pthread_mutex_unlock(&MessageQ_module->gate);