diff options
Diffstat (limited to 'block/quorum.c')
-rw-r--r-- | block/quorum.c | 410 |
1 files changed, 216 insertions, 194 deletions
diff --git a/block/quorum.c b/block/quorum.c index d122299352..86e2072dce 100644 --- a/block/quorum.c +++ b/block/quorum.c @@ -97,7 +97,7 @@ typedef struct QuorumAIOCB QuorumAIOCB; * $children_count QuorumChildRequest. */ typedef struct QuorumChildRequest { - BlockAIOCB *aiocb; + BlockDriverState *bs; QEMUIOVector qiov; uint8_t *buf; int ret; @@ -110,11 +110,12 @@ typedef struct QuorumChildRequest { * used to do operations on each children and track overall progress. */ struct QuorumAIOCB { - BlockAIOCB common; + BlockDriverState *bs; + Coroutine *co; /* Request metadata */ - uint64_t sector_num; - int nb_sectors; + uint64_t offset; + uint64_t bytes; QEMUIOVector *qiov; /* calling IOV */ @@ -133,32 +134,15 @@ struct QuorumAIOCB { int children_read; /* how many children have been read from */ }; -static bool quorum_vote(QuorumAIOCB *acb); - -static void quorum_aio_cancel(BlockAIOCB *blockacb) -{ - QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common); - BDRVQuorumState *s = acb->common.bs->opaque; - int i; - - /* cancel all callbacks */ - for (i = 0; i < s->num_children; i++) { - if (acb->qcrs[i].aiocb) { - bdrv_aio_cancel_async(acb->qcrs[i].aiocb); - } - } -} - -static AIOCBInfo quorum_aiocb_info = { - .aiocb_size = sizeof(QuorumAIOCB), - .cancel_async = quorum_aio_cancel, -}; +typedef struct QuorumCo { + QuorumAIOCB *acb; + int idx; +} QuorumCo; static void quorum_aio_finalize(QuorumAIOCB *acb) { - acb->common.cb(acb->common.opaque, acb->vote_ret); g_free(acb->qcrs); - qemu_aio_unref(acb); + g_free(acb); } static bool quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b) @@ -171,30 +155,26 @@ static bool quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b) return a->l == b->l; } -static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s, - BlockDriverState *bs, +static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs, QEMUIOVector *qiov, - uint64_t sector_num, - int nb_sectors, - BlockCompletionFunc *cb, - void *opaque) + uint64_t offset, + uint64_t bytes) { - QuorumAIOCB *acb = qemu_aio_get(&quorum_aiocb_info, bs, cb, opaque); + BDRVQuorumState *s = bs->opaque; + QuorumAIOCB *acb = g_new(QuorumAIOCB, 1); int i; - acb->common.bs->opaque = s; - acb->sector_num = sector_num; - acb->nb_sectors = nb_sectors; - acb->qiov = qiov; - acb->qcrs = g_new0(QuorumChildRequest, s->num_children); - acb->count = 0; - acb->success_count = 0; - acb->rewrite_count = 0; - acb->votes.compare = quorum_sha256_compare; - QLIST_INIT(&acb->votes.vote_list); - acb->is_read = false; - acb->vote_ret = 0; + *acb = (QuorumAIOCB) { + .co = qemu_coroutine_self(), + .bs = bs, + .offset = offset, + .bytes = bytes, + .qiov = qiov, + .votes.compare = quorum_sha256_compare, + .votes.vote_list = QLIST_HEAD_INITIALIZER(acb.votes.vote_list), + }; + acb->qcrs = g_new0(QuorumChildRequest, s->num_children); for (i = 0; i < s->num_children; i++) { acb->qcrs[i].buf = NULL; acb->qcrs[i].ret = 0; @@ -204,30 +184,37 @@ static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s, return acb; } -static void quorum_report_bad(QuorumOpType type, uint64_t sector_num, - int nb_sectors, char *node_name, int ret) +static void quorum_report_bad(QuorumOpType type, uint64_t offset, + uint64_t bytes, char *node_name, int ret) { const char *msg = NULL; + int64_t start_sector = offset / BDRV_SECTOR_SIZE; + int64_t end_sector = DIV_ROUND_UP(offset + bytes, BDRV_SECTOR_SIZE); + if (ret < 0) { msg = strerror(-ret); } - qapi_event_send_quorum_report_bad(type, !!msg, msg, node_name, - sector_num, nb_sectors, &error_abort); + qapi_event_send_quorum_report_bad(type, !!msg, msg, node_name, start_sector, + end_sector - start_sector, &error_abort); } static void quorum_report_failure(QuorumAIOCB *acb) { - const char *reference = bdrv_get_device_or_node_name(acb->common.bs); - qapi_event_send_quorum_failure(reference, acb->sector_num, - acb->nb_sectors, &error_abort); + const char *reference = bdrv_get_device_or_node_name(acb->bs); + int64_t start_sector = acb->offset / BDRV_SECTOR_SIZE; + int64_t end_sector = DIV_ROUND_UP(acb->offset + acb->bytes, + BDRV_SECTOR_SIZE); + + qapi_event_send_quorum_failure(reference, start_sector, + end_sector - start_sector, &error_abort); } static int quorum_vote_error(QuorumAIOCB *acb); static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb) { - BDRVQuorumState *s = acb->common.bs->opaque; + BDRVQuorumState *s = acb->bs->opaque; if (acb->success_count < s->threshold) { acb->vote_ret = quorum_vote_error(acb); @@ -238,22 +225,7 @@ static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb) return false; } -static void quorum_rewrite_aio_cb(void *opaque, int ret) -{ - QuorumAIOCB *acb = opaque; - - /* one less rewrite to do */ - acb->rewrite_count--; - - /* wait until all rewrite callbacks have completed */ - if (acb->rewrite_count) { - return; - } - - quorum_aio_finalize(acb); -} - -static BlockAIOCB *read_fifo_child(QuorumAIOCB *acb); +static int read_fifo_child(QuorumAIOCB *acb); static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source) { @@ -272,70 +244,7 @@ static void quorum_report_bad_acb(QuorumChildRequest *sacb, int ret) { QuorumAIOCB *acb = sacb->parent; QuorumOpType type = acb->is_read ? QUORUM_OP_TYPE_READ : QUORUM_OP_TYPE_WRITE; - quorum_report_bad(type, acb->sector_num, acb->nb_sectors, - sacb->aiocb->bs->node_name, ret); -} - -static void quorum_fifo_aio_cb(void *opaque, int ret) -{ - QuorumChildRequest *sacb = opaque; - QuorumAIOCB *acb = sacb->parent; - BDRVQuorumState *s = acb->common.bs->opaque; - - assert(acb->is_read && s->read_pattern == QUORUM_READ_PATTERN_FIFO); - - if (ret < 0) { - quorum_report_bad_acb(sacb, ret); - - /* We try to read next child in FIFO order if we fail to read */ - if (acb->children_read < s->num_children) { - read_fifo_child(acb); - return; - } - } - - acb->vote_ret = ret; - - /* FIXME: rewrite failed children if acb->children_read > 1? */ - quorum_aio_finalize(acb); -} - -static void quorum_aio_cb(void *opaque, int ret) -{ - QuorumChildRequest *sacb = opaque; - QuorumAIOCB *acb = sacb->parent; - BDRVQuorumState *s = acb->common.bs->opaque; - bool rewrite = false; - int i; - - sacb->ret = ret; - if (ret == 0) { - acb->success_count++; - } else { - quorum_report_bad_acb(sacb, ret); - } - acb->count++; - assert(acb->count <= s->num_children); - assert(acb->success_count <= s->num_children); - if (acb->count < s->num_children) { - return; - } - - /* Do the vote on read */ - if (acb->is_read) { - rewrite = quorum_vote(acb); - for (i = 0; i < s->num_children; i++) { - qemu_vfree(acb->qcrs[i].buf); - qemu_iovec_destroy(&acb->qcrs[i].qiov); - } - } else { - quorum_has_too_much_io_failed(acb); - } - - /* if no rewrite is done the code will finish right away */ - if (!rewrite) { - quorum_aio_finalize(acb); - } + quorum_report_bad(type, acb->offset, acb->bytes, sacb->bs->node_name, ret); } static void quorum_report_bad_versions(BDRVQuorumState *s, @@ -350,14 +259,31 @@ static void quorum_report_bad_versions(BDRVQuorumState *s, continue; } QLIST_FOREACH(item, &version->items, next) { - quorum_report_bad(QUORUM_OP_TYPE_READ, acb->sector_num, - acb->nb_sectors, + quorum_report_bad(QUORUM_OP_TYPE_READ, acb->offset, acb->bytes, s->children[item->index]->bs->node_name, 0); } } } -static bool quorum_rewrite_bad_versions(BDRVQuorumState *s, QuorumAIOCB *acb, +static void quorum_rewrite_entry(void *opaque) +{ + QuorumCo *co = opaque; + QuorumAIOCB *acb = co->acb; + BDRVQuorumState *s = acb->bs->opaque; + + /* Ignore any errors, it's just a correction attempt for already + * corrupted data. */ + bdrv_co_pwritev(s->children[co->idx], acb->offset, acb->bytes, + acb->qiov, 0); + + /* Wake up the caller after the last rewrite */ + acb->rewrite_count--; + if (!acb->rewrite_count) { + qemu_coroutine_enter_if_inactive(acb->co); + } +} + +static bool quorum_rewrite_bad_versions(QuorumAIOCB *acb, QuorumVoteValue *value) { QuorumVoteVersion *version; @@ -376,7 +302,7 @@ static bool quorum_rewrite_bad_versions(BDRVQuorumState *s, QuorumAIOCB *acb, } } - /* quorum_rewrite_aio_cb will count down this to zero */ + /* quorum_rewrite_entry will count down this to zero */ acb->rewrite_count = count; /* now fire the correcting rewrites */ @@ -385,9 +311,14 @@ static bool quorum_rewrite_bad_versions(BDRVQuorumState *s, QuorumAIOCB *acb, continue; } QLIST_FOREACH(item, &version->items, next) { - bdrv_aio_writev(s->children[item->index], acb->sector_num, - acb->qiov, acb->nb_sectors, quorum_rewrite_aio_cb, - acb); + Coroutine *co; + QuorumCo data = { + .acb = acb, + .idx = item->index, + }; + + co = qemu_coroutine_create(quorum_rewrite_entry, &data); + qemu_coroutine_enter(co); } } @@ -507,8 +438,8 @@ static void GCC_FMT_ATTR(2, 3) quorum_err(QuorumAIOCB *acb, va_list ap; va_start(ap, fmt); - fprintf(stderr, "quorum: sector_num=%" PRId64 " nb_sectors=%d ", - acb->sector_num, acb->nb_sectors); + fprintf(stderr, "quorum: offset=%" PRIu64 " bytes=%" PRIu64 " ", + acb->offset, acb->bytes); vfprintf(stderr, fmt, ap); fprintf(stderr, "\n"); va_end(ap); @@ -519,16 +450,15 @@ static bool quorum_compare(QuorumAIOCB *acb, QEMUIOVector *a, QEMUIOVector *b) { - BDRVQuorumState *s = acb->common.bs->opaque; + BDRVQuorumState *s = acb->bs->opaque; ssize_t offset; /* This driver will replace blkverify in this particular case */ if (s->is_blkverify) { offset = qemu_iovec_compare(a, b); if (offset != -1) { - quorum_err(acb, "contents mismatch in sector %" PRId64, - acb->sector_num + - (uint64_t)(offset / BDRV_SECTOR_SIZE)); + quorum_err(acb, "contents mismatch at offset %" PRIu64, + acb->offset + offset); } return true; } @@ -539,7 +469,7 @@ static bool quorum_compare(QuorumAIOCB *acb, /* Do a vote to get the error code */ static int quorum_vote_error(QuorumAIOCB *acb) { - BDRVQuorumState *s = acb->common.bs->opaque; + BDRVQuorumState *s = acb->bs->opaque; QuorumVoteVersion *winner = NULL; QuorumVotes error_votes; QuorumVoteValue result_value; @@ -568,17 +498,16 @@ static int quorum_vote_error(QuorumAIOCB *acb) return ret; } -static bool quorum_vote(QuorumAIOCB *acb) +static void quorum_vote(QuorumAIOCB *acb) { bool quorum = true; - bool rewrite = false; int i, j, ret; QuorumVoteValue hash; - BDRVQuorumState *s = acb->common.bs->opaque; + BDRVQuorumState *s = acb->bs->opaque; QuorumVoteVersion *winner; if (quorum_has_too_much_io_failed(acb)) { - return false; + return; } /* get the index of the first successful read */ @@ -606,7 +535,7 @@ static bool quorum_vote(QuorumAIOCB *acb) /* Every successful read agrees */ if (quorum) { quorum_copy_qiov(acb->qiov, &acb->qcrs[i].qiov); - return false; + return; } /* compute hashes for each successful read, also store indexes */ @@ -641,19 +570,46 @@ static bool quorum_vote(QuorumAIOCB *acb) /* corruption correction is enabled */ if (s->rewrite_corrupted) { - rewrite = quorum_rewrite_bad_versions(s, acb, &winner->value); + quorum_rewrite_bad_versions(acb, &winner->value); } free_exit: /* free lists */ quorum_free_vote_list(&acb->votes); - return rewrite; } -static BlockAIOCB *read_quorum_children(QuorumAIOCB *acb) +static void read_quorum_children_entry(void *opaque) { - BDRVQuorumState *s = acb->common.bs->opaque; - int i; + QuorumCo *co = opaque; + QuorumAIOCB *acb = co->acb; + BDRVQuorumState *s = acb->bs->opaque; + int i = co->idx; + QuorumChildRequest *sacb = &acb->qcrs[i]; + + sacb->bs = s->children[i]->bs; + sacb->ret = bdrv_co_preadv(s->children[i], acb->offset, acb->bytes, + &acb->qcrs[i].qiov, 0); + + if (sacb->ret == 0) { + acb->success_count++; + } else { + quorum_report_bad_acb(sacb, sacb->ret); + } + + acb->count++; + assert(acb->count <= s->num_children); + assert(acb->success_count <= s->num_children); + + /* Wake up the caller after the last read */ + if (acb->count == s->num_children) { + qemu_coroutine_enter_if_inactive(acb->co); + } +} + +static int read_quorum_children(QuorumAIOCB *acb) +{ + BDRVQuorumState *s = acb->bs->opaque; + int i, ret; acb->children_read = s->num_children; for (i = 0; i < s->num_children; i++) { @@ -663,65 +619,131 @@ static BlockAIOCB *read_quorum_children(QuorumAIOCB *acb) } for (i = 0; i < s->num_children; i++) { - acb->qcrs[i].aiocb = bdrv_aio_readv(s->children[i], acb->sector_num, - &acb->qcrs[i].qiov, acb->nb_sectors, - quorum_aio_cb, &acb->qcrs[i]); + Coroutine *co; + QuorumCo data = { + .acb = acb, + .idx = i, + }; + + co = qemu_coroutine_create(read_quorum_children_entry, &data); + qemu_coroutine_enter(co); } - return &acb->common; + while (acb->count < s->num_children) { + qemu_coroutine_yield(); + } + + /* Do the vote on read */ + quorum_vote(acb); + for (i = 0; i < s->num_children; i++) { + qemu_vfree(acb->qcrs[i].buf); + qemu_iovec_destroy(&acb->qcrs[i].qiov); + } + + while (acb->rewrite_count) { + qemu_coroutine_yield(); + } + + ret = acb->vote_ret; + + return ret; } -static BlockAIOCB *read_fifo_child(QuorumAIOCB *acb) +static int read_fifo_child(QuorumAIOCB *acb) { - BDRVQuorumState *s = acb->common.bs->opaque; - int n = acb->children_read++; + BDRVQuorumState *s = acb->bs->opaque; + int n, ret; + + /* We try to read the next child in FIFO order if we failed to read */ + do { + n = acb->children_read++; + acb->qcrs[n].bs = s->children[n]->bs; + ret = bdrv_co_preadv(s->children[n], acb->offset, acb->bytes, + acb->qiov, 0); + if (ret < 0) { + quorum_report_bad_acb(&acb->qcrs[n], ret); + } + } while (ret < 0 && acb->children_read < s->num_children); - acb->qcrs[n].aiocb = bdrv_aio_readv(s->children[n], acb->sector_num, - acb->qiov, acb->nb_sectors, - quorum_fifo_aio_cb, &acb->qcrs[n]); + /* FIXME: rewrite failed children if acb->children_read > 1? */ - return &acb->common; + return ret; } -static BlockAIOCB *quorum_aio_readv(BlockDriverState *bs, - int64_t sector_num, - QEMUIOVector *qiov, - int nb_sectors, - BlockCompletionFunc *cb, - void *opaque) +static int quorum_co_preadv(BlockDriverState *bs, uint64_t offset, + uint64_t bytes, QEMUIOVector *qiov, int flags) { BDRVQuorumState *s = bs->opaque; - QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num, - nb_sectors, cb, opaque); + QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes); + int ret; + acb->is_read = true; acb->children_read = 0; if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) { - return read_quorum_children(acb); + ret = read_quorum_children(acb); + } else { + ret = read_fifo_child(acb); + } + quorum_aio_finalize(acb); + + return ret; +} + +static void write_quorum_entry(void *opaque) +{ + QuorumCo *co = opaque; + QuorumAIOCB *acb = co->acb; + BDRVQuorumState *s = acb->bs->opaque; + int i = co->idx; + QuorumChildRequest *sacb = &acb->qcrs[i]; + + sacb->bs = s->children[i]->bs; + sacb->ret = bdrv_co_pwritev(s->children[i], acb->offset, acb->bytes, + acb->qiov, 0); + if (sacb->ret == 0) { + acb->success_count++; + } else { + quorum_report_bad_acb(sacb, sacb->ret); } + acb->count++; + assert(acb->count <= s->num_children); + assert(acb->success_count <= s->num_children); - return read_fifo_child(acb); + /* Wake up the caller after the last write */ + if (acb->count == s->num_children) { + qemu_coroutine_enter_if_inactive(acb->co); + } } -static BlockAIOCB *quorum_aio_writev(BlockDriverState *bs, - int64_t sector_num, - QEMUIOVector *qiov, - int nb_sectors, - BlockCompletionFunc *cb, - void *opaque) +static int quorum_co_pwritev(BlockDriverState *bs, uint64_t offset, + uint64_t bytes, QEMUIOVector *qiov, int flags) { BDRVQuorumState *s = bs->opaque; - QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num, nb_sectors, - cb, opaque); - int i; + QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes); + int i, ret; for (i = 0; i < s->num_children; i++) { - acb->qcrs[i].aiocb = bdrv_aio_writev(s->children[i], sector_num, - qiov, nb_sectors, &quorum_aio_cb, - &acb->qcrs[i]); + Coroutine *co; + QuorumCo data = { + .acb = acb, + .idx = i, + }; + + co = qemu_coroutine_create(write_quorum_entry, &data); + qemu_coroutine_enter(co); + } + + while (acb->count < s->num_children) { + qemu_coroutine_yield(); } - return &acb->common; + quorum_has_too_much_io_failed(acb); + + ret = acb->vote_ret; + quorum_aio_finalize(acb); + + return ret; } static int64_t quorum_getlength(BlockDriverState *bs) @@ -765,7 +787,7 @@ static coroutine_fn int quorum_co_flush(BlockDriverState *bs) result = bdrv_co_flush(s->children[i]->bs); if (result) { quorum_report_bad(QUORUM_OP_TYPE_FLUSH, 0, - bdrv_nb_sectors(s->children[i]->bs), + bdrv_getlength(s->children[i]->bs), s->children[i]->bs->node_name, result); result_value.l = result; quorum_count_vote(&error_votes, &result_value, i); @@ -1098,8 +1120,8 @@ static BlockDriver bdrv_quorum = { .bdrv_getlength = quorum_getlength, - .bdrv_aio_readv = quorum_aio_readv, - .bdrv_aio_writev = quorum_aio_writev, + .bdrv_co_preadv = quorum_co_preadv, + .bdrv_co_pwritev = quorum_co_pwritev, .bdrv_add_child = quorum_add_child, .bdrv_del_child = quorum_del_child, |