ca67a7b977363a33fc6ab5038c97ad3e9004b2c0
[processor-sdk/open-amp.git] / lib / virtio / virtqueue.c
1 /*-
2  * Copyright (c) 2011, Bryan Venteicher <bryanv@FreeBSD.org>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice unmodified, this list of conditions, and the following
10  *    disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
16  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
17  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
18  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
19  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
20  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
21  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
22  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
24  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25  */
27 #include <string.h>
28 #include "openamp/virtqueue.h"
29 #include "metal/atomic.h"
30 #include "metal/dma.h"
31 #include "metal/io.h"
32 #include "metal/alloc.h"
34 /* Prototype for internal functions. */
35 static void vq_ring_init(struct virtqueue *);
36 static void vq_ring_update_avail(struct virtqueue *, uint16_t);
37 static uint16_t vq_ring_add_buffer(struct virtqueue *, struct vring_desc *,
38                                    uint16_t, struct metal_sg *, int, int);
39 static int vq_ring_enable_interrupt(struct virtqueue *, uint16_t);
40 static void vq_ring_free_chain(struct virtqueue *, uint16_t);
41 static int vq_ring_must_notify_host(struct virtqueue *vq);
42 static void vq_ring_notify_host(struct virtqueue *vq);
43 static int virtqueue_nused(struct virtqueue *vq);
45 /**
46  * virtqueue_create - Creates new VirtIO queue
47  *
48  * @param device    - Pointer to VirtIO device
49  * @param id        - VirtIO queue ID , must be unique
50  * @param name      - Name of VirtIO queue
51  * @param ring      - Pointer to vring_alloc_info control block
52  * @param callback  - Pointer to callback function, invoked
53  *                    when message is available on VirtIO queue
54  * @param notify    - Pointer to notify function, used to notify
55  *                    other side that there is job available for it
56  * @param shm_io    - shared memory I/O region of the virtqueue
57  * @param v_queue   - Created VirtIO queue.
58  *
59  * @return          - Function status
60  */
61 int virtqueue_create(struct virtio_device *virt_dev, unsigned short id,
62                      char *name, struct vring_alloc_info *ring,
63                      void (*callback) (struct virtqueue * vq),
64                      void (*notify) (struct virtqueue * vq),
65                      struct metal_io_region *shm_io,
66                      struct virtqueue **v_queue)
67 {
69         struct virtqueue *vq = VQ_NULL;
70         int status = VQUEUE_SUCCESS;
71         uint32_t vq_size = 0;
73         VQ_PARAM_CHK(ring == VQ_NULL, status, ERROR_VQUEUE_INVLD_PARAM);
74         VQ_PARAM_CHK(ring->num_descs == 0, status, ERROR_VQUEUE_INVLD_PARAM);
75         VQ_PARAM_CHK(ring->num_descs & (ring->num_descs - 1), status,
76                      ERROR_VRING_ALIGN);
78         //TODO : Error check for indirect buffer addition
80         if (status == VQUEUE_SUCCESS) {
82                 vq_size = sizeof(struct virtqueue)
83                     + (ring->num_descs) * sizeof(struct vq_desc_extra);
84                 vq = (struct virtqueue *)metal_allocate_memory(vq_size);
86                 if (vq == VQ_NULL) {
87                         return (ERROR_NO_MEM);
88                 }
90                 memset(vq, 0x00, vq_size);
92                 vq->vq_dev = virt_dev;
93                 strncpy(vq->vq_name, name, VIRTQUEUE_MAX_NAME_SZ);
94                 vq->vq_queue_index = id;
95                 vq->vq_alignment = ring->align;
96                 vq->vq_nentries = ring->num_descs;
97                 vq->vq_free_cnt = vq->vq_nentries;
98                 vq->callback = callback;
99                 vq->notify = notify;
100                 vq->shm_io = shm_io;
102                 //TODO : Whether we want to support indirect addition or not.
103                 vq->vq_ring_size = vring_size(ring->num_descs, ring->align);
104                 vq->vq_ring_mem = (void *)ring->vaddr;
106                 /* Initialize vring control block in virtqueue. */
107                 vq_ring_init(vq);
109                 /* Disable callbacks - will be enabled by the application
110                  * once initialization is completed.
111                  */
112                 virtqueue_disable_cb(vq);
114                 *v_queue = vq;
116                 //TODO : Need to add cleanup in case of error used with the indirect buffer addition
117                 //TODO: do we need to save the new queue in db based on its id
118         }
120         return (status);
123 /**
124  * virtqueue_add_buffer()   - Enqueues new buffer in vring for consumption
125  *                            by other side. Readable buffers are always
126  *                            inserted before writable buffers
127  *
128  * @param vq                - Pointer to VirtIO queue control block.
129  * @param sg                - Pointer to buffer scatter/gather list
130  * @param readable          - Number of readable buffers
131  * @param writable          - Number of writable buffers
132  * @param cookie            - Pointer to hold call back data
133  *
134  * @return                  - Function status
135  */
136 int virtqueue_add_buffer(struct virtqueue *vq, struct metal_sg *sg,
137                          int readable, int writable, void *cookie)
140         struct vq_desc_extra *dxp = VQ_NULL;
141         int status = VQUEUE_SUCCESS;
142         uint16_t head_idx;
143         uint16_t idx;
144         int needed;
146         needed = readable + writable;
148         VQ_PARAM_CHK(vq == VQ_NULL, status, ERROR_VQUEUE_INVLD_PARAM);
149         VQ_PARAM_CHK(needed < 1, status, ERROR_VQUEUE_INVLD_PARAM);
150         VQ_PARAM_CHK(vq->vq_free_cnt == 0, status, ERROR_VRING_FULL);
152         //TODO: Add parameters validation for indirect buffer addition
154         VQUEUE_BUSY(vq);
156         if (status == VQUEUE_SUCCESS) {
158                 //TODO : Indirect buffer addition support
160                 VQASSERT(vq, cookie != VQ_NULL, "enqueuing with no cookie");
162                 head_idx = vq->vq_desc_head_idx;
163                 VQ_RING_ASSERT_VALID_IDX(vq, head_idx);
164                 dxp = &vq->vq_descx[head_idx];
166                 VQASSERT(vq, (dxp->cookie == VQ_NULL),
167                          "cookie already exists for index");
169                 dxp->cookie = cookie;
170                 dxp->ndescs = needed;
172                 /* Enqueue buffer onto the ring. */
173                 idx = vq_ring_add_buffer(vq, vq->vq_ring.desc, head_idx, sg,
174                                          readable, writable);
176                 vq->vq_desc_head_idx = idx;
177                 vq->vq_free_cnt -= needed;
179                 if (vq->vq_free_cnt == 0) {
180                         VQ_RING_ASSERT_CHAIN_TERM(vq);
181                 } else {
182                         VQ_RING_ASSERT_VALID_IDX(vq, idx);
183                 }
185                 /*
186                  * Update vring_avail control block fields so that other
187                  * side can get buffer using it.
188                  */
189                 vq_ring_update_avail(vq, head_idx);
190         }
192         VQUEUE_IDLE(vq);
194         return (status);
197 /**
198  * virtqueue_add_single_buffer - Enqueues single buffer in vring
199  *
200  * @param vq                    - Pointer to VirtIO queue control block
201  * @param cookie                - Pointer to hold call back data
202  * @param sg                    - metal_scatter/gather struct element
203  * @param writable              - If buffer writable
204  * @param has_next              - If buffers for subsequent call are
205  *                                to be chained
206  *
207  * @return                      - Function status
208  */
209 int virtqueue_add_single_buffer(struct virtqueue *vq, void *cookie,
210                                 struct metal_sg *sg, int writable,
211                                 boolean has_next)
214         struct vq_desc_extra *dxp;
215         struct vring_desc *dp;
216         uint16_t head_idx;
217         uint16_t idx;
218         int status = VQUEUE_SUCCESS;
220         VQ_PARAM_CHK(vq == VQ_NULL, status, ERROR_VQUEUE_INVLD_PARAM);
221         VQ_PARAM_CHK(vq->vq_free_cnt == 0, status, ERROR_VRING_FULL);
223         VQUEUE_BUSY(vq);
225         if (status == VQUEUE_SUCCESS) {
227                 VQASSERT(vq, cookie != VQ_NULL, "enqueuing with no cookie");
229                 head_idx = vq->vq_desc_head_idx;
230                 dxp = &vq->vq_descx[head_idx];
232                 dxp->cookie = cookie;
233                 dxp->ndescs = 1;
234                 idx = head_idx;
236                 dp = &vq->vq_ring.desc[idx];
237                 dp->addr = metal_io_virt_to_phys(sg->io, sg->virt);
238                 dp->len = sg->len;
239                 dp->flags = 0;
240                 idx = dp->next;
242                 if (has_next)
243                         dp->flags |= VRING_DESC_F_NEXT;
244                 if (writable)
245                         dp->flags |= VRING_DESC_F_WRITE;
247                 vq->vq_desc_head_idx = idx;
248                 vq->vq_free_cnt--;
250                 if (vq->vq_free_cnt == 0) {
251                         VQ_RING_ASSERT_CHAIN_TERM(vq);
252                 } else {
253                         VQ_RING_ASSERT_VALID_IDX(vq, idx);
254                 }
256                 vq_ring_update_avail(vq, head_idx);
257         }
259         VQUEUE_IDLE(vq);
261         return (status);
264 /**
265  * virtqueue_get_buffer - Returns used buffers from VirtIO queue
266  *
267  * @param vq            - Pointer to VirtIO queue control block
268  * @param len           - Length of conumed buffer
269  *
270  * @return              - Pointer to used buffer
271  */
272 void *virtqueue_get_buffer(struct virtqueue *vq, uint32_t * len)
274         struct vring_used_elem *uep;
275         void *cookie;
276         uint16_t used_idx, desc_idx;
278         if ((vq == VQ_NULL) || (vq->vq_used_cons_idx == vq->vq_ring.used->idx))
279                 return (VQ_NULL);
281         VQUEUE_BUSY(vq);
283         used_idx = vq->vq_used_cons_idx++ & (vq->vq_nentries - 1);
284         uep = &vq->vq_ring.used->ring[used_idx];
286         atomic_thread_fence(memory_order_seq_cst);
288         desc_idx = (uint16_t) uep->id;
289         if (len != VQ_NULL)
290                 *len = uep->len;
292         vq_ring_free_chain(vq, desc_idx);
294         cookie = vq->vq_descx[desc_idx].cookie;
295         vq->vq_descx[desc_idx].cookie = VQ_NULL;
297         VQUEUE_IDLE(vq);
299         return (cookie);
302 /**
303  * virtqueue_free   - Frees VirtIO queue resources
304  *
305  * @param vq        - Pointer to VirtIO queue control block
306  *
307  */
308 void virtqueue_free(struct virtqueue *vq)
311         if (vq != VQ_NULL) {
313                 if (vq->vq_free_cnt != vq->vq_nentries) {
314                         openamp_print
315                             ("\r\nWARNING %s: freeing non-empty virtqueue\r\n",
316                              vq->vq_name);
317                 }
318                 //TODO : Need to free indirect buffers here
320                 if (vq->vq_ring_mem != VQ_NULL) {
321                         vq->vq_ring_size = 0;
322                         vq->vq_ring_mem = VQ_NULL;
323                 }
325                 metal_free_memory(vq);
326         }
329 /**
330  * virtqueue_get_available_buffer   - Returns buffer available for use in the
331  *                                    VirtIO queue
332  *
333  * @param vq                        - Pointer to VirtIO queue control block
334  * @param avail_idx                 - Pointer to index used in vring desc table
335  * @param len                       - Length of buffer
336  *
337  * @return                          - Pointer to available buffer
338  */
339 void *virtqueue_get_available_buffer(struct virtqueue *vq, uint16_t * avail_idx,
340                                      uint32_t * len)
343         uint16_t head_idx = 0;
344         void *buffer;
346         if (vq->vq_available_idx == vq->vq_ring.avail->idx) {
347                 return (VQ_NULL);
348         }
350         VQUEUE_BUSY(vq);
352         head_idx = vq->vq_available_idx++ & (vq->vq_nentries - 1);
353         *avail_idx = vq->vq_ring.avail->ring[head_idx];
355         atomic_thread_fence(memory_order_seq_cst);
357         buffer = metal_io_phys_to_virt(vq->shm_io, vq->vq_ring.desc[*avail_idx].addr);
358         *len = vq->vq_ring.desc[*avail_idx].len;
360         VQUEUE_IDLE(vq);
362         return (buffer);
365 /**
366  * virtqueue_add_consumed_buffer - Returns consumed buffer back to VirtIO queue
367  *
368  * @param vq                     - Pointer to VirtIO queue control block
369  * @param head_idx               - Index of vring desc containing used buffer
370  * @param len                    - Length of buffer
371  *
372  * @return                       - Function status
373  */
374 int virtqueue_add_consumed_buffer(struct virtqueue *vq, uint16_t head_idx,
375                                   uint32_t len)
378         struct vring_used_elem *used_desc = VQ_NULL;
379         uint16_t used_idx;
381         if (head_idx > vq->vq_nentries) {
382                 return (ERROR_VRING_NO_BUFF);
383         }
385         VQUEUE_BUSY(vq);
387         used_idx = vq->vq_ring.used->idx & (vq->vq_nentries - 1);
388         used_desc = &(vq->vq_ring.used->ring[used_idx]);
389         used_desc->id = head_idx;
390         used_desc->len = len;
392         atomic_thread_fence(memory_order_seq_cst);
394         vq->vq_ring.used->idx++;
396         VQUEUE_IDLE(vq);
398         return (VQUEUE_SUCCESS);
401 /**
402  * virtqueue_enable_cb  - Enables callback generation
403  *
404  * @param vq            - Pointer to VirtIO queue control block
405  *
406  * @return              - Function status
407  */
408 int virtqueue_enable_cb(struct virtqueue *vq)
411         return (vq_ring_enable_interrupt(vq, 0));
414 /**
415  * virtqueue_enable_cb - Disables callback generation
416  *
417  * @param vq           - Pointer to VirtIO queue control block
418  *
419  */
420 void virtqueue_disable_cb(struct virtqueue *vq)
423         VQUEUE_BUSY(vq);
425         if (vq->vq_flags & VIRTQUEUE_FLAG_EVENT_IDX) {
426                 vring_used_event(&vq->vq_ring) =
427                     vq->vq_used_cons_idx - vq->vq_nentries - 1;
428         } else {
429                 vq->vq_ring.avail->flags |= VRING_AVAIL_F_NO_INTERRUPT;
430         }
432         VQUEUE_IDLE(vq);
435 /**
436  * virtqueue_kick - Notifies other side that there is buffer available for it.
437  *
438  * @param vq      - Pointer to VirtIO queue control block
439  */
440 void virtqueue_kick(struct virtqueue *vq)
443         VQUEUE_BUSY(vq);
445         /* Ensure updated avail->idx is visible to host. */
446         atomic_thread_fence(memory_order_seq_cst);
448         if (vq_ring_must_notify_host(vq))
449                 vq_ring_notify_host(vq);
451         vq->vq_queued_cnt = 0;
453         VQUEUE_IDLE(vq);
456 /**
457  * virtqueue_dump Dumps important virtqueue fields , use for debugging purposes
458  *
459  * @param vq - Pointer to VirtIO queue control block
460  */
461 void virtqueue_dump(struct virtqueue *vq)
464         if (vq == VQ_NULL)
465                 return;
467         openamp_print("VQ: %s - size=%d; free=%d; used=%d; queued=%d; "
468                   "desc_head_idx=%d; avail.idx=%d; used_cons_idx=%d; "
469                   "used.idx=%d; avail.flags=0x%x; used.flags=0x%x\r\n",
470                   vq->vq_name, vq->vq_nentries, vq->vq_free_cnt,
471                   virtqueue_nused(vq), vq->vq_queued_cnt, vq->vq_desc_head_idx,
472                   vq->vq_ring.avail->idx, vq->vq_used_cons_idx,
473                   vq->vq_ring.used->idx, vq->vq_ring.avail->flags,
474                   vq->vq_ring.used->flags);
477 /**
478  * virtqueue_get_desc_size - Returns vring descriptor size
479  *
480  * @param vq            - Pointer to VirtIO queue control block
481  *
482  * @return              - Descriptor length
483  */
484 uint32_t virtqueue_get_desc_size(struct virtqueue * vq)
486         uint16_t head_idx = 0;
487         uint16_t avail_idx = 0;
488         uint32_t len = 0;
490         if (vq->vq_available_idx == vq->vq_ring.avail->idx) {
491                 return (VQ_NULL);
492         }
494         VQUEUE_BUSY(vq);
496         head_idx = vq->vq_available_idx & (vq->vq_nentries - 1);
497         avail_idx = vq->vq_ring.avail->ring[head_idx];
498         len = vq->vq_ring.desc[avail_idx].len;
500         VQUEUE_IDLE(vq);
502         return (len);
505 /**************************************************************************
506  *                            Helper Functions                            *
507  **************************************************************************/
509 /**
510  *
511  * vq_ring_add_buffer
512  *
513  */
514 static uint16_t vq_ring_add_buffer(struct virtqueue *vq,
515                                    struct vring_desc *desc, uint16_t head_idx,
516                                    struct metal_sg *sg, int readable,
517                                    int writable)
520         struct vring_desc *dp;
521         int i, needed;
522         uint16_t idx;
524         (void)vq;
526         needed = readable + writable;
528         for (i = 0, idx = head_idx; i < needed; i++, idx = dp->next) {
530                 VQASSERT(vq, idx != VQ_RING_DESC_CHAIN_END,
531                          "premature end of free desc chain");
533                 dp = &desc[idx];
534                 dp->addr = metal_io_virt_to_phys(sg[i].io, sg[i].virt);
535                 dp->len = sg[i].len;
536                 dp->flags = 0;
538                 if (i < needed - 1)
539                         dp->flags |= VRING_DESC_F_NEXT;
541                 /* Readable buffers are inserted into vring before the writable buffers. */
542                 if (i >= readable)
543                         dp->flags |= VRING_DESC_F_WRITE;
544         }
546         return (idx);
549 /**
550  *
551  * vq_ring_free_chain
552  *
553  */
554 static void vq_ring_free_chain(struct virtqueue *vq, uint16_t desc_idx)
556         struct vring_desc *dp;
557         struct vq_desc_extra *dxp;
559         VQ_RING_ASSERT_VALID_IDX(vq, desc_idx);
560         dp = &vq->vq_ring.desc[desc_idx];
561         dxp = &vq->vq_descx[desc_idx];
563         if (vq->vq_free_cnt == 0) {
564                 VQ_RING_ASSERT_CHAIN_TERM(vq);
565         }
567         vq->vq_free_cnt += dxp->ndescs;
568         dxp->ndescs--;
570         if ((dp->flags & VRING_DESC_F_INDIRECT) == 0) {
571                 while (dp->flags & VRING_DESC_F_NEXT) {
572                         VQ_RING_ASSERT_VALID_IDX(vq, dp->next);
573                         dp = &vq->vq_ring.desc[dp->next];
574                         dxp->ndescs--;
575                 }
576         }
578         VQASSERT(vq, (dxp->ndescs == 0),
579                  "failed to free entire desc chain, remaining");
581         /*
582          * We must append the existing free chain, if any, to the end of
583          * newly freed chain. If the virtqueue was completely used, then
584          * head would be VQ_RING_DESC_CHAIN_END (ASSERTed above).
585          */
586         dp->next = vq->vq_desc_head_idx;
587         vq->vq_desc_head_idx = desc_idx;
590 /**
591  *
592  * vq_ring_init
593  *
594  */
595 static void vq_ring_init(struct virtqueue *vq)
597         struct vring *vr;
598         unsigned char *ring_mem;
599         int i, size;
601         ring_mem = vq->vq_ring_mem;
602         size = vq->vq_nentries;
603         vr = &vq->vq_ring;
605         vring_init(vr, size, ring_mem, vq->vq_alignment);
607         for (i = 0; i < size - 1; i++)
608                 vr->desc[i].next = i + 1;
609         vr->desc[i].next = VQ_RING_DESC_CHAIN_END;
612 /**
613  *
614  * vq_ring_update_avail
615  *
616  */
617 static void vq_ring_update_avail(struct virtqueue *vq, uint16_t desc_idx)
619         uint16_t avail_idx;
621         /*
622          * Place the head of the descriptor chain into the next slot and make
623          * it usable to the host. The chain is made available now rather than
624          * deferring to virtqueue_notify() in the hopes that if the host is
625          * currently running on another CPU, we can keep it processing the new
626          * descriptor.
627          */
628         avail_idx = vq->vq_ring.avail->idx & (vq->vq_nentries - 1);
629         vq->vq_ring.avail->ring[avail_idx] = desc_idx;
631         atomic_thread_fence(memory_order_seq_cst);
633         vq->vq_ring.avail->idx++;
635         /* Keep pending count until virtqueue_notify(). */
636         vq->vq_queued_cnt++;
639 /**
640  *
641  * vq_ring_enable_interrupt
642  *
643  */
644 static int vq_ring_enable_interrupt(struct virtqueue *vq, uint16_t ndesc)
647         /*
648          * Enable interrupts, making sure we get the latest index of
649          * what's already been consumed.
650          */
651         if (vq->vq_flags & VIRTQUEUE_FLAG_EVENT_IDX) {
652                 vring_used_event(&vq->vq_ring) = vq->vq_used_cons_idx + ndesc;
653         } else {
654                 vq->vq_ring.avail->flags &= ~VRING_AVAIL_F_NO_INTERRUPT;
655         }
657         atomic_thread_fence(memory_order_seq_cst);
659         /*
660          * Enough items may have already been consumed to meet our threshold
661          * since we last checked. Let our caller know so it processes the new
662          * entries.
663          */
664         if (virtqueue_nused(vq) > ndesc) {
665                 return (1);
666         }
668         return (0);
671 /**
672  *
673  * virtqueue_interrupt
674  *
675  */
676 void virtqueue_notification(struct virtqueue *vq)
679         if (vq->callback != VQ_NULL)
680                 vq->callback(vq);
683 /**
684  *
685  * vq_ring_must_notify_host
686  *
687  */
688 static int vq_ring_must_notify_host(struct virtqueue *vq)
690         uint16_t new_idx, prev_idx, event_idx;
692         if (vq->vq_flags & VIRTQUEUE_FLAG_EVENT_IDX) {
693                 new_idx = vq->vq_ring.avail->idx;
694                 prev_idx = new_idx - vq->vq_queued_cnt;
695                 event_idx = vring_avail_event(&vq->vq_ring);
697                 return (vring_need_event(event_idx, new_idx, prev_idx) != 0);
698         }
700         return ((vq->vq_ring.used->flags & VRING_USED_F_NO_NOTIFY) == 0);
703 /**
704  *
705  * vq_ring_notify_host
706  *
707  */
708 static void vq_ring_notify_host(struct virtqueue *vq)
711         if (vq->notify != VQ_NULL)
712                 vq->notify(vq);
715 /**
716  *
717  * virtqueue_nused
718  *
719  */
720 static int virtqueue_nused(struct virtqueue *vq)
722         uint16_t used_idx, nused;
724         used_idx = vq->vq_ring.used->idx;
726         nused = (uint16_t) (used_idx - vq->vq_used_cons_idx);
727         VQASSERT(vq, nused <= vq->vq_nentries, "used more than available");
729         return (nused);