diff options
author | Kevin Wolf <kwolf@redhat.com> | 2018-04-13 17:31:02 +0200 |
---|---|---|
committer | Kevin Wolf <kwolf@redhat.com> | 2018-05-23 14:30:50 +0200 |
commit | da01ff7f38f52791f93fc3ca59afcfbb220f15af (patch) | |
tree | 74010498c6229085ce3f2cc2184168aec5fb99f8 /blockjob.c | |
parent | 1908a5590c7d214b1b6886bc19b81076fb65cec9 (diff) |
job: Move coroutine and related code to Job
This commit moves some core functions for dealing with the job coroutine
from BlockJob to Job. This includes primarily entering the coroutine
(both for the first and reentering) and yielding explicitly and at pause
points.
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
Reviewed-by: John Snow <jsnow@redhat.com>
Diffstat (limited to 'blockjob.c')
-rw-r--r-- | blockjob.c | 219 |
1 files changed, 39 insertions, 180 deletions
diff --git a/blockjob.c b/blockjob.c index 3ede511da0..313b1ff7ce 100644 --- a/blockjob.c +++ b/blockjob.c @@ -36,30 +36,9 @@ #include "qemu/coroutine.h" #include "qemu/timer.h" -/* Right now, this mutex is only needed to synchronize accesses to job->busy - * and job->sleep_timer, such as concurrent calls to block_job_do_yield and - * block_job_enter. */ -static QemuMutex block_job_mutex; - -static void block_job_lock(void) -{ - qemu_mutex_lock(&block_job_mutex); -} - -static void block_job_unlock(void) -{ - qemu_mutex_unlock(&block_job_mutex); -} - -static void __attribute__((__constructor__)) block_job_init(void) -{ - qemu_mutex_init(&block_job_mutex); -} - static void block_job_event_cancelled(BlockJob *job); static void block_job_event_completed(BlockJob *job, const char *msg); static int block_job_event_pending(BlockJob *job); -static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job)); /* Transactional group of block jobs */ struct BlockJobTxn { @@ -161,33 +140,27 @@ static void block_job_txn_del_job(BlockJob *job) } } -/* Assumes the block_job_mutex is held */ -static bool block_job_timer_pending(BlockJob *job) -{ - return timer_pending(&job->sleep_timer); -} - -/* Assumes the block_job_mutex is held */ -static bool block_job_timer_not_pending(BlockJob *job) +/* Assumes the job_mutex is held */ +static bool job_timer_not_pending(Job *job) { - return !block_job_timer_pending(job); + return !timer_pending(&job->sleep_timer); } static void block_job_pause(BlockJob *job) { - job->pause_count++; + job->job.pause_count++; } static void block_job_resume(BlockJob *job) { - assert(job->pause_count > 0); - job->pause_count--; - if (job->pause_count) { + assert(job->job.pause_count > 0); + job->job.pause_count--; + if (job->job.pause_count) { return; } /* kick only if no timer is pending */ - block_job_enter_cond(job, block_job_timer_not_pending); + job_enter_cond(&job->job, job_timer_not_pending); } static void block_job_attached_aio_context(AioContext *new_context, @@ -208,7 +181,7 @@ void block_job_free(Job *job) block_job_detach_aio_context, bjob); blk_unref(bjob->blk); error_free(bjob->blocker); - assert(!timer_pending(&bjob->sleep_timer)); + assert(!timer_pending(&bjob->job.sleep_timer)); } static void block_job_attached_aio_context(AioContext *new_context, @@ -226,7 +199,7 @@ static void block_job_attached_aio_context(AioContext *new_context, static void block_job_drain(BlockJob *job) { - /* If job is !job->busy this kicks it into the next pause point. */ + /* If job is !job->job.busy this kicks it into the next pause point. */ block_job_enter(job); blk_drain(job->blk); @@ -244,7 +217,7 @@ static void block_job_detach_aio_context(void *opaque) block_job_pause(job); - while (!job->paused && !job->completed) { + while (!job->job.paused && !job->completed) { block_job_drain(job); } @@ -312,29 +285,11 @@ bool block_job_is_internal(BlockJob *job) return (job->job.id == NULL); } -static bool block_job_started(BlockJob *job) -{ - return job->co; -} - const BlockJobDriver *block_job_driver(BlockJob *job) { return job->driver; } -/** - * All jobs must allow a pause point before entering their job proper. This - * ensures that jobs can be paused prior to being started, then resumed later. - */ -static void coroutine_fn block_job_co_entry(void *opaque) -{ - BlockJob *job = opaque; - - assert(job && job->driver && job->driver->start); - block_job_pause_point(job); - job->driver->start(job); -} - static void block_job_sleep_timer_cb(void *opaque) { BlockJob *job = opaque; @@ -342,24 +297,12 @@ static void block_job_sleep_timer_cb(void *opaque) block_job_enter(job); } -void block_job_start(BlockJob *job) -{ - assert(job && !block_job_started(job) && job->paused && - job->driver && job->driver->start); - job->co = qemu_coroutine_create(block_job_co_entry, job); - job->pause_count--; - job->busy = true; - job->paused = false; - job_state_transition(&job->job, JOB_STATUS_RUNNING); - bdrv_coroutine_enter(blk_bs(job->blk), job->co); -} - static void block_job_decommission(BlockJob *job) { assert(job); job->completed = true; - job->busy = false; - job->paused = false; + job->job.busy = false; + job->job.paused = false; job->job.deferred_to_main_loop = true; block_job_txn_del_job(job); job_state_transition(&job->job, JOB_STATUS_NULL); @@ -374,7 +317,7 @@ static void block_job_do_dismiss(BlockJob *job) static void block_job_conclude(BlockJob *job) { job_state_transition(&job->job, JOB_STATUS_CONCLUDED); - if (job->auto_dismiss || !block_job_started(job)) { + if (job->auto_dismiss || !job_started(&job->job)) { block_job_do_dismiss(job); } } @@ -439,7 +382,7 @@ static int block_job_finalize_single(BlockJob *job) } /* Emit events only if we actually started */ - if (block_job_started(job)) { + if (job_started(&job->job)) { if (job_is_cancelled(&job->job)) { block_job_event_cancelled(job); } else { @@ -464,7 +407,7 @@ static void block_job_cancel_async(BlockJob *job, bool force) if (job->user_paused) { /* Do not call block_job_enter here, the caller will handle it. */ job->user_paused = false; - job->pause_count--; + job->job.pause_count--; } job->job.cancelled = true; /* To prevent 'force == false' overriding a previous 'force == true' */ @@ -615,6 +558,12 @@ static void block_job_completed_txn_success(BlockJob *job) } } +/* Assumes the job_mutex is held */ +static bool job_timer_pending(Job *job) +{ + return timer_pending(&job->sleep_timer); +} + void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp) { int64_t old_speed = job->speed; @@ -635,7 +584,7 @@ void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp) } /* kick only if a timer is pending */ - block_job_enter_cond(job, block_job_timer_pending); + job_enter_cond(&job->job, job_timer_pending); } int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n) @@ -654,7 +603,7 @@ void block_job_complete(BlockJob *job, Error **errp) if (job_apply_verb(&job->job, JOB_VERB_COMPLETE, errp)) { return; } - if (job->pause_count || job_is_cancelled(&job->job) || + if (job->job.pause_count || job_is_cancelled(&job->job) || !job->driver->complete) { error_setg(errp, "The active block job '%s' cannot be completed", @@ -708,7 +657,7 @@ bool block_job_user_paused(BlockJob *job) void block_job_user_resume(BlockJob *job, Error **errp) { assert(job); - if (!job->user_paused || job->pause_count <= 0) { + if (!job->user_paused || job->job.pause_count <= 0) { error_setg(errp, "Can't resume a job that was not paused"); return; } @@ -727,7 +676,7 @@ void block_job_cancel(BlockJob *job, bool force) return; } block_job_cancel_async(job, force); - if (!block_job_started(job)) { + if (!job_started(&job->job)) { block_job_completed(job, -ECANCELED); } else if (job->job.deferred_to_main_loop) { block_job_completed_txn_abort(job); @@ -797,8 +746,8 @@ BlockJobInfo *block_job_query(BlockJob *job, Error **errp) info->type = g_strdup(job_type_str(&job->job)); info->device = g_strdup(job->job.id); info->len = job->len; - info->busy = atomic_read(&job->busy); - info->paused = job->pause_count > 0; + info->busy = atomic_read(&job->job.busy); + info->paused = job->job.pause_count > 0; info->offset = job->offset; info->speed = job->speed; info->io_status = job->iostatus; @@ -915,12 +864,9 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver, job->blk = blk; job->cb = cb; job->opaque = opaque; - job->busy = false; - job->paused = true; - job->pause_count = 1; job->auto_finalize = !(flags & BLOCK_JOB_MANUAL_FINALIZE); job->auto_dismiss = !(flags & BLOCK_JOB_MANUAL_DISMISS); - aio_timer_init(qemu_get_aio_context(), &job->sleep_timer, + aio_timer_init(qemu_get_aio_context(), &job->job.sleep_timer, QEMU_CLOCK_REALTIME, SCALE_NS, block_job_sleep_timer_cb, job); @@ -980,128 +926,41 @@ void block_job_completed(BlockJob *job, int ret) } } -static bool block_job_should_pause(BlockJob *job) -{ - return job->pause_count > 0; -} - -/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds. - * Reentering the job coroutine with block_job_enter() before the timer has - * expired is allowed and cancels the timer. - * - * If @ns is (uint64_t) -1, no timer is scheduled and block_job_enter() must be - * called explicitly. */ -static void block_job_do_yield(BlockJob *job, uint64_t ns) -{ - block_job_lock(); - if (ns != -1) { - timer_mod(&job->sleep_timer, ns); - } - job->busy = false; - block_job_unlock(); - qemu_coroutine_yield(); - - /* Set by block_job_enter before re-entering the coroutine. */ - assert(job->busy); -} - -void coroutine_fn block_job_pause_point(BlockJob *job) -{ - assert(job && block_job_started(job)); - - if (!block_job_should_pause(job)) { - return; - } - if (job_is_cancelled(&job->job)) { - return; - } - - if (job->driver->pause) { - job->driver->pause(job); - } - - if (block_job_should_pause(job) && !job_is_cancelled(&job->job)) { - JobStatus status = job->job.status; - job_state_transition(&job->job, status == JOB_STATUS_READY - ? JOB_STATUS_STANDBY - : JOB_STATUS_PAUSED); - job->paused = true; - block_job_do_yield(job, -1); - job->paused = false; - job_state_transition(&job->job, status); - } - - if (job->driver->resume) { - job->driver->resume(job); - } -} - -/* - * Conditionally enter a block_job pending a call to fn() while - * under the block_job_lock critical section. - */ -static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job)) -{ - if (!block_job_started(job)) { - return; - } - if (job->job.deferred_to_main_loop) { - return; - } - - block_job_lock(); - if (job->busy) { - block_job_unlock(); - return; - } - - if (fn && !fn(job)) { - block_job_unlock(); - return; - } - - assert(!job->job.deferred_to_main_loop); - timer_del(&job->sleep_timer); - job->busy = true; - block_job_unlock(); - aio_co_wake(job->co); -} - void block_job_enter(BlockJob *job) { - block_job_enter_cond(job, NULL); + job_enter_cond(&job->job, NULL); } void block_job_sleep_ns(BlockJob *job, int64_t ns) { - assert(job->busy); + assert(job->job.busy); /* Check cancellation *before* setting busy = false, too! */ if (job_is_cancelled(&job->job)) { return; } - if (!block_job_should_pause(job)) { - block_job_do_yield(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns); + if (!job_should_pause(&job->job)) { + job_do_yield(&job->job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns); } - block_job_pause_point(job); + job_pause_point(&job->job); } void block_job_yield(BlockJob *job) { - assert(job->busy); + assert(job->job.busy); /* Check cancellation *before* setting busy = false, too! */ if (job_is_cancelled(&job->job)) { return; } - if (!block_job_should_pause(job)) { - block_job_do_yield(job, -1); + if (!job_should_pause(&job->job)) { + job_do_yield(&job->job, -1); } - block_job_pause_point(job); + job_pause_point(&job->job); } void block_job_iostatus_reset(BlockJob *job) @@ -1109,7 +968,7 @@ void block_job_iostatus_reset(BlockJob *job) if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) { return; } - assert(job->user_paused && job->pause_count > 0); + assert(job->user_paused && job->job.pause_count > 0); job->iostatus = BLOCK_DEVICE_IO_STATUS_OK; } |