1 /*
2 * Copyright (c) 2012-2013, Texas Instruments Incorporated
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /*
33 * @file MessageQ.c
34 *
35 * @brief MessageQ module "client" implementation
36 *
37 * This implementation is geared for use in a "client/server" model, whereby
38 * system-wide data is maintained in a "server" component and process-
39 * specific data is handled here. At the moment, this implementation
40 * connects and communicates with LAD for the server connection.
41 */
44 /* Standard IPC header */
45 #include <ti/ipc/Std.h>
47 /* Linux specific header files, replacing OSAL: */
48 #include <pthread.h>
50 /* Module level headers */
51 #include <ti/ipc/NameServer.h>
52 #include <ti/ipc/MultiProc.h>
53 #include <_MultiProc.h>
54 #include <ti/ipc/MessageQ.h>
55 #include <_MessageQ.h>
57 /* Socket Headers */
58 #include <sys/select.h>
59 #include <sys/time.h>
60 #include <sys/types.h>
61 #include <sys/param.h>
62 #include <sys/eventfd.h>
63 #include <sys/socket.h>
64 #include <errno.h>
65 #include <stdio.h>
66 #include <string.h>
67 #include <stdlib.h>
68 #include <unistd.h>
69 #include <assert.h>
71 /* Socket Protocol Family */
72 #include <net/rpmsg.h>
74 /* Socket utils: */
75 #include <SocketFxns.h>
77 #include <ladclient.h>
78 #include <_lad.h>
80 /* =============================================================================
81 * Macros/Constants
82 * =============================================================================
83 */
85 /*!
86 * @brief Name of the reserved NameServer used for MessageQ.
87 */
88 #define MessageQ_NAMESERVER "MessageQ"
90 /*!
91 * @brief Value of an invalid socket ID:
92 */
93 #define Transport_INVALIDSOCKET (0xFFFFFFFF)
95 /* More magic rpmsg port numbers: */
96 #define MESSAGEQ_RPMSG_PORT 61
97 #define MESSAGEQ_RPMSG_MAXSIZE 512
99 /* Trace flag settings: */
100 #define TRACESHIFT 12
101 #define TRACEMASK 0x1000
103 /* Define BENCHMARK to quiet key MessageQ APIs: */
104 //#define BENCHMARK
106 /* =============================================================================
107 * Structures & Enums
108 * =============================================================================
109 */
111 /* structure for MessageQ module state */
112 typedef struct MessageQ_ModuleObject {
113 Int refCount;
114 /*!< Reference count */
115 NameServer_Handle nameServer;
116 /*!< Handle to the local NameServer used for storing GP objects */
117 pthread_mutex_t gate;
118 /*!< Handle of gate to be used for local thread safety */
119 MessageQ_Params defaultInstParams;
120 /*!< Default instance creation parameters */
121 int sock[MultiProc_MAXPROCESSORS];
122 /*!< Sockets to for sending to each remote processor */
123 int seqNum;
124 /*!< Process-specific sequence number */
125 } MessageQ_ModuleObject;
127 /*!
128 * @brief Structure for the Handle for the MessageQ.
129 */
130 typedef struct MessageQ_Object_tag {
131 MessageQ_Params params;
132 /*! Instance specific creation parameters */
133 MessageQ_QueueId queue;
134 /* Unique id */
135 int fd[MultiProc_MAXPROCESSORS];
136 /* File Descriptor to block on messages from remote processors. */
137 int unblockFd;
138 /* Write this fd to unblock the select() call in MessageQ _get() */
139 void *serverHandle;
140 } MessageQ_Object;
142 static Bool verbose = FALSE;
145 /* =============================================================================
146 * Globals
147 * =============================================================================
148 */
149 static MessageQ_ModuleObject MessageQ_state =
150 {
151 .refCount = 0,
152 .nameServer = NULL,
153 };
155 /*!
156 * @var MessageQ_module
157 *
158 * @brief Pointer to the MessageQ module state.
159 */
160 MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
163 /* =============================================================================
164 * Forward declarations of internal functions
165 * =============================================================================
166 */
168 /* This is a helper function to initialize a message. */
169 static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex);
170 static Int transportCloseEndpoint(int fd);
171 static Int transportGet(int sock, MessageQ_Msg * retMsg);
172 static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId);
174 /* =============================================================================
175 * APIS
176 * =============================================================================
177 */
178 /* Function to get default configuration for the MessageQ module.
179 *
180 */
181 Void MessageQ_getConfig (MessageQ_Config * cfg)
182 {
183 Int status;
184 LAD_ClientHandle handle;
185 struct LAD_CommandObj cmd;
186 union LAD_ResponseObj rsp;
188 assert (cfg != NULL);
190 handle = LAD_findHandle();
191 if (handle == LAD_MAXNUMCLIENTS) {
192 PRINTVERBOSE1(
193 "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
194 getpid())
196 return;
197 }
199 cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
200 cmd.clientId = handle;
202 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
203 PRINTVERBOSE1(
204 "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
205 return;
206 }
208 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
209 PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n", status)
210 return;
211 }
212 status = rsp.messageQGetConfig.status;
214 PRINTVERBOSE2(
215 "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
216 handle, status)
218 memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof(*cfg));
220 return;
221 }
223 /* Function to setup the MessageQ module. */
224 Int MessageQ_setup(const MessageQ_Config * cfg)
225 {
226 Int status;
227 LAD_ClientHandle handle;
228 struct LAD_CommandObj cmd;
229 union LAD_ResponseObj rsp;
230 Int i;
232 handle = LAD_findHandle();
233 if (handle == LAD_MAXNUMCLIENTS) {
234 PRINTVERBOSE1(
235 "MessageQ_setup: can't find connection to daemon for pid %d\n",
236 getpid())
238 return MessageQ_E_RESOURCE;
239 }
241 cmd.cmd = LAD_MESSAGEQ_SETUP;
242 cmd.clientId = handle;
243 memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
245 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
246 PRINTVERBOSE1(
247 "MessageQ_setup: sending LAD command failed, status=%d\n", status)
248 return MessageQ_E_FAIL;
249 }
251 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
252 PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
253 return(status);
254 }
255 status = rsp.setup.status;
257 PRINTVERBOSE2(
258 "MessageQ_setup: got LAD response for client %d, status=%d\n",
259 handle, status)
261 MessageQ_module->nameServer = rsp.setup.nameServerHandle;
262 MessageQ_module->seqNum = 0;
264 /* Create a default local gate. */
265 pthread_mutex_init (&(MessageQ_module->gate), NULL);
267 /* Clear sockets array. */
268 for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
269 MessageQ_module->sock[i] = Transport_INVALIDSOCKET;
270 }
272 return status;
273 }
275 /*
276 * Function to destroy the MessageQ module.
277 * Destroys socket/protocol maps; sockets themselves should have been
278 * destroyed in MessageQ_delete() and MessageQ_detach() calls.
279 */
280 Int MessageQ_destroy (void)
281 {
282 Int status;
283 LAD_ClientHandle handle;
284 struct LAD_CommandObj cmd;
285 union LAD_ResponseObj rsp;
287 handle = LAD_findHandle();
288 if (handle == LAD_MAXNUMCLIENTS) {
289 PRINTVERBOSE1(
290 "MessageQ_destroy: can't find connection to daemon for pid %d\n",
291 getpid())
293 return MessageQ_E_RESOURCE;
294 }
296 cmd.cmd = LAD_MESSAGEQ_DESTROY;
297 cmd.clientId = handle;
299 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
300 PRINTVERBOSE1(
301 "MessageQ_destroy: sending LAD command failed, status=%d\n", status)
302 return MessageQ_E_FAIL;
303 }
305 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
306 PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
307 return(status);
308 }
309 status = rsp.status;
311 PRINTVERBOSE2(
312 "MessageQ_destroy: got LAD response for client %d, status=%d\n",
313 handle, status)
315 return status;
316 }
318 /* Function to initialize the parameters for the MessageQ instance. */
319 Void MessageQ_Params_init (MessageQ_Params * params)
320 {
321 memcpy (params, &(MessageQ_module->defaultInstParams),
322 sizeof (MessageQ_Params));
324 return;
325 }
327 /*
328 * Function to create a MessageQ object for receiving.
329 *
330 * Create a socket and bind the source address (local ProcId/MessageQ ID) in
331 * order to get messages dispatched to this messageQ.
332 */
333 MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
334 {
335 Int status;
336 MessageQ_Object * obj = NULL;
337 UInt16 queueIndex = 0u;
338 UInt16 procId;
339 UInt16 rprocId;
340 LAD_ClientHandle handle;
341 struct LAD_CommandObj cmd;
342 union LAD_ResponseObj rsp;
344 handle = LAD_findHandle();
345 if (handle == LAD_MAXNUMCLIENTS) {
346 PRINTVERBOSE1(
347 "MessageQ_create: can't find connection to daemon for pid %d\n",
348 getpid())
350 return NULL;
351 }
353 cmd.cmd = LAD_MESSAGEQ_CREATE;
354 cmd.clientId = handle;
355 if (name == NULL) {
356 cmd.args.messageQCreate.name[0] = '\0';
357 }
358 else {
359 strncpy(cmd.args.messageQCreate.name, name,
360 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
361 cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
362 }
364 if (params) {
365 memcpy(&cmd.args.messageQCreate.params, params, sizeof(*params));
366 }
368 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
369 PRINTVERBOSE1(
370 "MessageQ_create: sending LAD command failed, status=%d\n", status)
371 return NULL;
372 }
374 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
375 PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
376 return NULL;
377 }
378 status = rsp.messageQCreate.status;
380 PRINTVERBOSE2(
381 "MessageQ_create: got LAD response for client %d, status=%d\n",
382 handle, status)
384 if (status == -1) {
385 PRINTVERBOSE1(
386 "MessageQ_create: MessageQ server operation failed, status=%d\n",
387 status)
388 return NULL;
389 }
391 /* Create the generic obj */
392 obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
394 if (params != NULL) {
395 /* Populate the params member */
396 memcpy((Ptr) &obj->params, (Ptr)params, sizeof (MessageQ_Params));
397 }
399 procId = MultiProc_self();
400 queueIndex = (MessageQ_QueueIndex)rsp.messageQCreate.queueId;
401 obj->queue = rsp.messageQCreate.queueId;
402 obj->serverHandle = rsp.messageQCreate.serverHandle;
404 /*
405 * Create a set of communication endpoints (one per each remote proc),
406 * and return the socket as target for MessageQ_put() calls, and as
407 * a file descriptor to close during MessageQ_delete().
408 */
409 for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
410 obj->fd[rprocId] = Transport_INVALIDSOCKET;
411 if (procId == rprocId) {
412 /* Skip creating an endpoint for ourself. */
413 continue;
414 }
416 PRINTVERBOSE3("MessageQ_create: creating endpoint for: %s, rprocId: %d, queueIndex: %d\n", name, rprocId, queueIndex)
418 status = transportCreateEndpoint(&obj->fd[rprocId], rprocId,
419 queueIndex);
420 if (status < 0) {
421 obj->fd[rprocId] = Transport_INVALIDSOCKET;
422 }
423 }
425 /*
426 * Now, to support MessageQ_unblock() functionality, create an event object.
427 * Writing to this event will unblock the select() call in MessageQ_get().
428 */
429 obj->unblockFd = eventfd(0, 0);
430 if (obj->unblockFd == -1) {
431 printf ("MessageQ_create: eventfd creation failed: %d, %s\n",
432 errno, strerror(errno));
433 MessageQ_delete((MessageQ_Handle *)&obj);
434 }
435 else {
436 int endpointFound = 0;
438 for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
439 if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
440 endpointFound = 1;
441 }
442 }
443 if (!endpointFound) {
444 printf("MessageQ_create: no transport endpoints found, deleting\n");
445 MessageQ_delete((MessageQ_Handle *)&obj);
446 }
447 }
449 return ((MessageQ_Handle) obj);
450 }
452 /*
453 * Function to delete a MessageQ object for a specific slave processor.
454 *
455 * Deletes the socket associated with this MessageQ object.
456 */
457 Int MessageQ_delete (MessageQ_Handle * handlePtr)
458 {
459 Int status = MessageQ_S_SUCCESS;
460 MessageQ_Object * obj = NULL;
461 UInt16 rprocId;
462 LAD_ClientHandle handle;
463 struct LAD_CommandObj cmd;
464 union LAD_ResponseObj rsp;
466 handle = LAD_findHandle();
467 if (handle == LAD_MAXNUMCLIENTS) {
468 PRINTVERBOSE1(
469 "MessageQ_delete: can't find connection to daemon for pid %d\n",
470 getpid())
472 return MessageQ_E_FAIL;
473 }
475 obj = (MessageQ_Object *) (*handlePtr);
477 cmd.cmd = LAD_MESSAGEQ_DELETE;
478 cmd.clientId = handle;
479 cmd.args.messageQDelete.serverHandle = obj->serverHandle;
481 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
482 PRINTVERBOSE1(
483 "MessageQ_delete: sending LAD command failed, status=%d\n", status)
484 return MessageQ_E_FAIL;
485 }
487 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
488 PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
489 return MessageQ_E_FAIL;
490 }
491 status = rsp.messageQDelete.status;
493 PRINTVERBOSE2(
494 "MessageQ_delete: got LAD response for client %d, status=%d\n",
495 handle, status)
498 /* Close the event used for MessageQ_unblock(): */
499 close(obj->unblockFd);
501 /* Close the communication endpoint: */
502 for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
503 if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
504 status = transportCloseEndpoint(obj->fd[rprocId]);
505 }
506 }
508 /* Now free the obj */
509 free (obj);
510 *handlePtr = NULL;
512 return (status);
513 }
515 /*
516 * Opens an instance of MessageQ for sending.
517 *
518 * We need not create a socket here; the sockets for all remote processors
519 * were created during MessageQ_attach(), and will be
520 * retrieved during MessageQ_put().
521 */
522 Int MessageQ_open (String name, MessageQ_QueueId * queueId)
523 {
524 Int status = MessageQ_S_SUCCESS;
526 status = NameServer_getUInt32 (MessageQ_module->nameServer,
527 name, queueId, NULL);
529 if (status == NameServer_E_NOTFOUND) {
530 /* Set return queue ID to invalid. */
531 *queueId = MessageQ_INVALIDMESSAGEQ;
532 status = MessageQ_E_NOTFOUND;
533 }
534 else if (status >= 0) {
535 /* Override with a MessageQ status code. */
536 status = MessageQ_S_SUCCESS;
537 }
538 else {
539 /* Set return queue ID to invalid. */
540 *queueId = MessageQ_INVALIDMESSAGEQ;
541 /* Override with a MessageQ status code. */
542 if (status == NameServer_E_TIMEOUT) {
543 status = MessageQ_E_TIMEOUT;
544 }
545 else {
546 status = MessageQ_E_FAIL;
547 }
548 }
550 return (status);
551 }
553 /* Closes previously opened instance of MessageQ module. */
554 Int MessageQ_close (MessageQ_QueueId * queueId)
555 {
556 Int32 status = MessageQ_S_SUCCESS;
558 /* Nothing more to be done for closing the MessageQ. */
559 *queueId = MessageQ_INVALIDMESSAGEQ;
561 return (status);
562 }
564 /*
565 * Place a message onto a message queue.
566 *
567 * Calls TransportShm_put(), which handles the sending of the message using the
568 * appropriate kernel interface (socket, device ioctl) call for the remote
569 * procId encoded in the queueId argument.
570 *
571 */
572 Int MessageQ_put (MessageQ_QueueId queueId, MessageQ_Msg msg)
573 {
574 Int status;
575 UInt16 dstProcId = (UInt16)(queueId >> 16);
576 UInt16 queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
578 msg->dstId = queueIndex;
579 msg->dstProc = dstProcId;
581 status = transportPut(msg, queueIndex, dstProcId);
583 return (status);
584 }
586 /*
587 * Gets a message for a message queue and blocks if the queue is empty.
588 * If a message is present, it returns it. Otherwise it blocks
589 * waiting for a message to arrive.
590 * When a message is returned, it is owned by the caller.
591 *
592 * We block using select() on the receiving socket's file descriptor, then
593 * get the waiting message via the socket API recvfrom().
594 * We use the socket stored in the messageQ object via a previous call to
595 * MessageQ_create().
596 *
597 */
598 Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
599 {
600 static int last = 0;
601 Int status = MessageQ_S_SUCCESS;
602 Int tmpStatus;
603 MessageQ_Object * obj = (MessageQ_Object *) handle;
604 int retval;
605 int nfds;
606 fd_set rfds;
607 struct timeval tv;
608 void *timevalPtr;
609 UInt16 rprocId;
610 int maxfd = 0;
611 int selfId;
612 int nProcessors;
614 /* Wait (with timeout) and retreive message from socket: */
615 FD_ZERO(&rfds);
616 for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
617 if (rprocId == MultiProc_self() ||
618 obj->fd[rprocId] == Transport_INVALIDSOCKET) {
619 continue;
620 }
621 maxfd = MAX(maxfd, obj->fd[rprocId]);
622 FD_SET(obj->fd[rprocId], &rfds);
623 }
625 /* Wait also on the event fd, which may be written by MessageQ_unblock(): */
626 FD_SET(obj->unblockFd, &rfds);
628 if (timeout == MessageQ_FOREVER) {
629 timevalPtr = NULL;
630 }
631 else {
632 /* Timeout given in msec: convert: */
633 tv.tv_sec = timeout / 1000;
634 tv.tv_usec = (timeout % 1000) * 1000;
635 timevalPtr = &tv;
636 }
637 /* Add one to last fd created: */
638 nfds = MAX(maxfd, obj->unblockFd) + 1;
640 retval = select(nfds, &rfds, NULL, NULL, timevalPtr);
641 if (retval) {
642 if (FD_ISSET(obj->unblockFd, &rfds)) {
643 /*
644 * Our event was signalled by MessageQ_unblock().
645 *
646 * This is typically done during a shutdown sequence, where
647 * the intention of the client would be to ignore (i.e. not fetch)
648 * any pending messages in the transport's queue.
649 * Thus, we shall not check for nor return any messages.
650 */
651 *msg = NULL;
652 status = MessageQ_E_UNBLOCKED;
653 }
654 else {
655 /* start where we last left off */
656 rprocId = last;
658 selfId = MultiProc_self();
659 nProcessors = MultiProc_getNumProcessors();
661 do {
662 if (rprocId != selfId &&
663 obj->fd[rprocId] != Transport_INVALIDSOCKET) {
665 if (FD_ISSET(obj->fd[rprocId], &rfds)) {
666 /* Our transport's fd was signalled: Get the message */
667 tmpStatus = transportGet(obj->fd[rprocId], msg);
668 if (tmpStatus < 0) {
669 printf ("MessageQ_get: tranposrtshm_get failed.");
670 status = MessageQ_E_FAIL;
671 }
673 last = (rprocId + 1) % nProcessors;
674 break;
675 }
676 }
677 rprocId = (rprocId + 1) % nProcessors;
678 } while (rprocId != last);
679 }
680 }
681 else if (retval == 0) {
682 *msg = NULL;
683 status = MessageQ_E_TIMEOUT;
684 }
686 return (status);
687 }
689 /*
690 * Return a count of the number of messages in the queue
691 *
692 * TBD: Implement as a socket ioctl, using getsockopt(). Return -1 for now.
693 */
694 Int MessageQ_count (MessageQ_Handle handle)
695 {
696 Int count = -1;
697 #if 0
698 MessageQ_Object * obj = (MessageQ_Object *) handle;
699 socklen_t optlen;
701 /*
702 * TBD: Need to find a way to implement (if anyone uses it!), and
703 * push down into transport..
704 */
706 /*
707 * 2nd arg to getsockopt should be transport independent, but using
708 * SSKPROTO_SHMFIFO for now:
709 */
710 getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
711 &count, &optlen);
712 #endif
714 return (count);
715 }
717 /* Initializes a message not obtained from MessageQ_alloc. */
718 Void MessageQ_staticMsgInit (MessageQ_Msg msg, UInt32 size)
719 {
720 /* Fill in the fields of the message */
721 MessageQ_msgInit (msg);
722 msg->heapId = MessageQ_STATICMSG;
723 msg->msgSize = size;
724 }
726 /*
727 * Allocate a message and initialize the needed fields (note some
728 * of the fields in the header are set via other APIs or in the
729 * MessageQ_put function,
730 */
731 MessageQ_Msg MessageQ_alloc (UInt16 heapId, UInt32 size)
732 {
733 MessageQ_Msg msg = NULL;
735 /*
736 * heapId not used for local alloc (as this is over a copy transport), but
737 * we need to send to other side as heapId is used in BIOS transport:
738 */
739 msg = (MessageQ_Msg)calloc (1, size);
740 MessageQ_msgInit (msg);
741 msg->msgSize = size;
742 msg->heapId = heapId;
744 return msg;
745 }
747 /* Frees the message back to the heap that was used to allocate it. */
748 Int MessageQ_free (MessageQ_Msg msg)
749 {
750 UInt32 status = MessageQ_S_SUCCESS;
752 /* Check to ensure this was not allocated by user: */
753 if (msg->heapId == MessageQ_STATICMSG) {
754 status = MessageQ_E_CANNOTFREESTATICMSG;
755 }
756 else {
757 free (msg);
758 }
760 return status;
761 }
763 /* Register a heap with MessageQ. */
764 Int MessageQ_registerHeap (Ptr heap, UInt16 heapId)
765 {
766 Int status = MessageQ_S_SUCCESS;
768 /* Do nothing, as this uses a copy transport: */
770 return status;
771 }
773 /* Unregister a heap with MessageQ. */
774 Int MessageQ_unregisterHeap (UInt16 heapId)
775 {
776 Int status = MessageQ_S_SUCCESS;
778 /* Do nothing, as this uses a copy transport: */
780 return status;
781 }
783 /* Unblocks a MessageQ */
784 Void MessageQ_unblock (MessageQ_Handle handle)
785 {
786 MessageQ_Object * obj = (MessageQ_Object *) handle;
787 uint64_t buf = 1;
788 int numBytes;
790 /* Write 8 bytes to awaken any threads blocked on this messageQ: */
791 numBytes = write(obj->unblockFd, &buf, sizeof(buf));
792 }
794 /* Embeds a source message queue into a message. */
795 Void MessageQ_setReplyQueue (MessageQ_Handle handle, MessageQ_Msg msg)
796 {
797 MessageQ_Object * obj = (MessageQ_Object *) handle;
799 msg->replyId = (UInt16)(obj->queue);
800 msg->replyProc = (UInt16)(obj->queue >> 16);
801 }
803 /* Returns the QueueId associated with the handle. */
804 MessageQ_QueueId MessageQ_getQueueId (MessageQ_Handle handle)
805 {
806 MessageQ_Object * obj = (MessageQ_Object *) handle;
807 UInt32 queueId;
809 queueId = (obj->queue);
811 return queueId;
812 }
814 /* Sets the tracing of a message */
815 Void MessageQ_setMsgTrace (MessageQ_Msg msg, Bool traceFlag)
816 {
817 msg->flags = (msg->flags & ~TRACEMASK) | (traceFlag << TRACESHIFT);
818 }
820 /*
821 * Returns the amount of shared memory used by one transport instance.
822 *
823 * The MessageQ module itself does not use any shared memory but the
824 * underlying transport may use some shared memory.
825 */
826 SizeT MessageQ_sharedMemReq (Ptr sharedAddr)
827 {
828 SizeT memReq = 0u;
830 /* Do nothing, as this is a copy transport. */
832 return (memReq);
833 }
835 /*
836 * Create a socket for this remote proc, and attempt to connect.
837 *
838 * Only creates a socket if one does not already exist for this procId.
839 *
840 * Note: remoteProcId may be MultiProc_Self() for loopback case.
841 */
842 Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr)
843 {
844 Int status = MessageQ_S_SUCCESS;
845 int sock;
847 PRINTVERBOSE1("MessageQ_attach: remoteProcId: %d\n", remoteProcId)
849 if (remoteProcId >= MultiProc_MAXPROCESSORS) {
850 status = MessageQ_E_INVALIDPROCID;
851 goto exit;
852 }
854 pthread_mutex_lock (&(MessageQ_module->gate));
856 /* Only create a socket if one doesn't exist: */
857 if (MessageQ_module->sock[remoteProcId] == Transport_INVALIDSOCKET) {
858 /* Create the socket for sending messages to the remote proc: */
859 sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
860 if (sock < 0) {
861 status = MessageQ_E_FAIL;
862 printf ("MessageQ_attach: socket failed: %d, %s\n",
863 errno, strerror(errno));
864 }
865 else {
866 PRINTVERBOSE1("MessageQ_attach: created send socket: %d\n", sock)
867 MessageQ_module->sock[remoteProcId] = sock;
868 /* Attempt to connect: */
869 status = ConnectSocket(sock, remoteProcId, MESSAGEQ_RPMSG_PORT);
870 if (status < 0) {
871 status = MessageQ_E_RESOURCE;
872 /* don't hard-printf since this is no longer fatal */
873 PRINTVERBOSE1("MessageQ_attach: ConnectSocket(remoteProcId:%d) failed\n",
874 remoteProcId);
875 }
876 }
877 }
878 else {
879 status = MessageQ_E_ALREADYEXISTS;
880 }
882 pthread_mutex_unlock (&(MessageQ_module->gate));
884 if (status == MessageQ_E_RESOURCE) {
885 MessageQ_detach(remoteProcId);
886 }
888 exit:
889 return (status);
890 }
892 /*
893 * Close the socket for this remote proc.
894 *
895 */
896 Int MessageQ_detach (UInt16 remoteProcId)
897 {
898 Int status = MessageQ_S_SUCCESS;
899 int sock;
901 if (remoteProcId >= MultiProc_MAXPROCESSORS) {
902 status = MessageQ_E_INVALIDPROCID;
903 goto exit;
904 }
906 pthread_mutex_lock (&(MessageQ_module->gate));
908 sock = MessageQ_module->sock[remoteProcId];
909 if (sock != Transport_INVALIDSOCKET) {
910 if (close(sock)) {
911 status = MessageQ_E_OSFAILURE;
912 printf("MessageQ_detach: close failed: %d, %s\n",
913 errno, strerror(errno));
914 }
915 else {
916 PRINTVERBOSE1("MessageQ_detach: closed socket: %d\n", sock)
917 MessageQ_module->sock[remoteProcId] = Transport_INVALIDSOCKET;
918 }
919 }
921 pthread_mutex_unlock (&(MessageQ_module->gate));
923 exit:
924 return (status);
925 }
927 /*
928 * This is a helper function to initialize a message.
929 */
930 Void MessageQ_msgInit (MessageQ_Msg msg)
931 {
932 #if 0
933 Int status = MessageQ_S_SUCCESS;
934 LAD_ClientHandle handle;
935 struct LAD_CommandObj cmd;
936 union LAD_ResponseObj rsp;
938 handle = LAD_findHandle();
939 if (handle == LAD_MAXNUMCLIENTS) {
940 PRINTVERBOSE1(
941 "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
942 getpid())
944 return;
945 }
947 cmd.cmd = LAD_MESSAGEQ_MSGINIT;
948 cmd.clientId = handle;
950 if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
951 PRINTVERBOSE1(
952 "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
953 return;
954 }
956 if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
957 PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
958 return;
959 }
960 status = rsp.msgInit.status;
962 PRINTVERBOSE2(
963 "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
964 handle, status)
966 memcpy(msg, &rsp.msgInit.msg, sizeof(*msg));
967 #else
968 msg->reserved0 = 0; /* We set this to distinguish from NameServerMsg */
969 msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
970 msg->msgId = MessageQ_INVALIDMSGID;
971 msg->dstId = (UInt16)MessageQ_INVALIDMESSAGEQ;
972 msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
973 msg->srcProc = MultiProc_self();
975 pthread_mutex_lock(&(MessageQ_module->gate));
976 msg->seqNum = MessageQ_module->seqNum++;
977 pthread_mutex_unlock(&(MessageQ_module->gate));
978 #endif
979 }
981 /*
982 * =============================================================================
983 * Transport: Fxns kept here until need for a transport layer is realized.
984 * =============================================================================
985 */
986 /*
987 * ======== transportCreateEndpoint ========
988 *
989 * Create a communication endpoint to receive messages.
990 */
991 static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex)
992 {
993 Int status = MessageQ_S_SUCCESS;
994 int err;
996 /* Create the socket to receive messages for this messageQ. */
997 *fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
998 if (*fd < 0) {
999 status = MessageQ_E_FAIL;
1000 printf ("transportCreateEndpoint: socket call failed: %d, %s\n",
1001 errno, strerror(errno));
1002 goto exit;
1003 }
1005 PRINTVERBOSE1("transportCreateEndpoint: created socket: fd: %d\n", *fd)
1007 err = SocketBindAddr(*fd, rprocId, (UInt32)queueIndex);
1008 if (err < 0) {
1009 status = MessageQ_E_FAIL;
1010 /* don't hard-printf since this is no longer fatal */
1011 PRINTVERBOSE2("transportCreateEndpoint: bind failed: %d, %s\n",
1012 errno, strerror(errno));
1013 }
1015 exit:
1016 return (status);
1017 }
1019 /*
1020 * ======== transportCloseEndpoint ========
1021 *
1022 * Close the communication endpoint.
1023 */
1024 static Int transportCloseEndpoint(int fd)
1025 {
1026 Int status = MessageQ_S_SUCCESS;
1028 PRINTVERBOSE1("transportCloseEndpoint: closing socket: %d\n", fd)
1030 /* Stop communication to this socket: */
1031 close(fd);
1033 return (status);
1034 }
1036 /*
1037 * ======== transportGet ========
1038 * Retrieve a message waiting in the socket's queue.
1039 */
1040 static Int transportGet(int sock, MessageQ_Msg * retMsg)
1041 {
1042 Int status = MessageQ_S_SUCCESS;
1043 MessageQ_Msg msg;
1044 struct sockaddr_rpmsg fromAddr; // [Socket address of sender]
1045 unsigned int len;
1046 int byteCount;
1048 /*
1049 * We have no way of peeking to see what message size we'll get, so we
1050 * allocate a message of max size to receive contents from the rpmsg socket
1051 * (currently, a copy transport)
1052 */
1053 msg = MessageQ_alloc (0, MESSAGEQ_RPMSG_MAXSIZE);
1054 if (!msg) {
1055 status = MessageQ_E_MEMORY;
1056 goto exit;
1057 }
1059 memset(&fromAddr, 0, sizeof(fromAddr));
1060 len = sizeof(fromAddr);
1062 byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
1063 (struct sockaddr *)&fromAddr, &len);
1064 if (len != sizeof(fromAddr)) {
1065 printf("recvfrom: got bad addr len (%d)\n", len);
1066 status = MessageQ_E_FAIL;
1067 goto exit;
1068 }
1069 if (byteCount < 0) {
1070 printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
1071 status = MessageQ_E_FAIL;
1072 goto exit;
1073 }
1074 else {
1075 /* Update the allocated message size (even though this may waste space
1076 * when the actual message is smaller than the maximum rpmsg size,
1077 * the message will be freed soon anyway, and it avoids an extra copy).
1078 */
1079 msg->msgSize = byteCount;
1081 /*
1082 * If the message received was statically allocated, reset the
1083 * heapId, so the app can free it.
1084 */
1085 if (msg->heapId == MessageQ_STATICMSG) {
1086 msg->heapId = 0; /* for a copy transport, heap id is 0. */
1087 }
1088 }
1090 PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
1091 PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
1092 PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId, msg->msgSize)
1094 *retMsg = msg;
1096 exit:
1097 return (status);
1098 }
1100 /*
1101 * ======== transportPut ========
1102 *
1103 * Calls the socket API sendto() on the socket associated with
1104 * with this destination procID.
1105 * Currently, both local and remote messages are sent via the Socket ABI, so
1106 * no local object lists are maintained here.
1107 */
1108 static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId)
1109 {
1110 Int status = MessageQ_S_SUCCESS;
1111 int sock;
1112 int err;
1114 /*
1115 * Retrieve the socket for the AF_SYSLINK protocol associated with this
1116 * transport.
1117 */
1118 sock = MessageQ_module->sock[dstProcId];
1120 PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
1122 err = send(sock, msg, msg->msgSize, 0);
1123 if (err < 0) {
1124 printf ("transportPut: send failed: %d, %s\n",
1125 errno, strerror(errno));
1126 status = MessageQ_E_FAIL;
1127 }
1129 /*
1130 * Free the message, as this is a copy transport, we maintain MessageQ
1131 * semantics.
1132 */
1133 MessageQ_free (msg);
1135 return (status);
1136 }