1 /*
2 * Copyright (c) 2012-2013, Texas Instruments Incorporated
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*
33 * @file MessageQ.c
34 *
35 * @brief MessageQ module "client" implementation
36 *
37 * This implementation is geared for use in a "client/server" model, whereby
38 * system-wide data is maintained in a "server" component and process-
39 * specific data is handled here. At the moment, this implementation
40 * connects and communicates with LAD for the server connection.
41 */
44 /* Standard IPC header */
45 #include <ti/ipc/Std.h>
47 /* Linux specific header files, replacing OSAL: */
48 #include <pthread.h>
50 /* Module level headers */
51 #include <ti/ipc/NameServer.h>
52 #include <ti/ipc/MultiProc.h>
53 #include <_MultiProc.h>
54 #include <ti/ipc/MessageQ.h>
55 #include <_MessageQ.h>
57 /* Socket Headers */
58 #include <sys/select.h>
59 #include <sys/time.h>
60 #include <sys/types.h>
61 #include <sys/param.h>
62 #include <sys/eventfd.h>
63 #include <sys/socket.h>
64 #include <errno.h>
65 #include <stdio.h>
66 #include <string.h>
67 #include <stdlib.h>
68 #include <unistd.h>
69 #include <assert.h>
71 /* Socket Protocol Family */
72 #include <net/rpmsg.h>
74 /* Socket utils: */
75 #include <SocketFxns.h>
77 #include <ladclient.h>
78 #include <_lad.h>
80 /* =============================================================================
81 * Macros/Constants
82 * =============================================================================
83 */
/*!
 *  @brief  Name of the reserved NameServer used for MessageQ.
 */
#define MessageQ_NAMESERVER         "MessageQ"

/*!
 *  @brief  Value of an invalid socket ID.
 *
 *  Every descriptor in this file is a plain `int`, and the sentinel is both
 *  stored into and compared against those ints.  It is therefore a signed
 *  -1 (what socket()/eventfd() return on failure) rather than an unsigned
 *  0xFFFFFFFF, which only matched after an implicit signed/unsigned
 *  conversion.
 */
#define Transport_INVALIDSOCKET     (-1)

/* More magic rpmsg port numbers: */
#define MESSAGEQ_RPMSG_PORT         61
#define MESSAGEQ_RPMSG_MAXSIZE      512

/* Trace flag settings (TRACEMASK == 1 << TRACESHIFT): */
#define TRACESHIFT                  12
#define TRACEMASK                   0x1000

/* Define BENCHMARK to quiet key MessageQ APIs: */
//#define BENCHMARK
106 /* =============================================================================
107 * Structures & Enums
108 * =============================================================================
109 */
111 /* structure for MessageQ module state */
112 typedef struct MessageQ_ModuleObject {
113 Int refCount;
114 /*!< Reference count */
115 NameServer_Handle nameServer;
116 /*!< Handle to the local NameServer used for storing GP objects */
117 pthread_mutex_t gate;
118 /*!< Handle of gate to be used for local thread safety */
119 MessageQ_Params defaultInstParams;
120 /*!< Default instance creation parameters */
121 int sock[MultiProc_MAXPROCESSORS];
122 /*!< Sockets to for sending to each remote processor */
123 int seqNum;
124 /*!< Process-specific sequence number */
125 } MessageQ_ModuleObject;
127 /*!
128 * @brief Structure for the Handle for the MessageQ.
129 */
130 typedef struct MessageQ_Object_tag {
131 MessageQ_Params params;
132 /*! Instance specific creation parameters */
133 MessageQ_QueueId queue;
134 /* Unique id */
135 int fd[MultiProc_MAXPROCESSORS];
136 /* File Descriptor to block on messages from remote processors. */
137 int unblockFd;
138 /* Write this fd to unblock the select() call in MessageQ _get() */
139 void *serverHandle;
140 } MessageQ_Object;
142 static Bool verbose = FALSE;
145 /* =============================================================================
146 * Globals
147 * =============================================================================
148 */
149 static MessageQ_ModuleObject MessageQ_state =
150 {
151 .refCount = 0,
152 .nameServer = NULL,
153 };
155 /*!
156 * @var MessageQ_module
157 *
158 * @brief Pointer to the MessageQ module state.
159 */
160 MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
163 /* =============================================================================
164 * Forward declarations of internal functions
165 * =============================================================================
166 */
168 /* This is a helper function to initialize a message. */
169 static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex);
170 static Int transportCloseEndpoint(int fd);
171 static Int transportGet(int sock, MessageQ_Msg * retMsg);
172 static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId);
174 /* =============================================================================
175 * APIS
176 * =============================================================================
177 */
178 /* Function to get default configuration for the MessageQ module.
179 *
180 */
181 Void MessageQ_getConfig (MessageQ_Config * cfg)
182 {
183 Int status;
184 LAD_ClientHandle handle;
185 struct LAD_CommandObj cmd;
186 union LAD_ResponseObj rsp;
188 assert (cfg != NULL);
190 handle = LAD_findHandle();
191 if (handle == LAD_MAXNUMCLIENTS) {
192 PRINTVERBOSE1(
193 "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
194 getpid())
196 return;
197 }
199 cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
200 cmd.clientId = handle;
202 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
203 PRINTVERBOSE1(
204 "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
205 return;
206 }
208 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
209 PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n", status)
210 return;
211 }
212 status = rsp.messageQGetConfig.status;
214 PRINTVERBOSE2(
215 "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
216 handle, status)
218 memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof(*cfg));
220 return;
221 }
223 /* Function to setup the MessageQ module. */
224 Int MessageQ_setup(const MessageQ_Config * cfg)
225 {
226 Int status;
227 LAD_ClientHandle handle;
228 struct LAD_CommandObj cmd;
229 union LAD_ResponseObj rsp;
230 Int i;
232 handle = LAD_findHandle();
233 if (handle == LAD_MAXNUMCLIENTS) {
234 PRINTVERBOSE1(
235 "MessageQ_setup: can't find connection to daemon for pid %d\n",
236 getpid())
238 return MessageQ_E_RESOURCE;
239 }
241 cmd.cmd = LAD_MESSAGEQ_SETUP;
242 cmd.clientId = handle;
243 memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
245 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
246 PRINTVERBOSE1(
247 "MessageQ_setup: sending LAD command failed, status=%d\n", status)
248 return MessageQ_E_FAIL;
249 }
251 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
252 PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
253 return(status);
254 }
255 status = rsp.setup.status;
257 PRINTVERBOSE2(
258 "MessageQ_setup: got LAD response for client %d, status=%d\n",
259 handle, status)
261 MessageQ_module->nameServer = rsp.setup.nameServerHandle;
262 MessageQ_module->seqNum = 0;
264 /* Create a default local gate. */
265 pthread_mutex_init (&(MessageQ_module->gate), NULL);
267 /* Clear sockets array. */
268 for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
269 MessageQ_module->sock[i] = Transport_INVALIDSOCKET;
270 }
272 return status;
273 }
275 /*
276 * Function to destroy the MessageQ module.
277 * Destroys socket/protocol maps; sockets themselves should have been
278 * destroyed in MessageQ_delete() and MessageQ_detach() calls.
279 */
280 Int MessageQ_destroy (void)
281 {
282 Int status;
283 LAD_ClientHandle handle;
284 struct LAD_CommandObj cmd;
285 union LAD_ResponseObj rsp;
287 handle = LAD_findHandle();
288 if (handle == LAD_MAXNUMCLIENTS) {
289 PRINTVERBOSE1(
290 "MessageQ_destroy: can't find connection to daemon for pid %d\n",
291 getpid())
293 return MessageQ_E_RESOURCE;
294 }
296 cmd.cmd = LAD_MESSAGEQ_DESTROY;
297 cmd.clientId = handle;
299 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
300 PRINTVERBOSE1(
301 "MessageQ_destroy: sending LAD command failed, status=%d\n", status)
302 return MessageQ_E_FAIL;
303 }
305 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
306 PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
307 return(status);
308 }
309 status = rsp.status;
311 PRINTVERBOSE2(
312 "MessageQ_destroy: got LAD response for client %d, status=%d\n",
313 handle, status)
315 return status;
316 }
318 /* Function to initialize the parameters for the MessageQ instance. */
319 Void MessageQ_Params_init (MessageQ_Params * params)
320 {
321 memcpy (params, &(MessageQ_module->defaultInstParams),
322 sizeof (MessageQ_Params));
324 return;
325 }
327 /*
328 * Function to create a MessageQ object for receiving.
329 *
330 * Create a socket and bind the source address (local ProcId/MessageQ ID) in
331 * order to get messages dispatched to this messageQ.
332 */
333 MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
334 {
335 Int status;
336 MessageQ_Object * obj = NULL;
337 UInt16 queueIndex = 0u;
338 UInt16 procId;
339 UInt16 rprocId;
340 LAD_ClientHandle handle;
341 struct LAD_CommandObj cmd;
342 union LAD_ResponseObj rsp;
344 handle = LAD_findHandle();
345 if (handle == LAD_MAXNUMCLIENTS) {
346 PRINTVERBOSE1(
347 "MessageQ_create: can't find connection to daemon for pid %d\n",
348 getpid())
350 return NULL;
351 }
353 cmd.cmd = LAD_MESSAGEQ_CREATE;
354 cmd.clientId = handle;
355 strncpy(cmd.args.messageQCreate.name, name,
356 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
357 cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
358 if (params) {
359 memcpy(&cmd.args.messageQCreate.params, params, sizeof(*params));
360 }
362 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
363 PRINTVERBOSE1(
364 "MessageQ_create: sending LAD command failed, status=%d\n", status)
365 return NULL;
366 }
368 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
369 PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
370 return NULL;
371 }
372 status = rsp.messageQCreate.status;
374 PRINTVERBOSE2(
375 "MessageQ_create: got LAD response for client %d, status=%d\n",
376 handle, status)
378 if (status == -1) {
379 PRINTVERBOSE1(
380 "MessageQ_create: MessageQ server operation failed, status=%d\n",
381 status)
382 return NULL;
383 }
385 /* Create the generic obj */
386 obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
388 if (params != NULL) {
389 /* Populate the params member */
390 memcpy((Ptr) &obj->params, (Ptr)params, sizeof (MessageQ_Params));
391 }
393 procId = MultiProc_self();
394 queueIndex = (MessageQ_QueueIndex)rsp.messageQCreate.queueId;
395 obj->queue = rsp.messageQCreate.queueId;
396 obj->serverHandle = rsp.messageQCreate.serverHandle;
398 /*
399 * Create a set of communication endpoints (one per each remote proc),
400 * and return the socket as target for MessageQ_put() calls, and as
401 * a file descriptor to close during MessageQ_delete().
402 */
403 for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
404 obj->fd[rprocId] = Transport_INVALIDSOCKET;
405 if (procId == rprocId) {
406 /* Skip creating an endpoint for ourself. */
407 continue;
408 }
410 PRINTVERBOSE3("MessageQ_create: creating endpoint for: %s, rprocId: %d, queueIndex: %d\n", name, rprocId, queueIndex)
412 status = transportCreateEndpoint(&obj->fd[rprocId], rprocId,
413 queueIndex);
414 if (status < 0) {
415 obj->fd[rprocId] = Transport_INVALIDSOCKET;
416 }
417 }
419 /*
420 * Now, to support MessageQ_unblock() functionality, create an event object.
421 * Writing to this event will unblock the select() call in MessageQ_get().
422 */
423 obj->unblockFd = eventfd(0, 0);
424 if (obj->unblockFd == -1) {
425 printf ("MessageQ_create: eventfd creation failed: %d, %s\n",
426 errno, strerror(errno));
427 MessageQ_delete((MessageQ_Handle *)&obj);
428 }
429 else {
430 int endpointFound = 0;
432 for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
433 if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
434 endpointFound = 1;
435 }
436 }
437 if (!endpointFound) {
438 printf("MessageQ_create: no transport endpoints found, deleting\n");
439 MessageQ_delete((MessageQ_Handle *)&obj);
440 }
441 }
443 return ((MessageQ_Handle) obj);
444 }
446 /*
447 * Function to delete a MessageQ object for a specific slave processor.
448 *
449 * Deletes the socket associated with this MessageQ object.
450 */
451 Int MessageQ_delete (MessageQ_Handle * handlePtr)
452 {
453 Int status = MessageQ_S_SUCCESS;
454 MessageQ_Object * obj = NULL;
455 UInt16 rprocId;
456 LAD_ClientHandle handle;
457 struct LAD_CommandObj cmd;
458 union LAD_ResponseObj rsp;
460 handle = LAD_findHandle();
461 if (handle == LAD_MAXNUMCLIENTS) {
462 PRINTVERBOSE1(
463 "MessageQ_delete: can't find connection to daemon for pid %d\n",
464 getpid())
466 return MessageQ_E_FAIL;
467 }
469 obj = (MessageQ_Object *) (*handlePtr);
471 cmd.cmd = LAD_MESSAGEQ_DELETE;
472 cmd.clientId = handle;
473 cmd.args.messageQDelete.serverHandle = obj->serverHandle;
475 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
476 PRINTVERBOSE1(
477 "MessageQ_delete: sending LAD command failed, status=%d\n", status)
478 return MessageQ_E_FAIL;
479 }
481 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
482 PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
483 return MessageQ_E_FAIL;
484 }
485 status = rsp.messageQDelete.status;
487 PRINTVERBOSE2(
488 "MessageQ_delete: got LAD response for client %d, status=%d\n",
489 handle, status)
492 /* Close the event used for MessageQ_unblock(): */
493 close(obj->unblockFd);
495 /* Close the communication endpoint: */
496 for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
497 if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
498 status = transportCloseEndpoint(obj->fd[rprocId]);
499 }
500 }
502 /* Now free the obj */
503 free (obj);
504 *handlePtr = NULL;
506 return (status);
507 }
509 /*
510 * Opens an instance of MessageQ for sending.
511 *
512 * We need not create a socket here; the sockets for all remote processors
513 * were created during MessageQ_attach(), and will be
514 * retrieved during MessageQ_put().
515 */
516 Int MessageQ_open (String name, MessageQ_QueueId * queueId)
517 {
518 Int status = MessageQ_S_SUCCESS;
520 status = NameServer_getUInt32 (MessageQ_module->nameServer,
521 name, queueId, NULL);
523 if (status == NameServer_E_NOTFOUND) {
524 /* Set return queue ID to invalid. */
525 *queueId = MessageQ_INVALIDMESSAGEQ;
526 status = MessageQ_E_NOTFOUND;
527 }
528 else if (status >= 0) {
529 /* Override with a MessageQ status code. */
530 status = MessageQ_S_SUCCESS;
531 }
532 else {
533 /* Set return queue ID to invalid. */
534 *queueId = MessageQ_INVALIDMESSAGEQ;
535 /* Override with a MessageQ status code. */
536 if (status == NameServer_E_TIMEOUT) {
537 status = MessageQ_E_TIMEOUT;
538 }
539 else {
540 status = MessageQ_E_FAIL;
541 }
542 }
544 return (status);
545 }
547 /* Closes previously opened instance of MessageQ module. */
548 Int MessageQ_close (MessageQ_QueueId * queueId)
549 {
550 Int32 status = MessageQ_S_SUCCESS;
552 /* Nothing more to be done for closing the MessageQ. */
553 *queueId = MessageQ_INVALIDMESSAGEQ;
555 return (status);
556 }
558 /*
559 * Place a message onto a message queue.
560 *
561 * Calls TransportShm_put(), which handles the sending of the message using the
562 * appropriate kernel interface (socket, device ioctl) call for the remote
563 * procId encoded in the queueId argument.
564 *
565 */
566 Int MessageQ_put (MessageQ_QueueId queueId, MessageQ_Msg msg)
567 {
568 Int status;
569 UInt16 dstProcId = (UInt16)(queueId >> 16);
570 UInt16 queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
572 msg->dstId = queueIndex;
573 msg->dstProc = dstProcId;
575 status = transportPut(msg, queueIndex, dstProcId);
577 return (status);
578 }
580 /*
581 * Gets a message for a message queue and blocks if the queue is empty.
582 * If a message is present, it returns it. Otherwise it blocks
583 * waiting for a message to arrive.
584 * When a message is returned, it is owned by the caller.
585 *
586 * We block using select() on the receiving socket's file descriptor, then
587 * get the waiting message via the socket API recvfrom().
588 * We use the socket stored in the messageQ object via a previous call to
589 * MessageQ_create().
590 *
591 */
592 Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
593 {
594 static int last = 0;
595 Int status = MessageQ_S_SUCCESS;
596 Int tmpStatus;
597 MessageQ_Object * obj = (MessageQ_Object *) handle;
598 int retval;
599 int nfds;
600 fd_set rfds;
601 struct timeval tv;
602 void *timevalPtr;
603 UInt16 rprocId;
604 int maxfd = 0;
605 int selfId;
606 int nProcessors;
608 /* Wait (with timeout) and retreive message from socket: */
609 FD_ZERO(&rfds);
610 for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
611 if (rprocId == MultiProc_self() ||
612 obj->fd[rprocId] == Transport_INVALIDSOCKET) {
613 continue;
614 }
615 maxfd = MAX(maxfd, obj->fd[rprocId]);
616 FD_SET(obj->fd[rprocId], &rfds);
617 }
619 /* Wait also on the event fd, which may be written by MessageQ_unblock(): */
620 FD_SET(obj->unblockFd, &rfds);
622 if (timeout == MessageQ_FOREVER) {
623 timevalPtr = NULL;
624 }
625 else {
626 /* Timeout given in msec: convert: */
627 tv.tv_sec = timeout / 1000;
628 tv.tv_usec = (timeout % 1000) * 1000;
629 timevalPtr = &tv;
630 }
631 /* Add one to last fd created: */
632 nfds = MAX(maxfd, obj->unblockFd) + 1;
634 retval = select(nfds, &rfds, NULL, NULL, timevalPtr);
635 if (retval) {
636 if (FD_ISSET(obj->unblockFd, &rfds)) {
637 /*
638 * Our event was signalled by MessageQ_unblock().
639 *
640 * This is typically done during a shutdown sequence, where
641 * the intention of the client would be to ignore (i.e. not fetch)
642 * any pending messages in the transport's queue.
643 * Thus, we shall not check for nor return any messages.
644 */
645 *msg = NULL;
646 status = MessageQ_E_UNBLOCKED;
647 }
648 else {
649 /* start where we last left off */
650 rprocId = last;
652 selfId = MultiProc_self();
653 nProcessors = MultiProc_getNumProcessors();
655 do {
656 if (rprocId != selfId &&
657 obj->fd[rprocId] != Transport_INVALIDSOCKET) {
659 if (FD_ISSET(obj->fd[rprocId], &rfds)) {
660 /* Our transport's fd was signalled: Get the message */
661 tmpStatus = transportGet(obj->fd[rprocId], msg);
662 if (tmpStatus < 0) {
663 printf ("MessageQ_get: tranposrtshm_get failed.");
664 status = MessageQ_E_FAIL;
665 }
667 last = (rprocId + 1) % nProcessors;
668 break;
669 }
670 }
671 rprocId = (rprocId + 1) % nProcessors;
672 } while (rprocId != last);
673 }
674 }
675 else if (retval == 0) {
676 *msg = NULL;
677 status = MessageQ_E_TIMEOUT;
678 }
680 return (status);
681 }
683 /*
684 * Return a count of the number of messages in the queue
685 *
686 * TBD: Implement as a socket ioctl, using getsockopt(). Return -1 for now.
687 */
688 Int MessageQ_count (MessageQ_Handle handle)
689 {
690 Int count = -1;
691 #if 0
692 MessageQ_Object * obj = (MessageQ_Object *) handle;
693 socklen_t optlen;
695 /*
696 * TBD: Need to find a way to implement (if anyone uses it!), and
697 * push down into transport..
698 */
700 /*
701 * 2nd arg to getsockopt should be transport independent, but using
702 * SSKPROTO_SHMFIFO for now:
703 */
704 getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
705 &count, &optlen);
706 #endif
708 return (count);
709 }
711 /* Initializes a message not obtained from MessageQ_alloc. */
712 Void MessageQ_staticMsgInit (MessageQ_Msg msg, UInt32 size)
713 {
714 /* Fill in the fields of the message */
715 MessageQ_msgInit (msg);
716 msg->heapId = MessageQ_STATICMSG;
717 msg->msgSize = size;
718 }
720 /*
721 * Allocate a message and initialize the needed fields (note some
722 * of the fields in the header are set via other APIs or in the
723 * MessageQ_put function,
724 */
725 MessageQ_Msg MessageQ_alloc (UInt16 heapId, UInt32 size)
726 {
727 MessageQ_Msg msg = NULL;
729 /*
730 * heapId not used for local alloc (as this is over a copy transport), but
731 * we need to send to other side as heapId is used in BIOS transport:
732 */
733 msg = (MessageQ_Msg)calloc (1, size);
734 MessageQ_msgInit (msg);
735 msg->msgSize = size;
736 msg->heapId = heapId;
738 return msg;
739 }
741 /* Frees the message back to the heap that was used to allocate it. */
742 Int MessageQ_free (MessageQ_Msg msg)
743 {
744 UInt32 status = MessageQ_S_SUCCESS;
746 /* Check to ensure this was not allocated by user: */
747 if (msg->heapId == MessageQ_STATICMSG) {
748 status = MessageQ_E_CANNOTFREESTATICMSG;
749 }
750 else {
751 free (msg);
752 }
754 return status;
755 }
757 /* Register a heap with MessageQ. */
758 Int MessageQ_registerHeap (Ptr heap, UInt16 heapId)
759 {
760 Int status = MessageQ_S_SUCCESS;
762 /* Do nothing, as this uses a copy transport: */
764 return status;
765 }
767 /* Unregister a heap with MessageQ. */
768 Int MessageQ_unregisterHeap (UInt16 heapId)
769 {
770 Int status = MessageQ_S_SUCCESS;
772 /* Do nothing, as this uses a copy transport: */
774 return status;
775 }
777 /* Unblocks a MessageQ */
778 Void MessageQ_unblock (MessageQ_Handle handle)
779 {
780 MessageQ_Object * obj = (MessageQ_Object *) handle;
781 uint64_t buf = 1;
782 int numBytes;
784 /* Write 8 bytes to awaken any threads blocked on this messageQ: */
785 numBytes = write(obj->unblockFd, &buf, sizeof(buf));
786 }
788 /* Embeds a source message queue into a message. */
789 Void MessageQ_setReplyQueue (MessageQ_Handle handle, MessageQ_Msg msg)
790 {
791 MessageQ_Object * obj = (MessageQ_Object *) handle;
793 msg->replyId = (UInt16)(obj->queue);
794 msg->replyProc = (UInt16)(obj->queue >> 16);
795 }
797 /* Returns the QueueId associated with the handle. */
798 MessageQ_QueueId MessageQ_getQueueId (MessageQ_Handle handle)
799 {
800 MessageQ_Object * obj = (MessageQ_Object *) handle;
801 UInt32 queueId;
803 queueId = (obj->queue);
805 return queueId;
806 }
808 /* Sets the tracing of a message */
809 Void MessageQ_setMsgTrace (MessageQ_Msg msg, Bool traceFlag)
810 {
811 msg->flags = (msg->flags & ~TRACEMASK) | (traceFlag << TRACESHIFT);
812 }
814 /*
815 * Returns the amount of shared memory used by one transport instance.
816 *
817 * The MessageQ module itself does not use any shared memory but the
818 * underlying transport may use some shared memory.
819 */
820 SizeT MessageQ_sharedMemReq (Ptr sharedAddr)
821 {
822 SizeT memReq = 0u;
824 /* Do nothing, as this is a copy transport. */
826 return (memReq);
827 }
829 /*
830 * Create a socket for this remote proc, and attempt to connect.
831 *
832 * Only creates a socket if one does not already exist for this procId.
833 *
834 * Note: remoteProcId may be MultiProc_Self() for loopback case.
835 */
836 Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr)
837 {
838 Int status = MessageQ_S_SUCCESS;
839 int sock;
841 PRINTVERBOSE1("MessageQ_attach: remoteProcId: %d\n", remoteProcId)
843 if (remoteProcId >= MultiProc_MAXPROCESSORS) {
844 status = MessageQ_E_INVALIDPROCID;
845 goto exit;
846 }
848 pthread_mutex_lock (&(MessageQ_module->gate));
850 /* Only create a socket if one doesn't exist: */
851 if (MessageQ_module->sock[remoteProcId] == Transport_INVALIDSOCKET) {
852 /* Create the socket for sending messages to the remote proc: */
853 sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
854 if (sock < 0) {
855 status = MessageQ_E_FAIL;
856 printf ("MessageQ_attach: socket failed: %d, %s\n",
857 errno, strerror(errno));
858 }
859 else {
860 PRINTVERBOSE1("MessageQ_attach: created send socket: %d\n", sock)
861 MessageQ_module->sock[remoteProcId] = sock;
862 /* Attempt to connect: */
863 status = ConnectSocket(sock, remoteProcId, MESSAGEQ_RPMSG_PORT);
864 if (status < 0) {
865 status = MessageQ_E_RESOURCE;
866 /* don't hard-printf since this is no longer fatal */
867 PRINTVERBOSE1("MessageQ_attach: ConnectSocket(remoteProcId:%d) failed\n",
868 remoteProcId);
869 }
870 }
871 }
872 else {
873 status = MessageQ_E_ALREADYEXISTS;
874 }
876 pthread_mutex_unlock (&(MessageQ_module->gate));
878 if (status == MessageQ_E_RESOURCE) {
879 MessageQ_detach(remoteProcId);
880 }
882 exit:
883 return (status);
884 }
886 /*
887 * Close the socket for this remote proc.
888 *
889 */
890 Int MessageQ_detach (UInt16 remoteProcId)
891 {
892 Int status = MessageQ_S_SUCCESS;
893 int sock;
895 if (remoteProcId >= MultiProc_MAXPROCESSORS) {
896 status = MessageQ_E_INVALIDPROCID;
897 goto exit;
898 }
900 pthread_mutex_lock (&(MessageQ_module->gate));
902 sock = MessageQ_module->sock[remoteProcId];
903 if (sock != Transport_INVALIDSOCKET) {
904 if (close(sock)) {
905 status = MessageQ_E_OSFAILURE;
906 printf("MessageQ_detach: close failed: %d, %s\n",
907 errno, strerror(errno));
908 }
909 else {
910 PRINTVERBOSE1("MessageQ_detach: closed socket: %d\n", sock)
911 MessageQ_module->sock[remoteProcId] = Transport_INVALIDSOCKET;
912 }
913 }
915 pthread_mutex_unlock (&(MessageQ_module->gate));
917 exit:
918 return (status);
919 }
921 /*
922 * This is a helper function to initialize a message.
923 */
924 Void MessageQ_msgInit (MessageQ_Msg msg)
925 {
926 #if 0
927 Int status = MessageQ_S_SUCCESS;
928 LAD_ClientHandle handle;
929 struct LAD_CommandObj cmd;
930 union LAD_ResponseObj rsp;
932 handle = LAD_findHandle();
933 if (handle == LAD_MAXNUMCLIENTS) {
934 PRINTVERBOSE1(
935 "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
936 getpid())
938 return;
939 }
941 cmd.cmd = LAD_MESSAGEQ_MSGINIT;
942 cmd.clientId = handle;
944 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
945 PRINTVERBOSE1(
946 "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
947 return;
948 }
950 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
951 PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
952 return;
953 }
954 status = rsp.msgInit.status;
956 PRINTVERBOSE2(
957 "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
958 handle, status)
960 memcpy(msg, &rsp.msgInit.msg, sizeof(*msg));
961 #else
962 msg->reserved0 = 0; /* We set this to distinguish from NameServerMsg */
963 msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
964 msg->msgId = MessageQ_INVALIDMSGID;
965 msg->dstId = (UInt16)MessageQ_INVALIDMESSAGEQ;
966 msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
967 msg->srcProc = MultiProc_self();
969 pthread_mutex_lock(&(MessageQ_module->gate));
970 msg->seqNum = MessageQ_module->seqNum++;
971 pthread_mutex_unlock(&(MessageQ_module->gate));
972 #endif
973 }
975 /*
976 * =============================================================================
977 * Transport: Fxns kept here until need for a transport layer is realized.
978 * =============================================================================
979 */
980 /*
981 * ======== transportCreateEndpoint ========
982 *
983 * Create a communication endpoint to receive messages.
984 */
985 static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex)
986 {
987 Int status = MessageQ_S_SUCCESS;
988 int err;
990 /* Create the socket to receive messages for this messageQ. */
991 *fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
992 if (*fd < 0) {
993 status = MessageQ_E_FAIL;
994 printf ("transportCreateEndpoint: socket call failed: %d, %s\n",
995 errno, strerror(errno));
996 goto exit;
997 }
999 PRINTVERBOSE1("transportCreateEndpoint: created socket: fd: %d\n", *fd)
1001 err = SocketBindAddr(*fd, rprocId, (UInt32)queueIndex);
1002 if (err < 0) {
1003 status = MessageQ_E_FAIL;
1004 /* don't hard-printf since this is no longer fatal */
1005 PRINTVERBOSE2("transportCreateEndpoint: bind failed: %d, %s\n",
1006 errno, strerror(errno));
1007 }
1009 exit:
1010 return (status);
1011 }
1013 /*
1014 * ======== transportCloseEndpoint ========
1015 *
1016 * Close the communication endpoint.
1017 */
1018 static Int transportCloseEndpoint(int fd)
1019 {
1020 Int status = MessageQ_S_SUCCESS;
1022 PRINTVERBOSE1("transportCloseEndpoint: closing socket: %d\n", fd)
1024 /* Stop communication to this socket: */
1025 close(fd);
1027 return (status);
1028 }
1030 /*
1031 * ======== transportGet ========
1032 * Retrieve a message waiting in the socket's queue.
1033 */
1034 static Int transportGet(int sock, MessageQ_Msg * retMsg)
1035 {
1036 Int status = MessageQ_S_SUCCESS;
1037 MessageQ_Msg msg;
1038 struct sockaddr_rpmsg fromAddr; // [Socket address of sender]
1039 unsigned int len;
1040 int byteCount;
1042 /*
1043 * We have no way of peeking to see what message size we'll get, so we
1044 * allocate a message of max size to receive contents from the rpmsg socket
1045 * (currently, a copy transport)
1046 */
1047 msg = MessageQ_alloc (0, MESSAGEQ_RPMSG_MAXSIZE);
1048 if (!msg) {
1049 status = MessageQ_E_MEMORY;
1050 goto exit;
1051 }
1053 memset(&fromAddr, 0, sizeof(fromAddr));
1054 len = sizeof(fromAddr);
1056 byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
1057 (struct sockaddr *)&fromAddr, &len);
1058 if (len != sizeof(fromAddr)) {
1059 printf("recvfrom: got bad addr len (%d)\n", len);
1060 status = MessageQ_E_FAIL;
1061 goto exit;
1062 }
1063 if (byteCount < 0) {
1064 printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
1065 status = MessageQ_E_FAIL;
1066 goto exit;
1067 }
1068 else {
1069 /* Update the allocated message size (even though this may waste space
1070 * when the actual message is smaller than the maximum rpmsg size,
1071 * the message will be freed soon anyway, and it avoids an extra copy).
1072 */
1073 msg->msgSize = byteCount;
1075 /*
1076 * If the message received was statically allocated, reset the
1077 * heapId, so the app can free it.
1078 */
1079 if (msg->heapId == MessageQ_STATICMSG) {
1080 msg->heapId = 0; /* for a copy transport, heap id is 0. */
1081 }
1082 }
1084 PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
1085 PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
1086 PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId, msg->msgSize)
1088 *retMsg = msg;
1090 exit:
1091 return (status);
1092 }
1094 /*
1095 * ======== transportPut ========
1096 *
1097 * Calls the socket API sendto() on the socket associated with
1098 * with this destination procID.
1099 * Currently, both local and remote messages are sent via the Socket ABI, so
1100 * no local object lists are maintained here.
1101 */
1102 static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId)
1103 {
1104 Int status = MessageQ_S_SUCCESS;
1105 int sock;
1106 int err;
1108 /*
1109 * Retrieve the socket for the AF_SYSLINK protocol associated with this
1110 * transport.
1111 */
1112 sock = MessageQ_module->sock[dstProcId];
1114 PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
1116 err = send(sock, msg, msg->msgSize, 0);
1117 if (err < 0) {
1118 printf ("transportPut: send failed: %d, %s\n",
1119 errno, strerror(errno));
1120 status = MessageQ_E_FAIL;
1121 }
1123 /*
1124 * Free the message, as this is a copy transport, we maintain MessageQ
1125 * semantics.
1126 */
1127 MessageQ_free (msg);
1129 return (status);
1130 }