1 /*
2 * Copyright (c) 2012, Texas Instruments Incorporated
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*============================================================================
33 * @file MessageQ.c
34 *
35 * @brief MessageQ module "server" implementation
36 *
37 * This implementation is geared for use in a "client/server" model, whereby
38 * system-wide data is maintained here as needed and process-specific data
39 * is handled at the "client" level. At the moment, LAD is the only "user"
40 * of this implementation.
41 */
44 /* Standard IPC headers */
45 #include <ti/ipc/Std.h>
47 /* POSIX thread support */
48 #include <pthread.h>
50 /* Socket Headers */
51 #include <sys/select.h>
52 #include <sys/time.h>
53 #include <sys/types.h>
54 #include <sys/param.h>
55 #include <sys/eventfd.h>
56 #include <errno.h>
57 #include <stdio.h>
58 #include <string.h>
59 #include <stdlib.h>
60 #include <unistd.h>
61 #include <assert.h>
63 /* Socket Protocol Family */
64 #include <net/rpmsg.h>
66 /* Module level headers */
67 #include <ti/ipc/NameServer.h>
68 #include <ti/ipc/MultiProc.h>
69 #include <_MultiProc.h>
70 #include <ti/ipc/MessageQ.h>
71 #include <_MessageQ.h>
73 #include <_lad.h>
75 /* =============================================================================
76 * Macros/Constants
77 * =============================================================================
78 */
/*!
 *  @brief  Name given to the NameServer instance that MessageQ_setup()
 *          creates for this module.
 */
#define MessageQ_NAMESERVER  "MessageQ"

/*
 * First queue-table index that MessageQ_create() searches; index 0 is left
 * free for NameServer messages.
 */
#define RESERVED_MSGQ_INDEX  1
88 /* Define BENCHMARK to quiet key MessageQ APIs: */
89 //#define BENCHMARK
91 /* =============================================================================
92 * Structures & Enums
93 * =============================================================================
94 */
96 /* structure for MessageQ module state */
97 typedef struct MessageQ_ModuleObject {
98 Int refCount;
99 /*!< Reference count */
100 NameServer_Handle nameServer;
101 /*!< Handle to the local NameServer used for storing GP objects */
102 pthread_mutex_t gate;
103 /*!< Handle of gate to be used for local thread safety */
104 MessageQ_Config cfg;
105 /*!< Current config values */
106 MessageQ_Config defaultCfg;
107 /*!< Default config values */
108 MessageQ_Params defaultInstParams;
109 /*!< Default instance creation parameters */
110 MessageQ_Handle * queues;
111 /*!< Global array of message queues */
112 UInt16 numQueues;
113 /*!< Initial number of messageQ objects allowed */
114 Bool canFreeQueues;
115 /*!< Grow option */
116 Bits16 seqNum;
117 /*!< sequence number */
118 } MessageQ_ModuleObject;
120 /*!
121 * @brief Structure for the Handle for the MessageQ.
122 */
123 typedef struct MessageQ_Object {
124 MessageQ_Params params;
125 /*! Instance specific creation parameters */
126 MessageQ_QueueId queue;
127 /* Unique id */
128 Ptr nsKey;
129 /* NameServer key */
130 Int ownerPid;
131 /* Process ID of owner */
132 } MessageQ_Object;
135 /* =============================================================================
136 * Globals
137 * =============================================================================
138 */
139 static MessageQ_ModuleObject MessageQ_state =
140 {
141 .refCount = 0,
142 .nameServer = NULL,
143 .queues = NULL,
144 .numQueues = 2u,
145 .canFreeQueues = FALSE,
146 .gate = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
147 .defaultCfg.traceFlag = FALSE,
148 .defaultCfg.maxRuntimeEntries = 32u,
149 .defaultCfg.maxNameLen = 32u,
150 };
152 /*!
153 * @var MessageQ_module
154 *
155 * @brief Pointer to the MessageQ module state.
156 */
157 MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
160 /* =============================================================================
161 * Forward declarations of internal functions
162 * =============================================================================
163 */
164 /* Grow the MessageQ table */
165 static UInt16 _MessageQ_grow(MessageQ_Object * obj);
167 /* =============================================================================
168 * APIS
169 * =============================================================================
170 */
171 /* Function to get default configuration for the MessageQ module.
172 *
173 */
174 Void MessageQ_getConfig(MessageQ_Config * cfg)
175 {
176 assert(cfg != NULL);
178 /* If setup has not yet been called... */
179 if (MessageQ_module->refCount < 1) {
180 memcpy(cfg, &MessageQ_module->defaultCfg, sizeof(MessageQ_Config));
181 }
182 else {
183 memcpy(cfg, &MessageQ_module->cfg, sizeof(MessageQ_Config));
184 }
185 }
187 /* Function to setup the MessageQ module. */
188 Int MessageQ_setup(const MessageQ_Config * cfg)
189 {
190 Int status = MessageQ_S_SUCCESS;
191 NameServer_Params params;
193 pthread_mutex_lock(&(MessageQ_module->gate));
195 LOG1("MessageQ_setup: entered, refCount=%d\n", MessageQ_module->refCount)
197 MessageQ_module->refCount++;
198 if (MessageQ_module->refCount > 1) {
199 status = MessageQ_S_ALREADYSETUP;
200 LOG1("MessageQ module has been already setup, refCount=%d\n", MessageQ_module->refCount)
202 goto exitSetup;
203 }
205 /* Initialize the parameters */
206 NameServer_Params_init(¶ms);
207 params.maxValueLen = sizeof(UInt32);
208 params.maxNameLen = cfg->maxNameLen;
210 /* Create the nameserver for modules */
211 MessageQ_module->nameServer = NameServer_create(MessageQ_NAMESERVER,
212 ¶ms);
214 memcpy(&MessageQ_module->cfg, (void *)cfg, sizeof(MessageQ_Config));
216 MessageQ_module->seqNum = 0;
217 MessageQ_module->numQueues = cfg->maxRuntimeEntries;
218 MessageQ_module->queues = (MessageQ_Handle *)
219 calloc(1, sizeof(MessageQ_Handle) * MessageQ_module->numQueues);
221 exitSetup:
222 LOG1("MessageQ_setup: exiting, refCount=%d\n", MessageQ_module->refCount)
224 pthread_mutex_unlock(&(MessageQ_module->gate));
226 return (status);
227 }
229 /*
230 * Function to destroy the MessageQ module.
231 */
232 Int MessageQ_destroy(void)
233 {
234 Int status = MessageQ_S_SUCCESS;
235 UInt32 i;
237 pthread_mutex_lock(&(MessageQ_module->gate));
239 LOG1("MessageQ_destroy: entered, refCount=%d\n", MessageQ_module->refCount)
241 /* Decrease the refCount */
242 MessageQ_module->refCount--;
243 if (MessageQ_module->refCount > 0) {
244 goto exitDestroy;
245 }
247 /* Delete any Message Queues that have not been deleted so far. */
248 for (i = 0; i< MessageQ_module->numQueues; i++) {
249 if (MessageQ_module->queues [i] != NULL) {
250 MessageQ_delete(&(MessageQ_module->queues [i]));
251 }
252 }
254 if (MessageQ_module->nameServer != NULL) {
255 /* Delete the nameserver for modules */
256 status = NameServer_delete(&MessageQ_module->nameServer);
257 }
259 /* Since MessageQ_module->gate was not allocated, no need to delete. */
261 if (MessageQ_module->queues != NULL) {
262 free(MessageQ_module->queues);
263 MessageQ_module->queues = NULL;
264 }
266 memset(&MessageQ_module->cfg, 0, sizeof(MessageQ_Config));
267 MessageQ_module->numQueues = 0u;
268 MessageQ_module->canFreeQueues = TRUE;
270 exitDestroy:
271 LOG1("MessageQ_destroy: exiting, refCount=%d\n", MessageQ_module->refCount)
273 pthread_mutex_unlock(&(MessageQ_module->gate));
275 return (status);
276 }
278 /* Function to initialize the parameters for the MessageQ instance. */
279 Void MessageQ_Params_init(MessageQ_Params * params)
280 {
281 memcpy(params, &(MessageQ_module->defaultInstParams),
282 sizeof(MessageQ_Params));
284 return;
285 }
287 /*
288 * Function to create a MessageQ object for receiving.
289 */
290 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params * params)
291 {
292 Int status = MessageQ_S_SUCCESS;
293 MessageQ_Object * obj = NULL;
294 Bool found = FALSE;
295 UInt16 count = 0;
296 UInt16 queueIndex = 0u;
297 UInt16 procId;
298 int i;
300 LOG1("MessageQ_create: creating '%s'\n", name)
302 /* Create the generic obj */
303 obj = (MessageQ_Object *)calloc(1, sizeof(MessageQ_Object));
305 pthread_mutex_lock(&(MessageQ_module->gate));
307 count = MessageQ_module->numQueues;
309 /* Search the dynamic array for any holes */
310 /* We start from 1, as 0 is reserved for binding NameServer: */
311 for (i = RESERVED_MSGQ_INDEX; i < count ; i++) {
312 if (MessageQ_module->queues [i] == NULL) {
313 MessageQ_module->queues [i] = (MessageQ_Handle)obj;
314 queueIndex = i;
315 found = TRUE;
316 break;
317 }
318 }
320 if (found == FALSE) {
321 /* Growth is always allowed. */
322 queueIndex = _MessageQ_grow(obj);
323 }
325 pthread_mutex_unlock(&(MessageQ_module->gate));
327 if (params != NULL) {
328 /* Populate the params member */
329 memcpy((Ptr)&obj->params, (Ptr)params, sizeof(MessageQ_Params));
330 }
332 procId = MultiProc_self();
333 /* create globally unique messageQ ID: */
334 obj->queue = (MessageQ_QueueId)(((UInt32)procId << 16) | queueIndex);
335 obj->ownerPid = 0;
337 if (name != NULL) {
338 obj->nsKey = NameServer_addUInt32(MessageQ_module->nameServer, name,
339 obj->queue);
340 }
342 /* Cleanup if fail */
343 if (status < 0) {
344 MessageQ_delete((MessageQ_Handle *)&obj);
345 }
347 LOG1("MessageQ_create: returning %p\n", obj)
349 return ((MessageQ_Handle)obj);
350 }
352 /*
353 * Function to delete a MessageQ object for a specific slave processor.
354 */
355 Int MessageQ_delete(MessageQ_Handle * handlePtr)
356 {
357 Int status = MessageQ_S_SUCCESS;
358 MessageQ_Object *obj;
359 MessageQ_Handle queue;
361 obj = (MessageQ_Object *)(*handlePtr);
363 LOG1("MessageQ_delete: deleting %p\n", obj)
365 queue = MessageQ_module->queues[(MessageQ_QueueIndex)(obj->queue)];
366 if (queue != obj) {
367 LOG1(" ERROR: obj != MessageQ_module->queues[%d]\n", (MessageQ_QueueIndex)(obj->queue))
368 }
370 if (obj->nsKey != NULL) {
371 /* Remove from the name server */
372 status = NameServer_removeEntry(MessageQ_module->nameServer,
373 obj->nsKey);
374 if (status < 0) {
375 /* Override with a MessageQ status code. */
376 status = MessageQ_E_FAIL;
377 }
378 else {
379 status = MessageQ_S_SUCCESS;
380 }
381 }
383 pthread_mutex_lock(&(MessageQ_module->gate));
385 /* Clear the MessageQ obj from array. */
386 MessageQ_module->queues[(MessageQ_QueueIndex)(obj->queue)] = NULL;
388 /* Release the local lock */
389 pthread_mutex_unlock(&(MessageQ_module->gate));
391 /* Now free the obj */
392 free(obj);
393 *handlePtr = NULL;
395 LOG1("MessageQ_delete: returning %d\n", status)
397 return (status);
398 }
400 /* Returns the MessageQ_QueueId associated with the handle. */
401 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
402 {
403 MessageQ_Object * obj = (MessageQ_Object *)handle;
404 UInt32 queueId;
406 queueId = (obj->queue);
408 return queueId;
409 }
411 /*!
412 * @brief Grow the MessageQ table
413 *
414 * @param obj Pointer to the MessageQ object.
415 *
416 * @sa _MessageQ_grow
417 *
418 */
419 static UInt16 _MessageQ_grow(MessageQ_Object * obj)
420 {
421 UInt16 queueIndex = MessageQ_module->numQueues;
422 UInt16 oldSize;
423 MessageQ_Handle * queues;
424 MessageQ_Handle * oldQueues;
426 /* No parameter validation required since this is an internal function. */
427 oldSize = (MessageQ_module->numQueues) * sizeof(MessageQ_Handle);
429 /* Allocate larger table */
430 queues = calloc(1, oldSize + sizeof(MessageQ_Handle));
432 /* Copy contents into new table */
433 memcpy(queues, MessageQ_module->queues, oldSize);
435 /* Fill in the new entry */
436 queues[queueIndex] = (MessageQ_Handle)obj;
438 /* Hook-up new table */
439 oldQueues = MessageQ_module->queues;
440 MessageQ_module->queues = queues;
441 MessageQ_module->numQueues++;
443 /* Delete old table if not statically defined */
444 if (MessageQ_module->canFreeQueues == TRUE) {
445 free(oldQueues);
446 }
447 else {
448 MessageQ_module->canFreeQueues = TRUE;
449 }
451 LOG1("_MessageQ_grow: queueIndex: 0x%x\n", queueIndex)
453 return (queueIndex);
454 }
456 /*
457 * This is a helper function to initialize a message.
458 */
459 Void MessageQ_msgInit(MessageQ_Msg msg)
460 {
461 msg->reserved0 = 0; /* We set this to distinguish from NameServerMsg */
462 msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
463 msg->msgId = MessageQ_INVALIDMSGID;
464 msg->dstId = (UInt16)MessageQ_INVALIDMESSAGEQ;
465 msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
466 msg->srcProc = MultiProc_self();
468 pthread_mutex_lock(&(MessageQ_module->gate));
469 msg->seqNum = MessageQ_module->seqNum++;
470 pthread_mutex_unlock(&(MessageQ_module->gate));
471 }
473 NameServer_Handle MessageQ_getNameServerHandle(void)
474 {
475 return MessageQ_module->nameServer;
476 }
478 Void MessageQ_setQueueOwner(MessageQ_Handle handle, Int pid)
479 {
480 handle->ownerPid = pid;
482 return;
483 }
485 Void MessageQ_cleanupOwner(Int pid)
486 {
487 MessageQ_Handle queue;
488 Int i;
490 for (i = 0; i < MessageQ_module->numQueues; i++) {
491 queue = MessageQ_module->queues[i];
492 if (queue != NULL && queue->ownerPid == pid) {
493 MessageQ_delete(&queue);
494 }
495 }
496 }