397ec49b8f0f1ab29bdae9f41b597fbc33eb1f8b
[ipc/ipcdev.git] / qnx / src / api / MessageQ.c
1 /*
2  * Copyright (c) 2012-2014, Texas Instruments Incorporated
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*============================================================================
33  *  @file   MessageQ.c
34  *
35  *  @brief  MessageQ module "client" implementation
36  *
37  *  This implementation is geared for use in a "client/server" model, whereby
38  *  system-wide data is maintained in a "server" component and process-
39  *  specific data is handled here.  At the moment, this implementation
40  *  connects and communicates with LAD for the server connection.
41  *
42  *  The MessageQ module supports the structured sending and receiving of
43  *  variable length messages. This module can be used for homogeneous or
44  *  heterogeneous multi-processor messaging.
45  *
46  *  MessageQ provides more sophisticated messaging than other modules. It is
47  *  typically used for complex situations such as multi-processor messaging.
48  *
49  *  The following are key features of the MessageQ module:
50  *  -Writers and readers can be relocated to another processor with no
51  *   runtime code changes.
52  *  -Timeouts are allowed when receiving messages.
53  *  -Readers can determine the writer and reply back.
54  *  -Receiving a message is deterministic when the timeout is zero.
55  *  -Messages can reside on any message queue.
56  *  -Supports zero-copy transfers.
57  *  -Can send and receive from any type of thread.
58  *  -Notification mechanism is specified by application.
59  *  -Allows QoS (quality of service) on message buffer pools. For example,
60  *   using specific buffer pools for specific message queues.
61  *
62  *  Messages are sent and received via a message queue. A reader is a thread
63  *  that gets (reads) messages from a message queue. A writer is a thread that
64  *  puts (writes) a message to a message queue. Each message queue has one
65  *  reader and can have many writers. A thread may read from or write to multiple
66  *  message queues.
67  *
68  *  Conceptually, the reader thread owns a message queue. The reader thread
69  *  creates a message queue. Writer threads  a created message queues to
70  *  get access to them.
71  *
72  *  Message queues are identified by a system-wide unique name. Internally,
73  *  MessageQ uses the NameServermodule for managing
74  *  these names. The names are used for opening a message queue. Using
75  *  names is not required.
76  *
77  *  Messages must be allocated from the MessageQ module. Once a message is
78  *  allocated, it can be sent on any message queue. Once a message is sent, the
79  *  writer loses ownership of the message and should not attempt to modify the
80  *  message. Once the reader receives the message, it owns the message. It
81  *  may either free the message or re-use the message.
82  *
83  *  Messages in a message queue can be of variable length. The only
84  *  requirement is that the first field in the definition of a message must be a
85  *  MsgHeader structure. For example:
86  *  typedef struct MyMsg {
87  *      MessageQ_MsgHeader header;
88  *      ...
89  *  } MyMsg;
90  *
91  *  The MessageQ API uses the MessageQ_MsgHeader internally. Your application
92  *  should not modify or directly access the fields in the MessageQ_MsgHeader.
93  *
94  *  All messages sent via the MessageQ module must be allocated from a
95  *  Heap implementation. The heap can be used for
96  *  other memory allocation not related to MessageQ.
97  *
98  *  An application can use multiple heaps. The purpose of having multiple
99  *  heaps is to allow an application to regulate its message usage. For
100  *  example, an application can allocate critical messages from one heap of fast
101  *  on-chip memory and non-critical messages from another heap of slower
102  *  external memory
103  *
104  *  MessageQ does support the usage of messages that allocated via the
105  *  alloc function. Please refer to the staticMsgInit
106  *  function description for more details.
107  *
108  *  In a multiple processor system, MessageQ communications to other
109  *  processors via MessageQTransport instances. There must be one and
110  *  only one MessageQTransport instance for each processor where communication
111  *  is desired.
112  *  So on a four processor system, each processor must have three
113  *  MessageQTransport instance.
114  *
115  *  The user only needs to create the MessageQTransport instances. The instances
116  *  are responsible for registering themselves with MessageQ.
117  *  This is accomplished via the registerTransport function.
118  *
119  *  ============================================================================
120  */
123 /* Standard headers */
124 #include <ti/ipc/Std.h>
126 /* Linux specific header files, replacing OSAL: */
127 #include <pthread.h>
129 /* Module level headers */
130 #include <ti/ipc/NameServer.h>
131 #include <ti/ipc/MultiProc.h>
132 #include <ti/syslink/inc/_MultiProc.h>
133 #include <ti/ipc/MessageQ.h>
134 #include <_MessageQ.h>
135 #include <_IpcLog.h>
136 #include <ti/syslink/inc/MessageQDrvDefs.h>
138 #include <sys/select.h>
139 #include <sys/time.h>
140 #include <sys/types.h>
141 #include <sys/param.h>
143 #include <errno.h>
144 #include <stdio.h>
145 #include <string.h>
146 #include <stdlib.h>
147 #include <unistd.h>
148 #include <assert.h>
149 #include <fcntl.h>
151 #include <ti/syslink/inc/usr/Qnx/MessageQDrv.h>
153 /* TI IPC utils: */
154 #include <TiIpcFxns.h>
156 #include <ti/syslink/inc/ti/ipc/ti_ipc.h>
158 /* =============================================================================
159  * Macros/Constants
160  * =============================================================================
161  */
163 /*!
164  *  @brief  Name of the reserved NameServer used for MessageQ.
165  */
166 #define MessageQ_NAMESERVER  "MessageQ"
168 /* More magic rpmsg port numbers: */
169 #define MESSAGEQ_RPMSG_PORT       61
170 #define MESSAGEQ_RPMSG_MAXSIZE    512
171 #define RPMSG_RESERVED_ADDRESSES  (1024)
173 /* MessageQ needs local address bound to be a 16-bit value */
174 #define MAX_LOCAL_ADDR            0x10000
176 /* Trace flag settings: */
177 #define TRACESHIFT    12
178 #define TRACEMASK     0x1000
180 /* =============================================================================
181  * Structures & Enums
182  * =============================================================================
183  */
185 /* structure for MessageQ module state */
186 typedef struct MessageQ_ModuleObject {
187     Int                 refCount;
188     /*!< Reference count */
189     NameServer_Handle   nameServer;
190     /*!< Handle to the local NameServer used for storing GP objects */
191     pthread_mutex_t     gate;
192     /*!< Handle of gate to be used for local thread safety */
193     MessageQ_Params     defaultInstParams;
194     /*!< Default instance creation parameters */
195     int                 ipcFd[MultiProc_MAXPROCESSORS];
196     /*!< File Descriptors for sending to each remote processor */
197     int                 seqNum;
198     /*!< Process-specific sequence number */
199 } MessageQ_ModuleObject;
201 /*!
202  *  @brief  Structure for the Handle for the MessageQ.
203  */
204 typedef struct MessageQ_Object_tag {
205     MessageQ_Params         params;
206     /*! Instance specific creation parameters */
207     MessageQ_QueueId        queue;
208     /* Unique id */
209     int                     ipcFd;
210     /* File Descriptors to receive from a message queue. */
211     int                     unblockFdW;
212     /* Write this fd to unblock the select() call in MessageQ _get() */
213     int                     unblockFdR;
214      /* File Descriptor to block on to listen to unblockFdW. */
215     void                    *serverHandle;
216 } MessageQ_Object;
218 static Bool verbose = FALSE;
220 /* =============================================================================
221  *  Globals
222  * =============================================================================
223  */
224 static MessageQ_ModuleObject MessageQ_state =
226     .refCount               = 0,
227     .nameServer             = NULL,
228 };
230 /*!
231  *  @var    MessageQ_module
232  *
233  *  @brief  Pointer to the MessageQ module state.
234  */
235 MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
238 /* =============================================================================
239  * Forward declarations of internal functions
240  * =============================================================================
241  */
243 /* This is a helper function to initialize a message. */
244 static Int transportCreateEndpoint(int * fd, UInt16 * queueIndex);
245 static Int transportCloseEndpoint(int fd);
246 static Int transportGet(int fd, MessageQ_Msg * retMsg);
247 static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId);
249 /* =============================================================================
250  * APIS
251  * =============================================================================
252  */
253 /* Function to get default configuration for the MessageQ module.
254  *
255  */
256 Void MessageQ_getConfig (MessageQ_Config * cfg)
258     Int status;
259     MessageQDrv_CmdArgs cmdArgs;
261     assert (cfg != NULL);
263     cmdArgs.args.getConfig.config = cfg;
264     status = MessageQDrv_ioctl (CMD_MESSAGEQ_GETCONFIG, &cmdArgs);
266     if (status < 0) {
267         PRINTVERBOSE1("MessageQ_getConfig: API (through IOCTL) failed, \
268             status=%d\n", status)
269     }
271     return;
274 /* Function to setup the MessageQ module. */
275 Int MessageQ_setup (const MessageQ_Config * cfg)
277     Int status;
278     MessageQDrv_CmdArgs cmdArgs;
280     Int i;
282     cmdArgs.args.setup.config = (MessageQ_Config *) cfg;
283     status = MessageQDrv_ioctl(CMD_MESSAGEQ_SETUP, &cmdArgs);
284     if (status < 0) {
285         PRINTVERBOSE1("MessageQ_setup: API (through IOCTL) failed, \
286             status=%d\n", status)
287         return status;
288     }
290     MessageQ_module->nameServer = cmdArgs.args.setup.nameServerHandle;
291     MessageQ_module->seqNum = 0;
293     /* Create a default local gate. */
294     pthread_mutex_init (&(MessageQ_module->gate), NULL);
296     /* Clear ipcFd array. */
297     for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
298        MessageQ_module->ipcFd[i]      = -1;
299     }
301     return status;
304 /*
305  * Function to destroy the MessageQ module.
306  */
307 Int MessageQ_destroy (void)
309     Int status;
310     MessageQDrv_CmdArgs    cmdArgs;
312     status = MessageQDrv_ioctl (CMD_MESSAGEQ_DESTROY, &cmdArgs);
313     if (status < 0) {
314         PRINTVERBOSE1("MessageQ_destroy: API (through IOCTL) failed, \
315             status=%d\n", status)
316     }
318     return status;
321 /* Function to initialize the parameters for the MessageQ instance. */
322 Void MessageQ_Params_init (MessageQ_Params * params)
324     memcpy (params, &(MessageQ_module->defaultInstParams),
325             sizeof (MessageQ_Params));
327     return;
330 /*
331  *   Function to create a MessageQ object for receiving.
332  *
333  *   Create a file descriptor and bind the source address
334  *   (local ProcId/MessageQ ID) in
335  *   order to get messages dispatched to this messageQ.
336  */
337 MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
339     Int                   status    = MessageQ_S_SUCCESS;
340     MessageQ_Object *     obj    = NULL;
341     UInt16                queueIndex = 0u;
342     UInt16                procId;
343     MessageQDrv_CmdArgs   cmdArgs;
344     int                   fildes[2];
346     cmdArgs.args.create.params = (MessageQ_Params *) params;
347     cmdArgs.args.create.name = name;
348     if (name != NULL) {
349         cmdArgs.args.create.nameLen = (strlen (name) + 1);
350     }
351     else {
352         cmdArgs.args.create.nameLen = 0;
353     }
355     /* Create the generic obj */
356     obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
357     if (obj == NULL) {
358         PRINTVERBOSE0("MessageQ_create: memory allocation failed\n")
359         return NULL;
360     }
362     PRINTVERBOSE2("MessageQ_create: creating endpoint for: %s, \
363        queueIndex: %d\n", name, queueIndex)
364     status = transportCreateEndpoint(&obj->ipcFd, &queueIndex);
365     if (status < 0) {
366         goto cleanup;
367     }
369     /*
370      * We expect the endpoint creation to return a port number from
371      * the MessageQCopy layer. This port number will be greater than
372      * 1024 and less than 0x10000. Use this number as the queueIndex.
373      */
374     cmdArgs.args.create.queueId = queueIndex;
376     status = MessageQDrv_ioctl (CMD_MESSAGEQ_CREATE, &cmdArgs);
377     if (status < 0) {
378         PRINTVERBOSE1("MessageQ_create: API (through IOCTL) failed, \
379             status=%d\n", status)
380         goto cleanup;
381     }
383     if (params != NULL) {
384        /* Populate the params member */
385         memcpy((Ptr) &obj->params, (Ptr)params, sizeof (MessageQ_Params));
386     }
388     procId = MultiProc_self();
389     obj->queue = cmdArgs.args.create.queueId;
390     obj->serverHandle = cmdArgs.args.create.handle;
392     /*
393      * Now, to support MessageQ_unblock() functionality, create an event object.
394      * Writing to this event will unblock the select() call in MessageQ_get().
395      */
396     if (pipe(fildes) == -1) {
397         printf ("MessageQ_create: pipe creation failed: %d, %s\n",
398                    errno, strerror(errno));
399         status = MessageQ_E_FAIL;
400         obj->unblockFdW = obj->unblockFdR = -1;
401     }
402     else {
403         obj->unblockFdW = fildes[1];
404         obj->unblockFdR = fildes[0];
405     }
407 cleanup:
408     /* Cleanup if fail: */
409     if (status < 0) {
410         MessageQ_delete((MessageQ_Handle *)&obj);
411     }
413     return ((MessageQ_Handle) obj);
416 /*
417  * Function to delete a MessageQ object for a specific slave processor.
418  *
419  * Deletes the file descriptors associated with this MessageQ object.
420  */
421 Int MessageQ_delete (MessageQ_Handle * handlePtr)
423     Int               status    = MessageQ_S_SUCCESS;
424     MessageQ_Object * obj       = NULL;
425     MessageQDrv_CmdArgs cmdArgs;
427     assert(handlePtr != NULL);
428     obj = (MessageQ_Object *) (*handlePtr);
429     assert(obj != NULL);
431     if (obj->serverHandle != NULL) {
432         cmdArgs.args.deleteMessageQ.handle = obj->serverHandle;
433         status = MessageQDrv_ioctl (CMD_MESSAGEQ_DELETE, &cmdArgs);
434         if (status < 0) {
435             PRINTVERBOSE1("MessageQ_delete: API (through IOCTL) failed, \
436                 status=%d\n", status)
437         }
438     }
440     /* Close the fds used for MessageQ_unblock(): */
441     if (obj->unblockFdW >= 0) {
442         close(obj->unblockFdW);
443     }
444     if (obj->unblockFdR >= 0) {
445         close(obj->unblockFdR);
446     }
448     /* Close the communication endpoint: */
449     if (obj->ipcFd >= 0) {
450         transportCloseEndpoint(obj->ipcFd);
451     }
453     /* Now free the obj */
454     free (obj);
455     *handlePtr = NULL;
457     return (status);
460 /*
461  *  Opens an instance of MessageQ for sending.
462  *
463  *  We need not create a tiipc file descriptor here; the file descriptors for
464  *  all remote processors were created during MessageQ_attach(), and will be
465  *  retrieved during MessageQ_put().
466  */
467 Int MessageQ_open (String name, MessageQ_QueueId * queueId)
469     Int status = MessageQ_S_SUCCESS;
471     status = NameServer_getUInt32 (MessageQ_module->nameServer,
472                                      name, queueId, NULL);
474     if (status == NameServer_E_NOTFOUND) {
475         /* Set return queue ID to invalid. */
476         *queueId = MessageQ_INVALIDMESSAGEQ;
477         status = MessageQ_E_NOTFOUND;
478     }
479     else if (status >= 0) {
480         /* Override with a MessageQ status code. */
481         status = MessageQ_S_SUCCESS;
482     }
483     else {
484         /* Set return queue ID to invalid. */
485         *queueId = MessageQ_INVALIDMESSAGEQ;
486         /* Override with a MessageQ status code. */
487         if (status == NameServer_E_TIMEOUT) {
488             status = MessageQ_E_TIMEOUT;
489         }
490         else {
491             status = MessageQ_E_FAIL;
492         }
493     }
495     return (status);
498 /* Closes previously opened instance of MessageQ module. */
499 Int MessageQ_close (MessageQ_QueueId * queueId)
501     Int32 status = MessageQ_S_SUCCESS;
503     /* Nothing more to be done for closing the MessageQ. */
504     *queueId = MessageQ_INVALIDMESSAGEQ;
506     return (status);
509 /*
510  * Place a message onto a message queue.
511  *
512  * Calls TransportShm_put(), which handles the sending of the message using the
513  * appropriate kernel interface (socket, device ioctl) call for the remote
514  * procId encoded in the queueId argument.
515  *
516  */
517 Int MessageQ_put (MessageQ_QueueId queueId, MessageQ_Msg msg)
519     Int      status;
520     UInt16   dstProcId  = (UInt16)(queueId >> 16);
521     UInt16   queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
523     msg->dstId     = queueIndex;
524     msg->dstProc   = dstProcId;
526     status = transportPut(msg, queueIndex, dstProcId);
528     return (status);
531 /*
532  * Gets a message for a message queue and blocks if the queue is empty.
533  * If a message is present, it returns it.  Otherwise it blocks
534  * waiting for a message to arrive.
535  * When a message is returned, it is owned by the caller.
536  *
537  * We block using select() on the receiving tiipc file descriptor, then
538  * get the waiting message via a read.
539  * We use the file descriptors stored in the messageQ object via a previous
540  * call to MessageQ_create().
541  *
542  * Note: We currently do not support messages to be sent between threads on the
543  * lcoal processor.
544  *
545  */
546 Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
548     Int     status = MessageQ_S_SUCCESS;
549     Int     tmpStatus;
550     MessageQ_Object * obj = (MessageQ_Object *) handle;
551     int     retval;
552     int     nfds;
553     fd_set  rfds;
554     struct  timeval tv;
555     void    *timevalPtr;
556     int     maxfd = 0;
558     /* Wait (with timeout) and retreive message */
559     FD_ZERO(&rfds);
560     FD_SET(obj->ipcFd, &rfds);
561     maxfd = obj->ipcFd;
563     /* Wait also on the event fd, which may be written by MessageQ_unblock(): */
564     FD_SET(obj->unblockFdR, &rfds);
566     if (timeout == MessageQ_FOREVER) {
567         timevalPtr = NULL;
568     }
569     else {
570         /* Timeout given in msec: convert:  */
571         tv.tv_sec = timeout / 1000;
572         tv.tv_usec = (timeout % 1000) * 1000;
573         timevalPtr = &tv;
574     }
575     /* Add one to last fd created: */
576     nfds = ((maxfd > obj->unblockFdR) ? maxfd : obj->unblockFdR) + 1;
578     retval = select(nfds, &rfds, NULL, NULL, timevalPtr);
579     if (retval)  {
580         if (FD_ISSET(obj->unblockFdR, &rfds))  {
581             /*
582              * Our event was signalled by MessageQ_unblock().
583              *
584              * This is typically done during a shutdown sequence, where
585              * the intention of the client would be to ignore (i.e. not fetch)
586              * any pending messages in the transport's queue.
587              * Thus, we shall not check for nor return any messages.
588              */
589             *msg = NULL;
590             status = MessageQ_E_UNBLOCKED;
591         }
592         else {
593             if (FD_ISSET(obj->ipcFd, &rfds)) {
594                 /* Our transport's fd was signalled: Get the message: */
595                 tmpStatus = transportGet(obj->ipcFd, msg);
596                 if (tmpStatus < 0) {
597                     printf ("MessageQ_get: tranposrtshm_get failed.");
598                     status = MessageQ_E_FAIL;
599                 }
600             }
601         }
602     }
603     else if (retval == 0) {
604         *msg = NULL;
605         status = MessageQ_E_TIMEOUT;
606     }
608     return (status);
611 /*
612  * Return a count of the number of messages in the queue
613  *
614  * TBD: To be implemented. Return -1 for now.
615  */
616 Int MessageQ_count (MessageQ_Handle handle)
618     Int               count = -1;
619     return (count);
622 /* Initializes a message not obtained from MessageQ_alloc. */
623 Void MessageQ_staticMsgInit (MessageQ_Msg msg, UInt32 size)
625     /* Fill in the fields of the message */
626     MessageQ_msgInit (msg);
627     msg->heapId  = MessageQ_STATICMSG;
628     msg->msgSize = size;
631 /*
632  * Allocate a message and initialize the needed fields (note some
633  * of the fields in the header are set via other APIs or in the
634  * MessageQ_put function,
635  */
636 MessageQ_Msg MessageQ_alloc (UInt16 heapId, UInt32 size)
638     MessageQ_Msg msg       = NULL;
640     /*
641      * heapId not used for local alloc (as this is over a copy transport), but
642      * we need to send to other side as heapId is used in BIOS transport:
643      */
644     msg = (MessageQ_Msg)calloc (1, size);
645     MessageQ_msgInit (msg);
646     msg->msgSize = size;
647     msg->heapId  = heapId;
649     return msg;
652 /* Frees the message back to the heap that was used to allocate it. */
653 Int MessageQ_free (MessageQ_Msg msg)
655     UInt32         status = MessageQ_S_SUCCESS;
657     /* Check to ensure this was not allocated by user: */
658     if (msg->heapId == MessageQ_STATICMSG)  {
659         status =  MessageQ_E_CANNOTFREESTATICMSG;
660     }
661     else {
662         free (msg);
663     }
665     return status;
668 /* Register a heap with MessageQ. */
669 Int MessageQ_registerHeap (Ptr heap, UInt16 heapId)
671     Int  status = MessageQ_S_SUCCESS;
673     /* Do nothing, as this uses a copy transport: */
675     return status;
678 /* Unregister a heap with MessageQ. */
679 Int MessageQ_unregisterHeap (UInt16 heapId)
681     Int  status = MessageQ_S_SUCCESS;
683     /* Do nothing, as this uses a copy transport: */
685     return status;
688 /* Unblocks a MessageQ */
689 Void MessageQ_unblock (MessageQ_Handle handle)
691     MessageQ_Object * obj   = (MessageQ_Object *) handle;
692     char         buf = 'n';
693     int          numBytes;
695     /* Write to pipe to awaken any threads blocked on this messageQ: */
696     numBytes = write(obj->unblockFdW, &buf, 1);
699 /* Embeds a source message queue into a message. */
700 Void MessageQ_setReplyQueue (MessageQ_Handle handle, MessageQ_Msg msg)
702     MessageQ_Object * obj   = (MessageQ_Object *) handle;
704     msg->replyId   = (UInt16)(obj->queue);
705     msg->replyProc = (UInt16)(obj->queue >> 16);
708 /* Returns the QueueId associated with the handle. */
709 MessageQ_QueueId MessageQ_getQueueId (MessageQ_Handle handle)
711     MessageQ_Object * obj = (MessageQ_Object *) handle;
712     UInt32            queueId;
714     queueId = (obj->queue);
716     return queueId;
719 /* Sets the tracing of a message */
720 Void MessageQ_setMsgTrace (MessageQ_Msg msg, Bool traceFlag)
722     msg->flags = (msg->flags & ~TRACEMASK) |   (traceFlag << TRACESHIFT);
725 /*
726  *  Returns the amount of shared memory used by one transport instance.
727  *
728  *  The MessageQ module itself does not use any shared memory but the
729  *  underlying transport may use some shared memory.
730  */
731 SizeT MessageQ_sharedMemReq (Ptr sharedAddr)
733     SizeT memReq = 0u;
735     /* Do nothing, as this is a copy transport. */
737     return (memReq);
740 /*
741  *  Opens a file descriptor for this remote proc.
742  *
743  *  Only opens it if one does not already exist for this procId.
744  *
745  *  Note: remoteProcId may be MultiProc_Self() for loopback case.
746  */
747 Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr)
749     Int     status = MessageQ_S_SUCCESS;
750     UInt32  localAddr;
751     int     ipcFd;
752     int     err;
754     PRINTVERBOSE1("MessageQ_attach: remoteProcId: %d\n", remoteProcId)
756     if (remoteProcId >= MultiProc_MAXPROCESSORS) {
757         status = MessageQ_E_INVALIDPROCID;
758         goto exit;
759     }
761     pthread_mutex_lock (&(MessageQ_module->gate));
763     /* Only open a fd if one doesn't exist: */
764     if (MessageQ_module->ipcFd[remoteProcId] == -1)  {
765         /* Create a fd for sending messages to the remote proc: */
766         ipcFd = open("/dev/tiipc", O_RDWR);
767         if (ipcFd < 0) {
768             status = MessageQ_E_FAIL;
769             printf ("MessageQ_attach: open of tiipc device failed: %d, %s\n",
770                        errno, strerror(errno));
771         }
772         else  {
773             PRINTVERBOSE1("MessageQ_attach: opened tiipc fd for sending: %d\n",
774                 ipcFd)
775             MessageQ_module->ipcFd[remoteProcId] = ipcFd;
776             /*
777              * Connect to the remote endpoint and bind any reserved address as
778              * local endpoint
779              */
780             Connect(ipcFd, remoteProcId, MESSAGEQ_RPMSG_PORT);
781             err = BindAddr(ipcFd, &localAddr);
782             if (err < 0) {
783                 status = MessageQ_E_FAIL;
784                 printf ("MessageQ_attach: bind failed: %d, %s\n",
785                     errno, strerror(errno));
786             }
787         }
788     }
789     else {
790         status = MessageQ_E_ALREADYEXISTS;
791     }
793     pthread_mutex_unlock (&(MessageQ_module->gate));
795 exit:
796     return (status);
799 /*
800  *  Close the fd for this remote proc.
801  *
802  */
803 Int MessageQ_detach (UInt16 remoteProcId)
805     Int status = MessageQ_S_SUCCESS;
806     int ipcFd;
808     if (remoteProcId >= MultiProc_MAXPROCESSORS) {
809         status = MessageQ_E_INVALIDPROCID;
810         goto exit;
811     }
813     pthread_mutex_lock (&(MessageQ_module->gate));
815     ipcFd = MessageQ_module->ipcFd[remoteProcId];
816     if (close (ipcFd)) {
817         status = MessageQ_E_OSFAILURE;
818         printf("MessageQ_detach: close failed: %d, %s\n",
819                        errno, strerror(errno));
820     }
821     else {
822         PRINTVERBOSE1("MessageQ_detach: closed fd: %d\n", ipcFd)
823         MessageQ_module->ipcFd[remoteProcId] = -1;
824     }
826     pthread_mutex_unlock (&(MessageQ_module->gate));
828 exit:
829     return (status);
832 /*
833  * This is a helper function to initialize a message.
834  */
835 Void MessageQ_msgInit (MessageQ_Msg msg)
837     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
838     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
839     msg->msgId     = MessageQ_INVALIDMSGID;
840     msg->dstId     = (UInt16)MessageQ_INVALIDMESSAGEQ;
841     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
842     msg->srcProc   = MultiProc_self();
844     pthread_mutex_lock(&(MessageQ_module->gate));
845     msg->seqNum  = MessageQ_module->seqNum++;
846     pthread_mutex_unlock(&(MessageQ_module->gate));
849 /*
850  * =============================================================================
851  * Transport: Fxns kept here until need for a transport layer is realized.
852  * =============================================================================
853  */
854 /*
855  * ======== transportCreateEndpoint ========
856  *
857  * Create a communication endpoint to receive messages.
858  */
859 static Int transportCreateEndpoint(int * fd, UInt16 * queueIndex)
861     Int          status    = MessageQ_S_SUCCESS;
862     int          err;
863     UInt32       localAddr;
865     /* Create a fd to the ti-ipc to receive messages for this messageQ */
866     *fd= open("/dev/tiipc", O_RDWR);
867     if (*fd < 0) {
868         status = MessageQ_E_FAIL;
869         printf ("transportCreateEndpoint: Couldn't open tiipc device: %d, %s\n",
870                   errno, strerror(errno));
872         goto exit;
873     }
875     PRINTVERBOSE1("transportCreateEndpoint: opened fd: %d\n", *fd)
877     err = BindAddr(*fd, &localAddr);
878     if (err < 0) {
879         status = MessageQ_E_FAIL;
880         printf("transportCreateEndpoint: bind failed: %d, %s\n",
881                   errno, strerror(errno));
883         close(*fd);
884         goto exit;
885     }
887     if (localAddr >= MAX_LOCAL_ADDR) {
888         status = MessageQ_E_FAIL;
889         printf("transportCreateEndpoint: local address returned is"
890             "by BindAddr is greater than max supported\n");
892         close(*fd);
893         goto exit;
894     }
896     *queueIndex = localAddr;
898 exit:
899     return (status);
902 /*
903  * ======== transportCloseEndpoint ========
904  *
905  *  Close the communication endpoint.
906  */
907 static Int transportCloseEndpoint(int fd)
909     Int status = MessageQ_S_SUCCESS;
911     PRINTVERBOSE1("transportCloseEndpoint: closing fd: %d\n", fd)
913     /* Stop communication to this endpoint */
914     close(fd);
916     return (status);
919 /*
920  * ======== transportGet ========
921  *  Retrieve a message waiting in the queue.
922 */
923 static Int transportGet(int fd, MessageQ_Msg * retMsg)
925     Int           status    = MessageQ_S_SUCCESS;
926     MessageQ_Msg  msg;
927     int           ret;
928     int           byteCount;
929     tiipc_remote_params remote;
931     /*
932      * We have no way of peeking to see what message size we'll get, so we
933      * allocate a message of max size to receive contents from tiipc
934      * (currently, a copy transport)
935      */
936     msg = MessageQ_alloc (0, MESSAGEQ_RPMSG_MAXSIZE);
937     if (!msg)  {
938         status = MessageQ_E_MEMORY;
939         goto exit;
940     }
942     /* Get message */
943     byteCount = read(fd, msg, MESSAGEQ_RPMSG_MAXSIZE);
944     if (byteCount < 0) {
945         printf("read failed: %s (%d)\n", strerror(errno), errno);
946         status = MessageQ_E_FAIL;
947         goto exit;
948     }
949     else {
950          /* Update the allocated message size (even though this may waste space
951           * when the actual message is smaller than the maximum rpmsg size,
952           * the message will be freed soon anyway, and it avoids an extra copy).
953           */
954          msg->msgSize = byteCount;
956          /*
957           * If the message received was statically allocated, reset the
958           * heapId, so the app can free it.
959           */
960          if (msg->heapId == MessageQ_STATICMSG)  {
961              msg->heapId = 0;  /* for a copy transport, heap id is 0. */
962          }
963     }
965     PRINTVERBOSE1("transportGet: read from fd: %d\n", fd)
966     ret = ioctl(fd, TIIPC_IOCGETREMOTE, &remote);
967     PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg \
968         proc: %d\n", byteCount, remote.remote_addr, remote.remote_proc)
969     PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId, msg->msgSize)
971     *retMsg = msg;
973 exit:
974     return (status);
977 /*
978  * ======== transportPut ========
979  *
980  * Write to tiipc file descriptor associated with
981  * with this destination procID.
982  */
983 static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId)
985     Int     status    = MessageQ_S_SUCCESS;
986     int     ipcFd;
987     int     err;
989     /*
990      * Retrieve the tiipc file descriptor associated with this
991      * transport for the destination processor.
992      */
993     ipcFd = MessageQ_module->ipcFd[dstProcId];
995     PRINTVERBOSE2("Sending msgId: %d via fd: %d\n", msg->msgId, ipcFd)
997     /* send response message to remote processor */
998     err = write(ipcFd, msg, msg->msgSize);
999     if (err < 0) {
1000         printf ("transportPut: write failed: %d, %s\n",
1001                   errno, strerror(errno));
1002         status = MessageQ_E_FAIL;
1003         goto exit;
1004     }
1006     /*
1007      * Free the message, as this is a copy transport, we maintain MessageQ
1008      * semantics.
1009      */
1010     MessageQ_free (msg);
1012 exit:
1013     return (status);