diff options
-rw-r--r-- | aio-posix.c | 34 | ||||
-rw-r--r-- | async.c | 19 | ||||
-rw-r--r-- | docs/aio_notify.promela | 104 | ||||
-rw-r--r-- | include/block/aio.h | 9 |
4 files changed, 164 insertions, 2 deletions
diff --git a/aio-posix.c b/aio-posix.c index 44c4df3f3d..2eada2e049 100644 --- a/aio-posix.c +++ b/aio-posix.c @@ -175,11 +175,38 @@ static bool aio_dispatch(AioContext *ctx) bool aio_poll(AioContext *ctx, bool blocking) { AioHandler *node; + bool was_dispatching; int ret; bool progress; + was_dispatching = ctx->dispatching; progress = false; + /* aio_notify can avoid the expensive event_notifier_set if + * everything (file descriptors, bottom halves, timers) will + * be re-evaluated before the next blocking poll(). This happens + * in two cases: + * + * 1) when aio_poll is called with blocking == false + * + * 2) when we are called after poll(). If we are called before + * poll(), bottom halves will not be re-evaluated and we need + * aio_notify() if blocking == true. + * + * The first aio_dispatch() only does something when AioContext is + * running as a GSource, and in that case aio_poll is used only + * with blocking == false, so this optimization is already quite + * effective. However, the code is ugly and should be restructured + * to have a single aio_dispatch() call. To do this, we need to + * reorganize aio_poll into a prepare/poll/dispatch model like + * glib's. + * + * If we're in a nested event loop, ctx->dispatching might be true. + * In that case we can restore it just before returning, but we + * have to clear it now. + */ + aio_set_dispatching(ctx, !blocking); + /* * If there are callbacks left that have been queued, we need to call them. * Do not call select in this case, because it is possible that the caller @@ -190,12 +217,14 @@ bool aio_poll(AioContext *ctx, bool blocking) progress = true; } + /* Re-evaluate condition (1) above. */ + aio_set_dispatching(ctx, !blocking); if (aio_dispatch(ctx)) { progress = true; } if (progress && !blocking) { - return true; + goto out; } ctx->walking_handlers++; @@ -234,9 +263,12 @@ bool aio_poll(AioContext *ctx, bool blocking) } /* Run dispatch even if there were no readable fds to run timers */ + aio_set_dispatching(ctx, true); if (aio_dispatch(ctx)) { progress = true; } +out: + aio_set_dispatching(ctx, was_dispatching); return progress; } @@ -26,6 +26,7 @@ #include "block/aio.h" #include "block/thread-pool.h" #include "qemu/main-loop.h" +#include "qemu/atomic.h" /***********************************************************/ /* bottom halves (can be seen as timers which expire ASAP) */ @@ -247,9 +248,25 @@ ThreadPool *aio_get_thread_pool(AioContext *ctx) return ctx->thread_pool; } +void aio_set_dispatching(AioContext *ctx, bool dispatching) +{ + ctx->dispatching = dispatching; + if (!dispatching) { + /* Write ctx->dispatching before reading e.g. bh->scheduled. + * Optimization: this is only needed when we're entering the "unsafe" + * phase where other threads must call event_notifier_set. + */ + smp_mb(); + } +} + void aio_notify(AioContext *ctx) { - event_notifier_set(&ctx->notifier); + /* Write e.g. bh->scheduled before reading ctx->dispatching. */ + smp_mb(); + if (!ctx->dispatching) { + event_notifier_set(&ctx->notifier); + } } static void aio_timerlist_notify(void *opaque) diff --git a/docs/aio_notify.promela b/docs/aio_notify.promela new file mode 100644 index 0000000000..ad3f6f08b0 --- /dev/null +++ b/docs/aio_notify.promela @@ -0,0 +1,104 @@ +/* + * This model describes the interaction between aio_set_dispatching() + * and aio_notify(). + * + * Author: Paolo Bonzini <pbonzini@redhat.com> + * + * This file is in the public domain. If you really want a license, + * the WTFPL will do. + * + * To simulate it: + * spin -p docs/aio_notify.promela + * + * To verify it: + * spin -a docs/aio_notify.promela + * gcc -O2 pan.c + * ./a.out -a + */ + +#define MAX 4 +#define LAST (1 << (MAX - 1)) +#define FINAL ((LAST << 1) - 1) + +bool dispatching; +bool event; + +int req, done; + +active proctype waiter() +{ + int fetch, blocking; + + do + :: done != FINAL -> { + // Computing "blocking" is separate from execution of the + // "bottom half" + blocking = (req == 0); + + // This is our "bottom half" + atomic { fetch = req; req = 0; } + done = done | fetch; + + // Wait for a nudge from the other side + do + :: event == 1 -> { event = 0; break; } + :: !blocking -> break; + od; + + dispatching = 1; + + // If you are simulating this model, you may want to add + // something like this here: + // + // int foo; foo++; foo++; foo++; + // + // This only wastes some time and makes it more likely + // that the notifier process hits the "fast path". + + dispatching = 0; + } + :: else -> break; + od +} + +active proctype notifier() +{ + int next = 1; + int sets = 0; + + do + :: next <= LAST -> { + // generate a request + req = req | next; + next = next << 1; + + // aio_notify + if + :: dispatching == 0 -> sets++; event = 1; + :: else -> skip; + fi; + + // Test both synchronous and asynchronous delivery + if + :: 1 -> do + :: req == 0 -> break; + od; + :: 1 -> skip; + fi; + } + :: else -> break; + od; + printf("Skipped %d event_notifier_set\n", MAX - sets); +} + +#define p (done == FINAL) + +never { + do + :: 1 // after an arbitrarily long prefix + :: p -> break // p becomes true + od; + do + :: !p -> accept: break // it then must remains true forever after + od +} diff --git a/include/block/aio.h b/include/block/aio.h index d81250cf2b..433e7ff8b6 100644 --- a/include/block/aio.h +++ b/include/block/aio.h @@ -60,8 +60,14 @@ struct AioContext { */ int walking_handlers; + /* Used to avoid unnecessary event_notifier_set calls in aio_notify. + * Writes protected by lock or BQL, reads are lockless. + */ + bool dispatching; + /* lock to protect between bh's adders and deleter */ QemuMutex bh_lock; + /* Anchor of the list of Bottom Halves belonging to the context */ struct QEMUBH *first_bh; @@ -83,6 +89,9 @@ struct AioContext { QEMUTimerListGroup tlg; }; +/* Used internally to synchronize aio_poll against qemu_bh_schedule. */ +void aio_set_dispatching(AioContext *ctx, bool dispatching); + /** * aio_context_new: Allocate a new AioContext. * |