Bind message queue to new remote processor
[ipc/ipcdev.git] / linux / src / transport / TransportRpmsg.c
index 97936f6c8fc62e2fdd5f59d3853ade33884a3f6a..6e3d704a9e548cef40cf7b9925c38c41f77c24a5 100644 (file)
@@ -173,6 +173,7 @@ TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params)
     TransportRpmsg_Object *obj = NULL;
     int sock;
     UInt16 clusterId;
+    int i;
 
 
     clusterId = params->rprocId - MultiProc_getBaseIdOfCluster();
@@ -212,13 +213,18 @@ TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params)
     obj->rprocId = params->rprocId;
     obj->numQueues = TransportRpmsg_GROWSIZE;
 
-    obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof(Int));
+    obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof(int));
 
     if (obj->qIndexToFd == NULL) {
         status = Ipc_E_MEMORY;
         goto done;
     }
 
+    /* must initialize array */
+    for (i = 0; i < TransportRpmsg_GROWSIZE; i++) {
+        obj->qIndexToFd[i] = -1;
+    }
+
 done:
     if (status < 0) {
         TransportRpmsg_delete((TransportRpmsg_Handle *)&obj);
@@ -272,6 +278,7 @@ Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
     uint64_t event;
     UInt16 rprocId;
     pthread_t tid;
+    Int status = MessageQ_S_SUCCESS;
 
     tid = pthread_self();
     rprocId = obj->rprocId;
@@ -279,12 +286,25 @@ Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
     PRINTVERBOSE3("TransportRpmsg_bind: creating endpoint for rprocId %d "
             "queuePort 0x%x, tid=0x%x\n", rprocId, queuePort, (unsigned int)tid)
 
+    pthread_mutex_lock(&TransportRpmsg_module->gate);
+
+    /*  Check if binding already exists.
+     *
+     *  There is a race condition between a thread calling MessageQ_create
+     *  and another thread calling Ipc_attach. Must make sure we don't bind
+     *  the same queue twice.
+     */
+    if (queueIndexToFd(obj, queueId) != -1) {
+        goto done;
+    }
+
     /*  Create the socket to receive messages for this messageQ. */
     fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
     if (fd < 0) {
         printf("TransportRpmsg_bind: socket call failed: %d (%s)\n",
                 errno, strerror(errno));
-        return (MessageQ_E_OSFAILURE);
+        status = MessageQ_E_OSFAILURE;
+        goto done;
     }
     PRINTVERBOSE2("TransportRpmsg_bind: created socket fd %d, tdi=0x%x\n", fd,
             (unsigned int)tid);
@@ -295,11 +315,10 @@ Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
         PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
                       errno, strerror(errno));
         close(fd);
-        return (MessageQ_E_OSFAILURE);
+        status = MessageQ_E_OSFAILURE;
+        goto done;
     }
 
-    pthread_mutex_lock(&TransportRpmsg_module->gate);
-
     /*  pause the dispatch thread */
     PRINTVERBOSE1("TransportRpmsg_bind: sending PAUSE event, tid=0x%x\n",
             (unsigned int)tid);
@@ -324,9 +343,10 @@ Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
     event = TransportRpmsg_Event_CONTINUE;
     write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
 
+done:
     pthread_mutex_unlock(&TransportRpmsg_module->gate);
 
-    return (MessageQ_S_SUCCESS);
+    return (status);
 }
 
 /*
@@ -352,12 +372,13 @@ Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
     /* wait for ACK event */
     read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
 
-    /* retrieve file descriptor for the given queue port */
-    fd = queueIndexToFd(obj, queuePort);
-    if (!fd) {
-        PRINTVERBOSE1("TransportRpmsg_unbind: queueId 0x%x not bound\n",
-                queueId);
-        status = MessageQ_E_INVALIDARG;
+    /*  Check if binding already deleted.
+     *
+     *  There is a race condition between a thread calling MessageQ_delete
+     *  and another thread calling Ipc_detach. Must make sure we don't unbind
+     *  the same queue twice.
+     */
+    if ((fd = queueIndexToFd(obj, queuePort)) == -1) {
         goto done;
     }
     PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
@@ -674,26 +695,34 @@ Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 queuePort)
     Int *queues;
     Int *oldQueues;
     UInt oldSize;
+    UInt newCount;
     UInt queueIndex;
+    int i;
 
     /* subtract port offset from queue index */
     queueIndex = queuePort - MessageQ_PORTOFFSET;
 
     if (queueIndex >= obj->numQueues) {
+        newCount = queueIndex + TransportRpmsg_GROWSIZE;
         PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
-                queueIndex + TransportRpmsg_GROWSIZE)
+                newCount);
 
         /* allocate larger table */
-        oldSize = obj->numQueues * sizeof (Int);
-        queues = calloc(queueIndex + TransportRpmsg_GROWSIZE, sizeof(Int));
+        oldSize = obj->numQueues * sizeof(int);
+        queues = calloc(newCount, sizeof(int));
 
         /* copy contents from old table int new table */
         memcpy(queues, obj->qIndexToFd, oldSize);
 
+        /* initialize remaining entries of new (larger) table */
+        for (i = obj->numQueues; i < newCount; i++) {
+            queues[i] = -1;
+        }
+
         /* swap in new table, delete old table */
         oldQueues = obj->qIndexToFd;
         obj->qIndexToFd = queues;
-        obj->numQueues = queueIndex + TransportRpmsg_GROWSIZE;
+        obj->numQueues = newCount;
         free(oldQueues);
     }
 
@@ -898,8 +927,8 @@ Int TransportRpmsg_Factory_attach(UInt16 procId)
 
     /* register transport instance with MessageQ */
     iMsgQTrans = TransportRpmsg_upCast(transport);
-    MessageQ_registerTransport(iMsgQTrans, procId, 0);
     TransportRpmsg_module->inst[clusterId] = transport;
+    MessageQ_registerTransport(iMsgQTrans, procId, 0);
 
 done:
     return (status);