bea345111e264ba1cd629759d09da150e8612b07
1 /*
2 * Copyright (c) 2012-2015, Texas Instruments Incorporated
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*============================================================================
33 * @file MessageQ.c
34 *
35 * @brief MessageQ module "client" implementation
36 *
37 * This implementation is geared for use in a "client/server" model, whereby
38 * system-wide data is maintained in a "server" component and process-
39 * specific data is handled here. At the moment, this implementation
40 * connects and communicates with LAD for the server connection.
41 *
42 * The MessageQ module supports the structured sending and receiving of
43 * variable length messages. This module can be used for homogeneous or
44 * heterogeneous multi-processor messaging.
45 *
46 * MessageQ provides more sophisticated messaging than other modules. It is
47 * typically used for complex situations such as multi-processor messaging.
48 *
49 * The following are key features of the MessageQ module:
50 * -Writers and readers can be relocated to another processor with no
51 * runtime code changes.
52 * -Timeouts are allowed when receiving messages.
53 * -Readers can determine the writer and reply back.
54 * -Receiving a message is deterministic when the timeout is zero.
55 * -Messages can reside on any message queue.
56 * -Supports zero-copy transfers.
57 * -Can send and receive from any type of thread.
58 * -Notification mechanism is specified by application.
59 * -Allows QoS (quality of service) on message buffer pools. For example,
60 * using specific buffer pools for specific message queues.
61 *
62 * Messages are sent and received via a message queue. A reader is a thread
63 * that gets (reads) messages from a message queue. A writer is a thread that
64 * puts (writes) a message to a message queue. Each message queue has one
65 * reader and can have many writers. A thread may read from or write to multiple
66 * message queues.
67 *
68 * Conceptually, the reader thread owns a message queue. The reader thread
 * creates a message queue. Writer threads open created message queues to
70 * get access to them.
71 *
72 * Message queues are identified by a system-wide unique name. Internally,
 * MessageQ uses the NameServer module for managing
74 * these names. The names are used for opening a message queue. Using
75 * names is not required.
76 *
77 * Messages must be allocated from the MessageQ module. Once a message is
78 * allocated, it can be sent on any message queue. Once a message is sent, the
79 * writer loses ownership of the message and should not attempt to modify the
80 * message. Once the reader receives the message, it owns the message. It
81 * may either free the message or re-use the message.
82 *
83 * Messages in a message queue can be of variable length. The only
84 * requirement is that the first field in the definition of a message must be a
85 * MsgHeader structure. For example:
86 * typedef struct MyMsg {
87 * MessageQ_MsgHeader header;
88 * ...
89 * } MyMsg;
90 *
91 * The MessageQ API uses the MessageQ_MsgHeader internally. Your application
92 * should not modify or directly access the fields in the MessageQ_MsgHeader.
93 *
94 * All messages sent via the MessageQ module must be allocated from a
95 * Heap implementation. The heap can be used for
96 * other memory allocation not related to MessageQ.
97 *
98 * An application can use multiple heaps. The purpose of having multiple
99 * heaps is to allow an application to regulate its message usage. For
100 * example, an application can allocate critical messages from one heap of fast
101 * on-chip memory and non-critical messages from another heap of slower
 * external memory.
103 *
 * MessageQ does support the usage of messages that are not allocated via the
105 * alloc function. Please refer to the staticMsgInit
106 * function description for more details.
107 *
 * In a multiple processor system, MessageQ communicates with other
109 * processors via MessageQTransport instances. There must be one and
110 * only one MessageQTransport instance for each processor where communication
111 * is desired.
112 * So on a four processor system, each processor must have three
 * MessageQTransport instances.
114 *
115 * The user only needs to create the MessageQTransport instances. The instances
116 * are responsible for registering themselves with MessageQ.
117 * This is accomplished via the registerTransport function.
118 *
119 * ============================================================================
120 */
123 /* Standard headers */
124 #include <ti/ipc/Std.h>
126 /* Linux specific header files, replacing OSAL: */
127 #include <pthread.h>
129 /* Module level headers */
130 #include <ti/ipc/NameServer.h>
131 #include <ti/ipc/MultiProc.h>
132 #include <ti/syslink/inc/_MultiProc.h>
133 #define MessageQ_internal 1 /* must be defined before include file */
134 #include <ti/ipc/MessageQ.h>
135 #include <_MessageQ.h>
136 #include <_IpcLog.h>
137 #include <ti/syslink/inc/MessageQDrvDefs.h>
139 #include <sys/select.h>
140 #include <sys/time.h>
141 #include <sys/types.h>
142 #include <sys/param.h>
144 #include <errno.h>
145 #include <stdio.h>
146 #include <string.h>
147 #include <stdlib.h>
148 #include <unistd.h>
149 #include <assert.h>
150 #include <fcntl.h>
152 #include <ti/syslink/inc/usr/Qnx/MessageQDrv.h>
154 /* TI IPC utils: */
155 #include <TiIpcFxns.h>
157 #include <ti/syslink/inc/ti/ipc/ti_ipc.h>
159 /* =============================================================================
160 * Macros/Constants
161 * =============================================================================
162 */
/*!
 *  @brief  Name of the reserved NameServer used for MessageQ.
 */
#define MessageQ_NAMESERVER "MessageQ"

/* rpmsg port that MessageQ_attach() connects to on the remote processor. */
#define MESSAGEQ_RPMSG_PORT 61
/* Size, in bytes, of the buffer transportGet() allocates and reads into. */
#define MESSAGEQ_RPMSG_MAXSIZE 512
/*
 * NOTE(review): not referenced in the visible code; presumably the number of
 * reserved rpmsg addresses -- confirm before relying on it.
 */
#define RPMSG_RESERVED_ADDRESSES (1024)

/* MessageQ needs local address bound to be a 16-bit value */
#define MAX_LOCAL_ADDR 0x10000

/* Trace flag settings: bit position and mask of the trace bit in msg->flags */
#define TRACESHIFT 12
#define TRACEMASK 0x1000
181 /* =============================================================================
182 * Structures & Enums
183 * =============================================================================
184 */
/*
 * params structure evolution
 *
 * Legacy (unversioned) layout of MessageQ_Params. MessageQ_Params_init() and
 * MessageQ_create() cast a MessageQ_Params pointer to this type when they
 * handle the legacy form.
 */
typedef struct {
    Void *synchronizer;
} MessageQ_Params_Legacy;
/*
 * Version 2 layout of MessageQ_Params: a leading __version tag followed by
 * the synchronizer and the requested queue index. Written by
 * MessageQ_Params_init__S() and read back by MessageQ_create().
 */
typedef struct {
    Int __version;
    Void *synchronizer;
    MessageQ_QueueIndex queueIndex;
} MessageQ_Params_Version2;
/* structure for MessageQ module state */
typedef struct MessageQ_ModuleObject {
    Int refCount;
    /*!< Reference count */
    NameServer_Handle nameServer;
    /*!< NameServer handle returned by the CMD_MESSAGEQ_SETUP ioctl; used by
     *   MessageQ_open() to resolve queue names */
    pthread_mutex_t gate;
    /*!< Mutex taken around accesses to ipcFd[] (attach/detach) and seqNum
     *   (msgInit) for local thread safety */
    int ipcFd[MultiProc_MAXPROCESSORS];
    /*!< File Descriptors for sending to each remote processor, indexed by
     *   processor id; -1 marks "not attached" */
    int seqNum;
    /*!< Process-specific sequence number */
    MessageQ_PutHookFxn putHookFxn;
    /*!< hook function for MessageQ_put method */
} MessageQ_ModuleObject;
/*!
 *  @brief      Structure for the Handle for the MessageQ.
 */
typedef struct MessageQ_Object_tag {
    MessageQ_Params params;
    /*! Instance specific creation parameters */
    MessageQ_QueueId queue;
    /* Unique id */
    int ipcFd;
    /* File descriptor used to receive messages for this message queue. */
    int unblockFdW;
    /* Write end of the unblock pipe: MessageQ_unblock() writes to it to wake
     * the select() call in MessageQ_get() */
    int unblockFdR;
    /* Read end of the unblock pipe: MessageQ_get() select()s on it. */
    void *serverHandle;
    /* Handle returned by the CMD_MESSAGEQ_CREATE ioctl and handed back on
     * CMD_MESSAGEQ_DELETE. */
} MessageQ_Object;
/*
 * Verbose trace switch. NOTE(review): presumably consulted by the
 * PRINTVERBOSE* macros from _IpcLog.h -- confirm.
 */
static Bool verbose = FALSE;
232 /* =============================================================================
233 * Globals
234 * =============================================================================
235 */
/*
 * The single module state instance. The remaining fields (gate, ipcFd[],
 * seqNum) are filled in by MessageQ_setup().
 */
static MessageQ_ModuleObject MessageQ_state =
{
    .refCount = 0,
    .nameServer = NULL,
    .putHookFxn = NULL
};
/*!
 *  @var    MessageQ_module
 *
 *  @brief  Pointer to the MessageQ module state (always &MessageQ_state).
 */
MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
251 /* =============================================================================
252 * Forward declarations of internal functions
253 * =============================================================================
254 */
/*
 * Transport helpers, defined at the end of this file:
 *   transportCreateEndpoint - open and bind a receive endpoint
 *   transportCloseEndpoint  - close a receive endpoint
 *   transportGet            - read one message from an endpoint
 *   transportPut            - write one message to a remote processor
 */
static Int transportCreateEndpoint(int * fd, UInt16 * queueIndex);
static Int transportCloseEndpoint(int fd);
static Int transportGet(int fd, MessageQ_Msg * retMsg);
static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId);
262 /* =============================================================================
263 * APIS
264 * =============================================================================
265 */
266 /* Function to get default configuration for the MessageQ module.
267 *
268 */
269 Void MessageQ_getConfig (MessageQ_Config * cfg)
270 {
271 Int status;
272 MessageQDrv_CmdArgs cmdArgs;
274 assert (cfg != NULL);
276 cmdArgs.args.getConfig.config = cfg;
277 status = MessageQDrv_ioctl (CMD_MESSAGEQ_GETCONFIG, &cmdArgs);
279 if (status < 0) {
280 PRINTVERBOSE1("MessageQ_getConfig: API (through IOCTL) failed, \
281 status=%d\n", status)
282 }
284 return;
285 }
287 /* Function to setup the MessageQ module. */
288 Int MessageQ_setup (const MessageQ_Config * cfg)
289 {
290 Int status;
291 MessageQDrv_CmdArgs cmdArgs;
293 Int i;
295 cmdArgs.args.setup.config = (MessageQ_Config *) cfg;
296 status = MessageQDrv_ioctl(CMD_MESSAGEQ_SETUP, &cmdArgs);
297 if (status < 0) {
298 PRINTVERBOSE1("MessageQ_setup: API (through IOCTL) failed, \
299 status=%d\n", status)
300 return status;
301 }
303 MessageQ_module->nameServer = cmdArgs.args.setup.nameServerHandle;
304 MessageQ_module->seqNum = 0;
306 /* Create a default local gate. */
307 pthread_mutex_init (&(MessageQ_module->gate), NULL);
309 /* Clear ipcFd array. */
310 for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
311 MessageQ_module->ipcFd[i] = -1;
312 }
314 return status;
315 }
317 /*
318 * Function to destroy the MessageQ module.
319 */
320 Int MessageQ_destroy (void)
321 {
322 Int status;
323 MessageQDrv_CmdArgs cmdArgs;
325 status = MessageQDrv_ioctl (CMD_MESSAGEQ_DESTROY, &cmdArgs);
326 if (status < 0) {
327 PRINTVERBOSE1("MessageQ_destroy: API (through IOCTL) failed, \
328 status=%d\n", status)
329 }
331 return status;
332 }
334 /*
335 * ======== MessageQ_Params_init ========
336 * Legacy implementation.
337 */
338 Void MessageQ_Params_init(MessageQ_Params *params)
339 {
340 ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
341 }
343 /*
344 * ======== MessageQ_Params_init__S ========
345 * New implementation which is version aware.
346 */
347 Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
348 {
349 MessageQ_Params_Version2 *params2;
351 switch (version) {
353 case MessageQ_Params_VERSION_2:
354 params2 = (MessageQ_Params_Version2 *)params;
355 params2->__version = MessageQ_Params_VERSION_2;
356 params2->synchronizer = NULL;
357 params2->queueIndex = MessageQ_ANY;
358 break;
360 default:
361 assert(FALSE);
362 break;
363 }
364 }
366 /*
367 * Function to create a MessageQ object for receiving.
368 *
369 * Create a file descriptor and bind the source address
370 * (local ProcId/MessageQ ID) in
371 * order to get messages dispatched to this messageQ.
372 */
373 MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * pp)
374 {
375 Int status = MessageQ_S_SUCCESS;
376 MessageQ_Object * obj = NULL;
377 UInt16 queueIndex = 0u;
378 MessageQDrv_CmdArgs cmdArgs;
379 int fildes[2];
380 MessageQ_Params ps;
382 MessageQ_Params_init__S(&ps, MessageQ_Params_VERSION);
384 /* copy the given params into the current params structure */
385 if (pp != NULL) {
387 /* snoop the params pointer to see if it's a legacy structure */
388 if ((pp->__version == 0) || (pp->__version > 100)) {
389 ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
390 }
392 /* not legacy structure, use params version field */
393 else if (pp->__version == MessageQ_Params_VERSION_2) {
394 ps.__version = ((MessageQ_Params_Version2 *)pp)->__version;
395 ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
396 ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
397 }
398 else {
399 assert(FALSE);
400 }
401 }
403 cmdArgs.args.create.params = &ps;
404 cmdArgs.args.create.name = name;
406 if (name != NULL) {
407 cmdArgs.args.create.nameLen = (strlen (name) + 1);
408 }
409 else {
410 cmdArgs.args.create.nameLen = 0;
411 }
413 /* Create the generic obj */
414 obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
415 if (obj == NULL) {
416 PRINTVERBOSE0("MessageQ_create: memory allocation failed\n")
417 return NULL;
418 }
420 PRINTVERBOSE2("MessageQ_create: creating endpoint for: %s, \
421 queueIndex: %d\n", name, queueIndex)
422 status = transportCreateEndpoint(&obj->ipcFd, &queueIndex);
423 if (status < 0) {
424 goto cleanup;
425 }
427 /*
428 * We expect the endpoint creation to return a port number from
429 * the MessageQCopy layer. This port number will be greater than
430 * 1024 and less than 0x10000. Use this number as the queueIndex.
431 */
432 cmdArgs.args.create.queueId = queueIndex;
434 status = MessageQDrv_ioctl (CMD_MESSAGEQ_CREATE, &cmdArgs);
435 if (status < 0) {
436 PRINTVERBOSE1("MessageQ_create: API (through IOCTL) failed, \
437 status=%d\n", status)
438 goto cleanup;
439 }
441 /* Populate the params member */
442 memcpy(&obj->params, &ps, sizeof(ps));
444 obj->queue = cmdArgs.args.create.queueId;
445 obj->serverHandle = cmdArgs.args.create.handle;
447 /*
448 * Now, to support MessageQ_unblock() functionality, create an event object.
449 * Writing to this event will unblock the select() call in MessageQ_get().
450 */
451 if (pipe(fildes) == -1) {
452 printf ("MessageQ_create: pipe creation failed: %d, %s\n",
453 errno, strerror(errno));
454 status = MessageQ_E_FAIL;
455 obj->unblockFdW = obj->unblockFdR = -1;
456 }
457 else {
458 obj->unblockFdW = fildes[1];
459 obj->unblockFdR = fildes[0];
460 }
462 cleanup:
463 /* Cleanup if fail: */
464 if (status < 0) {
465 MessageQ_delete((MessageQ_Handle *)&obj);
466 }
468 return ((MessageQ_Handle) obj);
469 }
471 /*
472 * Function to delete a MessageQ object for a specific slave processor.
473 *
474 * Deletes the file descriptors associated with this MessageQ object.
475 */
476 Int MessageQ_delete (MessageQ_Handle * handlePtr)
477 {
478 Int status = MessageQ_S_SUCCESS;
479 MessageQ_Object * obj = NULL;
480 MessageQDrv_CmdArgs cmdArgs;
482 assert(handlePtr != NULL);
483 obj = (MessageQ_Object *) (*handlePtr);
484 assert(obj != NULL);
486 if (obj->serverHandle != NULL) {
487 cmdArgs.args.deleteMessageQ.handle = obj->serverHandle;
488 status = MessageQDrv_ioctl (CMD_MESSAGEQ_DELETE, &cmdArgs);
489 if (status < 0) {
490 PRINTVERBOSE1("MessageQ_delete: API (through IOCTL) failed, \
491 status=%d\n", status)
492 }
493 }
495 /* Close the fds used for MessageQ_unblock(): */
496 if (obj->unblockFdW >= 0) {
497 close(obj->unblockFdW);
498 }
499 if (obj->unblockFdR >= 0) {
500 close(obj->unblockFdR);
501 }
503 /* Close the communication endpoint: */
504 if (obj->ipcFd >= 0) {
505 transportCloseEndpoint(obj->ipcFd);
506 }
508 /* Now free the obj */
509 free (obj);
510 *handlePtr = NULL;
512 return (status);
513 }
515 /*
516 * Opens an instance of MessageQ for sending.
517 *
518 * We need not create a tiipc file descriptor here; the file descriptors for
519 * all remote processors were created during MessageQ_attach(), and will be
520 * retrieved during MessageQ_put().
521 */
522 Int MessageQ_open (String name, MessageQ_QueueId * queueId)
523 {
524 Int status = MessageQ_S_SUCCESS;
526 status = NameServer_getUInt32 (MessageQ_module->nameServer,
527 name, queueId, NULL);
529 if (status == NameServer_E_NOTFOUND) {
530 /* Set return queue ID to invalid. */
531 *queueId = MessageQ_INVALIDMESSAGEQ;
532 status = MessageQ_E_NOTFOUND;
533 }
534 else if (status >= 0) {
535 /* Override with a MessageQ status code. */
536 status = MessageQ_S_SUCCESS;
537 }
538 else {
539 /* Set return queue ID to invalid. */
540 *queueId = MessageQ_INVALIDMESSAGEQ;
541 /* Override with a MessageQ status code. */
542 if (status == NameServer_E_TIMEOUT) {
543 status = MessageQ_E_TIMEOUT;
544 }
545 else {
546 status = MessageQ_E_FAIL;
547 }
548 }
550 return (status);
551 }
553 /* Closes previously opened instance of MessageQ module. */
554 Int MessageQ_close (MessageQ_QueueId * queueId)
555 {
556 Int32 status = MessageQ_S_SUCCESS;
558 /* Nothing more to be done for closing the MessageQ. */
559 *queueId = MessageQ_INVALIDMESSAGEQ;
561 return (status);
562 }
564 /*
565 * Place a message onto a message queue.
566 *
567 * Calls TransportShm_put(), which handles the sending of the message using the
568 * appropriate kernel interface (socket, device ioctl) call for the remote
569 * procId encoded in the queueId argument.
570 *
571 */
572 Int MessageQ_put (MessageQ_QueueId queueId, MessageQ_Msg msg)
573 {
574 Int status;
575 UInt16 dstProcId = (UInt16)(queueId >> 16);
576 UInt16 queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
578 msg->dstId = queueIndex;
579 msg->dstProc = dstProcId;
581 /* invoke put hook function after addressing the message */
582 if (MessageQ_module->putHookFxn != NULL) {
583 MessageQ_module->putHookFxn(queueId, msg);
584 }
586 status = transportPut(msg, queueIndex, dstProcId);
588 return (status);
589 }
591 /*
592 * Gets a message for a message queue and blocks if the queue is empty.
593 * If a message is present, it returns it. Otherwise it blocks
594 * waiting for a message to arrive.
595 * When a message is returned, it is owned by the caller.
596 *
597 * We block using select() on the receiving tiipc file descriptor, then
598 * get the waiting message via a read.
599 * We use the file descriptors stored in the messageQ object via a previous
600 * call to MessageQ_create().
601 *
602 * Note: We currently do not support messages to be sent between threads on the
603 * lcoal processor.
604 *
605 */
606 Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
607 {
608 Int status = MessageQ_S_SUCCESS;
609 Int tmpStatus;
610 MessageQ_Object * obj = (MessageQ_Object *) handle;
611 int retval;
612 int nfds;
613 fd_set rfds;
614 struct timeval tv;
615 void *timevalPtr;
616 int maxfd = 0;
618 /* Wait (with timeout) and retreive message */
619 FD_ZERO(&rfds);
620 FD_SET(obj->ipcFd, &rfds);
621 maxfd = obj->ipcFd;
623 /* Wait also on the event fd, which may be written by MessageQ_unblock(): */
624 FD_SET(obj->unblockFdR, &rfds);
626 if (timeout == MessageQ_FOREVER) {
627 timevalPtr = NULL;
628 }
629 else {
630 /* Timeout given in msec: convert: */
631 tv.tv_sec = timeout / 1000;
632 tv.tv_usec = (timeout % 1000) * 1000;
633 timevalPtr = &tv;
634 }
635 /* Add one to last fd created: */
636 nfds = ((maxfd > obj->unblockFdR) ? maxfd : obj->unblockFdR) + 1;
638 retval = select(nfds, &rfds, NULL, NULL, timevalPtr);
639 if (retval) {
640 if (FD_ISSET(obj->unblockFdR, &rfds)) {
641 /*
642 * Our event was signalled by MessageQ_unblock().
643 *
644 * This is typically done during a shutdown sequence, where
645 * the intention of the client would be to ignore (i.e. not fetch)
646 * any pending messages in the transport's queue.
647 * Thus, we shall not check for nor return any messages.
648 */
649 *msg = NULL;
650 status = MessageQ_E_UNBLOCKED;
651 }
652 else {
653 if (FD_ISSET(obj->ipcFd, &rfds)) {
654 /* Our transport's fd was signalled: Get the message: */
655 tmpStatus = transportGet(obj->ipcFd, msg);
656 if (tmpStatus < 0) {
657 printf ("MessageQ_get: tranposrtshm_get failed.");
658 status = MessageQ_E_FAIL;
659 }
660 }
661 }
662 }
663 else if (retval == 0) {
664 *msg = NULL;
665 status = MessageQ_E_TIMEOUT;
666 }
668 return (status);
669 }
671 /*
672 * Return a count of the number of messages in the queue
673 *
674 * TBD: To be implemented. Return -1 for now.
675 */
676 Int MessageQ_count (MessageQ_Handle handle)
677 {
678 Int count = -1;
679 return (count);
680 }
682 /* Initializes a message not obtained from MessageQ_alloc. */
683 Void MessageQ_staticMsgInit (MessageQ_Msg msg, UInt32 size)
684 {
685 /* Fill in the fields of the message */
686 MessageQ_msgInit (msg);
687 msg->heapId = MessageQ_STATICMSG;
688 msg->msgSize = size;
689 }
691 /*
692 * Allocate a message and initialize the needed fields (note some
693 * of the fields in the header are set via other APIs or in the
694 * MessageQ_put function,
695 */
696 MessageQ_Msg MessageQ_alloc (UInt16 heapId, UInt32 size)
697 {
698 MessageQ_Msg msg = NULL;
700 /*
701 * heapId not used for local alloc (as this is over a copy transport), but
702 * we need to send to other side as heapId is used in BIOS transport:
703 */
704 msg = (MessageQ_Msg)calloc (1, size);
705 MessageQ_msgInit (msg);
706 msg->msgSize = size;
707 msg->heapId = heapId;
709 return msg;
710 }
712 /* Frees the message back to the heap that was used to allocate it. */
713 Int MessageQ_free (MessageQ_Msg msg)
714 {
715 UInt32 status = MessageQ_S_SUCCESS;
717 /* Check to ensure this was not allocated by user: */
718 if (msg->heapId == MessageQ_STATICMSG) {
719 status = MessageQ_E_CANNOTFREESTATICMSG;
720 }
721 else {
722 free (msg);
723 }
725 return status;
726 }
728 /* Register a heap with MessageQ. */
729 Int MessageQ_registerHeap (Ptr heap, UInt16 heapId)
730 {
731 Int status = MessageQ_S_SUCCESS;
733 /* Do nothing, as this uses a copy transport: */
735 return status;
736 }
738 /* Unregister a heap with MessageQ. */
739 Int MessageQ_unregisterHeap (UInt16 heapId)
740 {
741 Int status = MessageQ_S_SUCCESS;
743 /* Do nothing, as this uses a copy transport: */
745 return status;
746 }
748 /* Unblocks a MessageQ */
749 Void MessageQ_unblock (MessageQ_Handle handle)
750 {
751 MessageQ_Object * obj = (MessageQ_Object *) handle;
752 char buf = 'n';
754 /* Write to pipe to awaken any threads blocked on this messageQ: */
755 write(obj->unblockFdW, &buf, 1);
756 }
758 /* Embeds a source message queue into a message. */
759 Void MessageQ_setReplyQueue (MessageQ_Handle handle, MessageQ_Msg msg)
760 {
761 MessageQ_Object * obj = (MessageQ_Object *) handle;
763 msg->replyId = (UInt16)(obj->queue);
764 msg->replyProc = (UInt16)(obj->queue >> 16);
765 }
767 /* Returns the QueueId associated with the handle. */
768 MessageQ_QueueId MessageQ_getQueueId (MessageQ_Handle handle)
769 {
770 MessageQ_Object * obj = (MessageQ_Object *) handle;
771 UInt32 queueId;
773 queueId = (obj->queue);
775 return queueId;
776 }
778 /* Sets the tracing of a message */
779 Void MessageQ_setMsgTrace (MessageQ_Msg msg, Bool traceFlag)
780 {
781 msg->flags = (msg->flags & ~TRACEMASK) | (traceFlag << TRACESHIFT);
782 }
784 /*
785 * Returns the amount of shared memory used by one transport instance.
786 *
787 * The MessageQ module itself does not use any shared memory but the
788 * underlying transport may use some shared memory.
789 */
790 SizeT MessageQ_sharedMemReq (Ptr sharedAddr)
791 {
792 SizeT memReq = 0u;
794 /* Do nothing, as this is a copy transport. */
796 return (memReq);
797 }
799 /*
800 * Opens a file descriptor for this remote proc.
801 *
802 * Only opens it if one does not already exist for this procId.
803 *
804 * Note: remoteProcId may be MultiProc_Self() for loopback case.
805 */
806 Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr)
807 {
808 Int status = MessageQ_S_SUCCESS;
809 UInt32 localAddr;
810 int ipcFd;
811 int err;
813 PRINTVERBOSE1("MessageQ_attach: remoteProcId: %d\n", remoteProcId)
815 if (remoteProcId >= MultiProc_MAXPROCESSORS) {
816 status = MessageQ_E_INVALIDPROCID;
817 goto exit;
818 }
820 pthread_mutex_lock (&(MessageQ_module->gate));
822 /* Only open a fd if one doesn't exist: */
823 if (MessageQ_module->ipcFd[remoteProcId] == -1) {
824 /* Create a fd for sending messages to the remote proc: */
825 ipcFd = open("/dev/tiipc", O_RDWR);
826 if (ipcFd < 0) {
827 status = MessageQ_E_FAIL;
828 printf ("MessageQ_attach: open of tiipc device failed: %d, %s\n",
829 errno, strerror(errno));
830 }
831 else {
832 PRINTVERBOSE1("MessageQ_attach: opened tiipc fd for sending: %d\n",
833 ipcFd)
834 MessageQ_module->ipcFd[remoteProcId] = ipcFd;
835 /*
836 * Connect to the remote endpoint and bind any reserved address as
837 * local endpoint
838 */
839 Connect(ipcFd, remoteProcId, MESSAGEQ_RPMSG_PORT);
840 err = BindAddr(ipcFd, &localAddr);
841 if (err < 0) {
842 status = MessageQ_E_FAIL;
843 printf ("MessageQ_attach: bind failed: %d, %s\n",
844 errno, strerror(errno));
845 }
846 }
847 }
848 else {
849 status = MessageQ_E_ALREADYEXISTS;
850 }
852 pthread_mutex_unlock (&(MessageQ_module->gate));
854 exit:
855 return (status);
856 }
858 /*
859 * Close the fd for this remote proc.
860 *
861 */
862 Int MessageQ_detach (UInt16 remoteProcId)
863 {
864 Int status = MessageQ_S_SUCCESS;
865 int ipcFd;
867 if (remoteProcId >= MultiProc_MAXPROCESSORS) {
868 status = MessageQ_E_INVALIDPROCID;
869 goto exit;
870 }
872 pthread_mutex_lock (&(MessageQ_module->gate));
874 ipcFd = MessageQ_module->ipcFd[remoteProcId];
875 if (close (ipcFd)) {
876 status = MessageQ_E_OSFAILURE;
877 printf("MessageQ_detach: close failed: %d, %s\n",
878 errno, strerror(errno));
879 }
880 else {
881 PRINTVERBOSE1("MessageQ_detach: closed fd: %d\n", ipcFd)
882 MessageQ_module->ipcFd[remoteProcId] = -1;
883 }
885 pthread_mutex_unlock (&(MessageQ_module->gate));
887 exit:
888 return (status);
889 }
891 /*
892 * This is a helper function to initialize a message.
893 */
894 Void MessageQ_msgInit (MessageQ_Msg msg)
895 {
896 msg->reserved0 = 0; /* We set this to distinguish from NameServerMsg */
897 msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
898 msg->msgId = MessageQ_INVALIDMSGID;
899 msg->dstId = (UInt16)MessageQ_INVALIDMESSAGEQ;
900 msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
901 msg->srcProc = MultiProc_self();
903 pthread_mutex_lock(&(MessageQ_module->gate));
904 msg->seqNum = MessageQ_module->seqNum++;
905 pthread_mutex_unlock(&(MessageQ_module->gate));
906 }
908 /*
909 * =============================================================================
910 * Transport: Fxns kept here until need for a transport layer is realized.
911 * =============================================================================
912 */
913 /*
914 * ======== transportCreateEndpoint ========
915 *
916 * Create a communication endpoint to receive messages.
917 */
918 static Int transportCreateEndpoint(int * fd, UInt16 * queueIndex)
919 {
920 Int status = MessageQ_S_SUCCESS;
921 int err;
922 UInt32 localAddr;
924 /* Create a fd to the ti-ipc to receive messages for this messageQ */
925 *fd= open("/dev/tiipc", O_RDWR);
926 if (*fd < 0) {
927 status = MessageQ_E_FAIL;
928 printf ("transportCreateEndpoint: Couldn't open tiipc device: %d, %s\n",
929 errno, strerror(errno));
931 goto exit;
932 }
934 PRINTVERBOSE1("transportCreateEndpoint: opened fd: %d\n", *fd)
936 err = BindAddr(*fd, &localAddr);
937 if (err < 0) {
938 status = MessageQ_E_FAIL;
939 printf("transportCreateEndpoint: bind failed: %d, %s\n",
940 errno, strerror(errno));
942 close(*fd);
943 goto exit;
944 }
946 if (localAddr >= MAX_LOCAL_ADDR) {
947 status = MessageQ_E_FAIL;
948 printf("transportCreateEndpoint: local address returned is"
949 "by BindAddr is greater than max supported\n");
951 close(*fd);
952 goto exit;
953 }
955 *queueIndex = localAddr;
957 exit:
958 return (status);
959 }
961 /*
962 * ======== transportCloseEndpoint ========
963 *
964 * Close the communication endpoint.
965 */
966 static Int transportCloseEndpoint(int fd)
967 {
968 Int status = MessageQ_S_SUCCESS;
970 PRINTVERBOSE1("transportCloseEndpoint: closing fd: %d\n", fd)
972 /* Stop communication to this endpoint */
973 close(fd);
975 return (status);
976 }
978 /*
979 * ======== transportGet ========
980 * Retrieve a message waiting in the queue.
981 */
982 static Int transportGet(int fd, MessageQ_Msg * retMsg)
983 {
984 Int status = MessageQ_S_SUCCESS;
985 MessageQ_Msg msg;
986 int ret;
987 int byteCount;
988 tiipc_remote_params remote;
990 /*
991 * We have no way of peeking to see what message size we'll get, so we
992 * allocate a message of max size to receive contents from tiipc
993 * (currently, a copy transport)
994 */
995 msg = MessageQ_alloc (0, MESSAGEQ_RPMSG_MAXSIZE);
996 if (!msg) {
997 status = MessageQ_E_MEMORY;
998 goto exit;
999 }
1001 /* Get message */
1002 byteCount = read(fd, msg, MESSAGEQ_RPMSG_MAXSIZE);
1003 if (byteCount < 0) {
1004 printf("read failed: %s (%d)\n", strerror(errno), errno);
1005 status = MessageQ_E_FAIL;
1006 goto exit;
1007 }
1008 else {
1009 /* Update the allocated message size (even though this may waste space
1010 * when the actual message is smaller than the maximum rpmsg size,
1011 * the message will be freed soon anyway, and it avoids an extra copy).
1012 */
1013 msg->msgSize = byteCount;
1015 /*
1016 * If the message received was statically allocated, reset the
1017 * heapId, so the app can free it.
1018 */
1019 if (msg->heapId == MessageQ_STATICMSG) {
1020 msg->heapId = 0; /* for a copy transport, heap id is 0. */
1021 }
1022 }
1024 PRINTVERBOSE1("transportGet: read from fd: %d\n", fd)
1025 ret = ioctl(fd, TIIPC_IOCGETREMOTE, &remote);
1026 if (ret == -1) {
1027 printf("ioctl failed: %s (%d)\n", strerror(errno), errno);
1028 status = MessageQ_E_FAIL;
1029 goto exit;
1030 }
1031 PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg \
1032 proc: %d\n", byteCount, remote.remote_addr, remote.remote_proc)
1033 PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId,
1034 msg->msgSize)
1036 *retMsg = msg;
1038 exit:
1039 return (status);
1040 }
1042 /*
1043 * ======== transportPut ========
1044 *
1045 * Write to tiipc file descriptor associated with
1046 * with this destination procID.
1047 */
1048 static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId)
1049 {
1050 Int status = MessageQ_S_SUCCESS;
1051 int ipcFd;
1052 int err;
1054 /*
1055 * Retrieve the tiipc file descriptor associated with this
1056 * transport for the destination processor.
1057 */
1058 ipcFd = MessageQ_module->ipcFd[dstProcId];
1060 PRINTVERBOSE2("Sending msgId: %d via fd: %d\n", msg->msgId, ipcFd)
1062 /* send response message to remote processor */
1063 err = write(ipcFd, msg, msg->msgSize);
1064 if (err < 0) {
1065 printf ("transportPut: write failed: %d, %s\n",
1066 errno, strerror(errno));
1067 status = MessageQ_E_FAIL;
1068 goto exit;
1069 }
1071 /*
1072 * Free the message, as this is a copy transport, we maintain MessageQ
1073 * semantics.
1074 */
1075 MessageQ_free (msg);
1077 exit:
1078 return (status);
1079 }