]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - ipc/ipcdev.git/blobdiff - linux/src/api/MessageQ.c
Linux/Android: Refactor MessageQ_create to Register Later with NameServer
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
index 2fc61e32a10808d1d848d85e94eb52eb008f6928..db96b241156963cc831833d95edd0de733991b04 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012-2014, Texas Instruments Incorporated
+ * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
 #include <ti/ipc/NameServer.h>
 #include <ti/ipc/MultiProc.h>
 #include <_MultiProc.h>
+#define MessageQ_internal 1     /* must be defined before include file */
 #include <ti/ipc/MessageQ.h>
 #include <_MessageQ.h>
-#include <ITransport.h>
-#include <IMessageQTransport.h>
-#include <INetworkTransport.h>
+#include <ti/ipc/interfaces/IHeap.h>
+#include <ti/ipc/interfaces/ITransport.h>
+#include <ti/ipc/interfaces/IMessageQTransport.h>
+#include <ti/ipc/interfaces/INetworkTransport.h>
 
 /* Socket Headers */
 #include <sys/select.h>
@@ -69,9 +71,6 @@
 #include <pthread.h>
 #include <semaphore.h>
 
-/* Socket Protocol Family */
-#include <net/rpmsg.h>
-
 #include <ladclient.h>
 #include <_lad.h>
 
  * =============================================================================
  */
 
+/* params structure evolution */
+typedef struct {
+    Void *synchronizer;
+} MessageQ_Params_Legacy;
+
+typedef struct {
+    Int __version;
+    Void *synchronizer;
+    MessageQ_QueueIndex queueIndex;
+} MessageQ_Params_Version2;
+
 /* structure for MessageQ module state */
 typedef struct MessageQ_ModuleObject {
     MessageQ_Handle           *queues;
@@ -108,11 +118,13 @@ typedef struct MessageQ_ModuleObject {
     Int                       refCount;
     NameServer_Handle         nameServer;
     pthread_mutex_t           gate;
-    MessageQ_Params           defaultInstParams;
     int                       seqNum;
+    pthread_mutex_t           seqNumGate;
     IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
-    INetworkTransport_Handle  transInst[MessageQ_MAXTRANSPORTS];
+    ITransport_Handle         transInst[MessageQ_MAXTRANSPORTS];
     MessageQ_PutHookFxn       putHookFxn;
+    Ptr                      *heaps;
+    Int                       numHeaps;
 } MessageQ_ModuleObject;
 
 typedef struct MessageQ_CIRCLEQ_ENTRY {
@@ -124,6 +136,7 @@ typedef struct MessageQ_CIRCLEQ_ENTRY {
  */
 typedef struct MessageQ_Object_tag {
     CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
+    pthread_mutex_t              msgListGate;
     MessageQ_Params              params;
     MessageQ_QueueId             queue;
     int                          unblocked;
@@ -143,7 +156,15 @@ static MessageQ_ModuleObject MessageQ_state =
 {
     .refCount   = 0,
     .nameServer = NULL,
-    .putHookFxn = NULL
+#if defined(IPC_BUILDOS_ANDROID) && (PLATFORM_SDK_VERSION < 23)
+    .gate       = PTHREAD_RECURSIVE_MUTEX_INITIALIZER,
+#else
+    .gate       = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
+#endif
+    .seqNumGate = PTHREAD_MUTEX_INITIALIZER,
+    .putHookFxn = NULL,
+    .heaps      = NULL,
+    .numHeaps   = 0
 };
 
 /*!
@@ -164,22 +185,28 @@ Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
                                 UInt16 rprocId, UInt priority)
 {
     Int status = FALSE;
+    UInt16 clusterId;
 
     if (handle == NULL) {
-        printf("MessageQ_registerTransport: invalid handle, must be non-NULL\n"
-              );
+        fprintf(stderr,
+                "MessageQ_registerTransport: invalid handle, must be non-NULL\n"
+               );
 
         return status;
     }
 
-    if (rprocId >= MultiProc_MAXPROCESSORS) {
-        printf("MessageQ_registerTransport: invalid procId %d\n", rprocId);
+    /* map procId to clusterId */
+    clusterId = rprocId - MultiProc_getBaseIdOfCluster();
+
+    if (clusterId >= MultiProc_MAXPROCESSORS) {
+        fprintf(stderr,
+                "MessageQ_registerTransport: invalid procId %d\n", rprocId);
 
         return status;
     }
 
-    if (MessageQ_module->transports[rprocId][priority] == NULL) {
-        MessageQ_module->transports[rprocId][priority] = handle;
+    if (MessageQ_module->transports[clusterId][priority] == NULL) {
+        MessageQ_module->transports[clusterId][priority] = handle;
 
         status = TRUE;
     }
@@ -190,43 +217,55 @@ Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
 Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
 {
     if (inst == NULL) {
-        printf("MessageQ_registerTransportId: invalid NULL handle\n");
+        fprintf(stderr, "MessageQ_registerTransportId: invalid NULL handle\n");
 
         return MessageQ_E_INVALIDARG;
     }
 
     if (tid >= MessageQ_MAXTRANSPORTS) {
-        printf("MessageQ_unregisterNetTransport: invalid transport id %d, must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
+        fprintf(stderr,
+                "MessageQ_unregisterNetTransport: invalid transport id %d, "
+                "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
 
         return MessageQ_E_INVALIDARG;
     }
 
     if (MessageQ_module->transInst[tid] != NULL) {
-        printf("MessageQ_registerTransportId: transport id %d already registered\n", tid);
+        fprintf(stderr,
+                "MessageQ_registerTransportId: transport id %d already "
+                "registered\n", tid);
 
         return MessageQ_E_ALREADYEXISTS;
     }
 
-    MessageQ_module->transInst[tid] = (INetworkTransport_Handle)inst;
+    MessageQ_module->transInst[tid] = inst;
 
     return MessageQ_S_SUCCESS;
 }
 
 Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
 {
-    if (rprocId >= MultiProc_MAXPROCESSORS) {
-        printf("MessageQ_registerTransport: invalid rprocId %d\n", rprocId);
+    UInt16 clusterId;
+
+    /* map procId to clusterId */
+    clusterId = rprocId - MultiProc_getBaseIdOfCluster();
+
+    if (clusterId >= MultiProc_MAXPROCESSORS) {
+        fprintf(stderr, "MessageQ_unregisterTransport: invalid rprocId %d\n",
+                rprocId);
 
         return;
     }
 
-    MessageQ_module->transports[rprocId][priority] = NULL;
+    MessageQ_module->transports[clusterId][priority] = NULL;
 }
 
 Void MessageQ_unregisterTransportId(UInt tid)
 {
     if (tid >= MessageQ_MAXTRANSPORTS) {
-        printf("MessageQ_unregisterTransportId: invalid transport id %d, must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
+        fprintf(stderr,
+                "MessageQ_unregisterTransportId: invalid transport id %d, "
+                "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
 
         return;
     }
@@ -285,76 +324,81 @@ Void MessageQ_getConfig(MessageQ_Config *cfg)
  */
 Int MessageQ_setup(const MessageQ_Config *cfg)
 {
-    Int status;
+    Int status = MessageQ_S_SUCCESS;
     LAD_ClientHandle handle;
     struct LAD_CommandObj cmd;
     union LAD_ResponseObj rsp;
     Int pri;
-    Int rprocId;
+    Int i;
     Int tid;
 
+    /* this entire function must be serialized */
     pthread_mutex_lock(&MessageQ_module->gate);
 
-    MessageQ_module->refCount++;
-    if (MessageQ_module->refCount > 1) {
-
-        pthread_mutex_unlock(&MessageQ_module->gate);
-
+    /* ensure only first thread performs startup procedure */
+    if (++MessageQ_module->refCount > 1) {
         PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
-                      MessageQ_module->refCount)
-
-        return MessageQ_S_ALREADYSETUP;
+                MessageQ_module->refCount)
+        status = MessageQ_S_ALREADYSETUP;
+        goto exit;
     }
 
-    pthread_mutex_unlock(&MessageQ_module->gate);
-
     handle = LAD_findHandle();
     if (handle == LAD_MAXNUMCLIENTS) {
-        PRINTVERBOSE1(
-          "MessageQ_setup: can't find connection to daemon for pid %d\n",
-           getpid())
-
-        return MessageQ_E_RESOURCE;
+        PRINTVERBOSE1("MessageQ_setup: can't find connection to daemon for "
+                "pid %d\n", getpid())
+        status = MessageQ_E_RESOURCE;
+        goto exit;
     }
 
     cmd.cmd = LAD_MESSAGEQ_SETUP;
     cmd.clientId = handle;
-    memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof (*cfg));
+    memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
 
     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
-        PRINTVERBOSE1(
-          "MessageQ_setup: sending LAD command failed, status=%d\n", status)
-        return MessageQ_E_FAIL;
+        PRINTVERBOSE1("MessageQ_setup: sending LAD command failed, "
+                "status=%d\n", status)
+        status = MessageQ_E_FAIL;
+        goto exit;
     }
 
     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
         PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
-        return status;
+        status = MessageQ_E_FAIL;
+        goto exit;
     }
     status = rsp.setup.status;
 
-    PRINTVERBOSE2(
-      "MessageQ_setup: got LAD response for client %d, status=%d\n",
-      handle, status)
+    PRINTVERBOSE2("MessageQ_setup: LAD response for client %d, status=%d\n",
+            handle, status)
 
     MessageQ_module->seqNum = 0;
     MessageQ_module->nameServer = rsp.setup.nameServerHandle;
     MessageQ_module->numQueues = cfg->maxRuntimeEntries;
     MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
-                                     sizeof (MessageQ_Handle));
+            sizeof(MessageQ_Handle));
+    MessageQ_module->numHeaps = cfg->numHeaps;
+    MessageQ_module->heaps = calloc(cfg->numHeaps, sizeof(Ptr));
 
-    pthread_mutex_init(&MessageQ_module->gate, NULL);
-
-    for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
+    for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
         for (pri = 0; pri < 2; pri++) {
-            MessageQ_module->transports[rprocId][pri] = NULL;
+            MessageQ_module->transports[i][pri] = NULL;
         }
     }
+
     for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
         MessageQ_module->transInst[tid] = NULL;
     }
 
-    return status;
+exit:
+    /* if error, must decrement reference count */
+    if (status < 0) {
+        MessageQ_module->refCount--;
+    }
+
+    pthread_mutex_unlock(&MessageQ_module->gate);
+
+    return (status);
 }
 
 /*
@@ -362,68 +406,134 @@ Int MessageQ_setup(const MessageQ_Config *cfg)
  */
 Int MessageQ_destroy(void)
 {
-    Int status;
+    Int status = MessageQ_S_SUCCESS;
     LAD_ClientHandle handle;
     struct LAD_CommandObj cmd;
     union LAD_ResponseObj rsp;
+    int i;
+
+    /* this entire function must be serialized */
+    pthread_mutex_lock(&MessageQ_module->gate);
+
+    /* ensure only last thread does the work */
+    if (--MessageQ_module->refCount > 0) {
+        goto exit;
+    }
+
+    /* ensure all registered heaps have been unregistered */
+    for (i = 0; i < MessageQ_module->numHeaps; i++) {
+        if (MessageQ_module->heaps[i] != NULL) {
+            PRINTVERBOSE1("MessageQ_destroy: Warning: found heapId=%d", i);
+        }
+    }
+    free(MessageQ_module->heaps);
+    MessageQ_module->heaps = NULL;
 
     handle = LAD_findHandle();
     if (handle == LAD_MAXNUMCLIENTS) {
-        PRINTVERBOSE1(
-          "MessageQ_destroy: can't find connection to daemon for pid %d\n",
-           getpid())
-
-        return MessageQ_E_RESOURCE;
+        PRINTVERBOSE1("MessageQ_destroy: can't find connection to daemon "
+                "for pid %d\n", getpid())
+        status =  MessageQ_E_RESOURCE;
+        goto exit;
     }
 
     cmd.cmd = LAD_MESSAGEQ_DESTROY;
     cmd.clientId = handle;
 
     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
-        PRINTVERBOSE1(
-          "MessageQ_destroy: sending LAD command failed, status=%d\n", status)
-        return MessageQ_E_FAIL;
+        PRINTVERBOSE1("MessageQ_destroy: sending LAD command failed, "
+                "status=%d\n", status)
+        status = MessageQ_E_FAIL;
+        goto exit;
     }
 
     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
         PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
-        return status;
+        status = MessageQ_E_FAIL;
+        goto exit;
     }
     status = rsp.status;
 
-    PRINTVERBOSE2(
-      "MessageQ_destroy: got LAD response for client %d, status=%d\n",
-      handle, status)
+    PRINTVERBOSE2("MessageQ_destroy: got LAD response for client %d, "
+            "status=%d\n", handle, status)
 
-    return status;
-}
+exit:
+    pthread_mutex_unlock(&MessageQ_module->gate);
 
+    return (status);
+}
 
-/* Function to initialize the parameters for the MessageQ instance. */
+/*
+ *  ======== MessageQ_Params_init ========
+ *  Legacy implementation.
+ */
 Void MessageQ_Params_init(MessageQ_Params *params)
 {
-    memcpy (params, &(MessageQ_module->defaultInstParams),
-            sizeof (MessageQ_Params));
+    ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
+}
 
-    return;
+/*
+ *  ======== MessageQ_Params_init__S ========
+ *  New implementation which is version aware.
+ */
+Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
+{
+    MessageQ_Params_Version2 *params2;
+
+    switch (version) {
+
+        case MessageQ_Params_VERSION_2:
+            params2 = (MessageQ_Params_Version2 *)params;
+            params2->__version = MessageQ_Params_VERSION_2;
+            params2->synchronizer = NULL;
+            params2->queueIndex = MessageQ_ANY;
+            break;
+
+        default:
+            assert(FALSE);
+            break;
+    }
 }
 
 /*
- *  MessageQ_create - create a MessageQ object for receiving.
+ *  ======== MessageQ_create ========
  */
-MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *params)
+MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
 {
     Int                   status;
     MessageQ_Object      *obj = NULL;
     IMessageQTransport_Handle transport;
-    INetworkTransport_Handle transInst;
+    INetworkTransport_Handle netTrans;
+    ITransport_Handle     baseTrans;
     UInt16                queueIndex;
-    UInt16                rprocId;
+    UInt16                clusterId;
     Int                   tid;
     Int                   priority;
     LAD_ClientHandle      handle;
     struct LAD_CommandObj cmd;
     union LAD_ResponseObj rsp;
+    MessageQ_Params ps;
+
+    MessageQ_Params_init__S(&ps, MessageQ_Params_VERSION);
+
+    /* copy the given params into the current params structure */
+    if (pp != NULL) {
+
+        /* snoop the params pointer to see if it's a legacy structure */
+        if ((pp->__version == 0) || (pp->__version > 100)) {
+            ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
+        }
+
+        /* not legacy structure, use params version field */
+        else if (pp->__version == MessageQ_Params_VERSION_2) {
+            ps.__version = ((MessageQ_Params_Version2 *)pp)->__version;
+            ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
+            ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
+        }
+        else {
+            assert(FALSE);
+        }
+    }
 
     handle = LAD_findHandle();
     if (handle == LAD_MAXNUMCLIENTS) {
@@ -436,6 +546,7 @@ MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *params)
 
     cmd.cmd = LAD_MESSAGEQ_CREATE;
     cmd.clientId = handle;
+
     if (name == NULL) {
         cmd.args.messageQCreate.name[0] = '\0';
     }
@@ -445,9 +556,7 @@ MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *params)
         cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
     }
 
-    if (params) {
-        memcpy(&cmd.args.messageQCreate.params, params, sizeof (*params));
-    }
+    memcpy(&cmd.args.messageQCreate.params, &ps, sizeof(ps));
 
     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
         PRINTVERBOSE1(
@@ -475,15 +584,13 @@ MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *params)
     /* Create the generic obj */
     obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
 
-    if (params != NULL) {
-       /* Populate the params member */
-        memcpy((Ptr) &obj->params, (Ptr)params, sizeof (MessageQ_Params));
-    }
+   /* Populate the params member */
+    memcpy(&obj->params, &ps, sizeof(ps));
 
-    queueIndex = (MessageQ_QueueIndex)(rsp.messageQCreate.queueId & 0x0000ffff);
 
     obj->queue = rsp.messageQCreate.queueId;
     obj->serverHandle = rsp.messageQCreate.serverHandle;
+    pthread_mutex_init(&obj->msgListGate, NULL);
     CIRCLEQ_INIT(&obj->msgList);
     if (sem_init(&obj->synchronizer, 0, 0) < 0) {
         PRINTVERBOSE1(
@@ -494,52 +601,109 @@ MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *params)
         return NULL;
     }
 
-    PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' queueIndex %d\n", name, queueIndex)
+    /* lad returns the queue port # (queueIndex + PORT_OFFSET) */
+    queueIndex = MessageQ_getQueueIndex(rsp.messageQCreate.queueId);
 
-    for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
+    PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
+            "queueIndex %d\n", (name == NULL) ? "NULL" : name , queueIndex)
+
+    pthread_mutex_lock(&MessageQ_module->gate);
+
+    for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
        for (priority = 0; priority < 2; priority++) {
-            transport = MessageQ_module->transports[rprocId][priority];
+            transport = MessageQ_module->transports[clusterId][priority];
             if (transport) {
                 /* need to check return and do something if error */
                 IMessageQTransport_bind((Void *)transport, obj->queue);
             }
         }
     }
+
     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
-        transInst = MessageQ_module->transInst[tid];
-        if (transInst) {
-            /* need to check return and do something if error */
-            INetworkTransport_bind((Void *)transInst, obj->queue);
+        baseTrans = MessageQ_module->transInst[tid];
+
+        if (baseTrans != NULL) {
+            switch (ITransport_itype(baseTrans)) {
+                case INetworkTransport_TypeId:
+                    netTrans = INetworkTransport_downCast(baseTrans);
+                    INetworkTransport_bind((void *)netTrans, obj->queue);
+                    break;
+
+                default:
+                    /* error */
+                    fprintf(stderr,
+                            "MessageQ_create: Error: transport id %d is an "
+                            "unsupported transport type.\n", tid);
+                    break;
+            }
         }
     }
 
-    /*
-     * Since LAD's MessageQ_module can grow, we need to be able to grow as well
-     */
+    /* LAD's MessageQ module can grow, we need to grow as well */
     if (queueIndex >= MessageQ_module->numQueues) {
         _MessageQ_grow(queueIndex);
     }
 
-    /*
-     * No need to "allocate" slot since the queueIndex returned by
-     * LAD is guaranteed to be unique.
+    /*  No need to "allocate" slot since the queueIndex returned by
+     *  LAD is guaranteed to be unique.
      */
     MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
 
+    pthread_mutex_unlock(&MessageQ_module->gate);
+
+    /* send announce message to LAD, indicating we are ready to receive msgs */
+    cmd.cmd = LAD_MESSAGEQ_ANNOUNCE;
+    cmd.clientId = handle;
+
+    if (name == NULL) {
+        cmd.args.messageQAnnounce.name[0] = '\0';
+    }
+    else {
+        strncpy(cmd.args.messageQAnnounce.name, name,
+                LAD_MESSAGEQCREATEMAXNAMELEN - 1);
+        cmd.args.messageQAnnounce.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
+    }
+
+    cmd.args.messageQAnnounce.serverHandle = obj->serverHandle;
+
+    if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
+        PRINTVERBOSE1(
+          "MessageQ_create: sending LAD command failed, status=%d\n", status)
+        goto exit;
+    }
+
+    if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
+        PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
+        goto exit;
+    }
+    status = rsp.messageQAnnounce.status;
+
+    PRINTVERBOSE2(
+      "MessageQ_create: got LAD response for client %d, status=%d\n",
+      handle, status)
+
+    if (status == -1) {
+       PRINTVERBOSE1(
+          "MessageQ_create: MessageQ server operation failed, status=%d\n",
+          status)
+    }
+
+exit:
     return (MessageQ_Handle)obj;
 }
 
 /*
- * MessageQ_delete - delete a MessageQ object.
+ *  ======== MessageQ_delete ========
  */
 Int MessageQ_delete(MessageQ_Handle *handlePtr)
 {
     MessageQ_Object *obj;
     IMessageQTransport_Handle transport;
-    INetworkTransport_Handle transInst;
+    INetworkTransport_Handle  netTrans;
+    ITransport_Handle         baseTrans;
     Int              status = MessageQ_S_SUCCESS;
     UInt16           queueIndex;
-    UInt16                rprocId;
+    UInt16                clusterId;
     Int                   tid;
     Int                   priority;
     LAD_ClientHandle handle;
@@ -577,24 +741,43 @@ Int MessageQ_delete(MessageQ_Handle *handlePtr)
       "MessageQ_delete: got LAD response for client %d, status=%d\n",
       handle, status)
 
-    for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
+    pthread_mutex_lock(&MessageQ_module->gate);
+
+    for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
        for (priority = 0; priority < 2; priority++) {
-            transport = MessageQ_module->transports[rprocId][priority];
+            transport = MessageQ_module->transports[clusterId][priority];
             if (transport) {
                 IMessageQTransport_unbind((Void *)transport, obj->queue);
             }
         }
     }
+
     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
-        transInst = MessageQ_module->transInst[tid];
-        if (transInst) {
-            INetworkTransport_unbind((Void *)transInst, obj->queue);
+        baseTrans = MessageQ_module->transInst[tid];
+
+        if (baseTrans != NULL) {
+            switch (ITransport_itype(baseTrans)) {
+                case INetworkTransport_TypeId:
+                    netTrans = INetworkTransport_downCast(baseTrans);
+                    INetworkTransport_unbind((void *)netTrans, obj->queue);
+                    break;
+
+                default:
+                    /* error */
+                    fprintf(stderr,
+                            "MessageQ_create: Error: transport id %d is an "
+                            "unsupported transport type.\n", tid);
+                    break;
+            }
         }
     }
 
-    queueIndex = (MessageQ_QueueIndex)(obj->queue & 0x0000ffff);
+    /* extract the queue index from the queueId */
+    queueIndex = MessageQ_getQueueIndex(obj->queue);
     MessageQ_module->queues[queueIndex] = NULL;
 
+    pthread_mutex_unlock(&MessageQ_module->gate);
+
     free(obj);
     *handlePtr = NULL;
 
@@ -602,7 +785,8 @@ Int MessageQ_delete(MessageQ_Handle *handlePtr)
 }
 
 /*
- *  MessageQ_open - Opens an instance of MessageQ for sending.
+ *  ======== MessageQ_open ========
+ *  Acquire a queueId for use in sending messages to the queue
  */
 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
 {
@@ -637,7 +821,23 @@ Int MessageQ_open(String name, MessageQ_QueueId *queueId)
 }
 
 /*
- *  MessageQ_close - Closes previously opened instance of MessageQ.
+ *  ======== MessageQ_openQueueId ========
+ */
+MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 procId)
+{
+    MessageQ_QueueIndex queuePort;
+    MessageQ_QueueId queueId;
+
+    /* queue port is embedded in the queueId */
+    queuePort = queueIndex + MessageQ_PORTOFFSET;
+    queueId = ((MessageQ_QueueId)(procId) << 16) | queuePort;
+
+    return (queueId);
+}
+
+/*
+ *  ======== MessageQ_close ========
+ *  Closes previously opened instance of MessageQ
  */
 Int MessageQ_close(MessageQ_QueueId *queueId)
 {
@@ -650,84 +850,139 @@ Int MessageQ_close(MessageQ_QueueId *queueId)
 }
 
 /*
- * MessageQ_put - place a message onto a message queue.
- *
- * Calls transport's put(), which handles the sending of the message using the
- * appropriate kernel interface (socket, device ioctl) call for the remote
- * procId encoded in the queueId argument.
+ *  ======== MessageQ_put ========
+ *  Deliver the given message, either locally or to the transport
  *
+ *  If the destination is a local queue, deliver the message. Otherwise,
+ *  pass the message to a transport for delivery. The transport handles
+ *  the sending of the message using the appropriate interface (socket,
+ *  device ioctl, etc.).
  */
 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
 {
+    Int status = MessageQ_S_SUCCESS;
     MessageQ_Object *obj;
-    UInt16   dstProcId  = (UInt16)(queueId >> 16);
-    UInt16   queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
-    Int      status = MessageQ_S_SUCCESS;
-    ITransport_Handle transport;
+    UInt16 dstProcId;
+    UInt16 queueIndex;
+    UInt16 queuePort;
+    ITransport_Handle baseTrans;
     IMessageQTransport_Handle msgTrans;
     INetworkTransport_Handle netTrans;
     Int priority;
     UInt tid;
+    UInt16 clusterId;
+    Bool delivered;
+
+    /* extract destination address from the given queueId */
+    dstProcId  = (UInt16)(queueId >> 16);
+    queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
 
-    msg->dstId     = queueIndex;
-    msg->dstProc   = dstProcId;
+    /* write the destination address into the message header */
+    msg->dstId = queuePort;
+    msg->dstProc= dstProcId;
 
-    /* invoke put hook function after addressing the message */
+    /* invoke the hook function after addressing the message */
     if (MessageQ_module->putHookFxn != NULL) {
         MessageQ_module->putHookFxn(queueId, msg);
     }
 
-    if (dstProcId != MultiProc_self()) {
-        tid = MessageQ_getTransportId(msg);
-        if (tid == 0) {
-            priority = MessageQ_getMsgPri(msg);
-            msgTrans = MessageQ_module->transports[dstProcId][priority];
-
-            IMessageQTransport_put(msgTrans, (Ptr)msg);
-        }
-        else {
-            if (tid >= MessageQ_MAXTRANSPORTS) {
-                printf("MessageQ_put: transport id %d too big, must be < %d\n",
-                       tid, MessageQ_MAXTRANSPORTS);
-
-                return MessageQ_E_FAIL;
+    /*  For an outbound message: If message destination is on this
+     *  processor, then check if the destination queue is in this
+     *  process (thread-to-thread messaging).
+     *
+     *  For an inbound message: Check if destination queue is in this
+     *  process (process-to-process messaging).
+     */
+    if (dstProcId == MultiProc_self()) {
+        queueIndex = queuePort - MessageQ_PORTOFFSET;
+
+        if (queueIndex < MessageQ_module->numQueues) {
+            obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
+
+            if (obj != NULL) {
+                /* deliver message to queue */
+                pthread_mutex_lock(&obj->msgListGate);
+                CIRCLEQ_INSERT_TAIL(&obj->msgList,
+                        (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
+                pthread_mutex_unlock(&obj->msgListGate);
+                sem_post(&obj->synchronizer);
+                goto done;
             }
+        }
+        /* If we get here, then we have failed to deliver a local message. */
+        status = MessageQ_E_FAIL;
+        goto done;
+    }
 
-            /* use secondary transport */
-            netTrans = MessageQ_module->transInst[tid];
-            transport = INetworkTransport_upCast(netTrans);
-
-            /* downcast instance pointer to transport interface */
-            switch (ITransport_itype(transport)) {
-                case INetworkTransport_TypeId:
-                    INetworkTransport_put(netTrans, (Ptr)msg);
+    /*  Getting here implies the message is outbound. Must give it to
+     *  either the primary or secondary transport for delivery. Start
+     *  by extracting the transport ID from the message header.
+     */
+    tid = MessageQ_getTransportId(msg);
 
-                    break;
+    if (tid >= MessageQ_MAXTRANSPORTS) {
+        fprintf(stderr,
+                "MessageQ_put: Error: transport id %d too big, must be < %d\n",
+                tid, MessageQ_MAXTRANSPORTS);
+        status = MessageQ_E_FAIL;
+        goto done;
+    }
 
-                default:
-                    /* error */
-                    printf("MessageQ_put: transport id %d is an unsupported transport type\n", tid);
+    /* if transportId is set, use secondary transport for message delivery */
+    if (tid != 0) {
+        baseTrans = MessageQ_module->transInst[tid];
 
-                    status = MessageQ_E_FAIL;
+        if (baseTrans == NULL) {
+            fprintf(stderr, "MessageQ_put: Error: transport is null\n");
+            status = MessageQ_E_FAIL;
+            goto done;
+        }
 
-                    break;
-            }
+        /* downcast instance pointer to transport interface */
+        switch (ITransport_itype(baseTrans)) {
+            case INetworkTransport_TypeId:
+                netTrans = INetworkTransport_downCast(baseTrans);
+                delivered = INetworkTransport_put(netTrans, (Ptr)msg);
+                status = (delivered ? MessageQ_S_SUCCESS :
+                          (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN :
+                           MessageQ_E_FAIL));
+                break;
+
+            default:
+                /* error */
+                fprintf(stderr, "MessageQ_put: Error: transport id %d is an "
+                        "unsupported transport type\n", tid);
+                status = MessageQ_E_FAIL;
+                break;
         }
     }
     else {
-        obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
-
-        pthread_mutex_lock(&MessageQ_module->gate);
-
-        /* It is a local MessageQ */
-        CIRCLEQ_INSERT_TAIL(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
-
-        pthread_mutex_unlock(&MessageQ_module->gate);
+        /* use primary transport for delivery */
+        priority = MessageQ_getMsgPri(msg);
+        clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
+
+        /* primary transport can only be used for intra-cluster delivery */
+        if (clusterId > MultiProc_getNumProcsInCluster()) {
+            fprintf(stderr,
+                    "MessageQ_put: Error: destination procId=%d is not "
+                    "in cluster. Must specify a transportId.\n", dstProcId);
+            status =  MessageQ_E_FAIL;
+            goto done;
+        }
 
-       sem_post(&obj->synchronizer);
+        msgTrans = MessageQ_module->transports[clusterId][priority];
+        if (msgTrans) {
+            delivered = IMessageQTransport_put(msgTrans, (Ptr)msg);
+        }
+        else {
+            delivered = MessageQ_E_FAIL;
+        }
+        status = (delivered ? MessageQ_S_SUCCESS :
+                  (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN : MessageQ_E_FAIL));
     }
 
-    return status;
+done:
+    return (status);
 }
 
 /*
@@ -750,16 +1005,16 @@ Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
  * Optimization here to get a message without going in to the sem
  * operation, but the sem count will not be maintained properly.
  */
-    pthread_mutex_lock(&MessageQ_module->gate);
+    pthread_mutex_lock(&obj->msgListGate);
 
     if (obj->msgList.cqh_first != &obj->msgList) {
         *msg = (MessageQ_Msg)obj->msglist.cqh_first;
         CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
 
-        pthread_mutex_unlock(&MessageQ_module->gate);
+        pthread_mutex_unlock(&obj->msgListGate);
     }
     else {
-        pthread_mutex_unlock(&MessageQ_module->gate);
+        pthread_mutex_unlock(&obj->msgListGate);
     }
 #endif
 
@@ -767,29 +1022,42 @@ Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
         sem_wait(&obj->synchronizer);
     }
     else {
+        /* add timeout (microseconds) to current time of day */
         gettimeofday(&tv, NULL);
+        tv.tv_sec += timeout / 1000000;
+        tv.tv_usec += timeout % 1000000;
+
+        if (tv.tv_usec >= 1000000) {
+              tv.tv_sec++;
+              tv.tv_usec -= 1000000;
+        }
+
+        /* set absolute timeout value */
         ts.tv_sec = tv.tv_sec;
-        ts.tv_nsec = (tv.tv_usec + timeout) * 1000;
+        ts.tv_nsec = tv.tv_usec * 1000; /* convert to nanoseconds */
 
         if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
             if (errno == ETIMEDOUT) {
                 PRINTVERBOSE0("MessageQ_get: operation timed out\n")
-
-                return MessageQ_E_TIMEOUT;
+                return (MessageQ_E_TIMEOUT);
+            }
+            else {
+                PRINTVERBOSE0("MessageQ_get: sem_timedwait error\n")
+                return (MessageQ_E_FAIL);
             }
         }
     }
 
     if (obj->unblocked) {
-        return MessageQ_E_UNBLOCKED;
+        return obj->unblocked;
     }
 
-    pthread_mutex_lock(&MessageQ_module->gate);
+    pthread_mutex_lock(&obj->msgListGate);
 
     *msg = (MessageQ_Msg)obj->msgList.cqh_first;
     CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
 
-    pthread_mutex_unlock(&MessageQ_module->gate);
+    pthread_mutex_unlock(&obj->msgListGate);
 
     return status;
 }
@@ -840,18 +1108,33 @@ Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
  */
 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
 {
+    IHeap_Handle heap;
     MessageQ_Msg msg;
 
-    /*
-     * heapId not used for local alloc (as this is over a copy transport), but
-     * we need to send to other side as heapId is used in BIOS transport.
-     */
-    msg = (MessageQ_Msg)calloc(1, size);
+    if (heapId > (MessageQ_module->numHeaps - 1)) {
+        PRINTVERBOSE1("MessageQ_alloc: Error: heapId (%d) too large", heapId);
+        return (NULL);
+    }
+    else if (MessageQ_module->heaps[heapId] == NULL) {
+        PRINTVERBOSE1("MessageQ_alloc: Error: heapId (%d) not registered",
+                heapId);
+        return (NULL);
+    }
+    else {
+        heap = (IHeap_Handle)MessageQ_module->heaps[heapId];
+    }
+
+    msg = IHeap_alloc(heap, size);
+
+    if (msg == NULL) {
+        return (NULL);
+    }
+
     MessageQ_msgInit(msg);
     msg->msgSize = size;
     msg->heapId = heapId;
 
-    return msg;
+    return (msg);
 }
 
 /*
@@ -860,36 +1143,73 @@ MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
 Int MessageQ_free(MessageQ_Msg msg)
 {
     UInt32 status = MessageQ_S_SUCCESS;
+    IHeap_Handle heap;
 
-    /* Check to ensure this was not allocated by user: */
+    /* ensure this was not allocated by user */
     if (msg->heapId == MessageQ_STATICMSG) {
         status = MessageQ_E_CANNOTFREESTATICMSG;
     }
+    else if (msg->heapId > (MessageQ_module->numHeaps - 1)) {
+        status = MessageQ_E_INVALIDARG;
+    }
+    else if (MessageQ_module->heaps[msg->heapId] == NULL) {
+        status = MessageQ_E_NOTFOUND;
+    }
     else {
-        free(msg);
+        heap = (IHeap_Handle)MessageQ_module->heaps[msg->heapId];
     }
 
-    return status;
+    IHeap_free(heap, (void *)msg);
+
+    return (status);
 }
 
-/* Register a heap with MessageQ. */
+/*
+ *  ======== MessageQ_registerHeap ========
+ */
 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
 {
     Int status = MessageQ_S_SUCCESS;
 
-    /* Do nothing, as this uses a copy transport */
+    pthread_mutex_lock(&MessageQ_module->gate);
 
-    return status;
+    if (heapId > (MessageQ_module->numHeaps - 1)) {
+        status = MessageQ_E_INVALIDARG;
+    }
+    else if (MessageQ_module->heaps[heapId] != NULL) {
+        status = MessageQ_E_ALREADYEXISTS;
+    }
+    else {
+        MessageQ_module->heaps[heapId] = heap;
+    }
+
+    pthread_mutex_unlock(&MessageQ_module->gate);
+
+    return (status);
 }
 
-/* Unregister a heap with MessageQ. */
+/*
+ *  ======== MessageQ_unregisterHeap ========
+ */
 Int MessageQ_unregisterHeap(UInt16 heapId)
 {
     Int status = MessageQ_S_SUCCESS;
 
-    /* Do nothing, as this uses a copy transport */
+    pthread_mutex_lock(&MessageQ_module->gate);
 
-    return status;
+    if (heapId > (MessageQ_module->numHeaps - 1)) {
+        status = MessageQ_E_INVALIDARG;
+    }
+    else if (MessageQ_module->heaps[heapId] == NULL) {
+        status = MessageQ_E_NOTFOUND;
+    }
+    else {
+        MessageQ_module->heaps[heapId] = NULL;
+    }
+
+    pthread_mutex_unlock(&MessageQ_module->gate);
+
+    return (status);
 }
 
 /* Unblocks a MessageQ */
@@ -897,7 +1217,16 @@ Void MessageQ_unblock(MessageQ_Handle handle)
 {
     MessageQ_Object *obj = (MessageQ_Object *)handle;
 
-    obj->unblocked = TRUE;
+    obj->unblocked = MessageQ_E_UNBLOCKED;
+    sem_post(&obj->synchronizer);
+}
+
+/* Unblocks a MessageQ that's been shutdown due to transport failure */
+Void MessageQ_shutdown(MessageQ_Handle handle)
+{
+    MessageQ_Object *obj = (MessageQ_Object *)handle;
+
+    obj->unblocked = MessageQ_E_SHUTDOWN;
     sem_post(&obj->synchronizer);
 }
 
@@ -921,6 +1250,24 @@ MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
     return queueId;
 }
 
+/* Returns the local handle associated with queueId. */
+MessageQ_Handle MessageQ_getLocalHandle(MessageQ_QueueId queueId)
+{
+    MessageQ_Object *obj;
+    MessageQ_QueueIndex queueIndex;
+    UInt16 procId;
+
+    procId = MessageQ_getProcId(queueId);
+    if (procId != MultiProc_self()) {
+        return NULL;
+    }
+
+    queueIndex = MessageQ_getQueueIndex(queueId);
+    obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
+
+    return (MessageQ_Handle)obj;
+}
+
 /* Sets the tracing of a message */
 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
 {
@@ -990,14 +1337,18 @@ Void MessageQ_msgInit(MessageQ_Msg msg)
     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
     msg->srcProc   = MultiProc_self();
 
-    pthread_mutex_lock(&MessageQ_module->gate);
+    pthread_mutex_lock(&MessageQ_module->seqNumGate);
     msg->seqNum  = MessageQ_module->seqNum++;
-    pthread_mutex_unlock(&MessageQ_module->gate);
+    pthread_mutex_unlock(&MessageQ_module->seqNumGate);
 #endif
 }
 
 /*
- * Grow module's queues[] array to accommodate queueIndex from LAD
+ *  ======== _MessageQ_grow ========
+ *  Increase module's queues array to accommodate queueIndex from LAD
+ *
+ *  Note: this function takes the queue index value (i.e. without the
+ *  port offset).
  */
 Void _MessageQ_grow(UInt16 queueIndex)
 {
@@ -1005,17 +1356,107 @@ Void _MessageQ_grow(UInt16 queueIndex)
     MessageQ_Handle *oldQueues;
     UInt oldSize;
 
+    pthread_mutex_lock(&MessageQ_module->gate);
+
     oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
 
-    queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof (MessageQ_Handle));
+    queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof(MessageQ_Handle));
     memcpy(queues, MessageQ_module->queues, oldSize);
 
     oldQueues = MessageQ_module->queues;
     MessageQ_module->queues = queues;
     MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
 
+    pthread_mutex_unlock(&MessageQ_module->gate);
+
     free(oldQueues);
 
     return;
 }
 
+/*
+ *  ======== MessageQ_bind ========
+ *  Bind all existing message queues to the given processor
+ *
+ *  Note: This function is a hack to work around the driver.
+ *
+ *  The Linux rpmsgproto driver requires a socket for each
+ *  message queue and remote processor tuple.
+ *
+ *      socket --> (queue, processor)
+ *
+ *  Therefore, each time a new remote processor is started, all
+ *  existing message queues need to create a socket for the new
+ *  processor.
+ *
+ *  The driver should not have this requirement. One socket per
+ *  message queue should be sufficient to uniquely identify the
+ *  endpoint to the driver.
+ */
+Void MessageQ_bind(UInt16 procId)
+{
+    int q;
+    int clusterId;
+    int priority;
+    MessageQ_Handle handle;
+    MessageQ_QueueId queue;
+    IMessageQTransport_Handle transport;
+
+    clusterId = procId - MultiProc_getBaseIdOfCluster();
+    pthread_mutex_lock(&MessageQ_module->gate);
+
+    for (q = 0; q < MessageQ_module->numQueues; q++) {
+
+        if ((handle = MessageQ_module->queues[q]) == NULL) {
+            continue;
+        }
+
+        queue = ((MessageQ_Object *)handle)->queue;
+
+        for (priority = 0; priority < 2; priority++) {
+            transport = MessageQ_module->transports[clusterId][priority];
+            if (transport != NULL) {
+                IMessageQTransport_bind((Void *)transport, queue);
+            }
+        }
+    }
+
+    pthread_mutex_unlock(&MessageQ_module->gate);
+}
+
+/*
+ *  ======== MessageQ_unbind ========
+ *  Unbind all existing message queues from the given processor
+ *
+ *  Hack: see MessageQ_bind.
+ */
+Void MessageQ_unbind(UInt16 procId)
+{
+    int q;
+    int clusterId;
+    int priority;
+    MessageQ_Handle handle;
+    MessageQ_QueueId queue;
+    IMessageQTransport_Handle transport;
+
+    pthread_mutex_lock(&MessageQ_module->gate);
+
+    for (q = 0; q < MessageQ_module->numQueues; q++) {
+
+        if ((handle = MessageQ_module->queues[q]) == NULL) {
+            continue;
+        }
+
+        queue = ((MessageQ_Object *)handle)->queue;
+        clusterId = procId - MultiProc_getBaseIdOfCluster();
+
+        for (priority = 0; priority < 2; priority++) {
+            transport = MessageQ_module->transports[clusterId][priority];
+            if (transport != NULL) {
+                IMessageQTransport_unbind((Void *)transport, queue);
+            }
+        }
+    }
+
+    pthread_mutex_unlock(&MessageQ_module->gate);
+}