index 52c275e8379c90014686af17e63d107162650b3b..cbc602e211974d785401ac2dd034fcd10732406f 100644 (file)
* Implementation of functions specified in the IMessageQTransport interface.
*/
-#include <ti/ipc/Std.h>
-
-#include <ti/ipc/MessageQ.h>
-#include <ti/ipc/MultiProc.h>
-#include <_MessageQ.h>
-
/* Socket Headers */
#include <sys/socket.h>
#include <sys/select.h>
/* Socket Protocol Family */
#include <net/rpmsg.h>
-/* Socket utils: */
-#include <SocketFxns.h>
+/* IPC headers */
+#include <ti/ipc/Std.h>
+#include <SocketFxns.h> /* Socket utils: */
+#include <ti/ipc/Ipc.h>
+#include <ti/ipc/MessageQ.h>
+#include <ti/ipc/MultiProc.h>
+#include <ti/ipc/transports/TransportRpmsg.h>
+#include <_MessageQ.h>
#include <_lad.h>
-#include <TransportRpmsg.h>
-
-
/* More magic rpmsg port numbers: */
#define MESSAGEQ_RPMSG_PORT 61
#define MESSAGEQ_RPMSG_MAXSIZE 512
int unblockEvent; /* eventFd for unblocking socket */
pthread_t threadId; /* ID returned by pthread_create() */
Bool threadStarted;
+
+ TransportRpmsg_Handle *inst; /* array of instances */
} TransportRpmsg_Module;
IMessageQTransport_Fxns TransportRpmsg_fxns = {
.bind = TransportRpmsg_bind,
.unbind = TransportRpmsg_unbind,
- .put = TransportRpmsg_put,
+ .put = TransportRpmsg_put
};
typedef struct TransportRpmsg_Object {
TransportRpmsg_Module TransportRpmsg_state = {
.sock = {0},
.threadStarted = FALSE,
+ .inst = NULL
};
TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
+Int TransportRpmsg_Factory_create(Void);
+Void TransportRpmsg_Factory_delete(Void);
+
+Ipc_TransportFactoryFxns TransportRpmsg_Factory = {
+ .createFxn = TransportRpmsg_Factory_create,
+ .deleteFxn = TransportRpmsg_Factory_delete
+};
+
/* -------------------------------------------------------------------------- */
/* instance convertors */
return obj->qIndexToFd[qIndex];
}
+/*
+ * ======== TransportRpmsg_Factory_create ========
+ * Create the transport instances
+ *
+ * Attach to all remote processors. For now, must attach to
+ * at least one to tolerate MessageQ_E_RESOURCE failures.
+ *
+ * This function implements the IPC Factory interface, so it
+ * returns Ipc status codes.
+ */
+Int TransportRpmsg_Factory_create(Void)
+{
+ Int status;
+ Int attachStatus;
+ Int i;
+ UInt16 procId;
+ Int32 attachedAny;
+ UInt16 clusterSize;
+ UInt16 clusterBase;
+
+ TransportRpmsg_Handle *inst;
+ TransportRpmsg_Handle transport;
+ TransportRpmsg_Params params;
+ IMessageQTransport_Handle iMsgQTrans;
+
+
+ status = Ipc_S_SUCCESS;
+ attachedAny = FALSE;
+
+ /* needed to enumerate processors in cluster */
+ clusterSize = MultiProc_getNumProcsInCluster();
+ clusterBase = MultiProc_getBaseIdOfCluster();
+
+ /* allocate the instance array */
+ inst = calloc(clusterSize, sizeof(TransportRpmsg_Handle));
+
+ if (inst == NULL) {
+ printf("Error: TransportRpmsg_Factory_create failed, no memory\n");
+ status = Ipc_E_MEMORY;
+ goto exit;
+ }
+
+ TransportRpmsg_module->inst = inst;
+
+ /* create transport instance for all processors in cluster */
+ for (i = 0, procId = clusterBase; i < clusterSize; i++, procId++) {
+
+ if (MultiProc_self() == procId) {
+ continue;
+ }
+
+ params.rprocId = procId;
+ transport = TransportRpmsg_create(¶ms, &attachStatus);
+
+ if (transport != NULL) {
+ iMsgQTrans = TransportRpmsg_upCast(transport);
+ MessageQ_registerTransport(iMsgQTrans, procId, 0);
+ attachedAny = TRUE;
+ }
+ else {
+ if (attachStatus == MessageQ_E_RESOURCE) {
+ continue;
+ }
+ printf("TransportRpmsg_Factory_create: failed to attach to "
+ "procId=%d status=%d\n", procId, attachStatus);
+ status = Ipc_E_FAIL;
+ break;
+ }
+
+ TransportRpmsg_module->inst[i] = transport;
+ }
+
+ if (!attachedAny) {
+ status = Ipc_E_FAIL;
+ }
+
+exit:
+ return (status);
+}
+
+/*
+ * ======== TransportRpmsg_Factory_delete ========
+ * Finalize the transport instances
+ */
+Void TransportRpmsg_Factory_delete(Void)
+{
+ Int i;
+ UInt16 procId;
+ UInt16 clusterSize;
+ UInt16 clusterBase;
+
+ /* needed to enumerate processors in cluster */
+ clusterSize = MultiProc_getNumProcsInCluster();
+ clusterBase = MultiProc_getBaseIdOfCluster();
+
+ /* detach from all remote processors, assuming they are up */
+ for (i = 0, procId = clusterBase; i < clusterSize; i++, procId++) {
+
+ if (MultiProc_self() == procId) {
+ continue;
+ }
+
+ if (TransportRpmsg_module->inst[i] != NULL) {
+ MessageQ_unregisterTransport(procId, 0);
+ TransportRpmsg_delete(&(TransportRpmsg_module->inst[i]));
+ }
+ }
+
+ return;
+}