1 /*
2 * Copyright (c) 2012-2014, Texas Instruments Incorporated
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*
33 * @file MessageQ.c
34 *
35 * @brief MessageQ Linux implementation
36 *
37 * This implementation is geared for use in a "client/server" model, whereby
38 * system-wide data is maintained in a "server" component and process-
39 * specific data is handled here. At the moment, this implementation
40 * connects and communicates with LAD for the server connection.
41 */
43 /* Standard IPC header */
44 #include <ti/ipc/Std.h>
46 /* Module level headers */
47 #include <ti/ipc/NameServer.h>
48 #include <ti/ipc/MultiProc.h>
49 #include <_MultiProc.h>
50 #include <ti/ipc/MessageQ.h>
51 #include <_MessageQ.h>
53 /* Socket Headers */
54 #include <sys/select.h>
55 #include <sys/time.h>
56 #include <sys/types.h>
57 #include <sys/param.h>
58 #include <sys/eventfd.h>
59 #include <sys/socket.h>
60 #include <errno.h>
61 #include <stdio.h>
62 #include <string.h>
63 #include <stdlib.h>
64 #include <unistd.h>
65 #include <assert.h>
66 #include <pthread.h>
68 /* Socket Protocol Family */
69 #include <net/rpmsg.h>
71 /* Socket utils: */
72 #include <SocketFxns.h>
74 #include <ladclient.h>
75 #include <_lad.h>
77 /* =============================================================================
78 * Macros/Constants
79 * =============================================================================
80 */
82 /*!
83 * @brief Name of the reserved NameServer used for MessageQ.
84 */
85 #define MessageQ_NAMESERVER "MessageQ"
87 /*!
88 * @brief Value of an invalid socket ID:
89 */
90 #define Transport_INVALIDSOCKET (0xFFFFFFFF)
92 /* More magic rpmsg port numbers: */
93 #define MESSAGEQ_RPMSG_PORT 61
94 #define MESSAGEQ_RPMSG_MAXSIZE 512
96 /* Trace flag settings: */
97 #define TRACESHIFT 12
98 #define TRACEMASK 0x1000
100 /* Define BENCHMARK to quiet key MessageQ APIs: */
101 //#define BENCHMARK
103 /* =============================================================================
104 * Structures & Enums
105 * =============================================================================
106 */
108 /* structure for MessageQ module state */
109 typedef struct MessageQ_ModuleObject {
110 Int refCount;
111 /*!< Reference count */
112 NameServer_Handle nameServer;
113 /*!< Handle to the local NameServer used for storing GP objects */
114 pthread_mutex_t gate;
115 /*!< Handle of gate to be used for local thread safety */
116 MessageQ_Params defaultInstParams;
117 /*!< Default instance creation parameters */
118 int sock[MultiProc_MAXPROCESSORS];
119 /*!< Sockets to for sending to each remote processor */
120 int seqNum;
121 /*!< Process-specific sequence number */
122 } MessageQ_ModuleObject;
124 /*!
125 * @brief Structure for the Handle for the MessageQ.
126 */
127 typedef struct MessageQ_Object_tag {
128 MessageQ_Params params;
129 /*! Instance specific creation parameters */
130 MessageQ_QueueId queue;
131 /* Unique id */
132 int fd[MultiProc_MAXPROCESSORS];
133 /* File Descriptor to block on messages from remote processors. */
134 int unblockFd;
135 /* Write this fd to unblock the select() call in MessageQ _get() */
136 void *serverHandle;
137 } MessageQ_Object;
139 /* traces in this file are controlled via _MessageQ_verbose */
140 Bool _MessageQ_verbose = FALSE;
141 #define verbose _MessageQ_verbose
144 /* =============================================================================
145 * Globals
146 * =============================================================================
147 */
148 static MessageQ_ModuleObject MessageQ_state =
149 {
150 .refCount = 0,
151 .nameServer = NULL,
152 };
154 /*!
155 * @var MessageQ_module
156 *
157 * @brief Pointer to the MessageQ module state.
158 */
159 MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
162 /* =============================================================================
163 * Forward declarations of internal functions
164 * =============================================================================
165 */
167 /* This is a helper function to initialize a message. */
168 static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex);
169 static Int transportCloseEndpoint(int fd);
170 static Int transportGet(int sock, MessageQ_Msg * retMsg);
171 static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId);
173 /* =============================================================================
174 * APIS
175 * =============================================================================
176 */
177 /* Function to get default configuration for the MessageQ module.
178 *
179 */
180 Void MessageQ_getConfig (MessageQ_Config * cfg)
181 {
182 Int status;
183 LAD_ClientHandle handle;
184 struct LAD_CommandObj cmd;
185 union LAD_ResponseObj rsp;
187 assert (cfg != NULL);
189 handle = LAD_findHandle();
190 if (handle == LAD_MAXNUMCLIENTS) {
191 PRINTVERBOSE1(
192 "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
193 getpid())
195 return;
196 }
198 cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
199 cmd.clientId = handle;
201 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
202 PRINTVERBOSE1(
203 "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
204 return;
205 }
207 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
208 PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n", status)
209 return;
210 }
211 status = rsp.messageQGetConfig.status;
213 PRINTVERBOSE2(
214 "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
215 handle, status)
217 memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof(*cfg));
219 return;
220 }
222 /* Function to setup the MessageQ module. */
223 Int MessageQ_setup(const MessageQ_Config * cfg)
224 {
225 Int status;
226 LAD_ClientHandle handle;
227 struct LAD_CommandObj cmd;
228 union LAD_ResponseObj rsp;
229 Int i;
231 handle = LAD_findHandle();
232 if (handle == LAD_MAXNUMCLIENTS) {
233 PRINTVERBOSE1(
234 "MessageQ_setup: can't find connection to daemon for pid %d\n",
235 getpid())
237 return MessageQ_E_RESOURCE;
238 }
240 cmd.cmd = LAD_MESSAGEQ_SETUP;
241 cmd.clientId = handle;
242 memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
244 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
245 PRINTVERBOSE1(
246 "MessageQ_setup: sending LAD command failed, status=%d\n", status)
247 return MessageQ_E_FAIL;
248 }
250 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
251 PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
252 return(status);
253 }
254 status = rsp.setup.status;
256 PRINTVERBOSE2(
257 "MessageQ_setup: got LAD response for client %d, status=%d\n",
258 handle, status)
260 MessageQ_module->nameServer = rsp.setup.nameServerHandle;
261 MessageQ_module->seqNum = 0;
263 /* Create a default local gate. */
264 pthread_mutex_init (&(MessageQ_module->gate), NULL);
266 /* Clear sockets array. */
267 for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
268 MessageQ_module->sock[i] = Transport_INVALIDSOCKET;
269 }
271 return status;
272 }
274 /*
275 * Function to destroy the MessageQ module.
276 * Destroys socket/protocol maps; sockets themselves should have been
277 * destroyed in MessageQ_delete() and MessageQ_detach() calls.
278 */
279 Int MessageQ_destroy (void)
280 {
281 Int status;
282 LAD_ClientHandle handle;
283 struct LAD_CommandObj cmd;
284 union LAD_ResponseObj rsp;
286 handle = LAD_findHandle();
287 if (handle == LAD_MAXNUMCLIENTS) {
288 PRINTVERBOSE1(
289 "MessageQ_destroy: can't find connection to daemon for pid %d\n",
290 getpid())
292 return MessageQ_E_RESOURCE;
293 }
295 cmd.cmd = LAD_MESSAGEQ_DESTROY;
296 cmd.clientId = handle;
298 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
299 PRINTVERBOSE1(
300 "MessageQ_destroy: sending LAD command failed, status=%d\n", status)
301 return MessageQ_E_FAIL;
302 }
304 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
305 PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
306 return(status);
307 }
308 status = rsp.status;
310 PRINTVERBOSE2(
311 "MessageQ_destroy: got LAD response for client %d, status=%d\n",
312 handle, status)
314 return status;
315 }
317 /* Function to initialize the parameters for the MessageQ instance. */
318 Void MessageQ_Params_init (MessageQ_Params * params)
319 {
320 memcpy (params, &(MessageQ_module->defaultInstParams),
321 sizeof (MessageQ_Params));
323 return;
324 }
326 /*
327 * Function to create a MessageQ object for receiving.
328 *
329 * Create a socket and bind the source address (local ProcId/MessageQ ID) in
330 * order to get messages dispatched to this messageQ.
331 */
332 MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
333 {
334 Int status;
335 MessageQ_Object * obj = NULL;
336 UInt16 queueIndex = 0u;
337 UInt16 procId;
338 UInt16 rprocId;
339 LAD_ClientHandle handle;
340 struct LAD_CommandObj cmd;
341 union LAD_ResponseObj rsp;
343 handle = LAD_findHandle();
344 if (handle == LAD_MAXNUMCLIENTS) {
345 PRINTVERBOSE1(
346 "MessageQ_create: can't find connection to daemon for pid %d\n",
347 getpid())
349 return NULL;
350 }
352 cmd.cmd = LAD_MESSAGEQ_CREATE;
353 cmd.clientId = handle;
354 if (name == NULL) {
355 cmd.args.messageQCreate.name[0] = '\0';
356 }
357 else {
358 strncpy(cmd.args.messageQCreate.name, name,
359 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
360 cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
361 }
363 if (params) {
364 memcpy(&cmd.args.messageQCreate.params, params, sizeof(*params));
365 }
367 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
368 PRINTVERBOSE1(
369 "MessageQ_create: sending LAD command failed, status=%d\n", status)
370 return NULL;
371 }
373 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
374 PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
375 return NULL;
376 }
377 status = rsp.messageQCreate.status;
379 PRINTVERBOSE2(
380 "MessageQ_create: got LAD response for client %d, status=%d\n",
381 handle, status)
383 if (status == -1) {
384 PRINTVERBOSE1(
385 "MessageQ_create: MessageQ server operation failed, status=%d\n",
386 status)
387 return NULL;
388 }
390 /* Create the generic obj */
391 obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
393 if (params != NULL) {
394 /* Populate the params member */
395 memcpy((Ptr) &obj->params, (Ptr)params, sizeof (MessageQ_Params));
396 }
398 procId = MultiProc_self();
399 queueIndex = (MessageQ_QueueIndex)rsp.messageQCreate.queueId;
400 obj->queue = rsp.messageQCreate.queueId;
401 obj->serverHandle = rsp.messageQCreate.serverHandle;
403 /*
404 * Create a set of communication endpoints (one per each remote proc),
405 * and return the socket as target for MessageQ_put() calls, and as
406 * a file descriptor to close during MessageQ_delete().
407 */
408 for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
409 obj->fd[rprocId] = Transport_INVALIDSOCKET;
410 if (procId == rprocId) {
411 /* Skip creating an endpoint for ourself. */
412 continue;
413 }
415 PRINTVERBOSE3("MessageQ_create: creating endpoint for: %s, rprocId: %d, queueIndex: %d\n", name, rprocId, queueIndex)
417 status = transportCreateEndpoint(&obj->fd[rprocId], rprocId,
418 queueIndex);
419 if (status < 0) {
420 obj->fd[rprocId] = Transport_INVALIDSOCKET;
421 }
422 }
424 /*
425 * Now, to support MessageQ_unblock() functionality, create an event object.
426 * Writing to this event will unblock the select() call in MessageQ_get().
427 */
428 obj->unblockFd = eventfd(0, 0);
429 if (obj->unblockFd == -1) {
430 printf ("MessageQ_create: eventfd creation failed: %d, %s\n",
431 errno, strerror(errno));
432 MessageQ_delete((MessageQ_Handle *)&obj);
433 }
434 else {
435 int endpointFound = 0;
437 for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
438 if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
439 endpointFound = 1;
440 }
441 }
442 if (!endpointFound) {
443 printf("MessageQ_create: no transport endpoints found, deleting\n");
444 MessageQ_delete((MessageQ_Handle *)&obj);
445 }
446 }
448 return ((MessageQ_Handle) obj);
449 }
451 /*
452 * Function to delete a MessageQ object for a specific slave processor.
453 *
454 * Deletes the socket associated with this MessageQ object.
455 */
456 Int MessageQ_delete (MessageQ_Handle * handlePtr)
457 {
458 Int status = MessageQ_S_SUCCESS;
459 MessageQ_Object * obj = NULL;
460 UInt16 rprocId;
461 LAD_ClientHandle handle;
462 struct LAD_CommandObj cmd;
463 union LAD_ResponseObj rsp;
465 handle = LAD_findHandle();
466 if (handle == LAD_MAXNUMCLIENTS) {
467 PRINTVERBOSE1(
468 "MessageQ_delete: can't find connection to daemon for pid %d\n",
469 getpid())
471 return MessageQ_E_FAIL;
472 }
474 obj = (MessageQ_Object *) (*handlePtr);
476 cmd.cmd = LAD_MESSAGEQ_DELETE;
477 cmd.clientId = handle;
478 cmd.args.messageQDelete.serverHandle = obj->serverHandle;
480 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
481 PRINTVERBOSE1(
482 "MessageQ_delete: sending LAD command failed, status=%d\n", status)
483 return MessageQ_E_FAIL;
484 }
486 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
487 PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
488 return MessageQ_E_FAIL;
489 }
490 status = rsp.messageQDelete.status;
492 PRINTVERBOSE2(
493 "MessageQ_delete: got LAD response for client %d, status=%d\n",
494 handle, status)
497 /* Close the event used for MessageQ_unblock(): */
498 close(obj->unblockFd);
500 /* Close the communication endpoint: */
501 for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
502 if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
503 status = transportCloseEndpoint(obj->fd[rprocId]);
504 }
505 }
507 /* Now free the obj */
508 free (obj);
509 *handlePtr = NULL;
511 return (status);
512 }
514 /*
515 * Opens an instance of MessageQ for sending.
516 *
517 * We need not create a socket here; the sockets for all remote processors
518 * were created during MessageQ_attach(), and will be
519 * retrieved during MessageQ_put().
520 */
521 Int MessageQ_open (String name, MessageQ_QueueId * queueId)
522 {
523 Int status = MessageQ_S_SUCCESS;
525 status = NameServer_getUInt32 (MessageQ_module->nameServer,
526 name, queueId, NULL);
528 if (status == NameServer_E_NOTFOUND) {
529 /* Set return queue ID to invalid. */
530 *queueId = MessageQ_INVALIDMESSAGEQ;
531 status = MessageQ_E_NOTFOUND;
532 }
533 else if (status >= 0) {
534 /* Override with a MessageQ status code. */
535 status = MessageQ_S_SUCCESS;
536 }
537 else {
538 /* Set return queue ID to invalid. */
539 *queueId = MessageQ_INVALIDMESSAGEQ;
540 /* Override with a MessageQ status code. */
541 if (status == NameServer_E_TIMEOUT) {
542 status = MessageQ_E_TIMEOUT;
543 }
544 else {
545 status = MessageQ_E_FAIL;
546 }
547 }
549 return (status);
550 }
552 /* Closes previously opened instance of MessageQ module. */
553 Int MessageQ_close (MessageQ_QueueId * queueId)
554 {
555 Int32 status = MessageQ_S_SUCCESS;
557 /* Nothing more to be done for closing the MessageQ. */
558 *queueId = MessageQ_INVALIDMESSAGEQ;
560 return (status);
561 }
563 /*
564 * Place a message onto a message queue.
565 *
566 * Calls TransportShm_put(), which handles the sending of the message using the
567 * appropriate kernel interface (socket, device ioctl) call for the remote
568 * procId encoded in the queueId argument.
569 *
570 */
571 Int MessageQ_put (MessageQ_QueueId queueId, MessageQ_Msg msg)
572 {
573 Int status;
574 UInt16 dstProcId = (UInt16)(queueId >> 16);
575 UInt16 queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
577 msg->dstId = queueIndex;
578 msg->dstProc = dstProcId;
580 status = transportPut(msg, queueIndex, dstProcId);
582 return (status);
583 }
585 /*
586 * Gets a message for a message queue and blocks if the queue is empty.
587 * If a message is present, it returns it. Otherwise it blocks
588 * waiting for a message to arrive.
589 * When a message is returned, it is owned by the caller.
590 *
591 * We block using select() on the receiving socket's file descriptor, then
592 * get the waiting message via the socket API recvfrom().
593 * We use the socket stored in the messageQ object via a previous call to
594 * MessageQ_create().
595 *
596 */
597 Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
598 {
599 static int last = 0;
600 Int status = MessageQ_S_SUCCESS;
601 Int tmpStatus;
602 MessageQ_Object * obj = (MessageQ_Object *) handle;
603 int retval;
604 int nfds;
605 fd_set rfds;
606 struct timeval tv;
607 void *timevalPtr;
608 UInt16 rprocId;
609 int maxfd = 0;
610 int selfId;
611 int nProcessors;
613 /* Wait (with timeout) and retreive message from socket: */
614 FD_ZERO(&rfds);
615 for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
616 if (rprocId == MultiProc_self() ||
617 obj->fd[rprocId] == Transport_INVALIDSOCKET) {
618 continue;
619 }
620 maxfd = MAX(maxfd, obj->fd[rprocId]);
621 FD_SET(obj->fd[rprocId], &rfds);
622 }
624 /* Wait also on the event fd, which may be written by MessageQ_unblock(): */
625 FD_SET(obj->unblockFd, &rfds);
627 if (timeout == MessageQ_FOREVER) {
628 timevalPtr = NULL;
629 }
630 else {
631 /* Timeout given in msec: convert: */
632 tv.tv_sec = timeout / 1000;
633 tv.tv_usec = (timeout % 1000) * 1000;
634 timevalPtr = &tv;
635 }
636 /* Add one to last fd created: */
637 nfds = MAX(maxfd, obj->unblockFd) + 1;
639 retval = select(nfds, &rfds, NULL, NULL, timevalPtr);
640 if (retval) {
641 if (FD_ISSET(obj->unblockFd, &rfds)) {
642 /*
643 * Our event was signalled by MessageQ_unblock().
644 *
645 * This is typically done during a shutdown sequence, where
646 * the intention of the client would be to ignore (i.e. not fetch)
647 * any pending messages in the transport's queue.
648 * Thus, we shall not check for nor return any messages.
649 */
650 *msg = NULL;
651 status = MessageQ_E_UNBLOCKED;
652 }
653 else {
654 /* start where we last left off */
655 rprocId = last;
657 selfId = MultiProc_self();
658 nProcessors = MultiProc_getNumProcessors();
660 do {
661 if (rprocId != selfId &&
662 obj->fd[rprocId] != Transport_INVALIDSOCKET) {
664 if (FD_ISSET(obj->fd[rprocId], &rfds)) {
665 /* Our transport's fd was signalled: Get the message */
666 tmpStatus = transportGet(obj->fd[rprocId], msg);
667 if (tmpStatus < 0) {
668 printf ("MessageQ_get: tranposrtshm_get failed.");
669 status = MessageQ_E_FAIL;
670 }
672 last = (rprocId + 1) % nProcessors;
673 break;
674 }
675 }
676 rprocId = (rprocId + 1) % nProcessors;
677 } while (rprocId != last);
678 }
679 }
680 else if (retval == 0) {
681 *msg = NULL;
682 status = MessageQ_E_TIMEOUT;
683 }
685 return (status);
686 }
688 /*
689 * Return a count of the number of messages in the queue
690 *
691 * TBD: Implement as a socket ioctl, using getsockopt(). Return -1 for now.
692 */
693 Int MessageQ_count (MessageQ_Handle handle)
694 {
695 Int count = -1;
696 #if 0
697 MessageQ_Object * obj = (MessageQ_Object *) handle;
698 socklen_t optlen;
700 /*
701 * TBD: Need to find a way to implement (if anyone uses it!), and
702 * push down into transport..
703 */
705 /*
706 * 2nd arg to getsockopt should be transport independent, but using
707 * SSKPROTO_SHMFIFO for now:
708 */
709 getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
710 &count, &optlen);
711 #endif
713 return (count);
714 }
716 /* Initializes a message not obtained from MessageQ_alloc. */
717 Void MessageQ_staticMsgInit (MessageQ_Msg msg, UInt32 size)
718 {
719 /* Fill in the fields of the message */
720 MessageQ_msgInit (msg);
721 msg->heapId = MessageQ_STATICMSG;
722 msg->msgSize = size;
723 }
725 /*
726 * Allocate a message and initialize the needed fields (note some
727 * of the fields in the header are set via other APIs or in the
728 * MessageQ_put function,
729 */
730 MessageQ_Msg MessageQ_alloc (UInt16 heapId, UInt32 size)
731 {
732 MessageQ_Msg msg = NULL;
734 /*
735 * heapId not used for local alloc (as this is over a copy transport), but
736 * we need to send to other side as heapId is used in BIOS transport:
737 */
738 msg = (MessageQ_Msg)calloc (1, size);
739 MessageQ_msgInit (msg);
740 msg->msgSize = size;
741 msg->heapId = heapId;
743 return msg;
744 }
746 /* Frees the message back to the heap that was used to allocate it. */
747 Int MessageQ_free (MessageQ_Msg msg)
748 {
749 UInt32 status = MessageQ_S_SUCCESS;
751 /* Check to ensure this was not allocated by user: */
752 if (msg->heapId == MessageQ_STATICMSG) {
753 status = MessageQ_E_CANNOTFREESTATICMSG;
754 }
755 else {
756 free (msg);
757 }
759 return status;
760 }
762 /* Register a heap with MessageQ. */
763 Int MessageQ_registerHeap (Ptr heap, UInt16 heapId)
764 {
765 Int status = MessageQ_S_SUCCESS;
767 /* Do nothing, as this uses a copy transport: */
769 return status;
770 }
772 /* Unregister a heap with MessageQ. */
773 Int MessageQ_unregisterHeap (UInt16 heapId)
774 {
775 Int status = MessageQ_S_SUCCESS;
777 /* Do nothing, as this uses a copy transport: */
779 return status;
780 }
782 /* Unblocks a MessageQ */
783 Void MessageQ_unblock (MessageQ_Handle handle)
784 {
785 MessageQ_Object * obj = (MessageQ_Object *) handle;
786 uint64_t buf = 1;
787 int numBytes;
789 /* Write 8 bytes to awaken any threads blocked on this messageQ: */
790 numBytes = write(obj->unblockFd, &buf, sizeof(buf));
791 }
793 /* Embeds a source message queue into a message. */
794 Void MessageQ_setReplyQueue (MessageQ_Handle handle, MessageQ_Msg msg)
795 {
796 MessageQ_Object * obj = (MessageQ_Object *) handle;
798 msg->replyId = (UInt16)(obj->queue);
799 msg->replyProc = (UInt16)(obj->queue >> 16);
800 }
802 /* Returns the QueueId associated with the handle. */
803 MessageQ_QueueId MessageQ_getQueueId (MessageQ_Handle handle)
804 {
805 MessageQ_Object * obj = (MessageQ_Object *) handle;
806 UInt32 queueId;
808 queueId = (obj->queue);
810 return queueId;
811 }
813 /* Sets the tracing of a message */
814 Void MessageQ_setMsgTrace (MessageQ_Msg msg, Bool traceFlag)
815 {
816 msg->flags = (msg->flags & ~TRACEMASK) | (traceFlag << TRACESHIFT);
817 }
819 /*
820 * Returns the amount of shared memory used by one transport instance.
821 *
822 * The MessageQ module itself does not use any shared memory but the
823 * underlying transport may use some shared memory.
824 */
825 SizeT MessageQ_sharedMemReq (Ptr sharedAddr)
826 {
827 SizeT memReq = 0u;
829 /* Do nothing, as this is a copy transport. */
831 return (memReq);
832 }
834 /*
835 * Create a socket for this remote proc, and attempt to connect.
836 *
837 * Only creates a socket if one does not already exist for this procId.
838 *
839 * Note: remoteProcId may be MultiProc_Self() for loopback case.
840 */
841 Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr)
842 {
843 Int status = MessageQ_S_SUCCESS;
844 int sock;
846 PRINTVERBOSE1("MessageQ_attach: remoteProcId: %d\n", remoteProcId)
848 if (remoteProcId >= MultiProc_MAXPROCESSORS) {
849 status = MessageQ_E_INVALIDPROCID;
850 goto exit;
851 }
853 pthread_mutex_lock (&(MessageQ_module->gate));
855 /* Only create a socket if one doesn't exist: */
856 if (MessageQ_module->sock[remoteProcId] == Transport_INVALIDSOCKET) {
857 /* Create the socket for sending messages to the remote proc: */
858 sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
859 if (sock < 0) {
860 status = MessageQ_E_FAIL;
861 printf ("MessageQ_attach: socket failed: %d, %s\n",
862 errno, strerror(errno));
863 }
864 else {
865 PRINTVERBOSE1("MessageQ_attach: created send socket: %d\n", sock)
866 MessageQ_module->sock[remoteProcId] = sock;
867 /* Attempt to connect: */
868 status = ConnectSocket(sock, remoteProcId, MESSAGEQ_RPMSG_PORT);
869 if (status < 0) {
870 status = MessageQ_E_RESOURCE;
871 /* don't hard-printf since this is no longer fatal */
872 PRINTVERBOSE1("MessageQ_attach: ConnectSocket(remoteProcId:%d) failed\n",
873 remoteProcId);
874 }
875 }
876 }
877 else {
878 status = MessageQ_E_ALREADYEXISTS;
879 }
881 pthread_mutex_unlock (&(MessageQ_module->gate));
883 if (status == MessageQ_E_RESOURCE) {
884 MessageQ_detach(remoteProcId);
885 }
887 exit:
888 return (status);
889 }
891 /*
892 * Close the socket for this remote proc.
893 *
894 */
895 Int MessageQ_detach (UInt16 remoteProcId)
896 {
897 Int status = MessageQ_S_SUCCESS;
898 int sock;
900 if (remoteProcId >= MultiProc_MAXPROCESSORS) {
901 status = MessageQ_E_INVALIDPROCID;
902 goto exit;
903 }
905 pthread_mutex_lock (&(MessageQ_module->gate));
907 sock = MessageQ_module->sock[remoteProcId];
908 if (sock != Transport_INVALIDSOCKET) {
909 if (close(sock)) {
910 status = MessageQ_E_OSFAILURE;
911 printf("MessageQ_detach: close failed: %d, %s\n",
912 errno, strerror(errno));
913 }
914 else {
915 PRINTVERBOSE1("MessageQ_detach: closed socket: %d\n", sock)
916 MessageQ_module->sock[remoteProcId] = Transport_INVALIDSOCKET;
917 }
918 }
920 pthread_mutex_unlock (&(MessageQ_module->gate));
922 exit:
923 return (status);
924 }
926 /*
927 * This is a helper function to initialize a message.
928 */
929 Void MessageQ_msgInit (MessageQ_Msg msg)
930 {
931 #if 0
932 Int status = MessageQ_S_SUCCESS;
933 LAD_ClientHandle handle;
934 struct LAD_CommandObj cmd;
935 union LAD_ResponseObj rsp;
937 handle = LAD_findHandle();
938 if (handle == LAD_MAXNUMCLIENTS) {
939 PRINTVERBOSE1(
940 "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
941 getpid())
943 return;
944 }
946 cmd.cmd = LAD_MESSAGEQ_MSGINIT;
947 cmd.clientId = handle;
949 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
950 PRINTVERBOSE1(
951 "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
952 return;
953 }
955 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
956 PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
957 return;
958 }
959 status = rsp.msgInit.status;
961 PRINTVERBOSE2(
962 "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
963 handle, status)
965 memcpy(msg, &rsp.msgInit.msg, sizeof(*msg));
966 #else
967 msg->reserved0 = 0; /* We set this to distinguish from NameServerMsg */
968 msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
969 msg->msgId = MessageQ_INVALIDMSGID;
970 msg->dstId = (UInt16)MessageQ_INVALIDMESSAGEQ;
971 msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
972 msg->srcProc = MultiProc_self();
974 pthread_mutex_lock(&(MessageQ_module->gate));
975 msg->seqNum = MessageQ_module->seqNum++;
976 pthread_mutex_unlock(&(MessageQ_module->gate));
977 #endif
978 }
980 /*
981 * =============================================================================
982 * Transport: Fxns kept here until need for a transport layer is realized.
983 * =============================================================================
984 */
985 /*
986 * ======== transportCreateEndpoint ========
987 *
988 * Create a communication endpoint to receive messages.
989 */
990 static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex)
991 {
992 Int status = MessageQ_S_SUCCESS;
993 int err;
995 /* Create the socket to receive messages for this messageQ. */
996 *fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
997 if (*fd < 0) {
998 status = MessageQ_E_FAIL;
999 printf ("transportCreateEndpoint: socket call failed: %d, %s\n",
1000 errno, strerror(errno));
1001 goto exit;
1002 }
1004 PRINTVERBOSE1("transportCreateEndpoint: created socket: fd: %d\n", *fd)
1006 err = SocketBindAddr(*fd, rprocId, (UInt32)queueIndex);
1007 if (err < 0) {
1008 status = MessageQ_E_FAIL;
1009 /* don't hard-printf since this is no longer fatal */
1010 PRINTVERBOSE2("transportCreateEndpoint: bind failed: %d, %s\n",
1011 errno, strerror(errno));
1012 close(*fd);
1013 }
1015 exit:
1016 return (status);
1017 }
1019 /*
1020 * ======== transportCloseEndpoint ========
1021 *
1022 * Close the communication endpoint.
1023 */
1024 static Int transportCloseEndpoint(int fd)
1025 {
1026 Int status = MessageQ_S_SUCCESS;
1028 PRINTVERBOSE1("transportCloseEndpoint: closing socket: %d\n", fd)
1030 /* Stop communication to this socket: */
1031 close(fd);
1033 return (status);
1034 }
1036 /*
1037 * ======== transportGet ========
1038 * Retrieve a message waiting in the socket's queue.
1039 */
1040 static Int transportGet(int sock, MessageQ_Msg * retMsg)
1041 {
1042 Int status = MessageQ_S_SUCCESS;
1043 MessageQ_Msg msg;
1044 struct sockaddr_rpmsg fromAddr; // [Socket address of sender]
1045 unsigned int len;
1046 int byteCount;
1048 /*
1049 * We have no way of peeking to see what message size we'll get, so we
1050 * allocate a message of max size to receive contents from the rpmsg socket
1051 * (currently, a copy transport)
1052 */
1053 msg = MessageQ_alloc (0, MESSAGEQ_RPMSG_MAXSIZE);
1054 if (!msg) {
1055 status = MessageQ_E_MEMORY;
1056 goto exit;
1057 }
1059 memset(&fromAddr, 0, sizeof(fromAddr));
1060 len = sizeof(fromAddr);
1062 byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
1063 (struct sockaddr *)&fromAddr, &len);
1064 if (len != sizeof(fromAddr)) {
1065 printf("recvfrom: got bad addr len (%d)\n", len);
1066 status = MessageQ_E_FAIL;
1067 goto exit;
1068 }
1069 if (byteCount < 0) {
1070 printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
1071 status = MessageQ_E_FAIL;
1072 goto exit;
1073 }
1074 else {
1075 /* Update the allocated message size (even though this may waste space
1076 * when the actual message is smaller than the maximum rpmsg size,
1077 * the message will be freed soon anyway, and it avoids an extra copy).
1078 */
1079 msg->msgSize = byteCount;
1081 /*
1082 * If the message received was statically allocated, reset the
1083 * heapId, so the app can free it.
1084 */
1085 if (msg->heapId == MessageQ_STATICMSG) {
1086 msg->heapId = 0; /* for a copy transport, heap id is 0. */
1087 }
1088 }
1090 PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
1091 PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
1092 PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId, msg->msgSize)
1094 *retMsg = msg;
1096 exit:
1097 return (status);
1098 }
1100 /*
1101 * ======== transportPut ========
1102 *
1103 * Calls the socket API sendto() on the socket associated with
1104 * with this destination procID.
1105 * Currently, both local and remote messages are sent via the Socket ABI, so
1106 * no local object lists are maintained here.
1107 */
1108 static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId)
1109 {
1110 Int status = MessageQ_S_SUCCESS;
1111 int sock;
1112 int err;
1114 /*
1115 * Retrieve the socket for the AF_SYSLINK protocol associated with this
1116 * transport.
1117 */
1118 sock = MessageQ_module->sock[dstProcId];
1120 PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
1122 err = send(sock, msg, msg->msgSize, 0);
1123 if (err < 0) {
1124 printf ("transportPut: send failed: %d, %s\n",
1125 errno, strerror(errno));
1126 status = MessageQ_E_FAIL;
1127 goto exit;
1128 }
1130 /*
1131 * Free the message, as this is a copy transport, we maintain MessageQ
1132 * semantics.
1133 */
1134 MessageQ_free (msg);
1136 exit:
1137 return (status);
1138 }