1 /*
2 * Copyright (c) 2013, Texas Instruments Incorporated
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*============================================================================
33 * @file MessageQ.c
34 *
35 * @brief MessageQ module "server" implementation
36 *
37 * This implementation is geared for use in a "client/server" model, whereby
38 * system-wide data is maintained here as needed and process-specific data
39 * is handled at the "client" level. At the moment, LAD is the only "user"
40 * of this implementation.
41 *
42 * The MessageQ module supports the structured sending and receiving of
43 * variable length messages. This module can be used for homogeneous or
44 * heterogeneous multi-processor messaging.
45 *
46 * MessageQ provides more sophisticated messaging than other modules. It is
47 * typically used for complex situations such as multi-processor messaging.
48 *
49 * The following are key features of the MessageQ module:
50 * -Writers and readers can be relocated to another processor with no
51 * runtime code changes.
52 * -Timeouts are allowed when receiving messages.
53 * -Readers can determine the writer and reply back.
54 * -Receiving a message is deterministic when the timeout is zero.
55 * -Messages can reside on any message queue.
56 * -Supports zero-copy transfers.
57 * -Can send and receive from any type of thread.
58 * -Notification mechanism is specified by application.
59 * -Allows QoS (quality of service) on message buffer pools. For example,
60 * using specific buffer pools for specific message queues.
61 *
62 * Messages are sent and received via a message queue. A reader is a thread
63 * that gets (reads) messages from a message queue. A writer is a thread that
64 * puts (writes) a message to a message queue. Each message queue has one
65 * reader and can have many writers. A thread may read from or write to multiple
66 * message queues.
67 *
68 * Conceptually, the reader thread owns a message queue. The reader thread
 * creates a message queue. Writer threads open a created message queue to
 * get access to it.
71 *
72 * Message queues are identified by a system-wide unique name. Internally,
 * MessageQ uses the NameServer module for managing
74 * these names. The names are used for opening a message queue. Using
75 * names is not required.
76 *
77 * Messages must be allocated from the MessageQ module. Once a message is
78 * allocated, it can be sent on any message queue. Once a message is sent, the
79 * writer loses ownership of the message and should not attempt to modify the
80 * message. Once the reader receives the message, it owns the message. It
81 * may either free the message or re-use the message.
82 *
83 * Messages in a message queue can be of variable length. The only
84 * requirement is that the first field in the definition of a message must be a
85 * MsgHeader structure. For example:
86 * typedef struct MyMsg {
87 * MessageQ_MsgHeader header;
88 * ...
89 * } MyMsg;
90 *
91 * The MessageQ API uses the MessageQ_MsgHeader internally. Your application
92 * should not modify or directly access the fields in the MessageQ_MsgHeader.
93 *
94 * All messages sent via the MessageQ module must be allocated from a
95 * Heap implementation. The heap can be used for
96 * other memory allocation not related to MessageQ.
97 *
98 * An application can use multiple heaps. The purpose of having multiple
99 * heaps is to allow an application to regulate its message usage. For
100 * example, an application can allocate critical messages from one heap of fast
101 * on-chip memory and non-critical messages from another heap of slower
102 * external memory
103 *
 * MessageQ does support the usage of messages that are not allocated via the
105 * alloc function. Please refer to the staticMsgInit
106 * function description for more details.
107 *
 * In a multiple processor system, MessageQ communicates with other
109 * processors via MessageQTransport instances. There must be one and
110 * only one MessageQTransport instance for each processor where communication
111 * is desired.
112 * So on a four processor system, each processor must have three
 * MessageQTransport instances.
114 *
115 * The user only needs to create the MessageQTransport instances. The instances
116 * are responsible for registering themselves with MessageQ.
117 * This is accomplished via the registerTransport function.
118 *
119 * ============================================================================
120 */
123 /* Standard headers */
124 #include <Std.h>
126 /* Qnx specific header files */
127 #include <pthread.h>
129 #include <sys/select.h>
130 #include <sys/time.h>
131 #include <sys/types.h>
132 #include <sys/param.h>
133 #include <errno.h>
134 #include <stdio.h>
135 #include <string.h>
136 #include <stdlib.h>
137 #include <unistd.h>
138 #include <assert.h>
140 /* Module level headers */
141 #include <ti/ipc/NameServer.h>
142 #include <ti/ipc/MultiProc.h>
143 #include <_MultiProc.h>
144 #include <ti/ipc/MessageQ.h>
145 #include <_MessageQ.h>
146 #include <_log.h>
148 /* =============================================================================
149 * Macros/Constants
150 * =============================================================================
151 */
153 /*!
154 * @brief Name of the reserved NameServer used for MessageQ.
155 */
156 #define MessageQ_NAMESERVER "MessageQ"
/* Queue slot 0 is reserved for NameServer messages; user queues start at 1: */
159 #define RESERVED_MSGQ_INDEX 1
161 /* Define BENCHMARK to quiet key MessageQ APIs: */
162 //#define BENCHMARK
164 /* =============================================================================
165 * Structures & Enums
166 * =============================================================================
167 */
169 /* structure for MessageQ module state */
170 typedef struct MessageQ_ModuleObject {
171 Int refCount;
172 /*!< Reference count */
173 NameServer_Handle nameServer;
174 /*!< Handle to the local NameServer used for storing GP objects */
175 pthread_mutex_t gate;
176 /*!< Handle of gate to be used for local thread safety */
177 MessageQ_Config cfg;
178 /*!< Current config values */
179 MessageQ_Config defaultCfg;
180 /*!< Default config values */
181 MessageQ_Params defaultInstParams;
182 /*!< Default instance creation parameters */
183 MessageQ_Handle * queues;
184 /*!< Global array of message queues */
185 UInt16 numQueues;
186 /*!< Initial number of messageQ objects allowed */
187 Bool canFreeQueues;
188 /*!< Grow option */
189 Bits16 seqNum;
190 /*!< sequence number */
191 } MessageQ_ModuleObject;
193 /*!
194 * @brief Structure for the Handle for the MessageQ.
195 */
196 typedef struct MessageQ_Object {
197 MessageQ_Params params;
198 /*! Instance specific creation parameters */
199 MessageQ_QueueId queue;
200 /* Unique id */
201 Ptr nsKey;
202 /* NameServer key */
203 Int ownerPid;
204 /* Process ID of owner */
205 } MessageQ_Object;
208 /* =============================================================================
209 * Globals
210 * =============================================================================
211 */
212 static MessageQ_ModuleObject MessageQ_state =
213 {
214 .refCount = 0,
215 .nameServer = NULL,
216 .queues = NULL,
217 .numQueues = 2u,
218 .canFreeQueues = FALSE,
219 .gate = PTHREAD_MUTEX_INITIALIZER,
220 .defaultCfg.traceFlag = FALSE,
221 .defaultCfg.maxRuntimeEntries = 32u,
222 .defaultCfg.maxNameLen = 32u,
223 };
225 /*!
226 * @var MessageQ_module
227 *
228 * @brief Pointer to the MessageQ module state.
229 */
230 MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
233 /* =============================================================================
234 * Forward declarations of internal functions
235 * =============================================================================
236 */
237 /* Grow the MessageQ table */
238 static UInt16 _MessageQ_grow(MessageQ_Object * obj);
240 /* =============================================================================
241 * APIS
242 * =============================================================================
243 */
244 /* Function to get default configuration for the MessageQ module.
245 *
246 */
247 Void MessageQ_getConfig(MessageQ_Config * cfg)
248 {
249 assert(cfg != NULL);
251 /* If setup has not yet been called... */
252 if (MessageQ_module->refCount < 1) {
253 memcpy(cfg, &MessageQ_module->defaultCfg, sizeof(MessageQ_Config));
254 }
255 else {
256 memcpy(cfg, &MessageQ_module->cfg, sizeof(MessageQ_Config));
257 }
258 }
260 /* Function to setup the MessageQ module. */
261 Int MessageQ_setup(const MessageQ_Config * cfg)
262 {
263 Int status = MessageQ_S_SUCCESS;
264 NameServer_Params params;
266 pthread_mutex_lock(&(MessageQ_module->gate));
268 LOG1("MessageQ_setup: entered, refCount=%d\n", MessageQ_module->refCount)
270 MessageQ_module->refCount++;
271 if (MessageQ_module->refCount > 1) {
272 status = MessageQ_S_ALREADYSETUP;
273 LOG1("MessageQ module has been already setup, refCount=%d\n", MessageQ_module->refCount)
275 goto exitSetup;
276 }
278 /* Initialize the parameters */
279 NameServer_Params_init(¶ms);
280 params.maxValueLen = sizeof(UInt32);
281 params.maxNameLen = cfg->maxNameLen;
283 /* Create the nameserver for modules */
284 MessageQ_module->nameServer = NameServer_create(MessageQ_NAMESERVER,
285 ¶ms);
287 memcpy(&MessageQ_module->cfg, (void *)cfg, sizeof(MessageQ_Config));
289 MessageQ_module->seqNum = 0;
290 MessageQ_module->numQueues = cfg->maxRuntimeEntries;
291 MessageQ_module->queues = (MessageQ_Handle *)
292 calloc(1, sizeof(MessageQ_Handle) * MessageQ_module->numQueues);
294 exitSetup:
295 LOG1("MessageQ_setup: exiting, refCount=%d\n", MessageQ_module->refCount)
297 pthread_mutex_unlock(&(MessageQ_module->gate));
299 return (status);
300 }
302 /*
303 * Function to destroy the MessageQ module.
304 */
305 Int MessageQ_destroy(void)
306 {
307 Int status = MessageQ_S_SUCCESS;
308 UInt32 i;
310 pthread_mutex_lock(&(MessageQ_module->gate));
312 LOG1("MessageQ_destroy: entered, refCount=%d\n", MessageQ_module->refCount)
314 /* Decrease the refCount */
315 MessageQ_module->refCount--;
316 if (MessageQ_module->refCount > 0) {
317 goto exitDestroy;
318 }
320 /* Delete any Message Queues that have not been deleted so far. */
321 for (i = 0; i< MessageQ_module->numQueues; i++) {
322 if (MessageQ_module->queues [i] != NULL) {
323 MessageQ_delete(&(MessageQ_module->queues [i]));
324 }
325 }
327 if (MessageQ_module->nameServer != NULL) {
328 /* Delete the nameserver for modules */
329 status = NameServer_delete(&MessageQ_module->nameServer);
330 }
332 /* Since MessageQ_module->gate was not allocated, no need to delete. */
334 if (MessageQ_module->queues != NULL) {
335 free(MessageQ_module->queues);
336 MessageQ_module->queues = NULL;
337 }
339 memset(&MessageQ_module->cfg, 0, sizeof(MessageQ_Config));
340 MessageQ_module->numQueues = 0u;
341 MessageQ_module->canFreeQueues = TRUE;
343 exitDestroy:
344 LOG1("MessageQ_destroy: exiting, refCount=%d\n", MessageQ_module->refCount)
346 pthread_mutex_unlock(&(MessageQ_module->gate));
348 return (status);
349 }
351 /* Function to initialize the parameters for the MessageQ instance. */
352 Void MessageQ_Params_init(MessageQ_Params * params)
353 {
354 memcpy(params, &(MessageQ_module->defaultInstParams),
355 sizeof(MessageQ_Params));
357 return;
358 }
360 /*
361 * Function to create a MessageQ object for receiving.
362 */
363 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params * params)
364 {
365 Int status = MessageQ_S_SUCCESS;
366 MessageQ_Object * obj = NULL;
367 Bool found = FALSE;
368 UInt16 count = 0;
369 UInt16 queueIndex = 0u;
370 UInt16 procId;
371 int i;
373 LOG1("MessageQ_create: creating '%s'\n", name)
375 /* Create the generic obj */
376 obj = (MessageQ_Object *)calloc(1, sizeof(MessageQ_Object));
378 pthread_mutex_lock(&(MessageQ_module->gate));
380 count = MessageQ_module->numQueues;
382 /* Search the dynamic array for any holes */
383 /* We start from 1, as 0 is reserved for binding NameServer: */
384 for (i = RESERVED_MSGQ_INDEX; i < count ; i++) {
385 if (MessageQ_module->queues [i] == NULL) {
386 MessageQ_module->queues [i] = (MessageQ_Handle)obj;
387 queueIndex = i;
388 found = TRUE;
389 break;
390 }
391 }
393 if (found == FALSE) {
394 /* Growth is always allowed. */
395 queueIndex = _MessageQ_grow(obj);
396 }
398 pthread_mutex_unlock(&(MessageQ_module->gate));
400 if (params != NULL) {
401 /* Populate the params member */
402 memcpy((Ptr)&obj->params, (Ptr)params, sizeof(MessageQ_Params));
403 }
405 procId = MultiProc_self();
406 /* create globally unique messageQ ID: */
407 obj->queue = (MessageQ_QueueId)(((UInt32)procId << 16) | queueIndex);
408 obj->ownerPid = 0;
410 if (name != NULL) {
411 obj->nsKey = NameServer_addUInt32(MessageQ_module->nameServer, name,
412 obj->queue);
413 }
415 /* Cleanup if fail */
416 if (status < 0) {
417 MessageQ_delete((MessageQ_Handle *)&obj);
418 }
420 LOG1("MessageQ_create: returning %p\n", obj)
422 return ((MessageQ_Handle)obj);
423 }
425 /*
426 * Function to delete a MessageQ object for a specific slave processor.
427 */
428 Int MessageQ_delete(MessageQ_Handle * handlePtr)
429 {
430 Int status = MessageQ_S_SUCCESS;
431 MessageQ_Object *obj;
432 MessageQ_Handle queue;
434 obj = (MessageQ_Object *)(*handlePtr);
436 LOG1("MessageQ_delete: deleting %p\n", obj)
438 queue = MessageQ_module->queues[(MessageQ_QueueIndex)(obj->queue)];
439 if (queue != obj) {
440 LOG1(" ERROR: obj != MessageQ_module->queues[%d]\n", (MessageQ_QueueIndex)(obj->queue))
441 }
443 if (obj->nsKey != NULL) {
444 /* Remove from the name server */
445 status = NameServer_removeEntry(MessageQ_module->nameServer,
446 obj->nsKey);
447 if (status < 0) {
448 /* Override with a MessageQ status code. */
449 status = MessageQ_E_FAIL;
450 }
451 else {
452 status = MessageQ_S_SUCCESS;
453 }
454 }
456 pthread_mutex_lock(&(MessageQ_module->gate));
458 /* Clear the MessageQ obj from array. */
459 MessageQ_module->queues[(MessageQ_QueueIndex)(obj->queue)] = NULL;
461 /* Release the local lock */
462 pthread_mutex_unlock(&(MessageQ_module->gate));
464 /* Now free the obj */
465 free(obj);
466 *handlePtr = NULL;
468 LOG1("MessageQ_delete: returning %d\n", status)
470 return (status);
471 }
473 /* Returns the MessageQ_QueueId associated with the handle. */
474 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
475 {
476 MessageQ_Object * obj = (MessageQ_Object *)handle;
477 UInt32 queueId;
479 queueId = (obj->queue);
481 return queueId;
482 }
484 /*!
485 * @brief Grow the MessageQ table
486 *
487 * @param obj Pointer to the MessageQ object.
488 *
489 * @sa _MessageQ_grow
490 *
491 */
492 static UInt16 _MessageQ_grow(MessageQ_Object * obj)
493 {
494 UInt16 queueIndex = MessageQ_module->numQueues;
495 UInt16 oldSize;
496 MessageQ_Handle * queues;
497 MessageQ_Handle * oldQueues;
499 /* No parameter validation required since this is an internal function. */
500 oldSize = (MessageQ_module->numQueues) * sizeof(MessageQ_Handle);
502 /* Allocate larger table */
503 queues = calloc(1, oldSize + sizeof(MessageQ_Handle));
505 /* Copy contents into new table */
506 memcpy(queues, MessageQ_module->queues, oldSize);
508 /* Fill in the new entry */
509 queues[queueIndex] = (MessageQ_Handle)obj;
511 /* Hook-up new table */
512 oldQueues = MessageQ_module->queues;
513 MessageQ_module->queues = queues;
514 MessageQ_module->numQueues++;
516 /* Delete old table if not statically defined */
517 if (MessageQ_module->canFreeQueues == TRUE) {
518 free(oldQueues);
519 }
520 else {
521 MessageQ_module->canFreeQueues = TRUE;
522 }
524 LOG1("_MessageQ_grow: queueIndex: 0x%x\n", queueIndex)
526 return (queueIndex);
527 }
529 /*
530 * This is a helper function to initialize a message.
531 */
532 Void MessageQ_msgInit(MessageQ_Msg msg)
533 {
534 msg->reserved0 = 0; /* We set this to distinguish from NameServerMsg */
535 msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
536 msg->msgId = MessageQ_INVALIDMSGID;
537 msg->dstId = (UInt16)MessageQ_INVALIDMESSAGEQ;
538 msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
539 msg->srcProc = MultiProc_self();
541 pthread_mutex_lock(&(MessageQ_module->gate));
542 msg->seqNum = MessageQ_module->seqNum++;
543 pthread_mutex_unlock(&(MessageQ_module->gate));
544 }
546 NameServer_Handle MessageQ_getNameServerHandle(void)
547 {
548 return MessageQ_module->nameServer;
549 }
551 Void MessageQ_setQueueOwner(MessageQ_Handle handle, Int pid)
552 {
553 handle->ownerPid = pid;
555 return;
556 }
558 Void MessageQ_cleanupOwner(Int pid)
559 {
560 MessageQ_Handle queue;
561 Int i;
563 for (i = 0; i < MessageQ_module->numQueues; i++) {
564 queue = MessageQ_module->queues[i];
565 if (queue != NULL && queue->ownerPid == pid) {
566 MessageQ_delete(&queue);
567 }
568 }
569 }