2e9422e43025730fb771ffd054d8303f0159ff9c
[ipc/ipcdev.git] / packages / ti / sdo / ipc / MessageQ.c
1 /*
2  * Copyright (c) 2012-2014, Texas Instruments Incorporated
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  ======== MessageQ.c ========
34  *  Implementation of functions specified in MessageQ.xdc.
35  */
37 #include <xdc/std.h>
39 #include <string.h>
41 #include <xdc/runtime/Error.h>
42 #include <xdc/runtime/Assert.h>
43 #include <xdc/runtime/Log.h>
44 #include <xdc/runtime/Memory.h>
45 #include <xdc/runtime/IHeap.h>
46 #include <xdc/runtime/IGateProvider.h>
47 #include <xdc/runtime/Startup.h>
49 #include <xdc/runtime/knl/ISync.h>
50 #include <xdc/runtime/knl/GateThread.h>
52 #include <ti/sysbios/hal/Hwi.h>
53 #include <ti/sysbios/syncs/SyncSem.h>
55 #include <ti/sdo/ipc/interfaces/ITransport.h>
56 #include <ti/sdo/ipc/interfaces/IMessageQTransport.h>
57 #include <ti/sdo/ipc/interfaces/INetworkTransport.h>
58 #include <ti/sdo/utils/List.h>
60 /* must be included after the internal header file for now */
61 #include <ti/sdo/ipc/_MessageQ.h>
62 #include <ti/sdo/utils/_MultiProc.h>
63 #include <ti/sdo/utils/_NameServer.h>
65 #include "package/internal/MessageQ.xdc.h"
67 #ifdef __ti__
68     #pragma FUNC_EXT_CALLED(MessageQ_Params_init);
69     #pragma FUNC_EXT_CALLED(MessageQ_Params2_init);
70     #pragma FUNC_EXT_CALLED(MessageQ_alloc);
71     #pragma FUNC_EXT_CALLED(MessageQ_close);
72     #pragma FUNC_EXT_CALLED(MessageQ_count);
73     #pragma FUNC_EXT_CALLED(MessageQ_create);
74     #pragma FUNC_EXT_CALLED(MessageQ_create2);
75     #pragma FUNC_EXT_CALLED(MessageQ_delete);
76     #pragma FUNC_EXT_CALLED(MessageQ_open);
77     #pragma FUNC_EXT_CALLED(MessageQ_openQueueId);
78     #pragma FUNC_EXT_CALLED(MessageQ_free);
79     #pragma FUNC_EXT_CALLED(MessageQ_get);
80     #pragma FUNC_EXT_CALLED(MessageQ_getQueueId);
81     #pragma FUNC_EXT_CALLED(MessageQ_put);
82     #pragma FUNC_EXT_CALLED(MessageQ_registerHeap);
83     #pragma FUNC_EXT_CALLED(MessageQ_setFreeHookFxn);
84     #pragma FUNC_EXT_CALLED(MessageQ_setReplyQueue);
85     #pragma FUNC_EXT_CALLED(MessageQ_setMsgTrace);
86     #pragma FUNC_EXT_CALLED(MessageQ_staticMsgInit);
87     #pragma FUNC_EXT_CALLED(MessageQ_unblock);
88     #pragma FUNC_EXT_CALLED(MessageQ_unregisterHeap);
89 #endif
91 /*
92  *  ======== MessageQ_msgInit ========
93  *  This is a helper function to initialize a message.
94  */
95 static Void MessageQ_msgInit(MessageQ_Msg msg)
96 {
97     UInt key;
99     msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
100     msg->msgId   = MessageQ_INVALIDMSGID;
101     msg->dstId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
102     msg->flags   = ti_sdo_ipc_MessageQ_HEADERVERSION |
103                    MessageQ_NORMALPRI |
104                    (ti_sdo_ipc_MessageQ_TRACEMASK &
105                    (ti_sdo_ipc_MessageQ_traceFlag << ti_sdo_ipc_MessageQ_TRACESHIFT));
106     msg->srcProc = MultiProc_self();
108     key = Hwi_disable();
109     msg->seqNum  = MessageQ_module->seqNum++;
110     Hwi_restore(key);
113 /*
114  *************************************************************************
115  *                       Common Header Functions
116  *************************************************************************
117  */
119 /*
120  *  ======== MessageQ_Params_init ========
121  */
122 Void MessageQ_Params_init(MessageQ_Params *params)
124     params->synchronizer = NULL;
127 /*
128  *  ======== MessageQ_Params2_init ========
129  */
130 Void MessageQ_Params2_init(MessageQ_Params2 *params)
132     params->synchronizer = NULL;
133     params->queueIndex = MessageQ_ANY;
136 /*
137  *  ======== MessageQ_alloc ========
138  *  Allocate a message and initialize the needed fields
139  *
140  *  Note: some of the fields in the header are set via other
141  *  APIs or in the MessageQ_put function.
142  */
143 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
145     MessageQ_Msg msg;
146     Error_Block eb;
148     Assert_isTrue((heapId < MessageQ_module->numHeaps),
149                   ti_sdo_ipc_MessageQ_A_heapIdInvalid);
151     Assert_isTrue((MessageQ_module->heaps[heapId] != NULL),
152                   ti_sdo_ipc_MessageQ_A_heapIdInvalid);
154     /* Allocate the message. No alignment requested */
155     Error_init(&eb);
156     msg = Memory_alloc(MessageQ_module->heaps[heapId], size, 0, &eb);
158     if (msg == NULL) {
159         return (NULL);
160     }
162     /* Fill in the fields of the message */
163     MessageQ_msgInit(msg);
164     msg->msgSize = size;
165     msg->heapId  = heapId;
167     if (ti_sdo_ipc_MessageQ_traceFlag == TRUE) {
168         Log_write3(ti_sdo_ipc_MessageQ_LM_alloc, (UArg)(msg),
169             (UArg)(msg->seqNum), (UArg)(msg->srcProc));
170     }
172     return (msg);
175 /*
176  *  ======== MessageQ_count ========
177  */
178 Int MessageQ_count(MessageQ_Handle handle)
180     Int              count = 0;
181     UInt             key;
182     List_Elem       *tempMsg = NULL;
183     List_Handle      listHandle;
184     ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
186     /* lock */
187     key = Hwi_disable();
189     /* Get the list */
190     listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
192     /* Loop through and count the messages */
193     while ((tempMsg = List_next(listHandle, tempMsg)) != NULL) {
194         count++;
195     }
197     listHandle = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
199     /* Loop through and count the messages */
200     while ((tempMsg = List_next(listHandle, tempMsg)) != NULL) {
201         count++;
202     }
204     /* unlock scheduler */
205     Hwi_restore(key);
207     return (count);
210 /*
211  *  ======== MessageQ_close ========
212  */
213 Int MessageQ_close(MessageQ_QueueId *queueId)
215     *queueId = MessageQ_INVALIDMESSAGEQ;
217     return (MessageQ_S_SUCCESS);
220 /*
221  *  ======== MessageQ_create ========
222  */
223 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *params)
225     MessageQ_Handle handle;
226     MessageQ_Params2 params2;
228     MessageQ_Params2_init(&params2);
230     /* Use the MessageQ_Params fields if not NULL */
231     if (params != NULL) {
232         params2.synchronizer = params->synchronizer;
233     }
235     handle = MessageQ_create2(name, &params2);
237     return ((MessageQ_Handle)handle);
240 /*
241  *  ======== MessageQ_create2 ========
242  */
243 MessageQ_Handle MessageQ_create2(String name, const MessageQ_Params2 *params)
245     ti_sdo_ipc_MessageQ_Handle handle;
246     ti_sdo_ipc_MessageQ_Params prms;
247     Error_Block eb;
249     Error_init(&eb);
251     if (params != NULL) {
252         ti_sdo_ipc_MessageQ_Params_init(&prms);
254         prms.synchronizer = params->synchronizer;
255         prms.queueIndex = params->queueIndex;
257         handle = ti_sdo_ipc_MessageQ_create(name, &prms, &eb);
258     }
259     else {
260         handle = ti_sdo_ipc_MessageQ_create(name, NULL, &eb);
261     }
263     return ((MessageQ_Handle)handle);
266 /*
267  *  ======== MessageQ_delete ========
268  */
269 Int MessageQ_delete(MessageQ_Handle *handlePtr)
271     ti_sdo_ipc_MessageQ_Handle *instp;
273     instp = (ti_sdo_ipc_MessageQ_Handle *)handlePtr;
275     ti_sdo_ipc_MessageQ_delete(instp);
277     return (MessageQ_S_SUCCESS);
280 /*
281  *  ======== MessageQ_free ========
282  */
283 Int MessageQ_free(MessageQ_Msg msg)
285     IHeap_Handle heap;
286     Bits16 msgId;
287     Bits16 heapId;
289     /* make sure msg is not NULL */
290     Assert_isTrue((msg != NULL), ti_sdo_ipc_MessageQ_A_invalidMsg);
292     /* Cannot free a message that was initialized via MessageQ_staticMsgInit */
293     Assert_isTrue((msg->heapId != ti_sdo_ipc_MessageQ_STATICMSG),
294                   ti_sdo_ipc_MessageQ_A_cannotFreeStaticMsg);
296     Assert_isTrue((msg->heapId < MessageQ_module->numHeaps),
297                   ti_sdo_ipc_MessageQ_A_heapIdInvalid);
299     Assert_isTrue((MessageQ_module->heaps[msg->heapId] != NULL),
300                   ti_sdo_ipc_MessageQ_A_heapIdInvalid);
302     if ((ti_sdo_ipc_MessageQ_traceFlag == TRUE) ||
303         (msg->flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0) {
304         Log_write3(ti_sdo_ipc_MessageQ_LM_free, (UArg)(msg),
305             (UArg)(msg->seqNum), (UArg)(msg->srcProc));
306     }
308     heap = MessageQ_module->heaps[msg->heapId];
310     if (heap != NULL) {
311         msgId = MessageQ_getMsgId(msg);
312         heapId = msg->heapId;
313         Memory_free(heap, msg, msg->msgSize);
314         if (MessageQ_module->freeHookFxn != NULL) {
315             MessageQ_module->freeHookFxn(heapId, msgId);
316         }
317     }
318     else {
319         return (MessageQ_E_FAIL);
320     }
322     return (MessageQ_S_SUCCESS);
325 /*
326  *  ======== MessageQ_get ========
327  */
328 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
330     Int status;
331     List_Handle highList;
332     List_Handle normalList;
333     ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
335     /* Get the list */
336     normalList = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
337     highList   = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
339     /* Keep looping while there are no elements on either list */
340     *msg = (MessageQ_Msg)List_get(highList);
341     while (*msg == NULL) {
342         *msg = (MessageQ_Msg)List_get(normalList);
343         if (*msg == NULL) {
344             /*  Block until notified. */
345             status = ISync_wait(obj->synchronizer, timeout, NULL);
346             if (status == ISync_WaitStatus_TIMEOUT) {
347                 return (MessageQ_E_TIMEOUT);
348             }
349             else if (status < 0) {
350                 return (MessageQ_E_FAIL);
351             }
353             if (obj->unblocked) {
354                 /* *(msg) may be NULL */
355                 return (MessageQ_E_UNBLOCKED);
356             }
358             *msg = (MessageQ_Msg)List_get(highList);
359         }
360     }
362     if ((ti_sdo_ipc_MessageQ_traceFlag == TRUE) ||
363         (((*msg)->flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0)) {
364         Log_write4(ti_sdo_ipc_MessageQ_LM_get, (UArg)(*msg),
365             (UArg)((*msg)->seqNum), (UArg)((*msg)->srcProc), (UArg)(obj));
366     }
368     return (MessageQ_S_SUCCESS);
371 /*
372  *  ======== MessageQ_getQueueId ========
373  */
374 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
376     MessageQ_QueueId queueId;
377     ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
379     queueId = (obj->queue);
381     return (queueId);
384 /*
385  *  ======== MessageQ_open ========
386  */
387 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
389     Int         status;
390     Error_Block eb;
392     Assert_isTrue(name != NULL, ti_sdo_ipc_MessageQ_A_invalidParam);
393     Assert_isTrue(queueId != NULL, ti_sdo_ipc_MessageQ_A_invalidParam);
395     Error_init(&eb);
397     /* Search NameServer */
398     status = NameServer_getUInt32(
399             (NameServer_Handle)MessageQ_module->nameServer, name, queueId,
400             NULL);
402     if (status >= 0) {
403         return (MessageQ_S_SUCCESS);    /* name found */
404     }
405     else {
406         return (MessageQ_E_NOTFOUND);   /* name not found */
407     }
410 /*
411  *  ======== MessageQ_openQueueId ========
412  */
413 MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 remoteProcId)
415     MessageQ_QueueId queueId;
417     queueId = ((MessageQ_QueueId)(remoteProcId) << 16) | queueIndex;
419     return (queueId);
422 /*
423  *  ======== MessageQ_put ========
424  */
425 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
427     IMessageQTransport_Handle transport;
428     MessageQ_QueueIndex dstProcId = (MessageQ_QueueIndex)(queueId >> 16);
429     List_Handle       listHandle;
430     Int               status;
431     UInt              priority;
432 #ifndef xdc_runtime_Log_DISABLE_ALL
433     UInt16            flags;
434     UInt16            seqNum;
435     UInt16            srcProc;
436 #endif
437     ti_sdo_ipc_MessageQ_Object   *obj;
438     Int tid;
439     ITransport_Handle baseTrans;
440     INetworkTransport_Handle netTrans;
442     Assert_isTrue((msg != NULL), ti_sdo_ipc_MessageQ_A_invalidMsg);
444     msg->dstId   = (UInt16)(queueId);
445     msg->dstProc = (UInt16)(queueId >> 16);
447     /* extract the transport ID from the message header */
448     tid = MessageQ_getTransportId(msg);
450     /* if recipient is local, use direct message delivery */
451     if (dstProcId == MultiProc_self()) {
452         /* Assert queueId is valid */
453         Assert_isTrue((UInt16)queueId < MessageQ_module->numQueues,
454                       ti_sdo_ipc_MessageQ_A_invalidQueueId);
456         /* It is a local MessageQ */
457         obj = MessageQ_module->queues[(UInt16)(queueId)];
459         /* Assert object is not NULL */
460         Assert_isTrue(obj != NULL, ti_sdo_ipc_MessageQ_A_invalidObj);
462         if ((msg->flags & MessageQ_PRIORITYMASK) == MessageQ_URGENTPRI) {
463             listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
464             List_putHead(listHandle, (List_Elem *)msg);
465         }
466         else {
467             if ((msg->flags & MessageQ_PRIORITYMASK) == MessageQ_NORMALPRI) {
468                 listHandle = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
469             }
470             else {
471                 listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
472             }
473             /* put on the queue */
474             List_put(listHandle, (List_Elem *)msg);
475         }
477         ISync_signal(obj->synchronizer);
479         status = MessageQ_S_SUCCESS;
481         if ((ti_sdo_ipc_MessageQ_traceFlag) ||
482             (msg->flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0) {
483             Log_write4(ti_sdo_ipc_MessageQ_LM_putLocal, (UArg)(msg),
484                        (UArg)(msg->seqNum), (UArg)(msg->srcProc), (UArg)(obj));
485         }
486     }
488     /* if transport ID is zero, use primary transport array */
489     else if (tid == 0) {
490         /* assert that dstProcId is valid */
491         Assert_isTrue(dstProcId < ti_sdo_utils_MultiProc_numProcessors,
492                       ti_sdo_ipc_MessageQ_A_procIdInvalid);
494         /* Put the high and urgent messages to the high priority transport */
495         priority = (UInt)((msg->flags) &
496             ti_sdo_ipc_MessageQ_TRANSPORTPRIORITYMASK);
498         /* Call the transport associated with this message queue */
499         transport = MessageQ_module->transports[dstProcId][priority];
500         if (transport == NULL) {
501             /* Try the other transport */
502             priority = !priority;
503             transport = MessageQ_module->transports[dstProcId][priority];
504         }
506         /* assert transport is not null */
507         Assert_isTrue(transport != NULL,
508             ti_sdo_ipc_MessageQ_A_unregisteredTransport);
510 #ifndef xdc_runtime_Log_DISABLE_ALL
511         /* use local vars so msg does not get cached after put */
512         flags = msg->flags;
514         if ((ti_sdo_ipc_MessageQ_traceFlag) ||
515             (flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0) {
516             /* use local vars so msg does not get cached after put */
517             seqNum  = msg->seqNum;
518             srcProc = msg->srcProc;
519         }
520 #endif
522         /* put msg to remote processor using transport */
523         if (IMessageQTransport_put(transport, msg)) {
524             status = MessageQ_S_SUCCESS;
526 #ifndef xdc_runtime_Log_DISABLE_ALL
527             /* if trace enabled */
528             if ((ti_sdo_ipc_MessageQ_traceFlag) ||
529                 (flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0) {
530                 Log_write4(ti_sdo_ipc_MessageQ_LM_putRemote, (UArg)(msg),
531                           (UArg)(seqNum), (UArg)(srcProc),
532                           (UArg)(dstProcId));
533             }
534 #endif
535         }
536         else {
537             status = MessageQ_E_FAIL;
538         }
539     }
541     /* use a registered transport to deliver the message */
542     else {
543         baseTrans = MessageQ_module->regTrans[tid].transport;
545         if (baseTrans == NULL) {
546             /* raise error */
547             status = MessageQ_E_FAIL;
548             goto leave;
549         }
551         switch (MessageQ_module->regTrans[tid].type) {
553             case ti_sdo_ipc_MessageQ_TransportType_INetworkTransport:
554                 netTrans = INetworkTransport_Handle_downCast(baseTrans);
556                 if (INetworkTransport_put(netTrans, msg)) {
557                     status = MessageQ_S_SUCCESS;
558                 }
559                 else {
560                     status = MessageQ_E_FAIL;
561                 }
562             break;
563         }
564     }
566 leave:
567     return (status);
570 /*
571  *  ======== MessageQ_registerHeap ========
572  *  Register a heap
573  */
574 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
576     Int status;
577     UInt key;
578     IHeap_Handle iheap = (IHeap_Handle)heap;
580     /* Make sure the heapId is valid */
581     Assert_isTrue((heapId < MessageQ_module->numHeaps),
582                   ti_sdo_ipc_MessageQ_A_heapIdInvalid);
584     /* lock scheduler */
585     key = Hwi_disable();
587     /* Make sure the id is not already in use */
588     if (MessageQ_module->heaps[heapId] == NULL) {
589         MessageQ_module->heaps[heapId] = iheap;
590         status = MessageQ_S_SUCCESS;
591     }
592     else {
593         status = MessageQ_E_ALREADYEXISTS;
594     }
596     /* unlock scheduler */
597     Hwi_restore(key);
599     return (status);
602 /*
603  *  ======== MessageQ_setFreeHookFxn ========
604  */
605 Void MessageQ_setFreeHookFxn(MessageQ_FreeHookFxn freeHookFxn)
607     MessageQ_module->freeHookFxn = freeHookFxn;
610 /*
611  *  ======== MessageQ_setMsgTrace ========
612  */
613 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
615     msg->flags = (msg->flags & ~ti_sdo_ipc_MessageQ_TRACEMASK) |
616                  (traceFlag << ti_sdo_ipc_MessageQ_TRACESHIFT);
617     Log_write4(ti_sdo_ipc_MessageQ_LM_setTrace, (UArg)(msg), (UArg)(msg->seqNum),
618                (UArg)(msg->srcProc), (UArg)(traceFlag));
621 /*
622  *  ======== MessageQ_setReplyQueue ========
623  *  Embed the source message queue into a message.
624  */
625 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
627     ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
629     msg->replyId   = (UInt16)(obj->queue);
630     msg->replyProc = (UInt16)(obj->queue >> 16);
633 /*
634  *  ======== MessageQ_staticMsgInit ========
635  */
636 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
638     Assert_isTrue((msg != NULL), ti_sdo_ipc_MessageQ_A_invalidMsg);
640     MessageQ_msgInit(msg);
642     msg->heapId  = ti_sdo_ipc_MessageQ_STATICMSG;
643     msg->msgSize = size;
645     if (ti_sdo_ipc_MessageQ_traceFlag == TRUE) {
646         Log_write3(ti_sdo_ipc_MessageQ_LM_staticMsgInit, (UArg)(msg),
647                 (UArg)(msg->seqNum), (UArg)(msg->srcProc));
648     }
651 /*
652  *  ======== MessageQ_unblock ========
653  */
654 Void MessageQ_unblock(MessageQ_Handle handle)
656     ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
658     /* Assert that the queue is using a blocking synchronizer */
659     Assert_isTrue(ISync_query((obj->synchronizer), ISync_Q_BLOCKING) == TRUE,
660         ti_sdo_ipc_MessageQ_A_invalidUnblock);
662     /* Set instance to 'unblocked' state */
663     obj->unblocked = TRUE;
665     /* Signal the synchronizer */
666     ISync_signal(obj->synchronizer);
669 /*
670  *  ======== MessageQ_unregisterHeap ========
671  *  Unregister a heap
672  */
673 Int MessageQ_unregisterHeap(UInt16 heapId)
675     UInt key;
677     Assert_isTrue((heapId < MessageQ_module->numHeaps),
678                   ti_sdo_ipc_MessageQ_A_heapIdInvalid);
680     /* lock scheduler */
681     key = Hwi_disable();
683     MessageQ_module->heaps[heapId] = NULL;
685     /* unlock scheduler */
686     Hwi_restore(key);
688     return (MessageQ_S_SUCCESS);
691 /*
692  *************************************************************************
693  *                      Module functions
694  *************************************************************************
695  */
697 /*
698  *  ======== ti_sdo_ipc_MessageQ_Module_startup ========
699  */
700 Int ti_sdo_ipc_MessageQ_Module_startup(Int phase)
702     Int i;
704     /* Ensure NameServer Module_startup() has completed */
705     if (ti_sdo_utils_NameServer_Module_startupDone() == FALSE) {
706         return (Startup_NOTDONE);
707     }
709     if (GateThread_Module_startupDone() == FALSE) {
710         return (Startup_NOTDONE);
711     }
713     if (MessageQ_module->gate == NULL) {
714         MessageQ_module->gate =
715             GateThread_Handle_upCast(GateThread_create(NULL, NULL));
716     }
718     /* Loop through all the static objects and set the id */
719     for (i = 0; i < ti_sdo_ipc_MessageQ_Object_count(); i++) {
720         MessageQ_module->queues[i] = (ti_sdo_ipc_MessageQ_Object *)
721                 ti_sdo_ipc_MessageQ_Object_get(NULL, i);
722     }
724     /* Null out the dynamic ones */
725     for (i = ti_sdo_ipc_MessageQ_Object_count(); i < MessageQ_module->numQueues;
726             i++) {
727         MessageQ_module->queues[i] = NULL;
728     }
730     return (Startup_DONE);
733 /*
734  *  ======== ti_sdo_ipc_MessageQ_registerTransport ========
735  *  Register a transport
736  */
737 Bool ti_sdo_ipc_MessageQ_registerTransport(IMessageQTransport_Handle handle,
738     UInt16 procId, UInt priority)
740     Bool flag = FALSE;
741     UInt key;
743     /* Make sure the procId is valid */
744     Assert_isTrue(procId < ti_sdo_utils_MultiProc_numProcessors, ti_sdo_ipc_MessageQ_A_procIdInvalid);
746     /* lock scheduler */
747     key = Hwi_disable();
749     /* Make sure the id is not already in use */
750     if (MessageQ_module->transports[procId][priority] == NULL) {
751         MessageQ_module->transports[procId][priority] = handle;
752         flag = TRUE;
753     }
755     /* unlock scheduler */
756     Hwi_restore(key);
758     return (flag);
761 /*
762  *  ======== ti_sdo_ipc_MessageQ_unregisterTransport ========
763  *  Unregister a heap
764  */
765 Void ti_sdo_ipc_MessageQ_unregisterTransport(UInt16 procId, UInt priority)
767     UInt key;
769     /* Make sure the procId is valid */
770     Assert_isTrue(procId < ti_sdo_utils_MultiProc_numProcessors, ti_sdo_ipc_MessageQ_A_procIdInvalid);
772     /* lock scheduler */
773     key = Hwi_disable();
775     MessageQ_module->transports[procId][priority] = NULL;
777     /* unlock scheduler */
778     Hwi_restore(key);
781 /*
782  *  ======== ti_sdo_ipc_MessageQ_registerTransportId ========
783  */
784 Bool ti_sdo_ipc_MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
786     ti_sdo_ipc_MessageQ_TransportType type;
788     /* validate transport ID */
789     if ((tid < 1) || (tid > 7)) {
790         /* raise error */
791         return (FALSE);
792     }
794     /* don't overwrite an existing transport */
795     if (MessageQ_module->regTrans[tid].transport != NULL) {
796         /* raise error */
797         return (FALSE);
798     }
800     /* determine the transport type */
801     if (INetworkTransport_Handle_downCast(inst) != NULL) {
802         type = ti_sdo_ipc_MessageQ_TransportType_INetworkTransport;
803     }
804     else {
805         /* raise error */
806         return (FALSE);
807     }
809     /* register the transport instance */
810     MessageQ_module->regTrans[tid].transport = inst;
811     MessageQ_module->regTrans[tid].type = type;
812     return (TRUE);
815 /*
816  *  ======== ti_sdo_ipc_MessageQ_registerTransportId ========
817  */
818 Bool ti_sdo_ipc_MessageQ_unregisterTransportId(UInt tid)
820     /* forget the registered transport instance */
821     MessageQ_module->regTrans[tid].transport = NULL;
822     MessageQ_module->regTrans[tid].type =
823             ti_sdo_ipc_MessageQ_TransportType_Invalid;
825     return (TRUE);
829 /*
830  *************************************************************************
831  *                       Instance functions
832  *************************************************************************
833  */
835 /*
836  *  ======== MessageQ_Instance_init ========
837  */
838 Int ti_sdo_ipc_MessageQ_Instance_init(ti_sdo_ipc_MessageQ_Object *obj, String name,
839         const ti_sdo_ipc_MessageQ_Params *params, Error_Block *eb)
841     Int              i;
842     UInt16           start;
843     UInt16           count;
844     UInt             key;
845     Bool             found = FALSE;
846     List_Handle      listHandle;
847     SyncSem_Handle   syncSemHandle;
848     MessageQ_QueueIndex queueIndex;
849     Int tid;
850     Int status;
851     ITransport_Handle baseTrans;
852     INetworkTransport_Handle netTrans;
854     /* lock */
855     key = IGateProvider_enter(MessageQ_module->gate);
857     if (params->queueIndex != MessageQ_ANY) {
858         queueIndex = params->queueIndex;
860         if ((queueIndex >= ti_sdo_ipc_MessageQ_numReservedEntries) ||
861             (MessageQ_module->queues[queueIndex] != NULL)) {
862             IGateProvider_leave(MessageQ_module->gate, key);
863             Error_raise(eb, ti_sdo_ipc_MessageQ_E_indexNotAvailable,
864                 queueIndex, 0);
865             return (5);
866         }
867         MessageQ_module->queues[queueIndex] = obj;
868         found = TRUE;
869     }
870     else {
872         start = ti_sdo_ipc_MessageQ_numReservedEntries;
873         count = MessageQ_module->numQueues;
875         /* Search the dynamic array for any holes */
876         for (i = start; (i < count) && (found == FALSE); i++) {
877             if (MessageQ_module->queues[i] == NULL) {
878                 MessageQ_module->queues[i] = obj;
879                 queueIndex = i;
880                 found = TRUE;
881             }
882         }
883     }
885     /*
886      *  If no free slot was found:
887      *     - if no growth allowed, raise and error
888      *     - if growth is allowed, grow the array
889      */
890     if (found == FALSE) {
891         if (ti_sdo_ipc_MessageQ_maxRuntimeEntries != NameServer_ALLOWGROWTH) {
892             /* unlock scheduler */
893             IGateProvider_leave(MessageQ_module->gate, key);
895             Error_raise(eb, ti_sdo_ipc_MessageQ_E_maxReached,
896                 ti_sdo_ipc_MessageQ_maxRuntimeEntries, 0);
897             return (1);
898         }
899         else {
900             queueIndex = MessageQ_grow(obj, eb);
901             if (queueIndex == MessageQ_INVALIDMESSAGEQ) {
902                 /* unlock scheduler */
903                 IGateProvider_leave(MessageQ_module->gate, key);
904                 return (2);
905             }
906         }
907     }
909     /* create default sync if not specified */
910     if (params->synchronizer == NULL) {
911         /* Create a SyncSem as the synchronizer */
912         syncSemHandle = SyncSem_create(NULL, eb);
914         if (syncSemHandle == NULL) {
915             /* unlock scheduler */
916             IGateProvider_leave(MessageQ_module->gate, key);
917             return (3);
918         }
920         /* store handle for use in finalize ...  */
921         obj->syncSemHandle = syncSemHandle;
923         obj->synchronizer = SyncSem_Handle_upCast(syncSemHandle);
924     }
925     else {
926         obj->syncSemHandle = NULL;
928         obj->synchronizer = params->synchronizer;
929     }
931     /* unlock scheduler */
932     IGateProvider_leave(MessageQ_module->gate, key);
934     /* Fill in the message queue object */
935     listHandle = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
936     List_construct(List_struct(listHandle), NULL);
938     listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
939     List_construct(List_struct(listHandle), NULL);
941     obj->queue = ((MessageQ_QueueId)(MultiProc_self()) << 16) | queueIndex;
943     obj->unblocked = FALSE;
945     /* Add into NameServer */
946     if (name != NULL) {
947         obj->nsKey = NameServer_addUInt32(
948                 (NameServer_Handle)MessageQ_module->nameServer, name,
949                 obj->queue);
951         if (obj->nsKey == NULL) {
952             Error_raise(eb, ti_sdo_ipc_MessageQ_E_nameFailed, name, 0);
953             return (4);
954         }
955     }
957     /* notify all registered transports about the new queue */
958     for (tid = 1; tid <= 7; tid++) {
959         if (MessageQ_module->regTrans[tid].transport == NULL) {
960             continue;
961         }
962         baseTrans = MessageQ_module->regTrans[tid].transport;
964         switch (MessageQ_module->regTrans[tid].type) {
966             case ti_sdo_ipc_MessageQ_TransportType_INetworkTransport:
967                 netTrans = INetworkTransport_Handle_downCast(baseTrans);
969                 if (INetworkTransport_bind(netTrans, obj->queue)) {
970                     status = MessageQ_S_SUCCESS;
971                 }
972                 else {
973                     status = MessageQ_E_FAIL;
974                 }
975             break;
976         }
978         /* check for failure */
979         if (status < 0) {
980             /* TODO add error handling */
981         }
982     }
984     return (0);
987 /*
988  *  ======== MessageQ_Instance_finalize ========
989  */
990 Void ti_sdo_ipc_MessageQ_Instance_finalize(
991         ti_sdo_ipc_MessageQ_Object* obj, Int status)
993     UInt key;
994     MessageQ_QueueIndex index = (MessageQ_QueueIndex)(obj->queue);
995     List_Handle listHandle;
996     Int tid;
997     ITransport_Handle baseTrans;
998     INetworkTransport_Handle netTrans;
1000     /* Requested queueId was not available. Nothing was done in the init */
1001     if (status == 5) {
1002         return;
1003     }
1005     /* notify all registered transports that given queue is being deleted */
1006     for (tid = 1; tid <= 7; tid++) {
1007         if (MessageQ_module->regTrans[tid].transport == NULL) {
1008             continue;
1009         }
1010         baseTrans = MessageQ_module->regTrans[tid].transport;
1012         switch (MessageQ_module->regTrans[tid].type) {
1014             case ti_sdo_ipc_MessageQ_TransportType_INetworkTransport:
1015                 netTrans = INetworkTransport_Handle_downCast(baseTrans);
1017                 if (INetworkTransport_unbind(netTrans, obj->queue)) {
1018                     status = MessageQ_S_SUCCESS;
1019                 }
1020                 else {
1021                     status = MessageQ_E_FAIL;
1022                 }
1023             break;
1024         }
1026         /* check for failure */
1027         if (status < 0) {
1028             /* TODO add error handling */
1029         }
1030     }
1032     if (obj->syncSemHandle != NULL) {
1033         SyncSem_delete(&obj->syncSemHandle);
1034     }
1036     listHandle = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
1038     /* Destruct the list */
1039     List_destruct(List_struct(listHandle));
1041     listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
1043     /* Destruct the list */
1044     List_destruct(List_struct(listHandle));
1046     /* lock */
1047     key = IGateProvider_enter(MessageQ_module->gate);
1049     /* Null out entry in the array. */
1050     MessageQ_module->queues[index] = NULL;
1052     /* unlock scheduler */
1053     IGateProvider_leave(MessageQ_module->gate, key);
1055     if (obj->nsKey != NULL) {
1056         NameServer_removeEntry((NameServer_Handle)MessageQ_module->nameServer,
1057             obj->nsKey);
1058     }
1061 /*
1062  *************************************************************************
1063  *                       Internal functions
1064  *************************************************************************
1065  */
1067 /*
1068  *  ======== ti_sdo_ipc_MessageQ_grow ========
1069  */
1070 UInt16 ti_sdo_ipc_MessageQ_grow(ti_sdo_ipc_MessageQ_Object *obj,
1071         Error_Block *eb)
1073     UInt16 oldSize;
1074     UInt16 queueIndex = MessageQ_module->numQueues;
1075     ti_sdo_ipc_MessageQ_Handle *queues;
1076     ti_sdo_ipc_MessageQ_Handle *oldQueues;
1077     oldSize = (MessageQ_module->numQueues) * sizeof(MessageQ_Handle);
1080     /* Allocate larger table */
1081     queues = Memory_alloc(ti_sdo_ipc_MessageQ_Object_heap(),
1082                           oldSize + sizeof(MessageQ_Handle), 0, eb);
1084     if (queues == NULL) {
1085         return (MessageQ_INVALIDMESSAGEQ);
1086     }
1088     /* Copy contents into new table */
1089     memcpy(queues, MessageQ_module->queues, oldSize);
1091     /* Fill in the new entry */
1092     queues[queueIndex] = obj;
1094     /* Hook-up new table */
1095     oldQueues = MessageQ_module->queues;
1096     MessageQ_module->queues = queues;
1097     MessageQ_module->numQueues++;
1099     /* Delete old table if not statically defined */
1100     if (MessageQ_module->canFreeQueues == TRUE) {
1101         Memory_free(ti_sdo_ipc_MessageQ_Object_heap(), oldQueues, oldSize);
1102     }
1103     else {
1104         MessageQ_module->canFreeQueues = TRUE;
1105     }
1107     return (queueIndex);