Initial MessageQ transport implementation 3.35.00.01_eng
authorRobert Tivy <rtivy@ti.com>
Wed, 19 Nov 2014 22:16:31 +0000 (14:16 -0800)
committerRobert Tivy <rtivy@ti.com>
Thu, 27 Nov 2014 00:26:15 +0000 (16:26 -0800)
22 files changed:
.gitignore
Makefile.am
Makefile.in
configure
configure.ac
linux/include/IMessageQTransport.h [new file with mode: 0644]
linux/include/INetworkTransport.h [new file with mode: 0644]
linux/include/ITransport.h [new file with mode: 0644]
linux/include/TransportRpmsg.h [new file with mode: 0644]
linux/include/ti/ipc/Std.h
linux/src/api/Ipc.c
linux/src/api/MessageQ.c
linux/src/daemon/MessageQ_daemon.c
linux/src/tests/Makefile.am
linux/src/tests/Makefile.in
linux/src/tests/MessageQMultiMulti.c [new file with mode: 0644]
linux/src/transport/Makefile.am [new file with mode: 0644]
linux/src/transport/Makefile.in [new file with mode: 0644]
linux/src/transport/TransportRpmsg.c [new file with mode: 0644]
packages/ti/ipc/MessageQ.h
packages/ti/ipc/tests/messageq_multimulti.c [new file with mode: 0644]
packages/ti/ipc/tests/package.bld

index 89703178c2ae994cfd6b132ebcb66cd4c3362ffb..4b9da0fa95211d390f585284aca8d1a08d26fddf 100644 (file)
@@ -53,6 +53,7 @@ packages/ti/srvmgr/omx/OmxSrvMgr.h
 /linux/src/tests/MessageQApp
 /linux/src/tests/MessageQBench
 /linux/src/tests/MessageQMulti
+/linux/src/tests/MessageQMultiMulti
 /linux/src/tests/Msgq100
 /linux/src/tests/NameServerApp
 /linux/src/tests/nano_test_*
index eeb5d108a61ee1fa7827ff1a3ddcd4c08c5357e3..5e125bed1aea78c0e1cc05024692e8ef23f8f1be 100644 (file)
@@ -32,7 +32,7 @@
 ##
 
 # the subdirectories of the project to go into
-SUBDIRS =  linux/etc linux/src/utils linux/src/api \
+SUBDIRS =  linux/etc linux/src/utils linux/src/api linux/src/transport \
         linux/src/mm linux/src/daemon linux/src/tests
 
 # where to install common headers on the system
index 8f8b4c6d46813c8d4912239b892e27ff2d23b08c..c7bab880f7ef58cc60c4ee2fa0f5ec7eba779e11 100644 (file)
@@ -208,7 +208,7 @@ sysconfdir = @sysconfdir@
 target_alias = @target_alias@
 
 # the subdirectories of the project to go into
-SUBDIRS = linux/etc linux/src/utils linux/src/api \
+SUBDIRS = linux/etc linux/src/utils linux/src/api linux/src/transport \
         linux/src/mm linux/src/daemon linux/src/tests
 
 
index a2d57cc0abfe608cfe253bd7f4eee2f3f765f385..df1798ffd614efd85ea734b4fc0b5ec33d3febf7 100755 (executable)
--- a/configure
+++ b/configure
@@ -17001,6 +17001,8 @@ fi
 
           ac_config_files="$ac_config_files linux/src/tests/Makefile"
 
+          ac_config_files="$ac_config_files linux/src/transport/Makefile"
+
 cat >confcache <<\_ACEOF
 # This file is a shell script that caches the results of configure
 # tests run on this system so they can be shared between configure
@@ -17684,6 +17686,7 @@ do
   "linux/src/mm/libmmrpc.pc" ) CONFIG_FILES="$CONFIG_FILES linux/src/mm/libmmrpc.pc" ;;
   "linux/src/daemon/Makefile" ) CONFIG_FILES="$CONFIG_FILES linux/src/daemon/Makefile" ;;
   "linux/src/tests/Makefile" ) CONFIG_FILES="$CONFIG_FILES linux/src/tests/Makefile" ;;
+  "linux/src/transport/Makefile" ) CONFIG_FILES="$CONFIG_FILES linux/src/transport/Makefile" ;;
   "depfiles" ) CONFIG_COMMANDS="$CONFIG_COMMANDS depfiles" ;;
   *) { { echo "$as_me:$LINENO: error: invalid argument: $ac_config_target" >&5
 echo "$as_me: error: invalid argument: $ac_config_target" >&2;}
index 3bd2d896c1bba40c9beb32bd6f5763cb2ace1ce7..97f51beccf3f715f72d8d864d64e8d88d934bdf2 100644 (file)
@@ -132,6 +132,7 @@ AC_CONFIG_FILES([linux/src/mm/Makefile])
 AC_CONFIG_FILES([linux/src/mm/libmmrpc.pc])
 AC_CONFIG_FILES([linux/src/daemon/Makefile])
 AC_CONFIG_FILES([linux/src/tests/Makefile])
+AC_CONFIG_FILES([linux/src/transport/Makefile])
 AC_OUTPUT
 
 echo \
diff --git a/linux/include/IMessageQTransport.h b/linux/include/IMessageQTransport.h
new file mode 100644 (file)
index 0000000..81cff91
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ *  ======== IMessageQTransport.h ========
+ */
+
+#ifndef IMESSAGEQTRANSPORT_H
+#define IMESSAGEQTRANSPORT_H
+
+#include <ITransport.h>
+
+/* opaque instance handle */
+typedef struct IMessageQTransport_Object *IMessageQTransport_Handle;
+
+#define IMessageQTransport_TypeId 0x02
+
+/* virtual functions */
+typedef struct IMessageQTransport_Fxns {
+    Int (*bind)(void *handle, UInt32 queueId);
+    Int (*unbind)(void *handle, UInt32 queueId);
+    Bool (*put)(void *handle, Ptr msg);
+} IMessageQTransport_Fxns;
+
+/* abstract instance object */
+typedef struct IMessageQTransport_Object {
+    ITransport_Object base;             /* inheritance */
+    IMessageQTransport_Fxns *fxns;      /* virtual functions */
+} IMessageQTransport_Object;
+
+/* function stubs */
+static inline
+Int IMessageQTransport_bind(IMessageQTransport_Handle inst, UInt32 queueId)
+{
+    IMessageQTransport_Object *obj = (IMessageQTransport_Object *)inst;
+    return obj->fxns->bind((void *)inst, queueId);
+}
+
+static inline
+Int IMessageQTransport_unbind(IMessageQTransport_Handle inst, UInt32 queueId)
+{
+    IMessageQTransport_Object *obj = (IMessageQTransport_Object *)inst;
+    return obj->fxns->unbind((void *)inst, queueId);
+}
+
+static inline
+Bool IMessageQTransport_put(IMessageQTransport_Handle inst, Ptr msg)
+{
+    IMessageQTransport_Object *obj = (IMessageQTransport_Object *)inst;
+    return obj->fxns->put((void *)inst, msg);
+}
+
+/* instance convertors */
+static inline
+ITransport_Handle IMessageQTransport_upCast(IMessageQTransport_Handle inst)
+{
+    IMessageQTransport_Object *obj = (IMessageQTransport_Object *)inst;
+    return (ITransport_Handle)&obj->base;
+}
+
+static inline
+IMessageQTransport_Handle IMessageQTransport_downCast(ITransport_Handle base)
+{
+    return (IMessageQTransport_Handle)base;
+}
+
+#endif /* IMESSAGEQTRANSPORT_H */
+
diff --git a/linux/include/INetworkTransport.h b/linux/include/INetworkTransport.h
new file mode 100644 (file)
index 0000000..956fc67
--- /dev/null
@@ -0,0 +1,63 @@
+/*
+ *  ======== INetworkTransport.h ========
+ */
+
+#ifndef INETWORKTRANSPORT_H
+#define INETWORKTRANSPORT_H
+
+#include <ITransport.h>
+
+/* opaque instance handle */
+typedef struct INetworkTransport_Object *INetworkTransport_Handle;
+
+#define INetworkTransport_TypeId 0x03
+
+/* virtual functions */
+typedef struct INetworkTransport_Fxns {
+    Int (*bind)(void *handle, UInt32 queueId);
+    Int (*unbind)(void *handle, UInt32 queueId);
+    Bool (*put)(void *handle, Ptr msg);
+} INetworkTransport_Fxns;
+
+/* abstract instance object */
+typedef struct INetworkTransport_Object {
+    ITransport_Object base;             /* inheritance */
+    INetworkTransport_Fxns *fxns;       /* virtual functions */
+} INetworkTransport_Object;
+
+/* function stubs */
+static inline
+Bool INetworkTransport_bind(INetworkTransport_Handle inst, UInt32 queueId)
+{
+    INetworkTransport_Object *obj = (INetworkTransport_Object *)inst;
+    return obj->fxns->bind((void *)inst, queueId);
+}
+
+static inline
+Bool INetworkTransport_unbind(INetworkTransport_Handle inst, UInt32 queueId)
+{
+    INetworkTransport_Object *obj = (INetworkTransport_Object *)inst;
+    return obj->fxns->unbind((void *)inst, queueId);
+}
+
+static inline
+Bool INetworkTransport_put(INetworkTransport_Handle inst, Ptr msg)
+{
+    INetworkTransport_Object *obj = (INetworkTransport_Object *)inst;
+    return obj->fxns->put((void *)inst, msg);
+}
+
+/* instance convertors */
+ITransport_Handle INetworkTransport_upCast(INetworkTransport_Handle inst)
+{
+    INetworkTransport_Object *obj = (INetworkTransport_Object *)inst;
+    return (ITransport_Handle)&obj->base;
+}
+
+INetworkTransport_Handle INetworkTransport_downCast(ITransport_Handle base)
+{
+    return (INetworkTransport_Handle)base;
+}
+
+#endif /* INETWORKTRANSPORT_H */
+
diff --git a/linux/include/ITransport.h b/linux/include/ITransport.h
new file mode 100644 (file)
index 0000000..4287242
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ *  ======== ITransport.h ========
+ */
+
+#ifndef ITRANSPORT_H
+#define ITRANSPORT_H
+
+/* opaque instance handle */
+typedef struct ITransport_Object *ITransport_Handle;
+
+#define ITransport_TypeId 0x01
+
+/* instance object */
+typedef struct ITransport_Object {
+    Int interfaceType;
+} ITransport_Object;
+
+/* instance functions */
+static inline
+Int ITransport_itype(ITransport_Handle inst)
+{
+    ITransport_Object *obj = (ITransport_Object *)inst;
+    return obj->interfaceType;
+}
+
+#endif /* ITRANSPORT_H */
+
diff --git a/linux/include/TransportRpmsg.h b/linux/include/TransportRpmsg.h
new file mode 100644 (file)
index 0000000..ed2aa7d
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2014, Texas Instruments Incorporated
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * *  Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ *
+ * *  Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * *  Neither the name of Texas Instruments Incorporated nor the names of
+ *    its contributors may be used to endorse or promote products derived
+ *    from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+ * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+ * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+ * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+/*
+ *  ======== TransportRpmsg.h ========
+ */
+
+/**
+ *  @file       TransportRpmsg.h
+ *
+ *  @brief      Rpmsg transports implemenation
+ *
+ *              The transports can be register with MessageQ. This is done
+ *              via the MessageQ_registerTransport function.
+ */
+
+#ifndef _TransportRpmsg_
+#define _TransportRpmsg_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <ti/ipc/Std.h>
+#include <IMessageQTransport.h>
+
+struct TransportRpmsg_Params {
+    UInt16 rprocId;
+};
+typedef struct TransportRpmsg_Params TransportRpmsg_Params;
+
+typedef IMessageQTransport_Handle TransportRpmsg_Handle;
+
+TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *param,
+                                            Int *attachStatus);
+Void TransportRpmsg_delete(TransportRpmsg_Handle *hp);
+
+IMessageQTransport_Handle TransportRpmsg_upCast(TransportRpmsg_Handle handle);
+TransportRpmsg_Handle TransportRpmsg_downCast(IMessageQTransport_Handle base);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
index 43891966439c27c3f8587f17df82030cd513816b..7bef96fe070daa90b261f37f962f705b37961318 100644 (file)
@@ -94,6 +94,8 @@ typedef PVOID           HANDLE;         /* h    */
 #define FAIL   -1
 //#define NULL            '\0'
 
+#define MAX(a,b) (((a)>(b))?(a):(b))
+
 #if defined (IPC_BUILDOS_ANDROID)
 #define MAX(a,b) (((a)>(b))?(a):(b))
 #endif
index 6dbb0b6cf622570d033405038a3e0689478c0bf2..ffad32bb5ce97ffb67c19e64c4ac41b1b3a049a8 100644 (file)
@@ -46,6 +46,8 @@
 #include <ti/ipc/Std.h>
 #include <ti/ipc/Ipc.h>
 #include <ti/ipc/NameServer.h>
+#include <IMessageQTransport.h>
+#include <TransportRpmsg.h>
 
 /* User side headers */
 #include <ladclient.h>
@@ -71,15 +73,19 @@ static void cleanup(int arg);
 /* Function to start Ipc */
 Int Ipc_start (Void)
 {
+    TransportRpmsg_Handle  transport;
+    TransportRpmsg_Params  params;
+    IMessageQTransport_Handle iMsgQTrans;
     MessageQ_Config        msgqCfg;
     MultiProc_Config       mpCfg;
 #if defined(GATEMP_SUPPORT)
     GateHWSpinlock_Config  gateHWCfg;
 #endif
+    Int                    attachStatus;
     Int32                  status;
     LAD_Status             ladStatus;
     UInt16                 rprocId;
-    Int32                  attachedAny = 0;
+    Int32                  attachedAny;
 
     /* Catch ctrl-C, and cleanup: */
     (void) signal(SIGINT, cleanup);
@@ -138,29 +144,44 @@ Int Ipc_start (Void)
         MessageQ_getConfig(&msgqCfg);
         MessageQ_setup(&msgqCfg);
 
-        /* Now attach to all remote processors, assuming they are up. */
+        /*
+         * Attach to all remote processors.  We need to attach to
+         * at least one, so tolerate MessageQ_E_RESOURCE failures for
+         * now.
+         */
+        status = Ipc_S_SUCCESS;
+        attachedAny = FALSE;
+
         for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
             if (0 == rprocId) {
                 /* Skip host, which should always be 0th entry. */
                 continue;
             }
-            status = MessageQ_attach(rprocId, NULL);
-            if (status == MessageQ_E_RESOURCE) {
-                continue;
+
+            params.rprocId = rprocId;
+            transport = TransportRpmsg_create(&params, &attachStatus);
+
+            if (transport) {
+                iMsgQTrans = TransportRpmsg_upCast(transport);
+                MessageQ_registerTransport(iMsgQTrans, rprocId, 0);
+
+                attachedAny = TRUE;
             }
-            if (status < 0) {
-                printf("Ipc_start: MessageQ_attach(%d) failed: %d\n",
-                       rprocId, status);
+            else {
+                if (attachStatus == MessageQ_E_RESOURCE) {
+                    continue;
+                }
+
+                printf("Ipc_start: failed to attach to %d: %d\n",
+                       rprocId, attachStatus);
+
                 status = Ipc_E_FAIL;
 
                 break;
             }
-            else {
-                attachedAny = 1;
-            }
         }
-        if (attachedAny) {
-            status = Ipc_S_SUCCESS;
+        if (!attachedAny) {
+            status = Ipc_E_FAIL;
         }
     }
     else {
@@ -202,10 +223,12 @@ Int Ipc_start (Void)
 gatempstart_fail:
     GateHWSpinlock_stop();
 gatehwspinlockstart_fail:
+#if 0
     for (rprocId = rprocId - 1; (rprocId > 0) && (status >= 0); rprocId--) {
         MessageQ_detach(rprocId);
     }
 #endif
+#endif
 
 exit:
     return (status);
@@ -227,6 +250,7 @@ Int Ipc_stop (Void)
           /* Skip host, which should always be 0th entry. */
           continue;
         }
+#if 0
         status = MessageQ_detach(rprocId);
         if (status < 0) {
             printf("Ipc_stop: MessageQ_detach(%d) failed: %d\n",
@@ -234,6 +258,7 @@ Int Ipc_stop (Void)
             status = Ipc_E_FAIL;
             goto exit;
        }
+#endif
     }
 
     status = MessageQ_destroy();
index 8b958cbcba7cdccbd0add2425a606d5d21ffd2a2..7a20a7821aa629b0af73ccc8283ad38e6a3744a7 100644 (file)
@@ -49,6 +49,9 @@
 #include <_MultiProc.h>
 #include <ti/ipc/MessageQ.h>
 #include <_MessageQ.h>
+#include <ITransport.h>
+#include <IMessageQTransport.h>
+#include <INetworkTransport.h>
 
 /* Socket Headers */
 #include <sys/select.h>
@@ -56,7 +59,7 @@
 #include <sys/types.h>
 #include <sys/param.h>
 #include <sys/eventfd.h>
-#include <sys/socket.h>
+#include <sys/queue.h>
 #include <errno.h>
 #include <stdio.h>
 #include <string.h>
 #include <unistd.h>
 #include <assert.h>
 #include <pthread.h>
+#include <semaphore.h>
 
 /* Socket Protocol Family */
 #include <net/rpmsg.h>
 
-/* Socket utils: */
-#include <SocketFxns.h>
-
 #include <ladclient.h>
 #include <_lad.h>
 
  */
 #define MessageQ_NAMESERVER  "MessageQ"
 
-/*!
- *  @brief  Value of an invalid socket ID:
- */
-#define Transport_INVALIDSOCKET  (0xFFFFFFFF)
+#define MessageQ_MAXTRANSPORTS 8
 
-/* More magic rpmsg port numbers: */
-#define MESSAGEQ_RPMSG_PORT       61
-#define MESSAGEQ_RPMSG_MAXSIZE   512
+#define MessageQ_GROWSIZE 32
 
 /* Trace flag settings: */
 #define TRACESHIFT    12
 
 /* structure for MessageQ module state */
 typedef struct MessageQ_ModuleObject {
-    Int                 refCount;
-    /*!< Reference count */
-    NameServer_Handle   nameServer;
-    /*!< Handle to the local NameServer used for storing GP objects */
-    pthread_mutex_t     gate;
-    /*!< Handle of gate to be used for local thread safety */
-    MessageQ_Params     defaultInstParams;
-    /*!< Default instance creation parameters */
-    int                 sock[MultiProc_MAXPROCESSORS];
-    /*!< Sockets to for sending to each remote processor */
-    int                 seqNum;
-    /*!< Process-specific sequence number */
+    MessageQ_Handle           *queues;
+    Int                       numQueues;
+    Int                       refCount;
+    NameServer_Handle         nameServer;
+    pthread_mutex_t           gate;
+    MessageQ_Params           defaultInstParams;
+    int                       seqNum;
+    IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
+    INetworkTransport_Handle  transInst[MessageQ_MAXTRANSPORTS];
 } MessageQ_ModuleObject;
 
+typedef struct MessageQ_CIRCLEQ_ENTRY {
+     CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem;
+} MessageQ_CIRCLEQ_ENTRY;
+
 /*!
  *  @brief  Structure for the Handle for the MessageQ.
  */
 typedef struct MessageQ_Object_tag {
-    MessageQ_Params         params;
-    /*! Instance specific creation parameters */
-    MessageQ_QueueId        queue;
-    /* Unique id */
-    int                     fd[MultiProc_MAXPROCESSORS];
-    /* File Descriptor to block on messages from remote processors. */
-    int                     unblockFd;
-    /* Write this fd to unblock the select() call in MessageQ _get() */
-    void                    *serverHandle;
+    CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
+    MessageQ_Params              params;
+    MessageQ_QueueId             queue;
+    int                          unblocked;
+    void                         *serverHandle;
+    sem_t                        synchronizer;
 } MessageQ_Object;
 
 /* traces in this file are controlled via _MessageQ_verbose */
 Bool _MessageQ_verbose = FALSE;
 #define verbose _MessageQ_verbose
 
-
 /* =============================================================================
  *  Globals
  * =============================================================================
  */
 static MessageQ_ModuleObject MessageQ_state =
 {
-    .refCount               = 0,
-    .nameServer             = NULL,
+    .refCount   = 0,
+    .nameServer = NULL,
 };
 
 /*!
@@ -156,28 +149,93 @@ static MessageQ_ModuleObject MessageQ_state =
  *
  *  @brief  Pointer to the MessageQ module state.
  */
-MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
+MessageQ_ModuleObject *MessageQ_module = &MessageQ_state;
 
+Void _MessageQ_grow(UInt16 queueIndex);
 
 /* =============================================================================
- * Forward declarations of internal functions
+ * APIS
  * =============================================================================
  */
 
-/* This is a helper function to initialize a message. */
-static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex);
-static Int transportCloseEndpoint(int fd);
-static Int transportGet(int sock, MessageQ_Msg * retMsg);
-static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId);
+Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
+                                UInt16 rprocId, UInt priority)
+{
+    Int status = FALSE;
 
-/* =============================================================================
- * APIS
- * =============================================================================
- */
-/* Function to get default configuration for the MessageQ module.
- *
+    if (handle == NULL) {
+        printf("MessageQ_registerTransport: invalid handle, must be non-NULL\n"
+              );
+
+        return status;
+    }
+
+    if (rprocId >= MultiProc_MAXPROCESSORS) {
+        printf("MessageQ_registerTransport: invalid procId %d\n", rprocId);
+
+        return status;
+    }
+
+    if (MessageQ_module->transports[rprocId][priority] == NULL) {
+        MessageQ_module->transports[rprocId][priority] = handle;
+
+        status = TRUE;
+    }
+
+    return status;
+}
+
+Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
+{
+    if (inst == NULL) {
+        printf("MessageQ_registerTransportId: invalid NULL handle\n");
+
+        return MessageQ_E_INVALIDARG;
+    }
+
+    if (tid >= MessageQ_MAXTRANSPORTS) {
+        printf("MessageQ_unregisterNetTransport: invalid transport id %d, must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
+
+        return MessageQ_E_INVALIDARG;
+    }
+
+    if (MessageQ_module->transInst[tid] != NULL) {
+        printf("MessageQ_registerTransportId: transport id %d already registered\n", tid);
+
+        return MessageQ_E_ALREADYEXISTS;
+    }
+
+    MessageQ_module->transInst[tid] = (INetworkTransport_Handle)inst;
+
+    return MessageQ_S_SUCCESS;
+}
+
+Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
+{
+    if (rprocId >= MultiProc_MAXPROCESSORS) {
+        printf("MessageQ_registerTransport: invalid rprocId %d\n", rprocId);
+
+        return;
+    }
+
+    MessageQ_module->transports[rprocId][priority] = NULL;
+}
+
+Void MessageQ_unregisterTransportId(UInt tid)
+{
+    if (tid >= MessageQ_MAXTRANSPORTS) {
+        printf("MessageQ_unregisterTransportId: invalid transport id %d, must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
+
+        return;
+    }
+
+    MessageQ_module->transInst[tid] = NULL;
+}
+
+/*
+ * Function to get default configuration for the MessageQ module.
  */
-Void MessageQ_getConfig (MessageQ_Config * cfg)
+Void MessageQ_getConfig(MessageQ_Config *cfg)
 {
     Int status;
     LAD_ClientHandle handle;
@@ -205,7 +263,8 @@ Void MessageQ_getConfig (MessageQ_Config * cfg)
     }
 
     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
-        PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n", status)
+        PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n",
+                      status)
         return;
     }
     status = rsp.messageQGetConfig.status;
@@ -214,19 +273,38 @@ Void MessageQ_getConfig (MessageQ_Config * cfg)
       "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
       handle, status)
 
-    memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof(*cfg));
+    memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg));
 
     return;
 }
 
-/* Function to setup the MessageQ module. */
-Int MessageQ_setup(const MessageQ_Config * cfg)
+/*
+ *  Function to setup the MessageQ module.
+ */
+Int MessageQ_setup(const MessageQ_Config *cfg)
 {
     Int status;
     LAD_ClientHandle handle;
     struct LAD_CommandObj cmd;
     union LAD_ResponseObj rsp;
-    Int i;
+    Int pri;
+    Int rprocId;
+    Int tid;
+
+    pthread_mutex_lock(&MessageQ_module->gate);
+
+    MessageQ_module->refCount++;
+    if (MessageQ_module->refCount > 1) {
+
+        pthread_mutex_unlock(&MessageQ_module->gate);
+
+        PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
+                      MessageQ_module->refCount)
+
+        return MessageQ_S_ALREADYSETUP;
+    }
+
+    pthread_mutex_unlock(&MessageQ_module->gate);
 
     handle = LAD_findHandle();
     if (handle == LAD_MAXNUMCLIENTS) {
@@ -239,7 +317,7 @@ Int MessageQ_setup(const MessageQ_Config * cfg)
 
     cmd.cmd = LAD_MESSAGEQ_SETUP;
     cmd.clientId = handle;
-    memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
+    memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof (*cfg));
 
     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
         PRINTVERBOSE1(
@@ -249,7 +327,7 @@ Int MessageQ_setup(const MessageQ_Config * cfg)
 
     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
         PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
-        return(status);
+        return status;
     }
     status = rsp.setup.status;
 
@@ -257,26 +335,30 @@ Int MessageQ_setup(const MessageQ_Config * cfg)
       "MessageQ_setup: got LAD response for client %d, status=%d\n",
       handle, status)
 
-    MessageQ_module->nameServer = rsp.setup.nameServerHandle;
     MessageQ_module->seqNum = 0;
+    MessageQ_module->nameServer = rsp.setup.nameServerHandle;
+    MessageQ_module->numQueues = cfg->maxRuntimeEntries;
+    MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
+                                     sizeof (MessageQ_Handle));
 
-    /* Create a default local gate. */
-    pthread_mutex_init (&(MessageQ_module->gate), NULL);
+    pthread_mutex_init(&MessageQ_module->gate, NULL);
 
-    /* Clear sockets array. */
-    for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
-        MessageQ_module->sock[i] = Transport_INVALIDSOCKET;
+    for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
+        for (pri = 0; pri < 2; pri++) {
+            MessageQ_module->transports[rprocId][pri] = NULL;
+        }
+    }
+    for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
+        MessageQ_module->transInst[tid] = NULL;
     }
 
     return status;
 }
 
 /*
- * Function to destroy the MessageQ module.
- * Destroys socket/protocol maps;  sockets themselves should have been
- * destroyed in MessageQ_delete() and MessageQ_detach() calls.
+ *  MessageQ_destroy - destroy the MessageQ module.
  */
-Int MessageQ_destroy (void)
+Int MessageQ_destroy(void)
 {
     Int status;
     LAD_ClientHandle handle;
@@ -303,7 +385,7 @@ Int MessageQ_destroy (void)
 
     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
         PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
-        return(status);
+        return status;
     }
     status = rsp.status;
 
@@ -314,8 +396,9 @@ Int MessageQ_destroy (void)
     return status;
 }
 
+
 /* Function to initialize the parameters for the MessageQ instance. */
-Void MessageQ_Params_init (MessageQ_Params * params)
+Void MessageQ_Params_init(MessageQ_Params *params)
 {
     memcpy (params, &(MessageQ_module->defaultInstParams),
             sizeof (MessageQ_Params));
@@ -324,18 +407,18 @@ Void MessageQ_Params_init (MessageQ_Params * params)
 }
 
 /*
- *   Function to create a MessageQ object for receiving.
- *
- *   Create a socket and bind the source address (local ProcId/MessageQ ID) in
- *   order to get messages dispatched to this messageQ.
+ *  MessageQ_create - create a MessageQ object for receiving.
  */
-MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
+MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *params)
 {
     Int                   status;
-    MessageQ_Object *     obj    = NULL;
-    UInt16                queueIndex = 0u;
-    UInt16                procId;
+    MessageQ_Object      *obj = NULL;
+    IMessageQTransport_Handle transport;
+    INetworkTransport_Handle transInst;
+    UInt16                queueIndex;
     UInt16                rprocId;
+    Int                   tid;
+    Int                   priority;
     LAD_ClientHandle      handle;
     struct LAD_CommandObj cmd;
     union LAD_ResponseObj rsp;
@@ -361,7 +444,7 @@ MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
     }
 
     if (params) {
-        memcpy(&cmd.args.messageQCreate.params, params, sizeof(*params));
+        memcpy(&cmd.args.messageQCreate.params, params, sizeof (*params));
     }
 
     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
@@ -395,70 +478,69 @@ MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
         memcpy((Ptr) &obj->params, (Ptr)params, sizeof (MessageQ_Params));
     }
 
-    procId = MultiProc_self();
-    queueIndex = (MessageQ_QueueIndex)rsp.messageQCreate.queueId;
+    queueIndex = (MessageQ_QueueIndex)(rsp.messageQCreate.queueId & 0x0000ffff);
+
     obj->queue = rsp.messageQCreate.queueId;
     obj->serverHandle = rsp.messageQCreate.serverHandle;
+    CIRCLEQ_INIT(&obj->msgList);
+    if (sem_init(&obj->synchronizer, 0, 0) < 0) {
+        PRINTVERBOSE1(
+          "MessageQ_create: failed to create synchronizer (errno %d)\n", errno)
 
-    /*
-     * Create a set of communication endpoints (one per each remote proc),
-     * and return the socket as target for MessageQ_put() calls, and as
-     * a file descriptor to close during MessageQ_delete().
-     */
-    for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
-        obj->fd[rprocId] = Transport_INVALIDSOCKET;
-        if (procId == rprocId) {
-            /* Skip creating an endpoint for ourself. */
-            continue;
-        }
+        MessageQ_delete((MessageQ_Handle *)&obj);
 
-        PRINTVERBOSE3("MessageQ_create: creating endpoint for: %s, rprocId: %d, queueIndex: %d\n", name, rprocId, queueIndex)
+        return NULL;
+    }
 
-        status = transportCreateEndpoint(&obj->fd[rprocId], rprocId,
-                                           queueIndex);
-        if (status < 0) {
-           obj->fd[rprocId] = Transport_INVALIDSOCKET;
+    PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' queueIndex %d\n", name, queueIndex)
+
+    for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
+       for (priority = 0; priority < 2; priority++) {
+            transport = MessageQ_module->transports[rprocId][priority];
+            if (transport) {
+                /* need to check return and do something if error */
+                IMessageQTransport_bind((Void *)transport, obj->queue);
+            }
+        }
+    }
+    for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
+        transInst = MessageQ_module->transInst[tid];
+        if (transInst) {
+            /* need to check return and do something if error */
+            INetworkTransport_bind((Void *)transInst, obj->queue);
         }
     }
 
     /*
-     * Now, to support MessageQ_unblock() functionality, create an event object.
-     * Writing to this event will unblock the select() call in MessageQ_get().
+     * Since LAD's MessageQ_module can grow, we need to be able to grow as well
      */
-    obj->unblockFd = eventfd(0, 0);
-    if (obj->unblockFd == -1)  {
-        printf ("MessageQ_create: eventfd creation failed: %d, %s\n",
-                   errno, strerror(errno));
-        MessageQ_delete((MessageQ_Handle *)&obj);
+    if (queueIndex >= MessageQ_module->numQueues) {
+        _MessageQ_grow(queueIndex);
     }
-    else {
-        int endpointFound = 0;
 
-        for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
-            if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
-                endpointFound = 1;
-            }
-        }
-        if (!endpointFound) {
-            printf("MessageQ_create: no transport endpoints found, deleting\n");
-            MessageQ_delete((MessageQ_Handle *)&obj);
-        }
-    }
+    /*
+     * No need to "allocate" slot since the queueIndex returned by
+     * LAD is guaranteed to be unique.
+     */
+    MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
 
-    return ((MessageQ_Handle) obj);
+    return (MessageQ_Handle)obj;
 }
 
 /*
- * Function to delete a MessageQ object for a specific slave processor.
- *
- * Deletes the socket associated with this MessageQ object.
+ * MessageQ_delete - delete a MessageQ object.
  */
-Int MessageQ_delete (MessageQ_Handle * handlePtr)
+Int MessageQ_delete(MessageQ_Handle *handlePtr)
 {
-    Int               status    = MessageQ_S_SUCCESS;
-    MessageQ_Object * obj       = NULL;
-    UInt16            rprocId;
-    LAD_ClientHandle      handle;
+    MessageQ_Object *obj;
+    IMessageQTransport_Handle transport;
+    INetworkTransport_Handle transInst;
+    Int              status = MessageQ_S_SUCCESS;
+    UInt16           queueIndex;
+    UInt16                rprocId;
+    Int                   tid;
+    Int                   priority;
+    LAD_ClientHandle handle;
     struct LAD_CommandObj cmd;
     union LAD_ResponseObj rsp;
 
@@ -471,7 +553,7 @@ Int MessageQ_delete (MessageQ_Handle * handlePtr)
         return MessageQ_E_FAIL;
     }
 
-    obj = (MessageQ_Object *) (*handlePtr);
+    obj = (MessageQ_Object *)(*handlePtr);
 
     cmd.cmd = LAD_MESSAGEQ_DELETE;
     cmd.clientId = handle;
@@ -493,51 +575,54 @@ Int MessageQ_delete (MessageQ_Handle * handlePtr)
       "MessageQ_delete: got LAD response for client %d, status=%d\n",
       handle, status)
 
-
-    /* Close the event used for MessageQ_unblock(): */
-    close(obj->unblockFd);
-
-    /* Close the communication endpoint: */
-    for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
-        if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
-            status = transportCloseEndpoint(obj->fd[rprocId]);
+    for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
+       for (priority = 0; priority < 2; priority++) {
+            transport = MessageQ_module->transports[rprocId][priority];
+            if (transport) {
+                IMessageQTransport_unbind((Void *)transport, obj->queue);
+            }
+        }
+    }
+    for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
+        transInst = MessageQ_module->transInst[tid];
+        if (transInst) {
+            INetworkTransport_unbind((Void *)transInst, obj->queue);
         }
     }
 
-    /* Now free the obj */
-    free (obj);
+    queueIndex = (MessageQ_QueueIndex)(obj->queue & 0x0000ffff);
+    MessageQ_module->queues[queueIndex] = NULL;
+
+    free(obj);
     *handlePtr = NULL;
 
-    return (status);
+    return status;
 }
 
 /*
- *  Opens an instance of MessageQ for sending.
- *
- *  We need not create a socket here; the sockets for all remote processors
- *  were created during MessageQ_attach(), and will be
- *  retrieved during MessageQ_put().
+ *  MessageQ_open - Opens an instance of MessageQ for sending.
  */
-Int MessageQ_open (String name, MessageQ_QueueId * queueId)
+Int MessageQ_open(String name, MessageQ_QueueId *queueId)
 {
     Int status = MessageQ_S_SUCCESS;
 
-    status = NameServer_getUInt32 (MessageQ_module->nameServer,
-                                     name, queueId, NULL);
+    status = NameServer_getUInt32(MessageQ_module->nameServer,
+                                  name, queueId, NULL);
 
     if (status == NameServer_E_NOTFOUND) {
-        /* Set return queue ID to invalid. */
+        /* Set return queue ID to invalid */
         *queueId = MessageQ_INVALIDMESSAGEQ;
         status = MessageQ_E_NOTFOUND;
     }
     else if (status >= 0) {
-        /* Override with a MessageQ status code. */
+        /* Override with a MessageQ status code */
         status = MessageQ_S_SUCCESS;
     }
     else {
-        /* Set return queue ID to invalid. */
+        /* Set return queue ID to invalid */
         *queueId = MessageQ_INVALIDMESSAGEQ;
-        /* Override with a MessageQ status code. */
+
+        /* Override with a MessageQ status code */
         if (status == NameServer_E_TIMEOUT) {
             status = MessageQ_E_TIMEOUT;
         }
@@ -546,143 +631,160 @@ Int MessageQ_open (String name, MessageQ_QueueId * queueId)
         }
     }
 
-    return (status);
+    return status;
 }
 
-/* Closes previously opened instance of MessageQ module. */
-Int MessageQ_close (MessageQ_QueueId * queueId)
+/*
+ *  MessageQ_close - Closes previously opened instance of MessageQ.
+ */
+Int MessageQ_close(MessageQ_QueueId *queueId)
 {
     Int32 status = MessageQ_S_SUCCESS;
 
     /* Nothing more to be done for closing the MessageQ. */
     *queueId = MessageQ_INVALIDMESSAGEQ;
 
-    return (status);
+    return status;
 }
 
 /*
- * Place a message onto a message queue.
+ * MessageQ_put - place a message onto a message queue.
  *
- * Calls TransportShm_put(), which handles the sending of the message using the
+ * Calls transport's put(), which handles the sending of the message using the
  * appropriate kernel interface (socket, device ioctl) call for the remote
  * procId encoded in the queueId argument.
  *
  */
-Int MessageQ_put (MessageQ_QueueId queueId, MessageQ_Msg msg)
+Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
 {
-    Int      status;
+    MessageQ_Object *obj;
     UInt16   dstProcId  = (UInt16)(queueId >> 16);
     UInt16   queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
+    Int      status = MessageQ_S_SUCCESS;
+    ITransport_Handle transport;
+    IMessageQTransport_Handle msgTrans;
+    INetworkTransport_Handle netTrans;
+    Int priority;
+    UInt tid;
 
     msg->dstId     = queueIndex;
     msg->dstProc   = dstProcId;
 
-    status = transportPut(msg, queueIndex, dstProcId);
+    if (dstProcId != MultiProc_self()) {
+        tid = MessageQ_getTransportId(msg);
+        if (tid == 0) {
+            priority = MessageQ_getMsgPri(msg);
+            msgTrans = MessageQ_module->transports[dstProcId][priority];
+
+            IMessageQTransport_put(msgTrans, (Ptr)msg);
+        }
+        else {
+            if (tid >= MessageQ_MAXTRANSPORTS) {
+                printf("MessageQ_put: transport id %d too big, must be < %d\n",
+                       tid, MessageQ_MAXTRANSPORTS);
+
+                return MessageQ_E_FAIL;
+            }
+
+            /* use secondary transport */
+            netTrans = MessageQ_module->transInst[tid];
+            transport = INetworkTransport_upCast(netTrans);
+
+            /* downcast instance pointer to transport interface */
+            switch (ITransport_itype(transport)) {
+                case INetworkTransport_TypeId:
+                    INetworkTransport_put(netTrans, (Ptr)msg);
+
+                    break;
+
+                default:
+                    /* error */
+                    printf("MessageQ_put: transport id %d is an unsupported transport type\n", tid);
+
+                    status = MessageQ_E_FAIL;
+
+                    break;
+            }
+        }
+    }
+    else {
+        obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
+
+        pthread_mutex_lock(&MessageQ_module->gate);
+
+        /* It is a local MessageQ */
+        CIRCLEQ_INSERT_TAIL(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
+
+        pthread_mutex_unlock(&MessageQ_module->gate);
 
-    return (status);
+       sem_post(&obj->synchronizer);
+    }
+
+    return status;
 }
 
 /*
- * Gets a message for a message queue and blocks if the queue is empty.
- * If a message is present, it returns it.  Otherwise it blocks
- * waiting for a message to arrive.
- * When a message is returned, it is owned by the caller.
- *
- * We block using select() on the receiving socket's file descriptor, then
- * get the waiting message via the socket API recvfrom().
- * We use the socket stored in the messageQ object via a previous call to
- * MessageQ_create().
+ *  MessageQ_get - gets a message for a message queue and blocks if
+ *  the queue is empty.
  *
+ *  If a message is present, it returns it.  Otherwise it blocks
+ *  waiting for a message to arrive.
+ *  When a message is returned, it is owned by the caller.
  */
-Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
+Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
 {
-    static int last = 0;
+    MessageQ_Object * obj = (MessageQ_Object *)handle;
     Int     status = MessageQ_S_SUCCESS;
-    Int     tmpStatus;
-    MessageQ_Object * obj = (MessageQ_Object *) handle;
-    int     retval;
-    int     nfds;
-    fd_set  rfds;
-    struct  timeval tv;
-    void    *timevalPtr;
-    UInt16  rprocId;
-    int     maxfd = 0;
-    int     selfId;
-    int     nProcessors;
-
-    /* Wait (with timeout) and retreive message from socket: */
-    FD_ZERO(&rfds);
-    for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
-        if (rprocId == MultiProc_self() ||
-            obj->fd[rprocId] == Transport_INVALIDSOCKET) {
-            continue;
-        }
-        maxfd = MAX(maxfd, obj->fd[rprocId]);
-        FD_SET(obj->fd[rprocId], &rfds);
-    }
+    struct timespec ts;
+    struct timeval tv;
+
+#if 0
+/*
+ * Optimization here to get a message without going in to the sem
+ * operation, but the sem count will not be maintained properly.
+ */
+    pthread_mutex_lock(&MessageQ_module->gate);
 
-    /* Wait also on the event fd, which may be written by MessageQ_unblock(): */
-    FD_SET(obj->unblockFd, &rfds);
+    if (obj->msgList.cqh_first != &obj->msgList) {
+        *msg = (MessageQ_Msg)obj->msglist.cqh_first;
+        CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
 
-    if (timeout == MessageQ_FOREVER) {
-        timevalPtr = NULL;
+        pthread_mutex_unlock(&MessageQ_module->gate);
     }
     else {
-        /* Timeout given in msec: convert:  */
-        tv.tv_sec = timeout / 1000;
-        tv.tv_usec = (timeout % 1000) * 1000;
-        timevalPtr = &tv;
+        pthread_mutex_unlock(&MessageQ_module->gate);
     }
-    /* Add one to last fd created: */
-    nfds = MAX(maxfd, obj->unblockFd) + 1;
-
-    retval = select(nfds, &rfds, NULL, NULL, timevalPtr);
-    if (retval)  {
-        if (FD_ISSET(obj->unblockFd, &rfds))  {
-            /*
-             * Our event was signalled by MessageQ_unblock().
-             *
-             * This is typically done during a shutdown sequence, where
-             * the intention of the client would be to ignore (i.e. not fetch)
-             * any pending messages in the transport's queue.
-             * Thus, we shall not check for nor return any messages.
-             */
-            *msg = NULL;
-            status = MessageQ_E_UNBLOCKED;
-        }
-        else {
-           /* start where we last left off */
-           rprocId = last;
-
-           selfId = MultiProc_self();
-           nProcessors = MultiProc_getNumProcessors();
-
-           do {
-                if (rprocId != selfId &&
-                    obj->fd[rprocId] != Transport_INVALIDSOCKET) {
-
-                   if (FD_ISSET(obj->fd[rprocId], &rfds)) {
-                       /* Our transport's fd was signalled: Get the message */
-                       tmpStatus = transportGet(obj->fd[rprocId], msg);
-                       if (tmpStatus < 0) {
-                           printf ("MessageQ_get: tranposrtshm_get failed.");
-                           status = MessageQ_E_FAIL;
-                       }
-
-                       last = (rprocId + 1) % nProcessors;
-                       break;
-                   }
-                }
-               rprocId = (rprocId + 1) % nProcessors;
-            } while (rprocId != last);
+#endif
+
+    if (timeout == MessageQ_FOREVER) {
+        sem_wait(&obj->synchronizer);
+    }
+    else {
+        gettimeofday(&tv, NULL);
+        ts.tv_sec = tv.tv_sec;
+        ts.tv_nsec = (tv.tv_usec + timeout) * 1000;
+
+        if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
+            if (errno == ETIMEDOUT) {
+                PRINTVERBOSE0("MessageQ_get: operation timed out\n")
+
+                return MessageQ_E_TIMEOUT;
+            }
         }
     }
-    else if (retval == 0) {
-        *msg = NULL;
-        status = MessageQ_E_TIMEOUT;
+
+    if (obj->unblocked) {
+        return MessageQ_E_UNBLOCKED;
     }
 
-    return (status);
+    pthread_mutex_lock(&MessageQ_module->gate);
+
+    *msg = (MessageQ_Msg)obj->msgList.cqh_first;
+    CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
+
+    pthread_mutex_unlock(&MessageQ_module->gate);
+
+    return status;
 }
 
 /*
@@ -690,7 +792,7 @@ Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
  *
  * TBD: Implement as a socket ioctl, using getsockopt().  Return -1 for now.
  */
-Int MessageQ_count (MessageQ_Handle handle)
+Int MessageQ_count(MessageQ_Handle handle)
 {
     Int               count = -1;
 #if 0
@@ -710,100 +812,102 @@ Int MessageQ_count (MessageQ_Handle handle)
                  &count, &optlen);
 #endif
 
-    return (count);
+    return count;
 }
 
-/* Initializes a message not obtained from MessageQ_alloc. */
-Void MessageQ_staticMsgInit (MessageQ_Msg msg, UInt32 size)
+/*
+ *  Initializes a message not obtained from MessageQ_alloc.
+ */
+Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
 {
     /* Fill in the fields of the message */
-    MessageQ_msgInit (msg);
-    msg->heapId  = MessageQ_STATICMSG;
+    MessageQ_msgInit(msg);
+    msg->heapId = MessageQ_STATICMSG;
     msg->msgSize = size;
 }
 
 /*
- * Allocate a message and initialize the needed fields (note some
- * of the fields in the header are set via other APIs or in the
- * MessageQ_put function,
+ *  Allocate a message and initialize the needed fields (note some
+ *  of the fields in the header are set via other APIs or in the
+ *  MessageQ_put function,
  */
-MessageQ_Msg MessageQ_alloc (UInt16 heapId, UInt32 size)
+MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
 {
-    MessageQ_Msg msg       = NULL;
+    MessageQ_Msg msg;
 
     /*
      * heapId not used for local alloc (as this is over a copy transport), but
-     * we need to send to other side as heapId is used in BIOS transport:
+     * we need to send to other side as heapId is used in BIOS transport.
      */
-    msg = (MessageQ_Msg)calloc (1, size);
-    MessageQ_msgInit (msg);
+    msg = (MessageQ_Msg)calloc(1, size);
+    MessageQ_msgInit(msg);
     msg->msgSize = size;
-    msg->heapId  = heapId;
+    msg->heapId = heapId;
 
     return msg;
 }
 
-/* Frees the message back to the heap that was used to allocate it. */
-Int MessageQ_free (MessageQ_Msg msg)
+/*
+ *  Frees the message back to the heap that was used to allocate it.
+ */
+Int MessageQ_free(MessageQ_Msg msg)
 {
-    UInt32         status = MessageQ_S_SUCCESS;
+    UInt32 status = MessageQ_S_SUCCESS;
 
     /* Check to ensure this was not allocated by user: */
-    if (msg->heapId == MessageQ_STATICMSG)  {
-        status =  MessageQ_E_CANNOTFREESTATICMSG;
+    if (msg->heapId == MessageQ_STATICMSG) {
+        status = MessageQ_E_CANNOTFREESTATICMSG;
     }
     else {
-        free (msg);
+        free(msg);
     }
 
     return status;
 }
 
 /* Register a heap with MessageQ. */
-Int MessageQ_registerHeap (Ptr heap, UInt16 heapId)
+Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
 {
-    Int  status = MessageQ_S_SUCCESS;
+    Int status = MessageQ_S_SUCCESS;
 
-    /* Do nothing, as this uses a copy transport: */
+    /* Do nothing, as this uses a copy transport */
 
     return status;
 }
 
 /* Unregister a heap with MessageQ. */
-Int MessageQ_unregisterHeap (UInt16 heapId)
+Int MessageQ_unregisterHeap(UInt16 heapId)
 {
-    Int  status = MessageQ_S_SUCCESS;
+    Int status = MessageQ_S_SUCCESS;
 
-    /* Do nothing, as this uses a copy transport: */
+    /* Do nothing, as this uses a copy transport */
 
     return status;
 }
 
 /* Unblocks a MessageQ */
-Void MessageQ_unblock (MessageQ_Handle handle)
+Void MessageQ_unblock(MessageQ_Handle handle)
 {
-    MessageQ_Object * obj   = (MessageQ_Object *) handle;
-    uint64_t     buf = 1;
-    int          numBytes;
+    MessageQ_Object *obj = (MessageQ_Object *)handle;
 
-    /* Write 8 bytes to awaken any threads blocked on this messageQ: */
-    numBytes = write(obj->unblockFd, &buf, sizeof(buf));
+    obj->unblocked = TRUE;
+    sem_post(&obj->synchronizer);
 }
 
-/* Embeds a source message queue into a message. */
-Void MessageQ_setReplyQueue (MessageQ_Handle handle, MessageQ_Msg msg)
+/* Embeds a source message queue into a message */
+Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
 {
-    MessageQ_Object * obj   = (MessageQ_Object *) handle;
+    MessageQ_Object *obj = (MessageQ_Object *)handle;
 
-    msg->replyId   = (UInt16)(obj->queue);
+    msg->replyId = (UInt16)(obj->queue);
     msg->replyProc = (UInt16)(obj->queue >> 16);
 }
 
 /* Returns the QueueId associated with the handle. */
-MessageQ_QueueId MessageQ_getQueueId (MessageQ_Handle handle)
+MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
 {
-    MessageQ_Object * obj = (MessageQ_Object *) handle;
-    UInt32            queueId;
+    MessageQ_Object *obj = (MessageQ_Object *) handle;
+    UInt32 queueId;
 
     queueId = (obj->queue);
 
@@ -811,7 +915,7 @@ MessageQ_QueueId MessageQ_getQueueId (MessageQ_Handle handle)
 }
 
 /* Sets the tracing of a message */
-Void MessageQ_setMsgTrace (MessageQ_Msg msg, Bool traceFlag)
+Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
 {
     msg->flags = (msg->flags & ~TRACEMASK) |   (traceFlag << TRACESHIFT);
 }
@@ -822,111 +926,19 @@ Void MessageQ_setMsgTrace (MessageQ_Msg msg, Bool traceFlag)
  *  The MessageQ module itself does not use any shared memory but the
  *  underlying transport may use some shared memory.
  */
-SizeT MessageQ_sharedMemReq (Ptr sharedAddr)
+SizeT MessageQ_sharedMemReq(Ptr sharedAddr)
 {
     SizeT memReq = 0u;
 
     /* Do nothing, as this is a copy transport. */
 
-    return (memReq);
-}
-
-/*
- *  Create a socket for this remote proc, and attempt to connect.
- *
- *  Only creates a socket if one does not already exist for this procId.
- *
- *  Note: remoteProcId may be MultiProc_Self() for loopback case.
- */
-Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr)
-{
-    Int     status = MessageQ_S_SUCCESS;
-    int     sock;
-
-    PRINTVERBOSE1("MessageQ_attach: remoteProcId: %d\n", remoteProcId)
-
-    if (remoteProcId >= MultiProc_MAXPROCESSORS) {
-        status = MessageQ_E_INVALIDPROCID;
-        goto exit;
-    }
-
-    pthread_mutex_lock (&(MessageQ_module->gate));
-
-    /* Only create a socket if one doesn't exist: */
-    if (MessageQ_module->sock[remoteProcId] == Transport_INVALIDSOCKET)  {
-        /* Create the socket for sending messages to the remote proc: */
-        sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
-        if (sock < 0) {
-            status = MessageQ_E_FAIL;
-            printf ("MessageQ_attach: socket failed: %d, %s\n",
-                       errno, strerror(errno));
-        }
-        else  {
-            PRINTVERBOSE1("MessageQ_attach: created send socket: %d\n", sock)
-            MessageQ_module->sock[remoteProcId] = sock;
-            /* Attempt to connect: */
-            status = ConnectSocket(sock, remoteProcId, MESSAGEQ_RPMSG_PORT);
-            if (status < 0) {
-                status = MessageQ_E_RESOURCE;
-                /* don't hard-printf since this is no longer fatal */
-                PRINTVERBOSE1("MessageQ_attach: ConnectSocket(remoteProcId:%d) failed\n",
-                       remoteProcId);
-            }
-        }
-    }
-    else {
-        status = MessageQ_E_ALREADYEXISTS;
-    }
-
-    pthread_mutex_unlock (&(MessageQ_module->gate));
-
-    if (status == MessageQ_E_RESOURCE) {
-        MessageQ_detach(remoteProcId);
-    }
-
-exit:
-    return (status);
-}
-
-/*
- *  Close the socket for this remote proc.
- *
- */
-Int MessageQ_detach (UInt16 remoteProcId)
-{
-    Int status = MessageQ_S_SUCCESS;
-    int sock;
-
-    if (remoteProcId >= MultiProc_MAXPROCESSORS) {
-        status = MessageQ_E_INVALIDPROCID;
-        goto exit;
-    }
-
-    pthread_mutex_lock (&(MessageQ_module->gate));
-
-    sock = MessageQ_module->sock[remoteProcId];
-    if (sock != Transport_INVALIDSOCKET) {
-        if (close(sock)) {
-            status = MessageQ_E_OSFAILURE;
-            printf("MessageQ_detach: close failed: %d, %s\n",
-                   errno, strerror(errno));
-        }
-        else {
-            PRINTVERBOSE1("MessageQ_detach: closed socket: %d\n", sock)
-            MessageQ_module->sock[remoteProcId] = Transport_INVALIDSOCKET;
-        }
-    }
-
-    pthread_mutex_unlock (&(MessageQ_module->gate));
-
-exit:
-    return (status);
+    return memReq;
 }
 
 /*
  * This is a helper function to initialize a message.
  */
-Void MessageQ_msgInit (MessageQ_Msg msg)
+Void MessageQ_msgInit(MessageQ_Msg msg)
 {
 #if 0
     Int                 status    = MessageQ_S_SUCCESS;
@@ -962,7 +974,7 @@ Void MessageQ_msgInit (MessageQ_Msg msg)
       "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
       handle, status)
 
-    memcpy(msg, &rsp.msgInit.msg, sizeof(*msg));
+    memcpy(msg, &rsp.msgInit.msg, sizeof (*msg));
 #else
     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
@@ -971,168 +983,32 @@ Void MessageQ_msgInit (MessageQ_Msg msg)
     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
     msg->srcProc   = MultiProc_self();
 
-    pthread_mutex_lock(&(MessageQ_module->gate));
+    pthread_mutex_lock(&MessageQ_module->gate);
     msg->seqNum  = MessageQ_module->seqNum++;
-    pthread_mutex_unlock(&(MessageQ_module->gate));
+    pthread_mutex_unlock(&MessageQ_module->gate);
 #endif
 }
 
 /*
- * =============================================================================
- * Transport: Fxns kept here until need for a transport layer is realized.
- * =============================================================================
+ * Grow module's queues[] array to accommodate queueIndex from LAD
  */
-/*
- * ======== transportCreateEndpoint ========
- *
- * Create a communication endpoint to receive messages.
- */
-static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex)
+Void _MessageQ_grow(UInt16 queueIndex)
 {
-    Int          status    = MessageQ_S_SUCCESS;
-    int         err;
-
-    /*  Create the socket to receive messages for this messageQ. */
-    *fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
-    if (*fd < 0) {
-        status = MessageQ_E_FAIL;
-        printf ("transportCreateEndpoint: socket call failed: %d, %s\n",
-                  errno, strerror(errno));
-        goto exit;
-    }
+    MessageQ_Handle *queues;
+    MessageQ_Handle *oldQueues;
+    UInt oldSize;
 
-    PRINTVERBOSE1("transportCreateEndpoint: created socket: fd: %d\n", *fd)
+    oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
 
-    err = SocketBindAddr(*fd, rprocId, (UInt32)queueIndex);
-    if (err < 0) {
-        status = MessageQ_E_FAIL;
-        /* don't hard-printf since this is no longer fatal */
-        PRINTVERBOSE2("transportCreateEndpoint: bind failed: %d, %s\n",
-                      errno, strerror(errno));
-        close(*fd);
-    }
+    queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof (MessageQ_Handle));
+    memcpy(queues, MessageQ_module->queues, oldSize);
 
-exit:
-    return (status);
-}
+    oldQueues = MessageQ_module->queues;
+    MessageQ_module->queues = queues;
+    MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
 
-/*
- * ======== transportCloseEndpoint ========
- *
- *  Close the communication endpoint.
- */
-static Int transportCloseEndpoint(int fd)
-{
-    Int status = MessageQ_S_SUCCESS;
-
-    PRINTVERBOSE1("transportCloseEndpoint: closing socket: %d\n", fd)
-
-    /* Stop communication to this socket:  */
-    close(fd);
-
-    return (status);
-}
+    free(oldQueues);
 
-/*
- * ======== transportGet ========
- *  Retrieve a message waiting in the socket's queue.
-*/
-static Int transportGet(int sock, MessageQ_Msg * retMsg)
-{
-    Int           status    = MessageQ_S_SUCCESS;
-    MessageQ_Msg  msg;
-    struct sockaddr_rpmsg fromAddr;  // [Socket address of sender]
-    unsigned int  len;
-    int           byteCount;
-
-    /*
-     * We have no way of peeking to see what message size we'll get, so we
-     * allocate a message of max size to receive contents from the rpmsg socket
-     * (currently, a copy transport)
-     */
-    msg = MessageQ_alloc (0, MESSAGEQ_RPMSG_MAXSIZE);
-    if (!msg)  {
-        status = MessageQ_E_MEMORY;
-        goto exit;
-    }
-
-    memset(&fromAddr, 0, sizeof(fromAddr));
-    len = sizeof(fromAddr);
-
-    byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
-                      (struct sockaddr *)&fromAddr, &len);
-    if (len != sizeof(fromAddr)) {
-        printf("recvfrom: got bad addr len (%d)\n", len);
-        status = MessageQ_E_FAIL;
-        goto exit;
-    }
-    if (byteCount < 0) {
-        printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
-        status = MessageQ_E_FAIL;
-        goto exit;
-    }
-    else {
-         /* Update the allocated message size (even though this may waste space
-          * when the actual message is smaller than the maximum rpmsg size,
-          * the message will be freed soon anyway, and it avoids an extra copy).
-          */
-         msg->msgSize = byteCount;
-
-         /*
-          * If the message received was statically allocated, reset the
-          * heapId, so the app can free it.
-          */
-         if (msg->heapId == MessageQ_STATICMSG)  {
-             msg->heapId = 0;  /* for a copy transport, heap id is 0. */
-         }
-    }
-
-    PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
-    PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
-    PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId, msg->msgSize)
-
-    *retMsg = msg;
-
-exit:
-    return (status);
+    return;
 }
 
-/*
- * ======== transportPut ========
- *
- * Calls the socket API sendto() on the socket associated with
- * with this destination procID.
- * Currently, both local and remote messages are sent via the Socket ABI, so
- * no local object lists are maintained here.
-*/
-static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId)
-{
-    Int     status    = MessageQ_S_SUCCESS;
-    int     sock;
-    int     err;
-
-    /*
-     * Retrieve the socket for the AF_SYSLINK protocol associated with this
-     * transport.
-     */
-    sock = MessageQ_module->sock[dstProcId];
-
-    PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
-
-    err = send(sock, msg, msg->msgSize, 0);
-    if (err < 0) {
-        printf ("transportPut: send failed: %d, %s\n",
-                  errno, strerror(errno));
-        status = MessageQ_E_FAIL;
-        goto exit;
-    }
-
-    /*
-     * Free the message, as this is a copy transport, we maintain MessageQ
-     * semantics.
-     */
-    MessageQ_free (msg);
-
-exit:
-    return (status);
-}
index d718f2d8249508aaaa6eb7513d40f777b325dd92..65df6c2ee2ca9a5f705e4e6f909d2c0c746ba3a3 100644 (file)
@@ -85,6 +85,9 @@
 /* Slot 0 reserved for NameServer messages: */
 #define RESERVED_MSGQ_INDEX  1
 
+/* Number of entries to grow when we run out of queueIndexs */
+#define MessageQ_GROWSIZE 32
+
 /* Define BENCHMARK to quiet key MessageQ APIs: */
 //#define BENCHMARK
 
@@ -221,6 +224,7 @@ Int MessageQ_setup(const MessageQ_Config * cfg)
     MessageQ_module->numQueues = cfg->maxRuntimeEntries;
     MessageQ_module->queues = (MessageQ_Handle *)
         calloc(1, sizeof(MessageQ_Handle) * MessageQ_module->numQueues);
+    MessageQ_module->canFreeQueues = TRUE;
 
 exitSetup:
     LOG1("MessageQ_setup: exiting, refCount=%d\n", MessageQ_module->refCount)
@@ -269,7 +273,6 @@ Int MessageQ_destroy(void)
 
     memset(&MessageQ_module->cfg, 0, sizeof(MessageQ_Config));
     MessageQ_module->numQueues  = 0u;
-    MessageQ_module->canFreeQueues = TRUE;
 
 exitDestroy:
     LOG1("MessageQ_destroy: exiting, refCount=%d\n", MessageQ_module->refCount)
@@ -431,7 +434,8 @@ static UInt16 _MessageQ_grow(MessageQ_Object * obj)
     oldSize = (MessageQ_module->numQueues) * sizeof(MessageQ_Handle);
 
     /* Allocate larger table */
-    queues = calloc(1, oldSize + sizeof(MessageQ_Handle));
+    queues = calloc(MessageQ_module->numQueues + MessageQ_GROWSIZE,
+                    sizeof(MessageQ_Handle));
 
     /* Copy contents into new table */
     memcpy(queues, MessageQ_module->queues, oldSize);
@@ -442,7 +446,7 @@ static UInt16 _MessageQ_grow(MessageQ_Object * obj)
     /* Hook-up new table */
     oldQueues = MessageQ_module->queues;
     MessageQ_module->queues = queues;
-    MessageQ_module->numQueues++;
+    MessageQ_module->numQueues += MessageQ_GROWSIZE;
 
     /* Delete old table if not statically defined */
     if (MessageQ_module->canFreeQueues == TRUE) {
index 959f3ac09dc3c1efcf431d3a12ffb16ee0db70f6..fdae9cb3d787bd0266cecb3d6381b9a7a9a5ed7c 100644 (file)
@@ -58,7 +58,7 @@ VPATH = ../../../packages/ti/ipc/tests
 
 # the program to build (the names of the final binaries)
 bin_PROGRAMS = ping_rpmsg MessageQApp  MessageQBench MessageQMulti \
-                NameServerApp Msgq100
+                MessageQMultiMulti NameServerApp Msgq100
 
 
 if OMAP54XX_SMP
@@ -139,6 +139,7 @@ nameServer_common_sources = \
                 $(top_srcdir)/packages/ti/ipc/NameServer.h \
                 NameServerApp.c
 
+
 # list of sources for the 'ping_rpmsg' binary
 ping_rpmsg_SOURCES = ping_rpmsg.c
 
@@ -156,6 +157,9 @@ MessageQBench_SOURCES = $(common_sources) MessageQBench.c
 # list of sources for the 'MessageQMulti' binary
 MessageQMulti_SOURCES = $(common_sources) MessageQMulti.c
 
+# list of sources for the 'MessageQMultiMulti' binary
+MessageQMultiMulti_SOURCES = $(common_sources) MessageQMultiMulti.c
+
 # list of sources for the 'NameServerApp' binary
 NameServerApp_SOURCES = $(nameServer_common_sources)
 
@@ -174,7 +178,9 @@ GateMPApp_SOURCES = $(common_sources) \
                 $(top_srcdir)/packages/ti/ipc/tests/GateMPAppCommon.h
 
 common_libraries = -lpthread $(top_builddir)/linux/src/api/libtiipc.la \
-                $(top_builddir)/linux/src/utils/libtiipcutils.la
+                $(top_builddir)/linux/src/utils/libtiipcutils.la \
+                $(top_builddir)/linux/src/transport/libtransport.la
+
 
 # the additional libraries to link ping_rpmsg
 ping_rpmsg_LDADD = -lrt
@@ -197,6 +203,10 @@ MessageQBench_LDADD = $(common_libraries) -lrt \
 MessageQMulti_LDADD = $(common_libraries) \
                 $(AM_LDFLAGS)
 
+# the additional libraries needed to link MessageQMultiMulti
+MessageQMultiMulti_LDADD = $(common_libraries) \
+                $(AM_LDFLAGS)
+
 # the additional libraries needed to link NameServerApp
 NameServerApp_LDADD = $(common_libraries) \
                 $(AM_LDFLAGS)
index 50f06fa4aff35cda116bb76f0ac188f50d2b2517..8f7ff9a992ec8303f550f5e99fb4cd2390765ff7 100644 (file)
@@ -45,12 +45,12 @@ host_triplet = @host@
 
 bin_PROGRAMS = ping_rpmsg$(EXEEXT) MessageQApp$(EXEEXT) \
        MessageQBench$(EXEEXT) MessageQMulti$(EXEEXT) \
-       NameServerApp$(EXEEXT) Msgq100$(EXEEXT) $(am__EXEEXT_1) \
-       $(am__EXEEXT_2) $(am__EXEEXT_1) $(am__EXEEXT_3) \
-       $(am__EXEEXT_4) $(am__EXEEXT_1) $(am__EXEEXT_5) \
+       MessageQMultiMulti$(EXEEXT) NameServerApp$(EXEEXT) \
+       Msgq100$(EXEEXT) $(am__EXEEXT_1) $(am__EXEEXT_2) \
+       $(am__EXEEXT_1) $(am__EXEEXT_3) $(am__EXEEXT_4) \
+       $(am__EXEEXT_1) $(am__EXEEXT_5) $(am__EXEEXT_1) \
        $(am__EXEEXT_1) $(am__EXEEXT_1) $(am__EXEEXT_1) \
-       $(am__EXEEXT_1) $(am__EXEEXT_1) $(am__EXEEXT_6) \
-       $(am__EXEEXT_7)
+       $(am__EXEEXT_1) $(am__EXEEXT_6) $(am__EXEEXT_7)
 
 # Add platform specific bin application's here
 @OMAP54XX_SMP_TRUE@am__append_3 = 
@@ -99,7 +99,8 @@ am_GateMPApp_OBJECTS = $(am__objects_1) main_host.$(OBJEXT) \
        GateMPApp.$(OBJEXT)
 GateMPApp_OBJECTS = $(am_GateMPApp_OBJECTS)
 am__DEPENDENCIES_1 = $(top_builddir)/linux/src/api/libtiipc.la \
-       $(top_builddir)/linux/src/utils/libtiipcutils.la
+       $(top_builddir)/linux/src/utils/libtiipcutils.la \
+       $(top_builddir)/linux/src/transport/libtransport.la
 am__DEPENDENCIES_2 =
 GateMPApp_DEPENDENCIES = $(am__DEPENDENCIES_1) \
        $(CMEM_INSTALL_DIR)/src/cmem/api/.libs/libticmem.a \
@@ -115,6 +116,11 @@ am_MessageQMulti_OBJECTS = $(am__objects_1) MessageQMulti.$(OBJEXT)
 MessageQMulti_OBJECTS = $(am_MessageQMulti_OBJECTS)
 MessageQMulti_DEPENDENCIES = $(am__DEPENDENCIES_1) \
        $(am__DEPENDENCIES_2)
+am_MessageQMultiMulti_OBJECTS = $(am__objects_1) \
+       MessageQMultiMulti.$(OBJEXT)
+MessageQMultiMulti_OBJECTS = $(am_MessageQMultiMulti_OBJECTS)
+MessageQMultiMulti_DEPENDENCIES = $(am__DEPENDENCIES_1) \
+       $(am__DEPENDENCIES_2)
 am_Msgq100_OBJECTS = $(am__objects_1) Msgq100.$(OBJEXT)
 Msgq100_OBJECTS = $(am_Msgq100_OBJECTS)
 Msgq100_DEPENDENCIES = $(am__DEPENDENCIES_1) $(am__DEPENDENCIES_2)
@@ -150,14 +156,14 @@ LINK = $(LIBTOOL) --tag=CC --mode=link $(CCLD) $(AM_CFLAGS) $(CFLAGS) \
        $(AM_LDFLAGS) $(LDFLAGS) -o $@
 SOURCES = $(GateMPApp_SOURCES) $(MessageQApp_SOURCES) \
        $(MessageQBench_SOURCES) $(MessageQMulti_SOURCES) \
-       $(Msgq100_SOURCES) $(NameServerApp_SOURCES) \
-       $(mmrpc_test_SOURCES) $(nano_test_SOURCES) \
-       $(ping_rpmsg_SOURCES)
+       $(MessageQMultiMulti_SOURCES) $(Msgq100_SOURCES) \
+       $(NameServerApp_SOURCES) $(mmrpc_test_SOURCES) \
+       $(nano_test_SOURCES) $(ping_rpmsg_SOURCES)
 DIST_SOURCES = $(GateMPApp_SOURCES) $(MessageQApp_SOURCES) \
        $(MessageQBench_SOURCES) $(MessageQMulti_SOURCES) \
-       $(Msgq100_SOURCES) $(NameServerApp_SOURCES) \
-       $(mmrpc_test_SOURCES) $(nano_test_SOURCES) \
-       $(ping_rpmsg_SOURCES)
+       $(MessageQMultiMulti_SOURCES) $(Msgq100_SOURCES) \
+       $(NameServerApp_SOURCES) $(mmrpc_test_SOURCES) \
+       $(nano_test_SOURCES) $(ping_rpmsg_SOURCES)
 ETAGS = etags
 CTAGS = ctags
 DISTFILES = $(DIST_COMMON) $(DIST_SOURCES) $(TEXINFOS) $(EXTRA_DIST)
@@ -322,6 +328,9 @@ MessageQBench_SOURCES = $(common_sources) MessageQBench.c
 # list of sources for the 'MessageQMulti' binary
 MessageQMulti_SOURCES = $(common_sources) MessageQMulti.c
 
+# list of sources for the 'MessageQMultiMulti' binary
+MessageQMultiMulti_SOURCES = $(common_sources) MessageQMultiMulti.c
+
 # list of sources for the 'NameServerApp' binary
 NameServerApp_SOURCES = $(nameServer_common_sources)
 
@@ -340,7 +349,8 @@ GateMPApp_SOURCES = $(common_sources) \
                 $(top_srcdir)/packages/ti/ipc/tests/GateMPAppCommon.h
 
 common_libraries = -lpthread $(top_builddir)/linux/src/api/libtiipc.la \
-                $(top_builddir)/linux/src/utils/libtiipcutils.la
+                $(top_builddir)/linux/src/utils/libtiipcutils.la \
+                $(top_builddir)/linux/src/transport/libtransport.la
 
 
 # the additional libraries to link ping_rpmsg
@@ -368,6 +378,11 @@ MessageQMulti_LDADD = $(common_libraries) \
                 $(AM_LDFLAGS)
 
 
+# the additional libraries needed to link MessageQMultiMulti
+MessageQMultiMulti_LDADD = $(common_libraries) \
+                $(AM_LDFLAGS)
+
+
 # the additional libraries needed to link NameServerApp
 NameServerApp_LDADD = $(common_libraries) \
                 $(AM_LDFLAGS)
@@ -462,6 +477,9 @@ MessageQBench$(EXEEXT): $(MessageQBench_OBJECTS) $(MessageQBench_DEPENDENCIES)
 MessageQMulti$(EXEEXT): $(MessageQMulti_OBJECTS) $(MessageQMulti_DEPENDENCIES) 
        @rm -f MessageQMulti$(EXEEXT)
        $(LINK) $(MessageQMulti_LDFLAGS) $(MessageQMulti_OBJECTS) $(MessageQMulti_LDADD) $(LIBS)
+MessageQMultiMulti$(EXEEXT): $(MessageQMultiMulti_OBJECTS) $(MessageQMultiMulti_DEPENDENCIES) 
+       @rm -f MessageQMultiMulti$(EXEEXT)
+       $(LINK) $(MessageQMultiMulti_LDFLAGS) $(MessageQMultiMulti_OBJECTS) $(MessageQMultiMulti_LDADD) $(LIBS)
 Msgq100$(EXEEXT): $(Msgq100_OBJECTS) $(Msgq100_DEPENDENCIES) 
        @rm -f Msgq100$(EXEEXT)
        $(LINK) $(Msgq100_LDFLAGS) $(Msgq100_OBJECTS) $(Msgq100_LDADD) $(LIBS)
@@ -488,6 +506,7 @@ distclean-compile:
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/MessageQApp.Po@am__quote@
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/MessageQBench.Po@am__quote@
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/MessageQMulti.Po@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/MessageQMultiMulti.Po@am__quote@
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Msgq100.Po@am__quote@
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Mx.Po@am__quote@
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/NameServerApp.Po@am__quote@
diff --git a/linux/src/tests/MessageQMultiMulti.c b/linux/src/tests/MessageQMultiMulti.c
new file mode 100644 (file)
index 0000000..8ca5075
--- /dev/null
@@ -0,0 +1,269 @@
+/*
+ * Copyright (c) 2012-2014, Texas Instruments Incorporated
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * *  Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ *
+ * *  Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * *  Neither the name of Texas Instruments Incorporated nor the names of
+ *    its contributors may be used to endorse or promote products derived
+ *    from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+ * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+ * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+ * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+/* =============================================================================
+ *  @file   MessageQApp.c
+ *
+ *  @brief  Sample application for MessageQ module between MPU and Remote Proc
+ *
+ *  ============================================================================
+ */
+
+/* Standard headers */
+#include <pthread.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+
+/* IPC Headers */
+#include <ti/ipc/Std.h>
+#include <ti/ipc/Ipc.h>
+#include <ti/ipc/MessageQ.h>
+
+/* App defines: Must match on remote proc side: */
+#define MSGSIZE                     64u
+#define HEAPID                      0u
+#define SLAVE_MESSAGEQNAME          "SLAVE"
+#define HOST_MESSAGEQNAME           "HOST"
+
+/** ============================================================================
+ *  Macros and types
+ *  ============================================================================
+ */
+
+#define  NUM_LOOPS_DFLT   1000
+#define  NUM_THREADS_DFLT 10
+#define  MAX_NUM_THREADS  25
+#define  ONE_PROCESS_ONLY (-1)
+
+/** ============================================================================
+ *  Globals
+ *  ============================================================================
+ */
+static Int     numLoops, numThreads, procNum;
+
+struct thread_info {    /* Used as argument to thread_start() */
+    pthread_t thread_id;        /* ID returned by pthread_create() */
+    int       thread_num;       /* Application-defined thread # */
+};
+
+static void * pingThreadFxn(void *arg);
+
+/** ============================================================================
+ *  Functions
+ *  ============================================================================
+ */
+
+static Void * pingThreadFxn(void *arg)
+{
+    Int                      threadNum = *(int *)arg;
+    Int32                    status     = 0;
+    MessageQ_Msg             msg        = NULL;
+    MessageQ_Params          msgParams;
+    UInt16                   i;
+    MessageQ_Handle          handle;
+    MessageQ_QueueId         queueId = MessageQ_INVALIDMESSAGEQ;
+
+    char             remoteQueueName[64];
+    char             hostQueueName[64];
+
+    printf ("Entered pingThreadFxn: %d\n", threadNum);
+
+    sprintf(remoteQueueName, "%s_%d%d", SLAVE_MESSAGEQNAME, threadNum, (threadNum % (MultiProc_getNumProcessors() - 1)) + 1);
+    sprintf(hostQueueName,   "%s_%d", HOST_MESSAGEQNAME,  threadNum );
+
+    /* Create the local Message Queue for receiving. */
+    MessageQ_Params_init (&msgParams);
+    handle = MessageQ_create (hostQueueName, &msgParams);
+    if (handle == NULL) {
+        printf ("Error in MessageQ_create\n");
+        goto exit;
+    }
+    else {
+        printf ("thread: %d, Local Message: %s, QId: 0x%x\n",
+            threadNum, hostQueueName, MessageQ_getQueueId(handle));
+    }
+
+    /* Poll until remote side has it's messageQ created before we send: */
+    do {
+        status = MessageQ_open (remoteQueueName, &queueId);
+        sleep (1);
+    } while (status == MessageQ_E_NOTFOUND);
+    if (status < 0) {
+        printf ("Error in MessageQ_open [0x%x]\n", status);
+        goto cleanup;
+    }
+    else {
+        printf ("thread: %d, Remote queue: %s, QId: 0x%x\n",
+                 threadNum, remoteQueueName, queueId);
+    }
+
+    printf ("\nthread: %d: Exchanging messages with remote processor...\n",
+            threadNum);
+    for (i = 0 ; i < numLoops ; i++) {
+        /* Allocate message. */
+        msg = MessageQ_alloc (HEAPID, MSGSIZE);
+        if (msg == NULL) {
+            printf ("Error in MessageQ_alloc\n");
+            break;
+        }
+
+        MessageQ_setMsgId (msg, i);
+
+        /* Have the remote proc reply to this message queue */
+        MessageQ_setReplyQueue (handle, msg);
+
+        status = MessageQ_put (queueId, msg);
+        if (status < 0) {
+            printf ("Error in MessageQ_put [0x%x]\n", status);
+            break;
+        }
+
+        status = MessageQ_get(handle, &msg, MessageQ_FOREVER);
+        if (status < 0) {
+            printf ("Error in MessageQ_get [0x%x]\n", status);
+            break;
+        }
+        else {
+            /* Validate the returned message. */
+            if ((msg != NULL) && (MessageQ_getMsgId (msg) != i)) {
+                printf ("Data integrity failure!\n"
+                        "    Expected %d\n"
+                        "    Received %d\n",
+                        i, MessageQ_getMsgId (msg));
+                break;
+            }
+
+            status = MessageQ_free (msg);
+        }
+
+        printf ("thread: %d: Exchanged %d msgs\n", threadNum, (i+1));
+    }
+
+    printf ("thread: %d: pingThreadFxn successfully completed!\n", threadNum);
+
+    MessageQ_close (&queueId);
+
+cleanup:
+    /* Clean-up */
+    status = MessageQ_delete (&handle);
+    if (status < 0) {
+        printf ("Error in MessageQ_delete [0x%x]\n", status);
+    }
+
+exit:
+
+    return ((void *)status);
+}
+
+int main (int argc, char ** argv)
+{
+    struct thread_info threads[MAX_NUM_THREADS];
+    int ret,i;
+    Int32 status = 0;
+
+    /* Parse Args: */
+    numLoops = NUM_LOOPS_DFLT;
+    numThreads = NUM_THREADS_DFLT;
+    procNum = ONE_PROCESS_ONLY;
+    switch (argc) {
+        case 1:
+           /* use defaults */
+           break;
+        case 2:
+           numThreads = atoi(argv[1]);
+           break;
+        case 3:
+           numThreads = atoi(argv[1]);
+           numLoops   = atoi(argv[2]);
+           break;
+        case 4:
+           /* We force numThreads = 1 if doing a multiProcess test: */
+           numThreads = 1;
+           numLoops   = atoi(argv[2]);
+           procNum = atoi(argv[3]);
+           break;
+        default:
+           printf("Usage: %s [<numThreads>] [<numLoops>] [<Process #]>\n",
+               argv[0]);
+           printf("\tDefaults: numThreads: %d, numLoops: %d\n",
+               NUM_THREADS_DFLT, NUM_LOOPS_DFLT);
+           printf("\tMax Threads: %d\n", MAX_NUM_THREADS);
+           exit(0);
+    }
+
+    if (numThreads > MAX_NUM_THREADS) {
+        printf("Error: Maximum number of threads supported is %d\n",
+            MAX_NUM_THREADS);
+        exit(EXIT_FAILURE);
+    }
+
+    printf("Using numThreads: %d, numLoops: %d\n", numThreads, numLoops);
+    if (procNum != ONE_PROCESS_ONLY) {
+        printf("ProcNum: %d\n", procNum);
+    }
+
+    status = Ipc_start();
+    if (status < 0) {
+        printf ("Ipc_start failed: status = 0x%x\n", status);
+        goto exit;
+    }
+
+    /* Launch multiple threads: */
+    for (i = 0; i < numThreads; i++) {
+        /* Create the test thread: */
+        printf ("creating pingThreadFxn: %d\n", i);
+        threads[i].thread_num = (procNum == ONE_PROCESS_ONLY)? i: procNum;
+        ret = pthread_create(&threads[i].thread_id, NULL, &pingThreadFxn,
+                           &(threads[i].thread_num));
+        if (ret) {
+            printf("MessageQMulti: can't spawn thread: %d, %s\n",
+                    i, strerror(ret));
+        }
+    }
+
+    /* Join all threads: */
+    for (i = 0; i < numThreads; i++) {
+        ret = pthread_join(threads[i].thread_id, NULL);
+        if (ret != 0) {
+            printf("MessageQMulti: failed to join thread: %d, %s\n",
+                    threads[i].thread_num, strerror(ret));
+        }
+        printf("MessageQMulti: Joined with thread %d\n",threads[i].thread_num);
+    }
+
+    Ipc_stop();
+
+exit:
+
+    return (0);
+}
diff --git a/linux/src/transport/Makefile.am b/linux/src/transport/Makefile.am
new file mode 100644 (file)
index 0000000..121bdc3
--- /dev/null
@@ -0,0 +1,66 @@
+##
+##  Copyright (c) 2013-2014, Texas Instruments Incorporated
+##
+##  Redistribution and use in source and binary forms, with or without
+##  modification, are permitted provided that the following conditions
+##  are met:
+##
+##  *  Redistributions of source code must retain the above copyright
+##     notice, this list of conditions and the following disclaimer.
+##
+##  *  Redistributions in binary form must reproduce the above copyright
+##     notice, this list of conditions and the following disclaimer in the
+##     documentation and/or other materials provided with the distribution.
+##
+##  *  Neither the name of Texas Instruments Incorporated nor the names of
+##     its contributors may be used to endorse or promote products derived
+##     from this software without specific prior written permission.
+##
+##  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+##  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+##  THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+##  PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+##  CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+##  EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+##  PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+##  OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+##  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+##  OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+##  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+##
+## ======== src/api/Makefile.am ========
+##
+
+# additional include paths necessary to compile the library
+AM_CFLAGS = -I$(top_srcdir)/linux/include -I$(top_srcdir)/hlos_common/include \
+        -I$(top_srcdir)/packages -I$(KERNEL_INSTALL_DIR)/include/generated/uapi\
+        -D_GNU_SOURCE -Wall @AM_CFLAGS@
+
+#if DRA7XX
+#AM_CFLAGS += -DGATEMP_SUPPORT
+#endif
+
+###############################################################################
+# THE LIBRARIES TO BUILD
+###############################################################################
+
+# the library names to build (note we are building shared libs)
+lib_LTLIBRARIES = libtransport.la
+
+# where to install the headers on the system
+libtransport_ladir = $(includedir)/ti/ipc
+
+# the list of header files that belong to the library (to be installed later)
+libtransport_la_HEADERS = $(top_srcdir)/linux/include/ti/ipc/Std.h
+
+# the sources to add to the library and to add to the source distribution
+libtransport_la_SOURCES =    \
+                        TransportRpmsg.c
+
+# Add version info to the shared library
+libtransport_la_LDFLAGS = -version-info 1:0:0
+
+#pkgconfig_DATA          = libtiipc.pc
+#pkgconfigdir            = $(libdir)/pkgconfig
+
+###############################################################################
diff --git a/linux/src/transport/Makefile.in b/linux/src/transport/Makefile.in
new file mode 100644 (file)
index 0000000..a7910d2
--- /dev/null
@@ -0,0 +1,533 @@
+# Makefile.in generated by automake 1.9.6 from Makefile.am.
+# @configure_input@
+
+# Copyright (C) 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002,
+# 2003, 2004, 2005  Free Software Foundation, Inc.
+# This Makefile.in is free software; the Free Software Foundation
+# gives unlimited permission to copy and/or distribute it,
+# with or without modifications, as long as this notice is preserved.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY, to the extent permitted by law; without
+# even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+# PARTICULAR PURPOSE.
+
+@SET_MAKE@
+
+
+srcdir = @srcdir@
+top_srcdir = @top_srcdir@
+VPATH = @srcdir@
+pkgdatadir = $(datadir)/@PACKAGE@
+pkglibdir = $(libdir)/@PACKAGE@
+pkgincludedir = $(includedir)/@PACKAGE@
+top_builddir = ../../..
+am__cd = CDPATH="$${ZSH_VERSION+.}$(PATH_SEPARATOR)" && cd
+INSTALL = @INSTALL@
+install_sh_DATA = $(install_sh) -c -m 644
+install_sh_PROGRAM = $(install_sh) -c
+install_sh_SCRIPT = $(install_sh) -c
+INSTALL_HEADER = $(INSTALL_DATA)
+transform = $(program_transform_name)
+NORMAL_INSTALL = :
+PRE_INSTALL = :
+POST_INSTALL = :
+NORMAL_UNINSTALL = :
+PRE_UNINSTALL = :
+POST_UNINSTALL = :
+build_triplet = @build@
+host_triplet = @host@
+subdir = linux/src/transport
+DIST_COMMON = $(libtransport_la_HEADERS) $(srcdir)/Makefile.am \
+       $(srcdir)/Makefile.in
+ACLOCAL_M4 = $(top_srcdir)/aclocal.m4
+am__aclocal_m4_deps = $(top_srcdir)/configure.ac
+am__configure_deps = $(am__aclocal_m4_deps) $(CONFIGURE_DEPENDENCIES) \
+       $(ACLOCAL_M4)
+mkinstalldirs = $(install_sh) -d
+CONFIG_CLEAN_FILES =
+am__vpath_adj_setup = srcdirstrip=`echo "$(srcdir)" | sed 's|.|.|g'`;
+am__vpath_adj = case $$p in \
+    $(srcdir)/*) f=`echo "$$p" | sed "s|^$$srcdirstrip/||"`;; \
+    *) f=$$p;; \
+  esac;
+am__strip_dir = `echo $$p | sed -e 's|^.*/||'`;
+am__installdirs = "$(DESTDIR)$(libdir)" \
+       "$(DESTDIR)$(libtransport_ladir)"
+libLTLIBRARIES_INSTALL = $(INSTALL)
+LTLIBRARIES = $(lib_LTLIBRARIES)
+libtransport_la_LIBADD =
+am_libtransport_la_OBJECTS = TransportRpmsg.lo
+libtransport_la_OBJECTS = $(am_libtransport_la_OBJECTS)
+DEFAULT_INCLUDES = -I. -I$(srcdir)
+depcomp = $(SHELL) $(top_srcdir)/linux/build-aux/depcomp
+am__depfiles_maybe = depfiles
+COMPILE = $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) \
+       $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS)
+LTCOMPILE = $(LIBTOOL) --tag=CC --mode=compile $(CC) $(DEFS) \
+       $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) \
+       $(AM_CFLAGS) $(CFLAGS)
+CCLD = $(CC)
+LINK = $(LIBTOOL) --tag=CC --mode=link $(CCLD) $(AM_CFLAGS) $(CFLAGS) \
+       $(AM_LDFLAGS) $(LDFLAGS) -o $@
+SOURCES = $(libtransport_la_SOURCES)
+DIST_SOURCES = $(libtransport_la_SOURCES)
+libtransport_laHEADERS_INSTALL = $(INSTALL_HEADER)
+HEADERS = $(libtransport_la_HEADERS)
+ETAGS = etags
+CTAGS = ctags
+DISTFILES = $(DIST_COMMON) $(DIST_SOURCES) $(TEXINFOS) $(EXTRA_DIST)
+ACLOCAL = @ACLOCAL@
+AMDEP_FALSE = @AMDEP_FALSE@
+AMDEP_TRUE = @AMDEP_TRUE@
+AMTAR = @AMTAR@
+
+# additional include paths necessary to compile the library
+AM_CFLAGS = -I$(top_srcdir)/linux/include -I$(top_srcdir)/hlos_common/include \
+        -I$(top_srcdir)/packages -I$(KERNEL_INSTALL_DIR)/include/generated/uapi\
+        -D_GNU_SOURCE -Wall @AM_CFLAGS@
+
+AM_LDFLAGS = @AM_LDFLAGS@
+AR = @AR@
+AUTOCONF = @AUTOCONF@
+AUTOHEADER = @AUTOHEADER@
+AUTOMAKE = @AUTOMAKE@
+AWK = @AWK@
+C66AK2E_FALSE = @C66AK2E_FALSE@
+C66AK2E_TRUE = @C66AK2E_TRUE@
+CC = @CC@
+CCDEPMODE = @CCDEPMODE@
+CFLAGS = @CFLAGS@
+CMEM_FALSE = @CMEM_FALSE@
+CMEM_INSTALL_DIR = @CMEM_INSTALL_DIR@
+CMEM_TRUE = @CMEM_TRUE@
+CPP = @CPP@
+CPPFLAGS = @CPPFLAGS@
+CXX = @CXX@
+CXXCPP = @CXXCPP@
+CXXDEPMODE = @CXXDEPMODE@
+CXXFLAGS = @CXXFLAGS@
+CYGPATH_W = @CYGPATH_W@
+DEFS = @DEFS@
+DEPDIR = @DEPDIR@
+DRA7XX_FALSE = @DRA7XX_FALSE@
+DRA7XX_TRUE = @DRA7XX_TRUE@
+DRM_FALSE = @DRM_FALSE@
+DRM_PREFIX = @DRM_PREFIX@
+DRM_TRUE = @DRM_TRUE@
+ECHO = @ECHO@
+ECHO_C = @ECHO_C@
+ECHO_N = @ECHO_N@
+ECHO_T = @ECHO_T@
+EGREP = @EGREP@
+EXEEXT = @EXEEXT@
+INSTALL_DATA = @INSTALL_DATA@
+INSTALL_PROGRAM = @INSTALL_PROGRAM@
+INSTALL_SCRIPT = @INSTALL_SCRIPT@
+INSTALL_STRIP_PROGRAM = @INSTALL_STRIP_PROGRAM@
+KDIR_FALSE = @KDIR_FALSE@
+KDIR_TRUE = @KDIR_TRUE@
+KERNEL_INSTALL_DIR = @KERNEL_INSTALL_DIR@
+LDFLAGS = @LDFLAGS@
+LIBOBJS = @LIBOBJS@
+LIBS = @LIBS@
+LIBTOOL = @LIBTOOL@
+LN_S = @LN_S@
+LTLIBOBJS = @LTLIBOBJS@
+MAINT = @MAINT@
+MAINTAINER_MODE_FALSE = @MAINTAINER_MODE_FALSE@
+MAINTAINER_MODE_TRUE = @MAINTAINER_MODE_TRUE@
+MAKEINFO = @MAKEINFO@
+OBJEXT = @OBJEXT@
+OMAP54XX_SMP_FALSE = @OMAP54XX_SMP_FALSE@
+OMAP54XX_SMP_TRUE = @OMAP54XX_SMP_TRUE@
+OMAPL138_FALSE = @OMAPL138_FALSE@
+OMAPL138_TRUE = @OMAPL138_TRUE@
+PACKAGE = @PACKAGE@
+PACKAGE_BUGREPORT = @PACKAGE_BUGREPORT@
+PACKAGE_NAME = @PACKAGE_NAME@
+PACKAGE_STRING = @PACKAGE_STRING@
+PACKAGE_TARNAME = @PACKAGE_TARNAME@
+PACKAGE_VERSION = @PACKAGE_VERSION@
+PATH_SEPARATOR = @PATH_SEPARATOR@
+PLATFORM = @PLATFORM@
+RANLIB = @RANLIB@
+SET_MAKE = @SET_MAKE@
+SHELL = @SHELL@
+STRIP = @STRIP@
+TCI6614_FALSE = @TCI6614_FALSE@
+TCI6614_TRUE = @TCI6614_TRUE@
+TCI6630_FALSE = @TCI6630_FALSE@
+TCI6630_TRUE = @TCI6630_TRUE@
+TCI6636_FALSE = @TCI6636_FALSE@
+TCI6636_TRUE = @TCI6636_TRUE@
+TCI6638_FALSE = @TCI6638_FALSE@
+TCI6638_TRUE = @TCI6638_TRUE@
+VERSION = @VERSION@
+ac_ct_AR = @ac_ct_AR@
+ac_ct_CC = @ac_ct_CC@
+ac_ct_CXX = @ac_ct_CXX@
+ac_ct_RANLIB = @ac_ct_RANLIB@
+ac_ct_STRIP = @ac_ct_STRIP@
+am__fastdepCC_FALSE = @am__fastdepCC_FALSE@
+am__fastdepCC_TRUE = @am__fastdepCC_TRUE@
+am__fastdepCXX_FALSE = @am__fastdepCXX_FALSE@
+am__fastdepCXX_TRUE = @am__fastdepCXX_TRUE@
+am__include = @am__include@
+am__leading_dot = @am__leading_dot@
+am__quote = @am__quote@
+am__tar = @am__tar@
+am__untar = @am__untar@
+bindir = @bindir@
+build = @build@
+build_alias = @build_alias@
+build_cpu = @build_cpu@
+build_os = @build_os@
+build_vendor = @build_vendor@
+datadir = @datadir@
+exec_prefix = @exec_prefix@
+host = @host@
+host_alias = @host_alias@
+host_cpu = @host_cpu@
+host_os = @host_os@
+host_vendor = @host_vendor@
+includedir = @includedir@
+infodir = @infodir@
+install_sh = @install_sh@
+libdir = @libdir@
+libexecdir = @libexecdir@
+localstatedir = @localstatedir@
+mandir = @mandir@
+mkdir_p = @mkdir_p@
+oldincludedir = @oldincludedir@
+prefix = @prefix@
+program_transform_name = @program_transform_name@
+sbindir = @sbindir@
+sharedstatedir = @sharedstatedir@
+sysconfdir = @sysconfdir@
+target_alias = @target_alias@
+
+#if DRA7XX
+#AM_CFLAGS += -DGATEMP_SUPPORT
+#endif
+
+###############################################################################
+# THE LIBRARIES TO BUILD
+###############################################################################
+
+# the library names to build (note we are building shared libs)
+lib_LTLIBRARIES = libtransport.la
+
+# where to install the headers on the system
+libtransport_ladir = $(includedir)/ti/ipc
+
+# the list of header files that belong to the library (to be installed later)
+libtransport_la_HEADERS = $(top_srcdir)/linux/include/ti/ipc/Std.h
+
+# the sources to add to the library and to add to the source distribution
+libtransport_la_SOURCES = \
+                        TransportRpmsg.c
+
+
+# Add version info to the shared library
+libtransport_la_LDFLAGS = -version-info 1:0:0
+all: all-am
+
+.SUFFIXES:
+.SUFFIXES: .c .lo .o .obj
+$(srcdir)/Makefile.in: @MAINTAINER_MODE_TRUE@ $(srcdir)/Makefile.am  $(am__configure_deps)
+       @for dep in $?; do \
+         case '$(am__configure_deps)' in \
+           *$$dep*) \
+             cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh \
+               && exit 0; \
+             exit 1;; \
+         esac; \
+       done; \
+       echo ' cd $(top_srcdir) && $(AUTOMAKE) --foreign  linux/src/transport/Makefile'; \
+       cd $(top_srcdir) && \
+         $(AUTOMAKE) --foreign  linux/src/transport/Makefile
+.PRECIOUS: Makefile
+Makefile: $(srcdir)/Makefile.in $(top_builddir)/config.status
+       @case '$?' in \
+         *config.status*) \
+           cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh;; \
+         *) \
+           echo ' cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__depfiles_maybe)'; \
+           cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__depfiles_maybe);; \
+       esac;
+
+$(top_builddir)/config.status: $(top_srcdir)/configure $(CONFIG_STATUS_DEPENDENCIES)
+       cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
+
+$(top_srcdir)/configure: @MAINTAINER_MODE_TRUE@ $(am__configure_deps)
+       cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
+$(ACLOCAL_M4): @MAINTAINER_MODE_TRUE@ $(am__aclocal_m4_deps)
+       cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
+install-libLTLIBRARIES: $(lib_LTLIBRARIES)
+       @$(NORMAL_INSTALL)
+       test -z "$(libdir)" || $(mkdir_p) "$(DESTDIR)$(libdir)"
+       @list='$(lib_LTLIBRARIES)'; for p in $$list; do \
+         if test -f $$p; then \
+           f=$(am__strip_dir) \
+           echo " $(LIBTOOL) --mode=install $(libLTLIBRARIES_INSTALL) $(INSTALL_STRIP_FLAG) '$$p' '$(DESTDIR)$(libdir)/$$f'"; \
+           $(LIBTOOL) --mode=install $(libLTLIBRARIES_INSTALL) $(INSTALL_STRIP_FLAG) "$$p" "$(DESTDIR)$(libdir)/$$f"; \
+         else :; fi; \
+       done
+
+uninstall-libLTLIBRARIES:
+       @$(NORMAL_UNINSTALL)
+       @set -x; list='$(lib_LTLIBRARIES)'; for p in $$list; do \
+         p=$(am__strip_dir) \
+         echo " $(LIBTOOL) --mode=uninstall rm -f '$(DESTDIR)$(libdir)/$$p'"; \
+         $(LIBTOOL) --mode=uninstall rm -f "$(DESTDIR)$(libdir)/$$p"; \
+       done
+
+clean-libLTLIBRARIES:
+       -test -z "$(lib_LTLIBRARIES)" || rm -f $(lib_LTLIBRARIES)
+       @list='$(lib_LTLIBRARIES)'; for p in $$list; do \
+         dir="`echo $$p | sed -e 's|/[^/]*$$||'`"; \
+         test "$$dir" != "$$p" || dir=.; \
+         echo "rm -f \"$${dir}/so_locations\""; \
+         rm -f "$${dir}/so_locations"; \
+       done
+libtransport.la: $(libtransport_la_OBJECTS) $(libtransport_la_DEPENDENCIES) 
+       $(LINK) -rpath $(libdir) $(libtransport_la_LDFLAGS) $(libtransport_la_OBJECTS) $(libtransport_la_LIBADD) $(LIBS)
+
+mostlyclean-compile:
+       -rm -f *.$(OBJEXT)
+
+distclean-compile:
+       -rm -f *.tab.c
+
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/TransportRpmsg.Plo@am__quote@
+
+.c.o:
+@am__fastdepCC_TRUE@   if $(COMPILE) -MT $@ -MD -MP -MF "$(DEPDIR)/$*.Tpo" -c -o $@ $<; \
+@am__fastdepCC_TRUE@   then mv -f "$(DEPDIR)/$*.Tpo" "$(DEPDIR)/$*.Po"; else rm -f "$(DEPDIR)/$*.Tpo"; exit 1; fi
+@AMDEP_TRUE@@am__fastdepCC_FALSE@      source='$<' object='$@' libtool=no @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@      DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCC_FALSE@  $(COMPILE) -c $<
+
+.c.obj:
+@am__fastdepCC_TRUE@   if $(COMPILE) -MT $@ -MD -MP -MF "$(DEPDIR)/$*.Tpo" -c -o $@ `$(CYGPATH_W) '$<'`; \
+@am__fastdepCC_TRUE@   then mv -f "$(DEPDIR)/$*.Tpo" "$(DEPDIR)/$*.Po"; else rm -f "$(DEPDIR)/$*.Tpo"; exit 1; fi
+@AMDEP_TRUE@@am__fastdepCC_FALSE@      source='$<' object='$@' libtool=no @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@      DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCC_FALSE@  $(COMPILE) -c `$(CYGPATH_W) '$<'`
+
+.c.lo:
+@am__fastdepCC_TRUE@   if $(LTCOMPILE) -MT $@ -MD -MP -MF "$(DEPDIR)/$*.Tpo" -c -o $@ $<; \
+@am__fastdepCC_TRUE@   then mv -f "$(DEPDIR)/$*.Tpo" "$(DEPDIR)/$*.Plo"; else rm -f "$(DEPDIR)/$*.Tpo"; exit 1; fi
+@AMDEP_TRUE@@am__fastdepCC_FALSE@      source='$<' object='$@' libtool=yes @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@      DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCC_FALSE@  $(LTCOMPILE) -c -o $@ $<
+
+mostlyclean-libtool:
+       -rm -f *.lo
+
+clean-libtool:
+       -rm -rf .libs _libs
+
+distclean-libtool:
+       -rm -f libtool
+uninstall-info-am:
+install-libtransport_laHEADERS: $(libtransport_la_HEADERS)
+       @$(NORMAL_INSTALL)
+       test -z "$(libtransport_ladir)" || $(mkdir_p) "$(DESTDIR)$(libtransport_ladir)"
+       @list='$(libtransport_la_HEADERS)'; for p in $$list; do \
+         if test -f "$$p"; then d=; else d="$(srcdir)/"; fi; \
+         f=$(am__strip_dir) \
+         echo " $(libtransport_laHEADERS_INSTALL) '$$d$$p' '$(DESTDIR)$(libtransport_ladir)/$$f'"; \
+         $(libtransport_laHEADERS_INSTALL) "$$d$$p" "$(DESTDIR)$(libtransport_ladir)/$$f"; \
+       done
+
+uninstall-libtransport_laHEADERS:
+       @$(NORMAL_UNINSTALL)
+       @list='$(libtransport_la_HEADERS)'; for p in $$list; do \
+         f=$(am__strip_dir) \
+         echo " rm -f '$(DESTDIR)$(libtransport_ladir)/$$f'"; \
+         rm -f "$(DESTDIR)$(libtransport_ladir)/$$f"; \
+       done
+
+ID: $(HEADERS) $(SOURCES) $(LISP) $(TAGS_FILES)
+       list='$(SOURCES) $(HEADERS) $(LISP) $(TAGS_FILES)'; \
+       unique=`for i in $$list; do \
+           if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \
+         done | \
+         $(AWK) '    { files[$$0] = 1; } \
+              END { for (i in files) print i; }'`; \
+       mkid -fID $$unique
+tags: TAGS
+
+TAGS:  $(HEADERS) $(SOURCES)  $(TAGS_DEPENDENCIES) \
+               $(TAGS_FILES) $(LISP)
+       tags=; \
+       here=`pwd`; \
+       list='$(SOURCES) $(HEADERS)  $(LISP) $(TAGS_FILES)'; \
+       unique=`for i in $$list; do \
+           if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \
+         done | \
+         $(AWK) '    { files[$$0] = 1; } \
+              END { for (i in files) print i; }'`; \
+       if test -z "$(ETAGS_ARGS)$$tags$$unique"; then :; else \
+         test -n "$$unique" || unique=$$empty_fix; \
+         $(ETAGS) $(ETAGSFLAGS) $(AM_ETAGSFLAGS) $(ETAGS_ARGS) \
+           $$tags $$unique; \
+       fi
+ctags: CTAGS
+CTAGS:  $(HEADERS) $(SOURCES)  $(TAGS_DEPENDENCIES) \
+               $(TAGS_FILES) $(LISP)
+       tags=; \
+       here=`pwd`; \
+       list='$(SOURCES) $(HEADERS)  $(LISP) $(TAGS_FILES)'; \
+       unique=`for i in $$list; do \
+           if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \
+         done | \
+         $(AWK) '    { files[$$0] = 1; } \
+              END { for (i in files) print i; }'`; \
+       test -z "$(CTAGS_ARGS)$$tags$$unique" \
+         || $(CTAGS) $(CTAGSFLAGS) $(AM_CTAGSFLAGS) $(CTAGS_ARGS) \
+            $$tags $$unique
+
+GTAGS:
+       here=`$(am__cd) $(top_builddir) && pwd` \
+         && cd $(top_srcdir) \
+         && gtags -i $(GTAGS_ARGS) $$here
+
+distclean-tags:
+       -rm -f TAGS ID GTAGS GRTAGS GSYMS GPATH tags
+
+distdir: $(DISTFILES)
+       $(mkdir_p) $(distdir)/../../../linux/include/ti/ipc
+       @srcdirstrip=`echo "$(srcdir)" | sed 's|.|.|g'`; \
+       topsrcdirstrip=`echo "$(top_srcdir)" | sed 's|.|.|g'`; \
+       list='$(DISTFILES)'; for file in $$list; do \
+         case $$file in \
+           $(srcdir)/*) file=`echo "$$file" | sed "s|^$$srcdirstrip/||"`;; \
+           $(top_srcdir)/*) file=`echo "$$file" | sed "s|^$$topsrcdirstrip/|$(top_builddir)/|"`;; \
+         esac; \
+         if test -f $$file || test -d $$file; then d=.; else d=$(srcdir); fi; \
+         dir=`echo "$$file" | sed -e 's,/[^/]*$$,,'`; \
+         if test "$$dir" != "$$file" && test "$$dir" != "."; then \
+           dir="/$$dir"; \
+           $(mkdir_p) "$(distdir)$$dir"; \
+         else \
+           dir=''; \
+         fi; \
+         if test -d $$d/$$file; then \
+           if test -d $(srcdir)/$$file && test $$d != $(srcdir); then \
+             cp -pR $(srcdir)/$$file $(distdir)$$dir || exit 1; \
+           fi; \
+           cp -pR $$d/$$file $(distdir)$$dir || exit 1; \
+         else \
+           test -f $(distdir)/$$file \
+           || cp -p $$d/$$file $(distdir)/$$file \
+           || exit 1; \
+         fi; \
+       done
+check-am: all-am
+check: check-am
+all-am: Makefile $(LTLIBRARIES) $(HEADERS)
+installdirs:
+       for dir in "$(DESTDIR)$(libdir)" "$(DESTDIR)$(libtransport_ladir)"; do \
+         test -z "$$dir" || $(mkdir_p) "$$dir"; \
+       done
+install: install-am
+install-exec: install-exec-am
+install-data: install-data-am
+uninstall: uninstall-am
+
+install-am: all-am
+       @$(MAKE) $(AM_MAKEFLAGS) install-exec-am install-data-am
+
+installcheck: installcheck-am
+install-strip:
+       $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \
+         install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \
+         `test -z '$(STRIP)' || \
+           echo "INSTALL_PROGRAM_ENV=STRIPPROG='$(STRIP)'"` install
+mostlyclean-generic:
+
+clean-generic:
+
+distclean-generic:
+       -test -z "$(CONFIG_CLEAN_FILES)" || rm -f $(CONFIG_CLEAN_FILES)
+
+maintainer-clean-generic:
+       @echo "This command is intended for maintainers to use"
+       @echo "it deletes files that may require special tools to rebuild."
+clean: clean-am
+
+clean-am: clean-generic clean-libLTLIBRARIES clean-libtool \
+       mostlyclean-am
+
+distclean: distclean-am
+       -rm -rf ./$(DEPDIR)
+       -rm -f Makefile
+distclean-am: clean-am distclean-compile distclean-generic \
+       distclean-libtool distclean-tags
+
+dvi: dvi-am
+
+dvi-am:
+
+html: html-am
+
+info: info-am
+
+info-am:
+
+install-data-am: install-libtransport_laHEADERS
+
+install-exec-am: install-libLTLIBRARIES
+
+install-info: install-info-am
+
+install-man:
+
+installcheck-am:
+
+maintainer-clean: maintainer-clean-am
+       -rm -rf ./$(DEPDIR)
+       -rm -f Makefile
+maintainer-clean-am: distclean-am maintainer-clean-generic
+
+mostlyclean: mostlyclean-am
+
+mostlyclean-am: mostlyclean-compile mostlyclean-generic \
+       mostlyclean-libtool
+
+pdf: pdf-am
+
+pdf-am:
+
+ps: ps-am
+
+ps-am:
+
+uninstall-am: uninstall-info-am uninstall-libLTLIBRARIES \
+       uninstall-libtransport_laHEADERS
+
+.PHONY: CTAGS GTAGS all all-am check check-am clean clean-generic \
+       clean-libLTLIBRARIES clean-libtool ctags distclean \
+       distclean-compile distclean-generic distclean-libtool \
+       distclean-tags distdir dvi dvi-am html html-am info info-am \
+       install install-am install-data install-data-am install-exec \
+       install-exec-am install-info install-info-am \
+       install-libLTLIBRARIES install-libtransport_laHEADERS \
+       install-man install-strip installcheck installcheck-am \
+       installdirs maintainer-clean maintainer-clean-generic \
+       mostlyclean mostlyclean-compile mostlyclean-generic \
+       mostlyclean-libtool pdf pdf-am ps ps-am tags uninstall \
+       uninstall-am uninstall-info-am uninstall-libLTLIBRARIES \
+       uninstall-libtransport_laHEADERS
+
+
+#pkgconfig_DATA          = libtiipc.pc
+#pkgconfigdir            = $(libdir)/pkgconfig
+
+###############################################################################
+# Tell versions [3.59,3.63) of GNU make to not export all variables.
+# Otherwise a system limit (for SysV at least) may be exceeded.
+.NOEXPORT:
diff --git a/linux/src/transport/TransportRpmsg.c b/linux/src/transport/TransportRpmsg.c
new file mode 100644 (file)
index 0000000..c07dc90
--- /dev/null
@@ -0,0 +1,639 @@
+/*
+ * Copyright (c) 2014, Texas Instruments Incorporated
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * *  Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ *
+ * *  Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * *  Neither the name of Texas Instruments Incorporated nor the names of
+ *    its contributors may be used to endorse or promote products derived
+ *    from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+ * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+ * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+ * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+/*
+ *  ======== TransportRpmsg.c ========
+ *  Implementation of functions specified in the IMessageQTransport interface.
+ */
+
+#include <ti/ipc/Std.h>
+
+#include <ti/ipc/MessageQ.h>
+#include <ti/ipc/MultiProc.h>
+#include <_MessageQ.h>
+
+/* Socket Headers */
+#include <sys/socket.h>
+#include <sys/select.h>
+#include <sys/eventfd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <fcntl.h>
+#include <pthread.h>
+
+/* Socket Protocol Family */
+#include <net/rpmsg.h>
+
+/* Socket utils: */
+#include <SocketFxns.h>
+
+#include <_lad.h>
+
+#include <TransportRpmsg.h>
+
+
+/* More magic rpmsg port numbers: */
+#define MESSAGEQ_RPMSG_PORT       61
+#define MESSAGEQ_RPMSG_MAXSIZE   512
+
+#define TransportRpmsg_GROWSIZE 32
+
+/* traces in this file are controlled via _TransportMessageQ_verbose */
+Bool _TransportMessageQ_verbose = FALSE;
+#define verbose _TransportMessageQ_verbose
+
+Int TransportRpmsg_bind(Void *handle, UInt32 queueId);
+Int TransportRpmsg_unbind(Void *handle, UInt32 queueId);
+Bool TransportRpmsg_put(Void *handle, Ptr msg);
+
+typedef struct TransportRpmsg_Module {
+    int             sock[MultiProc_MAXPROCESSORS];
+    fd_set          rfds;
+    int             maxFd;
+    int             inFds[1024];
+    int                    nInFds;
+    pthread_mutex_t gate;
+    int             unblockEvent;    /* eventFd for unblocking socket */
+    pthread_t       threadId;        /* ID returned by pthread_create() */
+    Bool            threadStarted;
+} TransportRpmsg_Module;
+
+IMessageQTransport_Fxns TransportRpmsg_fxns = {
+    .bind    = TransportRpmsg_bind,
+    .unbind  = TransportRpmsg_unbind,
+    .put     = TransportRpmsg_put,
+};
+
+typedef struct TransportRpmsg_Object {
+    IMessageQTransport_Object base;
+    Int status;
+    UInt16 rprocId;
+    int numQueues;
+    int *qIndexToFd;
+} TransportRpmsg_Object;
+
+TransportRpmsg_Module TransportRpmsg_state = {
+    .sock = {0},
+    .threadStarted = FALSE,
+};
+TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
+
+static Int attach(UInt16 rprocId);
+static Int detach(UInt16 rprocId);
+static void *rpmsgThreadFxn(void *arg);
+static Int transportGet(int sock, MessageQ_Msg *retMsg);
+static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
+                               Int fd,
+                               UInt16 qIndex);
+static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
+static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
+
+/* -------------------------------------------------------------------------- */
+
+/* instance convertors */
+IMessageQTransport_Handle TransportRpmsg_upCast(TransportRpmsg_Handle handle)
+{
+    TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
+    return ((IMessageQTransport_Handle)&obj->base);
+}
+
+TransportRpmsg_Handle TransportRpmsg_downCast(IMessageQTransport_Handle base)
+{
+    return ((TransportRpmsg_Handle)base);
+}
+
+TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params,
+                                            Int *attachStatus)
+{
+    TransportRpmsg_Object *obj;
+    Int *queues;
+    Int rv;
+
+    rv = attach(params->rprocId);
+    if (attachStatus) {
+        *attachStatus = rv;
+    }
+
+    if (rv != MessageQ_S_SUCCESS) {
+        return NULL;
+    }
+
+    obj = calloc(1, sizeof (TransportRpmsg_Object));
+
+    /* structure copy */
+    obj->base.base.interfaceType = IMessageQTransport_TypeId;
+    obj->base.fxns = &TransportRpmsg_fxns;
+    obj->rprocId = params->rprocId;
+
+    obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof (Int));
+    obj->numQueues = TransportRpmsg_GROWSIZE;
+
+    return (TransportRpmsg_Handle)obj;
+}
+
+Void TransportRpmsg_delete(TransportRpmsg_Handle *handlep)
+{
+    TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)handlep;
+
+    detach(obj->rprocId);
+
+    free(obj->qIndexToFd);
+    free(obj);
+
+    *handlep = NULL;
+}
+
+static Int attach(UInt16 rprocId)
+{
+    Int    status = MessageQ_S_SUCCESS;
+    int    sock;
+
+    /* Create the socket for sending messages to the remote proc: */
+    sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
+    if (sock < 0) {
+        status = MessageQ_E_FAIL;
+        printf("attach: socket failed: %d (%s)\n",
+               errno, strerror(errno));
+
+        goto exit;
+    }
+
+    PRINTVERBOSE1("attach: created send socket: %d\n", sock)
+
+    /* Attempt to connect: */
+    status = ConnectSocket(sock, rprocId, MESSAGEQ_RPMSG_PORT);
+    if (status < 0) {
+        /* is it ok to "borrow" this error code from MessageQ? */
+        status = MessageQ_E_RESOURCE;
+
+        /* don't hard-printf or exit since this is no longer fatal */
+        PRINTVERBOSE1("attach: ConnectSocket(rprocId:%d) failed\n", rprocId)
+
+        goto exitSock;
+    }
+
+    TransportRpmsg_module->sock[rprocId] = sock;
+
+    if (TransportRpmsg_module->threadStarted == FALSE) {
+        /* create a module wide event to unblock the socket select thread */
+        TransportRpmsg_module->unblockEvent = eventfd(0, 0);
+        if (TransportRpmsg_module->unblockEvent == -1) {
+            printf("attach: unblock socket failed: %d (%s)\n",
+                   errno, strerror(errno));
+            status = MessageQ_E_FAIL;
+
+            goto exitSock;
+        }
+
+        PRINTVERBOSE1("attach: created unblock event %d\n",
+                      TransportRpmsg_module->unblockEvent)
+
+        FD_ZERO(&TransportRpmsg_module->rfds);
+        FD_SET(TransportRpmsg_module->unblockEvent,
+               &TransportRpmsg_module->rfds);
+        TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
+        TransportRpmsg_module->nInFds = 0;
+
+        pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
+
+        status = pthread_create(&TransportRpmsg_module->threadId, NULL,
+                                &rpmsgThreadFxn, NULL);
+        if (status < 0) {
+            status = MessageQ_E_FAIL;
+            printf("attach: failed to spawn thread\n");
+
+            goto exitEvent;
+        }
+        else {
+            TransportRpmsg_module->threadStarted = TRUE;
+        }
+    }
+
+    goto exit;
+
+exitEvent:
+    close(TransportRpmsg_module->unblockEvent);
+
+    FD_ZERO(&TransportRpmsg_module->rfds);
+    TransportRpmsg_module->maxFd = 0;
+
+exitSock:
+    close(sock);
+    TransportRpmsg_module->sock[rprocId] = 0;
+
+exit:
+    return status;
+}
+
+static Int detach(UInt16 rprocId)
+{
+
+    Int status = -1;
+    int sock;
+
+    sock = TransportRpmsg_module->sock[rprocId];
+
+    if (sock) {
+        PRINTVERBOSE1("detach: closing socket: %d\n", sock)
+
+        status = close(sock);
+    }
+
+    return status;
+}
+
+Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
+{
+    TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
+    UInt16   queueIndex = queueId & 0x0000ffff;
+    int      fd;
+    int      err;
+    uint64_t buf;
+    UInt16   rprocId;
+
+    rprocId = obj->rprocId;
+
+    PRINTVERBOSE2("TransportRpmsg_bind: creating endpoint for rprocId %d queueIndex %d\n", rprocId, queueIndex)
+
+    /*  Create the socket to receive messages for this messageQ. */
+    fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
+    if (fd < 0) {
+        printf("TransportRpmsg_bind: socket call failed: %d (%s)\n",
+                errno, strerror(errno));
+        goto exitClose;
+    }
+
+    PRINTVERBOSE1("TransportRpmsg_bind: created socket fd %d\n", fd)
+
+    err = SocketBindAddr(fd, rprocId, (UInt32)queueIndex);
+    if (err < 0) {
+        /* don't hard-printf since this is no longer fatal */
+        PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
+                      errno, strerror(errno))
+
+        close(fd);
+
+        return -1;
+    }
+
+    pthread_mutex_lock(&TransportRpmsg_module->gate);
+
+    /* add to our fat fd array and update select() parameters */
+    TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++] = fd;
+    TransportRpmsg_module->maxFd = MAX(TransportRpmsg_module->maxFd, fd);
+    FD_SET(fd, &TransportRpmsg_module->rfds);
+
+    pthread_mutex_unlock(&TransportRpmsg_module->gate);
+
+    bindFdToQueueIndex(obj, fd, queueIndex);
+
+    /*
+     * Even though we use the unblock event as just a signalling event with
+     * no related payload, we need to write some non-zero value.  Might as
+     * well make it the fd (which the reader could decide to use if needed).
+     */
+    buf = fd;
+    write(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
+
+    goto exit;
+
+exitClose:
+    TransportRpmsg_unbind(handle, fd);
+    fd = 0;
+
+exit:
+    return fd;
+}
+
+Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
+{
+    TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
+    UInt16 queueIndex = queueId & 0x0000ffff;
+    uint64_t buf;
+    Int    status = MessageQ_S_SUCCESS;
+    int    maxFd;
+    int    fd;
+    int    i;
+    int    j;
+
+    fd = queueIndexToFd(obj, queueIndex);
+    if (!fd) {
+        PRINTVERBOSE1("TransportRpmsg_unbind: queueId 0x%x not bound\n",
+                      queueId)
+
+        return -1;
+    }
+
+    PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
+
+    pthread_mutex_lock(&TransportRpmsg_module->gate);
+
+    /* remove from input fd array */
+    for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
+        if (TransportRpmsg_module->inFds[i] == fd) {
+            TransportRpmsg_module->nInFds--;
+
+            /* shift subsequent elements down */
+            for (j = i; j < TransportRpmsg_module->nInFds; j++) {
+                TransportRpmsg_module->inFds[j] =
+                    TransportRpmsg_module->inFds[j + 1];
+            }
+            TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds] = 0;
+
+            FD_CLR(fd, &TransportRpmsg_module->rfds);
+            if (fd == TransportRpmsg_module->maxFd) {
+                /* find new max fd */
+                maxFd = TransportRpmsg_module->unblockEvent;
+                for (j = 0; j < TransportRpmsg_module->nInFds; j++) {
+                    maxFd = MAX(TransportRpmsg_module->inFds[j], maxFd);
+                }
+                TransportRpmsg_module->maxFd = maxFd;
+            }
+
+            /*
+             * Even though we use the unblock event as just a signalling
+             * event with no related payload, we need to write some non-zero
+             * value.  Might as well make it the fd (which the reader could
+             * decide to use if needed).
+             */
+            buf = fd;
+            write(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
+
+            break;
+        }
+
+        close(fd);
+    }
+
+    unbindQueueIndex(obj, queueIndex);
+
+    pthread_mutex_unlock(&TransportRpmsg_module->gate);
+
+    return status;
+}
+
+Bool TransportRpmsg_put(Void *handle, Ptr pmsg)
+{
+    MessageQ_Msg msg  = (MessageQ_Msg)pmsg;
+    Int     status    = TRUE;
+    int     sock;
+    int     err;
+    UInt16  dstProcId = msg->dstProc;
+
+    /*
+     * Retrieve the socket for the AF_SYSLINK protocol associated with this
+     * transport.
+     */
+    sock = TransportRpmsg_module->sock[dstProcId];
+    if (!sock) {
+        return FALSE;
+    }
+
+    PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
+
+    err = send(sock, msg, msg->msgSize, 0);
+    if (err < 0) {
+        printf("TransportRpmsg_put: send failed: %d (%s)\n",
+               errno, strerror(errno));
+        status = FALSE;
+
+        goto exit;
+    }
+
+    /*
+     * Free the message, as this is a copy transport, we maintain MessageQ
+     * semantics.
+     */
+    MessageQ_free(msg);
+
+exit:
+    return status;
+}
+
+Bool TransportRpmsg_control(Void *handle, UInt cmd, UArg cmdArg)
+{
+    return FALSE;
+}
+
+void *rpmsgThreadFxn(void *arg)
+{
+    static int lastFdx = 0;
+    int      curFdx = 0;
+    Int      status = MessageQ_S_SUCCESS;
+    Int      tmpStatus;
+    int      retval;
+    uint64_t buf;
+    fd_set   rfds;
+    int      maxFd;
+    int      nfds;
+    MessageQ_Msg     retMsg;
+    MessageQ_QueueId queueId;
+
+    while (TRUE) {
+        pthread_mutex_lock(&TransportRpmsg_module->gate);
+
+        maxFd = TransportRpmsg_module->maxFd;
+        rfds = TransportRpmsg_module->rfds;
+        nfds = TransportRpmsg_module->nInFds;
+
+        pthread_mutex_unlock(&TransportRpmsg_module->gate);
+
+        PRINTVERBOSE3("rpmsgThreadFxn: maxFd %d rfds[1:0] 0x%08x%08x\n", maxFd,
+                      (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
+
+        retval = select(maxFd + 1, &rfds, NULL, NULL, NULL);
+        if (retval) {
+            if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
+                /*
+                 * Our event was signalled by TransportRpmsg_bind()
+                 * or TransportRpmsg_unbind() to tell us that the set of
+                 * fds has changed.
+                 */
+                PRINTVERBOSE0("rpmsgThreadFxn: got unblock event\n")
+
+                /* we don't need the written value */
+                read(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
+            }
+            else {
+                /* start where we last left off */
+                curFdx = lastFdx;
+
+                /*
+                 * The set of fds that's used by select has been recorded
+                 * locally, but the array of fds that are scanned below is
+                 * a changing set (MessageQ_create/delete() can change it).
+                 * While this might present an issue in itself, one key
+                 * takeaway is that 'nfds' must not be zero else the % below
+                 * will cause a divide-by-zero exception.  We won't even get
+                 * here if nfds == 0 since it's a local copy of the module's
+                 * 'nInFds' which has to be > 0 for us to get here.  So, even
+                 * though the module's 'nInFds' might go to 0 during this loop,
+                 * the loop itself will still remain intact.
+                 */
+                do {
+                    if (FD_ISSET(TransportRpmsg_module->inFds[curFdx], &rfds)) {
+
+                        PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
+                                      TransportRpmsg_module->inFds[curFdx])
+
+                        /* transport input fd was signalled: get the message */
+                        tmpStatus = transportGet(
+                            TransportRpmsg_module->inFds[curFdx], &retMsg);
+                        if (tmpStatus < 0) {
+                            printf("rpmsgThreadFxn: transportGet failed.");
+                            status = MessageQ_E_FAIL;
+                        }
+                        else {
+                            queueId = MessageQ_getDstQueue(retMsg);
+
+                            PRINTVERBOSE1("rpmsgThreadFxn: got message, delivering to queueId 0x%x\n", queueId)
+
+                            MessageQ_put(queueId, retMsg);
+                        }
+
+                        lastFdx = (curFdx + 1) % nfds;
+
+                        break;
+                    }
+
+                    curFdx = (curFdx + 1) % nfds;
+                } while (curFdx != lastFdx);
+            }
+        }
+    }
+
+    return (void *)status;
+}
+
+/*
+ * ======== transportGet ========
+ *  Retrieve a message waiting in the socket's queue.
+*/
+static Int transportGet(int sock, MessageQ_Msg *retMsg)
+{
+    Int           status    = MessageQ_S_SUCCESS;
+    MessageQ_Msg  msg;
+    struct sockaddr_rpmsg fromAddr;  /* [Socket address of sender] */
+    unsigned int  len;
+    int           byteCount;
+
+    /*
+     * We have no way of peeking to see what message size we'll get, so we
+     * allocate a message of max size to receive contents from the rpmsg socket
+     * (currently, a copy transport)
+     */
+    msg = MessageQ_alloc(0, MESSAGEQ_RPMSG_MAXSIZE);
+    if (!msg) {
+        status = MessageQ_E_MEMORY;
+        goto exit;
+    }
+
+    memset(&fromAddr, 0, sizeof (fromAddr));
+    len = sizeof (fromAddr);
+
+    byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
+                         (struct sockaddr *)&fromAddr, &len);
+    if (len != sizeof (fromAddr)) {
+        printf("recvfrom: got bad addr len (%d)\n", len);
+        status = MessageQ_E_FAIL;
+        goto exit;
+    }
+    if (byteCount < 0) {
+        printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
+        status = MessageQ_E_FAIL;
+        goto exit;
+    }
+    else {
+         /*
+          * Update the allocated message size (even though this may waste
+          * space when the actual message is smaller than the maximum rpmsg
+          * size, the message will be freed soon anyway, and it avoids an
+          * extra copy).
+          */
+         msg->msgSize = byteCount;
+
+         /*
+          * If the message received was statically allocated, reset the
+          * heapId, so the app can free it.
+          */
+         if (msg->heapId == MessageQ_STATICMSG)  {
+             msg->heapId = 0;  /* for a copy transport, heap id is 0. */
+         }
+    }
+
+    PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
+    PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
+    PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId, msg->msgSize)
+
+    *retMsg = msg;
+
+exit:
+    return status;
+}
+
+Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 qIndex)
+{
+    Int *queues;
+    Int *oldQueues;
+    UInt oldSize;
+
+    if (qIndex >= obj->numQueues) {
+        PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
+                      qIndex + TransportRpmsg_GROWSIZE)
+
+        oldSize = obj->numQueues * sizeof (Int);
+
+        queues = calloc(qIndex + TransportRpmsg_GROWSIZE, sizeof (Int));
+        memcpy(queues, obj->qIndexToFd, oldSize);
+
+        oldQueues = obj->qIndexToFd;
+        obj->qIndexToFd = queues;
+        obj->numQueues = qIndex + TransportRpmsg_GROWSIZE;
+
+        free(oldQueues);
+    }
+
+    obj->qIndexToFd[qIndex] = fd;
+}
+
+Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex)
+{
+    obj->qIndexToFd[qIndex] = 0;
+}
+
+Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex)
+{
+    return obj->qIndexToFd[qIndex];
+}
+
index 4971dffa90885c40dd75ad0ea6bcb0d17c8914fb..0453f4134af1b9f254ecf6a7d482c3dd382ed266 100644 (file)
@@ -535,6 +535,21 @@ typedef enum {
  */
 typedef Void (*MessageQ_FreeHookFxn)(Bits16 heapId, Bits16 msgId);
 
+#ifdef STD_H
+#include <ITransport.h>
+#include <IMessageQTransport.h>
+#else
+#include <ti/sdo/ipc/interfaces/ITransport.h>
+#include <ti/sdo/ipc/interfaces/IMessageQTransport.h>
+#endif
+
+Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
+                                UInt16 rprocId, UInt priority);
+Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority);
+Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst);
+Void MessageQ_unregisterTransportId(UInt tid);
+
+
 /* =============================================================================
  *  MessageQ Module-wide Functions
  * =============================================================================
diff --git a/packages/ti/ipc/tests/messageq_multimulti.c b/packages/ti/ipc/tests/messageq_multimulti.c
new file mode 100644 (file)
index 0000000..746deba
--- /dev/null
@@ -0,0 +1,150 @@
+/*
+ * Copyright (c) 2012-2014, Texas Instruments Incorporated
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * *  Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ *
+ * *  Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * *  Neither the name of Texas Instruments Incorporated nor the names of
+ *    its contributors may be used to endorse or promote products derived
+ *    from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+ * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+ * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+ * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+/*
+ *  ======== messageq_multi.c ========
+ *
+ *  Test for messageq operating in multiple simultaneous threads.
+ *
+ */
+
+#include <xdc/std.h>
+#include <xdc/runtime/Error.h>
+#include <xdc/runtime/Memory.h>
+#include <xdc/runtime/System.h>
+
+#include <ti/sysbios/BIOS.h>
+#include <ti/sysbios/knl/Task.h>
+#include <ti/sysbios/heaps/HeapBuf.h>
+
+#include <ti/ipc/MessageQ.h>
+#include <ti/ipc/MultiProc.h>
+
+#define SLAVE_MESSAGEQNAME "SLAVE"
+#define HOST_MESSAGEQNAME "HOST"
+#define NUMTHREADS 25
+#define NUMLOOPS 1000
+
+static int numTests = 0;
+
+#undef BENCHMARK
+
+/*
+ *  ======== loopbackFxn========
+ *  Receive and return messages.
+ *  Run at priority lower than tsk1Fxn above.
+ *  Inputs:
+ *     - arg0: number of the thread, appended to MessageQ host and slave names.
+ */
+Void loopbackFxn(UArg arg0, UArg arg1)
+{
+    MessageQ_Msg     getMsg;
+    MessageQ_Handle  messageQ;
+    MessageQ_QueueId remoteQueueId;
+    Int              status;
+    UInt16           msgId = 0;
+    Char             localQueueName[64];
+    Char             hostQueueName[64];
+
+    System_printf("Thread loopbackFxn: %d\n", arg0);
+
+    System_sprintf(localQueueName, "%s_%d%d", SLAVE_MESSAGEQNAME, arg0, arg1);
+    System_sprintf(hostQueueName,  "%s_%d", HOST_MESSAGEQNAME,  arg0);
+
+    /* Create a message queue. */
+    messageQ = MessageQ_create(localQueueName, NULL);
+    if (messageQ == NULL) {
+        System_abort("MessageQ_create failed\n");
+    }
+
+    System_printf("loopbackFxn: created MessageQ: %s; QueueID: 0x%x\n",
+        localQueueName, MessageQ_getQueueId(messageQ));
+
+    System_printf("Start the main loop: %d\n", arg0);
+    while (msgId < NUMLOOPS) {
+        /* Get a message */
+        status = MessageQ_get(messageQ, &getMsg, MessageQ_FOREVER);
+        if (status != MessageQ_S_SUCCESS) {
+           System_abort("This should not happen since timeout is forever\n");
+        }
+        remoteQueueId = MessageQ_getReplyQueue(getMsg);
+
+#ifndef BENCHMARK
+        System_printf("%d: Received message #%d from core %d\n",
+                arg0, MessageQ_getMsgId(getMsg),
+                MessageQ_getProcId(remoteQueueId));
+#endif
+        /* test id of message received */
+        if (MessageQ_getMsgId(getMsg) != msgId) {
+            System_abort("The id received is incorrect!\n");
+        }
+
+#ifndef BENCHMARK
+        /* Send it back */
+        System_printf("%d: Sending message Id #%d to core %d\n",
+                      arg0, msgId, MessageQ_getProcId(remoteQueueId));
+#endif
+        status = MessageQ_put(remoteQueueId, getMsg);
+        if (status != MessageQ_S_SUCCESS) {
+           System_abort("MessageQ_put had a failure/error\n");
+        }
+        msgId++;
+    }
+
+    MessageQ_delete(&messageQ);
+    numTests += NUMLOOPS;
+
+    System_printf("Test thread %d complete!\n", arg0);
+}
+
+/*
+ *  ======== main ========
+ */
+Int main(Int argc, Char* argv[])
+{
+    Task_Params params;
+    Int i;
+
+    System_printf("%s:main: MultiProc id = %d\n", __FILE__, MultiProc_self());
+
+    /* Create N threads to correspond with host side N thread test app: */
+    Task_Params_init(&params);
+    params.priority = 3;
+    params.arg1 = MultiProc_self();
+    for (i = 0; i < NUMTHREADS; i++) {
+        params.arg0 = i;
+        Task_create(loopbackFxn, &params, NULL);
+    }
+
+    BIOS_start();
+
+    return (0);
+ }
index 36319771c883cae70f7aa9266a6a311a6ea0041e..2d27e446f374631865772decf444a5d9ddc301e2 100644 (file)
@@ -559,6 +559,12 @@ for (var i = 0; i < Build.targets.length; i++) {
             defs: "-D BENCHMARK" + extraDefs
         }).addObjects(["messageq_multi.c"]);
 
+        /* messageq_multimulti */
+        Pkg.addExecutable(name + "/messageq_multimulti", targ, platform, {
+            cfgScript: "rpmsg_transport",
+            defs: "-D BENCHMARK" + extraDefs
+        }).addObjects(["messageq_multimulti.c"]);
+
         /* messageq_single */
         if (platform.match(/^ti\.platforms\.sdp5430/) &&
                 (targ.isa == "64T")) {