SDOCM00115428 Incorrect return status from NameServer_delete
[ipc/ipcdev.git] / linux / src / daemon / MessageQ_daemon.c
1 /*
2  * Copyright (c) 2014-2015 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*============================================================================
33  *  @file   MessageQ.c
34  *
35  *  @brief  MessageQ module "server" implementation
36  *
37  *  This implementation is geared for use in a "client/server" model, whereby
38  *  system-wide data is maintained here as needed and process-specific data
39  *  is handled at the "client" level.  At the moment, LAD is the only "user"
40  *  of this implementation.
41  */
44 /* Standard IPC headers */
45 #include <ti/ipc/Std.h>
47 /* POSIX thread support */
48 #include <pthread.h>
50 /* Socket Headers */
51 #include <sys/select.h>
52 #include <sys/time.h>
53 #include <sys/types.h>
54 #include <sys/param.h>
55 #include <sys/eventfd.h>
56 #include <errno.h>
57 #include <stdio.h>
58 #include <string.h>
59 #include <stdlib.h>
60 #include <unistd.h>
61 #include <assert.h>
63 /* Socket Protocol Family */
64 #include <net/rpmsg.h>
66 /* Module level headers */
67 #include <ti/ipc/NameServer.h>
68 #include <ti/ipc/MultiProc.h>
69 #include <_MultiProc.h>
70 #include <ti/ipc/MessageQ.h>
71 #include <_MessageQ.h>
73 #include <_lad.h>
75 /* =============================================================================
76  * Macros/Constants
77  * =============================================================================
78  */
80 /*!
81  *  @brief  Name of the reserved NameServer used for MessageQ.
82  */
83 #define MessageQ_NAMESERVER  "MessageQ"
85 /* Number of entries to grow when we run out of queueIndexs */
86 #define MessageQ_GROWSIZE 32
88 /* Define BENCHMARK to quiet key MessageQ APIs: */
89 //#define BENCHMARK
91 /* =============================================================================
92  * Structures & Enums
93  * =============================================================================
94  */
96 /* structure for MessageQ module state */
97 typedef struct MessageQ_ModuleObject {
98     Int                 refCount;
99     /*!< Reference count */
100     NameServer_Handle   nameServer;
101     /*!< Handle to the local NameServer used for storing GP objects */
102     pthread_mutex_t     gate;
103     /*!< Handle of gate to be used for local thread safety */
104     MessageQ_Config    *cfg;
105     /*!< Current config values */
106     MessageQ_Config     defaultCfg;
107     /*!< Default config values */
108     MessageQ_Params     defaultInstParams;
109     /*!< Default instance creation parameters */
110     MessageQ_Handle *   queues;
111     /*!< Global array of message queues */
112     UInt16              numQueues;
113     /*!< Initial number of messageQ objects allowed */
114     Bool                canFreeQueues;
115     /*!< Grow option */
116     Bits16              seqNum;
117     /*!< sequence number */
118 } MessageQ_ModuleObject;
120 /*!
121  *  @brief  Structure for the Handle for the MessageQ.
122  */
123 typedef struct MessageQ_Object {
124     MessageQ_Params         params;
125     /*! Instance specific creation parameters */
126     MessageQ_QueueId        queue;
127     /* Unique id */
128     Ptr                     nsKey;
129     /* NameServer key */
130     Int                     ownerPid;
131     /* Process ID of owner */
132 } MessageQ_Object;
135 /* =============================================================================
136  *  Globals
137  * =============================================================================
138  */
139 extern MessageQ_Config ti_ipc_MessageQ_cfg;
141 static MessageQ_ModuleObject MessageQ_state =
143     .refCount               = 0,
144     .nameServer             = NULL,
145     .queues                 = NULL,
146     .numQueues              = 2u,
147     .canFreeQueues          = FALSE,
148 #if defined(IPC_BUILDOS_ANDROID)
149     .gate                   = PTHREAD_RECURSIVE_MUTEX_INITIALIZER,
150 #else
151     .gate                   = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
152 #endif
153     .cfg = &ti_ipc_MessageQ_cfg,
154     .defaultCfg.traceFlag   = FALSE,
155     .defaultCfg.maxRuntimeEntries = 32u,
156     .defaultCfg.maxNameLen    = 32u,
157     .defaultCfg.numReservedEntries = 0
158 };
160 /*!
161  *  @var    MessageQ_module
162  *
163  *  @brief  Pointer to the MessageQ module state.
164  */
165 MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
168 /* =============================================================================
169  * Forward declarations of internal functions
170  * =============================================================================
171  */
172 /* Grow the MessageQ table */
173 static UInt16 _MessageQ_grow(MessageQ_Object * obj);
175 /* =============================================================================
176  * APIS
177  * =============================================================================
178  */
179 /* Function to get default configuration for the MessageQ module.
180  *
181  */
182 Void MessageQ_getConfig(MessageQ_Config * cfg)
184     assert(cfg != NULL);
186     /* If setup has not yet been called... */
187     if (MessageQ_module->refCount < 1) {
188         memcpy(cfg, &MessageQ_module->defaultCfg, sizeof(MessageQ_Config));
189     }
190     else {
191         memcpy(cfg, MessageQ_module->cfg, sizeof(MessageQ_Config));
192     }
195 /* Function to setup the MessageQ module. */
196 Int MessageQ_setup(const MessageQ_Config * cfg)
198     Int                    status = MessageQ_S_SUCCESS;
199     NameServer_Params      params;
201     pthread_mutex_lock(&(MessageQ_module->gate));
203     LOG1("MessageQ_setup: entered, refCount=%d\n", MessageQ_module->refCount)
205     MessageQ_module->refCount++;
206     if (MessageQ_module->refCount > 1) {
207         status = MessageQ_S_ALREADYSETUP;
208         LOG1("MessageQ module has been already setup, refCount=%d\n",
209                 MessageQ_module->refCount)
210         goto exitSetup;
211     }
213     /* Initialize the parameters */
214     NameServer_Params_init(&params);
215     params.maxValueLen = sizeof(UInt32);
216     params.maxNameLen  = MessageQ_module->cfg->maxNameLen;
218     /* Create the nameserver for modules */
219     MessageQ_module->nameServer = NameServer_create(MessageQ_NAMESERVER,
220                                                     &params);
222     MessageQ_module->seqNum = 0;
223     MessageQ_module->numQueues = MessageQ_module->cfg->maxRuntimeEntries;
224     MessageQ_module->queues = (MessageQ_Handle *)
225         calloc(1, sizeof(MessageQ_Handle) * MessageQ_module->numQueues);
226     MessageQ_module->canFreeQueues = TRUE;
228 exitSetup:
229     LOG1("MessageQ_setup: exiting, refCount=%d\n", MessageQ_module->refCount)
231     pthread_mutex_unlock(&(MessageQ_module->gate));
233     return (status);
236 /*
237  * Function to destroy the MessageQ module.
238  */
239 Int MessageQ_destroy(void)
241     Int    status    = MessageQ_S_SUCCESS;
242     UInt32 i;
244     pthread_mutex_lock(&(MessageQ_module->gate));
246     LOG1("MessageQ_destroy: entered, refCount=%d\n", MessageQ_module->refCount)
248     /* Decrease the refCount */
249     MessageQ_module->refCount--;
250     if (MessageQ_module->refCount > 0) {
251         goto exitDestroy;
252     }
254     /* Delete any Message Queues that have not been deleted so far. */
255     for (i = 0; i< MessageQ_module->numQueues; i++) {
256         if (MessageQ_module->queues [i] != NULL) {
257             MessageQ_delete(&(MessageQ_module->queues [i]));
258         }
259     }
261     if (MessageQ_module->nameServer != NULL) {
262         /* Delete the nameserver for modules */
263         status = NameServer_delete(&MessageQ_module->nameServer);
264     }
266     /* Since MessageQ_module->gate was not allocated, no need to delete. */
268     if (MessageQ_module->queues != NULL) {
269         free(MessageQ_module->queues);
270         MessageQ_module->queues = NULL;
271     }
273     MessageQ_module->numQueues  = 0u;
275 exitDestroy:
276     LOG1("MessageQ_destroy: exiting, refCount=%d\n", MessageQ_module->refCount)
278     pthread_mutex_unlock(&(MessageQ_module->gate));
280     return (status);
283 /*
284  *   Function to create a MessageQ object for receiving.
285  */
286 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params * params)
288     Int                 status    = MessageQ_S_SUCCESS;
289     MessageQ_Object   * obj    = NULL;
290     Bool                found  = FALSE;
291     UInt16              count  = 0;
292     UInt16              queueIndex;
293     UInt16              queuePort;
294     UInt16              procId;
295     int                 i;
296     UInt                numReserved;
298     LOG1("MessageQ_create: creating '%s'\n", (name == NULL) ? "NULL" : name)
300     /* Create the generic obj */
301     obj = (MessageQ_Object *)calloc(1, sizeof(MessageQ_Object));
303     if (obj == NULL) {
304         LOG0("MessageQ_create: Error: no memory\n")
305         return (NULL);
306     }
308     numReserved = MessageQ_module->cfg->numReservedEntries;
310     pthread_mutex_lock(&(MessageQ_module->gate));
312     /* check if creating a reserved queue */
313     if (params->queueIndex != MessageQ_ANY) {
314         queueIndex = params->queueIndex;
316         if (queueIndex > numReserved) {
317             LOG2("MessageQ_create: Error: requested queue index %d is greater "
318                     "than reserved maximum %d\n", queueIndex, numReserved - 1)
319             free(obj);
320             obj = NULL;
321         }
322         else if (MessageQ_module->queues[queueIndex] != NULL) {
323             LOG1("MessageQ_create: Error: requested queue index %d is already "
324                     "in use.\n", queueIndex);
325             free(obj);
326             obj = NULL;
327         }
329         if (obj == NULL) {
330             pthread_mutex_unlock(&(MessageQ_module->gate));
331             return (NULL);
332         }
334         MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
335         found = TRUE;
336     }
337     else {
338         count = MessageQ_module->numQueues;
340         /* search the dynamic array for any holes */
341         for (i = numReserved; i < count ; i++) {
342             if (MessageQ_module->queues [i] == NULL) {
343                 MessageQ_module->queues [i] = (MessageQ_Handle)obj;
344                 queueIndex = i;
345                 found = TRUE;
346                 break;
347             }
348         }
349     }
351     if (found == FALSE) {
352         /* Growth is always allowed. */
353         queueIndex = _MessageQ_grow(obj);
354     }
356     pthread_mutex_unlock(&(MessageQ_module->gate));
358     if (params != NULL) {
359        /* Populate the params member */
360         memcpy((Ptr)&obj->params, (Ptr)params, sizeof(MessageQ_Params));
361     }
363     /* create globally unique message queue ID */
364     procId = MultiProc_self();
365     queuePort = queueIndex + MessageQ_PORTOFFSET;
366     obj->queue = (MessageQ_QueueId)(((UInt32)procId << 16) | queuePort);
367     obj->ownerPid = 0;
369     if (name != NULL) {
370         obj->nsKey = NameServer_addUInt32(MessageQ_module->nameServer, name,
371                 obj->queue);
372     }
374     /* Cleanup if fail */
375     if (status < 0) {
376         MessageQ_delete((MessageQ_Handle *)&obj);
377     }
379     LOG2("MessageQ_create: returning obj=%p, qid=0x%x\n", obj, obj->queue)
381     return ((MessageQ_Handle)obj);
384 /*
385  * Function to delete a MessageQ object for a specific slave processor.
386  */
387 Int MessageQ_delete(MessageQ_Handle * handlePtr)
389     Int              status = MessageQ_S_SUCCESS;
390     MessageQ_Object *obj;
391     MessageQ_Handle queue;
392     UInt16          queueIndex;
394     obj = (MessageQ_Object *)(*handlePtr);
396     LOG1("MessageQ_delete: deleting %p\n", obj)
398     queueIndex = MessageQ_getQueueIndex(obj->queue);
399     queue = MessageQ_module->queues[queueIndex];
400     if (queue != obj) {
401         LOG1("ERROR: obj != MessageQ_module->queues[%d]\n", queueIndex)
402     }
404     if (obj->nsKey != NULL) {
405         /* Remove from the name server */
406         status = NameServer_removeEntry(MessageQ_module->nameServer,
407                                          obj->nsKey);
408         if (status < 0) {
409             /* Override with a MessageQ status code. */
410             status = MessageQ_E_FAIL;
411         }
412         else {
413             status = MessageQ_S_SUCCESS;
414         }
415     }
417     pthread_mutex_lock(&(MessageQ_module->gate));
419     /* Clear the MessageQ obj from array. */
420     MessageQ_module->queues[queueIndex] = NULL;
422     /* Release the local lock */
423     pthread_mutex_unlock(&(MessageQ_module->gate));
425     /* Now free the obj */
426     free(obj);
427     *handlePtr = NULL;
429     LOG1("MessageQ_delete: returning %d\n", status)
431     return (status);
434 /* Returns the MessageQ_QueueId associated with the handle. */
435 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
437     MessageQ_Object * obj = (MessageQ_Object *)handle;
438     UInt32            queueId;
440     queueId = (obj->queue);
442     return queueId;
445 /*!
446  *  @brief   Grow the MessageQ table
447  *
448  *  @param   obj     Pointer to the MessageQ object.
449  *
450  *  @sa      _MessageQ_grow
451  *
452  */
453 static UInt16 _MessageQ_grow(MessageQ_Object * obj)
455     UInt16            queueIndex = MessageQ_module->numQueues;
456     UInt16            oldSize;
457     MessageQ_Handle * queues;
458     MessageQ_Handle * oldQueues;
460     /* No parameter validation required since this is an internal function. */
461     oldSize = (MessageQ_module->numQueues) * sizeof(MessageQ_Handle);
463     /* Allocate larger table */
464     queues = calloc(MessageQ_module->numQueues + MessageQ_GROWSIZE,
465                     sizeof(MessageQ_Handle));
467     /* Copy contents into new table */
468     memcpy(queues, MessageQ_module->queues, oldSize);
470     /* Fill in the new entry */
471     queues[queueIndex] = (MessageQ_Handle)obj;
473     /* Hook-up new table */
474     oldQueues = MessageQ_module->queues;
475     MessageQ_module->queues = queues;
476     MessageQ_module->numQueues += MessageQ_GROWSIZE;
478     /* Delete old table if not statically defined */
479     if (MessageQ_module->canFreeQueues == TRUE) {
480         free(oldQueues);
481     }
482     else {
483         MessageQ_module->canFreeQueues = TRUE;
484     }
486     LOG1("_MessageQ_grow: queueIndex: 0x%x\n", queueIndex)
488     return (queueIndex);
491 /*
492  * This is a helper function to initialize a message.
493  */
494 Void MessageQ_msgInit(MessageQ_Msg msg)
496     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
497     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
498     msg->msgId     = MessageQ_INVALIDMSGID;
499     msg->dstId     = (UInt16)MessageQ_INVALIDMESSAGEQ;
500     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
501     msg->srcProc   = MultiProc_self();
503     pthread_mutex_lock(&(MessageQ_module->gate));
504     msg->seqNum  = MessageQ_module->seqNum++;
505     pthread_mutex_unlock(&(MessageQ_module->gate));
508 NameServer_Handle MessageQ_getNameServerHandle(void)
510     return MessageQ_module->nameServer;
513 Void MessageQ_setQueueOwner(MessageQ_Handle handle, Int pid)
515     handle->ownerPid = pid;
517     return;
520 Void MessageQ_cleanupOwner(Int pid)
522     MessageQ_Handle queue;
523     Int i;
525     for (i = 0; i < MessageQ_module->numQueues; i++) {
526         queue = MessageQ_module->queues[i];
527         if (queue != NULL && queue->ownerPid == pid) {
528             MessageQ_delete(&queue);
529         }
530     }