aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFam Zheng <famz@redhat.com>2015-11-05 18:13:15 -0500
committerKevin Wolf <kwolf@redhat.com>2015-11-12 16:22:44 +0100
commitc55a832fdddec2c350b585ade0476501f616608d (patch)
tree093da970af4e287f4c69791af1bf10b829a24c08
parent94db6d2d30962cc0422a69c88c3b3e9981b33e50 (diff)
block: Add block job transactions
Sometimes block jobs must execute as a transaction group. Finishing jobs wait until all other jobs are ready to complete successfully. Failure or cancellation of one job cancels the other jobs in the group. Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com> Reviewed-by: Max Reitz <mreitz@redhat.com> Signed-off-by: Fam Zheng <famz@redhat.com> Signed-off-by: John Snow <jsnow@redhat.com> Message-id: 1446765200-3054-10-git-send-email-jsnow@redhat.com [Rewrite the implementation which is now contained in block_job_completed. --Fam] Signed-off-by: Fam Zheng <famz@redhat.com> Reviewed-by: Max Reitz <mreitz@redhat.com> Signed-off-by: John Snow <jsnow@redhat.com> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
-rw-r--r--blockjob.c135
-rw-r--r--include/block/block.h1
-rw-r--r--include/block/blockjob.h38
3 files changed, 172 insertions, 2 deletions
diff --git a/blockjob.c b/blockjob.c
index 81b268e20d..80adb9d52a 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -37,6 +37,19 @@
#include "qemu/timer.h"
#include "qapi-event.h"
+/* Transactional group of block jobs */
+struct BlockJobTxn {
+
+ /* Is this txn being cancelled? */
+ bool aborting;
+
+ /* List of jobs */
+ QLIST_HEAD(, BlockJob) jobs;
+
+ /* Reference count */
+ int refcnt;
+};
+
void *block_job_create(const BlockJobDriver *driver, BlockDriverState *bs,
int64_t speed, BlockCompletionFunc *cb,
void *opaque, Error **errp)
@@ -94,6 +107,86 @@ void block_job_unref(BlockJob *job)
}
}
+static void block_job_completed_single(BlockJob *job)
+{
+ if (!job->ret) {
+ if (job->driver->commit) {
+ job->driver->commit(job);
+ }
+ } else {
+ if (job->driver->abort) {
+ job->driver->abort(job);
+ }
+ }
+ job->cb(job->opaque, job->ret);
+ if (job->txn) {
+ block_job_txn_unref(job->txn);
+ }
+ block_job_unref(job);
+}
+
+static void block_job_completed_txn_abort(BlockJob *job)
+{
+ AioContext *ctx;
+ BlockJobTxn *txn = job->txn;
+ BlockJob *other_job, *next;
+
+ if (txn->aborting) {
+ /*
+ * We are cancelled by another job, which will handle everything.
+ */
+ return;
+ }
+ txn->aborting = true;
+ /* We are the first failed job. Cancel other jobs. */
+ QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+ ctx = bdrv_get_aio_context(other_job->bs);
+ aio_context_acquire(ctx);
+ }
+ QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+ if (other_job == job || other_job->completed) {
+ /* Other jobs are "effectively" cancelled by us, set the status for
+ * them; this job, however, may or may not be cancelled, depending
+ * on the caller, so leave it. */
+ if (other_job != job) {
+ other_job->cancelled = true;
+ }
+ continue;
+ }
+ block_job_cancel_sync(other_job);
+ assert(other_job->completed);
+ }
+ QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
+ ctx = bdrv_get_aio_context(other_job->bs);
+ block_job_completed_single(other_job);
+ aio_context_release(ctx);
+ }
+}
+
+static void block_job_completed_txn_success(BlockJob *job)
+{
+ AioContext *ctx;
+ BlockJobTxn *txn = job->txn;
+ BlockJob *other_job, *next;
+ /*
+ * Successful completion, see if there are other running jobs in this
+ * txn.
+ */
+ QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+ if (!other_job->completed) {
+ return;
+ }
+ }
+ /* We are the last completed job, commit the transaction. */
+ QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
+ ctx = bdrv_get_aio_context(other_job->bs);
+ aio_context_acquire(ctx);
+ assert(other_job->ret == 0);
+ block_job_completed_single(other_job);
+ aio_context_release(ctx);
+ }
+}
+
void block_job_completed(BlockJob *job, int ret)
{
BlockDriverState *bs = job->bs;
@@ -102,8 +195,13 @@ void block_job_completed(BlockJob *job, int ret)
assert(!job->completed);
job->completed = true;
job->ret = ret;
- job->cb(job->opaque, ret);
- block_job_unref(job);
+ if (!job->txn) {
+ block_job_completed_single(job);
+ } else if (ret < 0 || block_job_is_cancelled(job)) {
+ block_job_completed_txn_abort(job);
+ } else {
+ block_job_completed_txn_success(job);
+ }
}
void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
@@ -402,3 +500,36 @@ void block_job_defer_to_main_loop(BlockJob *job,
qemu_bh_schedule(data->bh);
}
+
+BlockJobTxn *block_job_txn_new(void)
+{
+ BlockJobTxn *txn = g_new0(BlockJobTxn, 1);
+ QLIST_INIT(&txn->jobs);
+ txn->refcnt = 1;
+ return txn;
+}
+
+static void block_job_txn_ref(BlockJobTxn *txn)
+{
+ txn->refcnt++;
+}
+
+void block_job_txn_unref(BlockJobTxn *txn)
+{
+ if (txn && --txn->refcnt == 0) {
+ g_free(txn);
+ }
+}
+
+void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
+{
+ if (!txn) {
+ return;
+ }
+
+ assert(!job->txn);
+ job->txn = txn;
+
+ QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
+ block_job_txn_ref(txn);
+}
diff --git a/include/block/block.h b/include/block/block.h
index 92f6f6a95a..73edb1a79c 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -14,6 +14,7 @@ typedef struct BlockDriver BlockDriver;
typedef struct BlockJob BlockJob;
typedef struct BdrvChild BdrvChild;
typedef struct BdrvChildRole BdrvChildRole;
+typedef struct BlockJobTxn BlockJobTxn;
typedef struct BlockDriverInfo {
/* in bytes, 0 if irrelevant */
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index c70d55a644..d84ccd8d2c 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -162,6 +162,9 @@ struct BlockJob {
*/
int ret;
+ /** Non-NULL if this job is part of a transaction */
+ BlockJobTxn *txn;
+ QLIST_ENTRY(BlockJob) txn_list;
};
/**
@@ -405,4 +408,39 @@ void block_job_defer_to_main_loop(BlockJob *job,
BlockJobDeferToMainLoopFn *fn,
void *opaque);
+/**
+ * block_job_txn_new:
+ *
+ * Allocate and return a new block job transaction. Jobs can be added to the
+ * transaction using block_job_txn_add_job().
+ *
+ * The transaction is automatically freed when the last job completes or is
+ * cancelled.
+ *
+ * All jobs in the transaction either complete successfully or fail/cancel as a
+ * group. Jobs wait for each other before completing. Cancelling one job
+ * cancels all jobs in the transaction.
+ */
+BlockJobTxn *block_job_txn_new(void);
+
+/**
+ * block_job_txn_unref:
+ *
+ * Release a reference that was previously acquired with block_job_txn_add_job
+ * or block_job_txn_new. If it's the last reference to the object, it will be
+ * freed.
+ */
+void block_job_txn_unref(BlockJobTxn *txn);
+
+/**
+ * block_job_txn_add_job:
+ * @txn: The transaction (may be NULL)
+ * @job: Job to add to the transaction
+ *
+ * Add @job to the transaction. The @job must not already be in a transaction.
+ * The caller must call either block_job_txn_unref() or block_job_completed()
+ * to release the reference that is automatically grabbed here.
+ */
+void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job);
+
#endif