aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--async.c29
-rw-r--r--block.c6
-rw-r--r--block/backup.c17
-rw-r--r--block/block-backend.c30
-rw-r--r--block/commit.c2
-rw-r--r--block/io.c137
-rw-r--r--block/mirror.c70
-rw-r--r--block/nfs.c55
-rw-r--r--block/qed-table.c16
-rw-r--r--block/qed.c16
-rw-r--r--block/replication.c27
-rw-r--r--block/sheepdog.c67
-rw-r--r--blockjob.c37
-rw-r--r--docs/multiple-iothreads.txt40
-rw-r--r--hw/scsi/virtio-scsi-dataplane.c4
-rw-r--r--include/block/aio.h24
-rw-r--r--include/block/block.h31
-rw-r--r--include/block/block_int.h27
-rw-r--r--include/block/blockjob.h7
-rw-r--r--include/qemu/rfifolock.h54
-rw-r--r--include/qemu/thread-posix.h6
-rw-r--r--include/qemu/thread-win32.h10
-rw-r--r--include/qemu/thread.h3
-rw-r--r--iothread.c33
-rw-r--r--qemu-img.c6
-rw-r--r--qemu-io-cmds.c6
-rw-r--r--stubs/Makefile.objs1
-rw-r--r--stubs/iothread.c8
-rw-r--r--tests/.gitignore1
-rw-r--r--tests/Makefile.include2
-rw-r--r--tests/test-aio.c22
-rw-r--r--tests/test-rfifolock.c91
-rw-r--r--util/Makefile.objs1
-rw-r--r--util/qemu-thread-posix.c14
-rw-r--r--util/qemu-thread-win32.c25
-rw-r--r--util/rfifolock.c78
36 files changed, 521 insertions, 482 deletions
diff --git a/async.c b/async.c
index f30d011ebc..b2de360c23 100644
--- a/async.c
+++ b/async.c
@@ -61,6 +61,7 @@ void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
smp_wmb();
ctx->first_bh = bh;
qemu_mutex_unlock(&ctx->bh_lock);
+ aio_notify(ctx);
}
QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
@@ -106,8 +107,8 @@ int aio_bh_poll(AioContext *ctx)
* aio_notify again if necessary.
*/
if (atomic_xchg(&bh->scheduled, 0)) {
- /* Idle BHs and the notify BH don't count as progress */
- if (!bh->idle && bh != ctx->notify_dummy_bh) {
+ /* Idle BHs don't count as progress */
+ if (!bh->idle) {
ret = 1;
}
bh->idle = 0;
@@ -259,7 +260,6 @@ aio_ctx_finalize(GSource *source)
{
AioContext *ctx = (AioContext *) source;
- qemu_bh_delete(ctx->notify_dummy_bh);
thread_pool_free(ctx->thread_pool);
#ifdef CONFIG_LINUX_AIO
@@ -284,7 +284,7 @@ aio_ctx_finalize(GSource *source)
aio_set_event_notifier(ctx, &ctx->notifier, false, NULL);
event_notifier_cleanup(&ctx->notifier);
- rfifolock_destroy(&ctx->lock);
+ qemu_rec_mutex_destroy(&ctx->lock);
qemu_mutex_destroy(&ctx->bh_lock);
timerlistgroup_deinit(&ctx->tlg);
}
@@ -345,19 +345,6 @@ static void aio_timerlist_notify(void *opaque)
aio_notify(opaque);
}
-static void aio_rfifolock_cb(void *opaque)
-{
- AioContext *ctx = opaque;
-
- /* Kick owner thread in case they are blocked in aio_poll() */
- qemu_bh_schedule(ctx->notify_dummy_bh);
-}
-
-static void notify_dummy_bh(void *opaque)
-{
- /* Do nothing, we were invoked just to force the event loop to iterate */
-}
-
static void event_notifier_dummy_cb(EventNotifier *e)
{
}
@@ -385,11 +372,9 @@ AioContext *aio_context_new(Error **errp)
#endif
ctx->thread_pool = NULL;
qemu_mutex_init(&ctx->bh_lock);
- rfifolock_init(&ctx->lock, aio_rfifolock_cb, ctx);
+ qemu_rec_mutex_init(&ctx->lock);
timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
- ctx->notify_dummy_bh = aio_bh_new(ctx, notify_dummy_bh, NULL);
-
return ctx;
fail:
g_source_destroy(&ctx->source);
@@ -408,10 +393,10 @@ void aio_context_unref(AioContext *ctx)
void aio_context_acquire(AioContext *ctx)
{
- rfifolock_lock(&ctx->lock);
+ qemu_rec_mutex_lock(&ctx->lock);
}
void aio_context_release(AioContext *ctx)
{
- rfifolock_unlock(&ctx->lock);
+ qemu_rec_mutex_unlock(&ctx->lock);
}
diff --git a/block.c b/block.c
index 7f3e7bcdc3..a17baab1d0 100644
--- a/block.c
+++ b/block.c
@@ -2082,7 +2082,7 @@ BlockReopenQueue *bdrv_reopen_queue(BlockReopenQueue *bs_queue,
* to all devices.
*
*/
-int bdrv_reopen_multiple(BlockReopenQueue *bs_queue, Error **errp)
+int bdrv_reopen_multiple(AioContext *ctx, BlockReopenQueue *bs_queue, Error **errp)
{
int ret = -1;
BlockReopenQueueEntry *bs_entry, *next;
@@ -2090,7 +2090,9 @@ int bdrv_reopen_multiple(BlockReopenQueue *bs_queue, Error **errp)
assert(bs_queue != NULL);
+ aio_context_release(ctx);
bdrv_drain_all();
+ aio_context_acquire(ctx);
QSIMPLEQ_FOREACH(bs_entry, bs_queue, entry) {
if (bdrv_reopen_prepare(&bs_entry->state, bs_queue, &local_err)) {
@@ -2131,7 +2133,7 @@ int bdrv_reopen(BlockDriverState *bs, int bdrv_flags, Error **errp)
Error *local_err = NULL;
BlockReopenQueue *queue = bdrv_reopen_queue(NULL, bs, NULL, bdrv_flags);
- ret = bdrv_reopen_multiple(queue, &local_err);
+ ret = bdrv_reopen_multiple(bdrv_get_aio_context(bs), queue, &local_err);
if (local_err != NULL) {
error_propagate(errp, local_err);
}
diff --git a/block/backup.c b/block/backup.c
index 02dbe48035..81d4042ae8 100644
--- a/block/backup.c
+++ b/block/backup.c
@@ -300,6 +300,21 @@ void backup_cow_request_end(CowRequest *req)
cow_request_end(req);
}
+static void backup_drain(BlockJob *job)
+{
+ BackupBlockJob *s = container_of(job, BackupBlockJob, common);
+
+ /* Need to keep a reference in case blk_drain triggers execution
+ * of backup_complete...
+ */
+ if (s->target) {
+ BlockBackend *target = s->target;
+ blk_ref(target);
+ blk_drain(target);
+ blk_unref(target);
+ }
+}
+
static const BlockJobDriver backup_job_driver = {
.instance_size = sizeof(BackupBlockJob),
.job_type = BLOCK_JOB_TYPE_BACKUP,
@@ -307,6 +322,7 @@ static const BlockJobDriver backup_job_driver = {
.commit = backup_commit,
.abort = backup_abort,
.attached_aio_context = backup_attached_aio_context,
+ .drain = backup_drain,
};
static BlockErrorAction backup_error_action(BackupBlockJob *job,
@@ -331,6 +347,7 @@ static void backup_complete(BlockJob *job, void *opaque)
BackupCompleteData *data = opaque;
blk_unref(s->target);
+ s->target = NULL;
block_job_completed(job, data->ret);
g_free(data);
diff --git a/block/block-backend.c b/block/block-backend.c
index c53ca30000..27a7f6f523 100644
--- a/block/block-backend.c
+++ b/block/block-backend.c
@@ -799,20 +799,25 @@ int coroutine_fn blk_co_preadv(BlockBackend *blk, int64_t offset,
BdrvRequestFlags flags)
{
int ret;
+ BlockDriverState *bs = blk_bs(blk);
- trace_blk_co_preadv(blk, blk_bs(blk), offset, bytes, flags);
+ trace_blk_co_preadv(blk, bs, offset, bytes, flags);
ret = blk_check_byte_request(blk, offset, bytes);
if (ret < 0) {
return ret;
}
+ bdrv_inc_in_flight(bs);
+
/* throttling disk I/O */
if (blk->public.throttle_state) {
throttle_group_co_io_limits_intercept(blk, bytes, false);
}
- return bdrv_co_preadv(blk->root, offset, bytes, qiov, flags);
+ ret = bdrv_co_preadv(blk->root, offset, bytes, qiov, flags);
+ bdrv_dec_in_flight(bs);
+ return ret;
}
int coroutine_fn blk_co_pwritev(BlockBackend *blk, int64_t offset,
@@ -820,14 +825,17 @@ int coroutine_fn blk_co_pwritev(BlockBackend *blk, int64_t offset,
BdrvRequestFlags flags)
{
int ret;
+ BlockDriverState *bs = blk_bs(blk);
- trace_blk_co_pwritev(blk, blk_bs(blk), offset, bytes, flags);
+ trace_blk_co_pwritev(blk, bs, offset, bytes, flags);
ret = blk_check_byte_request(blk, offset, bytes);
if (ret < 0) {
return ret;
}
+ bdrv_inc_in_flight(bs);
+
/* throttling disk I/O */
if (blk->public.throttle_state) {
throttle_group_co_io_limits_intercept(blk, bytes, true);
@@ -837,7 +845,9 @@ int coroutine_fn blk_co_pwritev(BlockBackend *blk, int64_t offset,
flags |= BDRV_REQ_FUA;
}
- return bdrv_co_pwritev(blk->root, offset, bytes, qiov, flags);
+ ret = bdrv_co_pwritev(blk->root, offset, bytes, qiov, flags);
+ bdrv_dec_in_flight(bs);
+ return ret;
}
typedef struct BlkRwCo {
@@ -868,7 +878,6 @@ static int blk_prw(BlockBackend *blk, int64_t offset, uint8_t *buf,
int64_t bytes, CoroutineEntry co_entry,
BdrvRequestFlags flags)
{
- AioContext *aio_context;
QEMUIOVector qiov;
struct iovec iov;
Coroutine *co;
@@ -890,11 +899,7 @@ static int blk_prw(BlockBackend *blk, int64_t offset, uint8_t *buf,
co = qemu_coroutine_create(co_entry, &rwco);
qemu_coroutine_enter(co);
-
- aio_context = blk_get_aio_context(blk);
- while (rwco.ret == NOT_DONE) {
- aio_poll(aio_context, true);
- }
+ BDRV_POLL_WHILE(blk_bs(blk), rwco.ret == NOT_DONE);
return rwco.ret;
}
@@ -930,6 +935,8 @@ int blk_make_zero(BlockBackend *blk, BdrvRequestFlags flags)
static void error_callback_bh(void *opaque)
{
struct BlockBackendAIOCB *acb = opaque;
+
+ bdrv_dec_in_flight(acb->common.bs);
acb->common.cb(acb->common.opaque, acb->ret);
qemu_aio_unref(acb);
}
@@ -940,6 +947,7 @@ BlockAIOCB *blk_abort_aio_request(BlockBackend *blk,
{
struct BlockBackendAIOCB *acb;
+ bdrv_inc_in_flight(blk_bs(blk));
acb = blk_aio_get(&block_backend_aiocb_info, blk, cb, opaque);
acb->blk = blk;
acb->ret = ret;
@@ -962,6 +970,7 @@ static const AIOCBInfo blk_aio_em_aiocb_info = {
static void blk_aio_complete(BlkAioEmAIOCB *acb)
{
if (acb->has_returned) {
+ bdrv_dec_in_flight(acb->common.bs);
acb->common.cb(acb->common.opaque, acb->rwco.ret);
qemu_aio_unref(acb);
}
@@ -983,6 +992,7 @@ static BlockAIOCB *blk_aio_prwv(BlockBackend *blk, int64_t offset, int bytes,
BlkAioEmAIOCB *acb;
Coroutine *co;
+ bdrv_inc_in_flight(blk_bs(blk));
acb = blk_aio_get(&blk_aio_em_aiocb_info, blk, cb, opaque);
acb->rwco = (BlkRwCo) {
.blk = blk,
diff --git a/block/commit.c b/block/commit.c
index 9f67a8b121..499eccaeee 100644
--- a/block/commit.c
+++ b/block/commit.c
@@ -251,7 +251,7 @@ void commit_start(const char *job_id, BlockDriverState *bs,
orig_overlay_flags | BDRV_O_RDWR);
}
if (reopen_queue) {
- bdrv_reopen_multiple(reopen_queue, &local_err);
+ bdrv_reopen_multiple(bdrv_get_aio_context(bs), reopen_queue, &local_err);
if (local_err != NULL) {
error_propagate(errp, local_err);
block_job_unref(&s->common);
diff --git a/block/io.c b/block/io.c
index 79cbbdf769..be0d862ca6 100644
--- a/block/io.c
+++ b/block/io.c
@@ -143,7 +143,7 @@ bool bdrv_requests_pending(BlockDriverState *bs)
{
BdrvChild *child;
- if (!QLIST_EMPTY(&bs->tracked_requests)) {
+ if (atomic_read(&bs->in_flight)) {
return true;
}
@@ -156,16 +156,22 @@ bool bdrv_requests_pending(BlockDriverState *bs)
return false;
}
-static void bdrv_drain_recurse(BlockDriverState *bs)
+static bool bdrv_drain_recurse(BlockDriverState *bs)
{
BdrvChild *child;
+ bool waited;
+
+ waited = BDRV_POLL_WHILE(bs, atomic_read(&bs->in_flight) > 0);
if (bs->drv && bs->drv->bdrv_drain) {
bs->drv->bdrv_drain(bs);
}
+
QLIST_FOREACH(child, &bs->children, next) {
- bdrv_drain_recurse(child->bs);
+ waited |= bdrv_drain_recurse(child->bs);
}
+
+ return waited;
}
typedef struct {
@@ -174,23 +180,14 @@ typedef struct {
bool done;
} BdrvCoDrainData;
-static void bdrv_drain_poll(BlockDriverState *bs)
-{
- bool busy = true;
-
- while (busy) {
- /* Keep iterating */
- busy = bdrv_requests_pending(bs);
- busy |= aio_poll(bdrv_get_aio_context(bs), busy);
- }
-}
-
static void bdrv_co_drain_bh_cb(void *opaque)
{
BdrvCoDrainData *data = opaque;
Coroutine *co = data->co;
+ BlockDriverState *bs = data->bs;
- bdrv_drain_poll(data->bs);
+ bdrv_dec_in_flight(bs);
+ bdrv_drained_begin(bs);
data->done = true;
qemu_coroutine_enter(co);
}
@@ -209,6 +206,7 @@ static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs)
.bs = bs,
.done = false,
};
+ bdrv_inc_in_flight(bs);
aio_bh_schedule_oneshot(bdrv_get_aio_context(bs),
bdrv_co_drain_bh_cb, &data);
@@ -220,6 +218,11 @@ static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs)
void bdrv_drained_begin(BlockDriverState *bs)
{
+ if (qemu_in_coroutine()) {
+ bdrv_co_yield_to_drain(bs);
+ return;
+ }
+
if (!bs->quiesce_counter++) {
aio_disable_external(bdrv_get_aio_context(bs));
bdrv_parent_drained_begin(bs);
@@ -227,11 +230,6 @@ void bdrv_drained_begin(BlockDriverState *bs)
bdrv_io_unplugged_begin(bs);
bdrv_drain_recurse(bs);
- if (qemu_in_coroutine()) {
- bdrv_co_yield_to_drain(bs);
- } else {
- bdrv_drain_poll(bs);
- }
bdrv_io_unplugged_end(bs);
}
@@ -279,7 +277,7 @@ void bdrv_drain(BlockDriverState *bs)
void bdrv_drain_all(void)
{
/* Always run first iteration so any pending completion BHs run */
- bool busy = true;
+ bool waited = true;
BlockDriverState *bs;
BdrvNextIterator it;
BlockJob *job = NULL;
@@ -299,7 +297,6 @@ void bdrv_drain_all(void)
aio_context_acquire(aio_context);
bdrv_parent_drained_begin(bs);
bdrv_io_unplugged_begin(bs);
- bdrv_drain_recurse(bs);
aio_context_release(aio_context);
if (!g_slist_find(aio_ctxs, aio_context)) {
@@ -313,8 +310,8 @@ void bdrv_drain_all(void)
* request completion. Therefore we must keep looping until there was no
* more activity rather than simply draining each device independently.
*/
- while (busy) {
- busy = false;
+ while (waited) {
+ waited = false;
for (ctx = aio_ctxs; ctx != NULL; ctx = ctx->next) {
AioContext *aio_context = ctx->data;
@@ -322,13 +319,9 @@ void bdrv_drain_all(void)
aio_context_acquire(aio_context);
for (bs = bdrv_first(&it); bs; bs = bdrv_next(&it)) {
if (aio_context == bdrv_get_aio_context(bs)) {
- if (bdrv_requests_pending(bs)) {
- busy = true;
- aio_poll(aio_context, busy);
- }
+ waited |= bdrv_drain_recurse(bs);
}
}
- busy |= aio_poll(aio_context, false);
aio_context_release(aio_context);
}
}
@@ -476,6 +469,28 @@ static bool tracked_request_overlaps(BdrvTrackedRequest *req,
return true;
}
+void bdrv_inc_in_flight(BlockDriverState *bs)
+{
+ atomic_inc(&bs->in_flight);
+}
+
+static void dummy_bh_cb(void *opaque)
+{
+}
+
+void bdrv_wakeup(BlockDriverState *bs)
+{
+ if (bs->wakeup) {
+ aio_bh_schedule_oneshot(qemu_get_aio_context(), dummy_bh_cb, NULL);
+ }
+}
+
+void bdrv_dec_in_flight(BlockDriverState *bs)
+{
+ atomic_dec(&bs->in_flight);
+ bdrv_wakeup(bs);
+}
+
static bool coroutine_fn wait_serialising_requests(BdrvTrackedRequest *self)
{
BlockDriverState *bs = self->bs;
@@ -583,13 +598,9 @@ static int bdrv_prwv_co(BdrvChild *child, int64_t offset,
/* Fast-path if already in coroutine context */
bdrv_rw_co_entry(&rwco);
} else {
- AioContext *aio_context = bdrv_get_aio_context(child->bs);
-
co = qemu_coroutine_create(bdrv_rw_co_entry, &rwco);
qemu_coroutine_enter(co);
- while (rwco.ret == NOT_DONE) {
- aio_poll(aio_context, true);
- }
+ BDRV_POLL_WHILE(child->bs, rwco.ret == NOT_DONE);
}
return rwco.ret;
}
@@ -1097,6 +1108,8 @@ int coroutine_fn bdrv_co_preadv(BdrvChild *child,
return ret;
}
+ bdrv_inc_in_flight(bs);
+
/* Don't do copy-on-read if we read data before write operation */
if (bs->copy_on_read && !(flags & BDRV_REQ_NO_SERIALISING)) {
flags |= BDRV_REQ_COPY_ON_READ;
@@ -1132,6 +1145,7 @@ int coroutine_fn bdrv_co_preadv(BdrvChild *child,
use_local_qiov ? &local_qiov : qiov,
flags);
tracked_request_end(&req);
+ bdrv_dec_in_flight(bs);
if (use_local_qiov) {
qemu_iovec_destroy(&local_qiov);
@@ -1480,6 +1494,7 @@ int coroutine_fn bdrv_co_pwritev(BdrvChild *child,
return ret;
}
+ bdrv_inc_in_flight(bs);
/*
* Align write if necessary by performing a read-modify-write cycle.
* Pad qiov with the read parts and be sure to have a tracked request not
@@ -1581,6 +1596,7 @@ fail:
qemu_vfree(tail_buf);
out:
tracked_request_end(&req);
+ bdrv_dec_in_flight(bs);
return ret;
}
@@ -1705,17 +1721,19 @@ static int64_t coroutine_fn bdrv_co_get_block_status(BlockDriverState *bs,
}
*file = NULL;
+ bdrv_inc_in_flight(bs);
ret = bs->drv->bdrv_co_get_block_status(bs, sector_num, nb_sectors, pnum,
file);
if (ret < 0) {
*pnum = 0;
- return ret;
+ goto out;
}
if (ret & BDRV_BLOCK_RAW) {
assert(ret & BDRV_BLOCK_OFFSET_VALID);
- return bdrv_get_block_status(bs->file->bs, ret >> BDRV_SECTOR_BITS,
- *pnum, pnum, file);
+ ret = bdrv_get_block_status(bs->file->bs, ret >> BDRV_SECTOR_BITS,
+ *pnum, pnum, file);
+ goto out;
}
if (ret & (BDRV_BLOCK_DATA | BDRV_BLOCK_ZERO)) {
@@ -1757,6 +1775,8 @@ static int64_t coroutine_fn bdrv_co_get_block_status(BlockDriverState *bs,
}
}
+out:
+ bdrv_dec_in_flight(bs);
return ret;
}
@@ -1822,14 +1842,10 @@ int64_t bdrv_get_block_status_above(BlockDriverState *bs,
/* Fast-path if already in coroutine context */
bdrv_get_block_status_above_co_entry(&data);
} else {
- AioContext *aio_context = bdrv_get_aio_context(bs);
-
co = qemu_coroutine_create(bdrv_get_block_status_above_co_entry,
&data);
qemu_coroutine_enter(co);
- while (!data.done) {
- aio_poll(aio_context, true);
- }
+ BDRV_POLL_WHILE(bs, !data.done);
}
return data.ret;
}
@@ -2102,6 +2118,7 @@ static const AIOCBInfo bdrv_em_co_aiocb_info = {
static void bdrv_co_complete(BlockAIOCBCoroutine *acb)
{
if (!acb->need_bh) {
+ bdrv_dec_in_flight(acb->common.bs);
acb->common.cb(acb->common.opaque, acb->req.error);
qemu_aio_unref(acb);
}
@@ -2152,6 +2169,9 @@ static BlockAIOCB *bdrv_co_aio_prw_vector(BdrvChild *child,
Coroutine *co;
BlockAIOCBCoroutine *acb;
+ /* Matched by bdrv_co_complete's bdrv_dec_in_flight. */
+ bdrv_inc_in_flight(child->bs);
+
acb = qemu_aio_get(&bdrv_em_co_aiocb_info, child->bs, cb, opaque);
acb->child = child;
acb->need_bh = true;
@@ -2185,6 +2205,9 @@ BlockAIOCB *bdrv_aio_flush(BlockDriverState *bs,
Coroutine *co;
BlockAIOCBCoroutine *acb;
+ /* Matched by bdrv_co_complete's bdrv_dec_in_flight. */
+ bdrv_inc_in_flight(bs);
+
acb = qemu_aio_get(&bdrv_em_co_aiocb_info, bs, cb, opaque);
acb->need_bh = true;
acb->req.error = -EINPROGRESS;
@@ -2244,23 +2267,22 @@ static void coroutine_fn bdrv_flush_co_entry(void *opaque)
int coroutine_fn bdrv_co_flush(BlockDriverState *bs)
{
int ret;
- BdrvTrackedRequest req;
if (!bs || !bdrv_is_inserted(bs) || bdrv_is_read_only(bs) ||
bdrv_is_sg(bs)) {
return 0;
}
- tracked_request_begin(&req, bs, 0, 0, BDRV_TRACKED_FLUSH);
+ bdrv_inc_in_flight(bs);
int current_gen = bs->write_gen;
/* Wait until any previous flushes are completed */
- while (bs->active_flush_req != NULL) {
+ while (bs->active_flush_req) {
qemu_co_queue_wait(&bs->flush_queue);
}
- bs->active_flush_req = &req;
+ bs->active_flush_req = true;
/* Write back all layers by calling one driver function */
if (bs->drv->bdrv_co_flush) {
@@ -2330,11 +2352,11 @@ flush_parent:
out:
/* Notify any pending flushes that we have completed */
bs->flushed_gen = current_gen;
- bs->active_flush_req = NULL;
+ bs->active_flush_req = false;
/* Return value is ignored - it's ok if wait queue is empty */
qemu_co_queue_next(&bs->flush_queue);
- tracked_request_end(&req);
+ bdrv_dec_in_flight(bs);
return ret;
}
@@ -2350,13 +2372,9 @@ int bdrv_flush(BlockDriverState *bs)
/* Fast-path if already in coroutine context */
bdrv_flush_co_entry(&flush_co);
} else {
- AioContext *aio_context = bdrv_get_aio_context(bs);
-
co = qemu_coroutine_create(bdrv_flush_co_entry, &flush_co);
qemu_coroutine_enter(co);
- while (flush_co.ret == NOT_DONE) {
- aio_poll(aio_context, true);
- }
+ BDRV_POLL_WHILE(bs, flush_co.ret == NOT_DONE);
}
return flush_co.ret;
@@ -2417,6 +2435,7 @@ int coroutine_fn bdrv_co_pdiscard(BlockDriverState *bs, int64_t offset,
return 0;
}
+ bdrv_inc_in_flight(bs);
tracked_request_begin(&req, bs, offset, count, BDRV_TRACKED_DISCARD);
ret = notifier_with_return_list_notify(&bs->before_write_notifiers, &req);
@@ -2463,6 +2482,7 @@ out:
bdrv_set_dirty(bs, req.offset >> BDRV_SECTOR_BITS,
req.bytes >> BDRV_SECTOR_BITS);
tracked_request_end(&req);
+ bdrv_dec_in_flight(bs);
return ret;
}
@@ -2480,13 +2500,9 @@ int bdrv_pdiscard(BlockDriverState *bs, int64_t offset, int count)
/* Fast-path if already in coroutine context */
bdrv_pdiscard_co_entry(&rwco);
} else {
- AioContext *aio_context = bdrv_get_aio_context(bs);
-
co = qemu_coroutine_create(bdrv_pdiscard_co_entry, &rwco);
qemu_coroutine_enter(co);
- while (rwco.ret == NOT_DONE) {
- aio_poll(aio_context, true);
- }
+ BDRV_POLL_WHILE(bs, rwco.ret == NOT_DONE);
}
return rwco.ret;
@@ -2495,13 +2511,12 @@ int bdrv_pdiscard(BlockDriverState *bs, int64_t offset, int count)
int bdrv_co_ioctl(BlockDriverState *bs, int req, void *buf)
{
BlockDriver *drv = bs->drv;
- BdrvTrackedRequest tracked_req;
CoroutineIOCompletion co = {
.coroutine = qemu_coroutine_self(),
};
BlockAIOCB *acb;
- tracked_request_begin(&tracked_req, bs, 0, 0, BDRV_TRACKED_IOCTL);
+ bdrv_inc_in_flight(bs);
if (!drv || (!drv->bdrv_aio_ioctl && !drv->bdrv_co_ioctl)) {
co.ret = -ENOTSUP;
goto out;
@@ -2518,7 +2533,7 @@ int bdrv_co_ioctl(BlockDriverState *bs, int req, void *buf)
qemu_coroutine_yield();
}
out:
- tracked_request_end(&tracked_req);
+ bdrv_dec_in_flight(bs);
return co.ret;
}
diff --git a/block/mirror.c b/block/mirror.c
index a433e6848c..3a0788ede3 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -469,7 +469,11 @@ static void mirror_free_init(MirrorBlockJob *s)
}
}
-static void mirror_drain(MirrorBlockJob *s)
+/* This is also used for the .pause callback. There is no matching
+ * mirror_resume() because mirror_run() will begin iterating again
+ * when the job is resumed.
+ */
+static void mirror_wait_for_all_io(MirrorBlockJob *s)
{
while (s->in_flight > 0) {
mirror_wait_for_io(s);
@@ -528,6 +532,7 @@ static void mirror_exit(BlockJob *job, void *opaque)
g_free(s->replaces);
bdrv_op_unblock_all(target_bs, s->common.blocker);
blk_unref(s->target);
+ s->target = NULL;
block_job_completed(&s->common, data->ret);
g_free(data);
bdrv_drained_end(src);
@@ -582,7 +587,7 @@ static int coroutine_fn mirror_dirty_init(MirrorBlockJob *s)
sector_num += nb_sectors;
}
- mirror_drain(s);
+ mirror_wait_for_all_io(s);
}
/* First part, loop on the sectors and initialize the dirty bitmap. */
@@ -617,6 +622,7 @@ static void coroutine_fn mirror_run(void *opaque)
MirrorExitData *data;
BlockDriverState *bs = blk_bs(s->common.blk);
BlockDriverState *target_bs = blk_bs(s->target);
+ bool need_drain = true;
int64_t length;
BlockDriverInfo bdi;
char backing_filename[2]; /* we only need 2 characters because we are only
@@ -752,11 +758,26 @@ static void coroutine_fn mirror_run(void *opaque)
* source has dirty data to copy!
*
* Note that I/O can be submitted by the guest while
- * mirror_populate runs.
+ * mirror_populate runs, so pause it now. Before deciding
+ * whether to switch to target check one last time if I/O has
+ * come in the meanwhile, and if not flush the data to disk.
*/
trace_mirror_before_drain(s, cnt);
- bdrv_co_drain(bs);
+
+ bdrv_drained_begin(bs);
cnt = bdrv_get_dirty_count(s->dirty_bitmap);
+ if (cnt > 0) {
+ bdrv_drained_end(bs);
+ continue;
+ }
+
+ /* The two disks are in sync. Exit and report successful
+ * completion.
+ */
+ assert(QLIST_EMPTY(&bs->tracked_requests));
+ s->common.cancelled = false;
+ need_drain = false;
+ break;
}
ret = 0;
@@ -769,13 +790,6 @@ static void coroutine_fn mirror_run(void *opaque)
} else if (!should_complete) {
delay_ns = (s->in_flight == 0 && cnt == 0 ? SLICE_TIME : 0);
block_job_sleep_ns(&s->common, QEMU_CLOCK_REALTIME, delay_ns);
- } else if (cnt == 0) {
- /* The two disks are in sync. Exit and report successful
- * completion.
- */
- assert(QLIST_EMPTY(&bs->tracked_requests));
- s->common.cancelled = false;
- break;
}
s->last_pause_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
}
@@ -787,7 +801,8 @@ immediate_exit:
* the target is a copy of the source.
*/
assert(ret < 0 || (!s->synced && block_job_is_cancelled(&s->common)));
- mirror_drain(s);
+ assert(need_drain);
+ mirror_wait_for_all_io(s);
}
assert(s->in_flight == 0);
@@ -799,9 +814,10 @@ immediate_exit:
data = g_malloc(sizeof(*data));
data->ret = ret;
- /* Before we switch to target in mirror_exit, make sure data doesn't
- * change. */
- bdrv_drained_begin(bs);
+
+ if (need_drain) {
+ bdrv_drained_begin(bs);
+ }
block_job_defer_to_main_loop(&s->common, mirror_exit, data);
}
@@ -872,14 +888,11 @@ static void mirror_complete(BlockJob *job, Error **errp)
block_job_enter(&s->common);
}
-/* There is no matching mirror_resume() because mirror_run() will begin
- * iterating again when the job is resumed.
- */
-static void coroutine_fn mirror_pause(BlockJob *job)
+static void mirror_pause(BlockJob *job)
{
MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
- mirror_drain(s);
+ mirror_wait_for_all_io(s);
}
static void mirror_attached_aio_context(BlockJob *job, AioContext *new_context)
@@ -889,6 +902,21 @@ static void mirror_attached_aio_context(BlockJob *job, AioContext *new_context)
blk_set_aio_context(s->target, new_context);
}
+static void mirror_drain(BlockJob *job)
+{
+ MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
+
+ /* Need to keep a reference in case blk_drain triggers execution
+ * of mirror_complete...
+ */
+ if (s->target) {
+ BlockBackend *target = s->target;
+ blk_ref(target);
+ blk_drain(target);
+ blk_unref(target);
+ }
+}
+
static const BlockJobDriver mirror_job_driver = {
.instance_size = sizeof(MirrorBlockJob),
.job_type = BLOCK_JOB_TYPE_MIRROR,
@@ -896,6 +924,7 @@ static const BlockJobDriver mirror_job_driver = {
.complete = mirror_complete,
.pause = mirror_pause,
.attached_aio_context = mirror_attached_aio_context,
+ .drain = mirror_drain,
};
static const BlockJobDriver commit_active_job_driver = {
@@ -905,6 +934,7 @@ static const BlockJobDriver commit_active_job_driver = {
.complete = mirror_complete,
.pause = mirror_pause,
.attached_aio_context = mirror_attached_aio_context,
+ .drain = mirror_drain,
};
static void mirror_start_job(const char *job_id, BlockDriverState *bs,
diff --git a/block/nfs.c b/block/nfs.c
index c3db2ec58d..88c60a9118 100644
--- a/block/nfs.c
+++ b/block/nfs.c
@@ -52,6 +52,7 @@ typedef struct NFSClient {
} NFSClient;
typedef struct NFSRPC {
+ BlockDriverState *bs;
int ret;
int complete;
QEMUIOVector *iov;
@@ -90,11 +91,12 @@ static void nfs_process_write(void *arg)
nfs_set_events(client);
}
-static void nfs_co_init_task(NFSClient *client, NFSRPC *task)
+static void nfs_co_init_task(BlockDriverState *bs, NFSRPC *task)
{
*task = (NFSRPC) {
.co = qemu_coroutine_self(),
- .client = client,
+ .bs = bs,
+ .client = bs->opaque,
};
}
@@ -111,6 +113,7 @@ nfs_co_generic_cb(int ret, struct nfs_context *nfs, void *data,
{
NFSRPC *task = private_data;
task->ret = ret;
+ assert(!task->st);
if (task->ret > 0 && task->iov) {
if (task->ret <= task->iov->size) {
qemu_iovec_from_buf(task->iov, 0, data, task->ret);
@@ -118,18 +121,11 @@ nfs_co_generic_cb(int ret, struct nfs_context *nfs, void *data,
task->ret = -EIO;
}
}
- if (task->ret == 0 && task->st) {
- memcpy(task->st, data, sizeof(struct stat));
- }
if (task->ret < 0) {
error_report("NFS Error: %s", nfs_get_error(nfs));
}
- if (task->co) {
- aio_bh_schedule_oneshot(task->client->aio_context,
- nfs_co_generic_bh_cb, task);
- } else {
- task->complete = 1;
- }
+ aio_bh_schedule_oneshot(task->client->aio_context,
+ nfs_co_generic_bh_cb, task);
}
static int coroutine_fn nfs_co_readv(BlockDriverState *bs,
@@ -139,7 +135,7 @@ static int coroutine_fn nfs_co_readv(BlockDriverState *bs,
NFSClient *client = bs->opaque;
NFSRPC task;
- nfs_co_init_task(client, &task);
+ nfs_co_init_task(bs, &task);
task.iov = iov;
if (nfs_pread_async(client->context, client->fh,
@@ -149,8 +145,8 @@ static int coroutine_fn nfs_co_readv(BlockDriverState *bs,
return -ENOMEM;
}
+ nfs_set_events(client);
while (!task.complete) {
- nfs_set_events(client);
qemu_coroutine_yield();
}
@@ -174,7 +170,7 @@ static int coroutine_fn nfs_co_writev(BlockDriverState *bs,
NFSRPC task;
char *buf = NULL;
- nfs_co_init_task(client, &task);
+ nfs_co_init_task(bs, &task);
buf = g_try_malloc(nb_sectors * BDRV_SECTOR_SIZE);
if (nb_sectors && buf == NULL) {
@@ -191,8 +187,8 @@ static int coroutine_fn nfs_co_writev(BlockDriverState *bs,
return -ENOMEM;
}
+ nfs_set_events(client);
while (!task.complete) {
- nfs_set_events(client);
qemu_coroutine_yield();
}
@@ -210,15 +206,15 @@ static int coroutine_fn nfs_co_flush(BlockDriverState *bs)
NFSClient *client = bs->opaque;
NFSRPC task;
- nfs_co_init_task(client, &task);
+ nfs_co_init_task(bs, &task);
if (nfs_fsync_async(client->context, client->fh, nfs_co_generic_cb,
&task) != 0) {
return -ENOMEM;
}
+ nfs_set_events(client);
while (!task.complete) {
- nfs_set_events(client);
qemu_coroutine_yield();
}
@@ -496,6 +492,22 @@ static int nfs_has_zero_init(BlockDriverState *bs)
return client->has_zero_init;
}
+static void
+nfs_get_allocated_file_size_cb(int ret, struct nfs_context *nfs, void *data,
+ void *private_data)
+{
+ NFSRPC *task = private_data;
+ task->ret = ret;
+ if (task->ret == 0) {
+ memcpy(task->st, data, sizeof(struct stat));
+ }
+ if (task->ret < 0) {
+ error_report("NFS Error: %s", nfs_get_error(nfs));
+ }
+ task->complete = 1;
+ bdrv_wakeup(task->bs);
+}
+
static int64_t nfs_get_allocated_file_size(BlockDriverState *bs)
{
NFSClient *client = bs->opaque;
@@ -507,16 +519,15 @@ static int64_t nfs_get_allocated_file_size(BlockDriverState *bs)
return client->st_blocks * 512;
}
+ task.bs = bs;
task.st = &st;
- if (nfs_fstat_async(client->context, client->fh, nfs_co_generic_cb,
+ if (nfs_fstat_async(client->context, client->fh, nfs_get_allocated_file_size_cb,
&task) != 0) {
return -ENOMEM;
}
- while (!task.complete) {
- nfs_set_events(client);
- aio_poll(client->aio_context, true);
- }
+ nfs_set_events(client);
+ BDRV_POLL_WHILE(bs, !task.complete);
return (task.ret < 0 ? task.ret : st.st_blocks * 512);
}
diff --git a/block/qed-table.c b/block/qed-table.c
index 1a731dff51..ed443e2b70 100644
--- a/block/qed-table.c
+++ b/block/qed-table.c
@@ -174,9 +174,7 @@ int qed_read_l1_table_sync(BDRVQEDState *s)
qed_read_table(s, s->header.l1_table_offset,
s->l1_table, qed_sync_cb, &ret);
- while (ret == -EINPROGRESS) {
- aio_poll(bdrv_get_aio_context(s->bs), true);
- }
+ BDRV_POLL_WHILE(s->bs, ret == -EINPROGRESS);
return ret;
}
@@ -195,9 +193,7 @@ int qed_write_l1_table_sync(BDRVQEDState *s, unsigned int index,
int ret = -EINPROGRESS;
qed_write_l1_table(s, index, n, qed_sync_cb, &ret);
- while (ret == -EINPROGRESS) {
- aio_poll(bdrv_get_aio_context(s->bs), true);
- }
+ BDRV_POLL_WHILE(s->bs, ret == -EINPROGRESS);
return ret;
}
@@ -268,9 +264,7 @@ int qed_read_l2_table_sync(BDRVQEDState *s, QEDRequest *request, uint64_t offset
int ret = -EINPROGRESS;
qed_read_l2_table(s, request, offset, qed_sync_cb, &ret);
- while (ret == -EINPROGRESS) {
- aio_poll(bdrv_get_aio_context(s->bs), true);
- }
+ BDRV_POLL_WHILE(s->bs, ret == -EINPROGRESS);
return ret;
}
@@ -290,9 +284,7 @@ int qed_write_l2_table_sync(BDRVQEDState *s, QEDRequest *request,
int ret = -EINPROGRESS;
qed_write_l2_table(s, request, index, n, flush, qed_sync_cb, &ret);
- while (ret == -EINPROGRESS) {
- aio_poll(bdrv_get_aio_context(s->bs), true);
- }
+ BDRV_POLL_WHILE(s->bs, ret == -EINPROGRESS);
return ret;
}
diff --git a/block/qed.c b/block/qed.c
index 3ee879b52e..1a7ef0a9ce 100644
--- a/block/qed.c
+++ b/block/qed.c
@@ -336,7 +336,7 @@ static void qed_need_check_timer_cb(void *opaque)
qed_plug_allocating_write_reqs(s);
/* Ensure writes are on disk before clearing flag */
- bdrv_aio_flush(s->bs, qed_clear_need_check, s);
+ bdrv_aio_flush(s->bs->file->bs, qed_clear_need_check, s);
}
static void qed_start_need_check_timer(BDRVQEDState *s)
@@ -378,6 +378,19 @@ static void bdrv_qed_attach_aio_context(BlockDriverState *bs,
}
}
+static void bdrv_qed_drain(BlockDriverState *bs)
+{
+ BDRVQEDState *s = bs->opaque;
+
+ /* Fire the timer immediately in order to start doing I/O as soon as the
+ * header is flushed.
+ */
+ if (s->need_check_timer && timer_pending(s->need_check_timer)) {
+ qed_cancel_need_check_timer(s);
+ qed_need_check_timer_cb(s);
+ }
+}
+
static int bdrv_qed_open(BlockDriverState *bs, QDict *options, int flags,
Error **errp)
{
@@ -1668,6 +1681,7 @@ static BlockDriver bdrv_qed = {
.bdrv_check = bdrv_qed_check,
.bdrv_detach_aio_context = bdrv_qed_detach_aio_context,
.bdrv_attach_aio_context = bdrv_qed_attach_aio_context,
+ .bdrv_drain = bdrv_qed_drain,
};
static void bdrv_qed_init(void)
diff --git a/block/replication.c b/block/replication.c
index 8bbfc8f870..02aeaaf7d0 100644
--- a/block/replication.c
+++ b/block/replication.c
@@ -138,6 +138,9 @@ static void replication_close(BlockDriverState *bs)
if (s->replication_state == BLOCK_REPLICATION_RUNNING) {
replication_stop(s->rs, false, NULL);
}
+ if (s->replication_state == BLOCK_REPLICATION_FAILOVER) {
+ block_job_cancel_sync(s->active_disk->bs->job);
+ }
if (s->mode == REPLICATION_MODE_SECONDARY) {
g_free(s->top_id);
@@ -319,9 +322,10 @@ static void secondary_do_checkpoint(BDRVReplicationState *s, Error **errp)
}
}
-static void reopen_backing_file(BDRVReplicationState *s, bool writable,
+static void reopen_backing_file(BlockDriverState *bs, bool writable,
Error **errp)
{
+ BDRVReplicationState *s = bs->opaque;
BlockReopenQueue *reopen_queue = NULL;
int orig_hidden_flags, orig_secondary_flags;
int new_hidden_flags, new_secondary_flags;
@@ -356,13 +360,15 @@ static void reopen_backing_file(BDRVReplicationState *s, bool writable,
}
if (reopen_queue) {
- bdrv_reopen_multiple(reopen_queue, &local_err);
+ bdrv_reopen_multiple(bdrv_get_aio_context(bs),
+ reopen_queue, &local_err);
error_propagate(errp, local_err);
}
}
-static void backup_job_cleanup(BDRVReplicationState *s)
+static void backup_job_cleanup(BlockDriverState *bs)
{
+ BDRVReplicationState *s = bs->opaque;
BlockDriverState *top_bs;
top_bs = bdrv_lookup_bs(s->top_id, s->top_id, NULL);
@@ -371,19 +377,20 @@ static void backup_job_cleanup(BDRVReplicationState *s)
}
bdrv_op_unblock_all(top_bs, s->blocker);
error_free(s->blocker);
- reopen_backing_file(s, false, NULL);
+ reopen_backing_file(bs, false, NULL);
}
static void backup_job_completed(void *opaque, int ret)
{
- BDRVReplicationState *s = opaque;
+ BlockDriverState *bs = opaque;
+ BDRVReplicationState *s = bs->opaque;
if (s->replication_state != BLOCK_REPLICATION_FAILOVER) {
/* The backup job is cancelled unexpectedly */
s->error = -EIO;
}
- backup_job_cleanup(s);
+ backup_job_cleanup(bs);
}
static bool check_top_bs(BlockDriverState *top_bs, BlockDriverState *bs)
@@ -479,7 +486,7 @@ static void replication_start(ReplicationState *rs, ReplicationMode mode,
}
/* reopen the backing file in r/w mode */
- reopen_backing_file(s, true, &local_err);
+ reopen_backing_file(bs, true, &local_err);
if (local_err) {
error_propagate(errp, local_err);
aio_context_release(aio_context);
@@ -494,7 +501,7 @@ static void replication_start(ReplicationState *rs, ReplicationMode mode,
if (!top_bs || !bdrv_is_root_node(top_bs) ||
!check_top_bs(top_bs, bs)) {
error_setg(errp, "No top_bs or it is invalid");
- reopen_backing_file(s, false, NULL);
+ reopen_backing_file(bs, false, NULL);
aio_context_release(aio_context);
return;
}
@@ -504,10 +511,10 @@ static void replication_start(ReplicationState *rs, ReplicationMode mode,
backup_start("replication-backup", s->secondary_disk->bs,
s->hidden_disk->bs, 0, MIRROR_SYNC_MODE_NONE, NULL, false,
BLOCKDEV_ON_ERROR_REPORT, BLOCKDEV_ON_ERROR_REPORT,
- backup_job_completed, s, NULL, &local_err);
+ backup_job_completed, bs, NULL, &local_err);
if (local_err) {
error_propagate(errp, local_err);
- backup_job_cleanup(s);
+ backup_job_cleanup(bs);
aio_context_release(aio_context);
return;
}
diff --git a/block/sheepdog.c b/block/sheepdog.c
index ccbf7e1fa6..1fb917343a 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -641,6 +641,7 @@ static void restart_co_req(void *opaque)
typedef struct SheepdogReqCo {
int sockfd;
+ BlockDriverState *bs;
AioContext *aio_context;
SheepdogReq *hdr;
void *data;
@@ -701,6 +702,9 @@ out:
srco->ret = ret;
srco->finished = true;
+ if (srco->bs) {
+ bdrv_wakeup(srco->bs);
+ }
}
/*
@@ -708,13 +712,14 @@ out:
*
* Return 0 on success, -errno in case of error.
*/
-static int do_req(int sockfd, AioContext *aio_context, SheepdogReq *hdr,
+static int do_req(int sockfd, BlockDriverState *bs, SheepdogReq *hdr,
void *data, unsigned int *wlen, unsigned int *rlen)
{
Coroutine *co;
SheepdogReqCo srco = {
.sockfd = sockfd,
- .aio_context = aio_context,
+ .aio_context = bs ? bdrv_get_aio_context(bs) : qemu_get_aio_context(),
+ .bs = bs,
.hdr = hdr,
.data = data,
.wlen = wlen,
@@ -727,9 +732,14 @@ static int do_req(int sockfd, AioContext *aio_context, SheepdogReq *hdr,
do_co_req(&srco);
} else {
co = qemu_coroutine_create(do_co_req, &srco);
- qemu_coroutine_enter(co);
- while (!srco.finished) {
- aio_poll(aio_context, true);
+ if (bs) {
+ qemu_coroutine_enter(co);
+ BDRV_POLL_WHILE(bs, !srco.finished);
+ } else {
+ qemu_coroutine_enter(co);
+ while (!srco.finished) {
+ aio_poll(qemu_get_aio_context(), true);
+ }
}
}
@@ -1125,7 +1135,7 @@ static int find_vdi_name(BDRVSheepdogState *s, const char *filename,
hdr.snapid = snapid;
hdr.flags = SD_FLAG_CMD_WRITE;
- ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
+ ret = do_req(fd, s->bs, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
if (ret) {
error_setg_errno(errp, -ret, "cannot get vdi info");
goto out;
@@ -1240,7 +1250,7 @@ out:
qemu_co_mutex_unlock(&s->lock);
}
-static int read_write_object(int fd, AioContext *aio_context, char *buf,
+static int read_write_object(int fd, BlockDriverState *bs, char *buf,
uint64_t oid, uint8_t copies,
unsigned int datalen, uint64_t offset,
bool write, bool create, uint32_t cache_flags)
@@ -1274,7 +1284,7 @@ static int read_write_object(int fd, AioContext *aio_context, char *buf,
hdr.offset = offset;
hdr.copies = copies;
- ret = do_req(fd, aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
+ ret = do_req(fd, bs, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
if (ret) {
error_report("failed to send a request to the sheep");
return ret;
@@ -1289,22 +1299,22 @@ static int read_write_object(int fd, AioContext *aio_context, char *buf,
}
}
-static int read_object(int fd, AioContext *aio_context, char *buf,
+static int read_object(int fd, BlockDriverState *bs, char *buf,
uint64_t oid, uint8_t copies,
unsigned int datalen, uint64_t offset,
uint32_t cache_flags)
{
- return read_write_object(fd, aio_context, buf, oid, copies,
+ return read_write_object(fd, bs, buf, oid, copies,
datalen, offset, false,
false, cache_flags);
}
-static int write_object(int fd, AioContext *aio_context, char *buf,
+static int write_object(int fd, BlockDriverState *bs, char *buf,
uint64_t oid, uint8_t copies,
unsigned int datalen, uint64_t offset, bool create,
uint32_t cache_flags)
{
- return read_write_object(fd, aio_context, buf, oid, copies,
+ return read_write_object(fd, bs, buf, oid, copies,
datalen, offset, true,
create, cache_flags);
}
@@ -1331,7 +1341,7 @@ static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag)
goto out;
}
- ret = read_object(fd, s->aio_context, (char *)inode, vid_to_vdi_oid(vid),
+ ret = read_object(fd, s->bs, (char *)inode, vid_to_vdi_oid(vid),
s->inode.nr_copies, SD_INODE_HEADER_SIZE, 0,
s->cache_flags);
if (ret < 0) {
@@ -1489,7 +1499,7 @@ static int sd_open(BlockDriverState *bs, QDict *options, int flags,
}
buf = g_malloc(SD_INODE_SIZE);
- ret = read_object(fd, s->aio_context, buf, vid_to_vdi_oid(vid),
+ ret = read_object(fd, s->bs, buf, vid_to_vdi_oid(vid),
0, SD_INODE_SIZE, 0, s->cache_flags);
closesocket(fd);
@@ -1618,7 +1628,7 @@ static int do_sd_create(BDRVSheepdogState *s, uint32_t *vdi_id, int snapshot,
hdr.copies = s->inode.nr_copies;
hdr.block_size_shift = s->inode.block_size_shift;
- ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
+ ret = do_req(fd, NULL, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
closesocket(fd);
@@ -1886,7 +1896,7 @@ static int sd_create(const char *filename, QemuOpts *opts,
hdr.opcode = SD_OP_GET_CLUSTER_DEFAULT;
hdr.proto_ver = SD_PROTO_VER;
- ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
+ ret = do_req(fd, NULL, (SheepdogReq *)&hdr,
NULL, &wlen, &rlen);
closesocket(fd);
if (ret) {
@@ -1951,7 +1961,7 @@ static void sd_close(BlockDriverState *bs)
hdr.data_length = wlen;
hdr.flags = SD_FLAG_CMD_WRITE;
- ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
+ ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
s->name, &wlen, &rlen);
closesocket(fd);
@@ -2000,7 +2010,7 @@ static int sd_truncate(BlockDriverState *bs, int64_t offset)
/* we don't need to update entire object */
datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
s->inode.vdi_size = offset;
- ret = write_object(fd, s->aio_context, (char *)&s->inode,
+ ret = write_object(fd, s->bs, (char *)&s->inode,
vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
datalen, 0, false, s->cache_flags);
close(fd);
@@ -2070,7 +2080,7 @@ static bool sd_delete(BDRVSheepdogState *s)
return false;
}
- ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
+ ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
s->name, &wlen, &rlen);
closesocket(fd);
if (ret) {
@@ -2126,7 +2136,7 @@ static int sd_create_branch(BDRVSheepdogState *s)
goto out;
}
- ret = read_object(fd, s->aio_context, buf, vid_to_vdi_oid(vid),
+ ret = read_object(fd, s->bs, buf, vid_to_vdi_oid(vid),
s->inode.nr_copies, SD_INODE_SIZE, 0, s->cache_flags);
closesocket(fd);
@@ -2411,7 +2421,7 @@ static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
goto cleanup;
}
- ret = write_object(fd, s->aio_context, (char *)&s->inode,
+ ret = write_object(fd, s->bs, (char *)&s->inode,
vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
datalen, 0, false, s->cache_flags);
if (ret < 0) {
@@ -2426,7 +2436,7 @@ static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
goto cleanup;
}
- ret = read_object(fd, s->aio_context, (char *)inode,
+ ret = read_object(fd, s->bs, (char *)inode,
vid_to_vdi_oid(new_vid), s->inode.nr_copies, datalen, 0,
s->cache_flags);
@@ -2528,7 +2538,7 @@ static bool remove_objects(BDRVSheepdogState *s)
i++;
}
- ret = write_object(fd, s->aio_context,
+ ret = write_object(fd, s->bs,
(char *)&inode->data_vdi_id[start_idx],
vid_to_vdi_oid(s->inode.vdi_id), inode->nr_copies,
(i - start_idx) * sizeof(uint32_t),
@@ -2600,7 +2610,7 @@ static int sd_snapshot_delete(BlockDriverState *bs,
return -1;
}
- ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
+ ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
buf, &wlen, &rlen);
closesocket(fd);
if (ret) {
@@ -2652,8 +2662,7 @@ static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
req.opcode = SD_OP_READ_VDIS;
req.data_length = max;
- ret = do_req(fd, s->aio_context, &req,
- vdi_inuse, &wlen, &rlen);
+ ret = do_req(fd, s->bs, &req, vdi_inuse, &wlen, &rlen);
closesocket(fd);
if (ret) {
@@ -2679,7 +2688,7 @@ static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
}
/* we don't need to read entire object */
- ret = read_object(fd, s->aio_context, (char *)&inode,
+ ret = read_object(fd, s->bs, (char *)&inode,
vid_to_vdi_oid(vid),
0, SD_INODE_SIZE - sizeof(inode.data_vdi_id), 0,
s->cache_flags);
@@ -2745,11 +2754,11 @@ static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
create = (offset == 0);
if (load) {
- ret = read_object(fd, s->aio_context, (char *)data, vmstate_oid,
+ ret = read_object(fd, s->bs, (char *)data, vmstate_oid,
s->inode.nr_copies, data_len, offset,
s->cache_flags);
} else {
- ret = write_object(fd, s->aio_context, (char *)data, vmstate_oid,
+ ret = write_object(fd, s->bs, (char *)data, vmstate_oid,
s->inode.nr_copies, data_len, offset, create,
s->cache_flags);
}
diff --git a/blockjob.c b/blockjob.c
index 43fecbe13e..7c88b30074 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -74,17 +74,6 @@ BlockJob *block_job_get(const char *id)
return NULL;
}
-/* Normally the job runs in its BlockBackend's AioContext. The exception is
- * block_job_defer_to_main_loop() where it runs in the QEMU main loop. Code
- * that supports both cases uses this helper function.
- */
-static AioContext *block_job_get_aio_context(BlockJob *job)
-{
- return job->deferred_to_main_loop ?
- qemu_get_aio_context() :
- blk_get_aio_context(job->blk);
-}
-
static void block_job_attached_aio_context(AioContext *new_context,
void *opaque)
{
@@ -97,6 +86,17 @@ static void block_job_attached_aio_context(AioContext *new_context,
block_job_resume(job);
}
+static void block_job_drain(BlockJob *job)
+{
+ /* If job is !job->busy this kicks it into the next pause point. */
+ block_job_enter(job);
+
+ blk_drain(job->blk);
+ if (job->driver->drain) {
+ job->driver->drain(job);
+ }
+}
+
static void block_job_detach_aio_context(void *opaque)
{
BlockJob *job = opaque;
@@ -106,12 +106,8 @@ static void block_job_detach_aio_context(void *opaque)
block_job_pause(job);
- if (!job->paused) {
- /* If job is !job->busy this kicks it into the next pause point. */
- block_job_enter(job);
- }
while (!job->paused && !job->completed) {
- aio_poll(block_job_get_aio_context(job), true);
+ block_job_drain(job);
}
block_job_unref(job);
@@ -413,14 +409,21 @@ static int block_job_finish_sync(BlockJob *job,
assert(blk_bs(job->blk)->job == job);
block_job_ref(job);
+
finish(job, &local_err);
if (local_err) {
error_propagate(errp, local_err);
block_job_unref(job);
return -EBUSY;
}
+ /* block_job_drain calls block_job_enter, and it should be enough to
+ * induce progress until the job completes or moves to the main thread.
+ */
+ while (!job->deferred_to_main_loop && !job->completed) {
+ block_job_drain(job);
+ }
while (!job->completed) {
- aio_poll(block_job_get_aio_context(job), true);
+ aio_poll(qemu_get_aio_context(), true);
}
ret = (job->cancelled && job->ret == 0) ? -ECANCELED : job->ret;
block_job_unref(job);
diff --git a/docs/multiple-iothreads.txt b/docs/multiple-iothreads.txt
index 40b8419916..0e7cdb2c28 100644
--- a/docs/multiple-iothreads.txt
+++ b/docs/multiple-iothreads.txt
@@ -105,13 +105,10 @@ a BH in the target AioContext beforehand and then call qemu_bh_schedule(). No
acquire/release or locking is needed for the qemu_bh_schedule() call. But be
sure to acquire the AioContext for aio_bh_new() if necessary.
-The relationship between AioContext and the block layer
--------------------------------------------------------
-The AioContext originates from the QEMU block layer because it provides a
-scoped way of running event loop iterations until all work is done. This
-feature is used to complete all in-flight block I/O requests (see
-bdrv_drain_all()). Nowadays AioContext is a generic event loop that can be
-used by any QEMU subsystem.
+AioContext and the block layer
+------------------------------
+The AioContext originates from the QEMU block layer, even though nowadays
+AioContext is a generic event loop that can be used by any QEMU subsystem.
The block layer has support for AioContext integrated. Each BlockDriverState
is associated with an AioContext using bdrv_set_aio_context() and
@@ -122,13 +119,22 @@ Block layer code must therefore expect to run in an IOThread and avoid using
old APIs that implicitly use the main loop. See the "How to program for
IOThreads" above for information on how to do that.
-If main loop code such as a QMP function wishes to access a BlockDriverState it
-must first call aio_context_acquire(bdrv_get_aio_context(bs)) to ensure the
-IOThread does not run in parallel.
-
-Long-running jobs (usually in the form of coroutines) are best scheduled in the
-BlockDriverState's AioContext to avoid the need to acquire/release around each
-bdrv_*() call. Be aware that there is currently no mechanism to get notified
-when bdrv_set_aio_context() moves this BlockDriverState to a different
-AioContext (see bdrv_detach_aio_context()/bdrv_attach_aio_context()), so you
-may need to add this if you want to support long-running jobs.
+If main loop code such as a QMP function wishes to access a BlockDriverState
+it must first call aio_context_acquire(bdrv_get_aio_context(bs)) to ensure
+that callbacks in the IOThread do not run in parallel.
+
+Code running in the monitor typically needs to ensure that past
+requests from the guest are completed. When a block device is running
+in an IOThread, the IOThread can also process requests from the guest
+(via ioeventfd). To achieve both objects, wrap the code between
+bdrv_drained_begin() and bdrv_drained_end(), thus creating a "drained
+section". The functions must be called between aio_context_acquire()
+and aio_context_release(). You can freely release and re-acquire the
+AioContext within a drained section.
+
+Long-running jobs (usually in the form of coroutines) are best scheduled in
+the BlockDriverState's AioContext to avoid the need to acquire/release around
+each bdrv_*() call. The functions bdrv_add/remove_aio_context_notifier,
+or alternatively blk_add/remove_aio_context_notifier if you use BlockBackends,
+can be used to get a notification whenever bdrv_set_aio_context() moves a
+BlockDriverState to a different AioContext.
diff --git a/hw/scsi/virtio-scsi-dataplane.c b/hw/scsi/virtio-scsi-dataplane.c
index b173b94949..9424f0e057 100644
--- a/hw/scsi/virtio-scsi-dataplane.c
+++ b/hw/scsi/virtio-scsi-dataplane.c
@@ -189,13 +189,11 @@ void virtio_scsi_dataplane_stop(VirtIOSCSI *s)
assert(s->ctx == iothread_get_aio_context(vs->conf.iothread));
aio_context_acquire(s->ctx);
-
virtio_scsi_clear_aio(s);
+ aio_context_release(s->ctx);
blk_drain_all(); /* ensure there are no in-flight requests */
- aio_context_release(s->ctx);
-
for (i = 0; i < vs->conf.num_queues + 2; i++) {
virtio_bus_set_host_notifier(VIRTIO_BUS(qbus), i, false);
}
diff --git a/include/block/aio.h b/include/block/aio.h
index b9fe2cb37e..c7ae27c91c 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -18,7 +18,6 @@
#include "qemu/queue.h"
#include "qemu/event_notifier.h"
#include "qemu/thread.h"
-#include "qemu/rfifolock.h"
#include "qemu/timer.h"
typedef struct BlockAIOCB BlockAIOCB;
@@ -54,7 +53,7 @@ struct AioContext {
GSource source;
/* Protects all fields from multi-threaded access */
- RFifoLock lock;
+ QemuRecMutex lock;
/* The list of registered AIO handlers */
QLIST_HEAD(, AioHandler) aio_handlers;
@@ -116,9 +115,6 @@ struct AioContext {
bool notified;
EventNotifier notifier;
- /* Scheduling this BH forces the event loop it iterate */
- QEMUBH *notify_dummy_bh;
-
/* Thread pool for performing work and receiving completion callbacks */
struct ThreadPool *thread_pool;
@@ -453,6 +449,24 @@ static inline bool aio_node_check(AioContext *ctx, bool is_external)
}
/**
+ * Return the AioContext whose event loop runs in the current thread.
+ *
+ * If called from an IOThread this will be the IOThread's AioContext. If
+ * called from another thread it will be the main loop AioContext.
+ */
+AioContext *qemu_get_current_aio_context(void);
+
+/**
+ * @ctx: the aio context
+ *
+ * Return whether we are running in the I/O thread that manages @ctx.
+ */
+static inline bool aio_context_in_iothread(AioContext *ctx)
+{
+ return ctx == qemu_get_current_aio_context();
+}
+
+/**
* aio_context_setup:
* @ctx: the aio context
*
diff --git a/include/block/block.h b/include/block/block.h
index 398a050176..b7dc7d54ae 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -218,7 +218,7 @@ BlockDriverState *bdrv_open(const char *filename, const char *reference,
BlockReopenQueue *bdrv_reopen_queue(BlockReopenQueue *bs_queue,
BlockDriverState *bs,
QDict *options, int flags);
-int bdrv_reopen_multiple(BlockReopenQueue *bs_queue, Error **errp);
+int bdrv_reopen_multiple(AioContext *ctx, BlockReopenQueue *bs_queue, Error **errp);
int bdrv_reopen(BlockDriverState *bs, int bdrv_flags, Error **errp);
int bdrv_reopen_prepare(BDRVReopenState *reopen_state,
BlockReopenQueue *queue, Error **errp);
@@ -334,6 +334,35 @@ void bdrv_drain(BlockDriverState *bs);
void coroutine_fn bdrv_co_drain(BlockDriverState *bs);
void bdrv_drain_all(void);
+#define BDRV_POLL_WHILE(bs, cond) ({ \
+ bool waited_ = false; \
+ BlockDriverState *bs_ = (bs); \
+ AioContext *ctx_ = bdrv_get_aio_context(bs_); \
+ if (aio_context_in_iothread(ctx_)) { \
+ while ((cond)) { \
+ aio_poll(ctx_, true); \
+ waited_ = true; \
+ } \
+ } else { \
+ assert(qemu_get_current_aio_context() == \
+ qemu_get_aio_context()); \
+ /* Ask bdrv_dec_in_flight to wake up the main \
+ * QEMU AioContext. Extra I/O threads never take \
+ * other I/O threads' AioContexts (see for example \
+ * block_job_defer_to_main_loop for how to do it). \
+ */ \
+ assert(!bs_->wakeup); \
+ bs_->wakeup = true; \
+ while ((cond)) { \
+ aio_context_release(ctx_); \
+ aio_poll(qemu_get_aio_context(), true); \
+ aio_context_acquire(ctx_); \
+ waited_ = true; \
+ } \
+ bs_->wakeup = false; \
+ } \
+ waited_; })
+
int bdrv_pdiscard(BlockDriverState *bs, int64_t offset, int count);
int bdrv_co_pdiscard(BlockDriverState *bs, int64_t offset, int count);
int bdrv_has_zero_init_1(BlockDriverState *bs);
diff --git a/include/block/block_int.h b/include/block/block_int.h
index e96e9ada57..e7ff58419c 100644
--- a/include/block/block_int.h
+++ b/include/block/block_int.h
@@ -62,8 +62,6 @@
enum BdrvTrackedRequestType {
BDRV_TRACKED_READ,
BDRV_TRACKED_WRITE,
- BDRV_TRACKED_FLUSH,
- BDRV_TRACKED_IOCTL,
BDRV_TRACKED_DISCARD,
};
@@ -445,7 +443,7 @@ struct BlockDriverState {
note this is a reference count */
CoQueue flush_queue; /* Serializing flush queue */
- BdrvTrackedRequest *active_flush_req; /* Flush request in flight */
+ bool active_flush_req; /* Flush request in flight? */
unsigned int write_gen; /* Current data generation */
unsigned int flushed_gen; /* Flushed write generation */
@@ -473,9 +471,12 @@ struct BlockDriverState {
/* Callback before write request is processed */
NotifierWithReturnList before_write_notifiers;
- /* number of in-flight serialising requests */
+ /* number of in-flight requests; overall and serialising */
+ unsigned int in_flight;
unsigned int serialising_in_flight;
+ bool wakeup;
+
/* Offset after the highest byte written to */
uint64_t wr_highest_offset;
@@ -634,6 +635,21 @@ void bdrv_remove_aio_context_notifier(BlockDriverState *bs,
void (*aio_context_detached)(void *),
void *opaque);
+/**
+ * bdrv_wakeup:
+ * @bs: The BlockDriverState for which an I/O operation has been completed.
+ *
+ * Wake up the main thread if it is waiting on BDRV_POLL_WHILE. During
+ * synchronous I/O on a BlockDriverState that is attached to another
+ * I/O thread, the main thread lets the I/O thread's event loop run,
+ * waiting for the I/O operation to complete. A bdrv_wakeup will wake
+ * up the main thread if necessary.
+ *
+ * Manual calls to bdrv_wakeup are rarely necessary, because
+ * bdrv_dec_in_flight already calls it.
+ */
+void bdrv_wakeup(BlockDriverState *bs);
+
#ifdef _WIN32
int is_windows_drive(const char *filename);
#endif
@@ -787,6 +803,9 @@ bool bdrv_requests_pending(BlockDriverState *bs);
void bdrv_clear_dirty_bitmap(BdrvDirtyBitmap *bitmap, HBitmap **out);
void bdrv_undo_clear_dirty_bitmap(BdrvDirtyBitmap *bitmap, HBitmap *in);
+void bdrv_inc_in_flight(BlockDriverState *bs);
+void bdrv_dec_in_flight(BlockDriverState *bs);
+
void blockdev_close_all_bdrv_states(void);
#endif /* BLOCK_INT_H */
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index 4ddb4ae2e1..2bb39f4d29 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -92,6 +92,13 @@ typedef struct BlockJobDriver {
* besides job->blk to the new AioContext.
*/
void (*attached_aio_context)(BlockJob *job, AioContext *new_context);
+
+ /*
+ * If the callback is not NULL, it will be invoked when the job has to be
+ * synchronously cancelled or completed; it should drain BlockDriverStates
+ * as required to ensure progress.
+ */
+ void (*drain)(BlockJob *job);
} BlockJobDriver;
/**
diff --git a/include/qemu/rfifolock.h b/include/qemu/rfifolock.h
deleted file mode 100644
index b23ab538a6..0000000000
--- a/include/qemu/rfifolock.h
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Recursive FIFO lock
- *
- * Copyright Red Hat, Inc. 2013
- *
- * Authors:
- * Stefan Hajnoczi <stefanha@redhat.com>
- *
- * This work is licensed under the terms of the GNU GPL, version 2 or later.
- * See the COPYING file in the top-level directory.
- *
- */
-
-#ifndef QEMU_RFIFOLOCK_H
-#define QEMU_RFIFOLOCK_H
-
-#include "qemu/thread.h"
-
-/* Recursive FIFO lock
- *
- * This lock provides more features than a plain mutex:
- *
- * 1. Fairness - enforces FIFO order.
- * 2. Nesting - can be taken recursively.
- * 3. Contention callback - optional, called when thread must wait.
- *
- * The recursive FIFO lock is heavyweight so prefer other synchronization
- * primitives if you do not need its features.
- */
-typedef struct {
- QemuMutex lock; /* protects all fields */
-
- /* FIFO order */
- unsigned int head; /* active ticket number */
- unsigned int tail; /* waiting ticket number */
- QemuCond cond; /* used to wait for our ticket number */
-
- /* Nesting */
- QemuThread owner_thread; /* thread that currently has ownership */
- unsigned int nesting; /* amount of nesting levels */
-
- /* Contention callback */
- void (*cb)(void *); /* called when thread must wait, with ->lock
- * held so it may not recursively lock/unlock
- */
- void *cb_opaque;
-} RFifoLock;
-
-void rfifolock_init(RFifoLock *r, void (*cb)(void *), void *opaque);
-void rfifolock_destroy(RFifoLock *r);
-void rfifolock_lock(RFifoLock *r);
-void rfifolock_unlock(RFifoLock *r);
-
-#endif /* QEMU_RFIFOLOCK_H */
diff --git a/include/qemu/thread-posix.h b/include/qemu/thread-posix.h
index aa03567e5e..09d1e15728 100644
--- a/include/qemu/thread-posix.h
+++ b/include/qemu/thread-posix.h
@@ -4,6 +4,12 @@
#include <pthread.h>
#include <semaphore.h>
+typedef QemuMutex QemuRecMutex;
+#define qemu_rec_mutex_destroy qemu_mutex_destroy
+#define qemu_rec_mutex_lock qemu_mutex_lock
+#define qemu_rec_mutex_try_lock qemu_mutex_try_lock
+#define qemu_rec_mutex_unlock qemu_mutex_unlock
+
struct QemuMutex {
pthread_mutex_t lock;
};
diff --git a/include/qemu/thread-win32.h b/include/qemu/thread-win32.h
index c7ce8dcd45..5fb6541ae9 100644
--- a/include/qemu/thread-win32.h
+++ b/include/qemu/thread-win32.h
@@ -8,6 +8,16 @@ struct QemuMutex {
LONG owner;
};
+typedef struct QemuRecMutex QemuRecMutex;
+struct QemuRecMutex {
+ CRITICAL_SECTION lock;
+};
+
+void qemu_rec_mutex_destroy(QemuRecMutex *mutex);
+void qemu_rec_mutex_lock(QemuRecMutex *mutex);
+int qemu_rec_mutex_trylock(QemuRecMutex *mutex);
+void qemu_rec_mutex_unlock(QemuRecMutex *mutex);
+
struct QemuCond {
LONG waiters, target;
HANDLE sema;
diff --git a/include/qemu/thread.h b/include/qemu/thread.h
index 31237e93ee..e8e665f020 100644
--- a/include/qemu/thread.h
+++ b/include/qemu/thread.h
@@ -25,6 +25,9 @@ void qemu_mutex_lock(QemuMutex *mutex);
int qemu_mutex_trylock(QemuMutex *mutex);
void qemu_mutex_unlock(QemuMutex *mutex);
+/* Prototypes for other functions are in thread-posix.h/thread-win32.h. */
+void qemu_rec_mutex_init(QemuRecMutex *mutex);
+
void qemu_cond_init(QemuCond *cond);
void qemu_cond_destroy(QemuCond *cond);
diff --git a/iothread.c b/iothread.c
index fbeb8deb38..bd70344811 100644
--- a/iothread.c
+++ b/iothread.c
@@ -16,10 +16,12 @@
#include "qom/object_interfaces.h"
#include "qemu/module.h"
#include "block/aio.h"
+#include "block/block.h"
#include "sysemu/iothread.h"
#include "qmp-commands.h"
#include "qemu/error-report.h"
#include "qemu/rcu.h"
+#include "qemu/main-loop.h"
typedef ObjectClass IOThreadClass;
@@ -28,26 +30,27 @@ typedef ObjectClass IOThreadClass;
#define IOTHREAD_CLASS(klass) \
OBJECT_CLASS_CHECK(IOThreadClass, klass, TYPE_IOTHREAD)
+static __thread IOThread *my_iothread;
+
+AioContext *qemu_get_current_aio_context(void)
+{
+ return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
+}
+
static void *iothread_run(void *opaque)
{
IOThread *iothread = opaque;
- bool blocking;
rcu_register_thread();
+ my_iothread = iothread;
qemu_mutex_lock(&iothread->init_done_lock);
iothread->thread_id = qemu_get_thread_id();
qemu_cond_signal(&iothread->init_done_cond);
qemu_mutex_unlock(&iothread->init_done_lock);
- while (!iothread->stopping) {
- aio_context_acquire(iothread->ctx);
- blocking = true;
- while (!iothread->stopping && aio_poll(iothread->ctx, blocking)) {
- /* Progress was made, keep going */
- blocking = false;
- }
- aio_context_release(iothread->ctx);
+ while (!atomic_read(&iothread->stopping)) {
+ aio_poll(iothread->ctx, true);
}
rcu_unregister_thread();
@@ -190,6 +193,18 @@ IOThreadInfoList *qmp_query_iothreads(Error **errp)
void iothread_stop_all(void)
{
Object *container = object_get_objects_root();
+ BlockDriverState *bs;
+ BdrvNextIterator it;
+
+ for (bs = bdrv_first(&it); bs; bs = bdrv_next(&it)) {
+ AioContext *ctx = bdrv_get_aio_context(bs);
+ if (ctx == qemu_get_aio_context()) {
+ continue;
+ }
+ aio_context_acquire(ctx);
+ bdrv_set_aio_context(bs, qemu_get_aio_context());
+ aio_context_release(ctx);
+ }
object_child_foreach(container, iothread_stop, NULL);
}
diff --git a/qemu-img.c b/qemu-img.c
index afcd51ff18..ac7f40d91a 100644
--- a/qemu-img.c
+++ b/qemu-img.c
@@ -795,6 +795,7 @@ static void run_block_job(BlockJob *job, Error **errp)
{
AioContext *aio_context = blk_get_aio_context(job->blk);
+ aio_context_acquire(aio_context);
do {
aio_poll(aio_context, true);
qemu_progress_print(job->len ?
@@ -802,6 +803,7 @@ static void run_block_job(BlockJob *job, Error **errp)
} while (!job->ready);
block_job_complete_sync(job, errp);
+ aio_context_release(aio_context);
/* A block job may finish instantaneously without publishing any progress,
* so just signal completion here */
@@ -819,6 +821,7 @@ static int img_commit(int argc, char **argv)
Error *local_err = NULL;
CommonBlockJobCBInfo cbi;
bool image_opts = false;
+ AioContext *aio_context;
fmt = NULL;
cache = BDRV_DEFAULT_CACHE;
@@ -928,8 +931,11 @@ static int img_commit(int argc, char **argv)
.bs = bs,
};
+ aio_context = bdrv_get_aio_context(bs);
+ aio_context_acquire(aio_context);
commit_active_start("commit", bs, base_bs, 0, BLOCKDEV_ON_ERROR_REPORT,
common_block_job_cb, &cbi, &local_err, false);
+ aio_context_release(aio_context);
if (local_err) {
goto done;
}
diff --git a/qemu-io-cmds.c b/qemu-io-cmds.c
index 3a3838a079..4750e9ab93 100644
--- a/qemu-io-cmds.c
+++ b/qemu-io-cmds.c
@@ -1956,7 +1956,7 @@ static int reopen_f(BlockBackend *blk, int argc, char **argv)
qemu_opts_reset(&reopen_opts);
brq = bdrv_reopen_queue(NULL, bs, opts, flags);
- bdrv_reopen_multiple(brq, &local_err);
+ bdrv_reopen_multiple(bdrv_get_aio_context(bs), brq, &local_err);
if (local_err) {
error_report_err(local_err);
} else {
@@ -2216,6 +2216,7 @@ static const cmdinfo_t help_cmd = {
bool qemuio_command(BlockBackend *blk, const char *cmd)
{
+ AioContext *ctx;
char *input;
const cmdinfo_t *ct;
char **v;
@@ -2227,7 +2228,10 @@ bool qemuio_command(BlockBackend *blk, const char *cmd)
if (c) {
ct = find_command(v[0]);
if (ct) {
+ ctx = blk ? blk_get_aio_context(blk) : qemu_get_aio_context();
+ aio_context_acquire(ctx);
done = command(blk, ct, c, v);
+ aio_context_release(ctx);
} else {
fprintf(stderr, "command \"%s\" not found\n", v[0]);
}
diff --git a/stubs/Makefile.objs b/stubs/Makefile.objs
index c5850e858e..84b9d9e160 100644
--- a/stubs/Makefile.objs
+++ b/stubs/Makefile.objs
@@ -17,6 +17,7 @@ stub-obj-y += gdbstub.o
stub-obj-y += get-fd.o
stub-obj-y += get-next-serial.o
stub-obj-y += get-vm-name.o
+stub-obj-y += iothread.o
stub-obj-y += iothread-lock.o
stub-obj-y += is-daemonized.o
stub-obj-y += machine-init-done.o
diff --git a/stubs/iothread.c b/stubs/iothread.c
new file mode 100644
index 0000000000..8cc9e28c55
--- /dev/null
+++ b/stubs/iothread.c
@@ -0,0 +1,8 @@
+#include "qemu/osdep.h"
+#include "block/aio.h"
+#include "qemu/main-loop.h"
+
+AioContext *qemu_get_current_aio_context(void)
+{
+ return qemu_get_aio_context();
+}
diff --git a/tests/.gitignore b/tests/.gitignore
index 64e050e859..c0d7857538 100644
--- a/tests/.gitignore
+++ b/tests/.gitignore
@@ -67,7 +67,6 @@ test-qmp-marshal.c
test-qobject-output-visitor
test-rcu-list
test-replication
-test-rfifolock
test-string-input-visitor
test-string-output-visitor
test-thread-pool
diff --git a/tests/Makefile.include b/tests/Makefile.include
index 1a135d2340..de516341fd 100644
--- a/tests/Makefile.include
+++ b/tests/Makefile.include
@@ -45,7 +45,6 @@ check-unit-y += tests/test-visitor-serialization$(EXESUF)
check-unit-y += tests/test-iov$(EXESUF)
gcov-files-test-iov-y = util/iov.c
check-unit-y += tests/test-aio$(EXESUF)
-check-unit-$(CONFIG_POSIX) += tests/test-rfifolock$(EXESUF)
check-unit-y += tests/test-throttle$(EXESUF)
gcov-files-test-aio-$(CONFIG_WIN32) = aio-win32.c
gcov-files-test-aio-$(CONFIG_POSIX) = aio-posix.c
@@ -491,7 +490,6 @@ tests/check-qom-proplist$(EXESUF): tests/check-qom-proplist.o $(test-qom-obj-y)
tests/test-char$(EXESUF): tests/test-char.o qemu-char.o qemu-timer.o $(test-util-obj-y) $(qtest-obj-y) $(test-io-obj-y)
tests/test-coroutine$(EXESUF): tests/test-coroutine.o $(test-block-obj-y)
tests/test-aio$(EXESUF): tests/test-aio.o $(test-block-obj-y)
-tests/test-rfifolock$(EXESUF): tests/test-rfifolock.o $(test-util-obj-y)
tests/test-throttle$(EXESUF): tests/test-throttle.o $(test-block-obj-y)
tests/test-blockjob$(EXESUF): tests/test-blockjob.o $(test-block-obj-y) $(test-util-obj-y)
tests/test-blockjob-txn$(EXESUF): tests/test-blockjob-txn.o $(test-block-obj-y) $(test-util-obj-y)
diff --git a/tests/test-aio.c b/tests/test-aio.c
index 03aa846970..5be99f8287 100644
--- a/tests/test-aio.c
+++ b/tests/test-aio.c
@@ -100,6 +100,7 @@ static void event_ready_cb(EventNotifier *e)
typedef struct {
QemuMutex start_lock;
+ EventNotifier notifier;
bool thread_acquired;
} AcquireTestData;
@@ -111,6 +112,11 @@ static void *test_acquire_thread(void *opaque)
qemu_mutex_lock(&data->start_lock);
qemu_mutex_unlock(&data->start_lock);
+ /* event_notifier_set might be called either before or after
+ * the main thread's call to poll(). The test case's outcome
+ * should be the same in either case.
+ */
+ event_notifier_set(&data->notifier);
aio_context_acquire(ctx);
aio_context_release(ctx);
@@ -125,20 +131,19 @@ static void set_event_notifier(AioContext *ctx, EventNotifier *notifier,
aio_set_event_notifier(ctx, notifier, false, handler);
}
-static void dummy_notifier_read(EventNotifier *unused)
+static void dummy_notifier_read(EventNotifier *n)
{
- g_assert(false); /* should never be invoked */
+ event_notifier_test_and_clear(n);
}
static void test_acquire(void)
{
QemuThread thread;
- EventNotifier notifier;
AcquireTestData data;
/* Dummy event notifier ensures aio_poll() will block */
- event_notifier_init(&notifier, false);
- set_event_notifier(ctx, &notifier, dummy_notifier_read);
+ event_notifier_init(&data.notifier, false);
+ set_event_notifier(ctx, &data.notifier, dummy_notifier_read);
g_assert(!aio_poll(ctx, false)); /* consume aio_notify() */
qemu_mutex_init(&data.start_lock);
@@ -152,12 +157,13 @@ static void test_acquire(void)
/* Block in aio_poll(), let other thread kick us and acquire context */
aio_context_acquire(ctx);
qemu_mutex_unlock(&data.start_lock); /* let the thread run */
- g_assert(!aio_poll(ctx, true));
+ g_assert(aio_poll(ctx, true));
+ g_assert(!data.thread_acquired);
aio_context_release(ctx);
qemu_thread_join(&thread);
- set_event_notifier(ctx, &notifier, NULL);
- event_notifier_cleanup(&notifier);
+ set_event_notifier(ctx, &data.notifier, NULL);
+ event_notifier_cleanup(&data.notifier);
g_assert(data.thread_acquired);
}
diff --git a/tests/test-rfifolock.c b/tests/test-rfifolock.c
deleted file mode 100644
index 471a81114d..0000000000
--- a/tests/test-rfifolock.c
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * RFifoLock tests
- *
- * Copyright Red Hat, Inc. 2013
- *
- * Authors:
- * Stefan Hajnoczi <stefanha@redhat.com>
- *
- * This work is licensed under the terms of the GNU LGPL, version 2 or later.
- * See the COPYING.LIB file in the top-level directory.
- */
-
-#include "qemu/osdep.h"
-#include "qemu-common.h"
-#include "qemu/rfifolock.h"
-
-static void test_nesting(void)
-{
- RFifoLock lock;
-
- /* Trivial test, ensure the lock is recursive */
- rfifolock_init(&lock, NULL, NULL);
- rfifolock_lock(&lock);
- rfifolock_lock(&lock);
- rfifolock_lock(&lock);
- rfifolock_unlock(&lock);
- rfifolock_unlock(&lock);
- rfifolock_unlock(&lock);
- rfifolock_destroy(&lock);
-}
-
-typedef struct {
- RFifoLock lock;
- int fd[2];
-} CallbackTestData;
-
-static void rfifolock_cb(void *opaque)
-{
- CallbackTestData *data = opaque;
- int ret;
- char c = 0;
-
- ret = write(data->fd[1], &c, sizeof(c));
- g_assert(ret == 1);
-}
-
-static void *callback_thread(void *opaque)
-{
- CallbackTestData *data = opaque;
-
- /* The other thread holds the lock so the contention callback will be
- * invoked...
- */
- rfifolock_lock(&data->lock);
- rfifolock_unlock(&data->lock);
- return NULL;
-}
-
-static void test_callback(void)
-{
- CallbackTestData data;
- QemuThread thread;
- int ret;
- char c;
-
- rfifolock_init(&data.lock, rfifolock_cb, &data);
- ret = qemu_pipe(data.fd);
- g_assert(ret == 0);
-
- /* Hold lock but allow the callback to kick us by writing to the pipe */
- rfifolock_lock(&data.lock);
- qemu_thread_create(&thread, "callback_thread",
- callback_thread, &data, QEMU_THREAD_JOINABLE);
- ret = read(data.fd[0], &c, sizeof(c));
- g_assert(ret == 1);
- rfifolock_unlock(&data.lock);
- /* If we got here then the callback was invoked, as expected */
-
- qemu_thread_join(&thread);
- close(data.fd[0]);
- close(data.fd[1]);
- rfifolock_destroy(&data.lock);
-}
-
-int main(int argc, char **argv)
-{
- g_test_init(&argc, &argv, NULL);
- g_test_add_func("/nesting", test_nesting);
- g_test_add_func("/callback", test_callback);
- return g_test_run();
-}
diff --git a/util/Makefile.objs b/util/Makefile.objs
index 36c7dcc1fa..ad0f9c7fe4 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -25,7 +25,6 @@ util-obj-y += uuid.o
util-obj-y += throttle.o
util-obj-y += getauxval.o
util-obj-y += readline.o
-util-obj-y += rfifolock.o
util-obj-y += rcu.o
util-obj-y += qemu-coroutine.o qemu-coroutine-lock.o qemu-coroutine-io.o
util-obj-y += qemu-coroutine-sleep.o
diff --git a/util/qemu-thread-posix.c b/util/qemu-thread-posix.c
index ce51b37c1d..d20cddec0c 100644
--- a/util/qemu-thread-posix.c
+++ b/util/qemu-thread-posix.c
@@ -80,6 +80,20 @@ void qemu_mutex_unlock(QemuMutex *mutex)
error_exit(err, __func__);
}
+void qemu_rec_mutex_init(QemuRecMutex *mutex)
+{
+ int err;
+ pthread_mutexattr_t attr;
+
+ pthread_mutexattr_init(&attr);
+ pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
+ err = pthread_mutex_init(&mutex->lock, &attr);
+ pthread_mutexattr_destroy(&attr);
+ if (err) {
+ error_exit(err, __func__);
+ }
+}
+
void qemu_cond_init(QemuCond *cond)
{
int err;
diff --git a/util/qemu-thread-win32.c b/util/qemu-thread-win32.c
index 072806f792..728e76b5b2 100644
--- a/util/qemu-thread-win32.c
+++ b/util/qemu-thread-win32.c
@@ -79,6 +79,31 @@ void qemu_mutex_unlock(QemuMutex *mutex)
LeaveCriticalSection(&mutex->lock);
}
+void qemu_rec_mutex_init(QemuRecMutex *mutex)
+{
+ InitializeCriticalSection(&mutex->lock);
+}
+
+void qemu_rec_mutex_destroy(QemuRecMutex *mutex)
+{
+ DeleteCriticalSection(&mutex->lock);
+}
+
+void qemu_rec_mutex_lock(QemuRecMutex *mutex)
+{
+ EnterCriticalSection(&mutex->lock);
+}
+
+int qemu_rec_mutex_trylock(QemuRecMutex *mutex)
+{
+ return !TryEnterCriticalSection(&mutex->lock);
+}
+
+void qemu_rec_mutex_unlock(QemuRecMutex *mutex)
+{
+ LeaveCriticalSection(&mutex->lock);
+}
+
void qemu_cond_init(QemuCond *cond)
{
memset(cond, 0, sizeof(*cond));
diff --git a/util/rfifolock.c b/util/rfifolock.c
deleted file mode 100644
index 084c2f0ea1..0000000000
--- a/util/rfifolock.c
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Recursive FIFO lock
- *
- * Copyright Red Hat, Inc. 2013
- *
- * Authors:
- * Stefan Hajnoczi <stefanha@redhat.com>
- *
- * This work is licensed under the terms of the GNU LGPL, version 2 or later.
- * See the COPYING.LIB file in the top-level directory.
- *
- */
-
-#include "qemu/osdep.h"
-#include "qemu/rfifolock.h"
-
-void rfifolock_init(RFifoLock *r, void (*cb)(void *), void *opaque)
-{
- qemu_mutex_init(&r->lock);
- r->head = 0;
- r->tail = 0;
- qemu_cond_init(&r->cond);
- r->nesting = 0;
- r->cb = cb;
- r->cb_opaque = opaque;
-}
-
-void rfifolock_destroy(RFifoLock *r)
-{
- qemu_cond_destroy(&r->cond);
- qemu_mutex_destroy(&r->lock);
-}
-
-/*
- * Theory of operation:
- *
- * In order to ensure FIFO ordering, implement a ticketlock. Threads acquiring
- * the lock enqueue themselves by incrementing the tail index. When the lock
- * is unlocked, the head is incremented and waiting threads are notified.
- *
- * Recursive locking does not take a ticket since the head is only incremented
- * when the outermost recursive caller unlocks.
- */
-void rfifolock_lock(RFifoLock *r)
-{
- qemu_mutex_lock(&r->lock);
-
- /* Take a ticket */
- unsigned int ticket = r->tail++;
-
- if (r->nesting > 0 && qemu_thread_is_self(&r->owner_thread)) {
- r->tail--; /* put ticket back, we're nesting */
- } else {
- while (ticket != r->head) {
- /* Invoke optional contention callback */
- if (r->cb) {
- r->cb(r->cb_opaque);
- }
- qemu_cond_wait(&r->cond, &r->lock);
- }
- qemu_thread_get_self(&r->owner_thread);
- }
-
- r->nesting++;
- qemu_mutex_unlock(&r->lock);
-}
-
-void rfifolock_unlock(RFifoLock *r)
-{
- qemu_mutex_lock(&r->lock);
- assert(r->nesting > 0);
- assert(qemu_thread_is_self(&r->owner_thread));
- if (--r->nesting == 0) {
- r->head++;
- qemu_cond_broadcast(&r->cond);
- }
- qemu_mutex_unlock(&r->lock);
-}