]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - ipc/ipcdev.git/blobdiff - linux/src/transport/TransportRpmsg.c
Tests: ping_rpmsg: Update Test with Proper Socket Usage
[ipc/ipcdev.git] / linux / src / transport / TransportRpmsg.c
index ae44f7526d04e2c07cc4c82a06e1d6adcaf7aec7..960b922ba4f99f1cce6aa6a18a4fb3546c28e439 100644 (file)
@@ -46,6 +46,7 @@
 #include <fcntl.h>
 #include <pthread.h>
 
+
 /* Socket Protocol Family */
 #include <net/rpmsg.h>
 
 #include <_MessageQ.h>
 #include <_lad.h>
 
+#if !defined(EFD_SEMAPHORE)
+#  define EFD_SEMAPHORE (1 << 0)
+#endif
+
 /* More magic rpmsg port numbers: */
 #define MESSAGEQ_RPMSG_PORT       61
 #define MESSAGEQ_RPMSG_MAXSIZE   512
 
 #define TransportRpmsg_GROWSIZE 32
+#define INVALIDSOCKET (-1)
+
+#define TransportRpmsg_Event_ACK        (1 << 0)
+#define TransportRpmsg_Event_PAUSE      (1 << 1)
+#define TransportRpmsg_Event_CONTINUE   (1 << 2)
+#define TransportRpmsg_Event_SHUTDOWN   (1 << 3)
+
 
 #define _MAX(a,b) (((a)>(b))?(a):(b))
 
@@ -80,10 +92,14 @@ typedef struct TransportRpmsg_Module {
     int             sock[MultiProc_MAXPROCESSORS];
     fd_set          rfds;
     int             maxFd;
-    int             inFds[1024];
+    struct {
+        int     fd;
+        UInt32  qId;
+    } inFds[1024];
     int                    nInFds;
     pthread_mutex_t gate;
-    int             unblockEvent;    /* eventFd for unblocking socket */
+    int             unblockEvent;    /* unblock the dispatch thread */
+    int             waitEvent;       /* block the client thread */
     pthread_t       threadId;        /* ID returned by pthread_create() */
     Bool            threadStarted;
 
@@ -105,14 +121,14 @@ typedef struct TransportRpmsg_Object {
 } TransportRpmsg_Object;
 
 TransportRpmsg_Module TransportRpmsg_state = {
-    .sock = {0},
+    .sock = {INVALIDSOCKET},
+    .unblockEvent = -1,
+    .waitEvent = -1,
     .threadStarted = FALSE,
     .inst = NULL
 };
 TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
 
-static Int attach(UInt16 rprocId);
-static Int detach(UInt16 rprocId);
 static void *rpmsgThreadFxn(void *arg);
 static Int transportGet(int sock, MessageQ_Msg *retMsg);
 static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
@@ -121,12 +137,17 @@ static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
 static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
 static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
 
+/* factory functions */
 Int TransportRpmsg_Factory_create(Void);
 Void TransportRpmsg_Factory_delete(Void);
+Int TransportRpmsg_Factory_attach(UInt16 procId);
+Int TransportRpmsg_Factory_detach(UInt16 procId);
 
 Ipc_TransportFactoryFxns TransportRpmsg_Factory = {
     .createFxn = TransportRpmsg_Factory_create,
-    .deleteFxn = TransportRpmsg_Factory_delete
+    .deleteFxn = TransportRpmsg_Factory_delete,
+    .attachFxn = TransportRpmsg_Factory_attach,
+    .detachFxn = TransportRpmsg_Factory_detach
 };
 
 /* -------------------------------------------------------------------------- */
@@ -143,282 +164,286 @@ TransportRpmsg_Handle TransportRpmsg_downCast(IMessageQTransport_Handle base)
     return ((TransportRpmsg_Handle)base);
 }
 
-TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params,
-                                            Int *attachStatus)
-{
-    TransportRpmsg_Object *obj;
-    Int rv;
-
-    rv = attach(params->rprocId);
-    if (attachStatus) {
-        *attachStatus = rv;
-    }
-
-    if (rv != MessageQ_S_SUCCESS) {
-        return NULL;
-    }
-
-    obj = calloc(1, sizeof (TransportRpmsg_Object));
-
-    /* structure copy */
-    obj->base.base.interfaceType = IMessageQTransport_TypeId;
-    obj->base.fxns = &TransportRpmsg_fxns;
-    obj->rprocId = params->rprocId;
-
-    obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof (Int));
-    obj->numQueues = TransportRpmsg_GROWSIZE;
-
-    return (TransportRpmsg_Handle)obj;
-}
-
-Void TransportRpmsg_delete(TransportRpmsg_Handle *handlep)
-{
-    TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)handlep;
-
-    detach(obj->rprocId);
-
-    free(obj->qIndexToFd);
-    free(obj);
-
-    *handlep = NULL;
-}
-
-static Int attach(UInt16 rprocId)
+/*
+ *  ======== TransportRpmsg_create ========
+ */
+TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params)
 {
-    Int     status = MessageQ_S_SUCCESS;
-    int     sock;
-    UInt16  clusterId;
+    Int status = MessageQ_S_SUCCESS;
+    TransportRpmsg_Object *obj = NULL;
+    int sock;
+    int flags;
+    UInt16 clusterId;
+    int i;
 
 
-    clusterId = rprocId - MultiProc_getBaseIdOfCluster();
+    clusterId = params->rprocId - MultiProc_getBaseIdOfCluster();
 
-    /* Create the socket for sending messages to the remote proc: */
+    /* create socket for sending messages to remote processor */
     sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
-    if (sock < 0) {
-        status = MessageQ_E_FAIL;
-        printf("attach: socket failed: %d (%s)\n",
-               errno, strerror(errno));
 
-        goto exit;
+    if (sock < 0) {
+        status = Ipc_E_FAIL;
+        fprintf(stderr,
+                "TransportRpmsg_create: socket failed: %d (%s)\n", errno,
+                strerror(errno));
+        goto done;
     }
-
+    TransportRpmsg_module->sock[clusterId] = sock;
     PRINTVERBOSE1("attach: created send socket: %d\n", sock)
 
-    /* Attempt to connect: */
-    status = ConnectSocket(sock, rprocId, MESSAGEQ_RPMSG_PORT);
-    if (status < 0) {
-        /* is it ok to "borrow" this error code from MessageQ? */
-        status = MessageQ_E_RESOURCE;
-
-        /* don't hard-printf or exit since this is no longer fatal */
-        PRINTVERBOSE1("attach: ConnectSocket(rprocId:%d) failed\n", rprocId)
+    status = ConnectSocket(sock, params->rprocId, MESSAGEQ_RPMSG_PORT);
 
-        goto exitSock;
+    if (status < 0) {
+        status = Ipc_E_FAIL;
+        fprintf(stderr,
+                "TransportRpmsg_create: connect failed: %d (%s) procId: %d\n",
+                errno, strerror(errno), params->rprocId);
+        goto done;
     }
 
-    TransportRpmsg_module->sock[clusterId] = sock;
-
-    if (TransportRpmsg_module->threadStarted == FALSE) {
-        /* create a module wide event to unblock the socket select thread */
-        TransportRpmsg_module->unblockEvent = eventfd(0, 0);
-        if (TransportRpmsg_module->unblockEvent == -1) {
-            printf("attach: unblock socket failed: %d (%s)\n",
-                   errno, strerror(errno));
-            status = MessageQ_E_FAIL;
-
-            goto exitSock;
-        }
+    /* make sure socket fd doesn't exist for 'fork() -> exec*()'ed child */
+    flags = fcntl(sock, F_GETFD);
+    if (flags != -1) {
+        fcntl(sock, F_SETFD, flags | FD_CLOEXEC);
+    }
 
-        PRINTVERBOSE1("attach: created unblock event %d\n",
-                      TransportRpmsg_module->unblockEvent)
+    /* create the instance object */
+    obj = calloc(1, sizeof(TransportRpmsg_Object));
 
-        FD_ZERO(&TransportRpmsg_module->rfds);
-        FD_SET(TransportRpmsg_module->unblockEvent,
-               &TransportRpmsg_module->rfds);
-        TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
-        TransportRpmsg_module->nInFds = 0;
+    if (obj == NULL) {
+        status = Ipc_E_MEMORY;
+        goto done;
+    }
 
-        pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
+    /* initialize the instance */
+    obj->base.base.interfaceType = IMessageQTransport_TypeId;
+    obj->base.fxns = &TransportRpmsg_fxns;
+    obj->rprocId = params->rprocId;
+    obj->numQueues = TransportRpmsg_GROWSIZE;
 
-        status = pthread_create(&TransportRpmsg_module->threadId, NULL,
-                                &rpmsgThreadFxn, NULL);
-        if (status < 0) {
-            status = MessageQ_E_FAIL;
-            printf("attach: failed to spawn thread\n");
+    obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof(int));
 
-            goto exitEvent;
-        }
-        else {
-            TransportRpmsg_module->threadStarted = TRUE;
-        }
+    if (obj->qIndexToFd == NULL) {
+        status = Ipc_E_MEMORY;
+        goto done;
     }
 
-    goto exit;
-
-exitEvent:
-    close(TransportRpmsg_module->unblockEvent);
-
-    FD_ZERO(&TransportRpmsg_module->rfds);
-    TransportRpmsg_module->maxFd = 0;
+    /* must initialize array */
+    for (i = 0; i < TransportRpmsg_GROWSIZE; i++) {
+        obj->qIndexToFd[i] = -1;
+    }
 
-exitSock:
-    close(sock);
-    TransportRpmsg_module->sock[clusterId] = 0;
+done:
+    if (status < 0) {
+        TransportRpmsg_delete((TransportRpmsg_Handle *)&obj);
+    }
 
-exit:
-    return status;
+    return (TransportRpmsg_Handle)obj;
 }
 
-static Int detach(UInt16 rprocId)
+/*
+ *  ======== TransportRpmsg_delete ========
+ */
+Void TransportRpmsg_delete(TransportRpmsg_Handle *pHandle)
 {
+    TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)pHandle;
+    UInt16 clusterId;
+    int sock;
 
-    Int     status = -1;
-    int     sock;
-    UInt16  clusterId;
 
-    clusterId = rprocId - MultiProc_getBaseIdOfCluster();
-    sock = TransportRpmsg_module->sock[clusterId];
+    clusterId = obj->rprocId - MultiProc_getBaseIdOfCluster();
 
-    if (sock) {
+    /* close the socket for the given transport instance */
+    sock = TransportRpmsg_module->sock[clusterId];
+    if (sock != INVALIDSOCKET) {
         PRINTVERBOSE1("detach: closing socket: %d\n", sock)
+        close(sock);
+    }
+    TransportRpmsg_module->sock[clusterId] = INVALIDSOCKET;
 
-        status = close(sock);
+    if ((obj != NULL) && (obj->qIndexToFd != NULL)) {
+        free(obj->qIndexToFd);
+        obj->qIndexToFd = NULL;
     }
 
-    return status;
+    if (obj != NULL) {
+        free(obj);
+        obj = NULL;
+    }
+
+    *pHandle = NULL;
 }
 
+/*
+ *  ======== TransportRpmsg_bind ========
+ */
 Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
 {
     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
-    UInt16   queuePort = queueId & 0x0000ffff;
-    int      fd;
-    int      err;
-    uint64_t buf;
-    UInt16   rprocId;
+    UInt16 queuePort = queueId & 0x0000ffff;
+    int fd;
+    int flags;
+    int err;
+    uint64_t event;
+    UInt16 rprocId;
+    pthread_t tid;
+    Int status = MessageQ_S_SUCCESS;
 
+    tid = pthread_self();
     rprocId = obj->rprocId;
 
-    PRINTVERBOSE2("TransportRpmsg_bind: creating endpoint for rprocId %d "
-            "queuePort 0x%x\n", rprocId, queuePort)
+    PRINTVERBOSE3("TransportRpmsg_bind: creating endpoint for rprocId %d "
+            "queuePort 0x%x, tid=0x%x\n", rprocId, queuePort, (unsigned int)tid)
+
+    pthread_mutex_lock(&TransportRpmsg_module->gate);
+
+    /*  Check if binding already exists.
+     *
+     *  There is a race condition between a thread calling MessageQ_create
+     *  and another thread calling Ipc_attach. Must make sure we don't bind
+     *  the same queue twice.
+     */
+    if (queueIndexToFd(obj, queueId) != -1) {
+        goto done;
+    }
 
     /*  Create the socket to receive messages for this messageQ. */
     fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
     if (fd < 0) {
-        printf("TransportRpmsg_bind: socket call failed: %d (%s)\n",
+        fprintf(stderr, "TransportRpmsg_bind: socket call failed: %d (%s)\n",
                 errno, strerror(errno));
-        goto exitClose;
+        status = MessageQ_E_OSFAILURE;
+        goto done;
     }
-
-    PRINTVERBOSE1("TransportRpmsg_bind: created socket fd %d\n", fd)
+    PRINTVERBOSE2("TransportRpmsg_bind: created socket fd %d, tdi=0x%x\n", fd,
+            (unsigned int)tid);
 
     err = SocketBindAddr(fd, rprocId, (UInt32)queuePort);
     if (err < 0) {
         /* don't hard-printf since this is no longer fatal */
         PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
-                      errno, strerror(errno))
-
+                      errno, strerror(errno));
         close(fd);
+        status = MessageQ_E_OSFAILURE;
+        goto done;
+    }
 
-        return -1;
+    /* make sure socket fd doesn't exist for 'fork() -> exec*()'ed child */
+    flags = fcntl(fd, F_GETFD);
+    if (flags != -1) {
+        fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
     }
 
-    pthread_mutex_lock(&TransportRpmsg_module->gate);
+    /*  pause the dispatch thread */
+    PRINTVERBOSE1("TransportRpmsg_bind: sending PAUSE event, tid=0x%x\n",
+            (unsigned int)tid);
+    event = TransportRpmsg_Event_PAUSE;
+    write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
+
+    /* wait for ACK event */
+    read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
+    PRINTVERBOSE2("TransportRpmsg_bind: received ACK event (%d), tid=0x%x\n",
+            (int)event, (unsigned int)tid);
 
     /* add to our fat fd array and update select() parameters */
-    TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++] = fd;
+    TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].fd = fd;
+    TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++].qId = queueId;
     TransportRpmsg_module->maxFd = _MAX(TransportRpmsg_module->maxFd, fd);
     FD_SET(fd, &TransportRpmsg_module->rfds);
-
-    pthread_mutex_unlock(&TransportRpmsg_module->gate);
-
     bindFdToQueueIndex(obj, fd, queuePort);
 
-    /*
-     * Even though we use the unblock event as just a signalling event with
-     * no related payload, we need to write some non-zero value.  Might as
-     * well make it the fd (which the reader could decide to use if needed).
-     */
-    buf = fd;
-    write(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
-
-    goto exit;
+    /* release the dispatch thread */
+    PRINTVERBOSE1("TransportRpmsg_bind: sending CONTINUE event, tid=0x%x\n",
+            (unsigned int)tid);
+    event = TransportRpmsg_Event_CONTINUE;
+    write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
 
-exitClose:
-    TransportRpmsg_unbind(handle, fd);
-    fd = 0;
+done:
+    pthread_mutex_unlock(&TransportRpmsg_module->gate);
 
-exit:
-    return fd;
+    return (status);
 }
 
+/*
+ *  ======== TransportRpmsg_unbind ========
+ */
 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
 {
     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
     UInt16 queuePort = queueId & 0x0000ffff;
-    uint64_t buf;
+    uint64_t event;
     Int    status = MessageQ_S_SUCCESS;
     int    maxFd;
     int    fd;
     int    i;
     int    j;
 
-    fd = queueIndexToFd(obj, queuePort);
-    if (!fd) {
-        PRINTVERBOSE1("TransportRpmsg_unbind: queueId 0x%x not bound\n",
-                      queueId)
+    pthread_mutex_lock(&TransportRpmsg_module->gate);
+
+    /*  pause the dispatch thread */
+    event = TransportRpmsg_Event_PAUSE;
+    write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
 
-        return -1;
-    }
+    /* wait for ACK event */
+    read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
 
+    /*  Check if binding already deleted.
+     *
+     *  There is a race condition between a thread calling MessageQ_delete
+     *  and another thread calling Ipc_detach. Must make sure we don't unbind
+     *  the same queue twice.
+     */
+    if ((fd = queueIndexToFd(obj, queuePort)) == -1) {
+        goto done;
+    }
     PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
 
-    pthread_mutex_lock(&TransportRpmsg_module->gate);
+    /* guarenteed to work because queueIndexToFd above succeeded */
+    unbindQueueIndex(obj, queuePort);
 
     /* remove from input fd array */
     for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
-        if (TransportRpmsg_module->inFds[i] == fd) {
+        if (TransportRpmsg_module->inFds[i].fd == fd) {
             TransportRpmsg_module->nInFds--;
 
             /* shift subsequent elements down */
             for (j = i; j < TransportRpmsg_module->nInFds; j++) {
                 TransportRpmsg_module->inFds[j] =
-                    TransportRpmsg_module->inFds[j + 1];
-            }
-            TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds] = 0;
-
-            FD_CLR(fd, &TransportRpmsg_module->rfds);
-            if (fd == TransportRpmsg_module->maxFd) {
-                /* find new max fd */
-                maxFd = TransportRpmsg_module->unblockEvent;
-                for (j = 0; j < TransportRpmsg_module->nInFds; j++) {
-                    maxFd = _MAX(TransportRpmsg_module->inFds[j], maxFd);
-                }
-                TransportRpmsg_module->maxFd = maxFd;
+                        TransportRpmsg_module->inFds[j + 1];
             }
-
-            /*
-             * Even though we use the unblock event as just a signalling
-             * event with no related payload, we need to write some non-zero
-             * value.  Might as well make it the fd (which the reader could
-             * decide to use if needed).
-             */
-            buf = fd;
-            write(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
-
+            TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].fd = -1;
+            TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].qId = 0;
             break;
         }
+    }
 
-        close(fd);
+    /* remove fd from the descriptor set, compute new max value */
+    FD_CLR(fd, &TransportRpmsg_module->rfds);
+    if (fd == TransportRpmsg_module->maxFd) {
+        /* find new max fd */
+        maxFd = TransportRpmsg_module->unblockEvent;
+        for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
+            maxFd = _MAX(TransportRpmsg_module->inFds[i].fd, maxFd);
+        }
+        TransportRpmsg_module->maxFd = maxFd;
     }
 
-    unbindQueueIndex(obj, queuePort);
+    close(fd);
 
+    /* release the dispatch thread */
+    event = TransportRpmsg_Event_CONTINUE;
+    write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
+
+done:
     pthread_mutex_unlock(&TransportRpmsg_module->gate);
 
-    return status;
+    return (status);
 }
 
+/*
+ *  ======== TransportRpmsg_put ========
+ */
 Bool TransportRpmsg_put(Void *handle, Ptr pmsg)
 {
     MessageQ_Msg msg  = (MessageQ_Msg)pmsg;
@@ -441,8 +466,8 @@ Bool TransportRpmsg_put(Void *handle, Ptr pmsg)
 
     err = send(sock, msg, msg->msgSize, 0);
     if (err < 0) {
-        printf("TransportRpmsg_put: send failed: %d (%s)\n",
-               errno, strerror(errno));
+        fprintf(stderr, "TransportRpmsg_put: send failed: %d (%s)\n",
+                errno, strerror(errno));
         status = FALSE;
 
         goto exit;
@@ -458,97 +483,148 @@ exit:
     return status;
 }
 
+/*
+ *  ======== TransportRpmsg_control ========
+ */
 Bool TransportRpmsg_control(Void *handle, UInt cmd, UArg cmdArg)
 {
     return FALSE;
 }
 
+/*
+ *  ======== rpmsgThreadFxn ========
+ */
 void *rpmsgThreadFxn(void *arg)
 {
-    static int lastFdx = 0;
-    int      curFdx = 0;
     Int      status = MessageQ_S_SUCCESS;
     Int      tmpStatus;
     int      retval;
-    uint64_t buf;
+    uint64_t event;
     fd_set   rfds;
     int      maxFd;
     int      nfds;
     MessageQ_Msg     retMsg;
     MessageQ_QueueId queueId;
+    MessageQ_Handle handle;
+    Bool run = TRUE;
+    int i;
+    int j;
+    int fd;
 
-    while (TRUE) {
-        pthread_mutex_lock(&TransportRpmsg_module->gate);
 
+    while (run) {
         maxFd = TransportRpmsg_module->maxFd;
         rfds = TransportRpmsg_module->rfds;
         nfds = TransportRpmsg_module->nInFds;
 
-        pthread_mutex_unlock(&TransportRpmsg_module->gate);
-
         PRINTVERBOSE3("rpmsgThreadFxn: maxFd %d rfds[1:0] 0x%08x%08x\n", maxFd,
-                      (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
+                (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
 
         retval = select(maxFd + 1, &rfds, NULL, NULL, NULL);
-        if (retval) {
-            if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
-                /*
-                 * Our event was signalled by TransportRpmsg_bind()
-                 * or TransportRpmsg_unbind() to tell us that the set of
-                 * fds has changed.
-                 */
-                PRINTVERBOSE0("rpmsgThreadFxn: got unblock event\n")
-
-                /* we don't need the written value */
-                read(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
-            }
-            else {
-                /* start where we last left off */
-                curFdx = lastFdx;
-
-                /*
-                 * The set of fds that's used by select has been recorded
-                 * locally, but the array of fds that are scanned below is
-                 * a changing set (MessageQ_create/delete() can change it).
-                 * While this might present an issue in itself, one key
-                 * takeaway is that 'nfds' must not be zero else the % below
-                 * will cause a divide-by-zero exception.  We won't even get
-                 * here if nfds == 0 since it's a local copy of the module's
-                 * 'nInFds' which has to be > 0 for us to get here.  So, even
-                 * though the module's 'nInFds' might go to 0 during this loop,
-                 * the loop itself will still remain intact.
-                 */
-                do {
-                    if (FD_ISSET(TransportRpmsg_module->inFds[curFdx], &rfds)) {
-
-                        PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
-                                      TransportRpmsg_module->inFds[curFdx])
-
-                        /* transport input fd was signalled: get the message */
-                        tmpStatus = transportGet(
-                            TransportRpmsg_module->inFds[curFdx], &retMsg);
-                        if (tmpStatus < 0) {
-                            printf("rpmsgThreadFxn: transportGet failed.");
-                            status = MessageQ_E_FAIL;
-                        }
-                        else {
-                            queueId = MessageQ_getDstQueue(retMsg);
 
-                            PRINTVERBOSE1("rpmsgThreadFxn: got message, "
-                                    "delivering to queueId 0x%x\n", queueId)
+        /* if error, try again */
+        if (retval < 0) {
+            printf("Warning: rpmsgThreadFxn: select failed, trying again\n");
+            continue;
+        }
 
-                            MessageQ_put(queueId, retMsg);
-                        }
+        /* dispatch all pending messages, do this first */
+        for (i = 0; i < nfds; i++) {
+            fd = TransportRpmsg_module->inFds[i].fd;
 
-                        lastFdx = (curFdx + 1) % nfds;
+            if (FD_ISSET(fd, &rfds)) {
+                PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
+                        TransportRpmsg_module->inFds[i].fd);
 
-                        break;
+                /* transport input fd was signalled: get the message */
+                tmpStatus = transportGet(fd, &retMsg);
+                if (tmpStatus < 0 && tmpStatus != MessageQ_E_SHUTDOWN) {
+                    fprintf(stderr,
+                            "rpmsgThreadFxn: transportGet failed on fd %d, "
+                            "returned %d\n", fd, tmpStatus);
+                }
+                else if (tmpStatus == MessageQ_E_SHUTDOWN) {
+                    fprintf(stderr,
+                            "rpmsgThreadFxn: transportGet failed on fd %d, "
+                            "returned %d\n", fd, tmpStatus);
+
+                    pthread_mutex_lock(&TransportRpmsg_module->gate);
+
+                    /*
+                     * Don't close(fd) at this time since it will get closed
+                     * later when MessageQ_delete() is called in response to
+                     * this failure.  Just remove fd's bit from the select mask
+                     * 'rfds' for now, but don't remove it from inFds[].
+                     */
+                    FD_CLR(fd, &TransportRpmsg_module->rfds);
+                    if (fd == TransportRpmsg_module->maxFd) {
+                        /* find new max fd */
+                        maxFd = TransportRpmsg_module->unblockEvent;
+                        for (j = 0; j < TransportRpmsg_module->nInFds; j++) {
+                            maxFd = _MAX(TransportRpmsg_module->inFds[j].fd,
+                                         maxFd);
+                        }
+                        TransportRpmsg_module->maxFd = maxFd;
                     }
+                    queueId = TransportRpmsg_module->inFds[i].qId;
+
+                    pthread_mutex_unlock(&TransportRpmsg_module->gate);
+
+                    handle = MessageQ_getLocalHandle(queueId);
+
+                    PRINTVERBOSE2("rpmsgThreadFxn: shutting down MessageQ "
+                                  "%p (queueId 0x%x)...\n", handle, queueId)
 
-                    curFdx = (curFdx + 1) % nfds;
-                } while (curFdx != lastFdx);
+                    if (handle != NULL) {
+                        MessageQ_shutdown(handle);
+                    }
+                    else {
+                        fprintf(stderr,
+                                "rpmsgThreadFxn: MessageQ_getLocalHandle(0x%x) "
+                                "returned NULL, can't shutdown\n", queueId);
+                    }
+                }
+                else {
+                    queueId = MessageQ_getDstQueue(retMsg);
+                    PRINTVERBOSE1("rpmsgThreadFxn: got message, "
+                            "delivering to queueId 0x%x\n", queueId)
+                    MessageQ_put(queueId, retMsg);
+                }
             }
         }
+
+        /* check for events */
+        if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
+
+            read(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
+
+            do {
+                if (event & TransportRpmsg_Event_SHUTDOWN) {
+                    PRINTVERBOSE0("rpmsgThreadFxn: event SHUTDOWN\n");
+                    run = FALSE;
+                    break; /* highest priority, stop processing events */
+                }
+                if (event & TransportRpmsg_Event_CONTINUE) {
+                    PRINTVERBOSE1("rpmsgThreadFxn: event CONTINUE (%d)\n",
+                            (int)event);
+                    event &= ~TransportRpmsg_Event_CONTINUE;
+                }
+                if (event & TransportRpmsg_Event_PAUSE) {
+                    /*  Our event was signalled by TransportRpmsg_bind()
+                     *  or TransportRpmsg_unbind() to tell us that the set
+                     *  of file descriptors has changed.
+                     */
+                    PRINTVERBOSE0("rpmsgThreadFxn: event PAUSE\n");
+                    /* send the acknowledgement */
+                    event = TransportRpmsg_Event_ACK;
+                    write(TransportRpmsg_module->waitEvent, &event,
+                            sizeof(event));
+                    /* now wait to be released */
+                    read(TransportRpmsg_module->unblockEvent, &event,
+                            sizeof(event));
+                }
+            } while (event != 0);
+        }
     }
 
     return (void *)status;
@@ -557,7 +633,7 @@ void *rpmsgThreadFxn(void *arg)
 /*
  * ======== transportGet ========
  *  Retrieve a message waiting in the socket's queue.
-*/
+ */
 static Int transportGet(int sock, MessageQ_Msg *retMsg)
 {
     Int           status    = MessageQ_S_SUCCESS;
@@ -583,14 +659,19 @@ static Int transportGet(int sock, MessageQ_Msg *retMsg)
     byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
                          (struct sockaddr *)&fromAddr, &len);
     if (len != sizeof (fromAddr)) {
-        printf("recvfrom: got bad addr len (%d)\n", len);
+        fprintf(stderr, "recvfrom: got bad addr len (%d)\n", len);
         status = MessageQ_E_FAIL;
-        goto exit;
+        goto freeMsg;
     }
     if (byteCount < 0) {
-        printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
-        status = MessageQ_E_FAIL;
-        goto exit;
+        fprintf(stderr, "recvfrom failed: %s (%d)\n", strerror(errno), errno);
+        if (errno == ENOLINK) {
+            status = MessageQ_E_SHUTDOWN;
+        }
+        else {
+            status = MessageQ_E_FAIL;
+        }
+        goto freeMsg;
     }
     else {
          /*
@@ -601,13 +682,8 @@ static Int transportGet(int sock, MessageQ_Msg *retMsg)
           */
          msg->msgSize = byteCount;
 
-         /*
-          * If the message received was statically allocated, reset the
-          * heapId, so the app can free it.
-          */
-         if (msg->heapId == MessageQ_STATICMSG)  {
-             msg->heapId = 0;  /* for a copy transport, heap id is 0. */
-         }
+         /* set the heapId in the message header to match allocation above */
+         msg->heapId = 0;
     }
 
     PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
@@ -618,35 +694,53 @@ static Int transportGet(int sock, MessageQ_Msg *retMsg)
 
     *retMsg = msg;
 
+    goto exit;
+
+freeMsg:
+    MessageQ_free(msg);
+
 exit:
     return status;
 }
 
+/*
+ *  ======== bindFdToQueueIndex ========
+ *
+ *  Precondition: caller must be inside the module gate
+ */
 Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 queuePort)
 {
     Int *queues;
     Int *oldQueues;
     UInt oldSize;
+    UInt newCount;
     UInt queueIndex;
+    int i;
 
     /* subtract port offset from queue index */
     queueIndex = queuePort - MessageQ_PORTOFFSET;
 
     if (queueIndex >= obj->numQueues) {
+        newCount = queueIndex + TransportRpmsg_GROWSIZE;
         PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
-                queueIndex + TransportRpmsg_GROWSIZE)
+                newCount);
 
-        /* allocate larget table */
-        oldSize = obj->numQueues * sizeof (Int);
-        queues = calloc(queueIndex + TransportRpmsg_GROWSIZE, sizeof(Int));
+        /* allocate larger table */
+        oldSize = obj->numQueues * sizeof(int);
+        queues = calloc(newCount, sizeof(int));
 
         /* copy contents from old table int new table */
         memcpy(queues, obj->qIndexToFd, oldSize);
 
+        /* initialize remaining entries of new (larger) table */
+        for (i = obj->numQueues; i < newCount; i++) {
+            queues[i] = -1;
+        }
+
         /* swap in new table, delete old table */
         oldQueues = obj->qIndexToFd;
         obj->qIndexToFd = queues;
-        obj->numQueues = queueIndex + TransportRpmsg_GROWSIZE;
+        obj->numQueues = newCount;
         free(oldQueues);
     }
 
@@ -654,6 +748,11 @@ Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 queuePort)
     obj->qIndexToFd[queueIndex] = fd;
 }
 
+/*
+ *  ======== unbindQueueIndex ========
+ *
+ *  Precondition: caller must be inside the module gate
+ */
 Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 queuePort)
 {
     UInt queueIndex;
@@ -662,9 +761,14 @@ Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 queuePort)
     queueIndex = queuePort - MessageQ_PORTOFFSET;
 
     /* clear table entry */
-    obj->qIndexToFd[queueIndex] = 0;
+    obj->qIndexToFd[queueIndex] = -1;
 }
 
+/*
+ *  ======== queueIndexToFd ========
+ *
+ *  Precondition: caller must be inside the module gate
+ */
 Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 queuePort)
 {
     UInt queueIndex;
@@ -688,71 +792,91 @@ Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 queuePort)
  */
 Int TransportRpmsg_Factory_create(Void)
 {
-    Int     status;
-    Int     attachStatus;
-    Int     i;
-    UInt16  procId;
-    Int32   attachedAny;
-    UInt16  clusterSize;
-    UInt16  clusterBase;
-
-    TransportRpmsg_Handle      *inst;
-    TransportRpmsg_Handle       transport;
-    TransportRpmsg_Params       params;
-    IMessageQTransport_Handle   iMsgQTrans;
+    Int status = Ipc_S_SUCCESS;
+    Int i;
+    UInt16 clusterSize;
+    TransportRpmsg_Handle *inst;
+    int flags;
 
 
-    status = Ipc_S_SUCCESS;
-    attachedAny = FALSE;
-
     /* needed to enumerate processors in cluster */
     clusterSize = MultiProc_getNumProcsInCluster();
-    clusterBase = MultiProc_getBaseIdOfCluster();
 
     /* allocate the instance array */
     inst = calloc(clusterSize, sizeof(TransportRpmsg_Handle));
 
     if (inst == NULL) {
-        printf("Error: TransportRpmsg_Factory_create failed, no memory\n");
+        fprintf(stderr,
+                "Error: TransportRpmsg_Factory_create failed, no memory\n");
         status = Ipc_E_MEMORY;
-        goto exit;
+        goto done;
+    }
+
+    for (i = 0; i < clusterSize; i++) {
+        inst[i] = NULL;
     }
 
     TransportRpmsg_module->inst = inst;
 
-    /* create transport instance for all processors in cluster */
-    for (i = 0, procId = clusterBase; i < clusterSize; i++, procId++) {
+    /* counter event object for passing commands to dispatch thread */
+    TransportRpmsg_module->unblockEvent = eventfd(0, 0);
 
-        if (MultiProc_self() == procId) {
-            continue;
-        }
+    if (TransportRpmsg_module->unblockEvent == -1) {
+        fprintf(stderr, "create: unblock event failed: %d (%s)\n",
+                errno, strerror(errno));
+        status = Ipc_E_FAIL;
+        goto done;
+    }
 
-        params.rprocId = procId;
-        transport = TransportRpmsg_create(&params, &attachStatus);
+    PRINTVERBOSE1("create: created unblock event %d\n",
+            TransportRpmsg_module->unblockEvent)
 
-        if (transport != NULL) {
-            iMsgQTrans = TransportRpmsg_upCast(transport);
-            MessageQ_registerTransport(iMsgQTrans, procId, 0);
-            attachedAny = TRUE;
-        }
-        else {
-            if (attachStatus == MessageQ_E_RESOURCE) {
-                continue;
-            }
-            printf("TransportRpmsg_Factory_create: failed to attach to "
-                    "procId=%d status=%d\n", procId, attachStatus);
-            status = Ipc_E_FAIL;
-            break;
-        }
+    /* semaphore event object for acknowledging client thread */
+    TransportRpmsg_module->waitEvent = eventfd(0, EFD_SEMAPHORE);
+
+    if (TransportRpmsg_module->waitEvent == -1) {
+        fprintf(stderr,
+                "create: wait event failed: %d (%s)\n", errno, strerror(errno));
+        status = Ipc_E_FAIL;
+        goto done;
+    }
 
-        TransportRpmsg_module->inst[i] = transport;
+    PRINTVERBOSE1("create: created wait event %d\n",
+            TransportRpmsg_module->waitEvent)
+
+    /* make sure eventfds don't exist for 'fork() -> exec*()'ed child */
+    flags = fcntl(TransportRpmsg_module->waitEvent, F_GETFD);
+    if (flags != -1) {
+        fcntl(TransportRpmsg_module->waitEvent, F_SETFD, flags | FD_CLOEXEC);
+    }
+    flags = fcntl(TransportRpmsg_module->unblockEvent, F_GETFD);
+    if (flags != -1) {
+        fcntl(TransportRpmsg_module->unblockEvent, F_SETFD, flags | FD_CLOEXEC);
     }
 
-    if (!attachedAny) {
+    FD_ZERO(&TransportRpmsg_module->rfds);
+    FD_SET(TransportRpmsg_module->unblockEvent,
+            &TransportRpmsg_module->rfds);
+    TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
+    TransportRpmsg_module->nInFds = 0;
+
+    pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
+
+    status = pthread_create(&TransportRpmsg_module->threadId, NULL,
+            &rpmsgThreadFxn, NULL);
+
+    if (status < 0) {
         status = Ipc_E_FAIL;
+        fprintf(stderr, "create: failed to spawn thread\n");
+        goto done;
+    }
+    TransportRpmsg_module->threadStarted = TRUE;
+
+done:
+    if (status < 0) {
+        TransportRpmsg_Factory_delete();
     }
 
-exit:
     return (status);
 }
 
@@ -762,27 +886,117 @@ exit:
  */
 Void TransportRpmsg_Factory_delete(Void)
 {
-    Int     i;
-    UInt16  procId;
-    UInt16  clusterSize;
-    UInt16  clusterBase;
+    uint64_t event;
 
-    /* needed to enumerate processors in cluster */
-    clusterSize = MultiProc_getNumProcsInCluster();
-    clusterBase = MultiProc_getBaseIdOfCluster();
 
-    /* detach from all remote processors, assuming they are up */
-    for (i = 0, procId = clusterBase; i < clusterSize; i++, procId++) {
+    /* shutdown the message dispatch thread */
+    if (TransportRpmsg_module->threadStarted) {
+        event = TransportRpmsg_Event_SHUTDOWN;
+        write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
 
-        if (MultiProc_self() == procId) {
-            continue;
-        }
+        /* wait for dispatch thread to exit */
+        pthread_join(TransportRpmsg_module->threadId, NULL);
+    }
 
-        if (TransportRpmsg_module->inst[i] != NULL) {
-            MessageQ_unregisterTransport(procId, 0);
-            TransportRpmsg_delete(&(TransportRpmsg_module->inst[i]));
-        }
+    /* destroy the mutex object */
+    pthread_mutex_destroy(&TransportRpmsg_module->gate);
+
+    /* close the client wait event */
+    if (TransportRpmsg_module->waitEvent != -1) {
+        close(TransportRpmsg_module->waitEvent);
+        TransportRpmsg_module->waitEvent = -1;
+    }
+
+    /* close the dispatch thread unblock event */
+    if (TransportRpmsg_module->unblockEvent != -1) {
+        close(TransportRpmsg_module->unblockEvent);
+        TransportRpmsg_module->unblockEvent = -1;
+    }
+
+    /* free the instance handle array */
+    if (TransportRpmsg_module->inst != NULL) {
+        free(TransportRpmsg_module->inst);
+        TransportRpmsg_module->inst = NULL;
     }
 
     return;
 }
+
+/*
+ *  ======== TransportRpmsg_Factory_attach ========
+ */
+Int TransportRpmsg_Factory_attach(UInt16 procId)
+{
+    Int status = Ipc_S_SUCCESS;
+    UInt16 clusterId;
+    TransportRpmsg_Params params;
+    TransportRpmsg_Handle transport;
+    IMessageQTransport_Handle iMsgQTrans;
+
+    /* cannot attach to yourself */
+    if (MultiProc_self() == procId) {
+        status = Ipc_E_INVALIDARG;
+        goto done;
+    }
+
+    /* processor must be a member of the cluster */
+    clusterId = procId - MultiProc_getBaseIdOfCluster();
+
+    if (clusterId >= MultiProc_getNumProcsInCluster()) {
+        status = Ipc_E_INVALIDARG;
+        goto done;
+    }
+
+    /* create transport instance for given processor */
+    params.rprocId = procId;
+    transport = TransportRpmsg_create(&params);
+
+    if (transport == NULL) {
+        status = Ipc_E_FAIL;
+        goto done;
+    }
+
+    /* register transport instance with MessageQ */
+    iMsgQTrans = TransportRpmsg_upCast(transport);
+    TransportRpmsg_module->inst[clusterId] = transport;
+    MessageQ_registerTransport(iMsgQTrans, procId, 0);
+
+done:
+    return (status);
+}
+
+/*
+ *  ======== TransportRpmsg_Factory_detach ========
+ */
+Int TransportRpmsg_Factory_detach(UInt16 procId)
+{
+    Int status = Ipc_S_SUCCESS;
+    UInt16 clusterId;
+
+    /* cannot detach from yourself */
+    if (MultiProc_self() == procId) {
+        status = Ipc_E_INVALIDARG;
+        goto done;
+    }
+
+    /* processor must be a member of the cluster */
+    clusterId = procId - MultiProc_getBaseIdOfCluster();
+
+    if (clusterId >= MultiProc_getNumProcsInCluster()) {
+        status = Ipc_E_INVALIDARG;
+        goto done;
+    }
+
+    /* must be attached in order to detach */
+    if (TransportRpmsg_module->inst[clusterId] == NULL) {
+        status = Ipc_E_INVALIDSTATE;
+        goto done;
+    }
+
+    /* unregister from MessageQ, delete the transport instance */
+    MessageQ_unregisterTransport(procId, 0);
+    TransportRpmsg_delete(&(TransportRpmsg_module->inst[clusterId]));
+
+done:
+    return (status);
+}