]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - ipc/ipcdev.git/blob - linux/src/api/MessageQ.c
db96b241156963cc831833d95edd0de733991b04
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
1 /*
2  * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  @file   MessageQ.c
34  *
35  *  @brief  MessageQ Linux implementation
36  *
37  *  This implementation is geared for use in a "client/server" model, whereby
38  *  system-wide data is maintained in a "server" component and process-
39  *  specific data is handled here.  At the moment, this implementation
40  *  connects and communicates with LAD for the server connection.
41  */
43 /* Standard IPC header */
44 #include <ti/ipc/Std.h>
46 /* Module level headers */
47 #include <ti/ipc/NameServer.h>
48 #include <ti/ipc/MultiProc.h>
49 #include <_MultiProc.h>
50 #define MessageQ_internal 1     /* must be defined before include file */
51 #include <ti/ipc/MessageQ.h>
52 #include <_MessageQ.h>
53 #include <ti/ipc/interfaces/IHeap.h>
54 #include <ti/ipc/interfaces/ITransport.h>
55 #include <ti/ipc/interfaces/IMessageQTransport.h>
56 #include <ti/ipc/interfaces/INetworkTransport.h>
58 /* Socket Headers */
59 #include <sys/select.h>
60 #include <sys/time.h>
61 #include <sys/types.h>
62 #include <sys/param.h>
63 #include <sys/eventfd.h>
64 #include <sys/queue.h>
65 #include <errno.h>
66 #include <stdio.h>
67 #include <string.h>
68 #include <stdlib.h>
69 #include <unistd.h>
70 #include <assert.h>
71 #include <pthread.h>
72 #include <semaphore.h>
74 #include <ladclient.h>
75 #include <_lad.h>
77 /* =============================================================================
78  * Macros/Constants
79  * =============================================================================
80  */
82 /*!
83  *  @brief  Name of the reserved NameServer used for MessageQ.
84  */
85 #define MessageQ_NAMESERVER  "MessageQ"
87 #define MessageQ_MAXTRANSPORTS 8
89 #define MessageQ_GROWSIZE 32
91 /* Trace flag settings: */
92 #define TRACESHIFT    12
93 #define TRACEMASK     0x1000
95 /* Define BENCHMARK to quiet key MessageQ APIs: */
96 //#define BENCHMARK
98 /* =============================================================================
99  * Structures & Enums
100  * =============================================================================
101  */
103 /* params structure evolution */
104 typedef struct {
105     Void *synchronizer;
106 } MessageQ_Params_Legacy;
108 typedef struct {
109     Int __version;
110     Void *synchronizer;
111     MessageQ_QueueIndex queueIndex;
112 } MessageQ_Params_Version2;
114 /* structure for MessageQ module state */
115 typedef struct MessageQ_ModuleObject {
116     MessageQ_Handle           *queues;
117     Int                       numQueues;
118     Int                       refCount;
119     NameServer_Handle         nameServer;
120     pthread_mutex_t           gate;
121     int                       seqNum;
122     pthread_mutex_t           seqNumGate;
123     IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
124     ITransport_Handle         transInst[MessageQ_MAXTRANSPORTS];
125     MessageQ_PutHookFxn       putHookFxn;
126     Ptr                      *heaps;
127     Int                       numHeaps;
128 } MessageQ_ModuleObject;
130 typedef struct MessageQ_CIRCLEQ_ENTRY {
131      CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem;
132 } MessageQ_CIRCLEQ_ENTRY;
134 /*!
135  *  @brief  Structure for the Handle for the MessageQ.
136  */
137 typedef struct MessageQ_Object_tag {
138     CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
139     pthread_mutex_t              msgListGate;
140     MessageQ_Params              params;
141     MessageQ_QueueId             queue;
142     int                          unblocked;
143     void                         *serverHandle;
144     sem_t                        synchronizer;
145 } MessageQ_Object;
147 /* traces in this file are controlled via _MessageQ_verbose */
148 Bool _MessageQ_verbose = FALSE;
149 #define verbose _MessageQ_verbose
151 /* =============================================================================
152  *  Globals
153  * =============================================================================
154  */
155 static MessageQ_ModuleObject MessageQ_state =
157     .refCount   = 0,
158     .nameServer = NULL,
159 #if defined(IPC_BUILDOS_ANDROID) && (PLATFORM_SDK_VERSION < 23)
160     .gate       = PTHREAD_RECURSIVE_MUTEX_INITIALIZER,
161 #else
162     .gate       = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
163 #endif
164     .seqNumGate = PTHREAD_MUTEX_INITIALIZER,
165     .putHookFxn = NULL,
166     .heaps      = NULL,
167     .numHeaps   = 0
168 };
170 /*!
171  *  @var    MessageQ_module
172  *
173  *  @brief  Pointer to the MessageQ module state.
174  */
175 MessageQ_ModuleObject *MessageQ_module = &MessageQ_state;
177 Void _MessageQ_grow(UInt16 queueIndex);
179 /* =============================================================================
180  * APIS
181  * =============================================================================
182  */
184 Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
185                                 UInt16 rprocId, UInt priority)
187     Int status = FALSE;
188     UInt16 clusterId;
190     if (handle == NULL) {
191         fprintf(stderr,
192                 "MessageQ_registerTransport: invalid handle, must be non-NULL\n"
193                );
195         return status;
196     }
198     /* map procId to clusterId */
199     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
201     if (clusterId >= MultiProc_MAXPROCESSORS) {
202         fprintf(stderr,
203                 "MessageQ_registerTransport: invalid procId %d\n", rprocId);
205         return status;
206     }
208     if (MessageQ_module->transports[clusterId][priority] == NULL) {
209         MessageQ_module->transports[clusterId][priority] = handle;
211         status = TRUE;
212     }
214     return status;
217 Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
219     if (inst == NULL) {
220         fprintf(stderr, "MessageQ_registerTransportId: invalid NULL handle\n");
222         return MessageQ_E_INVALIDARG;
223     }
225     if (tid >= MessageQ_MAXTRANSPORTS) {
226         fprintf(stderr,
227                 "MessageQ_unregisterNetTransport: invalid transport id %d, "
228                 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
230         return MessageQ_E_INVALIDARG;
231     }
233     if (MessageQ_module->transInst[tid] != NULL) {
234         fprintf(stderr,
235                 "MessageQ_registerTransportId: transport id %d already "
236                 "registered\n", tid);
238         return MessageQ_E_ALREADYEXISTS;
239     }
241     MessageQ_module->transInst[tid] = inst;
243     return MessageQ_S_SUCCESS;
246 Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
248     UInt16 clusterId;
250     /* map procId to clusterId */
251     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
253     if (clusterId >= MultiProc_MAXPROCESSORS) {
254         fprintf(stderr, "MessageQ_unregisterTransport: invalid rprocId %d\n",
255                 rprocId);
257         return;
258     }
260     MessageQ_module->transports[clusterId][priority] = NULL;
263 Void MessageQ_unregisterTransportId(UInt tid)
265     if (tid >= MessageQ_MAXTRANSPORTS) {
266         fprintf(stderr,
267                 "MessageQ_unregisterTransportId: invalid transport id %d, "
268                 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
270         return;
271     }
273     MessageQ_module->transInst[tid] = NULL;
276 /*
277  * Function to get default configuration for the MessageQ module.
278  */
279 Void MessageQ_getConfig(MessageQ_Config *cfg)
281     Int status;
282     LAD_ClientHandle handle;
283     struct LAD_CommandObj cmd;
284     union LAD_ResponseObj rsp;
286     assert (cfg != NULL);
288     handle = LAD_findHandle();
289     if (handle == LAD_MAXNUMCLIENTS) {
290         PRINTVERBOSE1(
291           "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
292            getpid())
294         return;
295     }
297     cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
298     cmd.clientId = handle;
300     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
301         PRINTVERBOSE1(
302           "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
303         return;
304     }
306     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
307         PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n",
308                       status)
309         return;
310     }
311     status = rsp.messageQGetConfig.status;
313     PRINTVERBOSE2(
314       "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
315       handle, status)
317     memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg));
319     return;
322 /*
323  *  Function to setup the MessageQ module.
324  */
325 Int MessageQ_setup(const MessageQ_Config *cfg)
327     Int status = MessageQ_S_SUCCESS;
328     LAD_ClientHandle handle;
329     struct LAD_CommandObj cmd;
330     union LAD_ResponseObj rsp;
331     Int pri;
332     Int i;
333     Int tid;
335     /* this entire function must be serialized */
336     pthread_mutex_lock(&MessageQ_module->gate);
338     /* ensure only first thread performs startup procedure */
339     if (++MessageQ_module->refCount > 1) {
340         PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
341                 MessageQ_module->refCount)
342         status = MessageQ_S_ALREADYSETUP;
343         goto exit;
344     }
346     handle = LAD_findHandle();
347     if (handle == LAD_MAXNUMCLIENTS) {
348         PRINTVERBOSE1("MessageQ_setup: can't find connection to daemon for "
349                 "pid %d\n", getpid())
350         status = MessageQ_E_RESOURCE;
351         goto exit;
352     }
354     cmd.cmd = LAD_MESSAGEQ_SETUP;
355     cmd.clientId = handle;
356     memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
358     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
359         PRINTVERBOSE1("MessageQ_setup: sending LAD command failed, "
360                 "status=%d\n", status)
361         status = MessageQ_E_FAIL;
362         goto exit;
363     }
365     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
366         PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
367         status = MessageQ_E_FAIL;
368         goto exit;
369     }
370     status = rsp.setup.status;
372     PRINTVERBOSE2("MessageQ_setup: LAD response for client %d, status=%d\n",
373             handle, status)
375     MessageQ_module->seqNum = 0;
376     MessageQ_module->nameServer = rsp.setup.nameServerHandle;
377     MessageQ_module->numQueues = cfg->maxRuntimeEntries;
378     MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
379             sizeof(MessageQ_Handle));
380     MessageQ_module->numHeaps = cfg->numHeaps;
381     MessageQ_module->heaps = calloc(cfg->numHeaps, sizeof(Ptr));
383     for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
384         for (pri = 0; pri < 2; pri++) {
385             MessageQ_module->transports[i][pri] = NULL;
386         }
387     }
389     for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
390         MessageQ_module->transInst[tid] = NULL;
391     }
393 exit:
394     /* if error, must decrement reference count */
395     if (status < 0) {
396         MessageQ_module->refCount--;
397     }
399     pthread_mutex_unlock(&MessageQ_module->gate);
401     return (status);
404 /*
405  *  MessageQ_destroy - destroy the MessageQ module.
406  */
407 Int MessageQ_destroy(void)
409     Int status = MessageQ_S_SUCCESS;
410     LAD_ClientHandle handle;
411     struct LAD_CommandObj cmd;
412     union LAD_ResponseObj rsp;
413     int i;
415     /* this entire function must be serialized */
416     pthread_mutex_lock(&MessageQ_module->gate);
418     /* ensure only last thread does the work */
419     if (--MessageQ_module->refCount > 0) {
420         goto exit;
421     }
423     /* ensure all registered heaps have been unregistered */
424     for (i = 0; i < MessageQ_module->numHeaps; i++) {
425         if (MessageQ_module->heaps[i] != NULL) {
426             PRINTVERBOSE1("MessageQ_destroy: Warning: found heapId=%d", i);
427         }
428     }
429     free(MessageQ_module->heaps);
430     MessageQ_module->heaps = NULL;
432     handle = LAD_findHandle();
433     if (handle == LAD_MAXNUMCLIENTS) {
434         PRINTVERBOSE1("MessageQ_destroy: can't find connection to daemon "
435                 "for pid %d\n", getpid())
436         status =  MessageQ_E_RESOURCE;
437         goto exit;
438     }
440     cmd.cmd = LAD_MESSAGEQ_DESTROY;
441     cmd.clientId = handle;
443     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
444         PRINTVERBOSE1("MessageQ_destroy: sending LAD command failed, "
445                 "status=%d\n", status)
446         status = MessageQ_E_FAIL;
447         goto exit;
448     }
450     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
451         PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
452         status = MessageQ_E_FAIL;
453         goto exit;
454     }
455     status = rsp.status;
457     PRINTVERBOSE2("MessageQ_destroy: got LAD response for client %d, "
458             "status=%d\n", handle, status)
460 exit:
461     pthread_mutex_unlock(&MessageQ_module->gate);
463     return (status);
466 /*
467  *  ======== MessageQ_Params_init ========
468  *  Legacy implementation.
469  */
470 Void MessageQ_Params_init(MessageQ_Params *params)
472     ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
475 /*
476  *  ======== MessageQ_Params_init__S ========
477  *  New implementation which is version aware.
478  */
479 Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
481     MessageQ_Params_Version2 *params2;
483     switch (version) {
485         case MessageQ_Params_VERSION_2:
486             params2 = (MessageQ_Params_Version2 *)params;
487             params2->__version = MessageQ_Params_VERSION_2;
488             params2->synchronizer = NULL;
489             params2->queueIndex = MessageQ_ANY;
490             break;
492         default:
493             assert(FALSE);
494             break;
495     }
498 /*
499  *  ======== MessageQ_create ========
500  */
501 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
503     Int                   status;
504     MessageQ_Object      *obj = NULL;
505     IMessageQTransport_Handle transport;
506     INetworkTransport_Handle netTrans;
507     ITransport_Handle     baseTrans;
508     UInt16                queueIndex;
509     UInt16                clusterId;
510     Int                   tid;
511     Int                   priority;
512     LAD_ClientHandle      handle;
513     struct LAD_CommandObj cmd;
514     union LAD_ResponseObj rsp;
515     MessageQ_Params ps;
517     MessageQ_Params_init__S(&ps, MessageQ_Params_VERSION);
519     /* copy the given params into the current params structure */
520     if (pp != NULL) {
522         /* snoop the params pointer to see if it's a legacy structure */
523         if ((pp->__version == 0) || (pp->__version > 100)) {
524             ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
525         }
527         /* not legacy structure, use params version field */
528         else if (pp->__version == MessageQ_Params_VERSION_2) {
529             ps.__version = ((MessageQ_Params_Version2 *)pp)->__version;
530             ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
531             ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
532         }
533         else {
534             assert(FALSE);
535         }
536     }
538     handle = LAD_findHandle();
539     if (handle == LAD_MAXNUMCLIENTS) {
540         PRINTVERBOSE1(
541           "MessageQ_create: can't find connection to daemon for pid %d\n",
542            getpid())
544         return NULL;
545     }
547     cmd.cmd = LAD_MESSAGEQ_CREATE;
548     cmd.clientId = handle;
550     if (name == NULL) {
551         cmd.args.messageQCreate.name[0] = '\0';
552     }
553     else {
554         strncpy(cmd.args.messageQCreate.name, name,
555                 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
556         cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
557     }
559     memcpy(&cmd.args.messageQCreate.params, &ps, sizeof(ps));
561     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
562         PRINTVERBOSE1(
563           "MessageQ_create: sending LAD command failed, status=%d\n", status)
564         return NULL;
565     }
567     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
568         PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
569         return NULL;
570     }
571     status = rsp.messageQCreate.status;
573     PRINTVERBOSE2(
574       "MessageQ_create: got LAD response for client %d, status=%d\n",
575       handle, status)
577     if (status == -1) {
578        PRINTVERBOSE1(
579           "MessageQ_create: MessageQ server operation failed, status=%d\n",
580           status)
581        return NULL;
582     }
584     /* Create the generic obj */
585     obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
587    /* Populate the params member */
588     memcpy(&obj->params, &ps, sizeof(ps));
591     obj->queue = rsp.messageQCreate.queueId;
592     obj->serverHandle = rsp.messageQCreate.serverHandle;
593     pthread_mutex_init(&obj->msgListGate, NULL);
594     CIRCLEQ_INIT(&obj->msgList);
595     if (sem_init(&obj->synchronizer, 0, 0) < 0) {
596         PRINTVERBOSE1(
597           "MessageQ_create: failed to create synchronizer (errno %d)\n", errno)
599         MessageQ_delete((MessageQ_Handle *)&obj);
601         return NULL;
602     }
604     /* lad returns the queue port # (queueIndex + PORT_OFFSET) */
605     queueIndex = MessageQ_getQueueIndex(rsp.messageQCreate.queueId);
607     PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
608             "queueIndex %d\n", (name == NULL) ? "NULL" : name , queueIndex)
610     pthread_mutex_lock(&MessageQ_module->gate);
612     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
613         for (priority = 0; priority < 2; priority++) {
614             transport = MessageQ_module->transports[clusterId][priority];
615             if (transport) {
616                 /* need to check return and do something if error */
617                 IMessageQTransport_bind((Void *)transport, obj->queue);
618             }
619         }
620     }
622     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
623         baseTrans = MessageQ_module->transInst[tid];
625         if (baseTrans != NULL) {
626             switch (ITransport_itype(baseTrans)) {
627                 case INetworkTransport_TypeId:
628                     netTrans = INetworkTransport_downCast(baseTrans);
629                     INetworkTransport_bind((void *)netTrans, obj->queue);
630                     break;
632                 default:
633                     /* error */
634                     fprintf(stderr,
635                             "MessageQ_create: Error: transport id %d is an "
636                             "unsupported transport type.\n", tid);
637                     break;
638             }
639         }
640     }
642     /* LAD's MessageQ module can grow, we need to grow as well */
643     if (queueIndex >= MessageQ_module->numQueues) {
644         _MessageQ_grow(queueIndex);
645     }
647     /*  No need to "allocate" slot since the queueIndex returned by
648      *  LAD is guaranteed to be unique.
649      */
650     MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
652     pthread_mutex_unlock(&MessageQ_module->gate);
654     /* send announce message to LAD, indicating we are ready to receive msgs */
655     cmd.cmd = LAD_MESSAGEQ_ANNOUNCE;
656     cmd.clientId = handle;
658     if (name == NULL) {
659         cmd.args.messageQAnnounce.name[0] = '\0';
660     }
661     else {
662         strncpy(cmd.args.messageQAnnounce.name, name,
663                 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
664         cmd.args.messageQAnnounce.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
665     }
667     cmd.args.messageQAnnounce.serverHandle = obj->serverHandle;
669     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
670         PRINTVERBOSE1(
671           "MessageQ_create: sending LAD command failed, status=%d\n", status)
672         goto exit;
673     }
675     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
676         PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
677         goto exit;
678     }
679     status = rsp.messageQAnnounce.status;
681     PRINTVERBOSE2(
682       "MessageQ_create: got LAD response for client %d, status=%d\n",
683       handle, status)
685     if (status == -1) {
686        PRINTVERBOSE1(
687           "MessageQ_create: MessageQ server operation failed, status=%d\n",
688           status)
689     }
691 exit:
692     return (MessageQ_Handle)obj;
695 /*
696  *  ======== MessageQ_delete ========
697  */
698 Int MessageQ_delete(MessageQ_Handle *handlePtr)
700     MessageQ_Object *obj;
701     IMessageQTransport_Handle transport;
702     INetworkTransport_Handle  netTrans;
703     ITransport_Handle         baseTrans;
704     Int              status = MessageQ_S_SUCCESS;
705     UInt16           queueIndex;
706     UInt16                clusterId;
707     Int                   tid;
708     Int                   priority;
709     LAD_ClientHandle handle;
710     struct LAD_CommandObj cmd;
711     union LAD_ResponseObj rsp;
713     handle = LAD_findHandle();
714     if (handle == LAD_MAXNUMCLIENTS) {
715         PRINTVERBOSE1(
716           "MessageQ_delete: can't find connection to daemon for pid %d\n",
717            getpid())
719         return MessageQ_E_FAIL;
720     }
722     obj = (MessageQ_Object *)(*handlePtr);
724     cmd.cmd = LAD_MESSAGEQ_DELETE;
725     cmd.clientId = handle;
726     cmd.args.messageQDelete.serverHandle = obj->serverHandle;
728     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
729         PRINTVERBOSE1(
730           "MessageQ_delete: sending LAD command failed, status=%d\n", status)
731         return MessageQ_E_FAIL;
732     }
734     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
735         PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
736         return MessageQ_E_FAIL;
737     }
738     status = rsp.messageQDelete.status;
740     PRINTVERBOSE2(
741       "MessageQ_delete: got LAD response for client %d, status=%d\n",
742       handle, status)
744     pthread_mutex_lock(&MessageQ_module->gate);
746     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
747         for (priority = 0; priority < 2; priority++) {
748             transport = MessageQ_module->transports[clusterId][priority];
749             if (transport) {
750                 IMessageQTransport_unbind((Void *)transport, obj->queue);
751             }
752         }
753     }
755     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
756         baseTrans = MessageQ_module->transInst[tid];
758         if (baseTrans != NULL) {
759             switch (ITransport_itype(baseTrans)) {
760                 case INetworkTransport_TypeId:
761                     netTrans = INetworkTransport_downCast(baseTrans);
762                     INetworkTransport_unbind((void *)netTrans, obj->queue);
763                     break;
765                 default:
766                     /* error */
767                     fprintf(stderr,
768                             "MessageQ_create: Error: transport id %d is an "
769                             "unsupported transport type.\n", tid);
770                     break;
771             }
772         }
773     }
775     /* extract the queue index from the queueId */
776     queueIndex = MessageQ_getQueueIndex(obj->queue);
777     MessageQ_module->queues[queueIndex] = NULL;
779     pthread_mutex_unlock(&MessageQ_module->gate);
781     free(obj);
782     *handlePtr = NULL;
784     return status;
787 /*
788  *  ======== MessageQ_open ========
789  *  Acquire a queueId for use in sending messages to the queue
790  */
791 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
793     Int status = MessageQ_S_SUCCESS;
795     status = NameServer_getUInt32(MessageQ_module->nameServer,
796                                   name, queueId, NULL);
798     if (status == NameServer_E_NOTFOUND) {
799         /* Set return queue ID to invalid */
800         *queueId = MessageQ_INVALIDMESSAGEQ;
801         status = MessageQ_E_NOTFOUND;
802     }
803     else if (status >= 0) {
804         /* Override with a MessageQ status code */
805         status = MessageQ_S_SUCCESS;
806     }
807     else {
808         /* Set return queue ID to invalid */
809         *queueId = MessageQ_INVALIDMESSAGEQ;
811         /* Override with a MessageQ status code */
812         if (status == NameServer_E_TIMEOUT) {
813             status = MessageQ_E_TIMEOUT;
814         }
815         else {
816             status = MessageQ_E_FAIL;
817         }
818     }
820     return status;
823 /*
824  *  ======== MessageQ_openQueueId ========
825  */
826 MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 procId)
828     MessageQ_QueueIndex queuePort;
829     MessageQ_QueueId queueId;
831     /* queue port is embedded in the queueId */
832     queuePort = queueIndex + MessageQ_PORTOFFSET;
833     queueId = ((MessageQ_QueueId)(procId) << 16) | queuePort;
835     return (queueId);
838 /*
839  *  ======== MessageQ_close ========
840  *  Closes previously opened instance of MessageQ
841  */
842 Int MessageQ_close(MessageQ_QueueId *queueId)
844     Int32 status = MessageQ_S_SUCCESS;
846     /* Nothing more to be done for closing the MessageQ. */
847     *queueId = MessageQ_INVALIDMESSAGEQ;
849     return status;
852 /*
853  *  ======== MessageQ_put ========
854  *  Deliver the given message, either locally or to the transport
855  *
856  *  If the destination is a local queue, deliver the message. Otherwise,
857  *  pass the message to a transport for delivery. The transport handles
858  *  the sending of the message using the appropriate interface (socket,
859  *  device ioctl, etc.).
860  */
861 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
863     Int status = MessageQ_S_SUCCESS;
864     MessageQ_Object *obj;
865     UInt16 dstProcId;
866     UInt16 queueIndex;
867     UInt16 queuePort;
868     ITransport_Handle baseTrans;
869     IMessageQTransport_Handle msgTrans;
870     INetworkTransport_Handle netTrans;
871     Int priority;
872     UInt tid;
873     UInt16 clusterId;
874     Bool delivered;
876     /* extract destination address from the given queueId */
877     dstProcId  = (UInt16)(queueId >> 16);
878     queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
880     /* write the destination address into the message header */
881     msg->dstId = queuePort;
882     msg->dstProc= dstProcId;
884     /* invoke the hook function after addressing the message */
885     if (MessageQ_module->putHookFxn != NULL) {
886         MessageQ_module->putHookFxn(queueId, msg);
887     }
889     /*  For an outbound message: If message destination is on this
890      *  processor, then check if the destination queue is in this
891      *  process (thread-to-thread messaging).
892      *
893      *  For an inbound message: Check if destination queue is in this
894      *  process (process-to-process messaging).
895      */
896     if (dstProcId == MultiProc_self()) {
897         queueIndex = queuePort - MessageQ_PORTOFFSET;
899         if (queueIndex < MessageQ_module->numQueues) {
900             obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
902             if (obj != NULL) {
903                 /* deliver message to queue */
904                 pthread_mutex_lock(&obj->msgListGate);
905                 CIRCLEQ_INSERT_TAIL(&obj->msgList,
906                         (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
907                 pthread_mutex_unlock(&obj->msgListGate);
908                 sem_post(&obj->synchronizer);
909                 goto done;
910             }
911         }
912         /* If we get here, then we have failed to deliver a local message. */
913         status = MessageQ_E_FAIL;
914         goto done;
915     }
917     /*  Getting here implies the message is outbound. Must give it to
918      *  either the primary or secondary transport for delivery. Start
919      *  by extracting the transport ID from the message header.
920      */
921     tid = MessageQ_getTransportId(msg);
923     if (tid >= MessageQ_MAXTRANSPORTS) {
924         fprintf(stderr,
925                 "MessageQ_put: Error: transport id %d too big, must be < %d\n",
926                 tid, MessageQ_MAXTRANSPORTS);
927         status = MessageQ_E_FAIL;
928         goto done;
929     }
931     /* if transportId is set, use secondary transport for message delivery */
932     if (tid != 0) {
933         baseTrans = MessageQ_module->transInst[tid];
935         if (baseTrans == NULL) {
936             fprintf(stderr, "MessageQ_put: Error: transport is null\n");
937             status = MessageQ_E_FAIL;
938             goto done;
939         }
941         /* downcast instance pointer to transport interface */
942         switch (ITransport_itype(baseTrans)) {
943             case INetworkTransport_TypeId:
944                 netTrans = INetworkTransport_downCast(baseTrans);
945                 delivered = INetworkTransport_put(netTrans, (Ptr)msg);
946                 status = (delivered ? MessageQ_S_SUCCESS :
947                           (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN :
948                            MessageQ_E_FAIL));
949                 break;
951             default:
952                 /* error */
953                 fprintf(stderr, "MessageQ_put: Error: transport id %d is an "
954                         "unsupported transport type\n", tid);
955                 status = MessageQ_E_FAIL;
956                 break;
957         }
958     }
959     else {
960         /* use primary transport for delivery */
961         priority = MessageQ_getMsgPri(msg);
962         clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
964         /* primary transport can only be used for intra-cluster delivery */
965         if (clusterId > MultiProc_getNumProcsInCluster()) {
966             fprintf(stderr,
967                     "MessageQ_put: Error: destination procId=%d is not "
968                     "in cluster. Must specify a transportId.\n", dstProcId);
969             status =  MessageQ_E_FAIL;
970             goto done;
971         }
973         msgTrans = MessageQ_module->transports[clusterId][priority];
974         if (msgTrans) {
975             delivered = IMessageQTransport_put(msgTrans, (Ptr)msg);
976         }
977         else {
978             delivered = MessageQ_E_FAIL;
979         }
980         status = (delivered ? MessageQ_S_SUCCESS :
981                   (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN : MessageQ_E_FAIL));
982     }
984 done:
985     return (status);
988 /*
989  *  MessageQ_get - gets a message for a message queue and blocks if
990  *  the queue is empty.
991  *
992  *  If a message is present, it returns it.  Otherwise it blocks
993  *  waiting for a message to arrive.
994  *  When a message is returned, it is owned by the caller.
995  */
996 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
998     MessageQ_Object * obj = (MessageQ_Object *)handle;
999     Int     status = MessageQ_S_SUCCESS;
1000     struct timespec ts;
1001     struct timeval tv;
1003 #if 0
1004 /*
1005  * Optimization here to get a message without going in to the sem
1006  * operation, but the sem count will not be maintained properly.
1007  */
1008     pthread_mutex_lock(&obj->msgListGate);
1010     if (obj->msgList.cqh_first != &obj->msgList) {
1011         *msg = (MessageQ_Msg)obj->msglist.cqh_first;
1012         CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
1014         pthread_mutex_unlock(&obj->msgListGate);
1015     }
1016     else {
1017         pthread_mutex_unlock(&obj->msgListGate);
1018     }
1019 #endif
1021     if (timeout == MessageQ_FOREVER) {
1022         sem_wait(&obj->synchronizer);
1023     }
1024     else {
1025         /* add timeout (microseconds) to current time of day */
1026         gettimeofday(&tv, NULL);
1027         tv.tv_sec += timeout / 1000000;
1028         tv.tv_usec += timeout % 1000000;
1030         if (tv.tv_usec >= 1000000) {
1031               tv.tv_sec++;
1032               tv.tv_usec -= 1000000;
1033         }
1035         /* set absolute timeout value */
1036         ts.tv_sec = tv.tv_sec;
1037         ts.tv_nsec = tv.tv_usec * 1000; /* convert to nanoseconds */
1039         if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
1040             if (errno == ETIMEDOUT) {
1041                 PRINTVERBOSE0("MessageQ_get: operation timed out\n")
1042                 return (MessageQ_E_TIMEOUT);
1043             }
1044             else {
1045                 PRINTVERBOSE0("MessageQ_get: sem_timedwait error\n")
1046                 return (MessageQ_E_FAIL);
1047             }
1048         }
1049     }
1051     if (obj->unblocked) {
1052         return obj->unblocked;
1053     }
1055     pthread_mutex_lock(&obj->msgListGate);
1057     *msg = (MessageQ_Msg)obj->msgList.cqh_first;
1058     CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
1060     pthread_mutex_unlock(&obj->msgListGate);
1062     return status;
1065 /*
1066  * Return a count of the number of messages in the queue
1067  *
1068  * TBD: Implement as a socket ioctl, using getsockopt().  Return -1 for now.
1069  */
1070 Int MessageQ_count(MessageQ_Handle handle)
1072     Int               count = -1;
1073 #if 0
1074     MessageQ_Object * obj   = (MessageQ_Object *) handle;
1075     socklen_t         optlen;
1077     /*
1078      * TBD: Need to find a way to implement (if anyone uses it!), and
1079      * push down into transport..
1080      */
1082     /*
1083      * 2nd arg to getsockopt should be transport independent, but using
1084      *  SSKPROTO_SHMFIFO for now:
1085      */
1086     getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
1087                  &count, &optlen);
1088 #endif
1090     return count;
1093 /*
1094  *  Initializes a message not obtained from MessageQ_alloc.
1095  */
1096 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
1098     /* Fill in the fields of the message */
1099     MessageQ_msgInit(msg);
1100     msg->heapId = MessageQ_STATICMSG;
1101     msg->msgSize = size;
1104 /*
1105  *  Allocate a message and initialize the needed fields (note some
1106  *  of the fields in the header are set via other APIs or in the
1107  *  MessageQ_put function,
1108  */
1109 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
1111     IHeap_Handle heap;
1112     MessageQ_Msg msg;
1114     if (heapId > (MessageQ_module->numHeaps - 1)) {
1115         PRINTVERBOSE1("MessageQ_alloc: Error: heapId (%d) too large", heapId);
1116         return (NULL);
1117     }
1118     else if (MessageQ_module->heaps[heapId] == NULL) {
1119         PRINTVERBOSE1("MessageQ_alloc: Error: heapId (%d) not registered",
1120                 heapId);
1121         return (NULL);
1122     }
1123     else {
1124         heap = (IHeap_Handle)MessageQ_module->heaps[heapId];
1125     }
1127     msg = IHeap_alloc(heap, size);
1129     if (msg == NULL) {
1130         return (NULL);
1131     }
1133     MessageQ_msgInit(msg);
1134     msg->msgSize = size;
1135     msg->heapId = heapId;
1137     return (msg);
1140 /*
1141  *  Frees the message back to the heap that was used to allocate it.
1142  */
1143 Int MessageQ_free(MessageQ_Msg msg)
1145     UInt32 status = MessageQ_S_SUCCESS;
1146     IHeap_Handle heap;
1148     /* ensure this was not allocated by user */
1149     if (msg->heapId == MessageQ_STATICMSG) {
1150         status = MessageQ_E_CANNOTFREESTATICMSG;
1151     }
1152     else if (msg->heapId > (MessageQ_module->numHeaps - 1)) {
1153         status = MessageQ_E_INVALIDARG;
1154     }
1155     else if (MessageQ_module->heaps[msg->heapId] == NULL) {
1156         status = MessageQ_E_NOTFOUND;
1157     }
1158     else {
1159         heap = (IHeap_Handle)MessageQ_module->heaps[msg->heapId];
1160     }
1162     IHeap_free(heap, (void *)msg);
1164     return (status);
1167 /*
1168  *  ======== MessageQ_registerHeap ========
1169  */
1170 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
1172     Int status = MessageQ_S_SUCCESS;
1174     pthread_mutex_lock(&MessageQ_module->gate);
1176     if (heapId > (MessageQ_module->numHeaps - 1)) {
1177         status = MessageQ_E_INVALIDARG;
1178     }
1179     else if (MessageQ_module->heaps[heapId] != NULL) {
1180         status = MessageQ_E_ALREADYEXISTS;
1181     }
1182     else {
1183         MessageQ_module->heaps[heapId] = heap;
1184     }
1186     pthread_mutex_unlock(&MessageQ_module->gate);
1188     return (status);
1191 /*
1192  *  ======== MessageQ_unregisterHeap ========
1193  */
1194 Int MessageQ_unregisterHeap(UInt16 heapId)
1196     Int status = MessageQ_S_SUCCESS;
1198     pthread_mutex_lock(&MessageQ_module->gate);
1200     if (heapId > (MessageQ_module->numHeaps - 1)) {
1201         status = MessageQ_E_INVALIDARG;
1202     }
1203     else if (MessageQ_module->heaps[heapId] == NULL) {
1204         status = MessageQ_E_NOTFOUND;
1205     }
1206     else {
1207         MessageQ_module->heaps[heapId] = NULL;
1208     }
1210     pthread_mutex_unlock(&MessageQ_module->gate);
1212     return (status);
1215 /* Unblocks a MessageQ */
1216 Void MessageQ_unblock(MessageQ_Handle handle)
1218     MessageQ_Object *obj = (MessageQ_Object *)handle;
1220     obj->unblocked = MessageQ_E_UNBLOCKED;
1221     sem_post(&obj->synchronizer);
1224 /* Unblocks a MessageQ that's been shutdown due to transport failure */
1225 Void MessageQ_shutdown(MessageQ_Handle handle)
1227     MessageQ_Object *obj = (MessageQ_Object *)handle;
1229     obj->unblocked = MessageQ_E_SHUTDOWN;
1230     sem_post(&obj->synchronizer);
1233 /* Embeds a source message queue into a message */
1234 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
1236     MessageQ_Object *obj = (MessageQ_Object *)handle;
1238     msg->replyId = (UInt16)(obj->queue);
1239     msg->replyProc = (UInt16)(obj->queue >> 16);
1242 /* Returns the QueueId associated with the handle. */
1243 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
1245     MessageQ_Object *obj = (MessageQ_Object *) handle;
1246     UInt32 queueId;
1248     queueId = (obj->queue);
1250     return queueId;
1253 /* Returns the local handle associated with queueId. */
1254 MessageQ_Handle MessageQ_getLocalHandle(MessageQ_QueueId queueId)
1256     MessageQ_Object *obj;
1257     MessageQ_QueueIndex queueIndex;
1258     UInt16 procId;
1260     procId = MessageQ_getProcId(queueId);
1261     if (procId != MultiProc_self()) {
1262         return NULL;
1263     }
1265     queueIndex = MessageQ_getQueueIndex(queueId);
1266     obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
1268     return (MessageQ_Handle)obj;
1271 /* Sets the tracing of a message */
1272 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
1274     msg->flags = (msg->flags & ~TRACEMASK) |   (traceFlag << TRACESHIFT);
1277 /*
1278  *  Returns the amount of shared memory used by one transport instance.
1279  *
1280  *  The MessageQ module itself does not use any shared memory but the
1281  *  underlying transport may use some shared memory.
1282  */
1283 SizeT MessageQ_sharedMemReq(Ptr sharedAddr)
1285     SizeT memReq = 0u;
1287     /* Do nothing, as this is a copy transport. */
1289     return memReq;
1292 /*
1293  * This is a helper function to initialize a message.
1294  */
1295 Void MessageQ_msgInit(MessageQ_Msg msg)
1297 #if 0
1298     Int                 status    = MessageQ_S_SUCCESS;
1299     LAD_ClientHandle handle;
1300     struct LAD_CommandObj cmd;
1301     union LAD_ResponseObj rsp;
1303     handle = LAD_findHandle();
1304     if (handle == LAD_MAXNUMCLIENTS) {
1305         PRINTVERBOSE1(
1306           "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
1307            getpid())
1309         return;
1310     }
1312     cmd.cmd = LAD_MESSAGEQ_MSGINIT;
1313     cmd.clientId = handle;
1315     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
1316         PRINTVERBOSE1(
1317           "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
1318         return;
1319     }
1321     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
1322         PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
1323         return;
1324     }
1325     status = rsp.msgInit.status;
1327     PRINTVERBOSE2(
1328       "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
1329       handle, status)
1331     memcpy(msg, &rsp.msgInit.msg, sizeof (*msg));
1332 #else
1333     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
1334     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
1335     msg->msgId     = MessageQ_INVALIDMSGID;
1336     msg->dstId     = (UInt16)MessageQ_INVALIDMESSAGEQ;
1337     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
1338     msg->srcProc   = MultiProc_self();
1340     pthread_mutex_lock(&MessageQ_module->seqNumGate);
1341     msg->seqNum  = MessageQ_module->seqNum++;
1342     pthread_mutex_unlock(&MessageQ_module->seqNumGate);
1343 #endif
1346 /*
1347  *  ======== _MessageQ_grow ========
1348  *  Increase module's queues array to accommodate queueIndex from LAD
1349  *
1350  *  Note: this function takes the queue index value (i.e. without the
1351  *  port offset).
1352  */
1353 Void _MessageQ_grow(UInt16 queueIndex)
1355     MessageQ_Handle *queues;
1356     MessageQ_Handle *oldQueues;
1357     UInt oldSize;
1359     pthread_mutex_lock(&MessageQ_module->gate);
1361     oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
1363     queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof(MessageQ_Handle));
1364     memcpy(queues, MessageQ_module->queues, oldSize);
1366     oldQueues = MessageQ_module->queues;
1367     MessageQ_module->queues = queues;
1368     MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
1370     pthread_mutex_unlock(&MessageQ_module->gate);
1372     free(oldQueues);
1374     return;
1377 /*
1378  *  ======== MessageQ_bind ========
1379  *  Bind all existing message queues to the given processor
1380  *
1381  *  Note: This function is a hack to work around the driver.
1382  *
1383  *  The Linux rpmsgproto driver requires a socket for each
1384  *  message queue and remote processor tuple.
1385  *
1386  *      socket --> (queue, processor)
1387  *
1388  *  Therefore, each time a new remote processor is started, all
1389  *  existing message queues need to create a socket for the new
1390  *  processor.
1391  *
1392  *  The driver should not have this requirement. One socket per
1393  *  message queue should be sufficient to uniquely identify the
1394  *  endpoint to the driver.
1395  */
1396 Void MessageQ_bind(UInt16 procId)
1398     int q;
1399     int clusterId;
1400     int priority;
1401     MessageQ_Handle handle;
1402     MessageQ_QueueId queue;
1403     IMessageQTransport_Handle transport;
1405     clusterId = procId - MultiProc_getBaseIdOfCluster();
1406     pthread_mutex_lock(&MessageQ_module->gate);
1408     for (q = 0; q < MessageQ_module->numQueues; q++) {
1410         if ((handle = MessageQ_module->queues[q]) == NULL) {
1411             continue;
1412         }
1414         queue = ((MessageQ_Object *)handle)->queue;
1416         for (priority = 0; priority < 2; priority++) {
1417             transport = MessageQ_module->transports[clusterId][priority];
1418             if (transport != NULL) {
1419                 IMessageQTransport_bind((Void *)transport, queue);
1420             }
1421         }
1422     }
1424     pthread_mutex_unlock(&MessageQ_module->gate);
1427 /*
1428  *  ======== MessageQ_unbind ========
1429  *  Unbind all existing message queues from the given processor
1430  *
1431  *  Hack: see MessageQ_bind.
1432  */
1433 Void MessageQ_unbind(UInt16 procId)
1435     int q;
1436     int clusterId;
1437     int priority;
1438     MessageQ_Handle handle;
1439     MessageQ_QueueId queue;
1440     IMessageQTransport_Handle transport;
1442     pthread_mutex_lock(&MessageQ_module->gate);
1444     for (q = 0; q < MessageQ_module->numQueues; q++) {
1446         if ((handle = MessageQ_module->queues[q]) == NULL) {
1447             continue;
1448         }
1450         queue = ((MessageQ_Object *)handle)->queue;
1451         clusterId = procId - MultiProc_getBaseIdOfCluster();
1453         for (priority = 0; priority < 2; priority++) {
1454             transport = MessageQ_module->transports[clusterId][priority];
1455             if (transport != NULL) {
1456                 IMessageQTransport_unbind((Void *)transport, queue);
1457             }
1458         }
1459     }
1461     pthread_mutex_unlock(&MessageQ_module->gate);