index d718f2d8249508aaaa6eb7513d40f777b325dd92..15d6e7640ca8b215fb50d55383d31ed175717e5f 100644 (file)
/*
- * Copyright (c) 2012, Texas Instruments Incorporated
+ * Copyright (c) 2014-2015 Texas Instruments Incorporated - http://www.ti.com
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
*/
#define MessageQ_NAMESERVER "MessageQ"
-/* Slot 0 reserved for NameServer messages: */
-#define RESERVED_MSGQ_INDEX 1
+/* Number of entries to grow when we run out of queueIndexs */
+#define MessageQ_GROWSIZE 32
/* Define BENCHMARK to quiet key MessageQ APIs: */
//#define BENCHMARK
/*!< Handle to the local NameServer used for storing GP objects */
pthread_mutex_t gate;
/*!< Handle of gate to be used for local thread safety */
- MessageQ_Config cfg;
+ MessageQ_Config *cfg;
/*!< Current config values */
- MessageQ_Config defaultCfg;
- /*!< Default config values */
MessageQ_Params defaultInstParams;
/*!< Default instance creation parameters */
MessageQ_Handle * queues;
* Globals
* =============================================================================
*/
+extern MessageQ_Config ti_ipc_MessageQ_cfg;
+
static MessageQ_ModuleObject MessageQ_state =
{
.refCount = 0,
#else
.gate = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
#endif
- .defaultCfg.traceFlag = FALSE,
- .defaultCfg.maxRuntimeEntries = 32u,
- .defaultCfg.maxNameLen = 32u,
+ .cfg = &ti_ipc_MessageQ_cfg,
};
/*!
{
assert(cfg != NULL);
- /* If setup has not yet been called... */
- if (MessageQ_module->refCount < 1) {
- memcpy(cfg, &MessageQ_module->defaultCfg, sizeof(MessageQ_Config));
- }
- else {
- memcpy(cfg, &MessageQ_module->cfg, sizeof(MessageQ_Config));
- }
+ memcpy(cfg, MessageQ_module->cfg, sizeof(MessageQ_Config));
}
/* Function to setup the MessageQ module. */
MessageQ_module->refCount++;
if (MessageQ_module->refCount > 1) {
status = MessageQ_S_ALREADYSETUP;
- LOG1("MessageQ module has been already setup, refCount=%d\n", MessageQ_module->refCount)
-
+ LOG1("MessageQ module has been already setup, refCount=%d\n",
+ MessageQ_module->refCount)
goto exitSetup;
}
/* Initialize the parameters */
NameServer_Params_init(¶ms);
params.maxValueLen = sizeof(UInt32);
- params.maxNameLen = cfg->maxNameLen;
+ params.maxNameLen = MessageQ_module->cfg->maxNameLen;
/* Create the nameserver for modules */
MessageQ_module->nameServer = NameServer_create(MessageQ_NAMESERVER,
¶ms);
- memcpy(&MessageQ_module->cfg, (void *)cfg, sizeof(MessageQ_Config));
-
MessageQ_module->seqNum = 0;
- MessageQ_module->numQueues = cfg->maxRuntimeEntries;
+ MessageQ_module->numQueues = MessageQ_module->cfg->maxRuntimeEntries;
MessageQ_module->queues = (MessageQ_Handle *)
calloc(1, sizeof(MessageQ_Handle) * MessageQ_module->numQueues);
+ MessageQ_module->canFreeQueues = TRUE;
exitSetup:
LOG1("MessageQ_setup: exiting, refCount=%d\n", MessageQ_module->refCount)
MessageQ_module->queues = NULL;
}
- memset(&MessageQ_module->cfg, 0, sizeof(MessageQ_Config));
MessageQ_module->numQueues = 0u;
- MessageQ_module->canFreeQueues = TRUE;
exitDestroy:
LOG1("MessageQ_destroy: exiting, refCount=%d\n", MessageQ_module->refCount)
return (status);
}
-/* Function to initialize the parameters for the MessageQ instance. */
-Void MessageQ_Params_init(MessageQ_Params * params)
-{
- memcpy(params, &(MessageQ_module->defaultInstParams),
- sizeof(MessageQ_Params));
-
- return;
-}
-
/*
* Function to create a MessageQ object for receiving.
*/
MessageQ_Object * obj = NULL;
Bool found = FALSE;
UInt16 count = 0;
- UInt16 queueIndex = 0u;
+ UInt16 queueIndex;
+ UInt16 queuePort;
UInt16 procId;
int i;
+ UInt numReserved;
- LOG1("MessageQ_create: creating '%s'\n", name)
+ LOG1("MessageQ_create: creating '%s'\n", (name == NULL) ? "NULL" : name)
/* Create the generic obj */
obj = (MessageQ_Object *)calloc(1, sizeof(MessageQ_Object));
+ if (obj == NULL) {
+ LOG0("MessageQ_create: Error: no memory\n")
+ return (NULL);
+ }
+
+ numReserved = MessageQ_module->cfg->numReservedEntries;
+
pthread_mutex_lock(&(MessageQ_module->gate));
- count = MessageQ_module->numQueues;
+ /* check if creating a reserved queue */
+ if (params->queueIndex != MessageQ_ANY) {
+ queueIndex = params->queueIndex;
- /* Search the dynamic array for any holes */
- /* We start from 1, as 0 is reserved for binding NameServer: */
- for (i = RESERVED_MSGQ_INDEX; i < count ; i++) {
- if (MessageQ_module->queues [i] == NULL) {
- MessageQ_module->queues [i] = (MessageQ_Handle)obj;
- queueIndex = i;
- found = TRUE;
- break;
+ if (queueIndex > numReserved) {
+ LOG2("MessageQ_create: Error: requested queue index %d is greater "
+ "than reserved maximum %d\n", queueIndex, numReserved - 1)
+ free(obj);
+ obj = NULL;
+ }
+ else if (MessageQ_module->queues[queueIndex] != NULL) {
+ LOG1("MessageQ_create: Error: requested queue index %d is already "
+ "in use.\n", queueIndex);
+ free(obj);
+ obj = NULL;
+ }
+
+ if (obj == NULL) {
+ pthread_mutex_unlock(&(MessageQ_module->gate));
+ return (NULL);
+ }
+
+ MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
+ found = TRUE;
+ }
+ else {
+ count = MessageQ_module->numQueues;
+
+ /* search the dynamic array for any holes */
+ for (i = numReserved; i < count ; i++) {
+ if (MessageQ_module->queues [i] == NULL) {
+ MessageQ_module->queues [i] = (MessageQ_Handle)obj;
+ queueIndex = i;
+ found = TRUE;
+ break;
+ }
}
}
memcpy((Ptr)&obj->params, (Ptr)params, sizeof(MessageQ_Params));
}
+ /* create globally unique message queue ID */
procId = MultiProc_self();
- /* create globally unique messageQ ID: */
- obj->queue = (MessageQ_QueueId)(((UInt32)procId << 16) | queueIndex);
+ queuePort = queueIndex + MessageQ_PORTOFFSET;
+ obj->queue = (MessageQ_QueueId)(((UInt32)procId << 16) | queuePort);
obj->ownerPid = 0;
if (name != NULL) {
obj->nsKey = NameServer_addUInt32(MessageQ_module->nameServer, name,
- obj->queue);
+ obj->queue);
}
/* Cleanup if fail */
MessageQ_delete((MessageQ_Handle *)&obj);
}
- LOG1("MessageQ_create: returning %p\n", obj)
+ LOG2("MessageQ_create: returning obj=%p, qid=0x%x\n", obj, obj->queue)
return ((MessageQ_Handle)obj);
}
Int status = MessageQ_S_SUCCESS;
MessageQ_Object *obj;
MessageQ_Handle queue;
+ UInt16 queueIndex;
obj = (MessageQ_Object *)(*handlePtr);
LOG1("MessageQ_delete: deleting %p\n", obj)
- queue = MessageQ_module->queues[(MessageQ_QueueIndex)(obj->queue)];
+ queueIndex = MessageQ_getQueueIndex(obj->queue);
+ queue = MessageQ_module->queues[queueIndex];
if (queue != obj) {
- LOG1(" ERROR: obj != MessageQ_module->queues[%d]\n", (MessageQ_QueueIndex)(obj->queue))
+ LOG1("ERROR: obj != MessageQ_module->queues[%d]\n", queueIndex)
}
if (obj->nsKey != NULL) {
pthread_mutex_lock(&(MessageQ_module->gate));
/* Clear the MessageQ obj from array. */
- MessageQ_module->queues[(MessageQ_QueueIndex)(obj->queue)] = NULL;
+ MessageQ_module->queues[queueIndex] = NULL;
/* Release the local lock */
pthread_mutex_unlock(&(MessageQ_module->gate));
oldSize = (MessageQ_module->numQueues) * sizeof(MessageQ_Handle);
/* Allocate larger table */
- queues = calloc(1, oldSize + sizeof(MessageQ_Handle));
+ queues = calloc(MessageQ_module->numQueues + MessageQ_GROWSIZE,
+ sizeof(MessageQ_Handle));
/* Copy contents into new table */
memcpy(queues, MessageQ_module->queues, oldSize);
/* Hook-up new table */
oldQueues = MessageQ_module->queues;
MessageQ_module->queues = queues;
- MessageQ_module->numQueues++;
+ MessageQ_module->numQueues += MessageQ_GROWSIZE;
/* Delete old table if not statically defined */
if (MessageQ_module->canFreeQueues == TRUE) {
}
}
}
+
+Void _MessageQ_setNumReservedEntries(UInt n)
+{
+ MessageQ_module->cfg->numReservedEntries = n;
+}