280aa8a803f38d6f203448bfee42c8ac0627c7a5
[ipc/ipcdev.git] / linux / src / daemon / MessageQ_daemon.c
1 /*
2  * Copyright (c) 2014-2015 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*============================================================================
33  *  @file   MessageQ.c
34  *
35  *  @brief  MessageQ module "server" implementation
36  *
37  *  This implementation is geared for use in a "client/server" model, whereby
38  *  system-wide data is maintained here as needed and process-specific data
39  *  is handled at the "client" level.  At the moment, LAD is the only "user"
40  *  of this implementation.
41  */
44 /* Standard IPC headers */
45 #include <ti/ipc/Std.h>
47 /* POSIX thread support */
48 #include <pthread.h>
50 /* Socket Headers */
51 #include <sys/select.h>
52 #include <sys/time.h>
53 #include <sys/types.h>
54 #include <sys/param.h>
55 #include <sys/eventfd.h>
56 #include <errno.h>
57 #include <stdio.h>
58 #include <string.h>
59 #include <stdlib.h>
60 #include <unistd.h>
61 #include <assert.h>
63 /* Socket Protocol Family */
64 #include <net/rpmsg.h>
66 /* Module level headers */
67 #include <ti/ipc/NameServer.h>
68 #include <ti/ipc/MultiProc.h>
69 #include <_MultiProc.h>
70 #include <ti/ipc/MessageQ.h>
71 #include <_MessageQ.h>
73 #include <_lad.h>
75 /* =============================================================================
76  * Macros/Constants
77  * =============================================================================
78  */
80 /*!
81  *  @brief  Name of the reserved NameServer used for MessageQ.
82  */
83 #define MessageQ_NAMESERVER  "MessageQ"
85 /* Number of entries to grow when we run out of queueIndexs */
86 #define MessageQ_GROWSIZE 32
88 /* Define BENCHMARK to quiet key MessageQ APIs: */
89 //#define BENCHMARK
91 /* =============================================================================
92  * Structures & Enums
93  * =============================================================================
94  */
96 /* structure for MessageQ module state */
97 typedef struct MessageQ_ModuleObject {
98     Int                 refCount;
99     /*!< Reference count */
100     NameServer_Handle   nameServer;
101     /*!< Handle to the local NameServer used for storing GP objects */
102     pthread_mutex_t     gate;
103     /*!< Handle of gate to be used for local thread safety */
104     MessageQ_Config    *cfg;
105     /*!< Current config values */
106     MessageQ_Params     defaultInstParams;
107     /*!< Default instance creation parameters */
108     MessageQ_Handle *   queues;
109     /*!< Global array of message queues */
110     UInt16              numQueues;
111     /*!< Initial number of messageQ objects allowed */
112     Bool                canFreeQueues;
113     /*!< Grow option */
114     Bits16              seqNum;
115     /*!< sequence number */
116 } MessageQ_ModuleObject;
118 /*!
119  *  @brief  Structure for the Handle for the MessageQ.
120  */
121 typedef struct MessageQ_Object {
122     MessageQ_Params         params;
123     /*! Instance specific creation parameters */
124     MessageQ_QueueId        queue;
125     /* Unique id */
126     Ptr                     nsKey;
127     /* NameServer key */
128     Int                     ownerPid;
129     /* Process ID of owner */
130 } MessageQ_Object;
133 /* =============================================================================
134  *  Globals
135  * =============================================================================
136  */
137 extern MessageQ_Config ti_ipc_MessageQ_cfg;
139 static MessageQ_ModuleObject MessageQ_state =
141     .refCount               = 0,
142     .nameServer             = NULL,
143     .queues                 = NULL,
144     .numQueues              = 2u,
145     .canFreeQueues          = FALSE,
146 #if defined(IPC_BUILDOS_ANDROID) && (PLATFORM_SDK_VERSION < 23)
147     .gate                   = PTHREAD_RECURSIVE_MUTEX_INITIALIZER,
148 #else
149     .gate                   = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
150 #endif
151     .cfg = &ti_ipc_MessageQ_cfg,
152 };
154 /*!
155  *  @var    MessageQ_module
156  *
157  *  @brief  Pointer to the MessageQ module state.
158  */
159 MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
162 /* =============================================================================
163  * Forward declarations of internal functions
164  * =============================================================================
165  */
166 /* Grow the MessageQ table */
167 static UInt16 _MessageQ_grow(MessageQ_Object * obj);
169 /* =============================================================================
170  * APIS
171  * =============================================================================
172  */
173 /* Function to get default configuration for the MessageQ module.
174  *
175  */
176 Void MessageQ_getConfig(MessageQ_Config * cfg)
178     assert(cfg != NULL);
180     memcpy(cfg, MessageQ_module->cfg, sizeof(MessageQ_Config));
183 /* Function to setup the MessageQ module. */
184 Int MessageQ_setup(const MessageQ_Config * cfg)
186     Int                    status = MessageQ_S_SUCCESS;
187     NameServer_Params      params;
189     pthread_mutex_lock(&(MessageQ_module->gate));
191     LOG1("MessageQ_setup: entered, refCount=%d\n", MessageQ_module->refCount)
193     MessageQ_module->refCount++;
194     if (MessageQ_module->refCount > 1) {
195         status = MessageQ_S_ALREADYSETUP;
196         LOG1("MessageQ module has been already setup, refCount=%d\n",
197                 MessageQ_module->refCount)
198         goto exitSetup;
199     }
201     /* Initialize the parameters */
202     NameServer_Params_init(&params);
203     params.maxValueLen = sizeof(UInt32);
204     params.maxNameLen  = MessageQ_module->cfg->maxNameLen;
206     /* Create the nameserver for modules */
207     MessageQ_module->nameServer = NameServer_create(MessageQ_NAMESERVER,
208                                                     &params);
210     MessageQ_module->seqNum = 0;
211     MessageQ_module->numQueues = MessageQ_module->cfg->maxRuntimeEntries;
212     MessageQ_module->queues = (MessageQ_Handle *)
213         calloc(1, sizeof(MessageQ_Handle) * MessageQ_module->numQueues);
214     MessageQ_module->canFreeQueues = TRUE;
216 exitSetup:
217     LOG1("MessageQ_setup: exiting, refCount=%d\n", MessageQ_module->refCount)
219     pthread_mutex_unlock(&(MessageQ_module->gate));
221     return (status);
224 /*
225  * Function to destroy the MessageQ module.
226  */
227 Int MessageQ_destroy(void)
229     Int    status    = MessageQ_S_SUCCESS;
230     UInt32 i;
232     pthread_mutex_lock(&(MessageQ_module->gate));
234     LOG1("MessageQ_destroy: entered, refCount=%d\n", MessageQ_module->refCount)
236     /* Decrease the refCount */
237     MessageQ_module->refCount--;
238     if (MessageQ_module->refCount > 0) {
239         goto exitDestroy;
240     }
242     /* Delete any Message Queues that have not been deleted so far. */
243     for (i = 0; i< MessageQ_module->numQueues; i++) {
244         if (MessageQ_module->queues [i] != NULL) {
245             MessageQ_delete(&(MessageQ_module->queues [i]));
246         }
247     }
249     if (MessageQ_module->nameServer != NULL) {
250         /* Delete the nameserver for modules */
251         status = NameServer_delete(&MessageQ_module->nameServer);
252     }
254     /* Since MessageQ_module->gate was not allocated, no need to delete. */
256     if (MessageQ_module->queues != NULL) {
257         free(MessageQ_module->queues);
258         MessageQ_module->queues = NULL;
259     }
261     MessageQ_module->numQueues  = 0u;
263 exitDestroy:
264     LOG1("MessageQ_destroy: exiting, refCount=%d\n", MessageQ_module->refCount)
266     pthread_mutex_unlock(&(MessageQ_module->gate));
268     return (status);
271 /*
272  *   Function to create a MessageQ object for receiving.
273  */
274 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params * params)
276     Int                 status    = MessageQ_S_SUCCESS;
277     MessageQ_Object   * obj    = NULL;
278     Bool                found  = FALSE;
279     UInt16              count  = 0;
280     UInt16              queueIndex;
281     UInt16              queuePort;
282     UInt16              procId;
283     int                 i;
284     UInt                numReserved;
286     LOG1("MessageQ_create: creating '%s'\n", (name == NULL) ? "NULL" : name)
288     /* Create the generic obj */
289     obj = (MessageQ_Object *)calloc(1, sizeof(MessageQ_Object));
291     if (obj == NULL) {
292         LOG0("MessageQ_create: Error: no memory\n")
293         return (NULL);
294     }
296     numReserved = MessageQ_module->cfg->numReservedEntries;
298     pthread_mutex_lock(&(MessageQ_module->gate));
300     /* check if creating a reserved queue */
301     if (params->queueIndex != MessageQ_ANY) {
302         queueIndex = params->queueIndex;
304         if (queueIndex > numReserved) {
305             LOG2("MessageQ_create: Error: requested queue index %d is greater "
306                     "than reserved maximum %d\n", queueIndex, numReserved - 1)
307             free(obj);
308             obj = NULL;
309         }
310         else if (MessageQ_module->queues[queueIndex] != NULL) {
311             LOG1("MessageQ_create: Error: requested queue index %d is already "
312                     "in use.\n", queueIndex);
313             free(obj);
314             obj = NULL;
315         }
317         if (obj == NULL) {
318             pthread_mutex_unlock(&(MessageQ_module->gate));
319             return (NULL);
320         }
322         MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
323         found = TRUE;
324     }
325     else {
326         count = MessageQ_module->numQueues;
328         /* search the dynamic array for any holes */
329         for (i = numReserved; i < count ; i++) {
330             if (MessageQ_module->queues [i] == NULL) {
331                 MessageQ_module->queues [i] = (MessageQ_Handle)obj;
332                 queueIndex = i;
333                 found = TRUE;
334                 break;
335             }
336         }
337     }
339     if (found == FALSE) {
340         /* Growth is always allowed. */
341         queueIndex = _MessageQ_grow(obj);
342     }
344     pthread_mutex_unlock(&(MessageQ_module->gate));
346     if (params != NULL) {
347        /* Populate the params member */
348         memcpy((Ptr)&obj->params, (Ptr)params, sizeof(MessageQ_Params));
349     }
351     /* create globally unique message queue ID */
352     procId = MultiProc_self();
353     queuePort = queueIndex + MessageQ_PORTOFFSET;
354     obj->queue = (MessageQ_QueueId)(((UInt32)procId << 16) | queuePort);
355     obj->ownerPid = 0;
357     if (name != NULL) {
358         obj->nsKey = NameServer_addUInt32(MessageQ_module->nameServer, name,
359                 obj->queue);
360     }
362     /* Cleanup if fail */
363     if (status < 0) {
364         MessageQ_delete((MessageQ_Handle *)&obj);
365     }
367     LOG2("MessageQ_create: returning obj=%p, qid=0x%x\n", obj, obj->queue)
369     return ((MessageQ_Handle)obj);
372 /*
373  * Function to delete a MessageQ object for a specific slave processor.
374  */
375 Int MessageQ_delete(MessageQ_Handle * handlePtr)
377     Int              status = MessageQ_S_SUCCESS;
378     MessageQ_Object *obj;
379     MessageQ_Handle queue;
380     UInt16          queueIndex;
382     obj = (MessageQ_Object *)(*handlePtr);
384     LOG1("MessageQ_delete: deleting %p\n", obj)
386     queueIndex = MessageQ_getQueueIndex(obj->queue);
387     queue = MessageQ_module->queues[queueIndex];
388     if (queue != obj) {
389         LOG1("ERROR: obj != MessageQ_module->queues[%d]\n", queueIndex)
390     }
392     if (obj->nsKey != NULL) {
393         /* Remove from the name server */
394         status = NameServer_removeEntry(MessageQ_module->nameServer,
395                                          obj->nsKey);
396         if (status < 0) {
397             /* Override with a MessageQ status code. */
398             status = MessageQ_E_FAIL;
399         }
400         else {
401             status = MessageQ_S_SUCCESS;
402         }
403     }
405     pthread_mutex_lock(&(MessageQ_module->gate));
407     /* Clear the MessageQ obj from array. */
408     MessageQ_module->queues[queueIndex] = NULL;
410     /* Release the local lock */
411     pthread_mutex_unlock(&(MessageQ_module->gate));
413     /* Now free the obj */
414     free(obj);
415     *handlePtr = NULL;
417     LOG1("MessageQ_delete: returning %d\n", status)
419     return (status);
422 /* Returns the MessageQ_QueueId associated with the handle. */
423 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
425     MessageQ_Object * obj = (MessageQ_Object *)handle;
426     UInt32            queueId;
428     queueId = (obj->queue);
430     return queueId;
433 /*!
434  *  @brief   Grow the MessageQ table
435  *
436  *  @param   obj     Pointer to the MessageQ object.
437  *
438  *  @sa      _MessageQ_grow
439  *
440  */
441 static UInt16 _MessageQ_grow(MessageQ_Object * obj)
443     UInt16            queueIndex = MessageQ_module->numQueues;
444     UInt16            oldSize;
445     MessageQ_Handle * queues;
446     MessageQ_Handle * oldQueues;
448     /* No parameter validation required since this is an internal function. */
449     oldSize = (MessageQ_module->numQueues) * sizeof(MessageQ_Handle);
451     /* Allocate larger table */
452     queues = calloc(MessageQ_module->numQueues + MessageQ_GROWSIZE,
453                     sizeof(MessageQ_Handle));
455     /* Copy contents into new table */
456     memcpy(queues, MessageQ_module->queues, oldSize);
458     /* Fill in the new entry */
459     queues[queueIndex] = (MessageQ_Handle)obj;
461     /* Hook-up new table */
462     oldQueues = MessageQ_module->queues;
463     MessageQ_module->queues = queues;
464     MessageQ_module->numQueues += MessageQ_GROWSIZE;
466     /* Delete old table if not statically defined */
467     if (MessageQ_module->canFreeQueues == TRUE) {
468         free(oldQueues);
469     }
470     else {
471         MessageQ_module->canFreeQueues = TRUE;
472     }
474     LOG1("_MessageQ_grow: queueIndex: 0x%x\n", queueIndex)
476     return (queueIndex);
479 /*
480  * This is a helper function to initialize a message.
481  */
482 Void MessageQ_msgInit(MessageQ_Msg msg)
484     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
485     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
486     msg->msgId     = MessageQ_INVALIDMSGID;
487     msg->dstId     = (UInt16)MessageQ_INVALIDMESSAGEQ;
488     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
489     msg->srcProc   = MultiProc_self();
491     pthread_mutex_lock(&(MessageQ_module->gate));
492     msg->seqNum  = MessageQ_module->seqNum++;
493     pthread_mutex_unlock(&(MessageQ_module->gate));
496 NameServer_Handle MessageQ_getNameServerHandle(void)
498     return MessageQ_module->nameServer;
501 Void MessageQ_setQueueOwner(MessageQ_Handle handle, Int pid)
503     handle->ownerPid = pid;
505     return;
508 Void MessageQ_cleanupOwner(Int pid)
510     MessageQ_Handle queue;
511     Int i;
513     for (i = 0; i < MessageQ_module->numQueues; i++) {
514         queue = MessageQ_module->queues[i];
515         if (queue != NULL && queue->ownerPid == pid) {
516             MessageQ_delete(&queue);
517         }
518     }
521 Void _MessageQ_setNumReservedEntries(UInt n)
523     MessageQ_module->cfg->numReservedEntries = n;