summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: 6dfeea8)
raw | patch | inline | side by side (parent: 6dfeea8)
author | Robert Tivy <rtivy@ti.com> | |
Wed, 19 Nov 2014 22:16:31 +0000 (14:16 -0800) | ||
committer | Robert Tivy <rtivy@ti.com> | |
Thu, 27 Nov 2014 00:26:15 +0000 (16:26 -0800) |
22 files changed:
diff --git a/.gitignore b/.gitignore
index 89703178c2ae994cfd6b132ebcb66cd4c3362ffb..4b9da0fa95211d390f585284aca8d1a08d26fddf 100644 (file)
--- a/.gitignore
+++ b/.gitignore
/linux/src/tests/MessageQApp
/linux/src/tests/MessageQBench
/linux/src/tests/MessageQMulti
+/linux/src/tests/MessageQMultiMulti
/linux/src/tests/Msgq100
/linux/src/tests/NameServerApp
/linux/src/tests/nano_test_*
diff --git a/Makefile.am b/Makefile.am
index eeb5d108a61ee1fa7827ff1a3ddcd4c08c5357e3..5e125bed1aea78c0e1cc05024692e8ef23f8f1be 100644 (file)
--- a/Makefile.am
+++ b/Makefile.am
##
# the subdirectories of the project to go into
-SUBDIRS = linux/etc linux/src/utils linux/src/api \
+SUBDIRS = linux/etc linux/src/utils linux/src/api linux/src/transport \
linux/src/mm linux/src/daemon linux/src/tests
# where to install common headers on the system
diff --git a/Makefile.in b/Makefile.in
index 8f8b4c6d46813c8d4912239b892e27ff2d23b08c..c7bab880f7ef58cc60c4ee2fa0f5ec7eba779e11 100644 (file)
--- a/Makefile.in
+++ b/Makefile.in
target_alias = @target_alias@
# the subdirectories of the project to go into
-SUBDIRS = linux/etc linux/src/utils linux/src/api \
+SUBDIRS = linux/etc linux/src/utils linux/src/api linux/src/transport \
linux/src/mm linux/src/daemon linux/src/tests
diff --git a/configure b/configure
index a2d57cc0abfe608cfe253bd7f4eee2f3f765f385..df1798ffd614efd85ea734b4fc0b5ec33d3febf7 100755 (executable)
--- a/configure
+++ b/configure
ac_config_files="$ac_config_files linux/src/tests/Makefile"
+ ac_config_files="$ac_config_files linux/src/transport/Makefile"
+
cat >confcache <<\_ACEOF
# This file is a shell script that caches the results of configure
# tests run on this system so they can be shared between configure
"linux/src/mm/libmmrpc.pc" ) CONFIG_FILES="$CONFIG_FILES linux/src/mm/libmmrpc.pc" ;;
"linux/src/daemon/Makefile" ) CONFIG_FILES="$CONFIG_FILES linux/src/daemon/Makefile" ;;
"linux/src/tests/Makefile" ) CONFIG_FILES="$CONFIG_FILES linux/src/tests/Makefile" ;;
+ "linux/src/transport/Makefile" ) CONFIG_FILES="$CONFIG_FILES linux/src/transport/Makefile" ;;
"depfiles" ) CONFIG_COMMANDS="$CONFIG_COMMANDS depfiles" ;;
*) { { echo "$as_me:$LINENO: error: invalid argument: $ac_config_target" >&5
echo "$as_me: error: invalid argument: $ac_config_target" >&2;}
diff --git a/configure.ac b/configure.ac
index 3bd2d896c1bba40c9beb32bd6f5763cb2ace1ce7..97f51beccf3f715f72d8d864d64e8d88d934bdf2 100644 (file)
--- a/configure.ac
+++ b/configure.ac
AC_CONFIG_FILES([linux/src/mm/libmmrpc.pc])
AC_CONFIG_FILES([linux/src/daemon/Makefile])
AC_CONFIG_FILES([linux/src/tests/Makefile])
+AC_CONFIG_FILES([linux/src/transport/Makefile])
AC_OUTPUT
echo \
diff --git a/linux/include/IMessageQTransport.h b/linux/include/IMessageQTransport.h
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * ======== IMessageQTransport.h ========
+ */
+
+#ifndef IMESSAGEQTRANSPORT_H
+#define IMESSAGEQTRANSPORT_H
+
+#include <ITransport.h>
+
+/* opaque instance handle */
+typedef struct IMessageQTransport_Object *IMessageQTransport_Handle;
+
+#define IMessageQTransport_TypeId 0x02
+
+/* virtual functions */
+typedef struct IMessageQTransport_Fxns {
+ Int (*bind)(void *handle, UInt32 queueId);
+ Int (*unbind)(void *handle, UInt32 queueId);
+ Bool (*put)(void *handle, Ptr msg);
+} IMessageQTransport_Fxns;
+
+/* abstract instance object */
+typedef struct IMessageQTransport_Object {
+ ITransport_Object base; /* inheritance */
+ IMessageQTransport_Fxns *fxns; /* virtual functions */
+} IMessageQTransport_Object;
+
+/* function stubs */
+static inline
+Int IMessageQTransport_bind(IMessageQTransport_Handle inst, UInt32 queueId)
+{
+ IMessageQTransport_Object *obj = (IMessageQTransport_Object *)inst;
+ return obj->fxns->bind((void *)inst, queueId);
+}
+
+static inline
+Int IMessageQTransport_unbind(IMessageQTransport_Handle inst, UInt32 queueId)
+{
+ IMessageQTransport_Object *obj = (IMessageQTransport_Object *)inst;
+ return obj->fxns->unbind((void *)inst, queueId);
+}
+
+static inline
+Bool IMessageQTransport_put(IMessageQTransport_Handle inst, Ptr msg)
+{
+ IMessageQTransport_Object *obj = (IMessageQTransport_Object *)inst;
+ return obj->fxns->put((void *)inst, msg);
+}
+
+/* instance convertors */
+static inline
+ITransport_Handle IMessageQTransport_upCast(IMessageQTransport_Handle inst)
+{
+ IMessageQTransport_Object *obj = (IMessageQTransport_Object *)inst;
+ return (ITransport_Handle)&obj->base;
+}
+
+static inline
+IMessageQTransport_Handle IMessageQTransport_downCast(ITransport_Handle base)
+{
+ return (IMessageQTransport_Handle)base;
+}
+
+#endif /* IMESSAGEQTRANSPORT_H */
+
diff --git a/linux/include/INetworkTransport.h b/linux/include/INetworkTransport.h
--- /dev/null
@@ -0,0 +1,63 @@
+/*
+ * ======== INetworkTransport.h ========
+ */
+
+#ifndef INETWORKTRANSPORT_H
+#define INETWORKTRANSPORT_H
+
+#include <ITransport.h>
+
+/* opaque instance handle */
+typedef struct INetworkTransport_Object *INetworkTransport_Handle;
+
+#define INetworkTransport_TypeId 0x03
+
+/* virtual functions */
+typedef struct INetworkTransport_Fxns {
+ Int (*bind)(void *handle, UInt32 queueId);
+ Int (*unbind)(void *handle, UInt32 queueId);
+ Bool (*put)(void *handle, Ptr msg);
+} INetworkTransport_Fxns;
+
+/* abstract instance object */
+typedef struct INetworkTransport_Object {
+ ITransport_Object base; /* inheritance */
+ INetworkTransport_Fxns *fxns; /* virtual functions */
+} INetworkTransport_Object;
+
+/* function stubs */
+static inline
+Bool INetworkTransport_bind(INetworkTransport_Handle inst, UInt32 queueId)
+{
+ INetworkTransport_Object *obj = (INetworkTransport_Object *)inst;
+ return obj->fxns->bind((void *)inst, queueId);
+}
+
+static inline
+Bool INetworkTransport_unbind(INetworkTransport_Handle inst, UInt32 queueId)
+{
+ INetworkTransport_Object *obj = (INetworkTransport_Object *)inst;
+ return obj->fxns->unbind((void *)inst, queueId);
+}
+
+static inline
+Bool INetworkTransport_put(INetworkTransport_Handle inst, Ptr msg)
+{
+ INetworkTransport_Object *obj = (INetworkTransport_Object *)inst;
+ return obj->fxns->put((void *)inst, msg);
+}
+
+/* instance convertors */
+ITransport_Handle INetworkTransport_upCast(INetworkTransport_Handle inst)
+{
+ INetworkTransport_Object *obj = (INetworkTransport_Object *)inst;
+ return (ITransport_Handle)&obj->base;
+}
+
+INetworkTransport_Handle INetworkTransport_downCast(ITransport_Handle base)
+{
+ return (INetworkTransport_Handle)base;
+}
+
+#endif /* INETWORKTRANSPORT_H */
+
diff --git a/linux/include/ITransport.h b/linux/include/ITransport.h
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * ======== ITransport.h ========
+ */
+
+#ifndef ITRANSPORT_H
+#define ITRANSPORT_H
+
+/* opaque instance handle */
+typedef struct ITransport_Object *ITransport_Handle;
+
+#define ITransport_TypeId 0x01
+
+/* instance object */
+typedef struct ITransport_Object {
+ Int interfaceType;
+} ITransport_Object;
+
+/* instance functions */
+static inline
+Int ITransport_itype(ITransport_Handle inst)
+{
+ ITransport_Object *obj = (ITransport_Object *)inst;
+ return obj->interfaceType;
+}
+
+#endif /* ITRANSPORT_H */
+
diff --git a/linux/include/TransportRpmsg.h b/linux/include/TransportRpmsg.h
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2014, Texas Instruments Incorporated
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of Texas Instruments Incorporated nor the names of
+ * its contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+ * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+ * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+ * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+/*
+ * ======== TransportRpmsg.h ========
+ */
+
+/**
+ * @file TransportRpmsg.h
+ *
+ * @brief Rpmsg transports implemenation
+ *
+ * The transports can be register with MessageQ. This is done
+ * via the MessageQ_registerTransport function.
+ */
+
+#ifndef _TransportRpmsg_
+#define _TransportRpmsg_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <ti/ipc/Std.h>
+#include <IMessageQTransport.h>
+
+struct TransportRpmsg_Params {
+ UInt16 rprocId;
+};
+typedef struct TransportRpmsg_Params TransportRpmsg_Params;
+
+typedef IMessageQTransport_Handle TransportRpmsg_Handle;
+
+TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *param,
+ Int *attachStatus);
+Void TransportRpmsg_delete(TransportRpmsg_Handle *hp);
+
+IMessageQTransport_Handle TransportRpmsg_upCast(TransportRpmsg_Handle handle);
+TransportRpmsg_Handle TransportRpmsg_downCast(IMessageQTransport_Handle base);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
index 43891966439c27c3f8587f17df82030cd513816b..7bef96fe070daa90b261f37f962f705b37961318 100644 (file)
#define FAIL -1
//#define NULL '\0'
+#define MAX(a,b) (((a)>(b))?(a):(b))
+
#if defined (IPC_BUILDOS_ANDROID)
#define MAX(a,b) (((a)>(b))?(a):(b))
#endif
diff --git a/linux/src/api/Ipc.c b/linux/src/api/Ipc.c
index 6dbb0b6cf622570d033405038a3e0689478c0bf2..ffad32bb5ce97ffb67c19e64c4ac41b1b3a049a8 100644 (file)
--- a/linux/src/api/Ipc.c
+++ b/linux/src/api/Ipc.c
#include <ti/ipc/Std.h>
#include <ti/ipc/Ipc.h>
#include <ti/ipc/NameServer.h>
+#include <IMessageQTransport.h>
+#include <TransportRpmsg.h>
/* User side headers */
#include <ladclient.h>
/* Function to start Ipc */
Int Ipc_start (Void)
{
+ TransportRpmsg_Handle transport;
+ TransportRpmsg_Params params;
+ IMessageQTransport_Handle iMsgQTrans;
MessageQ_Config msgqCfg;
MultiProc_Config mpCfg;
#if defined(GATEMP_SUPPORT)
GateHWSpinlock_Config gateHWCfg;
#endif
+ Int attachStatus;
Int32 status;
LAD_Status ladStatus;
UInt16 rprocId;
- Int32 attachedAny = 0;
+ Int32 attachedAny;
/* Catch ctrl-C, and cleanup: */
(void) signal(SIGINT, cleanup);
MessageQ_getConfig(&msgqCfg);
MessageQ_setup(&msgqCfg);
- /* Now attach to all remote processors, assuming they are up. */
+ /*
+ * Attach to all remote processors. We need to attach to
+ * at least one, so tolerate MessageQ_E_RESOURCE failures for
+ * now.
+ */
+ status = Ipc_S_SUCCESS;
+ attachedAny = FALSE;
+
for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
if (0 == rprocId) {
/* Skip host, which should always be 0th entry. */
continue;
}
- status = MessageQ_attach(rprocId, NULL);
- if (status == MessageQ_E_RESOURCE) {
- continue;
+
+ params.rprocId = rprocId;
+ transport = TransportRpmsg_create(¶ms, &attachStatus);
+
+ if (transport) {
+ iMsgQTrans = TransportRpmsg_upCast(transport);
+ MessageQ_registerTransport(iMsgQTrans, rprocId, 0);
+
+ attachedAny = TRUE;
}
- if (status < 0) {
- printf("Ipc_start: MessageQ_attach(%d) failed: %d\n",
- rprocId, status);
+ else {
+ if (attachStatus == MessageQ_E_RESOURCE) {
+ continue;
+ }
+
+ printf("Ipc_start: failed to attach to %d: %d\n",
+ rprocId, attachStatus);
+
status = Ipc_E_FAIL;
break;
}
- else {
- attachedAny = 1;
- }
}
- if (attachedAny) {
- status = Ipc_S_SUCCESS;
+ if (!attachedAny) {
+ status = Ipc_E_FAIL;
}
}
else {
gatempstart_fail:
GateHWSpinlock_stop();
gatehwspinlockstart_fail:
+#if 0
for (rprocId = rprocId - 1; (rprocId > 0) && (status >= 0); rprocId--) {
MessageQ_detach(rprocId);
}
#endif
+#endif
exit:
return (status);
/* Skip host, which should always be 0th entry. */
continue;
}
+#if 0
status = MessageQ_detach(rprocId);
if (status < 0) {
printf("Ipc_stop: MessageQ_detach(%d) failed: %d\n",
status = Ipc_E_FAIL;
goto exit;
}
+#endif
}
status = MessageQ_destroy();
index 8b958cbcba7cdccbd0add2425a606d5d21ffd2a2..7a20a7821aa629b0af73ccc8283ad38e6a3744a7 100644 (file)
--- a/linux/src/api/MessageQ.c
+++ b/linux/src/api/MessageQ.c
#include <_MultiProc.h>
#include <ti/ipc/MessageQ.h>
#include <_MessageQ.h>
+#include <ITransport.h>
+#include <IMessageQTransport.h>
+#include <INetworkTransport.h>
/* Socket Headers */
#include <sys/select.h>
#include <sys/types.h>
#include <sys/param.h>
#include <sys/eventfd.h>
-#include <sys/socket.h>
+#include <sys/queue.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <assert.h>
#include <pthread.h>
+#include <semaphore.h>
/* Socket Protocol Family */
#include <net/rpmsg.h>
-/* Socket utils: */
-#include <SocketFxns.h>
-
#include <ladclient.h>
#include <_lad.h>
*/
#define MessageQ_NAMESERVER "MessageQ"
-/*!
- * @brief Value of an invalid socket ID:
- */
-#define Transport_INVALIDSOCKET (0xFFFFFFFF)
+#define MessageQ_MAXTRANSPORTS 8
-/* More magic rpmsg port numbers: */
-#define MESSAGEQ_RPMSG_PORT 61
-#define MESSAGEQ_RPMSG_MAXSIZE 512
+#define MessageQ_GROWSIZE 32
/* Trace flag settings: */
#define TRACESHIFT 12
/* structure for MessageQ module state */
typedef struct MessageQ_ModuleObject {
- Int refCount;
- /*!< Reference count */
- NameServer_Handle nameServer;
- /*!< Handle to the local NameServer used for storing GP objects */
- pthread_mutex_t gate;
- /*!< Handle of gate to be used for local thread safety */
- MessageQ_Params defaultInstParams;
- /*!< Default instance creation parameters */
- int sock[MultiProc_MAXPROCESSORS];
- /*!< Sockets to for sending to each remote processor */
- int seqNum;
- /*!< Process-specific sequence number */
+ MessageQ_Handle *queues;
+ Int numQueues;
+ Int refCount;
+ NameServer_Handle nameServer;
+ pthread_mutex_t gate;
+ MessageQ_Params defaultInstParams;
+ int seqNum;
+ IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
+ INetworkTransport_Handle transInst[MessageQ_MAXTRANSPORTS];
} MessageQ_ModuleObject;
+typedef struct MessageQ_CIRCLEQ_ENTRY {
+ CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem;
+} MessageQ_CIRCLEQ_ENTRY;
+
/*!
* @brief Structure for the Handle for the MessageQ.
*/
typedef struct MessageQ_Object_tag {
- MessageQ_Params params;
- /*! Instance specific creation parameters */
- MessageQ_QueueId queue;
- /* Unique id */
- int fd[MultiProc_MAXPROCESSORS];
- /* File Descriptor to block on messages from remote processors. */
- int unblockFd;
- /* Write this fd to unblock the select() call in MessageQ _get() */
- void *serverHandle;
+ CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
+ MessageQ_Params params;
+ MessageQ_QueueId queue;
+ int unblocked;
+ void *serverHandle;
+ sem_t synchronizer;
} MessageQ_Object;
/* traces in this file are controlled via _MessageQ_verbose */
Bool _MessageQ_verbose = FALSE;
#define verbose _MessageQ_verbose
-
/* =============================================================================
* Globals
* =============================================================================
*/
static MessageQ_ModuleObject MessageQ_state =
{
- .refCount = 0,
- .nameServer = NULL,
+ .refCount = 0,
+ .nameServer = NULL,
};
/*!
*
* @brief Pointer to the MessageQ module state.
*/
-MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
+MessageQ_ModuleObject *MessageQ_module = &MessageQ_state;
+Void _MessageQ_grow(UInt16 queueIndex);
/* =============================================================================
- * Forward declarations of internal functions
+ * APIS
* =============================================================================
*/
-/* This is a helper function to initialize a message. */
-static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex);
-static Int transportCloseEndpoint(int fd);
-static Int transportGet(int sock, MessageQ_Msg * retMsg);
-static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId);
+Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
+ UInt16 rprocId, UInt priority)
+{
+ Int status = FALSE;
-/* =============================================================================
- * APIS
- * =============================================================================
- */
-/* Function to get default configuration for the MessageQ module.
- *
+ if (handle == NULL) {
+ printf("MessageQ_registerTransport: invalid handle, must be non-NULL\n"
+ );
+
+ return status;
+ }
+
+ if (rprocId >= MultiProc_MAXPROCESSORS) {
+ printf("MessageQ_registerTransport: invalid procId %d\n", rprocId);
+
+ return status;
+ }
+
+ if (MessageQ_module->transports[rprocId][priority] == NULL) {
+ MessageQ_module->transports[rprocId][priority] = handle;
+
+ status = TRUE;
+ }
+
+ return status;
+}
+
+Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
+{
+ if (inst == NULL) {
+ printf("MessageQ_registerTransportId: invalid NULL handle\n");
+
+ return MessageQ_E_INVALIDARG;
+ }
+
+ if (tid >= MessageQ_MAXTRANSPORTS) {
+ printf("MessageQ_unregisterNetTransport: invalid transport id %d, must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
+
+ return MessageQ_E_INVALIDARG;
+ }
+
+ if (MessageQ_module->transInst[tid] != NULL) {
+ printf("MessageQ_registerTransportId: transport id %d already registered\n", tid);
+
+ return MessageQ_E_ALREADYEXISTS;
+ }
+
+ MessageQ_module->transInst[tid] = (INetworkTransport_Handle)inst;
+
+ return MessageQ_S_SUCCESS;
+}
+
+Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
+{
+ if (rprocId >= MultiProc_MAXPROCESSORS) {
+ printf("MessageQ_registerTransport: invalid rprocId %d\n", rprocId);
+
+ return;
+ }
+
+ MessageQ_module->transports[rprocId][priority] = NULL;
+}
+
+Void MessageQ_unregisterTransportId(UInt tid)
+{
+ if (tid >= MessageQ_MAXTRANSPORTS) {
+ printf("MessageQ_unregisterTransportId: invalid transport id %d, must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
+
+ return;
+ }
+
+ MessageQ_module->transInst[tid] = NULL;
+}
+
+/*
+ * Function to get default configuration for the MessageQ module.
*/
-Void MessageQ_getConfig (MessageQ_Config * cfg)
+Void MessageQ_getConfig(MessageQ_Config *cfg)
{
Int status;
LAD_ClientHandle handle;
}
if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
- PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n", status)
+ PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n",
+ status)
return;
}
status = rsp.messageQGetConfig.status;
"MessageQ_getConfig: got LAD response for client %d, status=%d\n",
handle, status)
- memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof(*cfg));
+ memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg));
return;
}
-/* Function to setup the MessageQ module. */
-Int MessageQ_setup(const MessageQ_Config * cfg)
+/*
+ * Function to setup the MessageQ module.
+ */
+Int MessageQ_setup(const MessageQ_Config *cfg)
{
Int status;
LAD_ClientHandle handle;
struct LAD_CommandObj cmd;
union LAD_ResponseObj rsp;
- Int i;
+ Int pri;
+ Int rprocId;
+ Int tid;
+
+ pthread_mutex_lock(&MessageQ_module->gate);
+
+ MessageQ_module->refCount++;
+ if (MessageQ_module->refCount > 1) {
+
+ pthread_mutex_unlock(&MessageQ_module->gate);
+
+ PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
+ MessageQ_module->refCount)
+
+ return MessageQ_S_ALREADYSETUP;
+ }
+
+ pthread_mutex_unlock(&MessageQ_module->gate);
handle = LAD_findHandle();
if (handle == LAD_MAXNUMCLIENTS) {
cmd.cmd = LAD_MESSAGEQ_SETUP;
cmd.clientId = handle;
- memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
+ memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof (*cfg));
if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
PRINTVERBOSE1(
if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
- return(status);
+ return status;
}
status = rsp.setup.status;
"MessageQ_setup: got LAD response for client %d, status=%d\n",
handle, status)
- MessageQ_module->nameServer = rsp.setup.nameServerHandle;
MessageQ_module->seqNum = 0;
+ MessageQ_module->nameServer = rsp.setup.nameServerHandle;
+ MessageQ_module->numQueues = cfg->maxRuntimeEntries;
+ MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
+ sizeof (MessageQ_Handle));
- /* Create a default local gate. */
- pthread_mutex_init (&(MessageQ_module->gate), NULL);
+ pthread_mutex_init(&MessageQ_module->gate, NULL);
- /* Clear sockets array. */
- for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
- MessageQ_module->sock[i] = Transport_INVALIDSOCKET;
+ for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
+ for (pri = 0; pri < 2; pri++) {
+ MessageQ_module->transports[rprocId][pri] = NULL;
+ }
+ }
+ for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
+ MessageQ_module->transInst[tid] = NULL;
}
return status;
}
/*
- * Function to destroy the MessageQ module.
- * Destroys socket/protocol maps; sockets themselves should have been
- * destroyed in MessageQ_delete() and MessageQ_detach() calls.
+ * MessageQ_destroy - destroy the MessageQ module.
*/
-Int MessageQ_destroy (void)
+Int MessageQ_destroy(void)
{
Int status;
LAD_ClientHandle handle;
if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
- return(status);
+ return status;
}
status = rsp.status;
return status;
}
+
/* Function to initialize the parameters for the MessageQ instance. */
-Void MessageQ_Params_init (MessageQ_Params * params)
+Void MessageQ_Params_init(MessageQ_Params *params)
{
memcpy (params, &(MessageQ_module->defaultInstParams),
sizeof (MessageQ_Params));
}
/*
- * Function to create a MessageQ object for receiving.
- *
- * Create a socket and bind the source address (local ProcId/MessageQ ID) in
- * order to get messages dispatched to this messageQ.
+ * MessageQ_create - create a MessageQ object for receiving.
*/
-MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
+MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *params)
{
Int status;
- MessageQ_Object * obj = NULL;
- UInt16 queueIndex = 0u;
- UInt16 procId;
+ MessageQ_Object *obj = NULL;
+ IMessageQTransport_Handle transport;
+ INetworkTransport_Handle transInst;
+ UInt16 queueIndex;
UInt16 rprocId;
+ Int tid;
+ Int priority;
LAD_ClientHandle handle;
struct LAD_CommandObj cmd;
union LAD_ResponseObj rsp;
}
if (params) {
- memcpy(&cmd.args.messageQCreate.params, params, sizeof(*params));
+ memcpy(&cmd.args.messageQCreate.params, params, sizeof (*params));
}
if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
memcpy((Ptr) &obj->params, (Ptr)params, sizeof (MessageQ_Params));
}
- procId = MultiProc_self();
- queueIndex = (MessageQ_QueueIndex)rsp.messageQCreate.queueId;
+ queueIndex = (MessageQ_QueueIndex)(rsp.messageQCreate.queueId & 0x0000ffff);
+
obj->queue = rsp.messageQCreate.queueId;
obj->serverHandle = rsp.messageQCreate.serverHandle;
+ CIRCLEQ_INIT(&obj->msgList);
+ if (sem_init(&obj->synchronizer, 0, 0) < 0) {
+ PRINTVERBOSE1(
+ "MessageQ_create: failed to create synchronizer (errno %d)\n", errno)
- /*
- * Create a set of communication endpoints (one per each remote proc),
- * and return the socket as target for MessageQ_put() calls, and as
- * a file descriptor to close during MessageQ_delete().
- */
- for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
- obj->fd[rprocId] = Transport_INVALIDSOCKET;
- if (procId == rprocId) {
- /* Skip creating an endpoint for ourself. */
- continue;
- }
+ MessageQ_delete((MessageQ_Handle *)&obj);
- PRINTVERBOSE3("MessageQ_create: creating endpoint for: %s, rprocId: %d, queueIndex: %d\n", name, rprocId, queueIndex)
+ return NULL;
+ }
- status = transportCreateEndpoint(&obj->fd[rprocId], rprocId,
- queueIndex);
- if (status < 0) {
- obj->fd[rprocId] = Transport_INVALIDSOCKET;
+ PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' queueIndex %d\n", name, queueIndex)
+
+ for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
+ for (priority = 0; priority < 2; priority++) {
+ transport = MessageQ_module->transports[rprocId][priority];
+ if (transport) {
+ /* need to check return and do something if error */
+ IMessageQTransport_bind((Void *)transport, obj->queue);
+ }
+ }
+ }
+ for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
+ transInst = MessageQ_module->transInst[tid];
+ if (transInst) {
+ /* need to check return and do something if error */
+ INetworkTransport_bind((Void *)transInst, obj->queue);
}
}
/*
- * Now, to support MessageQ_unblock() functionality, create an event object.
- * Writing to this event will unblock the select() call in MessageQ_get().
+ * Since LAD's MessageQ_module can grow, we need to be able to grow as well
*/
- obj->unblockFd = eventfd(0, 0);
- if (obj->unblockFd == -1) {
- printf ("MessageQ_create: eventfd creation failed: %d, %s\n",
- errno, strerror(errno));
- MessageQ_delete((MessageQ_Handle *)&obj);
+ if (queueIndex >= MessageQ_module->numQueues) {
+ _MessageQ_grow(queueIndex);
}
- else {
- int endpointFound = 0;
- for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
- if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
- endpointFound = 1;
- }
- }
- if (!endpointFound) {
- printf("MessageQ_create: no transport endpoints found, deleting\n");
- MessageQ_delete((MessageQ_Handle *)&obj);
- }
- }
+ /*
+ * No need to "allocate" slot since the queueIndex returned by
+ * LAD is guaranteed to be unique.
+ */
+ MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
- return ((MessageQ_Handle) obj);
+ return (MessageQ_Handle)obj;
}
/*
- * Function to delete a MessageQ object for a specific slave processor.
- *
- * Deletes the socket associated with this MessageQ object.
+ * MessageQ_delete - delete a MessageQ object.
*/
-Int MessageQ_delete (MessageQ_Handle * handlePtr)
+Int MessageQ_delete(MessageQ_Handle *handlePtr)
{
- Int status = MessageQ_S_SUCCESS;
- MessageQ_Object * obj = NULL;
- UInt16 rprocId;
- LAD_ClientHandle handle;
+ MessageQ_Object *obj;
+ IMessageQTransport_Handle transport;
+ INetworkTransport_Handle transInst;
+ Int status = MessageQ_S_SUCCESS;
+ UInt16 queueIndex;
+ UInt16 rprocId;
+ Int tid;
+ Int priority;
+ LAD_ClientHandle handle;
struct LAD_CommandObj cmd;
union LAD_ResponseObj rsp;
return MessageQ_E_FAIL;
}
- obj = (MessageQ_Object *) (*handlePtr);
+ obj = (MessageQ_Object *)(*handlePtr);
cmd.cmd = LAD_MESSAGEQ_DELETE;
cmd.clientId = handle;
"MessageQ_delete: got LAD response for client %d, status=%d\n",
handle, status)
-
- /* Close the event used for MessageQ_unblock(): */
- close(obj->unblockFd);
-
- /* Close the communication endpoint: */
- for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
- if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
- status = transportCloseEndpoint(obj->fd[rprocId]);
+ for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
+ for (priority = 0; priority < 2; priority++) {
+ transport = MessageQ_module->transports[rprocId][priority];
+ if (transport) {
+ IMessageQTransport_unbind((Void *)transport, obj->queue);
+ }
+ }
+ }
+ for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
+ transInst = MessageQ_module->transInst[tid];
+ if (transInst) {
+ INetworkTransport_unbind((Void *)transInst, obj->queue);
}
}
- /* Now free the obj */
- free (obj);
+ queueIndex = (MessageQ_QueueIndex)(obj->queue & 0x0000ffff);
+ MessageQ_module->queues[queueIndex] = NULL;
+
+ free(obj);
*handlePtr = NULL;
- return (status);
+ return status;
}
/*
- * Opens an instance of MessageQ for sending.
- *
- * We need not create a socket here; the sockets for all remote processors
- * were created during MessageQ_attach(), and will be
- * retrieved during MessageQ_put().
+ * MessageQ_open - Opens an instance of MessageQ for sending.
*/
-Int MessageQ_open (String name, MessageQ_QueueId * queueId)
+Int MessageQ_open(String name, MessageQ_QueueId *queueId)
{
Int status = MessageQ_S_SUCCESS;
- status = NameServer_getUInt32 (MessageQ_module->nameServer,
- name, queueId, NULL);
+ status = NameServer_getUInt32(MessageQ_module->nameServer,
+ name, queueId, NULL);
if (status == NameServer_E_NOTFOUND) {
- /* Set return queue ID to invalid. */
+ /* Set return queue ID to invalid */
*queueId = MessageQ_INVALIDMESSAGEQ;
status = MessageQ_E_NOTFOUND;
}
else if (status >= 0) {
- /* Override with a MessageQ status code. */
+ /* Override with a MessageQ status code */
status = MessageQ_S_SUCCESS;
}
else {
- /* Set return queue ID to invalid. */
+ /* Set return queue ID to invalid */
*queueId = MessageQ_INVALIDMESSAGEQ;
- /* Override with a MessageQ status code. */
+
+ /* Override with a MessageQ status code */
if (status == NameServer_E_TIMEOUT) {
status = MessageQ_E_TIMEOUT;
}
}
}
- return (status);
+ return status;
}
-/* Closes previously opened instance of MessageQ module. */
-Int MessageQ_close (MessageQ_QueueId * queueId)
+/*
+ * MessageQ_close - Closes previously opened instance of MessageQ.
+ */
+Int MessageQ_close(MessageQ_QueueId *queueId)
{
Int32 status = MessageQ_S_SUCCESS;
/* Nothing more to be done for closing the MessageQ. */
*queueId = MessageQ_INVALIDMESSAGEQ;
- return (status);
+ return status;
}
/*
- * Place a message onto a message queue.
+ * MessageQ_put - place a message onto a message queue.
*
- * Calls TransportShm_put(), which handles the sending of the message using the
+ * Calls transport's put(), which handles the sending of the message using the
* appropriate kernel interface (socket, device ioctl) call for the remote
* procId encoded in the queueId argument.
*
*/
-Int MessageQ_put (MessageQ_QueueId queueId, MessageQ_Msg msg)
+Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
{
- Int status;
+ MessageQ_Object *obj;
UInt16 dstProcId = (UInt16)(queueId >> 16);
UInt16 queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
+ Int status = MessageQ_S_SUCCESS;
+ ITransport_Handle transport;
+ IMessageQTransport_Handle msgTrans;
+ INetworkTransport_Handle netTrans;
+ Int priority;
+ UInt tid;
msg->dstId = queueIndex;
msg->dstProc = dstProcId;
- status = transportPut(msg, queueIndex, dstProcId);
+ if (dstProcId != MultiProc_self()) {
+ tid = MessageQ_getTransportId(msg);
+ if (tid == 0) {
+ priority = MessageQ_getMsgPri(msg);
+ msgTrans = MessageQ_module->transports[dstProcId][priority];
+
+ IMessageQTransport_put(msgTrans, (Ptr)msg);
+ }
+ else {
+ if (tid >= MessageQ_MAXTRANSPORTS) {
+ printf("MessageQ_put: transport id %d too big, must be < %d\n",
+ tid, MessageQ_MAXTRANSPORTS);
+
+ return MessageQ_E_FAIL;
+ }
+
+ /* use secondary transport */
+ netTrans = MessageQ_module->transInst[tid];
+ transport = INetworkTransport_upCast(netTrans);
+
+ /* downcast instance pointer to transport interface */
+ switch (ITransport_itype(transport)) {
+ case INetworkTransport_TypeId:
+ INetworkTransport_put(netTrans, (Ptr)msg);
+
+ break;
+
+ default:
+ /* error */
+ printf("MessageQ_put: transport id %d is an unsupported transport type\n", tid);
+
+ status = MessageQ_E_FAIL;
+
+ break;
+ }
+ }
+ }
+ else {
+ obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
+
+ pthread_mutex_lock(&MessageQ_module->gate);
+
+ /* It is a local MessageQ */
+ CIRCLEQ_INSERT_TAIL(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
+
+ pthread_mutex_unlock(&MessageQ_module->gate);
- return (status);
+ sem_post(&obj->synchronizer);
+ }
+
+ return status;
}
/*
- * Gets a message for a message queue and blocks if the queue is empty.
- * If a message is present, it returns it. Otherwise it blocks
- * waiting for a message to arrive.
- * When a message is returned, it is owned by the caller.
- *
- * We block using select() on the receiving socket's file descriptor, then
- * get the waiting message via the socket API recvfrom().
- * We use the socket stored in the messageQ object via a previous call to
- * MessageQ_create().
+ * MessageQ_get - gets a message for a message queue and blocks if
+ * the queue is empty.
*
+ * If a message is present, it returns it. Otherwise it blocks
+ * waiting for a message to arrive.
+ * When a message is returned, it is owned by the caller.
*/
-Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
+Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
{
- static int last = 0;
+ MessageQ_Object * obj = (MessageQ_Object *)handle;
Int status = MessageQ_S_SUCCESS;
- Int tmpStatus;
- MessageQ_Object * obj = (MessageQ_Object *) handle;
- int retval;
- int nfds;
- fd_set rfds;
- struct timeval tv;
- void *timevalPtr;
- UInt16 rprocId;
- int maxfd = 0;
- int selfId;
- int nProcessors;
-
- /* Wait (with timeout) and retreive message from socket: */
- FD_ZERO(&rfds);
- for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
- if (rprocId == MultiProc_self() ||
- obj->fd[rprocId] == Transport_INVALIDSOCKET) {
- continue;
- }
- maxfd = MAX(maxfd, obj->fd[rprocId]);
- FD_SET(obj->fd[rprocId], &rfds);
- }
+ struct timespec ts;
+ struct timeval tv;
+
+#if 0
+/*
+ * Optimization here to get a message without going in to the sem
+ * operation, but the sem count will not be maintained properly.
+ */
+ pthread_mutex_lock(&MessageQ_module->gate);
- /* Wait also on the event fd, which may be written by MessageQ_unblock(): */
- FD_SET(obj->unblockFd, &rfds);
+ if (obj->msgList.cqh_first != &obj->msgList) {
+ *msg = (MessageQ_Msg)obj->msglist.cqh_first;
+ CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
- if (timeout == MessageQ_FOREVER) {
- timevalPtr = NULL;
+ pthread_mutex_unlock(&MessageQ_module->gate);
}
else {
- /* Timeout given in msec: convert: */
- tv.tv_sec = timeout / 1000;
- tv.tv_usec = (timeout % 1000) * 1000;
- timevalPtr = &tv;
+ pthread_mutex_unlock(&MessageQ_module->gate);
}
- /* Add one to last fd created: */
- nfds = MAX(maxfd, obj->unblockFd) + 1;
-
- retval = select(nfds, &rfds, NULL, NULL, timevalPtr);
- if (retval) {
- if (FD_ISSET(obj->unblockFd, &rfds)) {
- /*
- * Our event was signalled by MessageQ_unblock().
- *
- * This is typically done during a shutdown sequence, where
- * the intention of the client would be to ignore (i.e. not fetch)
- * any pending messages in the transport's queue.
- * Thus, we shall not check for nor return any messages.
- */
- *msg = NULL;
- status = MessageQ_E_UNBLOCKED;
- }
- else {
- /* start where we last left off */
- rprocId = last;
-
- selfId = MultiProc_self();
- nProcessors = MultiProc_getNumProcessors();
-
- do {
- if (rprocId != selfId &&
- obj->fd[rprocId] != Transport_INVALIDSOCKET) {
-
- if (FD_ISSET(obj->fd[rprocId], &rfds)) {
- /* Our transport's fd was signalled: Get the message */
- tmpStatus = transportGet(obj->fd[rprocId], msg);
- if (tmpStatus < 0) {
- printf ("MessageQ_get: tranposrtshm_get failed.");
- status = MessageQ_E_FAIL;
- }
-
- last = (rprocId + 1) % nProcessors;
- break;
- }
- }
- rprocId = (rprocId + 1) % nProcessors;
- } while (rprocId != last);
+#endif
+
+ if (timeout == MessageQ_FOREVER) {
+ sem_wait(&obj->synchronizer);
+ }
+ else {
+ gettimeofday(&tv, NULL);
+ ts.tv_sec = tv.tv_sec;
+ ts.tv_nsec = (tv.tv_usec + timeout) * 1000;
+
+ if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
+ if (errno == ETIMEDOUT) {
+ PRINTVERBOSE0("MessageQ_get: operation timed out\n")
+
+ return MessageQ_E_TIMEOUT;
+ }
}
}
- else if (retval == 0) {
- *msg = NULL;
- status = MessageQ_E_TIMEOUT;
+
+ if (obj->unblocked) {
+ return MessageQ_E_UNBLOCKED;
}
- return (status);
+ pthread_mutex_lock(&MessageQ_module->gate);
+
+ *msg = (MessageQ_Msg)obj->msgList.cqh_first;
+ CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
+
+ pthread_mutex_unlock(&MessageQ_module->gate);
+
+ return status;
}
/*
*
* TBD: Implement as a socket ioctl, using getsockopt(). Return -1 for now.
*/
-Int MessageQ_count (MessageQ_Handle handle)
+Int MessageQ_count(MessageQ_Handle handle)
{
Int count = -1;
#if 0
&count, &optlen);
#endif
- return (count);
+ return count;
}
-/* Initializes a message not obtained from MessageQ_alloc. */
-Void MessageQ_staticMsgInit (MessageQ_Msg msg, UInt32 size)
+/*
+ * Initializes a message not obtained from MessageQ_alloc.
+ */
+Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
{
/* Fill in the fields of the message */
- MessageQ_msgInit (msg);
- msg->heapId = MessageQ_STATICMSG;
+ MessageQ_msgInit(msg);
+ msg->heapId = MessageQ_STATICMSG;
msg->msgSize = size;
}
/*
- * Allocate a message and initialize the needed fields (note some
- * of the fields in the header are set via other APIs or in the
- * MessageQ_put function,
+ * Allocate a message and initialize the needed fields (note some
+ * of the fields in the header are set via other APIs or in the
+ * MessageQ_put function,
*/
-MessageQ_Msg MessageQ_alloc (UInt16 heapId, UInt32 size)
+MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
{
- MessageQ_Msg msg = NULL;
+ MessageQ_Msg msg;
/*
* heapId not used for local alloc (as this is over a copy transport), but
- * we need to send to other side as heapId is used in BIOS transport:
+ * we need to send to other side as heapId is used in BIOS transport.
*/
- msg = (MessageQ_Msg)calloc (1, size);
- MessageQ_msgInit (msg);
+ msg = (MessageQ_Msg)calloc(1, size);
+ MessageQ_msgInit(msg);
msg->msgSize = size;
- msg->heapId = heapId;
+ msg->heapId = heapId;
return msg;
}
-/* Frees the message back to the heap that was used to allocate it. */
-Int MessageQ_free (MessageQ_Msg msg)
+/*
+ * Frees the message back to the heap that was used to allocate it.
+ */
+Int MessageQ_free(MessageQ_Msg msg)
{
- UInt32 status = MessageQ_S_SUCCESS;
+ UInt32 status = MessageQ_S_SUCCESS;
/* Check to ensure this was not allocated by user: */
- if (msg->heapId == MessageQ_STATICMSG) {
- status = MessageQ_E_CANNOTFREESTATICMSG;
+ if (msg->heapId == MessageQ_STATICMSG) {
+ status = MessageQ_E_CANNOTFREESTATICMSG;
}
else {
- free (msg);
+ free(msg);
}
return status;
}
/* Register a heap with MessageQ. */
-Int MessageQ_registerHeap (Ptr heap, UInt16 heapId)
+Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
{
- Int status = MessageQ_S_SUCCESS;
+ Int status = MessageQ_S_SUCCESS;
- /* Do nothing, as this uses a copy transport: */
+ /* Do nothing, as this uses a copy transport */
return status;
}
/* Unregister a heap with MessageQ. */
-Int MessageQ_unregisterHeap (UInt16 heapId)
+Int MessageQ_unregisterHeap(UInt16 heapId)
{
- Int status = MessageQ_S_SUCCESS;
+ Int status = MessageQ_S_SUCCESS;
- /* Do nothing, as this uses a copy transport: */
+ /* Do nothing, as this uses a copy transport */
return status;
}
/* Unblocks a MessageQ */
-Void MessageQ_unblock (MessageQ_Handle handle)
+Void MessageQ_unblock(MessageQ_Handle handle)
{
- MessageQ_Object * obj = (MessageQ_Object *) handle;
- uint64_t buf = 1;
- int numBytes;
+ MessageQ_Object *obj = (MessageQ_Object *)handle;
- /* Write 8 bytes to awaken any threads blocked on this messageQ: */
- numBytes = write(obj->unblockFd, &buf, sizeof(buf));
+ obj->unblocked = TRUE;
+ sem_post(&obj->synchronizer);
}
-/* Embeds a source message queue into a message. */
-Void MessageQ_setReplyQueue (MessageQ_Handle handle, MessageQ_Msg msg)
+/* Embeds a source message queue into a message */
+Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
{
- MessageQ_Object * obj = (MessageQ_Object *) handle;
+ MessageQ_Object *obj = (MessageQ_Object *)handle;
- msg->replyId = (UInt16)(obj->queue);
+ msg->replyId = (UInt16)(obj->queue);
msg->replyProc = (UInt16)(obj->queue >> 16);
}
/* Returns the QueueId associated with the handle. */
-MessageQ_QueueId MessageQ_getQueueId (MessageQ_Handle handle)
+MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
{
- MessageQ_Object * obj = (MessageQ_Object *) handle;
- UInt32 queueId;
+ MessageQ_Object *obj = (MessageQ_Object *) handle;
+ UInt32 queueId;
queueId = (obj->queue);
}
/* Sets the tracing of a message */
-Void MessageQ_setMsgTrace (MessageQ_Msg msg, Bool traceFlag)
+Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
{
msg->flags = (msg->flags & ~TRACEMASK) | (traceFlag << TRACESHIFT);
}
* The MessageQ module itself does not use any shared memory but the
* underlying transport may use some shared memory.
*/
-SizeT MessageQ_sharedMemReq (Ptr sharedAddr)
+SizeT MessageQ_sharedMemReq(Ptr sharedAddr)
{
SizeT memReq = 0u;
/* Do nothing, as this is a copy transport. */
- return (memReq);
-}
-
-/*
- * Create a socket for this remote proc, and attempt to connect.
- *
- * Only creates a socket if one does not already exist for this procId.
- *
- * Note: remoteProcId may be MultiProc_Self() for loopback case.
- */
-Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr)
-{
- Int status = MessageQ_S_SUCCESS;
- int sock;
-
- PRINTVERBOSE1("MessageQ_attach: remoteProcId: %d\n", remoteProcId)
-
- if (remoteProcId >= MultiProc_MAXPROCESSORS) {
- status = MessageQ_E_INVALIDPROCID;
- goto exit;
- }
-
- pthread_mutex_lock (&(MessageQ_module->gate));
-
- /* Only create a socket if one doesn't exist: */
- if (MessageQ_module->sock[remoteProcId] == Transport_INVALIDSOCKET) {
- /* Create the socket for sending messages to the remote proc: */
- sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
- if (sock < 0) {
- status = MessageQ_E_FAIL;
- printf ("MessageQ_attach: socket failed: %d, %s\n",
- errno, strerror(errno));
- }
- else {
- PRINTVERBOSE1("MessageQ_attach: created send socket: %d\n", sock)
- MessageQ_module->sock[remoteProcId] = sock;
- /* Attempt to connect: */
- status = ConnectSocket(sock, remoteProcId, MESSAGEQ_RPMSG_PORT);
- if (status < 0) {
- status = MessageQ_E_RESOURCE;
- /* don't hard-printf since this is no longer fatal */
- PRINTVERBOSE1("MessageQ_attach: ConnectSocket(remoteProcId:%d) failed\n",
- remoteProcId);
- }
- }
- }
- else {
- status = MessageQ_E_ALREADYEXISTS;
- }
-
- pthread_mutex_unlock (&(MessageQ_module->gate));
-
- if (status == MessageQ_E_RESOURCE) {
- MessageQ_detach(remoteProcId);
- }
-
-exit:
- return (status);
-}
-
-/*
- * Close the socket for this remote proc.
- *
- */
-Int MessageQ_detach (UInt16 remoteProcId)
-{
- Int status = MessageQ_S_SUCCESS;
- int sock;
-
- if (remoteProcId >= MultiProc_MAXPROCESSORS) {
- status = MessageQ_E_INVALIDPROCID;
- goto exit;
- }
-
- pthread_mutex_lock (&(MessageQ_module->gate));
-
- sock = MessageQ_module->sock[remoteProcId];
- if (sock != Transport_INVALIDSOCKET) {
- if (close(sock)) {
- status = MessageQ_E_OSFAILURE;
- printf("MessageQ_detach: close failed: %d, %s\n",
- errno, strerror(errno));
- }
- else {
- PRINTVERBOSE1("MessageQ_detach: closed socket: %d\n", sock)
- MessageQ_module->sock[remoteProcId] = Transport_INVALIDSOCKET;
- }
- }
-
- pthread_mutex_unlock (&(MessageQ_module->gate));
-
-exit:
- return (status);
+ return memReq;
}
/*
* This is a helper function to initialize a message.
*/
-Void MessageQ_msgInit (MessageQ_Msg msg)
+Void MessageQ_msgInit(MessageQ_Msg msg)
{
#if 0
Int status = MessageQ_S_SUCCESS;
"MessageQ_msgInit: got LAD response for client %d, status=%d\n",
handle, status)
- memcpy(msg, &rsp.msgInit.msg, sizeof(*msg));
+ memcpy(msg, &rsp.msgInit.msg, sizeof (*msg));
#else
msg->reserved0 = 0; /* We set this to distinguish from NameServerMsg */
msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
msg->srcProc = MultiProc_self();
- pthread_mutex_lock(&(MessageQ_module->gate));
+ pthread_mutex_lock(&MessageQ_module->gate);
msg->seqNum = MessageQ_module->seqNum++;
- pthread_mutex_unlock(&(MessageQ_module->gate));
+ pthread_mutex_unlock(&MessageQ_module->gate);
#endif
}
/*
- * =============================================================================
- * Transport: Fxns kept here until need for a transport layer is realized.
- * =============================================================================
+ * Grow module's queues[] array to accommodate queueIndex from LAD
*/
-/*
- * ======== transportCreateEndpoint ========
- *
- * Create a communication endpoint to receive messages.
- */
-static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex)
+Void _MessageQ_grow(UInt16 queueIndex)
{
- Int status = MessageQ_S_SUCCESS;
- int err;
-
- /* Create the socket to receive messages for this messageQ. */
- *fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
- if (*fd < 0) {
- status = MessageQ_E_FAIL;
- printf ("transportCreateEndpoint: socket call failed: %d, %s\n",
- errno, strerror(errno));
- goto exit;
- }
+ MessageQ_Handle *queues;
+ MessageQ_Handle *oldQueues;
+ UInt oldSize;
- PRINTVERBOSE1("transportCreateEndpoint: created socket: fd: %d\n", *fd)
+ oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
- err = SocketBindAddr(*fd, rprocId, (UInt32)queueIndex);
- if (err < 0) {
- status = MessageQ_E_FAIL;
- /* don't hard-printf since this is no longer fatal */
- PRINTVERBOSE2("transportCreateEndpoint: bind failed: %d, %s\n",
- errno, strerror(errno));
- close(*fd);
- }
+ queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof (MessageQ_Handle));
+ memcpy(queues, MessageQ_module->queues, oldSize);
-exit:
- return (status);
-}
+ oldQueues = MessageQ_module->queues;
+ MessageQ_module->queues = queues;
+ MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
-/*
- * ======== transportCloseEndpoint ========
- *
- * Close the communication endpoint.
- */
-static Int transportCloseEndpoint(int fd)
-{
- Int status = MessageQ_S_SUCCESS;
-
- PRINTVERBOSE1("transportCloseEndpoint: closing socket: %d\n", fd)
-
- /* Stop communication to this socket: */
- close(fd);
-
- return (status);
-}
+ free(oldQueues);
-/*
- * ======== transportGet ========
- * Retrieve a message waiting in the socket's queue.
-*/
-static Int transportGet(int sock, MessageQ_Msg * retMsg)
-{
- Int status = MessageQ_S_SUCCESS;
- MessageQ_Msg msg;
- struct sockaddr_rpmsg fromAddr; // [Socket address of sender]
- unsigned int len;
- int byteCount;
-
- /*
- * We have no way of peeking to see what message size we'll get, so we
- * allocate a message of max size to receive contents from the rpmsg socket
- * (currently, a copy transport)
- */
- msg = MessageQ_alloc (0, MESSAGEQ_RPMSG_MAXSIZE);
- if (!msg) {
- status = MessageQ_E_MEMORY;
- goto exit;
- }
-
- memset(&fromAddr, 0, sizeof(fromAddr));
- len = sizeof(fromAddr);
-
- byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
- (struct sockaddr *)&fromAddr, &len);
- if (len != sizeof(fromAddr)) {
- printf("recvfrom: got bad addr len (%d)\n", len);
- status = MessageQ_E_FAIL;
- goto exit;
- }
- if (byteCount < 0) {
- printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
- status = MessageQ_E_FAIL;
- goto exit;
- }
- else {
- /* Update the allocated message size (even though this may waste space
- * when the actual message is smaller than the maximum rpmsg size,
- * the message will be freed soon anyway, and it avoids an extra copy).
- */
- msg->msgSize = byteCount;
-
- /*
- * If the message received was statically allocated, reset the
- * heapId, so the app can free it.
- */
- if (msg->heapId == MessageQ_STATICMSG) {
- msg->heapId = 0; /* for a copy transport, heap id is 0. */
- }
- }
-
- PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
- PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
- PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId, msg->msgSize)
-
- *retMsg = msg;
-
-exit:
- return (status);
+ return;
}
-/*
- * ======== transportPut ========
- *
- * Calls the socket API sendto() on the socket associated with
- * with this destination procID.
- * Currently, both local and remote messages are sent via the Socket ABI, so
- * no local object lists are maintained here.
-*/
-static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId)
-{
- Int status = MessageQ_S_SUCCESS;
- int sock;
- int err;
-
- /*
- * Retrieve the socket for the AF_SYSLINK protocol associated with this
- * transport.
- */
- sock = MessageQ_module->sock[dstProcId];
-
- PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
-
- err = send(sock, msg, msg->msgSize, 0);
- if (err < 0) {
- printf ("transportPut: send failed: %d, %s\n",
- errno, strerror(errno));
- status = MessageQ_E_FAIL;
- goto exit;
- }
-
- /*
- * Free the message, as this is a copy transport, we maintain MessageQ
- * semantics.
- */
- MessageQ_free (msg);
-
-exit:
- return (status);
-}
index d718f2d8249508aaaa6eb7513d40f777b325dd92..65df6c2ee2ca9a5f705e4e6f909d2c0c746ba3a3 100644 (file)
/* Slot 0 reserved for NameServer messages: */
#define RESERVED_MSGQ_INDEX 1
+/* Number of entries to grow when we run out of queueIndexs */
+#define MessageQ_GROWSIZE 32
+
/* Define BENCHMARK to quiet key MessageQ APIs: */
//#define BENCHMARK
MessageQ_module->numQueues = cfg->maxRuntimeEntries;
MessageQ_module->queues = (MessageQ_Handle *)
calloc(1, sizeof(MessageQ_Handle) * MessageQ_module->numQueues);
+ MessageQ_module->canFreeQueues = TRUE;
exitSetup:
LOG1("MessageQ_setup: exiting, refCount=%d\n", MessageQ_module->refCount)
memset(&MessageQ_module->cfg, 0, sizeof(MessageQ_Config));
MessageQ_module->numQueues = 0u;
- MessageQ_module->canFreeQueues = TRUE;
exitDestroy:
LOG1("MessageQ_destroy: exiting, refCount=%d\n", MessageQ_module->refCount)
oldSize = (MessageQ_module->numQueues) * sizeof(MessageQ_Handle);
/* Allocate larger table */
- queues = calloc(1, oldSize + sizeof(MessageQ_Handle));
+ queues = calloc(MessageQ_module->numQueues + MessageQ_GROWSIZE,
+ sizeof(MessageQ_Handle));
/* Copy contents into new table */
memcpy(queues, MessageQ_module->queues, oldSize);
/* Hook-up new table */
oldQueues = MessageQ_module->queues;
MessageQ_module->queues = queues;
- MessageQ_module->numQueues++;
+ MessageQ_module->numQueues += MessageQ_GROWSIZE;
/* Delete old table if not statically defined */
if (MessageQ_module->canFreeQueues == TRUE) {
index 959f3ac09dc3c1efcf431d3a12ffb16ee0db70f6..fdae9cb3d787bd0266cecb3d6381b9a7a9a5ed7c 100644 (file)
# the program to build (the names of the final binaries)
bin_PROGRAMS = ping_rpmsg MessageQApp MessageQBench MessageQMulti \
- NameServerApp Msgq100
+ MessageQMultiMulti NameServerApp Msgq100
if OMAP54XX_SMP
$(top_srcdir)/packages/ti/ipc/NameServer.h \
NameServerApp.c
+
# list of sources for the 'ping_rpmsg' binary
ping_rpmsg_SOURCES = ping_rpmsg.c
# list of sources for the 'MessageQMulti' binary
MessageQMulti_SOURCES = $(common_sources) MessageQMulti.c
+# list of sources for the 'MessageQMultiMulti' binary
+MessageQMultiMulti_SOURCES = $(common_sources) MessageQMultiMulti.c
+
# list of sources for the 'NameServerApp' binary
NameServerApp_SOURCES = $(nameServer_common_sources)
$(top_srcdir)/packages/ti/ipc/tests/GateMPAppCommon.h
common_libraries = -lpthread $(top_builddir)/linux/src/api/libtiipc.la \
- $(top_builddir)/linux/src/utils/libtiipcutils.la
+ $(top_builddir)/linux/src/utils/libtiipcutils.la \
+ $(top_builddir)/linux/src/transport/libtransport.la
+
# the additional libraries to link ping_rpmsg
ping_rpmsg_LDADD = -lrt
MessageQMulti_LDADD = $(common_libraries) \
$(AM_LDFLAGS)
+# the additional libraries needed to link MessageQMultiMulti
+MessageQMultiMulti_LDADD = $(common_libraries) \
+ $(AM_LDFLAGS)
+
# the additional libraries needed to link NameServerApp
NameServerApp_LDADD = $(common_libraries) \
$(AM_LDFLAGS)
index 50f06fa4aff35cda116bb76f0ac188f50d2b2517..8f7ff9a992ec8303f550f5e99fb4cd2390765ff7 100644 (file)
bin_PROGRAMS = ping_rpmsg$(EXEEXT) MessageQApp$(EXEEXT) \
MessageQBench$(EXEEXT) MessageQMulti$(EXEEXT) \
- NameServerApp$(EXEEXT) Msgq100$(EXEEXT) $(am__EXEEXT_1) \
- $(am__EXEEXT_2) $(am__EXEEXT_1) $(am__EXEEXT_3) \
- $(am__EXEEXT_4) $(am__EXEEXT_1) $(am__EXEEXT_5) \
+ MessageQMultiMulti$(EXEEXT) NameServerApp$(EXEEXT) \
+ Msgq100$(EXEEXT) $(am__EXEEXT_1) $(am__EXEEXT_2) \
+ $(am__EXEEXT_1) $(am__EXEEXT_3) $(am__EXEEXT_4) \
+ $(am__EXEEXT_1) $(am__EXEEXT_5) $(am__EXEEXT_1) \
$(am__EXEEXT_1) $(am__EXEEXT_1) $(am__EXEEXT_1) \
- $(am__EXEEXT_1) $(am__EXEEXT_1) $(am__EXEEXT_6) \
- $(am__EXEEXT_7)
+ $(am__EXEEXT_1) $(am__EXEEXT_6) $(am__EXEEXT_7)
# Add platform specific bin application's here
@OMAP54XX_SMP_TRUE@am__append_3 =
GateMPApp.$(OBJEXT)
GateMPApp_OBJECTS = $(am_GateMPApp_OBJECTS)
am__DEPENDENCIES_1 = $(top_builddir)/linux/src/api/libtiipc.la \
- $(top_builddir)/linux/src/utils/libtiipcutils.la
+ $(top_builddir)/linux/src/utils/libtiipcutils.la \
+ $(top_builddir)/linux/src/transport/libtransport.la
am__DEPENDENCIES_2 =
GateMPApp_DEPENDENCIES = $(am__DEPENDENCIES_1) \
$(CMEM_INSTALL_DIR)/src/cmem/api/.libs/libticmem.a \
MessageQMulti_OBJECTS = $(am_MessageQMulti_OBJECTS)
MessageQMulti_DEPENDENCIES = $(am__DEPENDENCIES_1) \
$(am__DEPENDENCIES_2)
+am_MessageQMultiMulti_OBJECTS = $(am__objects_1) \
+ MessageQMultiMulti.$(OBJEXT)
+MessageQMultiMulti_OBJECTS = $(am_MessageQMultiMulti_OBJECTS)
+MessageQMultiMulti_DEPENDENCIES = $(am__DEPENDENCIES_1) \
+ $(am__DEPENDENCIES_2)
am_Msgq100_OBJECTS = $(am__objects_1) Msgq100.$(OBJEXT)
Msgq100_OBJECTS = $(am_Msgq100_OBJECTS)
Msgq100_DEPENDENCIES = $(am__DEPENDENCIES_1) $(am__DEPENDENCIES_2)
$(AM_LDFLAGS) $(LDFLAGS) -o $@
SOURCES = $(GateMPApp_SOURCES) $(MessageQApp_SOURCES) \
$(MessageQBench_SOURCES) $(MessageQMulti_SOURCES) \
- $(Msgq100_SOURCES) $(NameServerApp_SOURCES) \
- $(mmrpc_test_SOURCES) $(nano_test_SOURCES) \
- $(ping_rpmsg_SOURCES)
+ $(MessageQMultiMulti_SOURCES) $(Msgq100_SOURCES) \
+ $(NameServerApp_SOURCES) $(mmrpc_test_SOURCES) \
+ $(nano_test_SOURCES) $(ping_rpmsg_SOURCES)
DIST_SOURCES = $(GateMPApp_SOURCES) $(MessageQApp_SOURCES) \
$(MessageQBench_SOURCES) $(MessageQMulti_SOURCES) \
- $(Msgq100_SOURCES) $(NameServerApp_SOURCES) \
- $(mmrpc_test_SOURCES) $(nano_test_SOURCES) \
- $(ping_rpmsg_SOURCES)
+ $(MessageQMultiMulti_SOURCES) $(Msgq100_SOURCES) \
+ $(NameServerApp_SOURCES) $(mmrpc_test_SOURCES) \
+ $(nano_test_SOURCES) $(ping_rpmsg_SOURCES)
ETAGS = etags
CTAGS = ctags
DISTFILES = $(DIST_COMMON) $(DIST_SOURCES) $(TEXINFOS) $(EXTRA_DIST)
# list of sources for the 'MessageQMulti' binary
MessageQMulti_SOURCES = $(common_sources) MessageQMulti.c
+# list of sources for the 'MessageQMultiMulti' binary
+MessageQMultiMulti_SOURCES = $(common_sources) MessageQMultiMulti.c
+
# list of sources for the 'NameServerApp' binary
NameServerApp_SOURCES = $(nameServer_common_sources)
$(top_srcdir)/packages/ti/ipc/tests/GateMPAppCommon.h
common_libraries = -lpthread $(top_builddir)/linux/src/api/libtiipc.la \
- $(top_builddir)/linux/src/utils/libtiipcutils.la
+ $(top_builddir)/linux/src/utils/libtiipcutils.la \
+ $(top_builddir)/linux/src/transport/libtransport.la
# the additional libraries to link ping_rpmsg
$(AM_LDFLAGS)
+# the additional libraries needed to link MessageQMultiMulti
+MessageQMultiMulti_LDADD = $(common_libraries) \
+ $(AM_LDFLAGS)
+
+
# the additional libraries needed to link NameServerApp
NameServerApp_LDADD = $(common_libraries) \
$(AM_LDFLAGS)
MessageQMulti$(EXEEXT): $(MessageQMulti_OBJECTS) $(MessageQMulti_DEPENDENCIES)
@rm -f MessageQMulti$(EXEEXT)
$(LINK) $(MessageQMulti_LDFLAGS) $(MessageQMulti_OBJECTS) $(MessageQMulti_LDADD) $(LIBS)
+MessageQMultiMulti$(EXEEXT): $(MessageQMultiMulti_OBJECTS) $(MessageQMultiMulti_DEPENDENCIES)
+ @rm -f MessageQMultiMulti$(EXEEXT)
+ $(LINK) $(MessageQMultiMulti_LDFLAGS) $(MessageQMultiMulti_OBJECTS) $(MessageQMultiMulti_LDADD) $(LIBS)
Msgq100$(EXEEXT): $(Msgq100_OBJECTS) $(Msgq100_DEPENDENCIES)
@rm -f Msgq100$(EXEEXT)
$(LINK) $(Msgq100_LDFLAGS) $(Msgq100_OBJECTS) $(Msgq100_LDADD) $(LIBS)
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/MessageQApp.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/MessageQBench.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/MessageQMulti.Po@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/MessageQMultiMulti.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Msgq100.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Mx.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/NameServerApp.Po@am__quote@
diff --git a/linux/src/tests/MessageQMultiMulti.c b/linux/src/tests/MessageQMultiMulti.c
--- /dev/null
@@ -0,0 +1,269 @@
+/*
+ * Copyright (c) 2012-2014, Texas Instruments Incorporated
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of Texas Instruments Incorporated nor the names of
+ * its contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+ * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+ * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+ * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+/* =============================================================================
+ * @file MessageQApp.c
+ *
+ * @brief Sample application for MessageQ module between MPU and Remote Proc
+ *
+ * ============================================================================
+ */
+
+/* Standard headers */
+#include <pthread.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+
+/* IPC Headers */
+#include <ti/ipc/Std.h>
+#include <ti/ipc/Ipc.h>
+#include <ti/ipc/MessageQ.h>
+
+/* App defines: Must match on remote proc side: */
+#define MSGSIZE 64u
+#define HEAPID 0u
+#define SLAVE_MESSAGEQNAME "SLAVE"
+#define HOST_MESSAGEQNAME "HOST"
+
+/** ============================================================================
+ * Macros and types
+ * ============================================================================
+ */
+
+#define NUM_LOOPS_DFLT 1000
+#define NUM_THREADS_DFLT 10
+#define MAX_NUM_THREADS 25
+#define ONE_PROCESS_ONLY (-1)
+
+/** ============================================================================
+ * Globals
+ * ============================================================================
+ */
+static Int numLoops, numThreads, procNum;
+
+struct thread_info { /* Used as argument to thread_start() */
+ pthread_t thread_id; /* ID returned by pthread_create() */
+ int thread_num; /* Application-defined thread # */
+};
+
+static void * pingThreadFxn(void *arg);
+
+/** ============================================================================
+ * Functions
+ * ============================================================================
+ */
+
+static Void * pingThreadFxn(void *arg)
+{
+ Int threadNum = *(int *)arg;
+ Int32 status = 0;
+ MessageQ_Msg msg = NULL;
+ MessageQ_Params msgParams;
+ UInt16 i;
+ MessageQ_Handle handle;
+ MessageQ_QueueId queueId = MessageQ_INVALIDMESSAGEQ;
+
+ char remoteQueueName[64];
+ char hostQueueName[64];
+
+ printf ("Entered pingThreadFxn: %d\n", threadNum);
+
+ sprintf(remoteQueueName, "%s_%d%d", SLAVE_MESSAGEQNAME, threadNum, (threadNum % (MultiProc_getNumProcessors() - 1)) + 1);
+ sprintf(hostQueueName, "%s_%d", HOST_MESSAGEQNAME, threadNum );
+
+ /* Create the local Message Queue for receiving. */
+ MessageQ_Params_init (&msgParams);
+ handle = MessageQ_create (hostQueueName, &msgParams);
+ if (handle == NULL) {
+ printf ("Error in MessageQ_create\n");
+ goto exit;
+ }
+ else {
+ printf ("thread: %d, Local Message: %s, QId: 0x%x\n",
+ threadNum, hostQueueName, MessageQ_getQueueId(handle));
+ }
+
+ /* Poll until remote side has it's messageQ created before we send: */
+ do {
+ status = MessageQ_open (remoteQueueName, &queueId);
+ sleep (1);
+ } while (status == MessageQ_E_NOTFOUND);
+ if (status < 0) {
+ printf ("Error in MessageQ_open [0x%x]\n", status);
+ goto cleanup;
+ }
+ else {
+ printf ("thread: %d, Remote queue: %s, QId: 0x%x\n",
+ threadNum, remoteQueueName, queueId);
+ }
+
+ printf ("\nthread: %d: Exchanging messages with remote processor...\n",
+ threadNum);
+ for (i = 0 ; i < numLoops ; i++) {
+ /* Allocate message. */
+ msg = MessageQ_alloc (HEAPID, MSGSIZE);
+ if (msg == NULL) {
+ printf ("Error in MessageQ_alloc\n");
+ break;
+ }
+
+ MessageQ_setMsgId (msg, i);
+
+ /* Have the remote proc reply to this message queue */
+ MessageQ_setReplyQueue (handle, msg);
+
+ status = MessageQ_put (queueId, msg);
+ if (status < 0) {
+ printf ("Error in MessageQ_put [0x%x]\n", status);
+ break;
+ }
+
+ status = MessageQ_get(handle, &msg, MessageQ_FOREVER);
+ if (status < 0) {
+ printf ("Error in MessageQ_get [0x%x]\n", status);
+ break;
+ }
+ else {
+ /* Validate the returned message. */
+ if ((msg != NULL) && (MessageQ_getMsgId (msg) != i)) {
+ printf ("Data integrity failure!\n"
+ " Expected %d\n"
+ " Received %d\n",
+ i, MessageQ_getMsgId (msg));
+ break;
+ }
+
+ status = MessageQ_free (msg);
+ }
+
+ printf ("thread: %d: Exchanged %d msgs\n", threadNum, (i+1));
+ }
+
+ printf ("thread: %d: pingThreadFxn successfully completed!\n", threadNum);
+
+ MessageQ_close (&queueId);
+
+cleanup:
+ /* Clean-up */
+ status = MessageQ_delete (&handle);
+ if (status < 0) {
+ printf ("Error in MessageQ_delete [0x%x]\n", status);
+ }
+
+exit:
+
+ return ((void *)status);
+}
+
+int main (int argc, char ** argv)
+{
+ struct thread_info threads[MAX_NUM_THREADS];
+ int ret,i;
+ Int32 status = 0;
+
+ /* Parse Args: */
+ numLoops = NUM_LOOPS_DFLT;
+ numThreads = NUM_THREADS_DFLT;
+ procNum = ONE_PROCESS_ONLY;
+ switch (argc) {
+ case 1:
+ /* use defaults */
+ break;
+ case 2:
+ numThreads = atoi(argv[1]);
+ break;
+ case 3:
+ numThreads = atoi(argv[1]);
+ numLoops = atoi(argv[2]);
+ break;
+ case 4:
+ /* We force numThreads = 1 if doing a multiProcess test: */
+ numThreads = 1;
+ numLoops = atoi(argv[2]);
+ procNum = atoi(argv[3]);
+ break;
+ default:
+ printf("Usage: %s [<numThreads>] [<numLoops>] [<Process #]>\n",
+ argv[0]);
+ printf("\tDefaults: numThreads: %d, numLoops: %d\n",
+ NUM_THREADS_DFLT, NUM_LOOPS_DFLT);
+ printf("\tMax Threads: %d\n", MAX_NUM_THREADS);
+ exit(0);
+ }
+
+ if (numThreads > MAX_NUM_THREADS) {
+ printf("Error: Maximum number of threads supported is %d\n",
+ MAX_NUM_THREADS);
+ exit(EXIT_FAILURE);
+ }
+
+ printf("Using numThreads: %d, numLoops: %d\n", numThreads, numLoops);
+ if (procNum != ONE_PROCESS_ONLY) {
+ printf("ProcNum: %d\n", procNum);
+ }
+
+ status = Ipc_start();
+ if (status < 0) {
+ printf ("Ipc_start failed: status = 0x%x\n", status);
+ goto exit;
+ }
+
+ /* Launch multiple threads: */
+ for (i = 0; i < numThreads; i++) {
+ /* Create the test thread: */
+ printf ("creating pingThreadFxn: %d\n", i);
+ threads[i].thread_num = (procNum == ONE_PROCESS_ONLY)? i: procNum;
+ ret = pthread_create(&threads[i].thread_id, NULL, &pingThreadFxn,
+ &(threads[i].thread_num));
+ if (ret) {
+ printf("MessageQMulti: can't spawn thread: %d, %s\n",
+ i, strerror(ret));
+ }
+ }
+
+ /* Join all threads: */
+ for (i = 0; i < numThreads; i++) {
+ ret = pthread_join(threads[i].thread_id, NULL);
+ if (ret != 0) {
+ printf("MessageQMulti: failed to join thread: %d, %s\n",
+ threads[i].thread_num, strerror(ret));
+ }
+ printf("MessageQMulti: Joined with thread %d\n",threads[i].thread_num);
+ }
+
+ Ipc_stop();
+
+exit:
+
+ return (0);
+}
diff --git a/linux/src/transport/Makefile.am b/linux/src/transport/Makefile.am
--- /dev/null
@@ -0,0 +1,66 @@
+##
+## Copyright (c) 2013-2014, Texas Instruments Incorporated
+##
+## Redistribution and use in source and binary forms, with or without
+## modification, are permitted provided that the following conditions
+## are met:
+##
+## * Redistributions of source code must retain the above copyright
+## notice, this list of conditions and the following disclaimer.
+##
+## * Redistributions in binary form must reproduce the above copyright
+## notice, this list of conditions and the following disclaimer in the
+## documentation and/or other materials provided with the distribution.
+##
+## * Neither the name of Texas Instruments Incorporated nor the names of
+## its contributors may be used to endorse or promote products derived
+## from this software without specific prior written permission.
+##
+## THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+## AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+## THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+## PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+## CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+## EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+## PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+## OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+## WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+## OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+## EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+##
+## ======== src/api/Makefile.am ========
+##
+
+# additional include paths necessary to compile the library
+AM_CFLAGS = -I$(top_srcdir)/linux/include -I$(top_srcdir)/hlos_common/include \
+ -I$(top_srcdir)/packages -I$(KERNEL_INSTALL_DIR)/include/generated/uapi\
+ -D_GNU_SOURCE -Wall @AM_CFLAGS@
+
+#if DRA7XX
+#AM_CFLAGS += -DGATEMP_SUPPORT
+#endif
+
+###############################################################################
+# THE LIBRARIES TO BUILD
+###############################################################################
+
+# the library names to build (note we are building shared libs)
+lib_LTLIBRARIES = libtransport.la
+
+# where to install the headers on the system
+libtransport_ladir = $(includedir)/ti/ipc
+
+# the list of header files that belong to the library (to be installed later)
+libtransport_la_HEADERS = $(top_srcdir)/linux/include/ti/ipc/Std.h
+
+# the sources to add to the library and to add to the source distribution
+libtransport_la_SOURCES = \
+ TransportRpmsg.c
+
+# Add version info to the shared library
+libtransport_la_LDFLAGS = -version-info 1:0:0
+
+#pkgconfig_DATA = libtiipc.pc
+#pkgconfigdir = $(libdir)/pkgconfig
+
+###############################################################################
diff --git a/linux/src/transport/Makefile.in b/linux/src/transport/Makefile.in
--- /dev/null
@@ -0,0 +1,533 @@
+# Makefile.in generated by automake 1.9.6 from Makefile.am.
+# @configure_input@
+
+# Copyright (C) 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002,
+# 2003, 2004, 2005 Free Software Foundation, Inc.
+# This Makefile.in is free software; the Free Software Foundation
+# gives unlimited permission to copy and/or distribute it,
+# with or without modifications, as long as this notice is preserved.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY, to the extent permitted by law; without
+# even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+# PARTICULAR PURPOSE.
+
+@SET_MAKE@
+
+
+srcdir = @srcdir@
+top_srcdir = @top_srcdir@
+VPATH = @srcdir@
+pkgdatadir = $(datadir)/@PACKAGE@
+pkglibdir = $(libdir)/@PACKAGE@
+pkgincludedir = $(includedir)/@PACKAGE@
+top_builddir = ../../..
+am__cd = CDPATH="$${ZSH_VERSION+.}$(PATH_SEPARATOR)" && cd
+INSTALL = @INSTALL@
+install_sh_DATA = $(install_sh) -c -m 644
+install_sh_PROGRAM = $(install_sh) -c
+install_sh_SCRIPT = $(install_sh) -c
+INSTALL_HEADER = $(INSTALL_DATA)
+transform = $(program_transform_name)
+NORMAL_INSTALL = :
+PRE_INSTALL = :
+POST_INSTALL = :
+NORMAL_UNINSTALL = :
+PRE_UNINSTALL = :
+POST_UNINSTALL = :
+build_triplet = @build@
+host_triplet = @host@
+subdir = linux/src/transport
+DIST_COMMON = $(libtransport_la_HEADERS) $(srcdir)/Makefile.am \
+ $(srcdir)/Makefile.in
+ACLOCAL_M4 = $(top_srcdir)/aclocal.m4
+am__aclocal_m4_deps = $(top_srcdir)/configure.ac
+am__configure_deps = $(am__aclocal_m4_deps) $(CONFIGURE_DEPENDENCIES) \
+ $(ACLOCAL_M4)
+mkinstalldirs = $(install_sh) -d
+CONFIG_CLEAN_FILES =
+am__vpath_adj_setup = srcdirstrip=`echo "$(srcdir)" | sed 's|.|.|g'`;
+am__vpath_adj = case $$p in \
+ $(srcdir)/*) f=`echo "$$p" | sed "s|^$$srcdirstrip/||"`;; \
+ *) f=$$p;; \
+ esac;
+am__strip_dir = `echo $$p | sed -e 's|^.*/||'`;
+am__installdirs = "$(DESTDIR)$(libdir)" \
+ "$(DESTDIR)$(libtransport_ladir)"
+libLTLIBRARIES_INSTALL = $(INSTALL)
+LTLIBRARIES = $(lib_LTLIBRARIES)
+libtransport_la_LIBADD =
+am_libtransport_la_OBJECTS = TransportRpmsg.lo
+libtransport_la_OBJECTS = $(am_libtransport_la_OBJECTS)
+DEFAULT_INCLUDES = -I. -I$(srcdir)
+depcomp = $(SHELL) $(top_srcdir)/linux/build-aux/depcomp
+am__depfiles_maybe = depfiles
+COMPILE = $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) \
+ $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS)
+LTCOMPILE = $(LIBTOOL) --tag=CC --mode=compile $(CC) $(DEFS) \
+ $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) \
+ $(AM_CFLAGS) $(CFLAGS)
+CCLD = $(CC)
+LINK = $(LIBTOOL) --tag=CC --mode=link $(CCLD) $(AM_CFLAGS) $(CFLAGS) \
+ $(AM_LDFLAGS) $(LDFLAGS) -o $@
+SOURCES = $(libtransport_la_SOURCES)
+DIST_SOURCES = $(libtransport_la_SOURCES)
+libtransport_laHEADERS_INSTALL = $(INSTALL_HEADER)
+HEADERS = $(libtransport_la_HEADERS)
+ETAGS = etags
+CTAGS = ctags
+DISTFILES = $(DIST_COMMON) $(DIST_SOURCES) $(TEXINFOS) $(EXTRA_DIST)
+ACLOCAL = @ACLOCAL@
+AMDEP_FALSE = @AMDEP_FALSE@
+AMDEP_TRUE = @AMDEP_TRUE@
+AMTAR = @AMTAR@
+
+# additional include paths necessary to compile the library
+AM_CFLAGS = -I$(top_srcdir)/linux/include -I$(top_srcdir)/hlos_common/include \
+ -I$(top_srcdir)/packages -I$(KERNEL_INSTALL_DIR)/include/generated/uapi\
+ -D_GNU_SOURCE -Wall @AM_CFLAGS@
+
+AM_LDFLAGS = @AM_LDFLAGS@
+AR = @AR@
+AUTOCONF = @AUTOCONF@
+AUTOHEADER = @AUTOHEADER@
+AUTOMAKE = @AUTOMAKE@
+AWK = @AWK@
+C66AK2E_FALSE = @C66AK2E_FALSE@
+C66AK2E_TRUE = @C66AK2E_TRUE@
+CC = @CC@
+CCDEPMODE = @CCDEPMODE@
+CFLAGS = @CFLAGS@
+CMEM_FALSE = @CMEM_FALSE@
+CMEM_INSTALL_DIR = @CMEM_INSTALL_DIR@
+CMEM_TRUE = @CMEM_TRUE@
+CPP = @CPP@
+CPPFLAGS = @CPPFLAGS@
+CXX = @CXX@
+CXXCPP = @CXXCPP@
+CXXDEPMODE = @CXXDEPMODE@
+CXXFLAGS = @CXXFLAGS@
+CYGPATH_W = @CYGPATH_W@
+DEFS = @DEFS@
+DEPDIR = @DEPDIR@
+DRA7XX_FALSE = @DRA7XX_FALSE@
+DRA7XX_TRUE = @DRA7XX_TRUE@
+DRM_FALSE = @DRM_FALSE@
+DRM_PREFIX = @DRM_PREFIX@
+DRM_TRUE = @DRM_TRUE@
+ECHO = @ECHO@
+ECHO_C = @ECHO_C@
+ECHO_N = @ECHO_N@
+ECHO_T = @ECHO_T@
+EGREP = @EGREP@
+EXEEXT = @EXEEXT@
+INSTALL_DATA = @INSTALL_DATA@
+INSTALL_PROGRAM = @INSTALL_PROGRAM@
+INSTALL_SCRIPT = @INSTALL_SCRIPT@
+INSTALL_STRIP_PROGRAM = @INSTALL_STRIP_PROGRAM@
+KDIR_FALSE = @KDIR_FALSE@
+KDIR_TRUE = @KDIR_TRUE@
+KERNEL_INSTALL_DIR = @KERNEL_INSTALL_DIR@
+LDFLAGS = @LDFLAGS@
+LIBOBJS = @LIBOBJS@
+LIBS = @LIBS@
+LIBTOOL = @LIBTOOL@
+LN_S = @LN_S@
+LTLIBOBJS = @LTLIBOBJS@
+MAINT = @MAINT@
+MAINTAINER_MODE_FALSE = @MAINTAINER_MODE_FALSE@
+MAINTAINER_MODE_TRUE = @MAINTAINER_MODE_TRUE@
+MAKEINFO = @MAKEINFO@
+OBJEXT = @OBJEXT@
+OMAP54XX_SMP_FALSE = @OMAP54XX_SMP_FALSE@
+OMAP54XX_SMP_TRUE = @OMAP54XX_SMP_TRUE@
+OMAPL138_FALSE = @OMAPL138_FALSE@
+OMAPL138_TRUE = @OMAPL138_TRUE@
+PACKAGE = @PACKAGE@
+PACKAGE_BUGREPORT = @PACKAGE_BUGREPORT@
+PACKAGE_NAME = @PACKAGE_NAME@
+PACKAGE_STRING = @PACKAGE_STRING@
+PACKAGE_TARNAME = @PACKAGE_TARNAME@
+PACKAGE_VERSION = @PACKAGE_VERSION@
+PATH_SEPARATOR = @PATH_SEPARATOR@
+PLATFORM = @PLATFORM@
+RANLIB = @RANLIB@
+SET_MAKE = @SET_MAKE@
+SHELL = @SHELL@
+STRIP = @STRIP@
+TCI6614_FALSE = @TCI6614_FALSE@
+TCI6614_TRUE = @TCI6614_TRUE@
+TCI6630_FALSE = @TCI6630_FALSE@
+TCI6630_TRUE = @TCI6630_TRUE@
+TCI6636_FALSE = @TCI6636_FALSE@
+TCI6636_TRUE = @TCI6636_TRUE@
+TCI6638_FALSE = @TCI6638_FALSE@
+TCI6638_TRUE = @TCI6638_TRUE@
+VERSION = @VERSION@
+ac_ct_AR = @ac_ct_AR@
+ac_ct_CC = @ac_ct_CC@
+ac_ct_CXX = @ac_ct_CXX@
+ac_ct_RANLIB = @ac_ct_RANLIB@
+ac_ct_STRIP = @ac_ct_STRIP@
+am__fastdepCC_FALSE = @am__fastdepCC_FALSE@
+am__fastdepCC_TRUE = @am__fastdepCC_TRUE@
+am__fastdepCXX_FALSE = @am__fastdepCXX_FALSE@
+am__fastdepCXX_TRUE = @am__fastdepCXX_TRUE@
+am__include = @am__include@
+am__leading_dot = @am__leading_dot@
+am__quote = @am__quote@
+am__tar = @am__tar@
+am__untar = @am__untar@
+bindir = @bindir@
+build = @build@
+build_alias = @build_alias@
+build_cpu = @build_cpu@
+build_os = @build_os@
+build_vendor = @build_vendor@
+datadir = @datadir@
+exec_prefix = @exec_prefix@
+host = @host@
+host_alias = @host_alias@
+host_cpu = @host_cpu@
+host_os = @host_os@
+host_vendor = @host_vendor@
+includedir = @includedir@
+infodir = @infodir@
+install_sh = @install_sh@
+libdir = @libdir@
+libexecdir = @libexecdir@
+localstatedir = @localstatedir@
+mandir = @mandir@
+mkdir_p = @mkdir_p@
+oldincludedir = @oldincludedir@
+prefix = @prefix@
+program_transform_name = @program_transform_name@
+sbindir = @sbindir@
+sharedstatedir = @sharedstatedir@
+sysconfdir = @sysconfdir@
+target_alias = @target_alias@
+
+#if DRA7XX
+#AM_CFLAGS += -DGATEMP_SUPPORT
+#endif
+
+###############################################################################
+# THE LIBRARIES TO BUILD
+###############################################################################
+
+# the library names to build (note we are building shared libs)
+lib_LTLIBRARIES = libtransport.la
+
+# where to install the headers on the system
+libtransport_ladir = $(includedir)/ti/ipc
+
+# the list of header files that belong to the library (to be installed later)
+libtransport_la_HEADERS = $(top_srcdir)/linux/include/ti/ipc/Std.h
+
+# the sources to add to the library and to add to the source distribution
+libtransport_la_SOURCES = \
+ TransportRpmsg.c
+
+
+# Add version info to the shared library
+libtransport_la_LDFLAGS = -version-info 1:0:0
+all: all-am
+
+.SUFFIXES:
+.SUFFIXES: .c .lo .o .obj
+$(srcdir)/Makefile.in: @MAINTAINER_MODE_TRUE@ $(srcdir)/Makefile.am $(am__configure_deps)
+ @for dep in $?; do \
+ case '$(am__configure_deps)' in \
+ *$$dep*) \
+ cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh \
+ && exit 0; \
+ exit 1;; \
+ esac; \
+ done; \
+ echo ' cd $(top_srcdir) && $(AUTOMAKE) --foreign linux/src/transport/Makefile'; \
+ cd $(top_srcdir) && \
+ $(AUTOMAKE) --foreign linux/src/transport/Makefile
+.PRECIOUS: Makefile
+Makefile: $(srcdir)/Makefile.in $(top_builddir)/config.status
+ @case '$?' in \
+ *config.status*) \
+ cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh;; \
+ *) \
+ echo ' cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__depfiles_maybe)'; \
+ cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__depfiles_maybe);; \
+ esac;
+
+$(top_builddir)/config.status: $(top_srcdir)/configure $(CONFIG_STATUS_DEPENDENCIES)
+ cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
+
+$(top_srcdir)/configure: @MAINTAINER_MODE_TRUE@ $(am__configure_deps)
+ cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
+$(ACLOCAL_M4): @MAINTAINER_MODE_TRUE@ $(am__aclocal_m4_deps)
+ cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
+install-libLTLIBRARIES: $(lib_LTLIBRARIES)
+ @$(NORMAL_INSTALL)
+ test -z "$(libdir)" || $(mkdir_p) "$(DESTDIR)$(libdir)"
+ @list='$(lib_LTLIBRARIES)'; for p in $$list; do \
+ if test -f $$p; then \
+ f=$(am__strip_dir) \
+ echo " $(LIBTOOL) --mode=install $(libLTLIBRARIES_INSTALL) $(INSTALL_STRIP_FLAG) '$$p' '$(DESTDIR)$(libdir)/$$f'"; \
+ $(LIBTOOL) --mode=install $(libLTLIBRARIES_INSTALL) $(INSTALL_STRIP_FLAG) "$$p" "$(DESTDIR)$(libdir)/$$f"; \
+ else :; fi; \
+ done
+
+uninstall-libLTLIBRARIES:
+ @$(NORMAL_UNINSTALL)
+ @set -x; list='$(lib_LTLIBRARIES)'; for p in $$list; do \
+ p=$(am__strip_dir) \
+ echo " $(LIBTOOL) --mode=uninstall rm -f '$(DESTDIR)$(libdir)/$$p'"; \
+ $(LIBTOOL) --mode=uninstall rm -f "$(DESTDIR)$(libdir)/$$p"; \
+ done
+
+clean-libLTLIBRARIES:
+ -test -z "$(lib_LTLIBRARIES)" || rm -f $(lib_LTLIBRARIES)
+ @list='$(lib_LTLIBRARIES)'; for p in $$list; do \
+ dir="`echo $$p | sed -e 's|/[^/]*$$||'`"; \
+ test "$$dir" != "$$p" || dir=.; \
+ echo "rm -f \"$${dir}/so_locations\""; \
+ rm -f "$${dir}/so_locations"; \
+ done
+libtransport.la: $(libtransport_la_OBJECTS) $(libtransport_la_DEPENDENCIES)
+ $(LINK) -rpath $(libdir) $(libtransport_la_LDFLAGS) $(libtransport_la_OBJECTS) $(libtransport_la_LIBADD) $(LIBS)
+
+mostlyclean-compile:
+ -rm -f *.$(OBJEXT)
+
+distclean-compile:
+ -rm -f *.tab.c
+
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/TransportRpmsg.Plo@am__quote@
+
+.c.o:
+@am__fastdepCC_TRUE@ if $(COMPILE) -MT $@ -MD -MP -MF "$(DEPDIR)/$*.Tpo" -c -o $@ $<; \
+@am__fastdepCC_TRUE@ then mv -f "$(DEPDIR)/$*.Tpo" "$(DEPDIR)/$*.Po"; else rm -f "$(DEPDIR)/$*.Tpo"; exit 1; fi
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='$<' object='$@' libtool=no @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCC_FALSE@ $(COMPILE) -c $<
+
+.c.obj:
+@am__fastdepCC_TRUE@ if $(COMPILE) -MT $@ -MD -MP -MF "$(DEPDIR)/$*.Tpo" -c -o $@ `$(CYGPATH_W) '$<'`; \
+@am__fastdepCC_TRUE@ then mv -f "$(DEPDIR)/$*.Tpo" "$(DEPDIR)/$*.Po"; else rm -f "$(DEPDIR)/$*.Tpo"; exit 1; fi
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='$<' object='$@' libtool=no @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCC_FALSE@ $(COMPILE) -c `$(CYGPATH_W) '$<'`
+
+.c.lo:
+@am__fastdepCC_TRUE@ if $(LTCOMPILE) -MT $@ -MD -MP -MF "$(DEPDIR)/$*.Tpo" -c -o $@ $<; \
+@am__fastdepCC_TRUE@ then mv -f "$(DEPDIR)/$*.Tpo" "$(DEPDIR)/$*.Plo"; else rm -f "$(DEPDIR)/$*.Tpo"; exit 1; fi
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='$<' object='$@' libtool=yes @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCC_FALSE@ $(LTCOMPILE) -c -o $@ $<
+
+mostlyclean-libtool:
+ -rm -f *.lo
+
+clean-libtool:
+ -rm -rf .libs _libs
+
+distclean-libtool:
+ -rm -f libtool
+uninstall-info-am:
+install-libtransport_laHEADERS: $(libtransport_la_HEADERS)
+ @$(NORMAL_INSTALL)
+ test -z "$(libtransport_ladir)" || $(mkdir_p) "$(DESTDIR)$(libtransport_ladir)"
+ @list='$(libtransport_la_HEADERS)'; for p in $$list; do \
+ if test -f "$$p"; then d=; else d="$(srcdir)/"; fi; \
+ f=$(am__strip_dir) \
+ echo " $(libtransport_laHEADERS_INSTALL) '$$d$$p' '$(DESTDIR)$(libtransport_ladir)/$$f'"; \
+ $(libtransport_laHEADERS_INSTALL) "$$d$$p" "$(DESTDIR)$(libtransport_ladir)/$$f"; \
+ done
+
+uninstall-libtransport_laHEADERS:
+ @$(NORMAL_UNINSTALL)
+ @list='$(libtransport_la_HEADERS)'; for p in $$list; do \
+ f=$(am__strip_dir) \
+ echo " rm -f '$(DESTDIR)$(libtransport_ladir)/$$f'"; \
+ rm -f "$(DESTDIR)$(libtransport_ladir)/$$f"; \
+ done
+
+ID: $(HEADERS) $(SOURCES) $(LISP) $(TAGS_FILES)
+ list='$(SOURCES) $(HEADERS) $(LISP) $(TAGS_FILES)'; \
+ unique=`for i in $$list; do \
+ if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \
+ done | \
+ $(AWK) ' { files[$$0] = 1; } \
+ END { for (i in files) print i; }'`; \
+ mkid -fID $$unique
+tags: TAGS
+
+TAGS: $(HEADERS) $(SOURCES) $(TAGS_DEPENDENCIES) \
+ $(TAGS_FILES) $(LISP)
+ tags=; \
+ here=`pwd`; \
+ list='$(SOURCES) $(HEADERS) $(LISP) $(TAGS_FILES)'; \
+ unique=`for i in $$list; do \
+ if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \
+ done | \
+ $(AWK) ' { files[$$0] = 1; } \
+ END { for (i in files) print i; }'`; \
+ if test -z "$(ETAGS_ARGS)$$tags$$unique"; then :; else \
+ test -n "$$unique" || unique=$$empty_fix; \
+ $(ETAGS) $(ETAGSFLAGS) $(AM_ETAGSFLAGS) $(ETAGS_ARGS) \
+ $$tags $$unique; \
+ fi
+ctags: CTAGS
+CTAGS: $(HEADERS) $(SOURCES) $(TAGS_DEPENDENCIES) \
+ $(TAGS_FILES) $(LISP)
+ tags=; \
+ here=`pwd`; \
+ list='$(SOURCES) $(HEADERS) $(LISP) $(TAGS_FILES)'; \
+ unique=`for i in $$list; do \
+ if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \
+ done | \
+ $(AWK) ' { files[$$0] = 1; } \
+ END { for (i in files) print i; }'`; \
+ test -z "$(CTAGS_ARGS)$$tags$$unique" \
+ || $(CTAGS) $(CTAGSFLAGS) $(AM_CTAGSFLAGS) $(CTAGS_ARGS) \
+ $$tags $$unique
+
+GTAGS:
+ here=`$(am__cd) $(top_builddir) && pwd` \
+ && cd $(top_srcdir) \
+ && gtags -i $(GTAGS_ARGS) $$here
+
+distclean-tags:
+ -rm -f TAGS ID GTAGS GRTAGS GSYMS GPATH tags
+
+distdir: $(DISTFILES)
+ $(mkdir_p) $(distdir)/../../../linux/include/ti/ipc
+ @srcdirstrip=`echo "$(srcdir)" | sed 's|.|.|g'`; \
+ topsrcdirstrip=`echo "$(top_srcdir)" | sed 's|.|.|g'`; \
+ list='$(DISTFILES)'; for file in $$list; do \
+ case $$file in \
+ $(srcdir)/*) file=`echo "$$file" | sed "s|^$$srcdirstrip/||"`;; \
+ $(top_srcdir)/*) file=`echo "$$file" | sed "s|^$$topsrcdirstrip/|$(top_builddir)/|"`;; \
+ esac; \
+ if test -f $$file || test -d $$file; then d=.; else d=$(srcdir); fi; \
+ dir=`echo "$$file" | sed -e 's,/[^/]*$$,,'`; \
+ if test "$$dir" != "$$file" && test "$$dir" != "."; then \
+ dir="/$$dir"; \
+ $(mkdir_p) "$(distdir)$$dir"; \
+ else \
+ dir=''; \
+ fi; \
+ if test -d $$d/$$file; then \
+ if test -d $(srcdir)/$$file && test $$d != $(srcdir); then \
+ cp -pR $(srcdir)/$$file $(distdir)$$dir || exit 1; \
+ fi; \
+ cp -pR $$d/$$file $(distdir)$$dir || exit 1; \
+ else \
+ test -f $(distdir)/$$file \
+ || cp -p $$d/$$file $(distdir)/$$file \
+ || exit 1; \
+ fi; \
+ done
+check-am: all-am
+check: check-am
+all-am: Makefile $(LTLIBRARIES) $(HEADERS)
+installdirs:
+ for dir in "$(DESTDIR)$(libdir)" "$(DESTDIR)$(libtransport_ladir)"; do \
+ test -z "$$dir" || $(mkdir_p) "$$dir"; \
+ done
+install: install-am
+install-exec: install-exec-am
+install-data: install-data-am
+uninstall: uninstall-am
+
+install-am: all-am
+ @$(MAKE) $(AM_MAKEFLAGS) install-exec-am install-data-am
+
+installcheck: installcheck-am
+install-strip:
+ $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \
+ install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \
+ `test -z '$(STRIP)' || \
+ echo "INSTALL_PROGRAM_ENV=STRIPPROG='$(STRIP)'"` install
+mostlyclean-generic:
+
+clean-generic:
+
+distclean-generic:
+ -test -z "$(CONFIG_CLEAN_FILES)" || rm -f $(CONFIG_CLEAN_FILES)
+
+maintainer-clean-generic:
+ @echo "This command is intended for maintainers to use"
+ @echo "it deletes files that may require special tools to rebuild."
+clean: clean-am
+
+clean-am: clean-generic clean-libLTLIBRARIES clean-libtool \
+ mostlyclean-am
+
+distclean: distclean-am
+ -rm -rf ./$(DEPDIR)
+ -rm -f Makefile
+distclean-am: clean-am distclean-compile distclean-generic \
+ distclean-libtool distclean-tags
+
+dvi: dvi-am
+
+dvi-am:
+
+html: html-am
+
+info: info-am
+
+info-am:
+
+install-data-am: install-libtransport_laHEADERS
+
+install-exec-am: install-libLTLIBRARIES
+
+install-info: install-info-am
+
+install-man:
+
+installcheck-am:
+
+maintainer-clean: maintainer-clean-am
+ -rm -rf ./$(DEPDIR)
+ -rm -f Makefile
+maintainer-clean-am: distclean-am maintainer-clean-generic
+
+mostlyclean: mostlyclean-am
+
+mostlyclean-am: mostlyclean-compile mostlyclean-generic \
+ mostlyclean-libtool
+
+pdf: pdf-am
+
+pdf-am:
+
+ps: ps-am
+
+ps-am:
+
+uninstall-am: uninstall-info-am uninstall-libLTLIBRARIES \
+ uninstall-libtransport_laHEADERS
+
+.PHONY: CTAGS GTAGS all all-am check check-am clean clean-generic \
+ clean-libLTLIBRARIES clean-libtool ctags distclean \
+ distclean-compile distclean-generic distclean-libtool \
+ distclean-tags distdir dvi dvi-am html html-am info info-am \
+ install install-am install-data install-data-am install-exec \
+ install-exec-am install-info install-info-am \
+ install-libLTLIBRARIES install-libtransport_laHEADERS \
+ install-man install-strip installcheck installcheck-am \
+ installdirs maintainer-clean maintainer-clean-generic \
+ mostlyclean mostlyclean-compile mostlyclean-generic \
+ mostlyclean-libtool pdf pdf-am ps ps-am tags uninstall \
+ uninstall-am uninstall-info-am uninstall-libLTLIBRARIES \
+ uninstall-libtransport_laHEADERS
+
+
+#pkgconfig_DATA = libtiipc.pc
+#pkgconfigdir = $(libdir)/pkgconfig
+
+###############################################################################
+# Tell versions [3.59,3.63) of GNU make to not export all variables.
+# Otherwise a system limit (for SysV at least) may be exceeded.
+.NOEXPORT:
diff --git a/linux/src/transport/TransportRpmsg.c b/linux/src/transport/TransportRpmsg.c
--- /dev/null
@@ -0,0 +1,639 @@
+/*
+ * Copyright (c) 2014, Texas Instruments Incorporated
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of Texas Instruments Incorporated nor the names of
+ * its contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+ * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+ * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+ * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+/*
+ * ======== TransportRpmsg.c ========
+ * Implementation of functions specified in the IMessageQTransport interface.
+ */
+
+#include <ti/ipc/Std.h>
+
+#include <ti/ipc/MessageQ.h>
+#include <ti/ipc/MultiProc.h>
+#include <_MessageQ.h>
+
+/* Socket Headers */
+#include <sys/socket.h>
+#include <sys/select.h>
+#include <sys/eventfd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <fcntl.h>
+#include <pthread.h>
+
+/* Socket Protocol Family */
+#include <net/rpmsg.h>
+
+/* Socket utils: */
+#include <SocketFxns.h>
+
+#include <_lad.h>
+
+#include <TransportRpmsg.h>
+
+
+/* More magic rpmsg port numbers: */
+#define MESSAGEQ_RPMSG_PORT 61
+#define MESSAGEQ_RPMSG_MAXSIZE 512
+
+#define TransportRpmsg_GROWSIZE 32
+
+/* traces in this file are controlled via _TransportMessageQ_verbose */
+Bool _TransportMessageQ_verbose = FALSE;
+#define verbose _TransportMessageQ_verbose
+
+Int TransportRpmsg_bind(Void *handle, UInt32 queueId);
+Int TransportRpmsg_unbind(Void *handle, UInt32 queueId);
+Bool TransportRpmsg_put(Void *handle, Ptr msg);
+
+typedef struct TransportRpmsg_Module {
+ int sock[MultiProc_MAXPROCESSORS];
+ fd_set rfds;
+ int maxFd;
+ int inFds[1024];
+ int nInFds;
+ pthread_mutex_t gate;
+ int unblockEvent; /* eventFd for unblocking socket */
+ pthread_t threadId; /* ID returned by pthread_create() */
+ Bool threadStarted;
+} TransportRpmsg_Module;
+
+IMessageQTransport_Fxns TransportRpmsg_fxns = {
+ .bind = TransportRpmsg_bind,
+ .unbind = TransportRpmsg_unbind,
+ .put = TransportRpmsg_put,
+};
+
+typedef struct TransportRpmsg_Object {
+ IMessageQTransport_Object base;
+ Int status;
+ UInt16 rprocId;
+ int numQueues;
+ int *qIndexToFd;
+} TransportRpmsg_Object;
+
+TransportRpmsg_Module TransportRpmsg_state = {
+ .sock = {0},
+ .threadStarted = FALSE,
+};
+TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
+
+static Int attach(UInt16 rprocId);
+static Int detach(UInt16 rprocId);
+static void *rpmsgThreadFxn(void *arg);
+static Int transportGet(int sock, MessageQ_Msg *retMsg);
+static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
+ Int fd,
+ UInt16 qIndex);
+static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
+static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
+
+/* -------------------------------------------------------------------------- */
+
+/* instance convertors */
+IMessageQTransport_Handle TransportRpmsg_upCast(TransportRpmsg_Handle handle)
+{
+ TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
+ return ((IMessageQTransport_Handle)&obj->base);
+}
+
+TransportRpmsg_Handle TransportRpmsg_downCast(IMessageQTransport_Handle base)
+{
+ return ((TransportRpmsg_Handle)base);
+}
+
+TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params,
+ Int *attachStatus)
+{
+ TransportRpmsg_Object *obj;
+ Int *queues;
+ Int rv;
+
+ rv = attach(params->rprocId);
+ if (attachStatus) {
+ *attachStatus = rv;
+ }
+
+ if (rv != MessageQ_S_SUCCESS) {
+ return NULL;
+ }
+
+ obj = calloc(1, sizeof (TransportRpmsg_Object));
+
+ /* structure copy */
+ obj->base.base.interfaceType = IMessageQTransport_TypeId;
+ obj->base.fxns = &TransportRpmsg_fxns;
+ obj->rprocId = params->rprocId;
+
+ obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof (Int));
+ obj->numQueues = TransportRpmsg_GROWSIZE;
+
+ return (TransportRpmsg_Handle)obj;
+}
+
+Void TransportRpmsg_delete(TransportRpmsg_Handle *handlep)
+{
+ TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)handlep;
+
+ detach(obj->rprocId);
+
+ free(obj->qIndexToFd);
+ free(obj);
+
+ *handlep = NULL;
+}
+
+static Int attach(UInt16 rprocId)
+{
+ Int status = MessageQ_S_SUCCESS;
+ int sock;
+
+ /* Create the socket for sending messages to the remote proc: */
+ sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
+ if (sock < 0) {
+ status = MessageQ_E_FAIL;
+ printf("attach: socket failed: %d (%s)\n",
+ errno, strerror(errno));
+
+ goto exit;
+ }
+
+ PRINTVERBOSE1("attach: created send socket: %d\n", sock)
+
+ /* Attempt to connect: */
+ status = ConnectSocket(sock, rprocId, MESSAGEQ_RPMSG_PORT);
+ if (status < 0) {
+ /* is it ok to "borrow" this error code from MessageQ? */
+ status = MessageQ_E_RESOURCE;
+
+ /* don't hard-printf or exit since this is no longer fatal */
+ PRINTVERBOSE1("attach: ConnectSocket(rprocId:%d) failed\n", rprocId)
+
+ goto exitSock;
+ }
+
+ TransportRpmsg_module->sock[rprocId] = sock;
+
+ if (TransportRpmsg_module->threadStarted == FALSE) {
+ /* create a module wide event to unblock the socket select thread */
+ TransportRpmsg_module->unblockEvent = eventfd(0, 0);
+ if (TransportRpmsg_module->unblockEvent == -1) {
+ printf("attach: unblock socket failed: %d (%s)\n",
+ errno, strerror(errno));
+ status = MessageQ_E_FAIL;
+
+ goto exitSock;
+ }
+
+ PRINTVERBOSE1("attach: created unblock event %d\n",
+ TransportRpmsg_module->unblockEvent)
+
+ FD_ZERO(&TransportRpmsg_module->rfds);
+ FD_SET(TransportRpmsg_module->unblockEvent,
+ &TransportRpmsg_module->rfds);
+ TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
+ TransportRpmsg_module->nInFds = 0;
+
+ pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
+
+ status = pthread_create(&TransportRpmsg_module->threadId, NULL,
+ &rpmsgThreadFxn, NULL);
+ if (status < 0) {
+ status = MessageQ_E_FAIL;
+ printf("attach: failed to spawn thread\n");
+
+ goto exitEvent;
+ }
+ else {
+ TransportRpmsg_module->threadStarted = TRUE;
+ }
+ }
+
+ goto exit;
+
+exitEvent:
+ close(TransportRpmsg_module->unblockEvent);
+
+ FD_ZERO(&TransportRpmsg_module->rfds);
+ TransportRpmsg_module->maxFd = 0;
+
+exitSock:
+ close(sock);
+ TransportRpmsg_module->sock[rprocId] = 0;
+
+exit:
+ return status;
+}
+
+static Int detach(UInt16 rprocId)
+{
+
+ Int status = -1;
+ int sock;
+
+ sock = TransportRpmsg_module->sock[rprocId];
+
+ if (sock) {
+ PRINTVERBOSE1("detach: closing socket: %d\n", sock)
+
+ status = close(sock);
+ }
+
+ return status;
+}
+
+Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
+{
+ TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
+ UInt16 queueIndex = queueId & 0x0000ffff;
+ int fd;
+ int err;
+ uint64_t buf;
+ UInt16 rprocId;
+
+ rprocId = obj->rprocId;
+
+ PRINTVERBOSE2("TransportRpmsg_bind: creating endpoint for rprocId %d queueIndex %d\n", rprocId, queueIndex)
+
+ /* Create the socket to receive messages for this messageQ. */
+ fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
+ if (fd < 0) {
+ printf("TransportRpmsg_bind: socket call failed: %d (%s)\n",
+ errno, strerror(errno));
+ goto exitClose;
+ }
+
+ PRINTVERBOSE1("TransportRpmsg_bind: created socket fd %d\n", fd)
+
+ err = SocketBindAddr(fd, rprocId, (UInt32)queueIndex);
+ if (err < 0) {
+ /* don't hard-printf since this is no longer fatal */
+ PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
+ errno, strerror(errno))
+
+ close(fd);
+
+ return -1;
+ }
+
+ pthread_mutex_lock(&TransportRpmsg_module->gate);
+
+ /* add to our fat fd array and update select() parameters */
+ TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++] = fd;
+ TransportRpmsg_module->maxFd = MAX(TransportRpmsg_module->maxFd, fd);
+ FD_SET(fd, &TransportRpmsg_module->rfds);
+
+ pthread_mutex_unlock(&TransportRpmsg_module->gate);
+
+ bindFdToQueueIndex(obj, fd, queueIndex);
+
+ /*
+ * Even though we use the unblock event as just a signalling event with
+ * no related payload, we need to write some non-zero value. Might as
+ * well make it the fd (which the reader could decide to use if needed).
+ */
+ buf = fd;
+ write(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
+
+ goto exit;
+
+exitClose:
+ TransportRpmsg_unbind(handle, fd);
+ fd = 0;
+
+exit:
+ return fd;
+}
+
+Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
+{
+ TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
+ UInt16 queueIndex = queueId & 0x0000ffff;
+ uint64_t buf;
+ Int status = MessageQ_S_SUCCESS;
+ int maxFd;
+ int fd;
+ int i;
+ int j;
+
+ fd = queueIndexToFd(obj, queueIndex);
+ if (!fd) {
+ PRINTVERBOSE1("TransportRpmsg_unbind: queueId 0x%x not bound\n",
+ queueId)
+
+ return -1;
+ }
+
+ PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
+
+ pthread_mutex_lock(&TransportRpmsg_module->gate);
+
+ /* remove from input fd array */
+ for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
+ if (TransportRpmsg_module->inFds[i] == fd) {
+ TransportRpmsg_module->nInFds--;
+
+ /* shift subsequent elements down */
+ for (j = i; j < TransportRpmsg_module->nInFds; j++) {
+ TransportRpmsg_module->inFds[j] =
+ TransportRpmsg_module->inFds[j + 1];
+ }
+ TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds] = 0;
+
+ FD_CLR(fd, &TransportRpmsg_module->rfds);
+ if (fd == TransportRpmsg_module->maxFd) {
+ /* find new max fd */
+ maxFd = TransportRpmsg_module->unblockEvent;
+ for (j = 0; j < TransportRpmsg_module->nInFds; j++) {
+ maxFd = MAX(TransportRpmsg_module->inFds[j], maxFd);
+ }
+ TransportRpmsg_module->maxFd = maxFd;
+ }
+
+ /*
+ * Even though we use the unblock event as just a signalling
+ * event with no related payload, we need to write some non-zero
+ * value. Might as well make it the fd (which the reader could
+ * decide to use if needed).
+ */
+ buf = fd;
+ write(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
+
+ break;
+ }
+
+ close(fd);
+ }
+
+ unbindQueueIndex(obj, queueIndex);
+
+ pthread_mutex_unlock(&TransportRpmsg_module->gate);
+
+ return status;
+}
+
+Bool TransportRpmsg_put(Void *handle, Ptr pmsg)
+{
+ MessageQ_Msg msg = (MessageQ_Msg)pmsg;
+ Int status = TRUE;
+ int sock;
+ int err;
+ UInt16 dstProcId = msg->dstProc;
+
+ /*
+ * Retrieve the socket for the AF_SYSLINK protocol associated with this
+ * transport.
+ */
+ sock = TransportRpmsg_module->sock[dstProcId];
+ if (!sock) {
+ return FALSE;
+ }
+
+ PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
+
+ err = send(sock, msg, msg->msgSize, 0);
+ if (err < 0) {
+ printf("TransportRpmsg_put: send failed: %d (%s)\n",
+ errno, strerror(errno));
+ status = FALSE;
+
+ goto exit;
+ }
+
+ /*
+ * Free the message, as this is a copy transport, we maintain MessageQ
+ * semantics.
+ */
+ MessageQ_free(msg);
+
+exit:
+ return status;
+}
+
+Bool TransportRpmsg_control(Void *handle, UInt cmd, UArg cmdArg)
+{
+ return FALSE;
+}
+
+void *rpmsgThreadFxn(void *arg)
+{
+ static int lastFdx = 0;
+ int curFdx = 0;
+ Int status = MessageQ_S_SUCCESS;
+ Int tmpStatus;
+ int retval;
+ uint64_t buf;
+ fd_set rfds;
+ int maxFd;
+ int nfds;
+ MessageQ_Msg retMsg;
+ MessageQ_QueueId queueId;
+
+ while (TRUE) {
+ pthread_mutex_lock(&TransportRpmsg_module->gate);
+
+ maxFd = TransportRpmsg_module->maxFd;
+ rfds = TransportRpmsg_module->rfds;
+ nfds = TransportRpmsg_module->nInFds;
+
+ pthread_mutex_unlock(&TransportRpmsg_module->gate);
+
+ PRINTVERBOSE3("rpmsgThreadFxn: maxFd %d rfds[1:0] 0x%08x%08x\n", maxFd,
+ (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
+
+ retval = select(maxFd + 1, &rfds, NULL, NULL, NULL);
+ if (retval) {
+ if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
+ /*
+ * Our event was signalled by TransportRpmsg_bind()
+ * or TransportRpmsg_unbind() to tell us that the set of
+ * fds has changed.
+ */
+ PRINTVERBOSE0("rpmsgThreadFxn: got unblock event\n")
+
+ /* we don't need the written value */
+ read(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
+ }
+ else {
+ /* start where we last left off */
+ curFdx = lastFdx;
+
+ /*
+ * The set of fds that's used by select has been recorded
+ * locally, but the array of fds that are scanned below is
+ * a changing set (MessageQ_create/delete() can change it).
+ * While this might present an issue in itself, one key
+ * takeaway is that 'nfds' must not be zero else the % below
+ * will cause a divide-by-zero exception. We won't even get
+ * here if nfds == 0 since it's a local copy of the module's
+ * 'nInFds' which has to be > 0 for us to get here. So, even
+ * though the module's 'nInFds' might go to 0 during this loop,
+ * the loop itself will still remain intact.
+ */
+ do {
+ if (FD_ISSET(TransportRpmsg_module->inFds[curFdx], &rfds)) {
+
+ PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
+ TransportRpmsg_module->inFds[curFdx])
+
+ /* transport input fd was signalled: get the message */
+ tmpStatus = transportGet(
+ TransportRpmsg_module->inFds[curFdx], &retMsg);
+ if (tmpStatus < 0) {
+ printf("rpmsgThreadFxn: transportGet failed.");
+ status = MessageQ_E_FAIL;
+ }
+ else {
+ queueId = MessageQ_getDstQueue(retMsg);
+
+ PRINTVERBOSE1("rpmsgThreadFxn: got message, delivering to queueId 0x%x\n", queueId)
+
+ MessageQ_put(queueId, retMsg);
+ }
+
+ lastFdx = (curFdx + 1) % nfds;
+
+ break;
+ }
+
+ curFdx = (curFdx + 1) % nfds;
+ } while (curFdx != lastFdx);
+ }
+ }
+ }
+
+ return (void *)status;
+}
+
+/*
+ * ======== transportGet ========
+ * Retrieve a message waiting in the socket's queue.
+*/
+static Int transportGet(int sock, MessageQ_Msg *retMsg)
+{
+ Int status = MessageQ_S_SUCCESS;
+ MessageQ_Msg msg;
+ struct sockaddr_rpmsg fromAddr; /* [Socket address of sender] */
+ unsigned int len;
+ int byteCount;
+
+ /*
+ * We have no way of peeking to see what message size we'll get, so we
+ * allocate a message of max size to receive contents from the rpmsg socket
+ * (currently, a copy transport)
+ */
+ msg = MessageQ_alloc(0, MESSAGEQ_RPMSG_MAXSIZE);
+ if (!msg) {
+ status = MessageQ_E_MEMORY;
+ goto exit;
+ }
+
+ memset(&fromAddr, 0, sizeof (fromAddr));
+ len = sizeof (fromAddr);
+
+ byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
+ (struct sockaddr *)&fromAddr, &len);
+ if (len != sizeof (fromAddr)) {
+ printf("recvfrom: got bad addr len (%d)\n", len);
+ status = MessageQ_E_FAIL;
+ goto exit;
+ }
+ if (byteCount < 0) {
+ printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
+ status = MessageQ_E_FAIL;
+ goto exit;
+ }
+ else {
+ /*
+ * Update the allocated message size (even though this may waste
+ * space when the actual message is smaller than the maximum rpmsg
+ * size, the message will be freed soon anyway, and it avoids an
+ * extra copy).
+ */
+ msg->msgSize = byteCount;
+
+ /*
+ * If the message received was statically allocated, reset the
+ * heapId, so the app can free it.
+ */
+ if (msg->heapId == MessageQ_STATICMSG) {
+ msg->heapId = 0; /* for a copy transport, heap id is 0. */
+ }
+ }
+
+ PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
+ PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
+ PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId, msg->msgSize)
+
+ *retMsg = msg;
+
+exit:
+ return status;
+}
+
+Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 qIndex)
+{
+ Int *queues;
+ Int *oldQueues;
+ UInt oldSize;
+
+ if (qIndex >= obj->numQueues) {
+ PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
+ qIndex + TransportRpmsg_GROWSIZE)
+
+ oldSize = obj->numQueues * sizeof (Int);
+
+ queues = calloc(qIndex + TransportRpmsg_GROWSIZE, sizeof (Int));
+ memcpy(queues, obj->qIndexToFd, oldSize);
+
+ oldQueues = obj->qIndexToFd;
+ obj->qIndexToFd = queues;
+ obj->numQueues = qIndex + TransportRpmsg_GROWSIZE;
+
+ free(oldQueues);
+ }
+
+ obj->qIndexToFd[qIndex] = fd;
+}
+
+Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex)
+{
+ obj->qIndexToFd[qIndex] = 0;
+}
+
+Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex)
+{
+ return obj->qIndexToFd[qIndex];
+}
+
index 4971dffa90885c40dd75ad0ea6bcb0d17c8914fb..0453f4134af1b9f254ecf6a7d482c3dd382ed266 100644 (file)
*/
typedef Void (*MessageQ_FreeHookFxn)(Bits16 heapId, Bits16 msgId);
+#ifdef STD_H
+#include <ITransport.h>
+#include <IMessageQTransport.h>
+#else
+#include <ti/sdo/ipc/interfaces/ITransport.h>
+#include <ti/sdo/ipc/interfaces/IMessageQTransport.h>
+#endif
+
+Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
+ UInt16 rprocId, UInt priority);
+Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority);
+Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst);
+Void MessageQ_unregisterTransportId(UInt tid);
+
+
/* =============================================================================
* MessageQ Module-wide Functions
* =============================================================================
diff --git a/packages/ti/ipc/tests/messageq_multimulti.c b/packages/ti/ipc/tests/messageq_multimulti.c
--- /dev/null
@@ -0,0 +1,150 @@
+/*
+ * Copyright (c) 2012-2014, Texas Instruments Incorporated
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of Texas Instruments Incorporated nor the names of
+ * its contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+ * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+ * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+ * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+/*
+ * ======== messageq_multi.c ========
+ *
+ * Test for messageq operating in multiple simultaneous threads.
+ *
+ */
+
+#include <xdc/std.h>
+#include <xdc/runtime/Error.h>
+#include <xdc/runtime/Memory.h>
+#include <xdc/runtime/System.h>
+
+#include <ti/sysbios/BIOS.h>
+#include <ti/sysbios/knl/Task.h>
+#include <ti/sysbios/heaps/HeapBuf.h>
+
+#include <ti/ipc/MessageQ.h>
+#include <ti/ipc/MultiProc.h>
+
+#define SLAVE_MESSAGEQNAME "SLAVE"
+#define HOST_MESSAGEQNAME "HOST"
+#define NUMTHREADS 25
+#define NUMLOOPS 1000
+
+static int numTests = 0;
+
+#undef BENCHMARK
+
+/*
+ * ======== loopbackFxn========
+ * Receive and return messages.
+ * Run at priority lower than tsk1Fxn above.
+ * Inputs:
+ * - arg0: number of the thread, appended to MessageQ host and slave names.
+ */
+Void loopbackFxn(UArg arg0, UArg arg1)
+{
+ MessageQ_Msg getMsg;
+ MessageQ_Handle messageQ;
+ MessageQ_QueueId remoteQueueId;
+ Int status;
+ UInt16 msgId = 0;
+ Char localQueueName[64];
+ Char hostQueueName[64];
+
+ System_printf("Thread loopbackFxn: %d\n", arg0);
+
+ System_sprintf(localQueueName, "%s_%d%d", SLAVE_MESSAGEQNAME, arg0, arg1);
+ System_sprintf(hostQueueName, "%s_%d", HOST_MESSAGEQNAME, arg0);
+
+ /* Create a message queue. */
+ messageQ = MessageQ_create(localQueueName, NULL);
+ if (messageQ == NULL) {
+ System_abort("MessageQ_create failed\n");
+ }
+
+ System_printf("loopbackFxn: created MessageQ: %s; QueueID: 0x%x\n",
+ localQueueName, MessageQ_getQueueId(messageQ));
+
+ System_printf("Start the main loop: %d\n", arg0);
+ while (msgId < NUMLOOPS) {
+ /* Get a message */
+ status = MessageQ_get(messageQ, &getMsg, MessageQ_FOREVER);
+ if (status != MessageQ_S_SUCCESS) {
+ System_abort("This should not happen since timeout is forever\n");
+ }
+ remoteQueueId = MessageQ_getReplyQueue(getMsg);
+
+#ifndef BENCHMARK
+ System_printf("%d: Received message #%d from core %d\n",
+ arg0, MessageQ_getMsgId(getMsg),
+ MessageQ_getProcId(remoteQueueId));
+#endif
+ /* test id of message received */
+ if (MessageQ_getMsgId(getMsg) != msgId) {
+ System_abort("The id received is incorrect!\n");
+ }
+
+#ifndef BENCHMARK
+ /* Send it back */
+ System_printf("%d: Sending message Id #%d to core %d\n",
+ arg0, msgId, MessageQ_getProcId(remoteQueueId));
+#endif
+ status = MessageQ_put(remoteQueueId, getMsg);
+ if (status != MessageQ_S_SUCCESS) {
+ System_abort("MessageQ_put had a failure/error\n");
+ }
+ msgId++;
+ }
+
+ MessageQ_delete(&messageQ);
+ numTests += NUMLOOPS;
+
+ System_printf("Test thread %d complete!\n", arg0);
+}
+
+/*
+ * ======== main ========
+ */
+Int main(Int argc, Char* argv[])
+{
+ Task_Params params;
+ Int i;
+
+ System_printf("%s:main: MultiProc id = %d\n", __FILE__, MultiProc_self());
+
+ /* Create N threads to correspond with host side N thread test app: */
+ Task_Params_init(¶ms);
+ params.priority = 3;
+ params.arg1 = MultiProc_self();
+ for (i = 0; i < NUMTHREADS; i++) {
+ params.arg0 = i;
+ Task_create(loopbackFxn, ¶ms, NULL);
+ }
+
+ BIOS_start();
+
+ return (0);
+ }
index 36319771c883cae70f7aa9266a6a311a6ea0041e..2d27e446f374631865772decf444a5d9ddc301e2 100644 (file)
defs: "-D BENCHMARK" + extraDefs
}).addObjects(["messageq_multi.c"]);
+ /* messageq_multimulti */
+ Pkg.addExecutable(name + "/messageq_multimulti", targ, platform, {
+ cfgScript: "rpmsg_transport",
+ defs: "-D BENCHMARK" + extraDefs
+ }).addObjects(["messageq_multimulti.c"]);
+
/* messageq_single */
if (platform.match(/^ti\.platforms\.sdp5430/) &&
(targ.isa == "64T")) {