]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - ipc/ipcdev.git/blob - linux/src/daemon/MessageQ_daemon.c
Linux: Moved IPC specific types file (Std.h) into the ti/ipc namespace
[ipc/ipcdev.git] / linux / src / daemon / MessageQ_daemon.c
1 /*
2  * Copyright (c) 2012, Texas Instruments Incorporated
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*============================================================================
33  *  @file   MessageQ.c
34  *
35  *  @brief  MessageQ module "server" implementation
36  *
37  *  This implementation is geared for use in a "client/server" model, whereby
38  *  system-wide data is maintained here as needed and process-specific data
39  *  is handled at the "client" level.  At the moment, LAD is the only "user"
40  *  of this implementation.
41  */
44 /* Standard IPC headers */
45 #include <ti/ipc/Std.h>
47 /* POSIX thread support */
48 #include <pthread.h>
50 /* Socket Headers */
51 #include <sys/select.h>
52 #include <sys/time.h>
53 #include <sys/types.h>
54 #include <sys/param.h>
55 #include <sys/eventfd.h>
56 #include <errno.h>
57 #include <stdio.h>
58 #include <string.h>
59 #include <stdlib.h>
60 #include <unistd.h>
61 #include <assert.h>
63 /* Socket Protocol Family */
64 #include <net/rpmsg.h>
66 /* Module level headers */
67 #include <ti/ipc/NameServer.h>
68 #include <ti/ipc/MultiProc.h>
69 #include <_MultiProc.h>
70 #include <ti/ipc/MessageQ.h>
71 #include <_MessageQ.h>
73 #include <_lad.h>
75 /* =============================================================================
76  * Macros/Constants
77  * =============================================================================
78  */
80 /*!
81  *  @brief  Name of the reserved NameServer used for MessageQ.
82  */
83 #define MessageQ_NAMESERVER  "MessageQ"
85 /* Slot 0 reserved for NameServer messages: */
86 #define RESERVED_MSGQ_INDEX  1
88 /* Define BENCHMARK to quiet key MessageQ APIs: */
89 //#define BENCHMARK
91 /* =============================================================================
92  * Structures & Enums
93  * =============================================================================
94  */
96 /* structure for MessageQ module state */
97 typedef struct MessageQ_ModuleObject {
98     Int                 refCount;
99     /*!< Reference count */
100     NameServer_Handle   nameServer;
101     /*!< Handle to the local NameServer used for storing GP objects */
102     pthread_mutex_t     gate;
103     /*!< Handle of gate to be used for local thread safety */
104     MessageQ_Config     cfg;
105     /*!< Current config values */
106     MessageQ_Config     defaultCfg;
107     /*!< Default config values */
108     MessageQ_Params     defaultInstParams;
109     /*!< Default instance creation parameters */
110     MessageQ_Handle *   queues;
111     /*!< Global array of message queues */
112     UInt16              numQueues;
113     /*!< Initial number of messageQ objects allowed */
114     Bool                canFreeQueues;
115     /*!< Grow option */
116     Bits16              seqNum;
117     /*!< sequence number */
118 } MessageQ_ModuleObject;
120 /*!
121  *  @brief  Structure for the Handle for the MessageQ.
122  */
123 typedef struct MessageQ_Object {
124     MessageQ_Params         params;
125     /*! Instance specific creation parameters */
126     MessageQ_QueueId        queue;
127     /* Unique id */
128     Ptr                     nsKey;
129     /* NameServer key */
130     Int                     ownerPid;
131     /* Process ID of owner */
132 } MessageQ_Object;
135 /* =============================================================================
136  *  Globals
137  * =============================================================================
138  */
139 static MessageQ_ModuleObject MessageQ_state =
141     .refCount               = 0,
142     .nameServer             = NULL,
143     .queues                 = NULL,
144     .numQueues              = 2u,
145     .canFreeQueues          = FALSE,
146     .gate                   = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
147     .defaultCfg.traceFlag   = FALSE,
148     .defaultCfg.maxRuntimeEntries = 32u,
149     .defaultCfg.maxNameLen    = 32u,
150 };
152 /*!
153  *  @var    MessageQ_module
154  *
155  *  @brief  Pointer to the MessageQ module state.
156  */
157 MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
160 /* =============================================================================
161  * Forward declarations of internal functions
162  * =============================================================================
163  */
164 /* Grow the MessageQ table */
165 static UInt16 _MessageQ_grow(MessageQ_Object * obj);
167 /* =============================================================================
168  * APIS
169  * =============================================================================
170  */
171 /* Function to get default configuration for the MessageQ module.
172  *
173  */
174 Void MessageQ_getConfig(MessageQ_Config * cfg)
176     assert(cfg != NULL);
178     /* If setup has not yet been called... */
179     if (MessageQ_module->refCount < 1) {
180         memcpy(cfg, &MessageQ_module->defaultCfg, sizeof(MessageQ_Config));
181     }
182     else {
183         memcpy(cfg, &MessageQ_module->cfg, sizeof(MessageQ_Config));
184     }
187 /* Function to setup the MessageQ module. */
188 Int MessageQ_setup(const MessageQ_Config * cfg)
190     Int                    status = MessageQ_S_SUCCESS;
191     NameServer_Params      params;
193     pthread_mutex_lock(&(MessageQ_module->gate));
195     LOG1("MessageQ_setup: entered, refCount=%d\n", MessageQ_module->refCount)
197     MessageQ_module->refCount++;
198     if (MessageQ_module->refCount > 1) {
199         status = MessageQ_S_ALREADYSETUP;
200         LOG1("MessageQ module has been already setup, refCount=%d\n", MessageQ_module->refCount)
202         goto exitSetup;
203     }
205     /* Initialize the parameters */
206     NameServer_Params_init(&params);
207     params.maxValueLen = sizeof(UInt32);
208     params.maxNameLen  = cfg->maxNameLen;
210     /* Create the nameserver for modules */
211     MessageQ_module->nameServer = NameServer_create(MessageQ_NAMESERVER,
212                                                     &params);
214     memcpy(&MessageQ_module->cfg, (void *)cfg, sizeof(MessageQ_Config));
216     MessageQ_module->seqNum = 0;
217     MessageQ_module->numQueues = cfg->maxRuntimeEntries;
218     MessageQ_module->queues = (MessageQ_Handle *)
219         calloc(1, sizeof(MessageQ_Handle) * MessageQ_module->numQueues);
221 exitSetup:
222     LOG1("MessageQ_setup: exiting, refCount=%d\n", MessageQ_module->refCount)
224     pthread_mutex_unlock(&(MessageQ_module->gate));
226     return (status);
229 /*
230  * Function to destroy the MessageQ module.
231  */
232 Int MessageQ_destroy(void)
234     Int    status    = MessageQ_S_SUCCESS;
235     UInt32 i;
237     pthread_mutex_lock(&(MessageQ_module->gate));
239     LOG1("MessageQ_destroy: entered, refCount=%d\n", MessageQ_module->refCount)
241     /* Decrease the refCount */
242     MessageQ_module->refCount--;
243     if (MessageQ_module->refCount > 0) {
244         goto exitDestroy;
245     }
247     /* Delete any Message Queues that have not been deleted so far. */
248     for (i = 0; i< MessageQ_module->numQueues; i++) {
249         if (MessageQ_module->queues [i] != NULL) {
250             MessageQ_delete(&(MessageQ_module->queues [i]));
251         }
252     }
254     if (MessageQ_module->nameServer != NULL) {
255         /* Delete the nameserver for modules */
256         status = NameServer_delete(&MessageQ_module->nameServer);
257     }
259     /* Since MessageQ_module->gate was not allocated, no need to delete. */
261     if (MessageQ_module->queues != NULL) {
262         free(MessageQ_module->queues);
263         MessageQ_module->queues = NULL;
264     }
266     memset(&MessageQ_module->cfg, 0, sizeof(MessageQ_Config));
267     MessageQ_module->numQueues  = 0u;
268     MessageQ_module->canFreeQueues = TRUE;
270 exitDestroy:
271     LOG1("MessageQ_destroy: exiting, refCount=%d\n", MessageQ_module->refCount)
273     pthread_mutex_unlock(&(MessageQ_module->gate));
275     return (status);
278 /* Function to initialize the parameters for the MessageQ instance. */
279 Void MessageQ_Params_init(MessageQ_Params * params)
281     memcpy(params, &(MessageQ_module->defaultInstParams),
282            sizeof(MessageQ_Params));
284     return;
287 /*
288  *   Function to create a MessageQ object for receiving.
289  */
290 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params * params)
292     Int                 status    = MessageQ_S_SUCCESS;
293     MessageQ_Object   * obj    = NULL;
294     Bool                found  = FALSE;
295     UInt16              count  = 0;
296     UInt16              queueIndex = 0u;
297     UInt16              procId;
298     int                 i;
300     LOG1("MessageQ_create: creating '%s'\n", name)
302     /* Create the generic obj */
303     obj = (MessageQ_Object *)calloc(1, sizeof(MessageQ_Object));
305     pthread_mutex_lock(&(MessageQ_module->gate));
307     count = MessageQ_module->numQueues;
309     /* Search the dynamic array for any holes */
310     /* We start from 1, as 0 is reserved for binding NameServer: */
311     for (i = RESERVED_MSGQ_INDEX; i < count ; i++) {
312         if (MessageQ_module->queues [i] == NULL) {
313             MessageQ_module->queues [i] = (MessageQ_Handle)obj;
314             queueIndex = i;
315             found = TRUE;
316             break;
317         }
318     }
320     if (found == FALSE) {
321         /* Growth is always allowed. */
322         queueIndex = _MessageQ_grow(obj);
323     }
325     pthread_mutex_unlock(&(MessageQ_module->gate));
327     if (params != NULL) {
328        /* Populate the params member */
329         memcpy((Ptr)&obj->params, (Ptr)params, sizeof(MessageQ_Params));
330     }
332     procId = MultiProc_self();
333     /* create globally unique messageQ ID: */
334     obj->queue = (MessageQ_QueueId)(((UInt32)procId << 16) | queueIndex);
335     obj->ownerPid = 0;
337     if (name != NULL) {
338         obj->nsKey = NameServer_addUInt32(MessageQ_module->nameServer, name,
339                                           obj->queue);
340     }
342     /* Cleanup if fail */
343     if (status < 0) {
344         MessageQ_delete((MessageQ_Handle *)&obj);
345     }
347     LOG1("MessageQ_create: returning %p\n", obj)
349     return ((MessageQ_Handle)obj);
352 /*
353  * Function to delete a MessageQ object for a specific slave processor.
354  */
355 Int MessageQ_delete(MessageQ_Handle * handlePtr)
357     Int              status = MessageQ_S_SUCCESS;
358     MessageQ_Object *obj;
359     MessageQ_Handle queue;
361     obj = (MessageQ_Object *)(*handlePtr);
363     LOG1("MessageQ_delete: deleting %p\n", obj)
365     queue = MessageQ_module->queues[(MessageQ_QueueIndex)(obj->queue)];
366     if (queue != obj) {
367         LOG1("    ERROR: obj != MessageQ_module->queues[%d]\n", (MessageQ_QueueIndex)(obj->queue))
368     }
370     if (obj->nsKey != NULL) {
371         /* Remove from the name server */
372         status = NameServer_removeEntry(MessageQ_module->nameServer,
373                                          obj->nsKey);
374         if (status < 0) {
375             /* Override with a MessageQ status code. */
376             status = MessageQ_E_FAIL;
377         }
378         else {
379             status = MessageQ_S_SUCCESS;
380         }
381     }
383     pthread_mutex_lock(&(MessageQ_module->gate));
385     /* Clear the MessageQ obj from array. */
386     MessageQ_module->queues[(MessageQ_QueueIndex)(obj->queue)] = NULL;
388     /* Release the local lock */
389     pthread_mutex_unlock(&(MessageQ_module->gate));
391     /* Now free the obj */
392     free(obj);
393     *handlePtr = NULL;
395     LOG1("MessageQ_delete: returning %d\n", status)
397     return (status);
400 /* Returns the MessageQ_QueueId associated with the handle. */
401 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
403     MessageQ_Object * obj = (MessageQ_Object *)handle;
404     UInt32            queueId;
406     queueId = (obj->queue);
408     return queueId;
411 /*!
412  *  @brief   Grow the MessageQ table
413  *
414  *  @param   obj     Pointer to the MessageQ object.
415  *
416  *  @sa      _MessageQ_grow
417  *
418  */
419 static UInt16 _MessageQ_grow(MessageQ_Object * obj)
421     UInt16            queueIndex = MessageQ_module->numQueues;
422     UInt16            oldSize;
423     MessageQ_Handle * queues;
424     MessageQ_Handle * oldQueues;
426     /* No parameter validation required since this is an internal function. */
427     oldSize = (MessageQ_module->numQueues) * sizeof(MessageQ_Handle);
429     /* Allocate larger table */
430     queues = calloc(1, oldSize + sizeof(MessageQ_Handle));
432     /* Copy contents into new table */
433     memcpy(queues, MessageQ_module->queues, oldSize);
435     /* Fill in the new entry */
436     queues[queueIndex] = (MessageQ_Handle)obj;
438     /* Hook-up new table */
439     oldQueues = MessageQ_module->queues;
440     MessageQ_module->queues = queues;
441     MessageQ_module->numQueues++;
443     /* Delete old table if not statically defined */
444     if (MessageQ_module->canFreeQueues == TRUE) {
445         free(oldQueues);
446     }
447     else {
448         MessageQ_module->canFreeQueues = TRUE;
449     }
451     LOG1("_MessageQ_grow: queueIndex: 0x%x\n", queueIndex)
453     return (queueIndex);
456 /*
457  * This is a helper function to initialize a message.
458  */
459 Void MessageQ_msgInit(MessageQ_Msg msg)
461     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
462     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
463     msg->msgId     = MessageQ_INVALIDMSGID;
464     msg->dstId     = (UInt16)MessageQ_INVALIDMESSAGEQ;
465     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
466     msg->srcProc   = MultiProc_self();
468     pthread_mutex_lock(&(MessageQ_module->gate));
469     msg->seqNum  = MessageQ_module->seqNum++;
470     pthread_mutex_unlock(&(MessageQ_module->gate));
473 NameServer_Handle MessageQ_getNameServerHandle(void)
475     return MessageQ_module->nameServer;
478 Void MessageQ_setQueueOwner(MessageQ_Handle handle, Int pid)
480     handle->ownerPid = pid;
482     return;
485 Void MessageQ_cleanupOwner(Int pid)
487     MessageQ_Handle queue;
488     Int i;
490     for (i = 0; i < MessageQ_module->numQueues; i++) {
491         queue = MessageQ_module->queues[i];
492         if (queue != NULL && queue->ownerPid == pid) {
493             MessageQ_delete(&queue);
494         }
495     }