1 /*
2 * Copyright (c) 2012-2014, Texas Instruments Incorporated
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*
33 * @file MessageQ.c
34 *
35 * @brief MessageQ Linux implementation
36 *
37 * This implementation is geared for use in a "client/server" model, whereby
38 * system-wide data is maintained in a "server" component and process-
39 * specific data is handled here. At the moment, this implementation
40 * connects and communicates with LAD for the server connection.
41 */
43 /* Standard IPC header */
44 #include <ti/ipc/Std.h>
46 /* Module level headers */
47 #include <ti/ipc/NameServer.h>
48 #include <ti/ipc/MultiProc.h>
49 #include <_MultiProc.h>
50 #define MessageQ_internal 1 /* must be defined before include file */
51 #include <ti/ipc/MessageQ.h>
52 #include <_MessageQ.h>
53 #include <ITransport.h>
54 #include <IMessageQTransport.h>
55 #include <INetworkTransport.h>
57 /* Socket Headers */
58 #include <sys/select.h>
59 #include <sys/time.h>
60 #include <sys/types.h>
61 #include <sys/param.h>
62 #include <sys/eventfd.h>
63 #include <sys/queue.h>
64 #include <errno.h>
65 #include <stdio.h>
66 #include <string.h>
67 #include <stdlib.h>
68 #include <unistd.h>
69 #include <assert.h>
70 #include <pthread.h>
71 #include <semaphore.h>
73 /* Socket Protocol Family */
74 #include <net/rpmsg.h>
76 #include <ladclient.h>
77 #include <_lad.h>
79 /* =============================================================================
80 * Macros/Constants
81 * =============================================================================
82 */
84 /*!
85 * @brief Name of the reserved NameServer used for MessageQ.
86 */
87 #define MessageQ_NAMESERVER "MessageQ"
89 #define MessageQ_MAXTRANSPORTS 8
91 #define MessageQ_GROWSIZE 32
93 /* Trace flag settings: */
94 #define TRACESHIFT 12
95 #define TRACEMASK 0x1000
97 /* Define BENCHMARK to quiet key MessageQ APIs: */
98 //#define BENCHMARK
100 /* =============================================================================
101 * Structures & Enums
102 * =============================================================================
103 */
105 /* params structure evolution */
106 typedef struct {
107 Void *synchronizer;
108 } MessageQ_Params_Legacy;
110 typedef struct {
111 Int __version;
112 Void *synchronizer;
113 MessageQ_QueueIndex queueIndex;
114 } MessageQ_Params_Version2;
116 /* structure for MessageQ module state */
117 typedef struct MessageQ_ModuleObject {
118 MessageQ_Handle *queues;
119 Int numQueues;
120 Int refCount;
121 NameServer_Handle nameServer;
122 pthread_mutex_t gate;
123 int seqNum;
124 IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
125 INetworkTransport_Handle transInst[MessageQ_MAXTRANSPORTS];
126 MessageQ_PutHookFxn putHookFxn;
127 } MessageQ_ModuleObject;
129 typedef struct MessageQ_CIRCLEQ_ENTRY {
130 CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem;
131 } MessageQ_CIRCLEQ_ENTRY;
133 /*!
134 * @brief Structure for the Handle for the MessageQ.
135 */
136 typedef struct MessageQ_Object_tag {
137 CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
138 MessageQ_Params params;
139 MessageQ_QueueId queue;
140 int unblocked;
141 void *serverHandle;
142 sem_t synchronizer;
143 } MessageQ_Object;
145 /* traces in this file are controlled via _MessageQ_verbose */
146 Bool _MessageQ_verbose = FALSE;
147 #define verbose _MessageQ_verbose
149 /* =============================================================================
150 * Globals
151 * =============================================================================
152 */
153 static MessageQ_ModuleObject MessageQ_state =
154 {
155 .refCount = 0,
156 .nameServer = NULL,
157 .putHookFxn = NULL
158 };
160 /*!
161 * @var MessageQ_module
162 *
163 * @brief Pointer to the MessageQ module state.
164 */
165 MessageQ_ModuleObject *MessageQ_module = &MessageQ_state;
167 Void _MessageQ_grow(UInt16 queueIndex);
169 /* =============================================================================
170 * APIS
171 * =============================================================================
172 */
174 Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
175 UInt16 rprocId, UInt priority)
176 {
177 Int status = FALSE;
179 if (handle == NULL) {
180 printf("MessageQ_registerTransport: invalid handle, must be non-NULL\n"
181 );
183 return status;
184 }
186 if (rprocId >= MultiProc_MAXPROCESSORS) {
187 printf("MessageQ_registerTransport: invalid procId %d\n", rprocId);
189 return status;
190 }
192 if (MessageQ_module->transports[rprocId][priority] == NULL) {
193 MessageQ_module->transports[rprocId][priority] = handle;
195 status = TRUE;
196 }
198 return status;
199 }
201 Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
202 {
203 if (inst == NULL) {
204 printf("MessageQ_registerTransportId: invalid NULL handle\n");
206 return MessageQ_E_INVALIDARG;
207 }
209 if (tid >= MessageQ_MAXTRANSPORTS) {
210 printf("MessageQ_unregisterNetTransport: invalid transport id %d, must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
212 return MessageQ_E_INVALIDARG;
213 }
215 if (MessageQ_module->transInst[tid] != NULL) {
216 printf("MessageQ_registerTransportId: transport id %d already registered\n", tid);
218 return MessageQ_E_ALREADYEXISTS;
219 }
221 MessageQ_module->transInst[tid] = (INetworkTransport_Handle)inst;
223 return MessageQ_S_SUCCESS;
224 }
226 Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
227 {
228 if (rprocId >= MultiProc_MAXPROCESSORS) {
229 printf("MessageQ_registerTransport: invalid rprocId %d\n", rprocId);
231 return;
232 }
234 MessageQ_module->transports[rprocId][priority] = NULL;
235 }
237 Void MessageQ_unregisterTransportId(UInt tid)
238 {
239 if (tid >= MessageQ_MAXTRANSPORTS) {
240 printf("MessageQ_unregisterTransportId: invalid transport id %d, must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
242 return;
243 }
245 MessageQ_module->transInst[tid] = NULL;
246 }
248 /*
249 * Function to get default configuration for the MessageQ module.
250 */
251 Void MessageQ_getConfig(MessageQ_Config *cfg)
252 {
253 Int status;
254 LAD_ClientHandle handle;
255 struct LAD_CommandObj cmd;
256 union LAD_ResponseObj rsp;
258 assert (cfg != NULL);
260 handle = LAD_findHandle();
261 if (handle == LAD_MAXNUMCLIENTS) {
262 PRINTVERBOSE1(
263 "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
264 getpid())
266 return;
267 }
269 cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
270 cmd.clientId = handle;
272 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
273 PRINTVERBOSE1(
274 "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
275 return;
276 }
278 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
279 PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n",
280 status)
281 return;
282 }
283 status = rsp.messageQGetConfig.status;
285 PRINTVERBOSE2(
286 "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
287 handle, status)
289 memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg));
291 return;
292 }
294 /*
295 * Function to setup the MessageQ module.
296 */
297 Int MessageQ_setup(const MessageQ_Config *cfg)
298 {
299 Int status;
300 LAD_ClientHandle handle;
301 struct LAD_CommandObj cmd;
302 union LAD_ResponseObj rsp;
303 Int pri;
304 Int rprocId;
305 Int tid;
307 pthread_mutex_lock(&MessageQ_module->gate);
309 MessageQ_module->refCount++;
310 if (MessageQ_module->refCount > 1) {
312 pthread_mutex_unlock(&MessageQ_module->gate);
314 PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
315 MessageQ_module->refCount)
317 return MessageQ_S_ALREADYSETUP;
318 }
320 pthread_mutex_unlock(&MessageQ_module->gate);
322 handle = LAD_findHandle();
323 if (handle == LAD_MAXNUMCLIENTS) {
324 PRINTVERBOSE1(
325 "MessageQ_setup: can't find connection to daemon for pid %d\n",
326 getpid())
328 return MessageQ_E_RESOURCE;
329 }
331 cmd.cmd = LAD_MESSAGEQ_SETUP;
332 cmd.clientId = handle;
333 memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof (*cfg));
335 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
336 PRINTVERBOSE1(
337 "MessageQ_setup: sending LAD command failed, status=%d\n", status)
338 return MessageQ_E_FAIL;
339 }
341 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
342 PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
343 return status;
344 }
345 status = rsp.setup.status;
347 PRINTVERBOSE2(
348 "MessageQ_setup: got LAD response for client %d, status=%d\n",
349 handle, status)
351 MessageQ_module->seqNum = 0;
352 MessageQ_module->nameServer = rsp.setup.nameServerHandle;
353 MessageQ_module->numQueues = cfg->maxRuntimeEntries;
354 MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
355 sizeof (MessageQ_Handle));
357 pthread_mutex_init(&MessageQ_module->gate, NULL);
359 for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
360 for (pri = 0; pri < 2; pri++) {
361 MessageQ_module->transports[rprocId][pri] = NULL;
362 }
363 }
364 for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
365 MessageQ_module->transInst[tid] = NULL;
366 }
368 return status;
369 }
371 /*
372 * MessageQ_destroy - destroy the MessageQ module.
373 */
374 Int MessageQ_destroy(void)
375 {
376 Int status;
377 LAD_ClientHandle handle;
378 struct LAD_CommandObj cmd;
379 union LAD_ResponseObj rsp;
381 handle = LAD_findHandle();
382 if (handle == LAD_MAXNUMCLIENTS) {
383 PRINTVERBOSE1(
384 "MessageQ_destroy: can't find connection to daemon for pid %d\n",
385 getpid())
387 return MessageQ_E_RESOURCE;
388 }
390 cmd.cmd = LAD_MESSAGEQ_DESTROY;
391 cmd.clientId = handle;
393 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
394 PRINTVERBOSE1(
395 "MessageQ_destroy: sending LAD command failed, status=%d\n", status)
396 return MessageQ_E_FAIL;
397 }
399 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
400 PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
401 return status;
402 }
403 status = rsp.status;
405 PRINTVERBOSE2(
406 "MessageQ_destroy: got LAD response for client %d, status=%d\n",
407 handle, status)
409 return status;
410 }
412 /*
413 * ======== MessageQ_Params_init ========
414 * Legacy implementation.
415 */
416 Void MessageQ_Params_init(MessageQ_Params *params)
417 {
418 ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
419 }
421 /*
422 * ======== MessageQ_Params_init__S ========
423 * New implementation which is version aware.
424 */
425 Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
426 {
427 MessageQ_Params_Version2 *params2;
429 switch (version) {
431 case MessageQ_Params_VERSION_2:
432 params2 = (MessageQ_Params_Version2 *)params;
433 params2->__version = MessageQ_Params_VERSION_2;
434 params2->synchronizer = NULL;
435 params2->queueIndex = MessageQ_ANY;
436 break;
438 default:
439 assert(FALSE);
440 break;
441 }
442 }
444 /*
445 * MessageQ_create - create a MessageQ object for receiving.
446 */
447 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
448 {
449 Int status;
450 MessageQ_Object *obj = NULL;
451 IMessageQTransport_Handle transport;
452 INetworkTransport_Handle transInst;
453 UInt16 queueIndex;
454 UInt16 rprocId;
455 Int tid;
456 Int priority;
457 LAD_ClientHandle handle;
458 struct LAD_CommandObj cmd;
459 union LAD_ResponseObj rsp;
460 MessageQ_Params ps;
462 MessageQ_Params_init__S(&ps, MessageQ_Params_VERSION);
464 /* copy the given params into the current params structure */
465 if (pp != NULL) {
467 /* snoop the params pointer to see if it's a legacy structure */
468 if ((pp->__version == 0) || (pp->__version > 100)) {
469 ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
470 }
472 /* not legacy structure, use params version field */
473 else if (pp->__version == MessageQ_Params_VERSION_2) {
474 ps.__version = ((MessageQ_Params_Version2 *)pp)->__version;
475 ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
476 ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
477 }
478 else {
479 assert(FALSE);
480 }
481 }
483 handle = LAD_findHandle();
484 if (handle == LAD_MAXNUMCLIENTS) {
485 PRINTVERBOSE1(
486 "MessageQ_create: can't find connection to daemon for pid %d\n",
487 getpid())
489 return NULL;
490 }
492 cmd.cmd = LAD_MESSAGEQ_CREATE;
493 cmd.clientId = handle;
495 if (name == NULL) {
496 cmd.args.messageQCreate.name[0] = '\0';
497 }
498 else {
499 strncpy(cmd.args.messageQCreate.name, name,
500 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
501 cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
502 }
504 memcpy(&cmd.args.messageQCreate.params, &ps, sizeof(ps));
506 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
507 PRINTVERBOSE1(
508 "MessageQ_create: sending LAD command failed, status=%d\n", status)
509 return NULL;
510 }
512 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
513 PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
514 return NULL;
515 }
516 status = rsp.messageQCreate.status;
518 PRINTVERBOSE2(
519 "MessageQ_create: got LAD response for client %d, status=%d\n",
520 handle, status)
522 if (status == -1) {
523 PRINTVERBOSE1(
524 "MessageQ_create: MessageQ server operation failed, status=%d\n",
525 status)
526 return NULL;
527 }
529 /* Create the generic obj */
530 obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
532 /* Populate the params member */
533 memcpy(&obj->params, &ps, sizeof(ps));
535 queueIndex = (MessageQ_QueueIndex)(rsp.messageQCreate.queueId & 0x0000ffff);
537 obj->queue = rsp.messageQCreate.queueId;
538 obj->serverHandle = rsp.messageQCreate.serverHandle;
539 CIRCLEQ_INIT(&obj->msgList);
540 if (sem_init(&obj->synchronizer, 0, 0) < 0) {
541 PRINTVERBOSE1(
542 "MessageQ_create: failed to create synchronizer (errno %d)\n", errno)
544 MessageQ_delete((MessageQ_Handle *)&obj);
546 return NULL;
547 }
549 PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' queueIndex %d\n", name, queueIndex)
551 for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
552 for (priority = 0; priority < 2; priority++) {
553 transport = MessageQ_module->transports[rprocId][priority];
554 if (transport) {
555 /* need to check return and do something if error */
556 IMessageQTransport_bind((Void *)transport, obj->queue);
557 }
558 }
559 }
560 for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
561 transInst = MessageQ_module->transInst[tid];
562 if (transInst) {
563 /* need to check return and do something if error */
564 INetworkTransport_bind((Void *)transInst, obj->queue);
565 }
566 }
568 /*
569 * Since LAD's MessageQ_module can grow, we need to be able to grow as well
570 */
571 if (queueIndex >= MessageQ_module->numQueues) {
572 _MessageQ_grow(queueIndex);
573 }
575 /*
576 * No need to "allocate" slot since the queueIndex returned by
577 * LAD is guaranteed to be unique.
578 */
579 MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
581 return (MessageQ_Handle)obj;
582 }
584 /*
585 * MessageQ_delete - delete a MessageQ object.
586 */
587 Int MessageQ_delete(MessageQ_Handle *handlePtr)
588 {
589 MessageQ_Object *obj;
590 IMessageQTransport_Handle transport;
591 INetworkTransport_Handle transInst;
592 Int status = MessageQ_S_SUCCESS;
593 UInt16 queueIndex;
594 UInt16 rprocId;
595 Int tid;
596 Int priority;
597 LAD_ClientHandle handle;
598 struct LAD_CommandObj cmd;
599 union LAD_ResponseObj rsp;
601 handle = LAD_findHandle();
602 if (handle == LAD_MAXNUMCLIENTS) {
603 PRINTVERBOSE1(
604 "MessageQ_delete: can't find connection to daemon for pid %d\n",
605 getpid())
607 return MessageQ_E_FAIL;
608 }
610 obj = (MessageQ_Object *)(*handlePtr);
612 cmd.cmd = LAD_MESSAGEQ_DELETE;
613 cmd.clientId = handle;
614 cmd.args.messageQDelete.serverHandle = obj->serverHandle;
616 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
617 PRINTVERBOSE1(
618 "MessageQ_delete: sending LAD command failed, status=%d\n", status)
619 return MessageQ_E_FAIL;
620 }
622 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
623 PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
624 return MessageQ_E_FAIL;
625 }
626 status = rsp.messageQDelete.status;
628 PRINTVERBOSE2(
629 "MessageQ_delete: got LAD response for client %d, status=%d\n",
630 handle, status)
632 for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
633 for (priority = 0; priority < 2; priority++) {
634 transport = MessageQ_module->transports[rprocId][priority];
635 if (transport) {
636 IMessageQTransport_unbind((Void *)transport, obj->queue);
637 }
638 }
639 }
640 for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
641 transInst = MessageQ_module->transInst[tid];
642 if (transInst) {
643 INetworkTransport_unbind((Void *)transInst, obj->queue);
644 }
645 }
647 queueIndex = (MessageQ_QueueIndex)(obj->queue & 0x0000ffff);
648 MessageQ_module->queues[queueIndex] = NULL;
650 free(obj);
651 *handlePtr = NULL;
653 return status;
654 }
656 /*
657 * MessageQ_open - Opens an instance of MessageQ for sending.
658 */
659 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
660 {
661 Int status = MessageQ_S_SUCCESS;
663 status = NameServer_getUInt32(MessageQ_module->nameServer,
664 name, queueId, NULL);
666 if (status == NameServer_E_NOTFOUND) {
667 /* Set return queue ID to invalid */
668 *queueId = MessageQ_INVALIDMESSAGEQ;
669 status = MessageQ_E_NOTFOUND;
670 }
671 else if (status >= 0) {
672 /* Override with a MessageQ status code */
673 status = MessageQ_S_SUCCESS;
674 }
675 else {
676 /* Set return queue ID to invalid */
677 *queueId = MessageQ_INVALIDMESSAGEQ;
679 /* Override with a MessageQ status code */
680 if (status == NameServer_E_TIMEOUT) {
681 status = MessageQ_E_TIMEOUT;
682 }
683 else {
684 status = MessageQ_E_FAIL;
685 }
686 }
688 return status;
689 }
691 /*
692 * MessageQ_close - Closes previously opened instance of MessageQ.
693 */
694 Int MessageQ_close(MessageQ_QueueId *queueId)
695 {
696 Int32 status = MessageQ_S_SUCCESS;
698 /* Nothing more to be done for closing the MessageQ. */
699 *queueId = MessageQ_INVALIDMESSAGEQ;
701 return status;
702 }
704 /*
705 * MessageQ_put - place a message onto a message queue.
706 *
707 * Calls transport's put(), which handles the sending of the message using the
708 * appropriate kernel interface (socket, device ioctl) call for the remote
709 * procId encoded in the queueId argument.
710 *
711 */
712 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
713 {
714 MessageQ_Object *obj;
715 UInt16 dstProcId = (UInt16)(queueId >> 16);
716 UInt16 queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
717 Int status = MessageQ_S_SUCCESS;
718 ITransport_Handle transport;
719 IMessageQTransport_Handle msgTrans;
720 INetworkTransport_Handle netTrans;
721 Int priority;
722 UInt tid;
724 msg->dstId = queueIndex;
725 msg->dstProc = dstProcId;
727 /* invoke put hook function after addressing the message */
728 if (MessageQ_module->putHookFxn != NULL) {
729 MessageQ_module->putHookFxn(queueId, msg);
730 }
732 if (dstProcId != MultiProc_self()) {
733 tid = MessageQ_getTransportId(msg);
734 if (tid == 0) {
735 priority = MessageQ_getMsgPri(msg);
736 msgTrans = MessageQ_module->transports[dstProcId][priority];
738 IMessageQTransport_put(msgTrans, (Ptr)msg);
739 }
740 else {
741 if (tid >= MessageQ_MAXTRANSPORTS) {
742 printf("MessageQ_put: transport id %d too big, must be < %d\n",
743 tid, MessageQ_MAXTRANSPORTS);
745 return MessageQ_E_FAIL;
746 }
748 /* use secondary transport */
749 netTrans = MessageQ_module->transInst[tid];
750 transport = INetworkTransport_upCast(netTrans);
752 /* downcast instance pointer to transport interface */
753 switch (ITransport_itype(transport)) {
754 case INetworkTransport_TypeId:
755 INetworkTransport_put(netTrans, (Ptr)msg);
757 break;
759 default:
760 /* error */
761 printf("MessageQ_put: transport id %d is an unsupported transport type\n", tid);
763 status = MessageQ_E_FAIL;
765 break;
766 }
767 }
768 }
769 else {
770 obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
772 pthread_mutex_lock(&MessageQ_module->gate);
774 /* It is a local MessageQ */
775 CIRCLEQ_INSERT_TAIL(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
777 pthread_mutex_unlock(&MessageQ_module->gate);
779 sem_post(&obj->synchronizer);
780 }
782 return status;
783 }
785 /*
786 * MessageQ_get - gets a message for a message queue and blocks if
787 * the queue is empty.
788 *
789 * If a message is present, it returns it. Otherwise it blocks
790 * waiting for a message to arrive.
791 * When a message is returned, it is owned by the caller.
792 */
793 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
794 {
795 MessageQ_Object * obj = (MessageQ_Object *)handle;
796 Int status = MessageQ_S_SUCCESS;
797 struct timespec ts;
798 struct timeval tv;
800 #if 0
801 /*
802 * Optimization here to get a message without going in to the sem
803 * operation, but the sem count will not be maintained properly.
804 */
805 pthread_mutex_lock(&MessageQ_module->gate);
807 if (obj->msgList.cqh_first != &obj->msgList) {
808 *msg = (MessageQ_Msg)obj->msglist.cqh_first;
809 CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
811 pthread_mutex_unlock(&MessageQ_module->gate);
812 }
813 else {
814 pthread_mutex_unlock(&MessageQ_module->gate);
815 }
816 #endif
818 if (timeout == MessageQ_FOREVER) {
819 sem_wait(&obj->synchronizer);
820 }
821 else {
822 gettimeofday(&tv, NULL);
823 ts.tv_sec = tv.tv_sec;
824 ts.tv_nsec = (tv.tv_usec + timeout) * 1000;
826 if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
827 if (errno == ETIMEDOUT) {
828 PRINTVERBOSE0("MessageQ_get: operation timed out\n")
830 return MessageQ_E_TIMEOUT;
831 }
832 }
833 }
835 if (obj->unblocked) {
836 return MessageQ_E_UNBLOCKED;
837 }
839 pthread_mutex_lock(&MessageQ_module->gate);
841 *msg = (MessageQ_Msg)obj->msgList.cqh_first;
842 CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
844 pthread_mutex_unlock(&MessageQ_module->gate);
846 return status;
847 }
849 /*
850 * Return a count of the number of messages in the queue
851 *
852 * TBD: Implement as a socket ioctl, using getsockopt(). Return -1 for now.
853 */
854 Int MessageQ_count(MessageQ_Handle handle)
855 {
856 Int count = -1;
857 #if 0
858 MessageQ_Object * obj = (MessageQ_Object *) handle;
859 socklen_t optlen;
861 /*
862 * TBD: Need to find a way to implement (if anyone uses it!), and
863 * push down into transport..
864 */
866 /*
867 * 2nd arg to getsockopt should be transport independent, but using
868 * SSKPROTO_SHMFIFO for now:
869 */
870 getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
871 &count, &optlen);
872 #endif
874 return count;
875 }
877 /*
878 * Initializes a message not obtained from MessageQ_alloc.
879 */
880 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
881 {
882 /* Fill in the fields of the message */
883 MessageQ_msgInit(msg);
884 msg->heapId = MessageQ_STATICMSG;
885 msg->msgSize = size;
886 }
888 /*
889 * Allocate a message and initialize the needed fields (note some
890 * of the fields in the header are set via other APIs or in the
891 * MessageQ_put function,
892 */
893 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
894 {
895 MessageQ_Msg msg;
897 /*
898 * heapId not used for local alloc (as this is over a copy transport), but
899 * we need to send to other side as heapId is used in BIOS transport.
900 */
901 msg = (MessageQ_Msg)calloc(1, size);
902 MessageQ_msgInit(msg);
903 msg->msgSize = size;
904 msg->heapId = heapId;
906 return msg;
907 }
909 /*
910 * Frees the message back to the heap that was used to allocate it.
911 */
912 Int MessageQ_free(MessageQ_Msg msg)
913 {
914 UInt32 status = MessageQ_S_SUCCESS;
916 /* Check to ensure this was not allocated by user: */
917 if (msg->heapId == MessageQ_STATICMSG) {
918 status = MessageQ_E_CANNOTFREESTATICMSG;
919 }
920 else {
921 free(msg);
922 }
924 return status;
925 }
927 /* Register a heap with MessageQ. */
928 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
929 {
930 Int status = MessageQ_S_SUCCESS;
932 /* Do nothing, as this uses a copy transport */
934 return status;
935 }
937 /* Unregister a heap with MessageQ. */
938 Int MessageQ_unregisterHeap(UInt16 heapId)
939 {
940 Int status = MessageQ_S_SUCCESS;
942 /* Do nothing, as this uses a copy transport */
944 return status;
945 }
947 /* Unblocks a MessageQ */
948 Void MessageQ_unblock(MessageQ_Handle handle)
949 {
950 MessageQ_Object *obj = (MessageQ_Object *)handle;
952 obj->unblocked = TRUE;
953 sem_post(&obj->synchronizer);
954 }
956 /* Embeds a source message queue into a message */
957 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
958 {
959 MessageQ_Object *obj = (MessageQ_Object *)handle;
961 msg->replyId = (UInt16)(obj->queue);
962 msg->replyProc = (UInt16)(obj->queue >> 16);
963 }
965 /* Returns the QueueId associated with the handle. */
966 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
967 {
968 MessageQ_Object *obj = (MessageQ_Object *) handle;
969 UInt32 queueId;
971 queueId = (obj->queue);
973 return queueId;
974 }
976 /* Sets the tracing of a message */
977 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
978 {
979 msg->flags = (msg->flags & ~TRACEMASK) | (traceFlag << TRACESHIFT);
980 }
982 /*
983 * Returns the amount of shared memory used by one transport instance.
984 *
985 * The MessageQ module itself does not use any shared memory but the
986 * underlying transport may use some shared memory.
987 */
988 SizeT MessageQ_sharedMemReq(Ptr sharedAddr)
989 {
990 SizeT memReq = 0u;
992 /* Do nothing, as this is a copy transport. */
994 return memReq;
995 }
997 /*
998 * This is a helper function to initialize a message.
999 */
1000 Void MessageQ_msgInit(MessageQ_Msg msg)
1001 {
1002 #if 0
1003 Int status = MessageQ_S_SUCCESS;
1004 LAD_ClientHandle handle;
1005 struct LAD_CommandObj cmd;
1006 union LAD_ResponseObj rsp;
1008 handle = LAD_findHandle();
1009 if (handle == LAD_MAXNUMCLIENTS) {
1010 PRINTVERBOSE1(
1011 "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
1012 getpid())
1014 return;
1015 }
1017 cmd.cmd = LAD_MESSAGEQ_MSGINIT;
1018 cmd.clientId = handle;
1020 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
1021 PRINTVERBOSE1(
1022 "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
1023 return;
1024 }
1026 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
1027 PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
1028 return;
1029 }
1030 status = rsp.msgInit.status;
1032 PRINTVERBOSE2(
1033 "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
1034 handle, status)
1036 memcpy(msg, &rsp.msgInit.msg, sizeof (*msg));
1037 #else
1038 msg->reserved0 = 0; /* We set this to distinguish from NameServerMsg */
1039 msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
1040 msg->msgId = MessageQ_INVALIDMSGID;
1041 msg->dstId = (UInt16)MessageQ_INVALIDMESSAGEQ;
1042 msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
1043 msg->srcProc = MultiProc_self();
1045 pthread_mutex_lock(&MessageQ_module->gate);
1046 msg->seqNum = MessageQ_module->seqNum++;
1047 pthread_mutex_unlock(&MessageQ_module->gate);
1048 #endif
1049 }
1051 /*
1052 * Grow module's queues[] array to accommodate queueIndex from LAD
1053 */
1054 Void _MessageQ_grow(UInt16 queueIndex)
1055 {
1056 MessageQ_Handle *queues;
1057 MessageQ_Handle *oldQueues;
1058 UInt oldSize;
1060 oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
1062 queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof (MessageQ_Handle));
1063 memcpy(queues, MessageQ_module->queues, oldSize);
1065 oldQueues = MessageQ_module->queues;
1066 MessageQ_module->queues = queues;
1067 MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
1069 free(oldQueues);
1071 return;
1072 }