QNX: Change LOGn to GT traces for consistency
[ipc/ipcdev.git] / qnx / src / ipc3x_dev / ti / syslink / ipc / hlos / knl / MessageQ_daemon.c
1 /*
2  * Copyright (c) 2013-2015, Texas Instruments Incorporated
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*============================================================================
33  *  @file   MessageQ.c
34  *
35  *  @brief  MessageQ module "server" implementation
36  *
37  *  This implementation is geared for use in a "client/server" model, whereby
38  *  system-wide data is maintained here as needed and process-specific data
39  *  is handled at the "client" level.  At the moment, LAD is the only "user"
40  *  of this implementation.
41  *
42  *  The MessageQ module supports the structured sending and receiving of
43  *  variable length messages. This module can be used for homogeneous or
44  *  heterogeneous multi-processor messaging.
45  *
46  *  MessageQ provides more sophisticated messaging than other modules. It is
47  *  typically used for complex situations such as multi-processor messaging.
48  *
49  *  The following are key features of the MessageQ module:
50  *  -Writers and readers can be relocated to another processor with no
51  *   runtime code changes.
52  *  -Timeouts are allowed when receiving messages.
53  *  -Readers can determine the writer and reply back.
54  *  -Receiving a message is deterministic when the timeout is zero.
55  *  -Messages can reside on any message queue.
56  *  -Supports zero-copy transfers.
57  *  -Can send and receive from any type of thread.
58  *  -Notification mechanism is specified by application.
59  *  -Allows QoS (quality of service) on message buffer pools. For example,
60  *   using specific buffer pools for specific message queues.
61  *
62  *  Messages are sent and received via a message queue. A reader is a thread
63  *  that gets (reads) messages from a message queue. A writer is a thread that
64  *  puts (writes) a message to a message queue. Each message queue has one
65  *  reader and can have many writers. A thread may read from or write to multiple
66  *  message queues.
67  *
68  *  Conceptually, the reader thread owns a message queue. The reader thread
69  *  creates a message queue. Writer threads  a created message queues to
70  *  get access to them.
71  *
72  *  Message queues are identified by a system-wide unique name. Internally,
73  *  MessageQ uses the NameServermodule for managing
74  *  these names. The names are used for opening a message queue. Using
75  *  names is not required.
76  *
77  *  Messages must be allocated from the MessageQ module. Once a message is
78  *  allocated, it can be sent on any message queue. Once a message is sent, the
79  *  writer loses ownership of the message and should not attempt to modify the
80  *  message. Once the reader receives the message, it owns the message. It
81  *  may either free the message or re-use the message.
82  *
83  *  Messages in a message queue can be of variable length. The only
84  *  requirement is that the first field in the definition of a message must be a
85  *  MsgHeader structure. For example:
86  *  typedef struct MyMsg {
87  *      MessageQ_MsgHeader header;
88  *      ...
89  *  } MyMsg;
90  *
91  *  The MessageQ API uses the MessageQ_MsgHeader internally. Your application
92  *  should not modify or directly access the fields in the MessageQ_MsgHeader.
93  *
94  *  All messages sent via the MessageQ module must be allocated from a
95  *  Heap implementation. The heap can be used for
96  *  other memory allocation not related to MessageQ.
97  *
98  *  An application can use multiple heaps. The purpose of having multiple
99  *  heaps is to allow an application to regulate its message usage. For
100  *  example, an application can allocate critical messages from one heap of fast
101  *  on-chip memory and non-critical messages from another heap of slower
102  *  external memory
103  *
104  *  MessageQ does support the usage of messages that allocated via the
105  *  alloc function. Please refer to the staticMsgInit
106  *  function description for more details.
107  *
108  *  In a multiple processor system, MessageQ communications to other
109  *  processors via MessageQTransport instances. There must be one and
110  *  only one MessageQTransport instance for each processor where communication
111  *  is desired.
112  *  So on a four processor system, each processor must have three
113  *  MessageQTransport instance.
114  *
115  *  The user only needs to create the MessageQTransport instances. The instances
116  *  are responsible for registering themselves with MessageQ.
117  *  This is accomplished via the registerTransport function.
118  *
119  *  ============================================================================
120  */
123 /* Standard headers */
124 #include <ti/ipc/Std.h>
126 /* Qnx specific header files */
127 #include <pthread.h>
129 #include <sys/select.h>
130 #include <sys/time.h>
131 #include <sys/types.h>
132 #include <sys/param.h>
133 #include <errno.h>
134 #include <stdio.h>
135 #include <string.h>
136 #include <stdlib.h>
137 #include <unistd.h>
138 #include <assert.h>
140 /* Module level headers */
141 #include <ti/ipc/NameServer.h>
142 #include <ti/ipc/MultiProc.h>
143 #include <_MultiProc.h>
144 #include <ti/ipc/MessageQ.h>
145 #include <_MessageQ.h>
146 #include <ti/syslink/utils/Trace.h>
148 /* =============================================================================
149  * Macros/Constants
150  * =============================================================================
151  */
153 /*!
154  *  @brief  Name of the reserved NameServer used for MessageQ.
155  */
156 #define MessageQ_NAMESERVER  "MessageQ"
158 /* Define BENCHMARK to quiet key MessageQ APIs: */
159 //#define BENCHMARK
161 /* =============================================================================
162  * Structures & Enums
163  * =============================================================================
164  */
166 /* structure for MessageQ module state */
167 typedef struct MessageQ_ModuleObject {
168     Int                 refCount;
169     /*!< Reference count */
170     NameServer_Handle   nameServer;
171     /*!< Handle to the local NameServer used for storing GP objects */
172     pthread_mutex_t     gate;
173     /*!< Handle of gate to be used for local thread safety */
174     MessageQ_Config    *cfg;
175     /*!< Current config values */
176     MessageQ_Config     defaultCfg;
177     /*!< Default config values */
178     MessageQ_Params     defaultInstParams;
179     /*!< Default instance creation parameters */
180     MessageQ_Handle *   queues;
181     /*!< Global array of message queues */
182     UInt16              numQueues;
183     /*!< Initial number of messageQ objects allowed */
184     Bool                canFreeQueues;
185     /*!< Grow option */
186     Bits16              seqNum;
187     /*!< sequence number */
188 } MessageQ_ModuleObject;
190 /*!
191  *  @brief  Structure for the Handle for the MessageQ.
192  */
193 typedef struct MessageQ_Object {
194     MessageQ_Params         params;
195     /*! Instance specific creation parameters */
196     MessageQ_QueueId        queue;
197     /* Unique id */
198     Ptr                     nsKey;
199     /* NameServer key */
200     Int                     ownerPid;
201     /* Process ID of owner */
202 } MessageQ_Object;
205 /* =============================================================================
206  *  Globals
207  * =============================================================================
208  */
209 extern MessageQ_Config ti_ipc_MessageQ_cfg;
211 static MessageQ_ModuleObject MessageQ_state =
213     .refCount               = 0,
214     .nameServer             = NULL,
215     .queues                 = NULL,
216     .numQueues              = 2u,
217     .canFreeQueues          = FALSE,
218     .gate                   = PTHREAD_MUTEX_INITIALIZER,
219     .cfg                    = &ti_ipc_MessageQ_cfg,
220     .defaultCfg.traceFlag   = FALSE,
221     .defaultCfg.maxRuntimeEntries  = 32u,
222     .defaultCfg.maxNameLen         = 32u,
223     .defaultCfg.numReservedEntries = 0
224 };
226 /*!
227  *  @var    MessageQ_module
228  *
229  *  @brief  Pointer to the MessageQ module state.
230  */
231 MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
234 /* =============================================================================
235  * Forward declarations of internal functions
236  * =============================================================================
237  */
238 /* Grow the MessageQ table */
239 static UInt16 _MessageQ_grow(MessageQ_Object * obj);
241 /* =============================================================================
242  * APIS
243  * =============================================================================
244  */
245 /* Function to get default configuration for the MessageQ module.
246  *
247  */
248 Void MessageQ_getConfig(MessageQ_Config * cfg)
250     assert(cfg != NULL);
252     /* If setup has not yet been called... */
253     if (MessageQ_module->refCount < 1) {
254         memcpy(cfg, &MessageQ_module->defaultCfg, sizeof(MessageQ_Config));
255     }
256     else {
257         memcpy(cfg, MessageQ_module->cfg, sizeof(MessageQ_Config));
258     }
261 /* Function to setup the MessageQ module. */
262 Int MessageQ_setup(const MessageQ_Config * cfg)
264     Int                    status = MessageQ_S_SUCCESS;
265     NameServer_Params      params;
267     pthread_mutex_lock(&(MessageQ_module->gate));
269     GT_1trace(curTrace, GT_ENTER, "MessageQ_setup",
270               MessageQ_module->refCount);
272     MessageQ_module->refCount++;
273     if (MessageQ_module->refCount > 1) {
274         status = MessageQ_S_ALREADYSETUP;
275         GT_1trace(curTrace, GT_1CLASS, "MessageQ module has been already setup,"
276                   " refCount=%d", MessageQ_module->refCount);
278         goto exitSetup;
279     }
281     /* Initialize the parameters */
282     NameServer_Params_init(&params);
283     params.maxValueLen = sizeof(UInt32);
284     params.maxNameLen  = MessageQ_module->cfg->maxNameLen;
286     /* Create the nameserver for modules */
287     MessageQ_module->nameServer = NameServer_create(MessageQ_NAMESERVER,
288                                                     &params);
290     MessageQ_module->seqNum = 0;
291     MessageQ_module->numQueues = MessageQ_module->cfg->maxRuntimeEntries;
292     MessageQ_module->queues = (MessageQ_Handle *)
293         calloc(1, sizeof(MessageQ_Handle) * MessageQ_module->numQueues);
295 exitSetup:
296     GT_1trace(curTrace, GT_LEAVE, "MessageQ_setup", MessageQ_module->refCount);
298     pthread_mutex_unlock(&(MessageQ_module->gate));
300     return (status);
303 /*
304  * Function to destroy the MessageQ module.
305  */
306 Int MessageQ_destroy(void)
308     Int    status    = MessageQ_S_SUCCESS;
309     UInt32 i;
311     pthread_mutex_lock(&(MessageQ_module->gate));
313     GT_1trace(curTrace, GT_ENTER, "MessageQ_destroy",
314               MessageQ_module->refCount);
316     /* Decrease the refCount */
317     MessageQ_module->refCount--;
318     if (MessageQ_module->refCount > 0) {
319         goto exitDestroy;
320     }
322     /* Delete any Message Queues that have not been deleted so far. */
323     for (i = 0; i< MessageQ_module->numQueues; i++) {
324         if (MessageQ_module->queues [i] != NULL) {
325             MessageQ_delete(&(MessageQ_module->queues [i]));
326         }
327     }
329     if (MessageQ_module->nameServer != NULL) {
330         /* Delete the nameserver for modules */
331         status = NameServer_delete(&MessageQ_module->nameServer);
332     }
334     /* Since MessageQ_module->gate was not allocated, no need to delete. */
336     if (MessageQ_module->queues != NULL) {
337         free(MessageQ_module->queues);
338         MessageQ_module->queues = NULL;
339     }
341     MessageQ_module->numQueues  = 0u;
342     MessageQ_module->canFreeQueues = TRUE;
344 exitDestroy:
345     GT_1trace(curTrace, GT_LEAVE, "MessageQ_destroy",
346               MessageQ_module->refCount);
348     pthread_mutex_unlock(&(MessageQ_module->gate));
350     return (status);
353 /*
354  *   Function to create a MessageQ object for receiving.
355  */
356 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params * params)
358     Int                 status    = MessageQ_S_SUCCESS;
359     MessageQ_Object   * obj    = NULL;
360     Bool                found  = FALSE;
361     UInt16              count  = 0;
362     UInt16              queueIndex = 0u;
363     UInt16              queuePort;
364     UInt16              procId;
365     int                 i;
366     UInt                numReserved;
368     GT_1trace(curTrace, GT_1CLASS, "MessageQ_create: creating '%s'", name);
370     /* Create the generic obj */
371     obj = (MessageQ_Object *)calloc(1, sizeof(MessageQ_Object));
373     if (obj == NULL) {
374         GT_setFailureReason(curTrace,
375                             GT_4CLASS,
376                             "MessageQ_create",
377                             MessageQ_E_FAIL,
378                             "Error: no memory");
379         return (NULL);
380     }
382     numReserved = MessageQ_module->cfg->numReservedEntries;
384     pthread_mutex_lock(&(MessageQ_module->gate));
386     /* check if creating a reserved queue */
387     if (params->queueIndex != MessageQ_ANY) {
388         queueIndex = params->queueIndex;
390         if (queueIndex > numReserved) {
391             GT_2trace(curTrace, GT_4CLASS, "MessageQ_create: Error: requested "
392                       "queue index %d is greater than reserved maximum %d",
393                       queueIndex, numReserved - 1);
394             free(obj);
395             obj = NULL;
396         }
397         else if (MessageQ_module->queues[queueIndex] != NULL) {
398             GT_1trace(curTrace, GT_4CLASS, "MessageQ_create: Error: "
399                       "requested queue index %d is already in use.",
400                       queueIndex);
401             free(obj);
402             obj = NULL;
403         }
405         if (obj == NULL) {
406             pthread_mutex_unlock(&(MessageQ_module->gate));
407             return (NULL);
408         }
410         MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
411         found = TRUE;
412     }
413     else {
414         count = MessageQ_module->numQueues;
416         /* Search the dynamic array for any holes */
417         for (i = numReserved; i < count ; i++) {
418             if (MessageQ_module->queues [i] == NULL) {
419                 MessageQ_module->queues [i] = (MessageQ_Handle)obj;
420                 queueIndex = i;
421                 found = TRUE;
422                 break;
423             }
424         }
425     }
427     if (found == FALSE) {
428         /* Growth is always allowed. */
429         queueIndex = _MessageQ_grow(obj);
430     }
432     pthread_mutex_unlock(&(MessageQ_module->gate));
434     if (params != NULL) {
435        /* Populate the params member */
436         memcpy((Ptr)&obj->params, (Ptr)params, sizeof(MessageQ_Params));
437     }
439     procId = MultiProc_self();
440     /* create globally unique messageQ ID */
441     queuePort = queueIndex + MessageQ_PORTOFFSET;
442     obj->queue = (MessageQ_QueueId)(((UInt32)procId << 16) | queuePort);
443     obj->ownerPid = 0;
445     if (name != NULL) {
446         obj->nsKey = NameServer_addUInt32(MessageQ_module->nameServer, name,
447                                           obj->queue);
448     }
450     /* Cleanup if fail */
451     if (status < 0) {
452         MessageQ_delete((MessageQ_Handle *)&obj);
453     }
455     GT_2trace(curTrace, GT_1CLASS, "MessageQ_create: returning obj=%p, qid=0x%x",
456               obj, obj->queue);
458     return ((MessageQ_Handle)obj);
462 /*
463  * Function to delete a MessageQ object for a specific slave processor.
464  */
465 Int MessageQ_delete(MessageQ_Handle * handlePtr)
467     Int              status = MessageQ_S_SUCCESS;
468     MessageQ_Object *obj;
469     MessageQ_Handle queue;
470     UInt16          queueIndex;
472     obj = (MessageQ_Object *)(*handlePtr);
474     GT_1trace(curTrace, GT_ENTER, "MessageQ_delete", obj);
476     queueIndex = MessageQ_getQueueIndex(obj->queue);
477     queue = MessageQ_module->queues[queueIndex];
478     if (queue != obj) {
479         GT_1trace(curTrace, GT_4CLASS,
480                   "    ERROR: obj != MessageQ_module->queues[%d]", queueIndex);
481     }
483     if (obj->nsKey != NULL) {
484         /* Remove from the name server */
485         status = NameServer_removeEntry(MessageQ_module->nameServer,
486                                          obj->nsKey);
487         if (status < 0) {
488             /* Override with a MessageQ status code. */
489             status = MessageQ_E_FAIL;
490         }
491         else {
492             status = MessageQ_S_SUCCESS;
493         }
494     }
496     pthread_mutex_lock(&(MessageQ_module->gate));
498     /* Clear the MessageQ obj from array. */
499     MessageQ_module->queues[queueIndex] = NULL;
501     /* Release the local lock */
502     pthread_mutex_unlock(&(MessageQ_module->gate));
504     /* Now free the obj */
505     free(obj);
506     *handlePtr = NULL;
508     GT_1trace(curTrace, GT_LEAVE, "MessageQ_delete", status);
510     return (status);
513 /* Returns the MessageQ_QueueId associated with the handle. */
514 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
516     MessageQ_Object * obj = (MessageQ_Object *)handle;
517     UInt32            queueId;
519     queueId = (obj->queue);
521     return queueId;
524 /*!
525  *  @brief   Grow the MessageQ table
526  *
527  *  @param   obj     Pointer to the MessageQ object.
528  *
529  *  @sa      _MessageQ_grow
530  *
531  */
532 static UInt16 _MessageQ_grow(MessageQ_Object * obj)
534     UInt16            queueIndex = MessageQ_module->numQueues;
535     UInt16            oldSize;
536     MessageQ_Handle * queues;
537     MessageQ_Handle * oldQueues;
539     /* No parameter validation required since this is an internal function. */
540     oldSize = (MessageQ_module->numQueues) * sizeof(MessageQ_Handle);
542     /* Allocate larger table */
543     queues = calloc(1, oldSize + sizeof(MessageQ_Handle));
545     /* Copy contents into new table */
546     memcpy(queues, MessageQ_module->queues, oldSize);
548     /* Fill in the new entry */
549     queues[queueIndex] = (MessageQ_Handle)obj;
551     /* Hook-up new table */
552     oldQueues = MessageQ_module->queues;
553     MessageQ_module->queues = queues;
554     MessageQ_module->numQueues++;
556     /* Delete old table if not statically defined */
557     if (MessageQ_module->canFreeQueues == TRUE) {
558         free(oldQueues);
559     }
560     else {
561         MessageQ_module->canFreeQueues = TRUE;
562     }
564     GT_1trace(curTrace, GT_1CLASS, "_MessageQ_grow: queueIndex: 0x%x",
565               queueIndex);
567     return (queueIndex);
570 /*
571  * This is a helper function to initialize a message.
572  */
573 Void MessageQ_msgInit(MessageQ_Msg msg)
575     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
576     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
577     msg->msgId     = MessageQ_INVALIDMSGID;
578     msg->dstId     = (UInt16)MessageQ_INVALIDMESSAGEQ;
579     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
580     msg->srcProc   = MultiProc_self();
582     pthread_mutex_lock(&(MessageQ_module->gate));
583     msg->seqNum  = MessageQ_module->seqNum++;
584     pthread_mutex_unlock(&(MessageQ_module->gate));
587 NameServer_Handle MessageQ_getNameServerHandle(void)
589     return MessageQ_module->nameServer;
592 Void MessageQ_setQueueOwner(MessageQ_Handle handle, Int pid)
594     handle->ownerPid = pid;
596     return;
599 Void MessageQ_cleanupOwner(Int pid)
601     MessageQ_Handle queue;
602     Int i;
604     for (i = 0; i < MessageQ_module->numQueues; i++) {
605         queue = MessageQ_module->queues[i];
606         if (queue != NULL && queue->ownerPid == pid) {
607             MessageQ_delete(&queue);
608         }
609     }