]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - ipc/ipcdev.git/blobdiff - linux/src/api/MessageQ.c
Merge remote-tracking branch 'origin/3.36' into ipc-next
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
index 5921fe78325111b341ea28621cdc463a342bd9c5..8cad46f063ea16b05c6f6149be680d0ca2545708 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012-2013, Texas Instruments Incorporated
+ * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -32,7 +32,7 @@
 /*
  *  @file   MessageQ.c
  *
- *  @brief  MessageQ module "client" implementation
+ *  @brief  MessageQ Linux implementation
  *
  *  This implementation is geared for use in a "client/server" model, whereby
  *  system-wide data is maintained in a "server" component and process-
  *  connects and communicates with LAD for the server connection.
  */
 
-
 /* Standard IPC header */
 #include <ti/ipc/Std.h>
 
-/* Linux specific header files, replacing OSAL: */
-#include <pthread.h>
-
 /* Module level headers */
 #include <ti/ipc/NameServer.h>
 #include <ti/ipc/MultiProc.h>
 #include <_MultiProc.h>
+#define MessageQ_internal 1     /* must be defined before include file */
 #include <ti/ipc/MessageQ.h>
 #include <_MessageQ.h>
+#include <ti/ipc/interfaces/ITransport.h>
+#include <ti/ipc/interfaces/IMessageQTransport.h>
+#include <ti/ipc/interfaces/INetworkTransport.h>
 
 /* Socket Headers */
 #include <sys/select.h>
 #include <sys/types.h>
 #include <sys/param.h>
 #include <sys/eventfd.h>
-#include <sys/socket.h>
+#include <sys/queue.h>
 #include <errno.h>
 #include <stdio.h>
 #include <string.h>
 #include <stdlib.h>
 #include <unistd.h>
 #include <assert.h>
-
-/* Socket Protocol Family */
-#include <net/rpmsg.h>
-
-/* Socket utils: */
-#include <SocketFxns.h>
+#include <pthread.h>
+#include <semaphore.h>
 
 #include <ladclient.h>
 #include <_lad.h>
  */
 #define MessageQ_NAMESERVER  "MessageQ"
 
-/*!
- *  @brief  Value of an invalid socket ID:
- */
-#define Transport_INVALIDSOCKET  (0xFFFFFFFF)
+#define MessageQ_MAXTRANSPORTS 8
 
-/* More magic rpmsg port numbers: */
-#define MESSAGEQ_RPMSG_PORT       61
-#define MESSAGEQ_RPMSG_MAXSIZE   512
+#define MessageQ_GROWSIZE 32
 
 /* Trace flag settings: */
 #define TRACESHIFT    12
  * =============================================================================
  */
 
+/* params structure evolution */
+typedef struct {
+    Void *synchronizer;
+} MessageQ_Params_Legacy;
+
+typedef struct {
+    Int __version;
+    Void *synchronizer;
+    MessageQ_QueueIndex queueIndex;
+} MessageQ_Params_Version2;
+
 /* structure for MessageQ module state */
 typedef struct MessageQ_ModuleObject {
-    Int                 refCount;
-    /*!< Reference count */
-    NameServer_Handle   nameServer;
-    /*!< Handle to the local NameServer used for storing GP objects */
-    pthread_mutex_t     gate;
-    /*!< Handle of gate to be used for local thread safety */
-    MessageQ_Params     defaultInstParams;
-    /*!< Default instance creation parameters */
-    int                 sock[MultiProc_MAXPROCESSORS];
-    /*!< Sockets to for sending to each remote processor */
-    int                 seqNum;
-    /*!< Process-specific sequence number */
+    MessageQ_Handle           *queues;
+    Int                       numQueues;
+    Int                       refCount;
+    NameServer_Handle         nameServer;
+    pthread_mutex_t           gate;
+    int                       seqNum;
+    pthread_mutex_t           seqNumGate;
+    IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
+    ITransport_Handle         transInst[MessageQ_MAXTRANSPORTS];
+    MessageQ_PutHookFxn       putHookFxn;
 } MessageQ_ModuleObject;
 
+typedef struct MessageQ_CIRCLEQ_ENTRY {
+     CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem;
+} MessageQ_CIRCLEQ_ENTRY;
+
 /*!
  *  @brief  Structure for the Handle for the MessageQ.
  */
 typedef struct MessageQ_Object_tag {
-    MessageQ_Params         params;
-    /*! Instance specific creation parameters */
-    MessageQ_QueueId        queue;
-    /* Unique id */
-    int                     fd[MultiProc_MAXPROCESSORS];
-    /* File Descriptor to block on messages from remote processors. */
-    int                     unblockFd;
-    /* Write this fd to unblock the select() call in MessageQ _get() */
-    void                    *serverHandle;
+    CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
+    pthread_mutex_t              msgListGate;
+    MessageQ_Params              params;
+    MessageQ_QueueId             queue;
+    int                          unblocked;
+    void                         *serverHandle;
+    sem_t                        synchronizer;
 } MessageQ_Object;
 
-static Bool verbose = FALSE;
-
+/* traces in this file are controlled via _MessageQ_verbose */
+Bool _MessageQ_verbose = FALSE;
+#define verbose _MessageQ_verbose
 
 /* =============================================================================
  *  Globals
@@ -148,8 +151,15 @@ static Bool verbose = FALSE;
  */
 static MessageQ_ModuleObject MessageQ_state =
 {
-    .refCount               = 0,
-    .nameServer             = NULL,
+    .refCount   = 0,
+    .nameServer = NULL,
+#if defined(IPC_BUILDOS_ANDROID)
+    .gate       = PTHREAD_RECURSIVE_MUTEX_INITIALIZER,
+#else
+    .gate       = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
+#endif
+    .seqNumGate = PTHREAD_MUTEX_INITIALIZER,
+    .putHookFxn = NULL
 };
 
 /*!
@@ -157,28 +167,111 @@ static MessageQ_ModuleObject MessageQ_state =
  *
  *  @brief  Pointer to the MessageQ module state.
  */
-MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
+MessageQ_ModuleObject *MessageQ_module = &MessageQ_state;
 
+Void _MessageQ_grow(UInt16 queueIndex);
 
 /* =============================================================================
- * Forward declarations of internal functions
+ * APIS
  * =============================================================================
  */
 
-/* This is a helper function to initialize a message. */
-static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex);
-static Int transportCloseEndpoint(int fd);
-static Int transportGet(int sock, MessageQ_Msg * retMsg);
-static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId);
+Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
+                                UInt16 rprocId, UInt priority)
+{
+    Int status = FALSE;
+    UInt16 clusterId;
 
-/* =============================================================================
- * APIS
- * =============================================================================
- */
-/* Function to get default configuration for the MessageQ module.
- *
+    if (handle == NULL) {
+        fprintf(stderr,
+                "MessageQ_registerTransport: invalid handle, must be non-NULL\n"
+               );
+
+        return status;
+    }
+
+    /* map procId to clusterId */
+    clusterId = rprocId - MultiProc_getBaseIdOfCluster();
+
+    if (clusterId >= MultiProc_MAXPROCESSORS) {
+        fprintf(stderr,
+                "MessageQ_registerTransport: invalid procId %d\n", rprocId);
+
+        return status;
+    }
+
+    if (MessageQ_module->transports[clusterId][priority] == NULL) {
+        MessageQ_module->transports[clusterId][priority] = handle;
+
+        status = TRUE;
+    }
+
+    return status;
+}
+
+Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
+{
+    if (inst == NULL) {
+        fprintf(stderr, "MessageQ_registerTransportId: invalid NULL handle\n");
+
+        return MessageQ_E_INVALIDARG;
+    }
+
+    if (tid >= MessageQ_MAXTRANSPORTS) {
+        fprintf(stderr,
+                "MessageQ_unregisterNetTransport: invalid transport id %d, "
+                "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
+
+        return MessageQ_E_INVALIDARG;
+    }
+
+    if (MessageQ_module->transInst[tid] != NULL) {
+        fprintf(stderr,
+                "MessageQ_registerTransportId: transport id %d already "
+                "registered\n", tid);
+
+        return MessageQ_E_ALREADYEXISTS;
+    }
+
+    MessageQ_module->transInst[tid] = inst;
+
+    return MessageQ_S_SUCCESS;
+}
+
+Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
+{
+    UInt16 clusterId;
+
+    /* map procId to clusterId */
+    clusterId = rprocId - MultiProc_getBaseIdOfCluster();
+
+    if (clusterId >= MultiProc_MAXPROCESSORS) {
+        fprintf(stderr, "MessageQ_unregisterTransport: invalid rprocId %d\n",
+                rprocId);
+
+        return;
+    }
+
+    MessageQ_module->transports[clusterId][priority] = NULL;
+}
+
+Void MessageQ_unregisterTransportId(UInt tid)
+{
+    if (tid >= MessageQ_MAXTRANSPORTS) {
+        fprintf(stderr,
+                "MessageQ_unregisterTransportId: invalid transport id %d, "
+                "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
+
+        return;
+    }
+
+    MessageQ_module->transInst[tid] = NULL;
+}
+
+/*
+ * Function to get default configuration for the MessageQ module.
  */
-Void MessageQ_getConfig (MessageQ_Config * cfg)
+Void MessageQ_getConfig(MessageQ_Config *cfg)
 {
     Int status;
     LAD_ClientHandle handle;
@@ -206,7 +299,8 @@ Void MessageQ_getConfig (MessageQ_Config * cfg)
     }
 
     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
-        PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n", status)
+        PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n",
+                      status)
         return;
     }
     status = rsp.messageQGetConfig.status;
@@ -215,27 +309,41 @@ Void MessageQ_getConfig (MessageQ_Config * cfg)
       "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
       handle, status)
 
-    memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof(*cfg));
+    memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg));
 
     return;
 }
 
-/* Function to setup the MessageQ module. */
-Int MessageQ_setup (const MessageQ_Config * cfg)
+/*
+ *  Function to setup the MessageQ module.
+ */
+Int MessageQ_setup(const MessageQ_Config *cfg)
 {
-    Int status;
+    Int status = MessageQ_S_SUCCESS;
     LAD_ClientHandle handle;
     struct LAD_CommandObj cmd;
     union LAD_ResponseObj rsp;
+    Int pri;
     Int i;
+    Int tid;
+
+    /* this entire function must be serialized */
+    pthread_mutex_lock(&MessageQ_module->gate);
+
+    /* ensure only first thread performs startup procedure */
+    if (++MessageQ_module->refCount > 1) {
+        PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
+                MessageQ_module->refCount)
+        status = MessageQ_S_ALREADYSETUP;
+        goto exit;
+    }
 
     handle = LAD_findHandle();
     if (handle == LAD_MAXNUMCLIENTS) {
-        PRINTVERBOSE1(
-          "MessageQ_setup: can't find connection to daemon for pid %d\n",
-           getpid())
-
-        return MessageQ_E_RESOURCE;
+        PRINTVERBOSE1("MessageQ_setup: can't find connection to daemon for "
+                "pid %d\n", getpid())
+        status = MessageQ_E_RESOURCE;
+        goto exit;
     }
 
     cmd.cmd = LAD_MESSAGEQ_SETUP;
@@ -243,104 +351,172 @@ Int MessageQ_setup (const MessageQ_Config * cfg)
     memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
 
     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
-        PRINTVERBOSE1(
-          "MessageQ_setup: sending LAD command failed, status=%d\n", status)
-        return MessageQ_E_FAIL;
+        PRINTVERBOSE1("MessageQ_setup: sending LAD command failed, "
+                "status=%d\n", status)
+        status = MessageQ_E_FAIL;
+        goto exit;
     }
 
     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
         PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
-        return(status);
+        status = MessageQ_E_FAIL;
+        goto exit;
     }
     status = rsp.setup.status;
 
-    PRINTVERBOSE2(
-      "MessageQ_setup: got LAD response for client %d, status=%d\n",
-      handle, status)
+    PRINTVERBOSE2("MessageQ_setup: LAD response for client %d, status=%d\n",
+            handle, status)
 
-    MessageQ_module->nameServer = rsp.setup.nameServerHandle;
     MessageQ_module->seqNum = 0;
+    MessageQ_module->nameServer = rsp.setup.nameServerHandle;
+    MessageQ_module->numQueues = cfg->maxRuntimeEntries;
+    MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
+            sizeof(MessageQ_Handle));
 
-    /* Create a default local gate. */
-    pthread_mutex_init (&(MessageQ_module->gate), NULL);
-
-    /* Clear sockets array. */
     for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
-       MessageQ_module->sock[i]      = Transport_INVALIDSOCKET;
+        for (pri = 0; pri < 2; pri++) {
+            MessageQ_module->transports[i][pri] = NULL;
+        }
     }
 
+    for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
+        MessageQ_module->transInst[tid] = NULL;
+    }
 
-    return status;
+exit:
+    /* if error, must decrement reference count */
+    if (status < 0) {
+        MessageQ_module->refCount--;
+    }
+
+    pthread_mutex_unlock(&MessageQ_module->gate);
+
+    return (status);
 }
 
 /*
- * Function to destroy the MessageQ module.
- * Destroys socket/protocol maps;  sockets themselves should have been
- * destroyed in MessageQ_delete() and MessageQ_detach() calls.
+ *  MessageQ_destroy - destroy the MessageQ module.
  */
-Int MessageQ_destroy (void)
+Int MessageQ_destroy(void)
 {
-    Int status;
+    Int status = MessageQ_S_SUCCESS;
     LAD_ClientHandle handle;
     struct LAD_CommandObj cmd;
     union LAD_ResponseObj rsp;
 
+    /* this entire function must be serialized */
+    pthread_mutex_lock(&MessageQ_module->gate);
+
+    /* ensure only last thread does the work */
+    if (--MessageQ_module->refCount > 0) {
+        goto exit;
+    }
+
     handle = LAD_findHandle();
     if (handle == LAD_MAXNUMCLIENTS) {
-        PRINTVERBOSE1(
-          "MessageQ_destroy: can't find connection to daemon for pid %d\n",
-           getpid())
-
-        return MessageQ_E_RESOURCE;
+        PRINTVERBOSE1("MessageQ_destroy: can't find connection to daemon "
+                "for pid %d\n", getpid())
+        status =  MessageQ_E_RESOURCE;
+        goto exit;
     }
 
     cmd.cmd = LAD_MESSAGEQ_DESTROY;
     cmd.clientId = handle;
 
     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
-        PRINTVERBOSE1(
-          "MessageQ_destroy: sending LAD command failed, status=%d\n", status)
-        return MessageQ_E_FAIL;
+        PRINTVERBOSE1("MessageQ_destroy: sending LAD command failed, "
+                "status=%d\n", status)
+        status = MessageQ_E_FAIL;
+        goto exit;
     }
 
     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
         PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
-        return(status);
+        status = MessageQ_E_FAIL;
+        goto exit;
     }
     status = rsp.status;
 
-    PRINTVERBOSE2(
-      "MessageQ_destroy: got LAD response for client %d, status=%d\n",
-      handle, status)
+    PRINTVERBOSE2("MessageQ_destroy: got LAD response for client %d, "
+            "status=%d\n", handle, status)
 
-    return status;
+exit:
+    pthread_mutex_unlock(&MessageQ_module->gate);
+
+    return (status);
 }
 
-/* Function to initialize the parameters for the MessageQ instance. */
-Void MessageQ_Params_init (MessageQ_Params * params)
+/*
+ *  ======== MessageQ_Params_init ========
+ *  Legacy implementation.
+ */
+Void MessageQ_Params_init(MessageQ_Params *params)
 {
-    memcpy (params, &(MessageQ_module->defaultInstParams),
-            sizeof (MessageQ_Params));
+    ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
+}
 
-    return;
+/*
+ *  ======== MessageQ_Params_init__S ========
+ *  New implementation which is version aware.
+ */
+Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
+{
+    MessageQ_Params_Version2 *params2;
+
+    switch (version) {
+
+        case MessageQ_Params_VERSION_2:
+            params2 = (MessageQ_Params_Version2 *)params;
+            params2->__version = MessageQ_Params_VERSION_2;
+            params2->synchronizer = NULL;
+            params2->queueIndex = MessageQ_ANY;
+            break;
+
+        default:
+            assert(FALSE);
+            break;
+    }
 }
 
 /*
- *   Function to create a MessageQ object for receiving.
- *
- *   Create a socket and bind the source address (local ProcId/MessageQ ID) in
- *   order to get messages dispatched to this messageQ.
+ *  ======== MessageQ_create ========
  */
-MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
+MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
 {
-    Int                   status    = MessageQ_S_SUCCESS;
-    MessageQ_Object *     obj    = NULL;
-    UInt16                queueIndex = 0u;
-    UInt16                procId;
-    UInt16                rprocId;
+    Int                   status;
+    MessageQ_Object      *obj = NULL;
+    IMessageQTransport_Handle transport;
+    INetworkTransport_Handle netTrans;
+    ITransport_Handle     baseTrans;
+    UInt16                queueIndex;
+    UInt16                clusterId;
+    Int                   tid;
+    Int                   priority;
     LAD_ClientHandle      handle;
     struct LAD_CommandObj cmd;
     union LAD_ResponseObj rsp;
+    MessageQ_Params ps;
+
+    MessageQ_Params_init__S(&ps, MessageQ_Params_VERSION);
+
+    /* copy the given params into the current params structure */
+    if (pp != NULL) {
+
+        /* snoop the params pointer to see if it's a legacy structure */
+        if ((pp->__version == 0) || (pp->__version > 100)) {
+            ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
+        }
+
+        /* not legacy structure, use params version field */
+        else if (pp->__version == MessageQ_Params_VERSION_2) {
+            ps.__version = ((MessageQ_Params_Version2 *)pp)->__version;
+            ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
+            ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
+        }
+        else {
+            assert(FALSE);
+        }
+    }
 
     handle = LAD_findHandle();
     if (handle == LAD_MAXNUMCLIENTS) {
@@ -353,13 +529,18 @@ MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
 
     cmd.cmd = LAD_MESSAGEQ_CREATE;
     cmd.clientId = handle;
-    strncpy(cmd.args.messageQCreate.name, name,
-            LAD_MESSAGEQCREATEMAXNAMELEN - 1);
-    cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
-    if (params) {
-        memcpy(&cmd.args.messageQCreate.params, params, sizeof(*params));
+
+    if (name == NULL) {
+        cmd.args.messageQCreate.name[0] = '\0';
+    }
+    else {
+        strncpy(cmd.args.messageQCreate.name, name,
+                LAD_MESSAGEQCREATEMAXNAMELEN - 1);
+        cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
     }
 
+    memcpy(&cmd.args.messageQCreate.params, &ps, sizeof(ps));
+
     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
         PRINTVERBOSE1(
           "MessageQ_create: sending LAD command failed, status=%d\n", status)
@@ -386,68 +567,91 @@ MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
     /* Create the generic obj */
     obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
 
-    if (params != NULL) {
-       /* Populate the params member */
-        memcpy((Ptr) &obj->params, (Ptr)params, sizeof (MessageQ_Params));
-    }
+   /* Populate the params member */
+    memcpy(&obj->params, &ps, sizeof(ps));
+
 
-    procId = MultiProc_self();
-    queueIndex = (MessageQ_QueueIndex)rsp.messageQCreate.queueId;
     obj->queue = rsp.messageQCreate.queueId;
     obj->serverHandle = rsp.messageQCreate.serverHandle;
+    pthread_mutex_init(&obj->msgListGate, NULL);
+    CIRCLEQ_INIT(&obj->msgList);
+    if (sem_init(&obj->synchronizer, 0, 0) < 0) {
+        PRINTVERBOSE1(
+          "MessageQ_create: failed to create synchronizer (errno %d)\n", errno)
 
-    /*
-     * Create a set of communication endpoints (one per each remote proc),
-     * and return the socket as target for MessageQ_put() calls, and as
-     * a file descriptor to close during MessageQ_delete().
-     */
-    for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
-        obj->fd[rprocId] = Transport_INVALIDSOCKET;
-        if (procId == rprocId) {
-            /* Skip creating an endpoint for ourself. */
-            continue;
-        }
+        MessageQ_delete((MessageQ_Handle *)&obj);
+
+        return NULL;
+    }
 
-        PRINTVERBOSE3("MessageQ_create: creating endpoint for: %s, rprocId: %d, queueIndex: %d\n", name, rprocId, queueIndex)
+    /* lad returns the queue port # (queueIndex + PORT_OFFSET) */
+    queueIndex = MessageQ_getQueueIndex(rsp.messageQCreate.queueId);
 
-        status = transportCreateEndpoint(&obj->fd[rprocId], rprocId,
-                                           queueIndex);
-        if (status < 0) {
-           goto cleanup;
+    PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
+            "queueIndex %d\n", (name == NULL) ? "NULL" : name , queueIndex)
+
+    pthread_mutex_lock(&MessageQ_module->gate);
+
+    for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
+       for (priority = 0; priority < 2; priority++) {
+            transport = MessageQ_module->transports[clusterId][priority];
+            if (transport) {
+                /* need to check return and do something if error */
+                IMessageQTransport_bind((Void *)transport, obj->queue);
+            }
         }
     }
 
-    /*
-     * Now, to support MessageQ_unblock() functionality, create an event object.
-     * Writing to this event will unblock the select() call in MessageQ_get().
-     */
-    obj->unblockFd = eventfd(0, 0);
-    if (obj->unblockFd == -1)  {
-        printf ("MessageQ_create: eventfd creation failed: %d, %s\n",
-                   errno, strerror(errno));
-        status = MessageQ_E_FAIL;
+    for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
+        baseTrans = MessageQ_module->transInst[tid];
+
+        if (baseTrans != NULL) {
+            switch (ITransport_itype(baseTrans)) {
+                case INetworkTransport_TypeId:
+                    netTrans = INetworkTransport_downCast(baseTrans);
+                    INetworkTransport_bind((void *)netTrans, obj->queue);
+                    break;
+
+                default:
+                    /* error */
+                    fprintf(stderr,
+                            "MessageQ_create: Error: transport id %d is an "
+                            "unsupported transport type.\n", tid);
+                    break;
+            }
+        }
     }
 
-cleanup:
-    /* Cleanup if fail: */
-    if (status < 0) {
-        MessageQ_delete((MessageQ_Handle *)&obj);
+    /* LAD's MessageQ module can grow, we need to grow as well */
+    if (queueIndex >= MessageQ_module->numQueues) {
+        _MessageQ_grow(queueIndex);
     }
 
-    return ((MessageQ_Handle) obj);
+    /*  No need to "allocate" slot since the queueIndex returned by
+     *  LAD is guaranteed to be unique.
+     */
+    MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
+
+    pthread_mutex_unlock(&MessageQ_module->gate);
+
+    return (MessageQ_Handle)obj;
 }
 
 /*
- * Function to delete a MessageQ object for a specific slave processor.
- *
- * Deletes the socket associated with this MessageQ object.
+ *  ======== MessageQ_delete ========
  */
-Int MessageQ_delete (MessageQ_Handle * handlePtr)
+Int MessageQ_delete(MessageQ_Handle *handlePtr)
 {
-    Int               status    = MessageQ_S_SUCCESS;
-    MessageQ_Object * obj       = NULL;
-    UInt16            rprocId;
-    LAD_ClientHandle      handle;
+    MessageQ_Object *obj;
+    IMessageQTransport_Handle transport;
+    INetworkTransport_Handle  netTrans;
+    ITransport_Handle         baseTrans;
+    Int              status = MessageQ_S_SUCCESS;
+    UInt16           queueIndex;
+    UInt16                clusterId;
+    Int                   tid;
+    Int                   priority;
+    LAD_ClientHandle handle;
     struct LAD_CommandObj cmd;
     union LAD_ResponseObj rsp;
 
@@ -460,7 +664,7 @@ Int MessageQ_delete (MessageQ_Handle * handlePtr)
         return MessageQ_E_FAIL;
     }
 
-    obj = (MessageQ_Object *) (*handlePtr);
+    obj = (MessageQ_Object *)(*handlePtr);
 
     cmd.cmd = LAD_MESSAGEQ_DELETE;
     cmd.clientId = handle;
@@ -482,51 +686,74 @@ Int MessageQ_delete (MessageQ_Handle * handlePtr)
       "MessageQ_delete: got LAD response for client %d, status=%d\n",
       handle, status)
 
+    pthread_mutex_lock(&MessageQ_module->gate);
 
-    /* Close the event used for MessageQ_unblock(): */
-    close(obj->unblockFd);
+    for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
+       for (priority = 0; priority < 2; priority++) {
+            transport = MessageQ_module->transports[clusterId][priority];
+            if (transport) {
+                IMessageQTransport_unbind((Void *)transport, obj->queue);
+            }
+        }
+    }
 
-    /* Close the communication endpoint: */
-    for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
-        if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
-            status = transportCloseEndpoint(obj->fd[rprocId]);
+    for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
+        baseTrans = MessageQ_module->transInst[tid];
+
+        if (baseTrans != NULL) {
+            switch (ITransport_itype(baseTrans)) {
+                case INetworkTransport_TypeId:
+                    netTrans = INetworkTransport_downCast(baseTrans);
+                    INetworkTransport_unbind((void *)netTrans, obj->queue);
+                    break;
+
+                default:
+                    /* error */
+                    fprintf(stderr,
+                            "MessageQ_create: Error: transport id %d is an "
+                            "unsupported transport type.\n", tid);
+                    break;
+            }
         }
     }
 
-    /* Now free the obj */
-    free (obj);
+    /* extract the queue index from the queueId */
+    queueIndex = MessageQ_getQueueIndex(obj->queue);
+    MessageQ_module->queues[queueIndex] = NULL;
+
+    pthread_mutex_unlock(&MessageQ_module->gate);
+
+    free(obj);
     *handlePtr = NULL;
 
-    return (status);
+    return status;
 }
 
 /*
- *  Opens an instance of MessageQ for sending.
- *
- *  We need not create a socket here; the sockets for all remote processors
- *  were created during MessageQ_attach(), and will be
- *  retrieved during MessageQ_put().
+ *  ======== MessageQ_open ========
+ *  Acquire a queueId for use in sending messages to the queue
  */
-Int MessageQ_open (String name, MessageQ_QueueId * queueId)
+Int MessageQ_open(String name, MessageQ_QueueId *queueId)
 {
     Int status = MessageQ_S_SUCCESS;
 
-    status = NameServer_getUInt32 (MessageQ_module->nameServer,
-                                     name, queueId, NULL);
+    status = NameServer_getUInt32(MessageQ_module->nameServer,
+                                  name, queueId, NULL);
 
     if (status == NameServer_E_NOTFOUND) {
-        /* Set return queue ID to invalid. */
+        /* Set return queue ID to invalid */
         *queueId = MessageQ_INVALIDMESSAGEQ;
         status = MessageQ_E_NOTFOUND;
     }
     else if (status >= 0) {
-        /* Override with a MessageQ status code. */
+        /* Override with a MessageQ status code */
         status = MessageQ_S_SUCCESS;
     }
     else {
-        /* Set return queue ID to invalid. */
+        /* Set return queue ID to invalid */
         *queueId = MessageQ_INVALIDMESSAGEQ;
-        /* Override with a MessageQ status code. */
+
+        /* Override with a MessageQ status code */
         if (status == NameServer_E_TIMEOUT) {
             status = MessageQ_E_TIMEOUT;
         }
@@ -535,129 +762,241 @@ Int MessageQ_open (String name, MessageQ_QueueId * queueId)
         }
     }
 
-    return (status);
+    return status;
+}
+
+/*
+ *  ======== MessageQ_openQueueId ========
+ */
+MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 procId)
+{
+    MessageQ_QueueIndex queuePort;
+    MessageQ_QueueId queueId;
+
+    /* queue port is embedded in the queueId */
+    queuePort = queueIndex + MessageQ_PORTOFFSET;
+    queueId = ((MessageQ_QueueId)(procId) << 16) | queuePort;
+
+    return (queueId);
 }
 
-/* Closes previously opened instance of MessageQ module. */
-Int MessageQ_close (MessageQ_QueueId * queueId)
+/*
+ *  ======== MessageQ_close ========
+ *  Closes previously opened instance of MessageQ
+ */
+Int MessageQ_close(MessageQ_QueueId *queueId)
 {
     Int32 status = MessageQ_S_SUCCESS;
 
     /* Nothing more to be done for closing the MessageQ. */
     *queueId = MessageQ_INVALIDMESSAGEQ;
 
-    return (status);
+    return status;
 }
 
 /*
- * Place a message onto a message queue.
- *
- * Calls TransportShm_put(), which handles the sending of the message using the
- * appropriate kernel interface (socket, device ioctl) call for the remote
- * procId encoded in the queueId argument.
+ *  ======== MessageQ_put ========
+ *  Deliver the given message, either locally or to the transport
  *
+ *  If the destination is a local queue, deliver the message. Otherwise,
+ *  pass the message to a transport for delivery. The transport handles
+ *  the sending of the message using the appropriate interface (socket,
+ *  device ioctl, etc.).
  */
-Int MessageQ_put (MessageQ_QueueId queueId, MessageQ_Msg msg)
+Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
 {
-    Int      status;
-    UInt16   dstProcId  = (UInt16)(queueId >> 16);
-    UInt16   queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
+    Int status = MessageQ_S_SUCCESS;
+    MessageQ_Object *obj;
+    UInt16 dstProcId;
+    UInt16 queueIndex;
+    UInt16 queuePort;
+    ITransport_Handle baseTrans;
+    IMessageQTransport_Handle msgTrans;
+    INetworkTransport_Handle netTrans;
+    Int priority;
+    UInt tid;
+    UInt16 clusterId;
+    Bool delivered;
+
+    /* extract destination address from the given queueId */
+    dstProcId  = (UInt16)(queueId >> 16);
+    queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
+
+    /* write the destination address into the message header */
+    msg->dstId = queuePort;
+    msg->dstProc= dstProcId;
+
+    /* invoke the hook function after addressing the message */
+    if (MessageQ_module->putHookFxn != NULL) {
+        MessageQ_module->putHookFxn(queueId, msg);
+    }
 
-    msg->dstId     = queueIndex;
-    msg->dstProc   = dstProcId;
+    /*  For an outbound message: If message destination is on this
+     *  processor, then check if the destination queue is in this
+     *  process (thread-to-thread messaging).
+     *
+     *  For an inbound message: Check if destination queue is in this
+     *  process (process-to-process messaging).
+     */
+    if (dstProcId == MultiProc_self()) {
+        queueIndex = queuePort - MessageQ_PORTOFFSET;
+
+        if (queueIndex < MessageQ_module->numQueues) {
+            obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
+
+            if (obj != NULL) {
+                /* deliver message to queue */
+                pthread_mutex_lock(&obj->msgListGate);
+                CIRCLEQ_INSERT_TAIL(&obj->msgList,
+                        (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
+                pthread_mutex_unlock(&obj->msgListGate);
+                sem_post(&obj->synchronizer);
+                goto done;
+            }
+        }
+    }
+
+    /*  Getting here implies the message is outbound. Must give it to
+     *  either the primary or secondary transport for delivery. Start
+     *  by extracting the transport ID from the message header.
+     */
+    tid = MessageQ_getTransportId(msg);
 
-    status = transportPut(msg, queueIndex, dstProcId);
+    if (tid >= MessageQ_MAXTRANSPORTS) {
+        fprintf(stderr,
+                "MessageQ_put: Error: transport id %d too big, must be < %d\n",
+                tid, MessageQ_MAXTRANSPORTS);
+        status = MessageQ_E_FAIL;
+        goto done;
+    }
 
+    /* if transportId is set, use secondary transport for message delivery */
+    if (tid != 0) {
+        baseTrans = MessageQ_module->transInst[tid];
+
+        if (baseTrans == NULL) {
+            fprintf(stderr, "MessageQ_put: Error: transport is null\n");
+            status = MessageQ_E_FAIL;
+            goto done;
+        }
+
+        /* downcast instance pointer to transport interface */
+        switch (ITransport_itype(baseTrans)) {
+            case INetworkTransport_TypeId:
+                netTrans = INetworkTransport_downCast(baseTrans);
+                delivered = INetworkTransport_put(netTrans, (Ptr)msg);
+                status = (delivered ? MessageQ_S_SUCCESS :
+                          (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN :
+                           MessageQ_E_FAIL));
+                break;
+
+            default:
+                /* error */
+                fprintf(stderr, "MessageQ_put: Error: transport id %d is an "
+                        "unsupported transport type\n", tid);
+                status = MessageQ_E_FAIL;
+                break;
+        }
+    }
+    else {
+        /* use primary transport for delivery */
+        priority = MessageQ_getMsgPri(msg);
+        clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
+
+        /* primary transport can only be used for intra-cluster delivery */
+        if (clusterId > MultiProc_getNumProcsInCluster()) {
+            fprintf(stderr,
+                    "MessageQ_put: Error: destination procId=%d is not "
+                    "in cluster. Must specify a transportId.\n", dstProcId);
+            status =  MessageQ_E_FAIL;
+            goto done;
+        }
+
+        msgTrans = MessageQ_module->transports[clusterId][priority];
+        delivered = IMessageQTransport_put(msgTrans, (Ptr)msg);
+        status = (delivered ? MessageQ_S_SUCCESS :
+                  (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN : MessageQ_E_FAIL));
+    }
+
+done:
     return (status);
 }
 
 /*
- * Gets a message for a message queue and blocks if the queue is empty.
- * If a message is present, it returns it.  Otherwise it blocks
- * waiting for a message to arrive.
- * When a message is returned, it is owned by the caller.
- *
- * We block using select() on the receiving socket's file descriptor, then
- * get the waiting message via the socket API recvfrom().
- * We use the socket stored in the messageQ object via a previous call to
- * MessageQ_create().
+ *  MessageQ_get - gets a message for a message queue and blocks if
+ *  the queue is empty.
  *
+ *  If a message is present, it returns it.  Otherwise it blocks
+ *  waiting for a message to arrive.
+ *  When a message is returned, it is owned by the caller.
  */
-Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
+Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
 {
+    MessageQ_Object * obj = (MessageQ_Object *)handle;
     Int     status = MessageQ_S_SUCCESS;
-    Int     tmpStatus;
-    MessageQ_Object * obj = (MessageQ_Object *) handle;
-    int     retval;
-    int     nfds;
-    fd_set  rfds;
-    struct  timeval tv;
-    void    *timevalPtr;
-    UInt16  rprocId;
-    int     maxfd = 0;
-
-    /* Wait (with timeout) and retreive message from socket: */
-    FD_ZERO(&rfds);
-    for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
-        if (rprocId == MultiProc_self()) {
-            continue;
-        }
-        maxfd = MAX(maxfd, obj->fd[rprocId]);
-        FD_SET(obj->fd[rprocId], &rfds);
-    }
+    struct timespec ts;
+    struct timeval tv;
 
-    /* Wait also on the event fd, which may be written by MessageQ_unblock(): */
-    FD_SET(obj->unblockFd, &rfds);
+#if 0
+/*
+ * Optimization here to get a message without going in to the sem
+ * operation, but the sem count will not be maintained properly.
+ */
+    pthread_mutex_lock(&obj->msgListGate);
 
-    if (timeout == MessageQ_FOREVER) {
-        timevalPtr = NULL;
+    if (obj->msgList.cqh_first != &obj->msgList) {
+        *msg = (MessageQ_Msg)obj->msglist.cqh_first;
+        CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
+
+        pthread_mutex_unlock(&obj->msgListGate);
     }
     else {
-        /* Timeout given in msec: convert:  */
-        tv.tv_sec = timeout / 1000;
-        tv.tv_usec = (timeout % 1000) * 1000;
-        timevalPtr = &tv;
+        pthread_mutex_unlock(&obj->msgListGate);
+    }
+#endif
+
+    if (timeout == MessageQ_FOREVER) {
+        sem_wait(&obj->synchronizer);
     }
-    /* Add one to last fd created: */
-    nfds = MAX(maxfd, obj->unblockFd) + 1;
-
-    retval = select(nfds, &rfds, NULL, NULL, timevalPtr);
-    if (retval)  {
-        if (FD_ISSET(obj->unblockFd, &rfds))  {
-            /*
-             * Our event was signalled by MessageQ_unblock().
-             *
-             * This is typically done during a shutdown sequence, where
-             * the intention of the client would be to ignore (i.e. not fetch)
-             * any pending messages in the transport's queue.
-             * Thus, we shall not check for nor return any messages.
-             */
-            *msg = NULL;
-            status = MessageQ_E_UNBLOCKED;
+    else {
+        /* add timeout (microseconds) to current time of day */
+        gettimeofday(&tv, NULL);
+        tv.tv_sec += timeout / 1000000;
+        tv.tv_usec += timeout % 1000000;
+
+        if (tv.tv_usec >= 1000000) {
+              tv.tv_sec++;
+              tv.tv_usec -= 1000000;
         }
-        else {
-            for (rprocId = 0; rprocId < MultiProc_getNumProcessors();
-                 rprocId++) {
-                if (rprocId == MultiProc_self()) {
-                    continue;
-                }
-                if (FD_ISSET(obj->fd[rprocId], &rfds)) {
-                    /* Our transport's fd was signalled: Get the message: */
-                    tmpStatus = transportGet(obj->fd[rprocId], msg);
-                    if (tmpStatus < 0) {
-                        printf ("MessageQ_get: tranposrtshm_get failed.");
-                        status = MessageQ_E_FAIL;
-                    }
-                }
+
+        /* set absolute timeout value */
+        ts.tv_sec = tv.tv_sec;
+        ts.tv_nsec = tv.tv_usec * 1000; /* convert to nanoseconds */
+
+        if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
+            if (errno == ETIMEDOUT) {
+                PRINTVERBOSE0("MessageQ_get: operation timed out\n")
+                return (MessageQ_E_TIMEOUT);
+            }
+            else {
+                PRINTVERBOSE0("MessageQ_get: sem_timedwait error\n")
+                return (MessageQ_E_FAIL);
             }
         }
     }
-    else if (retval == 0) {
-        *msg = NULL;
-        status = MessageQ_E_TIMEOUT;
+
+    if (obj->unblocked) {
+        return obj->unblocked;
     }
 
-    return (status);
+    pthread_mutex_lock(&obj->msgListGate);
+
+    *msg = (MessageQ_Msg)obj->msgList.cqh_first;
+    CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
+
+    pthread_mutex_unlock(&obj->msgListGate);
+
+    return status;
 }
 
 /*
@@ -665,7 +1004,7 @@ Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
  *
  * TBD: Implement as a socket ioctl, using getsockopt().  Return -1 for now.
  */
-Int MessageQ_count (MessageQ_Handle handle)
+Int MessageQ_count(MessageQ_Handle handle)
 {
     Int               count = -1;
 #if 0
@@ -685,108 +1024,137 @@ Int MessageQ_count (MessageQ_Handle handle)
                  &count, &optlen);
 #endif
 
-    return (count);
+    return count;
 }
 
-/* Initializes a message not obtained from MessageQ_alloc. */
-Void MessageQ_staticMsgInit (MessageQ_Msg msg, UInt32 size)
+/*
+ *  Initializes a message not obtained from MessageQ_alloc.
+ */
+Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
 {
     /* Fill in the fields of the message */
-    MessageQ_msgInit (msg);
-    msg->heapId  = MessageQ_STATICMSG;
+    MessageQ_msgInit(msg);
+    msg->heapId = MessageQ_STATICMSG;
     msg->msgSize = size;
 }
 
 /*
- * Allocate a message and initialize the needed fields (note some
- * of the fields in the header are set via other APIs or in the
- * MessageQ_put function,
+ *  Allocate a message and initialize the needed fields (note some
+ *  of the fields in the header are set via other APIs or in the
+ *  MessageQ_put function,
  */
-MessageQ_Msg MessageQ_alloc (UInt16 heapId, UInt32 size)
+MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
 {
-    MessageQ_Msg msg       = NULL;
+    MessageQ_Msg msg;
 
     /*
      * heapId not used for local alloc (as this is over a copy transport), but
-     * we need to send to other side as heapId is used in BIOS transport:
+     * we need to send to other side as heapId is used in BIOS transport.
      */
-    msg = (MessageQ_Msg)calloc (1, size);
-    MessageQ_msgInit (msg);
+    msg = (MessageQ_Msg)calloc(1, size);
+    MessageQ_msgInit(msg);
     msg->msgSize = size;
-    msg->heapId  = heapId;
+    msg->heapId = heapId;
 
     return msg;
 }
 
-/* Frees the message back to the heap that was used to allocate it. */
-Int MessageQ_free (MessageQ_Msg msg)
+/*
+ *  Frees the message back to the heap that was used to allocate it.
+ */
+Int MessageQ_free(MessageQ_Msg msg)
 {
-    UInt32         status = MessageQ_S_SUCCESS;
+    UInt32 status = MessageQ_S_SUCCESS;
 
     /* Check to ensure this was not allocated by user: */
-    if (msg->heapId == MessageQ_STATICMSG)  {
-        status =  MessageQ_E_CANNOTFREESTATICMSG;
+    if (msg->heapId == MessageQ_STATICMSG) {
+        status = MessageQ_E_CANNOTFREESTATICMSG;
     }
     else {
-        free (msg);
+        free(msg);
     }
 
     return status;
 }
 
 /* Register a heap with MessageQ. */
-Int MessageQ_registerHeap (Ptr heap, UInt16 heapId)
+Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
 {
-    Int  status = MessageQ_S_SUCCESS;
+    Int status = MessageQ_S_SUCCESS;
 
-    /* Do nothing, as this uses a copy transport: */
+    /* Do nothing, as this uses a copy transport */
 
     return status;
 }
 
 /* Unregister a heap with MessageQ. */
-Int MessageQ_unregisterHeap (UInt16 heapId)
+Int MessageQ_unregisterHeap(UInt16 heapId)
 {
-    Int  status = MessageQ_S_SUCCESS;
+    Int status = MessageQ_S_SUCCESS;
 
-    /* Do nothing, as this uses a copy transport: */
+    /* Do nothing, as this uses a copy transport */
 
     return status;
 }
 
 /* Unblocks a MessageQ */
-Void MessageQ_unblock (MessageQ_Handle handle)
+Void MessageQ_unblock(MessageQ_Handle handle)
 {
-    MessageQ_Object * obj   = (MessageQ_Object *) handle;
-    uint64_t     buf = 1;
-    int          numBytes;
+    MessageQ_Object *obj = (MessageQ_Object *)handle;
 
-    /* Write 8 bytes to awaken any threads blocked on this messageQ: */
-    numBytes = write(obj->unblockFd, &buf, sizeof(buf));
+    obj->unblocked = MessageQ_E_UNBLOCKED;
+    sem_post(&obj->synchronizer);
 }
 
-/* Embeds a source message queue into a message. */
-Void MessageQ_setReplyQueue (MessageQ_Handle handle, MessageQ_Msg msg)
+/* Unblocks a MessageQ that's been shutdown due to transport failure */
+Void MessageQ_shutdown(MessageQ_Handle handle)
 {
-    MessageQ_Object * obj   = (MessageQ_Object *) handle;
+    MessageQ_Object *obj = (MessageQ_Object *)handle;
+
+    obj->unblocked = MessageQ_E_SHUTDOWN;
+    sem_post(&obj->synchronizer);
+}
 
-    msg->replyId   = (UInt16)(obj->queue);
+/* Embeds a source message queue into a message */
+Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
+{
+    MessageQ_Object *obj = (MessageQ_Object *)handle;
+
+    msg->replyId = (UInt16)(obj->queue);
     msg->replyProc = (UInt16)(obj->queue >> 16);
 }
 
 /* Returns the QueueId associated with the handle. */
-MessageQ_QueueId MessageQ_getQueueId (MessageQ_Handle handle)
+MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
 {
-    MessageQ_Object * obj = (MessageQ_Object *) handle;
-    UInt32            queueId;
+    MessageQ_Object *obj = (MessageQ_Object *) handle;
+    UInt32 queueId;
 
     queueId = (obj->queue);
 
     return queueId;
 }
 
+/* Returns the local handle associated with queueId. */
+MessageQ_Handle MessageQ_getLocalHandle(MessageQ_QueueId queueId)
+{
+    MessageQ_Object *obj;
+    MessageQ_QueueIndex queueIndex;
+    UInt16 procId;
+
+    procId = MessageQ_getProcId(queueId);
+    if (procId != MultiProc_self()) {
+        return NULL;
+    }
+
+    queueIndex = MessageQ_getQueueIndex(queueId);
+    obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
+
+    return (MessageQ_Handle)obj;
+}
+
 /* Sets the tracing of a message */
-Void MessageQ_setMsgTrace (MessageQ_Msg msg, Bool traceFlag)
+Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
 {
     msg->flags = (msg->flags & ~TRACEMASK) |   (traceFlag << TRACESHIFT);
 }
@@ -797,99 +1165,19 @@ Void MessageQ_setMsgTrace (MessageQ_Msg msg, Bool traceFlag)
  *  The MessageQ module itself does not use any shared memory but the
  *  underlying transport may use some shared memory.
  */
-SizeT MessageQ_sharedMemReq (Ptr sharedAddr)
+SizeT MessageQ_sharedMemReq(Ptr sharedAddr)
 {
     SizeT memReq = 0u;
 
     /* Do nothing, as this is a copy transport. */
 
-    return (memReq);
-}
-
-/*
- *  Create a socket for this remote proc, and attempt to connect.
- *
- *  Only creates a socket if one does not already exist for this procId.
- *
- *  Note: remoteProcId may be MultiProc_Self() for loopback case.
- */
-Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr)
-{
-    Int     status = MessageQ_S_SUCCESS;
-    int     sock;
-
-    PRINTVERBOSE1("MessageQ_attach: remoteProcId: %d\n", remoteProcId)
-
-    if (remoteProcId >= MultiProc_MAXPROCESSORS) {
-        status = MessageQ_E_INVALIDPROCID;
-        goto exit;
-    }
-
-    pthread_mutex_lock (&(MessageQ_module->gate));
-
-    /* Only create a socket if one doesn't exist: */
-    if (MessageQ_module->sock[remoteProcId] == Transport_INVALIDSOCKET)  {
-        /* Create the socket for sending messages to the remote proc: */
-        sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
-        if (sock < 0) {
-            status = MessageQ_E_FAIL;
-            printf ("MessageQ_attach: socket failed: %d, %s\n",
-                       errno, strerror(errno));
-        }
-        else  {
-            PRINTVERBOSE1("MessageQ_attach: created send socket: %d\n", sock)
-            MessageQ_module->sock[remoteProcId] = sock;
-            /* Attempt to connect: */
-            ConnectSocket(sock, remoteProcId, MESSAGEQ_RPMSG_PORT);
-        }
-    }
-    else {
-        status = MessageQ_E_ALREADYEXISTS;
-    }
-
-    pthread_mutex_unlock (&(MessageQ_module->gate));
-
-exit:
-    return (status);
-}
-
-/*
- *  Close the socket for this remote proc.
- *
- */
-Int MessageQ_detach (UInt16 remoteProcId)
-{
-    Int status = MessageQ_S_SUCCESS;
-    int sock;
-
-    if (remoteProcId >= MultiProc_MAXPROCESSORS) {
-        status = MessageQ_E_INVALIDPROCID;
-        goto exit;
-    }
-
-    pthread_mutex_lock (&(MessageQ_module->gate));
-
-    sock = MessageQ_module->sock[remoteProcId];
-    if (close (sock)) {
-        status = MessageQ_E_OSFAILURE;
-        printf ("MessageQ_detach: close failed: %d, %s\n",
-                       errno, strerror(errno));
-    }
-    else {
-        PRINTVERBOSE1("MessageQ_detach: closed socket: %d\n", sock)
-        MessageQ_module->sock[remoteProcId] = Transport_INVALIDSOCKET;
-    }
-
-    pthread_mutex_unlock (&(MessageQ_module->gate));
-
-exit:
-    return (status);
+    return memReq;
 }
 
 /*
  * This is a helper function to initialize a message.
  */
-Void MessageQ_msgInit (MessageQ_Msg msg)
+Void MessageQ_msgInit(MessageQ_Msg msg)
 {
 #if 0
     Int                 status    = MessageQ_S_SUCCESS;
@@ -900,7 +1188,7 @@ Void MessageQ_msgInit (MessageQ_Msg msg)
     handle = LAD_findHandle();
     if (handle == LAD_MAXNUMCLIENTS) {
         PRINTVERBOSE1(
-          "MessageQ_setup: can't find connection to daemon for pid %d\n",
+          "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
            getpid())
 
         return;
@@ -925,7 +1213,7 @@ Void MessageQ_msgInit (MessageQ_Msg msg)
       "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
       handle, status)
 
-    memcpy(msg, &rsp.msgInit.msg, sizeof(*msg));
+    memcpy(msg, &rsp.msgInit.msg, sizeof (*msg));
 #else
     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
@@ -934,164 +1222,126 @@ Void MessageQ_msgInit (MessageQ_Msg msg)
     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
     msg->srcProc   = MultiProc_self();
 
-    pthread_mutex_lock(&(MessageQ_module->gate));
+    pthread_mutex_lock(&MessageQ_module->seqNumGate);
     msg->seqNum  = MessageQ_module->seqNum++;
-    pthread_mutex_unlock(&(MessageQ_module->gate));
+    pthread_mutex_unlock(&MessageQ_module->seqNumGate);
 #endif
 }
 
 /*
- * =============================================================================
- * Transport: Fxns kept here until need for a transport layer is realized.
- * =============================================================================
- */
-/*
- * ======== transportCreateEndpoint ========
+ *  ======== _MessageQ_grow ========
+ *  Increase module's queues array to accommodate queueIndex from LAD
  *
- * Create a communication endpoint to receive messages.
+ *  Note: this function takes the queue index value (i.e. without the
+ *  port offset).
  */
-static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex)
+Void _MessageQ_grow(UInt16 queueIndex)
 {
-    Int          status    = MessageQ_S_SUCCESS;
-    int         err;
+    MessageQ_Handle *queues;
+    MessageQ_Handle *oldQueues;
+    UInt oldSize;
 
-    /*  Create the socket to receive messages for this messageQ. */
-    *fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
-    if (*fd < 0) {
-        status = MessageQ_E_FAIL;
-        printf ("transportCreateEndpoint: socket call failed: %d, %s\n",
-                  errno, strerror(errno));
-        goto exit;
-    }
+    pthread_mutex_lock(&MessageQ_module->gate);
 
-    PRINTVERBOSE1("transportCreateEndpoint: created socket: fd: %d\n", *fd)
+    oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
 
-    err = SocketBindAddr(*fd, rprocId, (UInt32)queueIndex);
-    if (err < 0) {
-        status = MessageQ_E_FAIL;
-        printf ("transportCreateEndpoint: bind failed: %d, %s\n",
-                  errno, strerror(errno));
-    }
+    queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof(MessageQ_Handle));
+    memcpy(queues, MessageQ_module->queues, oldSize);
 
-exit:
-    return (status);
-}
-
-/*
- * ======== transportCloseEndpoint ========
- *
- *  Close the communication endpoint.
- */
-static Int transportCloseEndpoint(int fd)
-{
-    Int status = MessageQ_S_SUCCESS;
+    oldQueues = MessageQ_module->queues;
+    MessageQ_module->queues = queues;
+    MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
 
-    PRINTVERBOSE1("transportCloseEndpoint: closing socket: %d\n", fd)
+    pthread_mutex_unlock(&MessageQ_module->gate);
 
-    /* Stop communication to this socket:  */
-    close(fd);
+    free(oldQueues);
 
-    return (status);
+    return;
 }
 
 /*
- * ======== transportGet ========
- *  Retrieve a message waiting in the socket's queue.
-*/
-static Int transportGet(int sock, MessageQ_Msg * retMsg)
+ *  ======== MessageQ_bind ========
+ *  Bind all existing message queues to the given processor
+ *
+ *  Note: This function is a hack to work around the driver.
+ *
+ *  The Linux rpmsgproto driver requires a socket for each
+ *  message queue and remote processor tuple.
+ *
+ *      socket --> (queue, processor)
+ *
+ *  Therefore, each time a new remote processor is started, all
+ *  existing message queues need to create a socket for the new
+ *  processor.
+ *
+ *  The driver should not have this requirement. One socket per
+ *  message queue should be sufficient to uniquely identify the
+ *  endpoint to the driver.
+ */
+Void MessageQ_bind(UInt16 procId)
 {
-    Int           status    = MessageQ_S_SUCCESS;
-    MessageQ_Msg  msg;
-    struct sockaddr_rpmsg fromAddr;  // [Socket address of sender]
-    unsigned int  len;
-    int           byteCount;
+    int q;
+    int clusterId;
+    int priority;
+    MessageQ_Handle handle;
+    MessageQ_QueueId queue;
+    IMessageQTransport_Handle transport;
 
-    /*
-     * We have no way of peeking to see what message size we'll get, so we
-     * allocate a message of max size to receive contents from the rpmsg socket
-     * (currently, a copy transport)
-     */
-    msg = MessageQ_alloc (0, MESSAGEQ_RPMSG_MAXSIZE);
-    if (!msg)  {
-        status = MessageQ_E_MEMORY;
-        goto exit;
-    }
+    clusterId = procId - MultiProc_getBaseIdOfCluster();
+    pthread_mutex_lock(&MessageQ_module->gate);
 
-    memset(&fromAddr, 0, sizeof(fromAddr));
-    len = sizeof(fromAddr);
+    for (q = 0; q < MessageQ_module->numQueues; q++) {
 
-    byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
-                      (struct sockaddr *)&fromAddr, &len);
-    if (len != sizeof(fromAddr)) {
-        printf("recvfrom: got bad addr len (%d)\n", len);
-        status = MessageQ_E_FAIL;
-        goto exit;
-    }
-    if (byteCount < 0) {
-        printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
-        status = MessageQ_E_FAIL;
-        goto exit;
-    }
-    else {
-         /* Update the allocated message size (even though this may waste space
-          * when the actual message is smaller than the maximum rpmsg size,
-          * the message will be freed soon anyway, and it avoids an extra copy).
-          */
-         msg->msgSize = byteCount;
-
-         /*
-          * If the message received was statically allocated, reset the
-          * heapId, so the app can free it.
-          */
-         if (msg->heapId == MessageQ_STATICMSG)  {
-             msg->heapId = 0;  /* for a copy transport, heap id is 0. */
-         }
-    }
+        if ((handle = MessageQ_module->queues[q]) == NULL) {
+            continue;
+        }
 
-    PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
-    PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
-    PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId, msg->msgSize)
+        queue = ((MessageQ_Object *)handle)->queue;
 
-    *retMsg = msg;
+        for (priority = 0; priority < 2; priority++) {
+            transport = MessageQ_module->transports[clusterId][priority];
+            if (transport != NULL) {
+                IMessageQTransport_bind((Void *)transport, queue);
+            }
+        }
+    }
 
-exit:
-    return (status);
+    pthread_mutex_unlock(&MessageQ_module->gate);
 }
 
 /*
- * ======== transportPut ========
+ *  ======== MessageQ_unbind ========
+ *  Unbind all existing message queues from the given processor
  *
- * Calls the socket API sendto() on the socket associated with
- * with this destination procID.
- * Currently, both local and remote messages are sent via the Socket ABI, so
- * no local object lists are maintained here.
-*/
-static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId)
+ *  Hack: see MessageQ_bind.
+ */
+Void MessageQ_unbind(UInt16 procId)
 {
-    Int     status    = MessageQ_S_SUCCESS;
-    int     sock;
-    int     err;
+    int q;
+    int clusterId;
+    int priority;
+    MessageQ_Handle handle;
+    MessageQ_QueueId queue;
+    IMessageQTransport_Handle transport;
 
-    /*
-     * Retrieve the socket for the AF_SYSLINK protocol associated with this
-     * transport.
-     */
-    sock = MessageQ_module->sock[dstProcId];
+    pthread_mutex_lock(&MessageQ_module->gate);
 
-    PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
+    for (q = 0; q < MessageQ_module->numQueues; q++) {
 
-    err = send(sock, msg, msg->msgSize, 0);
-    if (err < 0) {
-        printf ("transportPut: send failed: %d, %s\n",
-                  errno, strerror(errno));
-        status = MessageQ_E_FAIL;
-    }
+        if ((handle = MessageQ_module->queues[q]) == NULL) {
+            continue;
+        }
 
-    /*
-     * Free the message, as this is a copy transport, we maintain MessageQ
-     * semantics.
-     */
-    MessageQ_free (msg);
+        queue = ((MessageQ_Object *)handle)->queue;
+        clusterId = procId - MultiProc_getBaseIdOfCluster();
 
-    return (status);
+        for (priority = 0; priority < 2; priority++) {
+            transport = MessageQ_module->transports[clusterId][priority];
+            if (transport != NULL) {
+                IMessageQTransport_unbind((Void *)transport, queue);
+            }
+        }
+    }
+
+    pthread_mutex_unlock(&MessageQ_module->gate);
 }