2fc61e32a10808d1d848d85e94eb52eb008f6928
1 /*
2 * Copyright (c) 2012-2014, Texas Instruments Incorporated
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*
33 * @file MessageQ.c
34 *
35 * @brief MessageQ Linux implementation
36 *
37 * This implementation is geared for use in a "client/server" model, whereby
38 * system-wide data is maintained in a "server" component and process-
39 * specific data is handled here. At the moment, this implementation
40 * connects and communicates with LAD for the server connection.
41 */
43 /* Standard IPC header */
44 #include <ti/ipc/Std.h>
46 /* Module level headers */
47 #include <ti/ipc/NameServer.h>
48 #include <ti/ipc/MultiProc.h>
49 #include <_MultiProc.h>
50 #include <ti/ipc/MessageQ.h>
51 #include <_MessageQ.h>
52 #include <ITransport.h>
53 #include <IMessageQTransport.h>
54 #include <INetworkTransport.h>
56 /* Socket Headers */
57 #include <sys/select.h>
58 #include <sys/time.h>
59 #include <sys/types.h>
60 #include <sys/param.h>
61 #include <sys/eventfd.h>
62 #include <sys/queue.h>
63 #include <errno.h>
64 #include <stdio.h>
65 #include <string.h>
66 #include <stdlib.h>
67 #include <unistd.h>
68 #include <assert.h>
69 #include <pthread.h>
70 #include <semaphore.h>
72 /* Socket Protocol Family */
73 #include <net/rpmsg.h>
75 #include <ladclient.h>
76 #include <_lad.h>
78 /* =============================================================================
79 * Macros/Constants
80 * =============================================================================
81 */
83 /*!
84 * @brief Name of the reserved NameServer used for MessageQ.
85 */
86 #define MessageQ_NAMESERVER "MessageQ"
88 #define MessageQ_MAXTRANSPORTS 8
90 #define MessageQ_GROWSIZE 32
92 /* Trace flag settings: */
93 #define TRACESHIFT 12
94 #define TRACEMASK 0x1000
96 /* Define BENCHMARK to quiet key MessageQ APIs: */
97 //#define BENCHMARK
99 /* =============================================================================
100 * Structures & Enums
101 * =============================================================================
102 */
104 /* structure for MessageQ module state */
105 typedef struct MessageQ_ModuleObject {
106 MessageQ_Handle *queues;
107 Int numQueues;
108 Int refCount;
109 NameServer_Handle nameServer;
110 pthread_mutex_t gate;
111 MessageQ_Params defaultInstParams;
112 int seqNum;
113 IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
114 INetworkTransport_Handle transInst[MessageQ_MAXTRANSPORTS];
115 MessageQ_PutHookFxn putHookFxn;
116 } MessageQ_ModuleObject;
/*
 * CIRCLEQ linkage. MessageQ_put()/MessageQ_get() cast a message pointer to
 * this type in order to chain it on a queue's msgList.
 */
typedef struct MessageQ_CIRCLEQ_ENTRY {
    CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem;
} MessageQ_CIRCLEQ_ENTRY;
122 /*!
123 * @brief Structure for the Handle for the MessageQ.
124 */
125 typedef struct MessageQ_Object_tag {
126 CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
127 MessageQ_Params params;
128 MessageQ_QueueId queue;
129 int unblocked;
130 void *serverHandle;
131 sem_t synchronizer;
132 } MessageQ_Object;
134 /* traces in this file are controlled via _MessageQ_verbose */
135 Bool _MessageQ_verbose = FALSE;
136 #define verbose _MessageQ_verbose
138 /* =============================================================================
139 * Globals
140 * =============================================================================
141 */
142 static MessageQ_ModuleObject MessageQ_state =
143 {
144 .refCount = 0,
145 .nameServer = NULL,
146 .putHookFxn = NULL
147 };
149 /*!
150 * @var MessageQ_module
151 *
152 * @brief Pointer to the MessageQ module state.
153 */
154 MessageQ_ModuleObject *MessageQ_module = &MessageQ_state;
156 Void _MessageQ_grow(UInt16 queueIndex);
158 /* =============================================================================
159 * APIS
160 * =============================================================================
161 */
163 Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
164 UInt16 rprocId, UInt priority)
165 {
166 Int status = FALSE;
168 if (handle == NULL) {
169 printf("MessageQ_registerTransport: invalid handle, must be non-NULL\n"
170 );
172 return status;
173 }
175 if (rprocId >= MultiProc_MAXPROCESSORS) {
176 printf("MessageQ_registerTransport: invalid procId %d\n", rprocId);
178 return status;
179 }
181 if (MessageQ_module->transports[rprocId][priority] == NULL) {
182 MessageQ_module->transports[rprocId][priority] = handle;
184 status = TRUE;
185 }
187 return status;
188 }
190 Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
191 {
192 if (inst == NULL) {
193 printf("MessageQ_registerTransportId: invalid NULL handle\n");
195 return MessageQ_E_INVALIDARG;
196 }
198 if (tid >= MessageQ_MAXTRANSPORTS) {
199 printf("MessageQ_unregisterNetTransport: invalid transport id %d, must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
201 return MessageQ_E_INVALIDARG;
202 }
204 if (MessageQ_module->transInst[tid] != NULL) {
205 printf("MessageQ_registerTransportId: transport id %d already registered\n", tid);
207 return MessageQ_E_ALREADYEXISTS;
208 }
210 MessageQ_module->transInst[tid] = (INetworkTransport_Handle)inst;
212 return MessageQ_S_SUCCESS;
213 }
215 Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
216 {
217 if (rprocId >= MultiProc_MAXPROCESSORS) {
218 printf("MessageQ_registerTransport: invalid rprocId %d\n", rprocId);
220 return;
221 }
223 MessageQ_module->transports[rprocId][priority] = NULL;
224 }
226 Void MessageQ_unregisterTransportId(UInt tid)
227 {
228 if (tid >= MessageQ_MAXTRANSPORTS) {
229 printf("MessageQ_unregisterTransportId: invalid transport id %d, must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
231 return;
232 }
234 MessageQ_module->transInst[tid] = NULL;
235 }
237 /*
238 * Function to get default configuration for the MessageQ module.
239 */
240 Void MessageQ_getConfig(MessageQ_Config *cfg)
241 {
242 Int status;
243 LAD_ClientHandle handle;
244 struct LAD_CommandObj cmd;
245 union LAD_ResponseObj rsp;
247 assert (cfg != NULL);
249 handle = LAD_findHandle();
250 if (handle == LAD_MAXNUMCLIENTS) {
251 PRINTVERBOSE1(
252 "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
253 getpid())
255 return;
256 }
258 cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
259 cmd.clientId = handle;
261 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
262 PRINTVERBOSE1(
263 "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
264 return;
265 }
267 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
268 PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n",
269 status)
270 return;
271 }
272 status = rsp.messageQGetConfig.status;
274 PRINTVERBOSE2(
275 "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
276 handle, status)
278 memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg));
280 return;
281 }
283 /*
284 * Function to setup the MessageQ module.
285 */
286 Int MessageQ_setup(const MessageQ_Config *cfg)
287 {
288 Int status;
289 LAD_ClientHandle handle;
290 struct LAD_CommandObj cmd;
291 union LAD_ResponseObj rsp;
292 Int pri;
293 Int rprocId;
294 Int tid;
296 pthread_mutex_lock(&MessageQ_module->gate);
298 MessageQ_module->refCount++;
299 if (MessageQ_module->refCount > 1) {
301 pthread_mutex_unlock(&MessageQ_module->gate);
303 PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
304 MessageQ_module->refCount)
306 return MessageQ_S_ALREADYSETUP;
307 }
309 pthread_mutex_unlock(&MessageQ_module->gate);
311 handle = LAD_findHandle();
312 if (handle == LAD_MAXNUMCLIENTS) {
313 PRINTVERBOSE1(
314 "MessageQ_setup: can't find connection to daemon for pid %d\n",
315 getpid())
317 return MessageQ_E_RESOURCE;
318 }
320 cmd.cmd = LAD_MESSAGEQ_SETUP;
321 cmd.clientId = handle;
322 memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof (*cfg));
324 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
325 PRINTVERBOSE1(
326 "MessageQ_setup: sending LAD command failed, status=%d\n", status)
327 return MessageQ_E_FAIL;
328 }
330 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
331 PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
332 return status;
333 }
334 status = rsp.setup.status;
336 PRINTVERBOSE2(
337 "MessageQ_setup: got LAD response for client %d, status=%d\n",
338 handle, status)
340 MessageQ_module->seqNum = 0;
341 MessageQ_module->nameServer = rsp.setup.nameServerHandle;
342 MessageQ_module->numQueues = cfg->maxRuntimeEntries;
343 MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
344 sizeof (MessageQ_Handle));
346 pthread_mutex_init(&MessageQ_module->gate, NULL);
348 for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
349 for (pri = 0; pri < 2; pri++) {
350 MessageQ_module->transports[rprocId][pri] = NULL;
351 }
352 }
353 for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
354 MessageQ_module->transInst[tid] = NULL;
355 }
357 return status;
358 }
360 /*
361 * MessageQ_destroy - destroy the MessageQ module.
362 */
363 Int MessageQ_destroy(void)
364 {
365 Int status;
366 LAD_ClientHandle handle;
367 struct LAD_CommandObj cmd;
368 union LAD_ResponseObj rsp;
370 handle = LAD_findHandle();
371 if (handle == LAD_MAXNUMCLIENTS) {
372 PRINTVERBOSE1(
373 "MessageQ_destroy: can't find connection to daemon for pid %d\n",
374 getpid())
376 return MessageQ_E_RESOURCE;
377 }
379 cmd.cmd = LAD_MESSAGEQ_DESTROY;
380 cmd.clientId = handle;
382 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
383 PRINTVERBOSE1(
384 "MessageQ_destroy: sending LAD command failed, status=%d\n", status)
385 return MessageQ_E_FAIL;
386 }
388 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
389 PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
390 return status;
391 }
392 status = rsp.status;
394 PRINTVERBOSE2(
395 "MessageQ_destroy: got LAD response for client %d, status=%d\n",
396 handle, status)
398 return status;
399 }
402 /* Function to initialize the parameters for the MessageQ instance. */
403 Void MessageQ_Params_init(MessageQ_Params *params)
404 {
405 memcpy (params, &(MessageQ_module->defaultInstParams),
406 sizeof (MessageQ_Params));
408 return;
409 }
411 /*
412 * MessageQ_create - create a MessageQ object for receiving.
413 */
414 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *params)
415 {
416 Int status;
417 MessageQ_Object *obj = NULL;
418 IMessageQTransport_Handle transport;
419 INetworkTransport_Handle transInst;
420 UInt16 queueIndex;
421 UInt16 rprocId;
422 Int tid;
423 Int priority;
424 LAD_ClientHandle handle;
425 struct LAD_CommandObj cmd;
426 union LAD_ResponseObj rsp;
428 handle = LAD_findHandle();
429 if (handle == LAD_MAXNUMCLIENTS) {
430 PRINTVERBOSE1(
431 "MessageQ_create: can't find connection to daemon for pid %d\n",
432 getpid())
434 return NULL;
435 }
437 cmd.cmd = LAD_MESSAGEQ_CREATE;
438 cmd.clientId = handle;
439 if (name == NULL) {
440 cmd.args.messageQCreate.name[0] = '\0';
441 }
442 else {
443 strncpy(cmd.args.messageQCreate.name, name,
444 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
445 cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
446 }
448 if (params) {
449 memcpy(&cmd.args.messageQCreate.params, params, sizeof (*params));
450 }
452 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
453 PRINTVERBOSE1(
454 "MessageQ_create: sending LAD command failed, status=%d\n", status)
455 return NULL;
456 }
458 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
459 PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
460 return NULL;
461 }
462 status = rsp.messageQCreate.status;
464 PRINTVERBOSE2(
465 "MessageQ_create: got LAD response for client %d, status=%d\n",
466 handle, status)
468 if (status == -1) {
469 PRINTVERBOSE1(
470 "MessageQ_create: MessageQ server operation failed, status=%d\n",
471 status)
472 return NULL;
473 }
475 /* Create the generic obj */
476 obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
478 if (params != NULL) {
479 /* Populate the params member */
480 memcpy((Ptr) &obj->params, (Ptr)params, sizeof (MessageQ_Params));
481 }
483 queueIndex = (MessageQ_QueueIndex)(rsp.messageQCreate.queueId & 0x0000ffff);
485 obj->queue = rsp.messageQCreate.queueId;
486 obj->serverHandle = rsp.messageQCreate.serverHandle;
487 CIRCLEQ_INIT(&obj->msgList);
488 if (sem_init(&obj->synchronizer, 0, 0) < 0) {
489 PRINTVERBOSE1(
490 "MessageQ_create: failed to create synchronizer (errno %d)\n", errno)
492 MessageQ_delete((MessageQ_Handle *)&obj);
494 return NULL;
495 }
497 PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' queueIndex %d\n", name, queueIndex)
499 for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
500 for (priority = 0; priority < 2; priority++) {
501 transport = MessageQ_module->transports[rprocId][priority];
502 if (transport) {
503 /* need to check return and do something if error */
504 IMessageQTransport_bind((Void *)transport, obj->queue);
505 }
506 }
507 }
508 for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
509 transInst = MessageQ_module->transInst[tid];
510 if (transInst) {
511 /* need to check return and do something if error */
512 INetworkTransport_bind((Void *)transInst, obj->queue);
513 }
514 }
516 /*
517 * Since LAD's MessageQ_module can grow, we need to be able to grow as well
518 */
519 if (queueIndex >= MessageQ_module->numQueues) {
520 _MessageQ_grow(queueIndex);
521 }
523 /*
524 * No need to "allocate" slot since the queueIndex returned by
525 * LAD is guaranteed to be unique.
526 */
527 MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
529 return (MessageQ_Handle)obj;
530 }
532 /*
533 * MessageQ_delete - delete a MessageQ object.
534 */
535 Int MessageQ_delete(MessageQ_Handle *handlePtr)
536 {
537 MessageQ_Object *obj;
538 IMessageQTransport_Handle transport;
539 INetworkTransport_Handle transInst;
540 Int status = MessageQ_S_SUCCESS;
541 UInt16 queueIndex;
542 UInt16 rprocId;
543 Int tid;
544 Int priority;
545 LAD_ClientHandle handle;
546 struct LAD_CommandObj cmd;
547 union LAD_ResponseObj rsp;
549 handle = LAD_findHandle();
550 if (handle == LAD_MAXNUMCLIENTS) {
551 PRINTVERBOSE1(
552 "MessageQ_delete: can't find connection to daemon for pid %d\n",
553 getpid())
555 return MessageQ_E_FAIL;
556 }
558 obj = (MessageQ_Object *)(*handlePtr);
560 cmd.cmd = LAD_MESSAGEQ_DELETE;
561 cmd.clientId = handle;
562 cmd.args.messageQDelete.serverHandle = obj->serverHandle;
564 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
565 PRINTVERBOSE1(
566 "MessageQ_delete: sending LAD command failed, status=%d\n", status)
567 return MessageQ_E_FAIL;
568 }
570 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
571 PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
572 return MessageQ_E_FAIL;
573 }
574 status = rsp.messageQDelete.status;
576 PRINTVERBOSE2(
577 "MessageQ_delete: got LAD response for client %d, status=%d\n",
578 handle, status)
580 for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
581 for (priority = 0; priority < 2; priority++) {
582 transport = MessageQ_module->transports[rprocId][priority];
583 if (transport) {
584 IMessageQTransport_unbind((Void *)transport, obj->queue);
585 }
586 }
587 }
588 for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
589 transInst = MessageQ_module->transInst[tid];
590 if (transInst) {
591 INetworkTransport_unbind((Void *)transInst, obj->queue);
592 }
593 }
595 queueIndex = (MessageQ_QueueIndex)(obj->queue & 0x0000ffff);
596 MessageQ_module->queues[queueIndex] = NULL;
598 free(obj);
599 *handlePtr = NULL;
601 return status;
602 }
604 /*
605 * MessageQ_open - Opens an instance of MessageQ for sending.
606 */
607 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
608 {
609 Int status = MessageQ_S_SUCCESS;
611 status = NameServer_getUInt32(MessageQ_module->nameServer,
612 name, queueId, NULL);
614 if (status == NameServer_E_NOTFOUND) {
615 /* Set return queue ID to invalid */
616 *queueId = MessageQ_INVALIDMESSAGEQ;
617 status = MessageQ_E_NOTFOUND;
618 }
619 else if (status >= 0) {
620 /* Override with a MessageQ status code */
621 status = MessageQ_S_SUCCESS;
622 }
623 else {
624 /* Set return queue ID to invalid */
625 *queueId = MessageQ_INVALIDMESSAGEQ;
627 /* Override with a MessageQ status code */
628 if (status == NameServer_E_TIMEOUT) {
629 status = MessageQ_E_TIMEOUT;
630 }
631 else {
632 status = MessageQ_E_FAIL;
633 }
634 }
636 return status;
637 }
639 /*
640 * MessageQ_close - Closes previously opened instance of MessageQ.
641 */
642 Int MessageQ_close(MessageQ_QueueId *queueId)
643 {
644 Int32 status = MessageQ_S_SUCCESS;
646 /* Nothing more to be done for closing the MessageQ. */
647 *queueId = MessageQ_INVALIDMESSAGEQ;
649 return status;
650 }
652 /*
653 * MessageQ_put - place a message onto a message queue.
654 *
655 * Calls transport's put(), which handles the sending of the message using the
656 * appropriate kernel interface (socket, device ioctl) call for the remote
657 * procId encoded in the queueId argument.
658 *
659 */
660 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
661 {
662 MessageQ_Object *obj;
663 UInt16 dstProcId = (UInt16)(queueId >> 16);
664 UInt16 queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
665 Int status = MessageQ_S_SUCCESS;
666 ITransport_Handle transport;
667 IMessageQTransport_Handle msgTrans;
668 INetworkTransport_Handle netTrans;
669 Int priority;
670 UInt tid;
672 msg->dstId = queueIndex;
673 msg->dstProc = dstProcId;
675 /* invoke put hook function after addressing the message */
676 if (MessageQ_module->putHookFxn != NULL) {
677 MessageQ_module->putHookFxn(queueId, msg);
678 }
680 if (dstProcId != MultiProc_self()) {
681 tid = MessageQ_getTransportId(msg);
682 if (tid == 0) {
683 priority = MessageQ_getMsgPri(msg);
684 msgTrans = MessageQ_module->transports[dstProcId][priority];
686 IMessageQTransport_put(msgTrans, (Ptr)msg);
687 }
688 else {
689 if (tid >= MessageQ_MAXTRANSPORTS) {
690 printf("MessageQ_put: transport id %d too big, must be < %d\n",
691 tid, MessageQ_MAXTRANSPORTS);
693 return MessageQ_E_FAIL;
694 }
696 /* use secondary transport */
697 netTrans = MessageQ_module->transInst[tid];
698 transport = INetworkTransport_upCast(netTrans);
700 /* downcast instance pointer to transport interface */
701 switch (ITransport_itype(transport)) {
702 case INetworkTransport_TypeId:
703 INetworkTransport_put(netTrans, (Ptr)msg);
705 break;
707 default:
708 /* error */
709 printf("MessageQ_put: transport id %d is an unsupported transport type\n", tid);
711 status = MessageQ_E_FAIL;
713 break;
714 }
715 }
716 }
717 else {
718 obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
720 pthread_mutex_lock(&MessageQ_module->gate);
722 /* It is a local MessageQ */
723 CIRCLEQ_INSERT_TAIL(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
725 pthread_mutex_unlock(&MessageQ_module->gate);
727 sem_post(&obj->synchronizer);
728 }
730 return status;
731 }
733 /*
734 * MessageQ_get - gets a message for a message queue and blocks if
735 * the queue is empty.
736 *
737 * If a message is present, it returns it. Otherwise it blocks
738 * waiting for a message to arrive.
739 * When a message is returned, it is owned by the caller.
740 */
741 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
742 {
743 MessageQ_Object * obj = (MessageQ_Object *)handle;
744 Int status = MessageQ_S_SUCCESS;
745 struct timespec ts;
746 struct timeval tv;
748 #if 0
749 /*
750 * Optimization here to get a message without going in to the sem
751 * operation, but the sem count will not be maintained properly.
752 */
753 pthread_mutex_lock(&MessageQ_module->gate);
755 if (obj->msgList.cqh_first != &obj->msgList) {
756 *msg = (MessageQ_Msg)obj->msglist.cqh_first;
757 CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
759 pthread_mutex_unlock(&MessageQ_module->gate);
760 }
761 else {
762 pthread_mutex_unlock(&MessageQ_module->gate);
763 }
764 #endif
766 if (timeout == MessageQ_FOREVER) {
767 sem_wait(&obj->synchronizer);
768 }
769 else {
770 gettimeofday(&tv, NULL);
771 ts.tv_sec = tv.tv_sec;
772 ts.tv_nsec = (tv.tv_usec + timeout) * 1000;
774 if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
775 if (errno == ETIMEDOUT) {
776 PRINTVERBOSE0("MessageQ_get: operation timed out\n")
778 return MessageQ_E_TIMEOUT;
779 }
780 }
781 }
783 if (obj->unblocked) {
784 return MessageQ_E_UNBLOCKED;
785 }
787 pthread_mutex_lock(&MessageQ_module->gate);
789 *msg = (MessageQ_Msg)obj->msgList.cqh_first;
790 CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
792 pthread_mutex_unlock(&MessageQ_module->gate);
794 return status;
795 }
797 /*
798 * Return a count of the number of messages in the queue
799 *
800 * TBD: Implement as a socket ioctl, using getsockopt(). Return -1 for now.
801 */
802 Int MessageQ_count(MessageQ_Handle handle)
803 {
804 Int count = -1;
805 #if 0
806 MessageQ_Object * obj = (MessageQ_Object *) handle;
807 socklen_t optlen;
809 /*
810 * TBD: Need to find a way to implement (if anyone uses it!), and
811 * push down into transport..
812 */
814 /*
815 * 2nd arg to getsockopt should be transport independent, but using
816 * SSKPROTO_SHMFIFO for now:
817 */
818 getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
819 &count, &optlen);
820 #endif
822 return count;
823 }
825 /*
826 * Initializes a message not obtained from MessageQ_alloc.
827 */
828 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
829 {
830 /* Fill in the fields of the message */
831 MessageQ_msgInit(msg);
832 msg->heapId = MessageQ_STATICMSG;
833 msg->msgSize = size;
834 }
836 /*
837 * Allocate a message and initialize the needed fields (note some
838 * of the fields in the header are set via other APIs or in the
839 * MessageQ_put function,
840 */
841 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
842 {
843 MessageQ_Msg msg;
845 /*
846 * heapId not used for local alloc (as this is over a copy transport), but
847 * we need to send to other side as heapId is used in BIOS transport.
848 */
849 msg = (MessageQ_Msg)calloc(1, size);
850 MessageQ_msgInit(msg);
851 msg->msgSize = size;
852 msg->heapId = heapId;
854 return msg;
855 }
857 /*
858 * Frees the message back to the heap that was used to allocate it.
859 */
860 Int MessageQ_free(MessageQ_Msg msg)
861 {
862 UInt32 status = MessageQ_S_SUCCESS;
864 /* Check to ensure this was not allocated by user: */
865 if (msg->heapId == MessageQ_STATICMSG) {
866 status = MessageQ_E_CANNOTFREESTATICMSG;
867 }
868 else {
869 free(msg);
870 }
872 return status;
873 }
875 /* Register a heap with MessageQ. */
876 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
877 {
878 Int status = MessageQ_S_SUCCESS;
880 /* Do nothing, as this uses a copy transport */
882 return status;
883 }
885 /* Unregister a heap with MessageQ. */
886 Int MessageQ_unregisterHeap(UInt16 heapId)
887 {
888 Int status = MessageQ_S_SUCCESS;
890 /* Do nothing, as this uses a copy transport */
892 return status;
893 }
895 /* Unblocks a MessageQ */
896 Void MessageQ_unblock(MessageQ_Handle handle)
897 {
898 MessageQ_Object *obj = (MessageQ_Object *)handle;
900 obj->unblocked = TRUE;
901 sem_post(&obj->synchronizer);
902 }
904 /* Embeds a source message queue into a message */
905 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
906 {
907 MessageQ_Object *obj = (MessageQ_Object *)handle;
909 msg->replyId = (UInt16)(obj->queue);
910 msg->replyProc = (UInt16)(obj->queue >> 16);
911 }
913 /* Returns the QueueId associated with the handle. */
914 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
915 {
916 MessageQ_Object *obj = (MessageQ_Object *) handle;
917 UInt32 queueId;
919 queueId = (obj->queue);
921 return queueId;
922 }
924 /* Sets the tracing of a message */
925 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
926 {
927 msg->flags = (msg->flags & ~TRACEMASK) | (traceFlag << TRACESHIFT);
928 }
930 /*
931 * Returns the amount of shared memory used by one transport instance.
932 *
933 * The MessageQ module itself does not use any shared memory but the
934 * underlying transport may use some shared memory.
935 */
936 SizeT MessageQ_sharedMemReq(Ptr sharedAddr)
937 {
938 SizeT memReq = 0u;
940 /* Do nothing, as this is a copy transport. */
942 return memReq;
943 }
945 /*
946 * This is a helper function to initialize a message.
947 */
948 Void MessageQ_msgInit(MessageQ_Msg msg)
949 {
950 #if 0
951 Int status = MessageQ_S_SUCCESS;
952 LAD_ClientHandle handle;
953 struct LAD_CommandObj cmd;
954 union LAD_ResponseObj rsp;
956 handle = LAD_findHandle();
957 if (handle == LAD_MAXNUMCLIENTS) {
958 PRINTVERBOSE1(
959 "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
960 getpid())
962 return;
963 }
965 cmd.cmd = LAD_MESSAGEQ_MSGINIT;
966 cmd.clientId = handle;
968 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
969 PRINTVERBOSE1(
970 "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
971 return;
972 }
974 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
975 PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
976 return;
977 }
978 status = rsp.msgInit.status;
980 PRINTVERBOSE2(
981 "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
982 handle, status)
984 memcpy(msg, &rsp.msgInit.msg, sizeof (*msg));
985 #else
986 msg->reserved0 = 0; /* We set this to distinguish from NameServerMsg */
987 msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
988 msg->msgId = MessageQ_INVALIDMSGID;
989 msg->dstId = (UInt16)MessageQ_INVALIDMESSAGEQ;
990 msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
991 msg->srcProc = MultiProc_self();
993 pthread_mutex_lock(&MessageQ_module->gate);
994 msg->seqNum = MessageQ_module->seqNum++;
995 pthread_mutex_unlock(&MessageQ_module->gate);
996 #endif
997 }
999 /*
1000 * Grow module's queues[] array to accommodate queueIndex from LAD
1001 */
1002 Void _MessageQ_grow(UInt16 queueIndex)
1003 {
1004 MessageQ_Handle *queues;
1005 MessageQ_Handle *oldQueues;
1006 UInt oldSize;
1008 oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
1010 queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof (MessageQ_Handle));
1011 memcpy(queues, MessageQ_module->queues, oldSize);
1013 oldQueues = MessageQ_module->queues;
1014 MessageQ_module->queues = queues;
1015 MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
1017 free(oldQueues);
1019 return;
1020 }