index 802e500365889a755bfadffd73471185bce9ee4d..fcab2b1ca6265bc2b67927ade57edb47116b4e6b 100644 (file)
--- a/linux/src/api/MessageQ.c
+++ b/linux/src/api/MessageQ.c
NameServer_Handle nameServer;
pthread_mutex_t gate;
int seqNum;
+ pthread_mutex_t seqNumGate;
IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
ITransport_Handle transInst[MessageQ_MAXTRANSPORTS];
MessageQ_PutHookFxn putHookFxn;
*/
typedef struct MessageQ_Object_tag {
CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
+ pthread_mutex_t msgListGate;
MessageQ_Params params;
MessageQ_QueueId queue;
int unblocked;
{
.refCount = 0,
.nameServer = NULL,
- .gate = PTHREAD_MUTEX_INITIALIZER,
+ .gate = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
+ .seqNumGate = PTHREAD_MUTEX_INITIALIZER,
.putHookFxn = NULL
};
obj->queue = rsp.messageQCreate.queueId;
obj->serverHandle = rsp.messageQCreate.serverHandle;
+ pthread_mutex_init(&obj->msgListGate, NULL);
CIRCLEQ_INIT(&obj->msgList);
if (sem_init(&obj->synchronizer, 0, 0) < 0) {
PRINTVERBOSE1(
if (obj != NULL) {
/* deliver message to queue */
- pthread_mutex_lock(&MessageQ_module->gate);
+ pthread_mutex_lock(&obj->msgListGate);
CIRCLEQ_INSERT_TAIL(&obj->msgList,
(MessageQ_CIRCLEQ_ENTRY *)msg, elem);
- pthread_mutex_unlock(&MessageQ_module->gate);
+ pthread_mutex_unlock(&obj->msgListGate);
sem_post(&obj->synchronizer);
goto done;
}
* Optimization here to get a message without going in to the sem
* operation, but the sem count will not be maintained properly.
*/
- pthread_mutex_lock(&MessageQ_module->gate);
+ pthread_mutex_lock(&obj->msgListGate);
if (obj->msgList.cqh_first != &obj->msgList) {
*msg = (MessageQ_Msg)obj->msglist.cqh_first;
CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
- pthread_mutex_unlock(&MessageQ_module->gate);
+ pthread_mutex_unlock(&obj->msgListGate);
}
else {
- pthread_mutex_unlock(&MessageQ_module->gate);
+ pthread_mutex_unlock(&obj->msgListGate);
}
#endif
return obj->unblocked;
}
- pthread_mutex_lock(&MessageQ_module->gate);
+ pthread_mutex_lock(&obj->msgListGate);
*msg = (MessageQ_Msg)obj->msgList.cqh_first;
CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
- pthread_mutex_unlock(&MessageQ_module->gate);
+ pthread_mutex_unlock(&obj->msgListGate);
return status;
}
msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
msg->srcProc = MultiProc_self();
- pthread_mutex_lock(&MessageQ_module->gate);
+ pthread_mutex_lock(&MessageQ_module->seqNumGate);
msg->seqNum = MessageQ_module->seqNum++;
- pthread_mutex_unlock(&MessageQ_module->gate);
+ pthread_mutex_unlock(&MessageQ_module->seqNumGate);
#endif
}