index c55015c449c58940548e94e126f2cab0ce9ddff1..9fba28790e90bb7571234d538a6fd29d16c7b4a0 100644 (file)
--- a/linux/src/api/MessageQ.c
+++ b/linux/src/api/MessageQ.c
#define MessageQ_internal 1 /* must be defined before include file */
#include <ti/ipc/MessageQ.h>
#include <_MessageQ.h>
-#include <ITransport.h>
-#include <IMessageQTransport.h>
-#include <INetworkTransport.h>
+#include <ti/ipc/interfaces/ITransport.h>
+#include <ti/ipc/interfaces/IMessageQTransport.h>
+#include <ti/ipc/interfaces/INetworkTransport.h>
/* Socket Headers */
#include <sys/select.h>
#include <pthread.h>
#include <semaphore.h>
-/* Socket Protocol Family */
-#include <net/rpmsg.h>
-
#include <ladclient.h>
#include <_lad.h>
pthread_mutex_t gate;
int seqNum;
IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
- INetworkTransport_Handle transInst[MessageQ_MAXTRANSPORTS];
+ ITransport_Handle transInst[MessageQ_MAXTRANSPORTS];
MessageQ_PutHookFxn putHookFxn;
} MessageQ_ModuleObject;
{
.refCount = 0,
.nameServer = NULL,
+ .gate = PTHREAD_MUTEX_INITIALIZER,
.putHookFxn = NULL
};
return MessageQ_E_ALREADYEXISTS;
}
- MessageQ_module->transInst[tid] = (INetworkTransport_Handle)inst;
+ MessageQ_module->transInst[tid] = inst;
return MessageQ_S_SUCCESS;
}
*/
Int MessageQ_setup(const MessageQ_Config *cfg)
{
- Int status;
+ Int status = MessageQ_S_SUCCESS;
LAD_ClientHandle handle;
struct LAD_CommandObj cmd;
union LAD_ResponseObj rsp;
Int i;
Int tid;
+ /* this entire function must be serialized */
pthread_mutex_lock(&MessageQ_module->gate);
- MessageQ_module->refCount++;
- if (MessageQ_module->refCount > 1) {
-
- pthread_mutex_unlock(&MessageQ_module->gate);
-
+ /* ensure only first thread performs startup procedure */
+ if (++MessageQ_module->refCount > 1) {
PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
- MessageQ_module->refCount)
-
- return MessageQ_S_ALREADYSETUP;
+ MessageQ_module->refCount)
+ status = MessageQ_S_ALREADYSETUP;
+ goto exit;
}
- pthread_mutex_unlock(&MessageQ_module->gate);
-
handle = LAD_findHandle();
if (handle == LAD_MAXNUMCLIENTS) {
- PRINTVERBOSE1(
- "MessageQ_setup: can't find connection to daemon for pid %d\n",
- getpid())
-
- return MessageQ_E_RESOURCE;
+ PRINTVERBOSE1("MessageQ_setup: can't find connection to daemon for "
+ "pid %d\n", getpid())
+ status = MessageQ_E_RESOURCE;
+ goto exit;
}
cmd.cmd = LAD_MESSAGEQ_SETUP;
cmd.clientId = handle;
- memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof (*cfg));
+ memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
- PRINTVERBOSE1(
- "MessageQ_setup: sending LAD command failed, status=%d\n", status)
- return MessageQ_E_FAIL;
+ PRINTVERBOSE1("MessageQ_setup: sending LAD command failed, "
+ "status=%d\n", status)
+ status = MessageQ_E_FAIL;
+ goto exit;
}
if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
- return status;
+ status = MessageQ_E_FAIL;
+ goto exit;
}
status = rsp.setup.status;
- PRINTVERBOSE2(
- "MessageQ_setup: got LAD response for client %d, status=%d\n",
- handle, status)
+ PRINTVERBOSE2("MessageQ_setup: LAD response for client %d, status=%d\n",
+ handle, status)
MessageQ_module->seqNum = 0;
MessageQ_module->nameServer = rsp.setup.nameServerHandle;
MessageQ_module->numQueues = cfg->maxRuntimeEntries;
MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
- sizeof (MessageQ_Handle));
-
- pthread_mutex_init(&MessageQ_module->gate, NULL);
+ sizeof(MessageQ_Handle));
for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
for (pri = 0; pri < 2; pri++) {
MessageQ_module->transInst[tid] = NULL;
}
- return status;
+exit:
+ /* if error, must decrement reference count */
+ if (status < 0) {
+ MessageQ_module->refCount--;
+ }
+
+ pthread_mutex_unlock(&MessageQ_module->gate);
+
+ return (status);
}
/*
*/
Int MessageQ_destroy(void)
{
- Int status;
+ Int status = MessageQ_S_SUCCESS;
LAD_ClientHandle handle;
struct LAD_CommandObj cmd;
union LAD_ResponseObj rsp;
+ /* this entire function must be serialized */
+ pthread_mutex_lock(&MessageQ_module->gate);
+
+ /* ensure only last thread does the work */
+ if (--MessageQ_module->refCount > 0) {
+ goto exit;
+ }
+
handle = LAD_findHandle();
if (handle == LAD_MAXNUMCLIENTS) {
- PRINTVERBOSE1(
- "MessageQ_destroy: can't find connection to daemon for pid %d\n",
- getpid())
-
- return MessageQ_E_RESOURCE;
+ PRINTVERBOSE1("MessageQ_destroy: can't find connection to daemon "
+ "for pid %d\n", getpid())
+ status = MessageQ_E_RESOURCE;
+ goto exit;
}
cmd.cmd = LAD_MESSAGEQ_DESTROY;
cmd.clientId = handle;
if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
- PRINTVERBOSE1(
- "MessageQ_destroy: sending LAD command failed, status=%d\n", status)
- return MessageQ_E_FAIL;
+ PRINTVERBOSE1("MessageQ_destroy: sending LAD command failed, "
+ "status=%d\n", status)
+ status = MessageQ_E_FAIL;
+ goto exit;
}
if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
- return status;
+ status = MessageQ_E_FAIL;
+ goto exit;
}
status = rsp.status;
- PRINTVERBOSE2(
- "MessageQ_destroy: got LAD response for client %d, status=%d\n",
- handle, status)
+ PRINTVERBOSE2("MessageQ_destroy: got LAD response for client %d, "
+ "status=%d\n", handle, status)
- return status;
+exit:
+ pthread_mutex_unlock(&MessageQ_module->gate);
+
+ return (status);
}
/*
}
/*
- * MessageQ_create - create a MessageQ object for receiving.
+ * ======== MessageQ_create ========
*/
MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
{
Int status;
MessageQ_Object *obj = NULL;
IMessageQTransport_Handle transport;
- INetworkTransport_Handle transInst;
+ INetworkTransport_Handle netTrans;
+ ITransport_Handle baseTrans;
UInt16 queueIndex;
UInt16 clusterId;
Int tid;
/* Populate the params member */
memcpy(&obj->params, &ps, sizeof(ps));
- queueIndex = (MessageQ_QueueIndex)(rsp.messageQCreate.queueId & 0x0000ffff);
obj->queue = rsp.messageQCreate.queueId;
obj->serverHandle = rsp.messageQCreate.serverHandle;
return NULL;
}
+ /* lad returns the queue port # (queueIndex + PORT_OFFSET) */
+ queueIndex = MessageQ_getQueueIndex(rsp.messageQCreate.queueId);
+
PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
- "queueIndex %d\n", name, queueIndex)
+ "queueIndex %d\n", (name == NULL) ? "NULL" : name , queueIndex)
for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
for (priority = 0; priority < 2; priority++) {
}
for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
- transInst = MessageQ_module->transInst[tid];
- if (transInst) {
- /* need to check return and do something if error */
- INetworkTransport_bind((Void *)transInst, obj->queue);
+ baseTrans = MessageQ_module->transInst[tid];
+
+ if (baseTrans != NULL) {
+ switch (ITransport_itype(baseTrans)) {
+ case INetworkTransport_TypeId:
+ netTrans = INetworkTransport_downCast(baseTrans);
+ INetworkTransport_bind((void *)netTrans, obj->queue);
+ break;
+
+ default:
+ /* error */
+ printf("MessageQ_create: Error: transport id %d is an "
+ "unsupported transport type.\n", tid);
+ break;
+ }
}
}
- /*
- * Since LAD's MessageQ_module can grow, we need to be able to grow as well
- */
+ /* LAD's MessageQ module can grow, we need to grow as well */
if (queueIndex >= MessageQ_module->numQueues) {
_MessageQ_grow(queueIndex);
}
- /*
- * No need to "allocate" slot since the queueIndex returned by
- * LAD is guaranteed to be unique.
+ /* No need to "allocate" slot since the queueIndex returned by
+ * LAD is guaranteed to be unique.
*/
MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
}
/*
- * MessageQ_delete - delete a MessageQ object.
+ * ======== MessageQ_delete ========
*/
Int MessageQ_delete(MessageQ_Handle *handlePtr)
{
MessageQ_Object *obj;
IMessageQTransport_Handle transport;
- INetworkTransport_Handle transInst;
+ INetworkTransport_Handle netTrans;
+ ITransport_Handle baseTrans;
Int status = MessageQ_S_SUCCESS;
UInt16 queueIndex;
UInt16 clusterId;
}
for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
- transInst = MessageQ_module->transInst[tid];
- if (transInst) {
- INetworkTransport_unbind((Void *)transInst, obj->queue);
+ baseTrans = MessageQ_module->transInst[tid];
+
+ if (baseTrans != NULL) {
+ switch (ITransport_itype(baseTrans)) {
+ case INetworkTransport_TypeId:
+ netTrans = INetworkTransport_downCast(baseTrans);
+ INetworkTransport_unbind((void *)netTrans, obj->queue);
+ break;
+
+ default:
+ /* error */
+ printf("MessageQ_create: Error: transport id %d is an "
+ "unsupported transport type.\n", tid);
+ break;
+ }
}
}
- queueIndex = (MessageQ_QueueIndex)(obj->queue & 0x0000ffff);
+ /* extract the queue index from the queueId */
+ queueIndex = MessageQ_getQueueIndex(obj->queue);
MessageQ_module->queues[queueIndex] = NULL;
free(obj);
}
/*
- * MessageQ_open - Opens an instance of MessageQ for sending.
+ * ======== MessageQ_open ========
+ * Acquire a queueId for use in sending messages to the queue
*/
Int MessageQ_open(String name, MessageQ_QueueId *queueId)
{
}
/*
- * MessageQ_close - Closes previously opened instance of MessageQ.
+ * ======== MessageQ_openQueueId ========
+ */
+MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 procId)
+{
+ MessageQ_QueueIndex queuePort;
+ MessageQ_QueueId queueId;
+
+ /* queue port is embedded in the queueId */
+ queuePort = queueIndex + MessageQ_PORTOFFSET;
+ queueId = ((MessageQ_QueueId)(procId) << 16) | queuePort;
+
+ return (queueId);
+}
+
+/*
+ * ======== MessageQ_close ========
+ * Closes previously opened instance of MessageQ
*/
Int MessageQ_close(MessageQ_QueueId *queueId)
{
}
/*
- * MessageQ_put - place a message onto a message queue.
- *
- * Calls transport's put(), which handles the sending of the message using the
- * appropriate kernel interface (socket, device ioctl) call for the remote
- * procId encoded in the queueId argument.
+ * ======== MessageQ_put ========
+ * Deliver the given message, either locally or to the transport
*
+ * If the destination is a local queue, deliver the message. Otherwise,
+ * pass the message to a transport for delivery. The transport handles
+ * the sending of the message using the appropriate interface (socket,
+ * device ioctl, etc.).
*/
Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
{
+ Int status = MessageQ_S_SUCCESS;
MessageQ_Object *obj;
- UInt16 dstProcId = (UInt16)(queueId >> 16);
- UInt16 queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
- Int status = MessageQ_S_SUCCESS;
- ITransport_Handle transport;
+ UInt16 dstProcId;
+ UInt16 queueIndex;
+ UInt16 queuePort;
+ ITransport_Handle baseTrans;
IMessageQTransport_Handle msgTrans;
INetworkTransport_Handle netTrans;
Int priority;
UInt tid;
UInt16 clusterId;
+ Bool delivered;
+
+ /* extract destination address from the given queueId */
+ dstProcId = (UInt16)(queueId >> 16);
+ queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
- msg->dstId = queueIndex;
- msg->dstProc = dstProcId;
+ /* write the destination address into the message header */
+ msg->dstId = queuePort;
+ msg->dstProc= dstProcId;
- /* invoke put hook function after addressing the message */
+ /* invoke the hook function after addressing the message */
if (MessageQ_module->putHookFxn != NULL) {
MessageQ_module->putHookFxn(queueId, msg);
}
- if (dstProcId != MultiProc_self()) {
- tid = MessageQ_getTransportId(msg);
- if (tid == 0) {
- priority = MessageQ_getMsgPri(msg);
- clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
-
- /* primary transport can only be used for intra-cluster delivery */
- if (clusterId > MultiProc_getNumProcsInCluster()) {
- printf("MessageQ_put: Error: destination procId=%d is not "
- "in cluster. Must specify a transportId.\n", dstProcId);
- return MessageQ_E_FAIL;
+ /* For an outbound message: If message destination is on this
+ * processor, then check if the destination queue is in this
+ * process (thread-to-thread messaging).
+ *
+ * For an inbound message: Check if destination queue is in this
+ * process (process-to-process messaging).
+ */
+ if (dstProcId == MultiProc_self()) {
+ queueIndex = queuePort - MessageQ_PORTOFFSET;
+
+ if (queueIndex < MessageQ_module->numQueues) {
+ obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
+
+ if (obj != NULL) {
+ /* deliver message to queue */
+ pthread_mutex_lock(&MessageQ_module->gate);
+ CIRCLEQ_INSERT_TAIL(&obj->msgList,
+ (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
+ pthread_mutex_unlock(&MessageQ_module->gate);
+ sem_post(&obj->synchronizer);
+ goto done;
}
+ }
+ }
- msgTrans = MessageQ_module->transports[clusterId][priority];
+ /* Getting here implies the message is outbound. Must give it to
+ * either the primary or secondary transport for delivery. Start
+ * by extracting the transport ID from the message header.
+ */
+ tid = MessageQ_getTransportId(msg);
- IMessageQTransport_put(msgTrans, (Ptr)msg);
- }
- else {
- if (tid >= MessageQ_MAXTRANSPORTS) {
- printf("MessageQ_put: transport id %d too big, must be < %d\n",
- tid, MessageQ_MAXTRANSPORTS);
- return MessageQ_E_FAIL;
- }
+ if (tid >= MessageQ_MAXTRANSPORTS) {
+ printf("MessageQ_put: Error: transport id %d too big, must be < %d\n",
+ tid, MessageQ_MAXTRANSPORTS);
+ status = MessageQ_E_FAIL;
+ goto done;
+ }
- /* use secondary transport */
- netTrans = MessageQ_module->transInst[tid];
- transport = INetworkTransport_upCast(netTrans);
+ /* if transportId is set, use secondary transport for message delivery */
+ if (tid != 0) {
+ baseTrans = MessageQ_module->transInst[tid];
- /* downcast instance pointer to transport interface */
- switch (ITransport_itype(transport)) {
- case INetworkTransport_TypeId:
- INetworkTransport_put(netTrans, (Ptr)msg);
- break;
+ if (baseTrans == NULL) {
+ printf("MessageQ_put: Error: transport is null\n");
+ status = MessageQ_E_FAIL;
+ goto done;
+ }
- default:
- /* error */
- printf("MessageQ_put: Error: transport id %d is an "
- "unsupported transport type\n", tid);
- status = MessageQ_E_FAIL;
- break;
- }
+ /* downcast instance pointer to transport interface */
+ switch (ITransport_itype(baseTrans)) {
+ case INetworkTransport_TypeId:
+ netTrans = INetworkTransport_downCast(baseTrans);
+ delivered = INetworkTransport_put(netTrans, (Ptr)msg);
+ status = (delivered ? MessageQ_S_SUCCESS : MessageQ_E_FAIL);
+ break;
+
+ default:
+ /* error */
+ printf("MessageQ_put: Error: transport id %d is an "
+ "unsupported transport type\n", tid);
+ status = MessageQ_E_FAIL;
+ break;
}
}
else {
- obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
-
- pthread_mutex_lock(&MessageQ_module->gate);
-
- /* It is a local MessageQ */
- CIRCLEQ_INSERT_TAIL(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
-
- pthread_mutex_unlock(&MessageQ_module->gate);
+ /* use primary transport for delivery */
+ priority = MessageQ_getMsgPri(msg);
+ clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
+
+ /* primary transport can only be used for intra-cluster delivery */
+ if (clusterId > MultiProc_getNumProcsInCluster()) {
+ printf("MessageQ_put: Error: destination procId=%d is not "
+ "in cluster. Must specify a transportId.\n", dstProcId);
+ status = MessageQ_E_FAIL;
+ goto done;
+ }
- sem_post(&obj->synchronizer);
+ msgTrans = MessageQ_module->transports[clusterId][priority];
+ delivered = IMessageQTransport_put(msgTrans, (Ptr)msg);
+ status = (delivered ? MessageQ_S_SUCCESS : MessageQ_E_FAIL);
}
- return status;
+done:
+ return (status);
}
/*
sem_wait(&obj->synchronizer);
}
else {
+ /* add timeout (microseconds) to current time of day */
gettimeofday(&tv, NULL);
+ tv.tv_sec += timeout / 1000000;
+ tv.tv_usec += timeout % 1000000;
+
+ if (tv.tv_usec >= 1000000) {
+ tv.tv_sec++;
+ tv.tv_usec -= 1000000;
+ }
+
+ /* set absolute timeout value */
ts.tv_sec = tv.tv_sec;
- ts.tv_nsec = (tv.tv_usec + timeout) * 1000;
+ ts.tv_nsec = tv.tv_usec * 1000; /* convert to nanoseconds */
if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
if (errno == ETIMEDOUT) {
PRINTVERBOSE0("MessageQ_get: operation timed out\n")
-
- return MessageQ_E_TIMEOUT;
+ return (MessageQ_E_TIMEOUT);
+ }
+ else {
+ PRINTVERBOSE0("MessageQ_get: sem_timedwait error\n")
+ return (MessageQ_E_FAIL);
}
}
}
if (obj->unblocked) {
- return MessageQ_E_UNBLOCKED;
+ return obj->unblocked;
}
pthread_mutex_lock(&MessageQ_module->gate);
{
MessageQ_Object *obj = (MessageQ_Object *)handle;
- obj->unblocked = TRUE;
+ obj->unblocked = MessageQ_E_UNBLOCKED;
+ sem_post(&obj->synchronizer);
+}
+
+/* Unblocks a MessageQ that's been shutdown due to transport failure */
+Void MessageQ_shutdown(MessageQ_Handle handle)
+{
+ MessageQ_Object *obj = (MessageQ_Object *)handle;
+
+ obj->unblocked = MessageQ_E_SHUTDOWN;
sem_post(&obj->synchronizer);
}
return queueId;
}
+/* Returns the local handle associated with queueId. */
+MessageQ_Handle MessageQ_getLocalHandle(MessageQ_QueueId queueId)
+{
+ MessageQ_Object *obj;
+ MessageQ_QueueIndex queueIndex;
+ UInt16 procId;
+
+ procId = MessageQ_getProcId(queueId);
+ if (procId != MultiProc_self()) {
+ return NULL;
+ }
+
+ queueIndex = MessageQ_getQueueIndex(queueId);
+ obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
+
+ return (MessageQ_Handle)obj;
+}
+
/* Sets the tracing of a message */
Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
{
}
/*
- * Grow module's queues[] array to accommodate queueIndex from LAD
+ * ======== _MessageQ_grow ========
+ * Increase module's queues array to accommodate queueIndex from LAD
+ *
+ * Note: this function takes the queue index value (i.e. without the
+ * port offset).
*/
Void _MessageQ_grow(UInt16 queueIndex)
{
oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
- queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof (MessageQ_Handle));
+ queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof(MessageQ_Handle));
memcpy(queues, MessageQ_module->queues, oldSize);
oldQueues = MessageQ_module->queues;
return;
}
-