1 /*
2 * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*
33 * @file MessageQ.c
34 *
35 * @brief MessageQ Linux implementation
36 *
37 * This implementation is geared for use in a "client/server" model, whereby
38 * system-wide data is maintained in a "server" component and process-
39 * specific data is handled here. At the moment, this implementation
40 * connects and communicates with LAD for the server connection.
41 */
43 /* Standard IPC header */
44 #include <ti/ipc/Std.h>
46 /* Module level headers */
47 #include <ti/ipc/NameServer.h>
48 #include <ti/ipc/MultiProc.h>
49 #include <_MultiProc.h>
50 #define MessageQ_internal 1 /* must be defined before include file */
51 #include <ti/ipc/MessageQ.h>
52 #include <_MessageQ.h>
53 #include <ti/ipc/interfaces/ITransport.h>
54 #include <ti/ipc/interfaces/IMessageQTransport.h>
55 #include <ti/ipc/interfaces/INetworkTransport.h>
57 /* Socket Headers */
58 #include <sys/select.h>
59 #include <sys/time.h>
60 #include <sys/types.h>
61 #include <sys/param.h>
62 #include <sys/eventfd.h>
63 #include <sys/queue.h>
64 #include <errno.h>
65 #include <stdio.h>
66 #include <string.h>
67 #include <stdlib.h>
68 #include <unistd.h>
69 #include <assert.h>
70 #include <pthread.h>
71 #include <semaphore.h>
73 #include <ladclient.h>
74 #include <_lad.h>
76 /* =============================================================================
77 * Macros/Constants
78 * =============================================================================
79 */
/*!
 *  @brief  Name of the reserved NameServer used for MessageQ.
 */
#define MessageQ_NAMESERVER     "MessageQ"

/* Number of slots in the transport-id indexed (secondary) transport table */
#define MessageQ_MAXTRANSPORTS  8

/* NOTE(review): presumably the increment used by _MessageQ_grow(), whose
 * definition is not visible in this part of the file -- confirm.
 */
#define MessageQ_GROWSIZE       32

/* Trace flag settings: bit position and mask of the trace bit in msg->flags */
#define TRACESHIFT              12
#define TRACEMASK               0x1000
94 /* Define BENCHMARK to quiet key MessageQ APIs: */
95 //#define BENCHMARK
97 /* =============================================================================
98 * Structures & Enums
99 * =============================================================================
100 */
102 /* params structure evolution */
103 typedef struct {
104 Void *synchronizer;
105 } MessageQ_Params_Legacy;
107 typedef struct {
108 Int __version;
109 Void *synchronizer;
110 MessageQ_QueueIndex queueIndex;
111 } MessageQ_Params_Version2;
113 /* structure for MessageQ module state */
114 typedef struct MessageQ_ModuleObject {
115 MessageQ_Handle *queues;
116 Int numQueues;
117 Int refCount;
118 NameServer_Handle nameServer;
119 pthread_mutex_t gate;
120 int seqNum;
121 pthread_mutex_t seqNumGate;
122 IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
123 ITransport_Handle transInst[MessageQ_MAXTRANSPORTS];
124 MessageQ_PutHookFxn putHookFxn;
125 } MessageQ_ModuleObject;
/*
 * List linkage used when a message sits on a queue's msgList; message
 * pointers are cast to this type by MessageQ_put and MessageQ_get.
 */
typedef struct MessageQ_CIRCLEQ_ENTRY {
    CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem;
} MessageQ_CIRCLEQ_ENTRY;
131 /*!
132 * @brief Structure for the Handle for the MessageQ.
133 */
134 typedef struct MessageQ_Object_tag {
135 CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
136 pthread_mutex_t msgListGate;
137 MessageQ_Params params;
138 MessageQ_QueueId queue;
139 int unblocked;
140 void *serverHandle;
141 sem_t synchronizer;
142 } MessageQ_Object;
144 /* traces in this file are controlled via _MessageQ_verbose */
145 Bool _MessageQ_verbose = FALSE;
146 #define verbose _MessageQ_verbose
148 /* =============================================================================
149 * Globals
150 * =============================================================================
151 */
152 static MessageQ_ModuleObject MessageQ_state =
153 {
154 .refCount = 0,
155 .nameServer = NULL,
156 #if defined(IPC_BUILDOS_ANDROID)
157 .gate = PTHREAD_RECURSIVE_MUTEX_INITIALIZER,
158 #else
159 .gate = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
160 #endif
161 .seqNumGate = PTHREAD_MUTEX_INITIALIZER,
162 .putHookFxn = NULL
163 };
165 /*!
166 * @var MessageQ_module
167 *
168 * @brief Pointer to the MessageQ module state.
169 */
170 MessageQ_ModuleObject *MessageQ_module = &MessageQ_state;
172 Void _MessageQ_grow(UInt16 queueIndex);
174 /* =============================================================================
175 * APIS
176 * =============================================================================
177 */
179 Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
180 UInt16 rprocId, UInt priority)
181 {
182 Int status = FALSE;
183 UInt16 clusterId;
185 if (handle == NULL) {
186 printf("MessageQ_registerTransport: invalid handle, must be non-NULL\n"
187 );
189 return status;
190 }
192 /* map procId to clusterId */
193 clusterId = rprocId - MultiProc_getBaseIdOfCluster();
195 if (clusterId >= MultiProc_MAXPROCESSORS) {
196 printf("MessageQ_registerTransport: invalid procId %d\n", rprocId);
198 return status;
199 }
201 if (MessageQ_module->transports[clusterId][priority] == NULL) {
202 MessageQ_module->transports[clusterId][priority] = handle;
204 status = TRUE;
205 }
207 return status;
208 }
210 Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
211 {
212 if (inst == NULL) {
213 printf("MessageQ_registerTransportId: invalid NULL handle\n");
215 return MessageQ_E_INVALIDARG;
216 }
218 if (tid >= MessageQ_MAXTRANSPORTS) {
219 printf("MessageQ_unregisterNetTransport: invalid transport id %d,"
220 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
222 return MessageQ_E_INVALIDARG;
223 }
225 if (MessageQ_module->transInst[tid] != NULL) {
226 printf("MessageQ_registerTransportId: transport id %d already "
227 "registered\n", tid);
229 return MessageQ_E_ALREADYEXISTS;
230 }
232 MessageQ_module->transInst[tid] = inst;
234 return MessageQ_S_SUCCESS;
235 }
237 Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
238 {
239 UInt16 clusterId;
241 /* map procId to clusterId */
242 clusterId = rprocId - MultiProc_getBaseIdOfCluster();
244 if (clusterId >= MultiProc_MAXPROCESSORS) {
245 printf("MessageQ_unregisterTransport: invalid rprocId %d\n", rprocId);
247 return;
248 }
250 MessageQ_module->transports[clusterId][priority] = NULL;
251 }
253 Void MessageQ_unregisterTransportId(UInt tid)
254 {
255 if (tid >= MessageQ_MAXTRANSPORTS) {
256 printf("MessageQ_unregisterTransportId: invalid transport id %d, "
257 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
259 return;
260 }
262 MessageQ_module->transInst[tid] = NULL;
263 }
265 /*
266 * Function to get default configuration for the MessageQ module.
267 */
268 Void MessageQ_getConfig(MessageQ_Config *cfg)
269 {
270 Int status;
271 LAD_ClientHandle handle;
272 struct LAD_CommandObj cmd;
273 union LAD_ResponseObj rsp;
275 assert (cfg != NULL);
277 handle = LAD_findHandle();
278 if (handle == LAD_MAXNUMCLIENTS) {
279 PRINTVERBOSE1(
280 "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
281 getpid())
283 return;
284 }
286 cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
287 cmd.clientId = handle;
289 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
290 PRINTVERBOSE1(
291 "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
292 return;
293 }
295 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
296 PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n",
297 status)
298 return;
299 }
300 status = rsp.messageQGetConfig.status;
302 PRINTVERBOSE2(
303 "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
304 handle, status)
306 memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg));
308 return;
309 }
311 /*
312 * Function to setup the MessageQ module.
313 */
314 Int MessageQ_setup(const MessageQ_Config *cfg)
315 {
316 Int status = MessageQ_S_SUCCESS;
317 LAD_ClientHandle handle;
318 struct LAD_CommandObj cmd;
319 union LAD_ResponseObj rsp;
320 Int pri;
321 Int i;
322 Int tid;
324 /* this entire function must be serialized */
325 pthread_mutex_lock(&MessageQ_module->gate);
327 /* ensure only first thread performs startup procedure */
328 if (++MessageQ_module->refCount > 1) {
329 PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
330 MessageQ_module->refCount)
331 status = MessageQ_S_ALREADYSETUP;
332 goto exit;
333 }
335 handle = LAD_findHandle();
336 if (handle == LAD_MAXNUMCLIENTS) {
337 PRINTVERBOSE1("MessageQ_setup: can't find connection to daemon for "
338 "pid %d\n", getpid())
339 status = MessageQ_E_RESOURCE;
340 goto exit;
341 }
343 cmd.cmd = LAD_MESSAGEQ_SETUP;
344 cmd.clientId = handle;
345 memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
347 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
348 PRINTVERBOSE1("MessageQ_setup: sending LAD command failed, "
349 "status=%d\n", status)
350 status = MessageQ_E_FAIL;
351 goto exit;
352 }
354 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
355 PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
356 status = MessageQ_E_FAIL;
357 goto exit;
358 }
359 status = rsp.setup.status;
361 PRINTVERBOSE2("MessageQ_setup: LAD response for client %d, status=%d\n",
362 handle, status)
364 MessageQ_module->seqNum = 0;
365 MessageQ_module->nameServer = rsp.setup.nameServerHandle;
366 MessageQ_module->numQueues = cfg->maxRuntimeEntries;
367 MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
368 sizeof(MessageQ_Handle));
370 for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
371 for (pri = 0; pri < 2; pri++) {
372 MessageQ_module->transports[i][pri] = NULL;
373 }
374 }
376 for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
377 MessageQ_module->transInst[tid] = NULL;
378 }
380 exit:
381 /* if error, must decrement reference count */
382 if (status < 0) {
383 MessageQ_module->refCount--;
384 }
386 pthread_mutex_unlock(&MessageQ_module->gate);
388 return (status);
389 }
391 /*
392 * MessageQ_destroy - destroy the MessageQ module.
393 */
394 Int MessageQ_destroy(void)
395 {
396 Int status = MessageQ_S_SUCCESS;
397 LAD_ClientHandle handle;
398 struct LAD_CommandObj cmd;
399 union LAD_ResponseObj rsp;
401 /* this entire function must be serialized */
402 pthread_mutex_lock(&MessageQ_module->gate);
404 /* ensure only last thread does the work */
405 if (--MessageQ_module->refCount > 0) {
406 goto exit;
407 }
409 handle = LAD_findHandle();
410 if (handle == LAD_MAXNUMCLIENTS) {
411 PRINTVERBOSE1("MessageQ_destroy: can't find connection to daemon "
412 "for pid %d\n", getpid())
413 status = MessageQ_E_RESOURCE;
414 goto exit;
415 }
417 cmd.cmd = LAD_MESSAGEQ_DESTROY;
418 cmd.clientId = handle;
420 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
421 PRINTVERBOSE1("MessageQ_destroy: sending LAD command failed, "
422 "status=%d\n", status)
423 status = MessageQ_E_FAIL;
424 goto exit;
425 }
427 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
428 PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
429 status = MessageQ_E_FAIL;
430 goto exit;
431 }
432 status = rsp.status;
434 PRINTVERBOSE2("MessageQ_destroy: got LAD response for client %d, "
435 "status=%d\n", handle, status)
437 exit:
438 pthread_mutex_unlock(&MessageQ_module->gate);
440 return (status);
441 }
443 /*
444 * ======== MessageQ_Params_init ========
445 * Legacy implementation.
446 */
447 Void MessageQ_Params_init(MessageQ_Params *params)
448 {
449 ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
450 }
452 /*
453 * ======== MessageQ_Params_init__S ========
454 * New implementation which is version aware.
455 */
456 Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
457 {
458 MessageQ_Params_Version2 *params2;
460 switch (version) {
462 case MessageQ_Params_VERSION_2:
463 params2 = (MessageQ_Params_Version2 *)params;
464 params2->__version = MessageQ_Params_VERSION_2;
465 params2->synchronizer = NULL;
466 params2->queueIndex = MessageQ_ANY;
467 break;
469 default:
470 assert(FALSE);
471 break;
472 }
473 }
475 /*
476 * ======== MessageQ_create ========
477 */
478 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
479 {
480 Int status;
481 MessageQ_Object *obj = NULL;
482 IMessageQTransport_Handle transport;
483 INetworkTransport_Handle netTrans;
484 ITransport_Handle baseTrans;
485 UInt16 queueIndex;
486 UInt16 clusterId;
487 Int tid;
488 Int priority;
489 LAD_ClientHandle handle;
490 struct LAD_CommandObj cmd;
491 union LAD_ResponseObj rsp;
492 MessageQ_Params ps;
494 MessageQ_Params_init__S(&ps, MessageQ_Params_VERSION);
496 /* copy the given params into the current params structure */
497 if (pp != NULL) {
499 /* snoop the params pointer to see if it's a legacy structure */
500 if ((pp->__version == 0) || (pp->__version > 100)) {
501 ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
502 }
504 /* not legacy structure, use params version field */
505 else if (pp->__version == MessageQ_Params_VERSION_2) {
506 ps.__version = ((MessageQ_Params_Version2 *)pp)->__version;
507 ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
508 ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
509 }
510 else {
511 assert(FALSE);
512 }
513 }
515 handle = LAD_findHandle();
516 if (handle == LAD_MAXNUMCLIENTS) {
517 PRINTVERBOSE1(
518 "MessageQ_create: can't find connection to daemon for pid %d\n",
519 getpid())
521 return NULL;
522 }
524 cmd.cmd = LAD_MESSAGEQ_CREATE;
525 cmd.clientId = handle;
527 if (name == NULL) {
528 cmd.args.messageQCreate.name[0] = '\0';
529 }
530 else {
531 strncpy(cmd.args.messageQCreate.name, name,
532 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
533 cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
534 }
536 memcpy(&cmd.args.messageQCreate.params, &ps, sizeof(ps));
538 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
539 PRINTVERBOSE1(
540 "MessageQ_create: sending LAD command failed, status=%d\n", status)
541 return NULL;
542 }
544 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
545 PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
546 return NULL;
547 }
548 status = rsp.messageQCreate.status;
550 PRINTVERBOSE2(
551 "MessageQ_create: got LAD response for client %d, status=%d\n",
552 handle, status)
554 if (status == -1) {
555 PRINTVERBOSE1(
556 "MessageQ_create: MessageQ server operation failed, status=%d\n",
557 status)
558 return NULL;
559 }
561 /* Create the generic obj */
562 obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
564 /* Populate the params member */
565 memcpy(&obj->params, &ps, sizeof(ps));
568 obj->queue = rsp.messageQCreate.queueId;
569 obj->serverHandle = rsp.messageQCreate.serverHandle;
570 pthread_mutex_init(&obj->msgListGate, NULL);
571 CIRCLEQ_INIT(&obj->msgList);
572 if (sem_init(&obj->synchronizer, 0, 0) < 0) {
573 PRINTVERBOSE1(
574 "MessageQ_create: failed to create synchronizer (errno %d)\n", errno)
576 MessageQ_delete((MessageQ_Handle *)&obj);
578 return NULL;
579 }
581 /* lad returns the queue port # (queueIndex + PORT_OFFSET) */
582 queueIndex = MessageQ_getQueueIndex(rsp.messageQCreate.queueId);
584 PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
585 "queueIndex %d\n", (name == NULL) ? "NULL" : name , queueIndex)
587 pthread_mutex_lock(&MessageQ_module->gate);
589 for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
590 for (priority = 0; priority < 2; priority++) {
591 transport = MessageQ_module->transports[clusterId][priority];
592 if (transport) {
593 /* need to check return and do something if error */
594 IMessageQTransport_bind((Void *)transport, obj->queue);
595 }
596 }
597 }
599 for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
600 baseTrans = MessageQ_module->transInst[tid];
602 if (baseTrans != NULL) {
603 switch (ITransport_itype(baseTrans)) {
604 case INetworkTransport_TypeId:
605 netTrans = INetworkTransport_downCast(baseTrans);
606 INetworkTransport_bind((void *)netTrans, obj->queue);
607 break;
609 default:
610 /* error */
611 printf("MessageQ_create: Error: transport id %d is an "
612 "unsupported transport type.\n", tid);
613 break;
614 }
615 }
616 }
618 /* LAD's MessageQ module can grow, we need to grow as well */
619 if (queueIndex >= MessageQ_module->numQueues) {
620 _MessageQ_grow(queueIndex);
621 }
623 /* No need to "allocate" slot since the queueIndex returned by
624 * LAD is guaranteed to be unique.
625 */
626 MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
628 pthread_mutex_unlock(&MessageQ_module->gate);
630 return (MessageQ_Handle)obj;
631 }
633 /*
634 * ======== MessageQ_delete ========
635 */
636 Int MessageQ_delete(MessageQ_Handle *handlePtr)
637 {
638 MessageQ_Object *obj;
639 IMessageQTransport_Handle transport;
640 INetworkTransport_Handle netTrans;
641 ITransport_Handle baseTrans;
642 Int status = MessageQ_S_SUCCESS;
643 UInt16 queueIndex;
644 UInt16 clusterId;
645 Int tid;
646 Int priority;
647 LAD_ClientHandle handle;
648 struct LAD_CommandObj cmd;
649 union LAD_ResponseObj rsp;
651 handle = LAD_findHandle();
652 if (handle == LAD_MAXNUMCLIENTS) {
653 PRINTVERBOSE1(
654 "MessageQ_delete: can't find connection to daemon for pid %d\n",
655 getpid())
657 return MessageQ_E_FAIL;
658 }
660 obj = (MessageQ_Object *)(*handlePtr);
662 cmd.cmd = LAD_MESSAGEQ_DELETE;
663 cmd.clientId = handle;
664 cmd.args.messageQDelete.serverHandle = obj->serverHandle;
666 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
667 PRINTVERBOSE1(
668 "MessageQ_delete: sending LAD command failed, status=%d\n", status)
669 return MessageQ_E_FAIL;
670 }
672 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
673 PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
674 return MessageQ_E_FAIL;
675 }
676 status = rsp.messageQDelete.status;
678 PRINTVERBOSE2(
679 "MessageQ_delete: got LAD response for client %d, status=%d\n",
680 handle, status)
682 pthread_mutex_lock(&MessageQ_module->gate);
684 for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
685 for (priority = 0; priority < 2; priority++) {
686 transport = MessageQ_module->transports[clusterId][priority];
687 if (transport) {
688 IMessageQTransport_unbind((Void *)transport, obj->queue);
689 }
690 }
691 }
693 for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
694 baseTrans = MessageQ_module->transInst[tid];
696 if (baseTrans != NULL) {
697 switch (ITransport_itype(baseTrans)) {
698 case INetworkTransport_TypeId:
699 netTrans = INetworkTransport_downCast(baseTrans);
700 INetworkTransport_unbind((void *)netTrans, obj->queue);
701 break;
703 default:
704 /* error */
705 printf("MessageQ_create: Error: transport id %d is an "
706 "unsupported transport type.\n", tid);
707 break;
708 }
709 }
710 }
712 /* extract the queue index from the queueId */
713 queueIndex = MessageQ_getQueueIndex(obj->queue);
714 MessageQ_module->queues[queueIndex] = NULL;
716 pthread_mutex_unlock(&MessageQ_module->gate);
718 free(obj);
719 *handlePtr = NULL;
721 return status;
722 }
724 /*
725 * ======== MessageQ_open ========
726 * Acquire a queueId for use in sending messages to the queue
727 */
728 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
729 {
730 Int status = MessageQ_S_SUCCESS;
732 status = NameServer_getUInt32(MessageQ_module->nameServer,
733 name, queueId, NULL);
735 if (status == NameServer_E_NOTFOUND) {
736 /* Set return queue ID to invalid */
737 *queueId = MessageQ_INVALIDMESSAGEQ;
738 status = MessageQ_E_NOTFOUND;
739 }
740 else if (status >= 0) {
741 /* Override with a MessageQ status code */
742 status = MessageQ_S_SUCCESS;
743 }
744 else {
745 /* Set return queue ID to invalid */
746 *queueId = MessageQ_INVALIDMESSAGEQ;
748 /* Override with a MessageQ status code */
749 if (status == NameServer_E_TIMEOUT) {
750 status = MessageQ_E_TIMEOUT;
751 }
752 else {
753 status = MessageQ_E_FAIL;
754 }
755 }
757 return status;
758 }
760 /*
761 * ======== MessageQ_openQueueId ========
762 */
763 MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 procId)
764 {
765 MessageQ_QueueIndex queuePort;
766 MessageQ_QueueId queueId;
768 /* queue port is embedded in the queueId */
769 queuePort = queueIndex + MessageQ_PORTOFFSET;
770 queueId = ((MessageQ_QueueId)(procId) << 16) | queuePort;
772 return (queueId);
773 }
775 /*
776 * ======== MessageQ_close ========
777 * Closes previously opened instance of MessageQ
778 */
779 Int MessageQ_close(MessageQ_QueueId *queueId)
780 {
781 Int32 status = MessageQ_S_SUCCESS;
783 /* Nothing more to be done for closing the MessageQ. */
784 *queueId = MessageQ_INVALIDMESSAGEQ;
786 return status;
787 }
789 /*
790 * ======== MessageQ_put ========
791 * Deliver the given message, either locally or to the transport
792 *
793 * If the destination is a local queue, deliver the message. Otherwise,
794 * pass the message to a transport for delivery. The transport handles
795 * the sending of the message using the appropriate interface (socket,
796 * device ioctl, etc.).
797 */
798 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
799 {
800 Int status = MessageQ_S_SUCCESS;
801 MessageQ_Object *obj;
802 UInt16 dstProcId;
803 UInt16 queueIndex;
804 UInt16 queuePort;
805 ITransport_Handle baseTrans;
806 IMessageQTransport_Handle msgTrans;
807 INetworkTransport_Handle netTrans;
808 Int priority;
809 UInt tid;
810 UInt16 clusterId;
811 Bool delivered;
813 /* extract destination address from the given queueId */
814 dstProcId = (UInt16)(queueId >> 16);
815 queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
817 /* write the destination address into the message header */
818 msg->dstId = queuePort;
819 msg->dstProc= dstProcId;
821 /* invoke the hook function after addressing the message */
822 if (MessageQ_module->putHookFxn != NULL) {
823 MessageQ_module->putHookFxn(queueId, msg);
824 }
826 /* For an outbound message: If message destination is on this
827 * processor, then check if the destination queue is in this
828 * process (thread-to-thread messaging).
829 *
830 * For an inbound message: Check if destination queue is in this
831 * process (process-to-process messaging).
832 */
833 if (dstProcId == MultiProc_self()) {
834 queueIndex = queuePort - MessageQ_PORTOFFSET;
836 if (queueIndex < MessageQ_module->numQueues) {
837 obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
839 if (obj != NULL) {
840 /* deliver message to queue */
841 pthread_mutex_lock(&obj->msgListGate);
842 CIRCLEQ_INSERT_TAIL(&obj->msgList,
843 (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
844 pthread_mutex_unlock(&obj->msgListGate);
845 sem_post(&obj->synchronizer);
846 goto done;
847 }
848 }
849 }
851 /* Getting here implies the message is outbound. Must give it to
852 * either the primary or secondary transport for delivery. Start
853 * by extracting the transport ID from the message header.
854 */
855 tid = MessageQ_getTransportId(msg);
857 if (tid >= MessageQ_MAXTRANSPORTS) {
858 printf("MessageQ_put: Error: transport id %d too big, must be < %d\n",
859 tid, MessageQ_MAXTRANSPORTS);
860 status = MessageQ_E_FAIL;
861 goto done;
862 }
864 /* if transportId is set, use secondary transport for message delivery */
865 if (tid != 0) {
866 baseTrans = MessageQ_module->transInst[tid];
868 if (baseTrans == NULL) {
869 printf("MessageQ_put: Error: transport is null\n");
870 status = MessageQ_E_FAIL;
871 goto done;
872 }
874 /* downcast instance pointer to transport interface */
875 switch (ITransport_itype(baseTrans)) {
876 case INetworkTransport_TypeId:
877 netTrans = INetworkTransport_downCast(baseTrans);
878 delivered = INetworkTransport_put(netTrans, (Ptr)msg);
879 status = (delivered ? MessageQ_S_SUCCESS :
880 (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN :
881 MessageQ_E_FAIL));
882 break;
884 default:
885 /* error */
886 printf("MessageQ_put: Error: transport id %d is an "
887 "unsupported transport type\n", tid);
888 status = MessageQ_E_FAIL;
889 break;
890 }
891 }
892 else {
893 /* use primary transport for delivery */
894 priority = MessageQ_getMsgPri(msg);
895 clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
897 /* primary transport can only be used for intra-cluster delivery */
898 if (clusterId > MultiProc_getNumProcsInCluster()) {
899 printf("MessageQ_put: Error: destination procId=%d is not "
900 "in cluster. Must specify a transportId.\n", dstProcId);
901 status = MessageQ_E_FAIL;
902 goto done;
903 }
905 msgTrans = MessageQ_module->transports[clusterId][priority];
906 delivered = IMessageQTransport_put(msgTrans, (Ptr)msg);
907 status = (delivered ? MessageQ_S_SUCCESS :
908 (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN : MessageQ_E_FAIL));
909 }
911 done:
912 return (status);
913 }
915 /*
916 * MessageQ_get - gets a message for a message queue and blocks if
917 * the queue is empty.
918 *
919 * If a message is present, it returns it. Otherwise it blocks
920 * waiting for a message to arrive.
921 * When a message is returned, it is owned by the caller.
922 */
923 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
924 {
925 MessageQ_Object * obj = (MessageQ_Object *)handle;
926 Int status = MessageQ_S_SUCCESS;
927 struct timespec ts;
928 struct timeval tv;
930 #if 0
931 /*
932 * Optimization here to get a message without going in to the sem
933 * operation, but the sem count will not be maintained properly.
934 */
935 pthread_mutex_lock(&obj->msgListGate);
937 if (obj->msgList.cqh_first != &obj->msgList) {
938 *msg = (MessageQ_Msg)obj->msglist.cqh_first;
939 CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
941 pthread_mutex_unlock(&obj->msgListGate);
942 }
943 else {
944 pthread_mutex_unlock(&obj->msgListGate);
945 }
946 #endif
948 if (timeout == MessageQ_FOREVER) {
949 sem_wait(&obj->synchronizer);
950 }
951 else {
952 /* add timeout (microseconds) to current time of day */
953 gettimeofday(&tv, NULL);
954 tv.tv_sec += timeout / 1000000;
955 tv.tv_usec += timeout % 1000000;
957 if (tv.tv_usec >= 1000000) {
958 tv.tv_sec++;
959 tv.tv_usec -= 1000000;
960 }
962 /* set absolute timeout value */
963 ts.tv_sec = tv.tv_sec;
964 ts.tv_nsec = tv.tv_usec * 1000; /* convert to nanoseconds */
966 if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
967 if (errno == ETIMEDOUT) {
968 PRINTVERBOSE0("MessageQ_get: operation timed out\n")
969 return (MessageQ_E_TIMEOUT);
970 }
971 else {
972 PRINTVERBOSE0("MessageQ_get: sem_timedwait error\n")
973 return (MessageQ_E_FAIL);
974 }
975 }
976 }
978 if (obj->unblocked) {
979 return obj->unblocked;
980 }
982 pthread_mutex_lock(&obj->msgListGate);
984 *msg = (MessageQ_Msg)obj->msgList.cqh_first;
985 CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
987 pthread_mutex_unlock(&obj->msgListGate);
989 return status;
990 }
992 /*
993 * Return a count of the number of messages in the queue
994 *
995 * TBD: Implement as a socket ioctl, using getsockopt(). Return -1 for now.
996 */
997 Int MessageQ_count(MessageQ_Handle handle)
998 {
999 Int count = -1;
1000 #if 0
1001 MessageQ_Object * obj = (MessageQ_Object *) handle;
1002 socklen_t optlen;
1004 /*
1005 * TBD: Need to find a way to implement (if anyone uses it!), and
1006 * push down into transport..
1007 */
1009 /*
1010 * 2nd arg to getsockopt should be transport independent, but using
1011 * SSKPROTO_SHMFIFO for now:
1012 */
1013 getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
1014 &count, &optlen);
1015 #endif
1017 return count;
1018 }
1020 /*
1021 * Initializes a message not obtained from MessageQ_alloc.
1022 */
1023 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
1024 {
1025 /* Fill in the fields of the message */
1026 MessageQ_msgInit(msg);
1027 msg->heapId = MessageQ_STATICMSG;
1028 msg->msgSize = size;
1029 }
1031 /*
1032 * Allocate a message and initialize the needed fields (note some
1033 * of the fields in the header are set via other APIs or in the
1034 * MessageQ_put function,
1035 */
1036 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
1037 {
1038 MessageQ_Msg msg;
1040 /*
1041 * heapId not used for local alloc (as this is over a copy transport), but
1042 * we need to send to other side as heapId is used in BIOS transport.
1043 */
1044 msg = (MessageQ_Msg)calloc(1, size);
1045 MessageQ_msgInit(msg);
1046 msg->msgSize = size;
1047 msg->heapId = heapId;
1049 return msg;
1050 }
1052 /*
1053 * Frees the message back to the heap that was used to allocate it.
1054 */
1055 Int MessageQ_free(MessageQ_Msg msg)
1056 {
1057 UInt32 status = MessageQ_S_SUCCESS;
1059 /* Check to ensure this was not allocated by user: */
1060 if (msg->heapId == MessageQ_STATICMSG) {
1061 status = MessageQ_E_CANNOTFREESTATICMSG;
1062 }
1063 else {
1064 free(msg);
1065 }
1067 return status;
1068 }
1070 /* Register a heap with MessageQ. */
1071 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
1072 {
1073 Int status = MessageQ_S_SUCCESS;
1075 /* Do nothing, as this uses a copy transport */
1077 return status;
1078 }
1080 /* Unregister a heap with MessageQ. */
1081 Int MessageQ_unregisterHeap(UInt16 heapId)
1082 {
1083 Int status = MessageQ_S_SUCCESS;
1085 /* Do nothing, as this uses a copy transport */
1087 return status;
1088 }
1090 /* Unblocks a MessageQ */
1091 Void MessageQ_unblock(MessageQ_Handle handle)
1092 {
1093 MessageQ_Object *obj = (MessageQ_Object *)handle;
1095 obj->unblocked = MessageQ_E_UNBLOCKED;
1096 sem_post(&obj->synchronizer);
1097 }
1099 /* Unblocks a MessageQ that's been shutdown due to transport failure */
1100 Void MessageQ_shutdown(MessageQ_Handle handle)
1101 {
1102 MessageQ_Object *obj = (MessageQ_Object *)handle;
1104 obj->unblocked = MessageQ_E_SHUTDOWN;
1105 sem_post(&obj->synchronizer);
1106 }
1108 /* Embeds a source message queue into a message */
1109 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
1110 {
1111 MessageQ_Object *obj = (MessageQ_Object *)handle;
1113 msg->replyId = (UInt16)(obj->queue);
1114 msg->replyProc = (UInt16)(obj->queue >> 16);
1115 }
1117 /* Returns the QueueId associated with the handle. */
1118 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
1119 {
1120 MessageQ_Object *obj = (MessageQ_Object *) handle;
1121 UInt32 queueId;
1123 queueId = (obj->queue);
1125 return queueId;
1126 }
1128 /* Returns the local handle associated with queueId. */
1129 MessageQ_Handle MessageQ_getLocalHandle(MessageQ_QueueId queueId)
1130 {
1131 MessageQ_Object *obj;
1132 MessageQ_QueueIndex queueIndex;
1133 UInt16 procId;
1135 procId = MessageQ_getProcId(queueId);
1136 if (procId != MultiProc_self()) {
1137 return NULL;
1138 }
1140 queueIndex = MessageQ_getQueueIndex(queueId);
1141 obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
1143 return (MessageQ_Handle)obj;
1144 }
1146 /* Sets the tracing of a message */
1147 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
1148 {
1149 msg->flags = (msg->flags & ~TRACEMASK) | (traceFlag << TRACESHIFT);
1150 }
1152 /*
1153 * Returns the amount of shared memory used by one transport instance.
1154 *
1155 * The MessageQ module itself does not use any shared memory but the
1156 * underlying transport may use some shared memory.
1157 */
1158 SizeT MessageQ_sharedMemReq(Ptr sharedAddr)
1159 {
1160 SizeT memReq = 0u;
1162 /* Do nothing, as this is a copy transport. */
1164 return memReq;
1165 }
1167 /*
1168 * This is a helper function to initialize a message.
1169 */
1170 Void MessageQ_msgInit(MessageQ_Msg msg)
1171 {
1172 #if 0
1173 Int status = MessageQ_S_SUCCESS;
1174 LAD_ClientHandle handle;
1175 struct LAD_CommandObj cmd;
1176 union LAD_ResponseObj rsp;
1178 handle = LAD_findHandle();
1179 if (handle == LAD_MAXNUMCLIENTS) {
1180 PRINTVERBOSE1(
1181 "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
1182 getpid())
1184 return;
1185 }
1187 cmd.cmd = LAD_MESSAGEQ_MSGINIT;
1188 cmd.clientId = handle;
1190 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
1191 PRINTVERBOSE1(
1192 "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
1193 return;
1194 }
1196 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
1197 PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
1198 return;
1199 }
1200 status = rsp.msgInit.status;
1202 PRINTVERBOSE2(
1203 "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
1204 handle, status)
1206 memcpy(msg, &rsp.msgInit.msg, sizeof (*msg));
1207 #else
1208 msg->reserved0 = 0; /* We set this to distinguish from NameServerMsg */
1209 msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
1210 msg->msgId = MessageQ_INVALIDMSGID;
1211 msg->dstId = (UInt16)MessageQ_INVALIDMESSAGEQ;
1212 msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
1213 msg->srcProc = MultiProc_self();
1215 pthread_mutex_lock(&MessageQ_module->seqNumGate);
1216 msg->seqNum = MessageQ_module->seqNum++;
1217 pthread_mutex_unlock(&MessageQ_module->seqNumGate);
1218 #endif
1219 }
1221 /*
1222 * ======== _MessageQ_grow ========
1223 * Increase module's queues array to accommodate queueIndex from LAD
1224 *
1225 * Note: this function takes the queue index value (i.e. without the
1226 * port offset).
1227 */
1228 Void _MessageQ_grow(UInt16 queueIndex)
1229 {
1230 MessageQ_Handle *queues;
1231 MessageQ_Handle *oldQueues;
1232 UInt oldSize;
1234 pthread_mutex_lock(&MessageQ_module->gate);
1236 oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
1238 queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof(MessageQ_Handle));
1239 memcpy(queues, MessageQ_module->queues, oldSize);
1241 oldQueues = MessageQ_module->queues;
1242 MessageQ_module->queues = queues;
1243 MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
1245 pthread_mutex_unlock(&MessageQ_module->gate);
1247 free(oldQueues);
1249 return;
1250 }
1252 /*
1253 * ======== MessageQ_bind ========
1254 * Bind all existing message queues to the given processor
1255 *
1256 * Note: This function is a hack to work around the driver.
1257 *
1258 * The Linux rpmsgproto driver requires a socket for each
1259 * message queue and remote processor tuple.
1260 *
1261 * socket --> (queue, processor)
1262 *
1263 * Therefore, each time a new remote processor is started, all
1264 * existing message queues need to create a socket for the new
1265 * processor.
1266 *
1267 * The driver should not have this requirement. One socket per
1268 * message queue should be sufficient to uniquely identify the
1269 * endpoint to the driver.
1270 */
1271 Void MessageQ_bind(UInt16 procId)
1272 {
1273 int q;
1274 int clusterId;
1275 int priority;
1276 MessageQ_Handle handle;
1277 MessageQ_QueueId queue;
1278 IMessageQTransport_Handle transport;
1280 clusterId = procId - MultiProc_getBaseIdOfCluster();
1281 pthread_mutex_lock(&MessageQ_module->gate);
1283 for (q = 0; q < MessageQ_module->numQueues; q++) {
1285 if ((handle = MessageQ_module->queues[q]) == NULL) {
1286 continue;
1287 }
1289 queue = ((MessageQ_Object *)handle)->queue;
1291 for (priority = 0; priority < 2; priority++) {
1292 transport = MessageQ_module->transports[clusterId][priority];
1293 if (transport != NULL) {
1294 IMessageQTransport_bind((Void *)transport, queue);
1295 }
1296 }
1297 }
1299 pthread_mutex_unlock(&MessageQ_module->gate);
1300 }
1302 /*
1303 * ======== MessageQ_unbind ========
1304 * Unbind all existing message queues from the given processor
1305 *
1306 * Hack: see MessageQ_bind.
1307 */
1308 Void MessageQ_unbind(UInt16 procId)
1309 {
1310 int q;
1311 int clusterId;
1312 int priority;
1313 MessageQ_Handle handle;
1314 MessageQ_QueueId queue;
1315 IMessageQTransport_Handle transport;
1317 pthread_mutex_lock(&MessageQ_module->gate);
1319 for (q = 0; q < MessageQ_module->numQueues; q++) {
1321 if ((handle = MessageQ_module->queues[q]) == NULL) {
1322 continue;
1323 }
1325 queue = ((MessageQ_Object *)handle)->queue;
1326 clusterId = procId - MultiProc_getBaseIdOfCluster();
1328 for (priority = 0; priority < 2; priority++) {
1329 transport = MessageQ_module->transports[clusterId][priority];
1330 if (transport != NULL) {
1331 IMessageQTransport_unbind((Void *)transport, queue);
1332 }
1333 }
1334 }
1336 pthread_mutex_unlock(&MessageQ_module->gate);
1337 }