index c07dc90614e7133841ba8b8c15954276c06facae..cbc602e211974d785401ac2dd034fcd10732406f 100644 (file)
/*
- * Copyright (c) 2014, Texas Instruments Incorporated
+ * Copyright (c) 2014-2015 Texas Instruments Incorporated - http://www.ti.com
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* Implementation of functions specified in the IMessageQTransport interface.
*/
-#include <ti/ipc/Std.h>
-
-#include <ti/ipc/MessageQ.h>
-#include <ti/ipc/MultiProc.h>
-#include <_MessageQ.h>
-
/* Socket Headers */
#include <sys/socket.h>
#include <sys/select.h>
/* Socket Protocol Family */
#include <net/rpmsg.h>
-/* Socket utils: */
-#include <SocketFxns.h>
+/* IPC headers */
+#include <ti/ipc/Std.h>
+#include <SocketFxns.h> /* Socket utils: */
+#include <ti/ipc/Ipc.h>
+#include <ti/ipc/MessageQ.h>
+#include <ti/ipc/MultiProc.h>
+#include <ti/ipc/transports/TransportRpmsg.h>
+#include <_MessageQ.h>
#include <_lad.h>
-#include <TransportRpmsg.h>
-
-
/* More magic rpmsg port numbers: */
#define MESSAGEQ_RPMSG_PORT 61
#define MESSAGEQ_RPMSG_MAXSIZE 512
int unblockEvent; /* eventFd for unblocking socket */
pthread_t threadId; /* ID returned by pthread_create() */
Bool threadStarted;
+
+ TransportRpmsg_Handle *inst; /* array of instances */
} TransportRpmsg_Module;
IMessageQTransport_Fxns TransportRpmsg_fxns = {
.bind = TransportRpmsg_bind,
.unbind = TransportRpmsg_unbind,
- .put = TransportRpmsg_put,
+ .put = TransportRpmsg_put
};
typedef struct TransportRpmsg_Object {
TransportRpmsg_Module TransportRpmsg_state = {
.sock = {0},
.threadStarted = FALSE,
+ .inst = NULL
};
TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
+Int TransportRpmsg_Factory_create(Void);
+Void TransportRpmsg_Factory_delete(Void);
+
+Ipc_TransportFactoryFxns TransportRpmsg_Factory = {
+ .createFxn = TransportRpmsg_Factory_create,
+ .deleteFxn = TransportRpmsg_Factory_delete
+};
+
/* -------------------------------------------------------------------------- */
/* instance convertors */
Int *attachStatus)
{
TransportRpmsg_Object *obj;
- Int *queues;
Int rv;
rv = attach(params->rprocId);
static Int attach(UInt16 rprocId)
{
- Int status = MessageQ_S_SUCCESS;
- int sock;
+ Int status = MessageQ_S_SUCCESS;
+ int sock;
+ UInt16 clusterId;
+
+
+ clusterId = rprocId - MultiProc_getBaseIdOfCluster();
/* Create the socket for sending messages to the remote proc: */
sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
goto exitSock;
}
- TransportRpmsg_module->sock[rprocId] = sock;
+ TransportRpmsg_module->sock[clusterId] = sock;
if (TransportRpmsg_module->threadStarted == FALSE) {
/* create a module wide event to unblock the socket select thread */
exitSock:
close(sock);
- TransportRpmsg_module->sock[rprocId] = 0;
+ TransportRpmsg_module->sock[clusterId] = 0;
exit:
return status;
static Int detach(UInt16 rprocId)
{
- Int status = -1;
- int sock;
+ Int status = -1;
+ int sock;
+ UInt16 clusterId;
- sock = TransportRpmsg_module->sock[rprocId];
+ clusterId = rprocId - MultiProc_getBaseIdOfCluster();
+ sock = TransportRpmsg_module->sock[clusterId];
if (sock) {
PRINTVERBOSE1("detach: closing socket: %d\n", sock)
rprocId = obj->rprocId;
- PRINTVERBOSE2("TransportRpmsg_bind: creating endpoint for rprocId %d queueIndex %d\n", rprocId, queueIndex)
+ PRINTVERBOSE2("TransportRpmsg_bind: creating endpoint for rprocId %d "
+ "queueIndex %d\n", rprocId, queueIndex)
/* Create the socket to receive messages for this messageQ. */
fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
Int status = TRUE;
int sock;
int err;
- UInt16 dstProcId = msg->dstProc;
+ UInt16 clusterId;
/*
* Retrieve the socket for the AF_SYSLINK protocol associated with this
* transport.
*/
- sock = TransportRpmsg_module->sock[dstProcId];
+ clusterId = msg->dstProc - MultiProc_getBaseIdOfCluster();
+ sock = TransportRpmsg_module->sock[clusterId];
if (!sock) {
return FALSE;
}
else {
queueId = MessageQ_getDstQueue(retMsg);
- PRINTVERBOSE1("rpmsgThreadFxn: got message, delivering to queueId 0x%x\n", queueId)
+ PRINTVERBOSE1("rpmsgThreadFxn: got message, "
+ "delivering to queueId 0x%x\n", queueId)
MessageQ_put(queueId, retMsg);
}
}
PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
- PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
- PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId, msg->msgSize)
+ PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg "
+ "proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
+ PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId,
+ msg->msgSize)
*retMsg = msg;
return obj->qIndexToFd[qIndex];
}
+/*
+ * ======== TransportRpmsg_Factory_create ========
+ * Create the transport instances
+ *
+ * Attach to all remote processors. For now, must attach to
+ * at least one to tolerate MessageQ_E_RESOURCE failures.
+ *
+ * This function implements the IPC Factory interface, so it
+ * returns Ipc status codes.
+ */
+Int TransportRpmsg_Factory_create(Void)
+{
+ Int status;
+ Int attachStatus;
+ Int i;
+ UInt16 procId;
+ Int32 attachedAny;
+ UInt16 clusterSize;
+ UInt16 clusterBase;
+
+ TransportRpmsg_Handle *inst;
+ TransportRpmsg_Handle transport;
+ TransportRpmsg_Params params;
+ IMessageQTransport_Handle iMsgQTrans;
+
+
+ status = Ipc_S_SUCCESS;
+ attachedAny = FALSE;
+
+ /* needed to enumerate processors in cluster */
+ clusterSize = MultiProc_getNumProcsInCluster();
+ clusterBase = MultiProc_getBaseIdOfCluster();
+
+ /* allocate the instance array */
+ inst = calloc(clusterSize, sizeof(TransportRpmsg_Handle));
+
+ if (inst == NULL) {
+ printf("Error: TransportRpmsg_Factory_create failed, no memory\n");
+ status = Ipc_E_MEMORY;
+ goto exit;
+ }
+
+ TransportRpmsg_module->inst = inst;
+
+ /* create transport instance for all processors in cluster */
+ for (i = 0, procId = clusterBase; i < clusterSize; i++, procId++) {
+
+ if (MultiProc_self() == procId) {
+ continue;
+ }
+
+ params.rprocId = procId;
+ transport = TransportRpmsg_create(¶ms, &attachStatus);
+
+ if (transport != NULL) {
+ iMsgQTrans = TransportRpmsg_upCast(transport);
+ MessageQ_registerTransport(iMsgQTrans, procId, 0);
+ attachedAny = TRUE;
+ }
+ else {
+ if (attachStatus == MessageQ_E_RESOURCE) {
+ continue;
+ }
+ printf("TransportRpmsg_Factory_create: failed to attach to "
+ "procId=%d status=%d\n", procId, attachStatus);
+ status = Ipc_E_FAIL;
+ break;
+ }
+
+ TransportRpmsg_module->inst[i] = transport;
+ }
+
+ if (!attachedAny) {
+ status = Ipc_E_FAIL;
+ }
+
+exit:
+ return (status);
+}
+
+/*
+ * ======== TransportRpmsg_Factory_delete ========
+ * Finalize the transport instances
+ */
+Void TransportRpmsg_Factory_delete(Void)
+{
+ Int i;
+ UInt16 procId;
+ UInt16 clusterSize;
+ UInt16 clusterBase;
+
+ /* needed to enumerate processors in cluster */
+ clusterSize = MultiProc_getNumProcsInCluster();
+ clusterBase = MultiProc_getBaseIdOfCluster();
+
+ /* detach from all remote processors, assuming they are up */
+ for (i = 0, procId = clusterBase; i < clusterSize; i++, procId++) {
+
+ if (MultiProc_self() == procId) {
+ continue;
+ }
+
+ if (TransportRpmsg_module->inst[i] != NULL) {
+ MessageQ_unregisterTransport(procId, 0);
+ TransportRpmsg_delete(&(TransportRpmsg_module->inst[i]));
+ }
+ }
+
+ return;
+}