]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - ipc/ipcdev.git/blobdiff - linux/src/transport/TransportRpmsg.c
SDOCM00115373 NameServer local get methods are missing on Linux
[ipc/ipcdev.git] / linux / src / transport / TransportRpmsg.c
index c07dc90614e7133841ba8b8c15954276c06facae..cb9eea4cd05871bb366f9756b2b037b0fb6447d7 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Texas Instruments Incorporated
+ * Copyright (c) 2014-2015 Texas Instruments Incorporated - http://www.ti.com
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
  *  Implementation of functions specified in the IMessageQTransport interface.
  */
 
-#include <ti/ipc/Std.h>
-
-#include <ti/ipc/MessageQ.h>
-#include <ti/ipc/MultiProc.h>
-#include <_MessageQ.h>
-
 /* Socket Headers */
 #include <sys/socket.h>
 #include <sys/select.h>
 /* Socket Protocol Family */
 #include <net/rpmsg.h>
 
-/* Socket utils: */
-#include <SocketFxns.h>
 
+/* IPC headers */
+#include <ti/ipc/Std.h>
+#include <SocketFxns.h>         /* Socket utils: */
+#include <ti/ipc/Ipc.h>
+#include <ti/ipc/MessageQ.h>
+#include <ti/ipc/MultiProc.h>
+#include <ti/ipc/transports/TransportRpmsg.h>
+#include <_MessageQ.h>
 #include <_lad.h>
 
-#include <TransportRpmsg.h>
-
-
 /* More magic rpmsg port numbers: */
 #define MESSAGEQ_RPMSG_PORT       61
 #define MESSAGEQ_RPMSG_MAXSIZE   512
 
 #define TransportRpmsg_GROWSIZE 32
+#define INVALIDSOCKET (-1)
+
+#define TransportRpmsg_Event_ACK        (1 << 0)
+#define TransportRpmsg_Event_PAUSE      (1 << 1)
+#define TransportRpmsg_Event_CONTINUE   (1 << 2)
+#define TransportRpmsg_Event_SHUTDOWN   (1 << 3)
+
+
+#define _MAX(a,b) (((a)>(b))?(a):(b))
 
 /* traces in this file are controlled via _TransportMessageQ_verbose */
 Bool _TransportMessageQ_verbose = FALSE;
@@ -84,15 +90,18 @@ typedef struct TransportRpmsg_Module {
     int             inFds[1024];
     int                    nInFds;
     pthread_mutex_t gate;
-    int             unblockEvent;    /* eventFd for unblocking socket */
+    int             unblockEvent;    /* unblock the dispatch thread */
+    int             waitEvent;       /* block the client thread */
     pthread_t       threadId;        /* ID returned by pthread_create() */
     Bool            threadStarted;
+
+    TransportRpmsg_Handle *inst;    /* array of instances */
 } TransportRpmsg_Module;
 
 IMessageQTransport_Fxns TransportRpmsg_fxns = {
     .bind    = TransportRpmsg_bind,
     .unbind  = TransportRpmsg_unbind,
-    .put     = TransportRpmsg_put,
+    .put     = TransportRpmsg_put
 };
 
 typedef struct TransportRpmsg_Object {
@@ -104,13 +113,14 @@ typedef struct TransportRpmsg_Object {
 } TransportRpmsg_Object;
 
 TransportRpmsg_Module TransportRpmsg_state = {
-    .sock = {0},
+    .sock = {INVALIDSOCKET},
+    .unblockEvent = -1,
+    .waitEvent = -1,
     .threadStarted = FALSE,
+    .inst = NULL
 };
 TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
 
-static Int attach(UInt16 rprocId);
-static Int detach(UInt16 rprocId);
 static void *rpmsgThreadFxn(void *arg);
 static Int transportGet(int sock, MessageQ_Msg *retMsg);
 static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
@@ -119,6 +129,19 @@ static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
 static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
 static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
 
+/* factory functions */
+Int TransportRpmsg_Factory_create(Void);
+Void TransportRpmsg_Factory_delete(Void);
+Int TransportRpmsg_Factory_attach(UInt16 procId);
+Int TransportRpmsg_Factory_detach(UInt16 procId);
+
+Ipc_TransportFactoryFxns TransportRpmsg_Factory = {
+    .createFxn = TransportRpmsg_Factory_create,
+    .deleteFxn = TransportRpmsg_Factory_delete,
+    .attachFxn = TransportRpmsg_Factory_attach,
+    .detachFxn = TransportRpmsg_Factory_detach
+};
+
 /* -------------------------------------------------------------------------- */
 
 /* instance convertors */
@@ -133,231 +156,205 @@ TransportRpmsg_Handle TransportRpmsg_downCast(IMessageQTransport_Handle base)
     return ((TransportRpmsg_Handle)base);
 }
 
-TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params,
-                                            Int *attachStatus)
-{
-    TransportRpmsg_Object *obj;
-    Int *queues;
-    Int rv;
-
-    rv = attach(params->rprocId);
-    if (attachStatus) {
-        *attachStatus = rv;
-    }
-
-    if (rv != MessageQ_S_SUCCESS) {
-        return NULL;
-    }
-
-    obj = calloc(1, sizeof (TransportRpmsg_Object));
-
-    /* structure copy */
-    obj->base.base.interfaceType = IMessageQTransport_TypeId;
-    obj->base.fxns = &TransportRpmsg_fxns;
-    obj->rprocId = params->rprocId;
-
-    obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof (Int));
-    obj->numQueues = TransportRpmsg_GROWSIZE;
-
-    return (TransportRpmsg_Handle)obj;
-}
-
-Void TransportRpmsg_delete(TransportRpmsg_Handle *handlep)
+/*
+ *  ======== TransportRpmsg_create ========
+ */
+TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params)
 {
-    TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)handlep;
-
-    detach(obj->rprocId);
+    Int status = MessageQ_S_SUCCESS;
+    TransportRpmsg_Object *obj = NULL;
+    int sock;
+    UInt16 clusterId;
 
-    free(obj->qIndexToFd);
-    free(obj);
 
-    *handlep = NULL;
-}
+    clusterId = params->rprocId - MultiProc_getBaseIdOfCluster();
 
-static Int attach(UInt16 rprocId)
-{
-    Int    status = MessageQ_S_SUCCESS;
-    int    sock;
-
-    /* Create the socket for sending messages to the remote proc: */
+    /* create socket for sending messages to remote processor */
     sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
-    if (sock < 0) {
-        status = MessageQ_E_FAIL;
-        printf("attach: socket failed: %d (%s)\n",
-               errno, strerror(errno));
 
-        goto exit;
+    if (sock < 0) {
+        status = Ipc_E_FAIL;
+        printf("TransportRpmsg_create: socket failed: %d (%s)\n", errno,
+                strerror(errno));
+        goto done;
     }
-
+    TransportRpmsg_module->sock[clusterId] = sock;
     PRINTVERBOSE1("attach: created send socket: %d\n", sock)
 
-    /* Attempt to connect: */
-    status = ConnectSocket(sock, rprocId, MESSAGEQ_RPMSG_PORT);
-    if (status < 0) {
-        /* is it ok to "borrow" this error code from MessageQ? */
-        status = MessageQ_E_RESOURCE;
-
-        /* don't hard-printf or exit since this is no longer fatal */
-        PRINTVERBOSE1("attach: ConnectSocket(rprocId:%d) failed\n", rprocId)
+    status = ConnectSocket(sock, params->rprocId, MESSAGEQ_RPMSG_PORT);
 
-        goto exitSock;
+    if (status < 0) {
+        status = Ipc_E_FAIL;
+        printf("TransportRpmsg_create: connect failed: %d (%s) procId: %d\n",
+                errno, strerror(errno), params->rprocId);
+        goto done;
     }
 
-    TransportRpmsg_module->sock[rprocId] = sock;
-
-    if (TransportRpmsg_module->threadStarted == FALSE) {
-        /* create a module wide event to unblock the socket select thread */
-        TransportRpmsg_module->unblockEvent = eventfd(0, 0);
-        if (TransportRpmsg_module->unblockEvent == -1) {
-            printf("attach: unblock socket failed: %d (%s)\n",
-                   errno, strerror(errno));
-            status = MessageQ_E_FAIL;
-
-            goto exitSock;
-        }
-
-        PRINTVERBOSE1("attach: created unblock event %d\n",
-                      TransportRpmsg_module->unblockEvent)
-
-        FD_ZERO(&TransportRpmsg_module->rfds);
-        FD_SET(TransportRpmsg_module->unblockEvent,
-               &TransportRpmsg_module->rfds);
-        TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
-        TransportRpmsg_module->nInFds = 0;
-
-        pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
+    /* create the instance object */
+    obj = calloc(1, sizeof(TransportRpmsg_Object));
 
-        status = pthread_create(&TransportRpmsg_module->threadId, NULL,
-                                &rpmsgThreadFxn, NULL);
-        if (status < 0) {
-            status = MessageQ_E_FAIL;
-            printf("attach: failed to spawn thread\n");
-
-            goto exitEvent;
-        }
-        else {
-            TransportRpmsg_module->threadStarted = TRUE;
-        }
+    if (obj == NULL) {
+        status = Ipc_E_MEMORY;
+        goto done;
     }
 
-    goto exit;
+    /* initialize the instance */
+    obj->base.base.interfaceType = IMessageQTransport_TypeId;
+    obj->base.fxns = &TransportRpmsg_fxns;
+    obj->rprocId = params->rprocId;
+    obj->numQueues = TransportRpmsg_GROWSIZE;
 
-exitEvent:
-    close(TransportRpmsg_module->unblockEvent);
+    obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof(Int));
 
-    FD_ZERO(&TransportRpmsg_module->rfds);
-    TransportRpmsg_module->maxFd = 0;
+    if (obj->qIndexToFd == NULL) {
+        status = Ipc_E_MEMORY;
+        goto done;
+    }
 
-exitSock:
-    close(sock);
-    TransportRpmsg_module->sock[rprocId] = 0;
+done:
+    if (status < 0) {
+        TransportRpmsg_delete((TransportRpmsg_Handle *)&obj);
+    }
 
-exit:
-    return status;
+    return (TransportRpmsg_Handle)obj;
 }
 
-static Int detach(UInt16 rprocId)
+/*
+ *  ======== TransportRpmsg_delete ========
+ */
+Void TransportRpmsg_delete(TransportRpmsg_Handle *pHandle)
 {
-
-    Int status = -1;
+    TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)pHandle;
+    UInt16 clusterId;
     int sock;
 
-    sock = TransportRpmsg_module->sock[rprocId];
 
-    if (sock) {
+    clusterId = obj->rprocId - MultiProc_getBaseIdOfCluster();
+
+    /* close the socket for the given transport instance */
+    sock = TransportRpmsg_module->sock[clusterId];
+    if (sock != INVALIDSOCKET) {
         PRINTVERBOSE1("detach: closing socket: %d\n", sock)
+        close(sock);
+    }
+    TransportRpmsg_module->sock[clusterId] = INVALIDSOCKET;
 
-        status = close(sock);
+    if ((obj != NULL) && (obj->qIndexToFd != NULL)) {
+        free(obj->qIndexToFd);
+        obj->qIndexToFd = NULL;
     }
 
-    return status;
+    if (obj != NULL) {
+        free(obj);
+        obj = NULL;
+    }
+
+    *pHandle = NULL;
 }
 
+/*
+ *  ======== TransportRpmsg_bind ========
+ */
 Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
 {
     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
-    UInt16   queueIndex = queueId & 0x0000ffff;
-    int      fd;
-    int      err;
-    uint64_t buf;
-    UInt16   rprocId;
+    UInt16 queuePort = queueId & 0x0000ffff;
+    int fd;
+    int err;
+    uint64_t event;
+    UInt16 rprocId;
+    pthread_t tid;
 
+    tid = pthread_self();
     rprocId = obj->rprocId;
 
-    PRINTVERBOSE2("TransportRpmsg_bind: creating endpoint for rprocId %d queueIndex %d\n", rprocId, queueIndex)
+    PRINTVERBOSE3("TransportRpmsg_bind: creating endpoint for rprocId %d "
+            "queuePort 0x%x, tid=0x%x\n", rprocId, queuePort, (unsigned int)tid)
 
     /*  Create the socket to receive messages for this messageQ. */
     fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
     if (fd < 0) {
         printf("TransportRpmsg_bind: socket call failed: %d (%s)\n",
                 errno, strerror(errno));
-        goto exitClose;
+        return (MessageQ_E_OSFAILURE);
     }
+    PRINTVERBOSE2("TransportRpmsg_bind: created socket fd %d, tdi=0x%x\n", fd,
+            (unsigned int)tid);
 
-    PRINTVERBOSE1("TransportRpmsg_bind: created socket fd %d\n", fd)
-
-    err = SocketBindAddr(fd, rprocId, (UInt32)queueIndex);
+    err = SocketBindAddr(fd, rprocId, (UInt32)queuePort);
     if (err < 0) {
         /* don't hard-printf since this is no longer fatal */
         PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
-                      errno, strerror(errno))
-
+                      errno, strerror(errno));
         close(fd);
-
-        return -1;
+        return (MessageQ_E_OSFAILURE);
     }
 
     pthread_mutex_lock(&TransportRpmsg_module->gate);
 
+    /*  pause the dispatch thread */
+    PRINTVERBOSE1("TransportRpmsg_bind: sending PAUSE event, tid=0x%x\n",
+            (unsigned int)tid);
+    event = TransportRpmsg_Event_PAUSE;
+    write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
+
+    /* wait for ACK event */
+    read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
+    PRINTVERBOSE2("TransportRpmsg_bind: received ACK event (%d), tid=0x%x\n",
+            (int)event, (unsigned int)tid);
+
     /* add to our fat fd array and update select() parameters */
     TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++] = fd;
-    TransportRpmsg_module->maxFd = MAX(TransportRpmsg_module->maxFd, fd);
+    TransportRpmsg_module->maxFd = _MAX(TransportRpmsg_module->maxFd, fd);
     FD_SET(fd, &TransportRpmsg_module->rfds);
+    bindFdToQueueIndex(obj, fd, queuePort);
 
-    pthread_mutex_unlock(&TransportRpmsg_module->gate);
-
-    bindFdToQueueIndex(obj, fd, queueIndex);
-
-    /*
-     * Even though we use the unblock event as just a signalling event with
-     * no related payload, we need to write some non-zero value.  Might as
-     * well make it the fd (which the reader could decide to use if needed).
-     */
-    buf = fd;
-    write(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
+    /* release the dispatch thread */
+    PRINTVERBOSE1("TransportRpmsg_bind: sending CONTINUE event, tid=0x%x\n",
+            (unsigned int)tid);
+    event = TransportRpmsg_Event_CONTINUE;
+    write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
 
-    goto exit;
-
-exitClose:
-    TransportRpmsg_unbind(handle, fd);
-    fd = 0;
+    pthread_mutex_unlock(&TransportRpmsg_module->gate);
 
-exit:
-    return fd;
+    return (MessageQ_S_SUCCESS);
 }
 
+/*
+ *  ======== TransportRpmsg_unbind ========
+ */
 Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
 {
     TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
-    UInt16 queueIndex = queueId & 0x0000ffff;
-    uint64_t buf;
+    UInt16 queuePort = queueId & 0x0000ffff;
+    uint64_t event;
     Int    status = MessageQ_S_SUCCESS;
     int    maxFd;
     int    fd;
     int    i;
     int    j;
 
-    fd = queueIndexToFd(obj, queueIndex);
+    pthread_mutex_lock(&TransportRpmsg_module->gate);
+
+    /*  pause the dispatch thread */
+    event = TransportRpmsg_Event_PAUSE;
+    write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
+
+    /* wait for ACK event */
+    read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
+
+    /* retrieve file descriptor for the given queue port */
+    fd = queueIndexToFd(obj, queuePort);
     if (!fd) {
         PRINTVERBOSE1("TransportRpmsg_unbind: queueId 0x%x not bound\n",
-                      queueId)
-
-        return -1;
+                queueId);
+        status = MessageQ_E_INVALIDARG;
+        goto done;
     }
-
     PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
 
-    pthread_mutex_lock(&TransportRpmsg_module->gate);
+    /* guarenteed to work because queueIndexToFd above succeeded */
+    unbindQueueIndex(obj, queuePort);
 
     /* remove from input fd array */
     for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
@@ -367,55 +364,53 @@ Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
             /* shift subsequent elements down */
             for (j = i; j < TransportRpmsg_module->nInFds; j++) {
                 TransportRpmsg_module->inFds[j] =
-                    TransportRpmsg_module->inFds[j + 1];
+                        TransportRpmsg_module->inFds[j + 1];
             }
-            TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds] = 0;
-
-            FD_CLR(fd, &TransportRpmsg_module->rfds);
-            if (fd == TransportRpmsg_module->maxFd) {
-                /* find new max fd */
-                maxFd = TransportRpmsg_module->unblockEvent;
-                for (j = 0; j < TransportRpmsg_module->nInFds; j++) {
-                    maxFd = MAX(TransportRpmsg_module->inFds[j], maxFd);
-                }
-                TransportRpmsg_module->maxFd = maxFd;
-            }
-
-            /*
-             * Even though we use the unblock event as just a signalling
-             * event with no related payload, we need to write some non-zero
-             * value.  Might as well make it the fd (which the reader could
-             * decide to use if needed).
-             */
-            buf = fd;
-            write(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
-
+            TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds] = -1;
             break;
         }
+    }
 
-        close(fd);
+    /* remove fd from the descriptor set, compute new max value */
+    FD_CLR(fd, &TransportRpmsg_module->rfds);
+    if (fd == TransportRpmsg_module->maxFd) {
+        /* find new max fd */
+        maxFd = TransportRpmsg_module->unblockEvent;
+        for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
+            maxFd = _MAX(TransportRpmsg_module->inFds[i], maxFd);
+        }
+        TransportRpmsg_module->maxFd = maxFd;
     }
 
-    unbindQueueIndex(obj, queueIndex);
+    close(fd);
+
+    /* release the dispatch thread */
+    event = TransportRpmsg_Event_CONTINUE;
+    write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
 
+done:
     pthread_mutex_unlock(&TransportRpmsg_module->gate);
 
-    return status;
+    return (status);
 }
 
+/*
+ *  ======== TransportRpmsg_put ========
+ */
 Bool TransportRpmsg_put(Void *handle, Ptr pmsg)
 {
     MessageQ_Msg msg  = (MessageQ_Msg)pmsg;
     Int     status    = TRUE;
     int     sock;
     int     err;
-    UInt16  dstProcId = msg->dstProc;
+    UInt16  clusterId;
 
     /*
      * Retrieve the socket for the AF_SYSLINK protocol associated with this
      * transport.
      */
-    sock = TransportRpmsg_module->sock[dstProcId];
+    clusterId = msg->dstProc - MultiProc_getBaseIdOfCluster();
+    sock = TransportRpmsg_module->sock[clusterId];
     if (!sock) {
         return FALSE;
     }
@@ -441,96 +436,103 @@ exit:
     return status;
 }
 
+/*
+ *  ======== TransportRpmsg_control ========
+ */
 Bool TransportRpmsg_control(Void *handle, UInt cmd, UArg cmdArg)
 {
     return FALSE;
 }
 
+/*
+ *  ======== rpmsgThreadFxn ========
+ */
 void *rpmsgThreadFxn(void *arg)
 {
-    static int lastFdx = 0;
-    int      curFdx = 0;
     Int      status = MessageQ_S_SUCCESS;
     Int      tmpStatus;
     int      retval;
-    uint64_t buf;
+    uint64_t event;
     fd_set   rfds;
     int      maxFd;
     int      nfds;
     MessageQ_Msg     retMsg;
     MessageQ_QueueId queueId;
+    Bool run = TRUE;
+    int i;
+    int fd;
 
-    while (TRUE) {
-        pthread_mutex_lock(&TransportRpmsg_module->gate);
 
+    while (run) {
         maxFd = TransportRpmsg_module->maxFd;
         rfds = TransportRpmsg_module->rfds;
         nfds = TransportRpmsg_module->nInFds;
 
-        pthread_mutex_unlock(&TransportRpmsg_module->gate);
-
         PRINTVERBOSE3("rpmsgThreadFxn: maxFd %d rfds[1:0] 0x%08x%08x\n", maxFd,
-                      (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
+                (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
 
         retval = select(maxFd + 1, &rfds, NULL, NULL, NULL);
-        if (retval) {
-            if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
-                /*
-                 * Our event was signalled by TransportRpmsg_bind()
-                 * or TransportRpmsg_unbind() to tell us that the set of
-                 * fds has changed.
-                 */
-                PRINTVERBOSE0("rpmsgThreadFxn: got unblock event\n")
-
-                /* we don't need the written value */
-                read(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
-            }
-            else {
-                /* start where we last left off */
-                curFdx = lastFdx;
-
-                /*
-                 * The set of fds that's used by select has been recorded
-                 * locally, but the array of fds that are scanned below is
-                 * a changing set (MessageQ_create/delete() can change it).
-                 * While this might present an issue in itself, one key
-                 * takeaway is that 'nfds' must not be zero else the % below
-                 * will cause a divide-by-zero exception.  We won't even get
-                 * here if nfds == 0 since it's a local copy of the module's
-                 * 'nInFds' which has to be > 0 for us to get here.  So, even
-                 * though the module's 'nInFds' might go to 0 during this loop,
-                 * the loop itself will still remain intact.
-                 */
-                do {
-                    if (FD_ISSET(TransportRpmsg_module->inFds[curFdx], &rfds)) {
-
-                        PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
-                                      TransportRpmsg_module->inFds[curFdx])
-
-                        /* transport input fd was signalled: get the message */
-                        tmpStatus = transportGet(
-                            TransportRpmsg_module->inFds[curFdx], &retMsg);
-                        if (tmpStatus < 0) {
-                            printf("rpmsgThreadFxn: transportGet failed.");
-                            status = MessageQ_E_FAIL;
-                        }
-                        else {
-                            queueId = MessageQ_getDstQueue(retMsg);
-
-                            PRINTVERBOSE1("rpmsgThreadFxn: got message, delivering to queueId 0x%x\n", queueId)
-
-                            MessageQ_put(queueId, retMsg);
-                        }
-
-                        lastFdx = (curFdx + 1) % nfds;
-
-                        break;
-                    }
-
-                    curFdx = (curFdx + 1) % nfds;
-                } while (curFdx != lastFdx);
+
+        /* if error, try again */
+        if (retval < 0) {
+            printf("Warning: rpmsgThreadFxn: select failed, trying again\n");
+            continue;
+        }
+
+        /* dispatch all pending messages, do this first */
+        for (i = 0; i < nfds; i++) {
+            fd = TransportRpmsg_module->inFds[i];
+
+            if (FD_ISSET(fd, &rfds)) {
+                PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
+                        TransportRpmsg_module->inFds[i]);
+
+                /* transport input fd was signalled: get the message */
+                tmpStatus = transportGet(fd, &retMsg);
+                if (tmpStatus < 0) {
+                    printf("rpmsgThreadFxn: transportGet failed\n");
+                }
+                else {
+                    queueId = MessageQ_getDstQueue(retMsg);
+                    PRINTVERBOSE1("rpmsgThreadFxn: got message, "
+                            "delivering to queueId 0x%x\n", queueId)
+                    MessageQ_put(queueId, retMsg);
+                }
             }
         }
+
+        /* check for events */
+        if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
+
+            read(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
+
+            do {
+                if (event & TransportRpmsg_Event_SHUTDOWN) {
+                    PRINTVERBOSE0("rpmsgThreadFxn: event SHUTDOWN\n");
+                    run = FALSE;
+                    break; /* highest priority, stop processing events */
+                }
+                if (event & TransportRpmsg_Event_CONTINUE) {
+                    PRINTVERBOSE1("rpmsgThreadFxn: event CONTINUE (%d)\n",
+                            (int)event);
+                    event &= ~TransportRpmsg_Event_CONTINUE;
+                }
+                if (event & TransportRpmsg_Event_PAUSE) {
+                    /*  Our event was signalled by TransportRpmsg_bind()
+                     *  or TransportRpmsg_unbind() to tell us that the set
+                     *  of file descriptors has changed.
+                     */
+                    PRINTVERBOSE0("rpmsgThreadFxn: event PAUSE\n");
+                    /* send the acknowledgement */
+                    event = TransportRpmsg_Event_ACK;
+                    write(TransportRpmsg_module->waitEvent, &event,
+                            sizeof(event));
+                    /* now wait to be released */
+                    read(TransportRpmsg_module->unblockEvent, &event,
+                            sizeof(event));
+                }
+            } while (event != 0);
+        }
     }
 
     return (void *)status;
@@ -539,7 +541,7 @@ void *rpmsgThreadFxn(void *arg)
 /*
  * ======== transportGet ========
  *  Retrieve a message waiting in the socket's queue.
-*/
+ */
 static Int transportGet(int sock, MessageQ_Msg *retMsg)
 {
     Int           status    = MessageQ_S_SUCCESS;
@@ -593,8 +595,10 @@ static Int transportGet(int sock, MessageQ_Msg *retMsg)
     }
 
     PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
-    PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
-    PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId, msg->msgSize)
+    PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg "
+            "proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
+    PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId,
+            msg->msgSize)
 
     *retMsg = msg;
 
@@ -602,38 +606,279 @@ exit:
     return status;
 }
 
-Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 qIndex)
+/*
+ *  ======== bindFdToQueueIndex ========
+ *
+ *  Precondition: caller must be inside the module gate
+ */
+Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 queuePort)
 {
     Int *queues;
     Int *oldQueues;
     UInt oldSize;
+    UInt queueIndex;
 
-    if (qIndex >= obj->numQueues) {
+    /* subtract port offset from queue index */
+    queueIndex = queuePort - MessageQ_PORTOFFSET;
+
+    if (queueIndex >= obj->numQueues) {
         PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
-                      qIndex + TransportRpmsg_GROWSIZE)
+                queueIndex + TransportRpmsg_GROWSIZE)
 
+        /* allocate larger table */
         oldSize = obj->numQueues * sizeof (Int);
+        queues = calloc(queueIndex + TransportRpmsg_GROWSIZE, sizeof(Int));
 
-        queues = calloc(qIndex + TransportRpmsg_GROWSIZE, sizeof (Int));
+        /* copy contents from old table int new table */
         memcpy(queues, obj->qIndexToFd, oldSize);
 
+        /* swap in new table, delete old table */
         oldQueues = obj->qIndexToFd;
         obj->qIndexToFd = queues;
-        obj->numQueues = qIndex + TransportRpmsg_GROWSIZE;
-
+        obj->numQueues = queueIndex + TransportRpmsg_GROWSIZE;
         free(oldQueues);
     }
 
-    obj->qIndexToFd[qIndex] = fd;
+    /* add new entry */
+    obj->qIndexToFd[queueIndex] = fd;
 }
 
-Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex)
+/*
+ *  ======== unbindQueueIndex ========
+ *
+ *  Precondition: caller must be inside the module gate
+ */
+Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 queuePort)
 {
-    obj->qIndexToFd[qIndex] = 0;
+    UInt queueIndex;
+
+    /* subtract port offset from queue index */
+    queueIndex = queuePort - MessageQ_PORTOFFSET;
+
+    /* clear table entry */
+    obj->qIndexToFd[queueIndex] = -1;
 }
 
-Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex)
+/*
+ *  ======== queueIndexToFd ========
+ *
+ *  Precondition: caller must be inside the module gate
+ */
+Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 queuePort)
 {
-    return obj->qIndexToFd[qIndex];
+    UInt queueIndex;
+
+    /* subtract port offset from queue index */
+    queueIndex = queuePort - MessageQ_PORTOFFSET;
+
+    /* return file descriptor */
+    return (obj->qIndexToFd[queueIndex]);
 }
 
+/*
+ *  ======== TransportRpmsg_Factory_create ========
+ *  Create the transport instances
+ *
+ *  Attach to all remote processors. For now, must attach to
+ *  at least one to tolerate MessageQ_E_RESOURCE failures.
+ *
+ *  This function implements the IPC Factory interface, so it
+ *  returns Ipc status codes.
+ */
+Int TransportRpmsg_Factory_create(Void)
+{
+    Int status = Ipc_S_SUCCESS;
+    Int i;
+    UInt16 clusterSize;
+    TransportRpmsg_Handle *inst;
+
+
+    /* needed to enumerate processors in cluster */
+    clusterSize = MultiProc_getNumProcsInCluster();
+
+    /* allocate the instance array */
+    inst = calloc(clusterSize, sizeof(TransportRpmsg_Handle));
+
+    if (inst == NULL) {
+        printf("Error: TransportRpmsg_Factory_create failed, no memory\n");
+        status = Ipc_E_MEMORY;
+        goto done;
+    }
+
+    for (i = 0; i < clusterSize; i++) {
+        inst[i] = NULL;
+    }
+
+    TransportRpmsg_module->inst = inst;
+
+    /* counter event object for passing commands to dispatch thread */
+    TransportRpmsg_module->unblockEvent = eventfd(0, 0);
+
+    if (TransportRpmsg_module->unblockEvent == -1) {
+        printf("create: unblock event failed: %d (%s)\n",
+               errno, strerror(errno));
+        status = Ipc_E_FAIL;
+        goto done;
+    }
+
+    PRINTVERBOSE1("create: created unblock event %d\n",
+            TransportRpmsg_module->unblockEvent)
+
+    /* semaphore event object for acknowledging client thread */
+    TransportRpmsg_module->waitEvent = eventfd(0, EFD_SEMAPHORE);
+
+    if (TransportRpmsg_module->waitEvent == -1) {
+        printf("create: wait event failed: %d (%s)\n", errno, strerror(errno));
+        status = Ipc_E_FAIL;
+        goto done;
+    }
+
+    PRINTVERBOSE1("create: created wait event %d\n",
+            TransportRpmsg_module->waitEvent)
+
+    FD_ZERO(&TransportRpmsg_module->rfds);
+    FD_SET(TransportRpmsg_module->unblockEvent,
+            &TransportRpmsg_module->rfds);
+    TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
+    TransportRpmsg_module->nInFds = 0;
+
+    pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
+
+    status = pthread_create(&TransportRpmsg_module->threadId, NULL,
+            &rpmsgThreadFxn, NULL);
+
+    if (status < 0) {
+        status = Ipc_E_FAIL;
+        printf("attach: failed to spawn thread\n");
+        goto done;
+    }
+    TransportRpmsg_module->threadStarted = TRUE;
+
+done:
+    if (status < 0) {
+        TransportRpmsg_Factory_delete();
+    }
+
+    return (status);
+}
+
+/*
+ *  ======== TransportRpmsg_Factory_delete ========
+ *  Finalize the transport instances
+ */
+Void TransportRpmsg_Factory_delete(Void)
+{
+    uint64_t event;
+
+
+    /* shutdown the message dispatch thread */
+    if (TransportRpmsg_module->threadStarted) {
+        event = TransportRpmsg_Event_SHUTDOWN;
+        write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
+
+        /* wait for dispatch thread to exit */
+        pthread_join(TransportRpmsg_module->threadId, NULL);
+    }
+
+    /* destroy the mutex object */
+    pthread_mutex_destroy(&TransportRpmsg_module->gate);
+
+    /* close the client wait event */
+    if (TransportRpmsg_module->waitEvent != -1) {
+        close(TransportRpmsg_module->waitEvent);
+        TransportRpmsg_module->waitEvent = -1;
+    }
+
+    /* close the dispatch thread unblock event */
+    if (TransportRpmsg_module->unblockEvent != -1) {
+        close(TransportRpmsg_module->unblockEvent);
+        TransportRpmsg_module->unblockEvent = -1;
+    }
+
+    /* free the instance handle array */
+    if (TransportRpmsg_module->inst != NULL) {
+        free(TransportRpmsg_module->inst);
+        TransportRpmsg_module->inst = NULL;
+    }
+
+    return;
+}
+
+/*
+ *  ======== TransportRpmsg_Factory_attach ========
+ */
+Int TransportRpmsg_Factory_attach(UInt16 procId)
+{
+    Int status = Ipc_S_SUCCESS;
+    UInt16 clusterId;
+    TransportRpmsg_Params params;
+    TransportRpmsg_Handle transport;
+    IMessageQTransport_Handle iMsgQTrans;
+
+    /* cannot attach to yourself */
+    if (MultiProc_self() == procId) {
+        status = Ipc_E_INVALIDARG;
+        goto done;
+    }
+
+    /* processor must be a member of the cluster */
+    clusterId = procId - MultiProc_getBaseIdOfCluster();
+
+    if (clusterId >= MultiProc_getNumProcsInCluster()) {
+        status = Ipc_E_INVALIDARG;
+        goto done;
+    }
+
+    /* create transport instance for given processor */
+    params.rprocId = procId;
+    transport = TransportRpmsg_create(&params);
+
+    if (transport == NULL) {
+        status = Ipc_E_FAIL;
+        goto done;
+    }
+
+    /* register transport instance with MessageQ */
+    iMsgQTrans = TransportRpmsg_upCast(transport);
+    MessageQ_registerTransport(iMsgQTrans, procId, 0);
+    TransportRpmsg_module->inst[clusterId] = transport;
+
+done:
+    return (status);
+}
+
+/*
+ *  ======== TransportRpmsg_Factory_detach ========
+ */
+Int TransportRpmsg_Factory_detach(UInt16 procId)
+{
+    Int status = Ipc_S_SUCCESS;
+    UInt16 clusterId;
+
+    /* cannot detach from yourself */
+    if (MultiProc_self() == procId) {
+        status = Ipc_E_INVALIDARG;
+        goto done;
+    }
+
+    /* processor must be a member of the cluster */
+    clusterId = procId - MultiProc_getBaseIdOfCluster();
+
+    if (clusterId >= MultiProc_getNumProcsInCluster()) {
+        status = Ipc_E_INVALIDARG;
+        goto done;
+    }
+
+    /* must be attached in order to detach */
+    if (TransportRpmsg_module->inst[clusterId] == NULL) {
+        status = Ipc_E_INVALIDSTATE;
+        goto done;
+    }
+
+    /* unregister from MessageQ, delete the transport instance */
+    MessageQ_unregisterTransport(procId, 0);
+    TransportRpmsg_delete(&(TransportRpmsg_module->inst[clusterId]));
+
+done:
+    return (status);
+}