]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - ipc/ipcdev.git/commitdiff
Bind message queue to new remote processor 3.36.00.06_eng
authorRamsey Harris <ramsey@ti.com>
Tue, 28 Apr 2015 18:50:24 +0000 (11:50 -0700)
committerRobert Tivy <rtivy@ti.com>
Thu, 30 Apr 2015 23:49:06 +0000 (16:49 -0700)
The rpmsgproto driver requires a message queue to explicitly
bind to every remote processor. To work around this issue,
both MessageQ_create and Ipc_attach must invoke the bind method
at the appropriate time. This hack should be removed when the
rpmsgproto driver is fixed such that only one bind is required.

linux/src/api/Ipc.c
linux/src/api/MessageQ.c
linux/src/transport/TransportRpmsg.c

index 96804660ab401ba5489188d970224552f6f49794..42ac0cbe415ad0a8d75f71e7fd5dc3a81d60cbbd 100644 (file)
@@ -70,6 +70,10 @@ typedef struct {
     Int                         attached[MultiProc_MAXPROCESSORS];
 } Ipc_Module;
 
+/* hack: rpmsgproto driver work around */
+Void MessageQ_bind(UInt16 procId);
+Void MessageQ_unbind(UInt16 procId);
+
 
 /* =============================================================================
  *  Globals
@@ -436,6 +440,9 @@ Int Ipc_attach(UInt16 procId)
         goto done;
     }
 
+    /* hack: bind all existing message queues to remote processor */
+    MessageQ_bind(procId);
+
     /* getting here means we have successfully attached */
     Ipc_module.attached[clusterId]++;
 
@@ -479,6 +486,9 @@ Int Ipc_detach(UInt16 procId)
         goto done;
     }
 
+    /* hack: unbind all existing message queues from remote processor */
+    MessageQ_unbind(procId);
+
     /* detach transport from remote processor */
     status = Ipc_module.transportFactory->detachFxn(procId);
 
index 13ab27d5be1f4e548e87a20b237d1b43aac323ce..802e500365889a755bfadffd73471185bce9ee4d 100644 (file)
@@ -576,6 +576,8 @@ MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
     PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
             "queueIndex %d\n", (name == NULL) ? "NULL" : name , queueIndex)
 
+    pthread_mutex_lock(&MessageQ_module->gate);
+
     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
        for (priority = 0; priority < 2; priority++) {
             transport = MessageQ_module->transports[clusterId][priority];
@@ -615,6 +617,8 @@ MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
      */
     MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
 
+    pthread_mutex_unlock(&MessageQ_module->gate);
+
     return (MessageQ_Handle)obj;
 }
 
@@ -667,6 +671,8 @@ Int MessageQ_delete(MessageQ_Handle *handlePtr)
       "MessageQ_delete: got LAD response for client %d, status=%d\n",
       handle, status)
 
+    pthread_mutex_lock(&MessageQ_module->gate);
+
     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
        for (priority = 0; priority < 2; priority++) {
             transport = MessageQ_module->transports[clusterId][priority];
@@ -699,6 +705,8 @@ Int MessageQ_delete(MessageQ_Handle *handlePtr)
     queueIndex = MessageQ_getQueueIndex(obj->queue);
     MessageQ_module->queues[queueIndex] = NULL;
 
+    pthread_mutex_unlock(&MessageQ_module->gate);
+
     free(obj);
     *handlePtr = NULL;
 
@@ -1215,6 +1223,8 @@ Void _MessageQ_grow(UInt16 queueIndex)
     MessageQ_Handle *oldQueues;
     UInt oldSize;
 
+    pthread_mutex_lock(&MessageQ_module->gate);
+
     oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
 
     queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof(MessageQ_Handle));
@@ -1224,7 +1234,96 @@ Void _MessageQ_grow(UInt16 queueIndex)
     MessageQ_module->queues = queues;
     MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
 
+    pthread_mutex_unlock(&MessageQ_module->gate);
+
     free(oldQueues);
 
     return;
 }
+
+/*
+ *  ======== MessageQ_bind ========
+ *  Bind all existing message queues to the given processor
+ *
+ *  Note: This function is a hack to work around the driver.
+ *
+ *  The Linux rpmsgproto driver requires a socket for each
+ *  message queue and remote processor tuple.
+ *
+ *      socket --> (queue, processor)
+ *
+ *  Therefore, each time a new remote processor is started, all
+ *  existing message queues need to create a socket for the new
+ *  processor.
+ *
+ *  The driver should not have this requirement. One socket per
+ *  message queue should be sufficient to uniquely identify the
+ *  endpoint to the driver.
+ */
+Void MessageQ_bind(UInt16 procId)
+{
+    int q;
+    int clusterId;
+    int priority;
+    MessageQ_Handle handle;
+    MessageQ_QueueId queue;
+    IMessageQTransport_Handle transport;
+
+    clusterId = procId - MultiProc_getBaseIdOfCluster();
+    pthread_mutex_lock(&MessageQ_module->gate);
+
+    for (q = 0; q < MessageQ_module->numQueues; q++) {
+
+        if ((handle = MessageQ_module->queues[q]) == NULL) {
+            continue;
+        }
+
+        queue = ((MessageQ_Object *)handle)->queue;
+
+        for (priority = 0; priority < 2; priority++) {
+            transport = MessageQ_module->transports[clusterId][priority];
+            if (transport != NULL) {
+                IMessageQTransport_bind((Void *)transport, queue);
+            }
+        }
+    }
+
+    pthread_mutex_unlock(&MessageQ_module->gate);
+}
+
+/*
+ *  ======== MessageQ_unbind ========
+ *  Unbind all existing message queues from the given processor
+ *
+ *  Hack: see MessageQ_bind.
+ */
+Void MessageQ_unbind(UInt16 procId)
+{
+    int q;
+    int clusterId;
+    int priority;
+    MessageQ_Handle handle;
+    MessageQ_QueueId queue;
+    IMessageQTransport_Handle transport;
+
+    pthread_mutex_lock(&MessageQ_module->gate);
+
+    for (q = 0; q < MessageQ_module->numQueues; q++) {
+
+        if ((handle = MessageQ_module->queues[q]) == NULL) {
+            continue;
+        }
+
+        queue = ((MessageQ_Object *)handle)->queue;
+        clusterId = procId - MultiProc_getBaseIdOfCluster();
+
+        for (priority = 0; priority < 2; priority++) {
+            transport = MessageQ_module->transports[clusterId][priority];
+            if (transport != NULL) {
+                IMessageQTransport_unbind((Void *)transport, queue);
+            }
+        }
+    }
+
+    pthread_mutex_unlock(&MessageQ_module->gate);
+}
index 97936f6c8fc62e2fdd5f59d3853ade33884a3f6a..6e3d704a9e548cef40cf7b9925c38c41f77c24a5 100644 (file)
@@ -173,6 +173,7 @@ TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params)
     TransportRpmsg_Object *obj = NULL;
     int sock;
     UInt16 clusterId;
+    int i;
 
 
     clusterId = params->rprocId - MultiProc_getBaseIdOfCluster();
@@ -212,13 +213,18 @@ TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params)
     obj->rprocId = params->rprocId;
     obj->numQueues = TransportRpmsg_GROWSIZE;
 
-    obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof(Int));
+    obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof(int));
 
     if (obj->qIndexToFd == NULL) {
         status = Ipc_E_MEMORY;
         goto done;
     }
 
+    /* must initialize array */
+    for (i = 0; i < TransportRpmsg_GROWSIZE; i++) {
+        obj->qIndexToFd[i] = -1;
+    }
+
 done:
     if (status < 0) {
         TransportRpmsg_delete((TransportRpmsg_Handle *)&obj);
@@ -272,6 +278,7 @@ Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
     uint64_t event;
     UInt16 rprocId;
     pthread_t tid;
+    Int status = MessageQ_S_SUCCESS;
 
     tid = pthread_self();
     rprocId = obj->rprocId;
@@ -279,12 +286,25 @@ Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
     PRINTVERBOSE3("TransportRpmsg_bind: creating endpoint for rprocId %d "
             "queuePort 0x%x, tid=0x%x\n", rprocId, queuePort, (unsigned int)tid)
 
+    pthread_mutex_lock(&TransportRpmsg_module->gate);
+
+    /*  Check if binding already exists.
+     *
+     *  There is a race condition between a thread calling MessageQ_create
+     *  and another thread calling Ipc_attach. Must make sure we don't bind
+     *  the same queue twice.
+     */
+    if (queueIndexToFd(obj, queueId) != -1) {
+        goto done;
+    }
+
     /*  Create the socket to receive messages for this messageQ. */
     fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
     if (fd < 0) {
         printf("TransportRpmsg_bind: socket call failed: %d (%s)\n",
                 errno, strerror(errno));
-        return (MessageQ_E_OSFAILURE);
+        status = MessageQ_E_OSFAILURE;
+        goto done;
     }
     PRINTVERBOSE2("TransportRpmsg_bind: created socket fd %d, tdi=0x%x\n", fd,
             (unsigned int)tid);
@@ -295,11 +315,10 @@ Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
         PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
                       errno, strerror(errno));
         close(fd);
-        return (MessageQ_E_OSFAILURE);
+        status = MessageQ_E_OSFAILURE;
+        goto done;
     }
 
-    pthread_mutex_lock(&TransportRpmsg_module->gate);
-
     /*  pause the dispatch thread */
     PRINTVERBOSE1("TransportRpmsg_bind: sending PAUSE event, tid=0x%x\n",
             (unsigned int)tid);
@@ -324,9 +343,10 @@ Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
     event = TransportRpmsg_Event_CONTINUE;
     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
 
+done:
     pthread_mutex_unlock(&TransportRpmsg_module->gate);
 
-    return (MessageQ_S_SUCCESS);
+    return (status);
 }
 
 /*
@@ -352,12 +372,13 @@ Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
     /* wait for ACK event */
     read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
 
-    /* retrieve file descriptor for the given queue port */
-    fd = queueIndexToFd(obj, queuePort);
-    if (!fd) {
-        PRINTVERBOSE1("TransportRpmsg_unbind: queueId 0x%x not bound\n",
-                queueId);
-        status = MessageQ_E_INVALIDARG;
+    /*  Check if binding already deleted.
+     *
+     *  There is a race condition between a thread calling MessageQ_delete
+     *  and another thread calling Ipc_detach. Must make sure we don't unbind
+     *  the same queue twice.
+     */
+    if ((fd = queueIndexToFd(obj, queuePort)) == -1) {
         goto done;
     }
     PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
@@ -674,26 +695,34 @@ Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 queuePort)
     Int *queues;
     Int *oldQueues;
     UInt oldSize;
+    UInt newCount;
     UInt queueIndex;
+    int i;
 
     /* subtract port offset from queue index */
     queueIndex = queuePort - MessageQ_PORTOFFSET;
 
     if (queueIndex >= obj->numQueues) {
+        newCount = queueIndex + TransportRpmsg_GROWSIZE;
         PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
-                queueIndex + TransportRpmsg_GROWSIZE)
+                newCount);
 
         /* allocate larger table */
-        oldSize = obj->numQueues * sizeof (Int);
-        queues = calloc(queueIndex + TransportRpmsg_GROWSIZE, sizeof(Int));
+        oldSize = obj->numQueues * sizeof(int);
+        queues = calloc(newCount, sizeof(int));
 
         /* copy contents from old table int new table */
         memcpy(queues, obj->qIndexToFd, oldSize);
 
+        /* initialize remaining entries of new (larger) table */
+        for (i = obj->numQueues; i < newCount; i++) {
+            queues[i] = -1;
+        }
+
         /* swap in new table, delete old table */
         oldQueues = obj->qIndexToFd;
         obj->qIndexToFd = queues;
-        obj->numQueues = queueIndex + TransportRpmsg_GROWSIZE;
+        obj->numQueues = newCount;
         free(oldQueues);
     }
 
@@ -898,8 +927,8 @@ Int TransportRpmsg_Factory_attach(UInt16 procId)
 
     /* register transport instance with MessageQ */
     iMsgQTrans = TransportRpmsg_upCast(transport);
-    MessageQ_registerTransport(iMsgQTrans, procId, 0);
     TransportRpmsg_module->inst[clusterId] = transport;
+    MessageQ_registerTransport(iMsgQTrans, procId, 0);
 
 done:
     return (status);