Re-work MessageQ_put to eliminate transport recursion
authorRamsey Harris <ramsey@ti.com>
Thu, 19 Feb 2015 22:56:44 +0000 (14:56 -0800)
committerRobert Tivy <rtivy@ti.com>
Sat, 21 Feb 2015 00:47:34 +0000 (16:47 -0800)
On Linux, when delivering an inbound message sent from another process
on the same processor, the logic in MessageQ_put caused a transport
infinite recursion. In other words, the message was given back to the
transport instead of being delivered to the queue. The new logic is to
attempt the local message deliver first; if it fails, then give it to
the transport for delivery instead of failing back to the caller.

linux/src/api/MessageQ.c

index 9ebf0063332aec489d697b1d8519aabcc4948947..53185fcdabfce7d44d74f0765cb577befcbe210d 100644 (file)
@@ -772,58 +772,95 @@ Int MessageQ_close(MessageQ_QueueId *queueId)
 
 /*
  *  ======== MessageQ_put ========
- *  Place a message onto a message queue
+ *  Deliver the given message, either locally or to the transport
  *
- *  Calls transport's put(), which handles the sending of the message
- *  using the appropriate kernel interface (socket, device ioctl) call
- *  for the remote procId encoded in the queueId argument.
+ *  If the destination is a local queue, deliver the message. Otherwise,
+ *  pass the message to a transport for delivery. The transport handles
+ *  the sending of the message using the appropriate interface (socket,
+ *  device ioctl, etc.).
  */
 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
 {
+    Int status = MessageQ_S_SUCCESS;
     MessageQ_Object *obj;
-    UInt16   dstProcId  = (UInt16)(queueId >> 16);
-    UInt16   queueIndex;
-    UInt16   queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
-    Int      status = MessageQ_S_SUCCESS;
+    UInt16 dstProcId;
+    UInt16 queueIndex;
+    UInt16 queuePort;
     ITransport_Handle baseTrans;
     IMessageQTransport_Handle msgTrans;
     INetworkTransport_Handle netTrans;
     Int priority;
     UInt tid;
     UInt16 clusterId;
+    Bool delivered;
+
+    /* extract destination address from the given queueId */
+    dstProcId  = (UInt16)(queueId >> 16);
+    queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
 
-    /* use the queue port # for destination address */
+    /* write the destination address into the message header */
     msg->dstId = queuePort;
     msg->dstProc= dstProcId;
 
-    /* invoke put hook function after addressing the message */
+    /* invoke the hook function after addressing the message */
     if (MessageQ_module->putHookFxn != NULL) {
         MessageQ_module->putHookFxn(queueId, msg);
     }
 
-    /* extract the transport ID from the message header */
+    /*  For an outbound message: If message destination is on this
+     *  processor, then check if the destination queue is in this
+     *  process (thread-to-thread messaging).
+     *
+     *  For an inbound message: Check if destination queue is in this
+     *  process (process-to-process messaging).
+     */
+    if (dstProcId == MultiProc_self()) {
+        queueIndex = queuePort - MessageQ_PORTOFFSET;
+
+        if (queueIndex < MessageQ_module->numQueues) {
+            obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
+
+            if (obj != NULL) {
+                /* deliver message to queue */
+                pthread_mutex_lock(&MessageQ_module->gate);
+                CIRCLEQ_INSERT_TAIL(&obj->msgList,
+                        (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
+                pthread_mutex_unlock(&MessageQ_module->gate);
+                sem_post(&obj->synchronizer);
+                goto done;
+            }
+        }
+    }
+
+    /*  Getting here implies the message is outbound. Must give it to
+     *  either the primary or secondary transport for delivery. Start
+     *  by extracting the transport ID from the message header.
+     */
     tid = MessageQ_getTransportId(msg);
 
     if (tid >= MessageQ_MAXTRANSPORTS) {
         printf("MessageQ_put: Error: transport id %d too big, must be < %d\n",
                 tid, MessageQ_MAXTRANSPORTS);
-        return (MessageQ_E_FAIL);
+        status = MessageQ_E_FAIL;
+        goto done;
     }
 
-    /* if tid is set, use secondary transport regardless of destination */
+    /* if transportId is set, use secondary transport for message delivery */
     if (tid != 0) {
         baseTrans = MessageQ_module->transInst[tid];
 
         if (baseTrans == NULL) {
             printf("MessageQ_put: Error: transport is null\n");
-            return (MessageQ_E_FAIL);
+            status = MessageQ_E_FAIL;
+            goto done;
         }
 
         /* downcast instance pointer to transport interface */
         switch (ITransport_itype(baseTrans)) {
             case INetworkTransport_TypeId:
                 netTrans = INetworkTransport_downCast(baseTrans);
-                INetworkTransport_put(netTrans, (Ptr)msg);
+                delivered = INetworkTransport_put(netTrans, (Ptr)msg);
+                status = (delivered ? MessageQ_S_SUCCESS : MessageQ_E_FAIL);
                 break;
 
             default:
@@ -835,49 +872,24 @@ Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
         }
     }
     else {
-        /* if destination on another processor, use primary transport */
-        if (dstProcId != MultiProc_self()) {
-            priority = MessageQ_getMsgPri(msg);
-            clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
-
-            /* primary transport can only be used for intra-cluster delivery */
-            if (clusterId > MultiProc_getNumProcsInCluster()) {
-                printf("MessageQ_put: Error: destination procId=%d is not "
-                        "in cluster. Must specify a transportId.\n", dstProcId);
-                return (MessageQ_E_FAIL);
-            }
-
-            msgTrans = MessageQ_module->transports[clusterId][priority];
-
-            IMessageQTransport_put(msgTrans, (Ptr)msg);
+        /* use primary transport for delivery */
+        priority = MessageQ_getMsgPri(msg);
+        clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
+
+        /* primary transport can only be used for intra-cluster delivery */
+        if (clusterId > MultiProc_getNumProcsInCluster()) {
+            printf("MessageQ_put: Error: destination procId=%d is not "
+                    "in cluster. Must specify a transportId.\n", dstProcId);
+            status =  MessageQ_E_FAIL;
+            goto done;
         }
-        else {
-            /* check if destination queue is in this process */
-            queueIndex = queuePort - MessageQ_PORTOFFSET;
-
-            if (queueIndex >= MessageQ_module->numQueues) {
-                printf("MessageQ_put: Error: unable to deliver message, "
-                        "queueIndex too large or transportId missing.\n");
-                return (MessageQ_E_FAIL);
-            }
-
-            obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
 
-            if (obj == NULL) {
-                printf("MessageQ_put: Error: unable to deliver message, "
-                        "destination queue not in this process.\n");
-                return (MessageQ_E_FAIL);
-            }
-
-            /* deliver message to process local queue */
-            pthread_mutex_lock(&MessageQ_module->gate);
-            CIRCLEQ_INSERT_TAIL(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)msg,
-                    elem);
-            pthread_mutex_unlock(&MessageQ_module->gate);
-            sem_post(&obj->synchronizer);
-        }
+        msgTrans = MessageQ_module->transports[clusterId][priority];
+        delivered = IMessageQTransport_put(msgTrans, (Ptr)msg);
+        status = (delivered ? MessageQ_S_SUCCESS : MessageQ_E_FAIL);
     }
 
+done:
     return (status);
 }