Linux: Reattach to Proc In NameServer daemon In case of Error Recovery
authorAngela Stegmaier <angelabaker@ti.com>
Fri, 15 Apr 2016 15:01:50 +0000 (10:01 -0500)
committerAngela Stegmaier <angelabaker@ti.com>
Wed, 20 Apr 2016 18:47:49 +0000 (13:47 -0500)
In the case of error recovery on one core while more than one
core is currently connected to the LAD daemon, the refcount for
the NameServer daemon will never reach zero unless all connected
applications call Ipc_stop. This will prevent the NameServer
daemon from reconnecting to the recovered remote core until all
applications call Ipc_stop. This is not desireable behavior because
this prevents an application opening a MessageQ on the recovered
core until all applications call Ipc_stop.

Instead, the patch makes changes to internally re-connect with the
recovered core within NameServer daemon itself in the case of
remote core error recovery. The NameServer daemon will continue
trying to reconnect to the recovered core until successful. In
this way, the other applications are not required to exit for a new
MessageQ_open to be successful with the recovered core.

Signed-off-by: Angela Stegmaier <angelabaker@ti.com>
linux/src/api/Ipc.c
linux/src/daemon/NameServer_daemon.c

index 1b9d25f1e9bd401682acf12661bffd3656385d56..90725d394dcd26058a87184522ddfb280475e3b7 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
+ * Copyright (c) 2012-2016 Texas Instruments Incorporated - http://www.ti.com
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -459,6 +459,7 @@ Int Ipc_attach(UInt16 procId)
     status = Ipc_module.transportFactory->attachFxn(procId);
 
     if (status < 0) {
+        NameServer_detach(procId);
         status = Ipc_E_FAIL;
         goto done;
     }
index e2961762be1d26baccd38263a3d143a6e774fb72..288b39ebf009c55d5e7d44a572202d8d2c7919f0 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
+ * Copyright (c) 2012-2016 Texas Instruments Incorporated - http://www.ti.com
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -145,6 +145,7 @@ typedef struct NameServer_ModuleObject {
     NameServer_Params    defInstParams;
     /* Default instance paramters */
     pthread_mutex_t      modGate;
+    pthread_mutex_t      attachGate;
 } NameServer_ModuleObject;
 
 #define CIRCLEQ_elemClear(elem) { \
@@ -178,6 +179,7 @@ static NameServer_ModuleObject NameServer_state = {
 // only _NP (non-portable) type available in CG tools which we're using
     .modGate                         = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
 #endif
+    .attachGate                      = PTHREAD_MUTEX_INITIALIZER,
     .refCount                        = 0
 };
 
@@ -241,6 +243,97 @@ static UInt32 stringHash(String s)
     return (hash);
 }
 
+static Int NameServer_reattach(UInt16 procId)
+{
+    Int status = NameServer_S_SUCCESS;
+    UInt16 clId;
+    int sendSock = INVALIDSOCKET;
+    int recvSock = INVALIDSOCKET;
+    int err;
+
+    /* procId already validated in API layer */
+    clId = procId - MultiProc_getBaseIdOfCluster();
+
+    if (NameServer_module->comm[clId].refCount == 0) {
+        goto done;
+    }
+
+    LOG2("NameServer_reattach: --> procId=%d, refCount=%d\n",
+            procId, NameServer_module->comm[clId].refCount)
+
+    /* first create new sockets */
+    sendSock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
+    if (sendSock < 0) {
+        status = NameServer_E_FAIL;
+        LOG2("NameServer_reattach: socket failed: %d, %s\n", errno,
+                strerror(errno));
+        goto done;
+    }
+    LOG2("NameServer_reattach: created send socket: %d, procId %d\n", sendSock,
+            procId);
+
+    err = ConnectSocket(sendSock, procId, MESSAGEQ_RPMSG_PORT);
+    if (err < 0) {
+        status = NameServer_E_FAIL;
+        LOG3("NameServer_reattach: connect failed: procId=%d, errno=%d (%s)\n",
+                procId, errno, strerror(errno));
+        goto done;
+    }
+
+    /* create socket for receiving messages from remote processor */
+    recvSock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
+    if (recvSock < 0) {
+        status = NameServer_E_FAIL;
+        LOG2("NameServer_reattach: socket failed: %d, %s\n", errno,
+                strerror(errno));
+        goto done;
+    }
+
+    LOG2("NameServer_attach: created receive socket: %d, procId %d\n", recvSock,
+            procId);
+
+    err = SocketBindAddr(recvSock, procId, NAME_SERVER_RPMSG_ADDR);
+    if (err < 0) {
+        status = NameServer_E_FAIL;
+        LOG2("NameServer_attach: bind failed: %d, %s\n", errno,
+                strerror(errno));
+        goto done;
+    }
+
+    /* then close old sockets */
+    /* close the sending socket */
+    LOG1("NameServer_reattach: closing socket: %d\n",
+         NameServer_module->comm[clId].sendSock)
+    close(NameServer_module->comm[clId].sendSock);
+
+    /* close the receiving socket */
+    LOG1("NameServer_reattach: closing socket: %d\n",
+         NameServer_module->comm[clId].recvSock)
+    close(NameServer_module->comm[clId].recvSock);
+
+    /* assign new sockets */
+    NameServer_module->comm[clId].sendSock = sendSock;
+    NameServer_module->comm[clId].recvSock = recvSock;
+
+done:
+    if (status < 0) {
+        if (recvSock >= 0) {
+            LOG1("    closing receive socket: %d\n", recvSock)
+            close(recvSock);
+        }
+
+        if (sendSock >= 0) {
+            LOG1("    closing send socket: %d\n", sendSock)
+            close(sendSock);
+        }
+    }
+
+    LOG2("NameServer_reattach: <-- refCount=%d, status=%d\n",
+            NameServer_module->comm[clId].refCount, status)
+
+    return (status);
+}
+
 static void NameServerRemote_processMessage(NameServerRemote_Msg *msg,
         UInt16 procId)
 {
@@ -330,6 +423,7 @@ static void *listener_cb(void *arg)
     int sock;
     uint64_t event;
     Bool run = TRUE;
+    Bool reconnect = FALSE;
 
     LOG0("listener_cb: Entered Listener thread.\n")
 
@@ -384,6 +478,10 @@ static void *listener_cb(void *arg)
                 }
                 if (nbytes < 0) {
                     LOG2("recvfrom failed: %s (%d)\n", strerror(errno), errno)
+                    if (errno == ENOLINK) {
+                        LOG0("Socket is no longer valid, MUST re-attach!\n");
+                        reconnect = TRUE;
+                    }
                     break;
                 }
                 else {
@@ -414,6 +512,14 @@ static void *listener_cb(void *arg)
             }
         }
 
+        if (reconnect) {
+            reconnect = FALSE;
+            /* grab lock to prevent users from attach/deattach while recovering */
+            pthread_mutex_lock(&NameServer_module->attachGate);
+            NameServer_reattach(procId);
+            pthread_mutex_unlock(&NameServer_module->attachGate);
+        }
+
     } while (run);
 
     return ((void *)ret);
@@ -1281,6 +1387,8 @@ Int NameServer_attach(UInt16 procId)
     /* procId already validated in API layer */
     clId = procId - MultiProc_getBaseIdOfCluster();
 
+    pthread_mutex_lock(&NameServer_module->attachGate);
+
     LOG2("NameServer_attach: --> procId=%d, refCount=%d\n",
             procId, NameServer_module->comm[clId].refCount)
 
@@ -1333,6 +1441,8 @@ Int NameServer_attach(UInt16 procId)
     /* getting here means we have successfully attached */
     NameServer_module->comm[clId].refCount++;
 
+    pthread_mutex_unlock(&NameServer_module->attachGate);
+
     /* tell the listener thread to add new receive sockets */
     event = NameServer_Event_REFRESH;
     write(NameServer_module->unblockFd, &event, sizeof(event));
@@ -1340,6 +1450,8 @@ Int NameServer_attach(UInt16 procId)
     /* wait for ACK event */
     read(NameServer_module->waitFd, &event, sizeof(event));
 
+    pthread_mutex_lock(&NameServer_module->attachGate);
+
 done:
     if (status < 0) {
         sock = NameServer_module->comm[clId].recvSock;
@@ -1360,6 +1472,8 @@ done:
     LOG2("NameServer_attach: <-- refCount=%d, status=%d\n",
             NameServer_module->comm[clId].refCount, status)
 
+    pthread_mutex_unlock(&NameServer_module->attachGate);
+
     return (status);
 }
 
@@ -1377,11 +1491,14 @@ Int NameServer_detach(UInt16 procId)
     /* procId already validated in API layer */
     clId = procId - MultiProc_getBaseIdOfCluster();
 
+    pthread_mutex_lock(&NameServer_module->attachGate);
+
     LOG2("NameServer_detach: --> procId=%d, refCount=%d\n",
             procId, NameServer_module->comm[clId].refCount)
 
     /* decrement reference count regardless of outcome below */
     if (--NameServer_module->comm[clId].refCount > 0) {
+        pthread_mutex_unlock(&NameServer_module->attachGate);
         goto done;
     }
 
@@ -1392,6 +1509,8 @@ Int NameServer_detach(UInt16 procId)
     recvSock = NameServer_module->comm[clId].recvSock;
     NameServer_module->comm[clId].recvSock = INVALIDSOCKET;
 
+    pthread_mutex_unlock(&NameServer_module->attachGate);
+
     /* tell the listener thread to remove old sockets */
     event = NameServer_Event_REFRESH;
     write(NameServer_module->unblockFd, &event, sizeof(event));