db96b241156963cc831833d95edd0de733991b04
1 /*
2 * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*
33 * @file MessageQ.c
34 *
35 * @brief MessageQ Linux implementation
36 *
37 * This implementation is geared for use in a "client/server" model, whereby
38 * system-wide data is maintained in a "server" component and process-
39 * specific data is handled here. At the moment, this implementation
40 * connects and communicates with LAD for the server connection.
41 */
43 /* Standard IPC header */
44 #include <ti/ipc/Std.h>
46 /* Module level headers */
47 #include <ti/ipc/NameServer.h>
48 #include <ti/ipc/MultiProc.h>
49 #include <_MultiProc.h>
50 #define MessageQ_internal 1 /* must be defined before include file */
51 #include <ti/ipc/MessageQ.h>
52 #include <_MessageQ.h>
53 #include <ti/ipc/interfaces/IHeap.h>
54 #include <ti/ipc/interfaces/ITransport.h>
55 #include <ti/ipc/interfaces/IMessageQTransport.h>
56 #include <ti/ipc/interfaces/INetworkTransport.h>
58 /* Socket Headers */
59 #include <sys/select.h>
60 #include <sys/time.h>
61 #include <sys/types.h>
62 #include <sys/param.h>
63 #include <sys/eventfd.h>
64 #include <sys/queue.h>
65 #include <errno.h>
66 #include <stdio.h>
67 #include <string.h>
68 #include <stdlib.h>
69 #include <unistd.h>
70 #include <assert.h>
71 #include <pthread.h>
72 #include <semaphore.h>
74 #include <ladclient.h>
75 #include <_lad.h>
77 /* =============================================================================
78 * Macros/Constants
79 * =============================================================================
80 */
/*!
 *  @brief  Name of the reserved NameServer instance used by MessageQ.
 */
#define MessageQ_NAMESERVER     "MessageQ"

/*! @brief  Number of slots in the secondary transport table (transInst);
 *          transport ids must be strictly less than this value.
 */
#define MessageQ_MAXTRANSPORTS  8

/*! @brief  Growth increment for the local queue table.
 *          NOTE(review): presumably consumed by _MessageQ_grow(), which is
 *          defined outside this section -- confirm.
 */
#define MessageQ_GROWSIZE       32

/* Trace flag settings: */
#define TRACESHIFT              12
#define TRACEMASK               0x1000

/* Define BENCHMARK to quiet key MessageQ APIs: */
//#define BENCHMARK
98 /* =============================================================================
99 * Structures & Enums
100 * =============================================================================
101 */
103 /* params structure evolution */
104 typedef struct {
105 Void *synchronizer;
106 } MessageQ_Params_Legacy;
108 typedef struct {
109 Int __version;
110 Void *synchronizer;
111 MessageQ_QueueIndex queueIndex;
112 } MessageQ_Params_Version2;
114 /* structure for MessageQ module state */
115 typedef struct MessageQ_ModuleObject {
116 MessageQ_Handle *queues;
117 Int numQueues;
118 Int refCount;
119 NameServer_Handle nameServer;
120 pthread_mutex_t gate;
121 int seqNum;
122 pthread_mutex_t seqNumGate;
123 IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
124 ITransport_Handle transInst[MessageQ_MAXTRANSPORTS];
125 MessageQ_PutHookFxn putHookFxn;
126 Ptr *heaps;
127 Int numHeaps;
128 } MessageQ_ModuleObject;
/*
 *  Queue linkage overlaid on the start of a message.  MessageQ_put() and
 *  MessageQ_get() cast a MessageQ_Msg to this type in order to link it
 *  into, or unlink it from, a queue's message list.
 */
typedef struct MessageQ_CIRCLEQ_ENTRY {
    CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem;     /* list linkage */
} MessageQ_CIRCLEQ_ENTRY;
134 /*!
135 * @brief Structure for the Handle for the MessageQ.
136 */
137 typedef struct MessageQ_Object_tag {
138 CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
139 pthread_mutex_t msgListGate;
140 MessageQ_Params params;
141 MessageQ_QueueId queue;
142 int unblocked;
143 void *serverHandle;
144 sem_t synchronizer;
145 } MessageQ_Object;
147 /* traces in this file are controlled via _MessageQ_verbose */
148 Bool _MessageQ_verbose = FALSE;
149 #define verbose _MessageQ_verbose
151 /* =============================================================================
152 * Globals
153 * =============================================================================
154 */
155 static MessageQ_ModuleObject MessageQ_state =
156 {
157 .refCount = 0,
158 .nameServer = NULL,
159 #if defined(IPC_BUILDOS_ANDROID) && (PLATFORM_SDK_VERSION < 23)
160 .gate = PTHREAD_RECURSIVE_MUTEX_INITIALIZER,
161 #else
162 .gate = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
163 #endif
164 .seqNumGate = PTHREAD_MUTEX_INITIALIZER,
165 .putHookFxn = NULL,
166 .heaps = NULL,
167 .numHeaps = 0
168 };
170 /*!
171 * @var MessageQ_module
172 *
173 * @brief Pointer to the MessageQ module state.
174 */
175 MessageQ_ModuleObject *MessageQ_module = &MessageQ_state;
177 Void _MessageQ_grow(UInt16 queueIndex);
179 /* =============================================================================
180 * APIS
181 * =============================================================================
182 */
184 Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
185 UInt16 rprocId, UInt priority)
186 {
187 Int status = FALSE;
188 UInt16 clusterId;
190 if (handle == NULL) {
191 fprintf(stderr,
192 "MessageQ_registerTransport: invalid handle, must be non-NULL\n"
193 );
195 return status;
196 }
198 /* map procId to clusterId */
199 clusterId = rprocId - MultiProc_getBaseIdOfCluster();
201 if (clusterId >= MultiProc_MAXPROCESSORS) {
202 fprintf(stderr,
203 "MessageQ_registerTransport: invalid procId %d\n", rprocId);
205 return status;
206 }
208 if (MessageQ_module->transports[clusterId][priority] == NULL) {
209 MessageQ_module->transports[clusterId][priority] = handle;
211 status = TRUE;
212 }
214 return status;
215 }
217 Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
218 {
219 if (inst == NULL) {
220 fprintf(stderr, "MessageQ_registerTransportId: invalid NULL handle\n");
222 return MessageQ_E_INVALIDARG;
223 }
225 if (tid >= MessageQ_MAXTRANSPORTS) {
226 fprintf(stderr,
227 "MessageQ_unregisterNetTransport: invalid transport id %d, "
228 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
230 return MessageQ_E_INVALIDARG;
231 }
233 if (MessageQ_module->transInst[tid] != NULL) {
234 fprintf(stderr,
235 "MessageQ_registerTransportId: transport id %d already "
236 "registered\n", tid);
238 return MessageQ_E_ALREADYEXISTS;
239 }
241 MessageQ_module->transInst[tid] = inst;
243 return MessageQ_S_SUCCESS;
244 }
246 Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
247 {
248 UInt16 clusterId;
250 /* map procId to clusterId */
251 clusterId = rprocId - MultiProc_getBaseIdOfCluster();
253 if (clusterId >= MultiProc_MAXPROCESSORS) {
254 fprintf(stderr, "MessageQ_unregisterTransport: invalid rprocId %d\n",
255 rprocId);
257 return;
258 }
260 MessageQ_module->transports[clusterId][priority] = NULL;
261 }
263 Void MessageQ_unregisterTransportId(UInt tid)
264 {
265 if (tid >= MessageQ_MAXTRANSPORTS) {
266 fprintf(stderr,
267 "MessageQ_unregisterTransportId: invalid transport id %d, "
268 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
270 return;
271 }
273 MessageQ_module->transInst[tid] = NULL;
274 }
276 /*
277 * Function to get default configuration for the MessageQ module.
278 */
279 Void MessageQ_getConfig(MessageQ_Config *cfg)
280 {
281 Int status;
282 LAD_ClientHandle handle;
283 struct LAD_CommandObj cmd;
284 union LAD_ResponseObj rsp;
286 assert (cfg != NULL);
288 handle = LAD_findHandle();
289 if (handle == LAD_MAXNUMCLIENTS) {
290 PRINTVERBOSE1(
291 "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
292 getpid())
294 return;
295 }
297 cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
298 cmd.clientId = handle;
300 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
301 PRINTVERBOSE1(
302 "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
303 return;
304 }
306 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
307 PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n",
308 status)
309 return;
310 }
311 status = rsp.messageQGetConfig.status;
313 PRINTVERBOSE2(
314 "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
315 handle, status)
317 memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg));
319 return;
320 }
322 /*
323 * Function to setup the MessageQ module.
324 */
325 Int MessageQ_setup(const MessageQ_Config *cfg)
326 {
327 Int status = MessageQ_S_SUCCESS;
328 LAD_ClientHandle handle;
329 struct LAD_CommandObj cmd;
330 union LAD_ResponseObj rsp;
331 Int pri;
332 Int i;
333 Int tid;
335 /* this entire function must be serialized */
336 pthread_mutex_lock(&MessageQ_module->gate);
338 /* ensure only first thread performs startup procedure */
339 if (++MessageQ_module->refCount > 1) {
340 PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
341 MessageQ_module->refCount)
342 status = MessageQ_S_ALREADYSETUP;
343 goto exit;
344 }
346 handle = LAD_findHandle();
347 if (handle == LAD_MAXNUMCLIENTS) {
348 PRINTVERBOSE1("MessageQ_setup: can't find connection to daemon for "
349 "pid %d\n", getpid())
350 status = MessageQ_E_RESOURCE;
351 goto exit;
352 }
354 cmd.cmd = LAD_MESSAGEQ_SETUP;
355 cmd.clientId = handle;
356 memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
358 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
359 PRINTVERBOSE1("MessageQ_setup: sending LAD command failed, "
360 "status=%d\n", status)
361 status = MessageQ_E_FAIL;
362 goto exit;
363 }
365 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
366 PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
367 status = MessageQ_E_FAIL;
368 goto exit;
369 }
370 status = rsp.setup.status;
372 PRINTVERBOSE2("MessageQ_setup: LAD response for client %d, status=%d\n",
373 handle, status)
375 MessageQ_module->seqNum = 0;
376 MessageQ_module->nameServer = rsp.setup.nameServerHandle;
377 MessageQ_module->numQueues = cfg->maxRuntimeEntries;
378 MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
379 sizeof(MessageQ_Handle));
380 MessageQ_module->numHeaps = cfg->numHeaps;
381 MessageQ_module->heaps = calloc(cfg->numHeaps, sizeof(Ptr));
383 for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
384 for (pri = 0; pri < 2; pri++) {
385 MessageQ_module->transports[i][pri] = NULL;
386 }
387 }
389 for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
390 MessageQ_module->transInst[tid] = NULL;
391 }
393 exit:
394 /* if error, must decrement reference count */
395 if (status < 0) {
396 MessageQ_module->refCount--;
397 }
399 pthread_mutex_unlock(&MessageQ_module->gate);
401 return (status);
402 }
404 /*
405 * MessageQ_destroy - destroy the MessageQ module.
406 */
407 Int MessageQ_destroy(void)
408 {
409 Int status = MessageQ_S_SUCCESS;
410 LAD_ClientHandle handle;
411 struct LAD_CommandObj cmd;
412 union LAD_ResponseObj rsp;
413 int i;
415 /* this entire function must be serialized */
416 pthread_mutex_lock(&MessageQ_module->gate);
418 /* ensure only last thread does the work */
419 if (--MessageQ_module->refCount > 0) {
420 goto exit;
421 }
423 /* ensure all registered heaps have been unregistered */
424 for (i = 0; i < MessageQ_module->numHeaps; i++) {
425 if (MessageQ_module->heaps[i] != NULL) {
426 PRINTVERBOSE1("MessageQ_destroy: Warning: found heapId=%d", i);
427 }
428 }
429 free(MessageQ_module->heaps);
430 MessageQ_module->heaps = NULL;
432 handle = LAD_findHandle();
433 if (handle == LAD_MAXNUMCLIENTS) {
434 PRINTVERBOSE1("MessageQ_destroy: can't find connection to daemon "
435 "for pid %d\n", getpid())
436 status = MessageQ_E_RESOURCE;
437 goto exit;
438 }
440 cmd.cmd = LAD_MESSAGEQ_DESTROY;
441 cmd.clientId = handle;
443 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
444 PRINTVERBOSE1("MessageQ_destroy: sending LAD command failed, "
445 "status=%d\n", status)
446 status = MessageQ_E_FAIL;
447 goto exit;
448 }
450 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
451 PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
452 status = MessageQ_E_FAIL;
453 goto exit;
454 }
455 status = rsp.status;
457 PRINTVERBOSE2("MessageQ_destroy: got LAD response for client %d, "
458 "status=%d\n", handle, status)
460 exit:
461 pthread_mutex_unlock(&MessageQ_module->gate);
463 return (status);
464 }
466 /*
467 * ======== MessageQ_Params_init ========
468 * Legacy implementation.
469 */
470 Void MessageQ_Params_init(MessageQ_Params *params)
471 {
472 ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
473 }
475 /*
476 * ======== MessageQ_Params_init__S ========
477 * New implementation which is version aware.
478 */
479 Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
480 {
481 MessageQ_Params_Version2 *params2;
483 switch (version) {
485 case MessageQ_Params_VERSION_2:
486 params2 = (MessageQ_Params_Version2 *)params;
487 params2->__version = MessageQ_Params_VERSION_2;
488 params2->synchronizer = NULL;
489 params2->queueIndex = MessageQ_ANY;
490 break;
492 default:
493 assert(FALSE);
494 break;
495 }
496 }
498 /*
499 * ======== MessageQ_create ========
500 */
501 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
502 {
503 Int status;
504 MessageQ_Object *obj = NULL;
505 IMessageQTransport_Handle transport;
506 INetworkTransport_Handle netTrans;
507 ITransport_Handle baseTrans;
508 UInt16 queueIndex;
509 UInt16 clusterId;
510 Int tid;
511 Int priority;
512 LAD_ClientHandle handle;
513 struct LAD_CommandObj cmd;
514 union LAD_ResponseObj rsp;
515 MessageQ_Params ps;
517 MessageQ_Params_init__S(&ps, MessageQ_Params_VERSION);
519 /* copy the given params into the current params structure */
520 if (pp != NULL) {
522 /* snoop the params pointer to see if it's a legacy structure */
523 if ((pp->__version == 0) || (pp->__version > 100)) {
524 ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
525 }
527 /* not legacy structure, use params version field */
528 else if (pp->__version == MessageQ_Params_VERSION_2) {
529 ps.__version = ((MessageQ_Params_Version2 *)pp)->__version;
530 ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
531 ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
532 }
533 else {
534 assert(FALSE);
535 }
536 }
538 handle = LAD_findHandle();
539 if (handle == LAD_MAXNUMCLIENTS) {
540 PRINTVERBOSE1(
541 "MessageQ_create: can't find connection to daemon for pid %d\n",
542 getpid())
544 return NULL;
545 }
547 cmd.cmd = LAD_MESSAGEQ_CREATE;
548 cmd.clientId = handle;
550 if (name == NULL) {
551 cmd.args.messageQCreate.name[0] = '\0';
552 }
553 else {
554 strncpy(cmd.args.messageQCreate.name, name,
555 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
556 cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
557 }
559 memcpy(&cmd.args.messageQCreate.params, &ps, sizeof(ps));
561 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
562 PRINTVERBOSE1(
563 "MessageQ_create: sending LAD command failed, status=%d\n", status)
564 return NULL;
565 }
567 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
568 PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
569 return NULL;
570 }
571 status = rsp.messageQCreate.status;
573 PRINTVERBOSE2(
574 "MessageQ_create: got LAD response for client %d, status=%d\n",
575 handle, status)
577 if (status == -1) {
578 PRINTVERBOSE1(
579 "MessageQ_create: MessageQ server operation failed, status=%d\n",
580 status)
581 return NULL;
582 }
584 /* Create the generic obj */
585 obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
587 /* Populate the params member */
588 memcpy(&obj->params, &ps, sizeof(ps));
591 obj->queue = rsp.messageQCreate.queueId;
592 obj->serverHandle = rsp.messageQCreate.serverHandle;
593 pthread_mutex_init(&obj->msgListGate, NULL);
594 CIRCLEQ_INIT(&obj->msgList);
595 if (sem_init(&obj->synchronizer, 0, 0) < 0) {
596 PRINTVERBOSE1(
597 "MessageQ_create: failed to create synchronizer (errno %d)\n", errno)
599 MessageQ_delete((MessageQ_Handle *)&obj);
601 return NULL;
602 }
604 /* lad returns the queue port # (queueIndex + PORT_OFFSET) */
605 queueIndex = MessageQ_getQueueIndex(rsp.messageQCreate.queueId);
607 PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
608 "queueIndex %d\n", (name == NULL) ? "NULL" : name , queueIndex)
610 pthread_mutex_lock(&MessageQ_module->gate);
612 for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
613 for (priority = 0; priority < 2; priority++) {
614 transport = MessageQ_module->transports[clusterId][priority];
615 if (transport) {
616 /* need to check return and do something if error */
617 IMessageQTransport_bind((Void *)transport, obj->queue);
618 }
619 }
620 }
622 for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
623 baseTrans = MessageQ_module->transInst[tid];
625 if (baseTrans != NULL) {
626 switch (ITransport_itype(baseTrans)) {
627 case INetworkTransport_TypeId:
628 netTrans = INetworkTransport_downCast(baseTrans);
629 INetworkTransport_bind((void *)netTrans, obj->queue);
630 break;
632 default:
633 /* error */
634 fprintf(stderr,
635 "MessageQ_create: Error: transport id %d is an "
636 "unsupported transport type.\n", tid);
637 break;
638 }
639 }
640 }
642 /* LAD's MessageQ module can grow, we need to grow as well */
643 if (queueIndex >= MessageQ_module->numQueues) {
644 _MessageQ_grow(queueIndex);
645 }
647 /* No need to "allocate" slot since the queueIndex returned by
648 * LAD is guaranteed to be unique.
649 */
650 MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
652 pthread_mutex_unlock(&MessageQ_module->gate);
654 /* send announce message to LAD, indicating we are ready to receive msgs */
655 cmd.cmd = LAD_MESSAGEQ_ANNOUNCE;
656 cmd.clientId = handle;
658 if (name == NULL) {
659 cmd.args.messageQAnnounce.name[0] = '\0';
660 }
661 else {
662 strncpy(cmd.args.messageQAnnounce.name, name,
663 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
664 cmd.args.messageQAnnounce.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
665 }
667 cmd.args.messageQAnnounce.serverHandle = obj->serverHandle;
669 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
670 PRINTVERBOSE1(
671 "MessageQ_create: sending LAD command failed, status=%d\n", status)
672 goto exit;
673 }
675 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
676 PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
677 goto exit;
678 }
679 status = rsp.messageQAnnounce.status;
681 PRINTVERBOSE2(
682 "MessageQ_create: got LAD response for client %d, status=%d\n",
683 handle, status)
685 if (status == -1) {
686 PRINTVERBOSE1(
687 "MessageQ_create: MessageQ server operation failed, status=%d\n",
688 status)
689 }
691 exit:
692 return (MessageQ_Handle)obj;
693 }
695 /*
696 * ======== MessageQ_delete ========
697 */
698 Int MessageQ_delete(MessageQ_Handle *handlePtr)
699 {
700 MessageQ_Object *obj;
701 IMessageQTransport_Handle transport;
702 INetworkTransport_Handle netTrans;
703 ITransport_Handle baseTrans;
704 Int status = MessageQ_S_SUCCESS;
705 UInt16 queueIndex;
706 UInt16 clusterId;
707 Int tid;
708 Int priority;
709 LAD_ClientHandle handle;
710 struct LAD_CommandObj cmd;
711 union LAD_ResponseObj rsp;
713 handle = LAD_findHandle();
714 if (handle == LAD_MAXNUMCLIENTS) {
715 PRINTVERBOSE1(
716 "MessageQ_delete: can't find connection to daemon for pid %d\n",
717 getpid())
719 return MessageQ_E_FAIL;
720 }
722 obj = (MessageQ_Object *)(*handlePtr);
724 cmd.cmd = LAD_MESSAGEQ_DELETE;
725 cmd.clientId = handle;
726 cmd.args.messageQDelete.serverHandle = obj->serverHandle;
728 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
729 PRINTVERBOSE1(
730 "MessageQ_delete: sending LAD command failed, status=%d\n", status)
731 return MessageQ_E_FAIL;
732 }
734 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
735 PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
736 return MessageQ_E_FAIL;
737 }
738 status = rsp.messageQDelete.status;
740 PRINTVERBOSE2(
741 "MessageQ_delete: got LAD response for client %d, status=%d\n",
742 handle, status)
744 pthread_mutex_lock(&MessageQ_module->gate);
746 for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
747 for (priority = 0; priority < 2; priority++) {
748 transport = MessageQ_module->transports[clusterId][priority];
749 if (transport) {
750 IMessageQTransport_unbind((Void *)transport, obj->queue);
751 }
752 }
753 }
755 for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
756 baseTrans = MessageQ_module->transInst[tid];
758 if (baseTrans != NULL) {
759 switch (ITransport_itype(baseTrans)) {
760 case INetworkTransport_TypeId:
761 netTrans = INetworkTransport_downCast(baseTrans);
762 INetworkTransport_unbind((void *)netTrans, obj->queue);
763 break;
765 default:
766 /* error */
767 fprintf(stderr,
768 "MessageQ_create: Error: transport id %d is an "
769 "unsupported transport type.\n", tid);
770 break;
771 }
772 }
773 }
775 /* extract the queue index from the queueId */
776 queueIndex = MessageQ_getQueueIndex(obj->queue);
777 MessageQ_module->queues[queueIndex] = NULL;
779 pthread_mutex_unlock(&MessageQ_module->gate);
781 free(obj);
782 *handlePtr = NULL;
784 return status;
785 }
787 /*
788 * ======== MessageQ_open ========
789 * Acquire a queueId for use in sending messages to the queue
790 */
791 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
792 {
793 Int status = MessageQ_S_SUCCESS;
795 status = NameServer_getUInt32(MessageQ_module->nameServer,
796 name, queueId, NULL);
798 if (status == NameServer_E_NOTFOUND) {
799 /* Set return queue ID to invalid */
800 *queueId = MessageQ_INVALIDMESSAGEQ;
801 status = MessageQ_E_NOTFOUND;
802 }
803 else if (status >= 0) {
804 /* Override with a MessageQ status code */
805 status = MessageQ_S_SUCCESS;
806 }
807 else {
808 /* Set return queue ID to invalid */
809 *queueId = MessageQ_INVALIDMESSAGEQ;
811 /* Override with a MessageQ status code */
812 if (status == NameServer_E_TIMEOUT) {
813 status = MessageQ_E_TIMEOUT;
814 }
815 else {
816 status = MessageQ_E_FAIL;
817 }
818 }
820 return status;
821 }
823 /*
824 * ======== MessageQ_openQueueId ========
825 */
826 MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 procId)
827 {
828 MessageQ_QueueIndex queuePort;
829 MessageQ_QueueId queueId;
831 /* queue port is embedded in the queueId */
832 queuePort = queueIndex + MessageQ_PORTOFFSET;
833 queueId = ((MessageQ_QueueId)(procId) << 16) | queuePort;
835 return (queueId);
836 }
838 /*
839 * ======== MessageQ_close ========
840 * Closes previously opened instance of MessageQ
841 */
842 Int MessageQ_close(MessageQ_QueueId *queueId)
843 {
844 Int32 status = MessageQ_S_SUCCESS;
846 /* Nothing more to be done for closing the MessageQ. */
847 *queueId = MessageQ_INVALIDMESSAGEQ;
849 return status;
850 }
852 /*
853 * ======== MessageQ_put ========
854 * Deliver the given message, either locally or to the transport
855 *
856 * If the destination is a local queue, deliver the message. Otherwise,
857 * pass the message to a transport for delivery. The transport handles
858 * the sending of the message using the appropriate interface (socket,
859 * device ioctl, etc.).
860 */
861 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
862 {
863 Int status = MessageQ_S_SUCCESS;
864 MessageQ_Object *obj;
865 UInt16 dstProcId;
866 UInt16 queueIndex;
867 UInt16 queuePort;
868 ITransport_Handle baseTrans;
869 IMessageQTransport_Handle msgTrans;
870 INetworkTransport_Handle netTrans;
871 Int priority;
872 UInt tid;
873 UInt16 clusterId;
874 Bool delivered;
876 /* extract destination address from the given queueId */
877 dstProcId = (UInt16)(queueId >> 16);
878 queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
880 /* write the destination address into the message header */
881 msg->dstId = queuePort;
882 msg->dstProc= dstProcId;
884 /* invoke the hook function after addressing the message */
885 if (MessageQ_module->putHookFxn != NULL) {
886 MessageQ_module->putHookFxn(queueId, msg);
887 }
889 /* For an outbound message: If message destination is on this
890 * processor, then check if the destination queue is in this
891 * process (thread-to-thread messaging).
892 *
893 * For an inbound message: Check if destination queue is in this
894 * process (process-to-process messaging).
895 */
896 if (dstProcId == MultiProc_self()) {
897 queueIndex = queuePort - MessageQ_PORTOFFSET;
899 if (queueIndex < MessageQ_module->numQueues) {
900 obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
902 if (obj != NULL) {
903 /* deliver message to queue */
904 pthread_mutex_lock(&obj->msgListGate);
905 CIRCLEQ_INSERT_TAIL(&obj->msgList,
906 (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
907 pthread_mutex_unlock(&obj->msgListGate);
908 sem_post(&obj->synchronizer);
909 goto done;
910 }
911 }
912 /* If we get here, then we have failed to deliver a local message. */
913 status = MessageQ_E_FAIL;
914 goto done;
915 }
917 /* Getting here implies the message is outbound. Must give it to
918 * either the primary or secondary transport for delivery. Start
919 * by extracting the transport ID from the message header.
920 */
921 tid = MessageQ_getTransportId(msg);
923 if (tid >= MessageQ_MAXTRANSPORTS) {
924 fprintf(stderr,
925 "MessageQ_put: Error: transport id %d too big, must be < %d\n",
926 tid, MessageQ_MAXTRANSPORTS);
927 status = MessageQ_E_FAIL;
928 goto done;
929 }
931 /* if transportId is set, use secondary transport for message delivery */
932 if (tid != 0) {
933 baseTrans = MessageQ_module->transInst[tid];
935 if (baseTrans == NULL) {
936 fprintf(stderr, "MessageQ_put: Error: transport is null\n");
937 status = MessageQ_E_FAIL;
938 goto done;
939 }
941 /* downcast instance pointer to transport interface */
942 switch (ITransport_itype(baseTrans)) {
943 case INetworkTransport_TypeId:
944 netTrans = INetworkTransport_downCast(baseTrans);
945 delivered = INetworkTransport_put(netTrans, (Ptr)msg);
946 status = (delivered ? MessageQ_S_SUCCESS :
947 (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN :
948 MessageQ_E_FAIL));
949 break;
951 default:
952 /* error */
953 fprintf(stderr, "MessageQ_put: Error: transport id %d is an "
954 "unsupported transport type\n", tid);
955 status = MessageQ_E_FAIL;
956 break;
957 }
958 }
959 else {
960 /* use primary transport for delivery */
961 priority = MessageQ_getMsgPri(msg);
962 clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
964 /* primary transport can only be used for intra-cluster delivery */
965 if (clusterId > MultiProc_getNumProcsInCluster()) {
966 fprintf(stderr,
967 "MessageQ_put: Error: destination procId=%d is not "
968 "in cluster. Must specify a transportId.\n", dstProcId);
969 status = MessageQ_E_FAIL;
970 goto done;
971 }
973 msgTrans = MessageQ_module->transports[clusterId][priority];
974 if (msgTrans) {
975 delivered = IMessageQTransport_put(msgTrans, (Ptr)msg);
976 }
977 else {
978 delivered = MessageQ_E_FAIL;
979 }
980 status = (delivered ? MessageQ_S_SUCCESS :
981 (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN : MessageQ_E_FAIL));
982 }
984 done:
985 return (status);
986 }
988 /*
989 * MessageQ_get - gets a message for a message queue and blocks if
990 * the queue is empty.
991 *
992 * If a message is present, it returns it. Otherwise it blocks
993 * waiting for a message to arrive.
994 * When a message is returned, it is owned by the caller.
995 */
996 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
997 {
998 MessageQ_Object * obj = (MessageQ_Object *)handle;
999 Int status = MessageQ_S_SUCCESS;
1000 struct timespec ts;
1001 struct timeval tv;
1003 #if 0
1004 /*
1005 * Optimization here to get a message without going in to the sem
1006 * operation, but the sem count will not be maintained properly.
1007 */
1008 pthread_mutex_lock(&obj->msgListGate);
1010 if (obj->msgList.cqh_first != &obj->msgList) {
1011 *msg = (MessageQ_Msg)obj->msglist.cqh_first;
1012 CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
1014 pthread_mutex_unlock(&obj->msgListGate);
1015 }
1016 else {
1017 pthread_mutex_unlock(&obj->msgListGate);
1018 }
1019 #endif
1021 if (timeout == MessageQ_FOREVER) {
1022 sem_wait(&obj->synchronizer);
1023 }
1024 else {
1025 /* add timeout (microseconds) to current time of day */
1026 gettimeofday(&tv, NULL);
1027 tv.tv_sec += timeout / 1000000;
1028 tv.tv_usec += timeout % 1000000;
1030 if (tv.tv_usec >= 1000000) {
1031 tv.tv_sec++;
1032 tv.tv_usec -= 1000000;
1033 }
1035 /* set absolute timeout value */
1036 ts.tv_sec = tv.tv_sec;
1037 ts.tv_nsec = tv.tv_usec * 1000; /* convert to nanoseconds */
1039 if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
1040 if (errno == ETIMEDOUT) {
1041 PRINTVERBOSE0("MessageQ_get: operation timed out\n")
1042 return (MessageQ_E_TIMEOUT);
1043 }
1044 else {
1045 PRINTVERBOSE0("MessageQ_get: sem_timedwait error\n")
1046 return (MessageQ_E_FAIL);
1047 }
1048 }
1049 }
1051 if (obj->unblocked) {
1052 return obj->unblocked;
1053 }
1055 pthread_mutex_lock(&obj->msgListGate);
1057 *msg = (MessageQ_Msg)obj->msgList.cqh_first;
1058 CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
1060 pthread_mutex_unlock(&obj->msgListGate);
1062 return status;
1063 }
1065 /*
1066 * Return a count of the number of messages in the queue
1067 *
1068 * TBD: Implement as a socket ioctl, using getsockopt(). Return -1 for now.
1069 */
1070 Int MessageQ_count(MessageQ_Handle handle)
1071 {
1072 Int count = -1;
1073 #if 0
1074 MessageQ_Object * obj = (MessageQ_Object *) handle;
1075 socklen_t optlen;
1077 /*
1078 * TBD: Need to find a way to implement (if anyone uses it!), and
1079 * push down into transport..
1080 */
1082 /*
1083 * 2nd arg to getsockopt should be transport independent, but using
1084 * SSKPROTO_SHMFIFO for now:
1085 */
1086 getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
1087 &count, &optlen);
1088 #endif
1090 return count;
1091 }
1093 /*
1094 * Initializes a message not obtained from MessageQ_alloc.
1095 */
1096 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
1097 {
1098 /* Fill in the fields of the message */
1099 MessageQ_msgInit(msg);
1100 msg->heapId = MessageQ_STATICMSG;
1101 msg->msgSize = size;
1102 }
1104 /*
1105 * Allocate a message and initialize the needed fields (note some
1106 * of the fields in the header are set via other APIs or in the
1107 * MessageQ_put function,
1108 */
1109 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
1110 {
1111 IHeap_Handle heap;
1112 MessageQ_Msg msg;
1114 if (heapId > (MessageQ_module->numHeaps - 1)) {
1115 PRINTVERBOSE1("MessageQ_alloc: Error: heapId (%d) too large", heapId);
1116 return (NULL);
1117 }
1118 else if (MessageQ_module->heaps[heapId] == NULL) {
1119 PRINTVERBOSE1("MessageQ_alloc: Error: heapId (%d) not registered",
1120 heapId);
1121 return (NULL);
1122 }
1123 else {
1124 heap = (IHeap_Handle)MessageQ_module->heaps[heapId];
1125 }
1127 msg = IHeap_alloc(heap, size);
1129 if (msg == NULL) {
1130 return (NULL);
1131 }
1133 MessageQ_msgInit(msg);
1134 msg->msgSize = size;
1135 msg->heapId = heapId;
1137 return (msg);
1138 }
1140 /*
1141 * Frees the message back to the heap that was used to allocate it.
1142 */
1143 Int MessageQ_free(MessageQ_Msg msg)
1144 {
1145 UInt32 status = MessageQ_S_SUCCESS;
1146 IHeap_Handle heap;
1148 /* ensure this was not allocated by user */
1149 if (msg->heapId == MessageQ_STATICMSG) {
1150 status = MessageQ_E_CANNOTFREESTATICMSG;
1151 }
1152 else if (msg->heapId > (MessageQ_module->numHeaps - 1)) {
1153 status = MessageQ_E_INVALIDARG;
1154 }
1155 else if (MessageQ_module->heaps[msg->heapId] == NULL) {
1156 status = MessageQ_E_NOTFOUND;
1157 }
1158 else {
1159 heap = (IHeap_Handle)MessageQ_module->heaps[msg->heapId];
1160 }
1162 IHeap_free(heap, (void *)msg);
1164 return (status);
1165 }
1167 /*
1168 * ======== MessageQ_registerHeap ========
1169 */
1170 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
1171 {
1172 Int status = MessageQ_S_SUCCESS;
1174 pthread_mutex_lock(&MessageQ_module->gate);
1176 if (heapId > (MessageQ_module->numHeaps - 1)) {
1177 status = MessageQ_E_INVALIDARG;
1178 }
1179 else if (MessageQ_module->heaps[heapId] != NULL) {
1180 status = MessageQ_E_ALREADYEXISTS;
1181 }
1182 else {
1183 MessageQ_module->heaps[heapId] = heap;
1184 }
1186 pthread_mutex_unlock(&MessageQ_module->gate);
1188 return (status);
1189 }
1191 /*
1192 * ======== MessageQ_unregisterHeap ========
1193 */
1194 Int MessageQ_unregisterHeap(UInt16 heapId)
1195 {
1196 Int status = MessageQ_S_SUCCESS;
1198 pthread_mutex_lock(&MessageQ_module->gate);
1200 if (heapId > (MessageQ_module->numHeaps - 1)) {
1201 status = MessageQ_E_INVALIDARG;
1202 }
1203 else if (MessageQ_module->heaps[heapId] == NULL) {
1204 status = MessageQ_E_NOTFOUND;
1205 }
1206 else {
1207 MessageQ_module->heaps[heapId] = NULL;
1208 }
1210 pthread_mutex_unlock(&MessageQ_module->gate);
1212 return (status);
1213 }
1215 /* Unblocks a MessageQ */
1216 Void MessageQ_unblock(MessageQ_Handle handle)
1217 {
1218 MessageQ_Object *obj = (MessageQ_Object *)handle;
1220 obj->unblocked = MessageQ_E_UNBLOCKED;
1221 sem_post(&obj->synchronizer);
1222 }
1224 /* Unblocks a MessageQ that's been shutdown due to transport failure */
1225 Void MessageQ_shutdown(MessageQ_Handle handle)
1226 {
1227 MessageQ_Object *obj = (MessageQ_Object *)handle;
1229 obj->unblocked = MessageQ_E_SHUTDOWN;
1230 sem_post(&obj->synchronizer);
1231 }
1233 /* Embeds a source message queue into a message */
1234 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
1235 {
1236 MessageQ_Object *obj = (MessageQ_Object *)handle;
1238 msg->replyId = (UInt16)(obj->queue);
1239 msg->replyProc = (UInt16)(obj->queue >> 16);
1240 }
1242 /* Returns the QueueId associated with the handle. */
1243 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
1244 {
1245 MessageQ_Object *obj = (MessageQ_Object *) handle;
1246 UInt32 queueId;
1248 queueId = (obj->queue);
1250 return queueId;
1251 }
1253 /* Returns the local handle associated with queueId. */
1254 MessageQ_Handle MessageQ_getLocalHandle(MessageQ_QueueId queueId)
1255 {
1256 MessageQ_Object *obj;
1257 MessageQ_QueueIndex queueIndex;
1258 UInt16 procId;
1260 procId = MessageQ_getProcId(queueId);
1261 if (procId != MultiProc_self()) {
1262 return NULL;
1263 }
1265 queueIndex = MessageQ_getQueueIndex(queueId);
1266 obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
1268 return (MessageQ_Handle)obj;
1269 }
1271 /* Sets the tracing of a message */
1272 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
1273 {
1274 msg->flags = (msg->flags & ~TRACEMASK) | (traceFlag << TRACESHIFT);
1275 }
1277 /*
1278 * Returns the amount of shared memory used by one transport instance.
1279 *
1280 * The MessageQ module itself does not use any shared memory but the
1281 * underlying transport may use some shared memory.
1282 */
1283 SizeT MessageQ_sharedMemReq(Ptr sharedAddr)
1284 {
1285 SizeT memReq = 0u;
1287 /* Do nothing, as this is a copy transport. */
1289 return memReq;
1290 }
1292 /*
1293 * This is a helper function to initialize a message.
1294 */
1295 Void MessageQ_msgInit(MessageQ_Msg msg)
1296 {
1297 #if 0
1298 Int status = MessageQ_S_SUCCESS;
1299 LAD_ClientHandle handle;
1300 struct LAD_CommandObj cmd;
1301 union LAD_ResponseObj rsp;
1303 handle = LAD_findHandle();
1304 if (handle == LAD_MAXNUMCLIENTS) {
1305 PRINTVERBOSE1(
1306 "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
1307 getpid())
1309 return;
1310 }
1312 cmd.cmd = LAD_MESSAGEQ_MSGINIT;
1313 cmd.clientId = handle;
1315 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
1316 PRINTVERBOSE1(
1317 "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
1318 return;
1319 }
1321 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
1322 PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
1323 return;
1324 }
1325 status = rsp.msgInit.status;
1327 PRINTVERBOSE2(
1328 "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
1329 handle, status)
1331 memcpy(msg, &rsp.msgInit.msg, sizeof (*msg));
1332 #else
1333 msg->reserved0 = 0; /* We set this to distinguish from NameServerMsg */
1334 msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
1335 msg->msgId = MessageQ_INVALIDMSGID;
1336 msg->dstId = (UInt16)MessageQ_INVALIDMESSAGEQ;
1337 msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
1338 msg->srcProc = MultiProc_self();
1340 pthread_mutex_lock(&MessageQ_module->seqNumGate);
1341 msg->seqNum = MessageQ_module->seqNum++;
1342 pthread_mutex_unlock(&MessageQ_module->seqNumGate);
1343 #endif
1344 }
1346 /*
1347 * ======== _MessageQ_grow ========
1348 * Increase module's queues array to accommodate queueIndex from LAD
1349 *
1350 * Note: this function takes the queue index value (i.e. without the
1351 * port offset).
1352 */
1353 Void _MessageQ_grow(UInt16 queueIndex)
1354 {
1355 MessageQ_Handle *queues;
1356 MessageQ_Handle *oldQueues;
1357 UInt oldSize;
1359 pthread_mutex_lock(&MessageQ_module->gate);
1361 oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
1363 queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof(MessageQ_Handle));
1364 memcpy(queues, MessageQ_module->queues, oldSize);
1366 oldQueues = MessageQ_module->queues;
1367 MessageQ_module->queues = queues;
1368 MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
1370 pthread_mutex_unlock(&MessageQ_module->gate);
1372 free(oldQueues);
1374 return;
1375 }
1377 /*
1378 * ======== MessageQ_bind ========
1379 * Bind all existing message queues to the given processor
1380 *
1381 * Note: This function is a hack to work around the driver.
1382 *
1383 * The Linux rpmsgproto driver requires a socket for each
1384 * message queue and remote processor tuple.
1385 *
1386 * socket --> (queue, processor)
1387 *
1388 * Therefore, each time a new remote processor is started, all
1389 * existing message queues need to create a socket for the new
1390 * processor.
1391 *
1392 * The driver should not have this requirement. One socket per
1393 * message queue should be sufficient to uniquely identify the
1394 * endpoint to the driver.
1395 */
1396 Void MessageQ_bind(UInt16 procId)
1397 {
1398 int q;
1399 int clusterId;
1400 int priority;
1401 MessageQ_Handle handle;
1402 MessageQ_QueueId queue;
1403 IMessageQTransport_Handle transport;
1405 clusterId = procId - MultiProc_getBaseIdOfCluster();
1406 pthread_mutex_lock(&MessageQ_module->gate);
1408 for (q = 0; q < MessageQ_module->numQueues; q++) {
1410 if ((handle = MessageQ_module->queues[q]) == NULL) {
1411 continue;
1412 }
1414 queue = ((MessageQ_Object *)handle)->queue;
1416 for (priority = 0; priority < 2; priority++) {
1417 transport = MessageQ_module->transports[clusterId][priority];
1418 if (transport != NULL) {
1419 IMessageQTransport_bind((Void *)transport, queue);
1420 }
1421 }
1422 }
1424 pthread_mutex_unlock(&MessageQ_module->gate);
1425 }
1427 /*
1428 * ======== MessageQ_unbind ========
1429 * Unbind all existing message queues from the given processor
1430 *
1431 * Hack: see MessageQ_bind.
1432 */
1433 Void MessageQ_unbind(UInt16 procId)
1434 {
1435 int q;
1436 int clusterId;
1437 int priority;
1438 MessageQ_Handle handle;
1439 MessageQ_QueueId queue;
1440 IMessageQTransport_Handle transport;
1442 pthread_mutex_lock(&MessageQ_module->gate);
1444 for (q = 0; q < MessageQ_module->numQueues; q++) {
1446 if ((handle = MessageQ_module->queues[q]) == NULL) {
1447 continue;
1448 }
1450 queue = ((MessageQ_Object *)handle)->queue;
1451 clusterId = procId - MultiProc_getBaseIdOfCluster();
1453 for (priority = 0; priority < 2; priority++) {
1454 transport = MessageQ_module->transports[clusterId][priority];
1455 if (transport != NULL) {
1456 IMessageQTransport_unbind((Void *)transport, queue);
1457 }
1458 }
1459 }
1461 pthread_mutex_unlock(&MessageQ_module->gate);
1462 }