summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: 6c2b643)
raw | patch | inline | side by side (parent: 6c2b643)
author | Angela Stegmaier <angelabaker@ti.com> | |
Thu, 29 Sep 2016 20:47:00 +0000 (15:47 -0500) | ||
committer | Angela Stegmaier <angelabaker@ti.com> | |
Mon, 24 Oct 2016 16:27:03 +0000 (11:27 -0500) |
In the current MessageQ_create implementation, the NameServer
registration is done early, before local creation in the user
library is done. It is therefore possible that a remote core
could open a MessageQ that is still being created and put a
message to that MessageQ. This can result in dropped messages
or in the worst case a crash.
This patch moves the NameServer registration to the end of
the creation sequence so that the MessageQ cannot be opened
before it is fully created.
A new LAD command called MESSAGEQ_ANNOUNCE is added that is specifically
for registering a previously created MessageQ with the NameServer.
Once all creation is completed and the MessageQ is ready to
receive messages, the MessageQ_create api makes the call to LAD
MESSAGEQ_ANNOUNCE to register with the NameServer.
Signed-off-by: Angela Stegmaier <angelabaker@ti.com>
registration is done early, before local creation in the user
library is done. It is therefore possible that a remote core
could open a MessageQ that is still being created and put a
message to that MessageQ. This can result in dropped messages
or in the worst case a crash.
This patch moves the NameServer registration to the end of
the creation sequence so that the MessageQ cannot be opened
before it is fully created.
A new LAD command called MESSAGEQ_ANNOUNCE is added that is specifically
for registering a previously created MessageQ with the NameServer.
Once all creation is completed and the MessageQ is ready to
receive messages, the MessageQ_create api makes the call to LAD
MESSAGEQ_ANNOUNCE to register with the NameServer.
Signed-off-by: Angela Stegmaier <angelabaker@ti.com>
linux/include/_lad.h | patch | blob | history | |
linux/src/api/MessageQ.c | patch | blob | history | |
linux/src/daemon/MessageQ_daemon.c | patch | blob | history | |
linux/src/daemon/lad.c | patch | blob | history |
diff --git a/linux/include/_lad.h b/linux/include/_lad.h
index d4bf3fee13c3e2e75efbe68d2d0f66510574e1cf..4bdfd8e38de4b02185ecaa14bc62ecd85ba751d2 100644 (file)
--- a/linux/include/_lad.h
+++ b/linux/include/_lad.h
LAD_MESSAGEQ_SETUP,
LAD_MESSAGEQ_DESTROY,
LAD_MESSAGEQ_CREATE,
+ LAD_MESSAGEQ_ANNOUNCE,
LAD_MESSAGEQ_DELETE,
LAD_MESSAGEQ_MSGINIT,
LAD_MULTIPROC_GETCONFIG,
Char name[LAD_MESSAGEQCREATEMAXNAMELEN];
MessageQ_Params params;
} messageQCreate;
+ struct {
+ Char name[LAD_MESSAGEQCREATEMAXNAMELEN];
+ Void *serverHandle;
+ } messageQAnnounce;
struct {
Void *serverHandle;
} messageQDelete;
Int queueId;
Void *serverHandle;
} messageQCreate;
+ struct {
+ Int status;
+ } messageQAnnounce;
struct {
Int status;
} messageQDelete;
index 2474e6fdd1b5169ff3bca411da1dc0c7e2c173ba..db96b241156963cc831833d95edd0de733991b04 100644 (file)
--- a/linux/src/api/MessageQ.c
+++ b/linux/src/api/MessageQ.c
pthread_mutex_unlock(&MessageQ_module->gate);
+ /* send announce message to LAD, indicating we are ready to receive msgs */
+ cmd.cmd = LAD_MESSAGEQ_ANNOUNCE;
+ cmd.clientId = handle;
+
+ if (name == NULL) {
+ cmd.args.messageQAnnounce.name[0] = '\0';
+ }
+ else {
+ strncpy(cmd.args.messageQAnnounce.name, name,
+ LAD_MESSAGEQCREATEMAXNAMELEN - 1);
+ cmd.args.messageQAnnounce.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
+ }
+
+ cmd.args.messageQAnnounce.serverHandle = obj->serverHandle;
+
+ if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
+ PRINTVERBOSE1(
+ "MessageQ_create: sending LAD command failed, status=%d\n", status)
+ goto exit;
+ }
+
+ if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
+ PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
+ goto exit;
+ }
+ status = rsp.messageQAnnounce.status;
+
+ PRINTVERBOSE2(
+ "MessageQ_create: got LAD response for client %d, status=%d\n",
+ handle, status)
+
+ if (status == -1) {
+ PRINTVERBOSE1(
+ "MessageQ_create: MessageQ server operation failed, status=%d\n",
+ status)
+ }
+
+exit:
return (MessageQ_Handle)obj;
}
goto done;
}
}
+ /* If we get here, then we have failed to deliver a local message. */
+ status = MessageQ_E_FAIL;
+ goto done;
}
/* Getting here implies the message is outbound. Must give it to
}
msgTrans = MessageQ_module->transports[clusterId][priority];
- delivered = IMessageQTransport_put(msgTrans, (Ptr)msg);
+ if (msgTrans) {
+ delivered = IMessageQTransport_put(msgTrans, (Ptr)msg);
+ }
+ else {
+ delivered = MessageQ_E_FAIL;
+ }
status = (delivered ? MessageQ_S_SUCCESS :
(errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN : MessageQ_E_FAIL));
}
index 280aa8a803f38d6f203448bfee42c8ac0627c7a5..4ad26a176e322cef16dc36d2ba1eebdc21d28c3f 100644 (file)
obj->queue = (MessageQ_QueueId)(((UInt32)procId << 16) | queuePort);
obj->ownerPid = 0;
- if (name != NULL) {
- obj->nsKey = NameServer_addUInt32(MessageQ_module->nameServer, name,
- obj->queue);
- }
-
/* Cleanup if fail */
if (status < 0) {
MessageQ_delete((MessageQ_Handle *)&obj);
return ((MessageQ_Handle)obj);
}
+Int MessageQ_announce(String name, MessageQ_Handle * handlePtr)
+{
+ Int status = MessageQ_S_SUCCESS;
+ MessageQ_Object * obj = (MessageQ_Object *)(*handlePtr);
+
+ LOG1("MessageQ_announce: announcing %p\n", obj);
+
+ if (name != NULL && obj->nsKey == NULL) {
+ obj->nsKey = NameServer_addUInt32(MessageQ_module->nameServer, name,
+ obj->queue);
+ }
+ else {
+ status = MessageQ_E_FAIL;
+ }
+
+ return status;
+}
+
/*
* Function to delete a MessageQ object for a specific slave processor.
*/
diff --git a/linux/src/daemon/lad.c b/linux/src/daemon/lad.c
index c18e3dc357502e2a96ddbfc1ac9d23eeb0b8511f..f0ce4149de189386ff144e0943e33d07e1b9bb54 100644 (file)
--- a/linux/src/daemon/lad.c
+++ b/linux/src/daemon/lad.c
break;
+ case LAD_MESSAGEQ_ANNOUNCE:
+ LOG2("LAD_MESSAGEQ_ANNOUNCE: calling MessageQ_announce(%p, %p)...\n",
+ cmd.args.messageQAnnounce.name,
+ cmd.args.messageQAnnounce.serverHandle);
+
+ tmpString = (cmd.args.messageQAnnounce.name[0] == '\0') ? NULL :
+ cmd.args.messageQAnnounce.name;
+
+ rsp.messageQAnnounce.status =
+ MessageQ_announce(tmpString, (MessageQ_Handle *)&cmd.args.messageQAnnounce.serverHandle);
+
+ LOG1(" status = %d\n", rsp.messageQAnnounce.status)
+ LOG0("DONE\n")
+
+ break;
case LAD_MESSAGEQ_DELETE:
LOG1("LAD_MESSAGEQ_DELETE: calling MessageQ_delete(%p)...\n", cmd.args.messageQDelete.serverHandle)
case LAD_MESSAGEQ_SETUP:
case LAD_MESSAGEQ_DESTROY:
case LAD_MESSAGEQ_CREATE:
+ case LAD_MESSAGEQ_ANNOUNCE:
case LAD_MESSAGEQ_DELETE:
case LAD_MESSAGEQ_MSGINIT:
case LAD_MULTIPROC_GETCONFIG: