diff options
Diffstat (limited to 'blockjob.c')
-rw-r--r-- | blockjob.c | 219 |
1 files changed, 39 insertions, 180 deletions
diff --git a/blockjob.c b/blockjob.c index 3ede511da0..313b1ff7ce 100644 --- a/blockjob.c +++ b/blockjob.c @@ -36,30 +36,9 @@ #include "qemu/coroutine.h" #include "qemu/timer.h" -/* Right now, this mutex is only needed to synchronize accesses to job->busy - * and job->sleep_timer, such as concurrent calls to block_job_do_yield and - * block_job_enter. */ -static QemuMutex block_job_mutex; - -static void block_job_lock(void) -{ - qemu_mutex_lock(&block_job_mutex); -} - -static void block_job_unlock(void) -{ - qemu_mutex_unlock(&block_job_mutex); -} - -static void __attribute__((__constructor__)) block_job_init(void) -{ - qemu_mutex_init(&block_job_mutex); -} - static void block_job_event_cancelled(BlockJob *job); static void block_job_event_completed(BlockJob *job, const char *msg); static int block_job_event_pending(BlockJob *job); -static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job)); /* Transactional group of block jobs */ struct BlockJobTxn { @@ -161,33 +140,27 @@ static void block_job_txn_del_job(BlockJob *job) } } -/* Assumes the block_job_mutex is held */ -static bool block_job_timer_pending(BlockJob *job) -{ - return timer_pending(&job->sleep_timer); -} - -/* Assumes the block_job_mutex is held */ -static bool block_job_timer_not_pending(BlockJob *job) +/* Assumes the job_mutex is held */ +static bool job_timer_not_pending(Job *job) { - return !block_job_timer_pending(job); + return !timer_pending(&job->sleep_timer); } static void block_job_pause(BlockJob *job) { - job->pause_count++; + job->job.pause_count++; } static void block_job_resume(BlockJob *job) { - assert(job->pause_count > 0); - job->pause_count--; - if (job->pause_count) { + assert(job->job.pause_count > 0); + job->job.pause_count--; + if (job->job.pause_count) { return; } /* kick only if no timer is pending */ - block_job_enter_cond(job, block_job_timer_not_pending); + job_enter_cond(&job->job, job_timer_not_pending); } static void block_job_attached_aio_context(AioContext *new_context, @@ -208,7 +181,7 @@ void block_job_free(Job *job) block_job_detach_aio_context, bjob); blk_unref(bjob->blk); error_free(bjob->blocker); - assert(!timer_pending(&bjob->sleep_timer)); + assert(!timer_pending(&bjob->job.sleep_timer)); } static void block_job_attached_aio_context(AioContext *new_context, @@ -226,7 +199,7 @@ static void block_job_attached_aio_context(AioContext *new_context, static void block_job_drain(BlockJob *job) { - /* If job is !job->busy this kicks it into the next pause point. */ + /* If job is !job->job.busy this kicks it into the next pause point. */ block_job_enter(job); blk_drain(job->blk); @@ -244,7 +217,7 @@ static void block_job_detach_aio_context(void *opaque) block_job_pause(job); - while (!job->paused && !job->completed) { + while (!job->job.paused && !job->completed) { block_job_drain(job); } @@ -312,29 +285,11 @@ bool block_job_is_internal(BlockJob *job) return (job->job.id == NULL); } -static bool block_job_started(BlockJob *job) -{ - return job->co; -} - const BlockJobDriver *block_job_driver(BlockJob *job) { return job->driver; } -/** - * All jobs must allow a pause point before entering their job proper. This - * ensures that jobs can be paused prior to being started, then resumed later. - */ -static void coroutine_fn block_job_co_entry(void *opaque) -{ - BlockJob *job = opaque; - - assert(job && job->driver && job->driver->start); - block_job_pause_point(job); - job->driver->start(job); -} - static void block_job_sleep_timer_cb(void *opaque) { BlockJob *job = opaque; @@ -342,24 +297,12 @@ static void block_job_sleep_timer_cb(void *opaque) block_job_enter(job); } -void block_job_start(BlockJob *job) -{ - assert(job && !block_job_started(job) && job->paused && - job->driver && job->driver->start); - job->co = qemu_coroutine_create(block_job_co_entry, job); - job->pause_count--; - job->busy = true; - job->paused = false; - job_state_transition(&job->job, JOB_STATUS_RUNNING); - bdrv_coroutine_enter(blk_bs(job->blk), job->co); -} - static void block_job_decommission(BlockJob *job) { assert(job); job->completed = true; - job->busy = false; - job->paused = false; + job->job.busy = false; + job->job.paused = false; job->job.deferred_to_main_loop = true; block_job_txn_del_job(job); job_state_transition(&job->job, JOB_STATUS_NULL); @@ -374,7 +317,7 @@ static void block_job_do_dismiss(BlockJob *job) static void block_job_conclude(BlockJob *job) { job_state_transition(&job->job, JOB_STATUS_CONCLUDED); - if (job->auto_dismiss || !block_job_started(job)) { + if (job->auto_dismiss || !job_started(&job->job)) { block_job_do_dismiss(job); } } @@ -439,7 +382,7 @@ static int block_job_finalize_single(BlockJob *job) } /* Emit events only if we actually started */ - if (block_job_started(job)) { + if (job_started(&job->job)) { if (job_is_cancelled(&job->job)) { block_job_event_cancelled(job); } else { @@ -464,7 +407,7 @@ static void block_job_cancel_async(BlockJob *job, bool force) if (job->user_paused) { /* Do not call block_job_enter here, the caller will handle it. */ job->user_paused = false; - job->pause_count--; + job->job.pause_count--; } job->job.cancelled = true; /* To prevent 'force == false' overriding a previous 'force == true' */ @@ -615,6 +558,12 @@ static void block_job_completed_txn_success(BlockJob *job) } } +/* Assumes the job_mutex is held */ +static bool job_timer_pending(Job *job) +{ + return timer_pending(&job->sleep_timer); +} + void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp) { int64_t old_speed = job->speed; @@ -635,7 +584,7 @@ void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp) } /* kick only if a timer is pending */ - block_job_enter_cond(job, block_job_timer_pending); + job_enter_cond(&job->job, job_timer_pending); } int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n) @@ -654,7 +603,7 @@ void block_job_complete(BlockJob *job, Error **errp) if (job_apply_verb(&job->job, JOB_VERB_COMPLETE, errp)) { return; } - if (job->pause_count || job_is_cancelled(&job->job) || + if (job->job.pause_count || job_is_cancelled(&job->job) || !job->driver->complete) { error_setg(errp, "The active block job '%s' cannot be completed", @@ -708,7 +657,7 @@ bool block_job_user_paused(BlockJob *job) void block_job_user_resume(BlockJob *job, Error **errp) { assert(job); - if (!job->user_paused || job->pause_count <= 0) { + if (!job->user_paused || job->job.pause_count <= 0) { error_setg(errp, "Can't resume a job that was not paused"); return; } @@ -727,7 +676,7 @@ void block_job_cancel(BlockJob *job, bool force) return; } block_job_cancel_async(job, force); - if (!block_job_started(job)) { + if (!job_started(&job->job)) { block_job_completed(job, -ECANCELED); } else if (job->job.deferred_to_main_loop) { block_job_completed_txn_abort(job); @@ -797,8 +746,8 @@ BlockJobInfo *block_job_query(BlockJob *job, Error **errp) info->type = g_strdup(job_type_str(&job->job)); info->device = g_strdup(job->job.id); info->len = job->len; - info->busy = atomic_read(&job->busy); - info->paused = job->pause_count > 0; + info->busy = atomic_read(&job->job.busy); + info->paused = job->job.pause_count > 0; info->offset = job->offset; info->speed = job->speed; info->io_status = job->iostatus; @@ -915,12 +864,9 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver, job->blk = blk; job->cb = cb; job->opaque = opaque; - job->busy = false; - job->paused = true; - job->pause_count = 1; job->auto_finalize = !(flags & BLOCK_JOB_MANUAL_FINALIZE); job->auto_dismiss = !(flags & BLOCK_JOB_MANUAL_DISMISS); - aio_timer_init(qemu_get_aio_context(), &job->sleep_timer, + aio_timer_init(qemu_get_aio_context(), &job->job.sleep_timer, QEMU_CLOCK_REALTIME, SCALE_NS, block_job_sleep_timer_cb, job); @@ -980,128 +926,41 @@ void block_job_completed(BlockJob *job, int ret) } } -static bool block_job_should_pause(BlockJob *job) -{ - return job->pause_count > 0; -} - -/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds. - * Reentering the job coroutine with block_job_enter() before the timer has - * expired is allowed and cancels the timer. - * - * If @ns is (uint64_t) -1, no timer is scheduled and block_job_enter() must be - * called explicitly. */ -static void block_job_do_yield(BlockJob *job, uint64_t ns) -{ - block_job_lock(); - if (ns != -1) { - timer_mod(&job->sleep_timer, ns); - } - job->busy = false; - block_job_unlock(); - qemu_coroutine_yield(); - - /* Set by block_job_enter before re-entering the coroutine. */ - assert(job->busy); -} - -void coroutine_fn block_job_pause_point(BlockJob *job) -{ - assert(job && block_job_started(job)); - - if (!block_job_should_pause(job)) { - return; - } - if (job_is_cancelled(&job->job)) { - return; - } - - if (job->driver->pause) { - job->driver->pause(job); - } - - if (block_job_should_pause(job) && !job_is_cancelled(&job->job)) { - JobStatus status = job->job.status; - job_state_transition(&job->job, status == JOB_STATUS_READY - ? JOB_STATUS_STANDBY - : JOB_STATUS_PAUSED); - job->paused = true; - block_job_do_yield(job, -1); - job->paused = false; - job_state_transition(&job->job, status); - } - - if (job->driver->resume) { - job->driver->resume(job); - } -} - -/* - * Conditionally enter a block_job pending a call to fn() while - * under the block_job_lock critical section. - */ -static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job)) -{ - if (!block_job_started(job)) { - return; - } - if (job->job.deferred_to_main_loop) { - return; - } - - block_job_lock(); - if (job->busy) { - block_job_unlock(); - return; - } - - if (fn && !fn(job)) { - block_job_unlock(); - return; - } - - assert(!job->job.deferred_to_main_loop); - timer_del(&job->sleep_timer); - job->busy = true; - block_job_unlock(); - aio_co_wake(job->co); -} - void block_job_enter(BlockJob *job) { - block_job_enter_cond(job, NULL); + job_enter_cond(&job->job, NULL); } void block_job_sleep_ns(BlockJob *job, int64_t ns) { - assert(job->busy); + assert(job->job.busy); /* Check cancellation *before* setting busy = false, too! */ if (job_is_cancelled(&job->job)) { return; } - if (!block_job_should_pause(job)) { - block_job_do_yield(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns); + if (!job_should_pause(&job->job)) { + job_do_yield(&job->job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns); } - block_job_pause_point(job); + job_pause_point(&job->job); } void block_job_yield(BlockJob *job) { - assert(job->busy); + assert(job->job.busy); /* Check cancellation *before* setting busy = false, too! */ if (job_is_cancelled(&job->job)) { return; } - if (!block_job_should_pause(job)) { - block_job_do_yield(job, -1); + if (!job_should_pause(&job->job)) { + job_do_yield(&job->job, -1); } - block_job_pause_point(job); + job_pause_point(&job->job); } void block_job_iostatus_reset(BlockJob *job) @@ -1109,7 +968,7 @@ void block_job_iostatus_reset(BlockJob *job) if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) { return; } - assert(job->user_paused && job->pause_count > 0); + assert(job->user_paused && job->job.pause_count > 0); job->iostatus = BLOCK_DEVICE_IO_STATUS_OK; } |