aboutsummaryrefslogtreecommitdiff
path: root/block/quorum.c
diff options
context:
space:
mode:
Diffstat (limited to 'block/quorum.c')
-rw-r--r--block/quorum.c410
1 files changed, 216 insertions, 194 deletions
diff --git a/block/quorum.c b/block/quorum.c
index d122299352..86e2072dce 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -97,7 +97,7 @@ typedef struct QuorumAIOCB QuorumAIOCB;
* $children_count QuorumChildRequest.
*/
typedef struct QuorumChildRequest {
- BlockAIOCB *aiocb;
+ BlockDriverState *bs;
QEMUIOVector qiov;
uint8_t *buf;
int ret;
@@ -110,11 +110,12 @@ typedef struct QuorumChildRequest {
* used to do operations on each children and track overall progress.
*/
struct QuorumAIOCB {
- BlockAIOCB common;
+ BlockDriverState *bs;
+ Coroutine *co;
/* Request metadata */
- uint64_t sector_num;
- int nb_sectors;
+ uint64_t offset;
+ uint64_t bytes;
QEMUIOVector *qiov; /* calling IOV */
@@ -133,32 +134,15 @@ struct QuorumAIOCB {
int children_read; /* how many children have been read from */
};
-static bool quorum_vote(QuorumAIOCB *acb);
-
-static void quorum_aio_cancel(BlockAIOCB *blockacb)
-{
- QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common);
- BDRVQuorumState *s = acb->common.bs->opaque;
- int i;
-
- /* cancel all callbacks */
- for (i = 0; i < s->num_children; i++) {
- if (acb->qcrs[i].aiocb) {
- bdrv_aio_cancel_async(acb->qcrs[i].aiocb);
- }
- }
-}
-
-static AIOCBInfo quorum_aiocb_info = {
- .aiocb_size = sizeof(QuorumAIOCB),
- .cancel_async = quorum_aio_cancel,
-};
+typedef struct QuorumCo {
+ QuorumAIOCB *acb;
+ int idx;
+} QuorumCo;
static void quorum_aio_finalize(QuorumAIOCB *acb)
{
- acb->common.cb(acb->common.opaque, acb->vote_ret);
g_free(acb->qcrs);
- qemu_aio_unref(acb);
+ g_free(acb);
}
static bool quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
@@ -171,30 +155,26 @@ static bool quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b)
return a->l == b->l;
}
-static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
- BlockDriverState *bs,
+static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs,
QEMUIOVector *qiov,
- uint64_t sector_num,
- int nb_sectors,
- BlockCompletionFunc *cb,
- void *opaque)
+ uint64_t offset,
+ uint64_t bytes)
{
- QuorumAIOCB *acb = qemu_aio_get(&quorum_aiocb_info, bs, cb, opaque);
+ BDRVQuorumState *s = bs->opaque;
+ QuorumAIOCB *acb = g_new(QuorumAIOCB, 1);
int i;
- acb->common.bs->opaque = s;
- acb->sector_num = sector_num;
- acb->nb_sectors = nb_sectors;
- acb->qiov = qiov;
- acb->qcrs = g_new0(QuorumChildRequest, s->num_children);
- acb->count = 0;
- acb->success_count = 0;
- acb->rewrite_count = 0;
- acb->votes.compare = quorum_sha256_compare;
- QLIST_INIT(&acb->votes.vote_list);
- acb->is_read = false;
- acb->vote_ret = 0;
+ *acb = (QuorumAIOCB) {
+ .co = qemu_coroutine_self(),
+ .bs = bs,
+ .offset = offset,
+ .bytes = bytes,
+ .qiov = qiov,
+ .votes.compare = quorum_sha256_compare,
+ .votes.vote_list = QLIST_HEAD_INITIALIZER(acb.votes.vote_list),
+ };
+ acb->qcrs = g_new0(QuorumChildRequest, s->num_children);
for (i = 0; i < s->num_children; i++) {
acb->qcrs[i].buf = NULL;
acb->qcrs[i].ret = 0;
@@ -204,30 +184,37 @@ static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
return acb;
}
-static void quorum_report_bad(QuorumOpType type, uint64_t sector_num,
- int nb_sectors, char *node_name, int ret)
+static void quorum_report_bad(QuorumOpType type, uint64_t offset,
+ uint64_t bytes, char *node_name, int ret)
{
const char *msg = NULL;
+ int64_t start_sector = offset / BDRV_SECTOR_SIZE;
+ int64_t end_sector = DIV_ROUND_UP(offset + bytes, BDRV_SECTOR_SIZE);
+
if (ret < 0) {
msg = strerror(-ret);
}
- qapi_event_send_quorum_report_bad(type, !!msg, msg, node_name,
- sector_num, nb_sectors, &error_abort);
+ qapi_event_send_quorum_report_bad(type, !!msg, msg, node_name, start_sector,
+ end_sector - start_sector, &error_abort);
}
static void quorum_report_failure(QuorumAIOCB *acb)
{
- const char *reference = bdrv_get_device_or_node_name(acb->common.bs);
- qapi_event_send_quorum_failure(reference, acb->sector_num,
- acb->nb_sectors, &error_abort);
+ const char *reference = bdrv_get_device_or_node_name(acb->bs);
+ int64_t start_sector = acb->offset / BDRV_SECTOR_SIZE;
+ int64_t end_sector = DIV_ROUND_UP(acb->offset + acb->bytes,
+ BDRV_SECTOR_SIZE);
+
+ qapi_event_send_quorum_failure(reference, start_sector,
+ end_sector - start_sector, &error_abort);
}
static int quorum_vote_error(QuorumAIOCB *acb);
static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb)
{
- BDRVQuorumState *s = acb->common.bs->opaque;
+ BDRVQuorumState *s = acb->bs->opaque;
if (acb->success_count < s->threshold) {
acb->vote_ret = quorum_vote_error(acb);
@@ -238,22 +225,7 @@ static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb)
return false;
}
-static void quorum_rewrite_aio_cb(void *opaque, int ret)
-{
- QuorumAIOCB *acb = opaque;
-
- /* one less rewrite to do */
- acb->rewrite_count--;
-
- /* wait until all rewrite callbacks have completed */
- if (acb->rewrite_count) {
- return;
- }
-
- quorum_aio_finalize(acb);
-}
-
-static BlockAIOCB *read_fifo_child(QuorumAIOCB *acb);
+static int read_fifo_child(QuorumAIOCB *acb);
static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
{
@@ -272,70 +244,7 @@ static void quorum_report_bad_acb(QuorumChildRequest *sacb, int ret)
{
QuorumAIOCB *acb = sacb->parent;
QuorumOpType type = acb->is_read ? QUORUM_OP_TYPE_READ : QUORUM_OP_TYPE_WRITE;
- quorum_report_bad(type, acb->sector_num, acb->nb_sectors,
- sacb->aiocb->bs->node_name, ret);
-}
-
-static void quorum_fifo_aio_cb(void *opaque, int ret)
-{
- QuorumChildRequest *sacb = opaque;
- QuorumAIOCB *acb = sacb->parent;
- BDRVQuorumState *s = acb->common.bs->opaque;
-
- assert(acb->is_read && s->read_pattern == QUORUM_READ_PATTERN_FIFO);
-
- if (ret < 0) {
- quorum_report_bad_acb(sacb, ret);
-
- /* We try to read next child in FIFO order if we fail to read */
- if (acb->children_read < s->num_children) {
- read_fifo_child(acb);
- return;
- }
- }
-
- acb->vote_ret = ret;
-
- /* FIXME: rewrite failed children if acb->children_read > 1? */
- quorum_aio_finalize(acb);
-}
-
-static void quorum_aio_cb(void *opaque, int ret)
-{
- QuorumChildRequest *sacb = opaque;
- QuorumAIOCB *acb = sacb->parent;
- BDRVQuorumState *s = acb->common.bs->opaque;
- bool rewrite = false;
- int i;
-
- sacb->ret = ret;
- if (ret == 0) {
- acb->success_count++;
- } else {
- quorum_report_bad_acb(sacb, ret);
- }
- acb->count++;
- assert(acb->count <= s->num_children);
- assert(acb->success_count <= s->num_children);
- if (acb->count < s->num_children) {
- return;
- }
-
- /* Do the vote on read */
- if (acb->is_read) {
- rewrite = quorum_vote(acb);
- for (i = 0; i < s->num_children; i++) {
- qemu_vfree(acb->qcrs[i].buf);
- qemu_iovec_destroy(&acb->qcrs[i].qiov);
- }
- } else {
- quorum_has_too_much_io_failed(acb);
- }
-
- /* if no rewrite is done the code will finish right away */
- if (!rewrite) {
- quorum_aio_finalize(acb);
- }
+ quorum_report_bad(type, acb->offset, acb->bytes, sacb->bs->node_name, ret);
}
static void quorum_report_bad_versions(BDRVQuorumState *s,
@@ -350,14 +259,31 @@ static void quorum_report_bad_versions(BDRVQuorumState *s,
continue;
}
QLIST_FOREACH(item, &version->items, next) {
- quorum_report_bad(QUORUM_OP_TYPE_READ, acb->sector_num,
- acb->nb_sectors,
+ quorum_report_bad(QUORUM_OP_TYPE_READ, acb->offset, acb->bytes,
s->children[item->index]->bs->node_name, 0);
}
}
}
-static bool quorum_rewrite_bad_versions(BDRVQuorumState *s, QuorumAIOCB *acb,
+static void quorum_rewrite_entry(void *opaque)
+{
+ QuorumCo *co = opaque;
+ QuorumAIOCB *acb = co->acb;
+ BDRVQuorumState *s = acb->bs->opaque;
+
+ /* Ignore any errors, it's just a correction attempt for already
+ * corrupted data. */
+ bdrv_co_pwritev(s->children[co->idx], acb->offset, acb->bytes,
+ acb->qiov, 0);
+
+ /* Wake up the caller after the last rewrite */
+ acb->rewrite_count--;
+ if (!acb->rewrite_count) {
+ qemu_coroutine_enter_if_inactive(acb->co);
+ }
+}
+
+static bool quorum_rewrite_bad_versions(QuorumAIOCB *acb,
QuorumVoteValue *value)
{
QuorumVoteVersion *version;
@@ -376,7 +302,7 @@ static bool quorum_rewrite_bad_versions(BDRVQuorumState *s, QuorumAIOCB *acb,
}
}
- /* quorum_rewrite_aio_cb will count down this to zero */
+ /* quorum_rewrite_entry will count down this to zero */
acb->rewrite_count = count;
/* now fire the correcting rewrites */
@@ -385,9 +311,14 @@ static bool quorum_rewrite_bad_versions(BDRVQuorumState *s, QuorumAIOCB *acb,
continue;
}
QLIST_FOREACH(item, &version->items, next) {
- bdrv_aio_writev(s->children[item->index], acb->sector_num,
- acb->qiov, acb->nb_sectors, quorum_rewrite_aio_cb,
- acb);
+ Coroutine *co;
+ QuorumCo data = {
+ .acb = acb,
+ .idx = item->index,
+ };
+
+ co = qemu_coroutine_create(quorum_rewrite_entry, &data);
+ qemu_coroutine_enter(co);
}
}
@@ -507,8 +438,8 @@ static void GCC_FMT_ATTR(2, 3) quorum_err(QuorumAIOCB *acb,
va_list ap;
va_start(ap, fmt);
- fprintf(stderr, "quorum: sector_num=%" PRId64 " nb_sectors=%d ",
- acb->sector_num, acb->nb_sectors);
+ fprintf(stderr, "quorum: offset=%" PRIu64 " bytes=%" PRIu64 " ",
+ acb->offset, acb->bytes);
vfprintf(stderr, fmt, ap);
fprintf(stderr, "\n");
va_end(ap);
@@ -519,16 +450,15 @@ static bool quorum_compare(QuorumAIOCB *acb,
QEMUIOVector *a,
QEMUIOVector *b)
{
- BDRVQuorumState *s = acb->common.bs->opaque;
+ BDRVQuorumState *s = acb->bs->opaque;
ssize_t offset;
/* This driver will replace blkverify in this particular case */
if (s->is_blkverify) {
offset = qemu_iovec_compare(a, b);
if (offset != -1) {
- quorum_err(acb, "contents mismatch in sector %" PRId64,
- acb->sector_num +
- (uint64_t)(offset / BDRV_SECTOR_SIZE));
+ quorum_err(acb, "contents mismatch at offset %" PRIu64,
+ acb->offset + offset);
}
return true;
}
@@ -539,7 +469,7 @@ static bool quorum_compare(QuorumAIOCB *acb,
/* Do a vote to get the error code */
static int quorum_vote_error(QuorumAIOCB *acb)
{
- BDRVQuorumState *s = acb->common.bs->opaque;
+ BDRVQuorumState *s = acb->bs->opaque;
QuorumVoteVersion *winner = NULL;
QuorumVotes error_votes;
QuorumVoteValue result_value;
@@ -568,17 +498,16 @@ static int quorum_vote_error(QuorumAIOCB *acb)
return ret;
}
-static bool quorum_vote(QuorumAIOCB *acb)
+static void quorum_vote(QuorumAIOCB *acb)
{
bool quorum = true;
- bool rewrite = false;
int i, j, ret;
QuorumVoteValue hash;
- BDRVQuorumState *s = acb->common.bs->opaque;
+ BDRVQuorumState *s = acb->bs->opaque;
QuorumVoteVersion *winner;
if (quorum_has_too_much_io_failed(acb)) {
- return false;
+ return;
}
/* get the index of the first successful read */
@@ -606,7 +535,7 @@ static bool quorum_vote(QuorumAIOCB *acb)
/* Every successful read agrees */
if (quorum) {
quorum_copy_qiov(acb->qiov, &acb->qcrs[i].qiov);
- return false;
+ return;
}
/* compute hashes for each successful read, also store indexes */
@@ -641,19 +570,46 @@ static bool quorum_vote(QuorumAIOCB *acb)
/* corruption correction is enabled */
if (s->rewrite_corrupted) {
- rewrite = quorum_rewrite_bad_versions(s, acb, &winner->value);
+ quorum_rewrite_bad_versions(acb, &winner->value);
}
free_exit:
/* free lists */
quorum_free_vote_list(&acb->votes);
- return rewrite;
}
-static BlockAIOCB *read_quorum_children(QuorumAIOCB *acb)
+static void read_quorum_children_entry(void *opaque)
{
- BDRVQuorumState *s = acb->common.bs->opaque;
- int i;
+ QuorumCo *co = opaque;
+ QuorumAIOCB *acb = co->acb;
+ BDRVQuorumState *s = acb->bs->opaque;
+ int i = co->idx;
+ QuorumChildRequest *sacb = &acb->qcrs[i];
+
+ sacb->bs = s->children[i]->bs;
+ sacb->ret = bdrv_co_preadv(s->children[i], acb->offset, acb->bytes,
+ &acb->qcrs[i].qiov, 0);
+
+ if (sacb->ret == 0) {
+ acb->success_count++;
+ } else {
+ quorum_report_bad_acb(sacb, sacb->ret);
+ }
+
+ acb->count++;
+ assert(acb->count <= s->num_children);
+ assert(acb->success_count <= s->num_children);
+
+ /* Wake up the caller after the last read */
+ if (acb->count == s->num_children) {
+ qemu_coroutine_enter_if_inactive(acb->co);
+ }
+}
+
+static int read_quorum_children(QuorumAIOCB *acb)
+{
+ BDRVQuorumState *s = acb->bs->opaque;
+ int i, ret;
acb->children_read = s->num_children;
for (i = 0; i < s->num_children; i++) {
@@ -663,65 +619,131 @@ static BlockAIOCB *read_quorum_children(QuorumAIOCB *acb)
}
for (i = 0; i < s->num_children; i++) {
- acb->qcrs[i].aiocb = bdrv_aio_readv(s->children[i], acb->sector_num,
- &acb->qcrs[i].qiov, acb->nb_sectors,
- quorum_aio_cb, &acb->qcrs[i]);
+ Coroutine *co;
+ QuorumCo data = {
+ .acb = acb,
+ .idx = i,
+ };
+
+ co = qemu_coroutine_create(read_quorum_children_entry, &data);
+ qemu_coroutine_enter(co);
}
- return &acb->common;
+ while (acb->count < s->num_children) {
+ qemu_coroutine_yield();
+ }
+
+ /* Do the vote on read */
+ quorum_vote(acb);
+ for (i = 0; i < s->num_children; i++) {
+ qemu_vfree(acb->qcrs[i].buf);
+ qemu_iovec_destroy(&acb->qcrs[i].qiov);
+ }
+
+ while (acb->rewrite_count) {
+ qemu_coroutine_yield();
+ }
+
+ ret = acb->vote_ret;
+
+ return ret;
}
-static BlockAIOCB *read_fifo_child(QuorumAIOCB *acb)
+static int read_fifo_child(QuorumAIOCB *acb)
{
- BDRVQuorumState *s = acb->common.bs->opaque;
- int n = acb->children_read++;
+ BDRVQuorumState *s = acb->bs->opaque;
+ int n, ret;
+
+ /* We try to read the next child in FIFO order if we failed to read */
+ do {
+ n = acb->children_read++;
+ acb->qcrs[n].bs = s->children[n]->bs;
+ ret = bdrv_co_preadv(s->children[n], acb->offset, acb->bytes,
+ acb->qiov, 0);
+ if (ret < 0) {
+ quorum_report_bad_acb(&acb->qcrs[n], ret);
+ }
+ } while (ret < 0 && acb->children_read < s->num_children);
- acb->qcrs[n].aiocb = bdrv_aio_readv(s->children[n], acb->sector_num,
- acb->qiov, acb->nb_sectors,
- quorum_fifo_aio_cb, &acb->qcrs[n]);
+ /* FIXME: rewrite failed children if acb->children_read > 1? */
- return &acb->common;
+ return ret;
}
-static BlockAIOCB *quorum_aio_readv(BlockDriverState *bs,
- int64_t sector_num,
- QEMUIOVector *qiov,
- int nb_sectors,
- BlockCompletionFunc *cb,
- void *opaque)
+static int quorum_co_preadv(BlockDriverState *bs, uint64_t offset,
+ uint64_t bytes, QEMUIOVector *qiov, int flags)
{
BDRVQuorumState *s = bs->opaque;
- QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num,
- nb_sectors, cb, opaque);
+ QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes);
+ int ret;
+
acb->is_read = true;
acb->children_read = 0;
if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) {
- return read_quorum_children(acb);
+ ret = read_quorum_children(acb);
+ } else {
+ ret = read_fifo_child(acb);
+ }
+ quorum_aio_finalize(acb);
+
+ return ret;
+}
+
+static void write_quorum_entry(void *opaque)
+{
+ QuorumCo *co = opaque;
+ QuorumAIOCB *acb = co->acb;
+ BDRVQuorumState *s = acb->bs->opaque;
+ int i = co->idx;
+ QuorumChildRequest *sacb = &acb->qcrs[i];
+
+ sacb->bs = s->children[i]->bs;
+ sacb->ret = bdrv_co_pwritev(s->children[i], acb->offset, acb->bytes,
+ acb->qiov, 0);
+ if (sacb->ret == 0) {
+ acb->success_count++;
+ } else {
+ quorum_report_bad_acb(sacb, sacb->ret);
}
+ acb->count++;
+ assert(acb->count <= s->num_children);
+ assert(acb->success_count <= s->num_children);
- return read_fifo_child(acb);
+ /* Wake up the caller after the last write */
+ if (acb->count == s->num_children) {
+ qemu_coroutine_enter_if_inactive(acb->co);
+ }
}
-static BlockAIOCB *quorum_aio_writev(BlockDriverState *bs,
- int64_t sector_num,
- QEMUIOVector *qiov,
- int nb_sectors,
- BlockCompletionFunc *cb,
- void *opaque)
+static int quorum_co_pwritev(BlockDriverState *bs, uint64_t offset,
+ uint64_t bytes, QEMUIOVector *qiov, int flags)
{
BDRVQuorumState *s = bs->opaque;
- QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num, nb_sectors,
- cb, opaque);
- int i;
+ QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes);
+ int i, ret;
for (i = 0; i < s->num_children; i++) {
- acb->qcrs[i].aiocb = bdrv_aio_writev(s->children[i], sector_num,
- qiov, nb_sectors, &quorum_aio_cb,
- &acb->qcrs[i]);
+ Coroutine *co;
+ QuorumCo data = {
+ .acb = acb,
+ .idx = i,
+ };
+
+ co = qemu_coroutine_create(write_quorum_entry, &data);
+ qemu_coroutine_enter(co);
+ }
+
+ while (acb->count < s->num_children) {
+ qemu_coroutine_yield();
}
- return &acb->common;
+ quorum_has_too_much_io_failed(acb);
+
+ ret = acb->vote_ret;
+ quorum_aio_finalize(acb);
+
+ return ret;
}
static int64_t quorum_getlength(BlockDriverState *bs)
@@ -765,7 +787,7 @@ static coroutine_fn int quorum_co_flush(BlockDriverState *bs)
result = bdrv_co_flush(s->children[i]->bs);
if (result) {
quorum_report_bad(QUORUM_OP_TYPE_FLUSH, 0,
- bdrv_nb_sectors(s->children[i]->bs),
+ bdrv_getlength(s->children[i]->bs),
s->children[i]->bs->node_name, result);
result_value.l = result;
quorum_count_vote(&error_votes, &result_value, i);
@@ -1098,8 +1120,8 @@ static BlockDriver bdrv_quorum = {
.bdrv_getlength = quorum_getlength,
- .bdrv_aio_readv = quorum_aio_readv,
- .bdrv_aio_writev = quorum_aio_writev,
+ .bdrv_co_preadv = quorum_co_preadv,
+ .bdrv_co_pwritev = quorum_co_pwritev,
.bdrv_add_child = quorum_add_child,
.bdrv_del_child = quorum_del_child,