author     Max Reitz <mreitz@redhat.com>     2019-07-19 11:26:14 +0200
committer  Kevin Wolf <kwolf@redhat.com>     2019-07-19 13:19:16 +0200
commit     e037c09c78520cbdb6da7cfc6ad0256d5870b814
tree       ee4b9ffaf87c06445fe091a6dbeed72246005d16
parent     1b285657687c7f08761759092fa05fa33578fc00
block: Do not poll in bdrv_do_drained_end()
We should never poll anywhere in bdrv_do_drained_end() (including its recursive callees like bdrv_drain_invoke()), because it does not cope well with graph changes. In fact, it has been written based on the postulation that no graph changes will happen in it.

Instead, the callers that want to poll must poll themselves, i.e. all currently globally available wrappers: bdrv_drained_end(), bdrv_subtree_drained_end(), bdrv_unapply_subtree_drain(), and bdrv_drain_all_end(). Graph changes there do not matter. They can poll simply by passing a pointer to a drained_end_counter and waiting until it reaches 0.

This patch also adds a non-polling global wrapper for bdrv_do_drained_end() that takes a drained_end_counter pointer. We need such a variant because now no function called anywhere from bdrv_do_drained_end() may poll. This includes BdrvChildRole.drained_end(), which already must not poll according to its interface documentation, but bdrv_child_cb_drained_end() violates that by invoking bdrv_drained_end() (which does poll). Therefore, BdrvChildRole.drained_end() must take a *drained_end_counter parameter, which bdrv_child_cb_drained_end() can pass on to the new bdrv_drained_end_no_poll() function.

Note that we now have a pattern of all drained_end-related functions either polling or receiving a *drained_end_counter to let the caller poll based on that.

A problem with a single poll loop is that when the drained section in bdrv_set_aio_context_ignore() ends, some nodes in the subgraph may be in the old context, while others are in the new context already. To let the collective poll in bdrv_drained_end() work correctly, we must not hold a lock to the old context, so that the old context can make progress in case it is different from the current context. (In the process, remove the comment saying that the current context is always the old context, because it is wrong.)

In all other places, all nodes in a subtree must be in the same context, so we can just poll that. The exception of course is bdrv_drain_all_end(), but that always runs in the main context, so we can just poll NULL (like bdrv_drain_all_begin() does).

Signed-off-by: Max Reitz <mreitz@redhat.com>
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
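To make the counter pattern described above concrete, below is a minimal, self-contained C sketch. It is not QEMU code and all names in it are hypothetical: each scheduled background operation increments a shared counter and decrements it once it settles, while the party that set up the counter polls until it reaches 0, standing in for BDRV_POLL_WHILE()/AIO_WAIT_WHILE() in the patch.

/* Hedged sketch of the drained_end_counter completion pattern in isolation. */
#include <stdatomic.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static void *background_op(void *opaque)
{
    atomic_int *drained_end_counter = opaque;
    usleep(1000);                              /* stand-in for the async work */
    atomic_fetch_sub(drained_end_counter, 1);  /* operation has settled */
    return NULL;
}

int main(void)
{
    atomic_int drained_end_counter = 0;
    pthread_t workers[4];

    for (int i = 0; i < 4; i++) {
        /* One increment per operation scheduled, like bdrv_drain_invoke() does. */
        atomic_fetch_add(&drained_end_counter, 1);
        pthread_create(&workers[i], NULL, background_op, &drained_end_counter);
    }

    /*
     * Only the caller that owns the counter polls; in the patch this is
     * BDRV_POLL_WHILE()/AIO_WAIT_WHILE(), which additionally dispatch
     * AioContext events instead of spinning as this sketch does.
     */
    while (atomic_load(&drained_end_counter) > 0) {
        /* busy-wait for the sketch's sake only */
    }

    for (int i = 0; i < 4; i++) {
        pthread_join(workers[i], NULL);
    }
    printf("all operations settled, counter is 0\n");
    return 0;
}

(Build with e.g. cc -pthread; in QEMU proper the poll loop must dispatch the relevant AioContext rather than spin.)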
Diffstat (limited to 'block')
-rw-r--r--  block/block-backend.c   6
-rw-r--r--  block/io.c             80
2 files changed, 60 insertions, 26 deletions
diff --git a/block/block-backend.c b/block/block-backend.c
index a8d160fd5d..0056b526b8 100644
--- a/block/block-backend.c
+++ b/block/block-backend.c
@@ -121,7 +121,7 @@ static void blk_root_inherit_options(int *child_flags, QDict *child_options,
}
static void blk_root_drained_begin(BdrvChild *child);
static bool blk_root_drained_poll(BdrvChild *child);
-static void blk_root_drained_end(BdrvChild *child);
+static void blk_root_drained_end(BdrvChild *child, int *drained_end_counter);
static void blk_root_change_media(BdrvChild *child, bool load);
static void blk_root_resize(BdrvChild *child);
@@ -1249,7 +1249,7 @@ int blk_pread_unthrottled(BlockBackend *blk, int64_t offset, uint8_t *buf,
blk_root_drained_begin(blk->root);
ret = blk_pread(blk, offset, buf, count);
- blk_root_drained_end(blk->root);
+ blk_root_drained_end(blk->root, NULL);
return ret;
}
@@ -2236,7 +2236,7 @@ static bool blk_root_drained_poll(BdrvChild *child)
return !!blk->in_flight;
}
-static void blk_root_drained_end(BdrvChild *child)
+static void blk_root_drained_end(BdrvChild *child, int *drained_end_counter)
{
BlockBackend *blk = child->opaque;
assert(blk->quiesce_counter);
diff --git a/block/io.c b/block/io.c
index b0b33174d3..8f23cab82e 100644
--- a/block/io.c
+++ b/block/io.c
@@ -55,17 +55,26 @@ static void bdrv_parent_drained_begin(BlockDriverState *bs, BdrvChild *ignore,
}
}
-void bdrv_parent_drained_end_single(BdrvChild *c)
+static void bdrv_parent_drained_end_single_no_poll(BdrvChild *c,
+ int *drained_end_counter)
{
assert(c->parent_quiesce_counter > 0);
c->parent_quiesce_counter--;
if (c->role->drained_end) {
- c->role->drained_end(c);
+ c->role->drained_end(c, drained_end_counter);
}
}
+void bdrv_parent_drained_end_single(BdrvChild *c)
+{
+ int drained_end_counter = 0;
+ bdrv_parent_drained_end_single_no_poll(c, &drained_end_counter);
+ BDRV_POLL_WHILE(c->bs, atomic_read(&drained_end_counter) > 0);
+}
+
static void bdrv_parent_drained_end(BlockDriverState *bs, BdrvChild *ignore,
- bool ignore_bds_parents)
+ bool ignore_bds_parents,
+ int *drained_end_counter)
{
BdrvChild *c, *next;
@@ -73,7 +82,7 @@ static void bdrv_parent_drained_end(BlockDriverState *bs, BdrvChild *ignore,
if (c == ignore || (ignore_bds_parents && c->role->parent_is_bds)) {
continue;
}
- bdrv_parent_drained_end_single(c);
+ bdrv_parent_drained_end_single_no_poll(c, drained_end_counter);
}
}
@@ -212,13 +221,11 @@ static void coroutine_fn bdrv_drain_invoke_entry(void *opaque)
atomic_mb_set(&data->done, true);
bdrv_dec_in_flight(bs);
- if (data->drained_end_counter) {
+ if (!data->begin) {
atomic_dec(data->drained_end_counter);
}
- if (data->begin || data->drained_end_counter) {
- g_free(data);
- }
+ g_free(data);
}
/* Recursively call BlockDriver.bdrv_co_drain_begin/end callbacks */
@@ -240,7 +247,7 @@ static void bdrv_drain_invoke(BlockDriverState *bs, bool begin,
.drained_end_counter = drained_end_counter,
};
- if (!begin && drained_end_counter) {
+ if (!begin) {
atomic_inc(drained_end_counter);
}
@@ -249,15 +256,6 @@ static void bdrv_drain_invoke(BlockDriverState *bs, bool begin,
bdrv_inc_in_flight(bs);
data->co = qemu_coroutine_create(bdrv_drain_invoke_entry, data);
aio_co_schedule(bdrv_get_aio_context(bs), data->co);
-
- /*
- * TODO: Drop this and make callers pass @drained_end_counter and poll
- * themselves
- */
- if (!begin && !drained_end_counter) {
- BDRV_POLL_WHILE(bs, !data->done);
- g_free(data);
- }
}
/* Returns true if BDRV_POLL_WHILE() should go into a blocking aio_poll() */
@@ -320,9 +318,11 @@ static void bdrv_co_drain_bh_cb(void *opaque)
}
bdrv_dec_in_flight(bs);
if (data->begin) {
+ assert(!data->drained_end_counter);
bdrv_do_drained_begin(bs, data->recursive, data->parent,
data->ignore_bds_parents, data->poll);
} else {
+ assert(!data->poll);
bdrv_do_drained_end(bs, data->recursive, data->parent,
data->ignore_bds_parents,
data->drained_end_counter);
@@ -438,6 +438,20 @@ void bdrv_subtree_drained_begin(BlockDriverState *bs)
bdrv_do_drained_begin(bs, true, NULL, false, true);
}
+/**
+ * This function does not poll, nor must any of its recursively called
+ * functions. The *drained_end_counter pointee will be incremented
+ * once for every background operation scheduled, and decremented once
+ * the operation settles. Therefore, the pointer must remain valid
+ * until the pointee reaches 0. That implies that whoever sets up the
+ * pointee has to poll until it is 0.
+ *
+ * We use atomic operations to access *drained_end_counter, because
+ * (1) when called from bdrv_set_aio_context_ignore(), the subgraph of
+ * @bs may contain nodes in different AioContexts,
+ * (2) bdrv_drain_all_end() uses the same counter for all nodes,
+ * regardless of which AioContext they are in.
+ */
static void bdrv_do_drained_end(BlockDriverState *bs, bool recursive,
BdrvChild *parent, bool ignore_bds_parents,
int *drained_end_counter)
@@ -445,6 +459,8 @@ static void bdrv_do_drained_end(BlockDriverState *bs, bool recursive,
BdrvChild *child, *next;
int old_quiesce_counter;
+ assert(drained_end_counter != NULL);
+
if (qemu_in_coroutine()) {
bdrv_co_yield_to_drain(bs, false, recursive, parent, ignore_bds_parents,
false, drained_end_counter);
@@ -454,7 +470,8 @@ static void bdrv_do_drained_end(BlockDriverState *bs, bool recursive,
/* Re-enable things in child-to-parent order */
bdrv_drain_invoke(bs, false, drained_end_counter);
- bdrv_parent_drained_end(bs, parent, ignore_bds_parents);
+ bdrv_parent_drained_end(bs, parent, ignore_bds_parents,
+ drained_end_counter);
old_quiesce_counter = atomic_fetch_dec(&bs->quiesce_counter);
if (old_quiesce_counter == 1) {
@@ -473,12 +490,21 @@ static void bdrv_do_drained_end(BlockDriverState *bs, bool recursive,
void bdrv_drained_end(BlockDriverState *bs)
{
- bdrv_do_drained_end(bs, false, NULL, false, NULL);
+ int drained_end_counter = 0;
+ bdrv_do_drained_end(bs, false, NULL, false, &drained_end_counter);
+ BDRV_POLL_WHILE(bs, atomic_read(&drained_end_counter) > 0);
+}
+
+void bdrv_drained_end_no_poll(BlockDriverState *bs, int *drained_end_counter)
+{
+ bdrv_do_drained_end(bs, false, NULL, false, drained_end_counter);
}
void bdrv_subtree_drained_end(BlockDriverState *bs)
{
- bdrv_do_drained_end(bs, true, NULL, false, NULL);
+ int drained_end_counter = 0;
+ bdrv_do_drained_end(bs, true, NULL, false, &drained_end_counter);
+ BDRV_POLL_WHILE(bs, atomic_read(&drained_end_counter) > 0);
}
void bdrv_apply_subtree_drain(BdrvChild *child, BlockDriverState *new_parent)
@@ -492,11 +518,15 @@ void bdrv_apply_subtree_drain(BdrvChild *child, BlockDriverState *new_parent)
void bdrv_unapply_subtree_drain(BdrvChild *child, BlockDriverState *old_parent)
{
+ int drained_end_counter = 0;
int i;
for (i = 0; i < old_parent->recursive_quiesce_counter; i++) {
- bdrv_do_drained_end(child->bs, true, child, false, NULL);
+ bdrv_do_drained_end(child->bs, true, child, false,
+ &drained_end_counter);
}
+
+ BDRV_POLL_WHILE(child->bs, atomic_read(&drained_end_counter) > 0);
}
/*
@@ -596,15 +626,19 @@ void bdrv_drain_all_begin(void)
void bdrv_drain_all_end(void)
{
BlockDriverState *bs = NULL;
+ int drained_end_counter = 0;
while ((bs = bdrv_next_all_states(bs))) {
AioContext *aio_context = bdrv_get_aio_context(bs);
aio_context_acquire(aio_context);
- bdrv_do_drained_end(bs, false, NULL, true, NULL);
+ bdrv_do_drained_end(bs, false, NULL, true, &drained_end_counter);
aio_context_release(aio_context);
}
+ assert(qemu_get_current_aio_context() == qemu_get_aio_context());
+ AIO_WAIT_WHILE(NULL, atomic_read(&drained_end_counter) > 0);
+
assert(bdrv_drain_all_count > 0);
bdrv_drain_all_count--;
}