]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - ipc/ipcdev.git/blobdiff - linux/src/api/MessageQ.c
MessageQ: Fix MessageQ_get() dropping messages
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
index 5921fe78325111b341ea28621cdc463a342bd9c5..c2930ad44c1b351fe4163ef298f93518142f524a 100644 (file)
@@ -221,7 +221,7 @@ Void MessageQ_getConfig (MessageQ_Config * cfg)
 }
 
 /* Function to setup the MessageQ module. */
-Int MessageQ_setup (const MessageQ_Config * cfg)
+Int MessageQ_setup(const MessageQ_Config * cfg)
 {
     Int status;
     LAD_ClientHandle handle;
@@ -266,10 +266,9 @@ Int MessageQ_setup (const MessageQ_Config * cfg)
 
     /* Clear sockets array. */
     for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
-       MessageQ_module->sock[i]      = Transport_INVALIDSOCKET;
+        MessageQ_module->sock[i] = Transport_INVALIDSOCKET;
     }
 
-
     return status;
 }
 
@@ -333,7 +332,7 @@ Void MessageQ_Params_init (MessageQ_Params * params)
  */
 MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
 {
-    Int                   status    = MessageQ_S_SUCCESS;
+    Int                   status;
     MessageQ_Object *     obj    = NULL;
     UInt16                queueIndex = 0u;
     UInt16                procId;
@@ -413,7 +412,7 @@ MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
         status = transportCreateEndpoint(&obj->fd[rprocId], rprocId,
                                            queueIndex);
         if (status < 0) {
-           goto cleanup;
+           obj->fd[rprocId] = Transport_INVALIDSOCKET;
         }
     }
 
@@ -425,13 +424,20 @@ MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
     if (obj->unblockFd == -1)  {
         printf ("MessageQ_create: eventfd creation failed: %d, %s\n",
                    errno, strerror(errno));
-        status = MessageQ_E_FAIL;
+        MessageQ_delete((MessageQ_Handle *)&obj);
     }
+    else {
+        int endpointFound = 0;
 
-cleanup:
-    /* Cleanup if fail: */
-    if (status < 0) {
-        MessageQ_delete((MessageQ_Handle *)&obj);
+        for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
+            if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
+                endpointFound = 1;
+            }
+        }
+        if (!endpointFound) {
+            printf("MessageQ_create: no transport endpoints found, deleting\n");
+            MessageQ_delete((MessageQ_Handle *)&obj);
+        }
     }
 
     return ((MessageQ_Handle) obj);
@@ -585,6 +591,7 @@ Int MessageQ_put (MessageQ_QueueId queueId, MessageQ_Msg msg)
  */
 Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
 {
+    static int last = 0;
     Int     status = MessageQ_S_SUCCESS;
     Int     tmpStatus;
     MessageQ_Object * obj = (MessageQ_Object *) handle;
@@ -595,11 +602,14 @@ Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
     void    *timevalPtr;
     UInt16  rprocId;
     int     maxfd = 0;
+    int     selfId;
+    int     nProcessors;
 
     /* Wait (with timeout) and retreive message from socket: */
     FD_ZERO(&rfds);
     for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
-        if (rprocId == MultiProc_self()) {
+        if (rprocId == MultiProc_self() ||
+            obj->fd[rprocId] == Transport_INVALIDSOCKET) {
             continue;
         }
         maxfd = MAX(maxfd, obj->fd[rprocId]);
@@ -636,20 +646,30 @@ Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
             status = MessageQ_E_UNBLOCKED;
         }
         else {
-            for (rprocId = 0; rprocId < MultiProc_getNumProcessors();
-                 rprocId++) {
-                if (rprocId == MultiProc_self()) {
-                    continue;
-                }
-                if (FD_ISSET(obj->fd[rprocId], &rfds)) {
-                    /* Our transport's fd was signalled: Get the message: */
-                    tmpStatus = transportGet(obj->fd[rprocId], msg);
-                    if (tmpStatus < 0) {
-                        printf ("MessageQ_get: tranposrtshm_get failed.");
-                        status = MessageQ_E_FAIL;
-                    }
+           /* start where we last left off */
+           rprocId = last;
+
+           selfId = MultiProc_self();
+           nProcessors = MultiProc_getNumProcessors();
+
+           do {
+                if (rprocId != selfId &&
+                    obj->fd[rprocId] != Transport_INVALIDSOCKET) {
+
+                   if (FD_ISSET(obj->fd[rprocId], &rfds)) {
+                       /* Our transport's fd was signalled: Get the message */
+                       tmpStatus = transportGet(obj->fd[rprocId], msg);
+                       if (tmpStatus < 0) {
+                           printf ("MessageQ_get: tranposrtshm_get failed.");
+                           status = MessageQ_E_FAIL;
+                       }
+
+                       last = (rprocId + 1) % nProcessors;
+                       break;
+                   }
                 }
-            }
+               rprocId = (rprocId + 1) % nProcessors;
+            } while (rprocId != last);
         }
     }
     else if (retval == 0) {
@@ -840,7 +860,13 @@ Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr)
             PRINTVERBOSE1("MessageQ_attach: created send socket: %d\n", sock)
             MessageQ_module->sock[remoteProcId] = sock;
             /* Attempt to connect: */
-            ConnectSocket(sock, remoteProcId, MESSAGEQ_RPMSG_PORT);
+            status = ConnectSocket(sock, remoteProcId, MESSAGEQ_RPMSG_PORT);
+            if (status < 0) {
+                status = MessageQ_E_RESOURCE;
+                /* don't hard-printf since this is no longer fatal */
+                PRINTVERBOSE1("MessageQ_attach: ConnectSocket(remoteProcId:%d) failed\n",
+                       remoteProcId);
+            }
         }
     }
     else {
@@ -849,6 +875,10 @@ Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr)
 
     pthread_mutex_unlock (&(MessageQ_module->gate));
 
+    if (status == MessageQ_E_RESOURCE) {
+        MessageQ_detach(remoteProcId);
+    }
+
 exit:
     return (status);
 }
@@ -870,14 +900,16 @@ Int MessageQ_detach (UInt16 remoteProcId)
     pthread_mutex_lock (&(MessageQ_module->gate));
 
     sock = MessageQ_module->sock[remoteProcId];
-    if (close (sock)) {
-        status = MessageQ_E_OSFAILURE;
-        printf ("MessageQ_detach: close failed: %d, %s\n",
-                       errno, strerror(errno));
-    }
-    else {
-        PRINTVERBOSE1("MessageQ_detach: closed socket: %d\n", sock)
-        MessageQ_module->sock[remoteProcId] = Transport_INVALIDSOCKET;
+    if (sock != Transport_INVALIDSOCKET) {
+        if (close(sock)) {
+            status = MessageQ_E_OSFAILURE;
+            printf("MessageQ_detach: close failed: %d, %s\n",
+                   errno, strerror(errno));
+        }
+        else {
+            PRINTVERBOSE1("MessageQ_detach: closed socket: %d\n", sock)
+            MessageQ_module->sock[remoteProcId] = Transport_INVALIDSOCKET;
+        }
     }
 
     pthread_mutex_unlock (&(MessageQ_module->gate));
@@ -900,7 +932,7 @@ Void MessageQ_msgInit (MessageQ_Msg msg)
     handle = LAD_findHandle();
     if (handle == LAD_MAXNUMCLIENTS) {
         PRINTVERBOSE1(
-          "MessageQ_setup: can't find connection to daemon for pid %d\n",
+          "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
            getpid())
 
         return;
@@ -969,8 +1001,9 @@ static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex)
     err = SocketBindAddr(*fd, rprocId, (UInt32)queueIndex);
     if (err < 0) {
         status = MessageQ_E_FAIL;
-        printf ("transportCreateEndpoint: bind failed: %d, %s\n",
-                  errno, strerror(errno));
+        /* don't hard-printf since this is no longer fatal */
+        PRINTVERBOSE2("transportCreateEndpoint: bind failed: %d, %s\n",
+                      errno, strerror(errno));
     }
 
 exit: