280aa8a803f38d6f203448bfee42c8ac0627c7a5
1 /*
2 * Copyright (c) 2014-2015 Texas Instruments Incorporated - http://www.ti.com
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*============================================================================
33 * @file MessageQ.c
34 *
35 * @brief MessageQ module "server" implementation
36 *
37 * This implementation is geared for use in a "client/server" model, whereby
38 * system-wide data is maintained here as needed and process-specific data
39 * is handled at the "client" level. At the moment, LAD is the only "user"
40 * of this implementation.
41 */
44 /* Standard IPC headers */
45 #include <ti/ipc/Std.h>
47 /* POSIX thread support */
48 #include <pthread.h>
50 /* Socket Headers */
51 #include <sys/select.h>
52 #include <sys/time.h>
53 #include <sys/types.h>
54 #include <sys/param.h>
55 #include <sys/eventfd.h>
56 #include <errno.h>
57 #include <stdio.h>
58 #include <string.h>
59 #include <stdlib.h>
60 #include <unistd.h>
61 #include <assert.h>
63 /* Socket Protocol Family */
64 #include <net/rpmsg.h>
66 /* Module level headers */
67 #include <ti/ipc/NameServer.h>
68 #include <ti/ipc/MultiProc.h>
69 #include <_MultiProc.h>
70 #include <ti/ipc/MessageQ.h>
71 #include <_MessageQ.h>
73 #include <_lad.h>
75 /* =============================================================================
76 * Macros/Constants
77 * =============================================================================
78 */
/*!
 *  @brief  Name of the reserved NameServer instance used by MessageQ.
 */
#define MessageQ_NAMESERVER "MessageQ"

/* Number of slots added to the queue table whenever it runs out of free
 * queue indices. */
#define MessageQ_GROWSIZE 32

/* Define BENCHMARK to quiet key MessageQ APIs: */
/* #define BENCHMARK */
91 /* =============================================================================
92 * Structures & Enums
93 * =============================================================================
94 */
96 /* structure for MessageQ module state */
97 typedef struct MessageQ_ModuleObject {
98 Int refCount;
99 /*!< Reference count */
100 NameServer_Handle nameServer;
101 /*!< Handle to the local NameServer used for storing GP objects */
102 pthread_mutex_t gate;
103 /*!< Handle of gate to be used for local thread safety */
104 MessageQ_Config *cfg;
105 /*!< Current config values */
106 MessageQ_Params defaultInstParams;
107 /*!< Default instance creation parameters */
108 MessageQ_Handle * queues;
109 /*!< Global array of message queues */
110 UInt16 numQueues;
111 /*!< Initial number of messageQ objects allowed */
112 Bool canFreeQueues;
113 /*!< Grow option */
114 Bits16 seqNum;
115 /*!< sequence number */
116 } MessageQ_ModuleObject;
118 /*!
119 * @brief Structure for the Handle for the MessageQ.
120 */
121 typedef struct MessageQ_Object {
122 MessageQ_Params params;
123 /*! Instance specific creation parameters */
124 MessageQ_QueueId queue;
125 /* Unique id */
126 Ptr nsKey;
127 /* NameServer key */
128 Int ownerPid;
129 /* Process ID of owner */
130 } MessageQ_Object;
133 /* =============================================================================
134 * Globals
135 * =============================================================================
136 */
137 extern MessageQ_Config ti_ipc_MessageQ_cfg;
139 static MessageQ_ModuleObject MessageQ_state =
140 {
141 .refCount = 0,
142 .nameServer = NULL,
143 .queues = NULL,
144 .numQueues = 2u,
145 .canFreeQueues = FALSE,
146 #if defined(IPC_BUILDOS_ANDROID) && (PLATFORM_SDK_VERSION < 23)
147 .gate = PTHREAD_RECURSIVE_MUTEX_INITIALIZER,
148 #else
149 .gate = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
150 #endif
151 .cfg = &ti_ipc_MessageQ_cfg,
152 };
154 /*!
155 * @var MessageQ_module
156 *
157 * @brief Pointer to the MessageQ module state.
158 */
159 MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
162 /* =============================================================================
163 * Forward declarations of internal functions
164 * =============================================================================
165 */
166 /* Grow the MessageQ table */
167 static UInt16 _MessageQ_grow(MessageQ_Object * obj);
169 /* =============================================================================
170 * APIS
171 * =============================================================================
172 */
173 /* Function to get default configuration for the MessageQ module.
174 *
175 */
176 Void MessageQ_getConfig(MessageQ_Config * cfg)
177 {
178 assert(cfg != NULL);
180 memcpy(cfg, MessageQ_module->cfg, sizeof(MessageQ_Config));
181 }
183 /* Function to setup the MessageQ module. */
184 Int MessageQ_setup(const MessageQ_Config * cfg)
185 {
186 Int status = MessageQ_S_SUCCESS;
187 NameServer_Params params;
189 pthread_mutex_lock(&(MessageQ_module->gate));
191 LOG1("MessageQ_setup: entered, refCount=%d\n", MessageQ_module->refCount)
193 MessageQ_module->refCount++;
194 if (MessageQ_module->refCount > 1) {
195 status = MessageQ_S_ALREADYSETUP;
196 LOG1("MessageQ module has been already setup, refCount=%d\n",
197 MessageQ_module->refCount)
198 goto exitSetup;
199 }
201 /* Initialize the parameters */
202 NameServer_Params_init(¶ms);
203 params.maxValueLen = sizeof(UInt32);
204 params.maxNameLen = MessageQ_module->cfg->maxNameLen;
206 /* Create the nameserver for modules */
207 MessageQ_module->nameServer = NameServer_create(MessageQ_NAMESERVER,
208 ¶ms);
210 MessageQ_module->seqNum = 0;
211 MessageQ_module->numQueues = MessageQ_module->cfg->maxRuntimeEntries;
212 MessageQ_module->queues = (MessageQ_Handle *)
213 calloc(1, sizeof(MessageQ_Handle) * MessageQ_module->numQueues);
214 MessageQ_module->canFreeQueues = TRUE;
216 exitSetup:
217 LOG1("MessageQ_setup: exiting, refCount=%d\n", MessageQ_module->refCount)
219 pthread_mutex_unlock(&(MessageQ_module->gate));
221 return (status);
222 }
224 /*
225 * Function to destroy the MessageQ module.
226 */
227 Int MessageQ_destroy(void)
228 {
229 Int status = MessageQ_S_SUCCESS;
230 UInt32 i;
232 pthread_mutex_lock(&(MessageQ_module->gate));
234 LOG1("MessageQ_destroy: entered, refCount=%d\n", MessageQ_module->refCount)
236 /* Decrease the refCount */
237 MessageQ_module->refCount--;
238 if (MessageQ_module->refCount > 0) {
239 goto exitDestroy;
240 }
242 /* Delete any Message Queues that have not been deleted so far. */
243 for (i = 0; i< MessageQ_module->numQueues; i++) {
244 if (MessageQ_module->queues [i] != NULL) {
245 MessageQ_delete(&(MessageQ_module->queues [i]));
246 }
247 }
249 if (MessageQ_module->nameServer != NULL) {
250 /* Delete the nameserver for modules */
251 status = NameServer_delete(&MessageQ_module->nameServer);
252 }
254 /* Since MessageQ_module->gate was not allocated, no need to delete. */
256 if (MessageQ_module->queues != NULL) {
257 free(MessageQ_module->queues);
258 MessageQ_module->queues = NULL;
259 }
261 MessageQ_module->numQueues = 0u;
263 exitDestroy:
264 LOG1("MessageQ_destroy: exiting, refCount=%d\n", MessageQ_module->refCount)
266 pthread_mutex_unlock(&(MessageQ_module->gate));
268 return (status);
269 }
271 /*
272 * Function to create a MessageQ object for receiving.
273 */
274 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params * params)
275 {
276 Int status = MessageQ_S_SUCCESS;
277 MessageQ_Object * obj = NULL;
278 Bool found = FALSE;
279 UInt16 count = 0;
280 UInt16 queueIndex;
281 UInt16 queuePort;
282 UInt16 procId;
283 int i;
284 UInt numReserved;
286 LOG1("MessageQ_create: creating '%s'\n", (name == NULL) ? "NULL" : name)
288 /* Create the generic obj */
289 obj = (MessageQ_Object *)calloc(1, sizeof(MessageQ_Object));
291 if (obj == NULL) {
292 LOG0("MessageQ_create: Error: no memory\n")
293 return (NULL);
294 }
296 numReserved = MessageQ_module->cfg->numReservedEntries;
298 pthread_mutex_lock(&(MessageQ_module->gate));
300 /* check if creating a reserved queue */
301 if (params->queueIndex != MessageQ_ANY) {
302 queueIndex = params->queueIndex;
304 if (queueIndex > numReserved) {
305 LOG2("MessageQ_create: Error: requested queue index %d is greater "
306 "than reserved maximum %d\n", queueIndex, numReserved - 1)
307 free(obj);
308 obj = NULL;
309 }
310 else if (MessageQ_module->queues[queueIndex] != NULL) {
311 LOG1("MessageQ_create: Error: requested queue index %d is already "
312 "in use.\n", queueIndex);
313 free(obj);
314 obj = NULL;
315 }
317 if (obj == NULL) {
318 pthread_mutex_unlock(&(MessageQ_module->gate));
319 return (NULL);
320 }
322 MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
323 found = TRUE;
324 }
325 else {
326 count = MessageQ_module->numQueues;
328 /* search the dynamic array for any holes */
329 for (i = numReserved; i < count ; i++) {
330 if (MessageQ_module->queues [i] == NULL) {
331 MessageQ_module->queues [i] = (MessageQ_Handle)obj;
332 queueIndex = i;
333 found = TRUE;
334 break;
335 }
336 }
337 }
339 if (found == FALSE) {
340 /* Growth is always allowed. */
341 queueIndex = _MessageQ_grow(obj);
342 }
344 pthread_mutex_unlock(&(MessageQ_module->gate));
346 if (params != NULL) {
347 /* Populate the params member */
348 memcpy((Ptr)&obj->params, (Ptr)params, sizeof(MessageQ_Params));
349 }
351 /* create globally unique message queue ID */
352 procId = MultiProc_self();
353 queuePort = queueIndex + MessageQ_PORTOFFSET;
354 obj->queue = (MessageQ_QueueId)(((UInt32)procId << 16) | queuePort);
355 obj->ownerPid = 0;
357 if (name != NULL) {
358 obj->nsKey = NameServer_addUInt32(MessageQ_module->nameServer, name,
359 obj->queue);
360 }
362 /* Cleanup if fail */
363 if (status < 0) {
364 MessageQ_delete((MessageQ_Handle *)&obj);
365 }
367 LOG2("MessageQ_create: returning obj=%p, qid=0x%x\n", obj, obj->queue)
369 return ((MessageQ_Handle)obj);
370 }
372 /*
373 * Function to delete a MessageQ object for a specific slave processor.
374 */
375 Int MessageQ_delete(MessageQ_Handle * handlePtr)
376 {
377 Int status = MessageQ_S_SUCCESS;
378 MessageQ_Object *obj;
379 MessageQ_Handle queue;
380 UInt16 queueIndex;
382 obj = (MessageQ_Object *)(*handlePtr);
384 LOG1("MessageQ_delete: deleting %p\n", obj)
386 queueIndex = MessageQ_getQueueIndex(obj->queue);
387 queue = MessageQ_module->queues[queueIndex];
388 if (queue != obj) {
389 LOG1("ERROR: obj != MessageQ_module->queues[%d]\n", queueIndex)
390 }
392 if (obj->nsKey != NULL) {
393 /* Remove from the name server */
394 status = NameServer_removeEntry(MessageQ_module->nameServer,
395 obj->nsKey);
396 if (status < 0) {
397 /* Override with a MessageQ status code. */
398 status = MessageQ_E_FAIL;
399 }
400 else {
401 status = MessageQ_S_SUCCESS;
402 }
403 }
405 pthread_mutex_lock(&(MessageQ_module->gate));
407 /* Clear the MessageQ obj from array. */
408 MessageQ_module->queues[queueIndex] = NULL;
410 /* Release the local lock */
411 pthread_mutex_unlock(&(MessageQ_module->gate));
413 /* Now free the obj */
414 free(obj);
415 *handlePtr = NULL;
417 LOG1("MessageQ_delete: returning %d\n", status)
419 return (status);
420 }
422 /* Returns the MessageQ_QueueId associated with the handle. */
423 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
424 {
425 MessageQ_Object * obj = (MessageQ_Object *)handle;
426 UInt32 queueId;
428 queueId = (obj->queue);
430 return queueId;
431 }
433 /*!
434 * @brief Grow the MessageQ table
435 *
436 * @param obj Pointer to the MessageQ object.
437 *
438 * @sa _MessageQ_grow
439 *
440 */
441 static UInt16 _MessageQ_grow(MessageQ_Object * obj)
442 {
443 UInt16 queueIndex = MessageQ_module->numQueues;
444 UInt16 oldSize;
445 MessageQ_Handle * queues;
446 MessageQ_Handle * oldQueues;
448 /* No parameter validation required since this is an internal function. */
449 oldSize = (MessageQ_module->numQueues) * sizeof(MessageQ_Handle);
451 /* Allocate larger table */
452 queues = calloc(MessageQ_module->numQueues + MessageQ_GROWSIZE,
453 sizeof(MessageQ_Handle));
455 /* Copy contents into new table */
456 memcpy(queues, MessageQ_module->queues, oldSize);
458 /* Fill in the new entry */
459 queues[queueIndex] = (MessageQ_Handle)obj;
461 /* Hook-up new table */
462 oldQueues = MessageQ_module->queues;
463 MessageQ_module->queues = queues;
464 MessageQ_module->numQueues += MessageQ_GROWSIZE;
466 /* Delete old table if not statically defined */
467 if (MessageQ_module->canFreeQueues == TRUE) {
468 free(oldQueues);
469 }
470 else {
471 MessageQ_module->canFreeQueues = TRUE;
472 }
474 LOG1("_MessageQ_grow: queueIndex: 0x%x\n", queueIndex)
476 return (queueIndex);
477 }
479 /*
480 * This is a helper function to initialize a message.
481 */
482 Void MessageQ_msgInit(MessageQ_Msg msg)
483 {
484 msg->reserved0 = 0; /* We set this to distinguish from NameServerMsg */
485 msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
486 msg->msgId = MessageQ_INVALIDMSGID;
487 msg->dstId = (UInt16)MessageQ_INVALIDMESSAGEQ;
488 msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
489 msg->srcProc = MultiProc_self();
491 pthread_mutex_lock(&(MessageQ_module->gate));
492 msg->seqNum = MessageQ_module->seqNum++;
493 pthread_mutex_unlock(&(MessageQ_module->gate));
494 }
496 NameServer_Handle MessageQ_getNameServerHandle(void)
497 {
498 return MessageQ_module->nameServer;
499 }
501 Void MessageQ_setQueueOwner(MessageQ_Handle handle, Int pid)
502 {
503 handle->ownerPid = pid;
505 return;
506 }
508 Void MessageQ_cleanupOwner(Int pid)
509 {
510 MessageQ_Handle queue;
511 Int i;
513 for (i = 0; i < MessageQ_module->numQueues; i++) {
514 queue = MessageQ_module->queues[i];
515 if (queue != NULL && queue->ownerPid == pid) {
516 MessageQ_delete(&queue);
517 }
518 }
519 }
521 Void _MessageQ_setNumReservedEntries(UInt n)
522 {
523 MessageQ_module->cfg->numReservedEntries = n;
524 }