index 9ebf0063332aec489d697b1d8519aabcc4948947..46b13f84c2d5c6c5d8009f1cdac552f042e85465 100644 (file)
--- a/linux/src/api/MessageQ.c
+++ b/linux/src/api/MessageQ.c
NameServer_Handle nameServer;
pthread_mutex_t gate;
int seqNum;
+ pthread_mutex_t seqNumGate;
IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
ITransport_Handle transInst[MessageQ_MAXTRANSPORTS];
MessageQ_PutHookFxn putHookFxn;
*/
typedef struct MessageQ_Object_tag {
CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
+ pthread_mutex_t msgListGate;
MessageQ_Params params;
MessageQ_QueueId queue;
int unblocked;
{
.refCount = 0,
.nameServer = NULL,
- .gate = PTHREAD_MUTEX_INITIALIZER,
+#if defined(IPC_BUILDOS_ANDROID)
+ .gate = PTHREAD_RECURSIVE_MUTEX_INITIALIZER,
+#else
+ .gate = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
+#endif
+ .seqNumGate = PTHREAD_MUTEX_INITIALIZER,
.putHookFxn = NULL
};
obj->queue = rsp.messageQCreate.queueId;
obj->serverHandle = rsp.messageQCreate.serverHandle;
+ pthread_mutex_init(&obj->msgListGate, NULL);
CIRCLEQ_INIT(&obj->msgList);
if (sem_init(&obj->synchronizer, 0, 0) < 0) {
PRINTVERBOSE1(
PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
"queueIndex %d\n", (name == NULL) ? "NULL" : name , queueIndex)
+ pthread_mutex_lock(&MessageQ_module->gate);
+
for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
for (priority = 0; priority < 2; priority++) {
transport = MessageQ_module->transports[clusterId][priority];
*/
MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
+ pthread_mutex_unlock(&MessageQ_module->gate);
+
return (MessageQ_Handle)obj;
}
"MessageQ_delete: got LAD response for client %d, status=%d\n",
handle, status)
+ pthread_mutex_lock(&MessageQ_module->gate);
+
for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
for (priority = 0; priority < 2; priority++) {
transport = MessageQ_module->transports[clusterId][priority];
queueIndex = MessageQ_getQueueIndex(obj->queue);
MessageQ_module->queues[queueIndex] = NULL;
+ pthread_mutex_unlock(&MessageQ_module->gate);
+
free(obj);
*handlePtr = NULL;
/*
* ======== MessageQ_put ========
- * Place a message onto a message queue
+ * Deliver the given message, either locally or to the transport
*
- * Calls transport's put(), which handles the sending of the message
- * using the appropriate kernel interface (socket, device ioctl) call
- * for the remote procId encoded in the queueId argument.
+ * If the destination is a local queue, deliver the message. Otherwise,
+ * pass the message to a transport for delivery. The transport handles
+ * the sending of the message using the appropriate interface (socket,
+ * device ioctl, etc.).
*/
Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
{
+ Int status = MessageQ_S_SUCCESS;
MessageQ_Object *obj;
- UInt16 dstProcId = (UInt16)(queueId >> 16);
- UInt16 queueIndex;
- UInt16 queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
- Int status = MessageQ_S_SUCCESS;
+ UInt16 dstProcId;
+ UInt16 queueIndex;
+ UInt16 queuePort;
ITransport_Handle baseTrans;
IMessageQTransport_Handle msgTrans;
INetworkTransport_Handle netTrans;
Int priority;
UInt tid;
UInt16 clusterId;
+ Bool delivered;
+
+ /* extract destination address from the given queueId */
+ dstProcId = (UInt16)(queueId >> 16);
+ queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
- /* use the queue port # for destination address */
+ /* write the destination address into the message header */
msg->dstId = queuePort;
msg->dstProc= dstProcId;
- /* invoke put hook function after addressing the message */
+ /* invoke the hook function after addressing the message */
if (MessageQ_module->putHookFxn != NULL) {
MessageQ_module->putHookFxn(queueId, msg);
}
- /* extract the transport ID from the message header */
+ /* For an outbound message: If message destination is on this
+ * processor, then check if the destination queue is in this
+ * process (thread-to-thread messaging).
+ *
+ * For an inbound message: Check if destination queue is in this
+ * process (process-to-process messaging).
+ */
+ if (dstProcId == MultiProc_self()) {
+ queueIndex = queuePort - MessageQ_PORTOFFSET;
+
+ if (queueIndex < MessageQ_module->numQueues) {
+ obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
+
+ if (obj != NULL) {
+ /* deliver message to queue */
+ pthread_mutex_lock(&obj->msgListGate);
+ CIRCLEQ_INSERT_TAIL(&obj->msgList,
+ (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
+ pthread_mutex_unlock(&obj->msgListGate);
+ sem_post(&obj->synchronizer);
+ goto done;
+ }
+ }
+ }
+
+ /* Getting here implies the message is outbound. Must give it to
+ * either the primary or secondary transport for delivery. Start
+ * by extracting the transport ID from the message header.
+ */
tid = MessageQ_getTransportId(msg);
if (tid >= MessageQ_MAXTRANSPORTS) {
printf("MessageQ_put: Error: transport id %d too big, must be < %d\n",
tid, MessageQ_MAXTRANSPORTS);
- return (MessageQ_E_FAIL);
+ status = MessageQ_E_FAIL;
+ goto done;
}
- /* if tid is set, use secondary transport regardless of destination */
+ /* if transportId is set, use secondary transport for message delivery */
if (tid != 0) {
baseTrans = MessageQ_module->transInst[tid];
if (baseTrans == NULL) {
printf("MessageQ_put: Error: transport is null\n");
- return (MessageQ_E_FAIL);
+ status = MessageQ_E_FAIL;
+ goto done;
}
/* downcast instance pointer to transport interface */
switch (ITransport_itype(baseTrans)) {
case INetworkTransport_TypeId:
netTrans = INetworkTransport_downCast(baseTrans);
- INetworkTransport_put(netTrans, (Ptr)msg);
+ delivered = INetworkTransport_put(netTrans, (Ptr)msg);
+ status = (delivered ? MessageQ_S_SUCCESS :
+ (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN :
+ MessageQ_E_FAIL));
break;
default:
}
}
else {
- /* if destination on another processor, use primary transport */
- if (dstProcId != MultiProc_self()) {
- priority = MessageQ_getMsgPri(msg);
- clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
-
- /* primary transport can only be used for intra-cluster delivery */
- if (clusterId > MultiProc_getNumProcsInCluster()) {
- printf("MessageQ_put: Error: destination procId=%d is not "
- "in cluster. Must specify a transportId.\n", dstProcId);
- return (MessageQ_E_FAIL);
- }
-
- msgTrans = MessageQ_module->transports[clusterId][priority];
-
- IMessageQTransport_put(msgTrans, (Ptr)msg);
+ /* use primary transport for delivery */
+ priority = MessageQ_getMsgPri(msg);
+ clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
+
+ /* primary transport can only be used for intra-cluster delivery */
+ if (clusterId > MultiProc_getNumProcsInCluster()) {
+ printf("MessageQ_put: Error: destination procId=%d is not "
+ "in cluster. Must specify a transportId.\n", dstProcId);
+ status = MessageQ_E_FAIL;
+ goto done;
}
- else {
- /* check if destination queue is in this process */
- queueIndex = queuePort - MessageQ_PORTOFFSET;
- if (queueIndex >= MessageQ_module->numQueues) {
- printf("MessageQ_put: Error: unable to deliver message, "
- "queueIndex too large or transportId missing.\n");
- return (MessageQ_E_FAIL);
- }
-
- obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
-
- if (obj == NULL) {
- printf("MessageQ_put: Error: unable to deliver message, "
- "destination queue not in this process.\n");
- return (MessageQ_E_FAIL);
- }
-
- /* deliver message to process local queue */
- pthread_mutex_lock(&MessageQ_module->gate);
- CIRCLEQ_INSERT_TAIL(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)msg,
- elem);
- pthread_mutex_unlock(&MessageQ_module->gate);
- sem_post(&obj->synchronizer);
- }
+ msgTrans = MessageQ_module->transports[clusterId][priority];
+ delivered = IMessageQTransport_put(msgTrans, (Ptr)msg);
+ status = (delivered ? MessageQ_S_SUCCESS :
+ (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN : MessageQ_E_FAIL));
}
+done:
return (status);
}
* Optimization here to get a message without going in to the sem
* operation, but the sem count will not be maintained properly.
*/
- pthread_mutex_lock(&MessageQ_module->gate);
+ pthread_mutex_lock(&obj->msgListGate);
if (obj->msgList.cqh_first != &obj->msgList) {
*msg = (MessageQ_Msg)obj->msglist.cqh_first;
CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
- pthread_mutex_unlock(&MessageQ_module->gate);
+ pthread_mutex_unlock(&obj->msgListGate);
}
else {
- pthread_mutex_unlock(&MessageQ_module->gate);
+ pthread_mutex_unlock(&obj->msgListGate);
}
#endif
sem_wait(&obj->synchronizer);
}
else {
+ /* add timeout (microseconds) to current time of day */
gettimeofday(&tv, NULL);
+ tv.tv_sec += timeout / 1000000;
+ tv.tv_usec += timeout % 1000000;
+
+ if (tv.tv_usec >= 1000000) {
+ tv.tv_sec++;
+ tv.tv_usec -= 1000000;
+ }
+
+ /* set absolute timeout value */
ts.tv_sec = tv.tv_sec;
- ts.tv_nsec = (tv.tv_usec + timeout) * 1000;
+ ts.tv_nsec = tv.tv_usec * 1000; /* convert to nanoseconds */
if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
if (errno == ETIMEDOUT) {
PRINTVERBOSE0("MessageQ_get: operation timed out\n")
-
- return MessageQ_E_TIMEOUT;
+ return (MessageQ_E_TIMEOUT);
+ }
+ else {
+ PRINTVERBOSE0("MessageQ_get: sem_timedwait error\n")
+ return (MessageQ_E_FAIL);
}
}
}
if (obj->unblocked) {
- return MessageQ_E_UNBLOCKED;
+ return obj->unblocked;
}
- pthread_mutex_lock(&MessageQ_module->gate);
+ pthread_mutex_lock(&obj->msgListGate);
*msg = (MessageQ_Msg)obj->msgList.cqh_first;
CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
- pthread_mutex_unlock(&MessageQ_module->gate);
+ pthread_mutex_unlock(&obj->msgListGate);
return status;
}
{
MessageQ_Object *obj = (MessageQ_Object *)handle;
- obj->unblocked = TRUE;
+ obj->unblocked = MessageQ_E_UNBLOCKED;
+ sem_post(&obj->synchronizer);
+}
+
+/* Unblocks a MessageQ that's been shutdown due to transport failure */
+Void MessageQ_shutdown(MessageQ_Handle handle)
+{
+ MessageQ_Object *obj = (MessageQ_Object *)handle;
+
+ obj->unblocked = MessageQ_E_SHUTDOWN;
sem_post(&obj->synchronizer);
}
return queueId;
}
+/* Returns the local handle associated with queueId. */
+MessageQ_Handle MessageQ_getLocalHandle(MessageQ_QueueId queueId)
+{
+ MessageQ_Object *obj;
+ MessageQ_QueueIndex queueIndex;
+ UInt16 procId;
+
+ procId = MessageQ_getProcId(queueId);
+ if (procId != MultiProc_self()) {
+ return NULL;
+ }
+
+ queueIndex = MessageQ_getQueueIndex(queueId);
+ obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
+
+ return (MessageQ_Handle)obj;
+}
+
/* Sets the tracing of a message */
Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
{
msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
msg->srcProc = MultiProc_self();
- pthread_mutex_lock(&MessageQ_module->gate);
+ pthread_mutex_lock(&MessageQ_module->seqNumGate);
msg->seqNum = MessageQ_module->seqNum++;
- pthread_mutex_unlock(&MessageQ_module->gate);
+ pthread_mutex_unlock(&MessageQ_module->seqNumGate);
#endif
}
MessageQ_Handle *oldQueues;
UInt oldSize;
+ pthread_mutex_lock(&MessageQ_module->gate);
+
oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof(MessageQ_Handle));
MessageQ_module->queues = queues;
MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
+ pthread_mutex_unlock(&MessageQ_module->gate);
+
free(oldQueues);
return;
}
+
+/*
+ * ======== MessageQ_bind ========
+ * Bind all existing message queues to the given processor
+ *
+ * Note: This function is a hack to work around the driver.
+ *
+ * The Linux rpmsgproto driver requires a socket for each
+ * message queue and remote processor tuple.
+ *
+ * socket --> (queue, processor)
+ *
+ * Therefore, each time a new remote processor is started, all
+ * existing message queues need to create a socket for the new
+ * processor.
+ *
+ * The driver should not have this requirement. One socket per
+ * message queue should be sufficient to uniquely identify the
+ * endpoint to the driver.
+ */
+Void MessageQ_bind(UInt16 procId)
+{
+ int q;
+ int clusterId;
+ int priority;
+ MessageQ_Handle handle;
+ MessageQ_QueueId queue;
+ IMessageQTransport_Handle transport;
+
+ clusterId = procId - MultiProc_getBaseIdOfCluster();
+ pthread_mutex_lock(&MessageQ_module->gate);
+
+ for (q = 0; q < MessageQ_module->numQueues; q++) {
+
+ if ((handle = MessageQ_module->queues[q]) == NULL) {
+ continue;
+ }
+
+ queue = ((MessageQ_Object *)handle)->queue;
+
+ for (priority = 0; priority < 2; priority++) {
+ transport = MessageQ_module->transports[clusterId][priority];
+ if (transport != NULL) {
+ IMessageQTransport_bind((Void *)transport, queue);
+ }
+ }
+ }
+
+ pthread_mutex_unlock(&MessageQ_module->gate);
+}
+
+/*
+ * ======== MessageQ_unbind ========
+ * Unbind all existing message queues from the given processor
+ *
+ * Hack: see MessageQ_bind.
+ */
+Void MessageQ_unbind(UInt16 procId)
+{
+ int q;
+ int clusterId;
+ int priority;
+ MessageQ_Handle handle;
+ MessageQ_QueueId queue;
+ IMessageQTransport_Handle transport;
+
+ pthread_mutex_lock(&MessageQ_module->gate);
+
+ for (q = 0; q < MessageQ_module->numQueues; q++) {
+
+ if ((handle = MessageQ_module->queues[q]) == NULL) {
+ continue;
+ }
+
+ queue = ((MessageQ_Object *)handle)->queue;
+ clusterId = procId - MultiProc_getBaseIdOfCluster();
+
+ for (priority = 0; priority < 2; priority++) {
+ transport = MessageQ_module->transports[clusterId][priority];
+ if (transport != NULL) {
+ IMessageQTransport_unbind((Void *)transport, queue);
+ }
+ }
+ }
+
+ pthread_mutex_unlock(&MessageQ_module->gate);
+}