ipc: Remove compiler warnings
[ipc/ipcdev.git] / packages / ti / sdo / ipc / MessageQ.c
1 /*
2  * Copyright (c) 2012-2018 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  ======== MessageQ.c ========
34  *  Implementation of functions specified in MessageQ.xdc.
35  */
37 #include <xdc/std.h>
39 #include <string.h>
41 #include <xdc/runtime/Error.h>
42 #include <xdc/runtime/Assert.h>
43 #include <xdc/runtime/Log.h>
44 #include <xdc/runtime/Memory.h>
45 #include <xdc/runtime/IHeap.h>
46 #include <xdc/runtime/IGateProvider.h>
47 #include <xdc/runtime/Startup.h>
49 #include <xdc/runtime/knl/ISync.h>
50 #include <xdc/runtime/knl/GateThread.h>
52 #include <ti/sysbios/hal/Hwi.h>
53 #include <ti/sysbios/syncs/SyncSem.h>
55 #include <ti/sdo/ipc/interfaces/ITransport.h>
56 #include <ti/sdo/ipc/interfaces/IMessageQTransport.h>
57 #include <ti/sdo/ipc/interfaces/INetworkTransport.h>
58 #include <ti/sdo/utils/List.h>
60 /* must be included after the internal header file for now */
61 #define MessageQ_internal 1     /* must be defined before include file */
62 #include <ti/sdo/ipc/_MessageQ.h>
63 #include <ti/sdo/utils/_MultiProc.h>
64 #include <ti/sdo/utils/_NameServer.h>
66 #include "package/internal/MessageQ.xdc.h"
68 /* params structure evolution */
69 typedef struct {
70     Void *synchronizer;
71 } MessageQ_Params_Legacy;
73 typedef struct {
74     Int __version;
75     Void *synchronizer;
76     MessageQ_QueueIndex queueIndex;
77 } MessageQ_Params_Version2;
79 #ifdef __ti__
80     #pragma FUNC_EXT_CALLED(MessageQ_Params_init);
81     #pragma FUNC_EXT_CALLED(MessageQ_Params2_init);
82     #pragma FUNC_EXT_CALLED(MessageQ_alloc);
83     #pragma FUNC_EXT_CALLED(MessageQ_close);
84     #pragma FUNC_EXT_CALLED(MessageQ_count);
85     #pragma FUNC_EXT_CALLED(MessageQ_create);
86     #pragma FUNC_EXT_CALLED(MessageQ_create2);
87     #pragma FUNC_EXT_CALLED(MessageQ_delete);
88     #pragma FUNC_EXT_CALLED(MessageQ_open);
89     #pragma FUNC_EXT_CALLED(MessageQ_openQueueId);
90     #pragma FUNC_EXT_CALLED(MessageQ_free);
91     #pragma FUNC_EXT_CALLED(MessageQ_get);
92     #pragma FUNC_EXT_CALLED(MessageQ_getQueueId);
93     #pragma FUNC_EXT_CALLED(MessageQ_put);
94     #pragma FUNC_EXT_CALLED(MessageQ_registerHeap);
95     #pragma FUNC_EXT_CALLED(MessageQ_registerTransportId);
96     #pragma FUNC_EXT_CALLED(MessageQ_setFreeHookFxn);
97     #pragma FUNC_EXT_CALLED(MessageQ_setPutHookFxn);
98     #pragma FUNC_EXT_CALLED(MessageQ_setReplyQueue);
99     #pragma FUNC_EXT_CALLED(MessageQ_setMsgTrace);
100     #pragma FUNC_EXT_CALLED(MessageQ_staticMsgInit);
101     #pragma FUNC_EXT_CALLED(MessageQ_unblock);
102     #pragma FUNC_EXT_CALLED(MessageQ_unregisterHeap);
103 #endif
105 /*
106  *  ======== MessageQ_msgInit ========
107  *  This is a helper function to initialize a message.
108  */
109 static Void MessageQ_msgInit(MessageQ_Msg msg)
111     UInt key;
113     msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
114     msg->msgId   = MessageQ_INVALIDMSGID;
115     msg->dstId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
116     msg->flags   = ti_sdo_ipc_MessageQ_HEADERVERSION |
117                    MessageQ_NORMALPRI |
118                    (ti_sdo_ipc_MessageQ_TRACEMASK &
119                    (ti_sdo_ipc_MessageQ_traceFlag << ti_sdo_ipc_MessageQ_TRACESHIFT));
120     msg->srcProc = MultiProc_self();
122     key = Hwi_disable();
123     msg->seqNum  = MessageQ_module->seqNum++;
124     Hwi_restore(key);
127 /*
128  *************************************************************************
129  *                       Common Header Functions
130  *************************************************************************
131  */
133 /*
134  *  ======== MessageQ_Params_init ========
135  *  Legacy implementation.
136  */
137 Void MessageQ_Params_init(MessageQ_Params *params)
139     ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
142 /*
143  *  ======== MessageQ_Params_init__S ========
144  *  New implementation which is version aware.
145  */
146 Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
148     MessageQ_Params_Version2 *params2;
150     switch (version) {
152         case MessageQ_Params_VERSION_2:
153             params2 = (MessageQ_Params_Version2 *)params;
154             params2->__version = MessageQ_Params_VERSION_2;
155             params2->synchronizer = NULL;
156             params2->queueIndex = MessageQ_ANY;
157             break;
159         default:
160             Assert_isTrue(FALSE, 0);
161             break;
162     }
165 /*
166  *  ======== MessageQ_Params2_init ========
167  *  Deprecated
168  */
169 Void MessageQ_Params2_init(MessageQ_Params2 *params)
171     params->synchronizer = NULL;
172     params->queueIndex = MessageQ_ANY;
175 /*
176  *  ======== MessageQ_alloc ========
177  *  Allocate a message and initialize the needed fields
178  *
179  *  Note: some of the fields in the header are set via other
180  *  APIs or in the MessageQ_put function.
181  */
182 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
184     MessageQ_Msg msg;
185     Error_Block eb;
187     Assert_isTrue((heapId < MessageQ_module->numHeaps),
188                   ti_sdo_ipc_MessageQ_A_heapIdInvalid);
190     Assert_isTrue((MessageQ_module->heaps[heapId] != NULL),
191                   ti_sdo_ipc_MessageQ_A_heapIdInvalid);
193     /* Allocate the message. No alignment requested */
194     Error_init(&eb);
195     msg = Memory_alloc(MessageQ_module->heaps[heapId], size, 0, &eb);
197     if (msg == NULL) {
198         return (NULL);
199     }
201     /* Fill in the fields of the message */
202     MessageQ_msgInit(msg);
203     msg->msgSize = size;
204     msg->heapId  = heapId;
206     if (ti_sdo_ipc_MessageQ_traceFlag == TRUE) {
207         Log_write3(ti_sdo_ipc_MessageQ_LM_alloc, (UArg)(msg),
208             (UArg)(msg->seqNum), (UArg)(msg->srcProc));
209     }
211     return (msg);
214 /*
215  *  ======== MessageQ_count ========
216  */
217 Int MessageQ_count(MessageQ_Handle handle)
219     Int              count = 0;
220     UInt             key;
221     List_Elem       *tempMsg = NULL;
222     List_Handle      listHandle;
223     ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
225     /* lock */
226     key = Hwi_disable();
228     /* Get the list */
229     listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
231     /* Loop through and count the messages */
232     while ((tempMsg = List_next(listHandle, tempMsg)) != NULL) {
233         count++;
234     }
236     listHandle = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
238     /* Loop through and count the messages */
239     while ((tempMsg = List_next(listHandle, tempMsg)) != NULL) {
240         count++;
241     }
243     /* unlock scheduler */
244     Hwi_restore(key);
246     return (count);
249 /*
250  *  ======== MessageQ_close ========
251  */
252 Int MessageQ_close(MessageQ_QueueId *queueId)
254     *queueId = MessageQ_INVALIDMESSAGEQ;
256     return (MessageQ_S_SUCCESS);
259 /*
260  *  ======== MessageQ_create ========
261  */
262 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
264     ti_sdo_ipc_MessageQ_Handle handle;
265     ti_sdo_ipc_MessageQ_Params ps;
266     Error_Block eb;
268     Error_init(&eb);
270     if (pp != NULL) {
271         ti_sdo_ipc_MessageQ_Params_init(&ps);
273         /* snoop the params pointer to see if it's a legacy structure */
274         if ((pp->__version == 0) || (pp->__version > 100)) {
275             ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
276         }
278         /* not legacy structure, use params version field */
279         else if (pp->__version == MessageQ_Params_VERSION_2) {
280             ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
281             ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
282         }
283         else {
284             Assert_isTrue(FALSE, 0);
285         }
287         handle = ti_sdo_ipc_MessageQ_create(name, &ps, &eb);
288     }
289     else {
290         handle = ti_sdo_ipc_MessageQ_create(name, NULL, &eb);
291     }
293     return ((MessageQ_Handle)handle);
296 /*
297  *  ======== MessageQ_create2 ========
298  */
299 MessageQ_Handle MessageQ_create2(String name, const MessageQ_Params2 *params)
301     ti_sdo_ipc_MessageQ_Handle handle;
302     ti_sdo_ipc_MessageQ_Params prms;
303     Error_Block eb;
305     Error_init(&eb);
307     if (params != NULL) {
308         ti_sdo_ipc_MessageQ_Params_init(&prms);
310         prms.synchronizer = params->synchronizer;
311         prms.queueIndex = params->queueIndex;
313         handle = ti_sdo_ipc_MessageQ_create(name, &prms, &eb);
314     }
315     else {
316         handle = ti_sdo_ipc_MessageQ_create(name, NULL, &eb);
317     }
319     return ((MessageQ_Handle)handle);
322 /*
323  *  ======== MessageQ_delete ========
324  */
325 Int MessageQ_delete(MessageQ_Handle *handlePtr)
327     ti_sdo_ipc_MessageQ_Handle *instp;
329     instp = (ti_sdo_ipc_MessageQ_Handle *)handlePtr;
331     ti_sdo_ipc_MessageQ_delete(instp);
333     return (MessageQ_S_SUCCESS);
336 /*
337  *  ======== MessageQ_free ========
338  */
339 Int MessageQ_free(MessageQ_Msg msg)
341     IHeap_Handle heap;
342     Bits16 msgId;
343     Bits16 heapId;
345     /* make sure msg is not NULL */
346     Assert_isTrue((msg != NULL), ti_sdo_ipc_MessageQ_A_invalidMsg);
348     /* Cannot free a message that was initialized via MessageQ_staticMsgInit */
349     Assert_isTrue((msg->heapId != ti_sdo_ipc_MessageQ_STATICMSG),
350                   ti_sdo_ipc_MessageQ_A_cannotFreeStaticMsg);
352     Assert_isTrue((msg->heapId < MessageQ_module->numHeaps),
353                   ti_sdo_ipc_MessageQ_A_heapIdInvalid);
355     Assert_isTrue((MessageQ_module->heaps[msg->heapId] != NULL),
356                   ti_sdo_ipc_MessageQ_A_heapIdInvalid);
358     if ((ti_sdo_ipc_MessageQ_traceFlag == TRUE) ||
359         (msg->flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0) {
360         Log_write3(ti_sdo_ipc_MessageQ_LM_free, (UArg)(msg),
361             (UArg)(msg->seqNum), (UArg)(msg->srcProc));
362     }
364     heap = MessageQ_module->heaps[msg->heapId];
366     if (heap != NULL) {
367         msgId = MessageQ_getMsgId(msg);
368         heapId = msg->heapId;
369         Memory_free(heap, msg, msg->msgSize);
370         if (MessageQ_module->freeHookFxn != NULL) {
371             MessageQ_module->freeHookFxn(heapId, msgId);
372         }
373     }
374     else {
375         return (MessageQ_E_FAIL);
376     }
378     return (MessageQ_S_SUCCESS);
381 /*
382  *  ======== MessageQ_get ========
383  */
384 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
386     Int status;
387     List_Handle highList;
388     List_Handle normalList;
389     ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
391     /* Get the list */
392     normalList = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
393     highList   = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
395     /* Keep looping while there are no elements on either list */
396     *msg = (MessageQ_Msg)List_get(highList);
397     while (*msg == NULL) {
398         *msg = (MessageQ_Msg)List_get(normalList);
399         if (*msg == NULL) {
400             /*  Block until notified. */
401             status = ISync_wait(obj->synchronizer, timeout, NULL);
402             if (status == ISync_WaitStatus_TIMEOUT) {
403                 return (MessageQ_E_TIMEOUT);
404             }
405             else if (status < 0) {
406                 return (MessageQ_E_FAIL);
407             }
409             if (obj->unblocked) {
410                 /* *(msg) may be NULL */
411                 return (MessageQ_E_UNBLOCKED);
412             }
414             *msg = (MessageQ_Msg)List_get(highList);
415         }
416     }
418     if ((ti_sdo_ipc_MessageQ_traceFlag == TRUE) ||
419         (((*msg)->flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0)) {
420         Log_write4(ti_sdo_ipc_MessageQ_LM_get, (UArg)(*msg),
421             (UArg)((*msg)->seqNum), (UArg)((*msg)->srcProc), (UArg)(obj));
422     }
424     return (MessageQ_S_SUCCESS);
427 /*
428  *  ======== MessageQ_getQueueId ========
429  */
430 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
432     MessageQ_QueueId queueId;
433     ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
435     queueId = (obj->queue);
437     return (queueId);
440 /*
441  *  ======== MessageQ_open ========
442  */
443 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
445     Int         status;
446     Error_Block eb;
448     Assert_isTrue(name != NULL, ti_sdo_ipc_MessageQ_A_invalidParam);
449     Assert_isTrue(queueId != NULL, ti_sdo_ipc_MessageQ_A_invalidParam);
451     Error_init(&eb);
453     /* Search NameServer */
454     status = NameServer_getUInt32(
455             (NameServer_Handle)MessageQ_module->nameServer, name, queueId,
456             NULL);
458     if (status >= 0) {
459         return (MessageQ_S_SUCCESS);    /* name found */
460     }
461     else {
462         return (MessageQ_E_NOTFOUND);   /* name not found */
463     }
466 /*
467  *  ======== MessageQ_openQueueId ========
468  */
469 MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 procId)
471     MessageQ_QueueIndex queuePort;
472     MessageQ_QueueId queueId;
474     /* queue port is embedded in the queueId */
475     queuePort = queueIndex + MessageQ_PORTOFFSET;
476     queueId = ((MessageQ_QueueId)(procId) << 16) | queuePort;
478     return (queueId);
481 /*
482  *  ======== MessageQ_put ========
483  */
484 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
486     IMessageQTransport_Handle transport;
487     MessageQ_QueueIndex dstProcId = (MessageQ_QueueIndex)(queueId >> 16);
488     List_Handle       listHandle;
489     Int               status = MessageQ_E_FAIL;
490     UInt              priority;
491 #ifndef xdc_runtime_Log_DISABLE_ALL
492     UInt16            flags;
493     UInt16            seqNum;
494     UInt16            srcProc;
495 #endif
496     ti_sdo_ipc_MessageQ_Object   *obj;
497     Int tid;
498     ITransport_Handle baseTrans;
499     INetworkTransport_Handle netTrans;
500     MessageQ_QueueIndex queueIndex;
501     UInt16 index;
503     Assert_isTrue((msg != NULL), ti_sdo_ipc_MessageQ_A_invalidMsg);
505     msg->dstId   = (UInt16)(queueId);
506     msg->dstProc = (UInt16)(queueId >> 16);
508     /* invoke put hook function after addressing the message */
509     if (MessageQ_module->putHookFxn != NULL) {
510         MessageQ_module->putHookFxn(queueId, (Ptr)msg);
511     }
513     /* extract the transport ID from the message header */
514     tid = MessageQ_getTransportId(msg);
516     /* if recipient is local, use direct message delivery */
517     if (dstProcId == MultiProc_self()) {
518         /* assert queue index is valid */
519         queueIndex = MessageQ_getQueueIndex(queueId);
520         Assert_isTrue(queueIndex < MessageQ_module->numQueues,
521                       ti_sdo_ipc_MessageQ_A_invalidQueueId);
523         /* It is a local MessageQ */
524         obj = MessageQ_module->queues[queueIndex];
526         /* Assert object is not NULL */
527         Assert_isTrue(obj != NULL, ti_sdo_ipc_MessageQ_A_invalidObj);
529         if ((msg->flags & MessageQ_PRIORITYMASK) == MessageQ_URGENTPRI) {
530             listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
531             List_putHead(listHandle, (List_Elem *)msg);
532         }
533         else {
534             if ((msg->flags & MessageQ_PRIORITYMASK) == MessageQ_NORMALPRI) {
535                 listHandle = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
536             }
537             else {
538                 listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
539             }
540             /* put on the queue */
541             List_put(listHandle, (List_Elem *)msg);
542         }
544         ISync_signal(obj->synchronizer);
546         status = MessageQ_S_SUCCESS;
548         if ((ti_sdo_ipc_MessageQ_traceFlag) ||
549             (msg->flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0) {
550             Log_write4(ti_sdo_ipc_MessageQ_LM_putLocal, (UArg)(msg),
551                        (UArg)(msg->seqNum), (UArg)(msg->srcProc), (UArg)(obj));
552         }
553     }
555     /* if transport ID is zero, use primary transport array */
556     else if (tid == 0) {
557         /* assert that dstProcId is valid */
558         Assert_isTrue(dstProcId < ti_sdo_utils_MultiProc_numProcessors,
559                       ti_sdo_ipc_MessageQ_A_procIdInvalid);
561         /* Put the high and urgent messages to the high priority transport */
562         priority = (UInt)((msg->flags) &
563             ti_sdo_ipc_MessageQ_TRANSPORTPRIORITYMASK);
565         switch (ti_sdo_utils_MultiProc_procAddrMode) {
566             case ti_sdo_utils_MultiProc_ProcAddrMode_Global:
567                 index = dstProcId;
568                 break;
570             case ti_sdo_utils_MultiProc_ProcAddrMode_Cluster:
571                 index = dstProcId - MultiProc_getBaseIdOfCluster();
572                 break;
574             default:
575                 Assert_isTrue(FALSE, 0);
576                 break;
577         }
579         if (index >= MessageQ_module->transports.length) {
580             /* raise error */
581             status = MessageQ_E_FAIL;
582             goto leave;
583         }
585         /* Call the transport associated with this message queue */
586         transport = MessageQ_module->transports.elem[index][priority];
588         if (transport == NULL) {
589             /* Try the other transport */
590             priority = !priority;
591             transport = MessageQ_module->transports.elem[index][priority];
592         }
594         /* assert transport is not null */
595         Assert_isTrue(transport != NULL,
596             ti_sdo_ipc_MessageQ_A_unregisteredTransport);
598 #ifndef xdc_runtime_Log_DISABLE_ALL
599         /* use local vars so msg does not get cached after put */
600         flags = msg->flags;
602         if ((ti_sdo_ipc_MessageQ_traceFlag) ||
603             (flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0) {
604             /* use local vars so msg does not get cached after put */
605             seqNum  = msg->seqNum;
606             srcProc = msg->srcProc;
607         }
608 #endif
610         /* put msg to remote processor using transport */
611         if (IMessageQTransport_put(transport, msg)) {
612             status = MessageQ_S_SUCCESS;
614 #ifndef xdc_runtime_Log_DISABLE_ALL
615             /* if trace enabled */
616             if ((ti_sdo_ipc_MessageQ_traceFlag) ||
617                 (flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0) {
618                 Log_write4(ti_sdo_ipc_MessageQ_LM_putRemote, (UArg)(msg),
619                           (UArg)(seqNum), (UArg)(srcProc),
620                           (UArg)(dstProcId));
621             }
622 #endif
623         }
624         else {
625             status = MessageQ_E_FAIL;
626         }
627     }
629     /* use a registered transport to deliver the message */
630     else {
631         baseTrans = MessageQ_module->regTrans[tid].transport;
633         if (baseTrans == NULL) {
634             /* raise error */
635             status = MessageQ_E_FAIL;
636             goto leave;
637         }
639         switch (MessageQ_module->regTrans[tid].type) {
641             case ti_sdo_ipc_MessageQ_TransportType_INetworkTransport:
642                 netTrans = INetworkTransport_Handle_downCast(baseTrans);
643                 if(netTrans == NULL) {
644                     status = MessageQ_E_FAIL;
645                     break;
646                 }
647                 if (INetworkTransport_put(netTrans, msg)) {
648                     status = MessageQ_S_SUCCESS;
649                 }
650                 else {
651                     status = MessageQ_E_FAIL;
652                 }
653             break;
654         }
655     }
657 leave:
658     return (status);
661 /*
662  *  ======== MessageQ_registerHeap ========
663  *  Register a heap
664  */
665 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
667     Int status;
668     UInt key;
669     IHeap_Handle iheap = (IHeap_Handle)heap;
671     /* Make sure the heapId is valid */
672     Assert_isTrue((heapId < MessageQ_module->numHeaps),
673                   ti_sdo_ipc_MessageQ_A_heapIdInvalid);
675     /* lock scheduler */
676     key = Hwi_disable();
678     /* Make sure the id is not already in use */
679     if (MessageQ_module->heaps[heapId] == NULL) {
680         MessageQ_module->heaps[heapId] = iheap;
681         status = MessageQ_S_SUCCESS;
682     }
683     else {
684         status = MessageQ_E_ALREADYEXISTS;
685     }
687     /* unlock scheduler */
688     Hwi_restore(key);
690     return (status);
693 /*
694  *  ======== MessageQ_setFreeHookFxn ========
695  */
696 Void MessageQ_setFreeHookFxn(MessageQ_FreeHookFxn freeHookFxn)
698     MessageQ_module->freeHookFxn = freeHookFxn;
701 /*
702  *  ======== MessageQ_setPutHookFxn ========
703  */
704 Void MessageQ_setPutHookFxn(MessageQ_PutHookFxn putHookFxn)
706     MessageQ_module->putHookFxn = (ti_sdo_ipc_MessageQ_PutHookFxn)putHookFxn;
709 /*
710  *  ======== MessageQ_setMsgTrace ========
711  */
712 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
714     msg->flags = (msg->flags & ~ti_sdo_ipc_MessageQ_TRACEMASK) |
715                  (traceFlag << ti_sdo_ipc_MessageQ_TRACESHIFT);
716     Log_write4(ti_sdo_ipc_MessageQ_LM_setTrace, (UArg)(msg), (UArg)(msg->seqNum),
717                (UArg)(msg->srcProc), (UArg)(traceFlag));
720 /*
721  *  ======== MessageQ_setReplyQueue ========
722  *  Embed the source message queue into a message.
723  */
724 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
726     ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
728     msg->replyId   = (UInt16)(obj->queue);
729     msg->replyProc = (UInt16)(obj->queue >> 16);
732 /*
733  *  ======== MessageQ_staticMsgInit ========
734  */
735 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
737     Assert_isTrue((msg != NULL), ti_sdo_ipc_MessageQ_A_invalidMsg);
739     MessageQ_msgInit(msg);
741     msg->heapId  = ti_sdo_ipc_MessageQ_STATICMSG;
742     msg->msgSize = size;
744     if (ti_sdo_ipc_MessageQ_traceFlag == TRUE) {
745         Log_write3(ti_sdo_ipc_MessageQ_LM_staticMsgInit, (UArg)(msg),
746                 (UArg)(msg->seqNum), (UArg)(msg->srcProc));
747     }
750 /*
751  *  ======== MessageQ_unblock ========
752  */
753 Void MessageQ_unblock(MessageQ_Handle handle)
755     ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
757     /* Assert that the queue is using a blocking synchronizer */
758     Assert_isTrue(ISync_query((obj->synchronizer), ISync_Q_BLOCKING) == TRUE,
759         ti_sdo_ipc_MessageQ_A_invalidUnblock);
761     /* Set instance to 'unblocked' state */
762     obj->unblocked = TRUE;
764     /* Signal the synchronizer */
765     ISync_signal(obj->synchronizer);
768 /*
769  *  ======== MessageQ_unregisterHeap ========
770  *  Unregister a heap
771  */
772 Int MessageQ_unregisterHeap(UInt16 heapId)
774     UInt key;
776     Assert_isTrue((heapId < MessageQ_module->numHeaps),
777                   ti_sdo_ipc_MessageQ_A_heapIdInvalid);
779     /* lock scheduler */
780     key = Hwi_disable();
782     MessageQ_module->heaps[heapId] = NULL;
784     /* unlock scheduler */
785     Hwi_restore(key);
787     return (MessageQ_S_SUCCESS);
790 /*
791  *************************************************************************
792  *                      Module functions
793  *************************************************************************
794  */
796 /*
797  *  ======== ti_sdo_ipc_MessageQ_Module_startup ========
798  */
799 Int ti_sdo_ipc_MessageQ_Module_startup(Int phase)
801     Int i;
803     /* Ensure NameServer Module_startup() has completed */
804     if (ti_sdo_utils_NameServer_Module_startupDone() == FALSE) {
805         return (Startup_NOTDONE);
806     }
808     if (GateThread_Module_startupDone() == FALSE) {
809         return (Startup_NOTDONE);
810     }
812     if (MessageQ_module->gate == NULL) {
813         MessageQ_module->gate =
814             GateThread_Handle_upCast(GateThread_create(NULL, NULL));
815     }
817     /* Loop through all the static objects and set the id */
818     for (i = 0; i < ti_sdo_ipc_MessageQ_Object_count(); i++) {
819         MessageQ_module->queues[i] = (ti_sdo_ipc_MessageQ_Object *)
820                 ti_sdo_ipc_MessageQ_Object_get(NULL, i);
821     }
823     /* Null out the dynamic ones */
824     for (i = ti_sdo_ipc_MessageQ_Object_count(); i < MessageQ_module->numQueues;
825             i++) {
826         MessageQ_module->queues[i] = NULL;
827     }
829     return (Startup_DONE);
832 /*
833  *  ======== ti_sdo_ipc_MessageQ_registerTransport ========
834  */
835 Bool ti_sdo_ipc_MessageQ_registerTransport(IMessageQTransport_Handle handle,
836     UInt16 procId, UInt priority)
838     Bool flag = FALSE;
839     UInt key;
840     UInt16 index;
842     Assert_isTrue(procId < ti_sdo_utils_MultiProc_numProcessors,
843             ti_sdo_ipc_MessageQ_A_procIdInvalid);
845     switch (ti_sdo_utils_MultiProc_procAddrMode) {
846         case ti_sdo_utils_MultiProc_ProcAddrMode_Global:
847             index = procId;
848             break;
850         case ti_sdo_utils_MultiProc_ProcAddrMode_Cluster:
851             index = procId - MultiProc_getBaseIdOfCluster();
852             break;
854         default:
855             Assert_isTrue(FALSE, 0);
856             break;
857     }
859     Assert_isTrue(index < MessageQ_module->transports.length,
860             ti_sdo_ipc_MessageQ_A_procIdInvalid);
862     /* lock scheduler */
863     key = Hwi_disable();
865     /* make sure the id is not already in use */
866     if (MessageQ_module->transports.elem[index][priority] == NULL) {
867         MessageQ_module->transports.elem[index][priority] = handle;
868         flag = TRUE;
869     }
871     /* unlock scheduler */
872     Hwi_restore(key);
874     return (flag);
877 /*
878  *  ======== ti_sdo_ipc_MessageQ_unregisterTransport ========
879  */
880 Void ti_sdo_ipc_MessageQ_unregisterTransport(UInt16 procId, UInt priority)
882     UInt key;
883     UInt16 index;
885     Assert_isTrue(procId < ti_sdo_utils_MultiProc_numProcessors,
886             ti_sdo_ipc_MessageQ_A_procIdInvalid);
888     switch (ti_sdo_utils_MultiProc_procAddrMode) {
889         case ti_sdo_utils_MultiProc_ProcAddrMode_Global:
890             index = procId;
891             break;
893         case ti_sdo_utils_MultiProc_ProcAddrMode_Cluster:
894             index = procId - MultiProc_getBaseIdOfCluster();
895             break;
897         default:
898             Assert_isTrue(FALSE, 0);
899             break;
900     }
902     Assert_isTrue(index < MessageQ_module->transports.length,
903             ti_sdo_ipc_MessageQ_A_procIdInvalid);
905     /* lock scheduler */
906     key = Hwi_disable();
908     MessageQ_module->transports.elem[index][priority] = NULL;
910     /* unlock scheduler */
911     Hwi_restore(key);
914 /*
915  *  ======== MessageQ_registerTransportId ========
916  */
917 Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
919     ti_sdo_ipc_MessageQ_TransportType type;
921     /* validate transport ID */
922     if ((tid < 1) || (tid > 7)) {
923         /* raise error */
924         return (FALSE);
925     }
927     /* don't overwrite an existing transport */
928     if (MessageQ_module->regTrans[tid].transport != NULL) {
929         /* raise error */
930         return (FALSE);
931     }
933     /* determine the transport type */
934     if (INetworkTransport_Handle_downCast(inst) != NULL) {
935         type = ti_sdo_ipc_MessageQ_TransportType_INetworkTransport;
936     }
937     else {
938         /* raise error */
939         return (FALSE);
940     }
942     /* register the transport instance */
943     MessageQ_module->regTrans[tid].transport = inst;
944     MessageQ_module->regTrans[tid].type = type;
945     return (TRUE);
948 /*
949  *  ======== MessageQ_unregisterTransportId ========
950  */
951 Void MessageQ_unregisterTransportId(UInt tid)
953     /* forget the registered transport instance */
954     MessageQ_module->regTrans[tid].transport = NULL;
955     MessageQ_module->regTrans[tid].type =
956             ti_sdo_ipc_MessageQ_TransportType_Invalid;
958     return;
962 /*
963  *************************************************************************
964  *                       Instance functions
965  *************************************************************************
966  */
968 /*
969  *  ======== MessageQ_Instance_init ========
970  */
971 Int ti_sdo_ipc_MessageQ_Instance_init(ti_sdo_ipc_MessageQ_Object *obj,
972         String name, const ti_sdo_ipc_MessageQ_Params *params, Error_Block *eb)
974     Int              i;
975     UInt16           start;
976     UInt16           count;
977     UInt             key;
978     Bool             found = FALSE;
979     List_Handle      listHandle;
980     SyncSem_Handle   syncSemHandle;
981     MessageQ_QueueIndex queueIndex;
982     MessageQ_QueueIndex queuePort;
983     Int tid;
984     Int status = MessageQ_E_FAIL;
985     ITransport_Handle baseTrans;
986     INetworkTransport_Handle netTrans;
988     /* lock */
989     key = IGateProvider_enter(MessageQ_module->gate);
991     if (params->queueIndex != MessageQ_ANY) {
992         queueIndex = params->queueIndex;
994         if ((queueIndex >= ti_sdo_ipc_MessageQ_numReservedEntries) ||
995             (MessageQ_module->queues[queueIndex] != NULL)) {
996             IGateProvider_leave(MessageQ_module->gate, key);
997             Error_raise(eb, ti_sdo_ipc_MessageQ_E_indexNotAvailable,
998                 queueIndex, 0);
999             return (5);
1000         }
1001         MessageQ_module->queues[queueIndex] = obj;
1002         found = TRUE;
1003     }
1004     else {
1005         start = ti_sdo_ipc_MessageQ_numReservedEntries;
1006         count = MessageQ_module->numQueues;
1008         /* Search the dynamic array for any holes */
1009         for (i = start; (i < count) && (found == FALSE); i++) {
1010             if (MessageQ_module->queues[i] == NULL) {
1011                 MessageQ_module->queues[i] = obj;
1012                 queueIndex = i;
1013                 found = TRUE;
1014             }
1015         }
1016     }
1018     /*
1019      *  If no free slot was found:
1020      *     - if no growth allowed, raise and error
1021      *     - if growth is allowed, grow the array
1022      */
1023     if (found == FALSE) {
1024         if (ti_sdo_ipc_MessageQ_maxRuntimeEntries != NameServer_ALLOWGROWTH) {
1025             /* unlock scheduler */
1026             IGateProvider_leave(MessageQ_module->gate, key);
1028             Error_raise(eb, ti_sdo_ipc_MessageQ_E_maxReached,
1029                 ti_sdo_ipc_MessageQ_maxRuntimeEntries, 0);
1030             return (1);
1031         }
1032         else {
1033             queueIndex = MessageQ_grow(obj, eb);
1034             if (queueIndex == MessageQ_INVALIDMESSAGEQ) {
1035                 /* unlock scheduler */
1036                 IGateProvider_leave(MessageQ_module->gate, key);
1037                 return (2);
1038             }
1039         }
1040     }
1042     /* create default sync if not specified */
1043     if (params->synchronizer == NULL) {
1044         /* Create a SyncSem as the synchronizer */
1045         syncSemHandle = SyncSem_create(NULL, eb);
1047         if (syncSemHandle == NULL) {
1048             /* unlock scheduler */
1049             IGateProvider_leave(MessageQ_module->gate, key);
1050             return (3);
1051         }
1053         /* store handle for use in finalize ...  */
1054         obj->syncSemHandle = syncSemHandle;
1056         obj->synchronizer = SyncSem_Handle_upCast(syncSemHandle);
1057     }
1058     else {
1059         obj->syncSemHandle = NULL;
1061         obj->synchronizer = params->synchronizer;
1062     }
1064     /* unlock scheduler */
1065     IGateProvider_leave(MessageQ_module->gate, key);
1067     /* Fill in the message queue object */
1068     listHandle = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
1069     List_construct(List_struct(listHandle), NULL);
1071     listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
1072     List_construct(List_struct(listHandle), NULL);
1074     /* queue port is embedded in the queueId */
1075     queuePort = queueIndex + MessageQ_PORTOFFSET;
1076     obj->queue = ((MessageQ_QueueId)(MultiProc_self()) << 16) | queuePort;
1078     obj->unblocked = FALSE;
1080     /* Add into NameServer */
1081     if (name != NULL) {
1082         obj->nsKey = NameServer_addUInt32(
1083                 (NameServer_Handle)MessageQ_module->nameServer, name,
1084                 obj->queue);
1086         if (obj->nsKey == NULL) {
1087             Error_raise(eb, ti_sdo_ipc_MessageQ_E_nameFailed, name, 0);
1088             return (4);
1089         }
1090     }
1092     /* notify all registered transports about the new queue */
1093     for (tid = 1; tid <= 7; tid++) {
1094         if (MessageQ_module->regTrans[tid].transport == NULL) {
1095             continue;
1096         }
1097         baseTrans = MessageQ_module->regTrans[tid].transport;
1099         switch (MessageQ_module->regTrans[tid].type) {
1101             case ti_sdo_ipc_MessageQ_TransportType_INetworkTransport:
1102                 netTrans = INetworkTransport_Handle_downCast(baseTrans);
1103                 if(netTrans == NULL) {
1104                     status = MessageQ_E_FAIL;
1105                     break;
1106                 }
1108                 if (INetworkTransport_bind(netTrans, obj->queue)) {
1109                     status = MessageQ_S_SUCCESS;
1110                 }
1111                 else {
1112                     status = MessageQ_E_FAIL;
1113                 }
1114             break;
1115         }
1117         /* check for failure */
1118         if (status < 0) {
1119             return(6);
1120         }
1121     }
1123     return (0);
1126 /*
1127  *  ======== MessageQ_Instance_finalize ========
1128  */
1129 Void ti_sdo_ipc_MessageQ_Instance_finalize(
1130         ti_sdo_ipc_MessageQ_Object* obj, Int status)
1132     UInt key;
1133     MessageQ_QueueIndex queueIndex;
1134     List_Handle listHandle;
1135     Int tid;
1136     ITransport_Handle baseTrans;
1137     INetworkTransport_Handle netTrans;
1139     /* Requested queueId was not available. Nothing was done in the init */
1140     if (status == 5) {
1141         return;
1142     }
1144     /* notify all registered transports that given queue is being deleted */
1145     for (tid = 1; tid <= 7; tid++) {
1146         if (MessageQ_module->regTrans[tid].transport == NULL) {
1147             continue;
1148         }
1149         baseTrans = MessageQ_module->regTrans[tid].transport;
1151         switch (MessageQ_module->regTrans[tid].type) {
1153             case ti_sdo_ipc_MessageQ_TransportType_INetworkTransport:
1154                 netTrans = INetworkTransport_Handle_downCast(baseTrans);
1155                 if(netTrans == NULL) {
1156                     status = MessageQ_E_FAIL;
1157                     break;
1158                 }
1160                 if (INetworkTransport_unbind(netTrans, obj->queue)) {
1161                     status = MessageQ_S_SUCCESS;
1162                 }
1163                 else {
1164                     status = MessageQ_E_FAIL;
1165                 }
1166             break;
1167         }
1169         /* check for failure */
1170         if (status < 0) {
1171             /* TODO add error handling */
1172         }
1173     }
1175     if (obj->syncSemHandle != NULL) {
1176         SyncSem_delete(&obj->syncSemHandle);
1177     }
1179     listHandle = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
1181     /* Destruct the list */
1182     List_destruct(List_struct(listHandle));
1184     listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
1186     /* Destruct the list */
1187     List_destruct(List_struct(listHandle));
1189     /* lock */
1190     key = IGateProvider_enter(MessageQ_module->gate);
1192     /* Null out entry in the array. */
1193     queueIndex = MessageQ_getQueueIndex(obj->queue);
1194     MessageQ_module->queues[queueIndex] = NULL;
1196     /* unlock scheduler */
1197     IGateProvider_leave(MessageQ_module->gate, key);
1199     if (obj->nsKey != NULL) {
1200         NameServer_removeEntry((NameServer_Handle)MessageQ_module->nameServer,
1201             obj->nsKey);
1202     }
1205 /*
1206  *************************************************************************
1207  *                       Internal functions
1208  *************************************************************************
1209  */
1211 /*
1212  *  ======== ti_sdo_ipc_MessageQ_grow ========
1213  */
1214 UInt16 ti_sdo_ipc_MessageQ_grow(ti_sdo_ipc_MessageQ_Object *obj,
1215         Error_Block *eb)
1217     UInt16 oldSize;
1218     UInt16 queueIndex = MessageQ_module->numQueues;
1219     ti_sdo_ipc_MessageQ_Handle *queues;
1220     ti_sdo_ipc_MessageQ_Handle *oldQueues;
1221     oldSize = (MessageQ_module->numQueues) * sizeof(MessageQ_Handle);
1224     /* Allocate larger table */
1225     queues = Memory_alloc(ti_sdo_ipc_MessageQ_Object_heap(),
1226                           oldSize + sizeof(MessageQ_Handle), 0, eb);
1228     if (queues == NULL) {
1229         return (MessageQ_INVALIDMESSAGEQ);
1230     }
1232     /* Copy contents into new table */
1233     memcpy(queues, MessageQ_module->queues, oldSize);
1235     /* Fill in the new entry */
1236     queues[queueIndex] = obj;
1238     /* Hook-up new table */
1239     oldQueues = MessageQ_module->queues;
1240     MessageQ_module->queues = queues;
1241     MessageQ_module->numQueues++;
1243     /* Delete old table if not statically defined */
1244     if (MessageQ_module->canFreeQueues == TRUE) {
1245         Memory_free(ti_sdo_ipc_MessageQ_Object_heap(), oldQueues, oldSize);
1246     }
1247     else {
1248         MessageQ_module->canFreeQueues = TRUE;
1249     }
1251     return (queueIndex);