aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPeter Maydell <peter.maydell@linaro.org>2015-02-02 19:36:02 +0000
committerPeter Maydell <peter.maydell@linaro.org>2015-02-02 19:36:02 +0000
commitd5fbb4c9ed52d97aebe5994d8a857c74c0d95a92 (patch)
tree46ddd11a5abcfaa68db676948ecf83303d153cfd
parent16017c48547960539fcadb1f91d252124f442482 (diff)
parent2aeba9d8a1b6121b98948fcd42fd2aa32f68b750 (diff)
Merge remote-tracking branch 'remotes/bonzini/tags/for-upstream' into staging
The important bits here are the first part of RCU. v1->v2 changes are the new qemu-thread patch to fix Mac OS X, and cleaning up warnings. v2->v3 removed the patch to enable modules by default. # gpg: Signature made Mon 02 Feb 2015 19:28:03 GMT using RSA key ID 78C7AE83 # gpg: Good signature from "Paolo Bonzini <bonzini@gnu.org>" # gpg: aka "Paolo Bonzini <pbonzini@redhat.com>" # gpg: WARNING: This key is not certified with sufficiently trusted signatures! # gpg: It is not certain that the signature belongs to the owner. # Primary key fingerprint: 46F5 9FBD 57D6 12E7 BFD4 E2F7 7E15 100C CD36 69B1 # Subkey fingerprint: F133 3857 4B66 2389 866C 7682 BFFB D25F 78C7 AE83 * remotes/bonzini/tags/for-upstream: scsi: Fix scsi_req_cancel_async for no aiocb req cpu-exec: simplify init_delay_params cpu-exec: simplify align_clocks memory: avoid ref/unref in memory_region_find memory: protect current_map by RCU memory: remove assertion on memory_region_destroy rcu: add call_rcu rcu: allow nesting of rcu_read_lock/rcu_read_unlock rcu: add rcutorture rcu: add rcu library qemu-thread: fix qemu_event without futexes Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
-rw-r--r--cpu-exec.c9
-rw-r--r--cpus.c17
-rw-r--r--docs/rcu.txt387
-rw-r--r--hw/9pfs/virtio-9p-synth.c1
-rw-r--r--hw/scsi/scsi-bus.c2
-rw-r--r--include/exec/memory.h5
-rw-r--r--include/qemu/atomic.h61
-rw-r--r--include/qemu/queue.h13
-rw-r--r--include/qemu/rcu.h147
-rw-r--r--include/qemu/thread.h3
-rw-r--r--include/qemu/timer.h1
-rw-r--r--memory.c65
-rw-r--r--tests/Makefile7
-rw-r--r--tests/rcutorture.c451
-rw-r--r--util/Makefile.objs1
-rw-r--r--util/qemu-thread-posix.c2
-rw-r--r--util/rcu.c291
17 files changed, 1398 insertions, 65 deletions
diff --git a/cpu-exec.c b/cpu-exec.c
index a4f0effaf4..fa506e628a 100644
--- a/cpu-exec.c
+++ b/cpu-exec.c
@@ -61,8 +61,7 @@ static void align_clocks(SyncClocks *sc, const CPUState *cpu)
sleep_delay.tv_sec = sc->diff_clk / 1000000000LL;
sleep_delay.tv_nsec = sc->diff_clk % 1000000000LL;
if (nanosleep(&sleep_delay, &rem_delay) < 0) {
- sc->diff_clk -= (sleep_delay.tv_sec - rem_delay.tv_sec) * 1000000000LL;
- sc->diff_clk -= sleep_delay.tv_nsec - rem_delay.tv_nsec;
+ sc->diff_clk = rem_delay.tv_sec * 1000000000LL + rem_delay.tv_nsec;
} else {
sc->diff_clk = 0;
}
@@ -101,10 +100,8 @@ static void init_delay_params(SyncClocks *sc,
if (!icount_align_option) {
return;
}
- sc->realtime_clock = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
- sc->diff_clk = qemu_clock_get_ns(QEMU_CLOCK_VIRTUAL) -
- sc->realtime_clock +
- cpu_get_clock_offset();
+ sc->realtime_clock = qemu_clock_get_ns(QEMU_CLOCK_VIRTUAL_RT);
+ sc->diff_clk = qemu_clock_get_ns(QEMU_CLOCK_VIRTUAL) - sc->realtime_clock;
sc->last_cpu_icount = cpu->icount_extra + cpu->icount_decr.u16.low;
if (sc->diff_clk < max_delay) {
max_delay = sc->diff_clk;
diff --git a/cpus.c b/cpus.c
index 3a5323b1fc..0cdd1d7156 100644
--- a/cpus.c
+++ b/cpus.c
@@ -229,23 +229,6 @@ int64_t cpu_get_clock(void)
return ti;
}
-/* return the offset between the host clock and virtual CPU clock */
-int64_t cpu_get_clock_offset(void)
-{
- int64_t ti;
- unsigned start;
-
- do {
- start = seqlock_read_begin(&timers_state.vm_clock_seqlock);
- ti = timers_state.cpu_clock_offset;
- if (!timers_state.cpu_ticks_enabled) {
- ti -= get_clock();
- }
- } while (seqlock_read_retry(&timers_state.vm_clock_seqlock, start));
-
- return -ti;
-}
-
/* enable cpu_get_ticks()
* Caller must hold BQL which server as mutex for vm_clock_seqlock.
*/
diff --git a/docs/rcu.txt b/docs/rcu.txt
new file mode 100644
index 0000000000..61752b93ab
--- /dev/null
+++ b/docs/rcu.txt
@@ -0,0 +1,387 @@
+Using RCU (Read-Copy-Update) for synchronization
+================================================
+
+Read-copy update (RCU) is a synchronization mechanism that is used to
+protect read-mostly data structures. RCU is very efficient and scalable
+on the read side (it is wait-free), and thus can make the read paths
+extremely fast.
+
+RCU supports concurrency between a single writer and multiple readers,
+thus it is not used alone. Typically, the write-side will use a lock to
+serialize multiple updates, but other approaches are possible (e.g.,
+restricting updates to a single task). In QEMU, when a lock is used,
+this will often be the "iothread mutex", also known as the "big QEMU
+lock" (BQL). Also, restricting updates to a single task is done in
+QEMU using the "bottom half" API.
+
+RCU is fundamentally a "wait-to-finish" mechanism. The read side marks
+sections of code with "critical sections", and the update side will wait
+for the execution of all *currently running* critical sections before
+proceeding, or before asynchronously executing a callback.
+
+The key point here is that only the currently running critical sections
+are waited for; critical sections that are started _after_ the beginning
+of the wait do not extend the wait, despite running concurrently with
+the updater. This is the reason why RCU is more scalable than,
+for example, reader-writer locks. It is so much more scalable that
+the system will have a single instance of the RCU mechanism; a single
+mechanism can be used for an arbitrary number of "things", without
+having to worry about things such as contention or deadlocks.
+
+How is this possible? The basic idea is to split updates in two phases,
+"removal" and "reclamation". During removal, we ensure that subsequent
+readers will not be able to get a reference to the old data. After
+removal has completed, a critical section will not be able to access
+the old data. Therefore, critical sections that begin after removal
+do not matter; as soon as all previous critical sections have finished,
+there cannot be any readers who hold references to the data structure,
+and these can now be safely reclaimed (e.g., freed or unref'ed).
+
+Here is a picutre:
+
+ thread 1 thread 2 thread 3
+ ------------------- ------------------------ -------------------
+ enter RCU crit.sec.
+ | finish removal phase
+ | begin wait
+ | | enter RCU crit.sec.
+ exit RCU crit.sec | |
+ complete wait |
+ begin reclamation phase |
+ exit RCU crit.sec.
+
+
+Note how thread 3 is still executing its critical section when thread 2
+starts reclaiming data. This is possible, because the old version of the
+data structure was not accessible at the time thread 3 began executing
+that critical section.
+
+
+RCU API
+=======
+
+The core RCU API is small:
+
+ void rcu_read_lock(void);
+
+ Used by a reader to inform the reclaimer that the reader is
+ entering an RCU read-side critical section.
+
+ void rcu_read_unlock(void);
+
+ Used by a reader to inform the reclaimer that the reader is
+ exiting an RCU read-side critical section. Note that RCU
+ read-side critical sections may be nested and/or overlapping.
+
+ void synchronize_rcu(void);
+
+ Blocks until all pre-existing RCU read-side critical sections
+ on all threads have completed. This marks the end of the removal
+ phase and the beginning of reclamation phase.
+
+ Note that it would be valid for another update to come while
+ synchronize_rcu is running. Because of this, it is better that
+ the updater releases any locks it may hold before calling
+ synchronize_rcu. If this is not possible (for example, because
+ the updater is protected by the BQL), you can use call_rcu.
+
+ void call_rcu1(struct rcu_head * head,
+ void (*func)(struct rcu_head *head));
+
+ This function invokes func(head) after all pre-existing RCU
+ read-side critical sections on all threads have completed. This
+ marks the end of the removal phase, with func taking care
+ asynchronously of the reclamation phase.
+
+ The foo struct needs to have an rcu_head structure added,
+ perhaps as follows:
+
+ struct foo {
+ struct rcu_head rcu;
+ int a;
+ char b;
+ long c;
+ };
+
+ so that the reclaimer function can fetch the struct foo address
+ and free it:
+
+ call_rcu1(&foo.rcu, foo_reclaim);
+
+ void foo_reclaim(struct rcu_head *rp)
+ {
+ struct foo *fp = container_of(rp, struct foo, rcu);
+ g_free(fp);
+ }
+
+ For the common case where the rcu_head member is the first of the
+ struct, you can use the following macro.
+
+ void call_rcu(T *p,
+ void (*func)(T *p),
+ field-name);
+
+ call_rcu1 is typically used through this macro, in the common case
+ where the "struct rcu_head" is the first field in the struct. In
+ the above case, one could have written simply:
+
+ call_rcu(foo_reclaim, g_free, rcu);
+
+ typeof(*p) atomic_rcu_read(p);
+
+ atomic_rcu_read() is similar to atomic_mb_read(), but it makes
+ some assumptions on the code that calls it. This allows a more
+ optimized implementation.
+
+ atomic_rcu_read assumes that whenever a single RCU critical
+ section reads multiple shared data, these reads are either
+ data-dependent or need no ordering. This is almost always the
+ case when using RCU, because read-side critical sections typically
+ navigate one or more pointers (the pointers that are changed on
+ every update) until reaching a data structure of interest,
+ and then read from there.
+
+ RCU read-side critical sections must use atomic_rcu_read() to
+ read data, unless concurrent writes are presented by another
+ synchronization mechanism.
+
+ Furthermore, RCU read-side critical sections should traverse the
+ data structure in a single direction, opposite to the direction
+ in which the updater initializes it.
+
+ void atomic_rcu_set(p, typeof(*p) v);
+
+ atomic_rcu_set() is also similar to atomic_mb_set(), and it also
+ makes assumptions on the code that calls it in order to allow a more
+ optimized implementation.
+
+ In particular, atomic_rcu_set() suffices for synchronization
+ with readers, if the updater never mutates a field within a
+ data item that is already accessible to readers. This is the
+ case when initializing a new copy of the RCU-protected data
+ structure; just ensure that initialization of *p is carried out
+ before atomic_rcu_set() makes the data item visible to readers.
+ If this rule is observed, writes will happen in the opposite
+ order as reads in the RCU read-side critical sections (or if
+ there is just one update), and there will be no need for other
+ synchronization mechanism to coordinate the accesses.
+
+The following APIs must be used before RCU is used in a thread:
+
+ void rcu_register_thread(void);
+
+ Mark a thread as taking part in the RCU mechanism. Such a thread
+ will have to report quiescent points regularly, either manually
+ or through the QemuCond/QemuSemaphore/QemuEvent APIs.
+
+ void rcu_unregister_thread(void);
+
+ Mark a thread as not taking part anymore in the RCU mechanism.
+ It is not a problem if such a thread reports quiescent points,
+ either manually or by using the QemuCond/QemuSemaphore/QemuEvent
+ APIs.
+
+Note that these APIs are relatively heavyweight, and should _not_ be
+nested.
+
+
+DIFFERENCES WITH LINUX
+======================
+
+- Waiting on a mutex is possible, though discouraged, within an RCU critical
+ section. This is because spinlocks are rarely (if ever) used in userspace
+ programming; not allowing this would prevent upgrading an RCU read-side
+ critical section to become an updater.
+
+- atomic_rcu_read and atomic_rcu_set replace rcu_dereference and
+ rcu_assign_pointer. They take a _pointer_ to the variable being accessed.
+
+- call_rcu is a macro that has an extra argument (the name of the first
+ field in the struct, which must be a struct rcu_head), and expects the
+ type of the callback's argument to be the type of the first argument.
+ call_rcu1 is the same as Linux's call_rcu.
+
+
+RCU PATTERNS
+============
+
+Many patterns using read-writer locks translate directly to RCU, with
+the advantages of higher scalability and deadlock immunity.
+
+In general, RCU can be used whenever it is possible to create a new
+"version" of a data structure every time the updater runs. This may
+sound like a very strict restriction, however:
+
+- the updater does not mean "everything that writes to a data structure",
+ but rather "everything that involves a reclamation step". See the
+ array example below
+
+- in some cases, creating a new version of a data structure may actually
+ be very cheap. For example, modifying the "next" pointer of a singly
+ linked list is effectively creating a new version of the list.
+
+Here are some frequently-used RCU idioms that are worth noting.
+
+
+RCU list processing
+-------------------
+
+TBD (not yet used in QEMU)
+
+
+RCU reference counting
+----------------------
+
+Because grace periods are not allowed to complete while there is an RCU
+read-side critical section in progress, the RCU read-side primitives
+may be used as a restricted reference-counting mechanism. For example,
+consider the following code fragment:
+
+ rcu_read_lock();
+ p = atomic_rcu_read(&foo);
+ /* do something with p. */
+ rcu_read_unlock();
+
+The RCU read-side critical section ensures that the value of "p" remains
+valid until after the rcu_read_unlock(). In some sense, it is acquiring
+a reference to p that is later released when the critical section ends.
+The write side looks simply like this (with appropriate locking):
+
+ qemu_mutex_lock(&foo_mutex);
+ old = foo;
+ atomic_rcu_set(&foo, new);
+ qemu_mutex_unlock(&foo_mutex);
+ synchronize_rcu();
+ free(old);
+
+If the processing cannot be done purely within the critical section, it
+is possible to combine this idiom with a "real" reference count:
+
+ rcu_read_lock();
+ p = atomic_rcu_read(&foo);
+ foo_ref(p);
+ rcu_read_unlock();
+ /* do something with p. */
+ foo_unref(p);
+
+The write side can be like this:
+
+ qemu_mutex_lock(&foo_mutex);
+ old = foo;
+ atomic_rcu_set(&foo, new);
+ qemu_mutex_unlock(&foo_mutex);
+ synchronize_rcu();
+ foo_unref(old);
+
+or with call_rcu:
+
+ qemu_mutex_lock(&foo_mutex);
+ old = foo;
+ atomic_rcu_set(&foo, new);
+ qemu_mutex_unlock(&foo_mutex);
+ call_rcu(foo_unref, old, rcu);
+
+In both cases, the write side only performs removal. Reclamation
+happens when the last reference to a "foo" object is dropped.
+Using synchronize_rcu() is undesirably expensive, because the
+last reference may be dropped on the read side. Hence you can
+use call_rcu() instead:
+
+ foo_unref(struct foo *p) {
+ if (atomic_fetch_dec(&p->refcount) == 1) {
+ call_rcu(foo_destroy, p, rcu);
+ }
+ }
+
+
+Note that the same idioms would be possible with reader/writer
+locks:
+
+ read_lock(&foo_rwlock); write_mutex_lock(&foo_rwlock);
+ p = foo; p = foo;
+ /* do something with p. */ foo = new;
+ read_unlock(&foo_rwlock); free(p);
+ write_mutex_unlock(&foo_rwlock);
+ free(p);
+
+ ------------------------------------------------------------------
+
+ read_lock(&foo_rwlock); write_mutex_lock(&foo_rwlock);
+ p = foo; old = foo;
+ foo_ref(p); foo = new;
+ read_unlock(&foo_rwlock); foo_unref(old);
+ /* do something with p. */ write_mutex_unlock(&foo_rwlock);
+ read_lock(&foo_rwlock);
+ foo_unref(p);
+ read_unlock(&foo_rwlock);
+
+foo_unref could use a mechanism such as bottom halves to move deallocation
+out of the write-side critical section.
+
+
+RCU resizable arrays
+--------------------
+
+Resizable arrays can be used with RCU. The expensive RCU synchronization
+(or call_rcu) only needs to take place when the array is resized.
+The two items to take care of are:
+
+- ensuring that the old version of the array is available between removal
+ and reclamation;
+
+- avoiding mismatches in the read side between the array data and the
+ array size.
+
+The first problem is avoided simply by not using realloc. Instead,
+each resize will allocate a new array and copy the old data into it.
+The second problem would arise if the size and the data pointers were
+two members of a larger struct:
+
+ struct mystuff {
+ ...
+ int data_size;
+ int data_alloc;
+ T *data;
+ ...
+ };
+
+Instead, we store the size of the array with the array itself:
+
+ struct arr {
+ int size;
+ int alloc;
+ T data[];
+ };
+ struct arr *global_array;
+
+ read side:
+ rcu_read_lock();
+ struct arr *array = atomic_rcu_read(&global_array);
+ x = i < array->size ? array->data[i] : -1;
+ rcu_read_unlock();
+ return x;
+
+ write side (running under a lock):
+ if (global_array->size == global_array->alloc) {
+ /* Creating a new version. */
+ new_array = g_malloc(sizeof(struct arr) +
+ global_array->alloc * 2 * sizeof(T));
+ new_array->size = global_array->size;
+ new_array->alloc = global_array->alloc * 2;
+ memcpy(new_array->data, global_array->data,
+ global_array->alloc * sizeof(T));
+
+ /* Removal phase. */
+ old_array = global_array;
+ atomic_rcu_set(&new_array->data, new_array);
+ synchronize_rcu();
+
+ /* Reclamation phase. */
+ free(old_array);
+ }
+
+
+SOURCES
+=======
+
+* Documentation/RCU/ from the Linux kernel
diff --git a/hw/9pfs/virtio-9p-synth.c b/hw/9pfs/virtio-9p-synth.c
index 71262bccd2..e75aa8772e 100644
--- a/hw/9pfs/virtio-9p-synth.c
+++ b/hw/9pfs/virtio-9p-synth.c
@@ -17,6 +17,7 @@
#include "virtio-9p-xattr.h"
#include "fsdev/qemu-fsdev.h"
#include "virtio-9p-synth.h"
+#include "qemu/rcu.h"
#include <sys/stat.h>
diff --git a/hw/scsi/scsi-bus.c b/hw/scsi/scsi-bus.c
index 9b740a3cfa..db39ae0e23 100644
--- a/hw/scsi/scsi-bus.c
+++ b/hw/scsi/scsi-bus.c
@@ -1756,6 +1756,8 @@ void scsi_req_cancel_async(SCSIRequest *req, Notifier *notifier)
req->io_canceled = true;
if (req->aiocb) {
blk_aio_cancel_async(req->aiocb);
+ } else {
+ scsi_req_cancel_complete(req);
}
}
diff --git a/include/exec/memory.h b/include/exec/memory.h
index 0cd96b152e..06ffa1d185 100644
--- a/include/exec/memory.h
+++ b/include/exec/memory.h
@@ -33,6 +33,7 @@
#include "qemu/notify.h"
#include "qapi/error.h"
#include "qom/object.h"
+#include "qemu/rcu.h"
#define MAX_PHYS_ADDR_SPACE_BITS 62
#define MAX_PHYS_ADDR (((hwaddr)1 << MAX_PHYS_ADDR_SPACE_BITS) - 1)
@@ -207,9 +208,13 @@ struct MemoryListener {
*/
struct AddressSpace {
/* All fields are private. */
+ struct rcu_head rcu;
char *name;
MemoryRegion *root;
+
+ /* Accessed via RCU. */
struct FlatView *current_map;
+
int ioeventfd_nb;
struct MemoryRegionIoeventfd *ioeventfds;
struct AddressSpaceDispatch *dispatch;
diff --git a/include/qemu/atomic.h b/include/qemu/atomic.h
index 93c2ae2f37..98e05ca875 100644
--- a/include/qemu/atomic.h
+++ b/include/qemu/atomic.h
@@ -129,6 +129,67 @@
#define atomic_set(ptr, i) ((*(__typeof__(*ptr) volatile*) (ptr)) = (i))
#endif
+/**
+ * atomic_rcu_read - reads a RCU-protected pointer to a local variable
+ * into a RCU read-side critical section. The pointer can later be safely
+ * dereferenced within the critical section.
+ *
+ * This ensures that the pointer copy is invariant thorough the whole critical
+ * section.
+ *
+ * Inserts memory barriers on architectures that require them (currently only
+ * Alpha) and documents which pointers are protected by RCU.
+ *
+ * Unless the __ATOMIC_CONSUME memory order is available, atomic_rcu_read also
+ * includes a compiler barrier to ensure that value-speculative optimizations
+ * (e.g. VSS: Value Speculation Scheduling) does not perform the data read
+ * before the pointer read by speculating the value of the pointer. On new
+ * enough compilers, atomic_load takes care of such concern about
+ * dependency-breaking optimizations.
+ *
+ * Should match atomic_rcu_set(), atomic_xchg(), atomic_cmpxchg().
+ */
+#ifndef atomic_rcu_read
+#ifdef __ATOMIC_CONSUME
+#define atomic_rcu_read(ptr) ({ \
+ typeof(*ptr) _val; \
+ __atomic_load(ptr, &_val, __ATOMIC_CONSUME); \
+ _val; \
+})
+#else
+#define atomic_rcu_read(ptr) ({ \
+ typeof(*ptr) _val = atomic_read(ptr); \
+ smp_read_barrier_depends(); \
+ _val; \
+})
+#endif
+#endif
+
+/**
+ * atomic_rcu_set - assigns (publicizes) a pointer to a new data structure
+ * meant to be read by RCU read-side critical sections.
+ *
+ * Documents which pointers will be dereferenced by RCU read-side critical
+ * sections and adds the required memory barriers on architectures requiring
+ * them. It also makes sure the compiler does not reorder code initializing the
+ * data structure before its publication.
+ *
+ * Should match atomic_rcu_read().
+ */
+#ifndef atomic_rcu_set
+#ifdef __ATOMIC_RELEASE
+#define atomic_rcu_set(ptr, i) do { \
+ typeof(*ptr) _val = (i); \
+ __atomic_store(ptr, &_val, __ATOMIC_RELEASE); \
+} while(0)
+#else
+#define atomic_rcu_set(ptr, i) do { \
+ smp_wmb(); \
+ atomic_set(ptr, i); \
+} while (0)
+#endif
+#endif
+
/* These have the same semantics as Java volatile variables.
* See http://gee.cs.oswego.edu/dl/jmm/cookbook.html:
* "1. Issue a StoreStore barrier (wmb) before each volatile store."
diff --git a/include/qemu/queue.h b/include/qemu/queue.h
index a98eb3ad79..c602797652 100644
--- a/include/qemu/queue.h
+++ b/include/qemu/queue.h
@@ -104,6 +104,19 @@ struct { \
(head)->lh_first = NULL; \
} while (/*CONSTCOND*/0)
+#define QLIST_SWAP(dstlist, srclist, field) do { \
+ void *tmplist; \
+ tmplist = (srclist)->lh_first; \
+ (srclist)->lh_first = (dstlist)->lh_first; \
+ if ((srclist)->lh_first != NULL) { \
+ (srclist)->lh_first->field.le_prev = &(srclist)->lh_first; \
+ } \
+ (dstlist)->lh_first = tmplist; \
+ if ((dstlist)->lh_first != NULL) { \
+ (dstlist)->lh_first->field.le_prev = &(dstlist)->lh_first; \
+ } \
+} while (/*CONSTCOND*/0)
+
#define QLIST_INSERT_AFTER(listelm, elm, field) do { \
if (((elm)->field.le_next = (listelm)->field.le_next) != NULL) \
(listelm)->field.le_next->field.le_prev = \
diff --git a/include/qemu/rcu.h b/include/qemu/rcu.h
new file mode 100644
index 0000000000..068a279a79
--- /dev/null
+++ b/include/qemu/rcu.h
@@ -0,0 +1,147 @@
+#ifndef QEMU_RCU_H
+#define QEMU_RCU_H
+
+/*
+ * urcu-mb.h
+ *
+ * Userspace RCU header with explicit memory barrier.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * IBM's contributions to this file may be relicensed under LGPLv2 or later.
+ */
+
+#include <stdlib.h>
+#include <assert.h>
+#include <limits.h>
+#include <unistd.h>
+#include <stdint.h>
+#include <stdbool.h>
+#include <glib.h>
+
+#include "qemu/compiler.h"
+#include "qemu/thread.h"
+#include "qemu/queue.h"
+#include "qemu/atomic.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/*
+ * Important !
+ *
+ * Each thread containing read-side critical sections must be registered
+ * with rcu_register_thread() before calling rcu_read_lock().
+ * rcu_unregister_thread() should be called before the thread exits.
+ */
+
+#ifdef DEBUG_RCU
+#define rcu_assert(args...) assert(args)
+#else
+#define rcu_assert(args...)
+#endif
+
+/*
+ * Global quiescent period counter with low-order bits unused.
+ * Using a int rather than a char to eliminate false register dependencies
+ * causing stalls on some architectures.
+ */
+extern unsigned long rcu_gp_ctr;
+
+extern QemuEvent rcu_gp_event;
+
+struct rcu_reader_data {
+ /* Data used by both reader and synchronize_rcu() */
+ unsigned long ctr;
+ bool waiting;
+
+ /* Data used by reader only */
+ unsigned depth;
+
+ /* Data used for registry, protected by rcu_gp_lock */
+ QLIST_ENTRY(rcu_reader_data) node;
+};
+
+extern __thread struct rcu_reader_data rcu_reader;
+
+static inline void rcu_read_lock(void)
+{
+ struct rcu_reader_data *p_rcu_reader = &rcu_reader;
+ unsigned ctr;
+
+ if (p_rcu_reader->depth++ > 0) {
+ return;
+ }
+
+ ctr = atomic_read(&rcu_gp_ctr);
+ atomic_xchg(&p_rcu_reader->ctr, ctr);
+ if (atomic_read(&p_rcu_reader->waiting)) {
+ atomic_set(&p_rcu_reader->waiting, false);
+ qemu_event_set(&rcu_gp_event);
+ }
+}
+
+static inline void rcu_read_unlock(void)
+{
+ struct rcu_reader_data *p_rcu_reader = &rcu_reader;
+
+ assert(p_rcu_reader->depth != 0);
+ if (--p_rcu_reader->depth > 0) {
+ return;
+ }
+
+ atomic_xchg(&p_rcu_reader->ctr, 0);
+ if (atomic_read(&p_rcu_reader->waiting)) {
+ atomic_set(&p_rcu_reader->waiting, false);
+ qemu_event_set(&rcu_gp_event);
+ }
+}
+
+extern void synchronize_rcu(void);
+
+/*
+ * Reader thread registration.
+ */
+extern void rcu_register_thread(void);
+extern void rcu_unregister_thread(void);
+
+struct rcu_head;
+typedef void RCUCBFunc(struct rcu_head *head);
+
+struct rcu_head {
+ struct rcu_head *next;
+ RCUCBFunc *func;
+};
+
+extern void call_rcu1(struct rcu_head *head, RCUCBFunc *func);
+
+/* The operands of the minus operator must have the same type,
+ * which must be the one that we specify in the cast.
+ */
+#define call_rcu(head, func, field) \
+ call_rcu1(({ \
+ char __attribute__((unused)) \
+ offset_must_be_zero[-offsetof(typeof(*(head)), field)], \
+ func_type_invalid = (func) - (void (*)(typeof(head)))(func); \
+ &(head)->field; \
+ }), \
+ (RCUCBFunc *)(func))
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* QEMU_RCU_H */
diff --git a/include/qemu/thread.h b/include/qemu/thread.h
index e89fdc9785..5114ec8e79 100644
--- a/include/qemu/thread.h
+++ b/include/qemu/thread.h
@@ -25,9 +25,6 @@ void qemu_mutex_lock(QemuMutex *mutex);
int qemu_mutex_trylock(QemuMutex *mutex);
void qemu_mutex_unlock(QemuMutex *mutex);
-#define rcu_read_lock() do { } while (0)
-#define rcu_read_unlock() do { } while (0)
-
void qemu_cond_init(QemuCond *cond);
void qemu_cond_destroy(QemuCond *cond);
diff --git a/include/qemu/timer.h b/include/qemu/timer.h
index ca5befba0e..eba8b2109c 100644
--- a/include/qemu/timer.h
+++ b/include/qemu/timer.h
@@ -838,7 +838,6 @@ static inline int64_t get_clock(void)
int64_t cpu_get_icount_raw(void);
int64_t cpu_get_icount(void);
int64_t cpu_get_clock(void);
-int64_t cpu_get_clock_offset(void);
int64_t cpu_icount_to_ns(int64_t icount);
/*******************************************/
diff --git a/memory.c b/memory.c
index c343bf37df..9b91243978 100644
--- a/memory.c
+++ b/memory.c
@@ -33,26 +33,12 @@ static bool memory_region_update_pending;
static bool ioeventfd_update_pending;
static bool global_dirty_log = false;
-/* flat_view_mutex is taken around reading as->current_map; the critical
- * section is extremely short, so I'm using a single mutex for every AS.
- * We could also RCU for the read-side.
- *
- * The BQL is taken around transaction commits, hence both locks are taken
- * while writing to as->current_map (with the BQL taken outside).
- */
-static QemuMutex flat_view_mutex;
-
static QTAILQ_HEAD(memory_listeners, MemoryListener) memory_listeners
= QTAILQ_HEAD_INITIALIZER(memory_listeners);
static QTAILQ_HEAD(, AddressSpace) address_spaces
= QTAILQ_HEAD_INITIALIZER(address_spaces);
-static void memory_init(void)
-{
- qemu_mutex_init(&flat_view_mutex);
-}
-
typedef struct AddrRange AddrRange;
/*
@@ -242,6 +228,7 @@ struct FlatRange {
* order.
*/
struct FlatView {
+ struct rcu_head rcu;
unsigned ref;
FlatRange *ranges;
unsigned nr;
@@ -654,10 +641,10 @@ static FlatView *address_space_get_flatview(AddressSpace *as)
{
FlatView *view;
- qemu_mutex_lock(&flat_view_mutex);
- view = as->current_map;
+ rcu_read_lock();
+ view = atomic_rcu_read(&as->current_map);
flatview_ref(view);
- qemu_mutex_unlock(&flat_view_mutex);
+ rcu_read_unlock();
return view;
}
@@ -766,10 +753,9 @@ static void address_space_update_topology(AddressSpace *as)
address_space_update_topology_pass(as, old_view, new_view, false);
address_space_update_topology_pass(as, old_view, new_view, true);
- qemu_mutex_lock(&flat_view_mutex);
- flatview_unref(as->current_map);
- as->current_map = new_view;
- qemu_mutex_unlock(&flat_view_mutex);
+ /* Writes are protected by the BQL. */
+ atomic_rcu_set(&as->current_map, new_view);
+ call_rcu(old_view, flatview_unref, rcu);
/* Note that all the old MemoryRegions are still alive up to this
* point. This relieves most MemoryListeners from the need to
@@ -1263,7 +1249,6 @@ static void memory_region_finalize(Object *obj)
MemoryRegion *mr = MEMORY_REGION(obj);
assert(QTAILQ_EMPTY(&mr->subregions));
- assert(memory_region_transaction_depth == 0);
mr->destructor(mr);
memory_region_clear_coalescing(mr);
g_free((char *)mr->name);
@@ -1843,11 +1828,11 @@ MemoryRegionSection memory_region_find(MemoryRegion *mr,
}
range = addrrange_make(int128_make64(addr), int128_make64(size));
- view = address_space_get_flatview(as);
+ rcu_read_lock();
+ view = atomic_rcu_read(&as->current_map);
fr = flatview_lookup(view, range);
if (!fr) {
- flatview_unref(view);
- return ret;
+ goto out;
}
while (fr > view->ranges && addrrange_intersects(fr[-1].addr, range)) {
@@ -1864,8 +1849,8 @@ MemoryRegionSection memory_region_find(MemoryRegion *mr,
ret.offset_within_address_space = int128_get64(range.start);
ret.readonly = fr->readonly;
memory_region_ref(ret.mr);
-
- flatview_unref(view);
+out:
+ rcu_read_unlock();
return ret;
}
@@ -1958,10 +1943,6 @@ void memory_listener_unregister(MemoryListener *listener)
void address_space_init(AddressSpace *as, MemoryRegion *root, const char *name)
{
- if (QTAILQ_EMPTY(&address_spaces)) {
- memory_init();
- }
-
memory_region_transaction_begin();
as->root = root;
as->current_map = g_new(FlatView, 1);
@@ -1975,15 +1956,10 @@ void address_space_init(AddressSpace *as, MemoryRegion *root, const char *name)
memory_region_transaction_commit();
}
-void address_space_destroy(AddressSpace *as)
+static void do_address_space_destroy(AddressSpace *as)
{
MemoryListener *listener;
- /* Flush out anything from MemoryListeners listening in on this */
- memory_region_transaction_begin();
- as->root = NULL;
- memory_region_transaction_commit();
- QTAILQ_REMOVE(&address_spaces, as, address_spaces_link);
address_space_destroy_dispatch(as);
QTAILQ_FOREACH(listener, &memory_listeners, link) {
@@ -1995,6 +1971,21 @@ void address_space_destroy(AddressSpace *as)
g_free(as->ioeventfds);
}
+void address_space_destroy(AddressSpace *as)
+{
+ /* Flush out anything from MemoryListeners listening in on this */
+ memory_region_transaction_begin();
+ as->root = NULL;
+ memory_region_transaction_commit();
+ QTAILQ_REMOVE(&address_spaces, as, address_spaces_link);
+
+ /* At this point, as->dispatch and as->current_map are dummy
+ * entries that the guest should never use. Wait for the old
+ * values to expire before freeing the data.
+ */
+ call_rcu(as, do_address_space_destroy, rcu);
+}
+
bool io_mem_read(MemoryRegion *mr, hwaddr addr, uint64_t *pval, unsigned size)
{
return memory_region_dispatch_read(mr, addr, pval, size);
diff --git a/tests/Makefile b/tests/Makefile
index c2e2e52f22..db5b3c3df1 100644
--- a/tests/Makefile
+++ b/tests/Makefile
@@ -60,6 +60,8 @@ gcov-files-test-mul64-y = util/host-utils.c
check-unit-y += tests/test-int128$(EXESUF)
# all code tested by test-int128 is inside int128.h
gcov-files-test-int128-y =
+check-unit-y += tests/rcutorture$(EXESUF)
+gcov-files-rcutorture-y = util/rcu.c
check-unit-y += tests/test-bitops$(EXESUF)
check-unit-$(CONFIG_HAS_GLIB_SUBPROCESS_TESTS) += tests/test-qdev-global-props$(EXESUF)
check-unit-y += tests/check-qom-interface$(EXESUF)
@@ -223,7 +225,8 @@ test-obj-y = tests/check-qint.o tests/check-qstring.o tests/check-qdict.o \
tests/test-qmp-input-visitor.o tests/test-qmp-input-strict.o \
tests/test-qmp-commands.o tests/test-visitor-serialization.o \
tests/test-x86-cpuid.o tests/test-mul64.o tests/test-int128.o \
- tests/test-opts-visitor.o tests/test-qmp-event.o
+ tests/test-opts-visitor.o tests/test-qmp-event.o \
+ tests/rcutorture.o
test-qapi-obj-y = tests/test-qapi-visit.o tests/test-qapi-types.o \
tests/test-qapi-event.o
@@ -252,6 +255,8 @@ tests/test-x86-cpuid$(EXESUF): tests/test-x86-cpuid.o
tests/test-xbzrle$(EXESUF): tests/test-xbzrle.o migration/xbzrle.o page_cache.o libqemuutil.a
tests/test-cutils$(EXESUF): tests/test-cutils.o util/cutils.o
tests/test-int128$(EXESUF): tests/test-int128.o
+tests/rcutorture$(EXESUF): tests/rcutorture.o libqemuutil.a
+
tests/test-qdev-global-props$(EXESUF): tests/test-qdev-global-props.o \
hw/core/qdev.o hw/core/qdev-properties.o hw/core/hotplug.o\
hw/core/irq.o \
diff --git a/tests/rcutorture.c b/tests/rcutorture.c
new file mode 100644
index 0000000000..60a2ccfe2e
--- /dev/null
+++ b/tests/rcutorture.c
@@ -0,0 +1,451 @@
+/*
+ * rcutorture.c: simple user-level performance/stress test of RCU.
+ *
+ * Usage:
+ * ./rcu <nreaders> rperf [ <seconds> ]
+ * Run a read-side performance test with the specified
+ * number of readers for <seconds> seconds.
+ * ./rcu <nupdaters> uperf [ <seconds> ]
+ * Run an update-side performance test with the specified
+ * number of updaters and specified duration.
+ * ./rcu <nreaders> perf [ <seconds> ]
+ * Run a combined read/update performance test with the specified
+ * number of readers and one updater and specified duration.
+ *
+ * The above tests produce output as follows:
+ *
+ * n_reads: 46008000 n_updates: 146026 nreaders: 2 nupdaters: 1 duration: 1
+ * ns/read: 43.4707 ns/update: 6848.1
+ *
+ * The first line lists the total number of RCU reads and updates executed
+ * during the test, the number of reader threads, the number of updater
+ * threads, and the duration of the test in seconds. The second line
+ * lists the average duration of each type of operation in nanoseconds,
+ * or "nan" if the corresponding type of operation was not performed.
+ *
+ * ./rcu <nreaders> stress [ <seconds> ]
+ * Run a stress test with the specified number of readers and
+ * one updater.
+ *
+ * This test produces output as follows:
+ *
+ * n_reads: 114633217 n_updates: 3903415 n_mberror: 0
+ * rcu_stress_count: 114618391 14826 0 0 0 0 0 0 0 0 0
+ *
+ * The first line lists the number of RCU read and update operations
+ * executed, followed by the number of memory-ordering violations
+ * (which will be zero in a correct RCU implementation). The second
+ * line lists the number of readers observing progressively more stale
+ * data. A correct RCU implementation will have all but the first two
+ * numbers non-zero.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * Copyright (c) 2008 Paul E. McKenney, IBM Corporation.
+ */
+
+/*
+ * Test variables.
+ */
+
+#include <glib.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include "qemu/atomic.h"
+#include "qemu/rcu.h"
+#include "qemu/compiler.h"
+#include "qemu/thread.h"
+
+long long n_reads = 0LL;
+long n_updates = 0L;
+int nthreadsrunning;
+
+#define GOFLAG_INIT 0
+#define GOFLAG_RUN 1
+#define GOFLAG_STOP 2
+
+static volatile int goflag = GOFLAG_INIT;
+
+#define RCU_READ_RUN 1000
+
+#define NR_THREADS 100
+static QemuThread threads[NR_THREADS];
+static struct rcu_reader_data *data[NR_THREADS];
+static int n_threads;
+
+static void create_thread(void *(*func)(void *))
+{
+ if (n_threads >= NR_THREADS) {
+ fprintf(stderr, "Thread limit of %d exceeded!\n", NR_THREADS);
+ exit(-1);
+ }
+ qemu_thread_create(&threads[n_threads], "test", func, &data[n_threads],
+ QEMU_THREAD_JOINABLE);
+ n_threads++;
+}
+
+static void wait_all_threads(void)
+{
+ int i;
+
+ for (i = 0; i < n_threads; i++) {
+ qemu_thread_join(&threads[i]);
+ }
+ n_threads = 0;
+}
+
+/*
+ * Performance test.
+ */
+
+static void *rcu_read_perf_test(void *arg)
+{
+ int i;
+ long long n_reads_local = 0;
+
+ rcu_register_thread();
+
+ *(struct rcu_reader_data **)arg = &rcu_reader;
+ atomic_inc(&nthreadsrunning);
+ while (goflag == GOFLAG_INIT) {
+ g_usleep(1000);
+ }
+ while (goflag == GOFLAG_RUN) {
+ for (i = 0; i < RCU_READ_RUN; i++) {
+ rcu_read_lock();
+ rcu_read_unlock();
+ }
+ n_reads_local += RCU_READ_RUN;
+ }
+ atomic_add(&n_reads, n_reads_local);
+
+ rcu_unregister_thread();
+ return NULL;
+}
+
+static void *rcu_update_perf_test(void *arg)
+{
+ long long n_updates_local = 0;
+
+ rcu_register_thread();
+
+ *(struct rcu_reader_data **)arg = &rcu_reader;
+ atomic_inc(&nthreadsrunning);
+ while (goflag == GOFLAG_INIT) {
+ g_usleep(1000);
+ }
+ while (goflag == GOFLAG_RUN) {
+ synchronize_rcu();
+ n_updates_local++;
+ }
+ atomic_add(&n_updates, n_updates_local);
+
+ rcu_unregister_thread();
+ return NULL;
+}
+
+static void perftestinit(void)
+{
+ nthreadsrunning = 0;
+}
+
+static void perftestrun(int nthreads, int duration, int nreaders, int nupdaters)
+{
+ while (atomic_read(&nthreadsrunning) < nthreads) {
+ g_usleep(1000);
+ }
+ goflag = GOFLAG_RUN;
+ g_usleep(duration * G_USEC_PER_SEC);
+ goflag = GOFLAG_STOP;
+ wait_all_threads();
+ printf("n_reads: %lld n_updates: %ld nreaders: %d nupdaters: %d duration: %d\n",
+ n_reads, n_updates, nreaders, nupdaters, duration);
+ printf("ns/read: %g ns/update: %g\n",
+ ((duration * 1000*1000*1000.*(double)nreaders) /
+ (double)n_reads),
+ ((duration * 1000*1000*1000.*(double)nupdaters) /
+ (double)n_updates));
+ exit(0);
+}
+
+static void perftest(int nreaders, int duration)
+{
+ int i;
+
+ perftestinit();
+ for (i = 0; i < nreaders; i++) {
+ create_thread(rcu_read_perf_test);
+ }
+ create_thread(rcu_update_perf_test);
+ perftestrun(i + 1, duration, nreaders, 1);
+}
+
+static void rperftest(int nreaders, int duration)
+{
+ int i;
+
+ perftestinit();
+ for (i = 0; i < nreaders; i++) {
+ create_thread(rcu_read_perf_test);
+ }
+ perftestrun(i, duration, nreaders, 0);
+}
+
+static void uperftest(int nupdaters, int duration)
+{
+ int i;
+
+ perftestinit();
+ for (i = 0; i < nupdaters; i++) {
+ create_thread(rcu_update_perf_test);
+ }
+ perftestrun(i, duration, 0, nupdaters);
+}
+
+/*
+ * Stress test.
+ */
+
+#define RCU_STRESS_PIPE_LEN 10
+
+struct rcu_stress {
+ int pipe_count;
+ int mbtest;
+};
+
+struct rcu_stress rcu_stress_array[RCU_STRESS_PIPE_LEN] = { { 0 } };
+struct rcu_stress *rcu_stress_current;
+int rcu_stress_idx;
+
+int n_mberror;
+long long rcu_stress_count[RCU_STRESS_PIPE_LEN + 1];
+
+
+static void *rcu_read_stress_test(void *arg)
+{
+ int i;
+ int itercnt = 0;
+ struct rcu_stress *p;
+ int pc;
+ long long n_reads_local = 0;
+ volatile int garbage = 0;
+
+ rcu_register_thread();
+
+ *(struct rcu_reader_data **)arg = &rcu_reader;
+ while (goflag == GOFLAG_INIT) {
+ g_usleep(1000);
+ }
+ while (goflag == GOFLAG_RUN) {
+ rcu_read_lock();
+ p = atomic_rcu_read(&rcu_stress_current);
+ if (p->mbtest == 0) {
+ n_mberror++;
+ }
+ rcu_read_lock();
+ for (i = 0; i < 100; i++) {
+ garbage++;
+ }
+ rcu_read_unlock();
+ pc = p->pipe_count;
+ rcu_read_unlock();
+ if ((pc > RCU_STRESS_PIPE_LEN) || (pc < 0)) {
+ pc = RCU_STRESS_PIPE_LEN;
+ }
+ atomic_inc(&rcu_stress_count[pc]);
+ n_reads_local++;
+ if ((++itercnt % 0x1000) == 0) {
+ synchronize_rcu();
+ }
+ }
+ atomic_add(&n_reads, n_reads_local);
+
+ rcu_unregister_thread();
+ return NULL;
+}
+
+static void *rcu_update_stress_test(void *arg)
+{
+ int i;
+ struct rcu_stress *p;
+
+ rcu_register_thread();
+
+ *(struct rcu_reader_data **)arg = &rcu_reader;
+ while (goflag == GOFLAG_INIT) {
+ g_usleep(1000);
+ }
+ while (goflag == GOFLAG_RUN) {
+ i = rcu_stress_idx + 1;
+ if (i >= RCU_STRESS_PIPE_LEN) {
+ i = 0;
+ }
+ p = &rcu_stress_array[i];
+ p->mbtest = 0;
+ smp_mb();
+ p->pipe_count = 0;
+ p->mbtest = 1;
+ atomic_rcu_set(&rcu_stress_current, p);
+ rcu_stress_idx = i;
+ for (i = 0; i < RCU_STRESS_PIPE_LEN; i++) {
+ if (i != rcu_stress_idx) {
+ rcu_stress_array[i].pipe_count++;
+ }
+ }
+ synchronize_rcu();
+ n_updates++;
+ }
+
+ rcu_unregister_thread();
+ return NULL;
+}
+
+static void *rcu_fake_update_stress_test(void *arg)
+{
+ rcu_register_thread();
+
+ *(struct rcu_reader_data **)arg = &rcu_reader;
+ while (goflag == GOFLAG_INIT) {
+ g_usleep(1000);
+ }
+ while (goflag == GOFLAG_RUN) {
+ synchronize_rcu();
+ g_usleep(1000);
+ }
+
+ rcu_unregister_thread();
+ return NULL;
+}
+
+static void stresstest(int nreaders, int duration)
+{
+ int i;
+
+ rcu_stress_current = &rcu_stress_array[0];
+ rcu_stress_current->pipe_count = 0;
+ rcu_stress_current->mbtest = 1;
+ for (i = 0; i < nreaders; i++) {
+ create_thread(rcu_read_stress_test);
+ }
+ create_thread(rcu_update_stress_test);
+ for (i = 0; i < 5; i++) {
+ create_thread(rcu_fake_update_stress_test);
+ }
+ goflag = GOFLAG_RUN;
+ g_usleep(duration * G_USEC_PER_SEC);
+ goflag = GOFLAG_STOP;
+ wait_all_threads();
+ printf("n_reads: %lld n_updates: %ld n_mberror: %d\n",
+ n_reads, n_updates, n_mberror);
+ printf("rcu_stress_count:");
+ for (i = 0; i <= RCU_STRESS_PIPE_LEN; i++) {
+ printf(" %lld", rcu_stress_count[i]);
+ }
+ printf("\n");
+ exit(0);
+}
+
+/* GTest interface */
+
+static void gtest_stress(int nreaders, int duration)
+{
+ int i;
+
+ rcu_stress_current = &rcu_stress_array[0];
+ rcu_stress_current->pipe_count = 0;
+ rcu_stress_current->mbtest = 1;
+ for (i = 0; i < nreaders; i++) {
+ create_thread(rcu_read_stress_test);
+ }
+ create_thread(rcu_update_stress_test);
+ for (i = 0; i < 5; i++) {
+ create_thread(rcu_fake_update_stress_test);
+ }
+ goflag = GOFLAG_RUN;
+ g_usleep(duration * G_USEC_PER_SEC);
+ goflag = GOFLAG_STOP;
+ wait_all_threads();
+ g_assert_cmpint(n_mberror, ==, 0);
+ for (i = 2; i <= RCU_STRESS_PIPE_LEN; i++) {
+ g_assert_cmpint(rcu_stress_count[i], ==, 0);
+ }
+}
+
+static void gtest_stress_1_1(void)
+{
+ gtest_stress(1, 1);
+}
+
+static void gtest_stress_10_1(void)
+{
+ gtest_stress(10, 1);
+}
+
+static void gtest_stress_1_5(void)
+{
+ gtest_stress(1, 5);
+}
+
+static void gtest_stress_10_5(void)
+{
+ gtest_stress(10, 5);
+}
+
+/*
+ * Mainprogram.
+ */
+
+static void usage(int argc, char *argv[])
+{
+ fprintf(stderr, "Usage: %s [nreaders [ perf | stress ] ]\n", argv[0]);
+ exit(-1);
+}
+
+int main(int argc, char *argv[])
+{
+ int nreaders = 1;
+ int duration = 1;
+
+ if (argc >= 2 && argv[1][0] == '-') {
+ g_test_init(&argc, &argv, NULL);
+ if (g_test_quick()) {
+ g_test_add_func("/rcu/torture/1reader", gtest_stress_1_1);
+ g_test_add_func("/rcu/torture/10readers", gtest_stress_10_1);
+ } else {
+ g_test_add_func("/rcu/torture/1reader", gtest_stress_1_5);
+ g_test_add_func("/rcu/torture/10readers", gtest_stress_10_5);
+ }
+ return g_test_run();
+ }
+
+ if (argc >= 2) {
+ nreaders = strtoul(argv[1], NULL, 0);
+ }
+ if (argc > 3) {
+ duration = strtoul(argv[3], NULL, 0);
+ }
+ if (argc < 3 || strcmp(argv[2], "stress") == 0) {
+ stresstest(nreaders, duration);
+ } else if (strcmp(argv[2], "rperf") == 0) {
+ rperftest(nreaders, duration);
+ } else if (strcmp(argv[2], "uperf") == 0) {
+ uperftest(nreaders, duration);
+ } else if (strcmp(argv[2], "perf") == 0) {
+ perftest(nreaders, duration);
+ }
+ usage(argc, argv);
+ return 0;
+}
diff --git a/util/Makefile.objs b/util/Makefile.objs
index 93007e2f56..ceaba30939 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -17,3 +17,4 @@ util-obj-y += throttle.o
util-obj-y += getauxval.o
util-obj-y += readline.o
util-obj-y += rfifolock.o
+util-obj-y += rcu.o
diff --git a/util/qemu-thread-posix.c b/util/qemu-thread-posix.c
index 41cb23df0c..50a29d8f7a 100644
--- a/util/qemu-thread-posix.c
+++ b/util/qemu-thread-posix.c
@@ -307,11 +307,13 @@ static inline void futex_wait(QemuEvent *ev, unsigned val)
#else
static inline void futex_wake(QemuEvent *ev, int n)
{
+ pthread_mutex_lock(&ev->lock);
if (n == 1) {
pthread_cond_signal(&ev->cond);
} else {
pthread_cond_broadcast(&ev->cond);
}
+ pthread_mutex_unlock(&ev->lock);
}
static inline void futex_wait(QemuEvent *ev, unsigned val)
diff --git a/util/rcu.c b/util/rcu.c
new file mode 100644
index 0000000000..c9c3e6e4ab
--- /dev/null
+++ b/util/rcu.c
@@ -0,0 +1,291 @@
+/*
+ * urcu-mb.c
+ *
+ * Userspace RCU library with explicit memory barriers
+ *
+ * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
+ * Copyright 2015 Red Hat, Inc.
+ *
+ * Ported to QEMU by Paolo Bonzini <pbonzini@redhat.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * IBM's contributions to this file may be relicensed under LGPLv2 or later.
+ */
+
+#include "qemu-common.h"
+#include <stdio.h>
+#include <assert.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <errno.h>
+#include "qemu/rcu.h"
+#include "qemu/atomic.h"
+#include "qemu/thread.h"
+
+/*
+ * Global grace period counter. Bit 0 is always one in rcu_gp_ctr.
+ * Bits 1 and above are defined in synchronize_rcu.
+ */
+#define RCU_GP_LOCKED (1UL << 0)
+#define RCU_GP_CTR (1UL << 1)
+
+unsigned long rcu_gp_ctr = RCU_GP_LOCKED;
+
+QemuEvent rcu_gp_event;
+static QemuMutex rcu_gp_lock;
+
+/*
+ * Check whether a quiescent state was crossed between the beginning of
+ * update_counter_and_wait and now.
+ */
+static inline int rcu_gp_ongoing(unsigned long *ctr)
+{
+ unsigned long v;
+
+ v = atomic_read(ctr);
+ return v && (v != rcu_gp_ctr);
+}
+
+/* Written to only by each individual reader. Read by both the reader and the
+ * writers.
+ */
+__thread struct rcu_reader_data rcu_reader;
+
+/* Protected by rcu_gp_lock. */
+typedef QLIST_HEAD(, rcu_reader_data) ThreadList;
+static ThreadList registry = QLIST_HEAD_INITIALIZER(registry);
+
+/* Wait for previous parity/grace period to be empty of readers. */
+static void wait_for_readers(void)
+{
+ ThreadList qsreaders = QLIST_HEAD_INITIALIZER(qsreaders);
+ struct rcu_reader_data *index, *tmp;
+
+ for (;;) {
+ /* We want to be notified of changes made to rcu_gp_ongoing
+ * while we walk the list.
+ */
+ qemu_event_reset(&rcu_gp_event);
+
+ /* Instead of using atomic_mb_set for index->waiting, and
+ * atomic_mb_read for index->ctr, memory barriers are placed
+ * manually since writes to different threads are independent.
+ * atomic_mb_set has a smp_wmb before...
+ */
+ smp_wmb();
+ QLIST_FOREACH(index, &registry, node) {
+ atomic_set(&index->waiting, true);
+ }
+
+ /* ... and a smp_mb after. */
+ smp_mb();
+
+ QLIST_FOREACH_SAFE(index, &registry, node, tmp) {
+ if (!rcu_gp_ongoing(&index->ctr)) {
+ QLIST_REMOVE(index, node);
+ QLIST_INSERT_HEAD(&qsreaders, index, node);
+
+ /* No need for mb_set here, worst of all we
+ * get some extra futex wakeups.
+ */
+ atomic_set(&index->waiting, false);
+ }
+ }
+
+ /* atomic_mb_read has smp_rmb after. */
+ smp_rmb();
+
+ if (QLIST_EMPTY(&registry)) {
+ break;
+ }
+
+ /* Wait for one thread to report a quiescent state and
+ * try again.
+ */
+ qemu_event_wait(&rcu_gp_event);
+ }
+
+ /* put back the reader list in the registry */
+ QLIST_SWAP(&registry, &qsreaders, node);
+}
+
+void synchronize_rcu(void)
+{
+ qemu_mutex_lock(&rcu_gp_lock);
+
+ if (!QLIST_EMPTY(&registry)) {
+ /* In either case, the atomic_mb_set below blocks stores that free
+ * old RCU-protected pointers.
+ */
+ if (sizeof(rcu_gp_ctr) < 8) {
+ /* For architectures with 32-bit longs, a two-subphases algorithm
+ * ensures we do not encounter overflow bugs.
+ *
+ * Switch parity: 0 -> 1, 1 -> 0.
+ */
+ atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
+ wait_for_readers();
+ atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
+ } else {
+ /* Increment current grace period. */
+ atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
+ }
+
+ wait_for_readers();
+ }
+
+ qemu_mutex_unlock(&rcu_gp_lock);
+}
+
+
+#define RCU_CALL_MIN_SIZE 30
+
+/* Multi-producer, single-consumer queue based on urcu/static/wfqueue.h
+ * from liburcu. Note that head is only used by the consumer.
+ */
+static struct rcu_head dummy;
+static struct rcu_head *head = &dummy, **tail = &dummy.next;
+static int rcu_call_count;
+static QemuEvent rcu_call_ready_event;
+
+static void enqueue(struct rcu_head *node)
+{
+ struct rcu_head **old_tail;
+
+ node->next = NULL;
+ old_tail = atomic_xchg(&tail, &node->next);
+ atomic_mb_set(old_tail, node);
+}
+
+static struct rcu_head *try_dequeue(void)
+{
+ struct rcu_head *node, *next;
+
+retry:
+ /* Test for an empty list, which we do not expect. Note that for
+ * the consumer head and tail are always consistent. The head
+ * is consistent because only the consumer reads/writes it.
+ * The tail, because it is the first step in the enqueuing.
+ * It is only the next pointers that might be inconsistent.
+ */
+ if (head == &dummy && atomic_mb_read(&tail) == &dummy.next) {
+ abort();
+ }
+
+ /* If the head node has NULL in its next pointer, the value is
+ * wrong and we need to wait until its enqueuer finishes the update.
+ */
+ node = head;
+ next = atomic_mb_read(&head->next);
+ if (!next) {
+ return NULL;
+ }
+
+ /* Since we are the sole consumer, and we excluded the empty case
+ * above, the queue will always have at least two nodes: the
+ * dummy node, and the one being removed. So we do not need to update
+ * the tail pointer.
+ */
+ head = next;
+
+ /* If we dequeued the dummy node, add it back at the end and retry. */
+ if (node == &dummy) {
+ enqueue(node);
+ goto retry;
+ }
+
+ return node;
+}
+
+static void *call_rcu_thread(void *opaque)
+{
+ struct rcu_head *node;
+
+ for (;;) {
+ int tries = 0;
+ int n = atomic_read(&rcu_call_count);
+
+ /* Heuristically wait for a decent number of callbacks to pile up.
+ * Fetch rcu_call_count now, we only must process elements that were
+ * added before synchronize_rcu() starts.
+ */
+ while (n < RCU_CALL_MIN_SIZE && ++tries <= 5) {
+ g_usleep(100000);
+ qemu_event_reset(&rcu_call_ready_event);
+ n = atomic_read(&rcu_call_count);
+ if (n < RCU_CALL_MIN_SIZE) {
+ qemu_event_wait(&rcu_call_ready_event);
+ n = atomic_read(&rcu_call_count);
+ }
+ }
+
+ atomic_sub(&rcu_call_count, n);
+ synchronize_rcu();
+ while (n > 0) {
+ node = try_dequeue();
+ while (!node) {
+ qemu_event_reset(&rcu_call_ready_event);
+ node = try_dequeue();
+ if (!node) {
+ qemu_event_wait(&rcu_call_ready_event);
+ node = try_dequeue();
+ }
+ }
+
+ n--;
+ node->func(node);
+ }
+ }
+ abort();
+}
+
+void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node))
+{
+ node->func = func;
+ enqueue(node);
+ atomic_inc(&rcu_call_count);
+ qemu_event_set(&rcu_call_ready_event);
+}
+
+void rcu_register_thread(void)
+{
+ assert(rcu_reader.ctr == 0);
+ qemu_mutex_lock(&rcu_gp_lock);
+ QLIST_INSERT_HEAD(&registry, &rcu_reader, node);
+ qemu_mutex_unlock(&rcu_gp_lock);
+}
+
+void rcu_unregister_thread(void)
+{
+ qemu_mutex_lock(&rcu_gp_lock);
+ QLIST_REMOVE(&rcu_reader, node);
+ qemu_mutex_unlock(&rcu_gp_lock);
+}
+
+static void __attribute__((__constructor__)) rcu_init(void)
+{
+ QemuThread thread;
+
+ qemu_mutex_init(&rcu_gp_lock);
+ qemu_event_init(&rcu_gp_event, true);
+
+ qemu_event_init(&rcu_call_ready_event, false);
+ qemu_thread_create(&thread, "call_rcu", call_rcu_thread,
+ NULL, QEMU_THREAD_DETACHED);
+
+ rcu_register_thread();
+}