Add Ipc attach phase to Linux implementation
author Ramsey Harris <ramsey@ti.com>
Fri, 20 Mar 2015 00:23:16 +0000 (17:23 -0700)
committer Robert Tivy <rtivy@ti.com>
Fri, 20 Mar 2015 22:05:48 +0000 (15:05 -0700)
Refactor the Ipc startup code to separate work done during
setup and during attach. Previous attach mode always assumed
Ipc_ProcSync_ALL, so all work was always done during startup
phase. Add new Ipc config file to LAD for specifying attach
mode. Add new NameServer and TransportRpmsg attach API.
Significant rework of the TransportRpmsg module.

13 files changed:
hlos_common/include/_NameServer.h
linux/include/_Ipc.h [new file with mode: 0644]
linux/include/_lad.h
linux/include/ti/ipc/transports/TransportRpmsg.h
linux/src/api/Ipc.c
linux/src/api/NameServer.c
linux/src/daemon/Ipc_daemon.c [new file with mode: 0644]
linux/src/daemon/Makefile.am
linux/src/daemon/NameServer_daemon.c
linux/src/daemon/cfg/IpcCfg.c [new file with mode: 0644]
linux/src/daemon/lad.c
linux/src/transport/TransportRpmsg.c
packages/ti/ipc/Ipc.h

index 79e0dce73e377406d46706ff807a9549e378136e..4225b3926178829ae4c4a9b76b6cb5d1cb79f941 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012-2014, Texas Instruments Incorporated
+ * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -81,6 +81,20 @@ Int NameServer_setup (Void);
  */
 Int NameServer_destroy (void);
 
+/** @cond INTERNAL */
+/*!
+ *  @brief      Establish connection to remote processor
+ */
+/** @endcond INTERNAL */
+Int NameServer_attach(UInt16 procId);
+
+/** @cond INTERNAL */
+/*!
+ *  @brief       Remove connection to remote processor
+ */
+/** @endcond INTERNAL */
+Int NameServer_detach(UInt16 procId);
+
 #if defined (__cplusplus)
 }
 #endif
diff --git a/linux/include/_Ipc.h b/linux/include/_Ipc.h
new file mode 100644 (file)
index 0000000..0a9ef42
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+ *  Copyright (c) 2015 Texas Instruments Incorporated - http://www.ti.com
+ *  All rights reserved.
+ *
+ *  Redistribution and use in source and binary forms, with or without
+ *  modification, are permitted provided that the following conditions
+ *  are met:
+ *
+ *  *  Redistributions of source code must retain the above copyright
+ *     notice, this list of conditions and the following disclaimer.
+ *
+ *  *  Redistributions in binary form must reproduce the above copyright
+ *     notice, this list of conditions and the following disclaimer in the
+ *     documentation and/or other materials provided with the distribution.
+ *
+ *  *  Neither the name of Texas Instruments Incorporated nor the names of
+ *     its contributors may be used to endorse or promote products derived
+ *     from this software without specific prior written permission.
+ *
+ *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ *  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ *  THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ *  PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ *  CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ *  EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ *  PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+ *  OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ *  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+ *  OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+ *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/** ============================================================================
+ *  @file   _Ipc.h
+ *
+ *  @brief  Internal data structure shared between API and LAD
+ *  ============================================================================
+ */
+
+#ifndef _IPC_H
+#define _IPC_H
+
+#if defined (__cplusplus)
+extern "C" {
+#endif
+
+typedef enum {
+    Ipc_ProcSync_NONE,
+    Ipc_ProcSync_PAIR,
+    Ipc_ProcSync_ALL
+} Ipc_ProcSync;
+
+typedef struct {
+    Ipc_ProcSync procSync;
+} Ipc_Config;
+
+Void Ipc_getConfig(Ipc_Config *cfg);
+
+#if defined (__cplusplus)
+}
+#endif
+#endif
index 00a0c1e6b9216552071698fb71f1992f4c60bdce..bd756c037d00d29ff934eaf97692bf008777f5a3 100644 (file)
@@ -48,6 +48,7 @@ extern "C" {
 #include <stdio.h>
 #include <ti/ipc/GateMP.h>
 #include <_GateMP.h>
+#include <_Ipc.h>
 #include <GateHWSpinlock.h>
 #include <sys/time.h>
 #include <ti/ipc/namesrv/_NameServerRemoteRpmsg.h>
@@ -152,6 +153,7 @@ extern struct timeval start_tv;
 typedef enum {
     LAD_CONNECT = 0,
     LAD_DISCONNECT,
+    LAD_IPC_GETCONFIG,
     LAD_NAMESERVER_SETUP,
     LAD_NAMESERVER_DESTROY,
     LAD_NAMESERVER_PARAMS_INIT,
@@ -163,6 +165,8 @@ typedef enum {
     LAD_NAMESERVER_GETUINT32,
     LAD_NAMESERVER_REMOVE,
     LAD_NAMESERVER_REMOVEENTRY,
+    LAD_NAMESERVER_ATTACH,
+    LAD_NAMESERVER_DETACH,
     LAD_MESSAGEQ_GETCONFIG,
     LAD_MESSAGEQ_SETUP,
     LAD_MESSAGEQ_DESTROY,
@@ -225,6 +229,12 @@ struct LAD_CommandObj {
             NameServer_Handle handle;
             Ptr entryPtr;
         } removeEntry;
+        struct {
+            UInt16 procId;
+        } attach;
+        struct {
+            UInt16 procId;
+        } detach;
         struct {
             MessageQ_Config cfg;
         } messageQSetup;
@@ -316,6 +326,7 @@ union LAD_ResponseObj {
        Int status;
        GateHWSpinlock_Config cfgParams;
     } gateHWSpinlockGetConfig;
+    Ipc_Config ipcConfig;
     NameServer_Params params;
     NameServer_Handle handle;
     Ptr entryPtr;
index 81505e04c87cfc0e9f92acb483f253c7e4ef9fc8..66179e2695c50540856c8adf0b7def2d6df21819 100644 (file)
@@ -61,8 +61,7 @@ typedef struct TransportRpmsg_Params TransportRpmsg_Params;
 
 typedef IMessageQTransport_Handle TransportRpmsg_Handle;
 
-TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *param,
-                                            Int *attachStatus);
+TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params);
 Void TransportRpmsg_delete(TransportRpmsg_Handle *hp);
 
 IMessageQTransport_Handle TransportRpmsg_upCast(TransportRpmsg_Handle handle);
index b6290941f5180e9f9df0a50f56307bbd5496f4ea..d13c3383b292994f99674e38a6c98f0a8acc7f77 100644 (file)
@@ -42,6 +42,7 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <signal.h>
+#include <string.h>
 
 /* package headers */
 #include <ti/ipc/Std.h>
@@ -55,6 +56,7 @@
 #include <ti/ipc/MultiProc.h>
 #include <GateHWSpinlock.h>
 #include <_GateMP.h>
+#include <_Ipc.h>
 #include <_MultiProc.h>
 #include <_MessageQ.h>
 #include <_NameServer.h>
@@ -64,6 +66,8 @@ typedef struct {
     Int                         refCount;
     pthread_mutex_t             gate;
     Ipc_TransportFactoryFxns   *transportFactory;
+    Ipc_Config                  config;
+    Int                         attached[MultiProc_MAXPROCESSORS];
 } Ipc_Module;
 
 
@@ -73,13 +77,22 @@ typedef struct {
  */
 static Ipc_Module Ipc_module = {
     .refCount           = 0,
-    .gate               = PTHREAD_MUTEX_INITIALIZER,
-    .transportFactory   = NULL
+#if defined(IPC_BUILDOS_ANDROID)
+    .gate               = PTHREAD_RECURSIVE_MUTEX_INITIALIZER,
+#else
+// only _NP (non-portable) type available in CG tools which we're using
+    .gate               = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
+#endif
+    .transportFactory   = NULL,
+    .config.procSync    = Ipc_ProcSync_NONE
 };
 
 GateHWSpinlock_Config _GateHWSpinlock_cfgParams;
 static LAD_ClientHandle ladHandle;
 
+/* traces in this file are controlled via _Ipc_verbose */
+Bool _Ipc_verbose = FALSE;
+#define verbose _Ipc_verbose
 
 /** ============================================================================
  *  Functions
@@ -100,12 +113,18 @@ Int Ipc_start(Void)
 #endif
     Int         status;
     LAD_Status  ladStatus;
+    UInt16      procId;
+    UInt16      clusterSize;
+    UInt16      baseId;
+    UInt16      clusterId;
+    Int         i;
 
     /* function must be serialized */
     pthread_mutex_lock(&Ipc_module.gate);
 
     /* ensure only first thread performs startup procedure */
-    if (++Ipc_module.refCount > 1) {
+    if (Ipc_module.refCount >= 1) {
+        Ipc_module.refCount++;
         status = Ipc_S_ALREADYSETUP;
         goto exit;
     }
@@ -116,6 +135,11 @@ Int Ipc_start(Void)
         goto exit;
     }
 
+    /* initialize module object */
+    for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
+        Ipc_module.attached[i] = 0;
+    }
+
     /* Catch ctrl-C, and cleanup: */
     (void) signal(SIGINT, cleanup);
 
@@ -154,35 +178,63 @@ Int Ipc_start(Void)
         }
     }
 
+    /* establish a communication link to the LAD daemon */
     ladStatus = LAD_connect(&ladHandle);
+
     if (ladStatus != LAD_SUCCESS) {
         printf("Ipc_start: LAD_connect() failed: %d\n", ladStatus);
         status = Ipc_E_FAIL;
         goto exit;
     }
 
-    /*  Get MultiProc configuration from LAD and initialize local
-     *  MultiProc config structure.
-     */
+    /* get global configuration from LAD */
+    Ipc_getConfig(&Ipc_module.config);
+
+    /* get global configuration from LAD */
     MultiProc_getConfig(&mpCfg);
     _MultiProc_initCfg(&mpCfg);
 
+    /* setup name server thread in LAD daemon */
     status = NameServer_setup();
 
-    if (status >= 0) {
-        MessageQ_getConfig(&msgqCfg);
-        MessageQ_setup(&msgqCfg);
+    if (status < 0) {
+        printf("Ipc_start: NameServer_setup() failed: %d\n", status);
+        status = Ipc_E_FAIL;
+        goto exit;
+    }
+
+    /* get global configuration from LAD */
+    MessageQ_getConfig(&msgqCfg);
+    MessageQ_setup(&msgqCfg);
 
-        /* invoke the transport factory create method */
-        status = Ipc_module.transportFactory->createFxn();
+    /* invoke the transport factory create method */
+    status = Ipc_module.transportFactory->createFxn();
 
-        if (status < 0) {
-            goto exit;
-        }
+    if (status < 0) {
+        goto exit;
     }
-    else {
-        printf("Ipc_start: NameServer_setup() failed: %d\n", status);
-        status = Ipc_E_FAIL;
+
+    /* if using ProcSync_ALL, then attach to all processors in the cluster */
+    if (Ipc_module.config.procSync == Ipc_ProcSync_ALL) {
+        clusterSize = MultiProc_getNumProcsInCluster();
+        baseId = MultiProc_getBaseIdOfCluster();
+
+        for (clusterId = 0; clusterId < clusterSize; clusterId++) {
+            procId = baseId + clusterId;
+
+            if (procId == MultiProc_self()) {
+                continue;
+            }
+
+            status = Ipc_attach(procId);
+
+            /*  For backward compatibility, it is okay for attach to fail.
+             *  We don't expect all remote processors to be running.
+             */
+            if (status < 0) {
+                /* do nothing */
+            }
+        }
     }
 
     /* Start GateMP only if device has support */
@@ -197,41 +249,25 @@ Int Ipc_start(Void)
 
         status = GateHWSpinlock_start();
         if (status < 0) {
-            printf("Ipc_start: GateHWSpinlock_start failed: %d\n",
-                status);
+            printf("Ipc_start: GateHWSpinlock_start failed: %d\n", status);
             status = Ipc_E_FAIL;
-            goto gatehwspinlockstart_fail;
+            goto exit;
         }
-        else {
-            status = GateMP_start();
-            if (status < 0) {
-                printf("Ipc_start: GateMP_start failed: %d\n",
-                status);
-                status = Ipc_E_FAIL;
-                goto gatempstart_fail;
-            }
+
+        status = GateMP_start();
+        if (status < 0) {
+            printf("Ipc_start: GateMP_start failed: %d\n", status);
+            status = Ipc_E_FAIL;
+            GateHWSpinlock_stop();
+            goto exit;
         }
     }
-#endif
-    /* Success */
-    goto exit;
-#if defined(GATEMP_SUPPORT)
-gatempstart_fail:
-    GateHWSpinlock_stop();
-gatehwspinlockstart_fail:
-#if 0
-    for (procId = procId - 1; (procId > 0) && (status >= 0); procId--) {
-        MessageQ_detach(procId);
-    }
-#endif
 #endif
 
-exit:
-    /* if error, must decrement reference count */
-    if (status < 0) {
-        Ipc_module.refCount--;
-    }
+    /* getting here means we have successfully started */
+    Ipc_module.refCount++;
 
+exit:
     pthread_mutex_unlock(&Ipc_module.gate);
 
     return (status);
@@ -244,44 +280,53 @@ Int Ipc_stop(Void)
 {
     Int32       status = Ipc_S_SUCCESS;
     LAD_Status  ladStatus;
-    Int         i;
     UInt16      procId;
     UInt16      clusterSize;
-    UInt16      clusterBase;
+    UInt16      baseId;
+    UInt16      clusterId;
 
     /* function must be serialized */
     pthread_mutex_lock(&Ipc_module.gate);
 
+    if (Ipc_module.refCount == 0) {
+        status = Ipc_E_INVALIDSTATE;
+        goto exit;
+    }
+
     /* ensure only last thread performs stop procedure */
     if (--Ipc_module.refCount > 0) {
         goto exit;
     }
 
-    /* invoke the transport factory delete method */
-    Ipc_module.transportFactory->deleteFxn();
+    /* if using ProcSync_ALL, then detach from all processors in the cluster */
+    if (Ipc_module.config.procSync == Ipc_ProcSync_ALL) {
+        clusterSize = MultiProc_getNumProcsInCluster();
+        baseId = MultiProc_getBaseIdOfCluster();
 
-    /* needed to enumerate processors in cluster */
-    clusterSize = MultiProc_getNumProcsInCluster();
-    clusterBase = MultiProc_getBaseIdOfCluster();
+        for (clusterId = 0; clusterId < clusterSize; clusterId++) {
+            procId = baseId + clusterId;
 
-    /* detach from all remote processors, assuming they are up */
-    for (i = 0, procId = clusterBase; i < clusterSize; i++, procId++) {
+            if (MultiProc_self() == procId) {
+                continue;
+            }
 
-        /*  no need to detach from myself */
-        if (MultiProc_self() == procId) {
-            continue;
+            /*  For backward compatibility, we might not be attached to
+             *  all cluster members. Skip unattached processors.
+             */
+            if (!Ipc_isAttached(procId)) {
+                continue;
+            }
+
+            status = Ipc_detach(procId);
+
+            if (status < 0) {
+                /* Should we keep going or stop? */
+            }
         }
-#if 0
-        status = MessageQ_detach(procId);
-        if (status < 0) {
-            printf("Ipc_stop: MessageQ_detach(%d) failed: %d\n",
-                procId, status);
-            status = Ipc_E_FAIL;
-            goto exit;
-       }
-#endif
     }
 
+    Ipc_module.transportFactory->deleteFxn();
+
     status = MessageQ_destroy();
     if (status < 0) {
         printf("Ipc_stop: MessageQ_destroy() failed: %d\n", status);
@@ -342,3 +387,183 @@ static void cleanup(int arg)
     Ipc_stop();
     exit(0);
 }
+
+/*
+ *  ======== Ipc_attach ========
+ */
+Int Ipc_attach(UInt16 procId)
+{
+    Int status = Ipc_S_SUCCESS;
+    UInt16 clusterId;
+
+    /* cannot attach to yourself */
+    if (MultiProc_self() == procId) {
+        status =  Ipc_E_INVALIDARG;
+        goto done;
+    }
+
+    /* processor must be a member of the cluster */
+    clusterId = procId - MultiProc_getBaseIdOfCluster();
+
+    if (clusterId >= MultiProc_getNumProcsInCluster()) {
+        status =  Ipc_E_INVALIDARG;
+        goto done;
+    }
+
+    /* function must be serialized */
+    pthread_mutex_lock(&Ipc_module.gate);
+
+    /* if already attached, just increment reference count */
+    if (Ipc_module.attached[clusterId] > 0) {
+        Ipc_module.attached[clusterId]++;
+        goto done;
+    }
+
+    /* establish name server connection to remote processor */
+    status = NameServer_attach(procId);
+
+    if (status < 0) {
+        status = Ipc_E_FAIL;
+        goto done;
+    }
+
+    /* attach the transport to remote processor */
+    status = Ipc_module.transportFactory->attachFxn(procId);
+
+    if (status < 0) {
+        status = Ipc_E_FAIL;
+        goto done;
+    }
+
+    /* getting here means we have successfully attached */
+    Ipc_module.attached[clusterId]++;
+
+done:
+    pthread_mutex_unlock(&Ipc_module.gate);
+
+    return (status);
+}
+
+/*
+ *  ======== Ipc_detach ========
+ */
+Int Ipc_detach(UInt16 procId)
+{
+    Int status = Ipc_S_SUCCESS;
+    UInt16 clusterId;
+
+    /* cannot detach from yourself */
+    if (MultiProc_self() == procId) {
+        status =  Ipc_E_INVALIDARG;
+        goto done;
+    }
+
+    /* processor must be a member of the cluster */
+    clusterId = procId - MultiProc_getBaseIdOfCluster();
+
+    if (clusterId >= MultiProc_getNumProcsInCluster()) {
+        status =  Ipc_E_INVALIDARG;
+        goto done;
+    }
+
+    /* function must be serialized */
+    pthread_mutex_lock(&Ipc_module.gate);
+
+    if (Ipc_module.attached[clusterId] == 0) {
+        status = Ipc_E_INVALIDSTATE;
+        goto done;
+    }
+
+    if (--Ipc_module.attached[clusterId] > 0) {
+        goto done;
+    }
+
+    /* detach transport from remote processor */
+    status = Ipc_module.transportFactory->detachFxn(procId);
+
+    if (status < 0) {
+        status = Ipc_E_FAIL;
+        /* report the error */
+        goto done;
+    }
+
+    /* remove connection to remote processor */
+    status = NameServer_detach(procId);
+
+    if (status < 0) {
+        status = Ipc_E_FAIL;
+        /* report the error */
+        goto done;
+    }
+
+done:
+    if (status < 0) {
+        /* report error */
+        printf("Ipc_detach: Error %d, procId %d\n", status, procId);
+    }
+    pthread_mutex_unlock(&Ipc_module.gate);
+
+    return (status);
+}
+
+/*
+ *  ======== Ipc_isAttached ========
+ */
+Bool Ipc_isAttached(UInt16 procId)
+{
+    Bool attached;
+    UInt16 clusterId;
+
+    /* cannot be attached to yourself */
+    if (MultiProc_self() == procId) {
+        return (FALSE);
+    }
+
+    /* processor must be a member of the cluster */
+    clusterId = procId - MultiProc_getBaseIdOfCluster();
+
+    if (clusterId >= MultiProc_getNumProcsInCluster()) {
+        return (FALSE);
+    }
+
+    attached = (Ipc_module.attached[clusterId] > 0 ? TRUE : FALSE);
+    return (attached);
+}
+
+/*
+ *  ======== Ipc_getConfig ========
+ *  Get the run-time configuration for the Ipc module
+ *
+ *  This is an IPC internal function. It is used to acquire
+ *  the global Ipc module configuration values from LAD.
+ */
+Void Ipc_getConfig(Ipc_Config *cfg)
+{
+    Int status;
+    LAD_ClientHandle handle;
+    struct LAD_CommandObj cmd;
+    union LAD_ResponseObj rsp;
+
+    handle = LAD_findHandle();
+    if (handle == LAD_MAXNUMCLIENTS) {
+        PRINTVERBOSE0("Ipc_getConfig: no connection to LAD\n");
+        return;
+    }
+
+    cmd.cmd = LAD_IPC_GETCONFIG;
+    cmd.clientId = handle;
+
+    if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
+        PRINTVERBOSE1("Ipc_getConfig: sending LAD command failed, "
+                "status=%d\n", status);
+        return;
+    }
+
+    if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
+        PRINTVERBOSE1("Ipc_getConfig: no LAD response, status=%d\n", status);
+        return;
+    }
+
+    memcpy(cfg, &rsp.ipcConfig, sizeof(Ipc_Config));
+    return;
+}
index 525dccedc53a71f2be1f711ff2a038f35d16015e..7b3da335526723d14554087d207976ea6f734d46 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012-2014, Texas Instruments Incorporated
+ * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -537,3 +537,81 @@ Int NameServer_delete(NameServer_Handle *nsHandle)
 
     return status;
 }
+
+/*
+ *  ======== NameServer_attach ========
+ *  Internal function.
+ */
+Int NameServer_attach(UInt16 procId)
+{
+    Int status;
+    LAD_ClientHandle clHandle;
+    struct LAD_CommandObj cmd;
+    union LAD_ResponseObj rsp;
+
+    clHandle = LAD_findHandle();
+
+    if (clHandle == LAD_MAXNUMCLIENTS) {
+        PRINTVERBOSE0("NameServer_attach: not connected to LAD\n");
+        return (NameServer_E_RESOURCE);
+    }
+
+    cmd.cmd = LAD_NAMESERVER_ATTACH;
+    cmd.clientId = clHandle;
+    cmd.args.attach.procId = procId;
+
+    if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
+        PRINTVERBOSE1("NameServer_attach: sending LAD command failed, "
+                "status=%d\n", status);
+        return (NameServer_E_FAIL);
+    }
+
+    if ((status = LAD_getResponse(clHandle, &rsp)) != LAD_SUCCESS) {
+        PRINTVERBOSE1("NameServer_attach: no LAD response, status=%d\n",
+                status);
+        return (NameServer_E_FAIL);
+    }
+
+    status = rsp.status;
+    PRINTVERBOSE1("NameServer_attach: LAD response, status=%d\n", status)
+    return (status);
+}
+
+/*
+ *  ======== NameServer_detach ========
+ *  Internal function.
+ */
+Int NameServer_detach(UInt16 procId)
+{
+    Int status;
+    LAD_ClientHandle clHandle;
+    struct LAD_CommandObj cmd;
+    union LAD_ResponseObj rsp;
+
+    clHandle = LAD_findHandle();
+
+    if (clHandle == LAD_MAXNUMCLIENTS) {
+        PRINTVERBOSE0("NameServer_detach: not connected to LAD\n");
+        return (NameServer_E_RESOURCE);
+    }
+
+    cmd.cmd = LAD_NAMESERVER_DETACH;
+    cmd.clientId = clHandle;
+    cmd.args.detach.procId = procId;
+
+    if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
+        PRINTVERBOSE1("NameServer_detach: sending LAD command failed, "
+                "status=%d\n", status);
+        return (NameServer_E_FAIL);
+    }
+
+    if ((status = LAD_getResponse(clHandle, &rsp)) != LAD_SUCCESS) {
+        PRINTVERBOSE1("NameServer_detach: no LAD response, status=%d\n",
+                status);
+        return (NameServer_E_FAIL);
+    }
+
+    status = rsp.status;
+    PRINTVERBOSE1("NameServer_detach: LAD response, status=%d\n", status)
+    return (status);
+}
diff --git a/linux/src/daemon/Ipc_daemon.c b/linux/src/daemon/Ipc_daemon.c
new file mode 100644 (file)
index 0000000..c3539da
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2015 Texas Instruments Incorporated - http://www.ti.com
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * *  Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ *
+ * *  Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * *  Neither the name of Texas Instruments Incorporated nor the names of
+ *    its contributors may be used to endorse or promote products derived
+ *    from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+ * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+ * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+ * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*============================================================================
+ *  @file   Ipc.c
+ *
+ *  @brief  Ipc module support in LAD daemon
+ */
+
+/* standard headers */
+#include <string.h>
+#include <assert.h>
+
+/* package headers */
+#include <ti/ipc/Std.h>
+#include <_Ipc.h>
+
+extern Ipc_Config ti_ipc_Ipc_config;
+
+/*
+ *  ======== Ipc_getConfig ========
+ */
+Void Ipc_getConfig(Ipc_Config *cfg)
+{
+    assert(cfg != NULL);
+    memcpy(cfg, &ti_ipc_Ipc_config, sizeof(Ipc_Config));
+}
index 1d2d8620305cd9f6953485e463032b3314e7f6bf..db049f7046847820ed3bf2dcbd006ee342b14f60 100644 (file)
@@ -92,6 +92,8 @@ endif
 
 common_sources = \
                 lad.c \
+                Ipc_daemon.c \
+                cfg/IpcCfg.c \
                 MessageQ_daemon.c \
                 cfg/MessageQCfg.c \
                 MultiProc_daemon.c \
@@ -100,6 +102,7 @@ common_sources = \
                 $(top_srcdir)/hlos_common/include/_NameServerRemoteRpmsg.h \
                 $(top_srcdir)/hlos_common/include/_MessageQ.h \
                 $(top_srcdir)/hlos_common/include/_NameServer.h \
+                $(top_srcdir)/linux/include/_Ipc.h \
                 $(top_srcdir)/linux/include/_MultiProc.h \
                 $(top_srcdir)/linux/include/_lad.h \
                 $(top_srcdir)/linux/include/SocketFxns.h \
index 7b327ee6dde98be924057e0418ebcdb4040537f7..9bc1fad66df31ccdbd2a473324a9062f0c4cd00d 100644 (file)
 
 #define INVALIDSOCKET     (-1)
 
+#define NameServer_Event_ACK            (1 << 0)
+#define NameServer_Event_REFRESH        (1 << 1)
+#define NameServer_Event_SHUTDOWN       (1 << 2)
+
 #if defined (__cplusplus)
 extern "C" {
 #endif
@@ -121,14 +125,15 @@ struct NameServer_Object {
 typedef struct NameServer_ModuleObject {
     CIRCLEQ_HEAD(dummy1, NameServer_Object) objList;
     Int32                refCount;
-    int                  sendSock[MultiProc_MAXPROCESSORS];
-    /* Sockets for sending to remote proc nameserver ports: */
-    int                  recvSock[MultiProc_MAXPROCESSORS];
-    /* Sockets for recving from remote proc nameserver ports: */
+    struct {
+        Int refCount;           /* attached reference count */
+        int sendSock;           /* socket for sending */
+        int recvSock;           /* socket for receiving */
+    } comm[MultiProc_MAXPROCESSORS];
     pthread_t            listener;
     /* Listener thread for NameServer replies and requests. */
     int                  unblockFd;
-    /* Event to post to exit listener. */
+    /* Event to wake up listener thread. */
     int                  waitFd;
     /* Event to post to NameServer_get. */
     NameServerRemote_Msg nsMsg;
@@ -237,13 +242,13 @@ static UInt32 stringHash(String s)
     return (hash);
 }
 
-static void NameServerRemote_processMessage(NameServerRemote_Msg * msg, UInt16 procId)
+static void NameServerRemote_processMessage(NameServerRemote_Msg *msg,
+        UInt16 procId)
 {
     NameServer_Handle handle;
     Int               status = NameServer_E_FAIL;
     int               err;
     uint64_t          buf = 1;
-    int               waitFd = NameServer_module->waitFd;
     UInt16            clusterId;
 
     if (msg->request == NAMESERVER_REQUEST) {
@@ -289,26 +294,27 @@ static void NameServerRemote_processMessage(NameServerRemote_Msg * msg, UInt16 p
 
         /* send response message to remote processor */
         clusterId = procId - MultiProc_getBaseIdOfCluster();
-        err = send(NameServer_module->sendSock[clusterId], msg,
+        err = send(NameServer_module->comm[clusterId].sendSock, msg,
                    sizeof(NameServerRemote_Msg), 0);
         if (err < 0) {
             LOG2("NameServer: send failed: %d, %s\n", errno, strerror(errno))
         }
     }
     else {
-        LOG2("NameServer Reply: instanceName: %s, name: %s",
-             (String)msg->instanceName, (String)msg->name)
-        LOG1(", value: 0x%x\n", msg->value)
+        LOG3("NameServer Reply: instanceName: %s, name: %s, value: 0x%x\n",
+                (String)msg->instanceName, (String)msg->name, msg->value);
 
         /* Save the response message.  */
         memcpy(&NameServer_module->nsMsg, msg, sizeof(NameServerRemote_Msg));
 
         /* Post the eventfd upon which NameServer_get() is waiting */
-        write(waitFd, &buf, sizeof(uint64_t));
+        write(NameServer_module->waitFd, &buf, sizeof(uint64_t));
     }
 }
 
-
+/*
+ *  ======== listener_cb ========
+ */
 static void *listener_cb(void *arg)
 {
     fd_set rfds;
@@ -318,10 +324,12 @@ static void *listener_cb(void *arg)
     struct  sockaddr_rpmsg  fromAddr;
     unsigned int len;
     NameServerRemote_Msg msg;
-    int     byteCount;
-    UInt16  numProcs = MultiProc_getNumProcsInCluster();
-    UInt16  baseId = MultiProc_getBaseIdOfCluster();
-    int     sock;
+    int nbytes;
+    UInt16 numProcs = MultiProc_getNumProcsInCluster();
+    UInt16 baseId = MultiProc_getBaseIdOfCluster();
+    int sock;
+    uint64_t event;
+    Bool run = TRUE;
 
     LOG0("listener_cb: Entered Listener thread.\n")
 
@@ -333,30 +341,34 @@ static void *listener_cb(void *arg)
 
         for (i = 0, procId = baseId; i < numProcs; i++, procId++) {
             if ((MultiProc_self() == procId)
-                || (NameServer_module->recvSock[i] == INVALIDSOCKET)) {
+                || (NameServer_module->comm[i].recvSock == INVALIDSOCKET)) {
                 continue;
             }
-            sock = NameServer_module->recvSock[i];
+            sock = NameServer_module->comm[i].recvSock;
             FD_SET(sock, &rfds);
             maxfd = sock > maxfd ? sock : maxfd;
         }
 
-        maxfd = maxfd + 1;
         LOG2("NameServer: waiting for unblockFd: %d, and socks: maxfd: %d\n",
              NameServer_module->unblockFd, maxfd)
-        ret = select(maxfd, &rfds, NULL, NULL, NULL);
+
+        /* wait here until new data available */
+        ret = select(maxfd + 1, &rfds, NULL, NULL, NULL);
+
         if (ret == -1) {
             LOG0("listener_cb: select failed.")
             break;
         }
         LOG0("NameServer: back from select()\n")
 
+        /* check all receive sockets for pending data */
         for (i = 0, procId = baseId; i < numProcs; i++, procId++) {
             if ((MultiProc_self() == procId)
-                || (NameServer_module->recvSock[i] == INVALIDSOCKET)) {
+                || (NameServer_module->comm[i].recvSock == INVALIDSOCKET)) {
                 continue;
             }
-            sock = NameServer_module->recvSock[i];
+            sock = NameServer_module->comm[i].recvSock;
+
             if (FD_ISSET(sock, &rfds)) {
                 LOG1("NameServer: Listener got NameServer message "
                      "from sock: %d!\n", sock);
@@ -364,31 +376,45 @@ static void *listener_cb(void *arg)
                 memset(&fromAddr, 0, sizeof(fromAddr));
                 len = sizeof(fromAddr);
 
-                byteCount = recvfrom(sock, &msg, sizeof(NameServerRemote_Msg), 0,
+                nbytes = recvfrom(sock, &msg, sizeof(NameServerRemote_Msg), 0,
                                 (struct sockaddr *)&fromAddr, &len);
                 if (len != sizeof(fromAddr)) {
                     LOG1("recvfrom: got bad addr len (%d)\n", len)
                     break;
                 }
-                if (byteCount < 0) {
+                if (nbytes < 0) {
                     LOG2("recvfrom failed: %s (%d)\n", strerror(errno), errno)
                     break;
                 }
                 else {
                     LOG1("listener_cb: recvfrom socket: fd: %d\n", sock)
-                    LOG2("\tReceived ns msg: byteCount: %d, from addr: %d, ",
-                         byteCount, fromAddr.addr)
+                    LOG2("\tReceived ns msg: nbytes: %d, from addr: %d, ",
+                         nbytes, fromAddr.addr)
                     LOG1("from vproc: %d\n", fromAddr.vproc_id)
                     NameServerRemote_processMessage(&msg, procId);
                 }
             }
         }
+
+        /* check for events */
         if (FD_ISSET(NameServer_module->unblockFd, &rfds)) {
-            /* We are told to unblock and exit: */
-            LOG0("NameServer: Listener thread exiting\n")
-            break;
+
+            read(NameServer_module->unblockFd, &event, sizeof(event));
+
+            if (event & NameServer_Event_SHUTDOWN) {
+                LOG0("NameServer: listener thread, event: SHUTDOWN\n")
+                event &= ~NameServer_Event_SHUTDOWN;
+                run = FALSE;
+            }
+            if (event & NameServer_Event_REFRESH) {
+                LOG0("NameServer: listener thread, event: REFRESH\n")
+                /* send ACK event */
+                event = NameServer_Event_ACK;
+                write(NameServer_module->waitFd, &event, sizeof(event));
+            }
         }
-    } while (1);
+
+    } while (run);
 
     return ((void *)ret);
 }
@@ -398,17 +424,15 @@ static void *listener_cb(void *arg)
  * =============================================================================
  */
 
-/* Function to setup the nameserver module. */
+/*
+ *  ======== NameServer_setup ========
+ *  Function to setup the name server module
+ */
 Int NameServer_setup(Void)
 {
     Int    status = NameServer_S_SUCCESS;
-    int    err;
-    int    sock;
     int    ret;
-    int    clId;
-    UInt16 procId;
-    UInt16 numProcs;
-    UInt16 baseId;
+    Int    i;
 
     pthread_mutex_lock(&NameServer_module->modGate);
 
@@ -423,79 +447,28 @@ Int NameServer_setup(Void)
         goto exit;
     }
 
+    /* counter event object for passing commands to worker thread */
     NameServer_module->unblockFd = eventfd(0, 0);
+
     if (NameServer_module->unblockFd < 0) {
         status = NameServer_E_FAIL;
         LOG0("NameServer_setup: failed to create unblockFd.\n")
         goto exit;
     }
 
-    NameServer_module->waitFd = eventfd(0, 0);
+    /* semaphore event object for acknowledging LAD command thread */
+    NameServer_module->waitFd = eventfd(0, EFD_SEMAPHORE);
+
     if (NameServer_module->waitFd < 0) {
         status = NameServer_E_FAIL;
         LOG0("NameServer_setup: failed to create waitFd.\n")
         goto exit;
     }
 
-    numProcs = MultiProc_getNumProcsInCluster();
-    baseId = MultiProc_getBaseIdOfCluster();
-
-    for (clId = 0, procId = baseId; clId < numProcs; clId++, procId++) {
-        NameServer_module->sendSock[clId] = INVALIDSOCKET;
-        NameServer_module->recvSock[clId] = INVALIDSOCKET;
-
-        /* Only support NameServer to remote procs: */
-        if (MultiProc_self() == procId) {
-            continue;
-        }
-
-        /* Create the socket for sending messages to each remote proc: */
-        sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
-        if (sock < 0) {
-            status = NameServer_E_FAIL;
-            LOG2("NameServer_setup: socket failed: %d, %s\n",
-                 errno, strerror(errno))
-        }
-        else  {
-            LOG2("NameServer_setup: created send socket: %d, procId %d\n",
-                    sock, procId)
-            err = ConnectSocket(sock, procId, MESSAGEQ_RPMSG_PORT);
-            if (err < 0) {
-                status = NameServer_E_FAIL;
-                LOG3("NameServer_setup: connect failed: procId=%d, "
-                        "errno=%d (%s)\n", procId, errno, strerror(errno))
-
-                LOG1("    closing send socket: %d\n", sock)
-                close(sock);
-            }
-            else {
-                NameServer_module->sendSock[clId] = sock;
-            }
-        }
-
-        /* Create the socket for recving messages from each remote proc: */
-        sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
-        if (sock < 0) {
-            status = NameServer_E_FAIL;
-            LOG2("NameServer_setup: socket failed: %d, %s\n",
-                 errno, strerror(errno))
-        }
-        else  {
-            LOG2("NameServer_setup: created recv socket: %d, procId %d\n",
-                    sock, procId)
-            err = SocketBindAddr(sock, procId, NAME_SERVER_RPMSG_ADDR);
-            if (err < 0) {
-                status = NameServer_E_FAIL;
-                LOG2("NameServer_setup: bind failed: %d, %s\n",
-                    errno, strerror(errno))
-
-                LOG1("    closing recv socket: %d\n", sock)
-                close(sock);
-            }
-            else {
-                NameServer_module->recvSock[clId] = sock;
-            }
-        }
+    for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
+        NameServer_module->comm[i].refCount = 0;
+        NameServer_module->comm[i].sendSock = INVALIDSOCKET;
+        NameServer_module->comm[i].recvSock = INVALIDSOCKET;
     }
 
     /* Construct the list object */
@@ -506,20 +479,8 @@ Int NameServer_setup(Void)
     ret = pthread_create(&NameServer_module->listener, NULL, listener_cb, NULL);
     if (ret) {
         LOG1("NameServer_setup: can't spawn thread: %s\n", strerror(ret))
-        LOG0("NameServer_setup: eventfd failed");
-
         status = NameServer_E_FAIL;
     }
-    else {
-        /* look for at least one good send/recv pair to indicate success */
-        for (clId = 0; clId < numProcs; clId++) {
-            if (NameServer_module->sendSock[clId] != INVALIDSOCKET &&
-                NameServer_module->recvSock[clId] != INVALIDSOCKET) {
-                status = NameServer_S_SUCCESS;
-                break;
-            }
-        }
-    }
 
 exit:
     LOG1("NameServer_setup: exiting, refCount=%d\n",
@@ -530,16 +491,14 @@ exit:
     return (status);
 }
 
-/*! Function to destroy the nameserver module. */
+/*
+ *  ======== NameServer_destroy ========
+ *  Function to destroy the name server module
+ */
 Int NameServer_destroy(void)
 {
-    Int      status = NameServer_S_SUCCESS;
-    UInt16   numProcs;
-    UInt16   baseId;
-    UInt16   procId;
-    int      clId;
-    int      sock;
-    uint64_t buf = 1;
+    Int status = NameServer_S_SUCCESS;
+    uint64_t event;
 
     pthread_mutex_lock(&NameServer_module->modGate);
 
@@ -555,39 +514,12 @@ Int NameServer_destroy(void)
         goto exit;
     }
 
-    numProcs = MultiProc_getNumProcsInCluster();
-    baseId = MultiProc_getBaseIdOfCluster();
-
-    LOG2("NameServer_destroy: numProcs=%d, baseId=%d\n", numProcs, baseId);
-
-    for (clId = 0, procId = baseId; clId < numProcs; clId++, procId++) {
-
-        /* Only support NameServer to remote procs: */
-        if (MultiProc_self() == procId) {
-            continue;
-        }
-
-        /* Close the socket: */
-        sock = NameServer_module->sendSock[clId];
-        if (sock != INVALIDSOCKET) {
-            LOG1("NameServer_destroy: closing socket: %d\n", sock)
-            close(sock);
-            NameServer_module->sendSock[clId] = INVALIDSOCKET;
-        }
-        /* Close the socket: */
-        sock = NameServer_module->recvSock[clId];
-        if (sock != INVALIDSOCKET) {
-            LOG1("NameServer_destroy: closing socket: %d\n", sock)
-            close(sock);
-            NameServer_module->recvSock[clId] = INVALIDSOCKET;
-        }
-    }
-
     CIRCLEQ_destruct(&NameServer_module->objList);
 
-    /* Unblock the NameServer listener thread: */
-    LOG0("NameServer_destroy: unblocking listener...\n")
-    write(NameServer_module->unblockFd, &buf, sizeof(uint64_t));
+    /* shutdown the NameServer listener thread */
+    LOG0("NameServer_destroy: shutdown listener...\n")
+    event = NameServer_Event_SHUTDOWN;
+    write(NameServer_module->unblockFd, &event, sizeof(event));
 
     /* Join: */
     LOG0("NameServer_destroy: joining listener thread...\n")
@@ -1013,7 +945,7 @@ Int NameServer_getRemote(NameServer_Handle handle,
 
     /* Create request message and send to remote: */
     clusterId = procId - MultiProc_getBaseIdOfCluster();
-    sock = NameServer_module->sendSock[clusterId];
+    sock = NameServer_module->comm[clusterId].sendSock;
     if (sock == INVALIDSOCKET) {
         LOG1("NameServer_getRemote: no socket connection to processor %d\n",
              procId);
@@ -1052,7 +984,9 @@ Int NameServer_getRemote(NameServer_Handle handle,
         FD_SET(waitFd, &rfds);
         maxfd = waitFd + 1;
         LOG1("NameServer_getRemote: pending on waitFd: %d\n", waitFd)
+
         ret = select(maxfd, &rfds, NULL, NULL, &tv);
+
         if (ret == -1) {
             LOG0("NameServer_getRemote: select failed.")
             status = NameServer_E_FAIL;
@@ -1321,6 +1255,142 @@ Int NameServer_getLocalUInt32(NameServer_Handle handle, String name, Ptr value)
     return (status);
 }
 
+/*
+ *  ======== NameServer_attach ========
+ *  Establish NameServer connection to the given remote processor.
+ *
+ *  Creates a pair of AF_RPMSG sockets (send: connected to the remote
+ *  MessageQ port, recv: bound to the name server address), then signals
+ *  the listener thread via the unblockFd eventfd so it picks up the new
+ *  receive socket. Connections are reference counted because multiple
+ *  clients may attach to the same processor; only the first attach does
+ *  the socket work. Returns NameServer_S_SUCCESS or NameServer_E_FAIL.
+ *
+ *  NOTE(review): caller is expected to hold any required module lock;
+ *  refCount access here is not internally synchronized — confirm.
+ */
+Int NameServer_attach(UInt16 procId)
+{
+    Int status = NameServer_S_SUCCESS;
+    int sock;
+    int err;
+    UInt16 clId;
+    uint64_t event;
+
+    /* procId already validated in API layer */
+    clId = procId - MultiProc_getBaseIdOfCluster();
+
+    /* must reference count because we have multiple clients */
+    if (NameServer_module->comm[clId].refCount > 0) {
+        NameServer_module->comm[clId].refCount++;
+        goto done;
+    }
+
+    /* create socket for sending messages to remote processor */
+    sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
+    if (sock < 0) {
+        status = NameServer_E_FAIL;
+        LOG2("NameServer_attach: socket failed: %d, %s\n", errno,
+                strerror(errno));
+        goto done;
+    }
+    /* record socket before connect so the error path below can close it */
+    NameServer_module->comm[clId].sendSock = sock;
+    LOG2("NameServer_attach: created send socket: %d, procId %d\n", sock,
+            procId);
+
+    err = ConnectSocket(sock, procId, MESSAGEQ_RPMSG_PORT);
+    if (err < 0) {
+        status = NameServer_E_FAIL;
+        LOG3("NameServer_attach: connect failed: procId=%d, errno=%d (%s)\n",
+                procId, errno, strerror(errno));
+        goto done;
+    }
+
+    /* create socket for receiving messages from remote processor */
+    sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
+    if (sock < 0) {
+        status = NameServer_E_FAIL;
+        LOG2("NameServer_attach: socket failed: %d, %s\n", errno,
+                strerror(errno));
+        goto done;
+    }
+    NameServer_module->comm[clId].recvSock = sock;
+    LOG2("NameServer_attach: created receive socket: %d, procId %d\n", sock,
+            procId);
+
+    err = SocketBindAddr(sock, procId, NAME_SERVER_RPMSG_ADDR);
+    if (err < 0) {
+        status = NameServer_E_FAIL;
+        LOG2("NameServer_attach: bind failed: %d, %s\n", errno,
+                strerror(errno));
+        goto done;
+    }
+
+    /* getting here means we have successfully attached */
+    NameServer_module->comm[clId].refCount++;
+
+    /* tell the listener thread to add new receive sockets */
+    event = NameServer_Event_REFRESH;
+    write(NameServer_module->unblockFd, &event, sizeof(event));
+
+    /* wait for ACK event (waitFd is a semaphore eventfd) */
+    read(NameServer_module->waitFd, &event, sizeof(event));
+
+done:
+    /* on failure, close and invalidate any socket created above */
+    if (status < 0) {
+        sock = NameServer_module->comm[clId].recvSock;
+        if (sock != INVALIDSOCKET) {
+            LOG1("    closing receive socket: %d\n", sock)
+            close(sock);
+            NameServer_module->comm[clId].recvSock = INVALIDSOCKET;
+        }
+
+        sock = NameServer_module->comm[clId].sendSock;
+        if (sock != INVALIDSOCKET) {
+            LOG1("    closing send socket: %d\n", sock)
+            close(sock);
+            NameServer_module->comm[clId].sendSock = INVALIDSOCKET;
+        }
+    }
+
+    return (status);
+}
+
+/*
+ *  ======== NameServer_detach ========
+ *  Remove NameServer connection to the given remote processor.
+ *
+ *  Reference counted peer of NameServer_attach: only the last detach
+ *  tears down the send/receive sockets. The sockets are first removed
+ *  from the module's active list, then the listener thread is told to
+ *  REFRESH (so it stops selecting on the receive socket) and ACKs via
+ *  waitFd before the descriptors are actually closed.
+ *
+ *  Fixes vs. previous version:
+ *  - refCount was decremented twice on the teardown path (once in the
+ *    guard below, once at the end), leaving it at -1 and corrupting the
+ *    attach/detach bookkeeping; the trailing decrement is removed.
+ *  - LOG1 messages wrongly said "NameServer_destroy".
+ */
+Int NameServer_detach(UInt16 procId)
+{
+    Int status = NameServer_S_SUCCESS;
+    UInt16 clId;
+    int sendSock;
+    int recvSock;
+    uint64_t event;
+
+    /* procId already validated in API layer */
+    clId = procId - MultiProc_getBaseIdOfCluster();
+
+    /* decrement the reference count; only last client closes sockets */
+    if (--NameServer_module->comm[clId].refCount > 0) {
+        goto done;
+    }
+
+    /* remove sockets from active list */
+    sendSock = NameServer_module->comm[clId].sendSock;
+    NameServer_module->comm[clId].sendSock = INVALIDSOCKET;
+
+    recvSock = NameServer_module->comm[clId].recvSock;
+    NameServer_module->comm[clId].recvSock = INVALIDSOCKET;
+
+    /* tell the listener thread to remove old sockets */
+    event = NameServer_Event_REFRESH;
+    write(NameServer_module->unblockFd, &event, sizeof(event));
+
+    /* wait for ACK event */
+    read(NameServer_module->waitFd, &event, sizeof(event));
+
+    /* close the sending socket */
+    LOG1("NameServer_detach: closing socket: %d\n", sendSock)
+    close(sendSock);
+
+    /* close the receiving socket */
+    LOG1("NameServer_detach: closing socket: %d\n", recvSock)
+    close(recvSock);
+
+done:
+    return (status);
+}
 
 #if defined (__cplusplus)
 }
diff --git a/linux/src/daemon/cfg/IpcCfg.c b/linux/src/daemon/cfg/IpcCfg.c
new file mode 100644 (file)
index 0000000..1668284
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2015 Texas Instruments Incorporated - http://www.ti.com
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * *  Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ *
+ * *  Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * *  Neither the name of Texas Instruments Incorporated nor the names of
+ *    its contributors may be used to endorse or promote products derived
+ *    from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+ * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+ * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+ * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*============================================================================
+ *  @file   IpcCfg.c
+ *
+ *  @brief  Module configuration
+ */
+
+#include <ti/ipc/Std.h>
+#include <_Ipc.h>
+
+/*
+ *  ======== ti_ipc_Ipc_config ========
+ *  The Ipc module configuration object
+ *
+ *  Default attach mode is Ipc_ProcSync_ALL, matching the behavior of
+ *  the previous implementation where all attach work was done during
+ *  the startup phase. LAD reads this configuration (LAD_IPC_GETCONFIG)
+ *  to decide when processor attach is performed.
+ */
+Ipc_Config ti_ipc_Ipc_config = {
+    .procSync = Ipc_ProcSync_ALL
+};
index 1162c2d3701bf48ec4f60b81c5b79b9e257c4427..ca0b1ca12123d7756ebaab094dc9fadc9215c6cf 100644 (file)
@@ -372,6 +372,10 @@ opencommandFIFO:
 
             break;
 
+          case LAD_IPC_GETCONFIG:
+            Ipc_getConfig(&rsp.ipcConfig);
+            break;
+
           case LAD_NAMESERVER_SETUP:
             LOG0("LAD_NAMESERVER_SETUP: calling NameServer_setup()...\n")
 
@@ -522,6 +526,14 @@ opencommandFIFO:
 
             break;
 
+          case LAD_NAMESERVER_ATTACH:
+            rsp.status = NameServer_attach(cmd.args.attach.procId);
+            break;
+
+          case LAD_NAMESERVER_DETACH:
+            rsp.status = NameServer_detach(cmd.args.detach.procId);
+            break;
+
           case LAD_MESSAGEQ_GETCONFIG:
             LOG0("LAD_MESSAGEQ_GETCONFIG: calling MessageQ_getConfig()...\n")
 
@@ -699,6 +711,7 @@ opencommandFIFO:
           case LAD_DISCONNECT:
             break;
 
+          case LAD_IPC_GETCONFIG:
           case LAD_NAMESERVER_SETUP:
           case LAD_NAMESERVER_DESTROY:
           case LAD_NAMESERVER_PARAMS_INIT:
@@ -710,6 +723,8 @@ opencommandFIFO:
           case LAD_NAMESERVER_GETUINT32:
           case LAD_NAMESERVER_REMOVE:
           case LAD_NAMESERVER_REMOVEENTRY:
+          case LAD_NAMESERVER_ATTACH:
+          case LAD_NAMESERVER_DETACH:
           case LAD_MESSAGEQ_GETCONFIG:
           case LAD_MESSAGEQ_SETUP:
           case LAD_MESSAGEQ_DESTROY:
index ae44f7526d04e2c07cc4c82a06e1d6adcaf7aec7..cb9eea4cd05871bb366f9756b2b037b0fb6447d7 100644 (file)
 #define MESSAGEQ_RPMSG_MAXSIZE   512
 
 #define TransportRpmsg_GROWSIZE 32
+#define INVALIDSOCKET (-1)
+
+#define TransportRpmsg_Event_ACK        (1 << 0)
+#define TransportRpmsg_Event_PAUSE      (1 << 1)
+#define TransportRpmsg_Event_CONTINUE   (1 << 2)
+#define TransportRpmsg_Event_SHUTDOWN   (1 << 3)
+
 
 #define _MAX(a,b) (((a)>(b))?(a):(b))
 
@@ -83,7 +90,8 @@ typedef struct TransportRpmsg_Module {
     int             inFds[1024];
     int                    nInFds;
     pthread_mutex_t gate;
-    int             unblockEvent;    /* eventFd for unblocking socket */
+    int             unblockEvent;    /* unblock the dispatch thread */
+    int             waitEvent;       /* block the client thread */
     pthread_t       threadId;        /* ID returned by pthread_create() */
     Bool            threadStarted;
 
@@ -105,14 +113,14 @@ typedef struct TransportRpmsg_Object {
 } TransportRpmsg_Object;
 
 TransportRpmsg_Module TransportRpmsg_state = {
-    .sock = {0},
+    .sock = {INVALIDSOCKET},
+    .unblockEvent = -1,
+    .waitEvent = -1,
     .threadStarted = FALSE,
     .inst = NULL
 };
 TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
 
-static Int attach(UInt16 rprocId);
-static Int detach(UInt16 rprocId);
 static void *rpmsgThreadFxn(void *arg);
 static Int transportGet(int sock, MessageQ_Msg *retMsg);
 static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
@@ -121,12 +129,17 @@ static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
 static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
 static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
 
+/* factory functions */
 Int TransportRpmsg_Factory_create(Void);
 Void TransportRpmsg_Factory_delete(Void);
+Int TransportRpmsg_Factory_attach(UInt16 procId);
+Int TransportRpmsg_Factory_detach(UInt16 procId);
 
 Ipc_TransportFactoryFxns TransportRpmsg_Factory = {
     .createFxn = TransportRpmsg_Factory_create,
-    .deleteFxn = TransportRpmsg_Factory_delete
+    .deleteFxn = TransportRpmsg_Factory_delete,
+    .attachFxn = TransportRpmsg_Factory_attach,
+    .detachFxn = TransportRpmsg_Factory_detach
 };
 
 /* -------------------------------------------------------------------------- */
@@ -143,237 +156,205 @@ TransportRpmsg_Handle TransportRpmsg_downCast(IMessageQTransport_Handle base)
     return ((TransportRpmsg_Handle)base);
 }
 
-TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params,
-                                            Int *attachStatus)
-{
-    TransportRpmsg_Object *obj;
-    Int rv;
-
-    rv = attach(params->rprocId);
-    if (attachStatus) {
-        *attachStatus = rv;
-    }
-
-    if (rv != MessageQ_S_SUCCESS) {
-        return NULL;
-    }
-
-    obj = calloc(1, sizeof (TransportRpmsg_Object));
-
-    /* structure copy */
-    obj->base.base.interfaceType = IMessageQTransport_TypeId;
-    obj->base.fxns = &TransportRpmsg_fxns;
-    obj->rprocId = params->rprocId;
-
-    obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof (Int));
-    obj->numQueues = TransportRpmsg_GROWSIZE;
-
-    return (TransportRpmsg_Handle)obj;
-}
-
-Void TransportRpmsg_delete(TransportRpmsg_Handle *handlep)
+/*
+ *  ======== TransportRpmsg_create ========
+ */
+TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params)
 {
-    TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)handlep;
-
-    detach(obj->rprocId);
+    Int status = MessageQ_S_SUCCESS;
+    TransportRpmsg_Object *obj = NULL;
+    int sock;
+    UInt16 clusterId;
 
-    free(obj->qIndexToFd);
-    free(obj);
-
-    *handlep = NULL;
-}
-
-static Int attach(UInt16 rprocId)
-{
-    Int     status = MessageQ_S_SUCCESS;
-    int     sock;
-    UInt16  clusterId;
 
+    clusterId = params->rprocId - MultiProc_getBaseIdOfCluster();
 
-    clusterId = rprocId - MultiProc_getBaseIdOfCluster();
-
-    /* Create the socket for sending messages to the remote proc: */
+    /* create socket for sending messages to remote processor */
     sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
-    if (sock < 0) {
-        status = MessageQ_E_FAIL;
-        printf("attach: socket failed: %d (%s)\n",
-               errno, strerror(errno));
 
-        goto exit;
+    if (sock < 0) {
+        status = Ipc_E_FAIL;
+        printf("TransportRpmsg_create: socket failed: %d (%s)\n", errno,
+                strerror(errno));
+        goto done;
     }
-
+    TransportRpmsg_module->sock[clusterId] = sock;
     PRINTVERBOSE1("attach: created send socket: %d\n", sock)
 
-    /* Attempt to connect: */
-    status = ConnectSocket(sock, rprocId, MESSAGEQ_RPMSG_PORT);
-    if (status < 0) {
-        /* is it ok to "borrow" this error code from MessageQ? */
-        status = MessageQ_E_RESOURCE;
-
-        /* don't hard-printf or exit since this is no longer fatal */
-        PRINTVERBOSE1("attach: ConnectSocket(rprocId:%d) failed\n", rprocId)
+    status = ConnectSocket(sock, params->rprocId, MESSAGEQ_RPMSG_PORT);
 
-        goto exitSock;
+    if (status < 0) {
+        status = Ipc_E_FAIL;
+        printf("TransportRpmsg_create: connect failed: %d (%s) procId: %d\n",
+                errno, strerror(errno), params->rprocId);
+        goto done;
     }
 
-    TransportRpmsg_module->sock[clusterId] = sock;
-
-    if (TransportRpmsg_module->threadStarted == FALSE) {
-        /* create a module wide event to unblock the socket select thread */
-        TransportRpmsg_module->unblockEvent = eventfd(0, 0);
-        if (TransportRpmsg_module->unblockEvent == -1) {
-            printf("attach: unblock socket failed: %d (%s)\n",
-                   errno, strerror(errno));
-            status = MessageQ_E_FAIL;
-
-            goto exitSock;
-        }
-
-        PRINTVERBOSE1("attach: created unblock event %d\n",
-                      TransportRpmsg_module->unblockEvent)
-
-        FD_ZERO(&TransportRpmsg_module->rfds);
-        FD_SET(TransportRpmsg_module->unblockEvent,
-               &TransportRpmsg_module->rfds);
-        TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
-        TransportRpmsg_module->nInFds = 0;
-
-        pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
+    /* create the instance object */
+    obj = calloc(1, sizeof(TransportRpmsg_Object));
 
-        status = pthread_create(&TransportRpmsg_module->threadId, NULL,
-                                &rpmsgThreadFxn, NULL);
-        if (status < 0) {
-            status = MessageQ_E_FAIL;
-            printf("attach: failed to spawn thread\n");
-
-            goto exitEvent;
-        }
-        else {
-            TransportRpmsg_module->threadStarted = TRUE;
-        }
+    if (obj == NULL) {
+        status = Ipc_E_MEMORY;
+        goto done;
     }
 
-    goto exit;
+    /* initialize the instance */
+    obj->base.base.interfaceType = IMessageQTransport_TypeId;
+    obj->base.fxns = &TransportRpmsg_fxns;
+    obj->rprocId = params->rprocId;
+    obj->numQueues = TransportRpmsg_GROWSIZE;
 
-exitEvent:
-    close(TransportRpmsg_module->unblockEvent);
+    obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof(Int));
 
-    FD_ZERO(&TransportRpmsg_module->rfds);
-    TransportRpmsg_module->maxFd = 0;
+    if (obj->qIndexToFd == NULL) {
+        status = Ipc_E_MEMORY;
+        goto done;
+    }
 
-exitSock:
-    close(sock);
-    TransportRpmsg_module->sock[clusterId] = 0;
+done:
+    if (status < 0) {
+        TransportRpmsg_delete((TransportRpmsg_Handle *)&obj);
+    }
 
-exit:
-    return status;
+    return (TransportRpmsg_Handle)obj;
 }
 
-static Int detach(UInt16 rprocId)
+/*
+ *  ======== TransportRpmsg_delete ========
+ */
+Void TransportRpmsg_delete(TransportRpmsg_Handle *pHandle)
 {
+    TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)pHandle;
+    UInt16 clusterId;
+    int sock;
 
-    Int     status = -1;
-    int     sock;
-    UInt16  clusterId;
 
-    clusterId = rprocId - MultiProc_getBaseIdOfCluster();
-    sock = TransportRpmsg_module->sock[clusterId];
+    clusterId = obj->rprocId - MultiProc_getBaseIdOfCluster();
 
-    if (sock) {
+    /* close the socket for the given transport instance */
+    sock = TransportRpmsg_module->sock[clusterId];
+    if (sock != INVALIDSOCKET) {
         PRINTVERBOSE1("detach: closing socket: %d\n", sock)
+        close(sock);
+    }
+    TransportRpmsg_module->sock[clusterId] = INVALIDSOCKET;
 
-        status = close(sock);
+    if ((obj != NULL) && (obj->qIndexToFd != NULL)) {
+        free(obj->qIndexToFd);
+        obj->qIndexToFd = NULL;
     }
 
-    return status;
+    if (obj != NULL) {
+        free(obj);
+        obj = NULL;
+    }
+
+    *pHandle = NULL;
 }
 
+/*
+ *  ======== TransportRpmsg_bind ========
+ */
 Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
 {
     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
-    UInt16   queuePort = queueId & 0x0000ffff;
-    int      fd;
-    int      err;
-    uint64_t buf;
-    UInt16   rprocId;
+    UInt16 queuePort = queueId & 0x0000ffff;
+    int fd;
+    int err;
+    uint64_t event;
+    UInt16 rprocId;
+    pthread_t tid;
 
+    tid = pthread_self();
     rprocId = obj->rprocId;
 
-    PRINTVERBOSE2("TransportRpmsg_bind: creating endpoint for rprocId %d "
-            "queuePort 0x%x\n", rprocId, queuePort)
+    PRINTVERBOSE3("TransportRpmsg_bind: creating endpoint for rprocId %d "
+            "queuePort 0x%x, tid=0x%x\n", rprocId, queuePort, (unsigned int)tid)
 
     /*  Create the socket to receive messages for this messageQ. */
     fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
     if (fd < 0) {
         printf("TransportRpmsg_bind: socket call failed: %d (%s)\n",
                 errno, strerror(errno));
-        goto exitClose;
+        return (MessageQ_E_OSFAILURE);
     }
-
-    PRINTVERBOSE1("TransportRpmsg_bind: created socket fd %d\n", fd)
+    PRINTVERBOSE2("TransportRpmsg_bind: created socket fd %d, tid=0x%x\n", fd,
+            (unsigned int)tid);
 
     err = SocketBindAddr(fd, rprocId, (UInt32)queuePort);
     if (err < 0) {
         /* don't hard-printf since this is no longer fatal */
         PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
-                      errno, strerror(errno))
-
+                      errno, strerror(errno));
         close(fd);
-
-        return -1;
+        return (MessageQ_E_OSFAILURE);
     }
 
     pthread_mutex_lock(&TransportRpmsg_module->gate);
 
+    /*  pause the dispatch thread */
+    PRINTVERBOSE1("TransportRpmsg_bind: sending PAUSE event, tid=0x%x\n",
+            (unsigned int)tid);
+    event = TransportRpmsg_Event_PAUSE;
+    write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
+
+    /* wait for ACK event */
+    read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
+    PRINTVERBOSE2("TransportRpmsg_bind: received ACK event (%d), tid=0x%x\n",
+            (int)event, (unsigned int)tid);
+
     /* add to our fat fd array and update select() parameters */
     TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++] = fd;
     TransportRpmsg_module->maxFd = _MAX(TransportRpmsg_module->maxFd, fd);
     FD_SET(fd, &TransportRpmsg_module->rfds);
-
-    pthread_mutex_unlock(&TransportRpmsg_module->gate);
-
     bindFdToQueueIndex(obj, fd, queuePort);
 
-    /*
-     * Even though we use the unblock event as just a signalling event with
-     * no related payload, we need to write some non-zero value.  Might as
-     * well make it the fd (which the reader could decide to use if needed).
-     */
-    buf = fd;
-    write(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
+    /* release the dispatch thread */
+    PRINTVERBOSE1("TransportRpmsg_bind: sending CONTINUE event, tid=0x%x\n",
+            (unsigned int)tid);
+    event = TransportRpmsg_Event_CONTINUE;
+    write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
 
-    goto exit;
-
-exitClose:
-    TransportRpmsg_unbind(handle, fd);
-    fd = 0;
+    pthread_mutex_unlock(&TransportRpmsg_module->gate);
 
-exit:
-    return fd;
+    return (MessageQ_S_SUCCESS);
 }
 
+/*
+ *  ======== TransportRpmsg_unbind ========
+ */
 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
 {
     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
     UInt16 queuePort = queueId & 0x0000ffff;
-    uint64_t buf;
+    uint64_t event;
     Int    status = MessageQ_S_SUCCESS;
     int    maxFd;
     int    fd;
     int    i;
     int    j;
 
+    pthread_mutex_lock(&TransportRpmsg_module->gate);
+
+    /*  pause the dispatch thread */
+    event = TransportRpmsg_Event_PAUSE;
+    write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
+
+    /* wait for ACK event */
+    read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
+
+    /* retrieve file descriptor for the given queue port */
     fd = queueIndexToFd(obj, queuePort);
     if (!fd) {
         PRINTVERBOSE1("TransportRpmsg_unbind: queueId 0x%x not bound\n",
-                      queueId)
-
-        return -1;
+                queueId);
+        status = MessageQ_E_INVALIDARG;
+        goto done;
     }
-
     PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
 
-    pthread_mutex_lock(&TransportRpmsg_module->gate);
+    /* guaranteed to work because queueIndexToFd above succeeded */
+    unbindQueueIndex(obj, queuePort);
 
     /* remove from input fd array */
     for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
@@ -383,42 +364,39 @@ Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
             /* shift subsequent elements down */
             for (j = i; j < TransportRpmsg_module->nInFds; j++) {
                 TransportRpmsg_module->inFds[j] =
-                    TransportRpmsg_module->inFds[j + 1];
+                        TransportRpmsg_module->inFds[j + 1];
             }
-            TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds] = 0;
-
-            FD_CLR(fd, &TransportRpmsg_module->rfds);
-            if (fd == TransportRpmsg_module->maxFd) {
-                /* find new max fd */
-                maxFd = TransportRpmsg_module->unblockEvent;
-                for (j = 0; j < TransportRpmsg_module->nInFds; j++) {
-                    maxFd = _MAX(TransportRpmsg_module->inFds[j], maxFd);
-                }
-                TransportRpmsg_module->maxFd = maxFd;
-            }
-
-            /*
-             * Even though we use the unblock event as just a signalling
-             * event with no related payload, we need to write some non-zero
-             * value.  Might as well make it the fd (which the reader could
-             * decide to use if needed).
-             */
-            buf = fd;
-            write(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
-
+            TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds] = -1;
             break;
         }
+    }
 
-        close(fd);
+    /* remove fd from the descriptor set, compute new max value */
+    FD_CLR(fd, &TransportRpmsg_module->rfds);
+    if (fd == TransportRpmsg_module->maxFd) {
+        /* find new max fd */
+        maxFd = TransportRpmsg_module->unblockEvent;
+        for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
+            maxFd = _MAX(TransportRpmsg_module->inFds[i], maxFd);
+        }
+        TransportRpmsg_module->maxFd = maxFd;
     }
 
-    unbindQueueIndex(obj, queuePort);
+    close(fd);
 
+    /* release the dispatch thread */
+    event = TransportRpmsg_Event_CONTINUE;
+    write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
+
+done:
     pthread_mutex_unlock(&TransportRpmsg_module->gate);
 
-    return status;
+    return (status);
 }
 
+/*
+ *  ======== TransportRpmsg_put ========
+ */
 Bool TransportRpmsg_put(Void *handle, Ptr pmsg)
 {
     MessageQ_Msg msg  = (MessageQ_Msg)pmsg;
@@ -458,97 +436,103 @@ exit:
     return status;
 }
 
+/*
+ *  ======== TransportRpmsg_control ========
+ */
 Bool TransportRpmsg_control(Void *handle, UInt cmd, UArg cmdArg)
 {
     return FALSE;
 }
 
+/*
+ *  ======== rpmsgThreadFxn ========
+ */
 void *rpmsgThreadFxn(void *arg)
 {
-    static int lastFdx = 0;
-    int      curFdx = 0;
     Int      status = MessageQ_S_SUCCESS;
     Int      tmpStatus;
     int      retval;
-    uint64_t buf;
+    uint64_t event;
     fd_set   rfds;
     int      maxFd;
     int      nfds;
     MessageQ_Msg     retMsg;
     MessageQ_QueueId queueId;
+    Bool run = TRUE;
+    int i;
+    int fd;
 
-    while (TRUE) {
-        pthread_mutex_lock(&TransportRpmsg_module->gate);
 
+    while (run) {
         maxFd = TransportRpmsg_module->maxFd;
         rfds = TransportRpmsg_module->rfds;
         nfds = TransportRpmsg_module->nInFds;
 
-        pthread_mutex_unlock(&TransportRpmsg_module->gate);
-
         PRINTVERBOSE3("rpmsgThreadFxn: maxFd %d rfds[1:0] 0x%08x%08x\n", maxFd,
-                      (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
+                (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
 
         retval = select(maxFd + 1, &rfds, NULL, NULL, NULL);
-        if (retval) {
-            if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
-                /*
-                 * Our event was signalled by TransportRpmsg_bind()
-                 * or TransportRpmsg_unbind() to tell us that the set of
-                 * fds has changed.
-                 */
-                PRINTVERBOSE0("rpmsgThreadFxn: got unblock event\n")
-
-                /* we don't need the written value */
-                read(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
-            }
-            else {
-                /* start where we last left off */
-                curFdx = lastFdx;
-
-                /*
-                 * The set of fds that's used by select has been recorded
-                 * locally, but the array of fds that are scanned below is
-                 * a changing set (MessageQ_create/delete() can change it).
-                 * While this might present an issue in itself, one key
-                 * takeaway is that 'nfds' must not be zero else the % below
-                 * will cause a divide-by-zero exception.  We won't even get
-                 * here if nfds == 0 since it's a local copy of the module's
-                 * 'nInFds' which has to be > 0 for us to get here.  So, even
-                 * though the module's 'nInFds' might go to 0 during this loop,
-                 * the loop itself will still remain intact.
-                 */
-                do {
-                    if (FD_ISSET(TransportRpmsg_module->inFds[curFdx], &rfds)) {
-
-                        PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
-                                      TransportRpmsg_module->inFds[curFdx])
-
-                        /* transport input fd was signalled: get the message */
-                        tmpStatus = transportGet(
-                            TransportRpmsg_module->inFds[curFdx], &retMsg);
-                        if (tmpStatus < 0) {
-                            printf("rpmsgThreadFxn: transportGet failed.");
-                            status = MessageQ_E_FAIL;
-                        }
-                        else {
-                            queueId = MessageQ_getDstQueue(retMsg);
-
-                            PRINTVERBOSE1("rpmsgThreadFxn: got message, "
-                                    "delivering to queueId 0x%x\n", queueId)
-
-                            MessageQ_put(queueId, retMsg);
-                        }
-
-                        lastFdx = (curFdx + 1) % nfds;
-
-                        break;
-                    }
-
-                    curFdx = (curFdx + 1) % nfds;
-                } while (curFdx != lastFdx);
+
+        /* if error, try again */
+        if (retval < 0) {
+            printf("Warning: rpmsgThreadFxn: select failed, trying again\n");
+            continue;
+        }
+
+        /* dispatch all pending messages, do this first */
+        for (i = 0; i < nfds; i++) {
+            fd = TransportRpmsg_module->inFds[i];
+
+            if (FD_ISSET(fd, &rfds)) {
+                PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
+                        TransportRpmsg_module->inFds[i]);
+
+                /* transport input fd was signalled: get the message */
+                tmpStatus = transportGet(fd, &retMsg);
+                if (tmpStatus < 0) {
+                    printf("rpmsgThreadFxn: transportGet failed\n");
+                }
+                else {
+                    queueId = MessageQ_getDstQueue(retMsg);
+                    PRINTVERBOSE1("rpmsgThreadFxn: got message, "
+                            "delivering to queueId 0x%x\n", queueId)
+                    MessageQ_put(queueId, retMsg);
+                }
             }
         }
+
+        /* check for events */
+        if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
+
+            read(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
+
+            do {
+                if (event & TransportRpmsg_Event_SHUTDOWN) {
+                    PRINTVERBOSE0("rpmsgThreadFxn: event SHUTDOWN\n");
+                    run = FALSE;
+                    break; /* highest priority, stop processing events */
+                }
+                if (event & TransportRpmsg_Event_CONTINUE) {
+                    PRINTVERBOSE1("rpmsgThreadFxn: event CONTINUE (%d)\n",
+                            (int)event);
+                    event &= ~TransportRpmsg_Event_CONTINUE;
+                }
+                if (event & TransportRpmsg_Event_PAUSE) {
+                    /*  Our event was signalled by TransportRpmsg_bind()
+                     *  or TransportRpmsg_unbind() to tell us that the set
+                     *  of file descriptors has changed.
+                     */
+                    PRINTVERBOSE0("rpmsgThreadFxn: event PAUSE\n");
+                    /* send the acknowledgement */
+                    event = TransportRpmsg_Event_ACK;
+                    write(TransportRpmsg_module->waitEvent, &event,
+                            sizeof(event));
+                    /* now wait to be released */
+                    read(TransportRpmsg_module->unblockEvent, &event,
+                            sizeof(event));
+                }
+            } while (event != 0);
+        }
     }
 
     return (void *)status;
@@ -557,7 +541,7 @@ void *rpmsgThreadFxn(void *arg)
 /*
  * ======== transportGet ========
  *  Retrieve a message waiting in the socket's queue.
-*/
+ */
 static Int transportGet(int sock, MessageQ_Msg *retMsg)
 {
     Int           status    = MessageQ_S_SUCCESS;
@@ -622,6 +606,11 @@ exit:
     return status;
 }
 
+/*
+ *  ======== bindFdToQueueIndex ========
+ *
+ *  Precondition: caller must be inside the module gate
+ */
 Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 queuePort)
 {
     Int *queues;
@@ -636,7 +625,7 @@ Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 queuePort)
         PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
                 queueIndex + TransportRpmsg_GROWSIZE)
 
-        /* allocate larget table */
+        /* allocate larger table */
         oldSize = obj->numQueues * sizeof (Int);
         queues = calloc(queueIndex + TransportRpmsg_GROWSIZE, sizeof(Int));
 
@@ -654,6 +643,11 @@ Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 queuePort)
     obj->qIndexToFd[queueIndex] = fd;
 }
 
+/*
+ *  ======== unbindQueueIndex ========
+ *
+ *  Precondition: caller must be inside the module gate
+ */
 Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 queuePort)
 {
     UInt queueIndex;
@@ -662,9 +656,14 @@ Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 queuePort)
     queueIndex = queuePort - MessageQ_PORTOFFSET;
 
     /* clear table entry */
-    obj->qIndexToFd[queueIndex] = 0;
+    obj->qIndexToFd[queueIndex] = -1;
 }
 
+/*
+ *  ======== queueIndexToFd ========
+ *
+ *  Precondition: caller must be inside the module gate
+ */
 Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 queuePort)
 {
     UInt queueIndex;
@@ -688,26 +687,14 @@ Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 queuePort)
  */
 Int TransportRpmsg_Factory_create(Void)
 {
-    Int     status;
-    Int     attachStatus;
-    Int     i;
-    UInt16  procId;
-    Int32   attachedAny;
-    UInt16  clusterSize;
-    UInt16  clusterBase;
-
-    TransportRpmsg_Handle      *inst;
-    TransportRpmsg_Handle       transport;
-    TransportRpmsg_Params       params;
-    IMessageQTransport_Handle   iMsgQTrans;
-
+    Int status = Ipc_S_SUCCESS;
+    Int i;
+    UInt16 clusterSize;
+    TransportRpmsg_Handle *inst;
 
-    status = Ipc_S_SUCCESS;
-    attachedAny = FALSE;
 
     /* needed to enumerate processors in cluster */
     clusterSize = MultiProc_getNumProcsInCluster();
-    clusterBase = MultiProc_getBaseIdOfCluster();
 
     /* allocate the instance array */
     inst = calloc(clusterSize, sizeof(TransportRpmsg_Handle));
@@ -715,44 +702,63 @@ Int TransportRpmsg_Factory_create(Void)
     if (inst == NULL) {
         printf("Error: TransportRpmsg_Factory_create failed, no memory\n");
         status = Ipc_E_MEMORY;
-        goto exit;
+        goto done;
+    }
+
+    for (i = 0; i < clusterSize; i++) {
+        inst[i] = NULL;
     }
 
     TransportRpmsg_module->inst = inst;
 
-    /* create transport instance for all processors in cluster */
-    for (i = 0, procId = clusterBase; i < clusterSize; i++, procId++) {
+    /* counter event object for passing commands to dispatch thread */
+    TransportRpmsg_module->unblockEvent = eventfd(0, 0);
 
-        if (MultiProc_self() == procId) {
-            continue;
-        }
+    if (TransportRpmsg_module->unblockEvent == -1) {
+        printf("create: unblock event failed: %d (%s)\n",
+               errno, strerror(errno));
+        status = Ipc_E_FAIL;
+        goto done;
+    }
 
-        params.rprocId = procId;
-        transport = TransportRpmsg_create(&params, &attachStatus);
+    PRINTVERBOSE1("create: created unblock event %d\n",
+            TransportRpmsg_module->unblockEvent)
 
-        if (transport != NULL) {
-            iMsgQTrans = TransportRpmsg_upCast(transport);
-            MessageQ_registerTransport(iMsgQTrans, procId, 0);
-            attachedAny = TRUE;
-        }
-        else {
-            if (attachStatus == MessageQ_E_RESOURCE) {
-                continue;
-            }
-            printf("TransportRpmsg_Factory_create: failed to attach to "
-                    "procId=%d status=%d\n", procId, attachStatus);
-            status = Ipc_E_FAIL;
-            break;
-        }
+    /* semaphore event object for acknowledging client thread */
+    TransportRpmsg_module->waitEvent = eventfd(0, EFD_SEMAPHORE);
 
-        TransportRpmsg_module->inst[i] = transport;
+    if (TransportRpmsg_module->waitEvent == -1) {
+        printf("create: wait event failed: %d (%s)\n", errno, strerror(errno));
+        status = Ipc_E_FAIL;
+        goto done;
     }
 
-    if (!attachedAny) {
+    PRINTVERBOSE1("create: created wait event %d\n",
+            TransportRpmsg_module->waitEvent)
+
+    FD_ZERO(&TransportRpmsg_module->rfds);
+    FD_SET(TransportRpmsg_module->unblockEvent,
+            &TransportRpmsg_module->rfds);
+    TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
+    TransportRpmsg_module->nInFds = 0;
+
+    pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
+
+    status = pthread_create(&TransportRpmsg_module->threadId, NULL,
+            &rpmsgThreadFxn, NULL);
+
+    if (status != 0) {
+        status = Ipc_E_FAIL;
+        printf("create: failed to spawn thread\n");
+        goto done;
+    }
+    TransportRpmsg_module->threadStarted = TRUE;
+
+done:
+    if (status < 0) {
+        TransportRpmsg_Factory_delete();
     }
 
-exit:
     return (status);
 }
 
@@ -762,27 +768,117 @@ exit:
  */
 Void TransportRpmsg_Factory_delete(Void)
 {
-    Int     i;
-    UInt16  procId;
-    UInt16  clusterSize;
-    UInt16  clusterBase;
+    uint64_t event;
 
-    /* needed to enumerate processors in cluster */
-    clusterSize = MultiProc_getNumProcsInCluster();
-    clusterBase = MultiProc_getBaseIdOfCluster();
 
-    /* detach from all remote processors, assuming they are up */
-    for (i = 0, procId = clusterBase; i < clusterSize; i++, procId++) {
+    /* shutdown the message dispatch thread */
+    if (TransportRpmsg_module->threadStarted) {
+        event = TransportRpmsg_Event_SHUTDOWN;
+        write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
 
-        if (MultiProc_self() == procId) {
-            continue;
-        }
+        /* wait for dispatch thread to exit */
+        pthread_join(TransportRpmsg_module->threadId, NULL);
+    }
 
-        if (TransportRpmsg_module->inst[i] != NULL) {
-            MessageQ_unregisterTransport(procId, 0);
-            TransportRpmsg_delete(&(TransportRpmsg_module->inst[i]));
-        }
+    /* destroy the mutex object */
+    pthread_mutex_destroy(&TransportRpmsg_module->gate);
+
+    /* close the client wait event */
+    if (TransportRpmsg_module->waitEvent != -1) {
+        close(TransportRpmsg_module->waitEvent);
+        TransportRpmsg_module->waitEvent = -1;
+    }
+
+    /* close the dispatch thread unblock event */
+    if (TransportRpmsg_module->unblockEvent != -1) {
+        close(TransportRpmsg_module->unblockEvent);
+        TransportRpmsg_module->unblockEvent = -1;
+    }
+
+    /* free the instance handle array */
+    if (TransportRpmsg_module->inst != NULL) {
+        free(TransportRpmsg_module->inst);
+        TransportRpmsg_module->inst = NULL;
     }
 
     return;
 }
+
+/*
+ *  ======== TransportRpmsg_Factory_attach ========
+ */
+Int TransportRpmsg_Factory_attach(UInt16 procId)
+{
+    Int status = Ipc_S_SUCCESS;
+    UInt16 clusterId;
+    TransportRpmsg_Params params;
+    TransportRpmsg_Handle transport;
+    IMessageQTransport_Handle iMsgQTrans;
+
+    /* cannot attach to yourself */
+    if (MultiProc_self() == procId) {
+        status = Ipc_E_INVALIDARG;
+        goto done;
+    }
+
+    /* processor must be a member of the cluster */
+    clusterId = procId - MultiProc_getBaseIdOfCluster();
+
+    if (clusterId >= MultiProc_getNumProcsInCluster()) {
+        status = Ipc_E_INVALIDARG;
+        goto done;
+    }
+
+    /* create transport instance for given processor */
+    params.rprocId = procId;
+    transport = TransportRpmsg_create(&params);
+
+    if (transport == NULL) {
+        status = Ipc_E_FAIL;
+        goto done;
+    }
+
+    /* register transport instance with MessageQ */
+    iMsgQTrans = TransportRpmsg_upCast(transport);
+    MessageQ_registerTransport(iMsgQTrans, procId, 0);
+    TransportRpmsg_module->inst[clusterId] = transport;
+
+done:
+    return (status);
+}
+
+/*
+ *  ======== TransportRpmsg_Factory_detach ========
+ */
+Int TransportRpmsg_Factory_detach(UInt16 procId)
+{
+    Int status = Ipc_S_SUCCESS;
+    UInt16 clusterId;
+
+    /* cannot detach from yourself */
+    if (MultiProc_self() == procId) {
+        status = Ipc_E_INVALIDARG;
+        goto done;
+    }
+
+    /* processor must be a member of the cluster */
+    clusterId = procId - MultiProc_getBaseIdOfCluster();
+
+    if (clusterId >= MultiProc_getNumProcsInCluster()) {
+        status = Ipc_E_INVALIDARG;
+        goto done;
+    }
+
+    /* must be attached in order to detach */
+    if (TransportRpmsg_module->inst[clusterId] == NULL) {
+        status = Ipc_E_INVALIDSTATE;
+        goto done;
+    }
+
+    /* unregister from MessageQ, delete the transport instance */
+    MessageQ_unregisterTransport(procId, 0);
+    TransportRpmsg_delete(&(TransportRpmsg_module->inst[clusterId]));
+
+done:
+    return (status);
+}
index 9141ee5c14af8348b2a58fe6d766f602230a3d17..01f9704fbda24843e4ef526c61997521cc95ccf8 100644 (file)
@@ -147,6 +147,8 @@ extern "C" {
 typedef struct {
     Int (*createFxn)(Void);             /*!< factory create method      */
     Void (*deleteFxn)(Void);            /*!< factory finalize method    */
+    Int (*attachFxn)(UInt16 procId);    /*!< attach transport to processor */
+    Int (*detachFxn)(UInt16 procId);    /*!< detach transport from processor */
 } Ipc_TransportFactoryFxns;