1 /*
2 * Copyright (c) 2012-2013, Texas Instruments Incorporated
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*
33 * @file MessageQ.c
34 * @brief Prototype Mapping of SysLink MessageQ to Socket ABI
35 * (SysLink 3).
36 *
37 * @ver 02.00.00.51_alpha2 (kernel code is basis for this module)
38 *
39 */
40 /*============================================================================
41 * @file MessageQ.c
42 *
43 * @brief MessageQ module "client" implementation
44 *
45 * This implementation is geared for use in a "client/server" model, whereby
46 * system-wide data is maintained in a "server" component and process-
47 * specific data is handled here. At the moment, this implementation
48 * connects and communicates with LAD for the server connection.
49 *
50 * The MessageQ module supports the structured sending and receiving of
51 * variable length messages. This module can be used for homogeneous or
52 * heterogeneous multi-processor messaging.
53 *
54 * MessageQ provides more sophisticated messaging than other modules. It is
55 * typically used for complex situations such as multi-processor messaging.
56 *
57 * The following are key features of the MessageQ module:
58 * -Writers and readers can be relocated to another processor with no
59 * runtime code changes.
60 * -Timeouts are allowed when receiving messages.
61 * -Readers can determine the writer and reply back.
62 * -Receiving a message is deterministic when the timeout is zero.
63 * -Messages can reside on any message queue.
64 * -Supports zero-copy transfers.
65 * -Can send and receive from any type of thread.
66 * -Notification mechanism is specified by application.
67 * -Allows QoS (quality of service) on message buffer pools. For example,
68 * using specific buffer pools for specific message queues.
69 *
70 * Messages are sent and received via a message queue. A reader is a thread
71 * that gets (reads) messages from a message queue. A writer is a thread that
72 * puts (writes) a message to a message queue. Each message queue has one
73 * reader and can have many writers. A thread may read from or write to multiple
74 * message queues.
75 *
76 * Conceptually, the reader thread owns a message queue. The reader thread
77 * creates a message queue. Writer threads open created message queues to
78 * get access to them.
79 *
80 * Message queues are identified by a system-wide unique name. Internally,
81 * MessageQ uses the NameServer module for managing
82 * these names. The names are used for opening a message queue. Using
83 * names is not required.
84 *
85 * Messages must be allocated from the MessageQ module. Once a message is
86 * allocated, it can be sent on any message queue. Once a message is sent, the
87 * writer loses ownership of the message and should not attempt to modify the
88 * message. Once the reader receives the message, it owns the message. It
89 * may either free the message or re-use the message.
90 *
91 * Messages in a message queue can be of variable length. The only
92 * requirement is that the first field in the definition of a message must be a
93 * MsgHeader structure. For example:
94 * typedef struct MyMsg {
95 * MessageQ_MsgHeader header;
96 * ...
97 * } MyMsg;
98 *
99 * The MessageQ API uses the MessageQ_MsgHeader internally. Your application
100 * should not modify or directly access the fields in the MessageQ_MsgHeader.
101 *
102 * All messages sent via the MessageQ module must be allocated from a
103 * Heap implementation. The heap can be used for
104 * other memory allocation not related to MessageQ.
105 *
106 * An application can use multiple heaps. The purpose of having multiple
107 * heaps is to allow an application to regulate its message usage. For
108 * example, an application can allocate critical messages from one heap of fast
109 * on-chip memory and non-critical messages from another heap of slower
110 * external memory
111 *
112 * MessageQ does support the usage of messages that were not allocated via the
113 * alloc function. Please refer to the staticMsgInit
114 * function description for more details.
115 *
116 * In a multiple processor system, MessageQ communicates with other
117 * processors via MessageQTransport instances. There must be one and
118 * only one MessageQTransport instance for each processor where communication
119 * is desired.
120 * So on a four processor system, each processor must have three
121 * MessageQTransport instances.
122 *
123 * The user only needs to create the MessageQTransport instances. The instances
124 * are responsible for registering themselves with MessageQ.
125 * This is accomplished via the registerTransport function.
126 *
127 * ============================================================================
128 */
131 /* Standard headers */
132 #include <Std.h>
134 /* Linux specific header files, replacing OSAL: */
135 #include <pthread.h>
137 /* Module level headers */
138 #include <ti/ipc/NameServer.h>
139 #include <ti/ipc/MultiProc.h>
140 #include <_MultiProc.h>
141 #include <ti/ipc/MessageQ.h>
142 #include <_MessageQ.h>
144 /* Socket Headers */
145 #include <sys/select.h>
146 #include <sys/time.h>
147 #include <sys/types.h>
148 #include <sys/param.h>
149 #include <sys/eventfd.h>
150 #include <sys/socket.h>
151 #include <errno.h>
152 #include <stdio.h>
153 #include <string.h>
154 #include <stdlib.h>
155 #include <unistd.h>
156 #include <assert.h>
158 /* SysLink Socket Protocol Family */
159 #include <net/rpmsg.h>
161 /* Socket utils: */
162 #include <SocketFxns.h>
164 #include <ladclient.h>
165 #include <_lad.h>
167 /* =============================================================================
168 * Macros/Constants
169 * =============================================================================
170 */
/*!
 *  @brief  Name of the reserved NameServer used for MessageQ.
 */
#define MessageQ_NAMESERVER         "MessageQ"

/*!
 *  @brief  Value of an invalid socket ID.
 *
 *  Kept as a signed -1 because it is stored in, and compared against, the
 *  plain `int` descriptors held in the sock[] and fd[] tables; an unsigned
 *  literal forces a signed/unsigned conversion on every such comparison.
 */
#define Transport_INVALIDSOCKET     (-1)

/* More magic rpmsg port numbers: */
#define MESSAGEQ_RPMSG_PORT         61
#define MESSAGEQ_RPMSG_MAXSIZE      512

/* Trace flag settings (TRACEMASK is bit TRACESHIFT of the header flags): */
#define TRACESHIFT                  12
#define TRACEMASK                   0x1000

/* Define BENCHMARK to quiet key MessageQ APIs: */
//#define BENCHMARK
193 /* =============================================================================
194 * Structures & Enums
195 * =============================================================================
196 */
198 /* structure for MessageQ module state */
199 typedef struct MessageQ_ModuleObject {
200 Int refCount;
201 /*!< Reference count */
202 NameServer_Handle nameServer;
203 /*!< Handle to the local NameServer used for storing GP objects */
204 pthread_mutex_t gate;
205 /*!< Handle of gate to be used for local thread safety */
206 MessageQ_Params defaultInstParams;
207 /*!< Default instance creation parameters */
208 int sock[MultiProc_MAXPROCESSORS];
209 /*!< Sockets to for sending to each remote processor */
210 int seqNum;
211 /*!< Process-specific sequence number */
212 } MessageQ_ModuleObject;
214 /*!
215 * @brief Structure for the Handle for the MessageQ.
216 */
217 typedef struct MessageQ_Object_tag {
218 MessageQ_Params params;
219 /*! Instance specific creation parameters */
220 MessageQ_QueueId queue;
221 /* Unique id */
222 int fd[MultiProc_MAXPROCESSORS];
223 /* File Descriptor to block on messages from remote processors. */
224 int unblockFd;
225 /* Write this fd to unblock the select() call in MessageQ _get() */
226 void *serverHandle;
227 } MessageQ_Object;
229 static Bool verbose = FALSE;
232 /* =============================================================================
233 * Globals
234 * =============================================================================
235 */
236 static MessageQ_ModuleObject MessageQ_state =
237 {
238 .refCount = 0,
239 .nameServer = NULL,
240 };
242 /*!
243 * @var MessageQ_module
244 *
245 * @brief Pointer to the MessageQ module state.
246 */
247 MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
250 /* =============================================================================
251 * Forward declarations of internal functions
252 * =============================================================================
253 */
255 /* This is a helper function to initialize a message. */
256 static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex);
257 static Int transportCloseEndpoint(int fd);
258 static Int transportGet(int sock, MessageQ_Msg * retMsg);
259 static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId);
261 /* =============================================================================
262 * APIS
263 * =============================================================================
264 */
265 /* Function to get default configuration for the MessageQ module.
266 *
267 */
268 Void MessageQ_getConfig (MessageQ_Config * cfg)
269 {
270 Int status;
271 LAD_ClientHandle handle;
272 struct LAD_CommandObj cmd;
273 union LAD_ResponseObj rsp;
275 assert (cfg != NULL);
277 handle = LAD_findHandle();
278 if (handle == LAD_MAXNUMCLIENTS) {
279 PRINTVERBOSE1(
280 "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
281 getpid())
283 return;
284 }
286 cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
287 cmd.clientId = handle;
289 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
290 PRINTVERBOSE1(
291 "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
292 return;
293 }
295 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
296 PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n", status)
297 return;
298 }
299 status = rsp.messageQGetConfig.status;
301 PRINTVERBOSE2(
302 "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
303 handle, status)
305 memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof(*cfg));
307 return;
308 }
310 /* Function to setup the MessageQ module. */
311 Int MessageQ_setup (const MessageQ_Config * cfg)
312 {
313 Int status;
314 LAD_ClientHandle handle;
315 struct LAD_CommandObj cmd;
316 union LAD_ResponseObj rsp;
317 Int i;
319 handle = LAD_findHandle();
320 if (handle == LAD_MAXNUMCLIENTS) {
321 PRINTVERBOSE1(
322 "MessageQ_setup: can't find connection to daemon for pid %d\n",
323 getpid())
325 return MessageQ_E_RESOURCE;
326 }
328 cmd.cmd = LAD_MESSAGEQ_SETUP;
329 cmd.clientId = handle;
330 memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
332 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
333 PRINTVERBOSE1(
334 "MessageQ_setup: sending LAD command failed, status=%d\n", status)
335 return MessageQ_E_FAIL;
336 }
338 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
339 PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
340 return(status);
341 }
342 status = rsp.setup.status;
344 PRINTVERBOSE2(
345 "MessageQ_setup: got LAD response for client %d, status=%d\n",
346 handle, status)
348 MessageQ_module->nameServer = rsp.setup.nameServerHandle;
349 MessageQ_module->seqNum = 0;
351 /* Create a default local gate. */
352 pthread_mutex_init (&(MessageQ_module->gate), NULL);
354 /* Clear sockets array. */
355 for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
356 MessageQ_module->sock[i] = Transport_INVALIDSOCKET;
357 }
360 return status;
361 }
363 /*
364 * Function to destroy the MessageQ module.
365 * Destroys socket/protocol maps; sockets themselves should have been
366 * destroyed in MessageQ_delete() and MessageQ_detach() calls.
367 */
368 Int MessageQ_destroy (void)
369 {
370 Int status;
371 LAD_ClientHandle handle;
372 struct LAD_CommandObj cmd;
373 union LAD_ResponseObj rsp;
375 handle = LAD_findHandle();
376 if (handle == LAD_MAXNUMCLIENTS) {
377 PRINTVERBOSE1(
378 "MessageQ_destroy: can't find connection to daemon for pid %d\n",
379 getpid())
381 return MessageQ_E_RESOURCE;
382 }
384 cmd.cmd = LAD_MESSAGEQ_DESTROY;
385 cmd.clientId = handle;
387 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
388 PRINTVERBOSE1(
389 "MessageQ_destroy: sending LAD command failed, status=%d\n", status)
390 return MessageQ_E_FAIL;
391 }
393 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
394 PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
395 return(status);
396 }
397 status = rsp.status;
399 PRINTVERBOSE2(
400 "MessageQ_destroy: got LAD response for client %d, status=%d\n",
401 handle, status)
403 return status;
404 }
406 /* Function to initialize the parameters for the MessageQ instance. */
407 Void MessageQ_Params_init (MessageQ_Params * params)
408 {
409 memcpy (params, &(MessageQ_module->defaultInstParams),
410 sizeof (MessageQ_Params));
412 return;
413 }
415 /*
416 * Function to create a MessageQ object for receiving.
417 *
418 * Create a socket and bind the source address (local ProcId/MessageQ ID) in
419 * order to get messages dispatched to this messageQ.
420 */
421 MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
422 {
423 Int status = MessageQ_S_SUCCESS;
424 MessageQ_Object * obj = NULL;
425 UInt16 queueIndex = 0u;
426 UInt16 procId;
427 UInt16 rprocId;
428 LAD_ClientHandle handle;
429 struct LAD_CommandObj cmd;
430 union LAD_ResponseObj rsp;
432 handle = LAD_findHandle();
433 if (handle == LAD_MAXNUMCLIENTS) {
434 PRINTVERBOSE1(
435 "MessageQ_create: can't find connection to daemon for pid %d\n",
436 getpid())
438 return NULL;
439 }
441 cmd.cmd = LAD_MESSAGEQ_CREATE;
442 cmd.clientId = handle;
443 strncpy(cmd.args.messageQCreate.name, name,
444 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
445 cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
446 if (params) {
447 memcpy(&cmd.args.messageQCreate.params, params, sizeof(*params));
448 }
450 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
451 PRINTVERBOSE1(
452 "MessageQ_create: sending LAD command failed, status=%d\n", status)
453 return NULL;
454 }
456 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
457 PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
458 return NULL;
459 }
460 status = rsp.messageQCreate.status;
462 PRINTVERBOSE2(
463 "MessageQ_create: got LAD response for client %d, status=%d\n",
464 handle, status)
466 if (status == -1) {
467 PRINTVERBOSE1(
468 "MessageQ_create: MessageQ server operation failed, status=%d\n",
469 status)
470 return NULL;
471 }
473 /* Create the generic obj */
474 obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
476 if (params != NULL) {
477 /* Populate the params member */
478 memcpy((Ptr) &obj->params, (Ptr)params, sizeof (MessageQ_Params));
479 }
481 procId = MultiProc_self();
482 queueIndex = (MessageQ_QueueIndex)rsp.messageQCreate.queueId;
483 obj->queue = rsp.messageQCreate.queueId;
484 obj->serverHandle = rsp.messageQCreate.serverHandle;
486 /*
487 * Create a set of communication endpoints (one per each remote proc),
488 * and return the socket as target for MessageQ_put() calls, and as
489 * a file descriptor to close during MessageQ_delete().
490 */
491 for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
492 obj->fd[rprocId] = Transport_INVALIDSOCKET;
493 if (procId == rprocId) {
494 /* Skip creating an endpoint for ourself. */
495 continue;
496 }
498 PRINTVERBOSE3("MessageQ_create: creating endpoint for: %s, rprocId: %d, queueIndex: %d\n", name, rprocId, queueIndex)
500 status = transportCreateEndpoint(&obj->fd[rprocId], rprocId,
501 queueIndex);
502 if (status < 0) {
503 goto cleanup;
504 }
505 }
507 /*
508 * Now, to support MessageQ_unblock() functionality, create an event object.
509 * Writing to this event will unblock the select() call in MessageQ_get().
510 */
511 obj->unblockFd = eventfd(0, 0);
512 if (obj->unblockFd == -1) {
513 printf ("MessageQ_create: eventfd creation failed: %d, %s\n",
514 errno, strerror(errno));
515 status = MessageQ_E_FAIL;
516 }
518 cleanup:
519 /* Cleanup if fail: */
520 if (status < 0) {
521 MessageQ_delete((MessageQ_Handle *)&obj);
522 }
524 return ((MessageQ_Handle) obj);
525 }
527 /*
528 * Function to delete a MessageQ object for a specific slave processor.
529 *
530 * Deletes the socket associated with this MessageQ object.
531 */
532 Int MessageQ_delete (MessageQ_Handle * handlePtr)
533 {
534 Int status = MessageQ_S_SUCCESS;
535 MessageQ_Object * obj = NULL;
536 UInt16 rprocId;
537 LAD_ClientHandle handle;
538 struct LAD_CommandObj cmd;
539 union LAD_ResponseObj rsp;
541 handle = LAD_findHandle();
542 if (handle == LAD_MAXNUMCLIENTS) {
543 PRINTVERBOSE1(
544 "MessageQ_delete: can't find connection to daemon for pid %d\n",
545 getpid())
547 return MessageQ_E_FAIL;
548 }
550 obj = (MessageQ_Object *) (*handlePtr);
552 cmd.cmd = LAD_MESSAGEQ_DELETE;
553 cmd.clientId = handle;
554 cmd.args.messageQDelete.serverHandle = obj->serverHandle;
556 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
557 PRINTVERBOSE1(
558 "MessageQ_delete: sending LAD command failed, status=%d\n", status)
559 return MessageQ_E_FAIL;
560 }
562 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
563 PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
564 return MessageQ_E_FAIL;
565 }
566 status = rsp.messageQDelete.status;
568 PRINTVERBOSE2(
569 "MessageQ_delete: got LAD response for client %d, status=%d\n",
570 handle, status)
573 /* Close the event used for MessageQ_unblock(): */
574 close(obj->unblockFd);
576 /* Close the communication endpoint: */
577 for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
578 if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
579 status = transportCloseEndpoint(obj->fd[rprocId]);
580 }
581 }
583 /* Now free the obj */
584 free (obj);
585 *handlePtr = NULL;
587 return (status);
588 }
590 /*
591 * Opens an instance of MessageQ for sending.
592 *
593 * We need not create a socket here; the sockets for all remote processors
594 * were created during MessageQ_attach(), and will be
595 * retrieved during MessageQ_put().
596 */
597 Int MessageQ_open (String name, MessageQ_QueueId * queueId)
598 {
599 Int status = MessageQ_S_SUCCESS;
601 status = NameServer_getUInt32 (MessageQ_module->nameServer,
602 name, queueId, NULL);
604 if (status == NameServer_E_NOTFOUND) {
605 /* Set return queue ID to invalid. */
606 *queueId = MessageQ_INVALIDMESSAGEQ;
607 status = MessageQ_E_NOTFOUND;
608 }
609 else if (status >= 0) {
610 /* Override with a MessageQ status code. */
611 status = MessageQ_S_SUCCESS;
612 }
613 else {
614 /* Set return queue ID to invalid. */
615 *queueId = MessageQ_INVALIDMESSAGEQ;
616 /* Override with a MessageQ status code. */
617 if (status == NameServer_E_TIMEOUT) {
618 status = MessageQ_E_TIMEOUT;
619 }
620 else {
621 status = MessageQ_E_FAIL;
622 }
623 }
625 return (status);
626 }
628 /* Closes previously opened instance of MessageQ module. */
629 Int MessageQ_close (MessageQ_QueueId * queueId)
630 {
631 Int32 status = MessageQ_S_SUCCESS;
633 /* Nothing more to be done for closing the MessageQ. */
634 *queueId = MessageQ_INVALIDMESSAGEQ;
636 return (status);
637 }
639 /*
640 * Place a message onto a message queue.
641 *
642 * Calls TransportShm_put(), which handles the sending of the message using the
643 * appropriate kernel interface (socket, device ioctl) call for the remote
644 * procId encoded in the queueId argument.
645 *
646 */
647 Int MessageQ_put (MessageQ_QueueId queueId, MessageQ_Msg msg)
648 {
649 Int status;
650 UInt16 dstProcId = (UInt16)(queueId >> 16);
651 UInt16 queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
653 msg->dstId = queueIndex;
654 msg->dstProc = dstProcId;
656 status = transportPut(msg, queueIndex, dstProcId);
658 return (status);
659 }
661 /*
662 * Gets a message for a message queue and blocks if the queue is empty.
663 * If a message is present, it returns it. Otherwise it blocks
664 * waiting for a message to arrive.
665 * When a message is returned, it is owned by the caller.
666 *
667 * We block using select() on the receiving socket's file descriptor, then
668 * get the waiting message via the socket API recvfrom().
669 * We use the socket stored in the messageQ object via a previous call to
670 * MessageQ_create().
671 *
672 */
673 Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
674 {
675 Int status = MessageQ_S_SUCCESS;
676 Int tmpStatus;
677 MessageQ_Object * obj = (MessageQ_Object *) handle;
678 int retval;
679 int nfds;
680 fd_set rfds;
681 struct timeval tv;
682 void *timevalPtr;
683 UInt16 rprocId;
684 int maxfd = 0;
686 /* Wait (with timeout) and retreive message from socket: */
687 FD_ZERO(&rfds);
688 for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
689 if (rprocId == MultiProc_self()) {
690 continue;
691 }
692 maxfd = MAX(maxfd, obj->fd[rprocId]);
693 FD_SET(obj->fd[rprocId], &rfds);
694 }
696 /* Wait also on the event fd, which may be written by MessageQ_unblock(): */
697 FD_SET(obj->unblockFd, &rfds);
699 if (timeout == MessageQ_FOREVER) {
700 timevalPtr = NULL;
701 }
702 else {
703 /* Timeout given in msec: convert: */
704 tv.tv_sec = timeout / 1000;
705 tv.tv_usec = (timeout % 1000) * 1000;
706 timevalPtr = &tv;
707 }
708 /* Add one to last fd created: */
709 nfds = MAX(maxfd, obj->unblockFd) + 1;
711 retval = select(nfds, &rfds, NULL, NULL, timevalPtr);
712 if (retval) {
713 if (FD_ISSET(obj->unblockFd, &rfds)) {
714 /*
715 * Our event was signalled by MessageQ_unblock().
716 *
717 * This is typically done during a shutdown sequence, where
718 * the intention of the client would be to ignore (i.e. not fetch)
719 * any pending messages in the transport's queue.
720 * Thus, we shall not check for nor return any messages.
721 */
722 *msg = NULL;
723 status = MessageQ_E_UNBLOCKED;
724 }
725 else {
726 for (rprocId = 0; rprocId < MultiProc_getNumProcessors();
727 rprocId++) {
728 if (rprocId == MultiProc_self()) {
729 continue;
730 }
731 if (FD_ISSET(obj->fd[rprocId], &rfds)) {
732 /* Our transport's fd was signalled: Get the message: */
733 tmpStatus = transportGet(obj->fd[rprocId], msg);
734 if (tmpStatus < 0) {
735 printf ("MessageQ_get: tranposrtshm_get failed.");
736 status = MessageQ_E_FAIL;
737 }
738 }
739 }
740 }
741 }
742 else if (retval == 0) {
743 *msg = NULL;
744 status = MessageQ_E_TIMEOUT;
745 }
747 return (status);
748 }
750 /*
751 * Return a count of the number of messages in the queue
752 *
753 * TBD: Implement as a socket ioctl, using getsockopt(). Return -1 for now.
754 */
755 Int MessageQ_count (MessageQ_Handle handle)
756 {
757 Int count = -1;
758 #if 0
759 MessageQ_Object * obj = (MessageQ_Object *) handle;
760 socklen_t optlen;
762 /*
763 * TBD: Need to find a way to implement (if anyone uses it!), and
764 * push down into transport..
765 */
767 /*
768 * 2nd arg to getsockopt should be transport independent, but using
769 * SSKPROTO_SHMFIFO for now:
770 */
771 getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
772 &count, &optlen);
773 #endif
775 return (count);
776 }
778 /* Initializes a message not obtained from MessageQ_alloc. */
779 Void MessageQ_staticMsgInit (MessageQ_Msg msg, UInt32 size)
780 {
781 /* Fill in the fields of the message */
782 MessageQ_msgInit (msg);
783 msg->heapId = MessageQ_STATICMSG;
784 msg->msgSize = size;
785 }
787 /*
788 * Allocate a message and initialize the needed fields (note some
789 * of the fields in the header are set via other APIs or in the
790 * MessageQ_put function,
791 */
792 MessageQ_Msg MessageQ_alloc (UInt16 heapId, UInt32 size)
793 {
794 MessageQ_Msg msg = NULL;
796 /*
797 * heapId not used for local alloc (as this is over a copy transport), but
798 * we need to send to other side as heapId is used in BIOS transport:
799 */
800 msg = (MessageQ_Msg)calloc (1, size);
801 MessageQ_msgInit (msg);
802 msg->msgSize = size;
803 msg->heapId = heapId;
805 return msg;
806 }
808 /* Frees the message back to the heap that was used to allocate it. */
809 Int MessageQ_free (MessageQ_Msg msg)
810 {
811 UInt32 status = MessageQ_S_SUCCESS;
813 /* Check to ensure this was not allocated by user: */
814 if (msg->heapId == MessageQ_STATICMSG) {
815 status = MessageQ_E_CANNOTFREESTATICMSG;
816 }
817 else {
818 free (msg);
819 }
821 return status;
822 }
824 /* Register a heap with MessageQ. */
825 Int MessageQ_registerHeap (Ptr heap, UInt16 heapId)
826 {
827 Int status = MessageQ_S_SUCCESS;
829 /* Do nothing, as this uses a copy transport: */
831 return status;
832 }
834 /* Unregister a heap with MessageQ. */
835 Int MessageQ_unregisterHeap (UInt16 heapId)
836 {
837 Int status = MessageQ_S_SUCCESS;
839 /* Do nothing, as this uses a copy transport: */
841 return status;
842 }
844 /* Unblocks a MessageQ */
845 Void MessageQ_unblock (MessageQ_Handle handle)
846 {
847 MessageQ_Object * obj = (MessageQ_Object *) handle;
848 uint64_t buf = 1;
849 int numBytes;
851 /* Write 8 bytes to awaken any threads blocked on this messageQ: */
852 numBytes = write(obj->unblockFd, &buf, sizeof(buf));
853 }
855 /* Embeds a source message queue into a message. */
856 Void MessageQ_setReplyQueue (MessageQ_Handle handle, MessageQ_Msg msg)
857 {
858 MessageQ_Object * obj = (MessageQ_Object *) handle;
860 msg->replyId = (UInt16)(obj->queue);
861 msg->replyProc = (UInt16)(obj->queue >> 16);
862 }
864 /* Returns the QueueId associated with the handle. */
865 MessageQ_QueueId MessageQ_getQueueId (MessageQ_Handle handle)
866 {
867 MessageQ_Object * obj = (MessageQ_Object *) handle;
868 UInt32 queueId;
870 queueId = (obj->queue);
872 return queueId;
873 }
875 /* Sets the tracing of a message */
876 Void MessageQ_setMsgTrace (MessageQ_Msg msg, Bool traceFlag)
877 {
878 msg->flags = (msg->flags & ~TRACEMASK) | (traceFlag << TRACESHIFT);
879 }
881 /*
882 * Returns the amount of shared memory used by one transport instance.
883 *
884 * The MessageQ module itself does not use any shared memory but the
885 * underlying transport may use some shared memory.
886 */
887 SizeT MessageQ_sharedMemReq (Ptr sharedAddr)
888 {
889 SizeT memReq = 0u;
891 /* Do nothing, as this is a copy transport. */
893 return (memReq);
894 }
896 /*
897 * Create a socket for this remote proc, and attempt to connect.
898 *
899 * Only creates a socket if one does not already exist for this procId.
900 *
901 * Note: remoteProcId may be MultiProc_Self() for loopback case.
902 */
903 Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr)
904 {
905 Int status = MessageQ_S_SUCCESS;
906 int sock;
908 PRINTVERBOSE1("MessageQ_attach: remoteProcId: %d\n", remoteProcId)
910 if (remoteProcId >= MultiProc_MAXPROCESSORS) {
911 status = MessageQ_E_INVALIDPROCID;
912 goto exit;
913 }
915 pthread_mutex_lock (&(MessageQ_module->gate));
917 /* Only create a socket if one doesn't exist: */
918 if (MessageQ_module->sock[remoteProcId] == Transport_INVALIDSOCKET) {
919 /* Create the socket for sending messages to the remote proc: */
920 sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
921 if (sock < 0) {
922 status = MessageQ_E_FAIL;
923 printf ("MessageQ_attach: socket failed: %d, %s\n",
924 errno, strerror(errno));
925 }
926 else {
927 PRINTVERBOSE1("MessageQ_attach: created send socket: %d\n", sock)
928 MessageQ_module->sock[remoteProcId] = sock;
929 /* Attempt to connect: */
930 ConnectSocket(sock, remoteProcId, MESSAGEQ_RPMSG_PORT);
931 }
932 }
933 else {
934 status = MessageQ_E_ALREADYEXISTS;
935 }
937 pthread_mutex_unlock (&(MessageQ_module->gate));
939 exit:
940 return (status);
941 }
943 /*
944 * Close the socket for this remote proc.
945 *
946 */
947 Int MessageQ_detach (UInt16 remoteProcId)
948 {
949 Int status = MessageQ_S_SUCCESS;
950 int sock;
952 if (remoteProcId >= MultiProc_MAXPROCESSORS) {
953 status = MessageQ_E_INVALIDPROCID;
954 goto exit;
955 }
957 pthread_mutex_lock (&(MessageQ_module->gate));
959 sock = MessageQ_module->sock[remoteProcId];
960 if (close (sock)) {
961 status = MessageQ_E_OSFAILURE;
962 printf ("MessageQ_detach: close failed: %d, %s\n",
963 errno, strerror(errno));
964 }
965 else {
966 PRINTVERBOSE1("MessageQ_detach: closed socket: %d\n", sock)
967 MessageQ_module->sock[remoteProcId] = Transport_INVALIDSOCKET;
968 }
970 pthread_mutex_unlock (&(MessageQ_module->gate));
972 exit:
973 return (status);
974 }
976 /*
977 * This is a helper function to initialize a message.
978 */
979 Void MessageQ_msgInit (MessageQ_Msg msg)
980 {
981 #if 0
982 Int status = MessageQ_S_SUCCESS;
983 LAD_ClientHandle handle;
984 struct LAD_CommandObj cmd;
985 union LAD_ResponseObj rsp;
987 handle = LAD_findHandle();
988 if (handle == LAD_MAXNUMCLIENTS) {
989 PRINTVERBOSE1(
990 "MessageQ_setup: can't find connection to daemon for pid %d\n",
991 getpid())
993 return;
994 }
996 cmd.cmd = LAD_MESSAGEQ_MSGINIT;
997 cmd.clientId = handle;
999 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
1000 PRINTVERBOSE1(
1001 "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
1002 return;
1003 }
1005 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
1006 PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
1007 return;
1008 }
1009 status = rsp.msgInit.status;
1011 PRINTVERBOSE2(
1012 "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
1013 handle, status)
1015 memcpy(msg, &rsp.msgInit.msg, sizeof(*msg));
1016 #else
1017 msg->reserved0 = 0; /* We set this to distinguish from NameServerMsg */
1018 msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
1019 msg->msgId = MessageQ_INVALIDMSGID;
1020 msg->dstId = (UInt16)MessageQ_INVALIDMESSAGEQ;
1021 msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
1022 msg->srcProc = MultiProc_self();
1024 pthread_mutex_lock(&(MessageQ_module->gate));
1025 msg->seqNum = MessageQ_module->seqNum++;
1026 pthread_mutex_unlock(&(MessageQ_module->gate));
1027 #endif
1028 }
1030 /*
1031 * =============================================================================
1032 * Transport: Fxns kept here until need for a transport layer is realized.
1033 * =============================================================================
1034 */
1035 /*
1036 * ======== transportCreateEndpoint ========
1037 *
1038 * Create a communication endpoint to receive messages.
1039 */
1040 static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex)
1041 {
1042 Int status = MessageQ_S_SUCCESS;
1043 int err;
1045 /* Create the socket to receive messages for this messageQ. */
1046 *fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
1047 if (*fd < 0) {
1048 status = MessageQ_E_FAIL;
1049 printf ("transportCreateEndpoint: socket call failed: %d, %s\n",
1050 errno, strerror(errno));
1051 goto exit;
1052 }
1054 PRINTVERBOSE1("transportCreateEndpoint: created socket: fd: %d\n", *fd)
1056 err = SocketBindAddr(*fd, rprocId, (UInt32)queueIndex);
1057 if (err < 0) {
1058 status = MessageQ_E_FAIL;
1059 printf ("transportCreateEndpoint: bind failed: %d, %s\n",
1060 errno, strerror(errno));
1061 }
1063 exit:
1064 return (status);
1065 }
1067 /*
1068 * ======== transportCloseEndpoint ========
1069 *
1070 * Close the communication endpoint.
1071 */
1072 static Int transportCloseEndpoint(int fd)
1073 {
1074 Int status = MessageQ_S_SUCCESS;
1076 PRINTVERBOSE1("transportCloseEndpoint: closing socket: %d\n", fd)
1078 /* Stop communication to this socket: */
1079 close(fd);
1081 return (status);
1082 }
1084 /*
1085 * ======== transportGet ========
1086 * Retrieve a message waiting in the socket's queue.
1087 */
1088 static Int transportGet(int sock, MessageQ_Msg * retMsg)
1089 {
1090 Int status = MessageQ_S_SUCCESS;
1091 MessageQ_Msg msg;
1092 struct sockaddr_rpmsg fromAddr; // [Socket address of sender]
1093 unsigned int len;
1094 int byteCount;
1096 /*
1097 * We have no way of peeking to see what message size we'll get, so we
1098 * allocate a message of max size to receive contents from the rpmsg socket
1099 * (currently, a copy transport)
1100 */
1101 msg = MessageQ_alloc (0, MESSAGEQ_RPMSG_MAXSIZE);
1102 if (!msg) {
1103 status = MessageQ_E_MEMORY;
1104 goto exit;
1105 }
1107 memset(&fromAddr, 0, sizeof(fromAddr));
1108 len = sizeof(fromAddr);
1110 byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
1111 (struct sockaddr *)&fromAddr, &len);
1112 if (len != sizeof(fromAddr)) {
1113 printf("recvfrom: got bad addr len (%d)\n", len);
1114 status = MessageQ_E_FAIL;
1115 goto exit;
1116 }
1117 if (byteCount < 0) {
1118 printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
1119 status = MessageQ_E_FAIL;
1120 goto exit;
1121 }
1122 else {
1123 /* Update the allocated message size (even though this may waste space
1124 * when the actual message is smaller than the maximum rpmsg size,
1125 * the message will be freed soon anyway, and it avoids an extra copy).
1126 */
1127 msg->msgSize = byteCount;
1129 /*
1130 * If the message received was statically allocated, reset the
1131 * heapId, so the app can free it.
1132 */
1133 if (msg->heapId == MessageQ_STATICMSG) {
1134 msg->heapId = 0; /* for a copy transport, heap id is 0. */
1135 }
1136 }
1138 PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
1139 PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
1140 PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId, msg->msgSize)
1142 *retMsg = msg;
1144 exit:
1145 return (status);
1146 }
1148 /*
1149 * ======== transportPut ========
1150 *
1151 * Calls the socket API sendto() on the socket associated with
1152 * with this destination procID.
1153 * Currently, both local and remote messages are sent via the Socket ABI, so
1154 * no local object lists are maintained here.
1155 */
1156 static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId)
1157 {
1158 Int status = MessageQ_S_SUCCESS;
1159 int sock;
1160 int err;
1162 /*
1163 * Retrieve the socket for the AF_SYSLINK protocol associated with this
1164 * transport.
1165 */
1166 sock = MessageQ_module->sock[dstProcId];
1168 PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
1170 err = send(sock, msg, msg->msgSize, 0);
1171 if (err < 0) {
1172 printf ("transportPut: send failed: %d, %s\n",
1173 errno, strerror(errno));
1174 status = MessageQ_E_FAIL;
1175 }
1177 /*
1178 * Free the message, as this is a copy transport, we maintain MessageQ
1179 * semantics.
1180 */
1181 MessageQ_free (msg);
1183 return (status);
1184 }