]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - ipc/ipcdev.git/blobdiff - linux/src/api/MessageQ.c
Add FD_CLOEXEC flag for sockets, /dev/mem and LAD pipes
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
index 9ebf0063332aec489d697b1d8519aabcc4948947..46b13f84c2d5c6c5d8009f1cdac552f042e85465 100644 (file)
@@ -118,6 +118,7 @@ typedef struct MessageQ_ModuleObject {
     NameServer_Handle         nameServer;
     pthread_mutex_t           gate;
     int                       seqNum;
+    pthread_mutex_t           seqNumGate;
     IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
     ITransport_Handle         transInst[MessageQ_MAXTRANSPORTS];
     MessageQ_PutHookFxn       putHookFxn;
@@ -132,6 +133,7 @@ typedef struct MessageQ_CIRCLEQ_ENTRY {
  */
 typedef struct MessageQ_Object_tag {
     CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
+    pthread_mutex_t              msgListGate;
     MessageQ_Params              params;
     MessageQ_QueueId             queue;
     int                          unblocked;
@@ -151,7 +153,12 @@ static MessageQ_ModuleObject MessageQ_state =
 {
     .refCount   = 0,
     .nameServer = NULL,
-    .gate       = PTHREAD_MUTEX_INITIALIZER,
+#if defined(IPC_BUILDOS_ANDROID)
+    .gate       = PTHREAD_RECURSIVE_MUTEX_INITIALIZER,
+#else
+    .gate       = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
+#endif
+    .seqNumGate = PTHREAD_MUTEX_INITIALIZER,
     .putHookFxn = NULL
 };
 
@@ -560,6 +567,7 @@ MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
 
     obj->queue = rsp.messageQCreate.queueId;
     obj->serverHandle = rsp.messageQCreate.serverHandle;
+    pthread_mutex_init(&obj->msgListGate, NULL);
     CIRCLEQ_INIT(&obj->msgList);
     if (sem_init(&obj->synchronizer, 0, 0) < 0) {
         PRINTVERBOSE1(
@@ -576,6 +584,8 @@ MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
     PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
             "queueIndex %d\n", (name == NULL) ? "NULL" : name , queueIndex)
 
+    pthread_mutex_lock(&MessageQ_module->gate);
+
     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
        for (priority = 0; priority < 2; priority++) {
             transport = MessageQ_module->transports[clusterId][priority];
@@ -615,6 +625,8 @@ MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
      */
     MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
 
+    pthread_mutex_unlock(&MessageQ_module->gate);
+
     return (MessageQ_Handle)obj;
 }
 
@@ -667,6 +679,8 @@ Int MessageQ_delete(MessageQ_Handle *handlePtr)
       "MessageQ_delete: got LAD response for client %d, status=%d\n",
       handle, status)
 
+    pthread_mutex_lock(&MessageQ_module->gate);
+
     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
        for (priority = 0; priority < 2; priority++) {
             transport = MessageQ_module->transports[clusterId][priority];
@@ -699,6 +713,8 @@ Int MessageQ_delete(MessageQ_Handle *handlePtr)
     queueIndex = MessageQ_getQueueIndex(obj->queue);
     MessageQ_module->queues[queueIndex] = NULL;
 
+    pthread_mutex_unlock(&MessageQ_module->gate);
+
     free(obj);
     *handlePtr = NULL;
 
@@ -772,58 +788,97 @@ Int MessageQ_close(MessageQ_QueueId *queueId)
 
 /*
  *  ======== MessageQ_put ========
- *  Place a message onto a message queue
+ *  Deliver the given message, either locally or to the transport
  *
- *  Calls transport's put(), which handles the sending of the message
- *  using the appropriate kernel interface (socket, device ioctl) call
- *  for the remote procId encoded in the queueId argument.
+ *  If the destination is a local queue, deliver the message. Otherwise,
+ *  pass the message to a transport for delivery. The transport handles
+ *  the sending of the message using the appropriate interface (socket,
+ *  device ioctl, etc.).
  */
 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
 {
+    Int status = MessageQ_S_SUCCESS;
     MessageQ_Object *obj;
-    UInt16   dstProcId  = (UInt16)(queueId >> 16);
-    UInt16   queueIndex;
-    UInt16   queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
-    Int      status = MessageQ_S_SUCCESS;
+    UInt16 dstProcId;
+    UInt16 queueIndex;
+    UInt16 queuePort;
     ITransport_Handle baseTrans;
     IMessageQTransport_Handle msgTrans;
     INetworkTransport_Handle netTrans;
     Int priority;
     UInt tid;
     UInt16 clusterId;
+    Bool delivered;
+
+    /* extract destination address from the given queueId */
+    dstProcId  = (UInt16)(queueId >> 16);
+    queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
 
-    /* use the queue port # for destination address */
+    /* write the destination address into the message header */
     msg->dstId = queuePort;
     msg->dstProc= dstProcId;
 
-    /* invoke put hook function after addressing the message */
+    /* invoke the hook function after addressing the message */
     if (MessageQ_module->putHookFxn != NULL) {
         MessageQ_module->putHookFxn(queueId, msg);
     }
 
-    /* extract the transport ID from the message header */
+    /*  For an outbound message: If message destination is on this
+     *  processor, then check if the destination queue is in this
+     *  process (thread-to-thread messaging).
+     *
+     *  For an inbound message: Check if destination queue is in this
+     *  process (process-to-process messaging).
+     */
+    if (dstProcId == MultiProc_self()) {
+        queueIndex = queuePort - MessageQ_PORTOFFSET;
+
+        if (queueIndex < MessageQ_module->numQueues) {
+            obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
+
+            if (obj != NULL) {
+                /* deliver message to queue */
+                pthread_mutex_lock(&obj->msgListGate);
+                CIRCLEQ_INSERT_TAIL(&obj->msgList,
+                        (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
+                pthread_mutex_unlock(&obj->msgListGate);
+                sem_post(&obj->synchronizer);
+                goto done;
+            }
+        }
+    }
+
+    /*  Getting here implies the message is outbound. Must give it to
+     *  either the primary or secondary transport for delivery. Start
+     *  by extracting the transport ID from the message header.
+     */
     tid = MessageQ_getTransportId(msg);
 
     if (tid >= MessageQ_MAXTRANSPORTS) {
         printf("MessageQ_put: Error: transport id %d too big, must be < %d\n",
                 tid, MessageQ_MAXTRANSPORTS);
-        return (MessageQ_E_FAIL);
+        status = MessageQ_E_FAIL;
+        goto done;
     }
 
-    /* if tid is set, use secondary transport regardless of destination */
+    /* if transportId is set, use secondary transport for message delivery */
     if (tid != 0) {
         baseTrans = MessageQ_module->transInst[tid];
 
         if (baseTrans == NULL) {
             printf("MessageQ_put: Error: transport is null\n");
-            return (MessageQ_E_FAIL);
+            status = MessageQ_E_FAIL;
+            goto done;
         }
 
         /* downcast instance pointer to transport interface */
         switch (ITransport_itype(baseTrans)) {
             case INetworkTransport_TypeId:
                 netTrans = INetworkTransport_downCast(baseTrans);
-                INetworkTransport_put(netTrans, (Ptr)msg);
+                delivered = INetworkTransport_put(netTrans, (Ptr)msg);
+                status = (delivered ? MessageQ_S_SUCCESS :
+                          (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN :
+                           MessageQ_E_FAIL));
                 break;
 
             default:
@@ -835,49 +890,25 @@ Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
         }
     }
     else {
-        /* if destination on another processor, use primary transport */
-        if (dstProcId != MultiProc_self()) {
-            priority = MessageQ_getMsgPri(msg);
-            clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
-
-            /* primary transport can only be used for intra-cluster delivery */
-            if (clusterId > MultiProc_getNumProcsInCluster()) {
-                printf("MessageQ_put: Error: destination procId=%d is not "
-                        "in cluster. Must specify a transportId.\n", dstProcId);
-                return (MessageQ_E_FAIL);
-            }
-
-            msgTrans = MessageQ_module->transports[clusterId][priority];
-
-            IMessageQTransport_put(msgTrans, (Ptr)msg);
+        /* use primary transport for delivery */
+        priority = MessageQ_getMsgPri(msg);
+        clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
+
+        /* primary transport can only be used for intra-cluster delivery */
+        if (clusterId > MultiProc_getNumProcsInCluster()) {
+            printf("MessageQ_put: Error: destination procId=%d is not "
+                    "in cluster. Must specify a transportId.\n", dstProcId);
+            status =  MessageQ_E_FAIL;
+            goto done;
         }
-        else {
-            /* check if destination queue is in this process */
-            queueIndex = queuePort - MessageQ_PORTOFFSET;
 
-            if (queueIndex >= MessageQ_module->numQueues) {
-                printf("MessageQ_put: Error: unable to deliver message, "
-                        "queueIndex too large or transportId missing.\n");
-                return (MessageQ_E_FAIL);
-            }
-
-            obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
-
-            if (obj == NULL) {
-                printf("MessageQ_put: Error: unable to deliver message, "
-                        "destination queue not in this process.\n");
-                return (MessageQ_E_FAIL);
-            }
-
-            /* deliver message to process local queue */
-            pthread_mutex_lock(&MessageQ_module->gate);
-            CIRCLEQ_INSERT_TAIL(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)msg,
-                    elem);
-            pthread_mutex_unlock(&MessageQ_module->gate);
-            sem_post(&obj->synchronizer);
-        }
+        msgTrans = MessageQ_module->transports[clusterId][priority];
+        delivered = IMessageQTransport_put(msgTrans, (Ptr)msg);
+        status = (delivered ? MessageQ_S_SUCCESS :
+                  (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN : MessageQ_E_FAIL));
     }
 
+done:
     return (status);
 }
 
@@ -901,16 +932,16 @@ Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
  * Optimization here to get a message without going in to the sem
  * operation, but the sem count will not be maintained properly.
  */
-    pthread_mutex_lock(&MessageQ_module->gate);
+    pthread_mutex_lock(&obj->msgListGate);
 
     if (obj->msgList.cqh_first != &obj->msgList) {
         *msg = (MessageQ_Msg)obj->msglist.cqh_first;
         CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
 
-        pthread_mutex_unlock(&MessageQ_module->gate);
+        pthread_mutex_unlock(&obj->msgListGate);
     }
     else {
-        pthread_mutex_unlock(&MessageQ_module->gate);
+        pthread_mutex_unlock(&obj->msgListGate);
     }
 #endif
 
@@ -918,29 +949,42 @@ Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
         sem_wait(&obj->synchronizer);
     }
     else {
+        /* add timeout (microseconds) to current time of day */
         gettimeofday(&tv, NULL);
+        tv.tv_sec += timeout / 1000000;
+        tv.tv_usec += timeout % 1000000;
+
+        if (tv.tv_usec >= 1000000) {
+              tv.tv_sec++;
+              tv.tv_usec -= 1000000;
+        }
+
+        /* set absolute timeout value */
         ts.tv_sec = tv.tv_sec;
-        ts.tv_nsec = (tv.tv_usec + timeout) * 1000;
+        ts.tv_nsec = tv.tv_usec * 1000; /* convert to nanoseconds */
 
         if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
             if (errno == ETIMEDOUT) {
                 PRINTVERBOSE0("MessageQ_get: operation timed out\n")
-
-                return MessageQ_E_TIMEOUT;
+                return (MessageQ_E_TIMEOUT);
+            }
+            else {
+                PRINTVERBOSE0("MessageQ_get: sem_timedwait error\n")
+                return (MessageQ_E_FAIL);
             }
         }
     }
 
     if (obj->unblocked) {
-        return MessageQ_E_UNBLOCKED;
+        return obj->unblocked;
     }
 
-    pthread_mutex_lock(&MessageQ_module->gate);
+    pthread_mutex_lock(&obj->msgListGate);
 
     *msg = (MessageQ_Msg)obj->msgList.cqh_first;
     CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
 
-    pthread_mutex_unlock(&MessageQ_module->gate);
+    pthread_mutex_unlock(&obj->msgListGate);
 
     return status;
 }
@@ -1048,7 +1092,16 @@ Void MessageQ_unblock(MessageQ_Handle handle)
 {
     MessageQ_Object *obj = (MessageQ_Object *)handle;
 
-    obj->unblocked = TRUE;
+    obj->unblocked = MessageQ_E_UNBLOCKED;
+    sem_post(&obj->synchronizer);
+}
+
+/* Unblocks a MessageQ that's been shutdown due to transport failure */
+Void MessageQ_shutdown(MessageQ_Handle handle)
+{
+    MessageQ_Object *obj = (MessageQ_Object *)handle;
+
+    obj->unblocked = MessageQ_E_SHUTDOWN;
     sem_post(&obj->synchronizer);
 }
 
@@ -1072,6 +1125,24 @@ MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
     return queueId;
 }
 
+/* Returns the local handle associated with queueId. */
+MessageQ_Handle MessageQ_getLocalHandle(MessageQ_QueueId queueId)
+{
+    MessageQ_Object *obj;
+    MessageQ_QueueIndex queueIndex;
+    UInt16 procId;
+
+    procId = MessageQ_getProcId(queueId);
+    if (procId != MultiProc_self()) {
+        return NULL;
+    }
+
+    queueIndex = MessageQ_getQueueIndex(queueId);
+    obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
+
+    return (MessageQ_Handle)obj;
+}
+
 /* Sets the tracing of a message */
 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
 {
@@ -1141,9 +1212,9 @@ Void MessageQ_msgInit(MessageQ_Msg msg)
     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
     msg->srcProc   = MultiProc_self();
 
-    pthread_mutex_lock(&MessageQ_module->gate);
+    pthread_mutex_lock(&MessageQ_module->seqNumGate);
     msg->seqNum  = MessageQ_module->seqNum++;
-    pthread_mutex_unlock(&MessageQ_module->gate);
+    pthread_mutex_unlock(&MessageQ_module->seqNumGate);
 #endif
 }
 
@@ -1160,6 +1231,8 @@ Void _MessageQ_grow(UInt16 queueIndex)
     MessageQ_Handle *oldQueues;
     UInt oldSize;
 
+    pthread_mutex_lock(&MessageQ_module->gate);
+
     oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
 
     queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof(MessageQ_Handle));
@@ -1169,7 +1242,96 @@ Void _MessageQ_grow(UInt16 queueIndex)
     MessageQ_module->queues = queues;
     MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
 
+    pthread_mutex_unlock(&MessageQ_module->gate);
+
     free(oldQueues);
 
     return;
 }
+
+/*
+ *  ======== MessageQ_bind ========
+ *  Bind all existing message queues to the given processor
+ *
+ *  Note: This function is a hack to work around the driver.
+ *
+ *  The Linux rpmsgproto driver requires a socket for each
+ *  message queue and remote processor tuple.
+ *
+ *      socket --> (queue, processor)
+ *
+ *  Therefore, each time a new remote processor is started, all
+ *  existing message queues need to create a socket for the new
+ *  processor.
+ *
+ *  The driver should not have this requirement. One socket per
+ *  message queue should be sufficient to uniquely identify the
+ *  endpoint to the driver.
+ */
+Void MessageQ_bind(UInt16 procId)
+{
+    int q;
+    int clusterId;
+    int priority;
+    MessageQ_Handle handle;
+    MessageQ_QueueId queue;
+    IMessageQTransport_Handle transport;
+
+    clusterId = procId - MultiProc_getBaseIdOfCluster();
+    pthread_mutex_lock(&MessageQ_module->gate);
+
+    for (q = 0; q < MessageQ_module->numQueues; q++) {
+
+        if ((handle = MessageQ_module->queues[q]) == NULL) {
+            continue;
+        }
+
+        queue = ((MessageQ_Object *)handle)->queue;
+
+        for (priority = 0; priority < 2; priority++) {
+            transport = MessageQ_module->transports[clusterId][priority];
+            if (transport != NULL) {
+                IMessageQTransport_bind((Void *)transport, queue);
+            }
+        }
+    }
+
+    pthread_mutex_unlock(&MessageQ_module->gate);
+}
+
+/*
+ *  ======== MessageQ_unbind ========
+ *  Unbind all existing message queues from the given processor
+ *
+ *  Hack: see MessageQ_bind.
+ */
+Void MessageQ_unbind(UInt16 procId)
+{
+    int q;
+    int clusterId;
+    int priority;
+    MessageQ_Handle handle;
+    MessageQ_QueueId queue;
+    IMessageQTransport_Handle transport;
+
+    pthread_mutex_lock(&MessageQ_module->gate);
+
+    for (q = 0; q < MessageQ_module->numQueues; q++) {
+
+        if ((handle = MessageQ_module->queues[q]) == NULL) {
+            continue;
+        }
+
+        queue = ((MessageQ_Object *)handle)->queue;
+        clusterId = procId - MultiProc_getBaseIdOfCluster();
+
+        for (priority = 0; priority < 2; priority++) {
+            transport = MessageQ_module->transports[clusterId][priority];
+            if (transport != NULL) {
+                IMessageQTransport_unbind((Void *)transport, queue);
+            }
+        }
+    }
+
+    pthread_mutex_unlock(&MessageQ_module->gate);
+}