index 9fba28790e90bb7571234d538a6fd29d16c7b4a0..f82728dcb02ead73a15287ecf2c85b17fe748921 100644 (file)
--- a/linux/src/api/MessageQ.c
+++ b/linux/src/api/MessageQ.c
#define MessageQ_internal 1 /* must be defined before include file */
#include <ti/ipc/MessageQ.h>
#include <_MessageQ.h>
+#include <ti/ipc/interfaces/IHeap.h>
#include <ti/ipc/interfaces/ITransport.h>
#include <ti/ipc/interfaces/IMessageQTransport.h>
#include <ti/ipc/interfaces/INetworkTransport.h>
NameServer_Handle nameServer;
pthread_mutex_t gate;
int seqNum;
+ pthread_mutex_t seqNumGate;
IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
ITransport_Handle transInst[MessageQ_MAXTRANSPORTS];
MessageQ_PutHookFxn putHookFxn;
+ Ptr *heaps;
+ Int numHeaps;
} MessageQ_ModuleObject;
typedef struct MessageQ_CIRCLEQ_ENTRY {
*/
typedef struct MessageQ_Object_tag {
CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
+ pthread_mutex_t msgListGate;
MessageQ_Params params;
MessageQ_QueueId queue;
int unblocked;
{
.refCount = 0,
.nameServer = NULL,
- .gate = PTHREAD_MUTEX_INITIALIZER,
- .putHookFxn = NULL
+#if defined(IPC_BUILDOS_ANDROID)
+ .gate = PTHREAD_RECURSIVE_MUTEX_INITIALIZER,
+#else
+ .gate = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
+#endif
+ .seqNumGate = PTHREAD_MUTEX_INITIALIZER,
+ .putHookFxn = NULL,
+ .heaps = NULL,
+ .numHeaps = 0
};
/*!
UInt16 clusterId;
if (handle == NULL) {
- printf("MessageQ_registerTransport: invalid handle, must be non-NULL\n"
- );
+ fprintf(stderr,
+ "MessageQ_registerTransport: invalid handle, must be non-NULL\n"
+ );
return status;
}
clusterId = rprocId - MultiProc_getBaseIdOfCluster();
if (clusterId >= MultiProc_MAXPROCESSORS) {
- printf("MessageQ_registerTransport: invalid procId %d\n", rprocId);
+ fprintf(stderr,
+ "MessageQ_registerTransport: invalid procId %d\n", rprocId);
return status;
}
Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
{
if (inst == NULL) {
- printf("MessageQ_registerTransportId: invalid NULL handle\n");
+ fprintf(stderr, "MessageQ_registerTransportId: invalid NULL handle\n");
return MessageQ_E_INVALIDARG;
}
if (tid >= MessageQ_MAXTRANSPORTS) {
- printf("MessageQ_unregisterNetTransport: invalid transport id %d,"
+ fprintf(stderr,
+ "MessageQ_unregisterNetTransport: invalid transport id %d, "
"must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
return MessageQ_E_INVALIDARG;
}
if (MessageQ_module->transInst[tid] != NULL) {
- printf("MessageQ_registerTransportId: transport id %d already "
+ fprintf(stderr,
+ "MessageQ_registerTransportId: transport id %d already "
"registered\n", tid);
return MessageQ_E_ALREADYEXISTS;
clusterId = rprocId - MultiProc_getBaseIdOfCluster();
if (clusterId >= MultiProc_MAXPROCESSORS) {
- printf("MessageQ_unregisterTransport: invalid rprocId %d\n", rprocId);
+ fprintf(stderr, "MessageQ_unregisterTransport: invalid rprocId %d\n",
+ rprocId);
return;
}
Void MessageQ_unregisterTransportId(UInt tid)
{
if (tid >= MessageQ_MAXTRANSPORTS) {
- printf("MessageQ_unregisterTransportId: invalid transport id %d, "
+ fprintf(stderr,
+ "MessageQ_unregisterTransportId: invalid transport id %d, "
"must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
return;
MessageQ_module->numQueues = cfg->maxRuntimeEntries;
MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
sizeof(MessageQ_Handle));
+ MessageQ_module->numHeaps = cfg->numHeaps;
+ MessageQ_module->heaps = calloc(cfg->numHeaps, sizeof(Ptr));
for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
for (pri = 0; pri < 2; pri++) {
LAD_ClientHandle handle;
struct LAD_CommandObj cmd;
union LAD_ResponseObj rsp;
+ int i;
/* this entire function must be serialized */
pthread_mutex_lock(&MessageQ_module->gate);
goto exit;
}
+ /* ensure all registered heaps have been unregistered */
+ for (i = 0; i < MessageQ_module->numHeaps; i++) {
+ if (MessageQ_module->heaps[i] != NULL) {
+ PRINTVERBOSE1("MessageQ_destroy: Warning: found heapId=%d", i);
+ }
+ }
+ free(MessageQ_module->heaps);
+ MessageQ_module->heaps = NULL;
+
handle = LAD_findHandle();
if (handle == LAD_MAXNUMCLIENTS) {
PRINTVERBOSE1("MessageQ_destroy: can't find connection to daemon "
obj->queue = rsp.messageQCreate.queueId;
obj->serverHandle = rsp.messageQCreate.serverHandle;
+ pthread_mutex_init(&obj->msgListGate, NULL);
CIRCLEQ_INIT(&obj->msgList);
if (sem_init(&obj->synchronizer, 0, 0) < 0) {
PRINTVERBOSE1(
PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
"queueIndex %d\n", (name == NULL) ? "NULL" : name , queueIndex)
+ pthread_mutex_lock(&MessageQ_module->gate);
+
for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
for (priority = 0; priority < 2; priority++) {
transport = MessageQ_module->transports[clusterId][priority];
default:
/* error */
- printf("MessageQ_create: Error: transport id %d is an "
+ fprintf(stderr,
+ "MessageQ_create: Error: transport id %d is an "
"unsupported transport type.\n", tid);
break;
}
*/
MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
+ pthread_mutex_unlock(&MessageQ_module->gate);
+
return (MessageQ_Handle)obj;
}
"MessageQ_delete: got LAD response for client %d, status=%d\n",
handle, status)
+ pthread_mutex_lock(&MessageQ_module->gate);
+
for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
for (priority = 0; priority < 2; priority++) {
transport = MessageQ_module->transports[clusterId][priority];
default:
/* error */
- printf("MessageQ_create: Error: transport id %d is an "
+ fprintf(stderr,
+ "MessageQ_create: Error: transport id %d is an "
"unsupported transport type.\n", tid);
break;
}
queueIndex = MessageQ_getQueueIndex(obj->queue);
MessageQ_module->queues[queueIndex] = NULL;
+ pthread_mutex_unlock(&MessageQ_module->gate);
+
free(obj);
*handlePtr = NULL;
if (obj != NULL) {
/* deliver message to queue */
- pthread_mutex_lock(&MessageQ_module->gate);
+ pthread_mutex_lock(&obj->msgListGate);
CIRCLEQ_INSERT_TAIL(&obj->msgList,
(MessageQ_CIRCLEQ_ENTRY *)msg, elem);
- pthread_mutex_unlock(&MessageQ_module->gate);
+ pthread_mutex_unlock(&obj->msgListGate);
sem_post(&obj->synchronizer);
goto done;
}
tid = MessageQ_getTransportId(msg);
if (tid >= MessageQ_MAXTRANSPORTS) {
- printf("MessageQ_put: Error: transport id %d too big, must be < %d\n",
+ fprintf(stderr,
+ "MessageQ_put: Error: transport id %d too big, must be < %d\n",
tid, MessageQ_MAXTRANSPORTS);
status = MessageQ_E_FAIL;
goto done;
baseTrans = MessageQ_module->transInst[tid];
if (baseTrans == NULL) {
- printf("MessageQ_put: Error: transport is null\n");
+ fprintf(stderr, "MessageQ_put: Error: transport is null\n");
status = MessageQ_E_FAIL;
goto done;
}
case INetworkTransport_TypeId:
netTrans = INetworkTransport_downCast(baseTrans);
delivered = INetworkTransport_put(netTrans, (Ptr)msg);
- status = (delivered ? MessageQ_S_SUCCESS : MessageQ_E_FAIL);
+ status = (delivered ? MessageQ_S_SUCCESS :
+ (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN :
+ MessageQ_E_FAIL));
break;
default:
/* error */
- printf("MessageQ_put: Error: transport id %d is an "
+ fprintf(stderr, "MessageQ_put: Error: transport id %d is an "
"unsupported transport type\n", tid);
status = MessageQ_E_FAIL;
break;
/* primary transport can only be used for intra-cluster delivery */
if (clusterId > MultiProc_getNumProcsInCluster()) {
- printf("MessageQ_put: Error: destination procId=%d is not "
+ fprintf(stderr,
+ "MessageQ_put: Error: destination procId=%d is not "
"in cluster. Must specify a transportId.\n", dstProcId);
status = MessageQ_E_FAIL;
goto done;
msgTrans = MessageQ_module->transports[clusterId][priority];
delivered = IMessageQTransport_put(msgTrans, (Ptr)msg);
- status = (delivered ? MessageQ_S_SUCCESS : MessageQ_E_FAIL);
+ status = (delivered ? MessageQ_S_SUCCESS :
+ (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN : MessageQ_E_FAIL));
}
done:
* Optimization here to get a message without going in to the sem
* operation, but the sem count will not be maintained properly.
*/
- pthread_mutex_lock(&MessageQ_module->gate);
+ pthread_mutex_lock(&obj->msgListGate);
if (obj->msgList.cqh_first != &obj->msgList) {
*msg = (MessageQ_Msg)obj->msglist.cqh_first;
CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
- pthread_mutex_unlock(&MessageQ_module->gate);
+ pthread_mutex_unlock(&obj->msgListGate);
}
else {
- pthread_mutex_unlock(&MessageQ_module->gate);
+ pthread_mutex_unlock(&obj->msgListGate);
}
#endif
return obj->unblocked;
}
- pthread_mutex_lock(&MessageQ_module->gate);
+ pthread_mutex_lock(&obj->msgListGate);
*msg = (MessageQ_Msg)obj->msgList.cqh_first;
CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
- pthread_mutex_unlock(&MessageQ_module->gate);
+ pthread_mutex_unlock(&obj->msgListGate);
return status;
}
*/
MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
{
+ IHeap_Handle heap;
MessageQ_Msg msg;
- /*
- * heapId not used for local alloc (as this is over a copy transport), but
- * we need to send to other side as heapId is used in BIOS transport.
- */
- msg = (MessageQ_Msg)calloc(1, size);
+ if (heapId > (MessageQ_module->numHeaps - 1)) {
+ PRINTVERBOSE1("MessageQ_alloc: Error: heapId (%d) too large", heapId);
+ return (NULL);
+ }
+ else if (MessageQ_module->heaps[heapId] == NULL) {
+ PRINTVERBOSE1("MessageQ_alloc: Error: heapId (%d) not registered",
+ heapId);
+ return (NULL);
+ }
+ else {
+ heap = (IHeap_Handle)MessageQ_module->heaps[heapId];
+ }
+
+ msg = IHeap_alloc(heap, size);
+
+ if (msg == NULL) {
+ return (NULL);
+ }
+
MessageQ_msgInit(msg);
msg->msgSize = size;
msg->heapId = heapId;
- return msg;
+ return (msg);
}
/*
Int MessageQ_free(MessageQ_Msg msg)
{
UInt32 status = MessageQ_S_SUCCESS;
+ IHeap_Handle heap;
- /* Check to ensure this was not allocated by user: */
+ /* ensure this was not allocated by user */
if (msg->heapId == MessageQ_STATICMSG) {
status = MessageQ_E_CANNOTFREESTATICMSG;
}
+ else if (msg->heapId > (MessageQ_module->numHeaps - 1)) {
+ status = MessageQ_E_INVALIDARG;
+ }
+ else if (MessageQ_module->heaps[msg->heapId] == NULL) {
+ status = MessageQ_E_NOTFOUND;
+ }
else {
- free(msg);
+ heap = (IHeap_Handle)MessageQ_module->heaps[msg->heapId];
}
- return status;
+ IHeap_free(heap, (void *)msg);
+
+ return (status);
}
-/* Register a heap with MessageQ. */
+/*
+ * ======== MessageQ_registerHeap ========
+ */
Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
{
Int status = MessageQ_S_SUCCESS;
- /* Do nothing, as this uses a copy transport */
+ pthread_mutex_lock(&MessageQ_module->gate);
- return status;
+ if (heapId > (MessageQ_module->numHeaps - 1)) {
+ status = MessageQ_E_INVALIDARG;
+ }
+ else if (MessageQ_module->heaps[heapId] != NULL) {
+ status = MessageQ_E_ALREADYEXISTS;
+ }
+ else {
+ MessageQ_module->heaps[heapId] = heap;
+ }
+
+ pthread_mutex_unlock(&MessageQ_module->gate);
+
+ return (status);
}
-/* Unregister a heap with MessageQ. */
+/*
+ * ======== MessageQ_unregisterHeap ========
+ */
Int MessageQ_unregisterHeap(UInt16 heapId)
{
Int status = MessageQ_S_SUCCESS;
- /* Do nothing, as this uses a copy transport */
+ pthread_mutex_lock(&MessageQ_module->gate);
- return status;
+ if (heapId > (MessageQ_module->numHeaps - 1)) {
+ status = MessageQ_E_INVALIDARG;
+ }
+ else if (MessageQ_module->heaps[heapId] == NULL) {
+ status = MessageQ_E_NOTFOUND;
+ }
+ else {
+ MessageQ_module->heaps[heapId] = NULL;
+ }
+
+ pthread_mutex_unlock(&MessageQ_module->gate);
+
+ return (status);
}
/* Unblocks a MessageQ */
msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
msg->srcProc = MultiProc_self();
- pthread_mutex_lock(&MessageQ_module->gate);
+ pthread_mutex_lock(&MessageQ_module->seqNumGate);
msg->seqNum = MessageQ_module->seqNum++;
- pthread_mutex_unlock(&MessageQ_module->gate);
+ pthread_mutex_unlock(&MessageQ_module->seqNumGate);
#endif
}
MessageQ_Handle *oldQueues;
UInt oldSize;
+ pthread_mutex_lock(&MessageQ_module->gate);
+
oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof(MessageQ_Handle));
MessageQ_module->queues = queues;
MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
+ pthread_mutex_unlock(&MessageQ_module->gate);
+
free(oldQueues);
return;
}
+
+/*
+ * ======== MessageQ_bind ========
+ * Bind all existing message queues to the given processor
+ *
+ * Note: This function is a hack to work around the driver.
+ *
+ * The Linux rpmsgproto driver requires a socket for each
+ * message queue and remote processor tuple.
+ *
+ * socket --> (queue, processor)
+ *
+ * Therefore, each time a new remote processor is started, all
+ * existing message queues need to create a socket for the new
+ * processor.
+ *
+ * The driver should not have this requirement. One socket per
+ * message queue should be sufficient to uniquely identify the
+ * endpoint to the driver.
+ */
+Void MessageQ_bind(UInt16 procId)
+{
+ int q;
+ int clusterId;
+ int priority;
+ MessageQ_Handle handle;
+ MessageQ_QueueId queue;
+ IMessageQTransport_Handle transport;
+
+ clusterId = procId - MultiProc_getBaseIdOfCluster();
+ pthread_mutex_lock(&MessageQ_module->gate);
+
+ for (q = 0; q < MessageQ_module->numQueues; q++) {
+
+ if ((handle = MessageQ_module->queues[q]) == NULL) {
+ continue;
+ }
+
+ queue = ((MessageQ_Object *)handle)->queue;
+
+ for (priority = 0; priority < 2; priority++) {
+ transport = MessageQ_module->transports[clusterId][priority];
+ if (transport != NULL) {
+ IMessageQTransport_bind((Void *)transport, queue);
+ }
+ }
+ }
+
+ pthread_mutex_unlock(&MessageQ_module->gate);
+}
+
+/*
+ * ======== MessageQ_unbind ========
+ * Unbind all existing message queues from the given processor
+ *
+ * Hack: see MessageQ_bind.
+ */
+Void MessageQ_unbind(UInt16 procId)
+{
+ int q;
+ int clusterId;
+ int priority;
+ MessageQ_Handle handle;
+ MessageQ_QueueId queue;
+ IMessageQTransport_Handle transport;
+
+ pthread_mutex_lock(&MessageQ_module->gate);
+
+ for (q = 0; q < MessageQ_module->numQueues; q++) {
+
+ if ((handle = MessageQ_module->queues[q]) == NULL) {
+ continue;
+ }
+
+ queue = ((MessageQ_Object *)handle)->queue;
+ clusterId = procId - MultiProc_getBaseIdOfCluster();
+
+ for (priority = 0; priority < 2; priority++) {
+ transport = MessageQ_module->transports[clusterId][priority];
+ if (transport != NULL) {
+ IMessageQTransport_unbind((Void *)transport, queue);
+ }
+ }
+ }
+
+ pthread_mutex_unlock(&MessageQ_module->gate);
+}