MessageQ: Fix MessageQ_get() dropping messages
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
index 55aabce6efafd250ec25eb9e681d3926ee1460da..c2930ad44c1b351fe4163ef298f93518142f524a 100644 (file)
  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 /*
- *  @file       MessageQ.c
- *  @brief      Prototype Mapping of SysLink MessageQ to Socket ABI
- *              (SysLink 3).
- *
- *  @ver        02.00.00.51_alpha2 (kernel code is basis for this module)
- *
- */
-/*============================================================================
  *  @file   MessageQ.c
  *
  *  @brief  MessageQ module "client" implementation
  *  system-wide data is maintained in a "server" component and process-
  *  specific data is handled here.  At the moment, this implementation
  *  connects and communicates with LAD for the server connection.
- *
- *  The MessageQ module supports the structured sending and receiving of
- *  variable length messages. This module can be used for homogeneous or
- *  heterogeneous multi-processor messaging.
- *
- *  MessageQ provides more sophisticated messaging than other modules. It is
- *  typically used for complex situations such as multi-processor messaging.
- *
- *  The following are key features of the MessageQ module:
- *  -Writers and readers can be relocated to another processor with no
- *   runtime code changes.
- *  -Timeouts are allowed when receiving messages.
- *  -Readers can determine the writer and reply back.
- *  -Receiving a message is deterministic when the timeout is zero.
- *  -Messages can reside on any message queue.
- *  -Supports zero-copy transfers.
- *  -Can send and receive from any type of thread.
- *  -Notification mechanism is specified by application.
- *  -Allows QoS (quality of service) on message buffer pools. For example,
- *   using specific buffer pools for specific message queues.
- *
- *  Messages are sent and received via a message queue. A reader is a thread
- *  that gets (reads) messages from a message queue. A writer is a thread that
- *  puts (writes) a message to a message queue. Each message queue has one
- *  reader and can have many writers. A thread may read from or write to multiple
- *  message queues.
- *
- *  Conceptually, the reader thread owns a message queue. The reader thread
- *  creates a message queue. Writer threads  a created message queues to
- *  get access to them.
- *
- *  Message queues are identified by a system-wide unique name. Internally,
- *  MessageQ uses the NameServermodule for managing
- *  these names. The names are used for opening a message queue. Using
- *  names is not required.
- *
- *  Messages must be allocated from the MessageQ module. Once a message is
- *  allocated, it can be sent on any message queue. Once a message is sent, the
- *  writer loses ownership of the message and should not attempt to modify the
- *  message. Once the reader receives the message, it owns the message. It
- *  may either free the message or re-use the message.
- *
- *  Messages in a message queue can be of variable length. The only
- *  requirement is that the first field in the definition of a message must be a
- *  MsgHeader structure. For example:
- *  typedef struct MyMsg {
- *      MessageQ_MsgHeader header;
- *      ...
- *  } MyMsg;
- *
- *  The MessageQ API uses the MessageQ_MsgHeader internally. Your application
- *  should not modify or directly access the fields in the MessageQ_MsgHeader.
- *
- *  All messages sent via the MessageQ module must be allocated from a
- *  Heap implementation. The heap can be used for
- *  other memory allocation not related to MessageQ.
- *
- *  An application can use multiple heaps. The purpose of having multiple
- *  heaps is to allow an application to regulate its message usage. For
- *  example, an application can allocate critical messages from one heap of fast
- *  on-chip memory and non-critical messages from another heap of slower
- *  external memory
- *
- *  MessageQ does support the usage of messages that allocated via the
- *  alloc function. Please refer to the staticMsgInit
- *  function description for more details.
- *
- *  In a multiple processor system, MessageQ communications to other
- *  processors via MessageQTransport instances. There must be one and
- *  only one MessageQTransport instance for each processor where communication
- *  is desired.
- *  So on a four processor system, each processor must have three
- *  MessageQTransport instance.
- *
- *  The user only needs to create the MessageQTransport instances. The instances
- *  are responsible for registering themselves with MessageQ.
- *  This is accomplished via the registerTransport function.
- *
- *  ============================================================================
  */
 
 
-/* Standard headers */
-#include <Std.h>
+/* Standard IPC header */
+#include <ti/ipc/Std.h>
 
 /* Linux specific header files, replacing OSAL: */
 #include <pthread.h>
 #include <unistd.h>
 #include <assert.h>
 
-/* SysLink Socket Protocol Family */
+/* Socket Protocol Family */
 #include <net/rpmsg.h>
 
 /* Socket utils: */
@@ -308,7 +221,7 @@ Void MessageQ_getConfig (MessageQ_Config * cfg)
 }
 
 /* Function to setup the MessageQ module. */
-Int MessageQ_setup (const MessageQ_Config * cfg)
+Int MessageQ_setup(const MessageQ_Config * cfg)
 {
     Int status;
     LAD_ClientHandle handle;
@@ -353,10 +266,9 @@ Int MessageQ_setup (const MessageQ_Config * cfg)
 
     /* Clear sockets array. */
     for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
-       MessageQ_module->sock[i]      = Transport_INVALIDSOCKET;
+        MessageQ_module->sock[i] = Transport_INVALIDSOCKET;
     }
 
-
     return status;
 }
 
@@ -420,7 +332,7 @@ Void MessageQ_Params_init (MessageQ_Params * params)
  */
 MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
 {
-    Int                   status    = MessageQ_S_SUCCESS;
+    Int                   status;
     MessageQ_Object *     obj    = NULL;
     UInt16                queueIndex = 0u;
     UInt16                procId;
@@ -500,7 +412,7 @@ MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
         status = transportCreateEndpoint(&obj->fd[rprocId], rprocId,
                                            queueIndex);
         if (status < 0) {
-           goto cleanup;
+           obj->fd[rprocId] = Transport_INVALIDSOCKET;
         }
     }
 
@@ -512,13 +424,20 @@ MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
     if (obj->unblockFd == -1)  {
         printf ("MessageQ_create: eventfd creation failed: %d, %s\n",
                    errno, strerror(errno));
-        status = MessageQ_E_FAIL;
+        MessageQ_delete((MessageQ_Handle *)&obj);
     }
+    else {
+        int endpointFound = 0;
 
-cleanup:
-    /* Cleanup if fail: */
-    if (status < 0) {
-        MessageQ_delete((MessageQ_Handle *)&obj);
+        for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
+            if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
+                endpointFound = 1;
+            }
+        }
+        if (!endpointFound) {
+            printf("MessageQ_create: no transport endpoints found, deleting\n");
+            MessageQ_delete((MessageQ_Handle *)&obj);
+        }
     }
 
     return ((MessageQ_Handle) obj);
@@ -672,6 +591,7 @@ Int MessageQ_put (MessageQ_QueueId queueId, MessageQ_Msg msg)
  */
 Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
 {
+    static int last = 0;
     Int     status = MessageQ_S_SUCCESS;
     Int     tmpStatus;
     MessageQ_Object * obj = (MessageQ_Object *) handle;
@@ -682,11 +602,14 @@ Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
     void    *timevalPtr;
     UInt16  rprocId;
     int     maxfd = 0;
+    int     selfId;
+    int     nProcessors;
 
     /* Wait (with timeout) and retreive message from socket: */
     FD_ZERO(&rfds);
     for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
-        if (rprocId == MultiProc_self()) {
+        if (rprocId == MultiProc_self() ||
+            obj->fd[rprocId] == Transport_INVALIDSOCKET) {
             continue;
         }
         maxfd = MAX(maxfd, obj->fd[rprocId]);
@@ -723,20 +646,30 @@ Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
             status = MessageQ_E_UNBLOCKED;
         }
         else {
-            for (rprocId = 0; rprocId < MultiProc_getNumProcessors();
-                 rprocId++) {
-                if (rprocId == MultiProc_self()) {
-                    continue;
-                }
-                if (FD_ISSET(obj->fd[rprocId], &rfds)) {
-                    /* Our transport's fd was signalled: Get the message: */
-                    tmpStatus = transportGet(obj->fd[rprocId], msg);
-                    if (tmpStatus < 0) {
-                        printf ("MessageQ_get: tranposrtshm_get failed.");
-                        status = MessageQ_E_FAIL;
-                    }
+           /* start where we last left off */
+           rprocId = last;
+
+           selfId = MultiProc_self();
+           nProcessors = MultiProc_getNumProcessors();
+
+           do {
+                if (rprocId != selfId &&
+                    obj->fd[rprocId] != Transport_INVALIDSOCKET) {
+
+                   if (FD_ISSET(obj->fd[rprocId], &rfds)) {
+                       /* Our transport's fd was signalled: Get the message */
+                       tmpStatus = transportGet(obj->fd[rprocId], msg);
+                       if (tmpStatus < 0) {
+                           printf ("MessageQ_get: tranposrtshm_get failed.");
+                           status = MessageQ_E_FAIL;
+                       }
+
+                       last = (rprocId + 1) % nProcessors;
+                       break;
+                   }
                 }
-            }
+               rprocId = (rprocId + 1) % nProcessors;
+            } while (rprocId != last);
         }
     }
     else if (retval == 0) {
@@ -927,7 +860,13 @@ Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr)
             PRINTVERBOSE1("MessageQ_attach: created send socket: %d\n", sock)
             MessageQ_module->sock[remoteProcId] = sock;
             /* Attempt to connect: */
-            ConnectSocket(sock, remoteProcId, MESSAGEQ_RPMSG_PORT);
+            status = ConnectSocket(sock, remoteProcId, MESSAGEQ_RPMSG_PORT);
+            if (status < 0) {
+                status = MessageQ_E_RESOURCE;
+                /* don't hard-printf since this is no longer fatal */
+                PRINTVERBOSE1("MessageQ_attach: ConnectSocket(remoteProcId:%d) failed\n",
+                       remoteProcId);
+            }
         }
     }
     else {
@@ -936,6 +875,10 @@ Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr)
 
     pthread_mutex_unlock (&(MessageQ_module->gate));
 
+    if (status == MessageQ_E_RESOURCE) {
+        MessageQ_detach(remoteProcId);
+    }
+
 exit:
     return (status);
 }
@@ -957,14 +900,16 @@ Int MessageQ_detach (UInt16 remoteProcId)
     pthread_mutex_lock (&(MessageQ_module->gate));
 
     sock = MessageQ_module->sock[remoteProcId];
-    if (close (sock)) {
-        status = MessageQ_E_OSFAILURE;
-        printf ("MessageQ_detach: close failed: %d, %s\n",
-                       errno, strerror(errno));
-    }
-    else {
-        PRINTVERBOSE1("MessageQ_detach: closed socket: %d\n", sock)
-        MessageQ_module->sock[remoteProcId] = Transport_INVALIDSOCKET;
+    if (sock != Transport_INVALIDSOCKET) {
+        if (close(sock)) {
+            status = MessageQ_E_OSFAILURE;
+            printf("MessageQ_detach: close failed: %d, %s\n",
+                   errno, strerror(errno));
+        }
+        else {
+            PRINTVERBOSE1("MessageQ_detach: closed socket: %d\n", sock)
+            MessageQ_module->sock[remoteProcId] = Transport_INVALIDSOCKET;
+        }
     }
 
     pthread_mutex_unlock (&(MessageQ_module->gate));
@@ -987,7 +932,7 @@ Void MessageQ_msgInit (MessageQ_Msg msg)
     handle = LAD_findHandle();
     if (handle == LAD_MAXNUMCLIENTS) {
         PRINTVERBOSE1(
-          "MessageQ_setup: can't find connection to daemon for pid %d\n",
+          "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
            getpid())
 
         return;
@@ -1056,8 +1001,9 @@ static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex)
     err = SocketBindAddr(*fd, rprocId, (UInt32)queueIndex);
     if (err < 0) {
         status = MessageQ_E_FAIL;
-        printf ("transportCreateEndpoint: bind failed: %d, %s\n",
-                  errno, strerror(errno));
+        /* don't hard-printf since this is no longer fatal */
+        PRINTVERBOSE2("transportCreateEndpoint: bind failed: %d, %s\n",
+                      errno, strerror(errno));
     }
 
 exit: