Added function hook for the MessageQ_put method
[ipc/ipcdev.git] / packages / ti / sdo / ipc / MessageQ.c
1 /*
2  * Copyright (c) 2012-2014, Texas Instruments Incorporated
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  ======== MessageQ.c ========
34  *  Implementation of functions specified in MessageQ.xdc.
35  */
37 #include <xdc/std.h>
39 #include <string.h>
41 #include <xdc/runtime/Error.h>
42 #include <xdc/runtime/Assert.h>
43 #include <xdc/runtime/Log.h>
44 #include <xdc/runtime/Memory.h>
45 #include <xdc/runtime/IHeap.h>
46 #include <xdc/runtime/IGateProvider.h>
47 #include <xdc/runtime/Startup.h>
49 #include <xdc/runtime/knl/ISync.h>
50 #include <xdc/runtime/knl/GateThread.h>
52 #include <ti/sysbios/hal/Hwi.h>
53 #include <ti/sysbios/syncs/SyncSem.h>
55 #include <ti/sdo/ipc/interfaces/ITransport.h>
56 #include <ti/sdo/ipc/interfaces/IMessageQTransport.h>
57 #include <ti/sdo/ipc/interfaces/INetworkTransport.h>
58 #include <ti/sdo/utils/List.h>
60 /* must be included after the internal header file for now */
61 #include <ti/sdo/ipc/_MessageQ.h>
62 #include <ti/sdo/utils/_MultiProc.h>
63 #include <ti/sdo/utils/_NameServer.h>
65 #include "package/internal/MessageQ.xdc.h"
67 #ifdef __ti__
68     #pragma FUNC_EXT_CALLED(MessageQ_Params_init);
69     #pragma FUNC_EXT_CALLED(MessageQ_Params2_init);
70     #pragma FUNC_EXT_CALLED(MessageQ_alloc);
71     #pragma FUNC_EXT_CALLED(MessageQ_close);
72     #pragma FUNC_EXT_CALLED(MessageQ_count);
73     #pragma FUNC_EXT_CALLED(MessageQ_create);
74     #pragma FUNC_EXT_CALLED(MessageQ_create2);
75     #pragma FUNC_EXT_CALLED(MessageQ_delete);
76     #pragma FUNC_EXT_CALLED(MessageQ_open);
77     #pragma FUNC_EXT_CALLED(MessageQ_openQueueId);
78     #pragma FUNC_EXT_CALLED(MessageQ_free);
79     #pragma FUNC_EXT_CALLED(MessageQ_get);
80     #pragma FUNC_EXT_CALLED(MessageQ_getQueueId);
81     #pragma FUNC_EXT_CALLED(MessageQ_put);
82     #pragma FUNC_EXT_CALLED(MessageQ_registerHeap);
83     #pragma FUNC_EXT_CALLED(MessageQ_setFreeHookFxn);
84     #pragma FUNC_EXT_CALLED(MessageQ_setPutHookFxn);
85     #pragma FUNC_EXT_CALLED(MessageQ_setReplyQueue);
86     #pragma FUNC_EXT_CALLED(MessageQ_setMsgTrace);
87     #pragma FUNC_EXT_CALLED(MessageQ_staticMsgInit);
88     #pragma FUNC_EXT_CALLED(MessageQ_unblock);
89     #pragma FUNC_EXT_CALLED(MessageQ_unregisterHeap);
90 #endif
92 /*
93  *  ======== MessageQ_msgInit ========
94  *  This is a helper function to initialize a message.
95  */
96 static Void MessageQ_msgInit(MessageQ_Msg msg)
97 {
98     UInt key;
100     msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
101     msg->msgId   = MessageQ_INVALIDMSGID;
102     msg->dstId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
103     msg->flags   = ti_sdo_ipc_MessageQ_HEADERVERSION |
104                    MessageQ_NORMALPRI |
105                    (ti_sdo_ipc_MessageQ_TRACEMASK &
106                    (ti_sdo_ipc_MessageQ_traceFlag << ti_sdo_ipc_MessageQ_TRACESHIFT));
107     msg->srcProc = MultiProc_self();
109     key = Hwi_disable();
110     msg->seqNum  = MessageQ_module->seqNum++;
111     Hwi_restore(key);
114 /*
115  *************************************************************************
116  *                       Common Header Functions
117  *************************************************************************
118  */
120 /*
121  *  ======== MessageQ_Params_init ========
122  */
123 Void MessageQ_Params_init(MessageQ_Params *params)
125     params->synchronizer = NULL;
128 /*
129  *  ======== MessageQ_Params2_init ========
130  */
131 Void MessageQ_Params2_init(MessageQ_Params2 *params)
133     params->synchronizer = NULL;
134     params->queueIndex = MessageQ_ANY;
137 /*
138  *  ======== MessageQ_alloc ========
139  *  Allocate a message and initialize the needed fields
140  *
141  *  Note: some of the fields in the header are set via other
142  *  APIs or in the MessageQ_put function.
143  */
144 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
146     MessageQ_Msg msg;
147     Error_Block eb;
149     Assert_isTrue((heapId < MessageQ_module->numHeaps),
150                   ti_sdo_ipc_MessageQ_A_heapIdInvalid);
152     Assert_isTrue((MessageQ_module->heaps[heapId] != NULL),
153                   ti_sdo_ipc_MessageQ_A_heapIdInvalid);
155     /* Allocate the message. No alignment requested */
156     Error_init(&eb);
157     msg = Memory_alloc(MessageQ_module->heaps[heapId], size, 0, &eb);
159     if (msg == NULL) {
160         return (NULL);
161     }
163     /* Fill in the fields of the message */
164     MessageQ_msgInit(msg);
165     msg->msgSize = size;
166     msg->heapId  = heapId;
168     if (ti_sdo_ipc_MessageQ_traceFlag == TRUE) {
169         Log_write3(ti_sdo_ipc_MessageQ_LM_alloc, (UArg)(msg),
170             (UArg)(msg->seqNum), (UArg)(msg->srcProc));
171     }
173     return (msg);
176 /*
177  *  ======== MessageQ_count ========
178  */
179 Int MessageQ_count(MessageQ_Handle handle)
181     Int              count = 0;
182     UInt             key;
183     List_Elem       *tempMsg = NULL;
184     List_Handle      listHandle;
185     ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
187     /* lock */
188     key = Hwi_disable();
190     /* Get the list */
191     listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
193     /* Loop through and count the messages */
194     while ((tempMsg = List_next(listHandle, tempMsg)) != NULL) {
195         count++;
196     }
198     listHandle = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
200     /* Loop through and count the messages */
201     while ((tempMsg = List_next(listHandle, tempMsg)) != NULL) {
202         count++;
203     }
205     /* unlock scheduler */
206     Hwi_restore(key);
208     return (count);
211 /*
212  *  ======== MessageQ_close ========
213  */
214 Int MessageQ_close(MessageQ_QueueId *queueId)
216     *queueId = MessageQ_INVALIDMESSAGEQ;
218     return (MessageQ_S_SUCCESS);
221 /*
222  *  ======== MessageQ_create ========
223  */
224 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *params)
226     MessageQ_Handle handle;
227     MessageQ_Params2 params2;
229     MessageQ_Params2_init(&params2);
231     /* Use the MessageQ_Params fields if not NULL */
232     if (params != NULL) {
233         params2.synchronizer = params->synchronizer;
234     }
236     handle = MessageQ_create2(name, &params2);
238     return ((MessageQ_Handle)handle);
241 /*
242  *  ======== MessageQ_create2 ========
243  */
244 MessageQ_Handle MessageQ_create2(String name, const MessageQ_Params2 *params)
246     ti_sdo_ipc_MessageQ_Handle handle;
247     ti_sdo_ipc_MessageQ_Params prms;
248     Error_Block eb;
250     Error_init(&eb);
252     if (params != NULL) {
253         ti_sdo_ipc_MessageQ_Params_init(&prms);
255         prms.synchronizer = params->synchronizer;
256         prms.queueIndex = params->queueIndex;
258         handle = ti_sdo_ipc_MessageQ_create(name, &prms, &eb);
259     }
260     else {
261         handle = ti_sdo_ipc_MessageQ_create(name, NULL, &eb);
262     }
264     return ((MessageQ_Handle)handle);
267 /*
268  *  ======== MessageQ_delete ========
269  */
270 Int MessageQ_delete(MessageQ_Handle *handlePtr)
272     ti_sdo_ipc_MessageQ_Handle *instp;
274     instp = (ti_sdo_ipc_MessageQ_Handle *)handlePtr;
276     ti_sdo_ipc_MessageQ_delete(instp);
278     return (MessageQ_S_SUCCESS);
281 /*
282  *  ======== MessageQ_free ========
283  */
284 Int MessageQ_free(MessageQ_Msg msg)
286     IHeap_Handle heap;
287     Bits16 msgId;
288     Bits16 heapId;
290     /* make sure msg is not NULL */
291     Assert_isTrue((msg != NULL), ti_sdo_ipc_MessageQ_A_invalidMsg);
293     /* Cannot free a message that was initialized via MessageQ_staticMsgInit */
294     Assert_isTrue((msg->heapId != ti_sdo_ipc_MessageQ_STATICMSG),
295                   ti_sdo_ipc_MessageQ_A_cannotFreeStaticMsg);
297     Assert_isTrue((msg->heapId < MessageQ_module->numHeaps),
298                   ti_sdo_ipc_MessageQ_A_heapIdInvalid);
300     Assert_isTrue((MessageQ_module->heaps[msg->heapId] != NULL),
301                   ti_sdo_ipc_MessageQ_A_heapIdInvalid);
303     if ((ti_sdo_ipc_MessageQ_traceFlag == TRUE) ||
304         (msg->flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0) {
305         Log_write3(ti_sdo_ipc_MessageQ_LM_free, (UArg)(msg),
306             (UArg)(msg->seqNum), (UArg)(msg->srcProc));
307     }
309     heap = MessageQ_module->heaps[msg->heapId];
311     if (heap != NULL) {
312         msgId = MessageQ_getMsgId(msg);
313         heapId = msg->heapId;
314         Memory_free(heap, msg, msg->msgSize);
315         if (MessageQ_module->freeHookFxn != NULL) {
316             MessageQ_module->freeHookFxn(heapId, msgId);
317         }
318     }
319     else {
320         return (MessageQ_E_FAIL);
321     }
323     return (MessageQ_S_SUCCESS);
326 /*
327  *  ======== MessageQ_get ========
328  */
329 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
331     Int status;
332     List_Handle highList;
333     List_Handle normalList;
334     ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
336     /* Get the list */
337     normalList = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
338     highList   = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
340     /* Keep looping while there are no elements on either list */
341     *msg = (MessageQ_Msg)List_get(highList);
342     while (*msg == NULL) {
343         *msg = (MessageQ_Msg)List_get(normalList);
344         if (*msg == NULL) {
345             /*  Block until notified. */
346             status = ISync_wait(obj->synchronizer, timeout, NULL);
347             if (status == ISync_WaitStatus_TIMEOUT) {
348                 return (MessageQ_E_TIMEOUT);
349             }
350             else if (status < 0) {
351                 return (MessageQ_E_FAIL);
352             }
354             if (obj->unblocked) {
355                 /* *(msg) may be NULL */
356                 return (MessageQ_E_UNBLOCKED);
357             }
359             *msg = (MessageQ_Msg)List_get(highList);
360         }
361     }
363     if ((ti_sdo_ipc_MessageQ_traceFlag == TRUE) ||
364         (((*msg)->flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0)) {
365         Log_write4(ti_sdo_ipc_MessageQ_LM_get, (UArg)(*msg),
366             (UArg)((*msg)->seqNum), (UArg)((*msg)->srcProc), (UArg)(obj));
367     }
369     return (MessageQ_S_SUCCESS);
372 /*
373  *  ======== MessageQ_getQueueId ========
374  */
375 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
377     MessageQ_QueueId queueId;
378     ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
380     queueId = (obj->queue);
382     return (queueId);
385 /*
386  *  ======== MessageQ_open ========
387  */
388 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
390     Int         status;
391     Error_Block eb;
393     Assert_isTrue(name != NULL, ti_sdo_ipc_MessageQ_A_invalidParam);
394     Assert_isTrue(queueId != NULL, ti_sdo_ipc_MessageQ_A_invalidParam);
396     Error_init(&eb);
398     /* Search NameServer */
399     status = NameServer_getUInt32(
400             (NameServer_Handle)MessageQ_module->nameServer, name, queueId,
401             NULL);
403     if (status >= 0) {
404         return (MessageQ_S_SUCCESS);    /* name found */
405     }
406     else {
407         return (MessageQ_E_NOTFOUND);   /* name not found */
408     }
411 /*
412  *  ======== MessageQ_openQueueId ========
413  */
414 MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 remoteProcId)
416     MessageQ_QueueId queueId;
418     queueId = ((MessageQ_QueueId)(remoteProcId) << 16) | queueIndex;
420     return (queueId);
423 /*
424  *  ======== MessageQ_put ========
425  */
426 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
428     IMessageQTransport_Handle transport;
429     MessageQ_QueueIndex dstProcId = (MessageQ_QueueIndex)(queueId >> 16);
430     List_Handle       listHandle;
431     Int               status;
432     UInt              priority;
433 #ifndef xdc_runtime_Log_DISABLE_ALL
434     UInt16            flags;
435     UInt16            seqNum;
436     UInt16            srcProc;
437 #endif
438     ti_sdo_ipc_MessageQ_Object   *obj;
439     Int tid;
440     ITransport_Handle baseTrans;
441     INetworkTransport_Handle netTrans;
443     Assert_isTrue((msg != NULL), ti_sdo_ipc_MessageQ_A_invalidMsg);
445     msg->dstId   = (UInt16)(queueId);
446     msg->dstProc = (UInt16)(queueId >> 16);
448     /* invoke put hook function after addressing the message */
449     if (MessageQ_module->putHookFxn != NULL) {
450         MessageQ_module->putHookFxn(queueId, (Ptr)msg);
451     }
453     /* extract the transport ID from the message header */
454     tid = MessageQ_getTransportId(msg);
456     /* if recipient is local, use direct message delivery */
457     if (dstProcId == MultiProc_self()) {
458         /* Assert queueId is valid */
459         Assert_isTrue((UInt16)queueId < MessageQ_module->numQueues,
460                       ti_sdo_ipc_MessageQ_A_invalidQueueId);
462         /* It is a local MessageQ */
463         obj = MessageQ_module->queues[(UInt16)(queueId)];
465         /* Assert object is not NULL */
466         Assert_isTrue(obj != NULL, ti_sdo_ipc_MessageQ_A_invalidObj);
468         if ((msg->flags & MessageQ_PRIORITYMASK) == MessageQ_URGENTPRI) {
469             listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
470             List_putHead(listHandle, (List_Elem *)msg);
471         }
472         else {
473             if ((msg->flags & MessageQ_PRIORITYMASK) == MessageQ_NORMALPRI) {
474                 listHandle = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
475             }
476             else {
477                 listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
478             }
479             /* put on the queue */
480             List_put(listHandle, (List_Elem *)msg);
481         }
483         ISync_signal(obj->synchronizer);
485         status = MessageQ_S_SUCCESS;
487         if ((ti_sdo_ipc_MessageQ_traceFlag) ||
488             (msg->flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0) {
489             Log_write4(ti_sdo_ipc_MessageQ_LM_putLocal, (UArg)(msg),
490                        (UArg)(msg->seqNum), (UArg)(msg->srcProc), (UArg)(obj));
491         }
492     }
494     /* if transport ID is zero, use primary transport array */
495     else if (tid == 0) {
496         /* assert that dstProcId is valid */
497         Assert_isTrue(dstProcId < ti_sdo_utils_MultiProc_numProcessors,
498                       ti_sdo_ipc_MessageQ_A_procIdInvalid);
500         /* Put the high and urgent messages to the high priority transport */
501         priority = (UInt)((msg->flags) &
502             ti_sdo_ipc_MessageQ_TRANSPORTPRIORITYMASK);
504         /* Call the transport associated with this message queue */
505         transport = MessageQ_module->transports[dstProcId][priority];
506         if (transport == NULL) {
507             /* Try the other transport */
508             priority = !priority;
509             transport = MessageQ_module->transports[dstProcId][priority];
510         }
512         /* assert transport is not null */
513         Assert_isTrue(transport != NULL,
514             ti_sdo_ipc_MessageQ_A_unregisteredTransport);
516 #ifndef xdc_runtime_Log_DISABLE_ALL
517         /* use local vars so msg does not get cached after put */
518         flags = msg->flags;
520         if ((ti_sdo_ipc_MessageQ_traceFlag) ||
521             (flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0) {
522             /* use local vars so msg does not get cached after put */
523             seqNum  = msg->seqNum;
524             srcProc = msg->srcProc;
525         }
526 #endif
528         /* put msg to remote processor using transport */
529         if (IMessageQTransport_put(transport, msg)) {
530             status = MessageQ_S_SUCCESS;
532 #ifndef xdc_runtime_Log_DISABLE_ALL
533             /* if trace enabled */
534             if ((ti_sdo_ipc_MessageQ_traceFlag) ||
535                 (flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0) {
536                 Log_write4(ti_sdo_ipc_MessageQ_LM_putRemote, (UArg)(msg),
537                           (UArg)(seqNum), (UArg)(srcProc),
538                           (UArg)(dstProcId));
539             }
540 #endif
541         }
542         else {
543             status = MessageQ_E_FAIL;
544         }
545     }
547     /* use a registered transport to deliver the message */
548     else {
549         baseTrans = MessageQ_module->regTrans[tid].transport;
551         if (baseTrans == NULL) {
552             /* raise error */
553             status = MessageQ_E_FAIL;
554             goto leave;
555         }
557         switch (MessageQ_module->regTrans[tid].type) {
559             case ti_sdo_ipc_MessageQ_TransportType_INetworkTransport:
560                 netTrans = INetworkTransport_Handle_downCast(baseTrans);
562                 if (INetworkTransport_put(netTrans, msg)) {
563                     status = MessageQ_S_SUCCESS;
564                 }
565                 else {
566                     status = MessageQ_E_FAIL;
567                 }
568             break;
569         }
570     }
572 leave:
573     return (status);
576 /*
577  *  ======== MessageQ_registerHeap ========
578  *  Register a heap
579  */
580 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
582     Int status;
583     UInt key;
584     IHeap_Handle iheap = (IHeap_Handle)heap;
586     /* Make sure the heapId is valid */
587     Assert_isTrue((heapId < MessageQ_module->numHeaps),
588                   ti_sdo_ipc_MessageQ_A_heapIdInvalid);
590     /* lock scheduler */
591     key = Hwi_disable();
593     /* Make sure the id is not already in use */
594     if (MessageQ_module->heaps[heapId] == NULL) {
595         MessageQ_module->heaps[heapId] = iheap;
596         status = MessageQ_S_SUCCESS;
597     }
598     else {
599         status = MessageQ_E_ALREADYEXISTS;
600     }
602     /* unlock scheduler */
603     Hwi_restore(key);
605     return (status);
608 /*
609  *  ======== MessageQ_setFreeHookFxn ========
610  */
611 Void MessageQ_setFreeHookFxn(MessageQ_FreeHookFxn freeHookFxn)
613     MessageQ_module->freeHookFxn = freeHookFxn;
616 /*
617  *  ======== MessageQ_setPutHookFxn ========
618  */
619 Void MessageQ_setPutHookFxn(MessageQ_PutHookFxn putHookFxn)
621     MessageQ_module->putHookFxn = (ti_sdo_ipc_MessageQ_PutHookFxn)putHookFxn;
624 /*
625  *  ======== MessageQ_setMsgTrace ========
626  */
627 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
629     msg->flags = (msg->flags & ~ti_sdo_ipc_MessageQ_TRACEMASK) |
630                  (traceFlag << ti_sdo_ipc_MessageQ_TRACESHIFT);
631     Log_write4(ti_sdo_ipc_MessageQ_LM_setTrace, (UArg)(msg), (UArg)(msg->seqNum),
632                (UArg)(msg->srcProc), (UArg)(traceFlag));
635 /*
636  *  ======== MessageQ_setReplyQueue ========
637  *  Embed the source message queue into a message.
638  */
639 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
641     ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
643     msg->replyId   = (UInt16)(obj->queue);
644     msg->replyProc = (UInt16)(obj->queue >> 16);
647 /*
648  *  ======== MessageQ_staticMsgInit ========
649  */
650 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
652     Assert_isTrue((msg != NULL), ti_sdo_ipc_MessageQ_A_invalidMsg);
654     MessageQ_msgInit(msg);
656     msg->heapId  = ti_sdo_ipc_MessageQ_STATICMSG;
657     msg->msgSize = size;
659     if (ti_sdo_ipc_MessageQ_traceFlag == TRUE) {
660         Log_write3(ti_sdo_ipc_MessageQ_LM_staticMsgInit, (UArg)(msg),
661                 (UArg)(msg->seqNum), (UArg)(msg->srcProc));
662     }
665 /*
666  *  ======== MessageQ_unblock ========
667  */
668 Void MessageQ_unblock(MessageQ_Handle handle)
670     ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
672     /* Assert that the queue is using a blocking synchronizer */
673     Assert_isTrue(ISync_query((obj->synchronizer), ISync_Q_BLOCKING) == TRUE,
674         ti_sdo_ipc_MessageQ_A_invalidUnblock);
676     /* Set instance to 'unblocked' state */
677     obj->unblocked = TRUE;
679     /* Signal the synchronizer */
680     ISync_signal(obj->synchronizer);
683 /*
684  *  ======== MessageQ_unregisterHeap ========
685  *  Unregister a heap
686  */
687 Int MessageQ_unregisterHeap(UInt16 heapId)
689     UInt key;
691     Assert_isTrue((heapId < MessageQ_module->numHeaps),
692                   ti_sdo_ipc_MessageQ_A_heapIdInvalid);
694     /* lock scheduler */
695     key = Hwi_disable();
697     MessageQ_module->heaps[heapId] = NULL;
699     /* unlock scheduler */
700     Hwi_restore(key);
702     return (MessageQ_S_SUCCESS);
705 /*
706  *************************************************************************
707  *                      Module functions
708  *************************************************************************
709  */
711 /*
712  *  ======== ti_sdo_ipc_MessageQ_Module_startup ========
713  */
714 Int ti_sdo_ipc_MessageQ_Module_startup(Int phase)
716     Int i;
718     /* Ensure NameServer Module_startup() has completed */
719     if (ti_sdo_utils_NameServer_Module_startupDone() == FALSE) {
720         return (Startup_NOTDONE);
721     }
723     if (GateThread_Module_startupDone() == FALSE) {
724         return (Startup_NOTDONE);
725     }
727     if (MessageQ_module->gate == NULL) {
728         MessageQ_module->gate =
729             GateThread_Handle_upCast(GateThread_create(NULL, NULL));
730     }
732     /* Loop through all the static objects and set the id */
733     for (i = 0; i < ti_sdo_ipc_MessageQ_Object_count(); i++) {
734         MessageQ_module->queues[i] = (ti_sdo_ipc_MessageQ_Object *)
735                 ti_sdo_ipc_MessageQ_Object_get(NULL, i);
736     }
738     /* Null out the dynamic ones */
739     for (i = ti_sdo_ipc_MessageQ_Object_count(); i < MessageQ_module->numQueues;
740             i++) {
741         MessageQ_module->queues[i] = NULL;
742     }
744     return (Startup_DONE);
747 /*
748  *  ======== ti_sdo_ipc_MessageQ_registerTransport ========
749  *  Register a transport
750  */
751 Bool ti_sdo_ipc_MessageQ_registerTransport(IMessageQTransport_Handle handle,
752     UInt16 procId, UInt priority)
754     Bool flag = FALSE;
755     UInt key;
757     /* Make sure the procId is valid */
758     Assert_isTrue(procId < ti_sdo_utils_MultiProc_numProcessors, ti_sdo_ipc_MessageQ_A_procIdInvalid);
760     /* lock scheduler */
761     key = Hwi_disable();
763     /* Make sure the id is not already in use */
764     if (MessageQ_module->transports[procId][priority] == NULL) {
765         MessageQ_module->transports[procId][priority] = handle;
766         flag = TRUE;
767     }
769     /* unlock scheduler */
770     Hwi_restore(key);
772     return (flag);
775 /*
776  *  ======== ti_sdo_ipc_MessageQ_unregisterTransport ========
777  *  Unregister a heap
778  */
779 Void ti_sdo_ipc_MessageQ_unregisterTransport(UInt16 procId, UInt priority)
781     UInt key;
783     /* Make sure the procId is valid */
784     Assert_isTrue(procId < ti_sdo_utils_MultiProc_numProcessors, ti_sdo_ipc_MessageQ_A_procIdInvalid);
786     /* lock scheduler */
787     key = Hwi_disable();
789     MessageQ_module->transports[procId][priority] = NULL;
791     /* unlock scheduler */
792     Hwi_restore(key);
795 /*
796  *  ======== ti_sdo_ipc_MessageQ_registerTransportId ========
797  */
798 Bool ti_sdo_ipc_MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
800     ti_sdo_ipc_MessageQ_TransportType type;
802     /* validate transport ID */
803     if ((tid < 1) || (tid > 7)) {
804         /* raise error */
805         return (FALSE);
806     }
808     /* don't overwrite an existing transport */
809     if (MessageQ_module->regTrans[tid].transport != NULL) {
810         /* raise error */
811         return (FALSE);
812     }
814     /* determine the transport type */
815     if (INetworkTransport_Handle_downCast(inst) != NULL) {
816         type = ti_sdo_ipc_MessageQ_TransportType_INetworkTransport;
817     }
818     else {
819         /* raise error */
820         return (FALSE);
821     }
823     /* register the transport instance */
824     MessageQ_module->regTrans[tid].transport = inst;
825     MessageQ_module->regTrans[tid].type = type;
826     return (TRUE);
829 /*
830  *  ======== ti_sdo_ipc_MessageQ_registerTransportId ========
831  */
832 Bool ti_sdo_ipc_MessageQ_unregisterTransportId(UInt tid)
834     /* forget the registered transport instance */
835     MessageQ_module->regTrans[tid].transport = NULL;
836     MessageQ_module->regTrans[tid].type =
837             ti_sdo_ipc_MessageQ_TransportType_Invalid;
839     return (TRUE);
843 /*
844  *************************************************************************
845  *                       Instance functions
846  *************************************************************************
847  */
849 /*
850  *  ======== MessageQ_Instance_init ========
851  */
852 Int ti_sdo_ipc_MessageQ_Instance_init(ti_sdo_ipc_MessageQ_Object *obj, String name,
853         const ti_sdo_ipc_MessageQ_Params *params, Error_Block *eb)
855     Int              i;
856     UInt16           start;
857     UInt16           count;
858     UInt             key;
859     Bool             found = FALSE;
860     List_Handle      listHandle;
861     SyncSem_Handle   syncSemHandle;
862     MessageQ_QueueIndex queueIndex;
863     Int tid;
864     Int status;
865     ITransport_Handle baseTrans;
866     INetworkTransport_Handle netTrans;
868     /* lock */
869     key = IGateProvider_enter(MessageQ_module->gate);
871     if (params->queueIndex != MessageQ_ANY) {
872         queueIndex = params->queueIndex;
874         if ((queueIndex >= ti_sdo_ipc_MessageQ_numReservedEntries) ||
875             (MessageQ_module->queues[queueIndex] != NULL)) {
876             IGateProvider_leave(MessageQ_module->gate, key);
877             Error_raise(eb, ti_sdo_ipc_MessageQ_E_indexNotAvailable,
878                 queueIndex, 0);
879             return (5);
880         }
881         MessageQ_module->queues[queueIndex] = obj;
882         found = TRUE;
883     }
884     else {
886         start = ti_sdo_ipc_MessageQ_numReservedEntries;
887         count = MessageQ_module->numQueues;
889         /* Search the dynamic array for any holes */
890         for (i = start; (i < count) && (found == FALSE); i++) {
891             if (MessageQ_module->queues[i] == NULL) {
892                 MessageQ_module->queues[i] = obj;
893                 queueIndex = i;
894                 found = TRUE;
895             }
896         }
897     }
899     /*
900      *  If no free slot was found:
901      *     - if no growth allowed, raise and error
902      *     - if growth is allowed, grow the array
903      */
904     if (found == FALSE) {
905         if (ti_sdo_ipc_MessageQ_maxRuntimeEntries != NameServer_ALLOWGROWTH) {
906             /* unlock scheduler */
907             IGateProvider_leave(MessageQ_module->gate, key);
909             Error_raise(eb, ti_sdo_ipc_MessageQ_E_maxReached,
910                 ti_sdo_ipc_MessageQ_maxRuntimeEntries, 0);
911             return (1);
912         }
913         else {
914             queueIndex = MessageQ_grow(obj, eb);
915             if (queueIndex == MessageQ_INVALIDMESSAGEQ) {
916                 /* unlock scheduler */
917                 IGateProvider_leave(MessageQ_module->gate, key);
918                 return (2);
919             }
920         }
921     }
923     /* create default sync if not specified */
924     if (params->synchronizer == NULL) {
925         /* Create a SyncSem as the synchronizer */
926         syncSemHandle = SyncSem_create(NULL, eb);
928         if (syncSemHandle == NULL) {
929             /* unlock scheduler */
930             IGateProvider_leave(MessageQ_module->gate, key);
931             return (3);
932         }
934         /* store handle for use in finalize ...  */
935         obj->syncSemHandle = syncSemHandle;
937         obj->synchronizer = SyncSem_Handle_upCast(syncSemHandle);
938     }
939     else {
940         obj->syncSemHandle = NULL;
942         obj->synchronizer = params->synchronizer;
943     }
945     /* unlock scheduler */
946     IGateProvider_leave(MessageQ_module->gate, key);
948     /* Fill in the message queue object */
949     listHandle = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
950     List_construct(List_struct(listHandle), NULL);
952     listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
953     List_construct(List_struct(listHandle), NULL);
955     obj->queue = ((MessageQ_QueueId)(MultiProc_self()) << 16) | queueIndex;
957     obj->unblocked = FALSE;
959     /* Add into NameServer */
960     if (name != NULL) {
961         obj->nsKey = NameServer_addUInt32(
962                 (NameServer_Handle)MessageQ_module->nameServer, name,
963                 obj->queue);
965         if (obj->nsKey == NULL) {
966             Error_raise(eb, ti_sdo_ipc_MessageQ_E_nameFailed, name, 0);
967             return (4);
968         }
969     }
971     /* notify all registered transports about the new queue */
972     for (tid = 1; tid <= 7; tid++) {
973         if (MessageQ_module->regTrans[tid].transport == NULL) {
974             continue;
975         }
976         baseTrans = MessageQ_module->regTrans[tid].transport;
978         switch (MessageQ_module->regTrans[tid].type) {
980             case ti_sdo_ipc_MessageQ_TransportType_INetworkTransport:
981                 netTrans = INetworkTransport_Handle_downCast(baseTrans);
983                 if (INetworkTransport_bind(netTrans, obj->queue)) {
984                     status = MessageQ_S_SUCCESS;
985                 }
986                 else {
987                     status = MessageQ_E_FAIL;
988                 }
989             break;
990         }
992         /* check for failure */
993         if (status < 0) {
994             /* TODO add error handling */
995         }
996     }
998     return (0);
1001 /*
1002  *  ======== MessageQ_Instance_finalize ========
1003  */
1004 Void ti_sdo_ipc_MessageQ_Instance_finalize(
1005         ti_sdo_ipc_MessageQ_Object* obj, Int status)
1007     UInt key;
1008     MessageQ_QueueIndex index = (MessageQ_QueueIndex)(obj->queue);
1009     List_Handle listHandle;
1010     Int tid;
1011     ITransport_Handle baseTrans;
1012     INetworkTransport_Handle netTrans;
1014     /* Requested queueId was not available. Nothing was done in the init */
1015     if (status == 5) {
1016         return;
1017     }
1019     /* notify all registered transports that given queue is being deleted */
1020     for (tid = 1; tid <= 7; tid++) {
1021         if (MessageQ_module->regTrans[tid].transport == NULL) {
1022             continue;
1023         }
1024         baseTrans = MessageQ_module->regTrans[tid].transport;
1026         switch (MessageQ_module->regTrans[tid].type) {
1028             case ti_sdo_ipc_MessageQ_TransportType_INetworkTransport:
1029                 netTrans = INetworkTransport_Handle_downCast(baseTrans);
1031                 if (INetworkTransport_unbind(netTrans, obj->queue)) {
1032                     status = MessageQ_S_SUCCESS;
1033                 }
1034                 else {
1035                     status = MessageQ_E_FAIL;
1036                 }
1037             break;
1038         }
1040         /* check for failure */
1041         if (status < 0) {
1042             /* TODO add error handling */
1043         }
1044     }
1046     if (obj->syncSemHandle != NULL) {
1047         SyncSem_delete(&obj->syncSemHandle);
1048     }
1050     listHandle = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
1052     /* Destruct the list */
1053     List_destruct(List_struct(listHandle));
1055     listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
1057     /* Destruct the list */
1058     List_destruct(List_struct(listHandle));
1060     /* lock */
1061     key = IGateProvider_enter(MessageQ_module->gate);
1063     /* Null out entry in the array. */
1064     MessageQ_module->queues[index] = NULL;
1066     /* unlock scheduler */
1067     IGateProvider_leave(MessageQ_module->gate, key);
1069     if (obj->nsKey != NULL) {
1070         NameServer_removeEntry((NameServer_Handle)MessageQ_module->nameServer,
1071             obj->nsKey);
1072     }
1075 /*
1076  *************************************************************************
1077  *                       Internal functions
1078  *************************************************************************
1079  */
1081 /*
1082  *  ======== ti_sdo_ipc_MessageQ_grow ========
1083  */
1084 UInt16 ti_sdo_ipc_MessageQ_grow(ti_sdo_ipc_MessageQ_Object *obj,
1085         Error_Block *eb)
1087     UInt16 oldSize;
1088     UInt16 queueIndex = MessageQ_module->numQueues;
1089     ti_sdo_ipc_MessageQ_Handle *queues;
1090     ti_sdo_ipc_MessageQ_Handle *oldQueues;
1091     oldSize = (MessageQ_module->numQueues) * sizeof(MessageQ_Handle);
1094     /* Allocate larger table */
1095     queues = Memory_alloc(ti_sdo_ipc_MessageQ_Object_heap(),
1096                           oldSize + sizeof(MessageQ_Handle), 0, eb);
1098     if (queues == NULL) {
1099         return (MessageQ_INVALIDMESSAGEQ);
1100     }
1102     /* Copy contents into new table */
1103     memcpy(queues, MessageQ_module->queues, oldSize);
1105     /* Fill in the new entry */
1106     queues[queueIndex] = obj;
1108     /* Hook-up new table */
1109     oldQueues = MessageQ_module->queues;
1110     MessageQ_module->queues = queues;
1111     MessageQ_module->numQueues++;
1113     /* Delete old table if not statically defined */
1114     if (MessageQ_module->canFreeQueues == TRUE) {
1115         Memory_free(ti_sdo_ipc_MessageQ_Object_heap(), oldQueues, oldSize);
1116     }
1117     else {
1118         MessageQ_module->canFreeQueues = TRUE;
1119     }
1121     return (queueIndex);