Adding RPMsg Extension layer implementing zero-copy send and receive.
authorMarek Novak (NXA17138) <marek.novak@nxp.com>
Wed, 24 Aug 2016 13:12:06 +0000 (15:12 +0200)
committerWendy Liang <jliang@xilinx.com>
Sat, 15 Oct 2016 04:39:20 +0000 (21:39 -0700)
Signed-off-by: Marek Novak <marek.novak@nxp.com>
Signed-off-by: Wendy Liang <jliang@xilinx.com>
lib/include/openamp/rpmsg.h
lib/include/openamp/rpmsg_core.h
lib/include/openamp/virtqueue.h
lib/rpmsg/rpmsg.c
lib/rpmsg/rpmsg_core.c
lib/virtio/virtqueue.c

index 022cfd1b2c1cbbd410289163e1ab437b6cc0993e..432afb43cfae0ceaa8877687b6e28ce66d4ec8c7 100644 (file)
@@ -41,6 +41,7 @@
 /* The feature bitmap for virtio rpmsg */
 #define VIRTIO_RPMSG_F_NS      0       /* RP supports name service notifications */
 #define RPMSG_NAME_SIZE     32
+#define RPMSG_BUF_HELD      (1U << 31) /* Flag to suggest to hold the buffer */
 
 #define RPMSG_LOCATE_DATA(p)  ((unsigned char *) p + sizeof (struct rpmsg_hdr))
 
@@ -63,6 +64,20 @@ struct rpmsg_hdr {
        uint16_t flags;
 } OPENAMP_PACKED_END;
 
+/**
+ * struct rpmsg_hdr_reserved - this is the "union" of the rpmsg_hdr->reserved
+ * @rfu: reserved for future usage
+ * @idx: index of a buffer (not to be returned back to the buffer's pool)
+ *
+ * This structure has been introduced to keep the backward compatibility. 
+ * It could be integrated into rpmsg_hdr struct, replacing the reserved field.
+ */
+struct rpmsg_hdr_reserved
+{
+       uint16_t rfu; /* reserved for future usage */
+       uint16_t idx;
+};
+
 /**
  * struct rpmsg_ns_msg - dynamic name service announcement message
  * @name: name of remote service that is published
@@ -316,6 +331,185 @@ static inline int rpmsg_trysend_offchannel(struct rpmsg_channel *rpdev,
                                         RPMSG_FALSE);
 }
 
+/**
+ * @brief Holds the rx buffer for usage outside the receive callback.
+ *
+ * Calling this function prevents the RPMsg receive buffer from being released
+ * back to the pool of shmem buffers. This API can only be called at rx
+ * callback context (rpmsg_rx_cb_t). With this API, the application doesn't
+ * need to copy the message in rx callback. Instead, the rx buffer base address
+ * is saved in application context and further processed in application
+ * process. After the message is processed, the application can release the rx
+ * buffer for future reuse in vring by calling the rpmsg_release_rx_buffer()
+ * function.
+ *
+ * @param[in] rpdev The rpmsg channel
+ * @param[in] rxbuf RX buffer with message payload
+ *
+ * @see rpmsg_release_rx_buffer
+ */
+void rpmsg_hold_rx_buffer(struct rpmsg_channel *rpdev, void *rxbuf);
+
+/**
+ * @brief Releases the rx buffer for future reuse in vring.
+ *
+ * This API can be called at process context when the message in rx buffer is
+ * processed.
+ *
+ * @param rpdev - the rpmsg channel
+ * @param rxbuf - rx buffer with message payload
+ *
+ * @see rpmsg_hold_rx_buffer
+ */
+void rpmsg_release_rx_buffer(struct rpmsg_channel *rpdev, void *rxbuf);
+
+/**
+ * @brief Gets the tx buffer for message payload.
+ *
+ * This API can only be called at process context to get the tx buffer in vring.
+ * By this way, the application can directly put its message into the vring tx
+ * buffer without copy from an application buffer.
+ * It is the application responsibility to correctly fill the allocated tx
+ * buffer by data and passing correct parameters to the rpmsg_send_nocopy() or
+ * rpmsg_sendto_nocopy() function to perform data no-copy-send mechanism.
+ *
+ * @param[in] rpdev Pointer to rpmsg channel
+ * @param[in] size  Pointer to store tx buffer size
+ * @param[in] wait  Boolean, wait or not for buffer to become available
+ *
+ * @return The tx buffer address on success and NULL on failure
+ *
+ * @see rpmsg_send_offchannel_nocopy
+ * @see rpmsg_sendto_nocopy
+ * @see rpmsg_send_nocopy
+ */
+void *rpmsg_get_tx_payload_buffer(struct rpmsg_channel *rpdev, uint32_t *size,
+                           int wait);
+
+/**
+ * @brief Sends a message in tx buffer allocated by rpmsg_alloc_tx_buffer()
+ * using explicit src/dst addresses.
+ *
+ * This function sends txbuf of length len to the remote dst address,
+ * and uses src as the source address.
+ * The message will be sent to the remote processor which the rpdev
+ * channel belongs to.
+ * The application has to take the responsibility for:
+ *  1. tx buffer allocation (rpmsg_alloc_tx_buffer() )
+ *  2. filling the data to be sent into the pre-allocated tx buffer
+ *  3. not exceeding the buffer size when filling the data
+ *  4. data cache coherency
+ *
+ * After the rpmsg_send_offchannel_nocopy() function is issued the tx buffer is
+ * no more owned by the sending task and must not be touched anymore unless the
+ * rpmsg_send_offchannel_nocopy() function fails and returns an error. In that
+ * case the application should try to re-issue the
+ * rpmsg_send_offchannel_nocopy() again and if it is still not possible to send
+ * the message and the application wants to give it up from whatever reasons
+ * the rpmsg_release_rx_buffer function could be called, passing the pointer to
+ * the tx buffer to be released as a parameter.
+ *
+ * @param[in] rpdev The rpmsg channel
+ * @param[in] src   Source address
+ * @param[in] dst   Destination address
+ * @param[in] txbuf TX buffer with message filled
+ * @param[in] len   Length of payload
+ *
+ * @return 0 on success and an appropriate error value on failure
+ *
+ * @see rpmsg_alloc_tx_buffer
+ * @see rpmsg_sendto_nocopy
+ * @see rpmsg_send_nocopy
+ */
+int rpmsg_send_offchannel_nocopy(struct rpmsg_channel *rpdev, uint32_t src,
+                                uint32_t dst, void *txbuf, int len);
+
+/**
+ * @brief Sends a message in tx buffer allocated by rpmsg_alloc_tx_buffer()
+ * across to the remote processor, specify dst.
+ *
+ * This function sends txbuf of length len to the remote dst address.
+ * The message will be sent to the remote processor which the rpdev
+ * channel belongs to, using rpdev's source address.
+ * The application has to take the responsibility for:
+ *  1. tx buffer allocation (rpmsg_alloc_tx_buffer() )
+ *  2. filling the data to be sent into the pre-allocated tx buffer
+ *  3. not exceeding the buffer size when filling the data
+ *  4. data cache coherency
+ *
+ * After the rpmsg_sendto_nocopy() function is issued the tx buffer is no more
+ * owned by the sending task and must not be touched anymore unless the
+ * rpmsg_sendto_nocopy() function fails and returns an error. In that case the
+ * application should try to re-issue the rpmsg_sendto_nocopy() again and if
+ * it is still not possible to send the message and the application wants to
+ * give it up from whatever reasons the rpmsg_release_rx_buffer function
+ * could be called,
+ * passing the pointer to the tx buffer to be released as a parameter.
+ *
+ * @param[in] rpdev The rpmsg channel
+ * @param[in] txbuf TX buffer with message filled
+ * @param[in] len   Length of payload
+ * @param[in] dst   Destination address
+ *
+ * @return 0 on success and an appropriate error value on failure
+ *
+ * @see rpmsg_alloc_tx_buffer
+ * @see rpmsg_send_offchannel_nocopy
+ * @see rpmsg_send_nocopy
+ */
+static inline
+int rpmsg_sendto_nocopy(struct rpmsg_channel *rpdev, void *txbuf, int len,
+                       uint32_t dst)
+{
+       if (!rpdev)
+               return RPMSG_ERR_PARAM;
+
+       return rpmsg_send_offchannel_nocopy(rpdev, (uint32_t)rpdev->src, dst,
+                                           txbuf, len);
+}
+
+/**
+ * @brief Sends a message in tx buffer allocated by
+ * rpmsg_alloc_tx_buffer() across to the remote processor.
+ *
+ * This function sends txbuf of length len on the rpdev channel.
+ * The message will be sent to the remote processor which the rpdev
+ * channel belongs to, using rpdev's source and destination addresses.
+ * The application has to take the responsibility for:
+ *  1. tx buffer allocation (rpmsg_alloc_tx_buffer() )
+ *  2. filling the data to be sent into the pre-allocated tx buffer
+ *  3. not exceeding the buffer size when filling the data
+ *  4. data cache coherency
+ *
+ * After the rpmsg_send_nocopy() function is issued the tx buffer is no more
+ * owned by the sending task and must not be touched anymore unless the
+ * rpmsg_send_nocopy() function fails and returns an error. In that case the
+ * application should try to re-issue the rpmsg_send_nocopy() again and if
+ * it is still not possible to send the message and the application wants to
+ * give it up from whatever reasons the rpmsg_release_rx_buffer function
+ * could be called, passing the pointer to the tx buffer to be released as a
+ * parameter.
+ *
+ * @param[in] rpdev The rpmsg channel
+ * @param[in] txbuf TX buffer with message filled
+ * @param[in] len   Length of payload
+ *
+ * @return 0 on success and an appropriate error value on failure
+ *
+ * @see rpmsg_alloc_tx_buffer
+ * @see rpmsg_send_offchannel_nocopy
+ * @see rpmsg_sendto_nocopy
+ */
+static inline
+int rpmsg_send_nocopy(struct rpmsg_channel *rpdev, void *txbuf, int len)
+{
+       if (!rpdev)
+               return RPMSG_ERR_PARAM;
+
+       return rpmsg_send_offchannel_nocopy(rpdev, rpdev->src, rpdev->dst,
+                                           txbuf, len);
+}
+
 /**
  * rpmsg_init
  *
index 0923e2409e8135a60f29b5a1e5ddc9bbb1808130..ed17a2a3cf70606fd28e6307c0c0e251700b3b5b 100644 (file)
 #define RPMSG_ERR_DEV_INIT                      (RPMSG_ERROR_BASE - 7)
 #define RPMSG_ERR_DEV_ADDR                      (RPMSG_ERROR_BASE - 8)
 
+/* Zero-Copy extension macros */
+#define RPMSG_HDR_FROM_BUF(buf)             (struct rpmsg_hdr *)((char*)buf - \
+                                            sizeof(struct rpmsg_hdr))
+
 #if (RPMSG_DEBUG == true)
 #define RPMSG_ASSERT(_exp, _msg) do{ \
     if (!(_exp)){ openamp_print("%s - "_msg, __func__); while(1);} \
index 1ad4cb34e74dc9b299c5934bbda414b093e3ea55..0cfaf714c22672c99e5ad362c96edf2a25cd6c12 100644 (file)
@@ -208,7 +208,7 @@ int virtqueue_add_single_buffer(struct virtqueue *vq, void *cookie,
                                struct metal_sg *sg, int writable,
                                boolean has_next);
 
-void *virtqueue_get_buffer(struct virtqueue *vq, uint32_t * len);
+void *virtqueue_get_buffer(struct virtqueue *vq, uint32_t * len, uint16_t *idx);
 
 void *virtqueue_get_available_buffer(struct virtqueue *vq, uint16_t * avail_idx,
                                     uint32_t * len);
@@ -230,4 +230,6 @@ void virtqueue_notification(struct virtqueue *vq);
 
 uint32_t virtqueue_get_desc_size(struct virtqueue *vq);
 
+uint32_t virtqueue_get_buffer_length(struct virtqueue *vq, uint16_t idx);
+
 #endif                         /* VIRTQUEUE_H_ */
index 0bf3b58fb10872fbf0a952f7c76eb58a7baf549b..c48d12ac89b84acd24411e2554904ae1ae682957 100644 (file)
@@ -57,6 +57,7 @@
 #include <string.h>
 #include "openamp/rpmsg.h"
 #include "metal/sys.h"
+#include "metal/cache.h"
 
 /**
  * rpmsg_init
@@ -193,6 +194,7 @@ int rpmsg_send_offchannel_raw(struct rpmsg_channel *rp_chnl, uint32_t src,
        rp_hdr->dst = dst;
        rp_hdr->src = src;
        rp_hdr->len = size;
+       rp_hdr->reserved = 0;
 
        /* Copy data to rpmsg buffer. */
        if (rdev->proc->sh_buff.io->mem_flags & METAL_IO_MAPPED)
@@ -254,6 +256,144 @@ int rpmsg_get_buffer_size(struct rpmsg_channel *rp_chnl)
        return length;
 }
 
+void rpmsg_hold_rx_buffer(struct rpmsg_channel *rpdev, void *rxbuf)
+{
+       struct rpmsg_hdr *rp_hdr = NULL;
+       if (!rpdev || !rxbuf)
+           return;
+
+       rp_hdr = RPMSG_HDR_FROM_BUF(rxbuf);
+
+       /* set held status to keep buffer */
+       rp_hdr->reserved |= RPMSG_BUF_HELD;
+}
+
+void rpmsg_release_rx_buffer(struct rpmsg_channel *rpdev, void *rxbuf)
+{
+       struct rpmsg_hdr *hdr;
+       struct remote_device *rdev;
+       struct rpmsg_hdr_reserved * reserved = NULL;
+       struct metal_io_region *io;
+       unsigned int len;
+
+       if (!rpdev || !rxbuf)
+           return;
+
+       rdev = rpdev->rdev;
+       hdr = RPMSG_HDR_FROM_BUF(rxbuf);
+
+       /* Get the pointer to the reserved field that contains buffer size
+        * and the index */
+       reserved = (struct rpmsg_hdr_reserved*)&hdr->reserved;
+       hdr->reserved &= (~RPMSG_BUF_HELD);
+       len = (unsigned int)virtqueue_get_buffer_length(rdev->rvq,
+                                               reserved->idx);
+
+       io = rdev->proc->sh_buff.io;
+       if (io) {
+               if (! (io->mem_flags & METAL_UNCACHED))
+                       metal_cache_flush(rxbuf, len);
+       }
+
+       metal_mutex_acquire(&rdev->lock);
+
+       /* Return used buffer, with total length
+          (header length + buffer size). */
+       rpmsg_return_buffer(rdev, hdr, (unsigned long)len, reserved->idx);
+
+       metal_mutex_release(&rdev->lock);
+}
+
+void *rpmsg_get_tx_payload_buffer(struct rpmsg_channel *rpdev, uint32_t *size,
+                                int wait)
+{
+       struct rpmsg_hdr *hdr;
+       struct remote_device *rdev;
+       struct rpmsg_hdr_reserved *reserved;
+       unsigned short idx;
+       unsigned long buff_len, tick_count = 0;
+
+       if (!rpdev || !size)
+               return NULL;
+
+       rdev = rpdev->rdev;
+
+       metal_mutex_acquire(&rdev->lock);
+
+       /* Get tx buffer from vring */
+       hdr = (struct rpmsg_hdr *) rpmsg_get_tx_buffer(rdev, &buff_len, &idx);
+
+       metal_mutex_release(&rdev->lock);
+
+       if (!hdr && !wait) {
+               return NULL;
+       } else {
+               while (!hdr) {
+                       /*
+                        * Wait parameter is true - pool the buffer for
+                        * 15 secs as defined by the APIs.
+                        */
+                       env_sleep_msec(RPMSG_TICKS_PER_INTERVAL);
+                       metal_mutex_acquire(&rdev->lock);
+                       hdr = (struct rpmsg_hdr *) rpmsg_get_tx_buffer(rdev, &buff_len, &idx);
+                       metal_mutex_release(&rdev->lock);
+                       tick_count += RPMSG_TICKS_PER_INTERVAL;
+                       if (tick_count >= (RPMSG_TICK_COUNT / RPMSG_TICKS_PER_INTERVAL)) {
+                                       return NULL;
+                       }
+               }
+
+               /* Store the index into the reserved field to be used when sending */
+               reserved = (struct rpmsg_hdr_reserved*)&hdr->reserved;
+               reserved->idx = (uint16_t)idx;
+
+               /* Actual data buffer size is vring buffer size minus rpmsg header length */
+               *size = (uint32_t)(buff_len - sizeof(struct rpmsg_hdr));
+               return (void *)RPMSG_LOCATE_DATA(hdr);
+       }
+}
+
+int rpmsg_send_offchannel_nocopy(struct rpmsg_channel *rpdev, uint32_t src,
+                                uint32_t dst, void *txbuf, int len)
+{
+       struct rpmsg_hdr *hdr;
+       struct remote_device *rdev;
+       struct rpmsg_hdr_reserved * reserved = NULL;
+       int status;
+
+       if (!rpdev || !txbuf)
+           return RPMSG_ERR_PARAM;
+
+       rdev = rpdev->rdev;
+       hdr = RPMSG_HDR_FROM_BUF(txbuf);
+
+       /* Initialize RPMSG header. */
+       hdr->dst = dst;
+       hdr->src = src;
+       hdr->len = len;
+       hdr->flags = 0;
+       hdr->reserved &= (~RPMSG_BUF_HELD);
+
+       /* Get the pointer to the reserved field that contains buffer size and
+        * the index */
+       reserved = (struct rpmsg_hdr_reserved*)&hdr->reserved;
+
+       metal_mutex_acquire(&rdev->lock);
+
+       status = rpmsg_enqueue_buffer(rdev, hdr,
+                       (unsigned long)virtqueue_get_buffer_length(
+                       rdev->tvq, reserved->idx),
+                       reserved->idx);
+       if (status == RPMSG_SUCCESS) {
+               /* Let the other side know that there is a job to process. */
+               virtqueue_kick(rdev->tvq);
+       }
+
+       metal_mutex_release(&rdev->lock);
+
+       return status;
+}
+
 /**
  * rpmsg_create_ept
  *
index 18f65ff65b3982463ebc45c807529d2fb7db6cca..cd14ef5fdad9d817be9361ceccf1b59ef31b1220 100644 (file)
@@ -410,7 +410,7 @@ void *rpmsg_get_tx_buffer(struct remote_device *rdev, unsigned long *len,
        void *data;
 
        if (rdev->role == RPMSG_REMOTE) {
-               data = virtqueue_get_buffer(rdev->tvq, (uint32_t *) len);
+               data = virtqueue_get_buffer(rdev->tvq, (uint32_t *) len, idx);
                if (data == RPMSG_NULL) {
                        data = sh_mem_get_buffer(rdev->mem_pool);
                        *len = RPMSG_BUFFER_SIZE;
@@ -441,7 +441,7 @@ void *rpmsg_get_rx_buffer(struct remote_device *rdev, unsigned long *len,
 
        void *data;
        if (rdev->role == RPMSG_REMOTE) {
-               data = virtqueue_get_buffer(rdev->rvq, (uint32_t *) len);
+               data = virtqueue_get_buffer(rdev->rvq, (uint32_t *) len, idx);
        } else {
                data =
                    virtqueue_get_available_buffer(rdev->rvq, idx,
@@ -456,6 +456,7 @@ void *rpmsg_get_rx_buffer(struct remote_device *rdev, unsigned long *len,
                                        (unsigned int)(*len));
                }
        }
+
        return data;
 }
 
@@ -543,6 +544,7 @@ void rpmsg_rx_callback(struct virtqueue *vq)
        struct rpmsg_channel *rp_chnl;
        struct rpmsg_endpoint *rp_ept;
        struct rpmsg_hdr *rp_hdr;
+       struct rpmsg_hdr_reserved *reserved;
        struct metal_list *node;
        unsigned long len;
        unsigned short idx;
@@ -604,8 +606,17 @@ void rpmsg_rx_callback(struct virtqueue *vq)
 
                metal_mutex_acquire(&rdev->lock);
 
-               /* Return used buffers. */
-               rpmsg_return_buffer(rdev, rp_hdr, len, idx);
+               /* Check whether callback wants to hold buffer */
+               if (rp_hdr->reserved & RPMSG_BUF_HELD)
+               {
+                       /* 'rp_hdr->reserved' field is now used as storage for
+                        * 'idx' to release buffer later */
+                       reserved = (struct rpmsg_hdr_reserved*)&rp_hdr->reserved;
+                       reserved->idx = (uint16_t)idx;
+               } else {
+                       /* Return used buffers. */
+                       rpmsg_return_buffer(rdev, rp_hdr, len, idx);
+               }
 
                rp_hdr =
                    (struct rpmsg_hdr *)rpmsg_get_rx_buffer(rdev, &len, &idx);
index ca67a7b977363a33fc6ab5038c97ad3e9004b2c0..87cdf7d16f085c4038078aeaafe3f4dff75ed590 100644 (file)
@@ -266,10 +266,11 @@ int virtqueue_add_single_buffer(struct virtqueue *vq, void *cookie,
  *
  * @param vq            - Pointer to VirtIO queue control block
  * @param len           - Length of conumed buffer
+ * @param idx           - index of the buffer
  *
  * @return              - Pointer to used buffer
  */
-void *virtqueue_get_buffer(struct virtqueue *vq, uint32_t * len)
+void *virtqueue_get_buffer(struct virtqueue *vq, uint32_t * len, uint16_t * idx)
 {
        struct vring_used_elem *uep;
        void *cookie;
@@ -294,11 +295,18 @@ void *virtqueue_get_buffer(struct virtqueue *vq, uint32_t * len)
        cookie = vq->vq_descx[desc_idx].cookie;
        vq->vq_descx[desc_idx].cookie = VQ_NULL;
 
+       if (idx != VQ_NULL)
+               *idx = used_idx;
        VQUEUE_IDLE(vq);
 
        return (cookie);
 }
 
+uint32_t virtqueue_get_buffer_length(struct virtqueue *vq, uint16_t idx)
+{
+       return vq->vq_ring.desc[idx].len;
+}
+
 /**
  * virtqueue_free   - Frees VirtIO queue resources
  *