MessageQ: Fix MessageQ_get() dropping messages
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
index 827af575bd152447f5627250bb0eab09a98167a9..c2930ad44c1b351fe4163ef298f93518142f524a 100644 (file)
@@ -591,6 +591,7 @@ Int MessageQ_put (MessageQ_QueueId queueId, MessageQ_Msg msg)
  */
 Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
 {
+    static int last = 0;
     Int     status = MessageQ_S_SUCCESS;
     Int     tmpStatus;
     MessageQ_Object * obj = (MessageQ_Object *) handle;
@@ -601,6 +602,8 @@ Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
     void    *timevalPtr;
     UInt16  rprocId;
     int     maxfd = 0;
+    int     selfId;
+    int     nProcessors;
 
     /* Wait (with timeout) and retreive message from socket: */
     FD_ZERO(&rfds);
@@ -643,21 +646,30 @@ Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
             status = MessageQ_E_UNBLOCKED;
         }
         else {
-            for (rprocId = 0; rprocId < MultiProc_getNumProcessors();
-                 rprocId++) {
-                if (rprocId == MultiProc_self() ||
-                    obj->fd[rprocId] == Transport_INVALIDSOCKET) {
-                    continue;
+           /* start where we last left off */
+           rprocId = last;
+
+           selfId = MultiProc_self();
+           nProcessors = MultiProc_getNumProcessors();
+
+           do {
+                if (rprocId != selfId &&
+                    obj->fd[rprocId] != Transport_INVALIDSOCKET) {
+
+                   if (FD_ISSET(obj->fd[rprocId], &rfds)) {
+                       /* Our transport's fd was signalled: Get the message */
+                       tmpStatus = transportGet(obj->fd[rprocId], msg);
+                       if (tmpStatus < 0) {
+                           printf ("MessageQ_get: tranposrtshm_get failed.");
+                           status = MessageQ_E_FAIL;
+                       }
+
+                       last = (rprocId + 1) % nProcessors;
+                       break;
+                   }
                 }
-                if (FD_ISSET(obj->fd[rprocId], &rfds)) {
-                    /* Our transport's fd was signalled: Get the message: */
-                    tmpStatus = transportGet(obj->fd[rprocId], msg);
-                    if (tmpStatus < 0) {
-                        printf ("MessageQ_get: tranposrtshm_get failed.");
-                        status = MessageQ_E_FAIL;
-                    }
-                }
-            }
+               rprocId = (rprocId + 1) % nProcessors;
+            } while (rprocId != last);
         }
     }
     else if (retval == 0) {