33ff05a0dc4d3ee260579b7d0c67dc75921b3bc4
[ipc/ipcdev.git] / packages / ti / sdo / ipc / MessageQ.c
1 /*
2  * Copyright (c) 2012-2014, Texas Instruments Incorporated
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  ======== MessageQ.c ========
34  *  Implementation of functions specified in MessageQ.xdc.
35  */
37 #include <xdc/std.h>
39 #include <string.h>
41 #include <xdc/runtime/Error.h>
42 #include <xdc/runtime/Assert.h>
43 #include <xdc/runtime/Log.h>
44 #include <xdc/runtime/Memory.h>
45 #include <xdc/runtime/IHeap.h>
46 #include <xdc/runtime/IGateProvider.h>
47 #include <xdc/runtime/Startup.h>
49 #include <xdc/runtime/knl/ISync.h>
50 #include <xdc/runtime/knl/GateThread.h>
52 #include <ti/sysbios/hal/Hwi.h>
53 #include <ti/sysbios/syncs/SyncSem.h>
55 #include <ti/sdo/ipc/interfaces/IMessageQTransport.h>
56 #include <ti/sdo/utils/List.h>
58 /* must be included after the internal header file for now */
59 #include <ti/sdo/ipc/_MessageQ.h>
60 #include <ti/sdo/utils/_MultiProc.h>
61 #include <ti/sdo/utils/_NameServer.h>
63 #include "package/internal/MessageQ.xdc.h"
65 #ifdef __ti__
66     #pragma FUNC_EXT_CALLED(MessageQ_Params_init);
67     #pragma FUNC_EXT_CALLED(MessageQ_Params2_init);
68     #pragma FUNC_EXT_CALLED(MessageQ_alloc);
69     #pragma FUNC_EXT_CALLED(MessageQ_close);
70     #pragma FUNC_EXT_CALLED(MessageQ_count);
71     #pragma FUNC_EXT_CALLED(MessageQ_create);
72     #pragma FUNC_EXT_CALLED(MessageQ_create2);
73     #pragma FUNC_EXT_CALLED(MessageQ_delete);
74     #pragma FUNC_EXT_CALLED(MessageQ_open);
75     #pragma FUNC_EXT_CALLED(MessageQ_openQueueId);
76     #pragma FUNC_EXT_CALLED(MessageQ_free);
77     #pragma FUNC_EXT_CALLED(MessageQ_get);
78     #pragma FUNC_EXT_CALLED(MessageQ_getQueueId);
79     #pragma FUNC_EXT_CALLED(MessageQ_put);
80     #pragma FUNC_EXT_CALLED(MessageQ_registerHeap);
81     #pragma FUNC_EXT_CALLED(MessageQ_setFreeHookFxn);
82     #pragma FUNC_EXT_CALLED(MessageQ_setReplyQueue);
83     #pragma FUNC_EXT_CALLED(MessageQ_setMsgTrace);
84     #pragma FUNC_EXT_CALLED(MessageQ_staticMsgInit);
85     #pragma FUNC_EXT_CALLED(MessageQ_unblock);
86     #pragma FUNC_EXT_CALLED(MessageQ_unregisterHeap);
87 #endif
89 /*
90  *  ======== MessageQ_msgInit ========
91  *  This is a helper function to initialize a message.
92  */
93 static Void MessageQ_msgInit(MessageQ_Msg msg)
94 {
95     UInt key;
97     msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
98     msg->msgId   = MessageQ_INVALIDMSGID;
99     msg->dstId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
100     msg->flags   = ti_sdo_ipc_MessageQ_HEADERVERSION |
101                    MessageQ_NORMALPRI |
102                    (ti_sdo_ipc_MessageQ_TRACEMASK &
103                    (ti_sdo_ipc_MessageQ_traceFlag << ti_sdo_ipc_MessageQ_TRACESHIFT));
104     msg->srcProc = MultiProc_self();
106     key = Hwi_disable();
107     msg->seqNum  = MessageQ_module->seqNum++;
108     Hwi_restore(key);
111 /*
112  *************************************************************************
113  *                       Common Header Functions
114  *************************************************************************
115  */
117 /*
118  *  ======== MessageQ_Params_init ========
119  */
120 Void MessageQ_Params_init(MessageQ_Params *params)
122     params->synchronizer = NULL;
125 /*
126  *  ======== MessageQ_Params2_init ========
127  */
128 Void MessageQ_Params2_init(MessageQ_Params2 *params)
130     params->synchronizer = NULL;
131     params->queueIndex = MessageQ_ANY;
134 /*
135  *  ======== MessageQ_alloc ========
136  *  Allocate a message and initial the needed fields (note some
137  *  of the fields in the header at set via other APIs or in the
138  *  MessageQ_put function.
139  */
140 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
142     MessageQ_Msg msg;
143     Error_Block eb;
145     Assert_isTrue((heapId < MessageQ_module->numHeaps),
146                   ti_sdo_ipc_MessageQ_A_heapIdInvalid);
148     Assert_isTrue((MessageQ_module->heaps[heapId] != NULL),
149                   ti_sdo_ipc_MessageQ_A_heapIdInvalid);
151     /* Allocate the message. No alignment requested */
152     Error_init(&eb);
153     msg = Memory_alloc(MessageQ_module->heaps[heapId], size, 0, &eb);
155     if (msg == NULL) {
156         return (NULL);
157     }
159     /* Fill in the fields of the message */
160     MessageQ_msgInit(msg);
161     msg->msgSize = size;
162     msg->heapId  = heapId;
164     if (ti_sdo_ipc_MessageQ_traceFlag == TRUE) {
165         Log_write3(ti_sdo_ipc_MessageQ_LM_alloc, (UArg)(msg),
166             (UArg)(msg->seqNum), (UArg)(msg->srcProc));
167     }
169     return (msg);
172 /*
173  *  ======== MessageQ_count ========
174  */
175 Int MessageQ_count(MessageQ_Handle handle)
177     Int              count = 0;
178     UInt             key;
179     List_Elem       *tempMsg = NULL;
180     List_Handle      listHandle;
181     ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
183     /* lock */
184     key = Hwi_disable();
186     /* Get the list */
187     listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
189     /* Loop through and count the messages */
190     while ((tempMsg = List_next(listHandle, tempMsg)) != NULL) {
191         count++;
192     }
194     listHandle = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
196     /* Loop through and count the messages */
197     while ((tempMsg = List_next(listHandle, tempMsg)) != NULL) {
198         count++;
199     }
201     /* unlock scheduler */
202     Hwi_restore(key);
204     return (count);
207 /*
208  *  ======== MessageQ_close ========
209  */
210 Int MessageQ_close(MessageQ_QueueId *queueId)
212     *queueId = MessageQ_INVALIDMESSAGEQ;
214     return (MessageQ_S_SUCCESS);
217 /*
218  *  ======== MessageQ_create ========
219  */
220 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *params)
222     MessageQ_Handle handle;
223     MessageQ_Params2 params2;
225     MessageQ_Params2_init(&params2);
227     /* Use the MessageQ_Params fields if not NULL */
228     if (params != NULL) {
229         params2.synchronizer = params->synchronizer;
230     }
232     handle = MessageQ_create2(name, &params2);
234     return ((MessageQ_Handle)handle);
237 /*
238  *  ======== MessageQ_create2 ========
239  */
240 MessageQ_Handle MessageQ_create2(String name, const MessageQ_Params2 *params)
242     ti_sdo_ipc_MessageQ_Handle handle;
243     ti_sdo_ipc_MessageQ_Params prms;
244     Error_Block eb;
246     Error_init(&eb);
248     if (params != NULL) {
249         ti_sdo_ipc_MessageQ_Params_init(&prms);
251         prms.synchronizer = params->synchronizer;
252         prms.queueIndex = params->queueIndex;
254         handle = ti_sdo_ipc_MessageQ_create(name, &prms, &eb);
255     }
256     else {
257         handle = ti_sdo_ipc_MessageQ_create(name, NULL, &eb);
258     }
260     return ((MessageQ_Handle)handle);
263 /*
264  *  ======== MessageQ_delete ========
265  */
266 Int MessageQ_delete(MessageQ_Handle *handlePtr)
268     ti_sdo_ipc_MessageQ_Handle *instp;
270     instp = (ti_sdo_ipc_MessageQ_Handle *)handlePtr;
272     ti_sdo_ipc_MessageQ_delete(instp);
274     return (MessageQ_S_SUCCESS);
277 /*
278  *  ======== MessageQ_free ========
279  */
280 Int MessageQ_free(MessageQ_Msg msg)
282     IHeap_Handle heap;
283     Bits16 msgId;
284     Bits16 heapId;
286     /* make sure msg is not NULL */
287     Assert_isTrue((msg != NULL), ti_sdo_ipc_MessageQ_A_invalidMsg);
289     /* Cannot free a message that was initialized via MessageQ_staticMsgInit */
290     Assert_isTrue((msg->heapId != ti_sdo_ipc_MessageQ_STATICMSG),
291                   ti_sdo_ipc_MessageQ_A_cannotFreeStaticMsg);
293     Assert_isTrue((msg->heapId < MessageQ_module->numHeaps),
294                   ti_sdo_ipc_MessageQ_A_heapIdInvalid);
296     Assert_isTrue((MessageQ_module->heaps[msg->heapId] != NULL),
297                   ti_sdo_ipc_MessageQ_A_heapIdInvalid);
299     if ((ti_sdo_ipc_MessageQ_traceFlag == TRUE) ||
300         (msg->flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0) {
301         Log_write3(ti_sdo_ipc_MessageQ_LM_free, (UArg)(msg),
302             (UArg)(msg->seqNum), (UArg)(msg->srcProc));
303     }
305     heap = MessageQ_module->heaps[msg->heapId];
307     if (heap != NULL) {
308         msgId = MessageQ_getMsgId(msg);
309         heapId = msg->heapId;
310         Memory_free(heap, msg, msg->msgSize);
311         if (MessageQ_module->freeHookFxn != NULL) {
312             MessageQ_module->freeHookFxn(heapId, msgId);
313         }
314     }
315     else {
316         return (MessageQ_E_FAIL);
317     }
319     return (MessageQ_S_SUCCESS);
322 /*
323  *  ======== MessageQ_get ========
324  */
325 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
327     Int status;
328     List_Handle highList;
329     List_Handle normalList;
330     ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
332     /* Get the list */
333     normalList = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
334     highList   = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
336     /* Keep looping while there are no elements on either list */
337     *msg = (MessageQ_Msg)List_get(highList);
338     while (*msg == NULL) {
339         *msg = (MessageQ_Msg)List_get(normalList);
340         if (*msg == NULL) {
341             /*  Block until notified. */
342             status = ISync_wait(obj->synchronizer, timeout, NULL);
343             if (status == ISync_WaitStatus_TIMEOUT) {
344                 return (MessageQ_E_TIMEOUT);
345             }
346             else if (status < 0) {
347                 return (MessageQ_E_FAIL);
348             }
350             if (obj->unblocked) {
351                 /* *(msg) may be NULL */
352                 return (MessageQ_E_UNBLOCKED);
353             }
355             *msg = (MessageQ_Msg)List_get(highList);
356         }
357     }
359     if ((ti_sdo_ipc_MessageQ_traceFlag == TRUE) ||
360         (((*msg)->flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0)) {
361         Log_write4(ti_sdo_ipc_MessageQ_LM_get, (UArg)(*msg),
362             (UArg)((*msg)->seqNum), (UArg)((*msg)->srcProc), (UArg)(obj));
363     }
365     return (MessageQ_S_SUCCESS);
368 /*
369  *  ======== MessageQ_getQueueId ========
370  */
371 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
373     MessageQ_QueueId queueId;
374     ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
376     queueId = (obj->queue);
378     return (queueId);
381 /*
382  *  ======== MessageQ_open ========
383  */
384 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
386     Int         status;
387     Error_Block eb;
389     Assert_isTrue(name != NULL, ti_sdo_ipc_MessageQ_A_invalidParam);
390     Assert_isTrue(queueId != NULL, ti_sdo_ipc_MessageQ_A_invalidParam);
392     Error_init(&eb);
394     /* Search NameServer */
395     status = NameServer_getUInt32(
396             (NameServer_Handle)MessageQ_module->nameServer, name, queueId,
397             NULL);
399     if (status >= 0) {
400         return (MessageQ_S_SUCCESS);    /* name found */
401     }
402     else {
403         return (MessageQ_E_NOTFOUND);   /* name not found */
404     }
407 /*
408  *  ======== MessageQ_openQueueId ========
409  */
410 MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 remoteProcId)
412     MessageQ_QueueId queueId;
414     queueId = ((MessageQ_QueueId)(remoteProcId) << 16) | queueIndex;
416     return (queueId);
419 /*
420  *  ======== MessageQ_put ========
421  */
422 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
424     IMessageQTransport_Handle transport;
425     MessageQ_QueueIndex dstProcId = (MessageQ_QueueIndex)(queueId >> 16);
426     List_Handle       listHandle;
427     Int               status;
428     UInt              priority;
429 #ifndef xdc_runtime_Log_DISABLE_ALL
430     UInt16            flags;
431     UInt16            seqNum;
432     UInt16            srcProc;
433 #endif
434     ti_sdo_ipc_MessageQ_Object   *obj;
436     Assert_isTrue((msg != NULL), ti_sdo_ipc_MessageQ_A_invalidMsg);
438     msg->dstId   = (UInt16)(queueId);
439     msg->dstProc = (UInt16)(queueId >> 16);
441     if (dstProcId != MultiProc_self()) {
442         /* assert that dstProcId is valid */
443         Assert_isTrue(dstProcId < ti_sdo_utils_MultiProc_numProcessors,
444                       ti_sdo_ipc_MessageQ_A_procIdInvalid);
446         /* Put the high and urgent messages to the high priority transport */
447         priority = (UInt)((msg->flags) &
448             ti_sdo_ipc_MessageQ_TRANSPORTPRIORITYMASK);
450         /* Call the transport associated with this message queue */
451         transport = MessageQ_module->transports[dstProcId][priority];
452         if (transport == NULL) {
453             /* Try the other transport */
454             priority = !priority;
455             transport = MessageQ_module->transports[dstProcId][priority];
456         }
458         /* assert transport is not null */
459         Assert_isTrue(transport != NULL,
460             ti_sdo_ipc_MessageQ_A_unregisteredTransport);
462 #ifndef xdc_runtime_Log_DISABLE_ALL
463         /* use local vars so msg does not get cached after put */
464         flags = msg->flags;
466         if ((ti_sdo_ipc_MessageQ_traceFlag) ||
467             (flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0) {
468             /* use local vars so msg does not get cached after put */
469             seqNum  = msg->seqNum;
470             srcProc = msg->srcProc;
471         }
472 #endif
474         /* put msg to remote processor using transport */
475         if (IMessageQTransport_put(transport, msg)) {
476             status = MessageQ_S_SUCCESS;
478 #ifndef xdc_runtime_Log_DISABLE_ALL
479             /* if trace enabled */
480             if ((ti_sdo_ipc_MessageQ_traceFlag) ||
481                 (flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0) {
482                 Log_write4(ti_sdo_ipc_MessageQ_LM_putRemote, (UArg)(msg),
483                           (UArg)(seqNum), (UArg)(srcProc),
484                           (UArg)(dstProcId));
485             }
486 #endif
487         }
488         else {
489             status = MessageQ_E_FAIL;
490         }
491     }
492     else {
493         /* Assert queueId is valid */
494         Assert_isTrue((UInt16)queueId < MessageQ_module->numQueues,
495                       ti_sdo_ipc_MessageQ_A_invalidQueueId);
497         /* It is a local MessageQ */
498         obj = MessageQ_module->queues[(UInt16)(queueId)];
500         /* Assert object is not NULL */
501         Assert_isTrue(obj != NULL, ti_sdo_ipc_MessageQ_A_invalidObj);
503         if ((msg->flags & MessageQ_PRIORITYMASK) == MessageQ_URGENTPRI) {
504             listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
505             List_putHead(listHandle, (List_Elem *)msg);
506         }
507         else {
508             if ((msg->flags & MessageQ_PRIORITYMASK) == MessageQ_NORMALPRI) {
509                 listHandle = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
510             }
511             else {
512                 listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
513             }
514             /* put on the queue */
515             List_put(listHandle, (List_Elem *)msg);
516         }
518         ISync_signal(obj->synchronizer);
520         status = MessageQ_S_SUCCESS;
522         if ((ti_sdo_ipc_MessageQ_traceFlag) ||
523             (msg->flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0) {
524             Log_write4(ti_sdo_ipc_MessageQ_LM_putLocal, (UArg)(msg),
525                        (UArg)(msg->seqNum), (UArg)(msg->srcProc), (UArg)(obj));
526         }
527     }
529     return (status);
532 /*
533  *  ======== MessageQ_registerHeap ========
534  *  Register a heap
535  */
536 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
538     Int status;
539     UInt key;
540     IHeap_Handle iheap = (IHeap_Handle)heap;
542     /* Make sure the heapId is valid */
543     Assert_isTrue((heapId < MessageQ_module->numHeaps),
544                   ti_sdo_ipc_MessageQ_A_heapIdInvalid);
546     /* lock scheduler */
547     key = Hwi_disable();
549     /* Make sure the id is not already in use */
550     if (MessageQ_module->heaps[heapId] == NULL) {
551         MessageQ_module->heaps[heapId] = iheap;
552         status = MessageQ_S_SUCCESS;
553     }
554     else {
555         status = MessageQ_E_ALREADYEXISTS;
556     }
558     /* unlock scheduler */
559     Hwi_restore(key);
561     return (status);
564 /*
565  *  ======== MessageQ_setFreeHookFxn ========
566  */
567 Void MessageQ_setFreeHookFxn(MessageQ_FreeHookFxn freeHookFxn)
569     MessageQ_module->freeHookFxn = freeHookFxn;
572 /*
573  *  ======== MessageQ_setMsgTrace ========
574  */
575 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
577     msg->flags = (msg->flags & ~ti_sdo_ipc_MessageQ_TRACEMASK) |
578                  (traceFlag << ti_sdo_ipc_MessageQ_TRACESHIFT);
579     Log_write4(ti_sdo_ipc_MessageQ_LM_setTrace, (UArg)(msg), (UArg)(msg->seqNum),
580                (UArg)(msg->srcProc), (UArg)(traceFlag));
583 /*
584  *  ======== MessageQ_setReplyQueue ========
585  *  Embed the source message queue into a message.
586  */
587 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
589     ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
591     msg->replyId   = (UInt16)(obj->queue);
592     msg->replyProc = (UInt16)(obj->queue >> 16);
595 /*
596  *  ======== MessageQ_staticMsgInit ========
597  */
598 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
600     Assert_isTrue((msg != NULL), ti_sdo_ipc_MessageQ_A_invalidMsg);
602     MessageQ_msgInit(msg);
604     msg->heapId  = ti_sdo_ipc_MessageQ_STATICMSG;
605     msg->msgSize = size;
607     if (ti_sdo_ipc_MessageQ_traceFlag == TRUE) {
608         Log_write3(ti_sdo_ipc_MessageQ_LM_staticMsgInit, (UArg)(msg),
609                 (UArg)(msg->seqNum), (UArg)(msg->srcProc));
610     }
613 /*
614  *  ======== MessageQ_unblock ========
615  */
616 Void MessageQ_unblock(MessageQ_Handle handle)
618     ti_sdo_ipc_MessageQ_Object *obj = (ti_sdo_ipc_MessageQ_Object *)handle;
620     /* Assert that the queue is using a blocking synchronizer */
621     Assert_isTrue(ISync_query((obj->synchronizer), ISync_Q_BLOCKING) == TRUE,
622         ti_sdo_ipc_MessageQ_A_invalidUnblock);
624     /* Set instance to 'unblocked' state */
625     obj->unblocked = TRUE;
627     /* Signal the synchronizer */
628     ISync_signal(obj->synchronizer);
631 /*
632  *  ======== MessageQ_unregisterHeap ========
633  *  Unregister a heap
634  */
635 Int MessageQ_unregisterHeap(UInt16 heapId)
637     UInt key;
639     Assert_isTrue((heapId < MessageQ_module->numHeaps),
640                   ti_sdo_ipc_MessageQ_A_heapIdInvalid);
642     /* lock scheduler */
643     key = Hwi_disable();
645     MessageQ_module->heaps[heapId] = NULL;
647     /* unlock scheduler */
648     Hwi_restore(key);
650     return (MessageQ_S_SUCCESS);
653 /*
654  *************************************************************************
655  *                      Module functions
656  *************************************************************************
657  */
659 /*
660  *  ======== ti_sdo_ipc_MessageQ_Module_startup ========
661  */
662 Int ti_sdo_ipc_MessageQ_Module_startup(Int phase)
664     Int i;
666     /* Ensure NameServer Module_startup() has completed */
667     if (ti_sdo_utils_NameServer_Module_startupDone() == FALSE) {
668         return (Startup_NOTDONE);
669     }
671     if (GateThread_Module_startupDone() == FALSE) {
672         return (Startup_NOTDONE);
673     }
675     if (MessageQ_module->gate == NULL) {
676         MessageQ_module->gate =
677             GateThread_Handle_upCast(GateThread_create(NULL, NULL));
678     }
680     /* Loop through all the static objects and set the id */
681     for (i = 0; i < ti_sdo_ipc_MessageQ_Object_count(); i++) {
682         MessageQ_module->queues[i] = (ti_sdo_ipc_MessageQ_Object *)
683                 ti_sdo_ipc_MessageQ_Object_get(NULL, i);
684     }
686     /* Null out the dynamic ones */
687     for (i = ti_sdo_ipc_MessageQ_Object_count(); i < MessageQ_module->numQueues;
688             i++) {
689         MessageQ_module->queues[i] = NULL;
690     }
692     return (Startup_DONE);
695 /*
696  *  ======== ti_sdo_ipc_MessageQ_registerTransport ========
697  *  Register a transport
698  */
699 Bool ti_sdo_ipc_MessageQ_registerTransport(IMessageQTransport_Handle handle,
700     UInt16 procId, UInt priority)
702     Bool flag = FALSE;
703     UInt key;
705     /* Make sure the procId is valid */
706     Assert_isTrue(procId < ti_sdo_utils_MultiProc_numProcessors, ti_sdo_ipc_MessageQ_A_procIdInvalid);
708     /* lock scheduler */
709     key = Hwi_disable();
711     /* Make sure the id is not already in use */
712     if (MessageQ_module->transports[procId][priority] == NULL) {
713         MessageQ_module->transports[procId][priority] = handle;
714         flag = TRUE;
715     }
717     /* unlock scheduler */
718     Hwi_restore(key);
720     return (flag);
723 /*
724  *  ======== ti_sdo_ipc_MessageQ_unregisterTransport ========
725  *  Unregister a heap
726  */
727 Void ti_sdo_ipc_MessageQ_unregisterTransport(UInt16 procId, UInt priority)
729     UInt key;
731     /* Make sure the procId is valid */
732     Assert_isTrue(procId < ti_sdo_utils_MultiProc_numProcessors, ti_sdo_ipc_MessageQ_A_procIdInvalid);
734     /* lock scheduler */
735     key = Hwi_disable();
737     MessageQ_module->transports[procId][priority] = NULL;
739     /* unlock scheduler */
740     Hwi_restore(key);
743 /*
744  *************************************************************************
745  *                       Instance functions
746  *************************************************************************
747  */
749 /*
750  *  ======== MessageQ_Instance_init ========
751  */
752 Int ti_sdo_ipc_MessageQ_Instance_init(ti_sdo_ipc_MessageQ_Object *obj, String name,
753         const ti_sdo_ipc_MessageQ_Params *params, Error_Block *eb)
755     Int              i;
756     UInt16           start;
757     UInt16           count;
758     UInt             key;
759     Bool             found = FALSE;
760     List_Handle      listHandle;
761     SyncSem_Handle   syncSemHandle;
762     MessageQ_QueueIndex queueIndex;
764     /* lock */
765     key = IGateProvider_enter(MessageQ_module->gate);
767     if (params->queueIndex != MessageQ_ANY) {
768         queueIndex = params->queueIndex;
770         if ((queueIndex >= ti_sdo_ipc_MessageQ_numReservedEntries) ||
771             (MessageQ_module->queues[queueIndex] != NULL)) {
772             IGateProvider_leave(MessageQ_module->gate, key);
773             Error_raise(eb, ti_sdo_ipc_MessageQ_E_indexNotAvailable,
774                 queueIndex, 0);
775             return (5);
776         }
777         MessageQ_module->queues[queueIndex] = obj;
778         found = TRUE;
779     }
780     else {
782         start = ti_sdo_ipc_MessageQ_numReservedEntries;
783         count = MessageQ_module->numQueues;
785         /* Search the dynamic array for any holes */
786         for (i = start; (i < count) && (found == FALSE); i++) {
787             if (MessageQ_module->queues[i] == NULL) {
788                 MessageQ_module->queues[i] = obj;
789                 queueIndex = i;
790                 found = TRUE;
791             }
792         }
793     }
795     /*
796      *  If no free slot was found:
797      *     - if no growth allowed, raise and error
798      *     - if growth is allowed, grow the array
799      */
800     if (found == FALSE) {
801         if (ti_sdo_ipc_MessageQ_maxRuntimeEntries != NameServer_ALLOWGROWTH) {
802             /* unlock scheduler */
803             IGateProvider_leave(MessageQ_module->gate, key);
805             Error_raise(eb, ti_sdo_ipc_MessageQ_E_maxReached,
806                 ti_sdo_ipc_MessageQ_maxRuntimeEntries, 0);
807             return (1);
808         }
809         else {
810             queueIndex = MessageQ_grow(obj, eb);
811             if (queueIndex == MessageQ_INVALIDMESSAGEQ) {
812                 /* unlock scheduler */
813                 IGateProvider_leave(MessageQ_module->gate, key);
814                 return (2);
815             }
816         }
817     }
819     /* create default sync if not specified */
820     if (params->synchronizer == NULL) {
821         /* Create a SyncSem as the synchronizer */
822         syncSemHandle = SyncSem_create(NULL, eb);
824         if (syncSemHandle == NULL) {
825             /* unlock scheduler */
826             IGateProvider_leave(MessageQ_module->gate, key);
827             return (3);
828         }
830         /* store handle for use in finalize ...  */
831         obj->syncSemHandle = syncSemHandle;
833         obj->synchronizer = SyncSem_Handle_upCast(syncSemHandle);
834     }
835     else {
836         obj->syncSemHandle = NULL;
838         obj->synchronizer = params->synchronizer;
839     }
841     /* unlock scheduler */
842     IGateProvider_leave(MessageQ_module->gate, key);
844     /* Fill in the message queue object */
845     listHandle = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
846     List_construct(List_struct(listHandle), NULL);
848     listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
849     List_construct(List_struct(listHandle), NULL);
851     obj->queue = ((MessageQ_QueueId)(MultiProc_self()) << 16) | queueIndex;
853     obj->unblocked = FALSE;
855     /* Add into NameServer */
856     if (name != NULL) {
857         obj->nsKey = NameServer_addUInt32(
858                 (NameServer_Handle)MessageQ_module->nameServer, name,
859                 obj->queue);
861         if (obj->nsKey == NULL) {
862             Error_raise(eb, ti_sdo_ipc_MessageQ_E_nameFailed, name, 0);
863             return (4);
864         }
865     }
867     return (0);
870 /*
871  *  ======== MessageQ_Instance_finalize ========
872  */
873 Void ti_sdo_ipc_MessageQ_Instance_finalize(
874         ti_sdo_ipc_MessageQ_Object* obj, Int status)
876     UInt key;
877     MessageQ_QueueIndex index = (MessageQ_QueueIndex)(obj->queue);
878     List_Handle listHandle;
880     /* Requested queueId was not available. Nothing was done in the init */
881     if (status == 5) {
882         return;
883     }
885     if (obj->syncSemHandle != NULL) {
886         SyncSem_delete(&obj->syncSemHandle);
887     }
889     listHandle = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
891     /* Destruct the list */
892     List_destruct(List_struct(listHandle));
894     listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
896     /* Destruct the list */
897     List_destruct(List_struct(listHandle));
899     /* lock */
900     key = IGateProvider_enter(MessageQ_module->gate);
902     /* Null out entry in the array. */
903     MessageQ_module->queues[index] = NULL;
905     /* unlock scheduler */
906     IGateProvider_leave(MessageQ_module->gate, key);
908     if (obj->nsKey != NULL) {
909         NameServer_removeEntry((NameServer_Handle)MessageQ_module->nameServer,
910             obj->nsKey);
911     }
914 /*
915  *************************************************************************
916  *                       Internal functions
917  *************************************************************************
918  */
920 /*
921  *  ======== ti_sdo_ipc_MessageQ_grow ========
922  */
923 UInt16 ti_sdo_ipc_MessageQ_grow(ti_sdo_ipc_MessageQ_Object *obj,
924         Error_Block *eb)
926     UInt16 oldSize;
927     UInt16 queueIndex = MessageQ_module->numQueues;
928     ti_sdo_ipc_MessageQ_Handle *queues;
929     ti_sdo_ipc_MessageQ_Handle *oldQueues;
930     oldSize = (MessageQ_module->numQueues) * sizeof(MessageQ_Handle);
933     /* Allocate larger table */
934     queues = Memory_alloc(ti_sdo_ipc_MessageQ_Object_heap(),
935                           oldSize + sizeof(MessageQ_Handle), 0, eb);
937     if (queues == NULL) {
938         return (MessageQ_INVALIDMESSAGEQ);
939     }
941     /* Copy contents into new table */
942     memcpy(queues, MessageQ_module->queues, oldSize);
944     /* Fill in the new entry */
945     queues[queueIndex] = obj;
947     /* Hook-up new table */
948     oldQueues = MessageQ_module->queues;
949     MessageQ_module->queues = queues;
950     MessageQ_module->numQueues++;
952     /* Delete old table if not statically defined */
953     if (MessageQ_module->canFreeQueues == TRUE) {
954         Memory_free(ti_sdo_ipc_MessageQ_Object_heap(), oldQueues, oldSize);
955     }
956     else {
957         MessageQ_module->canFreeQueues = TRUE;
958     }
960     return (queueIndex);