1 /*
2 * Copyright (c) 2014 Texas Instruments Incorporated - http://www.ti.com
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*============================================================================
33 * @file MessageQ.c
34 *
35 * @brief MessageQ module "server" implementation
36 *
37 * This implementation is geared for use in a "client/server" model, whereby
38 * system-wide data is maintained here as needed and process-specific data
39 * is handled at the "client" level. At the moment, LAD is the only "user"
40 * of this implementation.
41 */
44 /* Standard IPC headers */
45 #include <ti/ipc/Std.h>
47 /* POSIX thread support */
48 #include <pthread.h>
50 /* Socket Headers */
51 #include <sys/select.h>
52 #include <sys/time.h>
53 #include <sys/types.h>
54 #include <sys/param.h>
55 #include <sys/eventfd.h>
56 #include <errno.h>
57 #include <stdio.h>
58 #include <string.h>
59 #include <stdlib.h>
60 #include <unistd.h>
61 #include <assert.h>
63 /* Socket Protocol Family */
64 #include <net/rpmsg.h>
66 /* Module level headers */
67 #include <ti/ipc/NameServer.h>
68 #include <ti/ipc/MultiProc.h>
69 #include <_MultiProc.h>
70 #include <ti/ipc/MessageQ.h>
71 #include <_MessageQ.h>
73 #include <_lad.h>
75 /* =============================================================================
76 * Macros/Constants
77 * =============================================================================
78 */
80 /*!
81 * @brief Name of the reserved NameServer used for MessageQ.
82 */
83 #define MessageQ_NAMESERVER "MessageQ"
85 /* Slot 0 reserved for NameServer messages: */
86 #define RESERVED_MSGQ_INDEX 1
88 /* Number of entries to grow when we run out of queueIndexs */
89 #define MessageQ_GROWSIZE 32
91 /* Define BENCHMARK to quiet key MessageQ APIs: */
92 //#define BENCHMARK
94 /* =============================================================================
95 * Structures & Enums
96 * =============================================================================
97 */
99 /* structure for MessageQ module state */
100 typedef struct MessageQ_ModuleObject {
101 Int refCount;
102 /*!< Reference count */
103 NameServer_Handle nameServer;
104 /*!< Handle to the local NameServer used for storing GP objects */
105 pthread_mutex_t gate;
106 /*!< Handle of gate to be used for local thread safety */
107 MessageQ_Config cfg;
108 /*!< Current config values */
109 MessageQ_Config defaultCfg;
110 /*!< Default config values */
111 MessageQ_Params defaultInstParams;
112 /*!< Default instance creation parameters */
113 MessageQ_Handle * queues;
114 /*!< Global array of message queues */
115 UInt16 numQueues;
116 /*!< Initial number of messageQ objects allowed */
117 Bool canFreeQueues;
118 /*!< Grow option */
119 Bits16 seqNum;
120 /*!< sequence number */
121 } MessageQ_ModuleObject;
123 /*!
124 * @brief Structure for the Handle for the MessageQ.
125 */
126 typedef struct MessageQ_Object {
127 MessageQ_Params params;
128 /*! Instance specific creation parameters */
129 MessageQ_QueueId queue;
130 /* Unique id */
131 Ptr nsKey;
132 /* NameServer key */
133 Int ownerPid;
134 /* Process ID of owner */
135 } MessageQ_Object;
138 /* =============================================================================
139 * Globals
140 * =============================================================================
141 */
142 static MessageQ_ModuleObject MessageQ_state =
143 {
144 .refCount = 0,
145 .nameServer = NULL,
146 .queues = NULL,
147 .numQueues = 2u,
148 .canFreeQueues = FALSE,
149 #if defined(IPC_BUILDOS_ANDROID)
150 .gate = PTHREAD_RECURSIVE_MUTEX_INITIALIZER,
151 #else
152 .gate = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
153 #endif
154 .defaultCfg.traceFlag = FALSE,
155 .defaultCfg.maxRuntimeEntries = 32u,
156 .defaultCfg.maxNameLen = 32u,
157 };
159 /*!
160 * @var MessageQ_module
161 *
162 * @brief Pointer to the MessageQ module state.
163 */
164 MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
167 /* =============================================================================
168 * Forward declarations of internal functions
169 * =============================================================================
170 */
171 /* Grow the MessageQ table */
172 static UInt16 _MessageQ_grow(MessageQ_Object * obj);
174 /* =============================================================================
175 * APIS
176 * =============================================================================
177 */
178 /* Function to get default configuration for the MessageQ module.
179 *
180 */
181 Void MessageQ_getConfig(MessageQ_Config * cfg)
182 {
183 assert(cfg != NULL);
185 /* If setup has not yet been called... */
186 if (MessageQ_module->refCount < 1) {
187 memcpy(cfg, &MessageQ_module->defaultCfg, sizeof(MessageQ_Config));
188 }
189 else {
190 memcpy(cfg, &MessageQ_module->cfg, sizeof(MessageQ_Config));
191 }
192 }
194 /* Function to setup the MessageQ module. */
195 Int MessageQ_setup(const MessageQ_Config * cfg)
196 {
197 Int status = MessageQ_S_SUCCESS;
198 NameServer_Params params;
200 pthread_mutex_lock(&(MessageQ_module->gate));
202 LOG1("MessageQ_setup: entered, refCount=%d\n", MessageQ_module->refCount)
204 MessageQ_module->refCount++;
205 if (MessageQ_module->refCount > 1) {
206 status = MessageQ_S_ALREADYSETUP;
207 LOG1("MessageQ module has been already setup, refCount=%d\n", MessageQ_module->refCount)
209 goto exitSetup;
210 }
212 /* Initialize the parameters */
213 NameServer_Params_init(¶ms);
214 params.maxValueLen = sizeof(UInt32);
215 params.maxNameLen = cfg->maxNameLen;
217 /* Create the nameserver for modules */
218 MessageQ_module->nameServer = NameServer_create(MessageQ_NAMESERVER,
219 ¶ms);
221 memcpy(&MessageQ_module->cfg, (void *)cfg, sizeof(MessageQ_Config));
223 MessageQ_module->seqNum = 0;
224 MessageQ_module->numQueues = cfg->maxRuntimeEntries;
225 MessageQ_module->queues = (MessageQ_Handle *)
226 calloc(1, sizeof(MessageQ_Handle) * MessageQ_module->numQueues);
227 MessageQ_module->canFreeQueues = TRUE;
229 exitSetup:
230 LOG1("MessageQ_setup: exiting, refCount=%d\n", MessageQ_module->refCount)
232 pthread_mutex_unlock(&(MessageQ_module->gate));
234 return (status);
235 }
237 /*
238 * Function to destroy the MessageQ module.
239 */
240 Int MessageQ_destroy(void)
241 {
242 Int status = MessageQ_S_SUCCESS;
243 UInt32 i;
245 pthread_mutex_lock(&(MessageQ_module->gate));
247 LOG1("MessageQ_destroy: entered, refCount=%d\n", MessageQ_module->refCount)
249 /* Decrease the refCount */
250 MessageQ_module->refCount--;
251 if (MessageQ_module->refCount > 0) {
252 goto exitDestroy;
253 }
255 /* Delete any Message Queues that have not been deleted so far. */
256 for (i = 0; i< MessageQ_module->numQueues; i++) {
257 if (MessageQ_module->queues [i] != NULL) {
258 MessageQ_delete(&(MessageQ_module->queues [i]));
259 }
260 }
262 if (MessageQ_module->nameServer != NULL) {
263 /* Delete the nameserver for modules */
264 status = NameServer_delete(&MessageQ_module->nameServer);
265 }
267 /* Since MessageQ_module->gate was not allocated, no need to delete. */
269 if (MessageQ_module->queues != NULL) {
270 free(MessageQ_module->queues);
271 MessageQ_module->queues = NULL;
272 }
274 memset(&MessageQ_module->cfg, 0, sizeof(MessageQ_Config));
275 MessageQ_module->numQueues = 0u;
277 exitDestroy:
278 LOG1("MessageQ_destroy: exiting, refCount=%d\n", MessageQ_module->refCount)
280 pthread_mutex_unlock(&(MessageQ_module->gate));
282 return (status);
283 }
285 /*
286 * Function to create a MessageQ object for receiving.
287 */
288 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params * params)
289 {
290 Int status = MessageQ_S_SUCCESS;
291 MessageQ_Object * obj = NULL;
292 Bool found = FALSE;
293 UInt16 count = 0;
294 UInt16 queueIndex = 0u;
295 UInt16 procId;
296 int i;
298 LOG1("MessageQ_create: creating '%s'\n", name)
300 /* Create the generic obj */
301 obj = (MessageQ_Object *)calloc(1, sizeof(MessageQ_Object));
303 pthread_mutex_lock(&(MessageQ_module->gate));
305 count = MessageQ_module->numQueues;
307 /* Search the dynamic array for any holes */
308 /* We start from 1, as 0 is reserved for binding NameServer: */
309 for (i = RESERVED_MSGQ_INDEX; i < count ; i++) {
310 if (MessageQ_module->queues [i] == NULL) {
311 MessageQ_module->queues [i] = (MessageQ_Handle)obj;
312 queueIndex = i;
313 found = TRUE;
314 break;
315 }
316 }
318 if (found == FALSE) {
319 /* Growth is always allowed. */
320 queueIndex = _MessageQ_grow(obj);
321 }
323 pthread_mutex_unlock(&(MessageQ_module->gate));
325 if (params != NULL) {
326 /* Populate the params member */
327 memcpy((Ptr)&obj->params, (Ptr)params, sizeof(MessageQ_Params));
328 }
330 procId = MultiProc_self();
331 /* create globally unique messageQ ID: */
332 obj->queue = (MessageQ_QueueId)(((UInt32)procId << 16) | queueIndex);
333 obj->ownerPid = 0;
335 if (name != NULL) {
336 obj->nsKey = NameServer_addUInt32(MessageQ_module->nameServer, name,
337 obj->queue);
338 }
340 /* Cleanup if fail */
341 if (status < 0) {
342 MessageQ_delete((MessageQ_Handle *)&obj);
343 }
345 LOG1("MessageQ_create: returning %p\n", obj)
347 return ((MessageQ_Handle)obj);
348 }
350 /*
351 * Function to delete a MessageQ object for a specific slave processor.
352 */
353 Int MessageQ_delete(MessageQ_Handle * handlePtr)
354 {
355 Int status = MessageQ_S_SUCCESS;
356 MessageQ_Object *obj;
357 MessageQ_Handle queue;
359 obj = (MessageQ_Object *)(*handlePtr);
361 LOG1("MessageQ_delete: deleting %p\n", obj)
363 queue = MessageQ_module->queues[(MessageQ_QueueIndex)(obj->queue)];
364 if (queue != obj) {
365 LOG1(" ERROR: obj != MessageQ_module->queues[%d]\n", (MessageQ_QueueIndex)(obj->queue))
366 }
368 if (obj->nsKey != NULL) {
369 /* Remove from the name server */
370 status = NameServer_removeEntry(MessageQ_module->nameServer,
371 obj->nsKey);
372 if (status < 0) {
373 /* Override with a MessageQ status code. */
374 status = MessageQ_E_FAIL;
375 }
376 else {
377 status = MessageQ_S_SUCCESS;
378 }
379 }
381 pthread_mutex_lock(&(MessageQ_module->gate));
383 /* Clear the MessageQ obj from array. */
384 MessageQ_module->queues[(MessageQ_QueueIndex)(obj->queue)] = NULL;
386 /* Release the local lock */
387 pthread_mutex_unlock(&(MessageQ_module->gate));
389 /* Now free the obj */
390 free(obj);
391 *handlePtr = NULL;
393 LOG1("MessageQ_delete: returning %d\n", status)
395 return (status);
396 }
398 /* Returns the MessageQ_QueueId associated with the handle. */
399 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
400 {
401 MessageQ_Object * obj = (MessageQ_Object *)handle;
402 UInt32 queueId;
404 queueId = (obj->queue);
406 return queueId;
407 }
409 /*!
410 * @brief Grow the MessageQ table
411 *
412 * @param obj Pointer to the MessageQ object.
413 *
414 * @sa _MessageQ_grow
415 *
416 */
417 static UInt16 _MessageQ_grow(MessageQ_Object * obj)
418 {
419 UInt16 queueIndex = MessageQ_module->numQueues;
420 UInt16 oldSize;
421 MessageQ_Handle * queues;
422 MessageQ_Handle * oldQueues;
424 /* No parameter validation required since this is an internal function. */
425 oldSize = (MessageQ_module->numQueues) * sizeof(MessageQ_Handle);
427 /* Allocate larger table */
428 queues = calloc(MessageQ_module->numQueues + MessageQ_GROWSIZE,
429 sizeof(MessageQ_Handle));
431 /* Copy contents into new table */
432 memcpy(queues, MessageQ_module->queues, oldSize);
434 /* Fill in the new entry */
435 queues[queueIndex] = (MessageQ_Handle)obj;
437 /* Hook-up new table */
438 oldQueues = MessageQ_module->queues;
439 MessageQ_module->queues = queues;
440 MessageQ_module->numQueues += MessageQ_GROWSIZE;
442 /* Delete old table if not statically defined */
443 if (MessageQ_module->canFreeQueues == TRUE) {
444 free(oldQueues);
445 }
446 else {
447 MessageQ_module->canFreeQueues = TRUE;
448 }
450 LOG1("_MessageQ_grow: queueIndex: 0x%x\n", queueIndex)
452 return (queueIndex);
453 }
455 /*
456 * This is a helper function to initialize a message.
457 */
458 Void MessageQ_msgInit(MessageQ_Msg msg)
459 {
460 msg->reserved0 = 0; /* We set this to distinguish from NameServerMsg */
461 msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
462 msg->msgId = MessageQ_INVALIDMSGID;
463 msg->dstId = (UInt16)MessageQ_INVALIDMESSAGEQ;
464 msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
465 msg->srcProc = MultiProc_self();
467 pthread_mutex_lock(&(MessageQ_module->gate));
468 msg->seqNum = MessageQ_module->seqNum++;
469 pthread_mutex_unlock(&(MessageQ_module->gate));
470 }
472 NameServer_Handle MessageQ_getNameServerHandle(void)
473 {
474 return MessageQ_module->nameServer;
475 }
477 Void MessageQ_setQueueOwner(MessageQ_Handle handle, Int pid)
478 {
479 handle->ownerPid = pid;
481 return;
482 }
484 Void MessageQ_cleanupOwner(Int pid)
485 {
486 MessageQ_Handle queue;
487 Int i;
489 for (i = 0; i < MessageQ_module->numQueues; i++) {
490 queue = MessageQ_module->queues[i];
491 if (queue != NULL && queue->ownerPid == pid) {
492 MessageQ_delete(&queue);
493 }
494 }
495 }