index 8c3d902a9f36fe891aedecca64a60ac9a72f312a..cb9eea4cd05871bb366f9756b2b037b0fb6447d7 100644 (file)
* Implementation of functions specified in the IMessageQTransport interface.
*/
-#include <ti/ipc/Std.h>
-
-#include <ti/ipc/Ipc.h>
-#include <ti/ipc/MessageQ.h>
-#include <ti/ipc/MultiProc.h>
-#include <_MessageQ.h>
-
/* Socket Headers */
#include <sys/socket.h>
#include <sys/select.h>
/* Socket Protocol Family */
#include <net/rpmsg.h>
-/* Socket utils: */
-#include <SocketFxns.h>
+/* IPC headers */
+#include <ti/ipc/Std.h>
+#include <SocketFxns.h> /* Socket utils: */
+#include <ti/ipc/Ipc.h>
+#include <ti/ipc/MessageQ.h>
+#include <ti/ipc/MultiProc.h>
+#include <ti/ipc/transports/TransportRpmsg.h>
+#include <_MessageQ.h>
#include <_lad.h>
-#include <TransportRpmsg.h>
-
-
/* More magic rpmsg port numbers: */
#define MESSAGEQ_RPMSG_PORT 61
#define MESSAGEQ_RPMSG_MAXSIZE 512
#define TransportRpmsg_GROWSIZE 32
+#define INVALIDSOCKET (-1)
+
+#define TransportRpmsg_Event_ACK (1 << 0)
+#define TransportRpmsg_Event_PAUSE (1 << 1)
+#define TransportRpmsg_Event_CONTINUE (1 << 2)
+#define TransportRpmsg_Event_SHUTDOWN (1 << 3)
+
+
+#define _MAX(a,b) (((a)>(b))?(a):(b))
/* traces in this file are controlled via _TransportMessageQ_verbose */
Bool _TransportMessageQ_verbose = FALSE;
int inFds[1024];
int nInFds;
pthread_mutex_t gate;
- int unblockEvent; /* eventFd for unblocking socket */
+ int unblockEvent; /* unblock the dispatch thread */
+ int waitEvent; /* block the client thread */
pthread_t threadId; /* ID returned by pthread_create() */
Bool threadStarted;
} TransportRpmsg_Object;
TransportRpmsg_Module TransportRpmsg_state = {
- .sock = {0},
+ .sock = {INVALIDSOCKET},
+ .unblockEvent = -1,
+ .waitEvent = -1,
.threadStarted = FALSE,
.inst = NULL
};
TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
-static Int attach(UInt16 rprocId);
-static Int detach(UInt16 rprocId);
static void *rpmsgThreadFxn(void *arg);
static Int transportGet(int sock, MessageQ_Msg *retMsg);
static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
+/* factory functions */
Int TransportRpmsg_Factory_create(Void);
Void TransportRpmsg_Factory_delete(Void);
+Int TransportRpmsg_Factory_attach(UInt16 procId);
+Int TransportRpmsg_Factory_detach(UInt16 procId);
Ipc_TransportFactoryFxns TransportRpmsg_Factory = {
.createFxn = TransportRpmsg_Factory_create,
- .deleteFxn = TransportRpmsg_Factory_delete
+ .deleteFxn = TransportRpmsg_Factory_delete,
+ .attachFxn = TransportRpmsg_Factory_attach,
+ .detachFxn = TransportRpmsg_Factory_detach
};
/* -------------------------------------------------------------------------- */
@@ -145,237 +156,205 @@ TransportRpmsg_Handle TransportRpmsg_downCast(IMessageQTransport_Handle base)
return ((TransportRpmsg_Handle)base);
}
-TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params,
- Int *attachStatus)
-{
- TransportRpmsg_Object *obj;
- Int rv;
-
- rv = attach(params->rprocId);
- if (attachStatus) {
- *attachStatus = rv;
- }
-
- if (rv != MessageQ_S_SUCCESS) {
- return NULL;
- }
-
- obj = calloc(1, sizeof (TransportRpmsg_Object));
-
- /* structure copy */
- obj->base.base.interfaceType = IMessageQTransport_TypeId;
- obj->base.fxns = &TransportRpmsg_fxns;
- obj->rprocId = params->rprocId;
-
- obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof (Int));
- obj->numQueues = TransportRpmsg_GROWSIZE;
-
- return (TransportRpmsg_Handle)obj;
-}
-
-Void TransportRpmsg_delete(TransportRpmsg_Handle *handlep)
-{
- TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)handlep;
-
- detach(obj->rprocId);
-
- free(obj->qIndexToFd);
- free(obj);
-
- *handlep = NULL;
-}
-
-static Int attach(UInt16 rprocId)
+/*
+ * ======== TransportRpmsg_create ========
+ */
+TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params)
{
- Int status = MessageQ_S_SUCCESS;
- int sock;
- UInt16 clusterId;
+ Int status = MessageQ_S_SUCCESS;
+ TransportRpmsg_Object *obj = NULL;
+ int sock;
+ UInt16 clusterId;
- clusterId = rprocId - MultiProc_getBaseIdOfCluster();
+ clusterId = params->rprocId - MultiProc_getBaseIdOfCluster();
- /* Create the socket for sending messages to the remote proc: */
+ /* create socket for sending messages to remote processor */
sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
- if (sock < 0) {
- status = MessageQ_E_FAIL;
- printf("attach: socket failed: %d (%s)\n",
- errno, strerror(errno));
- goto exit;
+ if (sock < 0) {
+ status = Ipc_E_FAIL;
+ printf("TransportRpmsg_create: socket failed: %d (%s)\n", errno,
+ strerror(errno));
+ goto done;
}
-
+ TransportRpmsg_module->sock[clusterId] = sock;
PRINTVERBOSE1("attach: created send socket: %d\n", sock)
- /* Attempt to connect: */
- status = ConnectSocket(sock, rprocId, MESSAGEQ_RPMSG_PORT);
- if (status < 0) {
- /* is it ok to "borrow" this error code from MessageQ? */
- status = MessageQ_E_RESOURCE;
-
- /* don't hard-printf or exit since this is no longer fatal */
- PRINTVERBOSE1("attach: ConnectSocket(rprocId:%d) failed\n", rprocId)
+ status = ConnectSocket(sock, params->rprocId, MESSAGEQ_RPMSG_PORT);
- goto exitSock;
+ if (status < 0) {
+ status = Ipc_E_FAIL;
+ printf("TransportRpmsg_create: connect failed: %d (%s) procId: %d\n",
+ errno, strerror(errno), params->rprocId);
+ goto done;
}
- TransportRpmsg_module->sock[clusterId] = sock;
-
- if (TransportRpmsg_module->threadStarted == FALSE) {
- /* create a module wide event to unblock the socket select thread */
- TransportRpmsg_module->unblockEvent = eventfd(0, 0);
- if (TransportRpmsg_module->unblockEvent == -1) {
- printf("attach: unblock socket failed: %d (%s)\n",
- errno, strerror(errno));
- status = MessageQ_E_FAIL;
+ /* create the instance object */
+ obj = calloc(1, sizeof(TransportRpmsg_Object));
- goto exitSock;
- }
-
- PRINTVERBOSE1("attach: created unblock event %d\n",
- TransportRpmsg_module->unblockEvent)
-
- FD_ZERO(&TransportRpmsg_module->rfds);
- FD_SET(TransportRpmsg_module->unblockEvent,
- &TransportRpmsg_module->rfds);
- TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
- TransportRpmsg_module->nInFds = 0;
-
- pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
-
- status = pthread_create(&TransportRpmsg_module->threadId, NULL,
- &rpmsgThreadFxn, NULL);
- if (status < 0) {
- status = MessageQ_E_FAIL;
- printf("attach: failed to spawn thread\n");
-
- goto exitEvent;
- }
- else {
- TransportRpmsg_module->threadStarted = TRUE;
- }
+ if (obj == NULL) {
+ status = Ipc_E_MEMORY;
+ goto done;
}
- goto exit;
+ /* initialize the instance */
+ obj->base.base.interfaceType = IMessageQTransport_TypeId;
+ obj->base.fxns = &TransportRpmsg_fxns;
+ obj->rprocId = params->rprocId;
+ obj->numQueues = TransportRpmsg_GROWSIZE;
-exitEvent:
- close(TransportRpmsg_module->unblockEvent);
+ obj->qIndexToFd = calloc(TransportRpmsg_GROWSIZE, sizeof(Int));
- FD_ZERO(&TransportRpmsg_module->rfds);
- TransportRpmsg_module->maxFd = 0;
+ if (obj->qIndexToFd == NULL) {
+ status = Ipc_E_MEMORY;
+ goto done;
+ }
-exitSock:
- close(sock);
- TransportRpmsg_module->sock[clusterId] = 0;
+done:
+ if (status < 0) {
+ TransportRpmsg_delete((TransportRpmsg_Handle *)&obj);
+ }
-exit:
- return status;
+ return (TransportRpmsg_Handle)obj;
}
-static Int detach(UInt16 rprocId)
+/*
+ * ======== TransportRpmsg_delete ========
+ */
+Void TransportRpmsg_delete(TransportRpmsg_Handle *pHandle)
{
+ TransportRpmsg_Object *obj = *(TransportRpmsg_Object **)pHandle;
+ UInt16 clusterId;
+ int sock;
- Int status = -1;
- int sock;
- UInt16 clusterId;
- clusterId = rprocId - MultiProc_getBaseIdOfCluster();
- sock = TransportRpmsg_module->sock[clusterId];
+ clusterId = obj->rprocId - MultiProc_getBaseIdOfCluster();
- if (sock) {
+ /* close the socket for the given transport instance */
+ sock = TransportRpmsg_module->sock[clusterId];
+ if (sock != INVALIDSOCKET) {
PRINTVERBOSE1("detach: closing socket: %d\n", sock)
+ close(sock);
+ }
+ TransportRpmsg_module->sock[clusterId] = INVALIDSOCKET;
- status = close(sock);
+ if ((obj != NULL) && (obj->qIndexToFd != NULL)) {
+ free(obj->qIndexToFd);
+ obj->qIndexToFd = NULL;
}
- return status;
+ if (obj != NULL) {
+ free(obj);
+ obj = NULL;
+ }
+
+ *pHandle = NULL;
}
+/*
+ * ======== TransportRpmsg_bind ========
+ */
Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
{
TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
- UInt16 queueIndex = queueId & 0x0000ffff;
- int fd;
- int err;
- uint64_t buf;
- UInt16 rprocId;
+ UInt16 queuePort = queueId & 0x0000ffff;
+ int fd;
+ int err;
+ uint64_t event;
+ UInt16 rprocId;
+ pthread_t tid;
+ tid = pthread_self();
rprocId = obj->rprocId;
- PRINTVERBOSE2("TransportRpmsg_bind: creating endpoint for rprocId %d "
- "queueIndex %d\n", rprocId, queueIndex)
+ PRINTVERBOSE3("TransportRpmsg_bind: creating endpoint for rprocId %d "
+ "queuePort 0x%x, tid=0x%x\n", rprocId, queuePort, (unsigned int)tid)
/* Create the socket to receive messages for this messageQ. */
fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
if (fd < 0) {
printf("TransportRpmsg_bind: socket call failed: %d (%s)\n",
errno, strerror(errno));
- goto exitClose;
+ return (MessageQ_E_OSFAILURE);
}
+ PRINTVERBOSE2("TransportRpmsg_bind: created socket fd %d, tdi=0x%x\n", fd,
+ (unsigned int)tid);
- PRINTVERBOSE1("TransportRpmsg_bind: created socket fd %d\n", fd)
-
- err = SocketBindAddr(fd, rprocId, (UInt32)queueIndex);
+ err = SocketBindAddr(fd, rprocId, (UInt32)queuePort);
if (err < 0) {
/* don't hard-printf since this is no longer fatal */
PRINTVERBOSE2("TransportRpmsg_bind: bind failed: %d (%s)\n",
- errno, strerror(errno))
-
+ errno, strerror(errno));
close(fd);
-
- return -1;
+ return (MessageQ_E_OSFAILURE);
}
pthread_mutex_lock(&TransportRpmsg_module->gate);
+ /* pause the dispatch thread */
+ PRINTVERBOSE1("TransportRpmsg_bind: sending PAUSE event, tid=0x%x\n",
+ (unsigned int)tid);
+ event = TransportRpmsg_Event_PAUSE;
+ write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
+
+ /* wait for ACK event */
+ read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
+ PRINTVERBOSE2("TransportRpmsg_bind: received ACK event (%d), tid=0x%x\n",
+ (int)event, (unsigned int)tid);
+
/* add to our fat fd array and update select() parameters */
TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds++] = fd;
- TransportRpmsg_module->maxFd = MAX(TransportRpmsg_module->maxFd, fd);
+ TransportRpmsg_module->maxFd = _MAX(TransportRpmsg_module->maxFd, fd);
FD_SET(fd, &TransportRpmsg_module->rfds);
+ bindFdToQueueIndex(obj, fd, queuePort);
- pthread_mutex_unlock(&TransportRpmsg_module->gate);
-
- bindFdToQueueIndex(obj, fd, queueIndex);
-
- /*
- * Even though we use the unblock event as just a signalling event with
- * no related payload, we need to write some non-zero value. Might as
- * well make it the fd (which the reader could decide to use if needed).
- */
- buf = fd;
- write(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
-
- goto exit;
+ /* release the dispatch thread */
+ PRINTVERBOSE1("TransportRpmsg_bind: sending CONTINUE event, tid=0x%x\n",
+ (unsigned int)tid);
+ event = TransportRpmsg_Event_CONTINUE;
+ write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
-exitClose:
- TransportRpmsg_unbind(handle, fd);
- fd = 0;
+ pthread_mutex_unlock(&TransportRpmsg_module->gate);
-exit:
- return fd;
+ return (MessageQ_S_SUCCESS);
}
+/*
+ * ======== TransportRpmsg_unbind ========
+ */
Int TransportRpmsg_unbind(Void *handle, UInt32 queueId)
{
TransportRpmsg_Object *obj = (TransportRpmsg_Object *)handle;
- UInt16 queueIndex = queueId & 0x0000ffff;
- uint64_t buf;
+ UInt16 queuePort = queueId & 0x0000ffff;
+ uint64_t event;
Int status = MessageQ_S_SUCCESS;
int maxFd;
int fd;
int i;
int j;
- fd = queueIndexToFd(obj, queueIndex);
+ pthread_mutex_lock(&TransportRpmsg_module->gate);
+
+ /* pause the dispatch thread */
+ event = TransportRpmsg_Event_PAUSE;
+ write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
+
+ /* wait for ACK event */
+ read(TransportRpmsg_module->waitEvent, &event, sizeof(event));
+
+ /* retrieve file descriptor for the given queue port */
+ fd = queueIndexToFd(obj, queuePort);
if (!fd) {
PRINTVERBOSE1("TransportRpmsg_unbind: queueId 0x%x not bound\n",
- queueId)
-
- return -1;
+ queueId);
+ status = MessageQ_E_INVALIDARG;
+ goto done;
}
-
PRINTVERBOSE1("TransportRpmsg_unbind: closing socket %d\n", fd)
- pthread_mutex_lock(&TransportRpmsg_module->gate);
+ /* guarenteed to work because queueIndexToFd above succeeded */
+ unbindQueueIndex(obj, queuePort);
/* remove from input fd array */
for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
/* shift subsequent elements down */
for (j = i; j < TransportRpmsg_module->nInFds; j++) {
TransportRpmsg_module->inFds[j] =
- TransportRpmsg_module->inFds[j + 1];
- }
- TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds] = 0;
-
- FD_CLR(fd, &TransportRpmsg_module->rfds);
- if (fd == TransportRpmsg_module->maxFd) {
- /* find new max fd */
- maxFd = TransportRpmsg_module->unblockEvent;
- for (j = 0; j < TransportRpmsg_module->nInFds; j++) {
- maxFd = MAX(TransportRpmsg_module->inFds[j], maxFd);
- }
- TransportRpmsg_module->maxFd = maxFd;
+ TransportRpmsg_module->inFds[j + 1];
}
-
- /*
- * Even though we use the unblock event as just a signalling
- * event with no related payload, we need to write some non-zero
- * value. Might as well make it the fd (which the reader could
- * decide to use if needed).
- */
- buf = fd;
- write(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
-
+ TransportRpmsg_module->inFds[TransportRpmsg_module->nInFds] = -1;
break;
}
+ }
- close(fd);
+ /* remove fd from the descriptor set, compute new max value */
+ FD_CLR(fd, &TransportRpmsg_module->rfds);
+ if (fd == TransportRpmsg_module->maxFd) {
+ /* find new max fd */
+ maxFd = TransportRpmsg_module->unblockEvent;
+ for (i = 0; i < TransportRpmsg_module->nInFds; i++) {
+ maxFd = _MAX(TransportRpmsg_module->inFds[i], maxFd);
+ }
+ TransportRpmsg_module->maxFd = maxFd;
}
- unbindQueueIndex(obj, queueIndex);
+ close(fd);
+
+ /* release the dispatch thread */
+ event = TransportRpmsg_Event_CONTINUE;
+ write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
+done:
pthread_mutex_unlock(&TransportRpmsg_module->gate);
- return status;
+ return (status);
}
+/*
+ * ======== TransportRpmsg_put ========
+ */
Bool TransportRpmsg_put(Void *handle, Ptr pmsg)
{
MessageQ_Msg msg = (MessageQ_Msg)pmsg;
return status;
}
+/*
+ * ======== TransportRpmsg_control ========
+ */
Bool TransportRpmsg_control(Void *handle, UInt cmd, UArg cmdArg)
{
return FALSE;
}
+/*
+ * ======== rpmsgThreadFxn ========
+ */
void *rpmsgThreadFxn(void *arg)
{
- static int lastFdx = 0;
- int curFdx = 0;
Int status = MessageQ_S_SUCCESS;
Int tmpStatus;
int retval;
- uint64_t buf;
+ uint64_t event;
fd_set rfds;
int maxFd;
int nfds;
MessageQ_Msg retMsg;
MessageQ_QueueId queueId;
+ Bool run = TRUE;
+ int i;
+ int fd;
- while (TRUE) {
- pthread_mutex_lock(&TransportRpmsg_module->gate);
+ while (run) {
maxFd = TransportRpmsg_module->maxFd;
rfds = TransportRpmsg_module->rfds;
nfds = TransportRpmsg_module->nInFds;
- pthread_mutex_unlock(&TransportRpmsg_module->gate);
-
PRINTVERBOSE3("rpmsgThreadFxn: maxFd %d rfds[1:0] 0x%08x%08x\n", maxFd,
- (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
+ (int)rfds.fds_bits[1], (int)rfds.fds_bits[0])
retval = select(maxFd + 1, &rfds, NULL, NULL, NULL);
- if (retval) {
- if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
- /*
- * Our event was signalled by TransportRpmsg_bind()
- * or TransportRpmsg_unbind() to tell us that the set of
- * fds has changed.
- */
- PRINTVERBOSE0("rpmsgThreadFxn: got unblock event\n")
-
- /* we don't need the written value */
- read(TransportRpmsg_module->unblockEvent, &buf, sizeof (buf));
- }
- else {
- /* start where we last left off */
- curFdx = lastFdx;
-
- /*
- * The set of fds that's used by select has been recorded
- * locally, but the array of fds that are scanned below is
- * a changing set (MessageQ_create/delete() can change it).
- * While this might present an issue in itself, one key
- * takeaway is that 'nfds' must not be zero else the % below
- * will cause a divide-by-zero exception. We won't even get
- * here if nfds == 0 since it's a local copy of the module's
- * 'nInFds' which has to be > 0 for us to get here. So, even
- * though the module's 'nInFds' might go to 0 during this loop,
- * the loop itself will still remain intact.
- */
- do {
- if (FD_ISSET(TransportRpmsg_module->inFds[curFdx], &rfds)) {
-
- PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
- TransportRpmsg_module->inFds[curFdx])
-
- /* transport input fd was signalled: get the message */
- tmpStatus = transportGet(
- TransportRpmsg_module->inFds[curFdx], &retMsg);
- if (tmpStatus < 0) {
- printf("rpmsgThreadFxn: transportGet failed.");
- status = MessageQ_E_FAIL;
- }
- else {
- queueId = MessageQ_getDstQueue(retMsg);
-
- PRINTVERBOSE1("rpmsgThreadFxn: got message, "
- "delivering to queueId 0x%x\n", queueId)
-
- MessageQ_put(queueId, retMsg);
- }
-
- lastFdx = (curFdx + 1) % nfds;
-
- break;
- }
-
- curFdx = (curFdx + 1) % nfds;
- } while (curFdx != lastFdx);
+
+ /* if error, try again */
+ if (retval < 0) {
+ printf("Warning: rpmsgThreadFxn: select failed, trying again\n");
+ continue;
+ }
+
+ /* dispatch all pending messages, do this first */
+ for (i = 0; i < nfds; i++) {
+ fd = TransportRpmsg_module->inFds[i];
+
+ if (FD_ISSET(fd, &rfds)) {
+ PRINTVERBOSE1("rpmsgThreadFxn: getting from fd %d\n",
+ TransportRpmsg_module->inFds[i]);
+
+ /* transport input fd was signalled: get the message */
+ tmpStatus = transportGet(fd, &retMsg);
+ if (tmpStatus < 0) {
+ printf("rpmsgThreadFxn: transportGet failed\n");
+ }
+ else {
+ queueId = MessageQ_getDstQueue(retMsg);
+ PRINTVERBOSE1("rpmsgThreadFxn: got message, "
+ "delivering to queueId 0x%x\n", queueId)
+ MessageQ_put(queueId, retMsg);
+ }
}
}
+
+ /* check for events */
+ if (FD_ISSET(TransportRpmsg_module->unblockEvent, &rfds)) {
+
+ read(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
+
+ do {
+ if (event & TransportRpmsg_Event_SHUTDOWN) {
+ PRINTVERBOSE0("rpmsgThreadFxn: event SHUTDOWN\n");
+ run = FALSE;
+ break; /* highest priority, stop processing events */
+ }
+ if (event & TransportRpmsg_Event_CONTINUE) {
+ PRINTVERBOSE1("rpmsgThreadFxn: event CONTINUE (%d)\n",
+ (int)event);
+ event &= ~TransportRpmsg_Event_CONTINUE;
+ }
+ if (event & TransportRpmsg_Event_PAUSE) {
+ /* Our event was signalled by TransportRpmsg_bind()
+ * or TransportRpmsg_unbind() to tell us that the set
+ * of file descriptors has changed.
+ */
+ PRINTVERBOSE0("rpmsgThreadFxn: event PAUSE\n");
+ /* send the acknowledgement */
+ event = TransportRpmsg_Event_ACK;
+ write(TransportRpmsg_module->waitEvent, &event,
+ sizeof(event));
+ /* now wait to be released */
+ read(TransportRpmsg_module->unblockEvent, &event,
+ sizeof(event));
+ }
+ } while (event != 0);
+ }
}
return (void *)status;
/*
* ======== transportGet ========
* Retrieve a message waiting in the socket's queue.
-*/
+ */
static Int transportGet(int sock, MessageQ_Msg *retMsg)
{
Int status = MessageQ_S_SUCCESS;
return status;
}
-Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 qIndex)
+/*
+ * ======== bindFdToQueueIndex ========
+ *
+ * Precondition: caller must be inside the module gate
+ */
+Void bindFdToQueueIndex(TransportRpmsg_Object *obj, Int fd, UInt16 queuePort)
{
Int *queues;
Int *oldQueues;
UInt oldSize;
+ UInt queueIndex;
+
+ /* subtract port offset from queue index */
+ queueIndex = queuePort - MessageQ_PORTOFFSET;
- if (qIndex >= obj->numQueues) {
+ if (queueIndex >= obj->numQueues) {
PRINTVERBOSE1("TransportRpmsg_bind: growing numQueues to %d\n",
- qIndex + TransportRpmsg_GROWSIZE)
+ queueIndex + TransportRpmsg_GROWSIZE)
+ /* allocate larger table */
oldSize = obj->numQueues * sizeof (Int);
+ queues = calloc(queueIndex + TransportRpmsg_GROWSIZE, sizeof(Int));
- queues = calloc(qIndex + TransportRpmsg_GROWSIZE, sizeof (Int));
+ /* copy contents from old table int new table */
memcpy(queues, obj->qIndexToFd, oldSize);
+ /* swap in new table, delete old table */
oldQueues = obj->qIndexToFd;
obj->qIndexToFd = queues;
- obj->numQueues = qIndex + TransportRpmsg_GROWSIZE;
-
+ obj->numQueues = queueIndex + TransportRpmsg_GROWSIZE;
free(oldQueues);
}
- obj->qIndexToFd[qIndex] = fd;
+ /* add new entry */
+ obj->qIndexToFd[queueIndex] = fd;
}
-Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex)
+/*
+ * ======== unbindQueueIndex ========
+ *
+ * Precondition: caller must be inside the module gate
+ */
+Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 queuePort)
{
- obj->qIndexToFd[qIndex] = 0;
+ UInt queueIndex;
+
+ /* subtract port offset from queue index */
+ queueIndex = queuePort - MessageQ_PORTOFFSET;
+
+ /* clear table entry */
+ obj->qIndexToFd[queueIndex] = -1;
}
-Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex)
+/*
+ * ======== queueIndexToFd ========
+ *
+ * Precondition: caller must be inside the module gate
+ */
+Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 queuePort)
{
- return obj->qIndexToFd[qIndex];
+ UInt queueIndex;
+
+ /* subtract port offset from queue index */
+ queueIndex = queuePort - MessageQ_PORTOFFSET;
+
+ /* return file descriptor */
+ return (obj->qIndexToFd[queueIndex]);
}
/*
*/
Int TransportRpmsg_Factory_create(Void)
{
- Int status;
- Int attachStatus;
- Int i;
- UInt16 procId;
- Int32 attachedAny;
- UInt16 clusterSize;
- UInt16 clusterBase;
+ Int status = Ipc_S_SUCCESS;
+ Int i;
+ UInt16 clusterSize;
+ TransportRpmsg_Handle *inst;
- TransportRpmsg_Handle *inst;
- TransportRpmsg_Handle transport;
- TransportRpmsg_Params params;
- IMessageQTransport_Handle iMsgQTrans;
-
-
- status = Ipc_S_SUCCESS;
- attachedAny = FALSE;
/* needed to enumerate processors in cluster */
clusterSize = MultiProc_getNumProcsInCluster();
- clusterBase = MultiProc_getBaseIdOfCluster();
/* allocate the instance array */
inst = calloc(clusterSize, sizeof(TransportRpmsg_Handle));
if (inst == NULL) {
printf("Error: TransportRpmsg_Factory_create failed, no memory\n");
status = Ipc_E_MEMORY;
- goto exit;
+ goto done;
+ }
+
+ for (i = 0; i < clusterSize; i++) {
+ inst[i] = NULL;
}
TransportRpmsg_module->inst = inst;
- /* create transport instance for all processors in cluster */
- for (i = 0, procId = clusterBase; i < clusterSize; i++, procId++) {
+ /* counter event object for passing commands to dispatch thread */
+ TransportRpmsg_module->unblockEvent = eventfd(0, 0);
- if (MultiProc_self() == procId) {
- continue;
- }
+ if (TransportRpmsg_module->unblockEvent == -1) {
+ printf("create: unblock event failed: %d (%s)\n",
+ errno, strerror(errno));
+ status = Ipc_E_FAIL;
+ goto done;
+ }
- params.rprocId = procId;
- transport = TransportRpmsg_create(¶ms, &attachStatus);
+ PRINTVERBOSE1("create: created unblock event %d\n",
+ TransportRpmsg_module->unblockEvent)
- if (transport != NULL) {
- iMsgQTrans = TransportRpmsg_upCast(transport);
- MessageQ_registerTransport(iMsgQTrans, procId, 0);
- attachedAny = TRUE;
- }
- else {
- if (attachStatus == MessageQ_E_RESOURCE) {
- continue;
- }
- printf("TransportRpmsg_Factory_create: failed to attach to "
- "procId=%d status=%d\n", procId, attachStatus);
- status = Ipc_E_FAIL;
- break;
- }
+ /* semaphore event object for acknowledging client thread */
+ TransportRpmsg_module->waitEvent = eventfd(0, EFD_SEMAPHORE);
- TransportRpmsg_module->inst[i] = transport;
+ if (TransportRpmsg_module->waitEvent == -1) {
+ printf("create: wait event failed: %d (%s)\n", errno, strerror(errno));
+ status = Ipc_E_FAIL;
+ goto done;
}
- if (!attachedAny) {
+ PRINTVERBOSE1("create: created wait event %d\n",
+ TransportRpmsg_module->waitEvent)
+
+ FD_ZERO(&TransportRpmsg_module->rfds);
+ FD_SET(TransportRpmsg_module->unblockEvent,
+ &TransportRpmsg_module->rfds);
+ TransportRpmsg_module->maxFd = TransportRpmsg_module->unblockEvent;
+ TransportRpmsg_module->nInFds = 0;
+
+ pthread_mutex_init(&TransportRpmsg_module->gate, NULL);
+
+ status = pthread_create(&TransportRpmsg_module->threadId, NULL,
+ &rpmsgThreadFxn, NULL);
+
+ if (status < 0) {
status = Ipc_E_FAIL;
+ printf("attach: failed to spawn thread\n");
+ goto done;
+ }
+ TransportRpmsg_module->threadStarted = TRUE;
+
+done:
+ if (status < 0) {
+ TransportRpmsg_Factory_delete();
}
-exit:
return (status);
}
*/
Void TransportRpmsg_Factory_delete(Void)
{
- Int i;
- UInt16 procId;
- UInt16 clusterSize;
- UInt16 clusterBase;
+ uint64_t event;
- /* needed to enumerate processors in cluster */
- clusterSize = MultiProc_getNumProcsInCluster();
- clusterBase = MultiProc_getBaseIdOfCluster();
- /* detach from all remote processors, assuming they are up */
- for (i = 0, procId = clusterBase; i < clusterSize; i++, procId++) {
+ /* shutdown the message dispatch thread */
+ if (TransportRpmsg_module->threadStarted) {
+ event = TransportRpmsg_Event_SHUTDOWN;
+ write(TransportRpmsg_module->unblockEvent, &event, sizeof(event));
- if (MultiProc_self() == procId) {
- continue;
- }
+ /* wait for dispatch thread to exit */
+ pthread_join(TransportRpmsg_module->threadId, NULL);
+ }
- if (TransportRpmsg_module->inst[i] != NULL) {
- MessageQ_unregisterTransport(procId, 0);
- TransportRpmsg_delete(&(TransportRpmsg_module->inst[i]));
- }
+ /* destroy the mutex object */
+ pthread_mutex_destroy(&TransportRpmsg_module->gate);
+
+ /* close the client wait event */
+ if (TransportRpmsg_module->waitEvent != -1) {
+ close(TransportRpmsg_module->waitEvent);
+ TransportRpmsg_module->waitEvent = -1;
+ }
+
+ /* close the dispatch thread unblock event */
+ if (TransportRpmsg_module->unblockEvent != -1) {
+ close(TransportRpmsg_module->unblockEvent);
+ TransportRpmsg_module->unblockEvent = -1;
+ }
+
+ /* free the instance handle array */
+ if (TransportRpmsg_module->inst != NULL) {
+ free(TransportRpmsg_module->inst);
+ TransportRpmsg_module->inst = NULL;
}
return;
}
+
+/*
+ * ======== TransportRpmsg_Factory_attach ========
+ */
+Int TransportRpmsg_Factory_attach(UInt16 procId)
+{
+ Int status = Ipc_S_SUCCESS;
+ UInt16 clusterId;
+ TransportRpmsg_Params params;
+ TransportRpmsg_Handle transport;
+ IMessageQTransport_Handle iMsgQTrans;
+
+ /* cannot attach to yourself */
+ if (MultiProc_self() == procId) {
+ status = Ipc_E_INVALIDARG;
+ goto done;
+ }
+
+ /* processor must be a member of the cluster */
+ clusterId = procId - MultiProc_getBaseIdOfCluster();
+
+ if (clusterId >= MultiProc_getNumProcsInCluster()) {
+ status = Ipc_E_INVALIDARG;
+ goto done;
+ }
+
+ /* create transport instance for given processor */
+ params.rprocId = procId;
+ transport = TransportRpmsg_create(¶ms);
+
+ if (transport == NULL) {
+ status = Ipc_E_FAIL;
+ goto done;
+ }
+
+ /* register transport instance with MessageQ */
+ iMsgQTrans = TransportRpmsg_upCast(transport);
+ MessageQ_registerTransport(iMsgQTrans, procId, 0);
+ TransportRpmsg_module->inst[clusterId] = transport;
+
+done:
+ return (status);
+}
+
+/*
+ * ======== TransportRpmsg_Factory_detach ========
+ */
+Int TransportRpmsg_Factory_detach(UInt16 procId)
+{
+ Int status = Ipc_S_SUCCESS;
+ UInt16 clusterId;
+
+ /* cannot detach from yourself */
+ if (MultiProc_self() == procId) {
+ status = Ipc_E_INVALIDARG;
+ goto done;
+ }
+
+ /* processor must be a member of the cluster */
+ clusterId = procId - MultiProc_getBaseIdOfCluster();
+
+ if (clusterId >= MultiProc_getNumProcsInCluster()) {
+ status = Ipc_E_INVALIDARG;
+ goto done;
+ }
+
+ /* must be attached in order to detach */
+ if (TransportRpmsg_module->inst[clusterId] == NULL) {
+ status = Ipc_E_INVALIDSTATE;
+ goto done;
+ }
+
+ /* unregister from MessageQ, delete the transport instance */
+ MessageQ_unregisterTransport(procId, 0);
+ TransportRpmsg_delete(&(TransportRpmsg_module->inst[clusterId]));
+
+done:
+ return (status);
+}