1 /*
2 * Copyright (c) 2012-2014, Texas Instruments Incorporated
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*============================================================================
33 * @file MessageQ.c
34 *
35 * @brief MessageQ module "client" implementation
36 *
37 * This implementation is geared for use in a "client/server" model, whereby
38 * system-wide data is maintained in a "server" component and process-
39 * specific data is handled here. At the moment, this implementation
40 * connects and communicates with LAD for the server connection.
41 *
42 * The MessageQ module supports the structured sending and receiving of
43 * variable length messages. This module can be used for homogeneous or
44 * heterogeneous multi-processor messaging.
45 *
46 * MessageQ provides more sophisticated messaging than other modules. It is
47 * typically used for complex situations such as multi-processor messaging.
48 *
49 * The following are key features of the MessageQ module:
50 * -Writers and readers can be relocated to another processor with no
51 * runtime code changes.
52 * -Timeouts are allowed when receiving messages.
53 * -Readers can determine the writer and reply back.
54 * -Receiving a message is deterministic when the timeout is zero.
55 * -Messages can reside on any message queue.
56 * -Supports zero-copy transfers.
57 * -Can send and receive from any type of thread.
58 * -Notification mechanism is specified by application.
59 * -Allows QoS (quality of service) on message buffer pools. For example,
60 * using specific buffer pools for specific message queues.
61 *
62 * Messages are sent and received via a message queue. A reader is a thread
63 * that gets (reads) messages from a message queue. A writer is a thread that
64 * puts (writes) a message to a message queue. Each message queue has one
65 * reader and can have many writers. A thread may read from or write to multiple
66 * message queues.
67 *
68 * Conceptually, the reader thread owns a message queue. The reader thread
69 * creates a message queue. Writer threads a created message queues to
70 * get access to them.
71 *
72 * Message queues are identified by a system-wide unique name. Internally,
73 * MessageQ uses the NameServermodule for managing
74 * these names. The names are used for opening a message queue. Using
75 * names is not required.
76 *
77 * Messages must be allocated from the MessageQ module. Once a message is
78 * allocated, it can be sent on any message queue. Once a message is sent, the
79 * writer loses ownership of the message and should not attempt to modify the
80 * message. Once the reader receives the message, it owns the message. It
81 * may either free the message or re-use the message.
82 *
83 * Messages in a message queue can be of variable length. The only
84 * requirement is that the first field in the definition of a message must be a
85 * MsgHeader structure. For example:
86 * typedef struct MyMsg {
87 * MessageQ_MsgHeader header;
88 * ...
89 * } MyMsg;
90 *
91 * The MessageQ API uses the MessageQ_MsgHeader internally. Your application
92 * should not modify or directly access the fields in the MessageQ_MsgHeader.
93 *
94 * All messages sent via the MessageQ module must be allocated from a
95 * Heap implementation. The heap can be used for
96 * other memory allocation not related to MessageQ.
97 *
98 * An application can use multiple heaps. The purpose of having multiple
99 * heaps is to allow an application to regulate its message usage. For
100 * example, an application can allocate critical messages from one heap of fast
101 * on-chip memory and non-critical messages from another heap of slower
102 * external memory
103 *
104 * MessageQ does support the usage of messages that allocated via the
105 * alloc function. Please refer to the staticMsgInit
106 * function description for more details.
107 *
108 * In a multiple processor system, MessageQ communications to other
109 * processors via MessageQTransport instances. There must be one and
110 * only one MessageQTransport instance for each processor where communication
111 * is desired.
112 * So on a four processor system, each processor must have three
113 * MessageQTransport instance.
114 *
115 * The user only needs to create the MessageQTransport instances. The instances
116 * are responsible for registering themselves with MessageQ.
117 * This is accomplished via the registerTransport function.
118 *
119 * ============================================================================
120 */
123 /* Standard headers */
124 #include <ti/ipc/Std.h>
126 /* Linux specific header files, replacing OSAL: */
127 #include <pthread.h>
129 /* Module level headers */
130 #include <ti/ipc/NameServer.h>
131 #include <ti/ipc/MultiProc.h>
132 #include <ti/syslink/inc/_MultiProc.h>
133 #include <ti/ipc/MessageQ.h>
134 #include <_MessageQ.h>
135 #include <_IpcLog.h>
136 #include <ti/syslink/inc/MessageQDrvDefs.h>
138 #include <sys/select.h>
139 #include <sys/time.h>
140 #include <sys/types.h>
141 #include <sys/param.h>
143 #include <errno.h>
144 #include <stdio.h>
145 #include <string.h>
146 #include <stdlib.h>
147 #include <unistd.h>
148 #include <assert.h>
149 #include <fcntl.h>
151 #include <ti/syslink/inc/usr/Qnx/MessageQDrv.h>
153 /* TI IPC utils: */
154 #include <TiIpcFxns.h>
156 #include <ti/syslink/inc/ti/ipc/ti_ipc.h>
158 /* =============================================================================
159 * Macros/Constants
160 * =============================================================================
161 */
163 /*!
164 * @brief Name of the reserved NameServer used for MessageQ.
165 */
166 #define MessageQ_NAMESERVER "MessageQ"
168 /* More magic rpmsg port numbers: */
169 #define MESSAGEQ_RPMSG_PORT 61
170 #define MESSAGEQ_RPMSG_MAXSIZE 512
171 #define RPMSG_RESERVED_ADDRESSES (1024)
173 /* MessageQ needs local address bound to be a 16-bit value */
174 #define MAX_LOCAL_ADDR 0x10000
176 /* Trace flag settings: */
177 #define TRACESHIFT 12
178 #define TRACEMASK 0x1000
180 /* =============================================================================
181 * Structures & Enums
182 * =============================================================================
183 */
185 /* structure for MessageQ module state */
186 typedef struct MessageQ_ModuleObject {
187 Int refCount;
188 /*!< Reference count */
189 NameServer_Handle nameServer;
190 /*!< Handle to the local NameServer used for storing GP objects */
191 pthread_mutex_t gate;
192 /*!< Handle of gate to be used for local thread safety */
193 MessageQ_Params defaultInstParams;
194 /*!< Default instance creation parameters */
195 int ipcFd[MultiProc_MAXPROCESSORS];
196 /*!< File Descriptors for sending to each remote processor */
197 int seqNum;
198 /*!< Process-specific sequence number */
199 } MessageQ_ModuleObject;
201 /*!
202 * @brief Structure for the Handle for the MessageQ.
203 */
204 typedef struct MessageQ_Object_tag {
205 MessageQ_Params params;
206 /*! Instance specific creation parameters */
207 MessageQ_QueueId queue;
208 /* Unique id */
209 int ipcFd;
210 /* File Descriptors to receive from a message queue. */
211 int unblockFdW;
212 /* Write this fd to unblock the select() call in MessageQ _get() */
213 int unblockFdR;
214 /* File Descriptor to block on to listen to unblockFdW. */
215 void *serverHandle;
216 } MessageQ_Object;
218 static Bool verbose = FALSE;
220 /* =============================================================================
221 * Globals
222 * =============================================================================
223 */
224 static MessageQ_ModuleObject MessageQ_state =
225 {
226 .refCount = 0,
227 .nameServer = NULL,
228 };
230 /*!
231 * @var MessageQ_module
232 *
233 * @brief Pointer to the MessageQ module state.
234 */
235 MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
238 /* =============================================================================
239 * Forward declarations of internal functions
240 * =============================================================================
241 */
243 /* This is a helper function to initialize a message. */
244 static Int transportCreateEndpoint(int * fd, UInt16 * queueIndex);
245 static Int transportCloseEndpoint(int fd);
246 static Int transportGet(int fd, MessageQ_Msg * retMsg);
247 static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId);
249 /* =============================================================================
250 * APIS
251 * =============================================================================
252 */
253 /* Function to get default configuration for the MessageQ module.
254 *
255 */
256 Void MessageQ_getConfig (MessageQ_Config * cfg)
257 {
258 Int status;
259 MessageQDrv_CmdArgs cmdArgs;
261 assert (cfg != NULL);
263 cmdArgs.args.getConfig.config = cfg;
264 status = MessageQDrv_ioctl (CMD_MESSAGEQ_GETCONFIG, &cmdArgs);
266 if (status < 0) {
267 PRINTVERBOSE1("MessageQ_getConfig: API (through IOCTL) failed, \
268 status=%d\n", status)
269 }
271 return;
272 }
274 /* Function to setup the MessageQ module. */
275 Int MessageQ_setup (const MessageQ_Config * cfg)
276 {
277 Int status;
278 MessageQDrv_CmdArgs cmdArgs;
280 Int i;
282 cmdArgs.args.setup.config = (MessageQ_Config *) cfg;
283 status = MessageQDrv_ioctl(CMD_MESSAGEQ_SETUP, &cmdArgs);
284 if (status < 0) {
285 PRINTVERBOSE1("MessageQ_setup: API (through IOCTL) failed, \
286 status=%d\n", status)
287 return status;
288 }
290 MessageQ_module->nameServer = cmdArgs.args.setup.nameServerHandle;
291 MessageQ_module->seqNum = 0;
293 /* Create a default local gate. */
294 pthread_mutex_init (&(MessageQ_module->gate), NULL);
296 /* Clear ipcFd array. */
297 for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
298 MessageQ_module->ipcFd[i] = -1;
299 }
301 return status;
302 }
304 /*
305 * Function to destroy the MessageQ module.
306 */
307 Int MessageQ_destroy (void)
308 {
309 Int status;
310 MessageQDrv_CmdArgs cmdArgs;
312 status = MessageQDrv_ioctl (CMD_MESSAGEQ_DESTROY, &cmdArgs);
313 if (status < 0) {
314 PRINTVERBOSE1("MessageQ_destroy: API (through IOCTL) failed, \
315 status=%d\n", status)
316 }
318 return status;
319 }
321 /* Function to initialize the parameters for the MessageQ instance. */
322 Void MessageQ_Params_init (MessageQ_Params * params)
323 {
324 memcpy (params, &(MessageQ_module->defaultInstParams),
325 sizeof (MessageQ_Params));
327 return;
328 }
330 /*
331 * Function to create a MessageQ object for receiving.
332 *
333 * Create a file descriptor and bind the source address
334 * (local ProcId/MessageQ ID) in
335 * order to get messages dispatched to this messageQ.
336 */
337 MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
338 {
339 Int status = MessageQ_S_SUCCESS;
340 MessageQ_Object * obj = NULL;
341 UInt16 queueIndex = 0u;
342 UInt16 procId;
343 MessageQDrv_CmdArgs cmdArgs;
344 int fildes[2];
346 cmdArgs.args.create.params = (MessageQ_Params *) params;
347 cmdArgs.args.create.name = name;
348 if (name != NULL) {
349 cmdArgs.args.create.nameLen = (strlen (name) + 1);
350 }
351 else {
352 cmdArgs.args.create.nameLen = 0;
353 }
355 /* Create the generic obj */
356 obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
357 if (obj == NULL) {
358 PRINTVERBOSE0("MessageQ_create: memory allocation failed\n")
359 return NULL;
360 }
362 PRINTVERBOSE2("MessageQ_create: creating endpoint for: %s, \
363 queueIndex: %d\n", name, queueIndex)
364 status = transportCreateEndpoint(&obj->ipcFd, &queueIndex);
365 if (status < 0) {
366 goto cleanup;
367 }
369 /*
370 * We expect the endpoint creation to return a port number from
371 * the MessageQCopy layer. This port number will be greater than
372 * 1024 and less than 0x10000. Use this number as the queueIndex.
373 */
374 cmdArgs.args.create.queueId = queueIndex;
376 status = MessageQDrv_ioctl (CMD_MESSAGEQ_CREATE, &cmdArgs);
377 if (status < 0) {
378 PRINTVERBOSE1("MessageQ_create: API (through IOCTL) failed, \
379 status=%d\n", status)
380 goto cleanup;
381 }
383 if (params != NULL) {
384 /* Populate the params member */
385 memcpy((Ptr) &obj->params, (Ptr)params, sizeof (MessageQ_Params));
386 }
388 procId = MultiProc_self();
389 obj->queue = cmdArgs.args.create.queueId;
390 obj->serverHandle = cmdArgs.args.create.handle;
392 /*
393 * Now, to support MessageQ_unblock() functionality, create an event object.
394 * Writing to this event will unblock the select() call in MessageQ_get().
395 */
396 if (pipe(fildes) == -1) {
397 printf ("MessageQ_create: pipe creation failed: %d, %s\n",
398 errno, strerror(errno));
399 status = MessageQ_E_FAIL;
400 obj->unblockFdW = obj->unblockFdR = -1;
401 }
402 else {
403 obj->unblockFdW = fildes[1];
404 obj->unblockFdR = fildes[0];
405 }
407 cleanup:
408 /* Cleanup if fail: */
409 if (status < 0) {
410 MessageQ_delete((MessageQ_Handle *)&obj);
411 }
413 return ((MessageQ_Handle) obj);
414 }
416 /*
417 * Function to delete a MessageQ object for a specific slave processor.
418 *
419 * Deletes the file descriptors associated with this MessageQ object.
420 */
421 Int MessageQ_delete (MessageQ_Handle * handlePtr)
422 {
423 Int status = MessageQ_S_SUCCESS;
424 MessageQ_Object * obj = NULL;
425 MessageQDrv_CmdArgs cmdArgs;
427 assert(handlePtr != NULL);
428 obj = (MessageQ_Object *) (*handlePtr);
429 assert(obj != NULL);
431 if (obj->serverHandle != NULL) {
432 cmdArgs.args.deleteMessageQ.handle = obj->serverHandle;
433 status = MessageQDrv_ioctl (CMD_MESSAGEQ_DELETE, &cmdArgs);
434 if (status < 0) {
435 PRINTVERBOSE1("MessageQ_delete: API (through IOCTL) failed, \
436 status=%d\n", status)
437 }
438 }
440 /* Close the fds used for MessageQ_unblock(): */
441 if (obj->unblockFdW >= 0) {
442 close(obj->unblockFdW);
443 }
444 if (obj->unblockFdR >= 0) {
445 close(obj->unblockFdR);
446 }
448 /* Close the communication endpoint: */
449 if (obj->ipcFd >= 0) {
450 transportCloseEndpoint(obj->ipcFd);
451 }
453 /* Now free the obj */
454 free (obj);
455 *handlePtr = NULL;
457 return (status);
458 }
460 /*
461 * Opens an instance of MessageQ for sending.
462 *
463 * We need not create a tiipc file descriptor here; the file descriptors for
464 * all remote processors were created during MessageQ_attach(), and will be
465 * retrieved during MessageQ_put().
466 */
467 Int MessageQ_open (String name, MessageQ_QueueId * queueId)
468 {
469 Int status = MessageQ_S_SUCCESS;
471 status = NameServer_getUInt32 (MessageQ_module->nameServer,
472 name, queueId, NULL);
474 if (status == NameServer_E_NOTFOUND) {
475 /* Set return queue ID to invalid. */
476 *queueId = MessageQ_INVALIDMESSAGEQ;
477 status = MessageQ_E_NOTFOUND;
478 }
479 else if (status >= 0) {
480 /* Override with a MessageQ status code. */
481 status = MessageQ_S_SUCCESS;
482 }
483 else {
484 /* Set return queue ID to invalid. */
485 *queueId = MessageQ_INVALIDMESSAGEQ;
486 /* Override with a MessageQ status code. */
487 if (status == NameServer_E_TIMEOUT) {
488 status = MessageQ_E_TIMEOUT;
489 }
490 else {
491 status = MessageQ_E_FAIL;
492 }
493 }
495 return (status);
496 }
498 /* Closes previously opened instance of MessageQ module. */
499 Int MessageQ_close (MessageQ_QueueId * queueId)
500 {
501 Int32 status = MessageQ_S_SUCCESS;
503 /* Nothing more to be done for closing the MessageQ. */
504 *queueId = MessageQ_INVALIDMESSAGEQ;
506 return (status);
507 }
509 /*
510 * Place a message onto a message queue.
511 *
512 * Calls TransportShm_put(), which handles the sending of the message using the
513 * appropriate kernel interface (socket, device ioctl) call for the remote
514 * procId encoded in the queueId argument.
515 *
516 */
517 Int MessageQ_put (MessageQ_QueueId queueId, MessageQ_Msg msg)
518 {
519 Int status;
520 UInt16 dstProcId = (UInt16)(queueId >> 16);
521 UInt16 queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
523 msg->dstId = queueIndex;
524 msg->dstProc = dstProcId;
526 status = transportPut(msg, queueIndex, dstProcId);
528 return (status);
529 }
531 /*
532 * Gets a message for a message queue and blocks if the queue is empty.
533 * If a message is present, it returns it. Otherwise it blocks
534 * waiting for a message to arrive.
535 * When a message is returned, it is owned by the caller.
536 *
537 * We block using select() on the receiving tiipc file descriptor, then
538 * get the waiting message via a read.
539 * We use the file descriptors stored in the messageQ object via a previous
540 * call to MessageQ_create().
541 *
542 * Note: We currently do not support messages to be sent between threads on the
543 * lcoal processor.
544 *
545 */
546 Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
547 {
548 Int status = MessageQ_S_SUCCESS;
549 Int tmpStatus;
550 MessageQ_Object * obj = (MessageQ_Object *) handle;
551 int retval;
552 int nfds;
553 fd_set rfds;
554 struct timeval tv;
555 void *timevalPtr;
556 int maxfd = 0;
558 /* Wait (with timeout) and retreive message */
559 FD_ZERO(&rfds);
560 FD_SET(obj->ipcFd, &rfds);
561 maxfd = obj->ipcFd;
563 /* Wait also on the event fd, which may be written by MessageQ_unblock(): */
564 FD_SET(obj->unblockFdR, &rfds);
566 if (timeout == MessageQ_FOREVER) {
567 timevalPtr = NULL;
568 }
569 else {
570 /* Timeout given in msec: convert: */
571 tv.tv_sec = timeout / 1000;
572 tv.tv_usec = (timeout % 1000) * 1000;
573 timevalPtr = &tv;
574 }
575 /* Add one to last fd created: */
576 nfds = ((maxfd > obj->unblockFdR) ? maxfd : obj->unblockFdR) + 1;
578 retval = select(nfds, &rfds, NULL, NULL, timevalPtr);
579 if (retval) {
580 if (FD_ISSET(obj->unblockFdR, &rfds)) {
581 /*
582 * Our event was signalled by MessageQ_unblock().
583 *
584 * This is typically done during a shutdown sequence, where
585 * the intention of the client would be to ignore (i.e. not fetch)
586 * any pending messages in the transport's queue.
587 * Thus, we shall not check for nor return any messages.
588 */
589 *msg = NULL;
590 status = MessageQ_E_UNBLOCKED;
591 }
592 else {
593 if (FD_ISSET(obj->ipcFd, &rfds)) {
594 /* Our transport's fd was signalled: Get the message: */
595 tmpStatus = transportGet(obj->ipcFd, msg);
596 if (tmpStatus < 0) {
597 printf ("MessageQ_get: tranposrtshm_get failed.");
598 status = MessageQ_E_FAIL;
599 }
600 }
601 }
602 }
603 else if (retval == 0) {
604 *msg = NULL;
605 status = MessageQ_E_TIMEOUT;
606 }
608 return (status);
609 }
611 /*
612 * Return a count of the number of messages in the queue
613 *
614 * TBD: To be implemented. Return -1 for now.
615 */
616 Int MessageQ_count (MessageQ_Handle handle)
617 {
618 Int count = -1;
619 return (count);
620 }
622 /* Initializes a message not obtained from MessageQ_alloc. */
623 Void MessageQ_staticMsgInit (MessageQ_Msg msg, UInt32 size)
624 {
625 /* Fill in the fields of the message */
626 MessageQ_msgInit (msg);
627 msg->heapId = MessageQ_STATICMSG;
628 msg->msgSize = size;
629 }
631 /*
632 * Allocate a message and initialize the needed fields (note some
633 * of the fields in the header are set via other APIs or in the
634 * MessageQ_put function,
635 */
636 MessageQ_Msg MessageQ_alloc (UInt16 heapId, UInt32 size)
637 {
638 MessageQ_Msg msg = NULL;
640 /*
641 * heapId not used for local alloc (as this is over a copy transport), but
642 * we need to send to other side as heapId is used in BIOS transport:
643 */
644 msg = (MessageQ_Msg)calloc (1, size);
645 MessageQ_msgInit (msg);
646 msg->msgSize = size;
647 msg->heapId = heapId;
649 return msg;
650 }
652 /* Frees the message back to the heap that was used to allocate it. */
653 Int MessageQ_free (MessageQ_Msg msg)
654 {
655 UInt32 status = MessageQ_S_SUCCESS;
657 /* Check to ensure this was not allocated by user: */
658 if (msg->heapId == MessageQ_STATICMSG) {
659 status = MessageQ_E_CANNOTFREESTATICMSG;
660 }
661 else {
662 free (msg);
663 }
665 return status;
666 }
668 /* Register a heap with MessageQ. */
669 Int MessageQ_registerHeap (Ptr heap, UInt16 heapId)
670 {
671 Int status = MessageQ_S_SUCCESS;
673 /* Do nothing, as this uses a copy transport: */
675 return status;
676 }
678 /* Unregister a heap with MessageQ. */
679 Int MessageQ_unregisterHeap (UInt16 heapId)
680 {
681 Int status = MessageQ_S_SUCCESS;
683 /* Do nothing, as this uses a copy transport: */
685 return status;
686 }
688 /* Unblocks a MessageQ */
689 Void MessageQ_unblock (MessageQ_Handle handle)
690 {
691 MessageQ_Object * obj = (MessageQ_Object *) handle;
692 char buf = 'n';
693 int numBytes;
695 /* Write to pipe to awaken any threads blocked on this messageQ: */
696 numBytes = write(obj->unblockFdW, &buf, 1);
697 }
699 /* Embeds a source message queue into a message. */
700 Void MessageQ_setReplyQueue (MessageQ_Handle handle, MessageQ_Msg msg)
701 {
702 MessageQ_Object * obj = (MessageQ_Object *) handle;
704 msg->replyId = (UInt16)(obj->queue);
705 msg->replyProc = (UInt16)(obj->queue >> 16);
706 }
708 /* Returns the QueueId associated with the handle. */
709 MessageQ_QueueId MessageQ_getQueueId (MessageQ_Handle handle)
710 {
711 MessageQ_Object * obj = (MessageQ_Object *) handle;
712 UInt32 queueId;
714 queueId = (obj->queue);
716 return queueId;
717 }
719 /* Sets the tracing of a message */
720 Void MessageQ_setMsgTrace (MessageQ_Msg msg, Bool traceFlag)
721 {
722 msg->flags = (msg->flags & ~TRACEMASK) | (traceFlag << TRACESHIFT);
723 }
725 /*
726 * Returns the amount of shared memory used by one transport instance.
727 *
728 * The MessageQ module itself does not use any shared memory but the
729 * underlying transport may use some shared memory.
730 */
731 SizeT MessageQ_sharedMemReq (Ptr sharedAddr)
732 {
733 SizeT memReq = 0u;
735 /* Do nothing, as this is a copy transport. */
737 return (memReq);
738 }
740 /*
741 * Opens a file descriptor for this remote proc.
742 *
743 * Only opens it if one does not already exist for this procId.
744 *
745 * Note: remoteProcId may be MultiProc_Self() for loopback case.
746 */
747 Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr)
748 {
749 Int status = MessageQ_S_SUCCESS;
750 UInt32 localAddr;
751 int ipcFd;
752 int err;
754 PRINTVERBOSE1("MessageQ_attach: remoteProcId: %d\n", remoteProcId)
756 if (remoteProcId >= MultiProc_MAXPROCESSORS) {
757 status = MessageQ_E_INVALIDPROCID;
758 goto exit;
759 }
761 pthread_mutex_lock (&(MessageQ_module->gate));
763 /* Only open a fd if one doesn't exist: */
764 if (MessageQ_module->ipcFd[remoteProcId] == -1) {
765 /* Create a fd for sending messages to the remote proc: */
766 ipcFd = open("/dev/tiipc", O_RDWR);
767 if (ipcFd < 0) {
768 status = MessageQ_E_FAIL;
769 printf ("MessageQ_attach: open of tiipc device failed: %d, %s\n",
770 errno, strerror(errno));
771 }
772 else {
773 PRINTVERBOSE1("MessageQ_attach: opened tiipc fd for sending: %d\n",
774 ipcFd)
775 MessageQ_module->ipcFd[remoteProcId] = ipcFd;
776 /*
777 * Connect to the remote endpoint and bind any reserved address as
778 * local endpoint
779 */
780 Connect(ipcFd, remoteProcId, MESSAGEQ_RPMSG_PORT);
781 err = BindAddr(ipcFd, &localAddr);
782 if (err < 0) {
783 status = MessageQ_E_FAIL;
784 printf ("MessageQ_attach: bind failed: %d, %s\n",
785 errno, strerror(errno));
786 }
787 }
788 }
789 else {
790 status = MessageQ_E_ALREADYEXISTS;
791 }
793 pthread_mutex_unlock (&(MessageQ_module->gate));
795 exit:
796 return (status);
797 }
799 /*
800 * Close the fd for this remote proc.
801 *
802 */
803 Int MessageQ_detach (UInt16 remoteProcId)
804 {
805 Int status = MessageQ_S_SUCCESS;
806 int ipcFd;
808 if (remoteProcId >= MultiProc_MAXPROCESSORS) {
809 status = MessageQ_E_INVALIDPROCID;
810 goto exit;
811 }
813 pthread_mutex_lock (&(MessageQ_module->gate));
815 ipcFd = MessageQ_module->ipcFd[remoteProcId];
816 if (close (ipcFd)) {
817 status = MessageQ_E_OSFAILURE;
818 printf("MessageQ_detach: close failed: %d, %s\n",
819 errno, strerror(errno));
820 }
821 else {
822 PRINTVERBOSE1("MessageQ_detach: closed fd: %d\n", ipcFd)
823 MessageQ_module->ipcFd[remoteProcId] = -1;
824 }
826 pthread_mutex_unlock (&(MessageQ_module->gate));
828 exit:
829 return (status);
830 }
832 /*
833 * This is a helper function to initialize a message.
834 */
835 Void MessageQ_msgInit (MessageQ_Msg msg)
836 {
837 msg->reserved0 = 0; /* We set this to distinguish from NameServerMsg */
838 msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
839 msg->msgId = MessageQ_INVALIDMSGID;
840 msg->dstId = (UInt16)MessageQ_INVALIDMESSAGEQ;
841 msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
842 msg->srcProc = MultiProc_self();
844 pthread_mutex_lock(&(MessageQ_module->gate));
845 msg->seqNum = MessageQ_module->seqNum++;
846 pthread_mutex_unlock(&(MessageQ_module->gate));
847 }
849 /*
850 * =============================================================================
851 * Transport: Fxns kept here until need for a transport layer is realized.
852 * =============================================================================
853 */
854 /*
855 * ======== transportCreateEndpoint ========
856 *
857 * Create a communication endpoint to receive messages.
858 */
859 static Int transportCreateEndpoint(int * fd, UInt16 * queueIndex)
860 {
861 Int status = MessageQ_S_SUCCESS;
862 int err;
863 UInt32 localAddr;
865 /* Create a fd to the ti-ipc to receive messages for this messageQ */
866 *fd= open("/dev/tiipc", O_RDWR);
867 if (*fd < 0) {
868 status = MessageQ_E_FAIL;
869 printf ("transportCreateEndpoint: Couldn't open tiipc device: %d, %s\n",
870 errno, strerror(errno));
872 goto exit;
873 }
875 PRINTVERBOSE1("transportCreateEndpoint: opened fd: %d\n", *fd)
877 err = BindAddr(*fd, &localAddr);
878 if (err < 0) {
879 status = MessageQ_E_FAIL;
880 printf("transportCreateEndpoint: bind failed: %d, %s\n",
881 errno, strerror(errno));
883 close(*fd);
884 goto exit;
885 }
887 if (localAddr >= MAX_LOCAL_ADDR) {
888 status = MessageQ_E_FAIL;
889 printf("transportCreateEndpoint: local address returned is"
890 "by BindAddr is greater than max supported\n");
892 close(*fd);
893 goto exit;
894 }
896 *queueIndex = localAddr;
898 exit:
899 return (status);
900 }
902 /*
903 * ======== transportCloseEndpoint ========
904 *
905 * Close the communication endpoint.
906 */
907 static Int transportCloseEndpoint(int fd)
908 {
909 Int status = MessageQ_S_SUCCESS;
911 PRINTVERBOSE1("transportCloseEndpoint: closing fd: %d\n", fd)
913 /* Stop communication to this endpoint */
914 close(fd);
916 return (status);
917 }
919 /*
920 * ======== transportGet ========
921 * Retrieve a message waiting in the queue.
922 */
923 static Int transportGet(int fd, MessageQ_Msg * retMsg)
924 {
925 Int status = MessageQ_S_SUCCESS;
926 MessageQ_Msg msg;
927 int ret;
928 int byteCount;
929 tiipc_remote_params remote;
931 /*
932 * We have no way of peeking to see what message size we'll get, so we
933 * allocate a message of max size to receive contents from tiipc
934 * (currently, a copy transport)
935 */
936 msg = MessageQ_alloc (0, MESSAGEQ_RPMSG_MAXSIZE);
937 if (!msg) {
938 status = MessageQ_E_MEMORY;
939 goto exit;
940 }
942 /* Get message */
943 byteCount = read(fd, msg, MESSAGEQ_RPMSG_MAXSIZE);
944 if (byteCount < 0) {
945 printf("read failed: %s (%d)\n", strerror(errno), errno);
946 status = MessageQ_E_FAIL;
947 goto exit;
948 }
949 else {
950 /* Update the allocated message size (even though this may waste space
951 * when the actual message is smaller than the maximum rpmsg size,
952 * the message will be freed soon anyway, and it avoids an extra copy).
953 */
954 msg->msgSize = byteCount;
956 /*
957 * If the message received was statically allocated, reset the
958 * heapId, so the app can free it.
959 */
960 if (msg->heapId == MessageQ_STATICMSG) {
961 msg->heapId = 0; /* for a copy transport, heap id is 0. */
962 }
963 }
965 PRINTVERBOSE1("transportGet: read from fd: %d\n", fd)
966 ret = ioctl(fd, TIIPC_IOCGETREMOTE, &remote);
967 PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg \
968 proc: %d\n", byteCount, remote.remote_addr, remote.remote_proc)
969 PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId, msg->msgSize)
971 *retMsg = msg;
973 exit:
974 return (status);
975 }
977 /*
978 * ======== transportPut ========
979 *
980 * Write to tiipc file descriptor associated with
981 * with this destination procID.
982 */
983 static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId)
984 {
985 Int status = MessageQ_S_SUCCESS;
986 int ipcFd;
987 int err;
989 /*
990 * Retrieve the tiipc file descriptor associated with this
991 * transport for the destination processor.
992 */
993 ipcFd = MessageQ_module->ipcFd[dstProcId];
995 PRINTVERBOSE2("Sending msgId: %d via fd: %d\n", msg->msgId, ipcFd)
997 /* send response message to remote processor */
998 err = write(ipcFd, msg, msg->msgSize);
999 if (err < 0) {
1000 printf ("transportPut: write failed: %d, %s\n",
1001 errno, strerror(errno));
1002 status = MessageQ_E_FAIL;
1003 goto exit;
1004 }
1006 /*
1007 * Free the message, as this is a copy transport, we maintain MessageQ
1008 * semantics.
1009 */
1010 MessageQ_free (msg);
1012 exit:
1013 return (status);
1014 }