Add fault tolerance for TransportRpmsg socket failure
authorRobert Tivy <rtivy@ti.com>
Sat, 25 Apr 2015 00:09:07 +0000 (17:09 -0700)
committerRobert Tivy <rtivy@ti.com>
Sat, 25 Apr 2015 00:25:33 +0000 (17:25 -0700)
Modify TransportRpmsg to detect and handle underlying socket failures.
A socket failure will typically be caused by a crashed/reloaded remote
core executable.  TransportRpmsg will now alert the MessageQ layer of
its failure, allowing MessageQ to return back to the user application
with a specific error code, allowing the user application to clean up
and re-establish IPC with the newly loaded remote core (without having
to exit the app and/or restart LAD).

Add a fault generation test based on MessageQApp.

linux/src/api/MessageQ.c
linux/src/tests/Makefile.am
linux/src/tests/fault.c [new file with mode: 0644]
linux/src/transport/TransportRpmsg.c
packages/ti/ipc/MessageQ.h
packages/ti/ipc/tests/fault.c [new file with mode: 0644]
packages/ti/ipc/tests/package.bld

index b229e1b26193c28038929feeb2068fae1ec0ebdd..9fba28790e90bb7571234d538a6fd29d16c7b4a0 100644 (file)
@@ -957,7 +957,7 @@ Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
     }
 
     if (obj->unblocked) {
-        return MessageQ_E_UNBLOCKED;
+        return obj->unblocked;
     }
 
     pthread_mutex_lock(&MessageQ_module->gate);
@@ -1073,7 +1073,16 @@ Void MessageQ_unblock(MessageQ_Handle handle)
 {
     MessageQ_Object *obj = (MessageQ_Object *)handle;
 
-    obj->unblocked = TRUE;
+    obj->unblocked = MessageQ_E_UNBLOCKED;
+    sem_post(&obj->synchronizer);
+}
+
+/* Unblocks a MessageQ that's been shutdown due to transport failure */
+Void MessageQ_shutdown(MessageQ_Handle handle)
+{
+    MessageQ_Object *obj = (MessageQ_Object *)handle;
+
+    obj->unblocked = MessageQ_E_SHUTDOWN;
     sem_post(&obj->synchronizer);
 }
 
@@ -1097,6 +1106,24 @@ MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
     return queueId;
 }
 
+/* Returns the local handle associated with queueId. */
+MessageQ_Handle MessageQ_getLocalHandle(MessageQ_QueueId queueId)
+{
+    MessageQ_Object *obj;
+    MessageQ_QueueIndex queueIndex;
+    UInt16 procId;
+
+    procId = MessageQ_getProcId(queueId);
+    if (procId != MultiProc_self()) {
+        return NULL;
+    }
+
+    queueIndex = MessageQ_getQueueIndex(queueId);
+    obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
+
+    return (MessageQ_Handle)obj;
+}
+
 /* Sets the tracing of a message */
 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
 {
index f6e1ed1e03fe771da75224bc48cd86354973542a..22e772b177775cd7367e948dc5cfa32c61be93ef 100644 (file)
@@ -63,7 +63,7 @@ VPATH = ../../../packages/ti/ipc/tests
 
 # the program to build (the names of the final binaries)
 bin_PROGRAMS = ping_rpmsg MessageQApp  MessageQBench MessageQMulti \
-                MessageQMultiMulti NameServerApp Msgq100
+                MessageQMultiMulti NameServerApp Msgq100 fault
 
 
 if OMAP54XX_SMP
@@ -153,6 +153,9 @@ mmrpc_test_SOURCES = $(top_srcdir)/packages/ti/ipc/tests/Mx.c \
                 $(top_srcdir)/packages/ti/ipc/tests/Mx.h \
                 $(top_srcdir)/packages/ti/ipc/tests/mmrpc_test.c
 
+# list of sources for the 'fault' binary
+fault_SOURCES = $(common_sources) fault.c
+
 # list of sources for the 'MessageQApp' binary
 MessageQApp_SOURCES = $(common_sources) MessageQApp.c
 
@@ -196,6 +199,10 @@ mmrpc_test_LDADD = $(common_libraries) \
                 $(DRM_PREFIX)/usr/lib/libdrm.la \
                 $(DRM_PREFIX)/usr/lib/libdrm_omap.la
 
+# the additional libraries needed to link fault
+fault_LDADD = $(common_libraries) \
+                $(AM_LDFLAGS)
+
 # the additional libraries needed to link MessageQApp
 MessageQApp_LDADD = $(common_libraries) \
                 $(AM_LDFLAGS)
diff --git a/linux/src/tests/fault.c b/linux/src/tests/fault.c
new file mode 100644 (file)
index 0000000..1c97c26
--- /dev/null
@@ -0,0 +1,276 @@
+/*
+ * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * *  Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ *
+ * *  Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * *  Neither the name of Texas Instruments Incorporated nor the names of
+ *    its contributors may be used to endorse or promote products derived
+ *    from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+ * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+ * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+ * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+/* =============================================================================
+ *  @file   fault.c
+ *
+ *  @brief  Sample application for fault recovery between MPU and Remote Proc
+ *
+ *  ============================================================================
+ */
+
+/* Standard headers */
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+/* IPC Headers */
+#include <ti/ipc/Std.h>
+#include <ti/ipc/Ipc.h>
+#include <ti/ipc/MessageQ.h>
+#include <ti/ipc/transports/TransportRpmsg.h>
+
+/* App defines:  Must match on remote proc side: */
+#define HEAPID              0u
+#define SLAVE_MESSAGEQNAME  "SLAVE"
+#define MPU_MESSAGEQNAME    "HOST"
+
+#define PROC_ID_DFLT        1     /* Host is zero, remote cores start at 1 */
+#define NUM_LOOPS_DFLT   100
+
+typedef struct SyncMsg {
+    MessageQ_MsgHeader header;
+    UInt32 numLoops;  /* also used for msgId */
+    UInt32 print;
+    Int32 faultId;
+} SyncMsg ;
+
+Int MessageQApp_execute(UInt32 numLoops, UInt16 procId, UInt32 faultId)
+{
+    Int32                    status = 0;
+    MessageQ_Msg             msg = NULL;
+    MessageQ_Params          msgParams;
+    UInt32                   i;
+    MessageQ_QueueId         queueId = MessageQ_INVALIDMESSAGEQ;
+    MessageQ_Handle          msgqHandle;
+    char                     remoteQueueName[64];
+    UInt32                   msgId;
+
+    printf("Entered MessageQApp_execute\n");
+
+    /* Create the local Message Queue for receiving. */
+    MessageQ_Params_init(&msgParams);
+    msgqHandle = MessageQ_create(MPU_MESSAGEQNAME, &msgParams);
+    if (msgqHandle == NULL) {
+        printf("Error in MessageQ_create\n");
+        goto exit;
+    }
+    else {
+        printf("Local MessageQId: 0x%x\n", MessageQ_getQueueId(msgqHandle));
+    }
+
+    sprintf(remoteQueueName, "%s_%s", SLAVE_MESSAGEQNAME,
+             MultiProc_getName(procId));
+
+    /* Poll until remote side has it's messageQ created before we send: */
+    do {
+        status = MessageQ_open(remoteQueueName, &queueId);
+        sleep (1);
+    } while (status == MessageQ_E_NOTFOUND);
+
+    if (status < 0) {
+        printf("Error in MessageQ_open [%d]\n", status);
+        goto cleanup;
+    }
+    else {
+        printf("Remote queueId  [0x%x]\n", queueId);
+    }
+
+    msg = MessageQ_alloc(HEAPID, sizeof(SyncMsg));
+    if (msg == NULL) {
+        printf("Error in MessageQ_alloc\n");
+        MessageQ_close(&queueId);
+        goto cleanup;
+    }
+
+    /* handshake with remote to set the number of loops */
+    MessageQ_setReplyQueue(msgqHandle, msg);
+    ((SyncMsg *)msg)->numLoops = numLoops;
+    ((SyncMsg *)msg)->print = FALSE;
+    MessageQ_put(queueId, msg);
+    MessageQ_get(msgqHandle, &msg, MessageQ_FOREVER);
+
+    printf("Exchanging %d messages with remote processor %s...\n",
+           numLoops, MultiProc_getName(procId));
+
+    for (i = 1 ; i <= numLoops; i++) {
+        ((SyncMsg *)msg)->numLoops = i;
+        ((SyncMsg *)msg)->faultId = faultId;
+
+        /* Have the remote proc reply to this message queue */
+        MessageQ_setReplyQueue(msgqHandle, msg);
+
+        if (faultId != 0) {
+            printf("About to send fault command, hit ENTER to continue...\n");
+            getchar();
+        }
+
+        status = MessageQ_put(queueId, msg);
+        if (status < 0) {
+            printf("Error in MessageQ_put [%d]\n", status);
+            MessageQ_free(msg);
+            break;
+        }
+
+        status = MessageQ_get(msgqHandle, &msg, MessageQ_FOREVER);
+
+        if (status < 0) {
+            printf("Error in MessageQ_get [%d]\n", status);
+            break;
+        }
+        else {
+            /* validate the returned message */
+            msgId = ((SyncMsg *)msg)->numLoops;
+            if ((msg != NULL) && (msgId != i)) {
+                printf("Data integrity failure!\n"
+                        "    Expected %d\n"
+                        "    Received %d\n",
+                        i, msgId);
+                break;
+            }
+        }
+
+        if (numLoops <= 200) {
+            printf("MessageQ_get #%d Msg = 0x%x\n", i, (UInt)msg);
+        }
+        else if ((i % 1000) == 0) {
+            printf("MessageQ_get #%d Msg = 0x%x\n", i, (UInt)msg);
+        }
+    }
+
+    printf("Exchanged %d messages with remote processor %s\n",
+        (i-1), MultiProc_getName(procId));
+
+    if (status >= 0) {
+        printf("Sample application successfully completed!\n");
+        MessageQ_free(msg);
+    }
+
+    MessageQ_close(&queueId);
+
+cleanup:
+    /* Clean-up */
+    if (MessageQ_delete(&msgqHandle) < 0) {
+        printf("Error in MessageQ_delete [%d]\n", status);
+    }
+
+exit:
+    printf("Leaving MessageQApp_execute\n\n");
+
+    return (status);
+}
+
+int main (int argc, char ** argv)
+{
+    Int status = 0;
+    int opt;
+    UInt numLoops = NUM_LOOPS_DFLT;
+    UInt16 procId = PROC_ID_DFLT;
+    UInt32 faultId = 0;
+
+    while ((opt = getopt(argc, argv, "f:")) != -1) {
+        switch (opt) {
+          case 'f':
+            /*
+             * Argument for -f corresponds to remote-side "fault" commands.
+             * Negative commands cause remote fault before remote MessageQ_put.
+             * Positive commands cause remote fault after remote MessageQ_put.
+             */
+            faultId = atoi(optarg);
+            printf("fault %d will be sent in 1st msg\n", faultId);
+            break;
+
+          default:
+            fprintf(stderr, "Unknown arg '%s'\n", optarg);
+            return 1;
+        }
+    }
+
+    /* Parse Args: */
+    switch (argc - optind + 1) {
+        case 1:
+           /* use defaults */
+           break;
+        case 2:
+           numLoops   = atoi(argv[optind]);
+           break;
+        case 3:
+           numLoops   = atoi(argv[optind]);
+           procId     = atoi(argv[optind + 1]);
+           break;
+        default:
+           printf("Usage: %s [<numLoops>] [<ProcId>]\n", argv[0]);
+           printf("\tDefaults: numLoops: %d; ProcId: %d\n",
+                   NUM_LOOPS_DFLT, PROC_ID_DFLT);
+           exit(0);
+    }
+
+    /* configure the transport factory */
+    Ipc_transportConfig(&TransportRpmsg_Factory);
+
+    /* IPC initialization */
+    status = Ipc_start();
+
+    if (status < 0) {
+        printf("Error: Ipc_start failed, error=%d\n", status);
+        goto exit;
+    }
+
+    if ((procId == 0) || (procId >= MultiProc_getNumProcessors())) {
+        printf("ProcId (%d) must be nonzero and less than %d\n",
+                procId, MultiProc_getNumProcessors());
+        Ipc_stop();
+        exit(0);
+    }
+    printf("Using numLoops: %d; procId : %d\n", numLoops, procId);
+
+    if (MessageQApp_execute(numLoops, procId, faultId) < 0) {
+        int nAttachAttempts = 0;
+
+        printf("MessageQApp_execute failed, attempting detach/attach...\n");
+        Ipc_detach(procId);
+        while (Ipc_attach(procId) != Ipc_S_SUCCESS) {
+            nAttachAttempts++;
+            if ((nAttachAttempts % 1000) == 0) {
+                printf("Ipc_attach(%d) failed\n", procId);
+            }
+        }
+        printf("Ipc_attach(%d) succeeded (after %d tries)\n", procId, nAttachAttempts);
+
+        /* call without fault this time */
+        MessageQApp_execute(numLoops, procId, 0);
+    }
+
+    Ipc_stop();
+
+exit:
+    return (status);
+}
index 518fcf60cb5da20432edfcb729352e3185f4b131..97936f6c8fc62e2fdd5f59d3853ade33884a3f6a 100644 (file)
@@ -92,7 +92,10 @@ typedef struct TransportRpmsg_Module {
     int             sock[MultiProc_MAXPROCESSORS];
     fd_set          rfds;
     int             maxFd;
-    int             inFds[1024];
+    struct {
+        int     fd;
+        UInt32  qId;
+    } inFds[1024];
     int                    nInFds;
     pthread_mutex_t gate;
     int             unblockEvent;    /* unblock the dispatch thread */
@@ -309,7 +312,8 @@ Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
             (int)event, (unsigned int)tid);
 
     /* add to our fat fd array and update select() parameters */
-    TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++] = fd;
+    TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].fd = fd;
+    TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++].qId = queueId;
     TransportRpmsg_module->maxFd = _MAX(TransportRpmsg_module->maxFd, fd);
     FD_SET(fd, &TransportRpmsg_module->rfds);
     bindFdToQueueIndex(obj, fd, queuePort);
@@ -363,7 +367,7 @@ Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
 
     /* remove from input fd array */
     for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
-        if (TransportRpmsg_module->inFds[i] == fd) {
+        if (TransportRpmsg_module->inFds[i].fd == fd) {
             TransportRpmsg_module->nInFds--;
 
             /* shift subsequent elements down */
@@ -371,7 +375,8 @@ Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
                 TransportRpmsg_module->inFds[j] =
                         TransportRpmsg_module->inFds[j + 1];
             }
-            TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds] = -1;
+            TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].fd = -1;
+            TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds].qId = 0;
             break;
         }
     }
@@ -382,7 +387,7 @@ Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
         /* find new max fd */
         maxFd = TransportRpmsg_module->unblockEvent;
         for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
-            maxFd = _MAX(TransportRpmsg_module->inFds[i], maxFd);
+            maxFd = _MAX(TransportRpmsg_module->inFds[i].fd, maxFd);
         }
         TransportRpmsg_module->maxFd = maxFd;
     }
@@ -463,8 +468,10 @@ void *rpmsgThreadFxn(void *arg)
     int      nfds;
     MessageQ_Msg     retMsg;
     MessageQ_QueueId queueId;
+    MessageQ_Handle handle;
     Bool run = TRUE;
     int i;
+    int j;
     int fd;
 
 
@@ -486,16 +493,52 @@ void *rpmsgThreadFxn(void *arg)
 
         /* dispatch all pending messages, do this first */
         for (i = 0; i < nfds; i++) {
-            fd = TransportRpmsg_module->inFds[i];
+            fd = TransportRpmsg_module->inFds[i].fd;
 
             if (FD_ISSET(fd, &rfds)) {
                 PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
-                        TransportRpmsg_module->inFds[i]);
+                        TransportRpmsg_module->inFds[i].fd);
 
                 /* transport input fd was signalled: get the message */
                 tmpStatus = transportGet(fd, &retMsg);
                 if (tmpStatus < 0) {
-                    printf("rpmsgThreadFxn: transportGet failed\n");
+                    printf("rpmsgThreadFxn: transportGet failed on fd %d,"
+                           " returned %d\n", fd, tmpStatus);
+
+                    pthread_mutex_lock(&TransportRpmsg_module->gate);
+
+                    /*
+                     * Don't close(fd) at this time since it will get closed
+                     * later when MessageQ_delete() is called in response to
+                     * this failure.  Just remove fd's bit from the select mask
+                     * 'rfds' for now, but don't remove it from inFds[].
+                     */
+                    FD_CLR(fd, &TransportRpmsg_module->rfds);
+                    if (fd == TransportRpmsg_module->maxFd) {
+                        /* find new max fd */
+                        maxFd = TransportRpmsg_module->unblockEvent;
+                        for (j = 0; j < TransportRpmsg_module->nInFds; j++) {
+                            maxFd = _MAX(TransportRpmsg_module->inFds[j].fd,
+                                         maxFd);
+                        }
+                        TransportRpmsg_module->maxFd = maxFd;
+                    }
+                    queueId = TransportRpmsg_module->inFds[i].qId;
+
+                    pthread_mutex_unlock(&TransportRpmsg_module->gate);
+
+                    handle = MessageQ_getLocalHandle(queueId);
+
+                    PRINTVERBOSE2("rpmsgThreadFxn: shutting down MessageQ "
+                                  "%p (queueId 0x%x)...\n", handle, queueId)
+
+                    if (handle != NULL) {
+                        MessageQ_shutdown(handle);
+                    }
+                    else {
+                        printf("rpmsgThreadFxn: MessageQ_getLocalHandle(0x%x) "
+                               "returned NULL, can't shutdown\n", queueId);
+                    }
                 }
                 else {
                     queueId = MessageQ_getDstQueue(retMsg);
@@ -574,12 +617,17 @@ static Int transportGet(int sock, MessageQ_Msg *retMsg)
     if (len != sizeof (fromAddr)) {
         printf("recvfrom: got bad addr len (%d)\n", len);
         status = MessageQ_E_FAIL;
-        goto exit;
+        goto freeMsg;
     }
     if (byteCount < 0) {
         printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
-        status = MessageQ_E_FAIL;
-        goto exit;
+        if (errno == ESHUTDOWN) {
+            status = MessageQ_E_SHUTDOWN;
+        }
+        else {
+            status = MessageQ_E_FAIL;
+        }
+        goto freeMsg;
     }
     else {
          /*
@@ -607,6 +655,11 @@ static Int transportGet(int sock, MessageQ_Msg *retMsg)
 
     *retMsg = msg;
 
+    goto exit;
+
+freeMsg:
+    MessageQ_free(msg);
+
 exit:
     return status;
 }
index e1a030bce92fe8a3dc9b1b8c9a9dbcdbc7c20e30..70cbd7533141cce474b8ee67d7b617b3aff72f40 100644 (file)
@@ -254,6 +254,11 @@ extern "C" {
  */
 #define MessageQ_E_UNBLOCKED            (-19)
 
+/*!
+ *  @brief  MessageQ was shutdown
+ */
+#define MessageQ_E_SHUTDOWN             (-20)
+
 /* =============================================================================
  *  Macros
  * =============================================================================
@@ -670,6 +675,12 @@ Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority);
  * =============================================================================
  */
 
+/** @cond INTERNAL */
+/* Returns the local handle associated with queueId. */
+MessageQ_Handle MessageQ_getLocalHandle(MessageQ_QueueId queueId);
+/** @endcond INTERNAL */
+
+
 /** @cond INTERNAL */
 Void MessageQ_Params_init__S(MessageQ_Params *params, Int version);
 /** @endcond INTERNAL */
@@ -1032,8 +1043,10 @@ Void MessageQ_setPutHookFxn(MessageQ_PutHookFxn putHookFxn);
  *  returns immediately and if no message is available, the msg
  *  is set to NULL and the status is #MessageQ_E_TIMEOUT. The
  *  #MessageQ_E_UNBLOCKED status is return, if MessageQ_unblock is called
- *  on the MessageQ handle. If a message is successfully retrieved, the msg
- *  is set to the message and a #MessageQ_S_SUCCESS status is returned.
+ *  on the MessageQ handle. The #MessageQ_E_SHUTDOWN status is returned if
+ *  MessageQ_shutdown is called on the MessageQ handle. If a message is
+ *  successfully retrieved, the msg is set to the message and a
+ *  #MessageQ_S_SUCCESS status is returned.
  *
  *  @param[in]  handle      MessageQ handle
  *  @param[out] msg         Pointer to the message
@@ -1048,6 +1061,7 @@ Void MessageQ_setPutHookFxn(MessageQ_PutHookFxn putHookFxn);
  *
  *  @sa         MessageQ_put()
  *  @sa         MessageQ_unblock()
+ *  @sa         MessageQ_shutdown()
  */
 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout);
 
@@ -1150,9 +1164,44 @@ Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg);
  *  @param[in]  handle      MessageQ handle
  *
  *  @sa         MessageQ_get
+ *  @sa         MessageQ_shutdown()
  */
 Void MessageQ_unblock(MessageQ_Handle handle);
 
+/*!
+ *  @brief      Shuts down a MessageQ
+ *
+ *  Similar to MessageQ_unblock(), MessageQ_shutdown() unblocks a reader thread
+ *  that is blocked on a MessageQ_get(), but causes a different return code
+ *  to be returned from MessageQ_get().  The MessageQ_get() call will return
+ *  with status #MessageQ_E_SHUTDOWN indicating that it returned due to a
+ *  MessageQ_shutdown() rather than MessageQ_unblock(), a timeout or a
+ *  received message.  This call is intended to be used by MessageQ transports
+ *  when the transport detects that the transport framework corresponding to
+ *  the MessageQ has become unusable.  This call should only be used during a
+ *  shutdown sequence in order to ensure that there is no blocked reader on a
+ *  queue before deleting the queue.  A queue may not be used after it has been
+ *  shut down.
+ *
+ *  MessageQ_shutdown() works by raising a flag in the queue indicating that it
+ *  is shut down and then signaling the synchronizer that is configured with
+ *  the target queue.  If MessageQ_shutdown() is called upon a queue that has
+ *  no blocked listeners, then any subsequent MessageQ_get will not block and
+ *  will immediately return #MessageQ_E_SHUTDOWN regardless of whether there
+ *  is a message on the queue.
+ *
+ *  Restrictions:
+ *  -  A queue may not be used after it has been shut down.
+ *  -  MessageQ_shutdown() may only be called on a local queue.
+ *  -  May only be used with a queue configured with a blocking synchronizer.
+ *
+ *  @param[in]  handle      MessageQ handle
+ *
+ *  @sa         MessageQ_get
+ *  @sa         MessageQ_unblock
+ */
+Void MessageQ_shutdown(MessageQ_Handle handle);
+
 #if defined (__cplusplus)
 }
 #endif /* defined (__cplusplus) */
diff --git a/packages/ti/ipc/tests/fault.c b/packages/ti/ipc/tests/fault.c
new file mode 100644 (file)
index 0000000..6e831ab
--- /dev/null
@@ -0,0 +1,206 @@
+/*
+ * Copyright (c) 2012-2014, Texas Instruments Incorporated
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * *  Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ *
+ * *  Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * *  Neither the name of Texas Instruments Incorporated nor the names of
+ *    its contributors may be used to endorse or promote products derived
+ *    from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+ * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+ * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+ * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+/*
+ *  ======== messageq_single.c ========
+ *
+ *  Single threaded test of messageq over rpmsg.
+ *
+ *  See:
+ *      MessageQApp in Linux user space
+ *
+ */
+#include <xdc/std.h>
+#include <xdc/runtime/Assert.h>
+#include <xdc/runtime/System.h>
+
+#include <ti/sysbios/BIOS.h>
+#include <ti/sysbios/knl/Task.h>
+#include <ti/sysbios/knl/Clock.h>
+
+#include <ti/ipc/MessageQ.h>
+
+#define SLAVE_MESSAGEQNAME "SLAVE"
+
+#define MessageQ_payload(m) ((void *)((char *)(m) + sizeof(MessageQ_MsgHeader)))
+
+Int32 fxnFault(UInt32 faultId)
+{
+    Int32 a;
+    Void (*fxn)(void) = (Void (*)())0x96000000;
+    volatile Int dummy = 0;
+
+    switch (faultId) {
+        case 0:  /* no fault */
+            return 0;
+        case 1:
+            System_printf("Generating read MMU Fault...\n");
+            a = *(volatile int *)(0x96000000);
+            break;
+        case 2:
+            System_printf("Generating write MMU Fault...\n");
+            *(volatile int *)(0x96000000) = 0x1;
+            break;
+        case 3:
+            System_printf("Generating program MMU Fault...\n");
+            fxn();
+            break;
+        case 4:
+            System_printf("Generating exception (w/ divide-by-zero...\n");
+            dummy = dummy / dummy;
+            break;
+        case 5:
+            System_printf("Generating Watchdog interrupt...\n");
+            dummy = 1;
+            while(dummy);
+            break;
+        default:
+            System_printf("Invalid fxnFault test\n");
+            break;
+    }
+
+    /* don't really need to return 'a', but shuts up compiler warning */
+    return(a);
+}
+
+/*
+ *  ======== tsk1Fxn ========
+ *  Receive and return messages
+ */
+Void tsk1Fxn(UArg arg0, UArg arg1)
+{
+    MessageQ_Msg msg;
+    MessageQ_Handle  messageQ;
+    MessageQ_QueueId remoteQueueId;
+    Char             localQueueName[64];
+    UInt16 procId;
+    Int status;
+    UInt32 i;
+    UInt32 msgId;
+    UInt32 start;
+    UInt32 end;
+    UInt32 numLoops;
+    UInt32 print;
+    UInt32 *params;
+    Int32 faultId;
+
+    /* Construct a MessageQ name adorned with core name: */
+    System_sprintf(localQueueName, "%s_%s", SLAVE_MESSAGEQNAME,
+                   MultiProc_getName(MultiProc_self()));
+
+    messageQ = MessageQ_create(localQueueName, NULL);
+    if (messageQ == NULL) {
+        System_abort("MessageQ_create failed\n");
+    }
+
+    System_printf("tsk1Fxn: created MessageQ: %s; QueueID: 0x%x\n",
+        localQueueName, MessageQ_getQueueId(messageQ));
+
+    while (1) {
+        /* handshake with host to get starting parameters */
+        System_printf("Awaiting sync message from host...\n");
+        MessageQ_get(messageQ, &msg, MessageQ_FOREVER);
+
+        params = MessageQ_payload(msg);
+        numLoops = params[0];
+        print = params[1];
+
+        remoteQueueId = MessageQ_getReplyQueue(msg);
+        procId = MessageQ_getProcId(remoteQueueId);
+
+        System_printf("Received msg from (procId:remoteQueueId): 0x%x:0x%x\n"
+            "\tpayload: %d bytes; loops: %d %s printing.\n",
+            procId, remoteQueueId,
+            (MessageQ_getMsgSize(msg) - sizeof(MessageQ_MsgHeader)),
+            numLoops, print ? "with" : "without");
+
+        MessageQ_put(remoteQueueId, msg);
+
+        start = Clock_getTicks();
+
+        for (i = 1; i <= numLoops; i++) {
+            status = MessageQ_get(messageQ, &msg, MessageQ_FOREVER);
+            Assert_isTrue(status == MessageQ_S_SUCCESS, NULL);
+
+            params = MessageQ_payload(msg);
+            msgId = params[0];
+           faultId = params[2];
+
+            if (print) {
+                System_printf("Got msg #%d (%d bytes) from procId %d\n",
+                    msgId, MessageQ_getMsgSize(msg), procId);
+            }
+
+            Assert_isTrue(msgId == i, NULL);
+
+            if (faultId < 0) {
+                System_printf("Generating fault %d before MessageQ_put()...\n",
+                              -faultId);
+                fxnFault(-faultId);
+            }
+
+            if (print) {
+                System_printf("Sending msg Id #%d to procId %d\n", i, procId);
+            }
+
+            status = MessageQ_put(remoteQueueId, msg);
+            Assert_isTrue(status == MessageQ_S_SUCCESS, NULL);
+
+            if (faultId > 0) {
+                System_printf("Generating fault %d after MessageQ_put()...\n",
+                              faultId);
+                fxnFault(faultId);
+            }
+        }
+
+        end = Clock_getTicks();
+
+        if (!print) {
+            System_printf("%d iterations took %d ticks or %d usecs/msg\n",
+                    numLoops, end - start,
+                    ((end - start) * Clock_tickPeriod) / numLoops);
+        }
+    }
+}
+
+/*
+ *  ======== main ========
+ */
+Int main(Int argc, Char* argv[])
+{
+    System_printf("%s:main: MultiProc id = %d\n", __FILE__, MultiProc_self());
+
+    Task_create(tsk1Fxn, NULL, NULL);
+
+    BIOS_start();
+
+    return (0);
+}
index 2d27e446f374631865772decf444a5d9ddc301e2..00880122bebeb485755cce67a4605ebb876b4268 100644 (file)
@@ -515,6 +515,12 @@ for (var i = 0; i < Build.targets.length; i++) {
             extraDefs = " -DRPMSG_NS_2_0";
         }
 
+        /* fault */
+        Pkg.addExecutable(name + "/fault", targ, platform, {
+            cfgScript: "rpmsg_transport",
+            defs: extraDefs
+        }).addObjects(["fault.c"]);
+
         /* ping_rpmsg */
         Pkg.addExecutable(name + "/ping_rpmsg", targ, platform, {
             cfgScript: "ping_rpmsg",