aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaolo Bonzini <pbonzini@redhat.com>2015-08-12 15:38:18 +0200
committerStefan Weil <sw@weilnetz.de>2015-09-24 20:52:28 +0200
commit7c9b2bf67775ecc1359ce973580807d173e7f710 (patch)
tree344a675e9af8e8885444247dc18aab45710bdcc0
parenta246a01631f90230374c2b8ffce608232e2aa654 (diff)
qemu-thread: add a fast path to the Win32 QemuEvent
QemuEvents are used heavily by call_rcu. We do not want them to be slow, but the current implementation does a kernel call on every invocation of qemu_event_* and won't cut it. So, wrap a Win32 manual-reset event with a fast userspace path. The states and transitions are the same as for the futex and mutex/condvar implementations, but the slow path is different of course. The idea is to reset the Win32 event lazily, as part of a test-reset-test-wait sequence. Such a sequence is, indeed, how QemuEvents are used by RCU and other subsystems! The patch includes a formal model of the algorithm. Tested-by: Stefan Weil <sw@weilnetz.de> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com> Signed-off-by: Stefan Weil <sw@weilnetz.de>
-rw-r--r--docs/win32-qemu-event.promela98
-rw-r--r--include/qemu/thread-win32.h1
-rw-r--r--util/qemu-thread-win32.c66
3 files changed, 161 insertions, 4 deletions
diff --git a/docs/win32-qemu-event.promela b/docs/win32-qemu-event.promela
new file mode 100644
index 0000000000..c446a71555
--- /dev/null
+++ b/docs/win32-qemu-event.promela
@@ -0,0 +1,98 @@
+/*
+ * This model describes the implementation of QemuEvent in
+ * util/qemu-thread-win32.c.
+ *
+ * Author: Paolo Bonzini <pbonzini@redhat.com>
+ *
+ * This file is in the public domain. If you really want a license,
+ * the WTFPL will do.
+ *
+ * To verify it:
+ * spin -a docs/event.promela
+ * gcc -O2 pan.c -DSAFETY
+ * ./a.out
+ */
+
+bool event;
+int value;
+
+/* Primitives for a Win32 event */
+#define RAW_RESET event = false
+#define RAW_SET event = true
+#define RAW_WAIT do :: event -> break; od
+
+#if 0
+/* Basic sanity checking: test the Win32 event primitives */
+#define RESET RAW_RESET
+#define SET RAW_SET
+#define WAIT RAW_WAIT
+#else
+/* Full model: layer a userspace-only fast path on top of the RAW_*
+ * primitives. SET/RESET/WAIT have exactly the same semantics as
+ * RAW_SET/RAW_RESET/RAW_WAIT, but try to avoid invoking them.
+ */
+#define EV_SET 0
+#define EV_FREE 1
+#define EV_BUSY -1
+
+int state = EV_FREE;
+
+int xchg_result;
+#define SET if :: state != EV_SET -> \
+ atomic { /* xchg_result=xchg(state, EV_SET) */ \
+ xchg_result = state; \
+ state = EV_SET; \
+ } \
+ if :: xchg_result == EV_BUSY -> RAW_SET; \
+ :: else -> skip; \
+ fi; \
+ :: else -> skip; \
+ fi
+
+#define RESET if :: state == EV_SET -> atomic { state = state | EV_FREE; } \
+ :: else -> skip; \
+ fi
+
+int tmp1, tmp2;
+#define WAIT tmp1 = state; \
+ if :: tmp1 != EV_SET -> \
+ if :: tmp1 == EV_FREE -> \
+ RAW_RESET; \
+ atomic { /* tmp2=cas(state, EV_FREE, EV_BUSY) */ \
+ tmp2 = state; \
+ if :: tmp2 == EV_FREE -> state = EV_BUSY; \
+ :: else -> skip; \
+ fi; \
+ } \
+ if :: tmp2 == EV_SET -> tmp1 = EV_SET; \
+ :: else -> tmp1 = EV_BUSY; \
+ fi; \
+ :: else -> skip; \
+ fi; \
+ assert(tmp1 != EV_FREE); \
+ if :: tmp1 == EV_BUSY -> RAW_WAIT; \
+ :: else -> skip; \
+ fi; \
+ :: else -> skip; \
+ fi
+#endif
+
+active proctype waiter()
+{
+ if
+ :: !value ->
+ RESET;
+ if
+ :: !value -> WAIT;
+ :: else -> skip;
+ fi;
+ :: else -> skip;
+ fi;
+ assert(value);
+}
+
+active proctype notifier()
+{
+ value = true;
+ SET;
+}
diff --git a/include/qemu/thread-win32.h b/include/qemu/thread-win32.h
index 3d58081bed..385ff5f76a 100644
--- a/include/qemu/thread-win32.h
+++ b/include/qemu/thread-win32.h
@@ -18,6 +18,7 @@ struct QemuSemaphore {
};
struct QemuEvent {
+ int value;
HANDLE event;
};
diff --git a/util/qemu-thread-win32.c b/util/qemu-thread-win32.c
index 406b52f91d..6cdd553e9a 100644
--- a/util/qemu-thread-win32.c
+++ b/util/qemu-thread-win32.c
@@ -238,10 +238,34 @@ void qemu_sem_wait(QemuSemaphore *sem)
}
}
+/* Wrap a Win32 manual-reset event with a fast userspace path. The idea
+ * is to reset the Win32 event lazily, as part of a test-reset-test-wait
+ * sequence. Such a sequence is, indeed, how QemuEvents are used by
+ * RCU and other subsystems!
+ *
+ * Valid transitions:
+ * - free->set, when setting the event
+ * - busy->set, when setting the event, followed by futex_wake
+ * - set->free, when resetting the event
+ * - free->busy, when waiting
+ *
+ * set->busy does not happen (it can be observed from the outside but
+ * it really is set->free->busy).
+ *
+ * busy->free provably cannot happen; to enforce it, the set->free transition
+ * is done with an OR, which becomes a no-op if the event has concurrently
+ * transitioned to free or busy (and is faster than cmpxchg).
+ */
+
+#define EV_SET 0
+#define EV_FREE 1
+#define EV_BUSY -1
+
void qemu_event_init(QemuEvent *ev, bool init)
{
/* Manual reset. */
- ev->event = CreateEvent(NULL, TRUE, init, NULL);
+ ev->event = CreateEvent(NULL, TRUE, TRUE, NULL);
+ ev->value = (init ? EV_SET : EV_FREE);
}
void qemu_event_destroy(QemuEvent *ev)
@@ -251,17 +275,51 @@ void qemu_event_destroy(QemuEvent *ev)
void qemu_event_set(QemuEvent *ev)
{
- SetEvent(ev->event);
+ if (atomic_mb_read(&ev->value) != EV_SET) {
+ if (atomic_xchg(&ev->value, EV_SET) == EV_BUSY) {
+ /* There were waiters, wake them up. */
+ SetEvent(ev->event);
+ }
+ }
}
void qemu_event_reset(QemuEvent *ev)
{
- ResetEvent(ev->event);
+ if (atomic_mb_read(&ev->value) == EV_SET) {
+ /* If there was a concurrent reset (or even reset+wait),
+ * do nothing. Otherwise change EV_SET->EV_FREE.
+ */
+ atomic_or(&ev->value, EV_FREE);
+ }
}
void qemu_event_wait(QemuEvent *ev)
{
- WaitForSingleObject(ev->event, INFINITE);
+ unsigned value;
+
+ value = atomic_mb_read(&ev->value);
+ if (value != EV_SET) {
+ if (value == EV_FREE) {
+ /* qemu_event_set is not yet going to call SetEvent, but we are
+ * going to do another check for EV_SET below when setting EV_BUSY.
+ * At that point it is safe to call WaitForSingleObject.
+ */
+ ResetEvent(ev->event);
+
+ /* Tell qemu_event_set that there are waiters. No need to retry
+ * because there cannot be a concurent busy->free transition.
+ * After the CAS, the event will be either set or busy.
+ */
+ if (atomic_cmpxchg(&ev->value, EV_FREE, EV_BUSY) == EV_SET) {
+ value = EV_SET;
+ } else {
+ value = EV_BUSY;
+ }
+ }
+ if (value == EV_BUSY) {
+ WaitForSingleObject(ev->event, INFINITE);
+ }
+ }
}
struct QemuThreadData {