1 /*
2 * Copyright (c) 2012-2013, Texas Instruments Incorporated
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*
33 * @file MessageQ.c
34 *
35 * @brief MessageQ module "client" implementation
36 *
37 * This implementation is geared for use in a "client/server" model, whereby
38 * system-wide data is maintained in a "server" component and process-
39 * specific data is handled here. At the moment, this implementation
40 * connects and communicates with LAD for the server connection.
41 */
44 /* Standard IPC header */
45 #include <ti/ipc/Std.h>
47 /* Linux specific header files, replacing OSAL: */
48 #include <pthread.h>
50 /* Module level headers */
51 #include <ti/ipc/NameServer.h>
52 #include <ti/ipc/MultiProc.h>
53 #include <_MultiProc.h>
54 #include <ti/ipc/MessageQ.h>
55 #include <_MessageQ.h>
57 /* Socket Headers */
58 #include <sys/select.h>
59 #include <sys/time.h>
60 #include <sys/types.h>
61 #include <sys/param.h>
62 #include <sys/eventfd.h>
63 #include <sys/socket.h>
64 #include <errno.h>
65 #include <stdio.h>
66 #include <string.h>
67 #include <stdlib.h>
68 #include <unistd.h>
69 #include <assert.h>
71 /* Socket Protocol Family */
72 #include <net/rpmsg.h>
74 /* Socket utils: */
75 #include <SocketFxns.h>
77 #include <ladclient.h>
78 #include <_lad.h>
80 /* =============================================================================
81 * Macros/Constants
82 * =============================================================================
83 */
85 /*!
86 * @brief Name of the reserved NameServer used for MessageQ.
87 */
88 #define MessageQ_NAMESERVER "MessageQ"
90 /*!
91 * @brief Value of an invalid socket ID:
92 */
93 #define Transport_INVALIDSOCKET (0xFFFFFFFF)
95 /* More magic rpmsg port numbers: */
96 #define MESSAGEQ_RPMSG_PORT 61
97 #define MESSAGEQ_RPMSG_MAXSIZE 512
99 /* Trace flag settings: */
100 #define TRACESHIFT 12
101 #define TRACEMASK 0x1000
103 /* Define BENCHMARK to quiet key MessageQ APIs: */
104 //#define BENCHMARK
106 /* =============================================================================
107 * Structures & Enums
108 * =============================================================================
109 */
111 /* structure for MessageQ module state */
112 typedef struct MessageQ_ModuleObject {
113 Int refCount;
114 /*!< Reference count */
115 NameServer_Handle nameServer;
116 /*!< Handle to the local NameServer used for storing GP objects */
117 pthread_mutex_t gate;
118 /*!< Handle of gate to be used for local thread safety */
119 MessageQ_Params defaultInstParams;
120 /*!< Default instance creation parameters */
121 int sock[MultiProc_MAXPROCESSORS];
122 /*!< Sockets to for sending to each remote processor */
123 int seqNum;
124 /*!< Process-specific sequence number */
125 } MessageQ_ModuleObject;
127 /*!
128 * @brief Structure for the Handle for the MessageQ.
129 */
130 typedef struct MessageQ_Object_tag {
131 MessageQ_Params params;
132 /*! Instance specific creation parameters */
133 MessageQ_QueueId queue;
134 /* Unique id */
135 int fd[MultiProc_MAXPROCESSORS];
136 /* File Descriptor to block on messages from remote processors. */
137 int unblockFd;
138 /* Write this fd to unblock the select() call in MessageQ _get() */
139 void *serverHandle;
140 } MessageQ_Object;
142 static Bool verbose = FALSE;
145 /* =============================================================================
146 * Globals
147 * =============================================================================
148 */
149 static MessageQ_ModuleObject MessageQ_state =
150 {
151 .refCount = 0,
152 .nameServer = NULL,
153 };
155 /*!
156 * @var MessageQ_module
157 *
158 * @brief Pointer to the MessageQ module state.
159 */
160 MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
163 /* =============================================================================
164 * Forward declarations of internal functions
165 * =============================================================================
166 */
168 /* This is a helper function to initialize a message. */
169 static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex);
170 static Int transportCloseEndpoint(int fd);
171 static Int transportGet(int sock, MessageQ_Msg * retMsg);
172 static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId);
174 /* =============================================================================
175 * APIS
176 * =============================================================================
177 */
178 /* Function to get default configuration for the MessageQ module.
179 *
180 */
181 Void MessageQ_getConfig (MessageQ_Config * cfg)
182 {
183 Int status;
184 LAD_ClientHandle handle;
185 struct LAD_CommandObj cmd;
186 union LAD_ResponseObj rsp;
188 assert (cfg != NULL);
190 handle = LAD_findHandle();
191 if (handle == LAD_MAXNUMCLIENTS) {
192 PRINTVERBOSE1(
193 "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
194 getpid())
196 return;
197 }
199 cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
200 cmd.clientId = handle;
202 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
203 PRINTVERBOSE1(
204 "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
205 return;
206 }
208 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
209 PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n", status)
210 return;
211 }
212 status = rsp.messageQGetConfig.status;
214 PRINTVERBOSE2(
215 "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
216 handle, status)
218 memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof(*cfg));
220 return;
221 }
223 /* Function to setup the MessageQ module. */
224 Int MessageQ_setup (const MessageQ_Config * cfg)
225 {
226 Int status;
227 LAD_ClientHandle handle;
228 struct LAD_CommandObj cmd;
229 union LAD_ResponseObj rsp;
230 Int i;
232 handle = LAD_findHandle();
233 if (handle == LAD_MAXNUMCLIENTS) {
234 PRINTVERBOSE1(
235 "MessageQ_setup: can't find connection to daemon for pid %d\n",
236 getpid())
238 return MessageQ_E_RESOURCE;
239 }
241 cmd.cmd = LAD_MESSAGEQ_SETUP;
242 cmd.clientId = handle;
243 memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
245 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
246 PRINTVERBOSE1(
247 "MessageQ_setup: sending LAD command failed, status=%d\n", status)
248 return MessageQ_E_FAIL;
249 }
251 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
252 PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
253 return(status);
254 }
255 status = rsp.setup.status;
257 PRINTVERBOSE2(
258 "MessageQ_setup: got LAD response for client %d, status=%d\n",
259 handle, status)
261 MessageQ_module->nameServer = rsp.setup.nameServerHandle;
262 MessageQ_module->seqNum = 0;
264 /* Create a default local gate. */
265 pthread_mutex_init (&(MessageQ_module->gate), NULL);
267 /* Clear sockets array. */
268 for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
269 MessageQ_module->sock[i] = Transport_INVALIDSOCKET;
270 }
273 return status;
274 }
276 /*
277 * Function to destroy the MessageQ module.
278 * Destroys socket/protocol maps; sockets themselves should have been
279 * destroyed in MessageQ_delete() and MessageQ_detach() calls.
280 */
281 Int MessageQ_destroy (void)
282 {
283 Int status;
284 LAD_ClientHandle handle;
285 struct LAD_CommandObj cmd;
286 union LAD_ResponseObj rsp;
288 handle = LAD_findHandle();
289 if (handle == LAD_MAXNUMCLIENTS) {
290 PRINTVERBOSE1(
291 "MessageQ_destroy: can't find connection to daemon for pid %d\n",
292 getpid())
294 return MessageQ_E_RESOURCE;
295 }
297 cmd.cmd = LAD_MESSAGEQ_DESTROY;
298 cmd.clientId = handle;
300 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
301 PRINTVERBOSE1(
302 "MessageQ_destroy: sending LAD command failed, status=%d\n", status)
303 return MessageQ_E_FAIL;
304 }
306 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
307 PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
308 return(status);
309 }
310 status = rsp.status;
312 PRINTVERBOSE2(
313 "MessageQ_destroy: got LAD response for client %d, status=%d\n",
314 handle, status)
316 return status;
317 }
319 /* Function to initialize the parameters for the MessageQ instance. */
320 Void MessageQ_Params_init (MessageQ_Params * params)
321 {
322 memcpy (params, &(MessageQ_module->defaultInstParams),
323 sizeof (MessageQ_Params));
325 return;
326 }
328 /*
329 * Function to create a MessageQ object for receiving.
330 *
331 * Create a socket and bind the source address (local ProcId/MessageQ ID) in
332 * order to get messages dispatched to this messageQ.
333 */
334 MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
335 {
336 Int status = MessageQ_S_SUCCESS;
337 MessageQ_Object * obj = NULL;
338 UInt16 queueIndex = 0u;
339 UInt16 procId;
340 UInt16 rprocId;
341 LAD_ClientHandle handle;
342 struct LAD_CommandObj cmd;
343 union LAD_ResponseObj rsp;
345 handle = LAD_findHandle();
346 if (handle == LAD_MAXNUMCLIENTS) {
347 PRINTVERBOSE1(
348 "MessageQ_create: can't find connection to daemon for pid %d\n",
349 getpid())
351 return NULL;
352 }
354 cmd.cmd = LAD_MESSAGEQ_CREATE;
355 cmd.clientId = handle;
356 strncpy(cmd.args.messageQCreate.name, name,
357 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
358 cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
359 if (params) {
360 memcpy(&cmd.args.messageQCreate.params, params, sizeof(*params));
361 }
363 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
364 PRINTVERBOSE1(
365 "MessageQ_create: sending LAD command failed, status=%d\n", status)
366 return NULL;
367 }
369 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
370 PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
371 return NULL;
372 }
373 status = rsp.messageQCreate.status;
375 PRINTVERBOSE2(
376 "MessageQ_create: got LAD response for client %d, status=%d\n",
377 handle, status)
379 if (status == -1) {
380 PRINTVERBOSE1(
381 "MessageQ_create: MessageQ server operation failed, status=%d\n",
382 status)
383 return NULL;
384 }
386 /* Create the generic obj */
387 obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
389 if (params != NULL) {
390 /* Populate the params member */
391 memcpy((Ptr) &obj->params, (Ptr)params, sizeof (MessageQ_Params));
392 }
394 procId = MultiProc_self();
395 queueIndex = (MessageQ_QueueIndex)rsp.messageQCreate.queueId;
396 obj->queue = rsp.messageQCreate.queueId;
397 obj->serverHandle = rsp.messageQCreate.serverHandle;
399 /*
400 * Create a set of communication endpoints (one per each remote proc),
401 * and return the socket as target for MessageQ_put() calls, and as
402 * a file descriptor to close during MessageQ_delete().
403 */
404 for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
405 obj->fd[rprocId] = Transport_INVALIDSOCKET;
406 if (procId == rprocId) {
407 /* Skip creating an endpoint for ourself. */
408 continue;
409 }
411 PRINTVERBOSE3("MessageQ_create: creating endpoint for: %s, rprocId: %d, queueIndex: %d\n", name, rprocId, queueIndex)
413 status = transportCreateEndpoint(&obj->fd[rprocId], rprocId,
414 queueIndex);
415 if (status < 0) {
416 goto cleanup;
417 }
418 }
420 /*
421 * Now, to support MessageQ_unblock() functionality, create an event object.
422 * Writing to this event will unblock the select() call in MessageQ_get().
423 */
424 obj->unblockFd = eventfd(0, 0);
425 if (obj->unblockFd == -1) {
426 printf ("MessageQ_create: eventfd creation failed: %d, %s\n",
427 errno, strerror(errno));
428 status = MessageQ_E_FAIL;
429 }
431 cleanup:
432 /* Cleanup if fail: */
433 if (status < 0) {
434 MessageQ_delete((MessageQ_Handle *)&obj);
435 }
437 return ((MessageQ_Handle) obj);
438 }
440 /*
441 * Function to delete a MessageQ object for a specific slave processor.
442 *
443 * Deletes the socket associated with this MessageQ object.
444 */
445 Int MessageQ_delete (MessageQ_Handle * handlePtr)
446 {
447 Int status = MessageQ_S_SUCCESS;
448 MessageQ_Object * obj = NULL;
449 UInt16 rprocId;
450 LAD_ClientHandle handle;
451 struct LAD_CommandObj cmd;
452 union LAD_ResponseObj rsp;
454 handle = LAD_findHandle();
455 if (handle == LAD_MAXNUMCLIENTS) {
456 PRINTVERBOSE1(
457 "MessageQ_delete: can't find connection to daemon for pid %d\n",
458 getpid())
460 return MessageQ_E_FAIL;
461 }
463 obj = (MessageQ_Object *) (*handlePtr);
465 cmd.cmd = LAD_MESSAGEQ_DELETE;
466 cmd.clientId = handle;
467 cmd.args.messageQDelete.serverHandle = obj->serverHandle;
469 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
470 PRINTVERBOSE1(
471 "MessageQ_delete: sending LAD command failed, status=%d\n", status)
472 return MessageQ_E_FAIL;
473 }
475 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
476 PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
477 return MessageQ_E_FAIL;
478 }
479 status = rsp.messageQDelete.status;
481 PRINTVERBOSE2(
482 "MessageQ_delete: got LAD response for client %d, status=%d\n",
483 handle, status)
486 /* Close the event used for MessageQ_unblock(): */
487 close(obj->unblockFd);
489 /* Close the communication endpoint: */
490 for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
491 if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
492 status = transportCloseEndpoint(obj->fd[rprocId]);
493 }
494 }
496 /* Now free the obj */
497 free (obj);
498 *handlePtr = NULL;
500 return (status);
501 }
503 /*
504 * Opens an instance of MessageQ for sending.
505 *
506 * We need not create a socket here; the sockets for all remote processors
507 * were created during MessageQ_attach(), and will be
508 * retrieved during MessageQ_put().
509 */
510 Int MessageQ_open (String name, MessageQ_QueueId * queueId)
511 {
512 Int status = MessageQ_S_SUCCESS;
514 status = NameServer_getUInt32 (MessageQ_module->nameServer,
515 name, queueId, NULL);
517 if (status == NameServer_E_NOTFOUND) {
518 /* Set return queue ID to invalid. */
519 *queueId = MessageQ_INVALIDMESSAGEQ;
520 status = MessageQ_E_NOTFOUND;
521 }
522 else if (status >= 0) {
523 /* Override with a MessageQ status code. */
524 status = MessageQ_S_SUCCESS;
525 }
526 else {
527 /* Set return queue ID to invalid. */
528 *queueId = MessageQ_INVALIDMESSAGEQ;
529 /* Override with a MessageQ status code. */
530 if (status == NameServer_E_TIMEOUT) {
531 status = MessageQ_E_TIMEOUT;
532 }
533 else {
534 status = MessageQ_E_FAIL;
535 }
536 }
538 return (status);
539 }
541 /* Closes previously opened instance of MessageQ module. */
542 Int MessageQ_close (MessageQ_QueueId * queueId)
543 {
544 Int32 status = MessageQ_S_SUCCESS;
546 /* Nothing more to be done for closing the MessageQ. */
547 *queueId = MessageQ_INVALIDMESSAGEQ;
549 return (status);
550 }
552 /*
553 * Place a message onto a message queue.
554 *
555 * Calls TransportShm_put(), which handles the sending of the message using the
556 * appropriate kernel interface (socket, device ioctl) call for the remote
557 * procId encoded in the queueId argument.
558 *
559 */
560 Int MessageQ_put (MessageQ_QueueId queueId, MessageQ_Msg msg)
561 {
562 Int status;
563 UInt16 dstProcId = (UInt16)(queueId >> 16);
564 UInt16 queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
566 msg->dstId = queueIndex;
567 msg->dstProc = dstProcId;
569 status = transportPut(msg, queueIndex, dstProcId);
571 return (status);
572 }
574 /*
575 * Gets a message for a message queue and blocks if the queue is empty.
576 * If a message is present, it returns it. Otherwise it blocks
577 * waiting for a message to arrive.
578 * When a message is returned, it is owned by the caller.
579 *
580 * We block using select() on the receiving socket's file descriptor, then
581 * get the waiting message via the socket API recvfrom().
582 * We use the socket stored in the messageQ object via a previous call to
583 * MessageQ_create().
584 *
585 */
586 Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
587 {
588 Int status = MessageQ_S_SUCCESS;
589 Int tmpStatus;
590 MessageQ_Object * obj = (MessageQ_Object *) handle;
591 int retval;
592 int nfds;
593 fd_set rfds;
594 struct timeval tv;
595 void *timevalPtr;
596 UInt16 rprocId;
597 int maxfd = 0;
599 /* Wait (with timeout) and retreive message from socket: */
600 FD_ZERO(&rfds);
601 for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
602 if (rprocId == MultiProc_self()) {
603 continue;
604 }
605 maxfd = MAX(maxfd, obj->fd[rprocId]);
606 FD_SET(obj->fd[rprocId], &rfds);
607 }
609 /* Wait also on the event fd, which may be written by MessageQ_unblock(): */
610 FD_SET(obj->unblockFd, &rfds);
612 if (timeout == MessageQ_FOREVER) {
613 timevalPtr = NULL;
614 }
615 else {
616 /* Timeout given in msec: convert: */
617 tv.tv_sec = timeout / 1000;
618 tv.tv_usec = (timeout % 1000) * 1000;
619 timevalPtr = &tv;
620 }
621 /* Add one to last fd created: */
622 nfds = MAX(maxfd, obj->unblockFd) + 1;
624 retval = select(nfds, &rfds, NULL, NULL, timevalPtr);
625 if (retval) {
626 if (FD_ISSET(obj->unblockFd, &rfds)) {
627 /*
628 * Our event was signalled by MessageQ_unblock().
629 *
630 * This is typically done during a shutdown sequence, where
631 * the intention of the client would be to ignore (i.e. not fetch)
632 * any pending messages in the transport's queue.
633 * Thus, we shall not check for nor return any messages.
634 */
635 *msg = NULL;
636 status = MessageQ_E_UNBLOCKED;
637 }
638 else {
639 for (rprocId = 0; rprocId < MultiProc_getNumProcessors();
640 rprocId++) {
641 if (rprocId == MultiProc_self()) {
642 continue;
643 }
644 if (FD_ISSET(obj->fd[rprocId], &rfds)) {
645 /* Our transport's fd was signalled: Get the message: */
646 tmpStatus = transportGet(obj->fd[rprocId], msg);
647 if (tmpStatus < 0) {
648 printf ("MessageQ_get: tranposrtshm_get failed.");
649 status = MessageQ_E_FAIL;
650 }
651 }
652 }
653 }
654 }
655 else if (retval == 0) {
656 *msg = NULL;
657 status = MessageQ_E_TIMEOUT;
658 }
660 return (status);
661 }
663 /*
664 * Return a count of the number of messages in the queue
665 *
666 * TBD: Implement as a socket ioctl, using getsockopt(). Return -1 for now.
667 */
668 Int MessageQ_count (MessageQ_Handle handle)
669 {
670 Int count = -1;
671 #if 0
672 MessageQ_Object * obj = (MessageQ_Object *) handle;
673 socklen_t optlen;
675 /*
676 * TBD: Need to find a way to implement (if anyone uses it!), and
677 * push down into transport..
678 */
680 /*
681 * 2nd arg to getsockopt should be transport independent, but using
682 * SSKPROTO_SHMFIFO for now:
683 */
684 getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
685 &count, &optlen);
686 #endif
688 return (count);
689 }
691 /* Initializes a message not obtained from MessageQ_alloc. */
692 Void MessageQ_staticMsgInit (MessageQ_Msg msg, UInt32 size)
693 {
694 /* Fill in the fields of the message */
695 MessageQ_msgInit (msg);
696 msg->heapId = MessageQ_STATICMSG;
697 msg->msgSize = size;
698 }
700 /*
701 * Allocate a message and initialize the needed fields (note some
702 * of the fields in the header are set via other APIs or in the
703 * MessageQ_put function,
704 */
705 MessageQ_Msg MessageQ_alloc (UInt16 heapId, UInt32 size)
706 {
707 MessageQ_Msg msg = NULL;
709 /*
710 * heapId not used for local alloc (as this is over a copy transport), but
711 * we need to send to other side as heapId is used in BIOS transport:
712 */
713 msg = (MessageQ_Msg)calloc (1, size);
714 MessageQ_msgInit (msg);
715 msg->msgSize = size;
716 msg->heapId = heapId;
718 return msg;
719 }
721 /* Frees the message back to the heap that was used to allocate it. */
722 Int MessageQ_free (MessageQ_Msg msg)
723 {
724 UInt32 status = MessageQ_S_SUCCESS;
726 /* Check to ensure this was not allocated by user: */
727 if (msg->heapId == MessageQ_STATICMSG) {
728 status = MessageQ_E_CANNOTFREESTATICMSG;
729 }
730 else {
731 free (msg);
732 }
734 return status;
735 }
737 /* Register a heap with MessageQ. */
738 Int MessageQ_registerHeap (Ptr heap, UInt16 heapId)
739 {
740 Int status = MessageQ_S_SUCCESS;
742 /* Do nothing, as this uses a copy transport: */
744 return status;
745 }
747 /* Unregister a heap with MessageQ. */
748 Int MessageQ_unregisterHeap (UInt16 heapId)
749 {
750 Int status = MessageQ_S_SUCCESS;
752 /* Do nothing, as this uses a copy transport: */
754 return status;
755 }
757 /* Unblocks a MessageQ */
758 Void MessageQ_unblock (MessageQ_Handle handle)
759 {
760 MessageQ_Object * obj = (MessageQ_Object *) handle;
761 uint64_t buf = 1;
762 int numBytes;
764 /* Write 8 bytes to awaken any threads blocked on this messageQ: */
765 numBytes = write(obj->unblockFd, &buf, sizeof(buf));
766 }
768 /* Embeds a source message queue into a message. */
769 Void MessageQ_setReplyQueue (MessageQ_Handle handle, MessageQ_Msg msg)
770 {
771 MessageQ_Object * obj = (MessageQ_Object *) handle;
773 msg->replyId = (UInt16)(obj->queue);
774 msg->replyProc = (UInt16)(obj->queue >> 16);
775 }
777 /* Returns the QueueId associated with the handle. */
778 MessageQ_QueueId MessageQ_getQueueId (MessageQ_Handle handle)
779 {
780 MessageQ_Object * obj = (MessageQ_Object *) handle;
781 UInt32 queueId;
783 queueId = (obj->queue);
785 return queueId;
786 }
788 /* Sets the tracing of a message */
789 Void MessageQ_setMsgTrace (MessageQ_Msg msg, Bool traceFlag)
790 {
791 msg->flags = (msg->flags & ~TRACEMASK) | (traceFlag << TRACESHIFT);
792 }
794 /*
795 * Returns the amount of shared memory used by one transport instance.
796 *
797 * The MessageQ module itself does not use any shared memory but the
798 * underlying transport may use some shared memory.
799 */
800 SizeT MessageQ_sharedMemReq (Ptr sharedAddr)
801 {
802 SizeT memReq = 0u;
804 /* Do nothing, as this is a copy transport. */
806 return (memReq);
807 }
809 /*
810 * Create a socket for this remote proc, and attempt to connect.
811 *
812 * Only creates a socket if one does not already exist for this procId.
813 *
814 * Note: remoteProcId may be MultiProc_Self() for loopback case.
815 */
816 Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr)
817 {
818 Int status = MessageQ_S_SUCCESS;
819 int sock;
821 PRINTVERBOSE1("MessageQ_attach: remoteProcId: %d\n", remoteProcId)
823 if (remoteProcId >= MultiProc_MAXPROCESSORS) {
824 status = MessageQ_E_INVALIDPROCID;
825 goto exit;
826 }
828 pthread_mutex_lock (&(MessageQ_module->gate));
830 /* Only create a socket if one doesn't exist: */
831 if (MessageQ_module->sock[remoteProcId] == Transport_INVALIDSOCKET) {
832 /* Create the socket for sending messages to the remote proc: */
833 sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
834 if (sock < 0) {
835 status = MessageQ_E_FAIL;
836 printf ("MessageQ_attach: socket failed: %d, %s\n",
837 errno, strerror(errno));
838 }
839 else {
840 PRINTVERBOSE1("MessageQ_attach: created send socket: %d\n", sock)
841 MessageQ_module->sock[remoteProcId] = sock;
842 /* Attempt to connect: */
843 ConnectSocket(sock, remoteProcId, MESSAGEQ_RPMSG_PORT);
844 }
845 }
846 else {
847 status = MessageQ_E_ALREADYEXISTS;
848 }
850 pthread_mutex_unlock (&(MessageQ_module->gate));
852 exit:
853 return (status);
854 }
856 /*
857 * Close the socket for this remote proc.
858 *
859 */
860 Int MessageQ_detach (UInt16 remoteProcId)
861 {
862 Int status = MessageQ_S_SUCCESS;
863 int sock;
865 if (remoteProcId >= MultiProc_MAXPROCESSORS) {
866 status = MessageQ_E_INVALIDPROCID;
867 goto exit;
868 }
870 pthread_mutex_lock (&(MessageQ_module->gate));
872 sock = MessageQ_module->sock[remoteProcId];
873 if (close (sock)) {
874 status = MessageQ_E_OSFAILURE;
875 printf ("MessageQ_detach: close failed: %d, %s\n",
876 errno, strerror(errno));
877 }
878 else {
879 PRINTVERBOSE1("MessageQ_detach: closed socket: %d\n", sock)
880 MessageQ_module->sock[remoteProcId] = Transport_INVALIDSOCKET;
881 }
883 pthread_mutex_unlock (&(MessageQ_module->gate));
885 exit:
886 return (status);
887 }
889 /*
890 * This is a helper function to initialize a message.
891 */
892 Void MessageQ_msgInit (MessageQ_Msg msg)
893 {
894 #if 0
895 Int status = MessageQ_S_SUCCESS;
896 LAD_ClientHandle handle;
897 struct LAD_CommandObj cmd;
898 union LAD_ResponseObj rsp;
900 handle = LAD_findHandle();
901 if (handle == LAD_MAXNUMCLIENTS) {
902 PRINTVERBOSE1(
903 "MessageQ_setup: can't find connection to daemon for pid %d\n",
904 getpid())
906 return;
907 }
909 cmd.cmd = LAD_MESSAGEQ_MSGINIT;
910 cmd.clientId = handle;
912 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
913 PRINTVERBOSE1(
914 "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
915 return;
916 }
918 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
919 PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
920 return;
921 }
922 status = rsp.msgInit.status;
924 PRINTVERBOSE2(
925 "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
926 handle, status)
928 memcpy(msg, &rsp.msgInit.msg, sizeof(*msg));
929 #else
930 msg->reserved0 = 0; /* We set this to distinguish from NameServerMsg */
931 msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
932 msg->msgId = MessageQ_INVALIDMSGID;
933 msg->dstId = (UInt16)MessageQ_INVALIDMESSAGEQ;
934 msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
935 msg->srcProc = MultiProc_self();
937 pthread_mutex_lock(&(MessageQ_module->gate));
938 msg->seqNum = MessageQ_module->seqNum++;
939 pthread_mutex_unlock(&(MessageQ_module->gate));
940 #endif
941 }
943 /*
944 * =============================================================================
945 * Transport: Fxns kept here until need for a transport layer is realized.
946 * =============================================================================
947 */
948 /*
949 * ======== transportCreateEndpoint ========
950 *
951 * Create a communication endpoint to receive messages.
952 */
953 static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex)
954 {
955 Int status = MessageQ_S_SUCCESS;
956 int err;
958 /* Create the socket to receive messages for this messageQ. */
959 *fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
960 if (*fd < 0) {
961 status = MessageQ_E_FAIL;
962 printf ("transportCreateEndpoint: socket call failed: %d, %s\n",
963 errno, strerror(errno));
964 goto exit;
965 }
967 PRINTVERBOSE1("transportCreateEndpoint: created socket: fd: %d\n", *fd)
969 err = SocketBindAddr(*fd, rprocId, (UInt32)queueIndex);
970 if (err < 0) {
971 status = MessageQ_E_FAIL;
972 printf ("transportCreateEndpoint: bind failed: %d, %s\n",
973 errno, strerror(errno));
974 }
976 exit:
977 return (status);
978 }
980 /*
981 * ======== transportCloseEndpoint ========
982 *
983 * Close the communication endpoint.
984 */
985 static Int transportCloseEndpoint(int fd)
986 {
987 Int status = MessageQ_S_SUCCESS;
989 PRINTVERBOSE1("transportCloseEndpoint: closing socket: %d\n", fd)
991 /* Stop communication to this socket: */
992 close(fd);
994 return (status);
995 }
997 /*
998 * ======== transportGet ========
999 * Retrieve a message waiting in the socket's queue.
1000 */
1001 static Int transportGet(int sock, MessageQ_Msg * retMsg)
1002 {
1003 Int status = MessageQ_S_SUCCESS;
1004 MessageQ_Msg msg;
1005 struct sockaddr_rpmsg fromAddr; // [Socket address of sender]
1006 unsigned int len;
1007 int byteCount;
1009 /*
1010 * We have no way of peeking to see what message size we'll get, so we
1011 * allocate a message of max size to receive contents from the rpmsg socket
1012 * (currently, a copy transport)
1013 */
1014 msg = MessageQ_alloc (0, MESSAGEQ_RPMSG_MAXSIZE);
1015 if (!msg) {
1016 status = MessageQ_E_MEMORY;
1017 goto exit;
1018 }
1020 memset(&fromAddr, 0, sizeof(fromAddr));
1021 len = sizeof(fromAddr);
1023 byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
1024 (struct sockaddr *)&fromAddr, &len);
1025 if (len != sizeof(fromAddr)) {
1026 printf("recvfrom: got bad addr len (%d)\n", len);
1027 status = MessageQ_E_FAIL;
1028 goto exit;
1029 }
1030 if (byteCount < 0) {
1031 printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
1032 status = MessageQ_E_FAIL;
1033 goto exit;
1034 }
1035 else {
1036 /* Update the allocated message size (even though this may waste space
1037 * when the actual message is smaller than the maximum rpmsg size,
1038 * the message will be freed soon anyway, and it avoids an extra copy).
1039 */
1040 msg->msgSize = byteCount;
1042 /*
1043 * If the message received was statically allocated, reset the
1044 * heapId, so the app can free it.
1045 */
1046 if (msg->heapId == MessageQ_STATICMSG) {
1047 msg->heapId = 0; /* for a copy transport, heap id is 0. */
1048 }
1049 }
1051 PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
1052 PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
1053 PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId, msg->msgSize)
1055 *retMsg = msg;
1057 exit:
1058 return (status);
1059 }
1061 /*
1062 * ======== transportPut ========
1063 *
1064 * Calls the socket API sendto() on the socket associated with
1065 * with this destination procID.
1066 * Currently, both local and remote messages are sent via the Socket ABI, so
1067 * no local object lists are maintained here.
1068 */
1069 static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId)
1070 {
1071 Int status = MessageQ_S_SUCCESS;
1072 int sock;
1073 int err;
1075 /*
1076 * Retrieve the socket for the AF_SYSLINK protocol associated with this
1077 * transport.
1078 */
1079 sock = MessageQ_module->sock[dstProcId];
1081 PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
1083 err = send(sock, msg, msg->msgSize, 0);
1084 if (err < 0) {
1085 printf ("transportPut: send failed: %d, %s\n",
1086 errno, strerror(errno));
1087 status = MessageQ_E_FAIL;
1088 }
1090 /*
1091 * Free the message, as this is a copy transport, we maintain MessageQ
1092 * semantics.
1093 */
1094 MessageQ_free (msg);
1096 return (status);
1097 }