Improve MessageQ recovery on DRA7xx QNX
[ipc/ipcdev.git] / qnx / src / api / MessageQ.c
1 /*
2  * Copyright (c) 2012-2015, Texas Instruments Incorporated
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*============================================================================
33  *  @file   MessageQ.c
34  *
35  *  @brief  MessageQ module "client" implementation
36  *
37  *  This implementation is geared for use in a "client/server" model, whereby
38  *  system-wide data is maintained in a "server" component and process-
39  *  specific data is handled here.  At the moment, this implementation
40  *  connects and communicates with LAD for the server connection.
41  *
42  *  The MessageQ module supports the structured sending and receiving of
43  *  variable length messages. This module can be used for homogeneous or
44  *  heterogeneous multi-processor messaging.
45  *
46  *  MessageQ provides more sophisticated messaging than other modules. It is
47  *  typically used for complex situations such as multi-processor messaging.
48  *
49  *  The following are key features of the MessageQ module:
50  *  -Writers and readers can be relocated to another processor with no
51  *   runtime code changes.
52  *  -Timeouts are allowed when receiving messages.
53  *  -Readers can determine the writer and reply back.
54  *  -Receiving a message is deterministic when the timeout is zero.
55  *  -Messages can reside on any message queue.
56  *  -Supports zero-copy transfers.
57  *  -Can send and receive from any type of thread.
58  *  -Notification mechanism is specified by application.
59  *  -Allows QoS (quality of service) on message buffer pools. For example,
60  *   using specific buffer pools for specific message queues.
61  *
62  *  Messages are sent and received via a message queue. A reader is a thread
63  *  that gets (reads) messages from a message queue. A writer is a thread that
64  *  puts (writes) a message to a message queue. Each message queue has one
65  *  reader and can have many writers. A thread may read from or write to multiple
66  *  message queues.
67  *
68  *  Conceptually, the reader thread owns a message queue. The reader thread
69  *  creates a message queue. Writer threads  a created message queues to
70  *  get access to them.
71  *
72  *  Message queues are identified by a system-wide unique name. Internally,
73  *  MessageQ uses the NameServermodule for managing
74  *  these names. The names are used for opening a message queue. Using
75  *  names is not required.
76  *
77  *  Messages must be allocated from the MessageQ module. Once a message is
78  *  allocated, it can be sent on any message queue. Once a message is sent, the
79  *  writer loses ownership of the message and should not attempt to modify the
80  *  message. Once the reader receives the message, it owns the message. It
81  *  may either free the message or re-use the message.
82  *
83  *  Messages in a message queue can be of variable length. The only
84  *  requirement is that the first field in the definition of a message must be a
85  *  MsgHeader structure. For example:
86  *  typedef struct MyMsg {
87  *      MessageQ_MsgHeader header;
88  *      ...
89  *  } MyMsg;
90  *
91  *  The MessageQ API uses the MessageQ_MsgHeader internally. Your application
92  *  should not modify or directly access the fields in the MessageQ_MsgHeader.
93  *
94  *  All messages sent via the MessageQ module must be allocated from a
95  *  Heap implementation. The heap can be used for
96  *  other memory allocation not related to MessageQ.
97  *
98  *  An application can use multiple heaps. The purpose of having multiple
99  *  heaps is to allow an application to regulate its message usage. For
100  *  example, an application can allocate critical messages from one heap of fast
101  *  on-chip memory and non-critical messages from another heap of slower
102  *  external memory
103  *
104  *  MessageQ does support the usage of messages that allocated via the
105  *  alloc function. Please refer to the staticMsgInit
106  *  function description for more details.
107  *
108  *  In a multiple processor system, MessageQ communications to other
109  *  processors via MessageQTransport instances. There must be one and
110  *  only one MessageQTransport instance for each processor where communication
111  *  is desired.
112  *  So on a four processor system, each processor must have three
113  *  MessageQTransport instance.
114  *
115  *  The user only needs to create the MessageQTransport instances. The instances
116  *  are responsible for registering themselves with MessageQ.
117  *  This is accomplished via the registerTransport function.
118  *
119  *  ============================================================================
120  */
123 /* Standard headers */
124 #include <ti/ipc/Std.h>
126 /* Linux specific header files, replacing OSAL: */
127 #include <pthread.h>
129 /* Module level headers */
130 #include <ti/ipc/NameServer.h>
131 #include <ti/ipc/MultiProc.h>
132 #include <ti/syslink/inc/_MultiProc.h>
133 #define MessageQ_internal 1     /* must be defined before include file */
134 #include <ti/ipc/MessageQ.h>
135 #include <_MessageQ.h>
136 #include <_IpcLog.h>
137 #include <ti/syslink/inc/MessageQDrvDefs.h>
139 #include <sys/select.h>
140 #include <sys/time.h>
141 #include <sys/types.h>
142 #include <sys/param.h>
144 #include <errno.h>
145 #include <stdio.h>
146 #include <string.h>
147 #include <stdlib.h>
148 #include <unistd.h>
149 #include <assert.h>
150 #include <fcntl.h>
152 #include <ti/syslink/inc/usr/Qnx/MessageQDrv.h>
154 /* TI IPC utils: */
155 #include <ti/ipc/TiIpcFxns.h>
157 #include <ti/syslink/inc/ti/ipc/ti_ipc.h>
159 /* =============================================================================
160  * Macros/Constants
161  * =============================================================================
162  */
164 /*!
165  *  @brief  Name of the reserved NameServer used for MessageQ.
166  */
167 #define MessageQ_NAMESERVER  "MessageQ"
169 /* More magic rpmsg port numbers: */
170 #define MESSAGEQ_RPMSG_PORT       61
171 #define MESSAGEQ_RPMSG_MAXSIZE    512
172 #define RPMSG_RESERVED_ADDRESSES  (1024)
174 /* Trace flag settings: */
175 #define TRACESHIFT    12
176 #define TRACEMASK     0x1000
178 /* =============================================================================
179  * Structures & Enums
180  * =============================================================================
181  */
183 /* params structure evolution */
184 typedef struct {
185     Void *synchronizer;
186 } MessageQ_Params_Legacy;
188 typedef struct {
189     Int __version;
190     Void *synchronizer;
191     MessageQ_QueueIndex queueIndex;
192 } MessageQ_Params_Version2;
194 /* structure for MessageQ module state */
195 typedef struct MessageQ_ModuleObject {
196     Int                 refCount;
197     /*!< Reference count */
198     NameServer_Handle   nameServer;
199     /*!< Handle to the local NameServer used for storing GP objects */
200     pthread_mutex_t     gate;
201     /*!< Handle of gate to be used for local thread safety */
202     int                 ipcFd[MultiProc_MAXPROCESSORS];
203     /*!< File Descriptors for sending to each remote processor */
204     int                 seqNum;
205     /*!< Process-specific sequence number */
206     MessageQ_PutHookFxn putHookFxn;
207     /*!< hook function for MessageQ_put method */
208 } MessageQ_ModuleObject;
210 /*!
211  *  @brief  Structure for the Handle for the MessageQ.
212  */
213 typedef struct MessageQ_Object_tag {
214     MessageQ_Params         params;
215     /*! Instance specific creation parameters */
216     MessageQ_QueueId        queue;
217     /* Unique id */
218     int                     ipcFd;
219     /* File Descriptors to receive from a message queue. */
220     int                     unblocked;
221     /* Is the queue unblocked and how */
222     int                     unblockFdW;
223     /* Write this fd to unblock the select() call in MessageQ _get() */
224     int                     unblockFdR;
225      /* File Descriptor to block on to listen to unblockFdW. */
226     void                    *serverHandle;
227 } MessageQ_Object;
229 /* traces in this file are controlled via _MessageQ_verbose */
230 Bool _MessageQ_verbose = FALSE;
231 #define verbose _MessageQ_verbose
233 /* =============================================================================
234  *  Globals
235  * =============================================================================
236  */
237 static MessageQ_ModuleObject MessageQ_state =
239     .refCount   = 0,
240     .nameServer = NULL,
241     .putHookFxn = NULL
242 };
244 /*!
245  *  @var    MessageQ_module
246  *
247  *  @brief  Pointer to the MessageQ module state.
248  */
249 MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
252 /* =============================================================================
253  * Forward declarations of internal functions
254  * =============================================================================
255  */
257 /* This is a helper function to initialize a message. */
258 static Int transportCreateEndpoint(int * fd, UInt16 queueIndex);
259 static Int transportCloseEndpoint(int fd);
260 static Int transportGet(int fd, MessageQ_Msg * retMsg);
261 static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId);
263 /* =============================================================================
264  * APIS
265  * =============================================================================
266  */
267 /* Function to get default configuration for the MessageQ module.
268  *
269  */
270 Void MessageQ_getConfig (MessageQ_Config * cfg)
272     Int status;
273     MessageQDrv_CmdArgs cmdArgs;
275     assert (cfg != NULL);
277     cmdArgs.args.getConfig.config = cfg;
278     status = MessageQDrv_ioctl (CMD_MESSAGEQ_GETCONFIG, &cmdArgs);
280     if (status < 0) {
281         PRINTVERBOSE1("MessageQ_getConfig: API (through IOCTL) failed, \
282             status=%d\n", status)
283     }
285     return;
288 /* Function to setup the MessageQ module. */
289 Int MessageQ_setup (const MessageQ_Config * cfg)
291     Int status;
292     MessageQDrv_CmdArgs cmdArgs;
294     Int i;
296     cmdArgs.args.setup.config = (MessageQ_Config *) cfg;
297     status = MessageQDrv_ioctl(CMD_MESSAGEQ_SETUP, &cmdArgs);
298     if (status < 0) {
299         PRINTVERBOSE1("MessageQ_setup: API (through IOCTL) failed, \
300             status=%d\n", status)
301         return status;
302     }
304     MessageQ_module->nameServer = cmdArgs.args.setup.nameServerHandle;
305     MessageQ_module->seqNum = 0;
307     /* Create a default local gate. */
308     pthread_mutex_init (&(MessageQ_module->gate), NULL);
310     /* Clear ipcFd array. */
311     for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
312        MessageQ_module->ipcFd[i]      = -1;
313     }
315     return status;
318 /*
319  * Function to destroy the MessageQ module.
320  */
321 Int MessageQ_destroy (void)
323     Int status;
324     MessageQDrv_CmdArgs    cmdArgs;
326     pthread_mutex_destroy(&(MessageQ_module->gate));
328     status = MessageQDrv_ioctl (CMD_MESSAGEQ_DESTROY, &cmdArgs);
329     if (status < 0) {
330         PRINTVERBOSE1("MessageQ_destroy: API (through IOCTL) failed, \
331             status=%d\n", status)
332     }
334     return status;
337 /*
338  *  ======== MessageQ_Params_init ========
339  *  Legacy implementation.
340  */
341 Void MessageQ_Params_init(MessageQ_Params *params)
343     ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
346 /*
347  *  ======== MessageQ_Params_init__S ========
348  *  New implementation which is version aware.
349  */
350 Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
352     MessageQ_Params_Version2 *params2;
354     switch (version) {
356         case MessageQ_Params_VERSION_2:
357             params2 = (MessageQ_Params_Version2 *)params;
358             params2->__version = MessageQ_Params_VERSION_2;
359             params2->synchronizer = NULL;
360             params2->queueIndex = MessageQ_ANY;
361             break;
363         default:
364             assert(FALSE);
365             break;
366     }
369 /*
370  *   Function to create a MessageQ object for receiving.
371  *
372  *   Create a file descriptor and bind the source address
373  *   (local ProcId/MessageQ ID) in
374  *   order to get messages dispatched to this messageQ.
375  */
376 MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * pp)
378     Int                   status    = MessageQ_S_SUCCESS;
379     MessageQ_Object *     obj    = NULL;
380     UInt16                queuePort = 0u;
381     MessageQDrv_CmdArgs   cmdArgs;
382     int                   fildes[2];
383     MessageQ_Params       ps;
385     MessageQ_Params_init__S(&ps, MessageQ_Params_VERSION);
387     /* copy the given params into the current params structure */
388     if (pp != NULL) {
390         /* snoop the params pointer to see if it's a legacy structure */
391         if ((pp->__version == 0) || (pp->__version > 100)) {
392             ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
393         }
395         /* not legacy structure, use params version field */
396         else if (pp->__version == MessageQ_Params_VERSION_2) {
397             ps.__version = ((MessageQ_Params_Version2 *)pp)->__version;
398             ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
399             ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
400         }
401         else {
402             assert(FALSE);
403         }
404     }
406     cmdArgs.args.create.params = &ps;
407     cmdArgs.args.create.name = name;
409     if (name != NULL) {
410         cmdArgs.args.create.nameLen = (strlen (name) + 1);
411     }
412     else {
413         cmdArgs.args.create.nameLen = 0;
414     }
416     /* Create the generic obj */
417     obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
418     if (obj == NULL) {
419         PRINTVERBOSE0("MessageQ_create: memory allocation failed\n")
420         return NULL;
421     }
423     status = MessageQDrv_ioctl (CMD_MESSAGEQ_CREATE, &cmdArgs);
424     if (status < 0) {
425         PRINTVERBOSE1("MessageQ_create: API (through IOCTL) failed, \
426             status=%d\n", status)
427         goto cleanup;
428     }
430     /* Populate the params member */
431     memcpy(&obj->params, &ps, sizeof(ps));
433     obj->queue = cmdArgs.args.create.queueId;
434     obj->serverHandle = cmdArgs.args.create.handle;
436     /* Get the queue port # (queueIndex + PORT_OFFSET) */
437     queuePort = obj->queue & 0x0000FFFF;
439     PRINTVERBOSE2("MessageQ_create: creating endpoint for: %s"
440         "queuePort %d\n", (name == NULL) ? "NULL" : name , queuePort)
441     status = transportCreateEndpoint(&obj->ipcFd, queuePort);
442     if (status < 0) {
443        goto cleanup;
444     }
446     /*
447      * Now, to support MessageQ_unblock() functionality, create an event object.
448      * Writing to this event will unblock the select() call in MessageQ_get().
449      */
450     if (pipe(fildes) == -1) {
451         printf ("MessageQ_create: pipe creation failed: %d, %s\n",
452                    errno, strerror(errno));
453         status = MessageQ_E_FAIL;
454         obj->unblockFdW = obj->unblockFdR = -1;
455     }
456     else {
457         obj->unblockFdW = fildes[1];
458         obj->unblockFdR = fildes[0];
459     }
461 cleanup:
462     /* Cleanup if fail: */
463     if (status < 0) {
464         MessageQ_delete((MessageQ_Handle *)&obj);
465     }
467     return ((MessageQ_Handle) obj);
470 /*
471  * Function to delete a MessageQ object for a specific slave processor.
472  *
473  * Deletes the file descriptors associated with this MessageQ object.
474  */
475 Int MessageQ_delete (MessageQ_Handle * handlePtr)
477     Int               status    = MessageQ_S_SUCCESS;
478     MessageQ_Object * obj       = NULL;
479     MessageQDrv_CmdArgs cmdArgs;
481     assert(handlePtr != NULL);
482     obj = (MessageQ_Object *) (*handlePtr);
483     assert(obj != NULL);
485     if (obj->serverHandle != NULL) {
486         cmdArgs.args.deleteMessageQ.handle = obj->serverHandle;
487         status = MessageQDrv_ioctl (CMD_MESSAGEQ_DELETE, &cmdArgs);
488         if (status < 0) {
489             PRINTVERBOSE1("MessageQ_delete: API (through IOCTL) failed, \
490                 status=%d\n", status)
491         }
492     }
494     /* Close the fds used for MessageQ_unblock(): */
495     if (obj->unblockFdW >= 0) {
496         close(obj->unblockFdW);
497     }
498     if (obj->unblockFdR >= 0) {
499         close(obj->unblockFdR);
500     }
502     /* Close the communication endpoint: */
503     if (obj->ipcFd >= 0) {
504         transportCloseEndpoint(obj->ipcFd);
505     }
507     /* Now free the obj */
508     free (obj);
509     *handlePtr = NULL;
511     return (status);
514 /*
515  *  Opens an instance of MessageQ for sending.
516  *
517  *  We need not create a tiipc file descriptor here; the file descriptors for
518  *  all remote processors were created during MessageQ_attach(), and will be
519  *  retrieved during MessageQ_put().
520  */
521 Int MessageQ_open (String name, MessageQ_QueueId * queueId)
523     Int status = MessageQ_S_SUCCESS;
525     status = NameServer_getUInt32 (MessageQ_module->nameServer,
526                                      name, queueId, NULL);
528     if (status == NameServer_E_NOTFOUND) {
529         /* Set return queue ID to invalid. */
530         *queueId = MessageQ_INVALIDMESSAGEQ;
531         status = MessageQ_E_NOTFOUND;
532     }
533     else if (status >= 0) {
534         /* Override with a MessageQ status code. */
535         status = MessageQ_S_SUCCESS;
536     }
537     else {
538         /* Set return queue ID to invalid. */
539         *queueId = MessageQ_INVALIDMESSAGEQ;
540         /* Override with a MessageQ status code. */
541         if (status == NameServer_E_TIMEOUT) {
542             status = MessageQ_E_TIMEOUT;
543         }
544         else {
545             status = MessageQ_E_FAIL;
546         }
547     }
549     return (status);
552 /*
553  *  ======== MessageQ_openQueueId ========
554  */
555 MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 procId)
557     MessageQ_QueueIndex queuePort;
558     MessageQ_QueueId queueId;
560     /* queue port is embedded in the queueId */
561     queuePort = queueIndex + MessageQ_PORTOFFSET;
562     queueId = ((MessageQ_QueueId)(procId) << 16) | queuePort;
564     return (queueId);
567 /* Closes previously opened instance of MessageQ module. */
568 Int MessageQ_close (MessageQ_QueueId * queueId)
570     Int32 status = MessageQ_S_SUCCESS;
572     /* Nothing more to be done for closing the MessageQ. */
573     *queueId = MessageQ_INVALIDMESSAGEQ;
575     return (status);
578 /*
579  * Place a message onto a message queue.
580  *
581  * Calls TransportShm_put(), which handles the sending of the message using the
582  * appropriate kernel interface (socket, device ioctl) call for the remote
583  * procId encoded in the queueId argument.
584  *
585  */
586 Int MessageQ_put (MessageQ_QueueId queueId, MessageQ_Msg msg)
588     Int      status;
589     UInt16   dstProcId  = (UInt16)(queueId >> 16);
590     UInt16   queuePort = queueId & 0x0000ffff;
592     /* use the queue port # for destination address */
593     msg->dstId     = queuePort;
594     msg->dstProc   = dstProcId;
596     /* invoke put hook function after addressing the message */
597     if (MessageQ_module->putHookFxn != NULL) {
598         MessageQ_module->putHookFxn(queueId, msg);
599     }
601     status = transportPut(msg, queuePort, dstProcId);
603     return (status);
606 /*
607  * Gets a message for a message queue and blocks if the queue is empty.
608  * If a message is present, it returns it.  Otherwise it blocks
609  * waiting for a message to arrive.
610  * When a message is returned, it is owned by the caller.
611  *
612  * We block using select() on the receiving tiipc file descriptor, then
613  * get the waiting message via a read.
614  * We use the file descriptors stored in the messageQ object via a previous
615  * call to MessageQ_create().
616  *
617  * Note: We currently do not support messages to be sent between threads on the
618  * lcoal processor.
619  *
620  */
621 Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
623     Int     status = MessageQ_S_SUCCESS;
624     Int     tmpStatus;
625     MessageQ_Object * obj = (MessageQ_Object *) handle;
626     int     retval;
627     int     nfds;
628     fd_set  rfds;
629     struct  timeval tv;
630     void    *timevalPtr;
631     int     maxfd = 0;
633     /* Wait (with timeout) and retreive message */
634     FD_ZERO(&rfds);
635     FD_SET(obj->ipcFd, &rfds);
636     maxfd = obj->ipcFd;
638     /* Wait also on the event fd, which may be written by MessageQ_unblock(): */
639     FD_SET(obj->unblockFdR, &rfds);
641     if (timeout == MessageQ_FOREVER) {
642         timevalPtr = NULL;
643     }
644     else {
645         /* Timeout given in msec: convert:  */
646         tv.tv_sec = timeout / 1000;
647         tv.tv_usec = (timeout % 1000) * 1000;
648         timevalPtr = &tv;
649     }
650     /* Add one to last fd created: */
651     nfds = ((maxfd > obj->unblockFdR) ? maxfd : obj->unblockFdR) + 1;
653     *msg = NULL;
655     retval = select(nfds, &rfds, NULL, NULL, timevalPtr);
656     if (retval > 0)  {
657         if (FD_ISSET(obj->unblockFdR, &rfds))  {
658             /*
659              * Our event was signalled by MessageQ_unblock().
660              *
661              * This is typically done during a shutdown sequence, where
662              * the intention of the client would be to ignore (i.e. not fetch)
663              * any pending messages in the transport's queue.
664              * Thus, we shall not check for nor return any messages.
665              */
666             return (obj->unblocked);
667         }
668         else {
669             if (FD_ISSET(obj->ipcFd, &rfds)) {
670                 /* Our transport's fd was signalled: Get the message: */
671                 tmpStatus = transportGet(obj->ipcFd, msg);
672                 if (tmpStatus < 0) {
673                     printf ("MessageQ_get: transportGet failed.\n");
674                     if (tmpStatus == MessageQ_E_SHUTDOWN) {
675                         status = tmpStatus;
676                         MessageQ_shutdown(handle);
677                     }
678                     else {
679                         status = MessageQ_E_FAIL;
680                     }
681                 }
682             }
683         }
684     }
685     else if (retval == 0) {
686         status = MessageQ_E_TIMEOUT;
687     }
688     else {
689         status = MessageQ_E_FAIL;
690     }
692     return (status);
695 /*
696  * Return a count of the number of messages in the queue
697  *
698  * TBD: To be implemented. Return -1 for now.
699  */
700 Int MessageQ_count (MessageQ_Handle handle)
702     Int               count = -1;
703     return (count);
706 /* Initializes a message not obtained from MessageQ_alloc. */
707 Void MessageQ_staticMsgInit (MessageQ_Msg msg, UInt32 size)
709     /* Fill in the fields of the message */
710     MessageQ_msgInit (msg);
711     msg->heapId  = MessageQ_STATICMSG;
712     msg->msgSize = size;
715 /*
716  * Allocate a message and initialize the needed fields (note some
717  * of the fields in the header are set via other APIs or in the
718  * MessageQ_put function,
719  */
720 MessageQ_Msg MessageQ_alloc (UInt16 heapId, UInt32 size)
722     MessageQ_Msg msg       = NULL;
724     /*
725      * heapId not used for local alloc (as this is over a copy transport), but
726      * we need to send to other side as heapId is used in BIOS transport:
727      */
728     msg = (MessageQ_Msg)calloc (1, size);
729     MessageQ_msgInit (msg);
730     msg->msgSize = size;
731     msg->heapId  = heapId;
733     return msg;
736 /* Frees the message back to the heap that was used to allocate it. */
737 Int MessageQ_free (MessageQ_Msg msg)
739     UInt32         status = MessageQ_S_SUCCESS;
741     /* Check to ensure this was not allocated by user: */
742     if (msg->heapId == MessageQ_STATICMSG)  {
743         status =  MessageQ_E_CANNOTFREESTATICMSG;
744     }
745     else {
746         free (msg);
747     }
749     return status;
752 /* Register a heap with MessageQ. */
753 Int MessageQ_registerHeap (Ptr heap, UInt16 heapId)
755     Int  status = MessageQ_S_SUCCESS;
757     /* Do nothing, as this uses a copy transport: */
759     return status;
762 /* Unregister a heap with MessageQ. */
763 Int MessageQ_unregisterHeap (UInt16 heapId)
765     Int  status = MessageQ_S_SUCCESS;
767     /* Do nothing, as this uses a copy transport: */
769     return status;
772 /* Unblocks a MessageQ */
773 Void MessageQ_unblock (MessageQ_Handle handle)
775     MessageQ_Object * obj   = (MessageQ_Object *) handle;
776     char         buf = 'n';
778     obj->unblocked = MessageQ_E_UNBLOCKED;
780     /* Write to pipe to awaken any threads blocked on this messageQ: */
781     write(obj->unblockFdW, &buf, 1);
784 /* Unblocks a MessageQ that's been shutdown due to transport failure */
785 Void MessageQ_shutdown(MessageQ_Handle handle)
787     MessageQ_Object *obj = (MessageQ_Object *)handle;
788     char         buf = 'n';
790     obj->unblocked = MessageQ_E_SHUTDOWN;
792     /* Write to pipe to awaken any threads blocked on this messageQ: */
793     write(obj->unblockFdW, &buf, 1);
796 /* Embeds a source message queue into a message. */
797 Void MessageQ_setReplyQueue (MessageQ_Handle handle, MessageQ_Msg msg)
799     MessageQ_Object * obj   = (MessageQ_Object *) handle;
801     msg->replyId   = (UInt16)(obj->queue);
802     msg->replyProc = (UInt16)(obj->queue >> 16);
805 /* Returns the QueueId associated with the handle. */
806 MessageQ_QueueId MessageQ_getQueueId (MessageQ_Handle handle)
808     MessageQ_Object * obj = (MessageQ_Object *) handle;
809     UInt32            queueId;
811     queueId = (obj->queue);
813     return queueId;
816 /* Sets the tracing of a message */
817 Void MessageQ_setMsgTrace (MessageQ_Msg msg, Bool traceFlag)
819     msg->flags = (msg->flags & ~TRACEMASK) |   (traceFlag << TRACESHIFT);
822 /*
823  *  Returns the amount of shared memory used by one transport instance.
824  *
825  *  The MessageQ module itself does not use any shared memory but the
826  *  underlying transport may use some shared memory.
827  */
828 SizeT MessageQ_sharedMemReq (Ptr sharedAddr)
830     SizeT memReq = 0u;
832     /* Do nothing, as this is a copy transport. */
834     return (memReq);
837 /*
838  *  Opens a file descriptor for this remote proc.
839  *
840  *  Only opens it if one does not already exist for this procId.
841  *
842  *  Note: remoteProcId may be MultiProc_Self() for loopback case.
843  */
844 Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr)
846     Int     status = MessageQ_S_SUCCESS;
847     int     ipcFd;
848     int     err;
850     PRINTVERBOSE1("MessageQ_attach: remoteProcId: %d\n", remoteProcId)
852     if (remoteProcId >= MultiProc_MAXPROCESSORS) {
853         status = MessageQ_E_INVALIDPROCID;
854         goto exit;
855     }
857     pthread_mutex_lock (&(MessageQ_module->gate));
859     /* Only open a fd if one doesn't exist: */
860     if (MessageQ_module->ipcFd[remoteProcId] == -1)  {
861         /* Create a fd for sending messages to the remote proc: */
862         ipcFd = open("/dev/tiipc", O_RDWR);
863         if (ipcFd < 0) {
864             status = MessageQ_E_FAIL;
865             printf ("MessageQ_attach: open of tiipc device failed: %d, %s\n",
866                        errno, strerror(errno));
867         }
868         else  {
869             PRINTVERBOSE1("MessageQ_attach: opened tiipc fd for sending: %d\n",
870                 ipcFd)
871             MessageQ_module->ipcFd[remoteProcId] = ipcFd;
872             /*
873              * Connect to the remote endpoint and bind any reserved address as
874              * local endpoint
875              */
876             TiIpcFxns_connect(ipcFd, remoteProcId, MESSAGEQ_RPMSG_PORT);
877             /* Bind to any port # above 1024 (MessageQCopy_MAXRESERVEDEPT) */
878             err = TiIpcFxns_bindAddr(ipcFd, TIIPC_ADDRANY);
879             if (err < 0) {
880                 status = MessageQ_E_FAIL;
881                 printf ("MessageQ_attach: bind failed: %d, %s\n",
882                     errno, strerror(errno));
883             }
884         }
885     }
886     else {
887         status = MessageQ_E_ALREADYEXISTS;
888     }
890     pthread_mutex_unlock (&(MessageQ_module->gate));
892 exit:
893     return (status);
896 /*
897  *  Close the fd for this remote proc.
898  *
899  */
900 Int MessageQ_detach (UInt16 remoteProcId)
902     Int status = MessageQ_S_SUCCESS;
903     int ipcFd;
905     if (remoteProcId >= MultiProc_MAXPROCESSORS) {
906         status = MessageQ_E_INVALIDPROCID;
907         goto exit;
908     }
910     pthread_mutex_lock (&(MessageQ_module->gate));
912     ipcFd = MessageQ_module->ipcFd[remoteProcId];
913     if (close (ipcFd)) {
914         status = MessageQ_E_OSFAILURE;
915         printf("MessageQ_detach: close failed: %d, %s\n",
916                        errno, strerror(errno));
917     }
918     else {
919         PRINTVERBOSE1("MessageQ_detach: closed fd: %d\n", ipcFd)
920         MessageQ_module->ipcFd[remoteProcId] = -1;
921     }
923     pthread_mutex_unlock (&(MessageQ_module->gate));
925 exit:
926     return (status);
929 /*
930  * This is a helper function to initialize a message.
931  */
932 Void MessageQ_msgInit (MessageQ_Msg msg)
934     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
935     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
936     msg->msgId     = MessageQ_INVALIDMSGID;
937     msg->dstId     = (UInt16)MessageQ_INVALIDMESSAGEQ;
938     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
939     msg->srcProc   = MultiProc_self();
941     pthread_mutex_lock(&(MessageQ_module->gate));
942     msg->seqNum  = MessageQ_module->seqNum++;
943     pthread_mutex_unlock(&(MessageQ_module->gate));
946 /*
947  * =============================================================================
948  * Transport: Fxns kept here until need for a transport layer is realized.
949  * =============================================================================
950  */
951 /*
952  * ======== transportCreateEndpoint ========
953  *
954  * Create a communication endpoint to receive messages.
955  */
956 static Int transportCreateEndpoint(int * fd, UInt16 queuePort)
958     Int          status    = MessageQ_S_SUCCESS;
959     int          err;
961     /* Create a fd to the ti-ipc to receive messages for this messageQ */
962     *fd= open("/dev/tiipc", O_RDWR);
963     if (*fd < 0) {
964         status = MessageQ_E_FAIL;
965         printf ("transportCreateEndpoint: Couldn't open tiipc device: %d, %s\n",
966                   errno, strerror(errno));
968         goto exit;
969     }
971     PRINTVERBOSE1("transportCreateEndpoint: opened fd: %d\n", *fd)
973     /* Bind to this port # in the transport */
974     err = TiIpcFxns_bindAddr(*fd, (UInt32)queuePort);
975     if (err < 0) {
976         status = MessageQ_E_FAIL;
977         printf("transportCreateEndpoint: bind failed: %d, %s\n",
978                   errno, strerror(errno));
980         close(*fd);
981         goto exit;
982     }
984 exit:
985     return (status);
988 /*
989  * ======== transportCloseEndpoint ========
990  *
991  *  Close the communication endpoint.
992  */
993 static Int transportCloseEndpoint(int fd)
995     Int status = MessageQ_S_SUCCESS;
997     PRINTVERBOSE1("transportCloseEndpoint: closing fd: %d\n", fd)
999     /* Stop communication to this endpoint */
1000     close(fd);
1002     return (status);
1005 /*
1006  * ======== transportGet ========
1007  *  Retrieve a message waiting in the queue.
1008 */
1009 static Int transportGet(int fd, MessageQ_Msg * retMsg)
1011     Int           status    = MessageQ_S_SUCCESS;
1012     MessageQ_Msg  msg;
1013     int           ret;
1014     int           byteCount;
1015     tiipc_remote_params remote;
1017     /*
1018      * We have no way of peeking to see what message size we'll get, so we
1019      * allocate a message of max size to receive contents from tiipc
1020      * (currently, a copy transport)
1021      */
1022     msg = MessageQ_alloc (0, MESSAGEQ_RPMSG_MAXSIZE);
1023     if (!msg)  {
1024         status = MessageQ_E_MEMORY;
1025         goto exit;
1026     }
1028     /* Get message */
1029     byteCount = read(fd, msg, MESSAGEQ_RPMSG_MAXSIZE);
1030     if (byteCount < 0) {
1031         status = (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN : MessageQ_E_FAIL);
1032         printf("read failed: %s (%d)\n", strerror(errno), errno);
1033         goto exit;
1034     }
1035     else {
1036          /* Update the allocated message size (even though this may waste space
1037           * when the actual message is smaller than the maximum rpmsg size,
1038           * the message will be freed soon anyway, and it avoids an extra copy).
1039           */
1040          msg->msgSize = byteCount;
1042          /*
1043           * If the message received was statically allocated, reset the
1044           * heapId, so the app can free it.
1045           */
1046          if (msg->heapId == MessageQ_STATICMSG)  {
1047              msg->heapId = 0;  /* for a copy transport, heap id is 0. */
1048          }
1049     }
1051     PRINTVERBOSE1("transportGet: read from fd: %d\n", fd)
1052     ret = ioctl(fd, TIIPC_IOCGETREMOTE, &remote);
1053     if (ret == -1) {
1054         printf("ioctl failed: %s (%d)\n", strerror(errno), errno);
1055         status = MessageQ_E_FAIL;
1056         goto exit;
1057     }
1058     PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg \
1059         proc: %d\n", byteCount, remote.remote_addr, remote.remote_proc)
1060     PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId,
1061         msg->msgSize)
1063     *retMsg = msg;
1065 exit:
1066     return (status);
1069 /*
1070  * ======== transportPut ========
1071  *
1072  * Write to tiipc file descriptor associated with
1073  * with this destination procID.
1074  */
1075 static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId)
1077     Int     status    = MessageQ_S_SUCCESS;
1078     int     ipcFd;
1079     int     err;
1081     /*
1082      * Retrieve the tiipc file descriptor associated with this
1083      * transport for the destination processor.
1084      */
1085     ipcFd = MessageQ_module->ipcFd[dstProcId];
1087     PRINTVERBOSE2("Sending msgId: %d via fd: %d\n", msg->msgId, ipcFd)
1089     /* send response message to remote processor */
1090     err = write(ipcFd, msg, msg->msgSize);
1091     if (err < 0) {
1092         status = (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN : MessageQ_E_FAIL);
1093         printf ("transportPut: write failed: %d, %s\n",
1094                   errno, strerror(errno));
1095         goto exit;
1096     }
1098     /*
1099      * Free the message, as this is a copy transport, we maintain MessageQ
1100      * semantics.
1101      */
1102     MessageQ_free (msg);
1104 exit:
1105     return (status);