1 /*
2 * Copyright (c) 2012-2015, Texas Instruments Incorporated
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*============================================================================
33 * @file MessageQ.c
34 *
35 * @brief MessageQ module "client" implementation
36 *
37 * This implementation is geared for use in a "client/server" model, whereby
38 * system-wide data is maintained in a "server" component and process-
39 * specific data is handled here. At the moment, this implementation
40 * connects and communicates with LAD for the server connection.
41 *
42 * The MessageQ module supports the structured sending and receiving of
43 * variable length messages. This module can be used for homogeneous or
44 * heterogeneous multi-processor messaging.
45 *
46 * MessageQ provides more sophisticated messaging than other modules. It is
47 * typically used for complex situations such as multi-processor messaging.
48 *
49 * The following are key features of the MessageQ module:
50 * -Writers and readers can be relocated to another processor with no
51 * runtime code changes.
52 * -Timeouts are allowed when receiving messages.
53 * -Readers can determine the writer and reply back.
54 * -Receiving a message is deterministic when the timeout is zero.
55 * -Messages can reside on any message queue.
56 * -Supports zero-copy transfers.
57 * -Can send and receive from any type of thread.
58 * -Notification mechanism is specified by application.
59 * -Allows QoS (quality of service) on message buffer pools. For example,
60 * using specific buffer pools for specific message queues.
61 *
62 * Messages are sent and received via a message queue. A reader is a thread
63 * that gets (reads) messages from a message queue. A writer is a thread that
64 * puts (writes) a message to a message queue. Each message queue has one
65 * reader and can have many writers. A thread may read from or write to multiple
66 * message queues.
67 *
68 * Conceptually, the reader thread owns a message queue. The reader thread
69 * creates a message queue. Writer threads open created message queues to
70 * get access to them.
71 *
72 * Message queues are identified by a system-wide unique name. Internally,
73 * MessageQ uses the NameServer module for managing
74 * these names. The names are used for opening a message queue. Using
75 * names is not required.
76 *
77 * Messages must be allocated from the MessageQ module. Once a message is
78 * allocated, it can be sent on any message queue. Once a message is sent, the
79 * writer loses ownership of the message and should not attempt to modify the
80 * message. Once the reader receives the message, it owns the message. It
81 * may either free the message or re-use the message.
82 *
83 * Messages in a message queue can be of variable length. The only
84 * requirement is that the first field in the definition of a message must be a
85 * MsgHeader structure. For example:
86 * typedef struct MyMsg {
87 * MessageQ_MsgHeader header;
88 * ...
89 * } MyMsg;
90 *
91 * The MessageQ API uses the MessageQ_MsgHeader internally. Your application
92 * should not modify or directly access the fields in the MessageQ_MsgHeader.
93 *
94 * All messages sent via the MessageQ module must be allocated from a
95 * Heap implementation. The heap can be used for
96 * other memory allocation not related to MessageQ.
97 *
98 * An application can use multiple heaps. The purpose of having multiple
99 * heaps is to allow an application to regulate its message usage. For
100 * example, an application can allocate critical messages from one heap of fast
101 * on-chip memory and non-critical messages from another heap of slower
102 * external memory.
103 *
104 * MessageQ does support the usage of messages that are not allocated via the
105 * alloc function. Please refer to the staticMsgInit
106 * function description for more details.
107 *
108 * In a multiple processor system, MessageQ communicates with other
109 * processors via MessageQTransport instances. There must be one and
110 * only one MessageQTransport instance for each processor where communication
111 * is desired.
112 * So on a four processor system, each processor must have three
113 * MessageQTransport instances.
114 *
115 * The user only needs to create the MessageQTransport instances. The instances
116 * are responsible for registering themselves with MessageQ.
117 * This is accomplished via the registerTransport function.
118 *
119 * ============================================================================
120 */
123 /* Standard headers */
124 #include <ti/ipc/Std.h>
126 /* Linux specific header files, replacing OSAL: */
127 #include <pthread.h>
129 /* Module level headers */
130 #include <ti/ipc/NameServer.h>
131 #include <ti/ipc/MultiProc.h>
132 #include <ti/syslink/inc/_MultiProc.h>
133 #define MessageQ_internal 1 /* must be defined before include file */
134 #include <ti/ipc/MessageQ.h>
135 #include <_MessageQ.h>
136 #include <_IpcLog.h>
137 #include <ti/syslink/inc/MessageQDrvDefs.h>
139 #include <sys/select.h>
140 #include <sys/time.h>
141 #include <sys/types.h>
142 #include <sys/param.h>
144 #include <errno.h>
145 #include <stdio.h>
146 #include <string.h>
147 #include <stdlib.h>
148 #include <unistd.h>
149 #include <assert.h>
150 #include <fcntl.h>
152 #include <ti/syslink/inc/usr/Qnx/MessageQDrv.h>
154 /* TI IPC utils: */
155 #include <ti/ipc/TiIpcFxns.h>
157 #include <ti/syslink/inc/ti/ipc/ti_ipc.h>
159 /* =============================================================================
160 * Macros/Constants
161 * =============================================================================
162 */
/*!
 *  @brief  Name of the reserved NameServer used for MessageQ.
 */
#define MessageQ_NAMESERVER  "MessageQ"

/* More magic rpmsg port numbers: */
/* Remote port handed to TiIpcFxns_connect() by MessageQ_attach(). */
#define MESSAGEQ_RPMSG_PORT       61
/* Size, in bytes, of the buffer transportGet() allocates for each read(). */
#define MESSAGEQ_RPMSG_MAXSIZE   512
/*
 * Not referenced by the code in this file.
 * NOTE(review): presumably the count of reserved rpmsg addresses (see the
 * "above 1024" remark in MessageQ_attach()) -- confirm.
 */
#define RPMSG_RESERVED_ADDRESSES  (1024)

/* Trace flag settings: */
/* Bit position of the trace flag inside the message header 'flags' field. */
#define TRACESHIFT    12
/* Mask selecting the trace flag bit; equals (1 << TRACESHIFT). */
#define TRACEMASK     0x1000
178 /* =============================================================================
179 * Structures & Enums
180 * =============================================================================
181 */
/* params structure evolution */

/*
 * Legacy parameter layout: the synchronizer pointer is the only (and first)
 * member. MessageQ_Params_init() and MessageQ_create() access caller
 * structures through this type when they are treated as legacy.
 */
typedef struct {
    Void *synchronizer;
} MessageQ_Params_Legacy;

/*
 * Version 2 parameter layout: a leading version tag followed by the
 * synchronizer and the queue index. MessageQ_Params_init__S() fills it with
 * MessageQ_Params_VERSION_2, NULL and MessageQ_ANY respectively.
 */
typedef struct {
    Int __version;
    Void *synchronizer;
    MessageQ_QueueIndex queueIndex;
} MessageQ_Params_Version2;
/* structure for MessageQ module state */
typedef struct MessageQ_ModuleObject {
    Int                 refCount;
    /*!< Reference count */
    NameServer_Handle   nameServer;
    /*!< Handle to the local NameServer used for storing GP objects;
     *   taken from the setup ioctl result in MessageQ_setup() and queried
     *   by MessageQ_open() */
    pthread_mutex_t     gate;
    /*!< Handle of gate to be used for local thread safety; held while
     *   MessageQ_attach()/MessageQ_detach() touch ipcFd[] and while
     *   MessageQ_msgInit() advances seqNum */
    int                 ipcFd[MultiProc_MAXPROCESSORS];
    /*!< File Descriptors for sending to each remote processor, indexed by
     *   processor id; MessageQ_setup() sets every entry to -1 (not open) */
    int                 seqNum;
    /*!< Process-specific sequence number, stamped into each message by
     *   MessageQ_msgInit() */
    MessageQ_PutHookFxn putHookFxn;
    /*!< hook function for MessageQ_put method; not called when NULL */
} MessageQ_ModuleObject;
/*!
 *  @brief      Structure for the Handle for the MessageQ.
 */
typedef struct MessageQ_Object_tag {
    MessageQ_Params     params;
    /*! Instance specific creation parameters */
    MessageQ_QueueId    queue;
    /* Unique id: processor id in the upper 16 bits, queue port in the lower */
    int                 ipcFd;
    /* File Descriptor to receive from a message queue. */
    int                 unblocked;
    /* Is the queue unblocked and how: set to MessageQ_E_UNBLOCKED by
     * MessageQ_unblock() or MessageQ_E_SHUTDOWN by MessageQ_shutdown(), and
     * returned by MessageQ_get() once the unblock pipe is readable. */
    int                 unblockFdW;
    /* Write this fd to unblock the select() call in MessageQ_get() */
    int                 unblockFdR;
    /* File Descriptor to block on to listen to unblockFdW. */
    void                *serverHandle;
    /* Handle returned by the CMD_MESSAGEQ_CREATE ioctl; handed back to the
     * driver by MessageQ_delete(). */
} MessageQ_Object;
/* traces in this file are controlled via _MessageQ_verbose */
Bool _MessageQ_verbose = FALSE;
/*
 * Short alias for the flag above.
 * NOTE(review): presumably this is the name the PRINTVERBOSE* macros test --
 * confirm against _IpcLog.h.
 */
#define verbose _MessageQ_verbose
233 /* =============================================================================
234 * Globals
235 * =============================================================================
236 */
/*
 * The single, per-process instance of the module state. The members that are
 * not named in the initializer (gate, ipcFd, seqNum) start out zeroed and are
 * given their working values by MessageQ_setup().
 */
static MessageQ_ModuleObject MessageQ_state =
{
    .refCount   = 0,
    .nameServer = NULL,
    .putHookFxn = NULL
};

/*!
 *  @var    MessageQ_module
 *
 *  @brief  Pointer to the MessageQ module state.
 */
MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
252 /* =============================================================================
253 * Forward declarations of internal functions
254 * =============================================================================
255 */
/*
 * Helpers for the tiipc copy transport; they are defined at the end of this
 * file. Endpoint create/close manage the receive descriptor of a queue,
 * transportGet reads one message from it, and transportPut writes a message
 * to the descriptor of the destination processor.
 */
static Int transportCreateEndpoint(int * fd, UInt16 queueIndex);
static Int transportCloseEndpoint(int fd);
static Int transportGet(int fd, MessageQ_Msg * retMsg);
static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId);
263 /* =============================================================================
264 * APIS
265 * =============================================================================
266 */
267 /* Function to get default configuration for the MessageQ module.
268 *
269 */
270 Void MessageQ_getConfig (MessageQ_Config * cfg)
271 {
272 Int status;
273 MessageQDrv_CmdArgs cmdArgs;
275 assert (cfg != NULL);
277 cmdArgs.args.getConfig.config = cfg;
278 status = MessageQDrv_ioctl (CMD_MESSAGEQ_GETCONFIG, &cmdArgs);
280 if (status < 0) {
281 PRINTVERBOSE1("MessageQ_getConfig: API (through IOCTL) failed, \
282 status=%d\n", status)
283 }
285 return;
286 }
288 /* Function to setup the MessageQ module. */
289 Int MessageQ_setup (const MessageQ_Config * cfg)
290 {
291 Int status;
292 MessageQDrv_CmdArgs cmdArgs;
294 Int i;
296 cmdArgs.args.setup.config = (MessageQ_Config *) cfg;
297 status = MessageQDrv_ioctl(CMD_MESSAGEQ_SETUP, &cmdArgs);
298 if (status < 0) {
299 PRINTVERBOSE1("MessageQ_setup: API (through IOCTL) failed, \
300 status=%d\n", status)
301 return status;
302 }
304 MessageQ_module->nameServer = cmdArgs.args.setup.nameServerHandle;
305 MessageQ_module->seqNum = 0;
307 /* Create a default local gate. */
308 pthread_mutex_init (&(MessageQ_module->gate), NULL);
310 /* Clear ipcFd array. */
311 for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
312 MessageQ_module->ipcFd[i] = -1;
313 }
315 return status;
316 }
318 /*
319 * Function to destroy the MessageQ module.
320 */
321 Int MessageQ_destroy (void)
322 {
323 Int status;
324 MessageQDrv_CmdArgs cmdArgs;
326 pthread_mutex_destroy(&(MessageQ_module->gate));
328 status = MessageQDrv_ioctl (CMD_MESSAGEQ_DESTROY, &cmdArgs);
329 if (status < 0) {
330 PRINTVERBOSE1("MessageQ_destroy: API (through IOCTL) failed, \
331 status=%d\n", status)
332 }
334 return status;
335 }
337 /*
338 * ======== MessageQ_Params_init ========
339 * Legacy implementation.
340 */
341 Void MessageQ_Params_init(MessageQ_Params *params)
342 {
343 ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
344 }
346 /*
347 * ======== MessageQ_Params_init__S ========
348 * New implementation which is version aware.
349 */
350 Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
351 {
352 MessageQ_Params_Version2 *params2;
354 switch (version) {
356 case MessageQ_Params_VERSION_2:
357 params2 = (MessageQ_Params_Version2 *)params;
358 params2->__version = MessageQ_Params_VERSION_2;
359 params2->synchronizer = NULL;
360 params2->queueIndex = MessageQ_ANY;
361 break;
363 default:
364 assert(FALSE);
365 break;
366 }
367 }
369 /*
370 * Function to create a MessageQ object for receiving.
371 *
372 * Create a file descriptor and bind the source address
373 * (local ProcId/MessageQ ID) in
374 * order to get messages dispatched to this messageQ.
375 */
376 MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * pp)
377 {
378 Int status = MessageQ_S_SUCCESS;
379 MessageQ_Object * obj = NULL;
380 UInt16 queuePort = 0u;
381 MessageQDrv_CmdArgs cmdArgs;
382 int fildes[2];
383 MessageQ_Params ps;
385 MessageQ_Params_init__S(&ps, MessageQ_Params_VERSION);
387 /* copy the given params into the current params structure */
388 if (pp != NULL) {
390 /* snoop the params pointer to see if it's a legacy structure */
391 if ((pp->__version == 0) || (pp->__version > 100)) {
392 ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
393 }
395 /* not legacy structure, use params version field */
396 else if (pp->__version == MessageQ_Params_VERSION_2) {
397 ps.__version = ((MessageQ_Params_Version2 *)pp)->__version;
398 ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
399 ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
400 }
401 else {
402 assert(FALSE);
403 }
404 }
406 cmdArgs.args.create.params = &ps;
407 cmdArgs.args.create.name = name;
409 if (name != NULL) {
410 cmdArgs.args.create.nameLen = (strlen (name) + 1);
411 }
412 else {
413 cmdArgs.args.create.nameLen = 0;
414 }
416 /* Create the generic obj */
417 obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
418 if (obj == NULL) {
419 PRINTVERBOSE0("MessageQ_create: memory allocation failed\n")
420 return NULL;
421 }
423 status = MessageQDrv_ioctl (CMD_MESSAGEQ_CREATE, &cmdArgs);
424 if (status < 0) {
425 PRINTVERBOSE1("MessageQ_create: API (through IOCTL) failed, \
426 status=%d\n", status)
427 goto cleanup;
428 }
430 /* Populate the params member */
431 memcpy(&obj->params, &ps, sizeof(ps));
433 obj->queue = cmdArgs.args.create.queueId;
434 obj->serverHandle = cmdArgs.args.create.handle;
436 /* Get the queue port # (queueIndex + PORT_OFFSET) */
437 queuePort = obj->queue & 0x0000FFFF;
439 PRINTVERBOSE2("MessageQ_create: creating endpoint for: %s"
440 "queuePort %d\n", (name == NULL) ? "NULL" : name , queuePort)
441 status = transportCreateEndpoint(&obj->ipcFd, queuePort);
442 if (status < 0) {
443 goto cleanup;
444 }
446 /*
447 * Now, to support MessageQ_unblock() functionality, create an event object.
448 * Writing to this event will unblock the select() call in MessageQ_get().
449 */
450 if (pipe(fildes) == -1) {
451 printf ("MessageQ_create: pipe creation failed: %d, %s\n",
452 errno, strerror(errno));
453 status = MessageQ_E_FAIL;
454 obj->unblockFdW = obj->unblockFdR = -1;
455 }
456 else {
457 obj->unblockFdW = fildes[1];
458 obj->unblockFdR = fildes[0];
459 }
461 cleanup:
462 /* Cleanup if fail: */
463 if (status < 0) {
464 MessageQ_delete((MessageQ_Handle *)&obj);
465 }
467 return ((MessageQ_Handle) obj);
468 }
470 /*
471 * Function to delete a MessageQ object for a specific slave processor.
472 *
473 * Deletes the file descriptors associated with this MessageQ object.
474 */
475 Int MessageQ_delete (MessageQ_Handle * handlePtr)
476 {
477 Int status = MessageQ_S_SUCCESS;
478 MessageQ_Object * obj = NULL;
479 MessageQDrv_CmdArgs cmdArgs;
481 assert(handlePtr != NULL);
482 obj = (MessageQ_Object *) (*handlePtr);
483 assert(obj != NULL);
485 if (obj->serverHandle != NULL) {
486 cmdArgs.args.deleteMessageQ.handle = obj->serverHandle;
487 status = MessageQDrv_ioctl (CMD_MESSAGEQ_DELETE, &cmdArgs);
488 if (status < 0) {
489 PRINTVERBOSE1("MessageQ_delete: API (through IOCTL) failed, \
490 status=%d\n", status)
491 }
492 }
494 /* Close the fds used for MessageQ_unblock(): */
495 if (obj->unblockFdW >= 0) {
496 close(obj->unblockFdW);
497 }
498 if (obj->unblockFdR >= 0) {
499 close(obj->unblockFdR);
500 }
502 /* Close the communication endpoint: */
503 if (obj->ipcFd >= 0) {
504 transportCloseEndpoint(obj->ipcFd);
505 }
507 /* Now free the obj */
508 free (obj);
509 *handlePtr = NULL;
511 return (status);
512 }
514 /*
515 * Opens an instance of MessageQ for sending.
516 *
517 * We need not create a tiipc file descriptor here; the file descriptors for
518 * all remote processors were created during MessageQ_attach(), and will be
519 * retrieved during MessageQ_put().
520 */
521 Int MessageQ_open (String name, MessageQ_QueueId * queueId)
522 {
523 Int status = MessageQ_S_SUCCESS;
525 status = NameServer_getUInt32 (MessageQ_module->nameServer,
526 name, queueId, NULL);
528 if (status == NameServer_E_NOTFOUND) {
529 /* Set return queue ID to invalid. */
530 *queueId = MessageQ_INVALIDMESSAGEQ;
531 status = MessageQ_E_NOTFOUND;
532 }
533 else if (status >= 0) {
534 /* Override with a MessageQ status code. */
535 status = MessageQ_S_SUCCESS;
536 }
537 else {
538 /* Set return queue ID to invalid. */
539 *queueId = MessageQ_INVALIDMESSAGEQ;
540 /* Override with a MessageQ status code. */
541 if (status == NameServer_E_TIMEOUT) {
542 status = MessageQ_E_TIMEOUT;
543 }
544 else {
545 status = MessageQ_E_FAIL;
546 }
547 }
549 return (status);
550 }
552 /*
553 * ======== MessageQ_openQueueId ========
554 */
555 MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 procId)
556 {
557 MessageQ_QueueIndex queuePort;
558 MessageQ_QueueId queueId;
560 /* queue port is embedded in the queueId */
561 queuePort = queueIndex + MessageQ_PORTOFFSET;
562 queueId = ((MessageQ_QueueId)(procId) << 16) | queuePort;
564 return (queueId);
565 }
567 /* Closes previously opened instance of MessageQ module. */
568 Int MessageQ_close (MessageQ_QueueId * queueId)
569 {
570 Int32 status = MessageQ_S_SUCCESS;
572 /* Nothing more to be done for closing the MessageQ. */
573 *queueId = MessageQ_INVALIDMESSAGEQ;
575 return (status);
576 }
578 /*
579 * Place a message onto a message queue.
580 *
581 * Calls TransportShm_put(), which handles the sending of the message using the
582 * appropriate kernel interface (socket, device ioctl) call for the remote
583 * procId encoded in the queueId argument.
584 *
585 */
586 Int MessageQ_put (MessageQ_QueueId queueId, MessageQ_Msg msg)
587 {
588 Int status;
589 UInt16 dstProcId = (UInt16)(queueId >> 16);
590 UInt16 queuePort = queueId & 0x0000ffff;
592 /* use the queue port # for destination address */
593 msg->dstId = queuePort;
594 msg->dstProc = dstProcId;
596 /* invoke put hook function after addressing the message */
597 if (MessageQ_module->putHookFxn != NULL) {
598 MessageQ_module->putHookFxn(queueId, msg);
599 }
601 status = transportPut(msg, queuePort, dstProcId);
603 return (status);
604 }
606 /*
607 * Gets a message for a message queue and blocks if the queue is empty.
608 * If a message is present, it returns it. Otherwise it blocks
609 * waiting for a message to arrive.
610 * When a message is returned, it is owned by the caller.
611 *
612 * We block using select() on the receiving tiipc file descriptor, then
613 * get the waiting message via a read.
614 * We use the file descriptors stored in the messageQ object via a previous
615 * call to MessageQ_create().
616 *
617 * Note: We currently do not support messages to be sent between threads on the
618 * lcoal processor.
619 *
620 */
621 Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
622 {
623 Int status = MessageQ_S_SUCCESS;
624 Int tmpStatus;
625 MessageQ_Object * obj = (MessageQ_Object *) handle;
626 int retval;
627 int nfds;
628 fd_set rfds;
629 struct timeval tv;
630 void *timevalPtr;
631 int maxfd = 0;
633 /* Wait (with timeout) and retreive message */
634 FD_ZERO(&rfds);
635 FD_SET(obj->ipcFd, &rfds);
636 maxfd = obj->ipcFd;
638 /* Wait also on the event fd, which may be written by MessageQ_unblock(): */
639 FD_SET(obj->unblockFdR, &rfds);
641 if (timeout == MessageQ_FOREVER) {
642 timevalPtr = NULL;
643 }
644 else {
645 /* Timeout given in msec: convert: */
646 tv.tv_sec = timeout / 1000;
647 tv.tv_usec = (timeout % 1000) * 1000;
648 timevalPtr = &tv;
649 }
650 /* Add one to last fd created: */
651 nfds = ((maxfd > obj->unblockFdR) ? maxfd : obj->unblockFdR) + 1;
653 *msg = NULL;
655 retval = select(nfds, &rfds, NULL, NULL, timevalPtr);
656 if (retval > 0) {
657 if (FD_ISSET(obj->unblockFdR, &rfds)) {
658 /*
659 * Our event was signalled by MessageQ_unblock().
660 *
661 * This is typically done during a shutdown sequence, where
662 * the intention of the client would be to ignore (i.e. not fetch)
663 * any pending messages in the transport's queue.
664 * Thus, we shall not check for nor return any messages.
665 */
666 return (obj->unblocked);
667 }
668 else {
669 if (FD_ISSET(obj->ipcFd, &rfds)) {
670 /* Our transport's fd was signalled: Get the message: */
671 tmpStatus = transportGet(obj->ipcFd, msg);
672 if (tmpStatus < 0) {
673 printf ("MessageQ_get: transportGet failed.\n");
674 if (tmpStatus == MessageQ_E_SHUTDOWN) {
675 status = tmpStatus;
676 MessageQ_shutdown(handle);
677 }
678 else {
679 status = MessageQ_E_FAIL;
680 }
681 }
682 }
683 }
684 }
685 else if (retval == 0) {
686 status = MessageQ_E_TIMEOUT;
687 }
688 else {
689 status = MessageQ_E_FAIL;
690 }
692 return (status);
693 }
695 /*
696 * Return a count of the number of messages in the queue
697 *
698 * TBD: To be implemented. Return -1 for now.
699 */
700 Int MessageQ_count (MessageQ_Handle handle)
701 {
702 Int count = -1;
703 return (count);
704 }
706 /* Initializes a message not obtained from MessageQ_alloc. */
707 Void MessageQ_staticMsgInit (MessageQ_Msg msg, UInt32 size)
708 {
709 /* Fill in the fields of the message */
710 MessageQ_msgInit (msg);
711 msg->heapId = MessageQ_STATICMSG;
712 msg->msgSize = size;
713 }
715 /*
716 * Allocate a message and initialize the needed fields (note some
717 * of the fields in the header are set via other APIs or in the
718 * MessageQ_put function,
719 */
720 MessageQ_Msg MessageQ_alloc (UInt16 heapId, UInt32 size)
721 {
722 MessageQ_Msg msg = NULL;
724 /*
725 * heapId not used for local alloc (as this is over a copy transport), but
726 * we need to send to other side as heapId is used in BIOS transport:
727 */
728 msg = (MessageQ_Msg)calloc (1, size);
729 MessageQ_msgInit (msg);
730 msg->msgSize = size;
731 msg->heapId = heapId;
733 return msg;
734 }
736 /* Frees the message back to the heap that was used to allocate it. */
737 Int MessageQ_free (MessageQ_Msg msg)
738 {
739 UInt32 status = MessageQ_S_SUCCESS;
741 /* Check to ensure this was not allocated by user: */
742 if (msg->heapId == MessageQ_STATICMSG) {
743 status = MessageQ_E_CANNOTFREESTATICMSG;
744 }
745 else {
746 free (msg);
747 }
749 return status;
750 }
752 /* Register a heap with MessageQ. */
753 Int MessageQ_registerHeap (Ptr heap, UInt16 heapId)
754 {
755 Int status = MessageQ_S_SUCCESS;
757 /* Do nothing, as this uses a copy transport: */
759 return status;
760 }
762 /* Unregister a heap with MessageQ. */
763 Int MessageQ_unregisterHeap (UInt16 heapId)
764 {
765 Int status = MessageQ_S_SUCCESS;
767 /* Do nothing, as this uses a copy transport: */
769 return status;
770 }
772 /* Unblocks a MessageQ */
773 Void MessageQ_unblock (MessageQ_Handle handle)
774 {
775 MessageQ_Object * obj = (MessageQ_Object *) handle;
776 char buf = 'n';
778 obj->unblocked = MessageQ_E_UNBLOCKED;
780 /* Write to pipe to awaken any threads blocked on this messageQ: */
781 write(obj->unblockFdW, &buf, 1);
782 }
784 /* Unblocks a MessageQ that's been shutdown due to transport failure */
785 Void MessageQ_shutdown(MessageQ_Handle handle)
786 {
787 MessageQ_Object *obj = (MessageQ_Object *)handle;
788 char buf = 'n';
790 obj->unblocked = MessageQ_E_SHUTDOWN;
792 /* Write to pipe to awaken any threads blocked on this messageQ: */
793 write(obj->unblockFdW, &buf, 1);
794 }
796 /* Embeds a source message queue into a message. */
797 Void MessageQ_setReplyQueue (MessageQ_Handle handle, MessageQ_Msg msg)
798 {
799 MessageQ_Object * obj = (MessageQ_Object *) handle;
801 msg->replyId = (UInt16)(obj->queue);
802 msg->replyProc = (UInt16)(obj->queue >> 16);
803 }
805 /* Returns the QueueId associated with the handle. */
806 MessageQ_QueueId MessageQ_getQueueId (MessageQ_Handle handle)
807 {
808 MessageQ_Object * obj = (MessageQ_Object *) handle;
809 UInt32 queueId;
811 queueId = (obj->queue);
813 return queueId;
814 }
816 /* Sets the tracing of a message */
817 Void MessageQ_setMsgTrace (MessageQ_Msg msg, Bool traceFlag)
818 {
819 msg->flags = (msg->flags & ~TRACEMASK) | (traceFlag << TRACESHIFT);
820 }
822 /*
823 * Returns the amount of shared memory used by one transport instance.
824 *
825 * The MessageQ module itself does not use any shared memory but the
826 * underlying transport may use some shared memory.
827 */
828 SizeT MessageQ_sharedMemReq (Ptr sharedAddr)
829 {
830 SizeT memReq = 0u;
832 /* Do nothing, as this is a copy transport. */
834 return (memReq);
835 }
837 /*
838 * Opens a file descriptor for this remote proc.
839 *
840 * Only opens it if one does not already exist for this procId.
841 *
842 * Note: remoteProcId may be MultiProc_Self() for loopback case.
843 */
844 Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr)
845 {
846 Int status = MessageQ_S_SUCCESS;
847 int ipcFd;
848 int err;
850 PRINTVERBOSE1("MessageQ_attach: remoteProcId: %d\n", remoteProcId)
852 if (remoteProcId >= MultiProc_MAXPROCESSORS) {
853 status = MessageQ_E_INVALIDPROCID;
854 goto exit;
855 }
857 pthread_mutex_lock (&(MessageQ_module->gate));
859 /* Only open a fd if one doesn't exist: */
860 if (MessageQ_module->ipcFd[remoteProcId] == -1) {
861 /* Create a fd for sending messages to the remote proc: */
862 ipcFd = open("/dev/tiipc", O_RDWR);
863 if (ipcFd < 0) {
864 status = MessageQ_E_FAIL;
865 printf ("MessageQ_attach: open of tiipc device failed: %d, %s\n",
866 errno, strerror(errno));
867 }
868 else {
869 PRINTVERBOSE1("MessageQ_attach: opened tiipc fd for sending: %d\n",
870 ipcFd)
871 MessageQ_module->ipcFd[remoteProcId] = ipcFd;
872 /*
873 * Connect to the remote endpoint and bind any reserved address as
874 * local endpoint
875 */
876 TiIpcFxns_connect(ipcFd, remoteProcId, MESSAGEQ_RPMSG_PORT);
877 /* Bind to any port # above 1024 (MessageQCopy_MAXRESERVEDEPT) */
878 err = TiIpcFxns_bindAddr(ipcFd, TIIPC_ADDRANY);
879 if (err < 0) {
880 status = MessageQ_E_FAIL;
881 printf ("MessageQ_attach: bind failed: %d, %s\n",
882 errno, strerror(errno));
883 }
884 }
885 }
886 else {
887 status = MessageQ_E_ALREADYEXISTS;
888 }
890 pthread_mutex_unlock (&(MessageQ_module->gate));
892 exit:
893 return (status);
894 }
896 /*
897 * Close the fd for this remote proc.
898 *
899 */
900 Int MessageQ_detach (UInt16 remoteProcId)
901 {
902 Int status = MessageQ_S_SUCCESS;
903 int ipcFd;
905 if (remoteProcId >= MultiProc_MAXPROCESSORS) {
906 status = MessageQ_E_INVALIDPROCID;
907 goto exit;
908 }
910 pthread_mutex_lock (&(MessageQ_module->gate));
912 ipcFd = MessageQ_module->ipcFd[remoteProcId];
913 if (close (ipcFd)) {
914 status = MessageQ_E_OSFAILURE;
915 printf("MessageQ_detach: close failed: %d, %s\n",
916 errno, strerror(errno));
917 }
918 else {
919 PRINTVERBOSE1("MessageQ_detach: closed fd: %d\n", ipcFd)
920 MessageQ_module->ipcFd[remoteProcId] = -1;
921 }
923 pthread_mutex_unlock (&(MessageQ_module->gate));
925 exit:
926 return (status);
927 }
929 /*
930 * This is a helper function to initialize a message.
931 */
932 Void MessageQ_msgInit (MessageQ_Msg msg)
933 {
934 msg->reserved0 = 0; /* We set this to distinguish from NameServerMsg */
935 msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
936 msg->msgId = MessageQ_INVALIDMSGID;
937 msg->dstId = (UInt16)MessageQ_INVALIDMESSAGEQ;
938 msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
939 msg->srcProc = MultiProc_self();
941 pthread_mutex_lock(&(MessageQ_module->gate));
942 msg->seqNum = MessageQ_module->seqNum++;
943 pthread_mutex_unlock(&(MessageQ_module->gate));
944 }
946 /*
947 * =============================================================================
948 * Transport: Fxns kept here until need for a transport layer is realized.
949 * =============================================================================
950 */
951 /*
952 * ======== transportCreateEndpoint ========
953 *
954 * Create a communication endpoint to receive messages.
955 */
956 static Int transportCreateEndpoint(int * fd, UInt16 queuePort)
957 {
958 Int status = MessageQ_S_SUCCESS;
959 int err;
961 /* Create a fd to the ti-ipc to receive messages for this messageQ */
962 *fd= open("/dev/tiipc", O_RDWR);
963 if (*fd < 0) {
964 status = MessageQ_E_FAIL;
965 printf ("transportCreateEndpoint: Couldn't open tiipc device: %d, %s\n",
966 errno, strerror(errno));
968 goto exit;
969 }
971 PRINTVERBOSE1("transportCreateEndpoint: opened fd: %d\n", *fd)
973 /* Bind to this port # in the transport */
974 err = TiIpcFxns_bindAddr(*fd, (UInt32)queuePort);
975 if (err < 0) {
976 status = MessageQ_E_FAIL;
977 printf("transportCreateEndpoint: bind failed: %d, %s\n",
978 errno, strerror(errno));
980 close(*fd);
981 goto exit;
982 }
984 exit:
985 return (status);
986 }
988 /*
989 * ======== transportCloseEndpoint ========
990 *
991 * Close the communication endpoint.
992 */
993 static Int transportCloseEndpoint(int fd)
994 {
995 Int status = MessageQ_S_SUCCESS;
997 PRINTVERBOSE1("transportCloseEndpoint: closing fd: %d\n", fd)
999 /* Stop communication to this endpoint */
1000 close(fd);
1002 return (status);
1003 }
1005 /*
1006 * ======== transportGet ========
1007 * Retrieve a message waiting in the queue.
1008 */
1009 static Int transportGet(int fd, MessageQ_Msg * retMsg)
1010 {
1011 Int status = MessageQ_S_SUCCESS;
1012 MessageQ_Msg msg;
1013 int ret;
1014 int byteCount;
1015 tiipc_remote_params remote;
1017 /*
1018 * We have no way of peeking to see what message size we'll get, so we
1019 * allocate a message of max size to receive contents from tiipc
1020 * (currently, a copy transport)
1021 */
1022 msg = MessageQ_alloc (0, MESSAGEQ_RPMSG_MAXSIZE);
1023 if (!msg) {
1024 status = MessageQ_E_MEMORY;
1025 goto exit;
1026 }
1028 /* Get message */
1029 byteCount = read(fd, msg, MESSAGEQ_RPMSG_MAXSIZE);
1030 if (byteCount < 0) {
1031 status = (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN : MessageQ_E_FAIL);
1032 printf("read failed: %s (%d)\n", strerror(errno), errno);
1033 goto exit;
1034 }
1035 else {
1036 /* Update the allocated message size (even though this may waste space
1037 * when the actual message is smaller than the maximum rpmsg size,
1038 * the message will be freed soon anyway, and it avoids an extra copy).
1039 */
1040 msg->msgSize = byteCount;
1042 /*
1043 * If the message received was statically allocated, reset the
1044 * heapId, so the app can free it.
1045 */
1046 if (msg->heapId == MessageQ_STATICMSG) {
1047 msg->heapId = 0; /* for a copy transport, heap id is 0. */
1048 }
1049 }
1051 PRINTVERBOSE1("transportGet: read from fd: %d\n", fd)
1052 ret = ioctl(fd, TIIPC_IOCGETREMOTE, &remote);
1053 if (ret == -1) {
1054 printf("ioctl failed: %s (%d)\n", strerror(errno), errno);
1055 status = MessageQ_E_FAIL;
1056 goto exit;
1057 }
1058 PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg \
1059 proc: %d\n", byteCount, remote.remote_addr, remote.remote_proc)
1060 PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId,
1061 msg->msgSize)
1063 *retMsg = msg;
1065 exit:
1066 return (status);
1067 }
1069 /*
1070 * ======== transportPut ========
1071 *
1072 * Write to tiipc file descriptor associated with
1073 * with this destination procID.
1074 */
1075 static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId)
1076 {
1077 Int status = MessageQ_S_SUCCESS;
1078 int ipcFd;
1079 int err;
1081 /*
1082 * Retrieve the tiipc file descriptor associated with this
1083 * transport for the destination processor.
1084 */
1085 ipcFd = MessageQ_module->ipcFd[dstProcId];
1087 PRINTVERBOSE2("Sending msgId: %d via fd: %d\n", msg->msgId, ipcFd)
1089 /* send response message to remote processor */
1090 err = write(ipcFd, msg, msg->msgSize);
1091 if (err < 0) {
1092 status = (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN : MessageQ_E_FAIL);
1093 printf ("transportPut: write failed: %d, %s\n",
1094 errno, strerror(errno));
1095 goto exit;
1096 }
1098 /*
1099 * Free the message, as this is a copy transport, we maintain MessageQ
1100 * semantics.
1101 */
1102 MessageQ_free (msg);
1104 exit:
1105 return (status);
1106 }