author    Ramsey Harris <ramsey@ti.com>  Tue, 28 Apr 2015 18:50:24 +0000 (11:50 -0700)
committer Robert Tivy <rtivy@ti.com>     Thu, 30 Apr 2015 23:49:06 +0000 (16:49 -0700)
The rpmsgproto driver requires each message queue to explicitly
bind to every remote processor. To work around this issue,
both MessageQ_create and Ipc_attach must invoke the bind method
at the appropriate time. This hack should be removed when the
rpmsgproto driver is fixed such that only one bind is required.
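The ordering problem the workaround covers can be seen from the application
side: a queue may be created either before or after the remote processor is
attached, so both code paths must trigger the per-processor bind. A minimal
caller-side sketch (not part of this patch; "QueueA" and "ProcB" are
placeholder names):

    #include <ti/ipc/Ipc.h>
    #include <ti/ipc/MessageQ.h>
    #include <ti/ipc/MultiProc.h>

    void example(void)
    {
        MessageQ_Params params;
        MessageQ_Handle queue;
        UInt16 procId;

        Ipc_start();
        MessageQ_Params_init(&params);
        procId = MultiProc_getId("ProcB");   /* placeholder processor name */

        /* ordering 1: the queue exists first, so Ipc_attach() must bind
         * the already-created queue to the newly attached processor */
        queue = MessageQ_create("QueueA", &params);
        Ipc_attach(procId);

        /* ordering 2 (alternative): the processor is attached first, so
         * MessageQ_create() must bind the new queue to every processor
         * that is already attached:
         *
         *     Ipc_attach(procId);
         *     queue = MessageQ_create("QueueA", &params);
         */
    }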
linux/src/api/Ipc.c
linux/src/api/MessageQ.c
linux/src/transport/TransportRpmsg.c
diff --git a/linux/src/api/Ipc.c b/linux/src/api/Ipc.c
index 96804660ab401ba5489188d970224552f6f49794..42ac0cbe415ad0a8d75f71e7fd5dc3a81d60cbbd 100644
--- a/linux/src/api/Ipc.c
+++ b/linux/src/api/Ipc.c
Int attached[MultiProc_MAXPROCESSORS];
} Ipc_Module;
+/* hack: rpmsgproto driver work around */
+Void MessageQ_bind(UInt16 procId);
+Void MessageQ_unbind(UInt16 procId);
+
/* =============================================================================
* Globals
goto done;
}
+ /* hack: bind all existing message queues to remote processor */
+ MessageQ_bind(procId);
+
/* getting here means we have successfully attached */
Ipc_module.attached[clusterId]++;
goto done;
}
+ /* hack: unbind all existing message queues from remote processor */
+ MessageQ_unbind(procId);
+
/* detach transport from remote processor */
status = Ipc_module.transportFactory->detachFxn(procId);
diff --git a/linux/src/api/MessageQ.c b/linux/src/api/MessageQ.c
index 13ab27d5be1f4e548e87a20b237d1b43aac323ce..802e500365889a755bfadffd73471185bce9ee4d 100644
--- a/linux/src/api/MessageQ.c
+++ b/linux/src/api/MessageQ.c
PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
"queueIndex %d\n", (name == NULL) ? "NULL" : name , queueIndex)
+ pthread_mutex_lock(&MessageQ_module->gate);
+
for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
for (priority = 0; priority < 2; priority++) {
transport = MessageQ_module->transports[clusterId][priority];
*/
MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
+ pthread_mutex_unlock(&MessageQ_module->gate);
+
return (MessageQ_Handle)obj;
}
"MessageQ_delete: got LAD response for client %d, status=%d\n",
handle, status)
+ pthread_mutex_lock(&MessageQ_module->gate);
+
for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
for (priority = 0; priority < 2; priority++) {
transport = MessageQ_module->transports[clusterId][priority];
queueIndex = MessageQ_getQueueIndex(obj->queue);
MessageQ_module->queues[queueIndex] = NULL;
+ pthread_mutex_unlock(&MessageQ_module->gate);
+
free(obj);
*handlePtr = NULL;
MessageQ_Handle *oldQueues;
UInt oldSize;
+ pthread_mutex_lock(&MessageQ_module->gate);
+
oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof(MessageQ_Handle));
MessageQ_module->queues = queues;
MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
+ pthread_mutex_unlock(&MessageQ_module->gate);
+
free(oldQueues);
return;
}
+
+/*
+ * ======== MessageQ_bind ========
+ * Bind all existing message queues to the given processor
+ *
+ * Note: This function is a hack to work around the driver.
+ *
+ * The Linux rpmsgproto driver requires a socket for each
+ * message queue and remote processor tuple.
+ *
+ * socket --> (queue, processor)
+ *
+ * Therefore, each time a new remote processor is started, all
+ * existing message queues need to create a socket for the new
+ * processor.
+ *
+ * The driver should not have this requirement. One socket per
+ * message queue should be sufficient to uniquely identify the
+ * endpoint to the driver.
+ */
+Void MessageQ_bind(UInt16 procId)
+{
+ int q;
+ int clusterId;
+ int priority;
+ MessageQ_Handle handle;
+ MessageQ_QueueId queue;
+ IMessageQTransport_Handle transport;
+
+ clusterId = procId - MultiProc_getBaseIdOfCluster();
+ pthread_mutex_lock(&MessageQ_module->gate);
+
+ for (q = 0; q < MessageQ_module->numQueues; q++) {
+
+ if ((handle = MessageQ_module->queues[q]) == NULL) {
+ continue;
+ }
+
+ queue = ((MessageQ_Object *)handle)->queue;
+
+ for (priority = 0; priority < 2; priority++) {
+ transport = MessageQ_module->transports[clusterId][priority];
+ if (transport != NULL) {
+ IMessageQTransport_bind((Void *)transport, queue);
+ }
+ }
+ }
+
+ pthread_mutex_unlock(&MessageQ_module->gate);
+}
+
+/*
+ * ======== MessageQ_unbind ========
+ * Unbind all existing message queues from the given processor
+ *
+ * Hack: see MessageQ_bind.
+ */
+Void MessageQ_unbind(UInt16 procId)
+{
+ int q;
+ int clusterId;
+ int priority;
+ MessageQ_Handle handle;
+ MessageQ_QueueId queue;
+ IMessageQTransport_Handle transport;
+
+ pthread_mutex_lock(&MessageQ_module->gate);
+
+ for (q = 0; q < MessageQ_module->numQueues; q++) {
+
+ if ((handle = MessageQ_module->queues[q]) == NULL) {
+ continue;
+ }
+
+ queue = ((MessageQ_Object *)handle)->queue;
+ clusterId = procId - MultiProc_getBaseIdOfCluster();
+
+ for (priority = 0; priority < 2; priority++) {
+ transport = MessageQ_module->transports[clusterId][priority];
+ if (transport != NULL) {
+ IMessageQTransport_unbind((Void *)transport, queue);
+ }
+ }
+ }
+
+ pthread_mutex_unlock(&MessageQ_module->gate);
+}
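As the comment above MessageQ_bind spells out, the driver currently wants one
socket per (queue, processor) pair, so the socket count grows with the product
of the two. A rough illustration, with hypothetical counts:

    /* illustration only: queue and processor counts are hypothetical */
    #define NUM_LOCAL_QUEUES   3
    #define NUM_ATTACHED_PROCS 2

    /* with the workaround: one AF_RPMSG socket per (queue, processor) pair */
    int socketsToday   = NUM_LOCAL_QUEUES * NUM_ATTACHED_PROCS;  /* 6 */

    /* once the driver is fixed: one socket per queue should suffice */
    int socketsDesired = NUM_LOCAL_QUEUES;                       /* 3 */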
diff --git a/linux/src/transport/TransportRpmsg.c b/linux/src/transport/TransportRpmsg.c
index 97936f6c8fc62e2fdd5f59d3853ade33884a3f6a..6e3d704a9e548cef40cf7b9925c38c41f77c24a5 100644
--- a/linux/src/transport/TransportRpmsg.c
+++ b/linux/src/transport/TransportRpmsg.c
TransportRpmsg_Object *obj = NULL;
int sock;
UInt16 clusterId;
+ int i;
clusterId = params->rprocId - MultiProc_getBaseIdOfCluster();
obj->rprocId = params->rprocId;
obj->numQueues = TransportRpmsg_GROWSIZE;
- obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof(Int));
+ obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof(int));
if (obj->qIndexToFd == NULL) {
status = Ipc_E_MEMORY;
goto done;
}
+ /* must initialize array */
+ for (i = 0; i < TransportRpmsg_GROWSIZE; i++) {
+ obj->qIndexToFd[i] = -1;
+ }
+
done:
if (status < 0) {
TransportRpmsg_delete((TransportRpmsg_Handle *)&obj);
uint64_t event;
UInt16 rprocId;
pthread_t tid;
+ Int status = MessageQ_S_SUCCESS;
tid = pthread_self();
rprocId = obj->rprocId;
PRINTVERBOSE3("TransportRpmsg_bind: creating endpoint for rprocId %d "
"queuePort 0x%x, tid=0x%x\n", rprocId, queuePort, (unsigned int)tid)
+ pthread_mutex_lock(&TransportRpmsg_module->gate);
+
+ /* Check if binding already exists.
+ *
+ * There is a race condition between a thread calling MessageQ_create
+ * and another thread calling Ipc_attach. Must make sure we don't bind
+ * the same queue twice.
+ */
+ if (queueIndexToFd(obj, queueId) != -1) {
+ goto done;
+ }
+
/* Create the socket to receive messages for this messageQ. */
fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
if (fd < 0) {
printf("TransportRpmsg_bind: socket call failed: %d (%s)\n",
errno, strerror(errno));
- return (MessageQ_E_OSFAILURE);
+ status = MessageQ_E_OSFAILURE;
+ goto done;
}
PRINTVERBOSE2("TransportRpmsg_bind: created socket fd %d, tdi=0x%x\n", fd,
(unsigned int)tid);
PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
errno, strerror(errno));
close(fd);
- return (MessageQ_E_OSFAILURE);
+ status = MessageQ_E_OSFAILURE;
+ goto done;
}
- pthread_mutex_lock(&TransportRpmsg_module->gate);
-
/* pause the dispatch thread */
PRINTVERBOSE1("TransportRpmsg_bind: sending PAUSE event, tid=0x%x\n",
(unsigned int)tid);
event = TransportRpmsg_Event_CONTINUE;
write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
+done:
pthread_mutex_unlock(&TransportRpmsg_module->gate);
- return (MessageQ_S_SUCCESS);
+ return (status);
}
/*
/* wait for ACK event */
read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
- /* retrieve file descriptor for the given queue port */
- fd = queueIndexToFd(obj, queuePort);
- if (!fd) {
- PRINTVERBOSE1("TransportRpmsg_unbind: queueId 0x%x not bound\n",
- queueId);
- status = MessageQ_E_INVALIDARG;
+ /* Check if binding already deleted.
+ *
+ * There is a race condition between a thread calling MessageQ_delete
+ * and another thread calling Ipc_detach. Must make sure we don't unbind
+ * the same queue twice.
+ */
+ if ((fd = queueIndexToFd(obj, queuePort)) == -1) {
goto done;
}
PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
Int *queues;
Int *oldQueues;
UInt oldSize;
+ UInt newCount;
UInt queueIndex;
+ int i;
/* subtract port offset from queue index */
queueIndex = queuePort - MessageQ_PORTOFFSET;
if (queueIndex >= obj->numQueues) {
+ newCount = queueIndex + TransportRpmsg_GROWSIZE;
PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
- queueIndex + TransportRpmsg_GROWSIZE)
+ newCount);
/* allocate larger table */
- oldSize = obj->numQueues * sizeof (Int);
- queues = calloc(queueIndex + TransportRpmsg_GROWSIZE, sizeof(Int));
+ oldSize = obj->numQueues * sizeof(int);
+ queues = calloc(newCount, sizeof(int));
/* copy contents from old table into new table */
memcpy(queues, obj->qIndexToFd, oldSize);
+ /* initialize remaining entries of new (larger) table */
+ for (i = obj->numQueues; i < newCount; i++) {
+ queues[i] = -1;
+ }
+
/* swap in new table, delete old table */
oldQueues = obj->qIndexToFd;
obj->qIndexToFd = queues;
- obj->numQueues = queueIndex + TransportRpmsg_GROWSIZE;
+ obj->numQueues = newCount;
free(oldQueues);
}
/* register transport instance with MessageQ */
iMsgQTrans = TransportRpmsg_upCast(transport);
- MessageQ_registerTransport(iMsgQTrans, procId, 0);
TransportRpmsg_module->inst[clusterId] = transport;
+ MessageQ_registerTransport(iMsgQTrans, procId, 0);
done:
return (status);