]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - ipc/ipcdev.git/blobdiff - linux/src/transport/TransportRpmsg.c
Generated makefiles to follow up on the previous commit.
[ipc/ipcdev.git] / linux / src / transport / TransportRpmsg.c
index c07dc90614e7133841ba8b8c15954276c06facae..cbc602e211974d785401ac2dd034fcd10732406f 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, Texas Instruments Incorporated
+ * Copyright (c) 2014-2015 Texas Instruments Incorporated - http://www.ti.com
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
  *  Implementation of functions specified in the IMessageQTransport interface.
  */
 
-#include <ti/ipc/Std.h>
-
-#include <ti/ipc/MessageQ.h>
-#include <ti/ipc/MultiProc.h>
-#include <_MessageQ.h>
-
 /* Socket Headers */
 #include <sys/socket.h>
 #include <sys/select.h>
 /* Socket Protocol Family */
 #include <net/rpmsg.h>
 
-/* Socket utils: */
-#include <SocketFxns.h>
 
+/* IPC headers */
+#include <ti/ipc/Std.h>
+#include <SocketFxns.h>         /* Socket utils: */
+#include <ti/ipc/Ipc.h>
+#include <ti/ipc/MessageQ.h>
+#include <ti/ipc/MultiProc.h>
+#include <ti/ipc/transports/TransportRpmsg.h>
+#include <_MessageQ.h>
 #include <_lad.h>
 
-#include <TransportRpmsg.h>
-
-
 /* More magic rpmsg port numbers: */
 #define MESSAGEQ_RPMSG_PORT       61
 #define MESSAGEQ_RPMSG_MAXSIZE   512
@@ -87,12 +84,14 @@ typedef struct TransportRpmsg_Module {
     int             unblockEvent;    /* eventFd for unblocking socket */
     pthread_t       threadId;        /* ID returned by pthread_create() */
     Bool            threadStarted;
+
+    TransportRpmsg_Handle *inst;    /* array of instances */
 } TransportRpmsg_Module;
 
 IMessageQTransport_Fxns TransportRpmsg_fxns = {
     .bind    = TransportRpmsg_bind,
     .unbind  = TransportRpmsg_unbind,
-    .put     = TransportRpmsg_put,
+    .put     = TransportRpmsg_put
 };
 
 typedef struct TransportRpmsg_Object {
@@ -106,6 +105,7 @@ typedef struct TransportRpmsg_Object {
 TransportRpmsg_Module TransportRpmsg_state = {
     .sock = {0},
     .threadStarted = FALSE,
+    .inst = NULL
 };
 TransportRpmsg_Module *TransportRpmsg_module = &TransportRpmsg_state;
 
@@ -119,6 +119,14 @@ static Void bindFdToQueueIndex(TransportRpmsg_Object *obj,
 static Void unbindQueueIndex(TransportRpmsg_Object *obj, UInt16 qIndex);
 static Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex);
 
+Int TransportRpmsg_Factory_create(Void);
+Void TransportRpmsg_Factory_delete(Void);
+
+Ipc_TransportFactoryFxns TransportRpmsg_Factory = {
+    .createFxn = TransportRpmsg_Factory_create,
+    .deleteFxn = TransportRpmsg_Factory_delete
+};
+
 /* -------------------------------------------------------------------------- */
 
 /* instance convertors */
@@ -137,7 +145,6 @@ TransportRpmsg_Handle TransportRpmsg_create(TransportRpmsg_Params *params,
                                             Int *attachStatus)
 {
     TransportRpmsg_Object *obj;
-    Int *queues;
     Int rv;
 
     rv = attach(params->rprocId);
@@ -176,8 +183,12 @@ Void TransportRpmsg_delete(TransportRpmsg_Handle *handlep)
 
 static Int attach(UInt16 rprocId)
 {
-    Int    status = MessageQ_S_SUCCESS;
-    int    sock;
+    Int     status = MessageQ_S_SUCCESS;
+    int     sock;
+    UInt16  clusterId;
+
+
+    clusterId = rprocId - MultiProc_getBaseIdOfCluster();
 
     /* Create the socket for sending messages to the remote proc: */
     sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
@@ -203,7 +214,7 @@ static Int attach(UInt16 rprocId)
         goto exitSock;
     }
 
-    TransportRpmsg_module->sock[rprocId] = sock;
+    TransportRpmsg_module->sock[clusterId] = sock;
 
     if (TransportRpmsg_module->threadStarted == FALSE) {
         /* create a module wide event to unblock the socket select thread */
@@ -250,7 +261,7 @@ exitEvent:
 
 exitSock:
     close(sock);
-    TransportRpmsg_module->sock[rprocId] = 0;
+    TransportRpmsg_module->sock[clusterId] = 0;
 
 exit:
     return status;
@@ -259,10 +270,12 @@ exit:
 static Int detach(UInt16 rprocId)
 {
 
-    Int status = -1;
-    int sock;
+    Int     status = -1;
+    int     sock;
+    UInt16  clusterId;
 
-    sock = TransportRpmsg_module->sock[rprocId];
+    clusterId = rprocId - MultiProc_getBaseIdOfCluster();
+    sock = TransportRpmsg_module->sock[clusterId];
 
     if (sock) {
         PRINTVERBOSE1("detach: closing socket: %d\n", sock)
@@ -284,7 +297,8 @@ Int TransportRpmsg_bind(Void *handle, UInt32 queueId)
 
     rprocId = obj->rprocId;
 
-    PRINTVERBOSE2("TransportRpmsg_bind: creating endpoint for rprocId %d queueIndex %d\n", rprocId, queueIndex)
+    PRINTVERBOSE2("TransportRpmsg_bind: creating endpoint for rprocId %d "
+            "queueIndex %d\n", rprocId, queueIndex)
 
     /*  Create the socket to receive messages for this messageQ. */
     fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
@@ -409,13 +423,14 @@ Bool TransportRpmsg_put(Void *handle, Ptr pmsg)
     Int     status    = TRUE;
     int     sock;
     int     err;
-    UInt16  dstProcId = msg->dstProc;
+    UInt16  clusterId;
 
     /*
      * Retrieve the socket for the AF_SYSLINK protocol associated with this
      * transport.
      */
-    sock = TransportRpmsg_module->sock[dstProcId];
+    clusterId = msg->dstProc - MultiProc_getBaseIdOfCluster();
+    sock = TransportRpmsg_module->sock[clusterId];
     if (!sock) {
         return FALSE;
     }
@@ -517,7 +532,8 @@ void *rpmsgThreadFxn(void *arg)
                         else {
                             queueId = MessageQ_getDstQueue(retMsg);
 
-                            PRINTVERBOSE1("rpmsgThreadFxn: got message, delivering to queueId 0x%x\n", queueId)
+                            PRINTVERBOSE1("rpmsgThreadFxn: got message, "
+                                    "delivering to queueId 0x%x\n", queueId)
 
                             MessageQ_put(queueId, retMsg);
                         }
@@ -593,8 +609,10 @@ static Int transportGet(int sock, MessageQ_Msg *retMsg)
     }
 
     PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
-    PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
-    PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId, msg->msgSize)
+    PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg "
+            "proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
+    PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId,
+            msg->msgSize)
 
     *retMsg = msg;
 
@@ -637,3 +655,113 @@ Int queueIndexToFd(TransportRpmsg_Object *obj, UInt16 qIndex)
     return obj->qIndexToFd[qIndex];
 }
 
+/*
+ *  ======== TransportRpmsg_Factory_create ========
+ *  Create the transport instances
+ *
+ *  Attach to all remote processors. For now, must attach to
+ *  at least one to tolerate MessageQ_E_RESOURCE failures.
+ *
+ *  This function implements the IPC Factory interface, so it
+ *  returns Ipc status codes.
+ */
+Int TransportRpmsg_Factory_create(Void)
+{
+    Int     status;
+    Int     attachStatus;
+    Int     i;
+    UInt16  procId;
+    Int32   attachedAny;
+    UInt16  clusterSize;
+    UInt16  clusterBase;
+
+    TransportRpmsg_Handle      *inst;
+    TransportRpmsg_Handle       transport;
+    TransportRpmsg_Params       params;
+    IMessageQTransport_Handle   iMsgQTrans;
+
+
+    status = Ipc_S_SUCCESS;
+    attachedAny = FALSE;
+
+    /* needed to enumerate processors in cluster */
+    clusterSize = MultiProc_getNumProcsInCluster();
+    clusterBase = MultiProc_getBaseIdOfCluster();
+
+    /* allocate the instance array */
+    inst = calloc(clusterSize, sizeof(TransportRpmsg_Handle));
+
+    if (inst == NULL) {
+        printf("Error: TransportRpmsg_Factory_create failed, no memory\n");
+        status = Ipc_E_MEMORY;
+        goto exit;
+    }
+
+    TransportRpmsg_module->inst = inst;
+
+    /* create transport instance for all processors in cluster */
+    for (i = 0, procId = clusterBase; i < clusterSize; i++, procId++) {
+
+        if (MultiProc_self() == procId) {
+            continue;
+        }
+
+        params.rprocId = procId;
+        transport = TransportRpmsg_create(&params, &attachStatus);
+
+        if (transport != NULL) {
+            iMsgQTrans = TransportRpmsg_upCast(transport);
+            MessageQ_registerTransport(iMsgQTrans, procId, 0);
+            attachedAny = TRUE;
+        }
+        else {
+            if (attachStatus == MessageQ_E_RESOURCE) {
+                continue;
+            }
+            printf("TransportRpmsg_Factory_create: failed to attach to "
+                    "procId=%d status=%d\n", procId, attachStatus);
+            status = Ipc_E_FAIL;
+            break;
+        }
+
+        TransportRpmsg_module->inst[i] = transport;
+    }
+
+    if (!attachedAny) {
+        status = Ipc_E_FAIL;
+    }
+
+exit:
+    return (status);
+}
+
+/*
+ *  ======== TransportRpmsg_Factory_delete ========
+ *  Finalize the transport instances
+ */
+Void TransportRpmsg_Factory_delete(Void)
+{
+    Int     i;
+    UInt16  procId;
+    UInt16  clusterSize;
+    UInt16  clusterBase;
+
+    /* needed to enumerate processors in cluster */
+    clusterSize = MultiProc_getNumProcsInCluster();
+    clusterBase = MultiProc_getBaseIdOfCluster();
+
+    /* detach from all remote processors, assuming they are up */
+    for (i = 0, procId = clusterBase; i < clusterSize; i++, procId++) {
+
+        if (MultiProc_self() == procId) {
+            continue;
+        }
+
+        if (TransportRpmsg_module->inst[i] != NULL) {
+            MessageQ_unregisterTransport(procId, 0);
+            TransportRpmsg_delete(&(TransportRpmsg_module->inst[i]));
+        }
+    }
+
+    return;
+}