Rework MessageQ_put to prioritize transport if given
authorRamsey Harris <ramsey@ti.com>
Thu, 5 Feb 2015 22:46:20 +0000 (14:46 -0800)
committerRobert Tivy <rtivy@ti.com>
Mon, 9 Feb 2015 18:43:08 +0000 (10:43 -0800)
In MessageQ_put, use secondary transport (if given) regardless
of destination queue. For processor local deliver, verify that
destination queue is process local, otherwise give error. Changed
secondary transport array to base interface type. Added interface
casting as needed.

linux/src/api/MessageQ.c

index b236277175613807ba1cb5d12711dbb34f07006e..1929e173c6ebe49fe2a389e6f59ed3b830ed39e0 100644 (file)
@@ -122,7 +122,7 @@ typedef struct MessageQ_ModuleObject {
     pthread_mutex_t           gate;
     int                       seqNum;
     IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
-    INetworkTransport_Handle  transInst[MessageQ_MAXTRANSPORTS];
+    ITransport_Handle         transInst[MessageQ_MAXTRANSPORTS];
     MessageQ_PutHookFxn       putHookFxn;
 } MessageQ_ModuleObject;
 
@@ -224,7 +224,7 @@ Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
         return MessageQ_E_ALREADYEXISTS;
     }
 
-    MessageQ_module->transInst[tid] = (INetworkTransport_Handle)inst;
+    MessageQ_module->transInst[tid] = inst;
 
     return MessageQ_S_SUCCESS;
 }
@@ -462,7 +462,8 @@ MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
     Int                   status;
     MessageQ_Object      *obj = NULL;
     IMessageQTransport_Handle transport;
-    INetworkTransport_Handle transInst;
+    INetworkTransport_Handle netTrans;
+    ITransport_Handle     baseTrans;
     UInt16                queueIndex;
     UInt16                clusterId;
     Int                   tid;
@@ -573,10 +574,21 @@ MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
     }
 
     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
-        transInst = MessageQ_module->transInst[tid];
-        if (transInst) {
-            /* need to check return and do something if error */
-            INetworkTransport_bind((Void *)transInst, obj->queue);
+        baseTrans = MessageQ_module->transInst[tid];
+
+        if (baseTrans != NULL) {
+            switch (ITransport_itype(baseTrans)) {
+                case INetworkTransport_TypeId:
+                    netTrans = INetworkTransport_downCast(baseTrans);
+                    INetworkTransport_bind((void *)netTrans, obj->queue);
+                    break;
+
+                default:
+                    /* error */
+                    printf("MessageQ_create: Error: transport id %d is an "
+                            "unsupported transport type.\n", tid);
+                    break;
+            }
         }
     }
 
@@ -603,7 +615,8 @@ Int MessageQ_delete(MessageQ_Handle *handlePtr)
 {
     MessageQ_Object *obj;
     IMessageQTransport_Handle transport;
-    INetworkTransport_Handle transInst;
+    INetworkTransport_Handle  netTrans;
+    ITransport_Handle         baseTrans;
     Int              status = MessageQ_S_SUCCESS;
     UInt16           queueIndex;
     UInt16                clusterId;
@@ -654,9 +667,21 @@ Int MessageQ_delete(MessageQ_Handle *handlePtr)
     }
 
     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
-        transInst = MessageQ_module->transInst[tid];
-        if (transInst) {
-            INetworkTransport_unbind((Void *)transInst, obj->queue);
+        baseTrans = MessageQ_module->transInst[tid];
+
+        if (baseTrans != NULL) {
+            switch (ITransport_itype(baseTrans)) {
+                case INetworkTransport_TypeId:
+                    netTrans = INetworkTransport_downCast(baseTrans);
+                    INetworkTransport_unbind((void *)netTrans, obj->queue);
+                    break;
+
+                default:
+                    /* error */
+                    printf("MessageQ_create: Error: transport id %d is an "
+                            "unsupported transport type.\n", tid);
+                    break;
+            }
         }
     }
 
@@ -731,7 +756,7 @@ Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
     UInt16   dstProcId  = (UInt16)(queueId >> 16);
     UInt16   queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
     Int      status = MessageQ_S_SUCCESS;
-    ITransport_Handle transport;
+    ITransport_Handle baseTrans;
     IMessageQTransport_Handle msgTrans;
     INetworkTransport_Handle netTrans;
     Int priority;
@@ -746,9 +771,42 @@ Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
         MessageQ_module->putHookFxn(queueId, msg);
     }
 
-    if (dstProcId != MultiProc_self()) {
-        tid = MessageQ_getTransportId(msg);
-        if (tid == 0) {
+    /* extract the transport ID from the message header */
+    tid = MessageQ_getTransportId(msg);
+
+    if (tid >= MessageQ_MAXTRANSPORTS) {
+        printf("MessageQ_put: Error: transport id %d too big, must be < %d\n",
+                tid, MessageQ_MAXTRANSPORTS);
+        return (MessageQ_E_FAIL);
+    }
+
+    /* if tid is set, use secondary transport regardless of destination */
+    if (tid != 0) {
+        baseTrans = MessageQ_module->transInst[tid];
+
+        if (baseTrans == NULL) {
+            printf("MessageQ_put: Error: transport is null\n");
+            return (MessageQ_E_FAIL);
+        }
+
+        /* downcast instance pointer to transport interface */
+        switch (ITransport_itype(baseTrans)) {
+            case INetworkTransport_TypeId:
+                netTrans = INetworkTransport_downCast(baseTrans);
+                INetworkTransport_put(netTrans, (Ptr)msg);
+                break;
+
+            default:
+                /* error */
+                printf("MessageQ_put: Error: transport id %d is an "
+                        "unsupported transport type\n", tid);
+                status = MessageQ_E_FAIL;
+                break;
+        }
+    }
+    else {
+        /* if destination on another processor, use primary transport */
+        if (dstProcId != MultiProc_self()) {
             priority = MessageQ_getMsgPri(msg);
             clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
 
@@ -756,7 +814,7 @@ Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
             if (clusterId > MultiProc_getNumProcsInCluster()) {
                 printf("MessageQ_put: Error: destination procId=%d is not "
                         "in cluster. Must specify a transportId.\n", dstProcId);
-                return MessageQ_E_FAIL;
+                return (MessageQ_E_FAIL);
             }
 
             msgTrans = MessageQ_module->transports[clusterId][priority];
@@ -764,45 +822,31 @@ Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
             IMessageQTransport_put(msgTrans, (Ptr)msg);
         }
         else {
-            if (tid >= MessageQ_MAXTRANSPORTS) {
-                printf("MessageQ_put: transport id %d too big, must be < %d\n",
-                       tid, MessageQ_MAXTRANSPORTS);
-                return MessageQ_E_FAIL;
+            /* check if destination queue is in this process */
+            if (queueIndex >= MessageQ_module->numQueues) {
+                printf("MessageQ_put: Error: unable to deliver message, "
+                        "queueIndex too large or transportId missing.\n");
+                return (MessageQ_E_FAIL);
             }
 
-            /* use secondary transport */
-            netTrans = MessageQ_module->transInst[tid];
-            transport = INetworkTransport_upCast(netTrans);
+            obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
 
-            /* downcast instance pointer to transport interface */
-            switch (ITransport_itype(transport)) {
-                case INetworkTransport_TypeId:
-                    INetworkTransport_put(netTrans, (Ptr)msg);
-                    break;
-
-                default:
-                    /* error */
-                    printf("MessageQ_put: Error: transport id %d is an "
-                            "unsupported transport type\n", tid);
-                    status = MessageQ_E_FAIL;
-                    break;
+            if (obj == NULL) {
+                printf("MessageQ_put: Error: unable to deliver message, "
+                        "destination queue not in this process.\n");
+                return (MessageQ_E_FAIL);
             }
-        }
-    }
-    else {
-        obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
-
-        pthread_mutex_lock(&MessageQ_module->gate);
 
-        /* It is a local MessageQ */
-        CIRCLEQ_INSERT_TAIL(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
-
-        pthread_mutex_unlock(&MessageQ_module->gate);
-
-       sem_post(&obj->synchronizer);
+            /* deliver message to process local queue */
+            pthread_mutex_lock(&MessageQ_module->gate);
+            CIRCLEQ_INSERT_TAIL(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)msg,
+                    elem);
+            pthread_mutex_unlock(&MessageQ_module->gate);
+            sem_post(&obj->synchronizer);
+        }
     }
 
-    return status;
+    return (status);
 }
 
 /*