Add reserved queue support for QNX
[ipc/ipcdev.git] / qnx / src / ipc3x_dev / ti / syslink / ipc / hlos / knl / MessageQ_daemon.c
1 /*
2  * Copyright (c) 2013-2015, Texas Instruments Incorporated
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*============================================================================
33  *  @file   MessageQ.c
34  *
35  *  @brief  MessageQ module "server" implementation
36  *
37  *  This implementation is geared for use in a "client/server" model, whereby
38  *  system-wide data is maintained here as needed and process-specific data
39  *  is handled at the "client" level.  At the moment, LAD is the only "user"
40  *  of this implementation.
41  *
42  *  The MessageQ module supports the structured sending and receiving of
43  *  variable length messages. This module can be used for homogeneous or
44  *  heterogeneous multi-processor messaging.
45  *
46  *  MessageQ provides more sophisticated messaging than other modules. It is
47  *  typically used for complex situations such as multi-processor messaging.
48  *
49  *  The following are key features of the MessageQ module:
50  *  -Writers and readers can be relocated to another processor with no
51  *   runtime code changes.
52  *  -Timeouts are allowed when receiving messages.
53  *  -Readers can determine the writer and reply back.
54  *  -Receiving a message is deterministic when the timeout is zero.
55  *  -Messages can reside on any message queue.
56  *  -Supports zero-copy transfers.
57  *  -Can send and receive from any type of thread.
58  *  -Notification mechanism is specified by application.
59  *  -Allows QoS (quality of service) on message buffer pools. For example,
60  *   using specific buffer pools for specific message queues.
61  *
62  *  Messages are sent and received via a message queue. A reader is a thread
63  *  that gets (reads) messages from a message queue. A writer is a thread that
64  *  puts (writes) a message to a message queue. Each message queue has one
65  *  reader and can have many writers. A thread may read from or write to multiple
66  *  message queues.
67  *
68  *  Conceptually, the reader thread owns a message queue. The reader thread
69  *  creates a message queue. Writer threads  a created message queues to
70  *  get access to them.
71  *
72  *  Message queues are identified by a system-wide unique name. Internally,
73  *  MessageQ uses the NameServermodule for managing
74  *  these names. The names are used for opening a message queue. Using
75  *  names is not required.
76  *
77  *  Messages must be allocated from the MessageQ module. Once a message is
78  *  allocated, it can be sent on any message queue. Once a message is sent, the
79  *  writer loses ownership of the message and should not attempt to modify the
80  *  message. Once the reader receives the message, it owns the message. It
81  *  may either free the message or re-use the message.
82  *
83  *  Messages in a message queue can be of variable length. The only
84  *  requirement is that the first field in the definition of a message must be a
85  *  MsgHeader structure. For example:
86  *  typedef struct MyMsg {
87  *      MessageQ_MsgHeader header;
88  *      ...
89  *  } MyMsg;
90  *
91  *  The MessageQ API uses the MessageQ_MsgHeader internally. Your application
92  *  should not modify or directly access the fields in the MessageQ_MsgHeader.
93  *
94  *  All messages sent via the MessageQ module must be allocated from a
95  *  Heap implementation. The heap can be used for
96  *  other memory allocation not related to MessageQ.
97  *
98  *  An application can use multiple heaps. The purpose of having multiple
99  *  heaps is to allow an application to regulate its message usage. For
100  *  example, an application can allocate critical messages from one heap of fast
101  *  on-chip memory and non-critical messages from another heap of slower
102  *  external memory
103  *
104  *  MessageQ does support the usage of messages that allocated via the
105  *  alloc function. Please refer to the staticMsgInit
106  *  function description for more details.
107  *
108  *  In a multiple processor system, MessageQ communications to other
109  *  processors via MessageQTransport instances. There must be one and
110  *  only one MessageQTransport instance for each processor where communication
111  *  is desired.
112  *  So on a four processor system, each processor must have three
113  *  MessageQTransport instance.
114  *
115  *  The user only needs to create the MessageQTransport instances. The instances
116  *  are responsible for registering themselves with MessageQ.
117  *  This is accomplished via the registerTransport function.
118  *
119  *  ============================================================================
120  */
123 /* Standard headers */
124 #include <ti/ipc/Std.h>
126 /* Qnx specific header files */
127 #include <pthread.h>
129 #include <sys/select.h>
130 #include <sys/time.h>
131 #include <sys/types.h>
132 #include <sys/param.h>
133 #include <errno.h>
134 #include <stdio.h>
135 #include <string.h>
136 #include <stdlib.h>
137 #include <unistd.h>
138 #include <assert.h>
140 /* Module level headers */
141 #include <ti/ipc/NameServer.h>
142 #include <ti/ipc/MultiProc.h>
143 #include <_MultiProc.h>
144 #include <ti/ipc/MessageQ.h>
145 #include <_MessageQ.h>
146 #include <_IpcLog.h>
148 /* =============================================================================
149  * Macros/Constants
150  * =============================================================================
151  */
153 /*!
154  *  @brief  Name of the reserved NameServer used for MessageQ.
155  */
156 #define MessageQ_NAMESERVER  "MessageQ"
158 /* Define BENCHMARK to quiet key MessageQ APIs: */
159 //#define BENCHMARK
161 /* =============================================================================
162  * Structures & Enums
163  * =============================================================================
164  */
166 /* structure for MessageQ module state */
167 typedef struct MessageQ_ModuleObject {
168     Int                 refCount;
169     /*!< Reference count */
170     NameServer_Handle   nameServer;
171     /*!< Handle to the local NameServer used for storing GP objects */
172     pthread_mutex_t     gate;
173     /*!< Handle of gate to be used for local thread safety */
174     MessageQ_Config    *cfg;
175     /*!< Current config values */
176     MessageQ_Config     defaultCfg;
177     /*!< Default config values */
178     MessageQ_Params     defaultInstParams;
179     /*!< Default instance creation parameters */
180     MessageQ_Handle *   queues;
181     /*!< Global array of message queues */
182     UInt16              numQueues;
183     /*!< Initial number of messageQ objects allowed */
184     Bool                canFreeQueues;
185     /*!< Grow option */
186     Bits16              seqNum;
187     /*!< sequence number */
188 } MessageQ_ModuleObject;
190 /*!
191  *  @brief  Structure for the Handle for the MessageQ.
192  */
193 typedef struct MessageQ_Object {
194     MessageQ_Params         params;
195     /*! Instance specific creation parameters */
196     MessageQ_QueueId        queue;
197     /* Unique id */
198     Ptr                     nsKey;
199     /* NameServer key */
200     Int                     ownerPid;
201     /* Process ID of owner */
202 } MessageQ_Object;
205 /* =============================================================================
206  *  Globals
207  * =============================================================================
208  */
209 extern MessageQ_Config ti_ipc_MessageQ_cfg;
211 static MessageQ_ModuleObject MessageQ_state =
213     .refCount               = 0,
214     .nameServer             = NULL,
215     .queues                 = NULL,
216     .numQueues              = 2u,
217     .canFreeQueues          = FALSE,
218     .gate                   = PTHREAD_MUTEX_INITIALIZER,
219     .cfg                    = &ti_ipc_MessageQ_cfg,
220     .defaultCfg.traceFlag   = FALSE,
221     .defaultCfg.maxRuntimeEntries  = 32u,
222     .defaultCfg.maxNameLen         = 32u,
223     .defaultCfg.numReservedEntries = 0
224 };
226 /*!
227  *  @var    MessageQ_module
228  *
229  *  @brief  Pointer to the MessageQ module state.
230  */
231 MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
234 /* =============================================================================
235  * Forward declarations of internal functions
236  * =============================================================================
237  */
238 /* Grow the MessageQ table */
239 static UInt16 _MessageQ_grow(MessageQ_Object * obj);
241 /* =============================================================================
242  * APIS
243  * =============================================================================
244  */
245 /* Function to get default configuration for the MessageQ module.
246  *
247  */
248 Void MessageQ_getConfig(MessageQ_Config * cfg)
250     assert(cfg != NULL);
252     /* If setup has not yet been called... */
253     if (MessageQ_module->refCount < 1) {
254         memcpy(cfg, &MessageQ_module->defaultCfg, sizeof(MessageQ_Config));
255     }
256     else {
257         memcpy(cfg, MessageQ_module->cfg, sizeof(MessageQ_Config));
258     }
261 /* Function to setup the MessageQ module. */
262 Int MessageQ_setup(const MessageQ_Config * cfg)
264     Int                    status = MessageQ_S_SUCCESS;
265     NameServer_Params      params;
267     pthread_mutex_lock(&(MessageQ_module->gate));
269     LOG1("MessageQ_setup: entered, refCount=%d\n", MessageQ_module->refCount)
271     MessageQ_module->refCount++;
272     if (MessageQ_module->refCount > 1) {
273         status = MessageQ_S_ALREADYSETUP;
274         LOG1("MessageQ module has been already setup, refCount=%d\n",
275             MessageQ_module->refCount)
277         goto exitSetup;
278     }
280     /* Initialize the parameters */
281     NameServer_Params_init(&params);
282     params.maxValueLen = sizeof(UInt32);
283     params.maxNameLen  = MessageQ_module->cfg->maxNameLen;
285     /* Create the nameserver for modules */
286     MessageQ_module->nameServer = NameServer_create(MessageQ_NAMESERVER,
287                                                     &params);
289     MessageQ_module->seqNum = 0;
290     MessageQ_module->numQueues = MessageQ_module->cfg->maxRuntimeEntries;
291     MessageQ_module->queues = (MessageQ_Handle *)
292         calloc(1, sizeof(MessageQ_Handle) * MessageQ_module->numQueues);
294 exitSetup:
295     LOG1("MessageQ_setup: exiting, refCount=%d\n", MessageQ_module->refCount)
297     pthread_mutex_unlock(&(MessageQ_module->gate));
299     return (status);
302 /*
303  * Function to destroy the MessageQ module.
304  */
305 Int MessageQ_destroy(void)
307     Int    status    = MessageQ_S_SUCCESS;
308     UInt32 i;
310     pthread_mutex_lock(&(MessageQ_module->gate));
312     LOG1("MessageQ_destroy: entered, refCount=%d\n", MessageQ_module->refCount)
314     /* Decrease the refCount */
315     MessageQ_module->refCount--;
316     if (MessageQ_module->refCount > 0) {
317         goto exitDestroy;
318     }
320     /* Delete any Message Queues that have not been deleted so far. */
321     for (i = 0; i< MessageQ_module->numQueues; i++) {
322         if (MessageQ_module->queues [i] != NULL) {
323             MessageQ_delete(&(MessageQ_module->queues [i]));
324         }
325     }
327     if (MessageQ_module->nameServer != NULL) {
328         /* Delete the nameserver for modules */
329         status = NameServer_delete(&MessageQ_module->nameServer);
330     }
332     /* Since MessageQ_module->gate was not allocated, no need to delete. */
334     if (MessageQ_module->queues != NULL) {
335         free(MessageQ_module->queues);
336         MessageQ_module->queues = NULL;
337     }
339     MessageQ_module->numQueues  = 0u;
340     MessageQ_module->canFreeQueues = TRUE;
342 exitDestroy:
343     LOG1("MessageQ_destroy: exiting, refCount=%d\n", MessageQ_module->refCount)
345     pthread_mutex_unlock(&(MessageQ_module->gate));
347     return (status);
350 /*
351  *   Function to create a MessageQ object for receiving.
352  */
353 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params * params)
355     Int                 status    = MessageQ_S_SUCCESS;
356     MessageQ_Object   * obj    = NULL;
357     Bool                found  = FALSE;
358     UInt16              count  = 0;
359     UInt16              queueIndex = 0u;
360     UInt16              queuePort;
361     UInt16              procId;
362     int                 i;
363     UInt                numReserved;
365     LOG1("MessageQ_create: creating '%s'\n", name)
367     /* Create the generic obj */
368     obj = (MessageQ_Object *)calloc(1, sizeof(MessageQ_Object));
370     if (obj == NULL) {
371         LOG0("MessageQ_create: Error: no memory\n")
372         return (NULL);
373     }
375     numReserved = MessageQ_module->cfg->numReservedEntries;
377     pthread_mutex_lock(&(MessageQ_module->gate));
379     /* check if creating a reserved queue */
380     if (params->queueIndex != MessageQ_ANY) {
381         queueIndex = params->queueIndex;
383         if (queueIndex > numReserved) {
384             LOG2("MessageQ_create: Error: requested queue index %d is greater "
385                     "than reserved maximum %d\n", queueIndex, numReserved - 1)
386             free(obj);
387             obj = NULL;
388         }
389         else if (MessageQ_module->queues[queueIndex] != NULL) {
390             LOG1("MessageQ_create: Error: requested queue index %d is already "
391                     "in use.\n", queueIndex);
392             free(obj);
393             obj = NULL;
394         }
396         if (obj == NULL) {
397             pthread_mutex_unlock(&(MessageQ_module->gate));
398             return (NULL);
399         }
401         MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
402         found = TRUE;
403     }
404     else {
405         count = MessageQ_module->numQueues;
407         /* Search the dynamic array for any holes */
408         for (i = numReserved; i < count ; i++) {
409             if (MessageQ_module->queues [i] == NULL) {
410                 MessageQ_module->queues [i] = (MessageQ_Handle)obj;
411                 queueIndex = i;
412                 found = TRUE;
413                 break;
414             }
415         }
416     }
418     if (found == FALSE) {
419         /* Growth is always allowed. */
420         queueIndex = _MessageQ_grow(obj);
421     }
423     pthread_mutex_unlock(&(MessageQ_module->gate));
425     if (params != NULL) {
426        /* Populate the params member */
427         memcpy((Ptr)&obj->params, (Ptr)params, sizeof(MessageQ_Params));
428     }
430     procId = MultiProc_self();
431     /* create globally unique messageQ ID */
432     queuePort = queueIndex + MessageQ_PORTOFFSET;
433     obj->queue = (MessageQ_QueueId)(((UInt32)procId << 16) | queuePort);
434     obj->ownerPid = 0;
436     if (name != NULL) {
437         obj->nsKey = NameServer_addUInt32(MessageQ_module->nameServer, name,
438                                           obj->queue);
439     }
441     /* Cleanup if fail */
442     if (status < 0) {
443         MessageQ_delete((MessageQ_Handle *)&obj);
444     }
446     LOG2("MessageQ_create: returning obj=%p, qid=0x%x\n", obj, obj->queue)
448     return ((MessageQ_Handle)obj);
452 /*
453  * Function to delete a MessageQ object for a specific slave processor.
454  */
455 Int MessageQ_delete(MessageQ_Handle * handlePtr)
457     Int              status = MessageQ_S_SUCCESS;
458     MessageQ_Object *obj;
459     MessageQ_Handle queue;
460     UInt16          queueIndex;
462     obj = (MessageQ_Object *)(*handlePtr);
464     LOG1("MessageQ_delete: deleting %p\n", obj)
466     queueIndex = MessageQ_getQueueIndex(obj->queue);
467     queue = MessageQ_module->queues[queueIndex];
468     if (queue != obj) {
469         LOG1("    ERROR: obj != MessageQ_module->queues[%d]\n", queueIndex)
470     }
472     if (obj->nsKey != NULL) {
473         /* Remove from the name server */
474         status = NameServer_removeEntry(MessageQ_module->nameServer,
475                                          obj->nsKey);
476         if (status < 0) {
477             /* Override with a MessageQ status code. */
478             status = MessageQ_E_FAIL;
479         }
480         else {
481             status = MessageQ_S_SUCCESS;
482         }
483     }
485     pthread_mutex_lock(&(MessageQ_module->gate));
487     /* Clear the MessageQ obj from array. */
488     MessageQ_module->queues[queueIndex] = NULL;
490     /* Release the local lock */
491     pthread_mutex_unlock(&(MessageQ_module->gate));
493     /* Now free the obj */
494     free(obj);
495     *handlePtr = NULL;
497     LOG1("MessageQ_delete: returning %d\n", status)
499     return (status);
502 /* Returns the MessageQ_QueueId associated with the handle. */
503 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
505     MessageQ_Object * obj = (MessageQ_Object *)handle;
506     UInt32            queueId;
508     queueId = (obj->queue);
510     return queueId;
513 /*!
514  *  @brief   Grow the MessageQ table
515  *
516  *  @param   obj     Pointer to the MessageQ object.
517  *
518  *  @sa      _MessageQ_grow
519  *
520  */
521 static UInt16 _MessageQ_grow(MessageQ_Object * obj)
523     UInt16            queueIndex = MessageQ_module->numQueues;
524     UInt16            oldSize;
525     MessageQ_Handle * queues;
526     MessageQ_Handle * oldQueues;
528     /* No parameter validation required since this is an internal function. */
529     oldSize = (MessageQ_module->numQueues) * sizeof(MessageQ_Handle);
531     /* Allocate larger table */
532     queues = calloc(1, oldSize + sizeof(MessageQ_Handle));
534     /* Copy contents into new table */
535     memcpy(queues, MessageQ_module->queues, oldSize);
537     /* Fill in the new entry */
538     queues[queueIndex] = (MessageQ_Handle)obj;
540     /* Hook-up new table */
541     oldQueues = MessageQ_module->queues;
542     MessageQ_module->queues = queues;
543     MessageQ_module->numQueues++;
545     /* Delete old table if not statically defined */
546     if (MessageQ_module->canFreeQueues == TRUE) {
547         free(oldQueues);
548     }
549     else {
550         MessageQ_module->canFreeQueues = TRUE;
551     }
553     LOG1("_MessageQ_grow: queueIndex: 0x%x\n", queueIndex)
555     return (queueIndex);
558 /*
559  * This is a helper function to initialize a message.
560  */
561 Void MessageQ_msgInit(MessageQ_Msg msg)
563     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
564     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
565     msg->msgId     = MessageQ_INVALIDMSGID;
566     msg->dstId     = (UInt16)MessageQ_INVALIDMESSAGEQ;
567     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
568     msg->srcProc   = MultiProc_self();
570     pthread_mutex_lock(&(MessageQ_module->gate));
571     msg->seqNum  = MessageQ_module->seqNum++;
572     pthread_mutex_unlock(&(MessageQ_module->gate));
575 NameServer_Handle MessageQ_getNameServerHandle(void)
577     return MessageQ_module->nameServer;
580 Void MessageQ_setQueueOwner(MessageQ_Handle handle, Int pid)
582     handle->ownerPid = pid;
584     return;
587 Void MessageQ_cleanupOwner(Int pid)
589     MessageQ_Handle queue;
590     Int i;
592     for (i = 0; i < MessageQ_module->numQueues; i++) {
593         queue = MessageQ_module->queues[i];
594         if (queue != NULL && queue->ownerPid == pid) {
595             MessageQ_delete(&queue);
596         }
597     }