diff options
-rw-r--r-- | include/block/aio.h | 20 | ||||
-rw-r--r-- | tests/test-aio.c | 3 | ||||
-rw-r--r-- | util/async.c | 237 |
3 files changed, 158 insertions, 102 deletions
diff --git a/include/block/aio.h b/include/block/aio.h index 7ba9bd7874..1a2ce9ca26 100644 --- a/include/block/aio.h +++ b/include/block/aio.h @@ -51,6 +51,19 @@ struct ThreadPool; struct LinuxAioState; struct LuringState; +/* + * Each aio_bh_poll() call carves off a slice of the BH list, so that newly + * scheduled BHs are not processed until the next aio_bh_poll() call. All + * active aio_bh_poll() calls chain their slices together in a list, so that + * nested aio_bh_poll() calls process all scheduled bottom halves. + */ +typedef QSLIST_HEAD(, QEMUBH) BHList; +typedef struct BHListSlice BHListSlice; +struct BHListSlice { + BHList bh_list; + QSIMPLEQ_ENTRY(BHListSlice) next; +}; + struct AioContext { GSource source; @@ -91,8 +104,11 @@ struct AioContext { */ QemuLockCnt list_lock; - /* Anchor of the list of Bottom Halves belonging to the context */ - struct QEMUBH *first_bh; + /* Bottom Halves pending aio_bh_poll() processing */ + BHList bh_list; + + /* Chained BH list slices for each nested aio_bh_poll() call */ + QSIMPLEQ_HEAD(, BHListSlice) bh_slice_list; /* Used by aio_notify. * diff --git a/tests/test-aio.c b/tests/test-aio.c index 86fb73b3d5..8a46078463 100644 --- a/tests/test-aio.c +++ b/tests/test-aio.c @@ -615,7 +615,8 @@ static void test_source_bh_delete_from_cb(void) g_assert_cmpint(data1.n, ==, data1.max); g_assert(data1.bh == NULL); - g_assert(!g_main_context_iteration(NULL, false)); + assert(g_main_context_iteration(NULL, false)); + assert(!g_main_context_iteration(NULL, false)); } static void test_source_bh_delete_from_cb_many(void) diff --git a/util/async.c b/util/async.c index c192a24a61..b94518b948 100644 --- a/util/async.c +++ b/util/async.c @@ -29,6 +29,7 @@ #include "block/thread-pool.h" #include "qemu/main-loop.h" #include "qemu/atomic.h" +#include "qemu/rcu_queue.h" #include "block/raw-aio.h" #include "qemu/coroutine_int.h" #include "trace.h" @@ -36,16 +37,76 @@ /***********************************************************/ /* bottom halves (can be seen as timers which expire ASAP) */ +/* QEMUBH::flags values */ +enum { + /* Already enqueued and waiting for aio_bh_poll() */ + BH_PENDING = (1 << 0), + + /* Invoke the callback */ + BH_SCHEDULED = (1 << 1), + + /* Delete without invoking callback */ + BH_DELETED = (1 << 2), + + /* Delete after invoking callback */ + BH_ONESHOT = (1 << 3), + + /* Schedule periodically when the event loop is idle */ + BH_IDLE = (1 << 4), +}; + struct QEMUBH { AioContext *ctx; QEMUBHFunc *cb; void *opaque; - QEMUBH *next; - bool scheduled; - bool idle; - bool deleted; + QSLIST_ENTRY(QEMUBH) next; + unsigned flags; }; +/* Called concurrently from any thread */ +static void aio_bh_enqueue(QEMUBH *bh, unsigned new_flags) +{ + AioContext *ctx = bh->ctx; + unsigned old_flags; + + /* + * The memory barrier implicit in atomic_fetch_or makes sure that: + * 1. idle & any writes needed by the callback are done before the + * locations are read in the aio_bh_poll. + * 2. ctx is loaded before the callback has a chance to execute and bh + * could be freed. + */ + old_flags = atomic_fetch_or(&bh->flags, BH_PENDING | new_flags); + if (!(old_flags & BH_PENDING)) { + QSLIST_INSERT_HEAD_ATOMIC(&ctx->bh_list, bh, next); + } + + aio_notify(ctx); +} + +/* Only called from aio_bh_poll() and aio_ctx_finalize() */ +static QEMUBH *aio_bh_dequeue(BHList *head, unsigned *flags) +{ + QEMUBH *bh = QSLIST_FIRST_RCU(head); + + if (!bh) { + return NULL; + } + + QSLIST_REMOVE_HEAD(head, next); + + /* + * The atomic_and is paired with aio_bh_enqueue(). The implicit memory + * barrier ensures that the callback sees all writes done by the scheduling + * thread. It also ensures that the scheduling thread sees the cleared + * flag before bh->cb has run, and thus will call aio_notify again if + * necessary. + */ + *flags = atomic_fetch_and(&bh->flags, + ~(BH_PENDING | BH_SCHEDULED | BH_IDLE)); + return bh; +} + void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque) { QEMUBH *bh; @@ -55,15 +116,7 @@ void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque) .cb = cb, .opaque = opaque, }; - qemu_lockcnt_lock(&ctx->list_lock); - bh->next = ctx->first_bh; - bh->scheduled = 1; - bh->deleted = 1; - /* Make sure that the members are ready before putting bh into list */ - smp_wmb(); - ctx->first_bh = bh; - qemu_lockcnt_unlock(&ctx->list_lock); - aio_notify(ctx); + aio_bh_enqueue(bh, BH_SCHEDULED | BH_ONESHOT); } QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque) @@ -75,12 +128,6 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque) .cb = cb, .opaque = opaque, }; - qemu_lockcnt_lock(&ctx->list_lock); - bh->next = ctx->first_bh; - /* Make sure that the members are ready before putting bh into list */ - smp_wmb(); - ctx->first_bh = bh; - qemu_lockcnt_unlock(&ctx->list_lock); return bh; } @@ -89,91 +136,56 @@ void aio_bh_call(QEMUBH *bh) bh->cb(bh->opaque); } -/* Multiple occurrences of aio_bh_poll cannot be called concurrently. - * The count in ctx->list_lock is incremented before the call, and is - * not affected by the call. - */ +/* Multiple occurrences of aio_bh_poll cannot be called concurrently. */ int aio_bh_poll(AioContext *ctx) { - QEMUBH *bh, **bhp, *next; - int ret; - bool deleted = false; - - ret = 0; - for (bh = atomic_rcu_read(&ctx->first_bh); bh; bh = next) { - next = atomic_rcu_read(&bh->next); - /* The atomic_xchg is paired with the one in qemu_bh_schedule. The - * implicit memory barrier ensures that the callback sees all writes - * done by the scheduling thread. It also ensures that the scheduling - * thread sees the zero before bh->cb has run, and thus will call - * aio_notify again if necessary. - */ - if (atomic_xchg(&bh->scheduled, 0)) { + BHListSlice slice; + BHListSlice *s; + int ret = 0; + + QSLIST_MOVE_ATOMIC(&slice.bh_list, &ctx->bh_list); + QSIMPLEQ_INSERT_TAIL(&ctx->bh_slice_list, &slice, next); + + while ((s = QSIMPLEQ_FIRST(&ctx->bh_slice_list))) { + QEMUBH *bh; + unsigned flags; + + bh = aio_bh_dequeue(&s->bh_list, &flags); + if (!bh) { + QSIMPLEQ_REMOVE_HEAD(&ctx->bh_slice_list, next); + continue; + } + + if ((flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) { /* Idle BHs don't count as progress */ - if (!bh->idle) { + if (!(flags & BH_IDLE)) { ret = 1; } - bh->idle = 0; aio_bh_call(bh); } - if (bh->deleted) { - deleted = true; + if (flags & (BH_DELETED | BH_ONESHOT)) { + g_free(bh); } } - /* remove deleted bhs */ - if (!deleted) { - return ret; - } - - if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) { - bhp = &ctx->first_bh; - while (*bhp) { - bh = *bhp; - if (bh->deleted && !bh->scheduled) { - *bhp = bh->next; - g_free(bh); - } else { - bhp = &bh->next; - } - } - qemu_lockcnt_inc_and_unlock(&ctx->list_lock); - } return ret; } void qemu_bh_schedule_idle(QEMUBH *bh) { - bh->idle = 1; - /* Make sure that idle & any writes needed by the callback are done - * before the locations are read in the aio_bh_poll. - */ - atomic_mb_set(&bh->scheduled, 1); + aio_bh_enqueue(bh, BH_SCHEDULED | BH_IDLE); } void qemu_bh_schedule(QEMUBH *bh) { - AioContext *ctx; - - ctx = bh->ctx; - bh->idle = 0; - /* The memory barrier implicit in atomic_xchg makes sure that: - * 1. idle & any writes needed by the callback are done before the - * locations are read in the aio_bh_poll. - * 2. ctx is loaded before scheduled is set and the callback has a chance - * to execute. - */ - if (atomic_xchg(&bh->scheduled, 1) == 0) { - aio_notify(ctx); - } + aio_bh_enqueue(bh, BH_SCHEDULED); } - /* This func is async. */ void qemu_bh_cancel(QEMUBH *bh) { - atomic_mb_set(&bh->scheduled, 0); + atomic_and(&bh->flags, ~BH_SCHEDULED); } /* This func is async.The bottom half will do the delete action at the finial @@ -181,21 +193,16 @@ void qemu_bh_cancel(QEMUBH *bh) */ void qemu_bh_delete(QEMUBH *bh) { - bh->scheduled = 0; - bh->deleted = 1; + aio_bh_enqueue(bh, BH_DELETED); } -int64_t -aio_compute_timeout(AioContext *ctx) +static int64_t aio_compute_bh_timeout(BHList *head, int timeout) { - int64_t deadline; - int timeout = -1; QEMUBH *bh; - for (bh = atomic_rcu_read(&ctx->first_bh); bh; - bh = atomic_rcu_read(&bh->next)) { - if (bh->scheduled) { - if (bh->idle) { + QSLIST_FOREACH_RCU(bh, head, next) { + if ((bh->flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) { + if (bh->flags & BH_IDLE) { /* idle bottom halves will be polled at least * every 10ms */ timeout = 10000000; @@ -207,6 +214,28 @@ aio_compute_timeout(AioContext *ctx) } } + return timeout; +} + +int64_t +aio_compute_timeout(AioContext *ctx) +{ + BHListSlice *s; + int64_t deadline; + int timeout = -1; + + timeout = aio_compute_bh_timeout(&ctx->bh_list, timeout); + if (timeout == 0) { + return 0; + } + + QSIMPLEQ_FOREACH(s, &ctx->bh_slice_list, next) { + timeout = aio_compute_bh_timeout(&s->bh_list, timeout); + if (timeout == 0) { + return 0; + } + } + deadline = timerlistgroup_deadline_ns(&ctx->tlg); if (deadline == 0) { return 0; @@ -237,15 +266,24 @@ aio_ctx_check(GSource *source) { AioContext *ctx = (AioContext *) source; QEMUBH *bh; + BHListSlice *s; atomic_and(&ctx->notify_me, ~1); aio_notify_accept(ctx); - for (bh = ctx->first_bh; bh; bh = bh->next) { - if (bh->scheduled) { + QSLIST_FOREACH_RCU(bh, &ctx->bh_list, next) { + if ((bh->flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) { return true; } } + + QSIMPLEQ_FOREACH(s, &ctx->bh_slice_list, next) { + QSLIST_FOREACH_RCU(bh, &s->bh_list, next) { + if ((bh->flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) { + return true; + } + } + } return aio_pending(ctx) || (timerlistgroup_deadline_ns(&ctx->tlg) == 0); } @@ -265,6 +303,8 @@ static void aio_ctx_finalize(GSource *source) { AioContext *ctx = (AioContext *) source; + QEMUBH *bh; + unsigned flags; thread_pool_free(ctx->thread_pool); @@ -287,18 +327,15 @@ aio_ctx_finalize(GSource *source) assert(QSLIST_EMPTY(&ctx->scheduled_coroutines)); qemu_bh_delete(ctx->co_schedule_bh); - qemu_lockcnt_lock(&ctx->list_lock); - assert(!qemu_lockcnt_count(&ctx->list_lock)); - while (ctx->first_bh) { - QEMUBH *next = ctx->first_bh->next; + /* There must be no aio_bh_poll() calls going on */ + assert(QSIMPLEQ_EMPTY(&ctx->bh_slice_list)); + while ((bh = aio_bh_dequeue(&ctx->bh_list, &flags))) { /* qemu_bh_delete() must have been called on BHs in this AioContext */ - assert(ctx->first_bh->deleted); + assert(flags & BH_DELETED); - g_free(ctx->first_bh); - ctx->first_bh = next; + g_free(bh); } - qemu_lockcnt_unlock(&ctx->list_lock); aio_set_event_notifier(ctx, &ctx->notifier, false, NULL, NULL); event_notifier_cleanup(&ctx->notifier); @@ -445,6 +482,8 @@ AioContext *aio_context_new(Error **errp) AioContext *ctx; ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext)); + QSLIST_INIT(&ctx->bh_list); + QSIMPLEQ_INIT(&ctx->bh_slice_list); aio_context_setup(ctx); ret = event_notifier_init(&ctx->notifier, false); |