index 55aabce6efafd250ec25eb9e681d3926ee1460da..c2930ad44c1b351fe4163ef298f93518142f524a 100644 (file)
--- a/linux/src/api/MessageQ.c
+++ b/linux/src/api/MessageQ.c
* EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/*
- * @file MessageQ.c
- * @brief Prototype Mapping of SysLink MessageQ to Socket ABI
- * (SysLink 3).
- *
- * @ver 02.00.00.51_alpha2 (kernel code is basis for this module)
- *
- */
-/*============================================================================
* @file MessageQ.c
*
* @brief MessageQ module "client" implementation
* system-wide data is maintained in a "server" component and process-
* specific data is handled here. At the moment, this implementation
* connects and communicates with LAD for the server connection.
- *
- * The MessageQ module supports the structured sending and receiving of
- * variable length messages. This module can be used for homogeneous or
- * heterogeneous multi-processor messaging.
- *
- * MessageQ provides more sophisticated messaging than other modules. It is
- * typically used for complex situations such as multi-processor messaging.
- *
- * The following are key features of the MessageQ module:
- * -Writers and readers can be relocated to another processor with no
- * runtime code changes.
- * -Timeouts are allowed when receiving messages.
- * -Readers can determine the writer and reply back.
- * -Receiving a message is deterministic when the timeout is zero.
- * -Messages can reside on any message queue.
- * -Supports zero-copy transfers.
- * -Can send and receive from any type of thread.
- * -Notification mechanism is specified by application.
- * -Allows QoS (quality of service) on message buffer pools. For example,
- * using specific buffer pools for specific message queues.
- *
- * Messages are sent and received via a message queue. A reader is a thread
- * that gets (reads) messages from a message queue. A writer is a thread that
- * puts (writes) a message to a message queue. Each message queue has one
- * reader and can have many writers. A thread may read from or write to multiple
- * message queues.
- *
- * Conceptually, the reader thread owns a message queue. The reader thread
- * creates a message queue. Writer threads a created message queues to
- * get access to them.
- *
- * Message queues are identified by a system-wide unique name. Internally,
- * MessageQ uses the NameServermodule for managing
- * these names. The names are used for opening a message queue. Using
- * names is not required.
- *
- * Messages must be allocated from the MessageQ module. Once a message is
- * allocated, it can be sent on any message queue. Once a message is sent, the
- * writer loses ownership of the message and should not attempt to modify the
- * message. Once the reader receives the message, it owns the message. It
- * may either free the message or re-use the message.
- *
- * Messages in a message queue can be of variable length. The only
- * requirement is that the first field in the definition of a message must be a
- * MsgHeader structure. For example:
- * typedef struct MyMsg {
- * MessageQ_MsgHeader header;
- * ...
- * } MyMsg;
- *
- * The MessageQ API uses the MessageQ_MsgHeader internally. Your application
- * should not modify or directly access the fields in the MessageQ_MsgHeader.
- *
- * All messages sent via the MessageQ module must be allocated from a
- * Heap implementation. The heap can be used for
- * other memory allocation not related to MessageQ.
- *
- * An application can use multiple heaps. The purpose of having multiple
- * heaps is to allow an application to regulate its message usage. For
- * example, an application can allocate critical messages from one heap of fast
- * on-chip memory and non-critical messages from another heap of slower
- * external memory
- *
- * MessageQ does support the usage of messages that allocated via the
- * alloc function. Please refer to the staticMsgInit
- * function description for more details.
- *
- * In a multiple processor system, MessageQ communications to other
- * processors via MessageQTransport instances. There must be one and
- * only one MessageQTransport instance for each processor where communication
- * is desired.
- * So on a four processor system, each processor must have three
- * MessageQTransport instance.
- *
- * The user only needs to create the MessageQTransport instances. The instances
- * are responsible for registering themselves with MessageQ.
- * This is accomplished via the registerTransport function.
- *
- * ============================================================================
*/
-/* Standard headers */
-#include <Std.h>
+/* Standard IPC header */
+#include <ti/ipc/Std.h>
/* Linux specific header files, replacing OSAL: */
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
-/* SysLink Socket Protocol Family */
+/* Socket Protocol Family */
#include <net/rpmsg.h>
/* Socket utils: */
}
/* Function to setup the MessageQ module. */
-Int MessageQ_setup (const MessageQ_Config * cfg)
+Int MessageQ_setup(const MessageQ_Config * cfg)
{
Int status;
LAD_ClientHandle handle;
/* Clear sockets array. */
for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
- MessageQ_module->sock[i] = Transport_INVALIDSOCKET;
+ MessageQ_module->sock[i] = Transport_INVALIDSOCKET;
}
-
return status;
}
*/
MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
{
- Int status = MessageQ_S_SUCCESS;
+ Int status;
MessageQ_Object * obj = NULL;
UInt16 queueIndex = 0u;
UInt16 procId;
status = transportCreateEndpoint(&obj->fd[rprocId], rprocId,
queueIndex);
if (status < 0) {
- goto cleanup;
+ obj->fd[rprocId] = Transport_INVALIDSOCKET;
}
}
if (obj->unblockFd == -1) {
printf ("MessageQ_create: eventfd creation failed: %d, %s\n",
errno, strerror(errno));
- status = MessageQ_E_FAIL;
+ MessageQ_delete((MessageQ_Handle *)&obj);
}
+ else {
+ int endpointFound = 0;
-cleanup:
- /* Cleanup if fail: */
- if (status < 0) {
- MessageQ_delete((MessageQ_Handle *)&obj);
+ for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
+ if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
+ endpointFound = 1;
+ }
+ }
+ if (!endpointFound) {
+ printf("MessageQ_create: no transport endpoints found, deleting\n");
+ MessageQ_delete((MessageQ_Handle *)&obj);
+ }
}
return ((MessageQ_Handle) obj);
*/
Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
{
+ static int last = 0;
Int status = MessageQ_S_SUCCESS;
Int tmpStatus;
MessageQ_Object * obj = (MessageQ_Object *) handle;
void *timevalPtr;
UInt16 rprocId;
int maxfd = 0;
+ int selfId;
+ int nProcessors;
/* Wait (with timeout) and retreive message from socket: */
FD_ZERO(&rfds);
for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
- if (rprocId == MultiProc_self()) {
+ if (rprocId == MultiProc_self() ||
+ obj->fd[rprocId] == Transport_INVALIDSOCKET) {
continue;
}
maxfd = MAX(maxfd, obj->fd[rprocId]);
status = MessageQ_E_UNBLOCKED;
}
else {
- for (rprocId = 0; rprocId < MultiProc_getNumProcessors();
- rprocId++) {
- if (rprocId == MultiProc_self()) {
- continue;
- }
- if (FD_ISSET(obj->fd[rprocId], &rfds)) {
- /* Our transport's fd was signalled: Get the message: */
- tmpStatus = transportGet(obj->fd[rprocId], msg);
- if (tmpStatus < 0) {
- printf ("MessageQ_get: tranposrtshm_get failed.");
- status = MessageQ_E_FAIL;
- }
+ /* start where we last left off */
+ rprocId = last;
+
+ selfId = MultiProc_self();
+ nProcessors = MultiProc_getNumProcessors();
+
+ do {
+ if (rprocId != selfId &&
+ obj->fd[rprocId] != Transport_INVALIDSOCKET) {
+
+ if (FD_ISSET(obj->fd[rprocId], &rfds)) {
+ /* Our transport's fd was signalled: Get the message */
+ tmpStatus = transportGet(obj->fd[rprocId], msg);
+ if (tmpStatus < 0) {
+ printf ("MessageQ_get: tranposrtshm_get failed.");
+ status = MessageQ_E_FAIL;
+ }
+
+ last = (rprocId + 1) % nProcessors;
+ break;
+ }
}
- }
+ rprocId = (rprocId + 1) % nProcessors;
+ } while (rprocId != last);
}
}
else if (retval == 0) {
PRINTVERBOSE1("MessageQ_attach: created send socket: %d\n", sock)
MessageQ_module->sock[remoteProcId] = sock;
/* Attempt to connect: */
- ConnectSocket(sock, remoteProcId, MESSAGEQ_RPMSG_PORT);
+ status = ConnectSocket(sock, remoteProcId, MESSAGEQ_RPMSG_PORT);
+ if (status < 0) {
+ status = MessageQ_E_RESOURCE;
+ /* don't hard-printf since this is no longer fatal */
+ PRINTVERBOSE1("MessageQ_attach: ConnectSocket(remoteProcId:%d) failed\n",
+ remoteProcId);
+ }
}
}
else {
pthread_mutex_unlock (&(MessageQ_module->gate));
+ if (status == MessageQ_E_RESOURCE) {
+ MessageQ_detach(remoteProcId);
+ }
+
exit:
return (status);
}
pthread_mutex_lock (&(MessageQ_module->gate));
sock = MessageQ_module->sock[remoteProcId];
- if (close (sock)) {
- status = MessageQ_E_OSFAILURE;
- printf ("MessageQ_detach: close failed: %d, %s\n",
- errno, strerror(errno));
- }
- else {
- PRINTVERBOSE1("MessageQ_detach: closed socket: %d\n", sock)
- MessageQ_module->sock[remoteProcId] = Transport_INVALIDSOCKET;
+ if (sock != Transport_INVALIDSOCKET) {
+ if (close(sock)) {
+ status = MessageQ_E_OSFAILURE;
+ printf("MessageQ_detach: close failed: %d, %s\n",
+ errno, strerror(errno));
+ }
+ else {
+ PRINTVERBOSE1("MessageQ_detach: closed socket: %d\n", sock)
+ MessageQ_module->sock[remoteProcId] = Transport_INVALIDSOCKET;
+ }
}
pthread_mutex_unlock (&(MessageQ_module->gate));
handle = LAD_findHandle();
if (handle == LAD_MAXNUMCLIENTS) {
PRINTVERBOSE1(
- "MessageQ_setup: can't find connection to daemon for pid %d\n",
+ "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
getpid())
return;
@@ -1056,8 +1001,9 @@ static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex)
err = SocketBindAddr(*fd, rprocId, (UInt32)queueIndex);
if (err < 0) {
status = MessageQ_E_FAIL;
- printf ("transportCreateEndpoint: bind failed: %d, %s\n",
- errno, strerror(errno));
+ /* don't hard-printf since this is no longer fatal */
+ PRINTVERBOSE2("transportCreateEndpoint: bind failed: %d, %s\n",
+ errno, strerror(errno));
}
exit: