1 /*
2 * Copyright (c) 2012-2014, Texas Instruments Incorporated
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*
33 * ======== MessageQ.c ========
34 * Implementation of functions specified in MessageQ.xdc.
35 */
37 #include <xdc/std.h>
39 #include <string.h>
41 #include <xdc/runtime/Error.h>
42 #include <xdc/runtime/Assert.h>
43 #include <xdc/runtime/Log.h>
44 #include <xdc/runtime/Memory.h>
45 #include <xdc/runtime/IHeap.h>
46 #include <xdc/runtime/IGateProvider.h>
47 #include <xdc/runtime/Startup.h>
49 #include <xdc/runtime/knl/ISync.h>
50 #include <xdc/runtime/knl/GateThread.h>
52 #include <ti/sysbios/hal/Hwi.h>
53 #include <ti/sysbios/syncs/SyncSem.h>
55 #include <ti/sdo/ipc/interfaces/IMessageQTransport.h>
56 #include <ti/sdo/utils/List.h>
58 /* must be included after the internal header file for now */
59 #include <ti/sdo/ipc/_MessageQ.h>
60 #include <ti/sdo/utils/_MultiProc.h>
61 #include <ti/sdo/utils/_NameServer.h>
63 #include "package/internal/MessageQ.xdc.h"
65 #ifdef __ti__
66 #pragma FUNC_EXT_CALLED(MessageQ_Params_init);
67 #pragma FUNC_EXT_CALLED(MessageQ_Params2_init);
68 #pragma FUNC_EXT_CALLED(MessageQ_alloc);
69 #pragma FUNC_EXT_CALLED(MessageQ_close);
70 #pragma FUNC_EXT_CALLED(MessageQ_count);
71 #pragma FUNC_EXT_CALLED(MessageQ_create);
72 #pragma FUNC_EXT_CALLED(MessageQ_create2);
73 #pragma FUNC_EXT_CALLED(MessageQ_delete);
74 #pragma FUNC_EXT_CALLED(MessageQ_open);
75 #pragma FUNC_EXT_CALLED(MessageQ_openQueueId);
76 #pragma FUNC_EXT_CALLED(MessageQ_free);
77 #pragma FUNC_EXT_CALLED(MessageQ_get);
78 #pragma FUNC_EXT_CALLED(MessageQ_getQueueId);
79 #pragma FUNC_EXT_CALLED(MessageQ_put);
80 #pragma FUNC_EXT_CALLED(MessageQ_registerHeap);
81 #pragma FUNC_EXT_CALLED(MessageQ_setFreeHookFxn);
82 #pragma FUNC_EXT_CALLED(MessageQ_setReplyQueue);
83 #pragma FUNC_EXT_CALLED(MessageQ_setMsgTrace);
84 #pragma FUNC_EXT_CALLED(MessageQ_staticMsgInit);
85 #pragma FUNC_EXT_CALLED(MessageQ_unblock);
86 #pragma FUNC_EXT_CALLED(MessageQ_unregisterHeap);
87 #endif
89 /*
90 * ======== MessageQ_msgInit ========
91 * This is a helper function to initialize a message.
92 */
93 static Void MessageQ_msgInit(MessageQ_Msg msg)
94 {
95 UInt key;
97 msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
98 msg->msgId = MessageQ_INVALIDMSGID;
99 msg->dstId = (UInt16)MessageQ_INVALIDMESSAGEQ;
100 msg->flags = ti_sdo_ipc_MessageQ_HEADERVERSION |
101 MessageQ_NORMALPRI |
102 (ti_sdo_ipc_MessageQ_TRACEMASK &
103 (ti_sdo_ipc_MessageQ_traceFlag << ti_sdo_ipc_MessageQ_TRACESHIFT));
104 msg->srcProc = MultiProc_self();
106 key = Hwi_disable();
107 msg->seqNum = MessageQ_module->seqNum++;
108 Hwi_restore(key);
109 }
111 /*
112 *************************************************************************
113 * Common Header Functions
114 *************************************************************************
115 */
117 /*
118 * ======== MessageQ_Params_init ========
119 */
120 Void MessageQ_Params_init(MessageQ_Params *params)
121 {
122 params->synchronizer = NULL;
123 }
125 /*
126 * ======== MessageQ_Params2_init ========
127 */
128 Void MessageQ_Params2_init(MessageQ_Params2 *params)
129 {
130 params->synchronizer = NULL;
131 params->queueIndex = MessageQ_ANY;
132 }
134 /*
135 * ======== MessageQ_alloc ========
136 * Allocate a message and initialize the needed fields
137 *
138 * Note: some of the fields in the header are set via other
139 * APIs or in the MessageQ_put function.
140 */
141 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
142 {
143 MessageQ_Msg msg;
144 Error_Block eb;
146 Assert_isTrue((heapId < MessageQ_module->numHeaps),
147 ti_sdo_ipc_MessageQ_A_heapIdInvalid);
149 Assert_isTrue((MessageQ_module->heaps[heapId] != NULL),
150 ti_sdo_ipc_MessageQ_A_heapIdInvalid);
152 /* Allocate the message. No alignment requested */
153 Error_init(&eb);
154 msg = Memory_alloc(MessageQ_module->heaps[heapId], size, 0, &eb);
156 if (msg == NULL) {
157 return (NULL);
158 }
160 /* Fill in the fields of the message */
161 MessageQ_msgInit(msg);
162 msg->msgSize = size;
163 msg->heapId = heapId;
165 if (ti_sdo_ipc_MessageQ_traceFlag == TRUE) {
166 Log_write3(ti_sdo_ipc_MessageQ_LM_alloc, (UArg)(msg),
167 (UArg)(msg->seqNum), (UArg)(msg->srcProc));
168 }
170 return (msg);
171 }
173 /*
174 * ======== MessageQ_count ========
175 */
176 Int MessageQ_count(MessageQ_Handle handle)
177 {
178 Int count = 0;
179 UInt key;
180 List_Elem *tempMsg = NULL;
181 List_Handle listHandle;
182 ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
184 /* lock */
185 key = Hwi_disable();
187 /* Get the list */
188 listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
190 /* Loop through and count the messages */
191 while ((tempMsg = List_next(listHandle, tempMsg)) != NULL) {
192 count++;
193 }
195 listHandle = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
197 /* Loop through and count the messages */
198 while ((tempMsg = List_next(listHandle, tempMsg)) != NULL) {
199 count++;
200 }
202 /* unlock scheduler */
203 Hwi_restore(key);
205 return (count);
206 }
208 /*
209 * ======== MessageQ_close ========
210 */
211 Int MessageQ_close(MessageQ_QueueId *queueId)
212 {
213 *queueId = MessageQ_INVALIDMESSAGEQ;
215 return (MessageQ_S_SUCCESS);
216 }
218 /*
219 * ======== MessageQ_create ========
220 */
221 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *params)
222 {
223 MessageQ_Handle handle;
224 MessageQ_Params2 params2;
226 MessageQ_Params2_init(¶ms2);
228 /* Use the MessageQ_Params fields if not NULL */
229 if (params != NULL) {
230 params2.synchronizer = params->synchronizer;
231 }
233 handle = MessageQ_create2(name, ¶ms2);
235 return ((MessageQ_Handle)handle);
236 }
238 /*
239 * ======== MessageQ_create2 ========
240 */
241 MessageQ_Handle MessageQ_create2(String name, const MessageQ_Params2 *params)
242 {
243 ti_sdo_ipc_MessageQ_Handle handle;
244 ti_sdo_ipc_MessageQ_Params prms;
245 Error_Block eb;
247 Error_init(&eb);
249 if (params != NULL) {
250 ti_sdo_ipc_MessageQ_Params_init(&prms);
252 prms.synchronizer = params->synchronizer;
253 prms.queueIndex = params->queueIndex;
255 handle = ti_sdo_ipc_MessageQ_create(name, &prms, &eb);
256 }
257 else {
258 handle = ti_sdo_ipc_MessageQ_create(name, NULL, &eb);
259 }
261 return ((MessageQ_Handle)handle);
262 }
264 /*
265 * ======== MessageQ_delete ========
266 */
267 Int MessageQ_delete(MessageQ_Handle *handlePtr)
268 {
269 ti_sdo_ipc_MessageQ_Handle *instp;
271 instp = (ti_sdo_ipc_MessageQ_Handle *)handlePtr;
273 ti_sdo_ipc_MessageQ_delete(instp);
275 return (MessageQ_S_SUCCESS);
276 }
278 /*
279 * ======== MessageQ_free ========
280 */
281 Int MessageQ_free(MessageQ_Msg msg)
282 {
283 IHeap_Handle heap;
284 Bits16 msgId;
285 Bits16 heapId;
287 /* make sure msg is not NULL */
288 Assert_isTrue((msg != NULL), ti_sdo_ipc_MessageQ_A_invalidMsg);
290 /* Cannot free a message that was initialized via MessageQ_staticMsgInit */
291 Assert_isTrue((msg->heapId != ti_sdo_ipc_MessageQ_STATICMSG),
292 ti_sdo_ipc_MessageQ_A_cannotFreeStaticMsg);
294 Assert_isTrue((msg->heapId < MessageQ_module->numHeaps),
295 ti_sdo_ipc_MessageQ_A_heapIdInvalid);
297 Assert_isTrue((MessageQ_module->heaps[msg->heapId] != NULL),
298 ti_sdo_ipc_MessageQ_A_heapIdInvalid);
300 if ((ti_sdo_ipc_MessageQ_traceFlag == TRUE) ||
301 (msg->flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0) {
302 Log_write3(ti_sdo_ipc_MessageQ_LM_free, (UArg)(msg),
303 (UArg)(msg->seqNum), (UArg)(msg->srcProc));
304 }
306 heap = MessageQ_module->heaps[msg->heapId];
308 if (heap != NULL) {
309 msgId = MessageQ_getMsgId(msg);
310 heapId = msg->heapId;
311 Memory_free(heap, msg, msg->msgSize);
312 if (MessageQ_module->freeHookFxn != NULL) {
313 MessageQ_module->freeHookFxn(heapId, msgId);
314 }
315 }
316 else {
317 return (MessageQ_E_FAIL);
318 }
320 return (MessageQ_S_SUCCESS);
321 }
323 /*
324 * ======== MessageQ_get ========
325 */
326 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
327 {
328 Int status;
329 List_Handle highList;
330 List_Handle normalList;
331 ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
333 /* Get the list */
334 normalList = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
335 highList = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
337 /* Keep looping while there are no elements on either list */
338 *msg = (MessageQ_Msg)List_get(highList);
339 while (*msg == NULL) {
340 *msg = (MessageQ_Msg)List_get(normalList);
341 if (*msg == NULL) {
342 /* Block until notified. */
343 status = ISync_wait(obj->synchronizer, timeout, NULL);
344 if (status == ISync_WaitStatus_TIMEOUT) {
345 return (MessageQ_E_TIMEOUT);
346 }
347 else if (status < 0) {
348 return (MessageQ_E_FAIL);
349 }
351 if (obj->unblocked) {
352 /* *(msg) may be NULL */
353 return (MessageQ_E_UNBLOCKED);
354 }
356 *msg = (MessageQ_Msg)List_get(highList);
357 }
358 }
360 if ((ti_sdo_ipc_MessageQ_traceFlag == TRUE) ||
361 (((*msg)->flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0)) {
362 Log_write4(ti_sdo_ipc_MessageQ_LM_get, (UArg)(*msg),
363 (UArg)((*msg)->seqNum), (UArg)((*msg)->srcProc), (UArg)(obj));
364 }
366 return (MessageQ_S_SUCCESS);
367 }
369 /*
370 * ======== MessageQ_getQueueId ========
371 */
372 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
373 {
374 MessageQ_QueueId queueId;
375 ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
377 queueId = (obj->queue);
379 return (queueId);
380 }
382 /*
383 * ======== MessageQ_open ========
384 */
385 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
386 {
387 Int status;
388 Error_Block eb;
390 Assert_isTrue(name != NULL, ti_sdo_ipc_MessageQ_A_invalidParam);
391 Assert_isTrue(queueId != NULL, ti_sdo_ipc_MessageQ_A_invalidParam);
393 Error_init(&eb);
395 /* Search NameServer */
396 status = NameServer_getUInt32(
397 (NameServer_Handle)MessageQ_module->nameServer, name, queueId,
398 NULL);
400 if (status >= 0) {
401 return (MessageQ_S_SUCCESS); /* name found */
402 }
403 else {
404 return (MessageQ_E_NOTFOUND); /* name not found */
405 }
406 }
408 /*
409 * ======== MessageQ_openQueueId ========
410 */
411 MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 remoteProcId)
412 {
413 MessageQ_QueueId queueId;
415 queueId = ((MessageQ_QueueId)(remoteProcId) << 16) | queueIndex;
417 return (queueId);
418 }
420 /*
421 * ======== MessageQ_put ========
422 */
423 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
424 {
425 IMessageQTransport_Handle transport;
426 MessageQ_QueueIndex dstProcId = (MessageQ_QueueIndex)(queueId >> 16);
427 List_Handle listHandle;
428 Int status;
429 UInt priority;
430 #ifndef xdc_runtime_Log_DISABLE_ALL
431 UInt16 flags;
432 UInt16 seqNum;
433 UInt16 srcProc;
434 #endif
435 ti_sdo_ipc_MessageQ_Object *obj;
437 Assert_isTrue((msg != NULL), ti_sdo_ipc_MessageQ_A_invalidMsg);
439 msg->dstId = (UInt16)(queueId);
440 msg->dstProc = (UInt16)(queueId >> 16);
442 if (dstProcId != MultiProc_self()) {
443 /* assert that dstProcId is valid */
444 Assert_isTrue(dstProcId < ti_sdo_utils_MultiProc_numProcessors,
445 ti_sdo_ipc_MessageQ_A_procIdInvalid);
447 /* Put the high and urgent messages to the high priority transport */
448 priority = (UInt)((msg->flags) &
449 ti_sdo_ipc_MessageQ_TRANSPORTPRIORITYMASK);
451 /* Call the transport associated with this message queue */
452 transport = MessageQ_module->transports[dstProcId][priority];
453 if (transport == NULL) {
454 /* Try the other transport */
455 priority = !priority;
456 transport = MessageQ_module->transports[dstProcId][priority];
457 }
459 /* assert transport is not null */
460 Assert_isTrue(transport != NULL,
461 ti_sdo_ipc_MessageQ_A_unregisteredTransport);
463 #ifndef xdc_runtime_Log_DISABLE_ALL
464 /* use local vars so msg does not get cached after put */
465 flags = msg->flags;
467 if ((ti_sdo_ipc_MessageQ_traceFlag) ||
468 (flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0) {
469 /* use local vars so msg does not get cached after put */
470 seqNum = msg->seqNum;
471 srcProc = msg->srcProc;
472 }
473 #endif
475 /* put msg to remote processor using transport */
476 if (IMessageQTransport_put(transport, msg)) {
477 status = MessageQ_S_SUCCESS;
479 #ifndef xdc_runtime_Log_DISABLE_ALL
480 /* if trace enabled */
481 if ((ti_sdo_ipc_MessageQ_traceFlag) ||
482 (flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0) {
483 Log_write4(ti_sdo_ipc_MessageQ_LM_putRemote, (UArg)(msg),
484 (UArg)(seqNum), (UArg)(srcProc),
485 (UArg)(dstProcId));
486 }
487 #endif
488 }
489 else {
490 status = MessageQ_E_FAIL;
491 }
492 }
493 else {
494 /* Assert queueId is valid */
495 Assert_isTrue((UInt16)queueId < MessageQ_module->numQueues,
496 ti_sdo_ipc_MessageQ_A_invalidQueueId);
498 /* It is a local MessageQ */
499 obj = MessageQ_module->queues[(UInt16)(queueId)];
501 /* Assert object is not NULL */
502 Assert_isTrue(obj != NULL, ti_sdo_ipc_MessageQ_A_invalidObj);
504 if ((msg->flags & MessageQ_PRIORITYMASK) == MessageQ_URGENTPRI) {
505 listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
506 List_putHead(listHandle, (List_Elem *)msg);
507 }
508 else {
509 if ((msg->flags & MessageQ_PRIORITYMASK) == MessageQ_NORMALPRI) {
510 listHandle = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
511 }
512 else {
513 listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
514 }
515 /* put on the queue */
516 List_put(listHandle, (List_Elem *)msg);
517 }
519 ISync_signal(obj->synchronizer);
521 status = MessageQ_S_SUCCESS;
523 if ((ti_sdo_ipc_MessageQ_traceFlag) ||
524 (msg->flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0) {
525 Log_write4(ti_sdo_ipc_MessageQ_LM_putLocal, (UArg)(msg),
526 (UArg)(msg->seqNum), (UArg)(msg->srcProc), (UArg)(obj));
527 }
528 }
530 return (status);
531 }
533 /*
534 * ======== MessageQ_registerHeap ========
535 * Register a heap
536 */
537 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
538 {
539 Int status;
540 UInt key;
541 IHeap_Handle iheap = (IHeap_Handle)heap;
543 /* Make sure the heapId is valid */
544 Assert_isTrue((heapId < MessageQ_module->numHeaps),
545 ti_sdo_ipc_MessageQ_A_heapIdInvalid);
547 /* lock scheduler */
548 key = Hwi_disable();
550 /* Make sure the id is not already in use */
551 if (MessageQ_module->heaps[heapId] == NULL) {
552 MessageQ_module->heaps[heapId] = iheap;
553 status = MessageQ_S_SUCCESS;
554 }
555 else {
556 status = MessageQ_E_ALREADYEXISTS;
557 }
559 /* unlock scheduler */
560 Hwi_restore(key);
562 return (status);
563 }
565 /*
566 * ======== MessageQ_setFreeHookFxn ========
567 */
568 Void MessageQ_setFreeHookFxn(MessageQ_FreeHookFxn freeHookFxn)
569 {
570 MessageQ_module->freeHookFxn = freeHookFxn;
571 }
573 /*
574 * ======== MessageQ_setMsgTrace ========
575 */
576 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
577 {
578 msg->flags = (msg->flags & ~ti_sdo_ipc_MessageQ_TRACEMASK) |
579 (traceFlag << ti_sdo_ipc_MessageQ_TRACESHIFT);
580 Log_write4(ti_sdo_ipc_MessageQ_LM_setTrace, (UArg)(msg), (UArg)(msg->seqNum),
581 (UArg)(msg->srcProc), (UArg)(traceFlag));
582 }
584 /*
585 * ======== MessageQ_setReplyQueue ========
586 * Embed the source message queue into a message.
587 */
588 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
589 {
590 ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
592 msg->replyId = (UInt16)(obj->queue);
593 msg->replyProc = (UInt16)(obj->queue >> 16);
594 }
596 /*
597 * ======== MessageQ_staticMsgInit ========
598 */
599 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
600 {
601 Assert_isTrue((msg != NULL), ti_sdo_ipc_MessageQ_A_invalidMsg);
603 MessageQ_msgInit(msg);
605 msg->heapId = ti_sdo_ipc_MessageQ_STATICMSG;
606 msg->msgSize = size;
608 if (ti_sdo_ipc_MessageQ_traceFlag == TRUE) {
609 Log_write3(ti_sdo_ipc_MessageQ_LM_staticMsgInit, (UArg)(msg),
610 (UArg)(msg->seqNum), (UArg)(msg->srcProc));
611 }
612 }
614 /*
615 * ======== MessageQ_unblock ========
616 */
617 Void MessageQ_unblock(MessageQ_Handle handle)
618 {
619 ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
621 /* Assert that the queue is using a blocking synchronizer */
622 Assert_isTrue(ISync_query((obj->synchronizer), ISync_Q_BLOCKING) == TRUE,
623 ti_sdo_ipc_MessageQ_A_invalidUnblock);
625 /* Set instance to 'unblocked' state */
626 obj->unblocked = TRUE;
628 /* Signal the synchronizer */
629 ISync_signal(obj->synchronizer);
630 }
632 /*
633 * ======== MessageQ_unregisterHeap ========
634 * Unregister a heap
635 */
636 Int MessageQ_unregisterHeap(UInt16 heapId)
637 {
638 UInt key;
640 Assert_isTrue((heapId < MessageQ_module->numHeaps),
641 ti_sdo_ipc_MessageQ_A_heapIdInvalid);
643 /* lock scheduler */
644 key = Hwi_disable();
646 MessageQ_module->heaps[heapId] = NULL;
648 /* unlock scheduler */
649 Hwi_restore(key);
651 return (MessageQ_S_SUCCESS);
652 }
654 /*
655 *************************************************************************
656 * Module functions
657 *************************************************************************
658 */
660 /*
661 * ======== ti_sdo_ipc_MessageQ_Module_startup ========
662 */
663 Int ti_sdo_ipc_MessageQ_Module_startup(Int phase)
664 {
665 Int i;
667 /* Ensure NameServer Module_startup() has completed */
668 if (ti_sdo_utils_NameServer_Module_startupDone() == FALSE) {
669 return (Startup_NOTDONE);
670 }
672 if (GateThread_Module_startupDone() == FALSE) {
673 return (Startup_NOTDONE);
674 }
676 if (MessageQ_module->gate == NULL) {
677 MessageQ_module->gate =
678 GateThread_Handle_upCast(GateThread_create(NULL, NULL));
679 }
681 /* Loop through all the static objects and set the id */
682 for (i = 0; i < ti_sdo_ipc_MessageQ_Object_count(); i++) {
683 MessageQ_module->queues[i] = (ti_sdo_ipc_MessageQ_Object *)
684 ti_sdo_ipc_MessageQ_Object_get(NULL, i);
685 }
687 /* Null out the dynamic ones */
688 for (i = ti_sdo_ipc_MessageQ_Object_count(); i < MessageQ_module->numQueues;
689 i++) {
690 MessageQ_module->queues[i] = NULL;
691 }
693 return (Startup_DONE);
694 }
696 /*
697 * ======== ti_sdo_ipc_MessageQ_registerTransport ========
698 * Register a transport
699 */
700 Bool ti_sdo_ipc_MessageQ_registerTransport(IMessageQTransport_Handle handle,
701 UInt16 procId, UInt priority)
702 {
703 Bool flag = FALSE;
704 UInt key;
706 /* Make sure the procId is valid */
707 Assert_isTrue(procId < ti_sdo_utils_MultiProc_numProcessors, ti_sdo_ipc_MessageQ_A_procIdInvalid);
709 /* lock scheduler */
710 key = Hwi_disable();
712 /* Make sure the id is not already in use */
713 if (MessageQ_module->transports[procId][priority] == NULL) {
714 MessageQ_module->transports[procId][priority] = handle;
715 flag = TRUE;
716 }
718 /* unlock scheduler */
719 Hwi_restore(key);
721 return (flag);
722 }
724 /*
725 * ======== ti_sdo_ipc_MessageQ_unregisterTransport ========
726 * Unregister a heap
727 */
728 Void ti_sdo_ipc_MessageQ_unregisterTransport(UInt16 procId, UInt priority)
729 {
730 UInt key;
732 /* Make sure the procId is valid */
733 Assert_isTrue(procId < ti_sdo_utils_MultiProc_numProcessors, ti_sdo_ipc_MessageQ_A_procIdInvalid);
735 /* lock scheduler */
736 key = Hwi_disable();
738 MessageQ_module->transports[procId][priority] = NULL;
740 /* unlock scheduler */
741 Hwi_restore(key);
742 }
744 /*
745 *************************************************************************
746 * Instance functions
747 *************************************************************************
748 */
750 /*
751 * ======== MessageQ_Instance_init ========
752 */
753 Int ti_sdo_ipc_MessageQ_Instance_init(ti_sdo_ipc_MessageQ_Object *obj, String name,
754 const ti_sdo_ipc_MessageQ_Params *params, Error_Block *eb)
755 {
756 Int i;
757 UInt16 start;
758 UInt16 count;
759 UInt key;
760 Bool found = FALSE;
761 List_Handle listHandle;
762 SyncSem_Handle syncSemHandle;
763 MessageQ_QueueIndex queueIndex;
765 /* lock */
766 key = IGateProvider_enter(MessageQ_module->gate);
768 if (params->queueIndex != MessageQ_ANY) {
769 queueIndex = params->queueIndex;
771 if ((queueIndex >= ti_sdo_ipc_MessageQ_numReservedEntries) ||
772 (MessageQ_module->queues[queueIndex] != NULL)) {
773 IGateProvider_leave(MessageQ_module->gate, key);
774 Error_raise(eb, ti_sdo_ipc_MessageQ_E_indexNotAvailable,
775 queueIndex, 0);
776 return (5);
777 }
778 MessageQ_module->queues[queueIndex] = obj;
779 found = TRUE;
780 }
781 else {
783 start = ti_sdo_ipc_MessageQ_numReservedEntries;
784 count = MessageQ_module->numQueues;
786 /* Search the dynamic array for any holes */
787 for (i = start; (i < count) && (found == FALSE); i++) {
788 if (MessageQ_module->queues[i] == NULL) {
789 MessageQ_module->queues[i] = obj;
790 queueIndex = i;
791 found = TRUE;
792 }
793 }
794 }
796 /*
797 * If no free slot was found:
798 * - if no growth allowed, raise and error
799 * - if growth is allowed, grow the array
800 */
801 if (found == FALSE) {
802 if (ti_sdo_ipc_MessageQ_maxRuntimeEntries != NameServer_ALLOWGROWTH) {
803 /* unlock scheduler */
804 IGateProvider_leave(MessageQ_module->gate, key);
806 Error_raise(eb, ti_sdo_ipc_MessageQ_E_maxReached,
807 ti_sdo_ipc_MessageQ_maxRuntimeEntries, 0);
808 return (1);
809 }
810 else {
811 queueIndex = MessageQ_grow(obj, eb);
812 if (queueIndex == MessageQ_INVALIDMESSAGEQ) {
813 /* unlock scheduler */
814 IGateProvider_leave(MessageQ_module->gate, key);
815 return (2);
816 }
817 }
818 }
820 /* create default sync if not specified */
821 if (params->synchronizer == NULL) {
822 /* Create a SyncSem as the synchronizer */
823 syncSemHandle = SyncSem_create(NULL, eb);
825 if (syncSemHandle == NULL) {
826 /* unlock scheduler */
827 IGateProvider_leave(MessageQ_module->gate, key);
828 return (3);
829 }
831 /* store handle for use in finalize ... */
832 obj->syncSemHandle = syncSemHandle;
834 obj->synchronizer = SyncSem_Handle_upCast(syncSemHandle);
835 }
836 else {
837 obj->syncSemHandle = NULL;
839 obj->synchronizer = params->synchronizer;
840 }
842 /* unlock scheduler */
843 IGateProvider_leave(MessageQ_module->gate, key);
845 /* Fill in the message queue object */
846 listHandle = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
847 List_construct(List_struct(listHandle), NULL);
849 listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
850 List_construct(List_struct(listHandle), NULL);
852 obj->queue = ((MessageQ_QueueId)(MultiProc_self()) << 16) | queueIndex;
854 obj->unblocked = FALSE;
856 /* Add into NameServer */
857 if (name != NULL) {
858 obj->nsKey = NameServer_addUInt32(
859 (NameServer_Handle)MessageQ_module->nameServer, name,
860 obj->queue);
862 if (obj->nsKey == NULL) {
863 Error_raise(eb, ti_sdo_ipc_MessageQ_E_nameFailed, name, 0);
864 return (4);
865 }
866 }
868 return (0);
869 }
871 /*
872 * ======== MessageQ_Instance_finalize ========
873 */
874 Void ti_sdo_ipc_MessageQ_Instance_finalize(
875 ti_sdo_ipc_MessageQ_Object* obj, Int status)
876 {
877 UInt key;
878 MessageQ_QueueIndex index = (MessageQ_QueueIndex)(obj->queue);
879 List_Handle listHandle;
881 /* Requested queueId was not available. Nothing was done in the init */
882 if (status == 5) {
883 return;
884 }
886 if (obj->syncSemHandle != NULL) {
887 SyncSem_delete(&obj->syncSemHandle);
888 }
890 listHandle = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
892 /* Destruct the list */
893 List_destruct(List_struct(listHandle));
895 listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
897 /* Destruct the list */
898 List_destruct(List_struct(listHandle));
900 /* lock */
901 key = IGateProvider_enter(MessageQ_module->gate);
903 /* Null out entry in the array. */
904 MessageQ_module->queues[index] = NULL;
906 /* unlock scheduler */
907 IGateProvider_leave(MessageQ_module->gate, key);
909 if (obj->nsKey != NULL) {
910 NameServer_removeEntry((NameServer_Handle)MessageQ_module->nameServer,
911 obj->nsKey);
912 }
913 }
915 /*
916 *************************************************************************
917 * Internal functions
918 *************************************************************************
919 */
921 /*
922 * ======== ti_sdo_ipc_MessageQ_grow ========
923 */
924 UInt16 ti_sdo_ipc_MessageQ_grow(ti_sdo_ipc_MessageQ_Object *obj,
925 Error_Block *eb)
926 {
927 UInt16 oldSize;
928 UInt16 queueIndex = MessageQ_module->numQueues;
929 ti_sdo_ipc_MessageQ_Handle *queues;
930 ti_sdo_ipc_MessageQ_Handle *oldQueues;
931 oldSize = (MessageQ_module->numQueues) * sizeof(MessageQ_Handle);
934 /* Allocate larger table */
935 queues = Memory_alloc(ti_sdo_ipc_MessageQ_Object_heap(),
936 oldSize + sizeof(MessageQ_Handle), 0, eb);
938 if (queues == NULL) {
939 return (MessageQ_INVALIDMESSAGEQ);
940 }
942 /* Copy contents into new table */
943 memcpy(queues, MessageQ_module->queues, oldSize);
945 /* Fill in the new entry */
946 queues[queueIndex] = obj;
948 /* Hook-up new table */
949 oldQueues = MessageQ_module->queues;
950 MessageQ_module->queues = queues;
951 MessageQ_module->numQueues++;
953 /* Delete old table if not statically defined */
954 if (MessageQ_module->canFreeQueues == TRUE) {
955 Memory_free(ti_sdo_ipc_MessageQ_Object_heap(), oldQueues, oldSize);
956 }
957 else {
958 MessageQ_module->canFreeQueues = TRUE;
959 }
961 return (queueIndex);
962 }