index a92456c9deec8aac23c04d6eb80340cfaee6b812..111ce9278f258b999d597e16072cdab8ac5a2e1a 100644 (file)
/*
- * Copyright (c) 2012-2013, Texas Instruments Incorporated
+ * Copyright (c) 2012-2014, Texas Instruments Incorporated
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
#include <ti/sysbios/hal/Hwi.h>
#include <ti/sysbios/syncs/SyncSem.h>
+#include <ti/sdo/ipc/interfaces/ITransport.h>
#include <ti/sdo/ipc/interfaces/IMessageQTransport.h>
+#include <ti/sdo/ipc/interfaces/INetworkTransport.h>
#include <ti/sdo/utils/List.h>
/* must be included after the internal header file for now */
#pragma FUNC_EXT_CALLED(MessageQ_put);
#pragma FUNC_EXT_CALLED(MessageQ_registerHeap);
#pragma FUNC_EXT_CALLED(MessageQ_setFreeHookFxn);
+ #pragma FUNC_EXT_CALLED(MessageQ_setPutHookFxn);
#pragma FUNC_EXT_CALLED(MessageQ_setReplyQueue);
#pragma FUNC_EXT_CALLED(MessageQ_setMsgTrace);
#pragma FUNC_EXT_CALLED(MessageQ_staticMsgInit);
/*
* ======== MessageQ_alloc ========
- * Allocate a message and initial the needed fields (note some
- * of the fields in the header at set via other APIs or in the
- * MessageQ_put function.
+ * Allocate a message and initialize the needed fields
+ *
+ * Note: some of the fields in the header are set via other
+ * APIs or in the MessageQ_put function.
*/
-MessageQ_Msg MessageQ_alloc(UInt16 heapId, Uint32 size)
+MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
{
MessageQ_Msg msg;
Error_Block eb;
UInt16 srcProc;
#endif
ti_sdo_ipc_MessageQ_Object *obj;
+ Int tid;
+ ITransport_Handle baseTrans;
+ INetworkTransport_Handle netTrans;
Assert_isTrue((msg != NULL), ti_sdo_ipc_MessageQ_A_invalidMsg);
msg->dstId = (UInt16)(queueId);
msg->dstProc = (UInt16)(queueId >> 16);
- if (dstProcId != MultiProc_self()) {
+ /* invoke put hook function after addressing the message */
+ if (MessageQ_module->putHookFxn != NULL) {
+ MessageQ_module->putHookFxn(queueId, (Ptr)msg);
+ }
+
+ /* extract the transport ID from the message header */
+ tid = MessageQ_getTransportId(msg);
+
+ /* if recipient is local, use direct message delivery */
+ if (dstProcId == MultiProc_self()) {
+ /* Assert queueId is valid */
+ Assert_isTrue((UInt16)queueId < MessageQ_module->numQueues,
+ ti_sdo_ipc_MessageQ_A_invalidQueueId);
+
+ /* It is a local MessageQ */
+ obj = MessageQ_module->queues[(UInt16)(queueId)];
+
+ /* Assert object is not NULL */
+ Assert_isTrue(obj != NULL, ti_sdo_ipc_MessageQ_A_invalidObj);
+
+ if ((msg->flags & MessageQ_PRIORITYMASK) == MessageQ_URGENTPRI) {
+ listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
+ List_putHead(listHandle, (List_Elem *)msg);
+ }
+ else {
+ if ((msg->flags & MessageQ_PRIORITYMASK) == MessageQ_NORMALPRI) {
+ listHandle = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
+ }
+ else {
+ listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
+ }
+ /* put on the queue */
+ List_put(listHandle, (List_Elem *)msg);
+ }
+
+ ISync_signal(obj->synchronizer);
+
+ status = MessageQ_S_SUCCESS;
+
+ if ((ti_sdo_ipc_MessageQ_traceFlag) ||
+ (msg->flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0) {
+ Log_write4(ti_sdo_ipc_MessageQ_LM_putLocal, (UArg)(msg),
+ (UArg)(msg->seqNum), (UArg)(msg->srcProc), (UArg)(obj));
+ }
+ }
+
+ /* if transport ID is zero, use primary transport array */
+ else if (tid == 0) {
/* assert that dstProcId is valid */
Assert_isTrue(dstProcId < ti_sdo_utils_MultiProc_numProcessors,
ti_sdo_ipc_MessageQ_A_procIdInvalid);
status = MessageQ_E_FAIL;
}
}
- else {
- /* Assert queueId is valid */
- Assert_isTrue((UInt16)queueId < MessageQ_module->numQueues,
- ti_sdo_ipc_MessageQ_A_invalidQueueId);
- /* It is a local MessageQ */
- obj = MessageQ_module->queues[(UInt16)(queueId)];
-
- /* Assert object is not NULL */
- Assert_isTrue(obj != NULL, ti_sdo_ipc_MessageQ_A_invalidObj);
+ /* use a registered transport to deliver the message */
+ else {
+ baseTrans = MessageQ_module->regTrans[tid].transport;
- if ((msg->flags & MessageQ_PRIORITYMASK) == MessageQ_URGENTPRI) {
- listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
- List_putHead(listHandle, (List_Elem *)msg);
- }
- else {
- if ((msg->flags & MessageQ_PRIORITYMASK) == MessageQ_NORMALPRI) {
- listHandle = ti_sdo_ipc_MessageQ_Instance_State_normalList(obj);
- }
- else {
- listHandle = ti_sdo_ipc_MessageQ_Instance_State_highList(obj);
- }
- /* put on the queue */
- List_put(listHandle, (List_Elem *)msg);
+ if (baseTrans == NULL) {
+ /* raise error */
+ status = MessageQ_E_FAIL;
+ goto leave;
}
- ISync_signal(obj->synchronizer);
+ switch (MessageQ_module->regTrans[tid].type) {
- status = MessageQ_S_SUCCESS;
+ case ti_sdo_ipc_MessageQ_TransportType_INetworkTransport:
+ netTrans = INetworkTransport_Handle_downCast(baseTrans);
- if ((ti_sdo_ipc_MessageQ_traceFlag) ||
- (msg->flags & ti_sdo_ipc_MessageQ_TRACEMASK) != 0) {
- Log_write4(ti_sdo_ipc_MessageQ_LM_putLocal, (UArg)(msg),
- (UArg)(msg->seqNum), (UArg)(msg->srcProc), (UArg)(obj));
+ if (INetworkTransport_put(netTrans, msg)) {
+ status = MessageQ_S_SUCCESS;
+ }
+ else {
+ status = MessageQ_E_FAIL;
+ }
+ break;
}
}
+leave:
return (status);
}
MessageQ_module->freeHookFxn = freeHookFxn;
}
+/*
+ * ======== MessageQ_setPutHookFxn ========
+ */
+Void MessageQ_setPutHookFxn(MessageQ_PutHookFxn putHookFxn)
+{
+ MessageQ_module->putHookFxn = (ti_sdo_ipc_MessageQ_PutHookFxn)putHookFxn;
+}
+
/*
* ======== MessageQ_setMsgTrace ========
*/
/*
* ======== MessageQ_staticMsgInit ========
*/
-Void MessageQ_staticMsgInit(MessageQ_Msg msg, Uint32 size)
+Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
{
Assert_isTrue((msg != NULL), ti_sdo_ipc_MessageQ_A_invalidMsg);
Hwi_restore(key);
}
+/*
+ * ======== ti_sdo_ipc_MessageQ_registerTransportId ========
+ */
+Bool ti_sdo_ipc_MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
+{
+ ti_sdo_ipc_MessageQ_TransportType type;
+
+ /* validate transport ID */
+ if ((tid < 1) || (tid > 7)) {
+ /* raise error */
+ return (FALSE);
+ }
+
+ /* don't overwrite an existing transport */
+ if (MessageQ_module->regTrans[tid].transport != NULL) {
+ /* raise error */
+ return (FALSE);
+ }
+
+ /* determine the transport type */
+ if (INetworkTransport_Handle_downCast(inst) != NULL) {
+ type = ti_sdo_ipc_MessageQ_TransportType_INetworkTransport;
+ }
+ else {
+ /* raise error */
+ return (FALSE);
+ }
+
+ /* register the transport instance */
+ MessageQ_module->regTrans[tid].transport = inst;
+ MessageQ_module->regTrans[tid].type = type;
+ return (TRUE);
+}
+
+/*
+ * ======== ti_sdo_ipc_MessageQ_registerTransportId ========
+ */
+Bool ti_sdo_ipc_MessageQ_unregisterTransportId(UInt tid)
+{
+ /* forget the registered transport instance */
+ MessageQ_module->regTrans[tid].transport = NULL;
+ MessageQ_module->regTrans[tid].type =
+ ti_sdo_ipc_MessageQ_TransportType_Invalid;
+
+ return (TRUE);
+}
+
+
/*
*************************************************************************
* Instance functions
@@ -760,6 +860,10 @@ Int ti_sdo_ipc_MessageQ_Instance_init(ti_sdo_ipc_MessageQ_Object *obj, String na
List_Handle listHandle;
SyncSem_Handle syncSemHandle;
MessageQ_QueueIndex queueIndex;
+ Int tid;
+ Int status;
+ ITransport_Handle baseTrans;
+ INetworkTransport_Handle netTrans;
/* lock */
key = IGateProvider_enter(MessageQ_module->gate);
@@ -864,6 +968,33 @@ Int ti_sdo_ipc_MessageQ_Instance_init(ti_sdo_ipc_MessageQ_Object *obj, String na
}
}
+ /* notify all registered transports about the new queue */
+ for (tid = 1; tid <= 7; tid++) {
+ if (MessageQ_module->regTrans[tid].transport == NULL) {
+ continue;
+ }
+ baseTrans = MessageQ_module->regTrans[tid].transport;
+
+ switch (MessageQ_module->regTrans[tid].type) {
+
+ case ti_sdo_ipc_MessageQ_TransportType_INetworkTransport:
+ netTrans = INetworkTransport_Handle_downCast(baseTrans);
+
+ if (INetworkTransport_bind(netTrans, obj->queue)) {
+ status = MessageQ_S_SUCCESS;
+ }
+ else {
+ status = MessageQ_E_FAIL;
+ }
+ break;
+ }
+
+ /* check for failure */
+ if (status < 0) {
+ /* TODO add error handling */
+ }
+ }
+
return (0);
}
UInt key;
MessageQ_QueueIndex index = (MessageQ_QueueIndex)(obj->queue);
List_Handle listHandle;
+ Int tid;
+ ITransport_Handle baseTrans;
+ INetworkTransport_Handle netTrans;
/* Requested queueId was not available. Nothing was done in the init */
if (status == 5) {
return;
}
+ /* notify all registered transports that given queue is being deleted */
+ for (tid = 1; tid <= 7; tid++) {
+ if (MessageQ_module->regTrans[tid].transport == NULL) {
+ continue;
+ }
+ baseTrans = MessageQ_module->regTrans[tid].transport;
+
+ switch (MessageQ_module->regTrans[tid].type) {
+
+ case ti_sdo_ipc_MessageQ_TransportType_INetworkTransport:
+ netTrans = INetworkTransport_Handle_downCast(baseTrans);
+
+ if (INetworkTransport_unbind(netTrans, obj->queue)) {
+ status = MessageQ_S_SUCCESS;
+ }
+ else {
+ status = MessageQ_E_FAIL;
+ }
+ break;
+ }
+
+ /* check for failure */
+ if (status < 0) {
+ /* TODO add error handling */
+ }
+ }
+
if (obj->syncSemHandle != NULL) {
SyncSem_delete(&obj->syncSemHandle);
}