aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--aio-posix.c34
-rw-r--r--async.c19
-rw-r--r--docs/aio_notify.promela104
-rw-r--r--include/block/aio.h9
4 files changed, 164 insertions, 2 deletions
diff --git a/aio-posix.c b/aio-posix.c
index 44c4df3f3d..2eada2e049 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -175,11 +175,38 @@ static bool aio_dispatch(AioContext *ctx)
bool aio_poll(AioContext *ctx, bool blocking)
{
AioHandler *node;
+ bool was_dispatching;
int ret;
bool progress;
+ was_dispatching = ctx->dispatching;
progress = false;
+ /* aio_notify can avoid the expensive event_notifier_set if
+ * everything (file descriptors, bottom halves, timers) will
+ * be re-evaluated before the next blocking poll(). This happens
+ * in two cases:
+ *
+ * 1) when aio_poll is called with blocking == false
+ *
+ * 2) when we are called after poll(). If we are called before
+ * poll(), bottom halves will not be re-evaluated and we need
+ * aio_notify() if blocking == true.
+ *
+ * The first aio_dispatch() only does something when AioContext is
+ * running as a GSource, and in that case aio_poll is used only
+ * with blocking == false, so this optimization is already quite
+ * effective. However, the code is ugly and should be restructured
+ * to have a single aio_dispatch() call. To do this, we need to
+ * reorganize aio_poll into a prepare/poll/dispatch model like
+ * glib's.
+ *
+ * If we're in a nested event loop, ctx->dispatching might be true.
+ * In that case we can restore it just before returning, but we
+ * have to clear it now.
+ */
+ aio_set_dispatching(ctx, !blocking);
+
/*
* If there are callbacks left that have been queued, we need to call them.
* Do not call select in this case, because it is possible that the caller
@@ -190,12 +217,14 @@ bool aio_poll(AioContext *ctx, bool blocking)
progress = true;
}
+ /* Re-evaluate condition (1) above. */
+ aio_set_dispatching(ctx, !blocking);
if (aio_dispatch(ctx)) {
progress = true;
}
if (progress && !blocking) {
- return true;
+ goto out;
}
ctx->walking_handlers++;
@@ -234,9 +263,12 @@ bool aio_poll(AioContext *ctx, bool blocking)
}
/* Run dispatch even if there were no readable fds to run timers */
+ aio_set_dispatching(ctx, true);
if (aio_dispatch(ctx)) {
progress = true;
}
+out:
+ aio_set_dispatching(ctx, was_dispatching);
return progress;
}
diff --git a/async.c b/async.c
index 5b6fe6b4cc..34af0b25ca 100644
--- a/async.c
+++ b/async.c
@@ -26,6 +26,7 @@
#include "block/aio.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"
+#include "qemu/atomic.h"
/***********************************************************/
/* bottom halves (can be seen as timers which expire ASAP) */
@@ -247,9 +248,25 @@ ThreadPool *aio_get_thread_pool(AioContext *ctx)
return ctx->thread_pool;
}
+void aio_set_dispatching(AioContext *ctx, bool dispatching)
+{
+ ctx->dispatching = dispatching;
+ if (!dispatching) {
+ /* Write ctx->dispatching before reading e.g. bh->scheduled.
+ * Optimization: this is only needed when we're entering the "unsafe"
+ * phase where other threads must call event_notifier_set.
+ */
+ smp_mb();
+ }
+}
+
void aio_notify(AioContext *ctx)
{
- event_notifier_set(&ctx->notifier);
+ /* Write e.g. bh->scheduled before reading ctx->dispatching. */
+ smp_mb();
+ if (!ctx->dispatching) {
+ event_notifier_set(&ctx->notifier);
+ }
}
static void aio_timerlist_notify(void *opaque)
diff --git a/docs/aio_notify.promela b/docs/aio_notify.promela
new file mode 100644
index 0000000000..ad3f6f08b0
--- /dev/null
+++ b/docs/aio_notify.promela
@@ -0,0 +1,104 @@
+/*
+ * This model describes the interaction between aio_set_dispatching()
+ * and aio_notify().
+ *
+ * Author: Paolo Bonzini <pbonzini@redhat.com>
+ *
+ * This file is in the public domain. If you really want a license,
+ * the WTFPL will do.
+ *
+ * To simulate it:
+ * spin -p docs/aio_notify.promela
+ *
+ * To verify it:
+ * spin -a docs/aio_notify.promela
+ * gcc -O2 pan.c
+ * ./a.out -a
+ */
+
+#define MAX 4
+#define LAST (1 << (MAX - 1))
+#define FINAL ((LAST << 1) - 1)
+
+bool dispatching;
+bool event;
+
+int req, done;
+
+active proctype waiter()
+{
+ int fetch, blocking;
+
+ do
+ :: done != FINAL -> {
+ // Computing "blocking" is separate from execution of the
+ // "bottom half"
+ blocking = (req == 0);
+
+ // This is our "bottom half"
+ atomic { fetch = req; req = 0; }
+ done = done | fetch;
+
+ // Wait for a nudge from the other side
+ do
+ :: event == 1 -> { event = 0; break; }
+ :: !blocking -> break;
+ od;
+
+ dispatching = 1;
+
+ // If you are simulating this model, you may want to add
+ // something like this here:
+ //
+ // int foo; foo++; foo++; foo++;
+ //
+ // This only wastes some time and makes it more likely
+ // that the notifier process hits the "fast path".
+
+ dispatching = 0;
+ }
+ :: else -> break;
+ od
+}
+
+active proctype notifier()
+{
+ int next = 1;
+ int sets = 0;
+
+ do
+ :: next <= LAST -> {
+ // generate a request
+ req = req | next;
+ next = next << 1;
+
+ // aio_notify
+ if
+ :: dispatching == 0 -> sets++; event = 1;
+ :: else -> skip;
+ fi;
+
+ // Test both synchronous and asynchronous delivery
+ if
+ :: 1 -> do
+ :: req == 0 -> break;
+ od;
+ :: 1 -> skip;
+ fi;
+ }
+ :: else -> break;
+ od;
+ printf("Skipped %d event_notifier_set\n", MAX - sets);
+}
+
+#define p (done == FINAL)
+
+never {
+ do
+ :: 1 // after an arbitrarily long prefix
+ :: p -> break // p becomes true
+ od;
+ do
+ :: !p -> accept: break // it then must remains true forever after
+ od
+}
diff --git a/include/block/aio.h b/include/block/aio.h
index d81250cf2b..433e7ff8b6 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -60,8 +60,14 @@ struct AioContext {
*/
int walking_handlers;
+ /* Used to avoid unnecessary event_notifier_set calls in aio_notify.
+ * Writes protected by lock or BQL, reads are lockless.
+ */
+ bool dispatching;
+
/* lock to protect between bh's adders and deleter */
QemuMutex bh_lock;
+
/* Anchor of the list of Bottom Halves belonging to the context */
struct QEMUBH *first_bh;
@@ -83,6 +89,9 @@ struct AioContext {
QEMUTimerListGroup tlg;
};
+/* Used internally to synchronize aio_poll against qemu_bh_schedule. */
+void aio_set_dispatching(AioContext *ctx, bool dispatching);
+
/**
* aio_context_new: Allocate a new AioContext.
*