1 /*
2 * Copyright (c) 2012-2013, Texas Instruments Incorporated
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*
33 * @file MessageQ.c
34 *
35 * @brief MessageQ module "client" implementation
36 *
37 * This implementation is geared for use in a "client/server" model, whereby
38 * system-wide data is maintained in a "server" component and process-
39 * specific data is handled here. At the moment, this implementation
40 * connects and communicates with LAD for the server connection.
41 */
44 /* Standard IPC header */
45 #include <ti/ipc/Std.h>
47 /* Linux specific header files, replacing OSAL: */
48 #include <pthread.h>
50 /* Module level headers */
51 #include <ti/ipc/NameServer.h>
52 #include <ti/ipc/MultiProc.h>
53 #include <_MultiProc.h>
54 #include <ti/ipc/MessageQ.h>
55 #include <_MessageQ.h>
57 /* Socket Headers */
58 #include <sys/select.h>
59 #include <sys/time.h>
60 #include <sys/types.h>
61 #include <sys/param.h>
62 #include <sys/eventfd.h>
63 #include <sys/socket.h>
64 #include <errno.h>
65 #include <stdio.h>
66 #include <string.h>
67 #include <stdlib.h>
68 #include <unistd.h>
69 #include <assert.h>
71 /* Socket Protocol Family */
72 #include <net/rpmsg.h>
74 /* Socket utils: */
75 #include <SocketFxns.h>
77 #include <ladclient.h>
78 #include <_lad.h>
80 /* =============================================================================
81 * Macros/Constants
82 * =============================================================================
83 */
85 /*!
86 * @brief Name of the reserved NameServer used for MessageQ.
87 */
88 #define MessageQ_NAMESERVER "MessageQ"
90 /*!
91 * @brief Value of an invalid socket ID:
92 */
93 #define Transport_INVALIDSOCKET (0xFFFFFFFF)
95 /* More magic rpmsg port numbers: */
96 #define MESSAGEQ_RPMSG_PORT 61
97 #define MESSAGEQ_RPMSG_MAXSIZE 512
99 /* Trace flag settings: */
100 #define TRACESHIFT 12
101 #define TRACEMASK 0x1000
103 /* Define BENCHMARK to quiet key MessageQ APIs: */
104 //#define BENCHMARK
106 /* =============================================================================
107 * Structures & Enums
108 * =============================================================================
109 */
111 /* structure for MessageQ module state */
112 typedef struct MessageQ_ModuleObject {
113 Int refCount;
114 /*!< Reference count */
115 NameServer_Handle nameServer;
116 /*!< Handle to the local NameServer used for storing GP objects */
117 pthread_mutex_t gate;
118 /*!< Handle of gate to be used for local thread safety */
119 MessageQ_Params defaultInstParams;
120 /*!< Default instance creation parameters */
121 int sock[MultiProc_MAXPROCESSORS];
122 /*!< Sockets to for sending to each remote processor */
123 int seqNum;
124 /*!< Process-specific sequence number */
125 } MessageQ_ModuleObject;
127 /*!
128 * @brief Structure for the Handle for the MessageQ.
129 */
130 typedef struct MessageQ_Object_tag {
131 MessageQ_Params params;
132 /*! Instance specific creation parameters */
133 MessageQ_QueueId queue;
134 /* Unique id */
135 int fd[MultiProc_MAXPROCESSORS];
136 /* File Descriptor to block on messages from remote processors. */
137 int unblockFd;
138 /* Write this fd to unblock the select() call in MessageQ _get() */
139 void *serverHandle;
140 } MessageQ_Object;
142 static Bool verbose = FALSE;
145 /* =============================================================================
146 * Globals
147 * =============================================================================
148 */
149 static MessageQ_ModuleObject MessageQ_state =
150 {
151 .refCount = 0,
152 .nameServer = NULL,
153 };
155 /*!
156 * @var MessageQ_module
157 *
158 * @brief Pointer to the MessageQ module state.
159 */
160 MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
163 /* =============================================================================
164 * Forward declarations of internal functions
165 * =============================================================================
166 */
168 /* This is a helper function to initialize a message. */
169 static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex);
170 static Int transportCloseEndpoint(int fd);
171 static Int transportGet(int sock, MessageQ_Msg * retMsg);
172 static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId);
174 /* =============================================================================
175 * APIS
176 * =============================================================================
177 */
178 /* Function to get default configuration for the MessageQ module.
179 *
180 */
181 Void MessageQ_getConfig (MessageQ_Config * cfg)
182 {
183 Int status;
184 LAD_ClientHandle handle;
185 struct LAD_CommandObj cmd;
186 union LAD_ResponseObj rsp;
188 assert (cfg != NULL);
190 handle = LAD_findHandle();
191 if (handle == LAD_MAXNUMCLIENTS) {
192 PRINTVERBOSE1(
193 "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
194 getpid())
196 return;
197 }
199 cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
200 cmd.clientId = handle;
202 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
203 PRINTVERBOSE1(
204 "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
205 return;
206 }
208 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
209 PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n", status)
210 return;
211 }
212 status = rsp.messageQGetConfig.status;
214 PRINTVERBOSE2(
215 "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
216 handle, status)
218 memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof(*cfg));
220 return;
221 }
223 /* Function to setup the MessageQ module. */
224 Int MessageQ_setup(const MessageQ_Config * cfg)
225 {
226 Int status;
227 LAD_ClientHandle handle;
228 struct LAD_CommandObj cmd;
229 union LAD_ResponseObj rsp;
230 Int i;
232 handle = LAD_findHandle();
233 if (handle == LAD_MAXNUMCLIENTS) {
234 PRINTVERBOSE1(
235 "MessageQ_setup: can't find connection to daemon for pid %d\n",
236 getpid())
238 return MessageQ_E_RESOURCE;
239 }
241 cmd.cmd = LAD_MESSAGEQ_SETUP;
242 cmd.clientId = handle;
243 memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
245 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
246 PRINTVERBOSE1(
247 "MessageQ_setup: sending LAD command failed, status=%d\n", status)
248 return MessageQ_E_FAIL;
249 }
251 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
252 PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
253 return(status);
254 }
255 status = rsp.setup.status;
257 PRINTVERBOSE2(
258 "MessageQ_setup: got LAD response for client %d, status=%d\n",
259 handle, status)
261 MessageQ_module->nameServer = rsp.setup.nameServerHandle;
262 MessageQ_module->seqNum = 0;
264 /* Create a default local gate. */
265 pthread_mutex_init (&(MessageQ_module->gate), NULL);
267 /* Clear sockets array. */
268 for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
269 MessageQ_module->sock[i] = Transport_INVALIDSOCKET;
270 }
272 return status;
273 }
275 /*
276 * Function to destroy the MessageQ module.
277 * Destroys socket/protocol maps; sockets themselves should have been
278 * destroyed in MessageQ_delete() and MessageQ_detach() calls.
279 */
280 Int MessageQ_destroy (void)
281 {
282 Int status;
283 LAD_ClientHandle handle;
284 struct LAD_CommandObj cmd;
285 union LAD_ResponseObj rsp;
287 handle = LAD_findHandle();
288 if (handle == LAD_MAXNUMCLIENTS) {
289 PRINTVERBOSE1(
290 "MessageQ_destroy: can't find connection to daemon for pid %d\n",
291 getpid())
293 return MessageQ_E_RESOURCE;
294 }
296 cmd.cmd = LAD_MESSAGEQ_DESTROY;
297 cmd.clientId = handle;
299 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
300 PRINTVERBOSE1(
301 "MessageQ_destroy: sending LAD command failed, status=%d\n", status)
302 return MessageQ_E_FAIL;
303 }
305 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
306 PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
307 return(status);
308 }
309 status = rsp.status;
311 PRINTVERBOSE2(
312 "MessageQ_destroy: got LAD response for client %d, status=%d\n",
313 handle, status)
315 return status;
316 }
318 /* Function to initialize the parameters for the MessageQ instance. */
319 Void MessageQ_Params_init (MessageQ_Params * params)
320 {
321 memcpy (params, &(MessageQ_module->defaultInstParams),
322 sizeof (MessageQ_Params));
324 return;
325 }
327 /*
328 * Function to create a MessageQ object for receiving.
329 *
330 * Create a socket and bind the source address (local ProcId/MessageQ ID) in
331 * order to get messages dispatched to this messageQ.
332 */
333 MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
334 {
335 Int status;
336 MessageQ_Object * obj = NULL;
337 UInt16 queueIndex = 0u;
338 UInt16 procId;
339 UInt16 rprocId;
340 LAD_ClientHandle handle;
341 struct LAD_CommandObj cmd;
342 union LAD_ResponseObj rsp;
344 handle = LAD_findHandle();
345 if (handle == LAD_MAXNUMCLIENTS) {
346 PRINTVERBOSE1(
347 "MessageQ_create: can't find connection to daemon for pid %d\n",
348 getpid())
350 return NULL;
351 }
353 cmd.cmd = LAD_MESSAGEQ_CREATE;
354 cmd.clientId = handle;
355 strncpy(cmd.args.messageQCreate.name, name,
356 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
357 cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
358 if (params) {
359 memcpy(&cmd.args.messageQCreate.params, params, sizeof(*params));
360 }
362 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
363 PRINTVERBOSE1(
364 "MessageQ_create: sending LAD command failed, status=%d\n", status)
365 return NULL;
366 }
368 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
369 PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
370 return NULL;
371 }
372 status = rsp.messageQCreate.status;
374 PRINTVERBOSE2(
375 "MessageQ_create: got LAD response for client %d, status=%d\n",
376 handle, status)
378 if (status == -1) {
379 PRINTVERBOSE1(
380 "MessageQ_create: MessageQ server operation failed, status=%d\n",
381 status)
382 return NULL;
383 }
385 /* Create the generic obj */
386 obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
388 if (params != NULL) {
389 /* Populate the params member */
390 memcpy((Ptr) &obj->params, (Ptr)params, sizeof (MessageQ_Params));
391 }
393 procId = MultiProc_self();
394 queueIndex = (MessageQ_QueueIndex)rsp.messageQCreate.queueId;
395 obj->queue = rsp.messageQCreate.queueId;
396 obj->serverHandle = rsp.messageQCreate.serverHandle;
398 /*
399 * Create a set of communication endpoints (one per each remote proc),
400 * and return the socket as target for MessageQ_put() calls, and as
401 * a file descriptor to close during MessageQ_delete().
402 */
403 for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
404 obj->fd[rprocId] = Transport_INVALIDSOCKET;
405 if (procId == rprocId) {
406 /* Skip creating an endpoint for ourself. */
407 continue;
408 }
410 PRINTVERBOSE3("MessageQ_create: creating endpoint for: %s, rprocId: %d, queueIndex: %d\n", name, rprocId, queueIndex)
412 status = transportCreateEndpoint(&obj->fd[rprocId], rprocId,
413 queueIndex);
414 if (status < 0) {
415 obj->fd[rprocId] = Transport_INVALIDSOCKET;
416 }
417 }
419 /*
420 * Now, to support MessageQ_unblock() functionality, create an event object.
421 * Writing to this event will unblock the select() call in MessageQ_get().
422 */
423 obj->unblockFd = eventfd(0, 0);
424 if (obj->unblockFd == -1) {
425 printf ("MessageQ_create: eventfd creation failed: %d, %s\n",
426 errno, strerror(errno));
427 MessageQ_delete((MessageQ_Handle *)&obj);
428 }
429 else {
430 int endpointFound = 0;
432 for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
433 if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
434 endpointFound = 1;
435 }
436 }
437 if (!endpointFound) {
438 printf("MessageQ_create: no transport endpoints found, deleting\n");
439 MessageQ_delete((MessageQ_Handle *)&obj);
440 }
441 }
443 return ((MessageQ_Handle) obj);
444 }
446 /*
447 * Function to delete a MessageQ object for a specific slave processor.
448 *
449 * Deletes the socket associated with this MessageQ object.
450 */
451 Int MessageQ_delete (MessageQ_Handle * handlePtr)
452 {
453 Int status = MessageQ_S_SUCCESS;
454 MessageQ_Object * obj = NULL;
455 UInt16 rprocId;
456 LAD_ClientHandle handle;
457 struct LAD_CommandObj cmd;
458 union LAD_ResponseObj rsp;
460 handle = LAD_findHandle();
461 if (handle == LAD_MAXNUMCLIENTS) {
462 PRINTVERBOSE1(
463 "MessageQ_delete: can't find connection to daemon for pid %d\n",
464 getpid())
466 return MessageQ_E_FAIL;
467 }
469 obj = (MessageQ_Object *) (*handlePtr);
471 cmd.cmd = LAD_MESSAGEQ_DELETE;
472 cmd.clientId = handle;
473 cmd.args.messageQDelete.serverHandle = obj->serverHandle;
475 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
476 PRINTVERBOSE1(
477 "MessageQ_delete: sending LAD command failed, status=%d\n", status)
478 return MessageQ_E_FAIL;
479 }
481 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
482 PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
483 return MessageQ_E_FAIL;
484 }
485 status = rsp.messageQDelete.status;
487 PRINTVERBOSE2(
488 "MessageQ_delete: got LAD response for client %d, status=%d\n",
489 handle, status)
492 /* Close the event used for MessageQ_unblock(): */
493 close(obj->unblockFd);
495 /* Close the communication endpoint: */
496 for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
497 if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
498 status = transportCloseEndpoint(obj->fd[rprocId]);
499 }
500 }
502 /* Now free the obj */
503 free (obj);
504 *handlePtr = NULL;
506 return (status);
507 }
509 /*
510 * Opens an instance of MessageQ for sending.
511 *
512 * We need not create a socket here; the sockets for all remote processors
513 * were created during MessageQ_attach(), and will be
514 * retrieved during MessageQ_put().
515 */
516 Int MessageQ_open (String name, MessageQ_QueueId * queueId)
517 {
518 Int status = MessageQ_S_SUCCESS;
520 status = NameServer_getUInt32 (MessageQ_module->nameServer,
521 name, queueId, NULL);
523 if (status == NameServer_E_NOTFOUND) {
524 /* Set return queue ID to invalid. */
525 *queueId = MessageQ_INVALIDMESSAGEQ;
526 status = MessageQ_E_NOTFOUND;
527 }
528 else if (status >= 0) {
529 /* Override with a MessageQ status code. */
530 status = MessageQ_S_SUCCESS;
531 }
532 else {
533 /* Set return queue ID to invalid. */
534 *queueId = MessageQ_INVALIDMESSAGEQ;
535 /* Override with a MessageQ status code. */
536 if (status == NameServer_E_TIMEOUT) {
537 status = MessageQ_E_TIMEOUT;
538 }
539 else {
540 status = MessageQ_E_FAIL;
541 }
542 }
544 return (status);
545 }
547 /* Closes previously opened instance of MessageQ module. */
548 Int MessageQ_close (MessageQ_QueueId * queueId)
549 {
550 Int32 status = MessageQ_S_SUCCESS;
552 /* Nothing more to be done for closing the MessageQ. */
553 *queueId = MessageQ_INVALIDMESSAGEQ;
555 return (status);
556 }
558 /*
559 * Place a message onto a message queue.
560 *
561 * Calls TransportShm_put(), which handles the sending of the message using the
562 * appropriate kernel interface (socket, device ioctl) call for the remote
563 * procId encoded in the queueId argument.
564 *
565 */
566 Int MessageQ_put (MessageQ_QueueId queueId, MessageQ_Msg msg)
567 {
568 Int status;
569 UInt16 dstProcId = (UInt16)(queueId >> 16);
570 UInt16 queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
572 msg->dstId = queueIndex;
573 msg->dstProc = dstProcId;
575 status = transportPut(msg, queueIndex, dstProcId);
577 return (status);
578 }
580 /*
581 * Gets a message for a message queue and blocks if the queue is empty.
582 * If a message is present, it returns it. Otherwise it blocks
583 * waiting for a message to arrive.
584 * When a message is returned, it is owned by the caller.
585 *
586 * We block using select() on the receiving socket's file descriptor, then
587 * get the waiting message via the socket API recvfrom().
588 * We use the socket stored in the messageQ object via a previous call to
589 * MessageQ_create().
590 *
591 */
592 Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
593 {
594 Int status = MessageQ_S_SUCCESS;
595 Int tmpStatus;
596 MessageQ_Object * obj = (MessageQ_Object *) handle;
597 int retval;
598 int nfds;
599 fd_set rfds;
600 struct timeval tv;
601 void *timevalPtr;
602 UInt16 rprocId;
603 int maxfd = 0;
605 /* Wait (with timeout) and retreive message from socket: */
606 FD_ZERO(&rfds);
607 for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
608 if (rprocId == MultiProc_self() ||
609 obj->fd[rprocId] == Transport_INVALIDSOCKET) {
610 continue;
611 }
612 maxfd = MAX(maxfd, obj->fd[rprocId]);
613 FD_SET(obj->fd[rprocId], &rfds);
614 }
616 /* Wait also on the event fd, which may be written by MessageQ_unblock(): */
617 FD_SET(obj->unblockFd, &rfds);
619 if (timeout == MessageQ_FOREVER) {
620 timevalPtr = NULL;
621 }
622 else {
623 /* Timeout given in msec: convert: */
624 tv.tv_sec = timeout / 1000;
625 tv.tv_usec = (timeout % 1000) * 1000;
626 timevalPtr = &tv;
627 }
628 /* Add one to last fd created: */
629 nfds = MAX(maxfd, obj->unblockFd) + 1;
631 retval = select(nfds, &rfds, NULL, NULL, timevalPtr);
632 if (retval) {
633 if (FD_ISSET(obj->unblockFd, &rfds)) {
634 /*
635 * Our event was signalled by MessageQ_unblock().
636 *
637 * This is typically done during a shutdown sequence, where
638 * the intention of the client would be to ignore (i.e. not fetch)
639 * any pending messages in the transport's queue.
640 * Thus, we shall not check for nor return any messages.
641 */
642 *msg = NULL;
643 status = MessageQ_E_UNBLOCKED;
644 }
645 else {
646 for (rprocId = 0; rprocId < MultiProc_getNumProcessors();
647 rprocId++) {
648 if (rprocId == MultiProc_self() ||
649 obj->fd[rprocId] == Transport_INVALIDSOCKET) {
650 continue;
651 }
652 if (FD_ISSET(obj->fd[rprocId], &rfds)) {
653 /* Our transport's fd was signalled: Get the message: */
654 tmpStatus = transportGet(obj->fd[rprocId], msg);
655 if (tmpStatus < 0) {
656 printf ("MessageQ_get: tranposrtshm_get failed.");
657 status = MessageQ_E_FAIL;
658 }
659 }
660 }
661 }
662 }
663 else if (retval == 0) {
664 *msg = NULL;
665 status = MessageQ_E_TIMEOUT;
666 }
668 return (status);
669 }
671 /*
672 * Return a count of the number of messages in the queue
673 *
674 * TBD: Implement as a socket ioctl, using getsockopt(). Return -1 for now.
675 */
676 Int MessageQ_count (MessageQ_Handle handle)
677 {
678 Int count = -1;
679 #if 0
680 MessageQ_Object * obj = (MessageQ_Object *) handle;
681 socklen_t optlen;
683 /*
684 * TBD: Need to find a way to implement (if anyone uses it!), and
685 * push down into transport..
686 */
688 /*
689 * 2nd arg to getsockopt should be transport independent, but using
690 * SSKPROTO_SHMFIFO for now:
691 */
692 getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
693 &count, &optlen);
694 #endif
696 return (count);
697 }
699 /* Initializes a message not obtained from MessageQ_alloc. */
700 Void MessageQ_staticMsgInit (MessageQ_Msg msg, UInt32 size)
701 {
702 /* Fill in the fields of the message */
703 MessageQ_msgInit (msg);
704 msg->heapId = MessageQ_STATICMSG;
705 msg->msgSize = size;
706 }
708 /*
709 * Allocate a message and initialize the needed fields (note some
710 * of the fields in the header are set via other APIs or in the
711 * MessageQ_put function,
712 */
713 MessageQ_Msg MessageQ_alloc (UInt16 heapId, UInt32 size)
714 {
715 MessageQ_Msg msg = NULL;
717 /*
718 * heapId not used for local alloc (as this is over a copy transport), but
719 * we need to send to other side as heapId is used in BIOS transport:
720 */
721 msg = (MessageQ_Msg)calloc (1, size);
722 MessageQ_msgInit (msg);
723 msg->msgSize = size;
724 msg->heapId = heapId;
726 return msg;
727 }
729 /* Frees the message back to the heap that was used to allocate it. */
730 Int MessageQ_free (MessageQ_Msg msg)
731 {
732 UInt32 status = MessageQ_S_SUCCESS;
734 /* Check to ensure this was not allocated by user: */
735 if (msg->heapId == MessageQ_STATICMSG) {
736 status = MessageQ_E_CANNOTFREESTATICMSG;
737 }
738 else {
739 free (msg);
740 }
742 return status;
743 }
745 /* Register a heap with MessageQ. */
746 Int MessageQ_registerHeap (Ptr heap, UInt16 heapId)
747 {
748 Int status = MessageQ_S_SUCCESS;
750 /* Do nothing, as this uses a copy transport: */
752 return status;
753 }
755 /* Unregister a heap with MessageQ. */
756 Int MessageQ_unregisterHeap (UInt16 heapId)
757 {
758 Int status = MessageQ_S_SUCCESS;
760 /* Do nothing, as this uses a copy transport: */
762 return status;
763 }
765 /* Unblocks a MessageQ */
766 Void MessageQ_unblock (MessageQ_Handle handle)
767 {
768 MessageQ_Object * obj = (MessageQ_Object *) handle;
769 uint64_t buf = 1;
770 int numBytes;
772 /* Write 8 bytes to awaken any threads blocked on this messageQ: */
773 numBytes = write(obj->unblockFd, &buf, sizeof(buf));
774 }
776 /* Embeds a source message queue into a message. */
777 Void MessageQ_setReplyQueue (MessageQ_Handle handle, MessageQ_Msg msg)
778 {
779 MessageQ_Object * obj = (MessageQ_Object *) handle;
781 msg->replyId = (UInt16)(obj->queue);
782 msg->replyProc = (UInt16)(obj->queue >> 16);
783 }
785 /* Returns the QueueId associated with the handle. */
786 MessageQ_QueueId MessageQ_getQueueId (MessageQ_Handle handle)
787 {
788 MessageQ_Object * obj = (MessageQ_Object *) handle;
789 UInt32 queueId;
791 queueId = (obj->queue);
793 return queueId;
794 }
796 /* Sets the tracing of a message */
797 Void MessageQ_setMsgTrace (MessageQ_Msg msg, Bool traceFlag)
798 {
799 msg->flags = (msg->flags & ~TRACEMASK) | (traceFlag << TRACESHIFT);
800 }
802 /*
803 * Returns the amount of shared memory used by one transport instance.
804 *
805 * The MessageQ module itself does not use any shared memory but the
806 * underlying transport may use some shared memory.
807 */
808 SizeT MessageQ_sharedMemReq (Ptr sharedAddr)
809 {
810 SizeT memReq = 0u;
812 /* Do nothing, as this is a copy transport. */
814 return (memReq);
815 }
817 /*
818 * Create a socket for this remote proc, and attempt to connect.
819 *
820 * Only creates a socket if one does not already exist for this procId.
821 *
822 * Note: remoteProcId may be MultiProc_Self() for loopback case.
823 */
824 Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr)
825 {
826 Int status = MessageQ_S_SUCCESS;
827 int sock;
829 PRINTVERBOSE1("MessageQ_attach: remoteProcId: %d\n", remoteProcId)
831 if (remoteProcId >= MultiProc_MAXPROCESSORS) {
832 status = MessageQ_E_INVALIDPROCID;
833 goto exit;
834 }
836 pthread_mutex_lock (&(MessageQ_module->gate));
838 /* Only create a socket if one doesn't exist: */
839 if (MessageQ_module->sock[remoteProcId] == Transport_INVALIDSOCKET) {
840 /* Create the socket for sending messages to the remote proc: */
841 sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
842 if (sock < 0) {
843 status = MessageQ_E_FAIL;
844 printf ("MessageQ_attach: socket failed: %d, %s\n",
845 errno, strerror(errno));
846 }
847 else {
848 PRINTVERBOSE1("MessageQ_attach: created send socket: %d\n", sock)
849 MessageQ_module->sock[remoteProcId] = sock;
850 /* Attempt to connect: */
851 status = ConnectSocket(sock, remoteProcId, MESSAGEQ_RPMSG_PORT);
852 if (status < 0) {
853 status = MessageQ_E_RESOURCE;
854 /* don't hard-printf since this is no longer fatal */
855 PRINTVERBOSE1("MessageQ_attach: ConnectSocket(remoteProcId:%d) failed\n",
856 remoteProcId);
857 }
858 }
859 }
860 else {
861 status = MessageQ_E_ALREADYEXISTS;
862 }
864 pthread_mutex_unlock (&(MessageQ_module->gate));
866 if (status == MessageQ_E_RESOURCE) {
867 MessageQ_detach(remoteProcId);
868 }
870 exit:
871 return (status);
872 }
874 /*
875 * Close the socket for this remote proc.
876 *
877 */
878 Int MessageQ_detach (UInt16 remoteProcId)
879 {
880 Int status = MessageQ_S_SUCCESS;
881 int sock;
883 if (remoteProcId >= MultiProc_MAXPROCESSORS) {
884 status = MessageQ_E_INVALIDPROCID;
885 goto exit;
886 }
888 pthread_mutex_lock (&(MessageQ_module->gate));
890 sock = MessageQ_module->sock[remoteProcId];
891 if (sock != Transport_INVALIDSOCKET) {
892 if (close(sock)) {
893 status = MessageQ_E_OSFAILURE;
894 printf("MessageQ_detach: close failed: %d, %s\n",
895 errno, strerror(errno));
896 }
897 else {
898 PRINTVERBOSE1("MessageQ_detach: closed socket: %d\n", sock)
899 MessageQ_module->sock[remoteProcId] = Transport_INVALIDSOCKET;
900 }
901 }
903 pthread_mutex_unlock (&(MessageQ_module->gate));
905 exit:
906 return (status);
907 }
909 /*
910 * This is a helper function to initialize a message.
911 */
912 Void MessageQ_msgInit (MessageQ_Msg msg)
913 {
914 #if 0
915 Int status = MessageQ_S_SUCCESS;
916 LAD_ClientHandle handle;
917 struct LAD_CommandObj cmd;
918 union LAD_ResponseObj rsp;
920 handle = LAD_findHandle();
921 if (handle == LAD_MAXNUMCLIENTS) {
922 PRINTVERBOSE1(
923 "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
924 getpid())
926 return;
927 }
929 cmd.cmd = LAD_MESSAGEQ_MSGINIT;
930 cmd.clientId = handle;
932 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
933 PRINTVERBOSE1(
934 "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
935 return;
936 }
938 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
939 PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
940 return;
941 }
942 status = rsp.msgInit.status;
944 PRINTVERBOSE2(
945 "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
946 handle, status)
948 memcpy(msg, &rsp.msgInit.msg, sizeof(*msg));
949 #else
950 msg->reserved0 = 0; /* We set this to distinguish from NameServerMsg */
951 msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
952 msg->msgId = MessageQ_INVALIDMSGID;
953 msg->dstId = (UInt16)MessageQ_INVALIDMESSAGEQ;
954 msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
955 msg->srcProc = MultiProc_self();
957 pthread_mutex_lock(&(MessageQ_module->gate));
958 msg->seqNum = MessageQ_module->seqNum++;
959 pthread_mutex_unlock(&(MessageQ_module->gate));
960 #endif
961 }
963 /*
964 * =============================================================================
965 * Transport: Fxns kept here until need for a transport layer is realized.
966 * =============================================================================
967 */
968 /*
969 * ======== transportCreateEndpoint ========
970 *
971 * Create a communication endpoint to receive messages.
972 */
973 static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex)
974 {
975 Int status = MessageQ_S_SUCCESS;
976 int err;
978 /* Create the socket to receive messages for this messageQ. */
979 *fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
980 if (*fd < 0) {
981 status = MessageQ_E_FAIL;
982 printf ("transportCreateEndpoint: socket call failed: %d, %s\n",
983 errno, strerror(errno));
984 goto exit;
985 }
987 PRINTVERBOSE1("transportCreateEndpoint: created socket: fd: %d\n", *fd)
989 err = SocketBindAddr(*fd, rprocId, (UInt32)queueIndex);
990 if (err < 0) {
991 status = MessageQ_E_FAIL;
992 /* don't hard-printf since this is no longer fatal */
993 PRINTVERBOSE2("transportCreateEndpoint: bind failed: %d, %s\n",
994 errno, strerror(errno));
995 }
997 exit:
998 return (status);
999 }
1001 /*
1002 * ======== transportCloseEndpoint ========
1003 *
1004 * Close the communication endpoint.
1005 */
1006 static Int transportCloseEndpoint(int fd)
1007 {
1008 Int status = MessageQ_S_SUCCESS;
1010 PRINTVERBOSE1("transportCloseEndpoint: closing socket: %d\n", fd)
1012 /* Stop communication to this socket: */
1013 close(fd);
1015 return (status);
1016 }
1018 /*
1019 * ======== transportGet ========
1020 * Retrieve a message waiting in the socket's queue.
1021 */
1022 static Int transportGet(int sock, MessageQ_Msg * retMsg)
1023 {
1024 Int status = MessageQ_S_SUCCESS;
1025 MessageQ_Msg msg;
1026 struct sockaddr_rpmsg fromAddr; // [Socket address of sender]
1027 unsigned int len;
1028 int byteCount;
1030 /*
1031 * We have no way of peeking to see what message size we'll get, so we
1032 * allocate a message of max size to receive contents from the rpmsg socket
1033 * (currently, a copy transport)
1034 */
1035 msg = MessageQ_alloc (0, MESSAGEQ_RPMSG_MAXSIZE);
1036 if (!msg) {
1037 status = MessageQ_E_MEMORY;
1038 goto exit;
1039 }
1041 memset(&fromAddr, 0, sizeof(fromAddr));
1042 len = sizeof(fromAddr);
1044 byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
1045 (struct sockaddr *)&fromAddr, &len);
1046 if (len != sizeof(fromAddr)) {
1047 printf("recvfrom: got bad addr len (%d)\n", len);
1048 status = MessageQ_E_FAIL;
1049 goto exit;
1050 }
1051 if (byteCount < 0) {
1052 printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
1053 status = MessageQ_E_FAIL;
1054 goto exit;
1055 }
1056 else {
1057 /* Update the allocated message size (even though this may waste space
1058 * when the actual message is smaller than the maximum rpmsg size,
1059 * the message will be freed soon anyway, and it avoids an extra copy).
1060 */
1061 msg->msgSize = byteCount;
1063 /*
1064 * If the message received was statically allocated, reset the
1065 * heapId, so the app can free it.
1066 */
1067 if (msg->heapId == MessageQ_STATICMSG) {
1068 msg->heapId = 0; /* for a copy transport, heap id is 0. */
1069 }
1070 }
1072 PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
1073 PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
1074 PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId, msg->msgSize)
1076 *retMsg = msg;
1078 exit:
1079 return (status);
1080 }
1082 /*
1083 * ======== transportPut ========
1084 *
1085 * Calls the socket API sendto() on the socket associated with
1086 * with this destination procID.
1087 * Currently, both local and remote messages are sent via the Socket ABI, so
1088 * no local object lists are maintained here.
1089 */
1090 static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId)
1091 {
1092 Int status = MessageQ_S_SUCCESS;
1093 int sock;
1094 int err;
1096 /*
1097 * Retrieve the socket for the AF_SYSLINK protocol associated with this
1098 * transport.
1099 */
1100 sock = MessageQ_module->sock[dstProcId];
1102 PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
1104 err = send(sock, msg, msg->msgSize, 0);
1105 if (err < 0) {
1106 printf ("transportPut: send failed: %d, %s\n",
1107 errno, strerror(errno));
1108 status = MessageQ_E_FAIL;
1109 }
1111 /*
1112 * Free the message, as this is a copy transport, we maintain MessageQ
1113 * semantics.
1114 */
1115 MessageQ_free (msg);
1117 return (status);
1118 }