MessageQ: Fix MessageQ_get() dropping messages
authorRobert Tivy <rtivy@ti.com>
Tue, 10 Sep 2013 23:01:49 +0000 (16:01 -0700)
committerChris Ring <cring@ti.com>
Wed, 11 Sep 2013 21:17:16 +0000 (14:17 -0700)
MessageQ_get() had a bug where it would loop over all remote cores
looking for and getting a message and would return the message from
the highest numbered processor (in MultiProc order) that had one
available, tossing messages received from lower numbered processors.

The MessageQ_get() loop now terminates as soon as the first message
is found.  Further to this change, on the next call to the function
it starts looking for messages from the processor that is "next" in
the (cyclic) list after the one in which it last found a message.

Fixes SDOCM00103664.

Signed-off-by: Robert Tivy <rtivy@ti.com>
linux/src/api/MessageQ.c

index 827af575bd152447f5627250bb0eab09a98167a9..c2930ad44c1b351fe4163ef298f93518142f524a 100644 (file)
@@ -591,6 +591,7 @@ Int MessageQ_put (MessageQ_QueueId queueId, MessageQ_Msg msg)
  */
 Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
 {
  */
 Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
 {
+    static int last = 0;
     Int     status = MessageQ_S_SUCCESS;
     Int     tmpStatus;
     MessageQ_Object * obj = (MessageQ_Object *) handle;
     Int     status = MessageQ_S_SUCCESS;
     Int     tmpStatus;
     MessageQ_Object * obj = (MessageQ_Object *) handle;
@@ -601,6 +602,8 @@ Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
     void    *timevalPtr;
     UInt16  rprocId;
     int     maxfd = 0;
     void    *timevalPtr;
     UInt16  rprocId;
     int     maxfd = 0;
+    int     selfId;
+    int     nProcessors;
 
     /* Wait (with timeout) and retreive message from socket: */
     FD_ZERO(&rfds);
 
     /* Wait (with timeout) and retreive message from socket: */
     FD_ZERO(&rfds);
@@ -643,21 +646,30 @@ Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
             status = MessageQ_E_UNBLOCKED;
         }
         else {
             status = MessageQ_E_UNBLOCKED;
         }
         else {
-            for (rprocId = 0; rprocId < MultiProc_getNumProcessors();
-                 rprocId++) {
-                if (rprocId == MultiProc_self() ||
-                    obj->fd[rprocId] == Transport_INVALIDSOCKET) {
-                    continue;
+           /* start where we last left off */
+           rprocId = last;
+
+           selfId = MultiProc_self();
+           nProcessors = MultiProc_getNumProcessors();
+
+           do {
+                if (rprocId != selfId &&
+                    obj->fd[rprocId] != Transport_INVALIDSOCKET) {
+
+                   if (FD_ISSET(obj->fd[rprocId], &rfds)) {
+                       /* Our transport's fd was signalled: Get the message */
+                       tmpStatus = transportGet(obj->fd[rprocId], msg);
+                       if (tmpStatus < 0) {
+                           printf ("MessageQ_get: tranposrtshm_get failed.");
+                           status = MessageQ_E_FAIL;
+                       }
+
+                       last = (rprocId + 1) % nProcessors;
+                       break;
+                   }
                 }
                 }
-                if (FD_ISSET(obj->fd[rprocId], &rfds)) {
-                    /* Our transport's fd was signalled: Get the message: */
-                    tmpStatus = transportGet(obj->fd[rprocId], msg);
-                    if (tmpStatus < 0) {
-                        printf ("MessageQ_get: tranposrtshm_get failed.");
-                        status = MessageQ_E_FAIL;
-                    }
-                }
-            }
+               rprocId = (rprocId + 1) % nProcessors;
+            } while (rprocId != last);
         }
     }
     else if (retval == 0) {
         }
     }
     else if (retval == 0) {