Changes to support non-NameServer-capable slaves on Linux
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
index 5921fe78325111b341ea28621cdc463a342bd9c5..827af575bd152447f5627250bb0eab09a98167a9 100644 (file)
@@ -221,7 +221,7 @@ Void MessageQ_getConfig (MessageQ_Config * cfg)
 }
 
 /* Function to setup the MessageQ module. */
-Int MessageQ_setup (const MessageQ_Config * cfg)
+Int MessageQ_setup(const MessageQ_Config * cfg)
 {
     Int status;
     LAD_ClientHandle handle;
@@ -266,10 +266,9 @@ Int MessageQ_setup (const MessageQ_Config * cfg)
 
     /* Clear sockets array. */
     for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
-       MessageQ_module->sock[i]      = Transport_INVALIDSOCKET;
+        MessageQ_module->sock[i] = Transport_INVALIDSOCKET;
     }
 
-
     return status;
 }
 
@@ -333,7 +332,7 @@ Void MessageQ_Params_init (MessageQ_Params * params)
  */
 MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
 {
-    Int                   status    = MessageQ_S_SUCCESS;
+    Int                   status;
     MessageQ_Object *     obj    = NULL;
     UInt16                queueIndex = 0u;
     UInt16                procId;
@@ -413,7 +412,7 @@ MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
         status = transportCreateEndpoint(&obj->fd[rprocId], rprocId,
                                            queueIndex);
         if (status < 0) {
-           goto cleanup;
+           obj->fd[rprocId] = Transport_INVALIDSOCKET;
         }
     }
 
@@ -425,13 +424,20 @@ MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
     if (obj->unblockFd == -1)  {
         printf ("MessageQ_create: eventfd creation failed: %d, %s\n",
                    errno, strerror(errno));
-        status = MessageQ_E_FAIL;
+        MessageQ_delete((MessageQ_Handle *)&obj);
     }
+    else {
+        int endpointFound = 0;
 
-cleanup:
-    /* Cleanup if fail: */
-    if (status < 0) {
-        MessageQ_delete((MessageQ_Handle *)&obj);
+        for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
+            if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
+                endpointFound = 1;
+            }
+        }
+        if (!endpointFound) {
+            printf("MessageQ_create: no transport endpoints found, deleting\n");
+            MessageQ_delete((MessageQ_Handle *)&obj);
+        }
     }
 
     return ((MessageQ_Handle) obj);
@@ -599,7 +605,8 @@ Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
     /* Wait (with timeout) and retreive message from socket: */
     FD_ZERO(&rfds);
     for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
-        if (rprocId == MultiProc_self()) {
+        if (rprocId == MultiProc_self() ||
+            obj->fd[rprocId] == Transport_INVALIDSOCKET) {
             continue;
         }
         maxfd = MAX(maxfd, obj->fd[rprocId]);
@@ -638,7 +645,8 @@ Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
         else {
             for (rprocId = 0; rprocId < MultiProc_getNumProcessors();
                  rprocId++) {
-                if (rprocId == MultiProc_self()) {
+                if (rprocId == MultiProc_self() ||
+                    obj->fd[rprocId] == Transport_INVALIDSOCKET) {
                     continue;
                 }
                 if (FD_ISSET(obj->fd[rprocId], &rfds)) {
@@ -840,7 +848,13 @@ Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr)
             PRINTVERBOSE1("MessageQ_attach: created send socket: %d\n", sock)
             MessageQ_module->sock[remoteProcId] = sock;
             /* Attempt to connect: */
-            ConnectSocket(sock, remoteProcId, MESSAGEQ_RPMSG_PORT);
+            status = ConnectSocket(sock, remoteProcId, MESSAGEQ_RPMSG_PORT);
+            if (status < 0) {
+                status = MessageQ_E_RESOURCE;
+                /* don't hard-printf since this is no longer fatal */
+                PRINTVERBOSE1("MessageQ_attach: ConnectSocket(remoteProcId:%d) failed\n",
+                       remoteProcId);
+            }
         }
     }
     else {
@@ -849,6 +863,10 @@ Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr)
 
     pthread_mutex_unlock (&(MessageQ_module->gate));
 
+    if (status == MessageQ_E_RESOURCE) {
+        MessageQ_detach(remoteProcId);
+    }
+
 exit:
     return (status);
 }
@@ -870,14 +888,16 @@ Int MessageQ_detach (UInt16 remoteProcId)
     pthread_mutex_lock (&(MessageQ_module->gate));
 
     sock = MessageQ_module->sock[remoteProcId];
-    if (close (sock)) {
-        status = MessageQ_E_OSFAILURE;
-        printf ("MessageQ_detach: close failed: %d, %s\n",
-                       errno, strerror(errno));
-    }
-    else {
-        PRINTVERBOSE1("MessageQ_detach: closed socket: %d\n", sock)
-        MessageQ_module->sock[remoteProcId] = Transport_INVALIDSOCKET;
+    if (sock != Transport_INVALIDSOCKET) {
+        if (close(sock)) {
+            status = MessageQ_E_OSFAILURE;
+            printf("MessageQ_detach: close failed: %d, %s\n",
+                   errno, strerror(errno));
+        }
+        else {
+            PRINTVERBOSE1("MessageQ_detach: closed socket: %d\n", sock)
+            MessageQ_module->sock[remoteProcId] = Transport_INVALIDSOCKET;
+        }
     }
 
     pthread_mutex_unlock (&(MessageQ_module->gate));
@@ -900,7 +920,7 @@ Void MessageQ_msgInit (MessageQ_Msg msg)
     handle = LAD_findHandle();
     if (handle == LAD_MAXNUMCLIENTS) {
         PRINTVERBOSE1(
-          "MessageQ_setup: can't find connection to daemon for pid %d\n",
+          "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
            getpid())
 
         return;
@@ -969,8 +989,9 @@ static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex)
     err = SocketBindAddr(*fd, rprocId, (UInt32)queueIndex);
     if (err < 0) {
         status = MessageQ_E_FAIL;
-        printf ("transportCreateEndpoint: bind failed: %d, %s\n",
-                  errno, strerror(errno));
+        /* don't hard-printf since this is no longer fatal */
+        PRINTVERBOSE2("transportCreateEndpoint: bind failed: %d, %s\n",
+                      errno, strerror(errno));
     }
 
 exit: