0d84713df66b8fd0b52956a79af849cd9fb5b3a1
[ipc/ipcdev.git] / qnx / src / ipc3x_dev / ti / syslink / ipc / hlos / knl / MessageQ_daemon.c
1 /*
2  * Copyright (c) 2013-2014, Texas Instruments Incorporated
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*============================================================================
33  *  @file   MessageQ.c
34  *
35  *  @brief  MessageQ module "server" implementation
36  *
37  *  This implementation is geared for use in a "client/server" model, whereby
38  *  system-wide data is maintained here as needed and process-specific data
39  *  is handled at the "client" level.  At the moment, LAD is the only "user"
40  *  of this implementation.
41  *
42  *  The MessageQ module supports the structured sending and receiving of
43  *  variable length messages. This module can be used for homogeneous or
44  *  heterogeneous multi-processor messaging.
45  *
46  *  MessageQ provides more sophisticated messaging than other modules. It is
47  *  typically used for complex situations such as multi-processor messaging.
48  *
49  *  The following are key features of the MessageQ module:
50  *  -Writers and readers can be relocated to another processor with no
51  *   runtime code changes.
52  *  -Timeouts are allowed when receiving messages.
53  *  -Readers can determine the writer and reply back.
54  *  -Receiving a message is deterministic when the timeout is zero.
55  *  -Messages can reside on any message queue.
56  *  -Supports zero-copy transfers.
57  *  -Can send and receive from any type of thread.
58  *  -Notification mechanism is specified by application.
59  *  -Allows QoS (quality of service) on message buffer pools. For example,
60  *   using specific buffer pools for specific message queues.
61  *
62  *  Messages are sent and received via a message queue. A reader is a thread
63  *  that gets (reads) messages from a message queue. A writer is a thread that
64  *  puts (writes) a message to a message queue. Each message queue has one
65  *  reader and can have many writers. A thread may read from or write to multiple
66  *  message queues.
67  *
68  *  Conceptually, the reader thread owns a message queue. The reader thread
69  *  creates a message queue. Writer threads  a created message queues to
70  *  get access to them.
71  *
72  *  Message queues are identified by a system-wide unique name. Internally,
73  *  MessageQ uses the NameServermodule for managing
74  *  these names. The names are used for opening a message queue. Using
75  *  names is not required.
76  *
77  *  Messages must be allocated from the MessageQ module. Once a message is
78  *  allocated, it can be sent on any message queue. Once a message is sent, the
79  *  writer loses ownership of the message and should not attempt to modify the
80  *  message. Once the reader receives the message, it owns the message. It
81  *  may either free the message or re-use the message.
82  *
83  *  Messages in a message queue can be of variable length. The only
84  *  requirement is that the first field in the definition of a message must be a
85  *  MsgHeader structure. For example:
86  *  typedef struct MyMsg {
87  *      MessageQ_MsgHeader header;
88  *      ...
89  *  } MyMsg;
90  *
91  *  The MessageQ API uses the MessageQ_MsgHeader internally. Your application
92  *  should not modify or directly access the fields in the MessageQ_MsgHeader.
93  *
94  *  All messages sent via the MessageQ module must be allocated from a
95  *  Heap implementation. The heap can be used for
96  *  other memory allocation not related to MessageQ.
97  *
98  *  An application can use multiple heaps. The purpose of having multiple
99  *  heaps is to allow an application to regulate its message usage. For
100  *  example, an application can allocate critical messages from one heap of fast
101  *  on-chip memory and non-critical messages from another heap of slower
102  *  external memory
103  *
104  *  MessageQ does support the usage of messages that allocated via the
105  *  alloc function. Please refer to the staticMsgInit
106  *  function description for more details.
107  *
108  *  In a multiple processor system, MessageQ communications to other
109  *  processors via MessageQTransport instances. There must be one and
110  *  only one MessageQTransport instance for each processor where communication
111  *  is desired.
112  *  So on a four processor system, each processor must have three
113  *  MessageQTransport instance.
114  *
115  *  The user only needs to create the MessageQTransport instances. The instances
116  *  are responsible for registering themselves with MessageQ.
117  *  This is accomplished via the registerTransport function.
118  *
119  *  ============================================================================
120  */
123 /* Standard headers */
124 #include <ti/ipc/Std.h>
126 /* Qnx specific header files */
127 #include <pthread.h>
129 #include <sys/select.h>
130 #include <sys/time.h>
131 #include <sys/types.h>
132 #include <sys/param.h>
133 #include <errno.h>
134 #include <stdio.h>
135 #include <string.h>
136 #include <stdlib.h>
137 #include <unistd.h>
138 #include <assert.h>
140 /* Module level headers */
141 #include <ti/ipc/NameServer.h>
142 #include <ti/ipc/MultiProc.h>
143 #include <_MultiProc.h>
144 #include <ti/ipc/MessageQ.h>
145 #include <_MessageQ.h>
146 #include <_IpcLog.h>
148 /* =============================================================================
149  * Macros/Constants
150  * =============================================================================
151  */
153 /*!
154  *  @brief  Name of the reserved NameServer used for MessageQ.
155  */
156 #define MessageQ_NAMESERVER  "MessageQ"
158 /* Slot 0 reserved for NameServer messages: */
159 #define RESERVED_MSGQ_INDEX  1
161 /* Define BENCHMARK to quiet key MessageQ APIs: */
162 //#define BENCHMARK
164 /* =============================================================================
165  * Structures & Enums
166  * =============================================================================
167  */
169 /* structure for MessageQ module state */
170 typedef struct MessageQ_ModuleObject {
171     Int                 refCount;
172     /*!< Reference count */
173     NameServer_Handle   nameServer;
174     /*!< Handle to the local NameServer used for storing GP objects */
175     pthread_mutex_t     gate;
176     /*!< Handle of gate to be used for local thread safety */
177     MessageQ_Config     cfg;
178     /*!< Current config values */
179     MessageQ_Config     defaultCfg;
180     /*!< Default config values */
181     MessageQ_Params     defaultInstParams;
182     /*!< Default instance creation parameters */
183     MessageQ_Handle *   queues;
184     /*!< Global array of message queues */
185     UInt16              numQueues;
186     /*!< Initial number of messageQ objects allowed */
187     Bool                canFreeQueues;
188     /*!< Grow option */
189     Bits16              seqNum;
190     /*!< sequence number */
191 } MessageQ_ModuleObject;
193 /*!
194  *  @brief  Structure for the Handle for the MessageQ.
195  */
196 typedef struct MessageQ_Object {
197     MessageQ_Params         params;
198     /*! Instance specific creation parameters */
199     MessageQ_QueueId        queue;
200     /* Unique id */
201     MessageQ_QueueIndex     queueIndex;
202     /* 16-bit index into the queues array */
203     Ptr                     nsKey;
204     /* NameServer key */
205     Int                     ownerPid;
206     /* Process ID of owner */
207 } MessageQ_Object;
210 /* =============================================================================
211  *  Globals
212  * =============================================================================
213  */
214 static MessageQ_ModuleObject MessageQ_state =
216     .refCount               = 0,
217     .nameServer             = NULL,
218     .queues                 = NULL,
219     .numQueues              = 2u,
220     .canFreeQueues          = FALSE,
221     .gate                   = PTHREAD_MUTEX_INITIALIZER,
222     .defaultCfg.traceFlag   = FALSE,
223     .defaultCfg.maxRuntimeEntries = 32u,
224     .defaultCfg.maxNameLen    = 32u,
225 };
227 /*!
228  *  @var    MessageQ_module
229  *
230  *  @brief  Pointer to the MessageQ module state.
231  */
232 MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
235 /* =============================================================================
236  * Forward declarations of internal functions
237  * =============================================================================
238  */
239 /* Grow the MessageQ table */
240 static UInt16 _MessageQ_grow(MessageQ_Object * obj);
242 /* =============================================================================
243  * APIS
244  * =============================================================================
245  */
246 /* Function to get default configuration for the MessageQ module.
247  *
248  */
249 Void MessageQ_getConfig(MessageQ_Config * cfg)
251     assert(cfg != NULL);
253     /* If setup has not yet been called... */
254     if (MessageQ_module->refCount < 1) {
255         memcpy(cfg, &MessageQ_module->defaultCfg, sizeof(MessageQ_Config));
256     }
257     else {
258         memcpy(cfg, &MessageQ_module->cfg, sizeof(MessageQ_Config));
259     }
262 /* Function to setup the MessageQ module. */
263 Int MessageQ_setup(const MessageQ_Config * cfg)
265     Int                    status = MessageQ_S_SUCCESS;
266     NameServer_Params      params;
268     pthread_mutex_lock(&(MessageQ_module->gate));
270     LOG1("MessageQ_setup: entered, refCount=%d\n", MessageQ_module->refCount)
272     MessageQ_module->refCount++;
273     if (MessageQ_module->refCount > 1) {
274         status = MessageQ_S_ALREADYSETUP;
275         LOG1("MessageQ module has been already setup, refCount=%d\n", MessageQ_module->refCount)
277         goto exitSetup;
278     }
280     /* Initialize the parameters */
281     NameServer_Params_init(&params);
282     params.maxValueLen = sizeof(UInt32);
283     params.maxNameLen  = cfg->maxNameLen;
285     /* Create the nameserver for modules */
286     MessageQ_module->nameServer = NameServer_create(MessageQ_NAMESERVER,
287                                                     &params);
289     memcpy(&MessageQ_module->cfg, (void *)cfg, sizeof(MessageQ_Config));
291     MessageQ_module->seqNum = 0;
292     MessageQ_module->numQueues = cfg->maxRuntimeEntries;
293     MessageQ_module->queues = (MessageQ_Handle *)
294         calloc(1, sizeof(MessageQ_Handle) * MessageQ_module->numQueues);
296 exitSetup:
297     LOG1("MessageQ_setup: exiting, refCount=%d\n", MessageQ_module->refCount)
299     pthread_mutex_unlock(&(MessageQ_module->gate));
301     return (status);
304 /*
305  * Function to destroy the MessageQ module.
306  */
307 Int MessageQ_destroy(void)
309     Int    status    = MessageQ_S_SUCCESS;
310     UInt32 i;
312     pthread_mutex_lock(&(MessageQ_module->gate));
314     LOG1("MessageQ_destroy: entered, refCount=%d\n", MessageQ_module->refCount)
316     /* Decrease the refCount */
317     MessageQ_module->refCount--;
318     if (MessageQ_module->refCount > 0) {
319         goto exitDestroy;
320     }
322     /* Delete any Message Queues that have not been deleted so far. */
323     for (i = 0; i< MessageQ_module->numQueues; i++) {
324         if (MessageQ_module->queues [i] != NULL) {
325             MessageQ_delete(&(MessageQ_module->queues [i]));
326         }
327     }
329     if (MessageQ_module->nameServer != NULL) {
330         /* Delete the nameserver for modules */
331         status = NameServer_delete(&MessageQ_module->nameServer);
332     }
334     /* Since MessageQ_module->gate was not allocated, no need to delete. */
336     if (MessageQ_module->queues != NULL) {
337         free(MessageQ_module->queues);
338         MessageQ_module->queues = NULL;
339     }
341     memset(&MessageQ_module->cfg, 0, sizeof(MessageQ_Config));
342     MessageQ_module->numQueues  = 0u;
343     MessageQ_module->canFreeQueues = TRUE;
345 exitDestroy:
346     LOG1("MessageQ_destroy: exiting, refCount=%d\n", MessageQ_module->refCount)
348     pthread_mutex_unlock(&(MessageQ_module->gate));
350     return (status);
353 /*
354  *   Function to create a MessageQ object for receiving.
355  */
356 MessageQ_Handle MessageQ_createWithQueueId(String name, const MessageQ_Params * params,
357     UInt32 queueId)
359     Int                 status    = MessageQ_S_SUCCESS;
360     MessageQ_Object   * obj    = NULL;
361     Bool                found  = FALSE;
362     UInt16              count  = 0;
363     UInt16              queueIndex = 0u;
364     UInt16              procId;
365     int                 i;
367     LOG1("MessageQ_create: creating '%s'\n", name)
369     /* Create the generic obj */
370     obj = (MessageQ_Object *)calloc(1, sizeof(MessageQ_Object));
372     pthread_mutex_lock(&(MessageQ_module->gate));
374     count = MessageQ_module->numQueues;
376     /* Search the dynamic array for any holes */
377     /* We start from 1, as 0 is reserved for binding NameServer: */
378     for (i = RESERVED_MSGQ_INDEX; i < count ; i++) {
379         if (MessageQ_module->queues [i] == NULL) {
380             MessageQ_module->queues [i] = (MessageQ_Handle)obj;
381             queueIndex = i;
382             found = TRUE;
383             break;
384         }
385     }
387     if (found == FALSE) {
388         /* Growth is always allowed. */
389         queueIndex = _MessageQ_grow(obj);
390     }
392     pthread_mutex_unlock(&(MessageQ_module->gate));
394     if (params != NULL) {
395        /* Populate the params member */
396         memcpy((Ptr)&obj->params, (Ptr)params, sizeof(MessageQ_Params));
397     }
399     procId = MultiProc_self();
400     obj->queueIndex = queueIndex;
401     /* create globally unique messageQ ID */
402     obj->queue = (MessageQ_QueueId)(((UInt32)procId << 16) | queueId);
403     obj->ownerPid = 0;
405     if (name != NULL) {
406         obj->nsKey = NameServer_addUInt32(MessageQ_module->nameServer, name,
407                                           obj->queue);
408     }
410     /* Cleanup if fail */
411     if (status < 0) {
412         MessageQ_delete((MessageQ_Handle *)&obj);
413     }
415     LOG1("MessageQ_create: returning %p\n", obj)
417     return ((MessageQ_Handle)obj);
421 /*
422  * Function to delete a MessageQ object for a specific slave processor.
423  */
424 Int MessageQ_delete(MessageQ_Handle * handlePtr)
426     Int              status = MessageQ_S_SUCCESS;
427     MessageQ_Object *obj;
428     MessageQ_Handle queue;
430     obj = (MessageQ_Object *)(*handlePtr);
432     LOG1("MessageQ_delete: deleting %p\n", obj)
434     queue = MessageQ_module->queues[obj->queueIndex];
435     if (queue != obj) {
436         LOG1("    ERROR: obj != MessageQ_module->queues[%d]\n", (MessageQ_QueueIndex)(obj->queueIndex))
437     }
439     if (obj->nsKey != NULL) {
440         /* Remove from the name server */
441         status = NameServer_removeEntry(MessageQ_module->nameServer,
442                                          obj->nsKey);
443         if (status < 0) {
444             /* Override with a MessageQ status code. */
445             status = MessageQ_E_FAIL;
446         }
447         else {
448             status = MessageQ_S_SUCCESS;
449         }
450     }
452     pthread_mutex_lock(&(MessageQ_module->gate));
454     /* Clear the MessageQ obj from array. */
455     MessageQ_module->queues[(MessageQ_QueueIndex)obj->queueIndex] = NULL;
457     /* Release the local lock */
458     pthread_mutex_unlock(&(MessageQ_module->gate));
460     /* Now free the obj */
461     free(obj);
462     *handlePtr = NULL;
464     LOG1("MessageQ_delete: returning %d\n", status)
466     return (status);
469 /* Returns the MessageQ_QueueId associated with the handle. */
470 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
472     MessageQ_Object * obj = (MessageQ_Object *)handle;
473     UInt32            queueId;
475     queueId = (obj->queue);
477     return queueId;
480 /*!
481  *  @brief   Grow the MessageQ table
482  *
483  *  @param   obj     Pointer to the MessageQ object.
484  *
485  *  @sa      _MessageQ_grow
486  *
487  */
488 static UInt16 _MessageQ_grow(MessageQ_Object * obj)
490     UInt16            queueIndex = MessageQ_module->numQueues;
491     UInt16            oldSize;
492     MessageQ_Handle * queues;
493     MessageQ_Handle * oldQueues;
495     /* No parameter validation required since this is an internal function. */
496     oldSize = (MessageQ_module->numQueues) * sizeof(MessageQ_Handle);
498     /* Allocate larger table */
499     queues = calloc(1, oldSize + sizeof(MessageQ_Handle));
501     /* Copy contents into new table */
502     memcpy(queues, MessageQ_module->queues, oldSize);
504     /* Fill in the new entry */
505     queues[queueIndex] = (MessageQ_Handle)obj;
507     /* Hook-up new table */
508     oldQueues = MessageQ_module->queues;
509     MessageQ_module->queues = queues;
510     MessageQ_module->numQueues++;
512     /* Delete old table if not statically defined */
513     if (MessageQ_module->canFreeQueues == TRUE) {
514         free(oldQueues);
515     }
516     else {
517         MessageQ_module->canFreeQueues = TRUE;
518     }
520     LOG1("_MessageQ_grow: queueIndex: 0x%x\n", queueIndex)
522     return (queueIndex);
525 /*
526  * This is a helper function to initialize a message.
527  */
528 Void MessageQ_msgInit(MessageQ_Msg msg)
530     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
531     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
532     msg->msgId     = MessageQ_INVALIDMSGID;
533     msg->dstId     = (UInt16)MessageQ_INVALIDMESSAGEQ;
534     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
535     msg->srcProc   = MultiProc_self();
537     pthread_mutex_lock(&(MessageQ_module->gate));
538     msg->seqNum  = MessageQ_module->seqNum++;
539     pthread_mutex_unlock(&(MessageQ_module->gate));
542 NameServer_Handle MessageQ_getNameServerHandle(void)
544     return MessageQ_module->nameServer;
547 Void MessageQ_setQueueOwner(MessageQ_Handle handle, Int pid)
549     handle->ownerPid = pid;
551     return;
554 Void MessageQ_cleanupOwner(Int pid)
556     MessageQ_Handle queue;
557     Int i;
559     for (i = 0; i < MessageQ_module->numQueues; i++) {
560         queue = MessageQ_module->queues[i];
561         if (queue != NULL && queue->ownerPid == pid) {
562             MessageQ_delete(&queue);
563         }
564     }