aboutsummaryrefslogtreecommitdiff
path: root/block
diff options
context:
space:
mode:
authorPeter Maydell <peter.maydell@linaro.org>2017-02-21 11:58:03 +0000
committerPeter Maydell <peter.maydell@linaro.org>2017-02-21 11:58:03 +0000
commita0775e28cd6cae7eae248f74db7bc4a03da20c6b (patch)
tree6d141e7710855c40fdaf81a4c2731995782e443c /block
parentb856256179f14c33a513d0b9cc3e4be355b95f43 (diff)
parenta7b91d35bab97a2d3e779d0c64c9b837b52a6cf7 (diff)
Merge remote-tracking branch 'remotes/stefanha/tags/block-pull-request' into staging
Pull request v2: * Rebased to resolve scsi conflicts # gpg: Signature made Tue 21 Feb 2017 11:56:24 GMT # gpg: using RSA key 0x9CA4ABB381AB73C8 # gpg: Good signature from "Stefan Hajnoczi <stefanha@redhat.com>" # gpg: aka "Stefan Hajnoczi <stefanha@gmail.com>" # Primary key fingerprint: 8695 A8BF D3F9 7CDA AC35 775A 9CA4 ABB3 81AB 73C8 * remotes/stefanha/tags/block-pull-request: (24 commits) coroutine-lock: make CoRwlock thread-safe and fair coroutine-lock: add mutex argument to CoQueue APIs coroutine-lock: place CoMutex before CoQueue in header test-aio-multithread: add performance comparison with thread-based mutexes coroutine-lock: add limited spinning to CoMutex coroutine-lock: make CoMutex thread-safe block: document fields protected by AioContext lock async: remove unnecessary inc/dec pairs aio-posix: partially inline aio_dispatch into aio_poll block: explicitly acquire aiocontext in aio callbacks that need it block: explicitly acquire aiocontext in bottom halves that need it block: explicitly acquire aiocontext in callbacks that need it block: explicitly acquire aiocontext in timers that need it aio: push aio_context_acquire/release down to dispatching qed: introduce qed_aio_start_io and qed_aio_next_io_cb blkdebug: reschedule coroutine on the AioContext it is running on coroutine-lock: reschedule coroutine on the AioContext it was running on nbd: convert to use qio_channel_yield io: make qio_channel_yield aware of AioContexts io: add methods to set I/O handlers on AioContext ... Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
Diffstat (limited to 'block')
-rw-r--r--block/backup.c2
-rw-r--r--block/blkdebug.c9
-rwxr-xr-xblock/blkreplay.c2
-rw-r--r--block/block-backend.c13
-rw-r--r--block/curl.c44
-rw-r--r--block/gluster.c9
-rw-r--r--block/io.c42
-rw-r--r--block/iscsi.c15
-rw-r--r--block/linux-aio.c10
-rw-r--r--block/mirror.c12
-rw-r--r--block/nbd-client.c119
-rw-r--r--block/nbd-client.h2
-rw-r--r--block/nfs.c9
-rw-r--r--block/qcow2-cluster.c4
-rw-r--r--block/qed-cluster.c2
-rw-r--r--block/qed-table.c12
-rw-r--r--block/qed.c58
-rw-r--r--block/qed.h3
-rw-r--r--block/sheepdog.c31
-rw-r--r--block/ssh.c29
-rw-r--r--block/throttle-groups.c4
-rw-r--r--block/win32-aio.c9
22 files changed, 234 insertions, 206 deletions
diff --git a/block/backup.c b/block/backup.c
index ea38733849..fe010e78e3 100644
--- a/block/backup.c
+++ b/block/backup.c
@@ -64,7 +64,7 @@ static void coroutine_fn wait_for_overlapping_requests(BackupBlockJob *job,
retry = false;
QLIST_FOREACH(req, &job->inflight_reqs, list) {
if (end > req->start && start < req->end) {
- qemu_co_queue_wait(&req->wait_queue);
+ qemu_co_queue_wait(&req->wait_queue, NULL);
retry = true;
break;
}
diff --git a/block/blkdebug.c b/block/blkdebug.c
index acccf85666..d8eee1b9b4 100644
--- a/block/blkdebug.c
+++ b/block/blkdebug.c
@@ -405,12 +405,6 @@ out:
return ret;
}
-static void error_callback_bh(void *opaque)
-{
- Coroutine *co = opaque;
- qemu_coroutine_enter(co);
-}
-
static int inject_error(BlockDriverState *bs, BlkdebugRule *rule)
{
BDRVBlkdebugState *s = bs->opaque;
@@ -423,8 +417,7 @@ static int inject_error(BlockDriverState *bs, BlkdebugRule *rule)
}
if (!immediately) {
- aio_bh_schedule_oneshot(bdrv_get_aio_context(bs), error_callback_bh,
- qemu_coroutine_self());
+ aio_co_schedule(qemu_get_current_aio_context(), qemu_coroutine_self());
qemu_coroutine_yield();
}
diff --git a/block/blkreplay.c b/block/blkreplay.c
index a741654d35..cfc8c5be02 100755
--- a/block/blkreplay.c
+++ b/block/blkreplay.c
@@ -60,7 +60,7 @@ static int64_t blkreplay_getlength(BlockDriverState *bs)
static void blkreplay_bh_cb(void *opaque)
{
Request *req = opaque;
- qemu_coroutine_enter(req->co);
+ aio_co_wake(req->co);
qemu_bh_delete(req->bh);
g_free(req);
}
diff --git a/block/block-backend.c b/block/block-backend.c
index efbf398bb5..819f27213a 100644
--- a/block/block-backend.c
+++ b/block/block-backend.c
@@ -880,7 +880,6 @@ static int blk_prw(BlockBackend *blk, int64_t offset, uint8_t *buf,
{
QEMUIOVector qiov;
struct iovec iov;
- Coroutine *co;
BlkRwCo rwco;
iov = (struct iovec) {
@@ -897,9 +896,14 @@ static int blk_prw(BlockBackend *blk, int64_t offset, uint8_t *buf,
.ret = NOT_DONE,
};
- co = qemu_coroutine_create(co_entry, &rwco);
- qemu_coroutine_enter(co);
- BDRV_POLL_WHILE(blk_bs(blk), rwco.ret == NOT_DONE);
+ if (qemu_in_coroutine()) {
+ /* Fast-path if already in coroutine context */
+ co_entry(&rwco);
+ } else {
+ Coroutine *co = qemu_coroutine_create(co_entry, &rwco);
+ qemu_coroutine_enter(co);
+ BDRV_POLL_WHILE(blk_bs(blk), rwco.ret == NOT_DONE);
+ }
return rwco.ret;
}
@@ -979,7 +983,6 @@ static void blk_aio_complete(BlkAioEmAIOCB *acb)
static void blk_aio_complete_bh(void *opaque)
{
BlkAioEmAIOCB *acb = opaque;
-
assert(acb->has_returned);
blk_aio_complete(acb);
}
diff --git a/block/curl.c b/block/curl.c
index 792fef8269..2939cc77e9 100644
--- a/block/curl.c
+++ b/block/curl.c
@@ -386,9 +386,8 @@ static void curl_multi_check_completion(BDRVCURLState *s)
}
}
-static void curl_multi_do(void *arg)
+static void curl_multi_do_locked(CURLState *s)
{
- CURLState *s = (CURLState *)arg;
CURLSocket *socket, *next_socket;
int running;
int r;
@@ -406,12 +405,23 @@ static void curl_multi_do(void *arg)
}
}
+static void curl_multi_do(void *arg)
+{
+ CURLState *s = (CURLState *)arg;
+
+ aio_context_acquire(s->s->aio_context);
+ curl_multi_do_locked(s);
+ aio_context_release(s->s->aio_context);
+}
+
static void curl_multi_read(void *arg)
{
CURLState *s = (CURLState *)arg;
- curl_multi_do(arg);
+ aio_context_acquire(s->s->aio_context);
+ curl_multi_do_locked(s);
curl_multi_check_completion(s->s);
+ aio_context_release(s->s->aio_context);
}
static void curl_multi_timeout_do(void *arg)
@@ -424,9 +434,11 @@ static void curl_multi_timeout_do(void *arg)
return;
}
+ aio_context_acquire(s->aio_context);
curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running);
curl_multi_check_completion(s);
+ aio_context_release(s->aio_context);
#else
abort();
#endif
@@ -784,13 +796,18 @@ static void curl_readv_bh_cb(void *p)
{
CURLState *state;
int running;
+ int ret = -EINPROGRESS;
CURLAIOCB *acb = p;
- BDRVCURLState *s = acb->common.bs->opaque;
+ BlockDriverState *bs = acb->common.bs;
+ BDRVCURLState *s = bs->opaque;
+ AioContext *ctx = bdrv_get_aio_context(bs);
size_t start = acb->sector_num * BDRV_SECTOR_SIZE;
size_t end;
+ aio_context_acquire(ctx);
+
// In case we have the requested data already (e.g. read-ahead),
// we can just call the callback and be done.
switch (curl_find_buf(s, start, acb->nb_sectors * BDRV_SECTOR_SIZE, acb)) {
@@ -798,7 +815,7 @@ static void curl_readv_bh_cb(void *p)
qemu_aio_unref(acb);
// fall through
case FIND_RET_WAIT:
- return;
+ goto out;
default:
break;
}
@@ -806,9 +823,8 @@ static void curl_readv_bh_cb(void *p)
// No cache found, so let's start a new request
state = curl_init_state(acb->common.bs, s);
if (!state) {
- acb->common.cb(acb->common.opaque, -EIO);
- qemu_aio_unref(acb);
- return;
+ ret = -EIO;
+ goto out;
}
acb->start = 0;
@@ -822,9 +838,8 @@ static void curl_readv_bh_cb(void *p)
state->orig_buf = g_try_malloc(state->buf_len);
if (state->buf_len && state->orig_buf == NULL) {
curl_clean_state(state);
- acb->common.cb(acb->common.opaque, -ENOMEM);
- qemu_aio_unref(acb);
- return;
+ ret = -ENOMEM;
+ goto out;
}
state->acb[0] = acb;
@@ -837,6 +852,13 @@ static void curl_readv_bh_cb(void *p)
/* Tell curl it needs to kick things off */
curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running);
+
+out:
+ aio_context_release(ctx);
+ if (ret != -EINPROGRESS) {
+ acb->common.cb(acb->common.opaque, ret);
+ qemu_aio_unref(acb);
+ }
}
static BlockAIOCB *curl_aio_readv(BlockDriverState *bs,
diff --git a/block/gluster.c b/block/gluster.c
index 1a22f2982d..56b4abe3a7 100644
--- a/block/gluster.c
+++ b/block/gluster.c
@@ -698,13 +698,6 @@ static struct glfs *qemu_gluster_init(BlockdevOptionsGluster *gconf,
return qemu_gluster_glfs_init(gconf, errp);
}
-static void qemu_gluster_complete_aio(void *opaque)
-{
- GlusterAIOCB *acb = (GlusterAIOCB *)opaque;
-
- qemu_coroutine_enter(acb->coroutine);
-}
-
/*
* AIO callback routine called from GlusterFS thread.
*/
@@ -720,7 +713,7 @@ static void gluster_finish_aiocb(struct glfs_fd *fd, ssize_t ret, void *arg)
acb->ret = -EIO; /* Partial read/write - fail it */
}
- aio_bh_schedule_oneshot(acb->aio_context, qemu_gluster_complete_aio, acb);
+ aio_co_schedule(acb->aio_context, acb->coroutine);
}
static void qemu_gluster_parse_flags(int bdrv_flags, int *open_flags)
diff --git a/block/io.c b/block/io.c
index c42b34a965..d5c45447fd 100644
--- a/block/io.c
+++ b/block/io.c
@@ -189,7 +189,7 @@ static void bdrv_co_drain_bh_cb(void *opaque)
bdrv_dec_in_flight(bs);
bdrv_drained_begin(bs);
data->done = true;
- qemu_coroutine_enter(co);
+ aio_co_wake(co);
}
static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs)
@@ -539,7 +539,7 @@ static bool coroutine_fn wait_serialising_requests(BdrvTrackedRequest *self)
* (instead of producing a deadlock in the former case). */
if (!req->waiting_for) {
self->waiting_for = req;
- qemu_co_queue_wait(&req->wait_queue);
+ qemu_co_queue_wait(&req->wait_queue, NULL);
self->waiting_for = NULL;
retry = true;
waited = true;
@@ -813,7 +813,7 @@ static void bdrv_co_io_em_complete(void *opaque, int ret)
CoroutineIOCompletion *co = opaque;
co->ret = ret;
- qemu_coroutine_enter(co->coroutine);
+ aio_co_wake(co->coroutine);
}
static int coroutine_fn bdrv_driver_preadv(BlockDriverState *bs,
@@ -2080,6 +2080,11 @@ void bdrv_aio_cancel(BlockAIOCB *acb)
if (acb->aiocb_info->get_aio_context) {
aio_poll(acb->aiocb_info->get_aio_context(acb), true);
} else if (acb->bs) {
+ /* qemu_aio_ref and qemu_aio_unref are not thread-safe, so
+ * assert that we're not using an I/O thread. Thread-safe
+ * code should use bdrv_aio_cancel_async exclusively.
+ */
+ assert(bdrv_get_aio_context(acb->bs) == qemu_get_aio_context());
aio_poll(bdrv_get_aio_context(acb->bs), true);
} else {
abort();
@@ -2239,35 +2244,6 @@ BlockAIOCB *bdrv_aio_flush(BlockDriverState *bs,
return &acb->common;
}
-void *qemu_aio_get(const AIOCBInfo *aiocb_info, BlockDriverState *bs,
- BlockCompletionFunc *cb, void *opaque)
-{
- BlockAIOCB *acb;
-
- acb = g_malloc(aiocb_info->aiocb_size);
- acb->aiocb_info = aiocb_info;
- acb->bs = bs;
- acb->cb = cb;
- acb->opaque = opaque;
- acb->refcnt = 1;
- return acb;
-}
-
-void qemu_aio_ref(void *p)
-{
- BlockAIOCB *acb = p;
- acb->refcnt++;
-}
-
-void qemu_aio_unref(void *p)
-{
- BlockAIOCB *acb = p;
- assert(acb->refcnt > 0);
- if (--acb->refcnt == 0) {
- g_free(acb);
- }
-}
-
/**************************************************************/
/* Coroutine block device emulation */
@@ -2299,7 +2275,7 @@ int coroutine_fn bdrv_co_flush(BlockDriverState *bs)
/* Wait until any previous flushes are completed */
while (bs->active_flush_req) {
- qemu_co_queue_wait(&bs->flush_queue);
+ qemu_co_queue_wait(&bs->flush_queue, NULL);
}
bs->active_flush_req = true;
diff --git a/block/iscsi.c b/block/iscsi.c
index 1860f1bc91..2561be90de 100644
--- a/block/iscsi.c
+++ b/block/iscsi.c
@@ -165,8 +165,9 @@ iscsi_schedule_bh(IscsiAIOCB *acb)
static void iscsi_co_generic_bh_cb(void *opaque)
{
struct IscsiTask *iTask = opaque;
+
iTask->complete = 1;
- qemu_coroutine_enter(iTask->co);
+ aio_co_wake(iTask->co);
}
static void iscsi_retry_timer_expired(void *opaque)
@@ -174,7 +175,7 @@ static void iscsi_retry_timer_expired(void *opaque)
struct IscsiTask *iTask = opaque;
iTask->complete = 1;
if (iTask->co) {
- qemu_coroutine_enter(iTask->co);
+ aio_co_wake(iTask->co);
}
}
@@ -394,8 +395,10 @@ iscsi_process_read(void *arg)
IscsiLun *iscsilun = arg;
struct iscsi_context *iscsi = iscsilun->iscsi;
+ aio_context_acquire(iscsilun->aio_context);
iscsi_service(iscsi, POLLIN);
iscsi_set_events(iscsilun);
+ aio_context_release(iscsilun->aio_context);
}
static void
@@ -404,8 +407,10 @@ iscsi_process_write(void *arg)
IscsiLun *iscsilun = arg;
struct iscsi_context *iscsi = iscsilun->iscsi;
+ aio_context_acquire(iscsilun->aio_context);
iscsi_service(iscsi, POLLOUT);
iscsi_set_events(iscsilun);
+ aio_context_release(iscsilun->aio_context);
}
static int64_t sector_lun2qemu(int64_t sector, IscsiLun *iscsilun)
@@ -1392,16 +1397,20 @@ static void iscsi_nop_timed_event(void *opaque)
{
IscsiLun *iscsilun = opaque;
+ aio_context_acquire(iscsilun->aio_context);
if (iscsi_get_nops_in_flight(iscsilun->iscsi) >= MAX_NOP_FAILURES) {
error_report("iSCSI: NOP timeout. Reconnecting...");
iscsilun->request_timed_out = true;
} else if (iscsi_nop_out_async(iscsilun->iscsi, NULL, NULL, 0, NULL) != 0) {
error_report("iSCSI: failed to sent NOP-Out. Disabling NOP messages.");
- return;
+ goto out;
}
timer_mod(iscsilun->nop_timer, qemu_clock_get_ms(QEMU_CLOCK_REALTIME) + NOP_INTERVAL);
iscsi_set_events(iscsilun);
+
+out:
+ aio_context_release(iscsilun->aio_context);
}
static void iscsi_readcapacity_sync(IscsiLun *iscsilun, Error **errp)
diff --git a/block/linux-aio.c b/block/linux-aio.c
index 03ab741d37..88b8d55ec7 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -54,10 +54,10 @@ struct LinuxAioState {
io_context_t ctx;
EventNotifier e;
- /* io queue for submit at batch */
+ /* io queue for submit at batch. Protected by AioContext lock. */
LaioQueue io_q;
- /* I/O completion processing */
+ /* I/O completion processing. Only runs in I/O thread. */
QEMUBH *completion_bh;
int event_idx;
int event_max;
@@ -100,7 +100,7 @@ static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
* that!
*/
if (!qemu_coroutine_entered(laiocb->co)) {
- qemu_coroutine_enter(laiocb->co);
+ aio_co_wake(laiocb->co);
}
} else {
laiocb->common.cb(laiocb->common.opaque, ret);
@@ -234,9 +234,12 @@ static void qemu_laio_process_completions(LinuxAioState *s)
static void qemu_laio_process_completions_and_submit(LinuxAioState *s)
{
qemu_laio_process_completions(s);
+
+ aio_context_acquire(s->aio_context);
if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
ioq_submit(s);
}
+ aio_context_release(s->aio_context);
}
static void qemu_laio_completion_bh(void *opaque)
@@ -455,6 +458,7 @@ void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context)
{
aio_set_event_notifier(old_context, &s->e, false, NULL, NULL);
qemu_bh_delete(s->completion_bh);
+ s->aio_context = NULL;
}
void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context)
diff --git a/block/mirror.c b/block/mirror.c
index 301ba9219a..698a54e50f 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -132,6 +132,8 @@ static void mirror_write_complete(void *opaque, int ret)
{
MirrorOp *op = opaque;
MirrorBlockJob *s = op->s;
+
+ aio_context_acquire(blk_get_aio_context(s->common.blk));
if (ret < 0) {
BlockErrorAction action;
@@ -142,12 +144,15 @@ static void mirror_write_complete(void *opaque, int ret)
}
}
mirror_iteration_done(op, ret);
+ aio_context_release(blk_get_aio_context(s->common.blk));
}
static void mirror_read_complete(void *opaque, int ret)
{
MirrorOp *op = opaque;
MirrorBlockJob *s = op->s;
+
+ aio_context_acquire(blk_get_aio_context(s->common.blk));
if (ret < 0) {
BlockErrorAction action;
@@ -158,10 +163,11 @@ static void mirror_read_complete(void *opaque, int ret)
}
mirror_iteration_done(op, ret);
- return;
+ } else {
+ blk_aio_pwritev(s->target, op->sector_num * BDRV_SECTOR_SIZE, &op->qiov,
+ 0, mirror_write_complete, op);
}
- blk_aio_pwritev(s->target, op->sector_num * BDRV_SECTOR_SIZE, &op->qiov,
- 0, mirror_write_complete, op);
+ aio_context_release(blk_get_aio_context(s->common.blk));
}
static inline void mirror_clip_sectors(MirrorBlockJob *s,
diff --git a/block/nbd-client.c b/block/nbd-client.c
index 06f1532805..0dc12c2d67 100644
--- a/block/nbd-client.c
+++ b/block/nbd-client.c
@@ -33,8 +33,9 @@
#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs))
#define INDEX_TO_HANDLE(bs, index) ((index) ^ ((uint64_t)(intptr_t)bs))
-static void nbd_recv_coroutines_enter_all(NBDClientSession *s)
+static void nbd_recv_coroutines_enter_all(BlockDriverState *bs)
{
+ NBDClientSession *s = nbd_get_client_session(bs);
int i;
for (i = 0; i < MAX_NBD_REQUESTS; i++) {
@@ -42,6 +43,7 @@ static void nbd_recv_coroutines_enter_all(NBDClientSession *s)
qemu_coroutine_enter(s->recv_coroutine[i]);
}
}
+ BDRV_POLL_WHILE(bs, s->read_reply_co);
}
static void nbd_teardown_connection(BlockDriverState *bs)
@@ -56,7 +58,7 @@ static void nbd_teardown_connection(BlockDriverState *bs)
qio_channel_shutdown(client->ioc,
QIO_CHANNEL_SHUTDOWN_BOTH,
NULL);
- nbd_recv_coroutines_enter_all(client);
+ nbd_recv_coroutines_enter_all(bs);
nbd_client_detach_aio_context(bs);
object_unref(OBJECT(client->sioc));
@@ -65,54 +67,43 @@ static void nbd_teardown_connection(BlockDriverState *bs)
client->ioc = NULL;
}
-static void nbd_reply_ready(void *opaque)
+static coroutine_fn void nbd_read_reply_entry(void *opaque)
{
- BlockDriverState *bs = opaque;
- NBDClientSession *s = nbd_get_client_session(bs);
+ NBDClientSession *s = opaque;
uint64_t i;
int ret;
- if (!s->ioc) { /* Already closed */
- return;
- }
-
- if (s->reply.handle == 0) {
- /* No reply already in flight. Fetch a header. It is possible
- * that another thread has done the same thing in parallel, so
- * the socket is not readable anymore.
- */
+ for (;;) {
+ assert(s->reply.handle == 0);
ret = nbd_receive_reply(s->ioc, &s->reply);
- if (ret == -EAGAIN) {
- return;
- }
if (ret < 0) {
- s->reply.handle = 0;
- goto fail;
+ break;
}
- }
- /* There's no need for a mutex on the receive side, because the
- * handler acts as a synchronization point and ensures that only
- * one coroutine is called until the reply finishes. */
- i = HANDLE_TO_INDEX(s, s->reply.handle);
- if (i >= MAX_NBD_REQUESTS) {
- goto fail;
- }
+ /* There's no need for a mutex on the receive side, because the
+ * handler acts as a synchronization point and ensures that only
+ * one coroutine is called until the reply finishes.
+ */
+ i = HANDLE_TO_INDEX(s, s->reply.handle);
+ if (i >= MAX_NBD_REQUESTS || !s->recv_coroutine[i]) {
+ break;
+ }
- if (s->recv_coroutine[i]) {
- qemu_coroutine_enter(s->recv_coroutine[i]);
- return;
+ /* We're woken up by the recv_coroutine itself. Note that there
+ * is no race between yielding and reentering read_reply_co. This
+ * is because:
+ *
+ * - if recv_coroutine[i] runs on the same AioContext, it is only
+ * entered after we yield
+ *
+ * - if recv_coroutine[i] runs on a different AioContext, reentering
+ * read_reply_co happens through a bottom half, which can only
+ * run after we yield.
+ */
+ aio_co_wake(s->recv_coroutine[i]);
+ qemu_coroutine_yield();
}
-
-fail:
- nbd_teardown_connection(bs);
-}
-
-static void nbd_restart_write(void *opaque)
-{
- BlockDriverState *bs = opaque;
-
- qemu_coroutine_enter(nbd_get_client_session(bs)->send_coroutine);
+ s->read_reply_co = NULL;
}
static int nbd_co_send_request(BlockDriverState *bs,
@@ -120,7 +111,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
QEMUIOVector *qiov)
{
NBDClientSession *s = nbd_get_client_session(bs);
- AioContext *aio_context;
int rc, ret, i;
qemu_co_mutex_lock(&s->send_mutex);
@@ -141,11 +131,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
return -EPIPE;
}
- s->send_coroutine = qemu_coroutine_self();
- aio_context = bdrv_get_aio_context(bs);
-
- aio_set_fd_handler(aio_context, s->sioc->fd, false,
- nbd_reply_ready, nbd_restart_write, NULL, bs);
if (qiov) {
qio_channel_set_cork(s->ioc, true);
rc = nbd_send_request(s->ioc, request);
@@ -160,9 +145,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
} else {
rc = nbd_send_request(s->ioc, request);
}
- aio_set_fd_handler(aio_context, s->sioc->fd, false,
- nbd_reply_ready, NULL, NULL, bs);
- s->send_coroutine = NULL;
qemu_co_mutex_unlock(&s->send_mutex);
return rc;
}
@@ -174,8 +156,7 @@ static void nbd_co_receive_reply(NBDClientSession *s,
{
int ret;
- /* Wait until we're woken up by the read handler. TODO: perhaps
- * peek at the next reply and avoid yielding if it's ours? */
+ /* Wait until we're woken up by nbd_read_reply_entry. */
qemu_coroutine_yield();
*reply = s->reply;
if (reply->handle != request->handle ||
@@ -201,7 +182,7 @@ static void nbd_coroutine_start(NBDClientSession *s,
/* Poor man semaphore. The free_sema is locked when no other request
* can be accepted, and unlocked after receiving one reply. */
if (s->in_flight == MAX_NBD_REQUESTS) {
- qemu_co_queue_wait(&s->free_sema);
+ qemu_co_queue_wait(&s->free_sema, NULL);
assert(s->in_flight < MAX_NBD_REQUESTS);
}
s->in_flight++;
@@ -209,13 +190,19 @@ static void nbd_coroutine_start(NBDClientSession *s,
/* s->recv_coroutine[i] is set as soon as we get the send_lock. */
}
-static void nbd_coroutine_end(NBDClientSession *s,
+static void nbd_coroutine_end(BlockDriverState *bs,
NBDRequest *request)
{
+ NBDClientSession *s = nbd_get_client_session(bs);
int i = HANDLE_TO_INDEX(s, request->handle);
+
s->recv_coroutine[i] = NULL;
- if (s->in_flight-- == MAX_NBD_REQUESTS) {
- qemu_co_queue_next(&s->free_sema);
+ s->in_flight--;
+ qemu_co_queue_next(&s->free_sema);
+
+ /* Kick the read_reply_co to get the next reply. */
+ if (s->read_reply_co) {
+ aio_co_wake(s->read_reply_co);
}
}
@@ -241,7 +228,7 @@ int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
} else {
nbd_co_receive_reply(client, &request, &reply, qiov);
}
- nbd_coroutine_end(client, &request);
+ nbd_coroutine_end(bs, &request);
return -reply.error;
}
@@ -271,7 +258,7 @@ int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
} else {
nbd_co_receive_reply(client, &request, &reply, NULL);
}
- nbd_coroutine_end(client, &request);
+ nbd_coroutine_end(bs, &request);
return -reply.error;
}
@@ -306,7 +293,7 @@ int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
} else {
nbd_co_receive_reply(client, &request, &reply, NULL);
}
- nbd_coroutine_end(client, &request);
+ nbd_coroutine_end(bs, &request);
return -reply.error;
}
@@ -331,7 +318,7 @@ int nbd_client_co_flush(BlockDriverState *bs)
} else {
nbd_co_receive_reply(client, &request, &reply, NULL);
}
- nbd_coroutine_end(client, &request);
+ nbd_coroutine_end(bs, &request);
return -reply.error;
}
@@ -357,23 +344,23 @@ int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int count)
} else {
nbd_co_receive_reply(client, &request, &reply, NULL);
}
- nbd_coroutine_end(client, &request);
+ nbd_coroutine_end(bs, &request);
return -reply.error;
}
void nbd_client_detach_aio_context(BlockDriverState *bs)
{
- aio_set_fd_handler(bdrv_get_aio_context(bs),
- nbd_get_client_session(bs)->sioc->fd,
- false, NULL, NULL, NULL, NULL);
+ NBDClientSession *client = nbd_get_client_session(bs);
+ qio_channel_detach_aio_context(QIO_CHANNEL(client->sioc));
}
void nbd_client_attach_aio_context(BlockDriverState *bs,
AioContext *new_context)
{
- aio_set_fd_handler(new_context, nbd_get_client_session(bs)->sioc->fd,
- false, nbd_reply_ready, NULL, NULL, bs);
+ NBDClientSession *client = nbd_get_client_session(bs);
+ qio_channel_attach_aio_context(QIO_CHANNEL(client->sioc), new_context);
+ aio_co_schedule(new_context, client->read_reply_co);
}
void nbd_client_close(BlockDriverState *bs)
@@ -434,7 +421,7 @@ int nbd_client_init(BlockDriverState *bs,
/* Now that we're connected, set the socket to be non-blocking and
* kick the reply mechanism. */
qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
-
+ client->read_reply_co = qemu_coroutine_create(nbd_read_reply_entry, client);
nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));
logout("Established connection with NBD server\n");
diff --git a/block/nbd-client.h b/block/nbd-client.h
index f8d6006849..8cdfc92e94 100644
--- a/block/nbd-client.h
+++ b/block/nbd-client.h
@@ -25,7 +25,7 @@ typedef struct NBDClientSession {
CoMutex send_mutex;
CoQueue free_sema;
- Coroutine *send_coroutine;
+ Coroutine *read_reply_co;
int in_flight;
Coroutine *recv_coroutine[MAX_NBD_REQUESTS];
diff --git a/block/nfs.c b/block/nfs.c
index 689eaa792e..08b43dd189 100644
--- a/block/nfs.c
+++ b/block/nfs.c
@@ -208,15 +208,21 @@ static void nfs_set_events(NFSClient *client)
static void nfs_process_read(void *arg)
{
NFSClient *client = arg;
+
+ aio_context_acquire(client->aio_context);
nfs_service(client->context, POLLIN);
nfs_set_events(client);
+ aio_context_release(client->aio_context);
}
static void nfs_process_write(void *arg)
{
NFSClient *client = arg;
+
+ aio_context_acquire(client->aio_context);
nfs_service(client->context, POLLOUT);
nfs_set_events(client);
+ aio_context_release(client->aio_context);
}
static void nfs_co_init_task(BlockDriverState *bs, NFSRPC *task)
@@ -231,8 +237,9 @@ static void nfs_co_init_task(BlockDriverState *bs, NFSRPC *task)
static void nfs_co_generic_bh_cb(void *opaque)
{
NFSRPC *task = opaque;
+
task->complete = 1;
- qemu_coroutine_enter(task->co);
+ aio_co_wake(task->co);
}
static void
diff --git a/block/qcow2-cluster.c b/block/qcow2-cluster.c
index 928c1e298d..78c11d4948 100644
--- a/block/qcow2-cluster.c
+++ b/block/qcow2-cluster.c
@@ -932,9 +932,7 @@ static int handle_dependencies(BlockDriverState *bs, uint64_t guest_offset,
if (bytes == 0) {
/* Wait for the dependency to complete. We need to recheck
* the free/allocated clusters when we continue. */
- qemu_co_mutex_unlock(&s->lock);
- qemu_co_queue_wait(&old_alloc->dependent_requests);
- qemu_co_mutex_lock(&s->lock);
+ qemu_co_queue_wait(&old_alloc->dependent_requests, &s->lock);
return -EAGAIN;
}
}
diff --git a/block/qed-cluster.c b/block/qed-cluster.c
index c24e75616a..8f5da74c4d 100644
--- a/block/qed-cluster.c
+++ b/block/qed-cluster.c
@@ -83,6 +83,7 @@ static void qed_find_cluster_cb(void *opaque, int ret)
unsigned int index;
unsigned int n;
+ qed_acquire(s);
if (ret) {
goto out;
}
@@ -109,6 +110,7 @@ static void qed_find_cluster_cb(void *opaque, int ret)
out:
find_cluster_cb->cb(find_cluster_cb->opaque, ret, offset, len);
+ qed_release(s);
g_free(find_cluster_cb);
}
diff --git a/block/qed-table.c b/block/qed-table.c
index ed443e2b70..b12c298a8a 100644
--- a/block/qed-table.c
+++ b/block/qed-table.c
@@ -31,6 +31,7 @@ static void qed_read_table_cb(void *opaque, int ret)
{
QEDReadTableCB *read_table_cb = opaque;
QEDTable *table = read_table_cb->table;
+ BDRVQEDState *s = read_table_cb->s;
int noffsets = read_table_cb->qiov.size / sizeof(uint64_t);
int i;
@@ -40,13 +41,15 @@ static void qed_read_table_cb(void *opaque, int ret)
}
/* Byteswap offsets */
+ qed_acquire(s);
for (i = 0; i < noffsets; i++) {
table->offsets[i] = le64_to_cpu(table->offsets[i]);
}
+ qed_release(s);
out:
/* Completion */
- trace_qed_read_table_cb(read_table_cb->s, read_table_cb->table, ret);
+ trace_qed_read_table_cb(s, read_table_cb->table, ret);
gencb_complete(&read_table_cb->gencb, ret);
}
@@ -84,8 +87,9 @@ typedef struct {
static void qed_write_table_cb(void *opaque, int ret)
{
QEDWriteTableCB *write_table_cb = opaque;
+ BDRVQEDState *s = write_table_cb->s;
- trace_qed_write_table_cb(write_table_cb->s,
+ trace_qed_write_table_cb(s,
write_table_cb->orig_table,
write_table_cb->flush,
ret);
@@ -97,8 +101,10 @@ static void qed_write_table_cb(void *opaque, int ret)
if (write_table_cb->flush) {
/* We still need to flush first */
write_table_cb->flush = false;
+ qed_acquire(s);
bdrv_aio_flush(write_table_cb->s->bs, qed_write_table_cb,
write_table_cb);
+ qed_release(s);
return;
}
@@ -213,6 +219,7 @@ static void qed_read_l2_table_cb(void *opaque, int ret)
CachedL2Table *l2_table = request->l2_table;
uint64_t l2_offset = read_l2_table_cb->l2_offset;
+ qed_acquire(s);
if (ret) {
/* can't trust loaded L2 table anymore */
qed_unref_l2_cache_entry(l2_table);
@@ -228,6 +235,7 @@ static void qed_read_l2_table_cb(void *opaque, int ret)
request->l2_table = qed_find_l2_cache_entry(&s->l2_cache, l2_offset);
assert(request->l2_table != NULL);
}
+ qed_release(s);
gencb_complete(&read_l2_table_cb->gencb, ret);
}
diff --git a/block/qed.c b/block/qed.c
index 1a7ef0a9ce..0b62c7799e 100644
--- a/block/qed.c
+++ b/block/qed.c
@@ -273,7 +273,19 @@ static CachedL2Table *qed_new_l2_table(BDRVQEDState *s)
return l2_table;
}
-static void qed_aio_next_io(void *opaque, int ret);
+static void qed_aio_next_io(QEDAIOCB *acb, int ret);
+
+static void qed_aio_start_io(QEDAIOCB *acb)
+{
+ qed_aio_next_io(acb, 0);
+}
+
+static void qed_aio_next_io_cb(void *opaque, int ret)
+{
+ QEDAIOCB *acb = opaque;
+
+ qed_aio_next_io(acb, ret);
+}
static void qed_plug_allocating_write_reqs(BDRVQEDState *s)
{
@@ -292,7 +304,7 @@ static void qed_unplug_allocating_write_reqs(BDRVQEDState *s)
acb = QSIMPLEQ_FIRST(&s->allocating_write_reqs);
if (acb) {
- qed_aio_next_io(acb, 0);
+ qed_aio_start_io(acb);
}
}
@@ -333,10 +345,22 @@ static void qed_need_check_timer_cb(void *opaque)
trace_qed_need_check_timer_cb(s);
+ qed_acquire(s);
qed_plug_allocating_write_reqs(s);
/* Ensure writes are on disk before clearing flag */
bdrv_aio_flush(s->bs->file->bs, qed_clear_need_check, s);
+ qed_release(s);
+}
+
+void qed_acquire(BDRVQEDState *s)
+{
+ aio_context_acquire(bdrv_get_aio_context(s->bs));
+}
+
+void qed_release(BDRVQEDState *s)
+{
+ aio_context_release(bdrv_get_aio_context(s->bs));
}
static void qed_start_need_check_timer(BDRVQEDState *s)
@@ -721,7 +745,7 @@ static void qed_is_allocated_cb(void *opaque, int ret, uint64_t offset, size_t l
}
if (cb->co) {
- qemu_coroutine_enter(cb->co);
+ aio_co_wake(cb->co);
}
}
@@ -918,6 +942,7 @@ static void qed_update_l2_table(BDRVQEDState *s, QEDTable *table, int index,
static void qed_aio_complete_bh(void *opaque)
{
QEDAIOCB *acb = opaque;
+ BDRVQEDState *s = acb_to_s(acb);
BlockCompletionFunc *cb = acb->common.cb;
void *user_opaque = acb->common.opaque;
int ret = acb->bh_ret;
@@ -925,7 +950,9 @@ static void qed_aio_complete_bh(void *opaque)
qemu_aio_unref(acb);
/* Invoke callback */
+ qed_acquire(s);
cb(user_opaque, ret);
+ qed_release(s);
}
static void qed_aio_complete(QEDAIOCB *acb, int ret)
@@ -959,7 +986,7 @@ static void qed_aio_complete(QEDAIOCB *acb, int ret)
QSIMPLEQ_REMOVE_HEAD(&s->allocating_write_reqs, next);
acb = QSIMPLEQ_FIRST(&s->allocating_write_reqs);
if (acb) {
- qed_aio_next_io(acb, 0);
+ qed_aio_start_io(acb);
} else if (s->header.features & QED_F_NEED_CHECK) {
qed_start_need_check_timer(s);
}
@@ -984,7 +1011,7 @@ static void qed_commit_l2_update(void *opaque, int ret)
acb->request.l2_table = qed_find_l2_cache_entry(&s->l2_cache, l2_offset);
assert(acb->request.l2_table != NULL);
- qed_aio_next_io(opaque, ret);
+ qed_aio_next_io(acb, ret);
}
/**
@@ -1032,11 +1059,11 @@ static void qed_aio_write_l2_update(QEDAIOCB *acb, int ret, uint64_t offset)
if (need_alloc) {
/* Write out the whole new L2 table */
qed_write_l2_table(s, &acb->request, 0, s->table_nelems, true,
- qed_aio_write_l1_update, acb);
+ qed_aio_write_l1_update, acb);
} else {
/* Write out only the updated part of the L2 table */
qed_write_l2_table(s, &acb->request, index, acb->cur_nclusters, false,
- qed_aio_next_io, acb);
+ qed_aio_next_io_cb, acb);
}
return;
@@ -1088,7 +1115,7 @@ static void qed_aio_write_main(void *opaque, int ret)
}
if (acb->find_cluster_ret == QED_CLUSTER_FOUND) {
- next_fn = qed_aio_next_io;
+ next_fn = qed_aio_next_io_cb;
} else {
if (s->bs->backing) {
next_fn = qed_aio_write_flush_before_l2_update;
@@ -1201,7 +1228,7 @@ static void qed_aio_write_alloc(QEDAIOCB *acb, size_t len)
if (acb->flags & QED_AIOCB_ZERO) {
/* Skip ahead if the clusters are already zero */
if (acb->find_cluster_ret == QED_CLUSTER_ZERO) {
- qed_aio_next_io(acb, 0);
+ qed_aio_start_io(acb);
return;
}
@@ -1321,18 +1348,18 @@ static void qed_aio_read_data(void *opaque, int ret,
/* Handle zero cluster and backing file reads */
if (ret == QED_CLUSTER_ZERO) {
qemu_iovec_memset(&acb->cur_qiov, 0, 0, acb->cur_qiov.size);
- qed_aio_next_io(acb, 0);
+ qed_aio_start_io(acb);
return;
} else if (ret != QED_CLUSTER_FOUND) {
qed_read_backing_file(s, acb->cur_pos, &acb->cur_qiov,
- &acb->backing_qiov, qed_aio_next_io, acb);
+ &acb->backing_qiov, qed_aio_next_io_cb, acb);
return;
}
BLKDBG_EVENT(bs->file, BLKDBG_READ_AIO);
bdrv_aio_readv(bs->file, offset / BDRV_SECTOR_SIZE,
&acb->cur_qiov, acb->cur_qiov.size / BDRV_SECTOR_SIZE,
- qed_aio_next_io, acb);
+ qed_aio_next_io_cb, acb);
return;
err:
@@ -1342,9 +1369,8 @@ err:
/**
* Begin next I/O or complete the request
*/
-static void qed_aio_next_io(void *opaque, int ret)
+static void qed_aio_next_io(QEDAIOCB *acb, int ret)
{
- QEDAIOCB *acb = opaque;
BDRVQEDState *s = acb_to_s(acb);
QEDFindClusterFunc *io_fn = (acb->flags & QED_AIOCB_WRITE) ?
qed_aio_write_data : qed_aio_read_data;
@@ -1400,7 +1426,7 @@ static BlockAIOCB *qed_aio_setup(BlockDriverState *bs,
qemu_iovec_init(&acb->cur_qiov, qiov->niov);
/* Start request */
- qed_aio_next_io(acb, 0);
+ qed_aio_start_io(acb);
return &acb->common;
}
@@ -1436,7 +1462,7 @@ static void coroutine_fn qed_co_pwrite_zeroes_cb(void *opaque, int ret)
cb->done = true;
cb->ret = ret;
if (cb->co) {
- qemu_coroutine_enter(cb->co);
+ aio_co_wake(cb->co);
}
}
diff --git a/block/qed.h b/block/qed.h
index 9676ab9479..ce8c314089 100644
--- a/block/qed.h
+++ b/block/qed.h
@@ -198,6 +198,9 @@ enum {
*/
typedef void QEDFindClusterFunc(void *opaque, int ret, uint64_t offset, size_t len);
+void qed_acquire(BDRVQEDState *s);
+void qed_release(BDRVQEDState *s);
+
/**
* Generic callback for chaining async callbacks
*/
diff --git a/block/sheepdog.c b/block/sheepdog.c
index f757157cea..860ba61502 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -486,7 +486,7 @@ static void wait_for_overlapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *acb)
retry:
QLIST_FOREACH(cb, &s->inflight_aiocb_head, aiocb_siblings) {
if (AIOCBOverlapping(acb, cb)) {
- qemu_co_queue_wait(&s->overlapping_queue);
+ qemu_co_queue_wait(&s->overlapping_queue, NULL);
goto retry;
}
}
@@ -575,13 +575,6 @@ static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
return ret;
}
-static void restart_co_req(void *opaque)
-{
- Coroutine *co = opaque;
-
- qemu_coroutine_enter(co);
-}
-
typedef struct SheepdogReqCo {
int sockfd;
BlockDriverState *bs;
@@ -592,12 +585,19 @@ typedef struct SheepdogReqCo {
unsigned int *rlen;
int ret;
bool finished;
+ Coroutine *co;
} SheepdogReqCo;
+static void restart_co_req(void *opaque)
+{
+ SheepdogReqCo *srco = opaque;
+
+ aio_co_wake(srco->co);
+}
+
static coroutine_fn void do_co_req(void *opaque)
{
int ret;
- Coroutine *co;
SheepdogReqCo *srco = opaque;
int sockfd = srco->sockfd;
SheepdogReq *hdr = srco->hdr;
@@ -605,9 +605,9 @@ static coroutine_fn void do_co_req(void *opaque)
unsigned int *wlen = srco->wlen;
unsigned int *rlen = srco->rlen;
- co = qemu_coroutine_self();
+ srco->co = qemu_coroutine_self();
aio_set_fd_handler(srco->aio_context, sockfd, false,
- NULL, restart_co_req, NULL, co);
+ NULL, restart_co_req, NULL, srco);
ret = send_co_req(sockfd, hdr, data, wlen);
if (ret < 0) {
@@ -615,7 +615,7 @@ static coroutine_fn void do_co_req(void *opaque)
}
aio_set_fd_handler(srco->aio_context, sockfd, false,
- restart_co_req, NULL, NULL, co);
+ restart_co_req, NULL, NULL, srco);
ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
if (ret != sizeof(*hdr)) {
@@ -643,6 +643,7 @@ out:
aio_set_fd_handler(srco->aio_context, sockfd, false,
NULL, NULL, NULL, NULL);
+ srco->co = NULL;
srco->ret = ret;
srco->finished = true;
if (srco->bs) {
@@ -866,7 +867,7 @@ static void coroutine_fn aio_read_response(void *opaque)
* We've finished all requests which belong to the AIOCB, so
* we can switch back to sd_co_readv/writev now.
*/
- qemu_coroutine_enter(acb->coroutine);
+ aio_co_wake(acb->coroutine);
}
return;
@@ -883,14 +884,14 @@ static void co_read_response(void *opaque)
s->co_recv = qemu_coroutine_create(aio_read_response, opaque);
}
- qemu_coroutine_enter(s->co_recv);
+ aio_co_wake(s->co_recv);
}
static void co_write_request(void *opaque)
{
BDRVSheepdogState *s = opaque;
- qemu_coroutine_enter(s->co_send);
+ aio_co_wake(s->co_send);
}
/*
diff --git a/block/ssh.c b/block/ssh.c
index e0edf20f78..835932e6a4 100644
--- a/block/ssh.c
+++ b/block/ssh.c
@@ -889,10 +889,14 @@ static void restart_coroutine(void *opaque)
DPRINTF("co=%p", co);
- qemu_coroutine_enter(co);
+ aio_co_wake(co);
}
-static coroutine_fn void set_fd_handler(BDRVSSHState *s, BlockDriverState *bs)
+/* A non-blocking call returned EAGAIN, so yield, ensuring the
+ * handlers are set up so that we'll be rescheduled when there is an
+ * interesting event on the socket.
+ */
+static coroutine_fn void co_yield(BDRVSSHState *s, BlockDriverState *bs)
{
int r;
IOHandler *rd_handler = NULL, *wr_handler = NULL;
@@ -912,25 +916,10 @@ static coroutine_fn void set_fd_handler(BDRVSSHState *s, BlockDriverState *bs)
aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock,
false, rd_handler, wr_handler, NULL, co);
-}
-
-static coroutine_fn void clear_fd_handler(BDRVSSHState *s,
- BlockDriverState *bs)
-{
- DPRINTF("s->sock=%d", s->sock);
- aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock,
- false, NULL, NULL, NULL, NULL);
-}
-
-/* A non-blocking call returned EAGAIN, so yield, ensuring the
- * handlers are set up so that we'll be rescheduled when there is an
- * interesting event on the socket.
- */
-static coroutine_fn void co_yield(BDRVSSHState *s, BlockDriverState *bs)
-{
- set_fd_handler(s, bs);
qemu_coroutine_yield();
- clear_fd_handler(s, bs);
+ DPRINTF("s->sock=%d - back", s->sock);
+ aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock, false,
+ NULL, NULL, NULL, NULL);
}
/* SFTP has a function `libssh2_sftp_seek64' which seeks to a position
diff --git a/block/throttle-groups.c b/block/throttle-groups.c
index 17b2efb7c7..b73e7a800b 100644
--- a/block/throttle-groups.c
+++ b/block/throttle-groups.c
@@ -326,7 +326,7 @@ void coroutine_fn throttle_group_co_io_limits_intercept(BlockBackend *blk,
if (must_wait || blkp->pending_reqs[is_write]) {
blkp->pending_reqs[is_write]++;
qemu_mutex_unlock(&tg->lock);
- qemu_co_queue_wait(&blkp->throttled_reqs[is_write]);
+ qemu_co_queue_wait(&blkp->throttled_reqs[is_write], NULL);
qemu_mutex_lock(&tg->lock);
blkp->pending_reqs[is_write]--;
}
@@ -416,7 +416,9 @@ static void timer_cb(BlockBackend *blk, bool is_write)
qemu_mutex_unlock(&tg->lock);
/* Run the request that was waiting for this timer */
+ aio_context_acquire(blk_get_aio_context(blk));
empty_queue = !qemu_co_enter_next(&blkp->throttled_reqs[is_write]);
+ aio_context_release(blk_get_aio_context(blk));
/* If the request queue was empty then we have to take care of
* scheduling the next one */
diff --git a/block/win32-aio.c b/block/win32-aio.c
index 8cdf73b00d..3be8f458fa 100644
--- a/block/win32-aio.c
+++ b/block/win32-aio.c
@@ -41,7 +41,7 @@ struct QEMUWin32AIOState {
HANDLE hIOCP;
EventNotifier e;
int count;
- bool is_aio_context_attached;
+ AioContext *aio_ctx;
};
typedef struct QEMUWin32AIOCB {
@@ -87,7 +87,6 @@ static void win32_aio_process_completion(QEMUWin32AIOState *s,
qemu_vfree(waiocb->buf);
}
-
waiocb->common.cb(waiocb->common.opaque, ret);
qemu_aio_unref(waiocb);
}
@@ -176,13 +175,13 @@ void win32_aio_detach_aio_context(QEMUWin32AIOState *aio,
AioContext *old_context)
{
aio_set_event_notifier(old_context, &aio->e, false, NULL, NULL);
- aio->is_aio_context_attached = false;
+ aio->aio_ctx = NULL;
}
void win32_aio_attach_aio_context(QEMUWin32AIOState *aio,
AioContext *new_context)
{
- aio->is_aio_context_attached = true;
+ aio->aio_ctx = new_context;
aio_set_event_notifier(new_context, &aio->e, false,
win32_aio_completion_cb, NULL);
}
@@ -212,7 +211,7 @@ out_free_state:
void win32_aio_cleanup(QEMUWin32AIOState *aio)
{
- assert(!aio->is_aio_context_attached);
+ assert(!aio->aio_ctx);
CloseHandle(aio->hIOCP);
event_notifier_cleanup(&aio->e);
g_free(aio);