index 97936f6c8fc62e2fdd5f59d3853ade33884a3f6a..6e3d704a9e548cef40cf7b9925c38c41f77c24a5 100644 (file)
TransportRpmsg_Object *obj = NULL;
int sock;
UInt16 clusterId;
+ int i;
clusterId = params->rprocId - MultiProc_getBaseIdOfCluster();
obj->rprocId = params->rprocId;
obj->numQueues = TransportRpmsg_GROWSIZE;
- obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof(Int));
+ obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof(int));
if (obj->qIndexToFd == NULL) {
status = Ipc_E_MEMORY;
goto done;
}
+ /* must initialize array */
+ for (i = 0; i < TransportRpmsg_GROWSIZE; i++) {
+ obj->qIndexToFd[i] = -1;
+ }
+
done:
if (status < 0) {
TransportRpmsg_delete((TransportRpmsg_Handle *)&obj);
uint64_t event;
UInt16 rprocId;
pthread_t tid;
+ Int status = MessageQ_S_SUCCESS;
tid = pthread_self();
rprocId = obj->rprocId;
PRINTVERBOSE3("TransportRpmsg_bind: creating endpoint for rprocId %d "
"queuePort 0x%x, tid=0x%x\n", rprocId, queuePort, (unsigned int)tid)
+ pthread_mutex_lock(&TransportRpmsg_module->gate);
+
+ /* Check if binding already exists.
+ *
+ * There is a race condition between a thread calling MessageQ_create
+ * and another thread calling Ipc_attach. Must make sure we don't bind
+ * the same queue twice.
+ */
+ if (queueIndexToFd(obj, queueId) != -1) {
+ goto done;
+ }
+
/* Create the socket to receive messages for this messageQ. */
fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
if (fd < 0) {
printf("TransportRpmsg_bind: socket call failed: %d (%s)\n",
errno, strerror(errno));
- return (MessageQ_E_OSFAILURE);
+ status = MessageQ_E_OSFAILURE;
+ goto done;
}
PRINTVERBOSE2("TransportRpmsg_bind: created socket fd %d, tdi=0x%x\n", fd,
(unsigned int)tid);
PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
errno, strerror(errno));
close(fd);
- return (MessageQ_E_OSFAILURE);
+ status = MessageQ_E_OSFAILURE;
+ goto done;
}
- pthread_mutex_lock(&TransportRpmsg_module->gate);
-
/* pause the dispatch thread */
PRINTVERBOSE1("TransportRpmsg_bind: sending PAUSE event, tid=0x%x\n",
(unsigned int)tid);
event = TransportRpmsg_Event_CONTINUE;
write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
+done:
pthread_mutex_unlock(&TransportRpmsg_module->gate);
- return (MessageQ_S_SUCCESS);
+ return (status);
}
/*
/* wait for ACK event */
read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
- /* retrieve file descriptor for the given queue port */
- fd = queueIndexToFd(obj, queuePort);
- if (!fd) {
- PRINTVERBOSE1("TransportRpmsg_unbind: queueId 0x%x not bound\n",
- queueId);
- status = MessageQ_E_INVALIDARG;
+ /* Check if binding already deleted.
+ *
+ * There is a race condition between a thread calling MessageQ_delete
+ * and another thread calling Ipc_detach. Must make sure we don't unbind
+ * the same queue twice.
+ */
+ if ((fd = queueIndexToFd(obj, queuePort)) == -1) {
goto done;
}
PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
Int *queues;
Int *oldQueues;
UInt oldSize;
+ UInt newCount;
UInt queueIndex;
+ int i;
/* subtract port offset from queue index */
queueIndex = queuePort - MessageQ_PORTOFFSET;
if (queueIndex >= obj->numQueues) {
+ newCount = queueIndex + TransportRpmsg_GROWSIZE;
PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
- queueIndex + TransportRpmsg_GROWSIZE)
+ newCount);
/* allocate larger table */
- oldSize = obj->numQueues * sizeof (Int);
- queues = calloc(queueIndex + TransportRpmsg_GROWSIZE, sizeof(Int));
+ oldSize = obj->numQueues * sizeof(int);
+ queues = calloc(newCount, sizeof(int));
/* copy contents from old table int new table */
memcpy(queues, obj->qIndexToFd, oldSize);
+ /* initialize remaining entries of new (larger) table */
+ for (i = obj->numQueues; i < newCount; i++) {
+ queues[i] = -1;
+ }
+
/* swap in new table, delete old table */
oldQueues = obj->qIndexToFd;
obj->qIndexToFd = queues;
- obj->numQueues = queueIndex + TransportRpmsg_GROWSIZE;
+ obj->numQueues = newCount;
free(oldQueues);
}
/* register transport instance with MessageQ */
iMsgQTrans = TransportRpmsg_upCast(transport);
- MessageQ_registerTransport(iMsgQTrans, procId, 0);
TransportRpmsg_module->inst[clusterId] = transport;
+ MessageQ_registerTransport(iMsgQTrans, procId, 0);
done:
return (status);