Create specialized gates to prevent deadlock on general gate
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
index 802e500365889a755bfadffd73471185bce9ee4d..fcab2b1ca6265bc2b67927ade57edb47116b4e6b 100644 (file)
@@ -118,6 +118,7 @@ typedef struct MessageQ_ModuleObject {
     NameServer_Handle         nameServer;
     pthread_mutex_t           gate;
     int                       seqNum;
+    pthread_mutex_t           seqNumGate;
     IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
     ITransport_Handle         transInst[MessageQ_MAXTRANSPORTS];
     MessageQ_PutHookFxn       putHookFxn;
@@ -132,6 +133,7 @@ typedef struct MessageQ_CIRCLEQ_ENTRY {
  */
 typedef struct MessageQ_Object_tag {
     CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
+    pthread_mutex_t              msgListGate;
     MessageQ_Params              params;
     MessageQ_QueueId             queue;
     int                          unblocked;
@@ -151,7 +153,8 @@ static MessageQ_ModuleObject MessageQ_state =
 {
     .refCount   = 0,
     .nameServer = NULL,
-    .gate       = PTHREAD_MUTEX_INITIALIZER,
+    .gate       = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
+    .seqNumGate = PTHREAD_MUTEX_INITIALIZER,
     .putHookFxn = NULL
 };
 
@@ -560,6 +563,7 @@ MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
 
     obj->queue = rsp.messageQCreate.queueId;
     obj->serverHandle = rsp.messageQCreate.serverHandle;
+    pthread_mutex_init(&obj->msgListGate, NULL);
     CIRCLEQ_INIT(&obj->msgList);
     if (sem_init(&obj->synchronizer, 0, 0) < 0) {
         PRINTVERBOSE1(
@@ -830,10 +834,10 @@ Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
 
             if (obj != NULL) {
                 /* deliver message to queue */
-                pthread_mutex_lock(&MessageQ_module->gate);
+                pthread_mutex_lock(&obj->msgListGate);
                 CIRCLEQ_INSERT_TAIL(&obj->msgList,
                         (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
-                pthread_mutex_unlock(&MessageQ_module->gate);
+                pthread_mutex_unlock(&obj->msgListGate);
                 sem_post(&obj->synchronizer);
                 goto done;
             }
@@ -924,16 +928,16 @@ Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
  * Optimization here to get a message without going in to the sem
  * operation, but the sem count will not be maintained properly.
  */
-    pthread_mutex_lock(&MessageQ_module->gate);
+    pthread_mutex_lock(&obj->msgListGate);
 
     if (obj->msgList.cqh_first != &obj->msgList) {
         *msg = (MessageQ_Msg)obj->msglist.cqh_first;
         CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
 
-        pthread_mutex_unlock(&MessageQ_module->gate);
+        pthread_mutex_unlock(&obj->msgListGate);
     }
     else {
-        pthread_mutex_unlock(&MessageQ_module->gate);
+        pthread_mutex_unlock(&obj->msgListGate);
     }
 #endif
 
@@ -971,12 +975,12 @@ Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
         return obj->unblocked;
     }
 
-    pthread_mutex_lock(&MessageQ_module->gate);
+    pthread_mutex_lock(&obj->msgListGate);
 
     *msg = (MessageQ_Msg)obj->msgList.cqh_first;
     CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
 
-    pthread_mutex_unlock(&MessageQ_module->gate);
+    pthread_mutex_unlock(&obj->msgListGate);
 
     return status;
 }
@@ -1204,9 +1208,9 @@ Void MessageQ_msgInit(MessageQ_Msg msg)
     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
     msg->srcProc   = MultiProc_self();
 
-    pthread_mutex_lock(&MessageQ_module->gate);
+    pthread_mutex_lock(&MessageQ_module->seqNumGate);
     msg->seqNum  = MessageQ_module->seqNum++;
-    pthread_mutex_unlock(&MessageQ_module->gate);
+    pthread_mutex_unlock(&MessageQ_module->seqNumGate);
 #endif
 }