1 /*
2 * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*
33 * @file MessageQ.c
34 *
35 * @brief MessageQ Linux implementation
36 *
37 * This implementation is geared for use in a "client/server" model, whereby
38 * system-wide data is maintained in a "server" component and process-
39 * specific data is handled here. At the moment, this implementation
40 * connects and communicates with LAD for the server connection.
41 */
43 /* Standard IPC header */
44 #include <ti/ipc/Std.h>
46 /* Module level headers */
47 #include <ti/ipc/NameServer.h>
48 #include <ti/ipc/MultiProc.h>
49 #include <_MultiProc.h>
50 #define MessageQ_internal 1 /* must be defined before include file */
51 #include <ti/ipc/MessageQ.h>
52 #include <_MessageQ.h>
53 #include <ti/ipc/interfaces/IHeap.h>
54 #include <ti/ipc/interfaces/ITransport.h>
55 #include <ti/ipc/interfaces/IMessageQTransport.h>
56 #include <ti/ipc/interfaces/INetworkTransport.h>
58 /* Socket Headers */
59 #include <sys/select.h>
60 #include <sys/time.h>
61 #include <sys/types.h>
62 #include <sys/param.h>
63 #include <sys/eventfd.h>
64 #include <sys/queue.h>
65 #include <errno.h>
66 #include <stdio.h>
67 #include <string.h>
68 #include <stdlib.h>
69 #include <unistd.h>
70 #include <assert.h>
71 #include <pthread.h>
72 #include <semaphore.h>
74 #include <ladclient.h>
75 #include <_lad.h>
77 /* =============================================================================
78 * Macros/Constants
79 * =============================================================================
80 */
82 /*!
83 * @brief Name of the reserved NameServer used for MessageQ.
84 */
85 #define MessageQ_NAMESERVER "MessageQ"
87 #define MessageQ_MAXTRANSPORTS 8
89 #define MessageQ_GROWSIZE 32
91 /* Trace flag settings: */
92 #define TRACESHIFT 12
93 #define TRACEMASK 0x1000
95 /* Define BENCHMARK to quiet key MessageQ APIs: */
96 //#define BENCHMARK
98 /* =============================================================================
99 * Structures & Enums
100 * =============================================================================
101 */
103 /* params structure evolution */
104 typedef struct {
105 Void *synchronizer;
106 } MessageQ_Params_Legacy;
108 typedef struct {
109 Int __version;
110 Void *synchronizer;
111 MessageQ_QueueIndex queueIndex;
112 } MessageQ_Params_Version2;
114 /* structure for MessageQ module state */
115 typedef struct MessageQ_ModuleObject {
116 MessageQ_Handle *queues;
117 Int numQueues;
118 Int refCount;
119 NameServer_Handle nameServer;
120 pthread_mutex_t gate;
121 int seqNum;
122 pthread_mutex_t seqNumGate;
123 IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
124 ITransport_Handle transInst[MessageQ_MAXTRANSPORTS];
125 MessageQ_PutHookFxn putHookFxn;
126 Ptr *heaps;
127 Int numHeaps;
128 } MessageQ_ModuleObject;
130 typedef struct MessageQ_CIRCLEQ_ENTRY {
131 CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem;
132 } MessageQ_CIRCLEQ_ENTRY;
134 /*!
135 * @brief Structure for the Handle for the MessageQ.
136 */
137 typedef struct MessageQ_Object_tag {
138 CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
139 pthread_mutex_t msgListGate;
140 MessageQ_Params params;
141 MessageQ_QueueId queue;
142 int unblocked;
143 void *serverHandle;
144 sem_t synchronizer;
145 } MessageQ_Object;
147 /* traces in this file are controlled via _MessageQ_verbose */
148 Bool _MessageQ_verbose = FALSE;
149 #define verbose _MessageQ_verbose
151 /* =============================================================================
152 * Globals
153 * =============================================================================
154 */
155 static MessageQ_ModuleObject MessageQ_state =
156 {
157 .refCount = 0,
158 .nameServer = NULL,
159 #if defined(IPC_BUILDOS_ANDROID)
160 .gate = PTHREAD_RECURSIVE_MUTEX_INITIALIZER,
161 #else
162 .gate = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
163 #endif
164 .seqNumGate = PTHREAD_MUTEX_INITIALIZER,
165 .putHookFxn = NULL,
166 .heaps = NULL,
167 .numHeaps = 0
168 };
170 /*!
171 * @var MessageQ_module
172 *
173 * @brief Pointer to the MessageQ module state.
174 */
175 MessageQ_ModuleObject *MessageQ_module = &MessageQ_state;
177 Void _MessageQ_grow(UInt16 queueIndex);
179 /* =============================================================================
180 * APIS
181 * =============================================================================
182 */
184 Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
185 UInt16 rprocId, UInt priority)
186 {
187 Int status = FALSE;
188 UInt16 clusterId;
190 if (handle == NULL) {
191 fprintf(stderr,
192 "MessageQ_registerTransport: invalid handle, must be non-NULL\n"
193 );
195 return status;
196 }
198 /* map procId to clusterId */
199 clusterId = rprocId - MultiProc_getBaseIdOfCluster();
201 if (clusterId >= MultiProc_MAXPROCESSORS) {
202 fprintf(stderr,
203 "MessageQ_registerTransport: invalid procId %d\n", rprocId);
205 return status;
206 }
208 if (MessageQ_module->transports[clusterId][priority] == NULL) {
209 MessageQ_module->transports[clusterId][priority] = handle;
211 status = TRUE;
212 }
214 return status;
215 }
217 Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
218 {
219 if (inst == NULL) {
220 fprintf(stderr, "MessageQ_registerTransportId: invalid NULL handle\n");
222 return MessageQ_E_INVALIDARG;
223 }
225 if (tid >= MessageQ_MAXTRANSPORTS) {
226 fprintf(stderr,
227 "MessageQ_unregisterNetTransport: invalid transport id %d, "
228 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
230 return MessageQ_E_INVALIDARG;
231 }
233 if (MessageQ_module->transInst[tid] != NULL) {
234 fprintf(stderr,
235 "MessageQ_registerTransportId: transport id %d already "
236 "registered\n", tid);
238 return MessageQ_E_ALREADYEXISTS;
239 }
241 MessageQ_module->transInst[tid] = inst;
243 return MessageQ_S_SUCCESS;
244 }
246 Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
247 {
248 UInt16 clusterId;
250 /* map procId to clusterId */
251 clusterId = rprocId - MultiProc_getBaseIdOfCluster();
253 if (clusterId >= MultiProc_MAXPROCESSORS) {
254 fprintf(stderr, "MessageQ_unregisterTransport: invalid rprocId %d\n",
255 rprocId);
257 return;
258 }
260 MessageQ_module->transports[clusterId][priority] = NULL;
261 }
263 Void MessageQ_unregisterTransportId(UInt tid)
264 {
265 if (tid >= MessageQ_MAXTRANSPORTS) {
266 fprintf(stderr,
267 "MessageQ_unregisterTransportId: invalid transport id %d, "
268 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
270 return;
271 }
273 MessageQ_module->transInst[tid] = NULL;
274 }
276 /*
277 * Function to get default configuration for the MessageQ module.
278 */
279 Void MessageQ_getConfig(MessageQ_Config *cfg)
280 {
281 Int status;
282 LAD_ClientHandle handle;
283 struct LAD_CommandObj cmd;
284 union LAD_ResponseObj rsp;
286 assert (cfg != NULL);
288 handle = LAD_findHandle();
289 if (handle == LAD_MAXNUMCLIENTS) {
290 PRINTVERBOSE1(
291 "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
292 getpid())
294 return;
295 }
297 cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
298 cmd.clientId = handle;
300 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
301 PRINTVERBOSE1(
302 "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
303 return;
304 }
306 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
307 PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n",
308 status)
309 return;
310 }
311 status = rsp.messageQGetConfig.status;
313 PRINTVERBOSE2(
314 "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
315 handle, status)
317 memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg));
319 return;
320 }
322 /*
323 * Function to setup the MessageQ module.
324 */
325 Int MessageQ_setup(const MessageQ_Config *cfg)
326 {
327 Int status = MessageQ_S_SUCCESS;
328 LAD_ClientHandle handle;
329 struct LAD_CommandObj cmd;
330 union LAD_ResponseObj rsp;
331 Int pri;
332 Int i;
333 Int tid;
335 /* this entire function must be serialized */
336 pthread_mutex_lock(&MessageQ_module->gate);
338 /* ensure only first thread performs startup procedure */
339 if (++MessageQ_module->refCount > 1) {
340 PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
341 MessageQ_module->refCount)
342 status = MessageQ_S_ALREADYSETUP;
343 goto exit;
344 }
346 handle = LAD_findHandle();
347 if (handle == LAD_MAXNUMCLIENTS) {
348 PRINTVERBOSE1("MessageQ_setup: can't find connection to daemon for "
349 "pid %d\n", getpid())
350 status = MessageQ_E_RESOURCE;
351 goto exit;
352 }
354 cmd.cmd = LAD_MESSAGEQ_SETUP;
355 cmd.clientId = handle;
356 memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
358 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
359 PRINTVERBOSE1("MessageQ_setup: sending LAD command failed, "
360 "status=%d\n", status)
361 status = MessageQ_E_FAIL;
362 goto exit;
363 }
365 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
366 PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
367 status = MessageQ_E_FAIL;
368 goto exit;
369 }
370 status = rsp.setup.status;
372 PRINTVERBOSE2("MessageQ_setup: LAD response for client %d, status=%d\n",
373 handle, status)
375 MessageQ_module->seqNum = 0;
376 MessageQ_module->nameServer = rsp.setup.nameServerHandle;
377 MessageQ_module->numQueues = cfg->maxRuntimeEntries;
378 MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
379 sizeof(MessageQ_Handle));
380 MessageQ_module->numHeaps = cfg->numHeaps;
381 MessageQ_module->heaps = calloc(cfg->numHeaps, sizeof(Ptr));
383 for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
384 for (pri = 0; pri < 2; pri++) {
385 MessageQ_module->transports[i][pri] = NULL;
386 }
387 }
389 for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
390 MessageQ_module->transInst[tid] = NULL;
391 }
393 exit:
394 /* if error, must decrement reference count */
395 if (status < 0) {
396 MessageQ_module->refCount--;
397 }
399 pthread_mutex_unlock(&MessageQ_module->gate);
401 return (status);
402 }
404 /*
405 * MessageQ_destroy - destroy the MessageQ module.
406 */
407 Int MessageQ_destroy(void)
408 {
409 Int status = MessageQ_S_SUCCESS;
410 LAD_ClientHandle handle;
411 struct LAD_CommandObj cmd;
412 union LAD_ResponseObj rsp;
413 int i;
415 /* this entire function must be serialized */
416 pthread_mutex_lock(&MessageQ_module->gate);
418 /* ensure only last thread does the work */
419 if (--MessageQ_module->refCount > 0) {
420 goto exit;
421 }
423 /* ensure all registered heaps have been unregistered */
424 for (i = 0; i < MessageQ_module->numHeaps; i++) {
425 if (MessageQ_module->heaps[i] != NULL) {
426 PRINTVERBOSE1("MessageQ_destroy: Warning: found heapId=%d", i);
427 }
428 }
429 free(MessageQ_module->heaps);
430 MessageQ_module->heaps = NULL;
432 handle = LAD_findHandle();
433 if (handle == LAD_MAXNUMCLIENTS) {
434 PRINTVERBOSE1("MessageQ_destroy: can't find connection to daemon "
435 "for pid %d\n", getpid())
436 status = MessageQ_E_RESOURCE;
437 goto exit;
438 }
440 cmd.cmd = LAD_MESSAGEQ_DESTROY;
441 cmd.clientId = handle;
443 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
444 PRINTVERBOSE1("MessageQ_destroy: sending LAD command failed, "
445 "status=%d\n", status)
446 status = MessageQ_E_FAIL;
447 goto exit;
448 }
450 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
451 PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
452 status = MessageQ_E_FAIL;
453 goto exit;
454 }
455 status = rsp.status;
457 PRINTVERBOSE2("MessageQ_destroy: got LAD response for client %d, "
458 "status=%d\n", handle, status)
460 exit:
461 pthread_mutex_unlock(&MessageQ_module->gate);
463 return (status);
464 }
466 /*
467 * ======== MessageQ_Params_init ========
468 * Legacy implementation.
469 */
470 Void MessageQ_Params_init(MessageQ_Params *params)
471 {
472 ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
473 }
475 /*
476 * ======== MessageQ_Params_init__S ========
477 * New implementation which is version aware.
478 */
479 Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
480 {
481 MessageQ_Params_Version2 *params2;
483 switch (version) {
485 case MessageQ_Params_VERSION_2:
486 params2 = (MessageQ_Params_Version2 *)params;
487 params2->__version = MessageQ_Params_VERSION_2;
488 params2->synchronizer = NULL;
489 params2->queueIndex = MessageQ_ANY;
490 break;
492 default:
493 assert(FALSE);
494 break;
495 }
496 }
498 /*
499 * ======== MessageQ_create ========
500 */
501 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
502 {
503 Int status;
504 MessageQ_Object *obj = NULL;
505 IMessageQTransport_Handle transport;
506 INetworkTransport_Handle netTrans;
507 ITransport_Handle baseTrans;
508 UInt16 queueIndex;
509 UInt16 clusterId;
510 Int tid;
511 Int priority;
512 LAD_ClientHandle handle;
513 struct LAD_CommandObj cmd;
514 union LAD_ResponseObj rsp;
515 MessageQ_Params ps;
517 MessageQ_Params_init__S(&ps, MessageQ_Params_VERSION);
519 /* copy the given params into the current params structure */
520 if (pp != NULL) {
522 /* snoop the params pointer to see if it's a legacy structure */
523 if ((pp->__version == 0) || (pp->__version > 100)) {
524 ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
525 }
527 /* not legacy structure, use params version field */
528 else if (pp->__version == MessageQ_Params_VERSION_2) {
529 ps.__version = ((MessageQ_Params_Version2 *)pp)->__version;
530 ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
531 ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
532 }
533 else {
534 assert(FALSE);
535 }
536 }
538 handle = LAD_findHandle();
539 if (handle == LAD_MAXNUMCLIENTS) {
540 PRINTVERBOSE1(
541 "MessageQ_create: can't find connection to daemon for pid %d\n",
542 getpid())
544 return NULL;
545 }
547 cmd.cmd = LAD_MESSAGEQ_CREATE;
548 cmd.clientId = handle;
550 if (name == NULL) {
551 cmd.args.messageQCreate.name[0] = '\0';
552 }
553 else {
554 strncpy(cmd.args.messageQCreate.name, name,
555 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
556 cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
557 }
559 memcpy(&cmd.args.messageQCreate.params, &ps, sizeof(ps));
561 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
562 PRINTVERBOSE1(
563 "MessageQ_create: sending LAD command failed, status=%d\n", status)
564 return NULL;
565 }
567 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
568 PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
569 return NULL;
570 }
571 status = rsp.messageQCreate.status;
573 PRINTVERBOSE2(
574 "MessageQ_create: got LAD response for client %d, status=%d\n",
575 handle, status)
577 if (status == -1) {
578 PRINTVERBOSE1(
579 "MessageQ_create: MessageQ server operation failed, status=%d\n",
580 status)
581 return NULL;
582 }
584 /* Create the generic obj */
585 obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
587 /* Populate the params member */
588 memcpy(&obj->params, &ps, sizeof(ps));
591 obj->queue = rsp.messageQCreate.queueId;
592 obj->serverHandle = rsp.messageQCreate.serverHandle;
593 pthread_mutex_init(&obj->msgListGate, NULL);
594 CIRCLEQ_INIT(&obj->msgList);
595 if (sem_init(&obj->synchronizer, 0, 0) < 0) {
596 PRINTVERBOSE1(
597 "MessageQ_create: failed to create synchronizer (errno %d)\n", errno)
599 MessageQ_delete((MessageQ_Handle *)&obj);
601 return NULL;
602 }
604 /* lad returns the queue port # (queueIndex + PORT_OFFSET) */
605 queueIndex = MessageQ_getQueueIndex(rsp.messageQCreate.queueId);
607 PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
608 "queueIndex %d\n", (name == NULL) ? "NULL" : name , queueIndex)
610 pthread_mutex_lock(&MessageQ_module->gate);
612 for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
613 for (priority = 0; priority < 2; priority++) {
614 transport = MessageQ_module->transports[clusterId][priority];
615 if (transport) {
616 /* need to check return and do something if error */
617 IMessageQTransport_bind((Void *)transport, obj->queue);
618 }
619 }
620 }
622 for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
623 baseTrans = MessageQ_module->transInst[tid];
625 if (baseTrans != NULL) {
626 switch (ITransport_itype(baseTrans)) {
627 case INetworkTransport_TypeId:
628 netTrans = INetworkTransport_downCast(baseTrans);
629 INetworkTransport_bind((void *)netTrans, obj->queue);
630 break;
632 default:
633 /* error */
634 fprintf(stderr,
635 "MessageQ_create: Error: transport id %d is an "
636 "unsupported transport type.\n", tid);
637 break;
638 }
639 }
640 }
642 /* LAD's MessageQ module can grow, we need to grow as well */
643 if (queueIndex >= MessageQ_module->numQueues) {
644 _MessageQ_grow(queueIndex);
645 }
647 /* No need to "allocate" slot since the queueIndex returned by
648 * LAD is guaranteed to be unique.
649 */
650 MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
652 pthread_mutex_unlock(&MessageQ_module->gate);
654 return (MessageQ_Handle)obj;
655 }
657 /*
658 * ======== MessageQ_delete ========
659 */
660 Int MessageQ_delete(MessageQ_Handle *handlePtr)
661 {
662 MessageQ_Object *obj;
663 IMessageQTransport_Handle transport;
664 INetworkTransport_Handle netTrans;
665 ITransport_Handle baseTrans;
666 Int status = MessageQ_S_SUCCESS;
667 UInt16 queueIndex;
668 UInt16 clusterId;
669 Int tid;
670 Int priority;
671 LAD_ClientHandle handle;
672 struct LAD_CommandObj cmd;
673 union LAD_ResponseObj rsp;
675 handle = LAD_findHandle();
676 if (handle == LAD_MAXNUMCLIENTS) {
677 PRINTVERBOSE1(
678 "MessageQ_delete: can't find connection to daemon for pid %d\n",
679 getpid())
681 return MessageQ_E_FAIL;
682 }
684 obj = (MessageQ_Object *)(*handlePtr);
686 cmd.cmd = LAD_MESSAGEQ_DELETE;
687 cmd.clientId = handle;
688 cmd.args.messageQDelete.serverHandle = obj->serverHandle;
690 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
691 PRINTVERBOSE1(
692 "MessageQ_delete: sending LAD command failed, status=%d\n", status)
693 return MessageQ_E_FAIL;
694 }
696 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
697 PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
698 return MessageQ_E_FAIL;
699 }
700 status = rsp.messageQDelete.status;
702 PRINTVERBOSE2(
703 "MessageQ_delete: got LAD response for client %d, status=%d\n",
704 handle, status)
706 pthread_mutex_lock(&MessageQ_module->gate);
708 for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
709 for (priority = 0; priority < 2; priority++) {
710 transport = MessageQ_module->transports[clusterId][priority];
711 if (transport) {
712 IMessageQTransport_unbind((Void *)transport, obj->queue);
713 }
714 }
715 }
717 for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
718 baseTrans = MessageQ_module->transInst[tid];
720 if (baseTrans != NULL) {
721 switch (ITransport_itype(baseTrans)) {
722 case INetworkTransport_TypeId:
723 netTrans = INetworkTransport_downCast(baseTrans);
724 INetworkTransport_unbind((void *)netTrans, obj->queue);
725 break;
727 default:
728 /* error */
729 fprintf(stderr,
730 "MessageQ_create: Error: transport id %d is an "
731 "unsupported transport type.\n", tid);
732 break;
733 }
734 }
735 }
737 /* extract the queue index from the queueId */
738 queueIndex = MessageQ_getQueueIndex(obj->queue);
739 MessageQ_module->queues[queueIndex] = NULL;
741 pthread_mutex_unlock(&MessageQ_module->gate);
743 free(obj);
744 *handlePtr = NULL;
746 return status;
747 }
749 /*
750 * ======== MessageQ_open ========
751 * Acquire a queueId for use in sending messages to the queue
752 */
753 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
754 {
755 Int status = MessageQ_S_SUCCESS;
757 status = NameServer_getUInt32(MessageQ_module->nameServer,
758 name, queueId, NULL);
760 if (status == NameServer_E_NOTFOUND) {
761 /* Set return queue ID to invalid */
762 *queueId = MessageQ_INVALIDMESSAGEQ;
763 status = MessageQ_E_NOTFOUND;
764 }
765 else if (status >= 0) {
766 /* Override with a MessageQ status code */
767 status = MessageQ_S_SUCCESS;
768 }
769 else {
770 /* Set return queue ID to invalid */
771 *queueId = MessageQ_INVALIDMESSAGEQ;
773 /* Override with a MessageQ status code */
774 if (status == NameServer_E_TIMEOUT) {
775 status = MessageQ_E_TIMEOUT;
776 }
777 else {
778 status = MessageQ_E_FAIL;
779 }
780 }
782 return status;
783 }
785 /*
786 * ======== MessageQ_openQueueId ========
787 */
788 MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 procId)
789 {
790 MessageQ_QueueIndex queuePort;
791 MessageQ_QueueId queueId;
793 /* queue port is embedded in the queueId */
794 queuePort = queueIndex + MessageQ_PORTOFFSET;
795 queueId = ((MessageQ_QueueId)(procId) << 16) | queuePort;
797 return (queueId);
798 }
800 /*
801 * ======== MessageQ_close ========
802 * Closes previously opened instance of MessageQ
803 */
804 Int MessageQ_close(MessageQ_QueueId *queueId)
805 {
806 Int32 status = MessageQ_S_SUCCESS;
808 /* Nothing more to be done for closing the MessageQ. */
809 *queueId = MessageQ_INVALIDMESSAGEQ;
811 return status;
812 }
814 /*
815 * ======== MessageQ_put ========
816 * Deliver the given message, either locally or to the transport
817 *
818 * If the destination is a local queue, deliver the message. Otherwise,
819 * pass the message to a transport for delivery. The transport handles
820 * the sending of the message using the appropriate interface (socket,
821 * device ioctl, etc.).
822 */
823 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
824 {
825 Int status = MessageQ_S_SUCCESS;
826 MessageQ_Object *obj;
827 UInt16 dstProcId;
828 UInt16 queueIndex;
829 UInt16 queuePort;
830 ITransport_Handle baseTrans;
831 IMessageQTransport_Handle msgTrans;
832 INetworkTransport_Handle netTrans;
833 Int priority;
834 UInt tid;
835 UInt16 clusterId;
836 Bool delivered;
838 /* extract destination address from the given queueId */
839 dstProcId = (UInt16)(queueId >> 16);
840 queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
842 /* write the destination address into the message header */
843 msg->dstId = queuePort;
844 msg->dstProc= dstProcId;
846 /* invoke the hook function after addressing the message */
847 if (MessageQ_module->putHookFxn != NULL) {
848 MessageQ_module->putHookFxn(queueId, msg);
849 }
851 /* For an outbound message: If message destination is on this
852 * processor, then check if the destination queue is in this
853 * process (thread-to-thread messaging).
854 *
855 * For an inbound message: Check if destination queue is in this
856 * process (process-to-process messaging).
857 */
858 if (dstProcId == MultiProc_self()) {
859 queueIndex = queuePort - MessageQ_PORTOFFSET;
861 if (queueIndex < MessageQ_module->numQueues) {
862 obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
864 if (obj != NULL) {
865 /* deliver message to queue */
866 pthread_mutex_lock(&obj->msgListGate);
867 CIRCLEQ_INSERT_TAIL(&obj->msgList,
868 (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
869 pthread_mutex_unlock(&obj->msgListGate);
870 sem_post(&obj->synchronizer);
871 goto done;
872 }
873 }
874 }
876 /* Getting here implies the message is outbound. Must give it to
877 * either the primary or secondary transport for delivery. Start
878 * by extracting the transport ID from the message header.
879 */
880 tid = MessageQ_getTransportId(msg);
882 if (tid >= MessageQ_MAXTRANSPORTS) {
883 fprintf(stderr,
884 "MessageQ_put: Error: transport id %d too big, must be < %d\n",
885 tid, MessageQ_MAXTRANSPORTS);
886 status = MessageQ_E_FAIL;
887 goto done;
888 }
890 /* if transportId is set, use secondary transport for message delivery */
891 if (tid != 0) {
892 baseTrans = MessageQ_module->transInst[tid];
894 if (baseTrans == NULL) {
895 fprintf(stderr, "MessageQ_put: Error: transport is null\n");
896 status = MessageQ_E_FAIL;
897 goto done;
898 }
900 /* downcast instance pointer to transport interface */
901 switch (ITransport_itype(baseTrans)) {
902 case INetworkTransport_TypeId:
903 netTrans = INetworkTransport_downCast(baseTrans);
904 delivered = INetworkTransport_put(netTrans, (Ptr)msg);
905 status = (delivered ? MessageQ_S_SUCCESS :
906 (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN :
907 MessageQ_E_FAIL));
908 break;
910 default:
911 /* error */
912 fprintf(stderr, "MessageQ_put: Error: transport id %d is an "
913 "unsupported transport type\n", tid);
914 status = MessageQ_E_FAIL;
915 break;
916 }
917 }
918 else {
919 /* use primary transport for delivery */
920 priority = MessageQ_getMsgPri(msg);
921 clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
923 /* primary transport can only be used for intra-cluster delivery */
924 if (clusterId > MultiProc_getNumProcsInCluster()) {
925 fprintf(stderr,
926 "MessageQ_put: Error: destination procId=%d is not "
927 "in cluster. Must specify a transportId.\n", dstProcId);
928 status = MessageQ_E_FAIL;
929 goto done;
930 }
932 msgTrans = MessageQ_module->transports[clusterId][priority];
933 delivered = IMessageQTransport_put(msgTrans, (Ptr)msg);
934 status = (delivered ? MessageQ_S_SUCCESS :
935 (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN : MessageQ_E_FAIL));
936 }
938 done:
939 return (status);
940 }
942 /*
943 * MessageQ_get - gets a message for a message queue and blocks if
944 * the queue is empty.
945 *
946 * If a message is present, it returns it. Otherwise it blocks
947 * waiting for a message to arrive.
948 * When a message is returned, it is owned by the caller.
949 */
950 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
951 {
952 MessageQ_Object * obj = (MessageQ_Object *)handle;
953 Int status = MessageQ_S_SUCCESS;
954 struct timespec ts;
955 struct timeval tv;
957 #if 0
958 /*
959 * Optimization here to get a message without going in to the sem
960 * operation, but the sem count will not be maintained properly.
961 */
962 pthread_mutex_lock(&obj->msgListGate);
964 if (obj->msgList.cqh_first != &obj->msgList) {
965 *msg = (MessageQ_Msg)obj->msglist.cqh_first;
966 CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
968 pthread_mutex_unlock(&obj->msgListGate);
969 }
970 else {
971 pthread_mutex_unlock(&obj->msgListGate);
972 }
973 #endif
975 if (timeout == MessageQ_FOREVER) {
976 sem_wait(&obj->synchronizer);
977 }
978 else {
979 /* add timeout (microseconds) to current time of day */
980 gettimeofday(&tv, NULL);
981 tv.tv_sec += timeout / 1000000;
982 tv.tv_usec += timeout % 1000000;
984 if (tv.tv_usec >= 1000000) {
985 tv.tv_sec++;
986 tv.tv_usec -= 1000000;
987 }
989 /* set absolute timeout value */
990 ts.tv_sec = tv.tv_sec;
991 ts.tv_nsec = tv.tv_usec * 1000; /* convert to nanoseconds */
993 if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
994 if (errno == ETIMEDOUT) {
995 PRINTVERBOSE0("MessageQ_get: operation timed out\n")
996 return (MessageQ_E_TIMEOUT);
997 }
998 else {
999 PRINTVERBOSE0("MessageQ_get: sem_timedwait error\n")
1000 return (MessageQ_E_FAIL);
1001 }
1002 }
1003 }
1005 if (obj->unblocked) {
1006 return obj->unblocked;
1007 }
1009 pthread_mutex_lock(&obj->msgListGate);
1011 *msg = (MessageQ_Msg)obj->msgList.cqh_first;
1012 CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
1014 pthread_mutex_unlock(&obj->msgListGate);
1016 return status;
1017 }
1019 /*
1020 * Return a count of the number of messages in the queue
1021 *
1022 * TBD: Implement as a socket ioctl, using getsockopt(). Return -1 for now.
1023 */
1024 Int MessageQ_count(MessageQ_Handle handle)
1025 {
1026 Int count = -1;
1027 #if 0
1028 MessageQ_Object * obj = (MessageQ_Object *) handle;
1029 socklen_t optlen;
1031 /*
1032 * TBD: Need to find a way to implement (if anyone uses it!), and
1033 * push down into transport..
1034 */
1036 /*
1037 * 2nd arg to getsockopt should be transport independent, but using
1038 * SSKPROTO_SHMFIFO for now:
1039 */
1040 getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
1041 &count, &optlen);
1042 #endif
1044 return count;
1045 }
1047 /*
1048 * Initializes a message not obtained from MessageQ_alloc.
1049 */
1050 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
1051 {
1052 /* Fill in the fields of the message */
1053 MessageQ_msgInit(msg);
1054 msg->heapId = MessageQ_STATICMSG;
1055 msg->msgSize = size;
1056 }
1058 /*
1059 * Allocate a message and initialize the needed fields (note some
1060 * of the fields in the header are set via other APIs or in the
1061 * MessageQ_put function,
1062 */
1063 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
1064 {
1065 IHeap_Handle heap;
1066 MessageQ_Msg msg;
1068 if (heapId > (MessageQ_module->numHeaps - 1)) {
1069 PRINTVERBOSE1("MessageQ_alloc: Error: heapId (%d) too large", heapId);
1070 return (NULL);
1071 }
1072 else if (MessageQ_module->heaps[heapId] == NULL) {
1073 PRINTVERBOSE1("MessageQ_alloc: Error: heapId (%d) not registered",
1074 heapId);
1075 return (NULL);
1076 }
1077 else {
1078 heap = (IHeap_Handle)MessageQ_module->heaps[heapId];
1079 }
1081 msg = IHeap_alloc(heap, size);
1083 if (msg == NULL) {
1084 return (NULL);
1085 }
1087 MessageQ_msgInit(msg);
1088 msg->msgSize = size;
1089 msg->heapId = heapId;
1091 return (msg);
1092 }
1094 /*
1095 * Frees the message back to the heap that was used to allocate it.
1096 */
1097 Int MessageQ_free(MessageQ_Msg msg)
1098 {
1099 UInt32 status = MessageQ_S_SUCCESS;
1100 IHeap_Handle heap;
1102 /* ensure this was not allocated by user */
1103 if (msg->heapId == MessageQ_STATICMSG) {
1104 status = MessageQ_E_CANNOTFREESTATICMSG;
1105 }
1106 else if (msg->heapId > (MessageQ_module->numHeaps - 1)) {
1107 status = MessageQ_E_INVALIDARG;
1108 }
1109 else if (MessageQ_module->heaps[msg->heapId] == NULL) {
1110 status = MessageQ_E_NOTFOUND;
1111 }
1112 else {
1113 heap = (IHeap_Handle)MessageQ_module->heaps[msg->heapId];
1114 }
1116 IHeap_free(heap, (void *)msg);
1118 return (status);
1119 }
1121 /*
1122 * ======== MessageQ_registerHeap ========
1123 */
1124 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
1125 {
1126 Int status = MessageQ_S_SUCCESS;
1128 pthread_mutex_lock(&MessageQ_module->gate);
1130 if (heapId > (MessageQ_module->numHeaps - 1)) {
1131 status = MessageQ_E_INVALIDARG;
1132 }
1133 else if (MessageQ_module->heaps[heapId] != NULL) {
1134 status = MessageQ_E_ALREADYEXISTS;
1135 }
1136 else {
1137 MessageQ_module->heaps[heapId] = heap;
1138 }
1140 pthread_mutex_unlock(&MessageQ_module->gate);
1142 return (status);
1143 }
1145 /*
1146 * ======== MessageQ_unregisterHeap ========
1147 */
1148 Int MessageQ_unregisterHeap(UInt16 heapId)
1149 {
1150 Int status = MessageQ_S_SUCCESS;
1152 pthread_mutex_lock(&MessageQ_module->gate);
1154 if (heapId > (MessageQ_module->numHeaps - 1)) {
1155 status = MessageQ_E_INVALIDARG;
1156 }
1157 else if (MessageQ_module->heaps[heapId] == NULL) {
1158 status = MessageQ_E_NOTFOUND;
1159 }
1160 else {
1161 MessageQ_module->heaps[heapId] = NULL;
1162 }
1164 pthread_mutex_unlock(&MessageQ_module->gate);
1166 return (status);
1167 }
1169 /* Unblocks a MessageQ */
1170 Void MessageQ_unblock(MessageQ_Handle handle)
1171 {
1172 MessageQ_Object *obj = (MessageQ_Object *)handle;
1174 obj->unblocked = MessageQ_E_UNBLOCKED;
1175 sem_post(&obj->synchronizer);
1176 }
1178 /* Unblocks a MessageQ that's been shutdown due to transport failure */
1179 Void MessageQ_shutdown(MessageQ_Handle handle)
1180 {
1181 MessageQ_Object *obj = (MessageQ_Object *)handle;
1183 obj->unblocked = MessageQ_E_SHUTDOWN;
1184 sem_post(&obj->synchronizer);
1185 }
1187 /* Embeds a source message queue into a message */
1188 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
1189 {
1190 MessageQ_Object *obj = (MessageQ_Object *)handle;
1192 msg->replyId = (UInt16)(obj->queue);
1193 msg->replyProc = (UInt16)(obj->queue >> 16);
1194 }
1196 /* Returns the QueueId associated with the handle. */
1197 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
1198 {
1199 MessageQ_Object *obj = (MessageQ_Object *) handle;
1200 UInt32 queueId;
1202 queueId = (obj->queue);
1204 return queueId;
1205 }
1207 /* Returns the local handle associated with queueId. */
1208 MessageQ_Handle MessageQ_getLocalHandle(MessageQ_QueueId queueId)
1209 {
1210 MessageQ_Object *obj;
1211 MessageQ_QueueIndex queueIndex;
1212 UInt16 procId;
1214 procId = MessageQ_getProcId(queueId);
1215 if (procId != MultiProc_self()) {
1216 return NULL;
1217 }
1219 queueIndex = MessageQ_getQueueIndex(queueId);
1220 obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
1222 return (MessageQ_Handle)obj;
1223 }
1225 /* Sets the tracing of a message */
1226 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
1227 {
1228 msg->flags = (msg->flags & ~TRACEMASK) | (traceFlag << TRACESHIFT);
1229 }
1231 /*
1232 * Returns the amount of shared memory used by one transport instance.
1233 *
1234 * The MessageQ module itself does not use any shared memory but the
1235 * underlying transport may use some shared memory.
1236 */
1237 SizeT MessageQ_sharedMemReq(Ptr sharedAddr)
1238 {
1239 SizeT memReq = 0u;
1241 /* Do nothing, as this is a copy transport. */
1243 return memReq;
1244 }
1246 /*
1247 * This is a helper function to initialize a message.
1248 */
1249 Void MessageQ_msgInit(MessageQ_Msg msg)
1250 {
1251 #if 0
1252 Int status = MessageQ_S_SUCCESS;
1253 LAD_ClientHandle handle;
1254 struct LAD_CommandObj cmd;
1255 union LAD_ResponseObj rsp;
1257 handle = LAD_findHandle();
1258 if (handle == LAD_MAXNUMCLIENTS) {
1259 PRINTVERBOSE1(
1260 "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
1261 getpid())
1263 return;
1264 }
1266 cmd.cmd = LAD_MESSAGEQ_MSGINIT;
1267 cmd.clientId = handle;
1269 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
1270 PRINTVERBOSE1(
1271 "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
1272 return;
1273 }
1275 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
1276 PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
1277 return;
1278 }
1279 status = rsp.msgInit.status;
1281 PRINTVERBOSE2(
1282 "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
1283 handle, status)
1285 memcpy(msg, &rsp.msgInit.msg, sizeof (*msg));
1286 #else
1287 msg->reserved0 = 0; /* We set this to distinguish from NameServerMsg */
1288 msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
1289 msg->msgId = MessageQ_INVALIDMSGID;
1290 msg->dstId = (UInt16)MessageQ_INVALIDMESSAGEQ;
1291 msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
1292 msg->srcProc = MultiProc_self();
1294 pthread_mutex_lock(&MessageQ_module->seqNumGate);
1295 msg->seqNum = MessageQ_module->seqNum++;
1296 pthread_mutex_unlock(&MessageQ_module->seqNumGate);
1297 #endif
1298 }
1300 /*
1301 * ======== _MessageQ_grow ========
1302 * Increase module's queues array to accommodate queueIndex from LAD
1303 *
1304 * Note: this function takes the queue index value (i.e. without the
1305 * port offset).
1306 */
1307 Void _MessageQ_grow(UInt16 queueIndex)
1308 {
1309 MessageQ_Handle *queues;
1310 MessageQ_Handle *oldQueues;
1311 UInt oldSize;
1313 pthread_mutex_lock(&MessageQ_module->gate);
1315 oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
1317 queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof(MessageQ_Handle));
1318 memcpy(queues, MessageQ_module->queues, oldSize);
1320 oldQueues = MessageQ_module->queues;
1321 MessageQ_module->queues = queues;
1322 MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
1324 pthread_mutex_unlock(&MessageQ_module->gate);
1326 free(oldQueues);
1328 return;
1329 }
1331 /*
1332 * ======== MessageQ_bind ========
1333 * Bind all existing message queues to the given processor
1334 *
1335 * Note: This function is a hack to work around the driver.
1336 *
1337 * The Linux rpmsgproto driver requires a socket for each
1338 * message queue and remote processor tuple.
1339 *
1340 * socket --> (queue, processor)
1341 *
1342 * Therefore, each time a new remote processor is started, all
1343 * existing message queues need to create a socket for the new
1344 * processor.
1345 *
1346 * The driver should not have this requirement. One socket per
1347 * message queue should be sufficient to uniquely identify the
1348 * endpoint to the driver.
1349 */
1350 Void MessageQ_bind(UInt16 procId)
1351 {
1352 int q;
1353 int clusterId;
1354 int priority;
1355 MessageQ_Handle handle;
1356 MessageQ_QueueId queue;
1357 IMessageQTransport_Handle transport;
1359 clusterId = procId - MultiProc_getBaseIdOfCluster();
1360 pthread_mutex_lock(&MessageQ_module->gate);
1362 for (q = 0; q < MessageQ_module->numQueues; q++) {
1364 if ((handle = MessageQ_module->queues[q]) == NULL) {
1365 continue;
1366 }
1368 queue = ((MessageQ_Object *)handle)->queue;
1370 for (priority = 0; priority < 2; priority++) {
1371 transport = MessageQ_module->transports[clusterId][priority];
1372 if (transport != NULL) {
1373 IMessageQTransport_bind((Void *)transport, queue);
1374 }
1375 }
1376 }
1378 pthread_mutex_unlock(&MessageQ_module->gate);
1379 }
1381 /*
1382 * ======== MessageQ_unbind ========
1383 * Unbind all existing message queues from the given processor
1384 *
1385 * Hack: see MessageQ_bind.
1386 */
1387 Void MessageQ_unbind(UInt16 procId)
1388 {
1389 int q;
1390 int clusterId;
1391 int priority;
1392 MessageQ_Handle handle;
1393 MessageQ_QueueId queue;
1394 IMessageQTransport_Handle transport;
1396 pthread_mutex_lock(&MessageQ_module->gate);
1398 for (q = 0; q < MessageQ_module->numQueues; q++) {
1400 if ((handle = MessageQ_module->queues[q]) == NULL) {
1401 continue;
1402 }
1404 queue = ((MessageQ_Object *)handle)->queue;
1405 clusterId = procId - MultiProc_getBaseIdOfCluster();
1407 for (priority = 0; priority < 2; priority++) {
1408 transport = MessageQ_module->transports[clusterId][priority];
1409 if (transport != NULL) {
1410 IMessageQTransport_unbind((Void *)transport, queue);
1411 }
1412 }
1413 }
1415 pthread_mutex_unlock(&MessageQ_module->gate);
1416 }