1 /*
2 * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*
33 * @file MessageQ.c
34 *
35 * @brief MessageQ Linux implementation
36 *
37 * This implementation is geared for use in a "client/server" model, whereby
38 * system-wide data is maintained in a "server" component and process-
39 * specific data is handled here. At the moment, this implementation
40 * connects and communicates with LAD for the server connection.
41 */
43 /* Standard IPC header */
44 #include <ti/ipc/Std.h>
46 /* Module level headers */
47 #include <ti/ipc/NameServer.h>
48 #include <ti/ipc/MultiProc.h>
49 #include <_MultiProc.h>
50 #define MessageQ_internal 1 /* must be defined before include file */
51 #include <ti/ipc/MessageQ.h>
52 #include <_MessageQ.h>
53 #include <ti/ipc/interfaces/ITransport.h>
54 #include <ti/ipc/interfaces/IMessageQTransport.h>
55 #include <ti/ipc/interfaces/INetworkTransport.h>
57 /* Socket Headers */
58 #include <sys/select.h>
59 #include <sys/time.h>
60 #include <sys/types.h>
61 #include <sys/param.h>
62 #include <sys/eventfd.h>
63 #include <sys/queue.h>
64 #include <errno.h>
65 #include <stdio.h>
66 #include <string.h>
67 #include <stdlib.h>
68 #include <unistd.h>
69 #include <assert.h>
70 #include <pthread.h>
71 #include <semaphore.h>
73 #include <ladclient.h>
74 #include <_lad.h>
76 /* =============================================================================
77 * Macros/Constants
78 * =============================================================================
79 */
81 /*!
82 * @brief Name of the reserved NameServer used for MessageQ.
83 */
84 #define MessageQ_NAMESERVER "MessageQ"
86 #define MessageQ_MAXTRANSPORTS 8
88 #define MessageQ_GROWSIZE 32
90 /* Trace flag settings: */
91 #define TRACESHIFT 12
92 #define TRACEMASK 0x1000
94 /* Define BENCHMARK to quiet key MessageQ APIs: */
95 //#define BENCHMARK
97 /* =============================================================================
98 * Structures & Enums
99 * =============================================================================
100 */
102 /* params structure evolution */
103 typedef struct {
104 Void *synchronizer;
105 } MessageQ_Params_Legacy;
107 typedef struct {
108 Int __version;
109 Void *synchronizer;
110 MessageQ_QueueIndex queueIndex;
111 } MessageQ_Params_Version2;
113 /* structure for MessageQ module state */
114 typedef struct MessageQ_ModuleObject {
115 MessageQ_Handle *queues;
116 Int numQueues;
117 Int refCount;
118 NameServer_Handle nameServer;
119 pthread_mutex_t gate;
120 int seqNum;
121 IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
122 ITransport_Handle transInst[MessageQ_MAXTRANSPORTS];
123 MessageQ_PutHookFxn putHookFxn;
124 } MessageQ_ModuleObject;
126 typedef struct MessageQ_CIRCLEQ_ENTRY {
127 CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem;
128 } MessageQ_CIRCLEQ_ENTRY;
130 /*!
131 * @brief Structure for the Handle for the MessageQ.
132 */
133 typedef struct MessageQ_Object_tag {
134 CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
135 MessageQ_Params params;
136 MessageQ_QueueId queue;
137 int unblocked;
138 void *serverHandle;
139 sem_t synchronizer;
140 } MessageQ_Object;
142 /* traces in this file are controlled via _MessageQ_verbose */
143 Bool _MessageQ_verbose = FALSE;
144 #define verbose _MessageQ_verbose
146 /* =============================================================================
147 * Globals
148 * =============================================================================
149 */
150 static MessageQ_ModuleObject MessageQ_state =
151 {
152 .refCount = 0,
153 .nameServer = NULL,
154 .gate = PTHREAD_MUTEX_INITIALIZER,
155 .putHookFxn = NULL
156 };
158 /*!
159 * @var MessageQ_module
160 *
161 * @brief Pointer to the MessageQ module state.
162 */
163 MessageQ_ModuleObject *MessageQ_module = &MessageQ_state;
165 Void _MessageQ_grow(UInt16 queueIndex);
167 /* =============================================================================
168 * APIS
169 * =============================================================================
170 */
172 Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
173 UInt16 rprocId, UInt priority)
174 {
175 Int status = FALSE;
176 UInt16 clusterId;
178 if (handle == NULL) {
179 printf("MessageQ_registerTransport: invalid handle, must be non-NULL\n"
180 );
182 return status;
183 }
185 /* map procId to clusterId */
186 clusterId = rprocId - MultiProc_getBaseIdOfCluster();
188 if (clusterId >= MultiProc_MAXPROCESSORS) {
189 printf("MessageQ_registerTransport: invalid procId %d\n", rprocId);
191 return status;
192 }
194 if (MessageQ_module->transports[clusterId][priority] == NULL) {
195 MessageQ_module->transports[clusterId][priority] = handle;
197 status = TRUE;
198 }
200 return status;
201 }
203 Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
204 {
205 if (inst == NULL) {
206 printf("MessageQ_registerTransportId: invalid NULL handle\n");
208 return MessageQ_E_INVALIDARG;
209 }
211 if (tid >= MessageQ_MAXTRANSPORTS) {
212 printf("MessageQ_unregisterNetTransport: invalid transport id %d,"
213 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
215 return MessageQ_E_INVALIDARG;
216 }
218 if (MessageQ_module->transInst[tid] != NULL) {
219 printf("MessageQ_registerTransportId: transport id %d already "
220 "registered\n", tid);
222 return MessageQ_E_ALREADYEXISTS;
223 }
225 MessageQ_module->transInst[tid] = inst;
227 return MessageQ_S_SUCCESS;
228 }
230 Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
231 {
232 UInt16 clusterId;
234 /* map procId to clusterId */
235 clusterId = rprocId - MultiProc_getBaseIdOfCluster();
237 if (clusterId >= MultiProc_MAXPROCESSORS) {
238 printf("MessageQ_unregisterTransport: invalid rprocId %d\n", rprocId);
240 return;
241 }
243 MessageQ_module->transports[clusterId][priority] = NULL;
244 }
246 Void MessageQ_unregisterTransportId(UInt tid)
247 {
248 if (tid >= MessageQ_MAXTRANSPORTS) {
249 printf("MessageQ_unregisterTransportId: invalid transport id %d, "
250 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
252 return;
253 }
255 MessageQ_module->transInst[tid] = NULL;
256 }
258 /*
259 * Function to get default configuration for the MessageQ module.
260 */
261 Void MessageQ_getConfig(MessageQ_Config *cfg)
262 {
263 Int status;
264 LAD_ClientHandle handle;
265 struct LAD_CommandObj cmd;
266 union LAD_ResponseObj rsp;
268 assert (cfg != NULL);
270 handle = LAD_findHandle();
271 if (handle == LAD_MAXNUMCLIENTS) {
272 PRINTVERBOSE1(
273 "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
274 getpid())
276 return;
277 }
279 cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
280 cmd.clientId = handle;
282 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
283 PRINTVERBOSE1(
284 "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
285 return;
286 }
288 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
289 PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n",
290 status)
291 return;
292 }
293 status = rsp.messageQGetConfig.status;
295 PRINTVERBOSE2(
296 "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
297 handle, status)
299 memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg));
301 return;
302 }
304 /*
305 * Function to setup the MessageQ module.
306 */
307 Int MessageQ_setup(const MessageQ_Config *cfg)
308 {
309 Int status = MessageQ_S_SUCCESS;
310 LAD_ClientHandle handle;
311 struct LAD_CommandObj cmd;
312 union LAD_ResponseObj rsp;
313 Int pri;
314 Int i;
315 Int tid;
317 /* this entire function must be serialized */
318 pthread_mutex_lock(&MessageQ_module->gate);
320 /* ensure only first thread performs startup procedure */
321 if (++MessageQ_module->refCount > 1) {
322 PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
323 MessageQ_module->refCount)
324 status = MessageQ_S_ALREADYSETUP;
325 goto exit;
326 }
328 handle = LAD_findHandle();
329 if (handle == LAD_MAXNUMCLIENTS) {
330 PRINTVERBOSE1("MessageQ_setup: can't find connection to daemon for "
331 "pid %d\n", getpid())
332 status = MessageQ_E_RESOURCE;
333 goto exit;
334 }
336 cmd.cmd = LAD_MESSAGEQ_SETUP;
337 cmd.clientId = handle;
338 memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
340 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
341 PRINTVERBOSE1("MessageQ_setup: sending LAD command failed, "
342 "status=%d\n", status)
343 status = MessageQ_E_FAIL;
344 goto exit;
345 }
347 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
348 PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
349 status = MessageQ_E_FAIL;
350 goto exit;
351 }
352 status = rsp.setup.status;
354 PRINTVERBOSE2("MessageQ_setup: LAD response for client %d, status=%d\n",
355 handle, status)
357 MessageQ_module->seqNum = 0;
358 MessageQ_module->nameServer = rsp.setup.nameServerHandle;
359 MessageQ_module->numQueues = cfg->maxRuntimeEntries;
360 MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
361 sizeof(MessageQ_Handle));
363 for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
364 for (pri = 0; pri < 2; pri++) {
365 MessageQ_module->transports[i][pri] = NULL;
366 }
367 }
369 for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
370 MessageQ_module->transInst[tid] = NULL;
371 }
373 exit:
374 /* if error, must decrement reference count */
375 if (status < 0) {
376 MessageQ_module->refCount--;
377 }
379 pthread_mutex_unlock(&MessageQ_module->gate);
381 return (status);
382 }
384 /*
385 * MessageQ_destroy - destroy the MessageQ module.
386 */
387 Int MessageQ_destroy(void)
388 {
389 Int status = MessageQ_S_SUCCESS;
390 LAD_ClientHandle handle;
391 struct LAD_CommandObj cmd;
392 union LAD_ResponseObj rsp;
394 /* this entire function must be serialized */
395 pthread_mutex_lock(&MessageQ_module->gate);
397 /* ensure only last thread does the work */
398 if (--MessageQ_module->refCount > 0) {
399 goto exit;
400 }
402 handle = LAD_findHandle();
403 if (handle == LAD_MAXNUMCLIENTS) {
404 PRINTVERBOSE1("MessageQ_destroy: can't find connection to daemon "
405 "for pid %d\n", getpid())
406 status = MessageQ_E_RESOURCE;
407 goto exit;
408 }
410 cmd.cmd = LAD_MESSAGEQ_DESTROY;
411 cmd.clientId = handle;
413 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
414 PRINTVERBOSE1("MessageQ_destroy: sending LAD command failed, "
415 "status=%d\n", status)
416 status = MessageQ_E_FAIL;
417 goto exit;
418 }
420 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
421 PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
422 status = MessageQ_E_FAIL;
423 goto exit;
424 }
425 status = rsp.status;
427 PRINTVERBOSE2("MessageQ_destroy: got LAD response for client %d, "
428 "status=%d\n", handle, status)
430 exit:
431 pthread_mutex_unlock(&MessageQ_module->gate);
433 return (status);
434 }
436 /*
437 * ======== MessageQ_Params_init ========
438 * Legacy implementation.
439 */
440 Void MessageQ_Params_init(MessageQ_Params *params)
441 {
442 ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
443 }
445 /*
446 * ======== MessageQ_Params_init__S ========
447 * New implementation which is version aware.
448 */
449 Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
450 {
451 MessageQ_Params_Version2 *params2;
453 switch (version) {
455 case MessageQ_Params_VERSION_2:
456 params2 = (MessageQ_Params_Version2 *)params;
457 params2->__version = MessageQ_Params_VERSION_2;
458 params2->synchronizer = NULL;
459 params2->queueIndex = MessageQ_ANY;
460 break;
462 default:
463 assert(FALSE);
464 break;
465 }
466 }
468 /*
469 * ======== MessageQ_create ========
470 */
471 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
472 {
473 Int status;
474 MessageQ_Object *obj = NULL;
475 IMessageQTransport_Handle transport;
476 INetworkTransport_Handle netTrans;
477 ITransport_Handle baseTrans;
478 UInt16 queueIndex;
479 UInt16 clusterId;
480 Int tid;
481 Int priority;
482 LAD_ClientHandle handle;
483 struct LAD_CommandObj cmd;
484 union LAD_ResponseObj rsp;
485 MessageQ_Params ps;
487 MessageQ_Params_init__S(&ps, MessageQ_Params_VERSION);
489 /* copy the given params into the current params structure */
490 if (pp != NULL) {
492 /* snoop the params pointer to see if it's a legacy structure */
493 if ((pp->__version == 0) || (pp->__version > 100)) {
494 ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
495 }
497 /* not legacy structure, use params version field */
498 else if (pp->__version == MessageQ_Params_VERSION_2) {
499 ps.__version = ((MessageQ_Params_Version2 *)pp)->__version;
500 ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
501 ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
502 }
503 else {
504 assert(FALSE);
505 }
506 }
508 handle = LAD_findHandle();
509 if (handle == LAD_MAXNUMCLIENTS) {
510 PRINTVERBOSE1(
511 "MessageQ_create: can't find connection to daemon for pid %d\n",
512 getpid())
514 return NULL;
515 }
517 cmd.cmd = LAD_MESSAGEQ_CREATE;
518 cmd.clientId = handle;
520 if (name == NULL) {
521 cmd.args.messageQCreate.name[0] = '\0';
522 }
523 else {
524 strncpy(cmd.args.messageQCreate.name, name,
525 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
526 cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
527 }
529 memcpy(&cmd.args.messageQCreate.params, &ps, sizeof(ps));
531 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
532 PRINTVERBOSE1(
533 "MessageQ_create: sending LAD command failed, status=%d\n", status)
534 return NULL;
535 }
537 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
538 PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
539 return NULL;
540 }
541 status = rsp.messageQCreate.status;
543 PRINTVERBOSE2(
544 "MessageQ_create: got LAD response for client %d, status=%d\n",
545 handle, status)
547 if (status == -1) {
548 PRINTVERBOSE1(
549 "MessageQ_create: MessageQ server operation failed, status=%d\n",
550 status)
551 return NULL;
552 }
554 /* Create the generic obj */
555 obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
557 /* Populate the params member */
558 memcpy(&obj->params, &ps, sizeof(ps));
561 obj->queue = rsp.messageQCreate.queueId;
562 obj->serverHandle = rsp.messageQCreate.serverHandle;
563 CIRCLEQ_INIT(&obj->msgList);
564 if (sem_init(&obj->synchronizer, 0, 0) < 0) {
565 PRINTVERBOSE1(
566 "MessageQ_create: failed to create synchronizer (errno %d)\n", errno)
568 MessageQ_delete((MessageQ_Handle *)&obj);
570 return NULL;
571 }
573 /* lad returns the queue port # (queueIndex + PORT_OFFSET) */
574 queueIndex = MessageQ_getQueueIndex(rsp.messageQCreate.queueId);
576 PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
577 "queueIndex %d\n", (name == NULL) ? "NULL" : name , queueIndex)
579 for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
580 for (priority = 0; priority < 2; priority++) {
581 transport = MessageQ_module->transports[clusterId][priority];
582 if (transport) {
583 /* need to check return and do something if error */
584 IMessageQTransport_bind((Void *)transport, obj->queue);
585 }
586 }
587 }
589 for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
590 baseTrans = MessageQ_module->transInst[tid];
592 if (baseTrans != NULL) {
593 switch (ITransport_itype(baseTrans)) {
594 case INetworkTransport_TypeId:
595 netTrans = INetworkTransport_downCast(baseTrans);
596 INetworkTransport_bind((void *)netTrans, obj->queue);
597 break;
599 default:
600 /* error */
601 printf("MessageQ_create: Error: transport id %d is an "
602 "unsupported transport type.\n", tid);
603 break;
604 }
605 }
606 }
608 /* LAD's MessageQ module can grow, we need to grow as well */
609 if (queueIndex >= MessageQ_module->numQueues) {
610 _MessageQ_grow(queueIndex);
611 }
613 /* No need to "allocate" slot since the queueIndex returned by
614 * LAD is guaranteed to be unique.
615 */
616 MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
618 return (MessageQ_Handle)obj;
619 }
621 /*
622 * ======== MessageQ_delete ========
623 */
624 Int MessageQ_delete(MessageQ_Handle *handlePtr)
625 {
626 MessageQ_Object *obj;
627 IMessageQTransport_Handle transport;
628 INetworkTransport_Handle netTrans;
629 ITransport_Handle baseTrans;
630 Int status = MessageQ_S_SUCCESS;
631 UInt16 queueIndex;
632 UInt16 clusterId;
633 Int tid;
634 Int priority;
635 LAD_ClientHandle handle;
636 struct LAD_CommandObj cmd;
637 union LAD_ResponseObj rsp;
639 handle = LAD_findHandle();
640 if (handle == LAD_MAXNUMCLIENTS) {
641 PRINTVERBOSE1(
642 "MessageQ_delete: can't find connection to daemon for pid %d\n",
643 getpid())
645 return MessageQ_E_FAIL;
646 }
648 obj = (MessageQ_Object *)(*handlePtr);
650 cmd.cmd = LAD_MESSAGEQ_DELETE;
651 cmd.clientId = handle;
652 cmd.args.messageQDelete.serverHandle = obj->serverHandle;
654 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
655 PRINTVERBOSE1(
656 "MessageQ_delete: sending LAD command failed, status=%d\n", status)
657 return MessageQ_E_FAIL;
658 }
660 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
661 PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
662 return MessageQ_E_FAIL;
663 }
664 status = rsp.messageQDelete.status;
666 PRINTVERBOSE2(
667 "MessageQ_delete: got LAD response for client %d, status=%d\n",
668 handle, status)
670 for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
671 for (priority = 0; priority < 2; priority++) {
672 transport = MessageQ_module->transports[clusterId][priority];
673 if (transport) {
674 IMessageQTransport_unbind((Void *)transport, obj->queue);
675 }
676 }
677 }
679 for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
680 baseTrans = MessageQ_module->transInst[tid];
682 if (baseTrans != NULL) {
683 switch (ITransport_itype(baseTrans)) {
684 case INetworkTransport_TypeId:
685 netTrans = INetworkTransport_downCast(baseTrans);
686 INetworkTransport_unbind((void *)netTrans, obj->queue);
687 break;
689 default:
690 /* error */
691 printf("MessageQ_create: Error: transport id %d is an "
692 "unsupported transport type.\n", tid);
693 break;
694 }
695 }
696 }
698 /* extract the queue index from the queueId */
699 queueIndex = MessageQ_getQueueIndex(obj->queue);
700 MessageQ_module->queues[queueIndex] = NULL;
702 free(obj);
703 *handlePtr = NULL;
705 return status;
706 }
708 /*
709 * ======== MessageQ_open ========
710 * Acquire a queueId for use in sending messages to the queue
711 */
712 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
713 {
714 Int status = MessageQ_S_SUCCESS;
716 status = NameServer_getUInt32(MessageQ_module->nameServer,
717 name, queueId, NULL);
719 if (status == NameServer_E_NOTFOUND) {
720 /* Set return queue ID to invalid */
721 *queueId = MessageQ_INVALIDMESSAGEQ;
722 status = MessageQ_E_NOTFOUND;
723 }
724 else if (status >= 0) {
725 /* Override with a MessageQ status code */
726 status = MessageQ_S_SUCCESS;
727 }
728 else {
729 /* Set return queue ID to invalid */
730 *queueId = MessageQ_INVALIDMESSAGEQ;
732 /* Override with a MessageQ status code */
733 if (status == NameServer_E_TIMEOUT) {
734 status = MessageQ_E_TIMEOUT;
735 }
736 else {
737 status = MessageQ_E_FAIL;
738 }
739 }
741 return status;
742 }
744 /*
745 * ======== MessageQ_openQueueId ========
746 */
747 MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 procId)
748 {
749 MessageQ_QueueIndex queuePort;
750 MessageQ_QueueId queueId;
752 /* queue port is embedded in the queueId */
753 queuePort = queueIndex + MessageQ_PORTOFFSET;
754 queueId = ((MessageQ_QueueId)(procId) << 16) | queuePort;
756 return (queueId);
757 }
759 /*
760 * ======== MessageQ_close ========
761 * Closes previously opened instance of MessageQ
762 */
763 Int MessageQ_close(MessageQ_QueueId *queueId)
764 {
765 Int32 status = MessageQ_S_SUCCESS;
767 /* Nothing more to be done for closing the MessageQ. */
768 *queueId = MessageQ_INVALIDMESSAGEQ;
770 return status;
771 }
773 /*
774 * ======== MessageQ_put ========
775 * Deliver the given message, either locally or to the transport
776 *
777 * If the destination is a local queue, deliver the message. Otherwise,
778 * pass the message to a transport for delivery. The transport handles
779 * the sending of the message using the appropriate interface (socket,
780 * device ioctl, etc.).
781 */
782 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
783 {
784 Int status = MessageQ_S_SUCCESS;
785 MessageQ_Object *obj;
786 UInt16 dstProcId;
787 UInt16 queueIndex;
788 UInt16 queuePort;
789 ITransport_Handle baseTrans;
790 IMessageQTransport_Handle msgTrans;
791 INetworkTransport_Handle netTrans;
792 Int priority;
793 UInt tid;
794 UInt16 clusterId;
795 Bool delivered;
797 /* extract destination address from the given queueId */
798 dstProcId = (UInt16)(queueId >> 16);
799 queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
801 /* write the destination address into the message header */
802 msg->dstId = queuePort;
803 msg->dstProc= dstProcId;
805 /* invoke the hook function after addressing the message */
806 if (MessageQ_module->putHookFxn != NULL) {
807 MessageQ_module->putHookFxn(queueId, msg);
808 }
810 /* For an outbound message: If message destination is on this
811 * processor, then check if the destination queue is in this
812 * process (thread-to-thread messaging).
813 *
814 * For an inbound message: Check if destination queue is in this
815 * process (process-to-process messaging).
816 */
817 if (dstProcId == MultiProc_self()) {
818 queueIndex = queuePort - MessageQ_PORTOFFSET;
820 if (queueIndex < MessageQ_module->numQueues) {
821 obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
823 if (obj != NULL) {
824 /* deliver message to queue */
825 pthread_mutex_lock(&MessageQ_module->gate);
826 CIRCLEQ_INSERT_TAIL(&obj->msgList,
827 (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
828 pthread_mutex_unlock(&MessageQ_module->gate);
829 sem_post(&obj->synchronizer);
830 goto done;
831 }
832 }
833 }
835 /* Getting here implies the message is outbound. Must give it to
836 * either the primary or secondary transport for delivery. Start
837 * by extracting the transport ID from the message header.
838 */
839 tid = MessageQ_getTransportId(msg);
841 if (tid >= MessageQ_MAXTRANSPORTS) {
842 printf("MessageQ_put: Error: transport id %d too big, must be < %d\n",
843 tid, MessageQ_MAXTRANSPORTS);
844 status = MessageQ_E_FAIL;
845 goto done;
846 }
848 /* if transportId is set, use secondary transport for message delivery */
849 if (tid != 0) {
850 baseTrans = MessageQ_module->transInst[tid];
852 if (baseTrans == NULL) {
853 printf("MessageQ_put: Error: transport is null\n");
854 status = MessageQ_E_FAIL;
855 goto done;
856 }
858 /* downcast instance pointer to transport interface */
859 switch (ITransport_itype(baseTrans)) {
860 case INetworkTransport_TypeId:
861 netTrans = INetworkTransport_downCast(baseTrans);
862 delivered = INetworkTransport_put(netTrans, (Ptr)msg);
863 status = (delivered ? MessageQ_S_SUCCESS : MessageQ_E_FAIL);
864 break;
866 default:
867 /* error */
868 printf("MessageQ_put: Error: transport id %d is an "
869 "unsupported transport type\n", tid);
870 status = MessageQ_E_FAIL;
871 break;
872 }
873 }
874 else {
875 /* use primary transport for delivery */
876 priority = MessageQ_getMsgPri(msg);
877 clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
879 /* primary transport can only be used for intra-cluster delivery */
880 if (clusterId > MultiProc_getNumProcsInCluster()) {
881 printf("MessageQ_put: Error: destination procId=%d is not "
882 "in cluster. Must specify a transportId.\n", dstProcId);
883 status = MessageQ_E_FAIL;
884 goto done;
885 }
887 msgTrans = MessageQ_module->transports[clusterId][priority];
888 delivered = IMessageQTransport_put(msgTrans, (Ptr)msg);
889 status = (delivered ? MessageQ_S_SUCCESS : MessageQ_E_FAIL);
890 }
892 done:
893 return (status);
894 }
896 /*
897 * MessageQ_get - gets a message for a message queue and blocks if
898 * the queue is empty.
899 *
900 * If a message is present, it returns it. Otherwise it blocks
901 * waiting for a message to arrive.
902 * When a message is returned, it is owned by the caller.
903 */
904 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
905 {
906 MessageQ_Object * obj = (MessageQ_Object *)handle;
907 Int status = MessageQ_S_SUCCESS;
908 struct timespec ts;
909 struct timeval tv;
911 #if 0
912 /*
913 * Optimization here to get a message without going in to the sem
914 * operation, but the sem count will not be maintained properly.
915 */
916 pthread_mutex_lock(&MessageQ_module->gate);
918 if (obj->msgList.cqh_first != &obj->msgList) {
919 *msg = (MessageQ_Msg)obj->msglist.cqh_first;
920 CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
922 pthread_mutex_unlock(&MessageQ_module->gate);
923 }
924 else {
925 pthread_mutex_unlock(&MessageQ_module->gate);
926 }
927 #endif
929 if (timeout == MessageQ_FOREVER) {
930 sem_wait(&obj->synchronizer);
931 }
932 else {
933 /* add timeout (microseconds) to current time of day */
934 gettimeofday(&tv, NULL);
935 tv.tv_sec += timeout / 1000000;
936 tv.tv_usec += timeout % 1000000;
938 if (tv.tv_usec >= 1000000) {
939 tv.tv_sec++;
940 tv.tv_usec -= 1000000;
941 }
943 /* set absolute timeout value */
944 ts.tv_sec = tv.tv_sec;
945 ts.tv_nsec = tv.tv_usec * 1000; /* convert to nanoseconds */
947 if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
948 if (errno == ETIMEDOUT) {
949 PRINTVERBOSE0("MessageQ_get: operation timed out\n")
950 return (MessageQ_E_TIMEOUT);
951 }
952 else {
953 PRINTVERBOSE0("MessageQ_get: sem_timedwait error\n")
954 return (MessageQ_E_FAIL);
955 }
956 }
957 }
959 if (obj->unblocked) {
960 return MessageQ_E_UNBLOCKED;
961 }
963 pthread_mutex_lock(&MessageQ_module->gate);
965 *msg = (MessageQ_Msg)obj->msgList.cqh_first;
966 CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
968 pthread_mutex_unlock(&MessageQ_module->gate);
970 return status;
971 }
973 /*
974 * Return a count of the number of messages in the queue
975 *
976 * TBD: Implement as a socket ioctl, using getsockopt(). Return -1 for now.
977 */
978 Int MessageQ_count(MessageQ_Handle handle)
979 {
980 Int count = -1;
981 #if 0
982 MessageQ_Object * obj = (MessageQ_Object *) handle;
983 socklen_t optlen;
985 /*
986 * TBD: Need to find a way to implement (if anyone uses it!), and
987 * push down into transport..
988 */
990 /*
991 * 2nd arg to getsockopt should be transport independent, but using
992 * SSKPROTO_SHMFIFO for now:
993 */
994 getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
995 &count, &optlen);
996 #endif
998 return count;
999 }
1001 /*
1002 * Initializes a message not obtained from MessageQ_alloc.
1003 */
1004 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
1005 {
1006 /* Fill in the fields of the message */
1007 MessageQ_msgInit(msg);
1008 msg->heapId = MessageQ_STATICMSG;
1009 msg->msgSize = size;
1010 }
1012 /*
1013 * Allocate a message and initialize the needed fields (note some
1014 * of the fields in the header are set via other APIs or in the
1015 * MessageQ_put function,
1016 */
1017 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
1018 {
1019 MessageQ_Msg msg;
1021 /*
1022 * heapId not used for local alloc (as this is over a copy transport), but
1023 * we need to send to other side as heapId is used in BIOS transport.
1024 */
1025 msg = (MessageQ_Msg)calloc(1, size);
1026 MessageQ_msgInit(msg);
1027 msg->msgSize = size;
1028 msg->heapId = heapId;
1030 return msg;
1031 }
1033 /*
1034 * Frees the message back to the heap that was used to allocate it.
1035 */
1036 Int MessageQ_free(MessageQ_Msg msg)
1037 {
1038 UInt32 status = MessageQ_S_SUCCESS;
1040 /* Check to ensure this was not allocated by user: */
1041 if (msg->heapId == MessageQ_STATICMSG) {
1042 status = MessageQ_E_CANNOTFREESTATICMSG;
1043 }
1044 else {
1045 free(msg);
1046 }
1048 return status;
1049 }
1051 /* Register a heap with MessageQ. */
1052 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
1053 {
1054 Int status = MessageQ_S_SUCCESS;
1056 /* Do nothing, as this uses a copy transport */
1058 return status;
1059 }
1061 /* Unregister a heap with MessageQ. */
1062 Int MessageQ_unregisterHeap(UInt16 heapId)
1063 {
1064 Int status = MessageQ_S_SUCCESS;
1066 /* Do nothing, as this uses a copy transport */
1068 return status;
1069 }
1071 /* Unblocks a MessageQ */
1072 Void MessageQ_unblock(MessageQ_Handle handle)
1073 {
1074 MessageQ_Object *obj = (MessageQ_Object *)handle;
1076 obj->unblocked = TRUE;
1077 sem_post(&obj->synchronizer);
1078 }
1080 /* Embeds a source message queue into a message */
1081 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
1082 {
1083 MessageQ_Object *obj = (MessageQ_Object *)handle;
1085 msg->replyId = (UInt16)(obj->queue);
1086 msg->replyProc = (UInt16)(obj->queue >> 16);
1087 }
1089 /* Returns the QueueId associated with the handle. */
1090 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
1091 {
1092 MessageQ_Object *obj = (MessageQ_Object *) handle;
1093 UInt32 queueId;
1095 queueId = (obj->queue);
1097 return queueId;
1098 }
1100 /* Sets the tracing of a message */
1101 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
1102 {
1103 msg->flags = (msg->flags & ~TRACEMASK) | (traceFlag << TRACESHIFT);
1104 }
1106 /*
1107 * Returns the amount of shared memory used by one transport instance.
1108 *
1109 * The MessageQ module itself does not use any shared memory but the
1110 * underlying transport may use some shared memory.
1111 */
1112 SizeT MessageQ_sharedMemReq(Ptr sharedAddr)
1113 {
1114 SizeT memReq = 0u;
1116 /* Do nothing, as this is a copy transport. */
1118 return memReq;
1119 }
1121 /*
1122 * This is a helper function to initialize a message.
1123 */
1124 Void MessageQ_msgInit(MessageQ_Msg msg)
1125 {
1126 #if 0
1127 Int status = MessageQ_S_SUCCESS;
1128 LAD_ClientHandle handle;
1129 struct LAD_CommandObj cmd;
1130 union LAD_ResponseObj rsp;
1132 handle = LAD_findHandle();
1133 if (handle == LAD_MAXNUMCLIENTS) {
1134 PRINTVERBOSE1(
1135 "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
1136 getpid())
1138 return;
1139 }
1141 cmd.cmd = LAD_MESSAGEQ_MSGINIT;
1142 cmd.clientId = handle;
1144 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
1145 PRINTVERBOSE1(
1146 "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
1147 return;
1148 }
1150 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
1151 PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
1152 return;
1153 }
1154 status = rsp.msgInit.status;
1156 PRINTVERBOSE2(
1157 "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
1158 handle, status)
1160 memcpy(msg, &rsp.msgInit.msg, sizeof (*msg));
1161 #else
1162 msg->reserved0 = 0; /* We set this to distinguish from NameServerMsg */
1163 msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
1164 msg->msgId = MessageQ_INVALIDMSGID;
1165 msg->dstId = (UInt16)MessageQ_INVALIDMESSAGEQ;
1166 msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
1167 msg->srcProc = MultiProc_self();
1169 pthread_mutex_lock(&MessageQ_module->gate);
1170 msg->seqNum = MessageQ_module->seqNum++;
1171 pthread_mutex_unlock(&MessageQ_module->gate);
1172 #endif
1173 }
1175 /*
1176 * ======== _MessageQ_grow ========
1177 * Increase module's queues array to accommodate queueIndex from LAD
1178 *
1179 * Note: this function takes the queue index value (i.e. without the
1180 * port offset).
1181 */
1182 Void _MessageQ_grow(UInt16 queueIndex)
1183 {
1184 MessageQ_Handle *queues;
1185 MessageQ_Handle *oldQueues;
1186 UInt oldSize;
1188 oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
1190 queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof(MessageQ_Handle));
1191 memcpy(queues, MessageQ_module->queues, oldSize);
1193 oldQueues = MessageQ_module->queues;
1194 MessageQ_module->queues = queues;
1195 MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
1197 free(oldQueues);
1199 return;
1200 }