aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--block/backup.c3
-rw-r--r--block/commit.c5
-rw-r--r--block/mirror.c5
-rw-r--r--block/stream.c5
-rw-r--r--block/trace-events6
-rw-r--r--blockjob.c54
-rw-r--r--include/block/blockjob.h9
-rw-r--r--tests/test-blockjob-txn.c12
8 files changed, 67 insertions, 32 deletions
diff --git a/block/backup.c b/block/backup.c
index 4ed449448a..ae1b99aa84 100644
--- a/block/backup.c
+++ b/block/backup.c
@@ -654,9 +654,8 @@ void backup_start(const char *job_id, BlockDriverState *bs,
block_job_add_bdrv(&job->common, target);
job->common.len = len;
- job->common.co = qemu_coroutine_create(job->common.driver->start, job);
block_job_txn_add_job(txn, &job->common);
- qemu_coroutine_enter(job->common.co);
+ block_job_start(&job->common);
return;
error:
diff --git a/block/commit.c b/block/commit.c
index 20d27e27a8..c284e8535d 100644
--- a/block/commit.c
+++ b/block/commit.c
@@ -289,10 +289,9 @@ void commit_start(const char *job_id, BlockDriverState *bs,
s->backing_file_str = g_strdup(backing_file_str);
s->on_error = on_error;
- s->common.co = qemu_coroutine_create(s->common.driver->start, s);
- trace_commit_start(bs, base, top, s, s->common.co);
- qemu_coroutine_enter(s->common.co);
+ trace_commit_start(bs, base, top, s);
+ block_job_start(&s->common);
}
diff --git a/block/mirror.c b/block/mirror.c
index 659e09cbaf..62ac87f0c4 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -1009,9 +1009,8 @@ static void mirror_start_job(const char *job_id, BlockDriverState *bs,
}
}
- s->common.co = qemu_coroutine_create(s->common.driver->start, s);
- trace_mirror_start(bs, s, s->common.co, opaque);
- qemu_coroutine_enter(s->common.co);
+ trace_mirror_start(bs, s, opaque);
+ block_job_start(&s->common);
}
void mirror_start(const char *job_id, BlockDriverState *bs,
diff --git a/block/stream.c b/block/stream.c
index 92309ffc2e..1523ba7dfb 100644
--- a/block/stream.c
+++ b/block/stream.c
@@ -255,7 +255,6 @@ void stream_start(const char *job_id, BlockDriverState *bs,
s->bs_flags = orig_bs_flags;
s->on_error = on_error;
- s->common.co = qemu_coroutine_create(s->common.driver->start, s);
- trace_stream_start(bs, base, s, s->common.co);
- qemu_coroutine_enter(s->common.co);
+ trace_stream_start(bs, base, s);
+ block_job_start(&s->common);
}
diff --git a/block/trace-events b/block/trace-events
index 882c9034c2..cfc05f2478 100644
--- a/block/trace-events
+++ b/block/trace-events
@@ -19,14 +19,14 @@ bdrv_co_do_copy_on_readv(void *bs, int64_t offset, unsigned int bytes, int64_t c
# block/stream.c
stream_one_iteration(void *s, int64_t sector_num, int nb_sectors, int is_allocated) "s %p sector_num %"PRId64" nb_sectors %d is_allocated %d"
-stream_start(void *bs, void *base, void *s, void *co) "bs %p base %p s %p co %p"
+stream_start(void *bs, void *base, void *s) "bs %p base %p s %p"
# block/commit.c
commit_one_iteration(void *s, int64_t sector_num, int nb_sectors, int is_allocated) "s %p sector_num %"PRId64" nb_sectors %d is_allocated %d"
-commit_start(void *bs, void *base, void *top, void *s, void *co) "bs %p base %p top %p s %p co %p"
+commit_start(void *bs, void *base, void *top, void *s) "bs %p base %p top %p s %p"
# block/mirror.c
-mirror_start(void *bs, void *s, void *co, void *opaque) "bs %p s %p co %p opaque %p"
+mirror_start(void *bs, void *s, void *opaque) "bs %p s %p opaque %p"
mirror_restart_iter(void *s, int64_t cnt) "s %p dirty count %"PRId64
mirror_before_flush(void *s) "s %p"
mirror_before_drain(void *s, int64_t cnt) "s %p dirty count %"PRId64
diff --git a/blockjob.c b/blockjob.c
index e3c458c021..513620c199 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -174,7 +174,9 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
job->blk = blk;
job->cb = cb;
job->opaque = opaque;
- job->busy = true;
+ job->busy = false;
+ job->paused = true;
+ job->pause_count = 1;
job->refcnt = 1;
bs->job = job;
@@ -202,6 +204,23 @@ bool block_job_is_internal(BlockJob *job)
return (job->id == NULL);
}
+static bool block_job_started(BlockJob *job)
+{
+ return job->co;
+}
+
+void block_job_start(BlockJob *job)
+{
+ assert(job && !block_job_started(job) && job->paused &&
+ !job->busy && job->driver->start);
+ job->co = qemu_coroutine_create(job->driver->start, job);
+ if (--job->pause_count == 0) {
+ job->paused = false;
+ job->busy = true;
+ qemu_coroutine_enter(job->co);
+ }
+}
+
void block_job_ref(BlockJob *job)
{
++job->refcnt;
@@ -248,14 +267,18 @@ static void block_job_completed_single(BlockJob *job)
if (job->cb) {
job->cb(job->opaque, job->ret);
}
- if (block_job_is_cancelled(job)) {
- block_job_event_cancelled(job);
- } else {
- const char *msg = NULL;
- if (job->ret < 0) {
- msg = strerror(-job->ret);
+
+ /* Emit events only if we actually started */
+ if (block_job_started(job)) {
+ if (block_job_is_cancelled(job)) {
+ block_job_event_cancelled(job);
+ } else {
+ const char *msg = NULL;
+ if (job->ret < 0) {
+ msg = strerror(-job->ret);
+ }
+ block_job_event_completed(job, msg);
}
- block_job_event_completed(job, msg);
}
if (job->txn) {
@@ -363,7 +386,8 @@ void block_job_complete(BlockJob *job, Error **errp)
{
/* Should not be reachable via external interface for internal jobs */
assert(job->id);
- if (job->pause_count || job->cancelled || !job->driver->complete) {
+ if (job->pause_count || job->cancelled ||
+ !block_job_started(job) || !job->driver->complete) {
error_setg(errp, "The active block job '%s' cannot be completed",
job->id);
return;
@@ -395,6 +419,8 @@ bool block_job_user_paused(BlockJob *job)
void coroutine_fn block_job_pause_point(BlockJob *job)
{
+ assert(job && block_job_started(job));
+
if (!block_job_should_pause(job)) {
return;
}
@@ -446,9 +472,13 @@ void block_job_enter(BlockJob *job)
void block_job_cancel(BlockJob *job)
{
- job->cancelled = true;
- block_job_iostatus_reset(job);
- block_job_enter(job);
+ if (block_job_started(job)) {
+ job->cancelled = true;
+ block_job_iostatus_reset(job);
+ block_job_enter(job);
+ } else {
+ block_job_completed(job, -ECANCELED);
+ }
}
bool block_job_is_cancelled(BlockJob *job)
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index 356cacf004..1acb256223 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -189,6 +189,15 @@ void block_job_add_bdrv(BlockJob *job, BlockDriverState *bs);
void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp);
/**
+ * block_job_start:
+ * @job: A job that has not yet been started.
+ *
+ * Begins execution of a block job.
+ * Takes ownership of one reference to the job object.
+ */
+void block_job_start(BlockJob *job);
+
+/**
* block_job_cancel:
* @job: The job to be canceled.
*
diff --git a/tests/test-blockjob-txn.c b/tests/test-blockjob-txn.c
index f9afc3be41..b132e39097 100644
--- a/tests/test-blockjob-txn.c
+++ b/tests/test-blockjob-txn.c
@@ -24,10 +24,6 @@ typedef struct {
int *result;
} TestBlockJob;
-static const BlockJobDriver test_block_job_driver = {
- .instance_size = sizeof(TestBlockJob),
-};
-
static void test_block_job_complete(BlockJob *job, void *opaque)
{
BlockDriverState *bs = blk_bs(job->blk);
@@ -77,6 +73,11 @@ static void test_block_job_cb(void *opaque, int ret)
g_free(data);
}
+static const BlockJobDriver test_block_job_driver = {
+ .instance_size = sizeof(TestBlockJob),
+ .start = test_block_job_run,
+};
+
/* Create a block job that completes with a given return code after a given
* number of event loop iterations. The return code is stored in the given
* result pointer.
@@ -104,10 +105,9 @@ static BlockJob *test_block_job_start(unsigned int iterations,
s->use_timer = use_timer;
s->rc = rc;
s->result = result;
- s->common.co = qemu_coroutine_create(test_block_job_run, s);
data->job = s;
data->result = result;
- qemu_coroutine_enter(s->common.co);
+ block_job_start(&s->common);
return &s->common;
}