aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--block/backup.c2
-rw-r--r--block/commit.c4
-rw-r--r--block/mirror.c22
-rw-r--r--block/replication.c2
-rw-r--r--block/stream.c4
-rw-r--r--blockdev.c8
-rw-r--r--blockjob.c219
-rw-r--r--include/block/blockjob.h40
-rw-r--r--include/block/blockjob_int.h26
-rw-r--r--include/qemu/job.h76
-rw-r--r--job.c137
-rw-r--r--tests/test-bdrv-drain.c38
-rw-r--r--tests/test-blockjob-txn.c12
-rw-r--r--tests/test-blockjob.c14
14 files changed, 305 insertions, 299 deletions
diff --git a/block/backup.c b/block/backup.c
index 22dd368c90..7d9aad9749 100644
--- a/block/backup.c
+++ b/block/backup.c
@@ -528,8 +528,8 @@ static const BlockJobDriver backup_job_driver = {
.instance_size = sizeof(BackupBlockJob),
.job_type = JOB_TYPE_BACKUP,
.free = block_job_free,
+ .start = backup_run,
},
- .start = backup_run,
.commit = backup_commit,
.abort = backup_abort,
.clean = backup_clean,
diff --git a/block/commit.c b/block/commit.c
index d326766e4d..2fbc31077a 100644
--- a/block/commit.c
+++ b/block/commit.c
@@ -220,8 +220,8 @@ static const BlockJobDriver commit_job_driver = {
.instance_size = sizeof(CommitBlockJob),
.job_type = JOB_TYPE_COMMIT,
.free = block_job_free,
+ .start = commit_run,
},
- .start = commit_run,
};
static int coroutine_fn bdrv_commit_top_preadv(BlockDriverState *bs,
@@ -371,7 +371,7 @@ void commit_start(const char *job_id, BlockDriverState *bs,
s->on_error = on_error;
trace_commit_start(bs, base, top, s);
- block_job_start(&s->common);
+ job_start(&s->common.job);
return;
fail:
diff --git a/block/mirror.c b/block/mirror.c
index 90d4ac9cb6..95fc8072b0 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -126,7 +126,7 @@ static void mirror_iteration_done(MirrorOp *op, int ret)
g_free(op);
if (s->waiting_for_io) {
- qemu_coroutine_enter(s->common.co);
+ qemu_coroutine_enter(s->common.job.co);
}
}
@@ -345,7 +345,7 @@ static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
mirror_wait_for_io(s);
}
- block_job_pause_point(&s->common);
+ job_pause_point(&s->common.job);
/* Find the number of consective dirty chunks following the first dirty
* one, and wait for in flight requests in them. */
@@ -597,7 +597,7 @@ static void mirror_throttle(MirrorBlockJob *s)
s->last_pause_ns = now;
block_job_sleep_ns(&s->common, 0);
} else {
- block_job_pause_point(&s->common);
+ job_pause_point(&s->common.job);
}
}
@@ -786,7 +786,7 @@ static void coroutine_fn mirror_run(void *opaque)
goto immediate_exit;
}
- block_job_pause_point(&s->common);
+ job_pause_point(&s->common.job);
cnt = bdrv_get_dirty_count(s->dirty_bitmap);
/* cnt is the number of dirty bytes remaining and s->bytes_in_flight is
@@ -957,9 +957,9 @@ static void mirror_complete(BlockJob *job, Error **errp)
block_job_enter(&s->common);
}
-static void mirror_pause(BlockJob *job)
+static void mirror_pause(Job *job)
{
- MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
+ MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
mirror_wait_for_all_io(s);
}
@@ -991,10 +991,10 @@ static const BlockJobDriver mirror_job_driver = {
.instance_size = sizeof(MirrorBlockJob),
.job_type = JOB_TYPE_MIRROR,
.free = block_job_free,
+ .start = mirror_run,
+ .pause = mirror_pause,
},
- .start = mirror_run,
.complete = mirror_complete,
- .pause = mirror_pause,
.attached_aio_context = mirror_attached_aio_context,
.drain = mirror_drain,
};
@@ -1004,10 +1004,10 @@ static const BlockJobDriver commit_active_job_driver = {
.instance_size = sizeof(MirrorBlockJob),
.job_type = JOB_TYPE_COMMIT,
.free = block_job_free,
+ .start = mirror_run,
+ .pause = mirror_pause,
},
- .start = mirror_run,
.complete = mirror_complete,
- .pause = mirror_pause,
.attached_aio_context = mirror_attached_aio_context,
.drain = mirror_drain,
};
@@ -1244,7 +1244,7 @@ static void mirror_start_job(const char *job_id, BlockDriverState *bs,
}
trace_mirror_start(bs, s, opaque);
- block_job_start(&s->common);
+ job_start(&s->common.job);
return;
fail:
diff --git a/block/replication.c b/block/replication.c
index 48148b884a..9ed6e0fb04 100644
--- a/block/replication.c
+++ b/block/replication.c
@@ -576,7 +576,7 @@ static void replication_start(ReplicationState *rs, ReplicationMode mode,
aio_context_release(aio_context);
return;
}
- block_job_start(job);
+ job_start(&job->job);
break;
default:
aio_context_release(aio_context);
diff --git a/block/stream.c b/block/stream.c
index 0bba81678c..6d8b7b6eee 100644
--- a/block/stream.c
+++ b/block/stream.c
@@ -213,8 +213,8 @@ static const BlockJobDriver stream_job_driver = {
.instance_size = sizeof(StreamBlockJob),
.job_type = JOB_TYPE_STREAM,
.free = block_job_free,
+ .start = stream_run,
},
- .start = stream_run,
};
void stream_start(const char *job_id, BlockDriverState *bs,
@@ -262,7 +262,7 @@ void stream_start(const char *job_id, BlockDriverState *bs,
s->on_error = on_error;
trace_stream_start(bs, base, s);
- block_job_start(&s->common);
+ job_start(&s->common.job);
return;
fail:
diff --git a/blockdev.c b/blockdev.c
index 3808b1fc00..c551fdf39a 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -1910,7 +1910,7 @@ static void drive_backup_commit(BlkActionState *common)
aio_context_acquire(aio_context);
assert(state->job);
- block_job_start(state->job);
+ job_start(&state->job->job);
aio_context_release(aio_context);
}
@@ -2008,7 +2008,7 @@ static void blockdev_backup_commit(BlkActionState *common)
aio_context_acquire(aio_context);
assert(state->job);
- block_job_start(state->job);
+ job_start(&state->job->job);
aio_context_release(aio_context);
}
@@ -3425,7 +3425,7 @@ void qmp_drive_backup(DriveBackup *arg, Error **errp)
BlockJob *job;
job = do_drive_backup(arg, NULL, errp);
if (job) {
- block_job_start(job);
+ job_start(&job->job);
}
}
@@ -3513,7 +3513,7 @@ void qmp_blockdev_backup(BlockdevBackup *arg, Error **errp)
BlockJob *job;
job = do_blockdev_backup(arg, NULL, errp);
if (job) {
- block_job_start(job);
+ job_start(&job->job);
}
}
diff --git a/blockjob.c b/blockjob.c
index 3ede511da0..313b1ff7ce 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -36,30 +36,9 @@
#include "qemu/coroutine.h"
#include "qemu/timer.h"
-/* Right now, this mutex is only needed to synchronize accesses to job->busy
- * and job->sleep_timer, such as concurrent calls to block_job_do_yield and
- * block_job_enter. */
-static QemuMutex block_job_mutex;
-
-static void block_job_lock(void)
-{
- qemu_mutex_lock(&block_job_mutex);
-}
-
-static void block_job_unlock(void)
-{
- qemu_mutex_unlock(&block_job_mutex);
-}
-
-static void __attribute__((__constructor__)) block_job_init(void)
-{
- qemu_mutex_init(&block_job_mutex);
-}
-
static void block_job_event_cancelled(BlockJob *job);
static void block_job_event_completed(BlockJob *job, const char *msg);
static int block_job_event_pending(BlockJob *job);
-static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job));
/* Transactional group of block jobs */
struct BlockJobTxn {
@@ -161,33 +140,27 @@ static void block_job_txn_del_job(BlockJob *job)
}
}
-/* Assumes the block_job_mutex is held */
-static bool block_job_timer_pending(BlockJob *job)
-{
- return timer_pending(&job->sleep_timer);
-}
-
-/* Assumes the block_job_mutex is held */
-static bool block_job_timer_not_pending(BlockJob *job)
+/* Assumes the job_mutex is held */
+static bool job_timer_not_pending(Job *job)
{
- return !block_job_timer_pending(job);
+ return !timer_pending(&job->sleep_timer);
}
static void block_job_pause(BlockJob *job)
{
- job->pause_count++;
+ job->job.pause_count++;
}
static void block_job_resume(BlockJob *job)
{
- assert(job->pause_count > 0);
- job->pause_count--;
- if (job->pause_count) {
+ assert(job->job.pause_count > 0);
+ job->job.pause_count--;
+ if (job->job.pause_count) {
return;
}
/* kick only if no timer is pending */
- block_job_enter_cond(job, block_job_timer_not_pending);
+ job_enter_cond(&job->job, job_timer_not_pending);
}
static void block_job_attached_aio_context(AioContext *new_context,
@@ -208,7 +181,7 @@ void block_job_free(Job *job)
block_job_detach_aio_context, bjob);
blk_unref(bjob->blk);
error_free(bjob->blocker);
- assert(!timer_pending(&bjob->sleep_timer));
+ assert(!timer_pending(&bjob->job.sleep_timer));
}
static void block_job_attached_aio_context(AioContext *new_context,
@@ -226,7 +199,7 @@ static void block_job_attached_aio_context(AioContext *new_context,
static void block_job_drain(BlockJob *job)
{
- /* If job is !job->busy this kicks it into the next pause point. */
+ /* If job is !job->job.busy this kicks it into the next pause point. */
block_job_enter(job);
blk_drain(job->blk);
@@ -244,7 +217,7 @@ static void block_job_detach_aio_context(void *opaque)
block_job_pause(job);
- while (!job->paused && !job->completed) {
+ while (!job->job.paused && !job->completed) {
block_job_drain(job);
}
@@ -312,29 +285,11 @@ bool block_job_is_internal(BlockJob *job)
return (job->job.id == NULL);
}
-static bool block_job_started(BlockJob *job)
-{
- return job->co;
-}
-
const BlockJobDriver *block_job_driver(BlockJob *job)
{
return job->driver;
}
-/**
- * All jobs must allow a pause point before entering their job proper. This
- * ensures that jobs can be paused prior to being started, then resumed later.
- */
-static void coroutine_fn block_job_co_entry(void *opaque)
-{
- BlockJob *job = opaque;
-
- assert(job && job->driver && job->driver->start);
- block_job_pause_point(job);
- job->driver->start(job);
-}
-
static void block_job_sleep_timer_cb(void *opaque)
{
BlockJob *job = opaque;
@@ -342,24 +297,12 @@ static void block_job_sleep_timer_cb(void *opaque)
block_job_enter(job);
}
-void block_job_start(BlockJob *job)
-{
- assert(job && !block_job_started(job) && job->paused &&
- job->driver && job->driver->start);
- job->co = qemu_coroutine_create(block_job_co_entry, job);
- job->pause_count--;
- job->busy = true;
- job->paused = false;
- job_state_transition(&job->job, JOB_STATUS_RUNNING);
- bdrv_coroutine_enter(blk_bs(job->blk), job->co);
-}
-
static void block_job_decommission(BlockJob *job)
{
assert(job);
job->completed = true;
- job->busy = false;
- job->paused = false;
+ job->job.busy = false;
+ job->job.paused = false;
job->job.deferred_to_main_loop = true;
block_job_txn_del_job(job);
job_state_transition(&job->job, JOB_STATUS_NULL);
@@ -374,7 +317,7 @@ static void block_job_do_dismiss(BlockJob *job)
static void block_job_conclude(BlockJob *job)
{
job_state_transition(&job->job, JOB_STATUS_CONCLUDED);
- if (job->auto_dismiss || !block_job_started(job)) {
+ if (job->auto_dismiss || !job_started(&job->job)) {
block_job_do_dismiss(job);
}
}
@@ -439,7 +382,7 @@ static int block_job_finalize_single(BlockJob *job)
}
/* Emit events only if we actually started */
- if (block_job_started(job)) {
+ if (job_started(&job->job)) {
if (job_is_cancelled(&job->job)) {
block_job_event_cancelled(job);
} else {
@@ -464,7 +407,7 @@ static void block_job_cancel_async(BlockJob *job, bool force)
if (job->user_paused) {
/* Do not call block_job_enter here, the caller will handle it. */
job->user_paused = false;
- job->pause_count--;
+ job->job.pause_count--;
}
job->job.cancelled = true;
/* To prevent 'force == false' overriding a previous 'force == true' */
@@ -615,6 +558,12 @@ static void block_job_completed_txn_success(BlockJob *job)
}
}
+/* Assumes the job_mutex is held */
+static bool job_timer_pending(Job *job)
+{
+ return timer_pending(&job->sleep_timer);
+}
+
void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
{
int64_t old_speed = job->speed;
@@ -635,7 +584,7 @@ void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
}
/* kick only if a timer is pending */
- block_job_enter_cond(job, block_job_timer_pending);
+ job_enter_cond(&job->job, job_timer_pending);
}
int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n)
@@ -654,7 +603,7 @@ void block_job_complete(BlockJob *job, Error **errp)
if (job_apply_verb(&job->job, JOB_VERB_COMPLETE, errp)) {
return;
}
- if (job->pause_count || job_is_cancelled(&job->job) ||
+ if (job->job.pause_count || job_is_cancelled(&job->job) ||
!job->driver->complete)
{
error_setg(errp, "The active block job '%s' cannot be completed",
@@ -708,7 +657,7 @@ bool block_job_user_paused(BlockJob *job)
void block_job_user_resume(BlockJob *job, Error **errp)
{
assert(job);
- if (!job->user_paused || job->pause_count <= 0) {
+ if (!job->user_paused || job->job.pause_count <= 0) {
error_setg(errp, "Can't resume a job that was not paused");
return;
}
@@ -727,7 +676,7 @@ void block_job_cancel(BlockJob *job, bool force)
return;
}
block_job_cancel_async(job, force);
- if (!block_job_started(job)) {
+ if (!job_started(&job->job)) {
block_job_completed(job, -ECANCELED);
} else if (job->job.deferred_to_main_loop) {
block_job_completed_txn_abort(job);
@@ -797,8 +746,8 @@ BlockJobInfo *block_job_query(BlockJob *job, Error **errp)
info->type = g_strdup(job_type_str(&job->job));
info->device = g_strdup(job->job.id);
info->len = job->len;
- info->busy = atomic_read(&job->busy);
- info->paused = job->pause_count > 0;
+ info->busy = atomic_read(&job->job.busy);
+ info->paused = job->job.pause_count > 0;
info->offset = job->offset;
info->speed = job->speed;
info->io_status = job->iostatus;
@@ -915,12 +864,9 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
job->blk = blk;
job->cb = cb;
job->opaque = opaque;
- job->busy = false;
- job->paused = true;
- job->pause_count = 1;
job->auto_finalize = !(flags & BLOCK_JOB_MANUAL_FINALIZE);
job->auto_dismiss = !(flags & BLOCK_JOB_MANUAL_DISMISS);
- aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
+ aio_timer_init(qemu_get_aio_context(), &job->job.sleep_timer,
QEMU_CLOCK_REALTIME, SCALE_NS,
block_job_sleep_timer_cb, job);
@@ -980,128 +926,41 @@ void block_job_completed(BlockJob *job, int ret)
}
}
-static bool block_job_should_pause(BlockJob *job)
-{
- return job->pause_count > 0;
-}
-
-/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
- * Reentering the job coroutine with block_job_enter() before the timer has
- * expired is allowed and cancels the timer.
- *
- * If @ns is (uint64_t) -1, no timer is scheduled and block_job_enter() must be
- * called explicitly. */
-static void block_job_do_yield(BlockJob *job, uint64_t ns)
-{
- block_job_lock();
- if (ns != -1) {
- timer_mod(&job->sleep_timer, ns);
- }
- job->busy = false;
- block_job_unlock();
- qemu_coroutine_yield();
-
- /* Set by block_job_enter before re-entering the coroutine. */
- assert(job->busy);
-}
-
-void coroutine_fn block_job_pause_point(BlockJob *job)
-{
- assert(job && block_job_started(job));
-
- if (!block_job_should_pause(job)) {
- return;
- }
- if (job_is_cancelled(&job->job)) {
- return;
- }
-
- if (job->driver->pause) {
- job->driver->pause(job);
- }
-
- if (block_job_should_pause(job) && !job_is_cancelled(&job->job)) {
- JobStatus status = job->job.status;
- job_state_transition(&job->job, status == JOB_STATUS_READY
- ? JOB_STATUS_STANDBY
- : JOB_STATUS_PAUSED);
- job->paused = true;
- block_job_do_yield(job, -1);
- job->paused = false;
- job_state_transition(&job->job, status);
- }
-
- if (job->driver->resume) {
- job->driver->resume(job);
- }
-}
-
-/*
- * Conditionally enter a block_job pending a call to fn() while
- * under the block_job_lock critical section.
- */
-static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job))
-{
- if (!block_job_started(job)) {
- return;
- }
- if (job->job.deferred_to_main_loop) {
- return;
- }
-
- block_job_lock();
- if (job->busy) {
- block_job_unlock();
- return;
- }
-
- if (fn && !fn(job)) {
- block_job_unlock();
- return;
- }
-
- assert(!job->job.deferred_to_main_loop);
- timer_del(&job->sleep_timer);
- job->busy = true;
- block_job_unlock();
- aio_co_wake(job->co);
-}
-
void block_job_enter(BlockJob *job)
{
- block_job_enter_cond(job, NULL);
+ job_enter_cond(&job->job, NULL);
}
void block_job_sleep_ns(BlockJob *job, int64_t ns)
{
- assert(job->busy);
+ assert(job->job.busy);
/* Check cancellation *before* setting busy = false, too! */
if (job_is_cancelled(&job->job)) {
return;
}
- if (!block_job_should_pause(job)) {
- block_job_do_yield(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
+ if (!job_should_pause(&job->job)) {
+ job_do_yield(&job->job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
}
- block_job_pause_point(job);
+ job_pause_point(&job->job);
}
void block_job_yield(BlockJob *job)
{
- assert(job->busy);
+ assert(job->job.busy);
/* Check cancellation *before* setting busy = false, too! */
if (job_is_cancelled(&job->job)) {
return;
}
- if (!block_job_should_pause(job)) {
- block_job_do_yield(job, -1);
+ if (!job_should_pause(&job->job)) {
+ job_do_yield(&job->job, -1);
}
- block_job_pause_point(job);
+ job_pause_point(&job->job);
}
void block_job_iostatus_reset(BlockJob *job)
@@ -1109,7 +968,7 @@ void block_job_iostatus_reset(BlockJob *job)
if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
return;
}
- assert(job->user_paused && job->pause_count > 0);
+ assert(job->user_paused && job->job.pause_count > 0);
job->iostatus = BLOCK_DEVICE_IO_STATUS_OK;
}
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index 2a9e865e31..b60d919fbf 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -51,43 +51,18 @@ typedef struct BlockJob {
BlockBackend *blk;
/**
- * The coroutine that executes the job. If not NULL, it is
- * reentered when busy is false and the job is cancelled.
- */
- Coroutine *co;
-
- /**
* Set to true if the job should abort immediately without waiting
* for data to be in sync.
*/
bool force;
/**
- * Counter for pause request. If non-zero, the block job is either paused,
- * or if busy == true will pause itself as soon as possible.
- */
- int pause_count;
-
- /**
* Set to true if the job is paused by user. Can be unpaused with the
* block-job-resume QMP command.
*/
bool user_paused;
/**
- * Set to false by the job while the coroutine has yielded and may be
- * re-entered by block_job_enter(). There may still be I/O or event loop
- * activity pending. Accessed under block_job_mutex (in blockjob.c).
- */
- bool busy;
-
- /**
- * Set to true by the job while it is in a quiescent state, where
- * no I/O or event loop activity is pending.
- */
- bool paused;
-
- /**
* Set to true when the job is ready to be completed.
*/
bool ready;
@@ -125,12 +100,6 @@ typedef struct BlockJob {
/** ret code passed to block_job_completed. */
int ret;
- /**
- * Timer that is used by @block_job_sleep_ns. Accessed under
- * block_job_mutex (in blockjob.c).
- */
- QEMUTimer sleep_timer;
-
/** True if this job should automatically finalize itself */
bool auto_finalize;
@@ -208,15 +177,6 @@ void block_job_remove_all_bdrv(BlockJob *job);
void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp);
/**
- * block_job_start:
- * @job: A job that has not yet been started.
- *
- * Begins execution of a block job.
- * Takes ownership of one reference to the job object.
- */
-void block_job_start(BlockJob *job);
-
-/**
* block_job_cancel:
* @job: The job to be canceled.
* @force: Quit a job without waiting for data to be in sync.
diff --git a/include/block/blockjob_int.h b/include/block/blockjob_int.h
index 0c2f8de381..0a614a89b8 100644
--- a/include/block/blockjob_int.h
+++ b/include/block/blockjob_int.h
@@ -38,9 +38,6 @@ struct BlockJobDriver {
/** Generic JobDriver callbacks and settings */
JobDriver job_driver;
- /** Mandatory: Entrypoint for the Coroutine. */
- CoroutineEntry *start;
-
/**
* Optional callback for job types whose completion must be triggered
* manually.
@@ -85,20 +82,6 @@ struct BlockJobDriver {
*/
void (*clean)(BlockJob *job);
- /**
- * If the callback is not NULL, it will be invoked when the job transitions
- * into the paused state. Paused jobs must not perform any asynchronous
- * I/O or event loop activity. This callback is used to quiesce jobs.
- */
- void coroutine_fn (*pause)(BlockJob *job);
-
- /**
- * If the callback is not NULL, it will be invoked when the job transitions
- * out of the paused state. Any asynchronous I/O or event loop activity
- * should be restarted from this callback.
- */
- void coroutine_fn (*resume)(BlockJob *job);
-
/*
* If the callback is not NULL, it will be invoked before the job is
* resumed in a new AioContext. This is the place to move any resources
@@ -196,15 +179,6 @@ void block_job_early_fail(BlockJob *job);
void block_job_completed(BlockJob *job, int ret);
/**
- * block_job_pause_point:
- * @job: The job that is ready to pause.
- *
- * Pause now if block_job_pause() has been called. Block jobs that perform
- * lots of I/O must call this between requests so that the job can be paused.
- */
-void coroutine_fn block_job_pause_point(BlockJob *job);
-
-/**
* block_job_enter:
* @job: The job to enter.
*
diff --git a/include/qemu/job.h b/include/qemu/job.h
index 933e0ab328..9dcff12283 100644
--- a/include/qemu/job.h
+++ b/include/qemu/job.h
@@ -28,6 +28,7 @@
#include "qapi/qapi-types-block-core.h"
#include "qemu/queue.h"
+#include "qemu/coroutine.h"
typedef struct JobDriver JobDriver;
@@ -51,6 +52,37 @@ typedef struct Job {
AioContext *aio_context;
/**
+ * The coroutine that executes the job. If not NULL, it is reentered when
+ * busy is false and the job is cancelled.
+ */
+ Coroutine *co;
+
+ /**
+ * Timer that is used by @block_job_sleep_ns. Accessed under job_mutex (in
+ * job.c).
+ */
+ QEMUTimer sleep_timer;
+
+ /**
+ * Counter for pause request. If non-zero, the block job is either paused,
+ * or if busy == true will pause itself as soon as possible.
+ */
+ int pause_count;
+
+ /**
+ * Set to false by the job while the coroutine has yielded and may be
+ * re-entered by block_job_enter(). There may still be I/O or event loop
+ * activity pending. Accessed under block_job_mutex (in blockjob.c).
+ */
+ bool busy;
+
+ /**
+ * Set to true by the job while it is in a quiescent state, where
+ * no I/O or event loop activity is pending.
+ */
+ bool paused;
+
+ /**
* Set to true if the job should cancel itself. The flag must
* always be tested just before toggling the busy flag from false
* to true. After a job has been cancelled, it should only yield
@@ -75,6 +107,23 @@ struct JobDriver {
/** Enum describing the operation */
JobType job_type;
+ /** Mandatory: Entrypoint for the Coroutine. */
+ CoroutineEntry *start;
+
+ /**
+ * If the callback is not NULL, it will be invoked when the job transitions
+ * into the paused state. Paused jobs must not perform any asynchronous
+ * I/O or event loop activity. This callback is used to quiesce jobs.
+ */
+ void coroutine_fn (*pause)(Job *job);
+
+ /**
+ * If the callback is not NULL, it will be invoked when the job transitions
+ * out of the paused state. Any asynchronous I/O or event loop activity
+ * should be restarted from this callback.
+ */
+ void coroutine_fn (*resume)(Job *job);
+
/** Called when the job is freed */
void (*free)(Job *job);
};
@@ -103,6 +152,30 @@ void job_ref(Job *job);
*/
void job_unref(Job *job);
+/**
+ * Conditionally enter the job coroutine if the job is ready to run, not
+ * already busy and fn() returns true. fn() is called while under the job_lock
+ * critical section.
+ */
+void job_enter_cond(Job *job, bool(*fn)(Job *job));
+
+/**
+ * @job: A job that has not yet been started.
+ *
+ * Begins execution of a job.
+ * Takes ownership of one reference to the job object.
+ */
+void job_start(Job *job);
+
+/**
+ * @job: The job that is ready to pause.
+ *
+ * Pause now if job_pause() has been called. Jobs that perform lots of I/O
+ * must call this between requests so that the job can be paused.
+ */
+void coroutine_fn job_pause_point(Job *job);
+
+
/** Returns the JobType of a given Job. */
JobType job_type(const Job *job);
@@ -153,5 +226,8 @@ void job_defer_to_main_loop(Job *job, JobDeferToMainLoopFn *fn, void *opaque);
/* TODO To be removed from the public interface */
void job_state_transition(Job *job, JobStatus s1);
+void coroutine_fn job_do_yield(Job *job, uint64_t ns);
+bool job_should_pause(Job *job);
+bool job_started(Job *job);
#endif
diff --git a/job.c b/job.c
index c5a37fb8ef..78497fd6f5 100644
--- a/job.c
+++ b/job.c
@@ -60,6 +60,26 @@ bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
[JOB_VERB_DISMISS] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
};
+/* Right now, this mutex is only needed to synchronize accesses to job->busy
+ * and job->sleep_timer, such as concurrent calls to job_do_yield and
+ * job_enter. */
+static QemuMutex job_mutex;
+
+static void job_lock(void)
+{
+ qemu_mutex_lock(&job_mutex);
+}
+
+static void job_unlock(void)
+{
+ qemu_mutex_unlock(&job_mutex);
+}
+
+static void __attribute__((__constructor__)) job_init(void)
+{
+ qemu_mutex_init(&job_mutex);
+}
+
/* TODO Make static once the whole state machine is in job.c */
void job_state_transition(Job *job, JobStatus s1)
{
@@ -101,6 +121,16 @@ bool job_is_cancelled(Job *job)
return job->cancelled;
}
+bool job_started(Job *job)
+{
+ return job->co;
+}
+
+bool job_should_pause(Job *job)
+{
+ return job->pause_count > 0;
+}
+
Job *job_next(Job *job)
{
if (!job) {
@@ -143,6 +173,9 @@ void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx,
job->id = g_strdup(job_id);
job->refcnt = 1;
job->aio_context = ctx;
+ job->busy = false;
+ job->paused = true;
+ job->pause_count = 1;
job_state_transition(job, JOB_STATUS_CREATED);
@@ -172,6 +205,110 @@ void job_unref(Job *job)
}
}
+void job_enter_cond(Job *job, bool(*fn)(Job *job))
+{
+ if (!job_started(job)) {
+ return;
+ }
+ if (job->deferred_to_main_loop) {
+ return;
+ }
+
+ job_lock();
+ if (job->busy) {
+ job_unlock();
+ return;
+ }
+
+ if (fn && !fn(job)) {
+ job_unlock();
+ return;
+ }
+
+ assert(!job->deferred_to_main_loop);
+ timer_del(&job->sleep_timer);
+ job->busy = true;
+ job_unlock();
+ aio_co_wake(job->co);
+}
+
+/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
+ * Reentering the job coroutine with block_job_enter() before the timer has
+ * expired is allowed and cancels the timer.
+ *
+ * If @ns is (uint64_t) -1, no timer is scheduled and block_job_enter() must be
+ * called explicitly. */
+void coroutine_fn job_do_yield(Job *job, uint64_t ns)
+{
+ job_lock();
+ if (ns != -1) {
+ timer_mod(&job->sleep_timer, ns);
+ }
+ job->busy = false;
+ job_unlock();
+ qemu_coroutine_yield();
+
+ /* Set by job_enter_cond() before re-entering the coroutine. */
+ assert(job->busy);
+}
+
+void coroutine_fn job_pause_point(Job *job)
+{
+ assert(job && job_started(job));
+
+ if (!job_should_pause(job)) {
+ return;
+ }
+ if (job_is_cancelled(job)) {
+ return;
+ }
+
+ if (job->driver->pause) {
+ job->driver->pause(job);
+ }
+
+ if (job_should_pause(job) && !job_is_cancelled(job)) {
+ JobStatus status = job->status;
+ job_state_transition(job, status == JOB_STATUS_READY
+ ? JOB_STATUS_STANDBY
+ : JOB_STATUS_PAUSED);
+ job->paused = true;
+ job_do_yield(job, -1);
+ job->paused = false;
+ job_state_transition(job, status);
+ }
+
+ if (job->driver->resume) {
+ job->driver->resume(job);
+ }
+}
+
+/**
+ * All jobs must allow a pause point before entering their job proper. This
+ * ensures that jobs can be paused prior to being started, then resumed later.
+ */
+static void coroutine_fn job_co_entry(void *opaque)
+{
+ Job *job = opaque;
+
+ assert(job && job->driver && job->driver->start);
+ job_pause_point(job);
+ job->driver->start(job);
+}
+
+
+void job_start(Job *job)
+{
+ assert(job && !job_started(job) && job->paused &&
+ job->driver && job->driver->start);
+ job->co = qemu_coroutine_create(job_co_entry, job);
+ job->pause_count--;
+ job->busy = true;
+ job->paused = false;
+ job_state_transition(job, JOB_STATUS_RUNNING);
+ aio_co_enter(job->aio_context, job->co);
+}
+
typedef struct {
Job *job;
JobDeferToMainLoopFn *fn;
diff --git a/tests/test-bdrv-drain.c b/tests/test-bdrv-drain.c
index 4f8cba8377..c9f2f9b183 100644
--- a/tests/test-bdrv-drain.c
+++ b/tests/test-bdrv-drain.c
@@ -524,8 +524,8 @@ BlockJobDriver test_job_driver = {
.job_driver = {
.instance_size = sizeof(TestBlockJob),
.free = block_job_free,
+ .start = test_job_start,
},
- .start = test_job_start,
.complete = test_job_complete,
};
@@ -549,47 +549,47 @@ static void test_blockjob_common(enum drain_type drain_type)
job = block_job_create("job0", &test_job_driver, NULL, src, 0, BLK_PERM_ALL,
0, 0, NULL, NULL, &error_abort);
block_job_add_bdrv(job, "target", target, 0, BLK_PERM_ALL, &error_abort);
- block_job_start(job);
+ job_start(&job->job);
- g_assert_cmpint(job->pause_count, ==, 0);
- g_assert_false(job->paused);
- g_assert_false(job->busy); /* We're in block_job_sleep_ns() */
+ g_assert_cmpint(job->job.pause_count, ==, 0);
+ g_assert_false(job->job.paused);
+ g_assert_false(job->job.busy); /* We're in block_job_sleep_ns() */
do_drain_begin(drain_type, src);
if (drain_type == BDRV_DRAIN_ALL) {
/* bdrv_drain_all() drains both src and target */
- g_assert_cmpint(job->pause_count, ==, 2);
+ g_assert_cmpint(job->job.pause_count, ==, 2);
} else {
- g_assert_cmpint(job->pause_count, ==, 1);
+ g_assert_cmpint(job->job.pause_count, ==, 1);
}
/* XXX We don't wait until the job is actually paused. Is this okay? */
- /* g_assert_true(job->paused); */
- g_assert_false(job->busy); /* The job is paused */
+ /* g_assert_true(job->job.paused); */
+ g_assert_false(job->job.busy); /* The job is paused */
do_drain_end(drain_type, src);
- g_assert_cmpint(job->pause_count, ==, 0);
- g_assert_false(job->paused);
- g_assert_false(job->busy); /* We're in block_job_sleep_ns() */
+ g_assert_cmpint(job->job.pause_count, ==, 0);
+ g_assert_false(job->job.paused);
+ g_assert_false(job->job.busy); /* We're in block_job_sleep_ns() */
do_drain_begin(drain_type, target);
if (drain_type == BDRV_DRAIN_ALL) {
/* bdrv_drain_all() drains both src and target */
- g_assert_cmpint(job->pause_count, ==, 2);
+ g_assert_cmpint(job->job.pause_count, ==, 2);
} else {
- g_assert_cmpint(job->pause_count, ==, 1);
+ g_assert_cmpint(job->job.pause_count, ==, 1);
}
/* XXX We don't wait until the job is actually paused. Is this okay? */
- /* g_assert_true(job->paused); */
- g_assert_false(job->busy); /* The job is paused */
+ /* g_assert_true(job->job.paused); */
+ g_assert_false(job->job.busy); /* The job is paused */
do_drain_end(drain_type, target);
- g_assert_cmpint(job->pause_count, ==, 0);
- g_assert_false(job->paused);
- g_assert_false(job->busy); /* We're in block_job_sleep_ns() */
+ g_assert_cmpint(job->job.pause_count, ==, 0);
+ g_assert_false(job->job.paused);
+ g_assert_false(job->job.busy); /* We're in block_job_sleep_ns() */
ret = block_job_complete_sync(job, &error_abort);
g_assert_cmpint(ret, ==, 0);
diff --git a/tests/test-blockjob-txn.c b/tests/test-blockjob-txn.c
index c03f9662d8..323e154a00 100644
--- a/tests/test-blockjob-txn.c
+++ b/tests/test-blockjob-txn.c
@@ -78,8 +78,8 @@ static const BlockJobDriver test_block_job_driver = {
.job_driver = {
.instance_size = sizeof(TestBlockJob),
.free = block_job_free,
+ .start = test_block_job_run,
},
- .start = test_block_job_run,
};
/* Create a block job that completes with a given return code after a given
@@ -125,7 +125,7 @@ static void test_single_job(int expected)
txn = block_job_txn_new();
job = test_block_job_start(1, true, expected, &result, txn);
- block_job_start(job);
+ job_start(&job->job);
if (expected == -ECANCELED) {
block_job_cancel(job, false);
@@ -165,8 +165,8 @@ static void test_pair_jobs(int expected1, int expected2)
txn = block_job_txn_new();
job1 = test_block_job_start(1, true, expected1, &result1, txn);
job2 = test_block_job_start(2, true, expected2, &result2, txn);
- block_job_start(job1);
- block_job_start(job2);
+ job_start(&job1->job);
+ job_start(&job2->job);
/* Release our reference now to trigger as many nice
* use-after-free bugs as possible.
@@ -227,8 +227,8 @@ static void test_pair_jobs_fail_cancel_race(void)
txn = block_job_txn_new();
job1 = test_block_job_start(1, true, -ECANCELED, &result1, txn);
job2 = test_block_job_start(2, false, 0, &result2, txn);
- block_job_start(job1);
- block_job_start(job2);
+ job_start(&job1->job);
+ job_start(&job2->job);
block_job_cancel(job1, false);
diff --git a/tests/test-blockjob.c b/tests/test-blockjob.c
index 5f43bd72a4..1d18325feb 100644
--- a/tests/test-blockjob.c
+++ b/tests/test-blockjob.c
@@ -199,8 +199,8 @@ static const BlockJobDriver test_cancel_driver = {
.job_driver = {
.instance_size = sizeof(CancelJob),
.free = block_job_free,
+ .start = cancel_job_start,
},
- .start = cancel_job_start,
.complete = cancel_job_complete,
};
@@ -254,7 +254,7 @@ static void test_cancel_running(void)
s = create_common(&job);
- block_job_start(job);
+ job_start(&job->job);
assert(job->job.status == JOB_STATUS_RUNNING);
cancel_common(s);
@@ -267,7 +267,7 @@ static void test_cancel_paused(void)
s = create_common(&job);
- block_job_start(job);
+ job_start(&job->job);
assert(job->job.status == JOB_STATUS_RUNNING);
block_job_user_pause(job, &error_abort);
@@ -284,7 +284,7 @@ static void test_cancel_ready(void)
s = create_common(&job);
- block_job_start(job);
+ job_start(&job->job);
assert(job->job.status == JOB_STATUS_RUNNING);
s->should_converge = true;
@@ -301,7 +301,7 @@ static void test_cancel_standby(void)
s = create_common(&job);
- block_job_start(job);
+ job_start(&job->job);
assert(job->job.status == JOB_STATUS_RUNNING);
s->should_converge = true;
@@ -322,7 +322,7 @@ static void test_cancel_pending(void)
s = create_common(&job);
- block_job_start(job);
+ job_start(&job->job);
assert(job->job.status == JOB_STATUS_RUNNING);
s->should_converge = true;
@@ -346,7 +346,7 @@ static void test_cancel_concluded(void)
s = create_common(&job);
- block_job_start(job);
+ job_start(&job->job);
assert(job->job.status == JOB_STATUS_RUNNING);
s->should_converge = true;