aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaolo Bonzini <pbonzini@redhat.com>2014-12-11 14:52:26 +0100
committerStefan Hajnoczi <stefanha@redhat.com>2014-12-12 16:57:55 +0000
commit28b240877bbcdc8add61be227f429b536edd4653 (patch)
treec7e3834242bd0c927fa5856754caed7c89dee97d
parentb5cf2c1b0897506a40e0c420391875acc484792b (diff)
linux-aio: queue requests that cannot be submitted
Keep a queue of requests that were not submitted; pass them to the kernel when a completion is reported, unless the queue is plugged. The array of iocbs is rebuilt every time from scratch. This avoids keeping the iocbs array and list synchronized. Signed-off-by: Paolo Bonzini <pbonzini@redhat.com> Reviewed-by: Kevin Wolf <kwolf@redhat.com> Message-id: 1418305950-30924-2-git-send-email-pbonzini@redhat.com Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
-rw-r--r--block/linux-aio.c75
1 files changed, 33 insertions, 42 deletions
diff --git a/block/linux-aio.c b/block/linux-aio.c
index d92513b0f9..b6fbfd8c13 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -35,14 +35,13 @@ struct qemu_laiocb {
size_t nbytes;
QEMUIOVector *qiov;
bool is_read;
- QLIST_ENTRY(qemu_laiocb) node;
+ QSIMPLEQ_ENTRY(qemu_laiocb) next;
};
typedef struct {
- struct iocb *iocbs[MAX_QUEUED_IO];
int plugged;
- unsigned int size;
unsigned int idx;
+ QSIMPLEQ_HEAD(, qemu_laiocb) pending;
} LaioQueue;
struct qemu_laio_state {
@@ -59,6 +58,8 @@ struct qemu_laio_state {
int event_max;
};
+static int ioq_submit(struct qemu_laio_state *s);
+
static inline ssize_t io_event_ret(struct io_event *ev)
{
return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
@@ -135,6 +136,10 @@ static void qemu_laio_completion_bh(void *opaque)
qemu_laio_process_completion(s, laiocb);
}
+
+ if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
+ ioq_submit(s);
+ }
}
static void qemu_laio_completion_cb(EventNotifier *e)
@@ -172,52 +177,40 @@ static const AIOCBInfo laio_aiocb_info = {
static void ioq_init(LaioQueue *io_q)
{
- io_q->size = MAX_QUEUED_IO;
- io_q->idx = 0;
+ QSIMPLEQ_INIT(&io_q->pending);
io_q->plugged = 0;
+ io_q->idx = 0;
}
static int ioq_submit(struct qemu_laio_state *s)
{
- int ret, i = 0;
- int len = s->io_q.idx;
-
- do {
- ret = io_submit(s->ctx, len, s->io_q.iocbs);
- } while (i++ < 3 && ret == -EAGAIN);
+ int ret, i;
+ int len = 0;
+ struct qemu_laiocb *aiocb;
+ struct iocb *iocbs[MAX_QUEUED_IO];
- /* empty io queue */
- s->io_q.idx = 0;
+ QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
+ iocbs[len++] = &aiocb->iocb;
+ if (len == MAX_QUEUED_IO) {
+ break;
+ }
+ }
+ ret = io_submit(s->ctx, len, iocbs);
+ if (ret == -EAGAIN) {
+ ret = 0;
+ }
if (ret < 0) {
- i = 0;
- } else {
- i = ret;
+ abort();
}
- for (; i < len; i++) {
- struct qemu_laiocb *laiocb =
- container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
-
- laiocb->ret = (ret < 0) ? ret : -EIO;
- qemu_laio_process_completion(s, laiocb);
+ for (i = 0; i < ret; i++) {
+ s->io_q.idx--;
+ QSIMPLEQ_REMOVE_HEAD(&s->io_q.pending, next);
}
return ret;
}
-static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
-{
- unsigned int idx = s->io_q.idx;
-
- s->io_q.iocbs[idx++] = iocb;
- s->io_q.idx = idx;
-
- /* submit immediately if queue is full */
- if (idx == s->io_q.size) {
- ioq_submit(s);
- }
-}
-
void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
{
struct qemu_laio_state *s = aio_ctx;
@@ -236,7 +229,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug)
return 0;
}
- if (s->io_q.idx > 0) {
+ if (!QSIMPLEQ_EMPTY(&s->io_q.pending)) {
ret = ioq_submit(s);
}
@@ -276,12 +269,10 @@ BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
}
io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));
- if (!s->io_q.plugged) {
- if (io_submit(s->ctx, 1, &iocbs) < 0) {
- goto out_free_aiocb;
- }
- } else {
- ioq_enqueue(s, iocbs);
+ QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next);
+ s->io_q.idx++;
+ if (s->io_q.idx == (s->io_q.plugged ? MAX_QUEUED_IO : 1)) {
+ ioq_submit(s);
}
return &laiocb->common;