Bind message queue to new remote processor
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
index 13ab27d5be1f4e548e87a20b237d1b43aac323ce..802e500365889a755bfadffd73471185bce9ee4d 100644 (file)
@@ -576,6 +576,8 @@ MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
     PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
             "queueIndex %d\n", (name == NULL) ? "NULL" : name , queueIndex)
 
+    pthread_mutex_lock(&MessageQ_module->gate);
+
     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
        for (priority = 0; priority < 2; priority++) {
             transport = MessageQ_module->transports[clusterId][priority];
@@ -615,6 +617,8 @@ MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
      */
     MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
 
+    pthread_mutex_unlock(&MessageQ_module->gate);
+
     return (MessageQ_Handle)obj;
 }
 
@@ -667,6 +671,8 @@ Int MessageQ_delete(MessageQ_Handle *handlePtr)
       "MessageQ_delete: got LAD response for client %d, status=%d\n",
       handle, status)
 
+    pthread_mutex_lock(&MessageQ_module->gate);
+
     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
        for (priority = 0; priority < 2; priority++) {
             transport = MessageQ_module->transports[clusterId][priority];
@@ -699,6 +705,8 @@ Int MessageQ_delete(MessageQ_Handle *handlePtr)
     queueIndex = MessageQ_getQueueIndex(obj->queue);
     MessageQ_module->queues[queueIndex] = NULL;
 
+    pthread_mutex_unlock(&MessageQ_module->gate);
+
     free(obj);
     *handlePtr = NULL;
 
@@ -1215,6 +1223,8 @@ Void _MessageQ_grow(UInt16 queueIndex)
     MessageQ_Handle *oldQueues;
     UInt oldSize;
 
+    pthread_mutex_lock(&MessageQ_module->gate);
+
     oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
 
     queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof(MessageQ_Handle));
@@ -1224,7 +1234,96 @@ Void _MessageQ_grow(UInt16 queueIndex)
     MessageQ_module->queues = queues;
     MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
 
+    pthread_mutex_unlock(&MessageQ_module->gate);
+
     free(oldQueues);
 
     return;
 }
+
+/*
+ *  ======== MessageQ_bind ========
+ *  Bind all existing message queues to the given processor
+ *
+ *  Note: This function is a hack to work around the driver.
+ *
+ *  The Linux rpmsgproto driver requires a socket for each
+ *  message queue and remote processor tuple.
+ *
+ *      socket --> (queue, processor)
+ *
+ *  Therefore, each time a new remote processor is started, all
+ *  existing message queues need to create a socket for the new
+ *  processor.
+ *
+ *  The driver should not have this requirement. One socket per
+ *  message queue should be sufficient to uniquely identify the
+ *  endpoint to the driver.
+ */
+Void MessageQ_bind(UInt16 procId)
+{
+    int q;
+    int clusterId;
+    int priority;
+    MessageQ_Handle handle;
+    MessageQ_QueueId queue;
+    IMessageQTransport_Handle transport;
+
+    clusterId = procId - MultiProc_getBaseIdOfCluster();
+    pthread_mutex_lock(&MessageQ_module->gate);
+
+    for (q = 0; q < MessageQ_module->numQueues; q++) {
+
+        if ((handle = MessageQ_module->queues[q]) == NULL) {
+            continue;
+        }
+
+        queue = ((MessageQ_Object *)handle)->queue;
+
+        for (priority = 0; priority < 2; priority++) {
+            transport = MessageQ_module->transports[clusterId][priority];
+            if (transport != NULL) {
+                IMessageQTransport_bind((Void *)transport, queue);
+            }
+        }
+    }
+
+    pthread_mutex_unlock(&MessageQ_module->gate);
+}
+
+/*
+ *  ======== MessageQ_unbind ========
+ *  Unbind all existing message queues from the given processor
+ *
+ *  Hack: see MessageQ_bind.
+ */
+Void MessageQ_unbind(UInt16 procId)
+{
+    int q;
+    int clusterId;
+    int priority;
+    MessageQ_Handle handle;
+    MessageQ_QueueId queue;
+    IMessageQTransport_Handle transport;
+
+    pthread_mutex_lock(&MessageQ_module->gate);
+
+    for (q = 0; q < MessageQ_module->numQueues; q++) {
+
+        if ((handle = MessageQ_module->queues[q]) == NULL) {
+            continue;
+        }
+
+        queue = ((MessageQ_Object *)handle)->queue;
+        clusterId = procId - MultiProc_getBaseIdOfCluster();
+
+        for (priority = 0; priority < 2; priority++) {
+            transport = MessageQ_module->transports[clusterId][priority];
+            if (transport != NULL) {
+                IMessageQTransport_unbind((Void *)transport, queue);
+            }
+        }
+    }
+
+    pthread_mutex_unlock(&MessageQ_module->gate);
+}