Linux/Android: Refactor MessageQ_create to Register Later with NameServer
authorAngela Stegmaier <angelabaker@ti.com>
Thu, 29 Sep 2016 20:47:00 +0000 (15:47 -0500)
committerAngela Stegmaier <angelabaker@ti.com>
Mon, 24 Oct 2016 16:27:03 +0000 (11:27 -0500)
In the current MessageQ_create implementation, the NameServer
registration is done early, before local creation in the user
library is done. It is therefore possible that a remote core
could open a MessageQ that is still being created and put a
message to that MessageQ. This can result in dropped messages
or in the worst case a crash.

This patch moves the NameServer registration to the end of
the creation sequence so that the MessageQ cannot be opened
before it is fully created.

A new LAD command called MESSAGEQ_ANNOUNCE is added that is specifically
for registering a previously created MessageQ with the NameServer.

Once all creation is completed and the MessageQ is ready to
receive messages, the MessageQ_create api makes the call to LAD
MESSAGEQ_ANNOUNCE to register with the NameServer.

Signed-off-by: Angela Stegmaier <angelabaker@ti.com>
linux/include/_lad.h
linux/src/api/MessageQ.c
linux/src/daemon/MessageQ_daemon.c
linux/src/daemon/lad.c

index d4bf3fee13c3e2e75efbe68d2d0f66510574e1cf..4bdfd8e38de4b02185ecaa14bc62ecd85ba751d2 100644 (file)
@@ -183,6 +183,7 @@ typedef enum {
     LAD_MESSAGEQ_SETUP,
     LAD_MESSAGEQ_DESTROY,
     LAD_MESSAGEQ_CREATE,
+    LAD_MESSAGEQ_ANNOUNCE,
     LAD_MESSAGEQ_DELETE,
     LAD_MESSAGEQ_MSGINIT,
     LAD_MULTIPROC_GETCONFIG,
@@ -269,6 +270,10 @@ struct LAD_CommandObj {
             Char name[LAD_MESSAGEQCREATEMAXNAMELEN];
             MessageQ_Params params;
         } messageQCreate;
+        struct {
+            Char name[LAD_MESSAGEQCREATEMAXNAMELEN];
+            Void *serverHandle;
+        } messageQAnnounce;
         struct {
             Void *serverHandle;
         } messageQDelete;
@@ -315,6 +320,9 @@ union LAD_ResponseObj {
        Int queueId;
        Void *serverHandle;
     } messageQCreate;
+    struct {
+       Int status;
+    } messageQAnnounce;
     struct {
        Int status;
     } messageQDelete;
index 2474e6fdd1b5169ff3bca411da1dc0c7e2c173ba..db96b241156963cc831833d95edd0de733991b04 100644 (file)
@@ -651,6 +651,44 @@ MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
 
     pthread_mutex_unlock(&MessageQ_module->gate);
 
+    /* send announce message to LAD, indicating we are ready to receive msgs */
+    cmd.cmd = LAD_MESSAGEQ_ANNOUNCE;
+    cmd.clientId = handle;
+
+    if (name == NULL) {
+        cmd.args.messageQAnnounce.name[0] = '\0';
+    }
+    else {
+        strncpy(cmd.args.messageQAnnounce.name, name,
+                LAD_MESSAGEQCREATEMAXNAMELEN - 1);
+        cmd.args.messageQAnnounce.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
+    }
+
+    cmd.args.messageQAnnounce.serverHandle = obj->serverHandle;
+
+    if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
+        PRINTVERBOSE1(
+          "MessageQ_create: sending LAD command failed, status=%d\n", status)
+        goto exit;
+    }
+
+    if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
+        PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
+        goto exit;
+    }
+    status = rsp.messageQAnnounce.status;
+
+    PRINTVERBOSE2(
+      "MessageQ_create: got LAD response for client %d, status=%d\n",
+      handle, status)
+
+    if (status == -1) {
+       PRINTVERBOSE1(
+          "MessageQ_create: MessageQ server operation failed, status=%d\n",
+          status)
+    }
+
+exit:
     return (MessageQ_Handle)obj;
 }
 
@@ -871,6 +909,9 @@ Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
                 goto done;
             }
         }
+        /* If we get here, then we have failed to deliver a local message. */
+        status = MessageQ_E_FAIL;
+        goto done;
     }
 
     /*  Getting here implies the message is outbound. Must give it to
@@ -930,7 +971,12 @@ Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
         }
 
         msgTrans = MessageQ_module->transports[clusterId][priority];
-        delivered = IMessageQTransport_put(msgTrans, (Ptr)msg);
+        if (msgTrans) {
+            delivered = IMessageQTransport_put(msgTrans, (Ptr)msg);
+        }
+        else {
+            delivered = MessageQ_E_FAIL;
+        }
         status = (delivered ? MessageQ_S_SUCCESS :
                   (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN : MessageQ_E_FAIL));
     }
index 280aa8a803f38d6f203448bfee42c8ac0627c7a5..4ad26a176e322cef16dc36d2ba1eebdc21d28c3f 100644 (file)
@@ -354,11 +354,6 @@ MessageQ_Handle MessageQ_create(String name, const MessageQ_Params * params)
     obj->queue = (MessageQ_QueueId)(((UInt32)procId << 16) | queuePort);
     obj->ownerPid = 0;
 
-    if (name != NULL) {
-        obj->nsKey = NameServer_addUInt32(MessageQ_module->nameServer, name,
-                obj->queue);
-    }
-
     /* Cleanup if fail */
     if (status < 0) {
         MessageQ_delete((MessageQ_Handle *)&obj);
@@ -369,6 +364,24 @@ MessageQ_Handle MessageQ_create(String name, const MessageQ_Params * params)
     return ((MessageQ_Handle)obj);
 }
 
+Int MessageQ_announce(String name, MessageQ_Handle * handlePtr)
+{
+    Int                 status = MessageQ_S_SUCCESS;
+    MessageQ_Object   * obj    = (MessageQ_Object *)(*handlePtr);
+
+    LOG1("MessageQ_announce: announcing %p\n", obj);
+
+    if (name != NULL && obj->nsKey == NULL) {
+        obj->nsKey = NameServer_addUInt32(MessageQ_module->nameServer, name,
+                obj->queue);
+    }
+    else {
+        status = MessageQ_E_FAIL;
+    }
+
+    return status;
+}
+
 /*
  * Function to delete a MessageQ object for a specific slave processor.
  */
index c18e3dc357502e2a96ddbfc1ac9d23eeb0b8511f..f0ce4149de189386ff144e0943e33d07e1b9bb54 100644 (file)
@@ -669,6 +669,21 @@ opencommandFIFO:
 
             break;
 
+          case LAD_MESSAGEQ_ANNOUNCE:
+            LOG2("LAD_MESSAGEQ_ANNOUNCE: calling MessageQ_announce(%p, %p)...\n",
+                    cmd.args.messageQAnnounce.name,
+                    cmd.args.messageQAnnounce.serverHandle);
+
+            tmpString = (cmd.args.messageQAnnounce.name[0] == '\0') ? NULL :
+                cmd.args.messageQAnnounce.name;
+
+            rsp.messageQAnnounce.status =
+                MessageQ_announce(tmpString, (MessageQ_Handle *)&cmd.args.messageQAnnounce.serverHandle);
+
+            LOG1("    status = %d\n", rsp.messageQAnnounce.status)
+            LOG0("DONE\n")
+
+            break;
           case LAD_MESSAGEQ_DELETE:
             LOG1("LAD_MESSAGEQ_DELETE: calling MessageQ_delete(%p)...\n", cmd.args.messageQDelete.serverHandle)
 
@@ -831,6 +846,7 @@ opencommandFIFO:
           case LAD_MESSAGEQ_SETUP:
           case LAD_MESSAGEQ_DESTROY:
           case LAD_MESSAGEQ_CREATE:
+          case LAD_MESSAGEQ_ANNOUNCE:
           case LAD_MESSAGEQ_DELETE:
           case LAD_MESSAGEQ_MSGINIT:
           case LAD_MULTIPROC_GETCONFIG: