index a92456c9deec8aac23c04d6eb80340cfaee6b812..da85a1e5cba64f0bb38154e6f1783de6ebe9784e 100644 (file)
/*
- * Copyright (c) 2012-2013, Texas Instruments Incorporated
+ * Copyright (c) 2012-2018 Texas Instruments Incorporated - http://www.ti.com
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
#include <ti/sysbios/hal/Hwi.h>
#include <ti/sysbios/syncs/SyncSem.h>
+#include <ti/sdo/ipc/interfaces/ITransport.h>
#include <ti/sdo/ipc/interfaces/IMessageQTransport.h>
+#include <ti/sdo/ipc/interfaces/INetworkTransport.h>
#include <ti/sdo/utils/List.h>
/* must be included after the internal header file for now */
+#define MessageQ_internal 1 /* must be defined before include file */
#include <ti/sdo/ipc/_MessageQ.h>
#include <ti/sdo/utils/_MultiProc.h>
#include <ti/sdo/utils/_NameServer.h>
#include "package/internal/MessageQ.xdc.h"
+/* params structure evolution */
+typedef struct {
+ Void *synchronizer;
+} MessageQ_Params_Legacy;
+
+typedef struct {
+ Int __version;
+ Void *synchronizer;
+ MessageQ_QueueIndex queueIndex;
+} MessageQ_Params_Version2;
+
#ifdef __ti__
#pragma FUNC_EXT_CALLED(MessageQ_Params_init);
#pragma FUNC_EXT_CALLED(MessageQ_Params2_init);
#pragma FUNC_EXT_CALLED(MessageQ_getQueueId);
#pragma FUNC_EXT_CALLED(MessageQ_put);
#pragma FUNC_EXT_CALLED(MessageQ_registerHeap);
+ #pragma FUNC_EXT_CALLED(MessageQ_registerTransportId);
#pragma FUNC_EXT_CALLED(MessageQ_setFreeHookFxn);
+ #pragma FUNC_EXT_CALLED(MessageQ_setPutHookFxn);
#pragma FUNC_EXT_CALLED(MessageQ_setReplyQueue);
#pragma FUNC_EXT_CALLED(MessageQ_setMsgTrace);
#pragma FUNC_EXT_CALLED(MessageQ_staticMsgInit);
/*
* ======== MessageQ_Params_init ========
+ * Legacy implementation.
*/
Void MessageQ_Params_init(MessageQ_Params *params)
{
- params->synchronizer = NULL;
+ ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
+}
+
+/*
+ * ======== MessageQ_Params_init__S ========
+ * New implementation which is version aware.
+ */
+Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
+{
+ MessageQ_Params_Version2 *params2;
+
+ switch (version) {
+
+ case MessageQ_Params_VERSION_2:
+ params2 = (MessageQ_Params_Version2 *)params;
+ params2->__version = MessageQ_Params_VERSION_2;
+ params2->synchronizer = NULL;
+ params2->queueIndex = MessageQ_ANY;
+ break;
+
+ default:
+ Assert_isTrue(FALSE, 0);
+ break;
+ }
}
/*
* ======== MessageQ_Params2_init ========
+ * Deprecated
*/
Void MessageQ_Params2_init(MessageQ_Params2 *params)
{
/*
* ======== MessageQ_alloc ========
- * Allocate a message and initial the needed fields (note some
- * of the fields in the header at set via other APIs or in the
- * MessageQ_put function.
+ * Allocate a message and initialize the needed fields
+ *
+ * Note: some of the fields in the header are set via other
+ * APIs or in the MessageQ_put function.
*/
-MessageQ_Msg MessageQ_alloc(UInt16 heapId, Uint32 size)
+MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
{
MessageQ_Msg msg;
Error_Block eb;
/*
* ======== MessageQ_create ========
*/
-MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *params)
+MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
{
- MessageQ_Handle handle;
- MessageQ_Params2 params2;
+ ti_sdo_ipc_MessageQ_Handle handle;
+ ti_sdo_ipc_MessageQ_Params ps;
+ Error_Block eb;
- MessageQ_Params2_init(¶ms2);
+ Error_init(&eb);
- /* Use the MessageQ_Params fields if not NULL */
- if (params != NULL) {
- params2.synchronizer = params->synchronizer;
- }
+ if (pp != NULL) {
+ ti_sdo_ipc_MessageQ_Params_init(&ps);
- handle = MessageQ_create2(name, ¶ms2);
+ /* snoop the params pointer to see if it's a legacy structure */
+ if ((pp->__version == 0) || (pp->__version > 100)) {
+ ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
+ }
+
+ /* not legacy structure, use params version field */
+ else if (pp->__version == MessageQ_Params_VERSION_2) {
+ ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
+ ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
+ }
+ else {
+ Assert_isTrue(FALSE, 0);
+ }
+
+ handle = ti_sdo_ipc_MessageQ_create(name, &ps, &eb);
+ }
+ else {
+ handle = ti_sdo_ipc_MessageQ_create(name, NULL, &eb);
+ }
return ((MessageQ_Handle)handle);
}
/*
* ======== MessageQ_openQueueId ========
*/
-MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 remoteProcId)
+MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 procId)
{
+ MessageQ_QueueIndex queuePort;
MessageQ_QueueId queueId;
- queueId = ((MessageQ_QueueId)(remoteProcId) << 16) | queueIndex;
+ /* queue port is embedded in the queueId */
+ queuePort = queueIndex + MessageQ_PORTOFFSET;
+ queueId = ((MessageQ_QueueId)(procId) << 16) | queuePort;
return (queueId);
}
IMessageQTransport_Handle transport;
MessageQ_QueueIndex dstProcId = (MessageQ_QueueIndex)(queueId >> 16);
List_Handle listHandle;
- Int status;
+ Int status = MessageQ_E_FAIL;
UInt priority;
#ifndef xdc_runtime_Log_DISABLE_ALL
UInt16 flags;
UInt16 srcProc;
#endif
ti_sdo_ipc_MessageQ_Object *obj;
+ Int tid;
+ ITransport_Handle baseTrans;
+ INetworkTransport_Handle netTrans;
+ MessageQ_QueueIndex queueIndex;
+ UInt16 index;
Assert_isTrue((msg != NULL), ti_sdo_ipc_MessageQ_A_invalidMsg);
msg->dstId = (UInt16)(queueId);
msg->dstProc = (UInt16)(queueId >> 16);
- if (dstProcId != MultiProc_self()) {
+ /* invoke put hook function after addressing the message */
+ if (MessageQ_module->putHookFxn != NULL) {
+ MessageQ_module->putHookFxn(queueId, (Ptr)msg);
+ }
+
+ /* extract the transport ID from the message header */
+ tid = MessageQ_getTransportId(msg);
+
+ /* if recipient is local, use direct message delivery */
+ if (dstProcId == MultiProc_self()) {
+ /* assert queue index is valid */
+ queueIndex = MessageQ_getQueueIndex(queueId);
+ Assert_isTrue(queueIndex < MessageQ_module->numQueues,
+ ti_sdo_ipc_MessageQ_A_invalidQueueId);
+
+ /* It is a local MessageQ */
+ obj = MessageQ_module->queues[queueIndex];
+
+ /* Assert object is not NULL */
+ Assert_isTrue(obj != NULL, ti_sdo_ipc_MessageQ_A_invalidObj);
+
+ if ((msg->flags & MessageQ_PRIORITYMASK) == MessageQ_URGENTPRI) {
+ listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
+ List_putHead(listHandle, (List_Elem *)msg);
+ }
+ else {
+ if ((msg->flags & MessageQ_PRIORITYMASK) == MessageQ_NORMALPRI) {
+ listHandle = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
+ }
+ else {
+ listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
+ }
+ /* put on the queue */
+ List_put(listHandle, (List_Elem *)msg);
+ }
+
+ ISync_signal(obj->synchronizer);
+
+ status = MessageQ_S_SUCCESS;
+
+ if ((ti_sdo_ipc_MessageQ_traceFlag) ||
+ (msg->flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0) {
+ Log_write4(ti_sdo_ipc_MessageQ_LM_putLocal, (UArg)(msg),
+ (UArg)(msg->seqNum), (UArg)(msg->srcProc), (UArg)(obj));
+ }
+ }
+
+ /* if transport ID is zero, use primary transport array */
+ else if (tid == 0) {
/* assert that dstProcId is valid */
Assert_isTrue(dstProcId < ti_sdo_utils_MultiProc_numProcessors,
ti_sdo_ipc_MessageQ_A_procIdInvalid);
priority = (UInt)((msg->flags) &
ti_sdo_ipc_MessageQ_TRANSPORTPRIORITYMASK);
+ switch (ti_sdo_utils_MultiProc_procAddrMode) {
+ case ti_sdo_utils_MultiProc_ProcAddrMode_Global:
+ index = dstProcId;
+ break;
+
+ case ti_sdo_utils_MultiProc_ProcAddrMode_Cluster:
+ index = dstProcId - MultiProc_getBaseIdOfCluster();
+ break;
+
+ default:
+ Assert_isTrue(FALSE, 0);
+ break;
+ }
+
+ if (index >= MessageQ_module->transports.length) {
+ /* raise error */
+ status = MessageQ_E_FAIL;
+ goto leave;
+ }
+
/* Call the transport associated with this message queue */
- transport = MessageQ_module->transports[dstProcId][priority];
+ transport = MessageQ_module->transports.elem[index][priority];
+
if (transport == NULL) {
/* Try the other transport */
priority = !priority;
- transport = MessageQ_module->transports[dstProcId][priority];
+ transport = MessageQ_module->transports.elem[index][priority];
}
/* assert transport is not null */
status = MessageQ_E_FAIL;
}
}
- else {
- /* Assert queueId is valid */
- Assert_isTrue((UInt16)queueId < MessageQ_module->numQueues,
- ti_sdo_ipc_MessageQ_A_invalidQueueId);
-
- /* It is a local MessageQ */
- obj = MessageQ_module->queues[(UInt16)(queueId)];
- /* Assert object is not NULL */
- Assert_isTrue(obj != NULL, ti_sdo_ipc_MessageQ_A_invalidObj);
+ /* use a registered transport to deliver the message */
+ else {
+ baseTrans = MessageQ_module->regTrans[tid].transport;
- if ((msg->flags & MessageQ_PRIORITYMASK) == MessageQ_URGENTPRI) {
- listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
- List_putHead(listHandle, (List_Elem *)msg);
- }
- else {
- if ((msg->flags & MessageQ_PRIORITYMASK) == MessageQ_NORMALPRI) {
- listHandle = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
- }
- else {
- listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
- }
- /* put on the queue */
- List_put(listHandle, (List_Elem *)msg);
+ if (baseTrans == NULL) {
+ /* raise error */
+ status = MessageQ_E_FAIL;
+ goto leave;
}
- ISync_signal(obj->synchronizer);
-
- status = MessageQ_S_SUCCESS;
-
- if ((ti_sdo_ipc_MessageQ_traceFlag) ||
- (msg->flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0) {
- Log_write4(ti_sdo_ipc_MessageQ_LM_putLocal, (UArg)(msg),
- (UArg)(msg->seqNum), (UArg)(msg->srcProc), (UArg)(obj));
+ switch (MessageQ_module->regTrans[tid].type) {
+
+ case ti_sdo_ipc_MessageQ_TransportType_INetworkTransport:
+ netTrans = INetworkTransport_Handle_downCast(baseTrans);
+ if(netTrans == NULL) {
+ status = MessageQ_E_FAIL;
+ break;
+ }
+ if (INetworkTransport_put(netTrans, msg)) {
+ status = MessageQ_S_SUCCESS;
+ }
+ else {
+ status = MessageQ_E_FAIL;
+ }
+ break;
}
}
+leave:
return (status);
}
MessageQ_module->freeHookFxn = freeHookFxn;
}
+/*
+ * ======== MessageQ_setPutHookFxn ========
+ */
+Void MessageQ_setPutHookFxn(MessageQ_PutHookFxn putHookFxn)
+{
+ MessageQ_module->putHookFxn = (ti_sdo_ipc_MessageQ_PutHookFxn)putHookFxn;
+}
+
/*
* ======== MessageQ_setMsgTrace ========
*/
/*
* ======== MessageQ_staticMsgInit ========
*/
-Void MessageQ_staticMsgInit(MessageQ_Msg msg, Uint32 size)
+Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
{
Assert_isTrue((msg != NULL), ti_sdo_ipc_MessageQ_A_invalidMsg);
/*
* ======== ti_sdo_ipc_MessageQ_registerTransport ========
- * Register a transport
*/
Bool ti_sdo_ipc_MessageQ_registerTransport(IMessageQTransport_Handle handle,
UInt16 procId, UInt priority)
{
Bool flag = FALSE;
UInt key;
+ UInt16 index;
- /* Make sure the procId is valid */
- Assert_isTrue(procId < ti_sdo_utils_MultiProc_numProcessors, ti_sdo_ipc_MessageQ_A_procIdInvalid);
+ Assert_isTrue(procId < ti_sdo_utils_MultiProc_numProcessors,
+ ti_sdo_ipc_MessageQ_A_procIdInvalid);
+
+ switch (ti_sdo_utils_MultiProc_procAddrMode) {
+ case ti_sdo_utils_MultiProc_ProcAddrMode_Global:
+ index = procId;
+ break;
+
+ case ti_sdo_utils_MultiProc_ProcAddrMode_Cluster:
+ index = procId - MultiProc_getBaseIdOfCluster();
+ break;
+
+ default:
+ Assert_isTrue(FALSE, 0);
+ break;
+ }
+
+ Assert_isTrue(index < MessageQ_module->transports.length,
+ ti_sdo_ipc_MessageQ_A_procIdInvalid);
/* lock scheduler */
key = Hwi_disable();
- /* Make sure the id is not already in use */
- if (MessageQ_module->transports[procId][priority] == NULL) {
- MessageQ_module->transports[procId][priority] = handle;
+ /* make sure the id is not already in use */
+ if (MessageQ_module->transports.elem[index][priority] == NULL) {
+ MessageQ_module->transports.elem[index][priority] = handle;
flag = TRUE;
}
/*
* ======== ti_sdo_ipc_MessageQ_unregisterTransport ========
- * Unregister a heap
*/
Void ti_sdo_ipc_MessageQ_unregisterTransport(UInt16 procId, UInt priority)
{
UInt key;
+ UInt16 index;
+
+ Assert_isTrue(procId < ti_sdo_utils_MultiProc_numProcessors,
+ ti_sdo_ipc_MessageQ_A_procIdInvalid);
+
+ switch (ti_sdo_utils_MultiProc_procAddrMode) {
+ case ti_sdo_utils_MultiProc_ProcAddrMode_Global:
+ index = procId;
+ break;
- /* Make sure the procId is valid */
- Assert_isTrue(procId < ti_sdo_utils_MultiProc_numProcessors, ti_sdo_ipc_MessageQ_A_procIdInvalid);
+ case ti_sdo_utils_MultiProc_ProcAddrMode_Cluster:
+ index = procId - MultiProc_getBaseIdOfCluster();
+ break;
+
+ default:
+ Assert_isTrue(FALSE, 0);
+ break;
+ }
+
+ Assert_isTrue(index < MessageQ_module->transports.length,
+ ti_sdo_ipc_MessageQ_A_procIdInvalid);
/* lock scheduler */
key = Hwi_disable();
- MessageQ_module->transports[procId][priority] = NULL;
+ MessageQ_module->transports.elem[index][priority] = NULL;
/* unlock scheduler */
Hwi_restore(key);
}
+/*
+ * ======== MessageQ_registerTransportId ========
+ */
+Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
+{
+ ti_sdo_ipc_MessageQ_TransportType type;
+
+ /* validate transport ID */
+ if ((tid < 1) || (tid > 7)) {
+ /* raise error */
+ return (FALSE);
+ }
+
+ /* don't overwrite an existing transport */
+ if (MessageQ_module->regTrans[tid].transport != NULL) {
+ /* raise error */
+ return (FALSE);
+ }
+
+ /* determine the transport type */
+ if (INetworkTransport_Handle_downCast(inst) != NULL) {
+ type = ti_sdo_ipc_MessageQ_TransportType_INetworkTransport;
+ }
+ else {
+ /* raise error */
+ return (FALSE);
+ }
+
+ /* register the transport instance */
+ MessageQ_module->regTrans[tid].transport = inst;
+ MessageQ_module->regTrans[tid].type = type;
+ return (TRUE);
+}
+
+/*
+ * ======== MessageQ_unregisterTransportId ========
+ */
+Void MessageQ_unregisterTransportId(UInt tid)
+{
+ /* forget the registered transport instance */
+ MessageQ_module->regTrans[tid].transport = NULL;
+ MessageQ_module->regTrans[tid].type =
+ ti_sdo_ipc_MessageQ_TransportType_Invalid;
+
+ return;
+}
+
+
/*
*************************************************************************
* Instance functions
/*
* ======== MessageQ_Instance_init ========
*/
-Int ti_sdo_ipc_MessageQ_Instance_init(ti_sdo_ipc_MessageQ_Object *obj, String name,
- const ti_sdo_ipc_MessageQ_Params *params, Error_Block *eb)
+Int ti_sdo_ipc_MessageQ_Instance_init(ti_sdo_ipc_MessageQ_Object *obj,
+ String name, const ti_sdo_ipc_MessageQ_Params *params, Error_Block *eb)
{
Int i;
UInt16 start;
@@ -760,6 +979,11 @@ Int ti_sdo_ipc_MessageQ_Instance_init(ti_sdo_ipc_MessageQ_Object *obj, String na
List_Handle listHandle;
SyncSem_Handle syncSemHandle;
MessageQ_QueueIndex queueIndex;
+ MessageQ_QueueIndex queuePort;
+ Int tid;
+ Int status = MessageQ_E_FAIL;
+ ITransport_Handle baseTrans;
+ INetworkTransport_Handle netTrans;
/* lock */
key = IGateProvider_enter(MessageQ_module->gate);
@@ -778,7 +1002,6 @@ Int ti_sdo_ipc_MessageQ_Instance_init(ti_sdo_ipc_MessageQ_Object *obj, String na
found = TRUE;
}
else {
-
start = ti_sdo_ipc_MessageQ_numReservedEntries;
count = MessageQ_module->numQueues;
@@ -848,7 +1071,9 @@ Int ti_sdo_ipc_MessageQ_Instance_init(ti_sdo_ipc_MessageQ_Object *obj, String na
listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
List_construct(List_struct(listHandle), NULL);
- obj->queue = ((MessageQ_QueueId)(MultiProc_self()) << 16) | queueIndex;
+ /* queue port is embedded in the queueId */
+ queuePort = queueIndex + MessageQ_PORTOFFSET;
+ obj->queue = ((MessageQ_QueueId)(MultiProc_self()) << 16) | queuePort;
obj->unblocked = FALSE;
@@ -864,6 +1089,37 @@ Int ti_sdo_ipc_MessageQ_Instance_init(ti_sdo_ipc_MessageQ_Object *obj, String na
}
}
+ /* notify all registered transports about the new queue */
+ for (tid = 1; tid <= 7; tid++) {
+ if (MessageQ_module->regTrans[tid].transport == NULL) {
+ continue;
+ }
+ baseTrans = MessageQ_module->regTrans[tid].transport;
+
+ switch (MessageQ_module->regTrans[tid].type) {
+
+ case ti_sdo_ipc_MessageQ_TransportType_INetworkTransport:
+ netTrans = INetworkTransport_Handle_downCast(baseTrans);
+ if(netTrans == NULL) {
+ status = MessageQ_E_FAIL;
+ break;
+ }
+
+ if (INetworkTransport_bind(netTrans, obj->queue)) {
+ status = MessageQ_S_SUCCESS;
+ }
+ else {
+ status = MessageQ_E_FAIL;
+ }
+ break;
+ }
+
+ /* check for failure */
+ if (status < 0) {
+ return(6);
+ }
+ }
+
return (0);
}
ti_sdo_ipc_MessageQ_Object* obj, Int status)
{
UInt key;
- MessageQ_QueueIndex index = (MessageQ_QueueIndex)(obj->queue);
+ MessageQ_QueueIndex queueIndex;
List_Handle listHandle;
+ Int tid;
+ ITransport_Handle baseTrans;
+ INetworkTransport_Handle netTrans;
/* Requested queueId was not available. Nothing was done in the init */
if (status == 5) {
return;
}
+ /* notify all registered transports that given queue is being deleted */
+ for (tid = 1; tid <= 7; tid++) {
+ if (MessageQ_module->regTrans[tid].transport == NULL) {
+ continue;
+ }
+ baseTrans = MessageQ_module->regTrans[tid].transport;
+
+ switch (MessageQ_module->regTrans[tid].type) {
+
+ case ti_sdo_ipc_MessageQ_TransportType_INetworkTransport:
+ netTrans = INetworkTransport_Handle_downCast(baseTrans);
+ if(netTrans == NULL) {
+ status = MessageQ_E_FAIL;
+ break;
+ }
+
+ if (INetworkTransport_unbind(netTrans, obj->queue)) {
+ status = MessageQ_S_SUCCESS;
+ }
+ else {
+ status = MessageQ_E_FAIL;
+ }
+ break;
+ }
+
+ /* check for failure */
+ if (status < 0) {
+ /* TODO add error handling */
+ }
+ }
+
if (obj->syncSemHandle != NULL) {
SyncSem_delete(&obj->syncSemHandle);
}
key = IGateProvider_enter(MessageQ_module->gate);
/* Null out entry in the array. */
- MessageQ_module->queues[index] = NULL;
+ queueIndex = MessageQ_getQueueIndex(obj->queue);
+ MessageQ_module->queues[queueIndex] = NULL;
/* unlock scheduler */
IGateProvider_leave(MessageQ_module->gate, key);