aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPeter Maydell <peter.maydell@linaro.org>2023-03-07 17:02:06 +0000
committerPeter Maydell <peter.maydell@linaro.org>2023-03-07 17:02:06 +0000
commit7b0f0aa55fd292fa3489755a3a896e496c51ea86 (patch)
tree07bec628ff71b6ab5a1b6a117d41cb8b46117367
parent9832009d9dd2386664c15cc70f6e6bfe062be8bd (diff)
parent6229438cca037d42f44a96d38feb15cb102a444f (diff)
Merge tag 'for-upstream-mb' of https://gitlab.com/bonzini/qemu into staging
* Fix missing memory barriers * Fix comments about memory ordering # -----BEGIN PGP SIGNATURE----- # # iQFIBAABCAAyFiEE8TM4V0tmI4mGbHaCv/vSX3jHroMFAmQHIqoUHHBib256aW5p # QHJlZGhhdC5jb20ACgkQv/vSX3jHroPYBwgArUaS0KGrBM1XmRUUpXnJokmA37n8 # ft477na+XW+p9VYi27B0R01P8j+AkCrAO0Ir1MLG7axjn5KiRMnbf2uBgqasEREv # repJEXsqISoxA6vvAvnehKHAI9zu8b7frRc/30b6EOrrZpn0JKePSNRTyBu2seGO # NFDXPVA2Wom+xXaNSEGt0dmoJ6AzEVIZKhUIwyvUWOC7MXuuIkRWn9/nySUdvEt0 # RIFPPk7JCjnEc32vb4Xnq/Ncsy20tMIM1hlDxMOVNq3brjeSCzS0PPPSjE/X5OtW # Yn5YS0nCyD7wjP2dkXI4I1lUPxUUx6LvMz1aGbJCfyjSX41mNES/agoGgA== # =KEUo # -----END PGP SIGNATURE----- # gpg: Signature made Tue 07 Mar 2023 11:40:26 GMT # gpg: using RSA key F13338574B662389866C7682BFFBD25F78C7AE83 # gpg: issuer "pbonzini@redhat.com" # gpg: Good signature from "Paolo Bonzini <bonzini@gnu.org>" [full] # gpg: aka "Paolo Bonzini <pbonzini@redhat.com>" [full] # Primary key fingerprint: 46F5 9FBD 57D6 12E7 BFD4 E2F7 7E15 100C CD36 69B1 # Subkey fingerprint: F133 3857 4B66 2389 866C 7682 BFFB D25F 78C7 AE83 * tag 'for-upstream-mb' of https://gitlab.com/bonzini/qemu: async: clarify usage of barriers in the polling case async: update documentation of the memory barriers physmem: add missing memory barrier qemu-coroutine-lock: add smp_mb__after_rmw() aio-wait: switch to smp_mb__after_rmw() edu: add smp_mb__after_rmw() qemu-thread-win32: cleanup, fix, document QemuEvent qemu-thread-posix: cleanup, fix, document QemuEvent qatomic: add smp_mb__before/after_rmw() Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
-rw-r--r--docs/devel/atomics.rst26
-rw-r--r--hw/misc/edu.c5
-rw-r--r--include/block/aio-wait.h2
-rw-r--r--include/qemu/atomic.h17
-rw-r--r--softmmu/physmem.c3
-rw-r--r--util/async.c43
-rw-r--r--util/qemu-coroutine-lock.c9
-rw-r--r--util/qemu-thread-posix.c69
-rw-r--r--util/qemu-thread-win32.c82
9 files changed, 186 insertions, 70 deletions
diff --git a/docs/devel/atomics.rst b/docs/devel/atomics.rst
index 7957310071..633df65a97 100644
--- a/docs/devel/atomics.rst
+++ b/docs/devel/atomics.rst
@@ -27,7 +27,8 @@ provides macros that fall in three camps:
- weak atomic access and manual memory barriers: ``qatomic_read()``,
``qatomic_set()``, ``smp_rmb()``, ``smp_wmb()``, ``smp_mb()``,
- ``smp_mb_acquire()``, ``smp_mb_release()``, ``smp_read_barrier_depends()``;
+ ``smp_mb_acquire()``, ``smp_mb_release()``, ``smp_read_barrier_depends()``,
+ ``smp_mb__before_rmw()``, ``smp_mb__after_rmw()``;
- sequentially consistent atomic access: everything else.
@@ -472,7 +473,7 @@ and memory barriers, and the equivalents in QEMU:
sequential consistency.
- in QEMU, ``qatomic_read()`` and ``qatomic_set()`` do not participate in
- the total ordering enforced by sequentially-consistent operations.
+ the ordering enforced by read-modify-write operations.
This is because QEMU uses the C11 memory model. The following example
is correct in Linux but not in QEMU:
@@ -488,9 +489,24 @@ and memory barriers, and the equivalents in QEMU:
because the read of ``y`` can be moved (by either the processor or the
compiler) before the write of ``x``.
- Fixing this requires an ``smp_mb()`` memory barrier between the write
- of ``x`` and the read of ``y``. In the common case where only one thread
- writes ``x``, it is also possible to write it like this:
+ Fixing this requires a full memory barrier between the write of ``x`` and
+ the read of ``y``. QEMU provides ``smp_mb__before_rmw()`` and
+ ``smp_mb__after_rmw()``; they act both as an optimization,
+ avoiding the memory barrier on processors where it is unnecessary,
+ and as a clarification of this corner case of the C11 memory model:
+
+ +--------------------------------+
+ | QEMU (correct) |
+ +================================+
+ | :: |
+ | |
+ | a = qatomic_fetch_add(&x, 2);|
+ | smp_mb__after_rmw(); |
+ | b = qatomic_read(&y); |
+ +--------------------------------+
+
+ In the common case where only one thread writes ``x``, it is also possible
+ to write it like this:
+--------------------------------+
| QEMU (correct) |
diff --git a/hw/misc/edu.c b/hw/misc/edu.c
index e935c418d4..a1f8bc77e7 100644
--- a/hw/misc/edu.c
+++ b/hw/misc/edu.c
@@ -267,6 +267,8 @@ static void edu_mmio_write(void *opaque, hwaddr addr, uint64_t val,
case 0x20:
if (val & EDU_STATUS_IRQFACT) {
qatomic_or(&edu->status, EDU_STATUS_IRQFACT);
+ /* Order check of the COMPUTING flag after setting IRQFACT. */
+ smp_mb__after_rmw();
} else {
qatomic_and(&edu->status, ~EDU_STATUS_IRQFACT);
}
@@ -349,6 +351,9 @@ static void *edu_fact_thread(void *opaque)
qemu_mutex_unlock(&edu->thr_mutex);
qatomic_and(&edu->status, ~EDU_STATUS_COMPUTING);
+ /* Clear COMPUTING flag before checking IRQFACT. */
+ smp_mb__after_rmw();
+
if (qatomic_read(&edu->status) & EDU_STATUS_IRQFACT) {
qemu_mutex_lock_iothread();
edu_raise_irq(edu, FACT_IRQ);
diff --git a/include/block/aio-wait.h b/include/block/aio-wait.h
index dd9a7f6461..da13357bb8 100644
--- a/include/block/aio-wait.h
+++ b/include/block/aio-wait.h
@@ -85,7 +85,7 @@ extern AioWait global_aio_wait;
/* Increment wait_->num_waiters before evaluating cond. */ \
qatomic_inc(&wait_->num_waiters); \
/* Paired with smp_mb in aio_wait_kick(). */ \
- smp_mb(); \
+ smp_mb__after_rmw(); \
if (ctx_ && in_aio_context_home_thread(ctx_)) { \
while ((cond)) { \
aio_poll(ctx_, true); \
diff --git a/include/qemu/atomic.h b/include/qemu/atomic.h
index 874134fd19..f85834ee8b 100644
--- a/include/qemu/atomic.h
+++ b/include/qemu/atomic.h
@@ -245,6 +245,20 @@
#define smp_wmb() smp_mb_release()
#define smp_rmb() smp_mb_acquire()
+/*
+ * SEQ_CST is weaker than the older __sync_* builtins and Linux
+ * kernel read-modify-write atomics. Provide a macro to obtain
+ * the same semantics.
+ */
+#if !defined(QEMU_SANITIZE_THREAD) && \
+ (defined(__i386__) || defined(__x86_64__) || defined(__s390x__))
+# define smp_mb__before_rmw() signal_barrier()
+# define smp_mb__after_rmw() signal_barrier()
+#else
+# define smp_mb__before_rmw() smp_mb()
+# define smp_mb__after_rmw() smp_mb()
+#endif
+
/* qatomic_mb_read/set semantics map Java volatile variables. They are
* less expensive on some platforms (notably POWER) than fully
* sequentially consistent operations.
@@ -259,7 +273,8 @@
#if !defined(QEMU_SANITIZE_THREAD) && \
(defined(__i386__) || defined(__x86_64__) || defined(__s390x__))
/* This is more efficient than a store plus a fence. */
-# define qatomic_mb_set(ptr, i) ((void)qatomic_xchg(ptr, i))
+# define qatomic_mb_set(ptr, i) \
+ ({ (void)qatomic_xchg(ptr, i); smp_mb__after_rmw(); })
#else
# define qatomic_mb_set(ptr, i) \
({ qatomic_store_release(ptr, i); smp_mb(); })
diff --git a/softmmu/physmem.c b/softmmu/physmem.c
index 47143edb4f..a6efd8e8dd 100644
--- a/softmmu/physmem.c
+++ b/softmmu/physmem.c
@@ -2927,6 +2927,8 @@ void cpu_register_map_client(QEMUBH *bh)
qemu_mutex_lock(&map_client_list_lock);
client->bh = bh;
QLIST_INSERT_HEAD(&map_client_list, client, link);
+ /* Write map_client_list before reading in_use. */
+ smp_mb();
if (!qatomic_read(&bounce.in_use)) {
cpu_notify_map_clients_locked();
}
@@ -3116,6 +3118,7 @@ void address_space_unmap(AddressSpace *as, void *buffer, hwaddr len,
qemu_vfree(bounce.buffer);
bounce.buffer = NULL;
memory_region_unref(bounce.mr);
+ /* Clear in_use before reading map_client_list. */
qatomic_mb_set(&bounce.in_use, false);
cpu_notify_map_clients();
}
diff --git a/util/async.c b/util/async.c
index 0657b75397..21016a1ac7 100644
--- a/util/async.c
+++ b/util/async.c
@@ -74,14 +74,21 @@ static void aio_bh_enqueue(QEMUBH *bh, unsigned new_flags)
unsigned old_flags;
/*
- * The memory barrier implicit in qatomic_fetch_or makes sure that:
- * 1. idle & any writes needed by the callback are done before the
- * locations are read in the aio_bh_poll.
- * 2. ctx is loaded before the callback has a chance to execute and bh
- * could be freed.
+ * Synchronizes with atomic_fetch_and() in aio_bh_dequeue(), ensuring that
+ * insertion starts after BH_PENDING is set.
*/
old_flags = qatomic_fetch_or(&bh->flags, BH_PENDING | new_flags);
+
if (!(old_flags & BH_PENDING)) {
+ /*
+ * At this point the bottom half becomes visible to aio_bh_poll().
+ * This insertion thus synchronizes with QSLIST_MOVE_ATOMIC in
+ * aio_bh_poll(), ensuring that:
+ * 1. any writes needed by the callback are visible from the callback
+ * after aio_bh_dequeue() returns bh.
+ * 2. ctx is loaded before the callback has a chance to execute and bh
+ * could be freed.
+ */
QSLIST_INSERT_HEAD_ATOMIC(&ctx->bh_list, bh, next);
}
@@ -107,11 +114,8 @@ static QEMUBH *aio_bh_dequeue(BHList *head, unsigned *flags)
QSLIST_REMOVE_HEAD(head, next);
/*
- * The qatomic_and is paired with aio_bh_enqueue(). The implicit memory
- * barrier ensures that the callback sees all writes done by the scheduling
- * thread. It also ensures that the scheduling thread sees the cleared
- * flag before bh->cb has run, and thus will call aio_notify again if
- * necessary.
+ * Synchronizes with qatomic_fetch_or() in aio_bh_enqueue(), ensuring that
+ * the removal finishes before BH_PENDING is reset.
*/
*flags = qatomic_fetch_and(&bh->flags,
~(BH_PENDING | BH_SCHEDULED | BH_IDLE));
@@ -158,6 +162,7 @@ int aio_bh_poll(AioContext *ctx)
BHListSlice *s;
int ret = 0;
+ /* Synchronizes with QSLIST_INSERT_HEAD_ATOMIC in aio_bh_enqueue(). */
QSLIST_MOVE_ATOMIC(&slice.bh_list, &ctx->bh_list);
QSIMPLEQ_INSERT_TAIL(&ctx->bh_slice_list, &slice, next);
@@ -448,15 +453,15 @@ LuringState *aio_get_linux_io_uring(AioContext *ctx)
void aio_notify(AioContext *ctx)
{
/*
- * Write e.g. bh->flags before writing ctx->notified. Pairs with smp_mb in
- * aio_notify_accept.
+ * Write e.g. ctx->bh_list before writing ctx->notified. Pairs with
+ * smp_mb() in aio_notify_accept().
*/
smp_wmb();
qatomic_set(&ctx->notified, true);
/*
- * Write ctx->notified before reading ctx->notify_me. Pairs
- * with smp_mb in aio_ctx_prepare or aio_poll.
+ * Write ctx->notified (and also ctx->bh_list) before reading ctx->notify_me.
+ * Pairs with smp_mb() in aio_ctx_prepare or aio_poll.
*/
smp_mb();
if (qatomic_read(&ctx->notify_me)) {
@@ -469,8 +474,9 @@ void aio_notify_accept(AioContext *ctx)
qatomic_set(&ctx->notified, false);
/*
- * Write ctx->notified before reading e.g. bh->flags. Pairs with smp_wmb
- * in aio_notify.
+ * Order reads of ctx->notified (in aio_context_notifier_poll()) and the
+ * above clearing of ctx->notified before reads of e.g. bh->flags. Pairs
+ * with smp_wmb() in aio_notify.
*/
smp_mb();
}
@@ -493,6 +499,11 @@ static bool aio_context_notifier_poll(void *opaque)
EventNotifier *e = opaque;
AioContext *ctx = container_of(e, AioContext, notifier);
+ /*
+ * No need for load-acquire because we just want to kick the
+ * event loop. aio_notify_accept() takes care of synchronizing
+ * the event loop with the producers.
+ */
return qatomic_read(&ctx->notified);
}
diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index 58f3f77181..84a50a9e91 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -201,10 +201,16 @@ static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
trace_qemu_co_mutex_lock_entry(mutex, self);
push_waiter(mutex, &w);
+ /*
+ * Add waiter before reading mutex->handoff. Pairs with qatomic_mb_set
+ * in qemu_co_mutex_unlock.
+ */
+ smp_mb__after_rmw();
+
/* This is the "Responsibility Hand-Off" protocol; a lock() picks from
* a concurrent unlock() the responsibility of waking somebody up.
*/
- old_handoff = qatomic_mb_read(&mutex->handoff);
+ old_handoff = qatomic_read(&mutex->handoff);
if (old_handoff &&
has_waiters(mutex) &&
qatomic_cmpxchg(&mutex->handoff, old_handoff, 0) == old_handoff) {
@@ -303,6 +309,7 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
}
our_handoff = mutex->sequence;
+ /* Set handoff before checking for waiters. */
qatomic_mb_set(&mutex->handoff, our_handoff);
if (!has_waiters(mutex)) {
/* The concurrent lock has not added itself yet, so it
diff --git a/util/qemu-thread-posix.c b/util/qemu-thread-posix.c
index 93d2505797..b2e26e2120 100644
--- a/util/qemu-thread-posix.c
+++ b/util/qemu-thread-posix.c
@@ -384,13 +384,21 @@ void qemu_event_destroy(QemuEvent *ev)
void qemu_event_set(QemuEvent *ev)
{
- /* qemu_event_set has release semantics, but because it *loads*
+ assert(ev->initialized);
+
+ /*
+ * Pairs with both qemu_event_reset() and qemu_event_wait().
+ *
+ * qemu_event_set has release semantics, but because it *loads*
* ev->value we need a full memory barrier here.
*/
- assert(ev->initialized);
smp_mb();
if (qatomic_read(&ev->value) != EV_SET) {
- if (qatomic_xchg(&ev->value, EV_SET) == EV_BUSY) {
+ int old = qatomic_xchg(&ev->value, EV_SET);
+
+ /* Pairs with memory barrier in kernel futex_wait system call. */
+ smp_mb__after_rmw();
+ if (old == EV_BUSY) {
/* There were waiters, wake them up. */
qemu_futex_wake(ev, INT_MAX);
}
@@ -399,18 +407,19 @@ void qemu_event_set(QemuEvent *ev)
void qemu_event_reset(QemuEvent *ev)
{
- unsigned value;
-
assert(ev->initialized);
- value = qatomic_read(&ev->value);
- smp_mb_acquire();
- if (value == EV_SET) {
- /*
- * If there was a concurrent reset (or even reset+wait),
- * do nothing. Otherwise change EV_SET->EV_FREE.
- */
- qatomic_or(&ev->value, EV_FREE);
- }
+
+ /*
+ * If there was a concurrent reset (or even reset+wait),
+ * do nothing. Otherwise change EV_SET->EV_FREE.
+ */
+ qatomic_or(&ev->value, EV_FREE);
+
+ /*
+ * Order reset before checking the condition in the caller.
+ * Pairs with the first memory barrier in qemu_event_set().
+ */
+ smp_mb__after_rmw();
}
void qemu_event_wait(QemuEvent *ev)
@@ -418,20 +427,40 @@ void qemu_event_wait(QemuEvent *ev)
unsigned value;
assert(ev->initialized);
- value = qatomic_read(&ev->value);
- smp_mb_acquire();
+
+ /*
+ * qemu_event_wait must synchronize with qemu_event_set even if it does
+ * not go down the slow path, so this load-acquire is needed that
+ * synchronizes with the first memory barrier in qemu_event_set().
+ *
+ * If we do go down the slow path, there is no requirement at all: we
+ * might miss a qemu_event_set() here but ultimately the memory barrier in
+ * qemu_futex_wait() will ensure the check is done correctly.
+ */
+ value = qatomic_load_acquire(&ev->value);
if (value != EV_SET) {
if (value == EV_FREE) {
/*
- * Leave the event reset and tell qemu_event_set that there
- * are waiters. No need to retry, because there cannot be
- * a concurrent busy->free transition. After the CAS, the
- * event will be either set or busy.
+ * Leave the event reset and tell qemu_event_set that there are
+ * waiters. No need to retry, because there cannot be a concurrent
+ * busy->free transition. After the CAS, the event will be either
+ * set or busy.
+ *
+ * This cmpxchg doesn't have particular ordering requirements if it
+ * succeeds (moving the store earlier can only cause qemu_event_set()
+ * to issue _more_ wakeups), the failing case needs acquire semantics
+ * like the load above.
*/
if (qatomic_cmpxchg(&ev->value, EV_FREE, EV_BUSY) == EV_SET) {
return;
}
}
+
+ /*
+ * This is the final check for a concurrent set, so it does need
+ * a smp_mb() pairing with the second barrier of qemu_event_set().
+ * The barrier is inside the FUTEX_WAIT system call.
+ */
qemu_futex_wait(ev, EV_BUSY);
}
}
diff --git a/util/qemu-thread-win32.c b/util/qemu-thread-win32.c
index 69db254ac7..a7fe3cc345 100644
--- a/util/qemu-thread-win32.c
+++ b/util/qemu-thread-win32.c
@@ -272,12 +272,20 @@ void qemu_event_destroy(QemuEvent *ev)
void qemu_event_set(QemuEvent *ev)
{
assert(ev->initialized);
- /* qemu_event_set has release semantics, but because it *loads*
+
+ /*
+ * Pairs with both qemu_event_reset() and qemu_event_wait().
+ *
+ * qemu_event_set has release semantics, but because it *loads*
* ev->value we need a full memory barrier here.
*/
smp_mb();
if (qatomic_read(&ev->value) != EV_SET) {
- if (qatomic_xchg(&ev->value, EV_SET) == EV_BUSY) {
+ int old = qatomic_xchg(&ev->value, EV_SET);
+
+ /* Pairs with memory barrier after ResetEvent. */
+ smp_mb__after_rmw();
+ if (old == EV_BUSY) {
/* There were waiters, wake them up. */
SetEvent(ev->event);
}
@@ -286,17 +294,19 @@ void qemu_event_set(QemuEvent *ev)
void qemu_event_reset(QemuEvent *ev)
{
- unsigned value;
-
assert(ev->initialized);
- value = qatomic_read(&ev->value);
- smp_mb_acquire();
- if (value == EV_SET) {
- /* If there was a concurrent reset (or even reset+wait),
- * do nothing. Otherwise change EV_SET->EV_FREE.
- */
- qatomic_or(&ev->value, EV_FREE);
- }
+
+ /*
+ * If there was a concurrent reset (or even reset+wait),
+ * do nothing. Otherwise change EV_SET->EV_FREE.
+ */
+ qatomic_or(&ev->value, EV_FREE);
+
+ /*
+ * Order reset before checking the condition in the caller.
+ * Pairs with the first memory barrier in qemu_event_set().
+ */
+ smp_mb__after_rmw();
}
void qemu_event_wait(QemuEvent *ev)
@@ -304,29 +314,49 @@ void qemu_event_wait(QemuEvent *ev)
unsigned value;
assert(ev->initialized);
- value = qatomic_read(&ev->value);
- smp_mb_acquire();
+
+ /*
+ * qemu_event_wait must synchronize with qemu_event_set even if it does
+ * not go down the slow path, so this load-acquire is needed that
+ * synchronizes with the first memory barrier in qemu_event_set().
+ *
+ * If we do go down the slow path, there is no requirement at all: we
+ * might miss a qemu_event_set() here but ultimately the memory barrier in
+ * qemu_futex_wait() will ensure the check is done correctly.
+ */
+ value = qatomic_load_acquire(&ev->value);
if (value != EV_SET) {
if (value == EV_FREE) {
- /* qemu_event_set is not yet going to call SetEvent, but we are
- * going to do another check for EV_SET below when setting EV_BUSY.
- * At that point it is safe to call WaitForSingleObject.
+ /*
+ * Here the underlying kernel event is reset, but qemu_event_set is
+ * not yet going to call SetEvent. However, there will be another
+ * check for EV_SET below when setting EV_BUSY. At that point it
+ * is safe to call WaitForSingleObject.
*/
ResetEvent(ev->event);
- /* Tell qemu_event_set that there are waiters. No need to retry
- * because there cannot be a concurrent busy->free transition.
- * After the CAS, the event will be either set or busy.
+ /*
+ * It is not clear whether ResetEvent provides this barrier; kernel
+ * APIs (KeResetEvent/KeClearEvent) do not. Better safe than sorry!
+ */
+ smp_mb();
+
+ /*
+ * Leave the event reset and tell qemu_event_set that there are
+ * waiters. No need to retry, because there cannot be a concurrent
+ * busy->free transition. After the CAS, the event will be either
+ * set or busy.
*/
if (qatomic_cmpxchg(&ev->value, EV_FREE, EV_BUSY) == EV_SET) {
- value = EV_SET;
- } else {
- value = EV_BUSY;
+ return;
}
}
- if (value == EV_BUSY) {
- WaitForSingleObject(ev->event, INFINITE);
- }
+
+ /*
+ * ev->value is now EV_BUSY. Since we didn't observe EV_SET,
+ * qemu_event_set() must observe EV_BUSY and call SetEvent().
+ */
+ WaitForSingleObject(ev->event, INFINITE);
}
}