aboutsummaryrefslogtreecommitdiff
path: root/job.c
diff options
context:
space:
mode:
Diffstat (limited to 'job.c')
-rw-r--r--job.c84
1 files changed, 76 insertions, 8 deletions
diff --git a/job.c b/job.c
index 4f6fd73cd7..2e453f60bc 100644
--- a/job.c
+++ b/job.c
@@ -221,7 +221,7 @@ bool job_is_completed(Job *job)
return false;
}
-bool job_started(Job *job)
+static bool job_started(Job *job)
{
return job->co;
}
@@ -391,10 +391,10 @@ void job_enter(Job *job)
}
/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
- * Reentering the job coroutine with block_job_enter() before the timer has
- * expired is allowed and cancels the timer.
+ * Reentering the job coroutine with job_enter() before the timer has expired
+ * is allowed and cancels the timer.
*
- * If @ns is (uint64_t) -1, no timer is scheduled and block_job_enter() must be
+ * If @ns is (uint64_t) -1, no timer is scheduled and job_enter() must be
* called explicitly. */
void coroutine_fn job_do_yield(Job *job, uint64_t ns)
{
@@ -579,7 +579,7 @@ static void job_conclude(Job *job)
}
}
-void job_update_rc(Job *job)
+static void job_update_rc(Job *job)
{
if (!job->ret && job_is_cancelled(job)) {
job->ret = -ECANCELED;
@@ -644,7 +644,7 @@ static int job_finalize_single(Job *job)
return 0;
}
-void job_cancel_async(Job *job, bool force)
+static void job_cancel_async(Job *job, bool force)
{
if (job->user_paused) {
/* Do not call job_enter here, the caller will handle it. */
@@ -660,7 +660,7 @@ void job_cancel_async(Job *job, bool force)
job->force_cancel |= force;
}
-void job_completed_txn_abort(Job *job)
+static void job_completed_txn_abort(Job *job)
{
AioContext *ctx;
JobTxn *txn = job->txn;
@@ -748,7 +748,7 @@ static int job_transition_to_pending(Job *job)
return 0;
}
-void job_completed_txn_success(Job *job)
+static void job_completed_txn_success(Job *job)
{
JobTxn *txn = job->txn;
Job *other_job;
@@ -774,6 +774,74 @@ void job_completed_txn_success(Job *job)
}
}
+void job_completed(Job *job, int ret)
+{
+ assert(job && job->txn && !job_is_completed(job));
+ job->ret = ret;
+ job_update_rc(job);
+ trace_job_completed(job, ret, job->ret);
+ if (job->ret) {
+ job_completed_txn_abort(job);
+ } else {
+ job_completed_txn_success(job);
+ }
+}
+
+void job_cancel(Job *job, bool force)
+{
+ if (job->status == JOB_STATUS_CONCLUDED) {
+ job_do_dismiss(job);
+ return;
+ }
+ job_cancel_async(job, force);
+ if (!job_started(job)) {
+ job_completed(job, -ECANCELED);
+ } else if (job->deferred_to_main_loop) {
+ job_completed_txn_abort(job);
+ } else {
+ job_enter(job);
+ }
+}
+
+void job_user_cancel(Job *job, bool force, Error **errp)
+{
+ if (job_apply_verb(job, JOB_VERB_CANCEL, errp)) {
+ return;
+ }
+ job_cancel(job, force);
+}
+
+/* A wrapper around job_cancel() taking an Error ** parameter so it may be
+ * used with job_finish_sync() without the need for (rather nasty) function
+ * pointer casts there. */
+static void job_cancel_err(Job *job, Error **errp)
+{
+ job_cancel(job, false);
+}
+
+int job_cancel_sync(Job *job)
+{
+ return job_finish_sync(job, &job_cancel_err, NULL);
+}
+
+void job_cancel_sync_all(void)
+{
+ Job *job;
+ AioContext *aio_context;
+
+ while ((job = job_next(NULL))) {
+ aio_context = job->aio_context;
+ aio_context_acquire(aio_context);
+ job_cancel_sync(job);
+ aio_context_release(aio_context);
+ }
+}
+
+int job_complete_sync(Job *job, Error **errp)
+{
+ return job_finish_sync(job, job_complete, errp);
+}
+
void job_complete(Job *job, Error **errp)
{
/* Should not be reachable via external interface for internal jobs */