diff options
-rw-r--r-- | block.c | 96 | ||||
-rw-r--r-- | block/backup.c | 2 | ||||
-rw-r--r-- | block/block-backend.c | 5 | ||||
-rw-r--r-- | block/dirty-bitmap.c | 57 | ||||
-rw-r--r-- | block/io.c | 332 | ||||
-rw-r--r-- | block/mirror.c | 613 | ||||
-rw-r--r-- | block/vvfat.c | 1 | ||||
-rw-r--r-- | blockdev.c | 9 | ||||
-rw-r--r-- | blockjob.c | 23 | ||||
-rw-r--r-- | include/block/aio-wait.h | 25 | ||||
-rw-r--r-- | include/block/block.h | 31 | ||||
-rw-r--r-- | include/block/block_int.h | 18 | ||||
-rw-r--r-- | include/block/blockjob_int.h | 8 | ||||
-rw-r--r-- | include/block/dirty-bitmap.h | 2 | ||||
-rw-r--r-- | include/qemu/hbitmap.h | 5 | ||||
-rw-r--r-- | include/qemu/job.h | 15 | ||||
-rw-r--r-- | job.c | 5 | ||||
-rw-r--r-- | qapi/block-core.json | 29 | ||||
-rwxr-xr-x | tests/qemu-iotests/151 | 120 | ||||
-rw-r--r-- | tests/qemu-iotests/151.out | 5 | ||||
-rw-r--r-- | tests/qemu-iotests/group | 1 | ||||
-rw-r--r-- | tests/test-bdrv-drain.c | 705 | ||||
-rw-r--r-- | tests/test-hbitmap.c | 38 | ||||
-rw-r--r-- | util/hbitmap.c | 10 |
24 files changed, 1836 insertions, 319 deletions
@@ -333,6 +333,10 @@ BlockDriverState *bdrv_new(void) qemu_co_queue_init(&bs->flush_queue); + for (i = 0; i < bdrv_drain_all_count; i++) { + bdrv_drained_begin(bs); + } + QTAILQ_INSERT_TAIL(&all_bdrv_states, bs, bs_list); return bs; @@ -818,7 +822,13 @@ static char *bdrv_child_get_parent_desc(BdrvChild *c) static void bdrv_child_cb_drained_begin(BdrvChild *child) { BlockDriverState *bs = child->opaque; - bdrv_drained_begin(bs); + bdrv_do_drained_begin_quiesce(bs, NULL, false); +} + +static bool bdrv_child_cb_drained_poll(BdrvChild *child) +{ + BlockDriverState *bs = child->opaque; + return bdrv_drain_poll(bs, false, NULL, false); } static void bdrv_child_cb_drained_end(BdrvChild *child) @@ -902,9 +912,11 @@ static void bdrv_inherited_options(int *child_flags, QDict *child_options, } const BdrvChildRole child_file = { + .parent_is_bds = true, .get_parent_desc = bdrv_child_get_parent_desc, .inherit_options = bdrv_inherited_options, .drained_begin = bdrv_child_cb_drained_begin, + .drained_poll = bdrv_child_cb_drained_poll, .drained_end = bdrv_child_cb_drained_end, .attach = bdrv_child_cb_attach, .detach = bdrv_child_cb_detach, @@ -926,9 +938,11 @@ static void bdrv_inherited_fmt_options(int *child_flags, QDict *child_options, } const BdrvChildRole child_format = { + .parent_is_bds = true, .get_parent_desc = bdrv_child_get_parent_desc, .inherit_options = bdrv_inherited_fmt_options, .drained_begin = bdrv_child_cb_drained_begin, + .drained_poll = bdrv_child_cb_drained_poll, .drained_end = bdrv_child_cb_drained_end, .attach = bdrv_child_cb_attach, .detach = bdrv_child_cb_detach, @@ -1043,11 +1057,13 @@ static int bdrv_backing_update_filename(BdrvChild *c, BlockDriverState *base, } const BdrvChildRole child_backing = { + .parent_is_bds = true, .get_parent_desc = bdrv_child_get_parent_desc, .attach = bdrv_backing_attach, .detach = bdrv_backing_detach, .inherit_options = bdrv_backing_options, .drained_begin = bdrv_child_cb_drained_begin, + .drained_poll = bdrv_child_cb_drained_poll, .drained_end = bdrv_child_cb_drained_end, .inactivate = bdrv_child_cb_inactivate, .update_filename = bdrv_backing_update_filename, @@ -1152,7 +1168,7 @@ static int bdrv_open_driver(BlockDriverState *bs, BlockDriver *drv, int open_flags, Error **errp) { Error *local_err = NULL; - int ret; + int i, ret; bdrv_assign_node_name(bs, node_name, &local_err); if (local_err) { @@ -1200,6 +1216,12 @@ static int bdrv_open_driver(BlockDriverState *bs, BlockDriver *drv, assert(bdrv_min_mem_align(bs) != 0); assert(is_power_of_2(bs->bl.request_alignment)); + for (i = 0; i < bs->quiesce_counter; i++) { + if (drv->bdrv_co_drain_begin) { + drv->bdrv_co_drain_begin(bs); + } + } + return 0; open_failed: bs->drv = NULL; @@ -2021,7 +2043,12 @@ static void bdrv_replace_child_noperm(BdrvChild *child, child->role->detach(child); } if (old_bs->quiesce_counter && child->role->drained_end) { - for (i = 0; i < old_bs->quiesce_counter; i++) { + int num = old_bs->quiesce_counter; + if (child->role->parent_is_bds) { + num -= bdrv_drain_all_count; + } + assert(num >= 0); + for (i = 0; i < num; i++) { child->role->drained_end(child); } } @@ -2033,7 +2060,12 @@ static void bdrv_replace_child_noperm(BdrvChild *child, if (new_bs) { QLIST_INSERT_HEAD(&new_bs->parents, child, next_parent); if (new_bs->quiesce_counter && child->role->drained_begin) { - for (i = 0; i < new_bs->quiesce_counter; i++) { + int num = new_bs->quiesce_counter; + if (child->role->parent_is_bds) { + num -= bdrv_drain_all_count; + } + assert(num >= 0); + for (i = 0; i < num; i++) { child->role->drained_begin(child); } } @@ -3395,16 +3427,39 @@ static bool should_update_child(BdrvChild *c, BlockDriverState *to) return false; } - if (c->role == &child_backing) { - /* If @from is a backing file of @to, ignore the child to avoid - * creating a loop. We only want to change the pointer of other - * parents. */ - QLIST_FOREACH(to_c, &to->children, next) { - if (to_c == c) { - break; - } - } - if (to_c) { + /* If the child @c belongs to the BDS @to, replacing the current + * c->bs by @to would mean to create a loop. + * + * Such a case occurs when appending a BDS to a backing chain. + * For instance, imagine the following chain: + * + * guest device -> node A -> further backing chain... + * + * Now we create a new BDS B which we want to put on top of this + * chain, so we first attach A as its backing node: + * + * node B + * | + * v + * guest device -> node A -> further backing chain... + * + * Finally we want to replace A by B. When doing that, we want to + * replace all pointers to A by pointers to B -- except for the + * pointer from B because (1) that would create a loop, and (2) + * that pointer should simply stay intact: + * + * guest device -> node B + * | + * v + * node A -> further backing chain... + * + * In general, when replacing a node A (c->bs) by a node B (@to), + * if A is a child of B, that means we cannot replace A by B there + * because that would create a loop. Silently detaching A from B + * is also not really an option. So overall just leaving A in + * place there is the most sensible choice. */ + QLIST_FOREACH(to_c, &to->children, next) { + if (to_c == c) { return false; } } @@ -3430,6 +3485,7 @@ void bdrv_replace_node(BlockDriverState *from, BlockDriverState *to, /* Put all parents into @list and calculate their cumulative permissions */ QLIST_FOREACH_SAFE(c, &from->parents, next_parent, next) { + assert(c->bs == from); if (!should_update_child(c, to)) { continue; } @@ -4037,6 +4093,14 @@ BlockDriverState *bdrv_next_node(BlockDriverState *bs) return QTAILQ_NEXT(bs, node_list); } +BlockDriverState *bdrv_next_all_states(BlockDriverState *bs) +{ + if (!bs) { + return QTAILQ_FIRST(&all_bdrv_states); + } + return QTAILQ_NEXT(bs, bs_list); +} + const char *bdrv_get_node_name(const BlockDriverState *bs) { return bs->node_name; @@ -4948,7 +5012,7 @@ void bdrv_set_aio_context(BlockDriverState *bs, AioContext *new_context) AioContext *ctx = bdrv_get_aio_context(bs); aio_disable_external(ctx); - bdrv_parent_drained_begin(bs, NULL); + bdrv_parent_drained_begin(bs, NULL, false); bdrv_drain(bs); /* ensure there are no in-flight requests */ while (aio_poll(ctx, false)) { @@ -4962,7 +5026,7 @@ void bdrv_set_aio_context(BlockDriverState *bs, AioContext *new_context) */ aio_context_acquire(new_context); bdrv_attach_aio_context(bs, new_context); - bdrv_parent_drained_end(bs, NULL); + bdrv_parent_drained_end(bs, NULL, false); aio_enable_external(ctx); aio_context_release(new_context); } diff --git a/block/backup.c b/block/backup.c index 5661435675..d18be40caf 100644 --- a/block/backup.c +++ b/block/backup.c @@ -354,7 +354,7 @@ static int coroutine_fn backup_run_incremental(BackupBlockJob *job) HBitmapIter hbi; hbitmap_iter_init(&hbi, job->copy_bitmap, 0); - while ((cluster = hbitmap_iter_next(&hbi)) != -1) { + while ((cluster = hbitmap_iter_next(&hbi, true)) != -1) { do { if (yield_and_check(job)) { return 0; diff --git a/block/block-backend.c b/block/block-backend.c index 2d1a3463e8..6b75bca317 100644 --- a/block/block-backend.c +++ b/block/block-backend.c @@ -767,6 +767,11 @@ void blk_remove_bs(BlockBackend *blk) blk_update_root_state(blk); + /* bdrv_root_unref_child() will cause blk->root to become stale and may + * switch to a completion coroutine later on. Let's drain all I/O here + * to avoid that and a potential QEMU crash. + */ + blk_drain(blk); bdrv_root_unref_child(blk->root); blk->root = NULL; } diff --git a/block/dirty-bitmap.c b/block/dirty-bitmap.c index 383d742cdb..db1782ec1f 100644 --- a/block/dirty-bitmap.c +++ b/block/dirty-bitmap.c @@ -519,7 +519,62 @@ void bdrv_dirty_iter_free(BdrvDirtyBitmapIter *iter) int64_t bdrv_dirty_iter_next(BdrvDirtyBitmapIter *iter) { - return hbitmap_iter_next(&iter->hbi); + return hbitmap_iter_next(&iter->hbi, true); +} + +/** + * Return the next consecutively dirty area in the dirty bitmap + * belonging to the given iterator @iter. + * + * @max_offset: Maximum value that may be returned for + * *offset + *bytes + * @offset: Will contain the start offset of the next dirty area + * @bytes: Will contain the length of the next dirty area + * + * Returns: True if a dirty area could be found before max_offset + * (which means that *offset and *bytes then contain valid + * values), false otherwise. + * + * Note that @iter is never advanced if false is returned. If an area + * is found (which means that true is returned), it will be advanced + * past that area. + */ +bool bdrv_dirty_iter_next_area(BdrvDirtyBitmapIter *iter, uint64_t max_offset, + uint64_t *offset, int *bytes) +{ + uint32_t granularity = bdrv_dirty_bitmap_granularity(iter->bitmap); + uint64_t gran_max_offset; + int64_t ret; + int size; + + if (max_offset == iter->bitmap->size) { + /* If max_offset points to the image end, round it up by the + * bitmap granularity */ + gran_max_offset = ROUND_UP(max_offset, granularity); + } else { + gran_max_offset = max_offset; + } + + ret = hbitmap_iter_next(&iter->hbi, false); + if (ret < 0 || ret + granularity > gran_max_offset) { + return false; + } + + *offset = ret; + size = 0; + + assert(granularity <= INT_MAX); + + do { + /* Advance iterator */ + ret = hbitmap_iter_next(&iter->hbi, true); + size += granularity; + } while (ret + granularity <= gran_max_offset && + hbitmap_iter_next(&iter->hbi, false) == ret + granularity && + size <= INT_MAX - granularity); + + *bytes = MIN(size, max_offset - *offset); + return true; } /* Called within bdrv_dirty_bitmap_lock..unlock */ diff --git a/block/io.c b/block/io.c index b7beaeeb9f..ef4fedd364 100644 --- a/block/io.c +++ b/block/io.c @@ -38,15 +38,18 @@ /* Maximum bounce buffer for copy-on-read and write zeroes, in bytes */ #define MAX_BOUNCE_BUFFER (32768 << BDRV_SECTOR_BITS) +static AioWait drain_all_aio_wait; + static int coroutine_fn bdrv_co_do_pwrite_zeroes(BlockDriverState *bs, int64_t offset, int bytes, BdrvRequestFlags flags); -void bdrv_parent_drained_begin(BlockDriverState *bs, BdrvChild *ignore) +void bdrv_parent_drained_begin(BlockDriverState *bs, BdrvChild *ignore, + bool ignore_bds_parents) { BdrvChild *c, *next; QLIST_FOREACH_SAFE(c, &bs->parents, next_parent, next) { - if (c == ignore) { + if (c == ignore || (ignore_bds_parents && c->role->parent_is_bds)) { continue; } if (c->role->drained_begin) { @@ -55,12 +58,13 @@ void bdrv_parent_drained_begin(BlockDriverState *bs, BdrvChild *ignore) } } -void bdrv_parent_drained_end(BlockDriverState *bs, BdrvChild *ignore) +void bdrv_parent_drained_end(BlockDriverState *bs, BdrvChild *ignore, + bool ignore_bds_parents) { BdrvChild *c, *next; QLIST_FOREACH_SAFE(c, &bs->parents, next_parent, next) { - if (c == ignore) { + if (c == ignore || (ignore_bds_parents && c->role->parent_is_bds)) { continue; } if (c->role->drained_end) { @@ -69,6 +73,24 @@ void bdrv_parent_drained_end(BlockDriverState *bs, BdrvChild *ignore) } } +static bool bdrv_parent_drained_poll(BlockDriverState *bs, BdrvChild *ignore, + bool ignore_bds_parents) +{ + BdrvChild *c, *next; + bool busy = false; + + QLIST_FOREACH_SAFE(c, &bs->parents, next_parent, next) { + if (c == ignore || (ignore_bds_parents && c->role->parent_is_bds)) { + continue; + } + if (c->role->drained_poll) { + busy |= c->role->drained_poll(c); + } + } + + return busy; +} + static void bdrv_merge_limits(BlockLimits *dst, const BlockLimits *src) { dst->opt_transfer = MAX(dst->opt_transfer, src->opt_transfer); @@ -148,7 +170,9 @@ typedef struct { bool done; bool begin; bool recursive; + bool poll; BdrvChild *parent; + bool ignore_bds_parents; } BdrvCoDrainData; static void coroutine_fn bdrv_drain_invoke_entry(void *opaque) @@ -164,67 +188,83 @@ static void coroutine_fn bdrv_drain_invoke_entry(void *opaque) /* Set data->done before reading bs->wakeup. */ atomic_mb_set(&data->done, true); - bdrv_wakeup(bs); + bdrv_dec_in_flight(bs); + + if (data->begin) { + g_free(data); + } } /* Recursively call BlockDriver.bdrv_co_drain_begin/end callbacks */ -static void bdrv_drain_invoke(BlockDriverState *bs, bool begin, bool recursive) +static void bdrv_drain_invoke(BlockDriverState *bs, bool begin) { - BdrvChild *child, *tmp; - BdrvCoDrainData data = { .bs = bs, .done = false, .begin = begin}; + BdrvCoDrainData *data; if (!bs->drv || (begin && !bs->drv->bdrv_co_drain_begin) || (!begin && !bs->drv->bdrv_co_drain_end)) { return; } - data.co = qemu_coroutine_create(bdrv_drain_invoke_entry, &data); - bdrv_coroutine_enter(bs, data.co); - BDRV_POLL_WHILE(bs, !data.done); + data = g_new(BdrvCoDrainData, 1); + *data = (BdrvCoDrainData) { + .bs = bs, + .done = false, + .begin = begin + }; - if (recursive) { - QLIST_FOREACH_SAFE(child, &bs->children, next, tmp) { - bdrv_drain_invoke(child->bs, begin, true); - } + /* Make sure the driver callback completes during the polling phase for + * drain_begin. */ + bdrv_inc_in_flight(bs); + data->co = qemu_coroutine_create(bdrv_drain_invoke_entry, data); + aio_co_schedule(bdrv_get_aio_context(bs), data->co); + + if (!begin) { + BDRV_POLL_WHILE(bs, !data->done); + g_free(data); } } -static bool bdrv_drain_recurse(BlockDriverState *bs) +/* Returns true if BDRV_POLL_WHILE() should go into a blocking aio_poll() */ +bool bdrv_drain_poll(BlockDriverState *bs, bool recursive, + BdrvChild *ignore_parent, bool ignore_bds_parents) { - BdrvChild *child, *tmp; - bool waited; + BdrvChild *child, *next; - /* Wait for drained requests to finish */ - waited = BDRV_POLL_WHILE(bs, atomic_read(&bs->in_flight) > 0); - - QLIST_FOREACH_SAFE(child, &bs->children, next, tmp) { - BlockDriverState *bs = child->bs; - bool in_main_loop = - qemu_get_current_aio_context() == qemu_get_aio_context(); - assert(bs->refcnt > 0); - if (in_main_loop) { - /* In case the recursive bdrv_drain_recurse processes a - * block_job_defer_to_main_loop BH and modifies the graph, - * let's hold a reference to bs until we are done. - * - * IOThread doesn't have such a BH, and it is not safe to call - * bdrv_unref without BQL, so skip doing it there. - */ - bdrv_ref(bs); - } - waited |= bdrv_drain_recurse(bs); - if (in_main_loop) { - bdrv_unref(bs); + if (bdrv_parent_drained_poll(bs, ignore_parent, ignore_bds_parents)) { + return true; + } + + if (atomic_read(&bs->in_flight)) { + return true; + } + + if (recursive) { + assert(!ignore_bds_parents); + QLIST_FOREACH_SAFE(child, &bs->children, next, next) { + if (bdrv_drain_poll(child->bs, recursive, child, false)) { + return true; + } } } - return waited; + return false; +} + +static bool bdrv_drain_poll_top_level(BlockDriverState *bs, bool recursive, + BdrvChild *ignore_parent) +{ + /* Execute pending BHs first and check everything else only after the BHs + * have executed. */ + while (aio_poll(bs->aio_context, false)); + + return bdrv_drain_poll(bs, recursive, ignore_parent, false); } static void bdrv_do_drained_begin(BlockDriverState *bs, bool recursive, - BdrvChild *parent); + BdrvChild *parent, bool ignore_bds_parents, + bool poll); static void bdrv_do_drained_end(BlockDriverState *bs, bool recursive, - BdrvChild *parent); + BdrvChild *parent, bool ignore_bds_parents); static void bdrv_co_drain_bh_cb(void *opaque) { @@ -232,11 +272,18 @@ static void bdrv_co_drain_bh_cb(void *opaque) Coroutine *co = data->co; BlockDriverState *bs = data->bs; - bdrv_dec_in_flight(bs); - if (data->begin) { - bdrv_do_drained_begin(bs, data->recursive, data->parent); + if (bs) { + bdrv_dec_in_flight(bs); + if (data->begin) { + bdrv_do_drained_begin(bs, data->recursive, data->parent, + data->ignore_bds_parents, data->poll); + } else { + bdrv_do_drained_end(bs, data->recursive, data->parent, + data->ignore_bds_parents); + } } else { - bdrv_do_drained_end(bs, data->recursive, data->parent); + assert(data->begin); + bdrv_drain_all_begin(); } data->done = true; @@ -245,7 +292,9 @@ static void bdrv_co_drain_bh_cb(void *opaque) static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs, bool begin, bool recursive, - BdrvChild *parent) + BdrvChild *parent, + bool ignore_bds_parents, + bool poll) { BdrvCoDrainData data; @@ -260,8 +309,12 @@ static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs, .begin = begin, .recursive = recursive, .parent = parent, + .ignore_bds_parents = ignore_bds_parents, + .poll = poll, }; - bdrv_inc_in_flight(bs); + if (bs) { + bdrv_inc_in_flight(bs); + } aio_bh_schedule_oneshot(bdrv_get_aio_context(bs), bdrv_co_drain_bh_cb, &data); @@ -271,79 +324,106 @@ static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs, assert(data.done); } -void bdrv_do_drained_begin(BlockDriverState *bs, bool recursive, - BdrvChild *parent) +void bdrv_do_drained_begin_quiesce(BlockDriverState *bs, + BdrvChild *parent, bool ignore_bds_parents) { - BdrvChild *child, *next; - - if (qemu_in_coroutine()) { - bdrv_co_yield_to_drain(bs, true, recursive, parent); - return; - } + assert(!qemu_in_coroutine()); /* Stop things in parent-to-child order */ if (atomic_fetch_inc(&bs->quiesce_counter) == 0) { aio_disable_external(bdrv_get_aio_context(bs)); } - bdrv_parent_drained_begin(bs, parent); - bdrv_drain_invoke(bs, true, false); - bdrv_drain_recurse(bs); + bdrv_parent_drained_begin(bs, parent, ignore_bds_parents); + bdrv_drain_invoke(bs, true); +} + +static void bdrv_do_drained_begin(BlockDriverState *bs, bool recursive, + BdrvChild *parent, bool ignore_bds_parents, + bool poll) +{ + BdrvChild *child, *next; + + if (qemu_in_coroutine()) { + bdrv_co_yield_to_drain(bs, true, recursive, parent, ignore_bds_parents, + poll); + return; + } + + bdrv_do_drained_begin_quiesce(bs, parent, ignore_bds_parents); if (recursive) { + assert(!ignore_bds_parents); bs->recursive_quiesce_counter++; QLIST_FOREACH_SAFE(child, &bs->children, next, next) { - bdrv_do_drained_begin(child->bs, true, child); + bdrv_do_drained_begin(child->bs, true, child, ignore_bds_parents, + false); } } + + /* + * Wait for drained requests to finish. + * + * Calling BDRV_POLL_WHILE() only once for the top-level node is okay: The + * call is needed so things in this AioContext can make progress even + * though we don't return to the main AioContext loop - this automatically + * includes other nodes in the same AioContext and therefore all child + * nodes. + */ + if (poll) { + assert(!ignore_bds_parents); + BDRV_POLL_WHILE(bs, bdrv_drain_poll_top_level(bs, recursive, parent)); + } } void bdrv_drained_begin(BlockDriverState *bs) { - bdrv_do_drained_begin(bs, false, NULL); + bdrv_do_drained_begin(bs, false, NULL, false, true); } void bdrv_subtree_drained_begin(BlockDriverState *bs) { - bdrv_do_drained_begin(bs, true, NULL); + bdrv_do_drained_begin(bs, true, NULL, false, true); } -void bdrv_do_drained_end(BlockDriverState *bs, bool recursive, - BdrvChild *parent) +static void bdrv_do_drained_end(BlockDriverState *bs, bool recursive, + BdrvChild *parent, bool ignore_bds_parents) { BdrvChild *child, *next; int old_quiesce_counter; if (qemu_in_coroutine()) { - bdrv_co_yield_to_drain(bs, false, recursive, parent); + bdrv_co_yield_to_drain(bs, false, recursive, parent, ignore_bds_parents, + false); return; } assert(bs->quiesce_counter > 0); old_quiesce_counter = atomic_fetch_dec(&bs->quiesce_counter); /* Re-enable things in child-to-parent order */ - bdrv_drain_invoke(bs, false, false); - bdrv_parent_drained_end(bs, parent); + bdrv_drain_invoke(bs, false); + bdrv_parent_drained_end(bs, parent, ignore_bds_parents); if (old_quiesce_counter == 1) { aio_enable_external(bdrv_get_aio_context(bs)); } if (recursive) { + assert(!ignore_bds_parents); bs->recursive_quiesce_counter--; QLIST_FOREACH_SAFE(child, &bs->children, next, next) { - bdrv_do_drained_end(child->bs, true, child); + bdrv_do_drained_end(child->bs, true, child, ignore_bds_parents); } } } void bdrv_drained_end(BlockDriverState *bs) { - bdrv_do_drained_end(bs, false, NULL); + bdrv_do_drained_end(bs, false, NULL, false); } void bdrv_subtree_drained_end(BlockDriverState *bs) { - bdrv_do_drained_end(bs, true, NULL); + bdrv_do_drained_end(bs, true, NULL, false); } void bdrv_apply_subtree_drain(BdrvChild *child, BlockDriverState *new_parent) @@ -351,7 +431,7 @@ void bdrv_apply_subtree_drain(BdrvChild *child, BlockDriverState *new_parent) int i; for (i = 0; i < new_parent->recursive_quiesce_counter; i++) { - bdrv_do_drained_begin(child->bs, true, child); + bdrv_do_drained_begin(child->bs, true, child, false, true); } } @@ -360,7 +440,7 @@ void bdrv_unapply_subtree_drain(BdrvChild *child, BlockDriverState *old_parent) int i; for (i = 0; i < old_parent->recursive_quiesce_counter; i++) { - bdrv_do_drained_end(child->bs, true, child); + bdrv_do_drained_end(child->bs, true, child, false); } } @@ -370,10 +450,6 @@ void bdrv_unapply_subtree_drain(BdrvChild *child, BlockDriverState *old_parent) * * Note that unlike bdrv_drain_all(), the caller must hold the BlockDriverState * AioContext. - * - * Only this BlockDriverState's AioContext is run, so in-flight requests must - * not depend on events in other AioContexts. In that case, use - * bdrv_drain_all() instead. */ void coroutine_fn bdrv_co_drain(BlockDriverState *bs) { @@ -388,6 +464,39 @@ void bdrv_drain(BlockDriverState *bs) bdrv_drained_end(bs); } +static void bdrv_drain_assert_idle(BlockDriverState *bs) +{ + BdrvChild *child, *next; + + assert(atomic_read(&bs->in_flight) == 0); + QLIST_FOREACH_SAFE(child, &bs->children, next, next) { + bdrv_drain_assert_idle(child->bs); + } +} + +unsigned int bdrv_drain_all_count = 0; + +static bool bdrv_drain_all_poll(void) +{ + BlockDriverState *bs = NULL; + bool result = false; + + /* Execute pending BHs first (may modify the graph) and check everything + * else only after the BHs have executed. */ + while (aio_poll(qemu_get_aio_context(), false)); + + /* bdrv_drain_poll() can't make changes to the graph and we are holding the + * main AioContext lock, so iterating bdrv_next_all_states() is safe. */ + while ((bs = bdrv_next_all_states(bs))) { + AioContext *aio_context = bdrv_get_aio_context(bs); + aio_context_acquire(aio_context); + result |= bdrv_drain_poll(bs, false, NULL, true); + aio_context_release(aio_context); + } + + return result; +} + /* * Wait for pending requests to complete across all BlockDriverStates * @@ -402,73 +511,51 @@ void bdrv_drain(BlockDriverState *bs) */ void bdrv_drain_all_begin(void) { - /* Always run first iteration so any pending completion BHs run */ - bool waited = true; - BlockDriverState *bs; - BdrvNextIterator it; - GSList *aio_ctxs = NULL, *ctx; + BlockDriverState *bs = NULL; + + if (qemu_in_coroutine()) { + bdrv_co_yield_to_drain(NULL, true, false, NULL, true, true); + return; + } - /* BDRV_POLL_WHILE() for a node can only be called from its own I/O thread - * or the main loop AioContext. We potentially use BDRV_POLL_WHILE() on - * nodes in several different AioContexts, so make sure we're in the main - * context. */ + /* AIO_WAIT_WHILE() with a NULL context can only be called from the main + * loop AioContext, so make sure we're in the main context. */ assert(qemu_get_current_aio_context() == qemu_get_aio_context()); + assert(bdrv_drain_all_count < INT_MAX); + bdrv_drain_all_count++; - for (bs = bdrv_first(&it); bs; bs = bdrv_next(&it)) { + /* Quiesce all nodes, without polling in-flight requests yet. The graph + * cannot change during this loop. */ + while ((bs = bdrv_next_all_states(bs))) { AioContext *aio_context = bdrv_get_aio_context(bs); - /* Stop things in parent-to-child order */ aio_context_acquire(aio_context); - aio_disable_external(aio_context); - bdrv_parent_drained_begin(bs, NULL); - bdrv_drain_invoke(bs, true, true); + bdrv_do_drained_begin(bs, false, NULL, true, false); aio_context_release(aio_context); - - if (!g_slist_find(aio_ctxs, aio_context)) { - aio_ctxs = g_slist_prepend(aio_ctxs, aio_context); - } } - /* Note that completion of an asynchronous I/O operation can trigger any - * number of other I/O operations on other devices---for example a - * coroutine can submit an I/O request to another device in response to - * request completion. Therefore we must keep looping until there was no - * more activity rather than simply draining each device independently. - */ - while (waited) { - waited = false; - - for (ctx = aio_ctxs; ctx != NULL; ctx = ctx->next) { - AioContext *aio_context = ctx->data; + /* Now poll the in-flight requests */ + AIO_WAIT_WHILE(&drain_all_aio_wait, NULL, bdrv_drain_all_poll()); - aio_context_acquire(aio_context); - for (bs = bdrv_first(&it); bs; bs = bdrv_next(&it)) { - if (aio_context == bdrv_get_aio_context(bs)) { - waited |= bdrv_drain_recurse(bs); - } - } - aio_context_release(aio_context); - } + while ((bs = bdrv_next_all_states(bs))) { + bdrv_drain_assert_idle(bs); } - - g_slist_free(aio_ctxs); } void bdrv_drain_all_end(void) { - BlockDriverState *bs; - BdrvNextIterator it; + BlockDriverState *bs = NULL; - for (bs = bdrv_first(&it); bs; bs = bdrv_next(&it)) { + while ((bs = bdrv_next_all_states(bs))) { AioContext *aio_context = bdrv_get_aio_context(bs); - /* Re-enable things in child-to-parent order */ aio_context_acquire(aio_context); - bdrv_drain_invoke(bs, false, true); - bdrv_parent_drained_end(bs, NULL); - aio_enable_external(aio_context); + bdrv_do_drained_end(bs, false, NULL, true); aio_context_release(aio_context); } + + assert(bdrv_drain_all_count > 0); + bdrv_drain_all_count--; } void bdrv_drain_all(void) @@ -591,6 +678,7 @@ void bdrv_inc_in_flight(BlockDriverState *bs) void bdrv_wakeup(BlockDriverState *bs) { aio_wait_kick(bdrv_get_aio_wait(bs)); + aio_wait_kick(&drain_all_aio_wait); } void bdrv_dec_in_flight(BlockDriverState *bs) diff --git a/block/mirror.c b/block/mirror.c index 435268bbbf..61bd9f3cf1 100644 --- a/block/mirror.c +++ b/block/mirror.c @@ -13,6 +13,8 @@ #include "qemu/osdep.h" #include "qemu/cutils.h" +#include "qemu/coroutine.h" +#include "qemu/range.h" #include "trace.h" #include "block/blockjob_int.h" #include "block/block_int.h" @@ -33,11 +35,12 @@ typedef struct MirrorBuffer { QSIMPLEQ_ENTRY(MirrorBuffer) next; } MirrorBuffer; +typedef struct MirrorOp MirrorOp; + typedef struct MirrorBlockJob { BlockJob common; BlockBackend *target; BlockDriverState *mirror_top_bs; - BlockDriverState *source; BlockDriverState *base; /* The name of the graph node to replace */ @@ -48,8 +51,12 @@ typedef struct MirrorBlockJob { Error *replace_blocker; bool is_none_mode; BlockMirrorBackingMode backing_mode; + MirrorCopyMode copy_mode; BlockdevOnError on_source_error, on_target_error; bool synced; + /* Set when the target is synced (dirty bitmap is clean, nothing + * in flight) and the job is running in active mode */ + bool actively_synced; bool should_complete; int64_t granularity; size_t buf_size; @@ -65,25 +72,47 @@ typedef struct MirrorBlockJob { unsigned long *in_flight_bitmap; int in_flight; int64_t bytes_in_flight; + QTAILQ_HEAD(MirrorOpList, MirrorOp) ops_in_flight; int ret; bool unmap; - bool waiting_for_io; int target_cluster_size; int max_iov; bool initial_zeroing_ongoing; + int in_active_write_counter; } MirrorBlockJob; -typedef struct MirrorOp { +typedef struct MirrorBDSOpaque { + MirrorBlockJob *job; +} MirrorBDSOpaque; + +struct MirrorOp { MirrorBlockJob *s; QEMUIOVector qiov; int64_t offset; uint64_t bytes; -} MirrorOp; + + /* The pointee is set by mirror_co_read(), mirror_co_zero(), and + * mirror_co_discard() before yielding for the first time */ + int64_t *bytes_handled; + + bool is_pseudo_op; + bool is_active_write; + CoQueue waiting_requests; + + QTAILQ_ENTRY(MirrorOp) next; +}; + +typedef enum MirrorMethod { + MIRROR_METHOD_COPY, + MIRROR_METHOD_ZERO, + MIRROR_METHOD_DISCARD, +} MirrorMethod; static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read, int error) { s->synced = false; + s->actively_synced = false; if (read) { return block_job_error_action(&s->common, s->on_source_error, true, error); @@ -93,7 +122,42 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read, } } -static void mirror_iteration_done(MirrorOp *op, int ret) +static void coroutine_fn mirror_wait_on_conflicts(MirrorOp *self, + MirrorBlockJob *s, + uint64_t offset, + uint64_t bytes) +{ + uint64_t self_start_chunk = offset / s->granularity; + uint64_t self_end_chunk = DIV_ROUND_UP(offset + bytes, s->granularity); + uint64_t self_nb_chunks = self_end_chunk - self_start_chunk; + + while (find_next_bit(s->in_flight_bitmap, self_end_chunk, + self_start_chunk) < self_end_chunk && + s->ret >= 0) + { + MirrorOp *op; + + QTAILQ_FOREACH(op, &s->ops_in_flight, next) { + uint64_t op_start_chunk = op->offset / s->granularity; + uint64_t op_nb_chunks = DIV_ROUND_UP(op->offset + op->bytes, + s->granularity) - + op_start_chunk; + + if (op == self) { + continue; + } + + if (ranges_overlap(self_start_chunk, self_nb_chunks, + op_start_chunk, op_nb_chunks)) + { + qemu_co_queue_wait(&op->waiting_requests, NULL); + break; + } + } + } +} + +static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret) { MirrorBlockJob *s = op->s; struct iovec *iov; @@ -113,7 +177,9 @@ static void mirror_iteration_done(MirrorOp *op, int ret) chunk_num = op->offset / s->granularity; nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity); + bitmap_clear(s->in_flight_bitmap, chunk_num, nb_chunks); + QTAILQ_REMOVE(&s->ops_in_flight, op, next); if (ret >= 0) { if (s->cow_bitmap) { bitmap_set(s->cow_bitmap, chunk_num, nb_chunks); @@ -123,16 +189,13 @@ static void mirror_iteration_done(MirrorOp *op, int ret) } } qemu_iovec_destroy(&op->qiov); - g_free(op); - if (s->waiting_for_io) { - qemu_coroutine_enter(s->common.job.co); - } + qemu_co_queue_restart_all(&op->waiting_requests); + g_free(op); } -static void mirror_write_complete(void *opaque, int ret) +static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret) { - MirrorOp *op = opaque; MirrorBlockJob *s = op->s; aio_context_acquire(blk_get_aio_context(s->common.blk)); @@ -149,9 +212,8 @@ static void mirror_write_complete(void *opaque, int ret) aio_context_release(blk_get_aio_context(s->common.blk)); } -static void mirror_read_complete(void *opaque, int ret) +static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret) { - MirrorOp *op = opaque; MirrorBlockJob *s = op->s; aio_context_acquire(blk_get_aio_context(s->common.blk)); @@ -166,8 +228,9 @@ static void mirror_read_complete(void *opaque, int ret) mirror_iteration_done(op, ret); } else { - blk_aio_pwritev(s->target, op->offset, &op->qiov, - 0, mirror_write_complete, op); + ret = blk_co_pwritev(s->target, op->offset, + op->qiov.size, &op->qiov, 0); + mirror_write_complete(op, ret); } aio_context_release(blk_get_aio_context(s->common.blk)); } @@ -216,68 +279,80 @@ static int mirror_cow_align(MirrorBlockJob *s, int64_t *offset, return ret; } -static inline void mirror_wait_for_io(MirrorBlockJob *s) +static inline void mirror_wait_for_any_operation(MirrorBlockJob *s, bool active) { - assert(!s->waiting_for_io); - s->waiting_for_io = true; - qemu_coroutine_yield(); - s->waiting_for_io = false; + MirrorOp *op; + + QTAILQ_FOREACH(op, &s->ops_in_flight, next) { + /* Do not wait on pseudo ops, because it may in turn wait on + * some other operation to start, which may in fact be the + * caller of this function. Since there is only one pseudo op + * at any given time, we will always find some real operation + * to wait on. */ + if (!op->is_pseudo_op && op->is_active_write == active) { + qemu_co_queue_wait(&op->waiting_requests, NULL); + return; + } + } + abort(); } -/* Submit async read while handling COW. - * Returns: The number of bytes copied after and including offset, - * excluding any bytes copied prior to offset due to alignment. - * This will be @bytes if no alignment is necessary, or - * (new_end - offset) if tail is rounded up or down due to - * alignment or buffer limit. +static inline void mirror_wait_for_free_in_flight_slot(MirrorBlockJob *s) +{ + /* Only non-active operations use up in-flight slots */ + mirror_wait_for_any_operation(s, false); +} + +/* Perform a mirror copy operation. + * + * *op->bytes_handled is set to the number of bytes copied after and + * including offset, excluding any bytes copied prior to offset due + * to alignment. This will be op->bytes if no alignment is necessary, + * or (new_end - op->offset) if the tail is rounded up or down due to + * alignment or buffer limit. */ -static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset, - uint64_t bytes) +static void coroutine_fn mirror_co_read(void *opaque) { - BlockBackend *source = s->common.blk; + MirrorOp *op = opaque; + MirrorBlockJob *s = op->s; int nb_chunks; uint64_t ret; - MirrorOp *op; uint64_t max_bytes; max_bytes = s->granularity * s->max_iov; /* We can only handle as much as buf_size at a time. */ - bytes = MIN(s->buf_size, MIN(max_bytes, bytes)); - assert(bytes); - assert(bytes < BDRV_REQUEST_MAX_BYTES); - ret = bytes; + op->bytes = MIN(s->buf_size, MIN(max_bytes, op->bytes)); + assert(op->bytes); + assert(op->bytes < BDRV_REQUEST_MAX_BYTES); + *op->bytes_handled = op->bytes; if (s->cow_bitmap) { - ret += mirror_cow_align(s, &offset, &bytes); + *op->bytes_handled += mirror_cow_align(s, &op->offset, &op->bytes); } - assert(bytes <= s->buf_size); + /* Cannot exceed BDRV_REQUEST_MAX_BYTES + INT_MAX */ + assert(*op->bytes_handled <= UINT_MAX); + assert(op->bytes <= s->buf_size); /* The offset is granularity-aligned because: * 1) Caller passes in aligned values; * 2) mirror_cow_align is used only when target cluster is larger. */ - assert(QEMU_IS_ALIGNED(offset, s->granularity)); + assert(QEMU_IS_ALIGNED(op->offset, s->granularity)); /* The range is sector-aligned, since bdrv_getlength() rounds up. */ - assert(QEMU_IS_ALIGNED(bytes, BDRV_SECTOR_SIZE)); - nb_chunks = DIV_ROUND_UP(bytes, s->granularity); + assert(QEMU_IS_ALIGNED(op->bytes, BDRV_SECTOR_SIZE)); + nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity); while (s->buf_free_count < nb_chunks) { - trace_mirror_yield_in_flight(s, offset, s->in_flight); - mirror_wait_for_io(s); + trace_mirror_yield_in_flight(s, op->offset, s->in_flight); + mirror_wait_for_free_in_flight_slot(s); } - /* Allocate a MirrorOp that is used as an AIO callback. */ - op = g_new(MirrorOp, 1); - op->s = s; - op->offset = offset; - op->bytes = bytes; - /* Now make a QEMUIOVector taking enough granularity-sized chunks * from s->buf_free. */ qemu_iovec_init(&op->qiov, nb_chunks); while (nb_chunks-- > 0) { MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free); - size_t remaining = bytes - op->qiov.size; + size_t remaining = op->bytes - op->qiov.size; QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next); s->buf_free_count--; @@ -286,44 +361,92 @@ static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset, /* Copy the dirty cluster. */ s->in_flight++; - s->bytes_in_flight += bytes; - trace_mirror_one_iteration(s, offset, bytes); + s->bytes_in_flight += op->bytes; + trace_mirror_one_iteration(s, op->offset, op->bytes); - blk_aio_preadv(source, offset, &op->qiov, 0, mirror_read_complete, op); - return ret; + ret = bdrv_co_preadv(s->mirror_top_bs->backing, op->offset, op->bytes, + &op->qiov, 0); + mirror_read_complete(op, ret); } -static void mirror_do_zero_or_discard(MirrorBlockJob *s, - int64_t offset, - uint64_t bytes, - bool is_discard) +static void coroutine_fn mirror_co_zero(void *opaque) { - MirrorOp *op; + MirrorOp *op = opaque; + int ret; - /* Allocate a MirrorOp that is used as an AIO callback. The qiov is zeroed - * so the freeing in mirror_iteration_done is nop. */ - op = g_new0(MirrorOp, 1); - op->s = s; - op->offset = offset; - op->bytes = bytes; + op->s->in_flight++; + op->s->bytes_in_flight += op->bytes; + *op->bytes_handled = op->bytes; - s->in_flight++; - s->bytes_in_flight += bytes; - if (is_discard) { - blk_aio_pdiscard(s->target, offset, - op->bytes, mirror_write_complete, op); - } else { - blk_aio_pwrite_zeroes(s->target, offset, - op->bytes, s->unmap ? BDRV_REQ_MAY_UNMAP : 0, - mirror_write_complete, op); + ret = blk_co_pwrite_zeroes(op->s->target, op->offset, op->bytes, + op->s->unmap ? BDRV_REQ_MAY_UNMAP : 0); + mirror_write_complete(op, ret); +} + +static void coroutine_fn mirror_co_discard(void *opaque) +{ + MirrorOp *op = opaque; + int ret; + + op->s->in_flight++; + op->s->bytes_in_flight += op->bytes; + *op->bytes_handled = op->bytes; + + ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes); + mirror_write_complete(op, ret); +} + +static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset, + unsigned bytes, MirrorMethod mirror_method) +{ + MirrorOp *op; + Coroutine *co; + int64_t bytes_handled = -1; + + op = g_new(MirrorOp, 1); + *op = (MirrorOp){ + .s = s, + .offset = offset, + .bytes = bytes, + .bytes_handled = &bytes_handled, + }; + qemu_co_queue_init(&op->waiting_requests); + + switch (mirror_method) { + case MIRROR_METHOD_COPY: + co = qemu_coroutine_create(mirror_co_read, op); + break; + case MIRROR_METHOD_ZERO: + co = qemu_coroutine_create(mirror_co_zero, op); + break; + case MIRROR_METHOD_DISCARD: + co = qemu_coroutine_create(mirror_co_discard, op); + break; + default: + abort(); } + + QTAILQ_INSERT_TAIL(&s->ops_in_flight, op, next); + qemu_coroutine_enter(co); + /* At this point, ownership of op has been moved to the coroutine + * and the object may already be freed */ + + /* Assert that this value has been set */ + assert(bytes_handled >= 0); + + /* Same assertion as in mirror_co_read() (and for mirror_co_read() + * and mirror_co_discard(), bytes_handled == op->bytes, which + * is the @bytes parameter given to this function) */ + assert(bytes_handled <= UINT_MAX); + return bytes_handled; } static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s) { - BlockDriverState *source = s->source; - int64_t offset, first_chunk; - uint64_t delay_ns = 0; + BlockDriverState *source = s->mirror_top_bs->backing->bs; + MirrorOp *pseudo_op; + int64_t offset; + uint64_t delay_ns = 0, ret = 0; /* At least the first dirty chunk is mirrored in one iteration. */ int nb_chunks = 1; bool write_zeroes_ok = bdrv_can_write_zeroes_with_unmap(blk_bs(s->target)); @@ -339,11 +462,7 @@ static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s) } bdrv_dirty_bitmap_unlock(s->dirty_bitmap); - first_chunk = offset / s->granularity; - while (test_bit(first_chunk, s->in_flight_bitmap)) { - trace_mirror_yield_in_flight(s, offset, s->in_flight); - mirror_wait_for_io(s); - } + mirror_wait_on_conflicts(NULL, s, offset, 1); job_pause_point(&s->common.job); @@ -380,16 +499,27 @@ static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s) nb_chunks * s->granularity); bdrv_dirty_bitmap_unlock(s->dirty_bitmap); + /* Before claiming an area in the in-flight bitmap, we have to + * create a MirrorOp for it so that conflicting requests can wait + * for it. mirror_perform() will create the real MirrorOps later, + * for now we just create a pseudo operation that will wake up all + * conflicting requests once all real operations have been + * launched. */ + pseudo_op = g_new(MirrorOp, 1); + *pseudo_op = (MirrorOp){ + .offset = offset, + .bytes = nb_chunks * s->granularity, + .is_pseudo_op = true, + }; + qemu_co_queue_init(&pseudo_op->waiting_requests); + QTAILQ_INSERT_TAIL(&s->ops_in_flight, pseudo_op, next); + bitmap_set(s->in_flight_bitmap, offset / s->granularity, nb_chunks); while (nb_chunks > 0 && offset < s->bdev_length) { int ret; int64_t io_bytes; int64_t io_bytes_acct; - enum MirrorMethod { - MIRROR_METHOD_COPY, - MIRROR_METHOD_ZERO, - MIRROR_METHOD_DISCARD - } mirror_method = MIRROR_METHOD_COPY; + MirrorMethod mirror_method = MIRROR_METHOD_COPY; assert(!(offset % s->granularity)); ret = bdrv_block_status_above(source, NULL, offset, @@ -419,37 +549,34 @@ static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s) while (s->in_flight >= MAX_IN_FLIGHT) { trace_mirror_yield_in_flight(s, offset, s->in_flight); - mirror_wait_for_io(s); + mirror_wait_for_free_in_flight_slot(s); } if (s->ret < 0) { - return 0; + ret = 0; + goto fail; } io_bytes = mirror_clip_bytes(s, offset, io_bytes); - switch (mirror_method) { - case MIRROR_METHOD_COPY: - io_bytes = io_bytes_acct = mirror_do_read(s, offset, io_bytes); - break; - case MIRROR_METHOD_ZERO: - case MIRROR_METHOD_DISCARD: - mirror_do_zero_or_discard(s, offset, io_bytes, - mirror_method == MIRROR_METHOD_DISCARD); - if (write_zeroes_ok) { - io_bytes_acct = 0; - } else { - io_bytes_acct = io_bytes; - } - break; - default: - abort(); + io_bytes = mirror_perform(s, offset, io_bytes, mirror_method); + if (mirror_method != MIRROR_METHOD_COPY && write_zeroes_ok) { + io_bytes_acct = 0; + } else { + io_bytes_acct = io_bytes; } assert(io_bytes); offset += io_bytes; nb_chunks -= DIV_ROUND_UP(io_bytes, s->granularity); delay_ns = block_job_ratelimit_get_delay(&s->common, io_bytes_acct); } - return delay_ns; + + ret = delay_ns; +fail: + QTAILQ_REMOVE(&s->ops_in_flight, pseudo_op, next); + qemu_co_queue_restart_all(&pseudo_op->waiting_requests); + g_free(pseudo_op); + + return ret; } static void mirror_free_init(MirrorBlockJob *s) @@ -476,7 +603,7 @@ static void mirror_free_init(MirrorBlockJob *s) static void mirror_wait_for_all_io(MirrorBlockJob *s) { while (s->in_flight > 0) { - mirror_wait_for_io(s); + mirror_wait_for_free_in_flight_slot(s); } } @@ -489,8 +616,9 @@ static void mirror_exit(Job *job, void *opaque) MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job); BlockJob *bjob = &s->common; MirrorExitData *data = opaque; + MirrorBDSOpaque *bs_opaque = s->mirror_top_bs->opaque; AioContext *replace_aio_context = NULL; - BlockDriverState *src = s->source; + BlockDriverState *src = s->mirror_top_bs->backing->bs; BlockDriverState *target_bs = blk_bs(s->target); BlockDriverState *mirror_top_bs = s->mirror_top_bs; Error *local_err = NULL; @@ -581,6 +709,7 @@ static void mirror_exit(Job *job, void *opaque) blk_set_perm(bjob->blk, 0, BLK_PERM_ALL, &error_abort); blk_insert_bs(bjob->blk, mirror_top_bs, &error_abort); + bs_opaque->job = NULL; job_completed(job, data->ret, NULL); g_free(data); @@ -605,7 +734,7 @@ static int coroutine_fn mirror_dirty_init(MirrorBlockJob *s) { int64_t offset; BlockDriverState *base = s->base; - BlockDriverState *bs = s->source; + BlockDriverState *bs = s->mirror_top_bs->backing->bs; BlockDriverState *target_bs = blk_bs(s->target); int ret; int64_t count; @@ -631,11 +760,11 @@ static int coroutine_fn mirror_dirty_init(MirrorBlockJob *s) if (s->in_flight >= MAX_IN_FLIGHT) { trace_mirror_yield(s, UINT64_MAX, s->buf_free_count, s->in_flight); - mirror_wait_for_io(s); + mirror_wait_for_free_in_flight_slot(s); continue; } - mirror_do_zero_or_discard(s, offset, bytes, false); + mirror_perform(s, offset, bytes, MIRROR_METHOD_ZERO); offset += bytes; } @@ -687,7 +816,7 @@ static void coroutine_fn mirror_run(void *opaque) { MirrorBlockJob *s = opaque; MirrorExitData *data; - BlockDriverState *bs = s->source; + BlockDriverState *bs = s->mirror_top_bs->backing->bs; BlockDriverState *target_bs = blk_bs(s->target); bool need_drain = true; int64_t length; @@ -730,6 +859,7 @@ static void coroutine_fn mirror_run(void *opaque) /* Transition to the READY state and wait for complete. */ job_transition_to_ready(&s->common.job); s->synced = true; + s->actively_synced = true; while (!job_is_cancelled(&s->common.job) && !s->should_complete) { job_yield(&s->common.job); } @@ -781,6 +911,12 @@ static void coroutine_fn mirror_run(void *opaque) int64_t cnt, delta; bool should_complete; + /* Do not start passive operations while there are active + * writes in progress */ + while (s->in_active_write_counter) { + mirror_wait_for_any_operation(s, true); + } + if (s->ret < 0) { ret = s->ret; goto immediate_exit; @@ -804,7 +940,7 @@ static void coroutine_fn mirror_run(void *opaque) if (s->in_flight >= MAX_IN_FLIGHT || s->buf_free_count == 0 || (cnt == 0 && s->in_flight > 0)) { trace_mirror_yield(s, cnt, s->buf_free_count, s->in_flight); - mirror_wait_for_io(s); + mirror_wait_for_free_in_flight_slot(s); continue; } else if (cnt != 0) { delay_ns = mirror_iteration(s); @@ -826,6 +962,9 @@ static void coroutine_fn mirror_run(void *opaque) */ job_transition_to_ready(&s->common.job); s->synced = true; + if (s->copy_mode != MIRROR_COPY_MODE_BACKGROUND) { + s->actively_synced = true; + } } should_complete = s->should_complete || @@ -964,6 +1103,12 @@ static void mirror_pause(Job *job) mirror_wait_for_all_io(s); } +static bool mirror_drained_poll(BlockJob *job) +{ + MirrorBlockJob *s = container_of(job, MirrorBlockJob, common); + return !!s->in_flight; +} + static void mirror_attached_aio_context(BlockJob *job, AioContext *new_context) { MirrorBlockJob *s = container_of(job, MirrorBlockJob, common); @@ -997,6 +1142,7 @@ static const BlockJobDriver mirror_job_driver = { .pause = mirror_pause, .complete = mirror_complete, }, + .drained_poll = mirror_drained_poll, .attached_aio_context = mirror_attached_aio_context, .drain = mirror_drain, }; @@ -1012,20 +1158,237 @@ static const BlockJobDriver commit_active_job_driver = { .pause = mirror_pause, .complete = mirror_complete, }, + .drained_poll = mirror_drained_poll, .attached_aio_context = mirror_attached_aio_context, .drain = mirror_drain, }; +static void do_sync_target_write(MirrorBlockJob *job, MirrorMethod method, + uint64_t offset, uint64_t bytes, + QEMUIOVector *qiov, int flags) +{ + BdrvDirtyBitmapIter *iter; + QEMUIOVector target_qiov; + uint64_t dirty_offset; + int dirty_bytes; + + if (qiov) { + qemu_iovec_init(&target_qiov, qiov->niov); + } + + iter = bdrv_dirty_iter_new(job->dirty_bitmap); + bdrv_set_dirty_iter(iter, offset); + + while (true) { + bool valid_area; + int ret; + + bdrv_dirty_bitmap_lock(job->dirty_bitmap); + valid_area = bdrv_dirty_iter_next_area(iter, offset + bytes, + &dirty_offset, &dirty_bytes); + if (!valid_area) { + bdrv_dirty_bitmap_unlock(job->dirty_bitmap); + break; + } + + bdrv_reset_dirty_bitmap_locked(job->dirty_bitmap, + dirty_offset, dirty_bytes); + bdrv_dirty_bitmap_unlock(job->dirty_bitmap); + + job_progress_increase_remaining(&job->common.job, dirty_bytes); + + assert(dirty_offset - offset <= SIZE_MAX); + if (qiov) { + qemu_iovec_reset(&target_qiov); + qemu_iovec_concat(&target_qiov, qiov, + dirty_offset - offset, dirty_bytes); + } + + switch (method) { + case MIRROR_METHOD_COPY: + ret = blk_co_pwritev(job->target, dirty_offset, dirty_bytes, + qiov ? &target_qiov : NULL, flags); + break; + + case MIRROR_METHOD_ZERO: + assert(!qiov); + ret = blk_co_pwrite_zeroes(job->target, dirty_offset, dirty_bytes, + flags); + break; + + case MIRROR_METHOD_DISCARD: + assert(!qiov); + ret = blk_co_pdiscard(job->target, dirty_offset, dirty_bytes); + break; + + default: + abort(); + } + + if (ret >= 0) { + job_progress_update(&job->common.job, dirty_bytes); + } else { + BlockErrorAction action; + + bdrv_set_dirty_bitmap(job->dirty_bitmap, dirty_offset, dirty_bytes); + job->actively_synced = false; + + action = mirror_error_action(job, false, -ret); + if (action == BLOCK_ERROR_ACTION_REPORT) { + if (!job->ret) { + job->ret = ret; + } + break; + } + } + } + + bdrv_dirty_iter_free(iter); + if (qiov) { + qemu_iovec_destroy(&target_qiov); + } +} + +static MirrorOp *coroutine_fn active_write_prepare(MirrorBlockJob *s, + uint64_t offset, + uint64_t bytes) +{ + MirrorOp *op; + uint64_t start_chunk = offset / s->granularity; + uint64_t end_chunk = DIV_ROUND_UP(offset + bytes, s->granularity); + + op = g_new(MirrorOp, 1); + *op = (MirrorOp){ + .s = s, + .offset = offset, + .bytes = bytes, + .is_active_write = true, + }; + qemu_co_queue_init(&op->waiting_requests); + QTAILQ_INSERT_TAIL(&s->ops_in_flight, op, next); + + s->in_active_write_counter++; + + mirror_wait_on_conflicts(op, s, offset, bytes); + + bitmap_set(s->in_flight_bitmap, start_chunk, end_chunk - start_chunk); + + return op; +} + +static void coroutine_fn active_write_settle(MirrorOp *op) +{ + uint64_t start_chunk = op->offset / op->s->granularity; + uint64_t end_chunk = DIV_ROUND_UP(op->offset + op->bytes, + op->s->granularity); + + if (!--op->s->in_active_write_counter && op->s->actively_synced) { + BdrvChild *source = op->s->mirror_top_bs->backing; + + if (QLIST_FIRST(&source->bs->parents) == source && + QLIST_NEXT(source, next_parent) == NULL) + { + /* Assert that we are back in sync once all active write + * operations are settled. + * Note that we can only assert this if the mirror node + * is the source node's only parent. */ + assert(!bdrv_get_dirty_count(op->s->dirty_bitmap)); + } + } + bitmap_clear(op->s->in_flight_bitmap, start_chunk, end_chunk - start_chunk); + QTAILQ_REMOVE(&op->s->ops_in_flight, op, next); + qemu_co_queue_restart_all(&op->waiting_requests); + g_free(op); +} + static int coroutine_fn bdrv_mirror_top_preadv(BlockDriverState *bs, uint64_t offset, uint64_t bytes, QEMUIOVector *qiov, int flags) { return bdrv_co_preadv(bs->backing, offset, bytes, qiov, flags); } +static int coroutine_fn bdrv_mirror_top_do_write(BlockDriverState *bs, + MirrorMethod method, uint64_t offset, uint64_t bytes, QEMUIOVector *qiov, + int flags) +{ + MirrorOp *op = NULL; + MirrorBDSOpaque *s = bs->opaque; + int ret = 0; + bool copy_to_target; + + copy_to_target = s->job->ret >= 0 && + s->job->copy_mode == MIRROR_COPY_MODE_WRITE_BLOCKING; + + if (copy_to_target) { + op = active_write_prepare(s->job, offset, bytes); + } + + switch (method) { + case MIRROR_METHOD_COPY: + ret = bdrv_co_pwritev(bs->backing, offset, bytes, qiov, flags); + break; + + case MIRROR_METHOD_ZERO: + ret = bdrv_co_pwrite_zeroes(bs->backing, offset, bytes, flags); + break; + + case MIRROR_METHOD_DISCARD: + ret = bdrv_co_pdiscard(bs->backing->bs, offset, bytes); + break; + + default: + abort(); + } + + if (ret < 0) { + goto out; + } + + if (copy_to_target) { + do_sync_target_write(s->job, method, offset, bytes, qiov, flags); + } + +out: + if (copy_to_target) { + active_write_settle(op); + } + return ret; +} + static int coroutine_fn bdrv_mirror_top_pwritev(BlockDriverState *bs, uint64_t offset, uint64_t bytes, QEMUIOVector *qiov, int flags) { - return bdrv_co_pwritev(bs->backing, offset, bytes, qiov, flags); + MirrorBDSOpaque *s = bs->opaque; + QEMUIOVector bounce_qiov; + void *bounce_buf; + int ret = 0; + bool copy_to_target; + + copy_to_target = s->job->ret >= 0 && + s->job->copy_mode == MIRROR_COPY_MODE_WRITE_BLOCKING; + + if (copy_to_target) { + /* The guest might concurrently modify the data to write; but + * the data on source and destination must match, so we have + * to use a bounce buffer if we are going to write to the + * target now. */ + bounce_buf = qemu_blockalign(bs, bytes); + iov_to_buf_full(qiov->iov, qiov->niov, 0, bounce_buf, bytes); + + qemu_iovec_init(&bounce_qiov, 1); + qemu_iovec_add(&bounce_qiov, bounce_buf, bytes); + qiov = &bounce_qiov; + } + + ret = bdrv_mirror_top_do_write(bs, MIRROR_METHOD_COPY, offset, bytes, qiov, + flags); + + if (copy_to_target) { + qemu_iovec_destroy(&bounce_qiov); + qemu_vfree(bounce_buf); + } + + return ret; } static int coroutine_fn bdrv_mirror_top_flush(BlockDriverState *bs) @@ -1040,13 +1403,15 @@ static int coroutine_fn bdrv_mirror_top_flush(BlockDriverState *bs) static int coroutine_fn bdrv_mirror_top_pwrite_zeroes(BlockDriverState *bs, int64_t offset, int bytes, BdrvRequestFlags flags) { - return bdrv_co_pwrite_zeroes(bs->backing, offset, bytes, flags); + return bdrv_mirror_top_do_write(bs, MIRROR_METHOD_ZERO, offset, bytes, NULL, + flags); } static int coroutine_fn bdrv_mirror_top_pdiscard(BlockDriverState *bs, int64_t offset, int bytes) { - return bdrv_co_pdiscard(bs->backing->bs, offset, bytes); + return bdrv_mirror_top_do_write(bs, MIRROR_METHOD_DISCARD, offset, bytes, + NULL, 0); } static void bdrv_mirror_top_refresh_filename(BlockDriverState *bs, QDict *opts) @@ -1108,10 +1473,11 @@ static void mirror_start_job(const char *job_id, BlockDriverState *bs, const BlockJobDriver *driver, bool is_none_mode, BlockDriverState *base, bool auto_complete, const char *filter_node_name, - bool is_mirror, + bool is_mirror, MirrorCopyMode copy_mode, Error **errp) { MirrorBlockJob *s; + MirrorBDSOpaque *bs_opaque; BlockDriverState *mirror_top_bs; bool target_graph_mod; bool target_is_backing; @@ -1147,6 +1513,8 @@ static void mirror_start_job(const char *job_id, BlockDriverState *bs, mirror_top_bs->total_sectors = bs->total_sectors; mirror_top_bs->supported_write_flags = BDRV_REQ_WRITE_UNCHANGED; mirror_top_bs->supported_zero_flags = BDRV_REQ_WRITE_UNCHANGED; + bs_opaque = g_new0(MirrorBDSOpaque, 1); + mirror_top_bs->opaque = bs_opaque; bdrv_set_aio_context(mirror_top_bs, bdrv_get_aio_context(bs)); /* bdrv_append takes ownership of the mirror_top_bs reference, need to keep @@ -1171,10 +1539,11 @@ static void mirror_start_job(const char *job_id, BlockDriverState *bs, if (!s) { goto fail; } + bs_opaque->job = s; + /* The block job now has a reference to this node */ bdrv_unref(mirror_top_bs); - s->source = bs; s->mirror_top_bs = mirror_top_bs; /* No resize for the target either; while the mirror is still running, a @@ -1212,6 +1581,7 @@ static void mirror_start_job(const char *job_id, BlockDriverState *bs, s->on_target_error = on_target_error; s->is_none_mode = is_none_mode; s->backing_mode = backing_mode; + s->copy_mode = copy_mode; s->base = base; s->granularity = granularity; s->buf_size = ROUND_UP(buf_size, granularity); @@ -1247,6 +1617,8 @@ static void mirror_start_job(const char *job_id, BlockDriverState *bs, } } + QTAILQ_INIT(&s->ops_in_flight); + trace_mirror_start(bs, s, opaque); job_start(&s->common.job); return; @@ -1259,6 +1631,7 @@ fail: g_free(s->replaces); blk_unref(s->target); + bs_opaque->job = NULL; job_early_fail(&s->common.job); } @@ -1275,7 +1648,8 @@ void mirror_start(const char *job_id, BlockDriverState *bs, MirrorSyncMode mode, BlockMirrorBackingMode backing_mode, BlockdevOnError on_source_error, BlockdevOnError on_target_error, - bool unmap, const char *filter_node_name, Error **errp) + bool unmap, const char *filter_node_name, + MirrorCopyMode copy_mode, Error **errp) { bool is_none_mode; BlockDriverState *base; @@ -1290,7 +1664,7 @@ void mirror_start(const char *job_id, BlockDriverState *bs, speed, granularity, buf_size, backing_mode, on_source_error, on_target_error, unmap, NULL, NULL, &mirror_job_driver, is_none_mode, base, false, - filter_node_name, true, errp); + filter_node_name, true, copy_mode, errp); } void commit_active_start(const char *job_id, BlockDriverState *bs, @@ -1313,7 +1687,8 @@ void commit_active_start(const char *job_id, BlockDriverState *bs, MIRROR_LEAVE_BACKING_CHAIN, on_error, on_error, true, cb, opaque, &commit_active_job_driver, false, base, auto_complete, - filter_node_name, false, &local_err); + filter_node_name, false, MIRROR_COPY_MODE_BACKGROUND, + &local_err); if (local_err) { error_propagate(errp, local_err); goto error_restore_flags; diff --git a/block/vvfat.c b/block/vvfat.c index 4595f335b8..c7d2ed2d96 100644 --- a/block/vvfat.c +++ b/block/vvfat.c @@ -3134,6 +3134,7 @@ static void vvfat_qcow_options(int *child_flags, QDict *child_options, } static const BdrvChildRole child_vvfat_qcow = { + .parent_is_bds = true, .inherit_options = vvfat_qcow_options, }; diff --git a/blockdev.c b/blockdev.c index 7f65cd7497..58d7570932 100644 --- a/blockdev.c +++ b/blockdev.c @@ -3586,6 +3586,7 @@ static void blockdev_mirror_common(const char *job_id, BlockDriverState *bs, bool has_unmap, bool unmap, bool has_filter_node_name, const char *filter_node_name, + bool has_copy_mode, MirrorCopyMode copy_mode, Error **errp) { @@ -3610,6 +3611,9 @@ static void blockdev_mirror_common(const char *job_id, BlockDriverState *bs, if (!has_filter_node_name) { filter_node_name = NULL; } + if (!has_copy_mode) { + copy_mode = MIRROR_COPY_MODE_BACKGROUND; + } if (granularity != 0 && (granularity < 512 || granularity > 1048576 * 64)) { error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "granularity", @@ -3640,7 +3644,7 @@ static void blockdev_mirror_common(const char *job_id, BlockDriverState *bs, has_replaces ? replaces : NULL, speed, granularity, buf_size, sync, backing_mode, on_source_error, on_target_error, unmap, filter_node_name, - errp); + copy_mode, errp); } void qmp_drive_mirror(DriveMirror *arg, Error **errp) @@ -3786,6 +3790,7 @@ void qmp_drive_mirror(DriveMirror *arg, Error **errp) arg->has_on_target_error, arg->on_target_error, arg->has_unmap, arg->unmap, false, NULL, + arg->has_copy_mode, arg->copy_mode, &local_err); bdrv_unref(target_bs); error_propagate(errp, local_err); @@ -3806,6 +3811,7 @@ void qmp_blockdev_mirror(bool has_job_id, const char *job_id, BlockdevOnError on_target_error, bool has_filter_node_name, const char *filter_node_name, + bool has_copy_mode, MirrorCopyMode copy_mode, Error **errp) { BlockDriverState *bs; @@ -3838,6 +3844,7 @@ void qmp_blockdev_mirror(bool has_job_id, const char *job_id, has_on_target_error, on_target_error, true, true, has_filter_node_name, filter_node_name, + has_copy_mode, copy_mode, &local_err); error_propagate(errp, local_err); diff --git a/blockjob.c b/blockjob.c index 0306533a2e..be5903aa96 100644 --- a/blockjob.c +++ b/blockjob.c @@ -155,6 +155,28 @@ static void child_job_drained_begin(BdrvChild *c) job_pause(&job->job); } +static bool child_job_drained_poll(BdrvChild *c) +{ + BlockJob *bjob = c->opaque; + Job *job = &bjob->job; + const BlockJobDriver *drv = block_job_driver(bjob); + + /* An inactive or completed job doesn't have any pending requests. Jobs + * with !job->busy are either already paused or have a pause point after + * being reentered, so no job driver code will run before they pause. */ + if (!job->busy || job_is_completed(job) || job->deferred_to_main_loop) { + return false; + } + + /* Otherwise, assume that it isn't fully stopped yet, but allow the job to + * override this assumption. */ + if (drv->drained_poll) { + return drv->drained_poll(bjob); + } else { + return true; + } +} + static void child_job_drained_end(BdrvChild *c) { BlockJob *job = c->opaque; @@ -164,6 +186,7 @@ static void child_job_drained_end(BdrvChild *c) static const BdrvChildRole child_job = { .get_parent_desc = child_job_get_parent_desc, .drained_begin = child_job_drained_begin, + .drained_poll = child_job_drained_poll, .drained_end = child_job_drained_end, .stay_at_node = true, }; diff --git a/include/block/aio-wait.h b/include/block/aio-wait.h index 8c90a2e66e..c85a62f798 100644 --- a/include/block/aio-wait.h +++ b/include/block/aio-wait.h @@ -57,7 +57,8 @@ typedef struct { /** * AIO_WAIT_WHILE: * @wait: the aio wait object - * @ctx: the aio context + * @ctx: the aio context, or NULL if multiple aio contexts (for which the + * caller does not hold a lock) are involved in the polling condition. * @cond: wait while this conditional expression is true * * Wait while a condition is true. Use this to implement synchronous @@ -73,29 +74,27 @@ typedef struct { */ #define AIO_WAIT_WHILE(wait, ctx, cond) ({ \ bool waited_ = false; \ - bool busy_ = true; \ AioWait *wait_ = (wait); \ AioContext *ctx_ = (ctx); \ - if (in_aio_context_home_thread(ctx_)) { \ - while ((cond) || busy_) { \ - busy_ = aio_poll(ctx_, (cond)); \ - waited_ |= !!(cond) | busy_; \ + if (ctx_ && in_aio_context_home_thread(ctx_)) { \ + while ((cond)) { \ + aio_poll(ctx_, true); \ + waited_ = true; \ } \ } else { \ assert(qemu_get_current_aio_context() == \ qemu_get_aio_context()); \ /* Increment wait_->num_waiters before evaluating cond. */ \ atomic_inc(&wait_->num_waiters); \ - while (busy_) { \ - if ((cond)) { \ - waited_ = busy_ = true; \ + while ((cond)) { \ + if (ctx_) { \ aio_context_release(ctx_); \ - aio_poll(qemu_get_aio_context(), true); \ + } \ + aio_poll(qemu_get_aio_context(), true); \ + if (ctx_) { \ aio_context_acquire(ctx_); \ - } else { \ - busy_ = aio_poll(ctx_, false); \ - waited_ |= busy_; \ } \ + waited_ = true; \ } \ atomic_dec(&wait_->num_waiters); \ } \ diff --git a/include/block/block.h b/include/block/block.h index e677080c4e..b1d6fdb97a 100644 --- a/include/block/block.h +++ b/include/block/block.h @@ -421,6 +421,7 @@ BlockDriverState *bdrv_lookup_bs(const char *device, Error **errp); bool bdrv_chain_contains(BlockDriverState *top, BlockDriverState *base); BlockDriverState *bdrv_next_node(BlockDriverState *bs); +BlockDriverState *bdrv_next_all_states(BlockDriverState *bs); typedef struct BdrvNextIterator { enum { @@ -557,7 +558,8 @@ void bdrv_io_unplug(BlockDriverState *bs); * Begin a quiesced section of all users of @bs. This is part of * bdrv_drained_begin. */ -void bdrv_parent_drained_begin(BlockDriverState *bs, BdrvChild *ignore); +void bdrv_parent_drained_begin(BlockDriverState *bs, BdrvChild *ignore, + bool ignore_bds_parents); /** * bdrv_parent_drained_end: @@ -565,7 +567,23 @@ void bdrv_parent_drained_begin(BlockDriverState *bs, BdrvChild *ignore); * End a quiesced section of all users of @bs. This is part of * bdrv_drained_end. */ -void bdrv_parent_drained_end(BlockDriverState *bs, BdrvChild *ignore); +void bdrv_parent_drained_end(BlockDriverState *bs, BdrvChild *ignore, + bool ignore_bds_parents); + +/** + * bdrv_drain_poll: + * + * Poll for pending requests in @bs, its parents (except for @ignore_parent), + * and if @recursive is true its children as well (used for subtree drain). + * + * If @ignore_bds_parents is true, parents that are BlockDriverStates must + * ignore the drain request because they will be drained separately (used for + * drain_all). + * + * This is part of bdrv_drained_begin. + */ +bool bdrv_drain_poll(BlockDriverState *bs, bool recursive, + BdrvChild *ignore_parent, bool ignore_bds_parents); /** * bdrv_drained_begin: @@ -580,6 +598,15 @@ void bdrv_parent_drained_end(BlockDriverState *bs, BdrvChild *ignore); void bdrv_drained_begin(BlockDriverState *bs); /** + * bdrv_do_drained_begin_quiesce: + * + * Quiesces a BDS like bdrv_drained_begin(), but does not wait for already + * running requests to complete. + */ +void bdrv_do_drained_begin_quiesce(BlockDriverState *bs, + BdrvChild *parent, bool ignore_bds_parents); + +/** * Like bdrv_drained_begin, but recursively begins a quiesced section for * exclusive access to all child nodes as well. */ diff --git a/include/block/block_int.h b/include/block/block_int.h index 327e478a73..74646ed722 100644 --- a/include/block/block_int.h +++ b/include/block/block_int.h @@ -577,6 +577,12 @@ struct BdrvChildRole { * points to. */ bool stay_at_node; + /* If true, the parent is a BlockDriverState and bdrv_next_all_states() + * will return it. This information is used for drain_all, where every node + * will be drained separately, so the drain only needs to be propagated to + * non-BDS parents. */ + bool parent_is_bds; + void (*inherit_options)(int *child_flags, QDict *child_options, int parent_flags, QDict *parent_options); @@ -605,6 +611,13 @@ struct BdrvChildRole { void (*drained_begin)(BdrvChild *child); void (*drained_end)(BdrvChild *child); + /* + * Returns whether the parent has pending requests for the child. This + * callback is polled after .drained_begin() has been called until all + * activity on the child has stopped. + */ + bool (*drained_poll)(BdrvChild *child); + /* Notifies the parent that the child has been activated/inactivated (e.g. * when migration is completing) and it can start/stop requesting * permissions and doing I/O on it. */ @@ -841,6 +854,7 @@ int coroutine_fn bdrv_co_pwritev(BdrvChild *child, int64_t offset, unsigned int bytes, QEMUIOVector *qiov, BdrvRequestFlags flags); +extern unsigned int bdrv_drain_all_count; void bdrv_apply_subtree_drain(BdrvChild *child, BlockDriverState *new_parent); void bdrv_unapply_subtree_drain(BdrvChild *child, BlockDriverState *old_parent); @@ -1017,6 +1031,7 @@ void commit_active_start(const char *job_id, BlockDriverState *bs, * @filter_node_name: The node name that should be assigned to the filter * driver that the mirror job inserts into the graph above @bs. NULL means that * a node name should be autogenerated. + * @copy_mode: When to trigger writes to the target. * @errp: Error object. * * Start a mirroring operation on @bs. Clusters that are allocated @@ -1030,7 +1045,8 @@ void mirror_start(const char *job_id, BlockDriverState *bs, MirrorSyncMode mode, BlockMirrorBackingMode backing_mode, BlockdevOnError on_source_error, BlockdevOnError on_target_error, - bool unmap, const char *filter_node_name, Error **errp); + bool unmap, const char *filter_node_name, + MirrorCopyMode copy_mode, Error **errp); /* * backup_job_create: diff --git a/include/block/blockjob_int.h b/include/block/blockjob_int.h index 5cd50c6639..e4a318dd15 100644 --- a/include/block/blockjob_int.h +++ b/include/block/blockjob_int.h @@ -39,6 +39,14 @@ struct BlockJobDriver { JobDriver job_driver; /* + * Returns whether the job has pending requests for the child or will + * submit new requests before the next pause point. This callback is polled + * in the context of draining a job node after requesting that the job be + * paused, until all activity on the child has stopped. + */ + bool (*drained_poll)(BlockJob *job); + + /* * If the callback is not NULL, it will be invoked before the job is * resumed in a new AioContext. This is the place to move any resources * besides job->blk to the new AioContext. diff --git a/include/block/dirty-bitmap.h b/include/block/dirty-bitmap.h index 02e0cbabd2..288dc6adb6 100644 --- a/include/block/dirty-bitmap.h +++ b/include/block/dirty-bitmap.h @@ -82,6 +82,8 @@ void bdrv_set_dirty_bitmap_locked(BdrvDirtyBitmap *bitmap, void bdrv_reset_dirty_bitmap_locked(BdrvDirtyBitmap *bitmap, int64_t offset, int64_t bytes); int64_t bdrv_dirty_iter_next(BdrvDirtyBitmapIter *iter); +bool bdrv_dirty_iter_next_area(BdrvDirtyBitmapIter *iter, uint64_t max_offset, + uint64_t *offset, int *bytes); void bdrv_set_dirty_iter(BdrvDirtyBitmapIter *hbi, int64_t offset); int64_t bdrv_get_dirty_count(BdrvDirtyBitmap *bitmap); int64_t bdrv_get_meta_dirty_count(BdrvDirtyBitmap *bitmap); diff --git a/include/qemu/hbitmap.h b/include/qemu/hbitmap.h index 6b6490ecad..ddca52c48e 100644 --- a/include/qemu/hbitmap.h +++ b/include/qemu/hbitmap.h @@ -324,11 +324,14 @@ void hbitmap_free_meta(HBitmap *hb); /** * hbitmap_iter_next: * @hbi: HBitmapIter to operate on. + * @advance: If true, advance the iterator. Otherwise, the next call + * of this function will return the same result (if that + * position is still dirty). * * Return the next bit that is set in @hbi's associated HBitmap, * or -1 if all remaining bits are zero. */ -int64_t hbitmap_iter_next(HBitmapIter *hbi); +int64_t hbitmap_iter_next(HBitmapIter *hbi, bool advance); /** * hbitmap_iter_next_word: diff --git a/include/qemu/job.h b/include/qemu/job.h index 1d820530fa..18c9223e31 100644 --- a/include/qemu/job.h +++ b/include/qemu/job.h @@ -335,6 +335,21 @@ void job_progress_update(Job *job, uint64_t done); */ void job_progress_set_remaining(Job *job, uint64_t remaining); +/** + * @job: The job whose expected progress end value is updated + * @delta: Value which is to be added to the current expected end + * value + * + * Increases the expected end value of the progress counter of a job. + * This is useful for parenthesis operations: If a job has to + * conditionally perform a high-priority operation as part of its + * progress, it calls this function with the expected operation's + * length before, and job_progress_update() afterwards. + * (So the operation acts as a parenthesis in regards to the main job + * operation running in background.) + */ +void job_progress_increase_remaining(Job *job, uint64_t delta); + /** To be called when a cancelled job is finalised. */ void job_event_cancelled(Job *job); @@ -385,6 +385,11 @@ void job_progress_set_remaining(Job *job, uint64_t remaining) job->progress_total = job->progress_current + remaining; } +void job_progress_increase_remaining(Job *job, uint64_t delta) +{ + job->progress_total += delta; +} + void job_event_cancelled(Job *job) { notifier_list_notify(&job->on_finalize_cancelled, job); diff --git a/qapi/block-core.json b/qapi/block-core.json index ab629d1647..cc3ede0630 100644 --- a/qapi/block-core.json +++ b/qapi/block-core.json @@ -1051,6 +1051,24 @@ 'data': ['top', 'full', 'none', 'incremental'] } ## +# @MirrorCopyMode: +# +# An enumeration whose values tell the mirror block job when to +# trigger writes to the target. +# +# @background: copy data in background only. +# +# @write-blocking: when data is written to the source, write it +# (synchronously) to the target as well. In +# addition, data is copied in background just like in +# @background mode. +# +# Since: 3.0 +## +{ 'enum': 'MirrorCopyMode', + 'data': ['background', 'write-blocking'] } + +## # @BlockJobInfo: # # Information about a long-running block device operation. @@ -1692,6 +1710,9 @@ # written. Both will result in identical contents. # Default is true. (Since 2.4) # +# @copy-mode: when to copy data to the destination; defaults to 'background' +# (Since: 3.0) +# # Since: 1.3 ## { 'struct': 'DriveMirror', @@ -1701,7 +1722,7 @@ '*speed': 'int', '*granularity': 'uint32', '*buf-size': 'int', '*on-source-error': 'BlockdevOnError', '*on-target-error': 'BlockdevOnError', - '*unmap': 'bool' } } + '*unmap': 'bool', '*copy-mode': 'MirrorCopyMode' } } ## # @BlockDirtyBitmap: @@ -1964,6 +1985,9 @@ # above @device. If this option is not given, a node name is # autogenerated. (Since: 2.9) # +# @copy-mode: when to copy data to the destination; defaults to 'background' +# (Since: 3.0) +# # Returns: nothing on success. # # Since: 2.6 @@ -1984,7 +2008,8 @@ '*speed': 'int', '*granularity': 'uint32', '*buf-size': 'int', '*on-source-error': 'BlockdevOnError', '*on-target-error': 'BlockdevOnError', - '*filter-node-name': 'str' } } + '*filter-node-name': 'str', + '*copy-mode': 'MirrorCopyMode' } } ## # @block_set_io_throttle: diff --git a/tests/qemu-iotests/151 b/tests/qemu-iotests/151 new file mode 100755 index 0000000000..fe53b9f446 --- /dev/null +++ b/tests/qemu-iotests/151 @@ -0,0 +1,120 @@ +#!/usr/bin/env python +# +# Tests for active mirroring +# +# Copyright (C) 2018 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import os +import iotests +from iotests import qemu_img + +source_img = os.path.join(iotests.test_dir, 'source.' + iotests.imgfmt) +target_img = os.path.join(iotests.test_dir, 'target.' + iotests.imgfmt) + +class TestActiveMirror(iotests.QMPTestCase): + image_len = 128 * 1024 * 1024 # MB + potential_writes_in_flight = True + + def setUp(self): + qemu_img('create', '-f', iotests.imgfmt, source_img, '128M') + qemu_img('create', '-f', iotests.imgfmt, target_img, '128M') + + blk_source = {'id': 'source', + 'if': 'none', + 'node-name': 'source-node', + 'driver': iotests.imgfmt, + 'file': {'driver': 'file', + 'filename': source_img}} + + blk_target = {'node-name': 'target-node', + 'driver': iotests.imgfmt, + 'file': {'driver': 'file', + 'filename': target_img}} + + self.vm = iotests.VM() + self.vm.add_drive_raw(self.vm.qmp_to_opts(blk_source)) + self.vm.add_blockdev(self.vm.qmp_to_opts(blk_target)) + self.vm.add_device('virtio-blk,drive=source') + self.vm.launch() + + def tearDown(self): + self.vm.shutdown() + + if not self.potential_writes_in_flight: + self.assertTrue(iotests.compare_images(source_img, target_img), + 'mirror target does not match source') + + os.remove(source_img) + os.remove(target_img) + + def doActiveIO(self, sync_source_and_target): + # Fill the source image + self.vm.hmp_qemu_io('source', + 'write -P 1 0 %i' % self.image_len); + + # Start some background requests + for offset in range(1 * self.image_len / 8, 3 * self.image_len / 8, 1024 * 1024): + self.vm.hmp_qemu_io('source', 'aio_write -P 2 %i 1M' % offset) + for offset in range(2 * self.image_len / 8, 3 * self.image_len / 8, 1024 * 1024): + self.vm.hmp_qemu_io('source', 'aio_write -z %i 1M' % offset) + + # Start the block job + result = self.vm.qmp('blockdev-mirror', + job_id='mirror', + filter_node_name='mirror-node', + device='source-node', + target='target-node', + sync='full', + copy_mode='write-blocking') + self.assert_qmp(result, 'return', {}) + + # Start some more requests + for offset in range(3 * self.image_len / 8, 5 * self.image_len / 8, 1024 * 1024): + self.vm.hmp_qemu_io('source', 'aio_write -P 3 %i 1M' % offset) + for offset in range(4 * self.image_len / 8, 5 * self.image_len / 8, 1024 * 1024): + self.vm.hmp_qemu_io('source', 'aio_write -z %i 1M' % offset) + + # Wait for the READY event + self.wait_ready(drive='mirror') + + # Now start some final requests; all of these (which land on + # the source) should be settled using the active mechanism. + # The mirror code itself asserts that the source BDS's dirty + # bitmap will stay clean between READY and COMPLETED. + for offset in range(5 * self.image_len / 8, 7 * self.image_len / 8, 1024 * 1024): + self.vm.hmp_qemu_io('source', 'aio_write -P 3 %i 1M' % offset) + for offset in range(6 * self.image_len / 8, 7 * self.image_len / 8, 1024 * 1024): + self.vm.hmp_qemu_io('source', 'aio_write -z %i 1M' % offset) + + if sync_source_and_target: + # If source and target should be in sync after the mirror, + # we have to flush before completion + self.vm.hmp_qemu_io('source', 'aio_flush') + self.potential_writes_in_flight = False + + self.complete_and_wait(drive='mirror', wait_ready=False) + + def testActiveIO(self): + self.doActiveIO(False) + + def testActiveIOFlushed(self): + self.doActiveIO(True) + + + +if __name__ == '__main__': + iotests.main(supported_fmts=['qcow2', 'raw']) diff --git a/tests/qemu-iotests/151.out b/tests/qemu-iotests/151.out new file mode 100644 index 0000000000..fbc63e62f8 --- /dev/null +++ b/tests/qemu-iotests/151.out @@ -0,0 +1,5 @@ +.. +---------------------------------------------------------------------- +Ran 2 tests + +OK diff --git a/tests/qemu-iotests/group b/tests/qemu-iotests/group index 937a3d0a4d..eea75819d2 100644 --- a/tests/qemu-iotests/group +++ b/tests/qemu-iotests/group @@ -157,6 +157,7 @@ 148 rw auto quick 149 rw auto sudo 150 rw auto quick +151 rw auto 152 rw auto quick 153 rw auto quick 154 rw auto backing quick diff --git a/tests/test-bdrv-drain.c b/tests/test-bdrv-drain.c index a11c4cfbf2..291a050f86 100644 --- a/tests/test-bdrv-drain.c +++ b/tests/test-bdrv-drain.c @@ -27,15 +27,23 @@ #include "block/blockjob_int.h" #include "sysemu/block-backend.h" #include "qapi/error.h" +#include "iothread.h" + +static QemuEvent done_event; typedef struct BDRVTestState { int drain_count; + AioContext *bh_indirection_ctx; + bool sleep_in_drain_begin; } BDRVTestState; static void coroutine_fn bdrv_test_co_drain_begin(BlockDriverState *bs) { BDRVTestState *s = bs->opaque; s->drain_count++; + if (s->sleep_in_drain_begin) { + qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, 100000); + } } static void coroutine_fn bdrv_test_co_drain_end(BlockDriverState *bs) @@ -50,19 +58,48 @@ static void bdrv_test_close(BlockDriverState *bs) g_assert_cmpint(s->drain_count, >, 0); } +static void co_reenter_bh(void *opaque) +{ + aio_co_wake(opaque); +} + static int coroutine_fn bdrv_test_co_preadv(BlockDriverState *bs, uint64_t offset, uint64_t bytes, QEMUIOVector *qiov, int flags) { + BDRVTestState *s = bs->opaque; + /* We want this request to stay until the polling loop in drain waits for * it to complete. We need to sleep a while as bdrv_drain_invoke() comes * first and polls its result, too, but it shouldn't accidentally complete * this request yet. */ qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, 100000); + if (s->bh_indirection_ctx) { + aio_bh_schedule_oneshot(s->bh_indirection_ctx, co_reenter_bh, + qemu_coroutine_self()); + qemu_coroutine_yield(); + } + return 0; } +static void bdrv_test_child_perm(BlockDriverState *bs, BdrvChild *c, + const BdrvChildRole *role, + BlockReopenQueue *reopen_queue, + uint64_t perm, uint64_t shared, + uint64_t *nperm, uint64_t *nshared) +{ + /* bdrv_format_default_perms() accepts only these two, so disguise + * detach_by_driver_cb_role as one of them. */ + if (role != &child_file && role != &child_backing) { + role = &child_file; + } + + bdrv_format_default_perms(bs, c, role, reopen_queue, perm, shared, + nperm, nshared); +} + static BlockDriver bdrv_test = { .format_name = "test", .instance_size = sizeof(BDRVTestState), @@ -73,7 +110,7 @@ static BlockDriver bdrv_test = { .bdrv_co_drain_begin = bdrv_test_co_drain_begin, .bdrv_co_drain_end = bdrv_test_co_drain_end, - .bdrv_child_perm = bdrv_format_default_perms, + .bdrv_child_perm = bdrv_test_child_perm, }; static void aio_ret_cb(void *opaque, int ret) @@ -216,6 +253,11 @@ static void test_drv_cb_drain_subtree(void) test_drv_cb_common(BDRV_SUBTREE_DRAIN, true); } +static void test_drv_cb_co_drain_all(void) +{ + call_in_coroutine(test_drv_cb_drain_all); +} + static void test_drv_cb_co_drain(void) { call_in_coroutine(test_drv_cb_drain); @@ -259,8 +301,7 @@ static void test_quiesce_common(enum drain_type drain_type, bool recursive) static void test_quiesce_drain_all(void) { - // XXX drain_all doesn't quiesce - //test_quiesce_common(BDRV_DRAIN_ALL, true); + test_quiesce_common(BDRV_DRAIN_ALL, true); } static void test_quiesce_drain(void) @@ -273,6 +314,11 @@ static void test_quiesce_drain_subtree(void) test_quiesce_common(BDRV_SUBTREE_DRAIN, true); } +static void test_quiesce_co_drain_all(void) +{ + call_in_coroutine(test_quiesce_drain_all); +} + static void test_quiesce_co_drain(void) { call_in_coroutine(test_quiesce_drain); @@ -302,12 +348,7 @@ static void test_nested(void) for (outer = 0; outer < DRAIN_TYPE_MAX; outer++) { for (inner = 0; inner < DRAIN_TYPE_MAX; inner++) { - /* XXX bdrv_drain_all() doesn't increase the quiesce_counter */ - int bs_quiesce = (outer != BDRV_DRAIN_ALL) + - (inner != BDRV_DRAIN_ALL); - int backing_quiesce = (outer == BDRV_SUBTREE_DRAIN) + - (inner == BDRV_SUBTREE_DRAIN); - int backing_cb_cnt = (outer != BDRV_DRAIN) + + int backing_quiesce = (outer != BDRV_DRAIN) + (inner != BDRV_DRAIN); g_assert_cmpint(bs->quiesce_counter, ==, 0); @@ -318,10 +359,10 @@ static void test_nested(void) do_drain_begin(outer, bs); do_drain_begin(inner, bs); - g_assert_cmpint(bs->quiesce_counter, ==, bs_quiesce); + g_assert_cmpint(bs->quiesce_counter, ==, 2); g_assert_cmpint(backing->quiesce_counter, ==, backing_quiesce); g_assert_cmpint(s->drain_count, ==, 2); - g_assert_cmpint(backing_s->drain_count, ==, backing_cb_cnt); + g_assert_cmpint(backing_s->drain_count, ==, backing_quiesce); do_drain_end(inner, bs); do_drain_end(outer, bs); @@ -411,7 +452,7 @@ static void test_multiparent(void) blk_unref(blk_b); } -static void test_graph_change(void) +static void test_graph_change_drain_subtree(void) { BlockBackend *blk_a, *blk_b; BlockDriverState *bs_a, *bs_b, *backing; @@ -490,6 +531,221 @@ static void test_graph_change(void) blk_unref(blk_b); } +static void test_graph_change_drain_all(void) +{ + BlockBackend *blk_a, *blk_b; + BlockDriverState *bs_a, *bs_b; + BDRVTestState *a_s, *b_s; + + /* Create node A with a BlockBackend */ + blk_a = blk_new(BLK_PERM_ALL, BLK_PERM_ALL); + bs_a = bdrv_new_open_driver(&bdrv_test, "test-node-a", BDRV_O_RDWR, + &error_abort); + a_s = bs_a->opaque; + blk_insert_bs(blk_a, bs_a, &error_abort); + + g_assert_cmpint(bs_a->quiesce_counter, ==, 0); + g_assert_cmpint(a_s->drain_count, ==, 0); + + /* Call bdrv_drain_all_begin() */ + bdrv_drain_all_begin(); + + g_assert_cmpint(bs_a->quiesce_counter, ==, 1); + g_assert_cmpint(a_s->drain_count, ==, 1); + + /* Create node B with a BlockBackend */ + blk_b = blk_new(BLK_PERM_ALL, BLK_PERM_ALL); + bs_b = bdrv_new_open_driver(&bdrv_test, "test-node-b", BDRV_O_RDWR, + &error_abort); + b_s = bs_b->opaque; + blk_insert_bs(blk_b, bs_b, &error_abort); + + g_assert_cmpint(bs_a->quiesce_counter, ==, 1); + g_assert_cmpint(bs_b->quiesce_counter, ==, 1); + g_assert_cmpint(a_s->drain_count, ==, 1); + g_assert_cmpint(b_s->drain_count, ==, 1); + + /* Unref and finally delete node A */ + blk_unref(blk_a); + + g_assert_cmpint(bs_a->quiesce_counter, ==, 1); + g_assert_cmpint(bs_b->quiesce_counter, ==, 1); + g_assert_cmpint(a_s->drain_count, ==, 1); + g_assert_cmpint(b_s->drain_count, ==, 1); + + bdrv_unref(bs_a); + + g_assert_cmpint(bs_b->quiesce_counter, ==, 1); + g_assert_cmpint(b_s->drain_count, ==, 1); + + /* End the drained section */ + bdrv_drain_all_end(); + + g_assert_cmpint(bs_b->quiesce_counter, ==, 0); + g_assert_cmpint(b_s->drain_count, ==, 0); + + bdrv_unref(bs_b); + blk_unref(blk_b); +} + +struct test_iothread_data { + BlockDriverState *bs; + enum drain_type drain_type; + int *aio_ret; +}; + +static void test_iothread_drain_entry(void *opaque) +{ + struct test_iothread_data *data = opaque; + + aio_context_acquire(bdrv_get_aio_context(data->bs)); + do_drain_begin(data->drain_type, data->bs); + g_assert_cmpint(*data->aio_ret, ==, 0); + do_drain_end(data->drain_type, data->bs); + aio_context_release(bdrv_get_aio_context(data->bs)); + + qemu_event_set(&done_event); +} + +static void test_iothread_aio_cb(void *opaque, int ret) +{ + int *aio_ret = opaque; + *aio_ret = ret; + qemu_event_set(&done_event); +} + +/* + * Starts an AIO request on a BDS that runs in the AioContext of iothread 1. + * The request involves a BH on iothread 2 before it can complete. + * + * @drain_thread = 0 means that do_drain_begin/end are called from the main + * thread, @drain_thread = 1 means that they are called from iothread 1. Drain + * for this BDS cannot be called from iothread 2 because only the main thread + * may do cross-AioContext polling. + */ +static void test_iothread_common(enum drain_type drain_type, int drain_thread) +{ + BlockBackend *blk; + BlockDriverState *bs; + BDRVTestState *s; + BlockAIOCB *acb; + int aio_ret; + struct test_iothread_data data; + + IOThread *a = iothread_new(); + IOThread *b = iothread_new(); + AioContext *ctx_a = iothread_get_aio_context(a); + AioContext *ctx_b = iothread_get_aio_context(b); + + QEMUIOVector qiov; + struct iovec iov = { + .iov_base = NULL, + .iov_len = 0, + }; + qemu_iovec_init_external(&qiov, &iov, 1); + + /* bdrv_drain_all() may only be called from the main loop thread */ + if (drain_type == BDRV_DRAIN_ALL && drain_thread != 0) { + goto out; + } + + blk = blk_new(BLK_PERM_ALL, BLK_PERM_ALL); + bs = bdrv_new_open_driver(&bdrv_test, "test-node", BDRV_O_RDWR, + &error_abort); + s = bs->opaque; + blk_insert_bs(blk, bs, &error_abort); + + blk_set_aio_context(blk, ctx_a); + aio_context_acquire(ctx_a); + + s->bh_indirection_ctx = ctx_b; + + aio_ret = -EINPROGRESS; + if (drain_thread == 0) { + acb = blk_aio_preadv(blk, 0, &qiov, 0, test_iothread_aio_cb, &aio_ret); + } else { + acb = blk_aio_preadv(blk, 0, &qiov, 0, aio_ret_cb, &aio_ret); + } + g_assert(acb != NULL); + g_assert_cmpint(aio_ret, ==, -EINPROGRESS); + + aio_context_release(ctx_a); + + data = (struct test_iothread_data) { + .bs = bs, + .drain_type = drain_type, + .aio_ret = &aio_ret, + }; + + switch (drain_thread) { + case 0: + if (drain_type != BDRV_DRAIN_ALL) { + aio_context_acquire(ctx_a); + } + + /* The request is running on the IOThread a. Draining its block device + * will make sure that it has completed as far as the BDS is concerned, + * but the drain in this thread can continue immediately after + * bdrv_dec_in_flight() and aio_ret might be assigned only slightly + * later. */ + qemu_event_reset(&done_event); + do_drain_begin(drain_type, bs); + g_assert_cmpint(bs->in_flight, ==, 0); + + if (drain_type != BDRV_DRAIN_ALL) { + aio_context_release(ctx_a); + } + qemu_event_wait(&done_event); + if (drain_type != BDRV_DRAIN_ALL) { + aio_context_acquire(ctx_a); + } + + g_assert_cmpint(aio_ret, ==, 0); + do_drain_end(drain_type, bs); + + if (drain_type != BDRV_DRAIN_ALL) { + aio_context_release(ctx_a); + } + break; + case 1: + qemu_event_reset(&done_event); + aio_bh_schedule_oneshot(ctx_a, test_iothread_drain_entry, &data); + qemu_event_wait(&done_event); + break; + default: + g_assert_not_reached(); + } + + aio_context_acquire(ctx_a); + blk_set_aio_context(blk, qemu_get_aio_context()); + aio_context_release(ctx_a); + + bdrv_unref(bs); + blk_unref(blk); + +out: + iothread_join(a); + iothread_join(b); +} + +static void test_iothread_drain_all(void) +{ + test_iothread_common(BDRV_DRAIN_ALL, 0); + test_iothread_common(BDRV_DRAIN_ALL, 1); +} + +static void test_iothread_drain(void) +{ + test_iothread_common(BDRV_DRAIN, 0); + test_iothread_common(BDRV_DRAIN, 1); +} + +static void test_iothread_drain_subtree(void) +{ + test_iothread_common(BDRV_SUBTREE_DRAIN, 0); + test_iothread_common(BDRV_SUBTREE_DRAIN, 1); +} + typedef struct TestBlockJob { BlockJob common; @@ -507,7 +763,11 @@ static void coroutine_fn test_job_start(void *opaque) job_transition_to_ready(&s->common.job); while (!s->should_complete) { - job_sleep_ns(&s->common.job, 100000); + /* Avoid block_job_sleep_ns() because it marks the job as !busy. We + * want to emulate some actual activity (probably some I/O) here so + * that drain has to wait for this acitivity to stop. */ + qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, 100000); + job_pause_point(&s->common.job); } job_defer_to_main_loop(&s->common.job, test_job_completed, NULL); @@ -554,7 +814,7 @@ static void test_blockjob_common(enum drain_type drain_type) g_assert_cmpint(job->job.pause_count, ==, 0); g_assert_false(job->job.paused); - g_assert_false(job->job.busy); /* We're in job_sleep_ns() */ + g_assert_true(job->job.busy); /* We're in job_sleep_ns() */ do_drain_begin(drain_type, src); @@ -564,15 +824,14 @@ static void test_blockjob_common(enum drain_type drain_type) } else { g_assert_cmpint(job->job.pause_count, ==, 1); } - /* XXX We don't wait until the job is actually paused. Is this okay? */ - /* g_assert_true(job->job.paused); */ + g_assert_true(job->job.paused); g_assert_false(job->job.busy); /* The job is paused */ do_drain_end(drain_type, src); g_assert_cmpint(job->job.pause_count, ==, 0); g_assert_false(job->job.paused); - g_assert_false(job->job.busy); /* We're in job_sleep_ns() */ + g_assert_true(job->job.busy); /* We're in qemu_co_sleep_ns() */ do_drain_begin(drain_type, target); @@ -582,15 +841,14 @@ static void test_blockjob_common(enum drain_type drain_type) } else { g_assert_cmpint(job->job.pause_count, ==, 1); } - /* XXX We don't wait until the job is actually paused. Is this okay? */ - /* g_assert_true(job->job.paused); */ + g_assert_true(job->job.paused); g_assert_false(job->job.busy); /* The job is paused */ do_drain_end(drain_type, target); g_assert_cmpint(job->job.pause_count, ==, 0); g_assert_false(job->job.paused); - g_assert_false(job->job.busy); /* We're in job_sleep_ns() */ + g_assert_true(job->job.busy); /* We're in job_sleep_ns() */ ret = job_complete_sync(&job->job, &error_abort); g_assert_cmpint(ret, ==, 0); @@ -616,19 +874,399 @@ static void test_blockjob_drain_subtree(void) test_blockjob_common(BDRV_SUBTREE_DRAIN); } + +typedef struct BDRVTestTopState { + BdrvChild *wait_child; +} BDRVTestTopState; + +static void bdrv_test_top_close(BlockDriverState *bs) +{ + BdrvChild *c, *next_c; + QLIST_FOREACH_SAFE(c, &bs->children, next, next_c) { + bdrv_unref_child(bs, c); + } +} + +static int coroutine_fn bdrv_test_top_co_preadv(BlockDriverState *bs, + uint64_t offset, uint64_t bytes, + QEMUIOVector *qiov, int flags) +{ + BDRVTestTopState *tts = bs->opaque; + return bdrv_co_preadv(tts->wait_child, offset, bytes, qiov, flags); +} + +static BlockDriver bdrv_test_top_driver = { + .format_name = "test_top_driver", + .instance_size = sizeof(BDRVTestTopState), + + .bdrv_close = bdrv_test_top_close, + .bdrv_co_preadv = bdrv_test_top_co_preadv, + + .bdrv_child_perm = bdrv_format_default_perms, +}; + +typedef struct TestCoDeleteByDrainData { + BlockBackend *blk; + bool detach_instead_of_delete; + bool done; +} TestCoDeleteByDrainData; + +static void coroutine_fn test_co_delete_by_drain(void *opaque) +{ + TestCoDeleteByDrainData *dbdd = opaque; + BlockBackend *blk = dbdd->blk; + BlockDriverState *bs = blk_bs(blk); + BDRVTestTopState *tts = bs->opaque; + void *buffer = g_malloc(65536); + QEMUIOVector qiov; + struct iovec iov = { + .iov_base = buffer, + .iov_len = 65536, + }; + + qemu_iovec_init_external(&qiov, &iov, 1); + + /* Pretend some internal write operation from parent to child. + * Important: We have to read from the child, not from the parent! + * Draining works by first propagating it all up the tree to the + * root and then waiting for drainage from root to the leaves + * (protocol nodes). If we have a request waiting on the root, + * everything will be drained before we go back down the tree, but + * we do not want that. We want to be in the middle of draining + * when this following requests returns. */ + bdrv_co_preadv(tts->wait_child, 0, 65536, &qiov, 0); + + g_assert_cmpint(bs->refcnt, ==, 1); + + if (!dbdd->detach_instead_of_delete) { + blk_unref(blk); + } else { + BdrvChild *c, *next_c; + QLIST_FOREACH_SAFE(c, &bs->children, next, next_c) { + bdrv_unref_child(bs, c); + } + } + + dbdd->done = true; +} + +/** + * Test what happens when some BDS has some children, you drain one of + * them and this results in the BDS being deleted. + * + * If @detach_instead_of_delete is set, the BDS is not going to be + * deleted but will only detach all of its children. + */ +static void do_test_delete_by_drain(bool detach_instead_of_delete, + enum drain_type drain_type) +{ + BlockBackend *blk; + BlockDriverState *bs, *child_bs, *null_bs; + BDRVTestTopState *tts; + TestCoDeleteByDrainData dbdd; + Coroutine *co; + + bs = bdrv_new_open_driver(&bdrv_test_top_driver, "top", BDRV_O_RDWR, + &error_abort); + bs->total_sectors = 65536 >> BDRV_SECTOR_BITS; + tts = bs->opaque; + + null_bs = bdrv_open("null-co://", NULL, NULL, BDRV_O_RDWR | BDRV_O_PROTOCOL, + &error_abort); + bdrv_attach_child(bs, null_bs, "null-child", &child_file, &error_abort); + + /* This child will be the one to pass to requests through to, and + * it will stall until a drain occurs */ + child_bs = bdrv_new_open_driver(&bdrv_test, "child", BDRV_O_RDWR, + &error_abort); + child_bs->total_sectors = 65536 >> BDRV_SECTOR_BITS; + /* Takes our reference to child_bs */ + tts->wait_child = bdrv_attach_child(bs, child_bs, "wait-child", &child_file, + &error_abort); + + /* This child is just there to be deleted + * (for detach_instead_of_delete == true) */ + null_bs = bdrv_open("null-co://", NULL, NULL, BDRV_O_RDWR | BDRV_O_PROTOCOL, + &error_abort); + bdrv_attach_child(bs, null_bs, "null-child", &child_file, &error_abort); + + blk = blk_new(BLK_PERM_ALL, BLK_PERM_ALL); + blk_insert_bs(blk, bs, &error_abort); + + /* Referenced by blk now */ + bdrv_unref(bs); + + g_assert_cmpint(bs->refcnt, ==, 1); + g_assert_cmpint(child_bs->refcnt, ==, 1); + g_assert_cmpint(null_bs->refcnt, ==, 1); + + + dbdd = (TestCoDeleteByDrainData){ + .blk = blk, + .detach_instead_of_delete = detach_instead_of_delete, + .done = false, + }; + co = qemu_coroutine_create(test_co_delete_by_drain, &dbdd); + qemu_coroutine_enter(co); + + /* Drain the child while the read operation is still pending. + * This should result in the operation finishing and + * test_co_delete_by_drain() resuming. Thus, @bs will be deleted + * and the coroutine will exit while this drain operation is still + * in progress. */ + switch (drain_type) { + case BDRV_DRAIN: + bdrv_ref(child_bs); + bdrv_drain(child_bs); + bdrv_unref(child_bs); + break; + case BDRV_SUBTREE_DRAIN: + /* Would have to ref/unref bs here for !detach_instead_of_delete, but + * then the whole test becomes pointless because the graph changes + * don't occur during the drain any more. */ + assert(detach_instead_of_delete); + bdrv_subtree_drained_begin(bs); + bdrv_subtree_drained_end(bs); + break; + case BDRV_DRAIN_ALL: + bdrv_drain_all_begin(); + bdrv_drain_all_end(); + break; + default: + g_assert_not_reached(); + } + + while (!dbdd.done) { + aio_poll(qemu_get_aio_context(), true); + } + + if (detach_instead_of_delete) { + /* Here, the reference has not passed over to the coroutine, + * so we have to delete the BB ourselves */ + blk_unref(blk); + } +} + +static void test_delete_by_drain(void) +{ + do_test_delete_by_drain(false, BDRV_DRAIN); +} + +static void test_detach_by_drain_all(void) +{ + do_test_delete_by_drain(true, BDRV_DRAIN_ALL); +} + +static void test_detach_by_drain(void) +{ + do_test_delete_by_drain(true, BDRV_DRAIN); +} + +static void test_detach_by_drain_subtree(void) +{ + do_test_delete_by_drain(true, BDRV_SUBTREE_DRAIN); +} + + +struct detach_by_parent_data { + BlockDriverState *parent_b; + BdrvChild *child_b; + BlockDriverState *c; + BdrvChild *child_c; + bool by_parent_cb; +}; +static struct detach_by_parent_data detach_by_parent_data; + +static void detach_indirect_bh(void *opaque) +{ + struct detach_by_parent_data *data = opaque; + + bdrv_unref_child(data->parent_b, data->child_b); + + bdrv_ref(data->c); + data->child_c = bdrv_attach_child(data->parent_b, data->c, "PB-C", + &child_file, &error_abort); +} + +static void detach_by_parent_aio_cb(void *opaque, int ret) +{ + struct detach_by_parent_data *data = &detach_by_parent_data; + + g_assert_cmpint(ret, ==, 0); + if (data->by_parent_cb) { + detach_indirect_bh(data); + } +} + +static void detach_by_driver_cb_drained_begin(BdrvChild *child) +{ + aio_bh_schedule_oneshot(qemu_get_current_aio_context(), + detach_indirect_bh, &detach_by_parent_data); + child_file.drained_begin(child); +} + +static BdrvChildRole detach_by_driver_cb_role; + +/* + * Initial graph: + * + * PA PB + * \ / \ + * A B C + * + * by_parent_cb == true: Test that parent callbacks don't poll + * + * PA has a pending write request whose callback changes the child nodes of + * PB: It removes B and adds C instead. The subtree of PB is drained, which + * will indirectly drain the write request, too. + * + * by_parent_cb == false: Test that bdrv_drain_invoke() doesn't poll + * + * PA's BdrvChildRole has a .drained_begin callback that schedules a BH + * that does the same graph change. If bdrv_drain_invoke() calls it, the + * state is messed up, but if it is only polled in the single + * BDRV_POLL_WHILE() at the end of the drain, this should work fine. + */ +static void test_detach_indirect(bool by_parent_cb) +{ + BlockBackend *blk; + BlockDriverState *parent_a, *parent_b, *a, *b, *c; + BdrvChild *child_a, *child_b; + BlockAIOCB *acb; + + QEMUIOVector qiov; + struct iovec iov = { + .iov_base = NULL, + .iov_len = 0, + }; + qemu_iovec_init_external(&qiov, &iov, 1); + + if (!by_parent_cb) { + detach_by_driver_cb_role = child_file; + detach_by_driver_cb_role.drained_begin = + detach_by_driver_cb_drained_begin; + } + + /* Create all involved nodes */ + parent_a = bdrv_new_open_driver(&bdrv_test, "parent-a", BDRV_O_RDWR, + &error_abort); + parent_b = bdrv_new_open_driver(&bdrv_test, "parent-b", 0, + &error_abort); + + a = bdrv_new_open_driver(&bdrv_test, "a", BDRV_O_RDWR, &error_abort); + b = bdrv_new_open_driver(&bdrv_test, "b", BDRV_O_RDWR, &error_abort); + c = bdrv_new_open_driver(&bdrv_test, "c", BDRV_O_RDWR, &error_abort); + + /* blk is a BB for parent-a */ + blk = blk_new(BLK_PERM_ALL, BLK_PERM_ALL); + blk_insert_bs(blk, parent_a, &error_abort); + bdrv_unref(parent_a); + + /* If we want to get bdrv_drain_invoke() to call aio_poll(), the driver + * callback must not return immediately. */ + if (!by_parent_cb) { + BDRVTestState *s = parent_a->opaque; + s->sleep_in_drain_begin = true; + } + + /* Set child relationships */ + bdrv_ref(b); + bdrv_ref(a); + child_b = bdrv_attach_child(parent_b, b, "PB-B", &child_file, &error_abort); + child_a = bdrv_attach_child(parent_b, a, "PB-A", &child_backing, &error_abort); + + bdrv_ref(a); + bdrv_attach_child(parent_a, a, "PA-A", + by_parent_cb ? &child_file : &detach_by_driver_cb_role, + &error_abort); + + g_assert_cmpint(parent_a->refcnt, ==, 1); + g_assert_cmpint(parent_b->refcnt, ==, 1); + g_assert_cmpint(a->refcnt, ==, 3); + g_assert_cmpint(b->refcnt, ==, 2); + g_assert_cmpint(c->refcnt, ==, 1); + + g_assert(QLIST_FIRST(&parent_b->children) == child_a); + g_assert(QLIST_NEXT(child_a, next) == child_b); + g_assert(QLIST_NEXT(child_b, next) == NULL); + + /* Start the evil write request */ + detach_by_parent_data = (struct detach_by_parent_data) { + .parent_b = parent_b, + .child_b = child_b, + .c = c, + .by_parent_cb = by_parent_cb, + }; + acb = blk_aio_preadv(blk, 0, &qiov, 0, detach_by_parent_aio_cb, NULL); + g_assert(acb != NULL); + + /* Drain and check the expected result */ + bdrv_subtree_drained_begin(parent_b); + + g_assert(detach_by_parent_data.child_c != NULL); + + g_assert_cmpint(parent_a->refcnt, ==, 1); + g_assert_cmpint(parent_b->refcnt, ==, 1); + g_assert_cmpint(a->refcnt, ==, 3); + g_assert_cmpint(b->refcnt, ==, 1); + g_assert_cmpint(c->refcnt, ==, 2); + + g_assert(QLIST_FIRST(&parent_b->children) == detach_by_parent_data.child_c); + g_assert(QLIST_NEXT(detach_by_parent_data.child_c, next) == child_a); + g_assert(QLIST_NEXT(child_a, next) == NULL); + + g_assert_cmpint(parent_a->quiesce_counter, ==, 1); + g_assert_cmpint(parent_b->quiesce_counter, ==, 1); + g_assert_cmpint(a->quiesce_counter, ==, 1); + g_assert_cmpint(b->quiesce_counter, ==, 0); + g_assert_cmpint(c->quiesce_counter, ==, 1); + + bdrv_subtree_drained_end(parent_b); + + bdrv_unref(parent_b); + blk_unref(blk); + + /* XXX Once bdrv_close() unref's children instead of just detaching them, + * this won't be necessary any more. */ + bdrv_unref(a); + bdrv_unref(a); + bdrv_unref(c); + + g_assert_cmpint(a->refcnt, ==, 1); + g_assert_cmpint(b->refcnt, ==, 1); + g_assert_cmpint(c->refcnt, ==, 1); + bdrv_unref(a); + bdrv_unref(b); + bdrv_unref(c); +} + +static void test_detach_by_parent_cb(void) +{ + test_detach_indirect(true); +} + +static void test_detach_by_driver_cb(void) +{ + test_detach_indirect(false); +} + int main(int argc, char **argv) { + int ret; + bdrv_init(); qemu_init_main_loop(&error_abort); g_test_init(&argc, &argv, NULL); + qemu_event_init(&done_event, false); g_test_add_func("/bdrv-drain/driver-cb/drain_all", test_drv_cb_drain_all); g_test_add_func("/bdrv-drain/driver-cb/drain", test_drv_cb_drain); g_test_add_func("/bdrv-drain/driver-cb/drain_subtree", test_drv_cb_drain_subtree); - // XXX bdrv_drain_all() doesn't work in coroutine context + g_test_add_func("/bdrv-drain/driver-cb/co/drain_all", + test_drv_cb_co_drain_all); g_test_add_func("/bdrv-drain/driver-cb/co/drain", test_drv_cb_co_drain); g_test_add_func("/bdrv-drain/driver-cb/co/drain_subtree", test_drv_cb_co_drain_subtree); @@ -639,19 +1277,38 @@ int main(int argc, char **argv) g_test_add_func("/bdrv-drain/quiesce/drain_subtree", test_quiesce_drain_subtree); - // XXX bdrv_drain_all() doesn't work in coroutine context + g_test_add_func("/bdrv-drain/quiesce/co/drain_all", + test_quiesce_co_drain_all); g_test_add_func("/bdrv-drain/quiesce/co/drain", test_quiesce_co_drain); g_test_add_func("/bdrv-drain/quiesce/co/drain_subtree", test_quiesce_co_drain_subtree); g_test_add_func("/bdrv-drain/nested", test_nested); g_test_add_func("/bdrv-drain/multiparent", test_multiparent); - g_test_add_func("/bdrv-drain/graph-change", test_graph_change); + + g_test_add_func("/bdrv-drain/graph-change/drain_subtree", + test_graph_change_drain_subtree); + g_test_add_func("/bdrv-drain/graph-change/drain_all", + test_graph_change_drain_all); + + g_test_add_func("/bdrv-drain/iothread/drain_all", test_iothread_drain_all); + g_test_add_func("/bdrv-drain/iothread/drain", test_iothread_drain); + g_test_add_func("/bdrv-drain/iothread/drain_subtree", + test_iothread_drain_subtree); g_test_add_func("/bdrv-drain/blockjob/drain_all", test_blockjob_drain_all); g_test_add_func("/bdrv-drain/blockjob/drain", test_blockjob_drain); g_test_add_func("/bdrv-drain/blockjob/drain_subtree", test_blockjob_drain_subtree); - return g_test_run(); + g_test_add_func("/bdrv-drain/deletion/drain", test_delete_by_drain); + g_test_add_func("/bdrv-drain/detach/drain_all", test_detach_by_drain_all); + g_test_add_func("/bdrv-drain/detach/drain", test_detach_by_drain); + g_test_add_func("/bdrv-drain/detach/drain_subtree", test_detach_by_drain_subtree); + g_test_add_func("/bdrv-drain/detach/parent_cb", test_detach_by_parent_cb); + g_test_add_func("/bdrv-drain/detach/driver_cb", test_detach_by_driver_cb); + + ret = g_test_run(); + qemu_event_destroy(&done_event); + return ret; } diff --git a/tests/test-hbitmap.c b/tests/test-hbitmap.c index f29631f939..5e67ac1d3a 100644 --- a/tests/test-hbitmap.c +++ b/tests/test-hbitmap.c @@ -30,6 +30,18 @@ typedef struct TestHBitmapData { } TestHBitmapData; +static int64_t check_hbitmap_iter_next(HBitmapIter *hbi) +{ + int next0, next1; + + next0 = hbitmap_iter_next(hbi, false); + next1 = hbitmap_iter_next(hbi, true); + + g_assert_cmpint(next0, ==, next1); + + return next0; +} + /* Check that the HBitmap and the shadow bitmap contain the same data, * ignoring the same "first" bits. */ @@ -46,7 +58,7 @@ static void hbitmap_test_check(TestHBitmapData *data, i = first; for (;;) { - next = hbitmap_iter_next(&hbi); + next = check_hbitmap_iter_next(&hbi); if (next < 0) { next = data->size; } @@ -435,25 +447,25 @@ static void test_hbitmap_iter_granularity(TestHBitmapData *data, /* Note that hbitmap_test_check has to be invoked manually in this test. */ hbitmap_test_init(data, 131072 << 7, 7); hbitmap_iter_init(&hbi, data->hb, 0); - g_assert_cmpint(hbitmap_iter_next(&hbi), <, 0); + g_assert_cmpint(check_hbitmap_iter_next(&hbi), <, 0); hbitmap_test_set(data, ((L2 + L1 + 1) << 7) + 8, 8); hbitmap_iter_init(&hbi, data->hb, 0); - g_assert_cmpint(hbitmap_iter_next(&hbi), ==, (L2 + L1 + 1) << 7); - g_assert_cmpint(hbitmap_iter_next(&hbi), <, 0); + g_assert_cmpint(check_hbitmap_iter_next(&hbi), ==, (L2 + L1 + 1) << 7); + g_assert_cmpint(check_hbitmap_iter_next(&hbi), <, 0); hbitmap_iter_init(&hbi, data->hb, (L2 + L1 + 2) << 7); - g_assert_cmpint(hbitmap_iter_next(&hbi), <, 0); + g_assert_cmpint(hbitmap_iter_next(&hbi, true), <, 0); hbitmap_test_set(data, (131072 << 7) - 8, 8); hbitmap_iter_init(&hbi, data->hb, 0); - g_assert_cmpint(hbitmap_iter_next(&hbi), ==, (L2 + L1 + 1) << 7); - g_assert_cmpint(hbitmap_iter_next(&hbi), ==, 131071 << 7); - g_assert_cmpint(hbitmap_iter_next(&hbi), <, 0); + g_assert_cmpint(check_hbitmap_iter_next(&hbi), ==, (L2 + L1 + 1) << 7); + g_assert_cmpint(check_hbitmap_iter_next(&hbi), ==, 131071 << 7); + g_assert_cmpint(check_hbitmap_iter_next(&hbi), <, 0); hbitmap_iter_init(&hbi, data->hb, (L2 + L1 + 2) << 7); - g_assert_cmpint(hbitmap_iter_next(&hbi), ==, 131071 << 7); - g_assert_cmpint(hbitmap_iter_next(&hbi), <, 0); + g_assert_cmpint(check_hbitmap_iter_next(&hbi), ==, 131071 << 7); + g_assert_cmpint(check_hbitmap_iter_next(&hbi), <, 0); } static void hbitmap_test_set_boundary_bits(TestHBitmapData *data, ssize_t diff) @@ -893,7 +905,7 @@ static void test_hbitmap_serialize_zeroes(TestHBitmapData *data, for (i = 0; i < num_positions; i++) { hbitmap_deserialize_zeroes(data->hb, positions[i], min_l1, true); hbitmap_iter_init(&iter, data->hb, 0); - next = hbitmap_iter_next(&iter); + next = check_hbitmap_iter_next(&iter); if (i == num_positions - 1) { g_assert_cmpint(next, ==, -1); } else { @@ -919,10 +931,10 @@ static void test_hbitmap_iter_and_reset(TestHBitmapData *data, hbitmap_iter_init(&hbi, data->hb, BITS_PER_LONG - 1); - hbitmap_iter_next(&hbi); + check_hbitmap_iter_next(&hbi); hbitmap_reset_all(data->hb); - hbitmap_iter_next(&hbi); + check_hbitmap_iter_next(&hbi); } static void test_hbitmap_next_zero_check(TestHBitmapData *data, int64_t start) diff --git a/util/hbitmap.c b/util/hbitmap.c index 58a2c93842..bcd304041a 100644 --- a/util/hbitmap.c +++ b/util/hbitmap.c @@ -141,7 +141,7 @@ unsigned long hbitmap_iter_skip_words(HBitmapIter *hbi) return cur; } -int64_t hbitmap_iter_next(HBitmapIter *hbi) +int64_t hbitmap_iter_next(HBitmapIter *hbi, bool advance) { unsigned long cur = hbi->cur[HBITMAP_LEVELS - 1] & hbi->hb->levels[HBITMAP_LEVELS - 1][hbi->pos]; @@ -154,8 +154,12 @@ int64_t hbitmap_iter_next(HBitmapIter *hbi) } } - /* The next call will resume work from the next bit. */ - hbi->cur[HBITMAP_LEVELS - 1] = cur & (cur - 1); + if (advance) { + /* The next call will resume work from the next bit. */ + hbi->cur[HBITMAP_LEVELS - 1] = cur & (cur - 1); + } else { + hbi->cur[HBITMAP_LEVELS - 1] = cur; + } item = ((uint64_t)hbi->pos << BITS_PER_LEVEL) + ctzl(cur); return item << hbi->granularity; |