aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/rcu.txt285
-rw-r--r--hw/9pfs/virtio-9p-synth.c1
-rw-r--r--include/qemu/atomic.h61
-rw-r--r--include/qemu/queue.h13
-rw-r--r--include/qemu/rcu.h112
-rw-r--r--include/qemu/thread.h3
-rw-r--r--util/Makefile.objs1
-rw-r--r--util/rcu.c172
8 files changed, 645 insertions, 3 deletions
diff --git a/docs/rcu.txt b/docs/rcu.txt
new file mode 100644
index 0000000000..9938ad382d
--- /dev/null
+++ b/docs/rcu.txt
@@ -0,0 +1,285 @@
+Using RCU (Read-Copy-Update) for synchronization
+================================================
+
+Read-copy update (RCU) is a synchronization mechanism that is used to
+protect read-mostly data structures. RCU is very efficient and scalable
+on the read side (it is wait-free), and thus can make the read paths
+extremely fast.
+
+RCU supports concurrency between a single writer and multiple readers,
+thus it is not used alone. Typically, the write-side will use a lock to
+serialize multiple updates, but other approaches are possible (e.g.,
+restricting updates to a single task). In QEMU, when a lock is used,
+this will often be the "iothread mutex", also known as the "big QEMU
+lock" (BQL). Also, restricting updates to a single task is done in
+QEMU using the "bottom half" API.
+
+RCU is fundamentally a "wait-to-finish" mechanism. The read side marks
+sections of code with "critical sections", and the update side will wait
+for the execution of all *currently running* critical sections before
+proceeding, or before asynchronously executing a callback.
+
+The key point here is that only the currently running critical sections
+are waited for; critical sections that are started _after_ the beginning
+of the wait do not extend the wait, despite running concurrently with
+the updater. This is the reason why RCU is more scalable than,
+for example, reader-writer locks. It is so much more scalable that
+the system will have a single instance of the RCU mechanism; a single
+mechanism can be used for an arbitrary number of "things", without
+having to worry about things such as contention or deadlocks.
+
+How is this possible? The basic idea is to split updates in two phases,
+"removal" and "reclamation". During removal, we ensure that subsequent
+readers will not be able to get a reference to the old data. After
+removal has completed, a critical section will not be able to access
+the old data. Therefore, critical sections that begin after removal
+do not matter; as soon as all previous critical sections have finished,
+there cannot be any readers who hold references to the data structure,
+and these can now be safely reclaimed (e.g., freed or unref'ed).
+
+Here is a picutre:
+
+ thread 1 thread 2 thread 3
+ ------------------- ------------------------ -------------------
+ enter RCU crit.sec.
+ | finish removal phase
+ | begin wait
+ | | enter RCU crit.sec.
+ exit RCU crit.sec | |
+ complete wait |
+ begin reclamation phase |
+ exit RCU crit.sec.
+
+
+Note how thread 3 is still executing its critical section when thread 2
+starts reclaiming data. This is possible, because the old version of the
+data structure was not accessible at the time thread 3 began executing
+that critical section.
+
+
+RCU API
+=======
+
+The core RCU API is small:
+
+ void rcu_read_lock(void);
+
+ Used by a reader to inform the reclaimer that the reader is
+ entering an RCU read-side critical section.
+
+ void rcu_read_unlock(void);
+
+ Used by a reader to inform the reclaimer that the reader is
+ exiting an RCU read-side critical section. Note that RCU
+ read-side critical sections may be nested and/or overlapping.
+
+ void synchronize_rcu(void);
+
+ Blocks until all pre-existing RCU read-side critical sections
+ on all threads have completed. This marks the end of the removal
+ phase and the beginning of reclamation phase.
+
+ Note that it would be valid for another update to come while
+ synchronize_rcu is running. Because of this, it is better that
+ the updater releases any locks it may hold before calling
+ synchronize_rcu.
+
+ typeof(*p) atomic_rcu_read(p);
+
+ atomic_rcu_read() is similar to atomic_mb_read(), but it makes
+ some assumptions on the code that calls it. This allows a more
+ optimized implementation.
+
+ atomic_rcu_read assumes that whenever a single RCU critical
+ section reads multiple shared data, these reads are either
+ data-dependent or need no ordering. This is almost always the
+ case when using RCU, because read-side critical sections typically
+ navigate one or more pointers (the pointers that are changed on
+ every update) until reaching a data structure of interest,
+ and then read from there.
+
+ RCU read-side critical sections must use atomic_rcu_read() to
+ read data, unless concurrent writes are presented by another
+ synchronization mechanism.
+
+ Furthermore, RCU read-side critical sections should traverse the
+ data structure in a single direction, opposite to the direction
+ in which the updater initializes it.
+
+ void atomic_rcu_set(p, typeof(*p) v);
+
+ atomic_rcu_set() is also similar to atomic_mb_set(), and it also
+ makes assumptions on the code that calls it in order to allow a more
+ optimized implementation.
+
+ In particular, atomic_rcu_set() suffices for synchronization
+ with readers, if the updater never mutates a field within a
+ data item that is already accessible to readers. This is the
+ case when initializing a new copy of the RCU-protected data
+ structure; just ensure that initialization of *p is carried out
+ before atomic_rcu_set() makes the data item visible to readers.
+ If this rule is observed, writes will happen in the opposite
+ order as reads in the RCU read-side critical sections (or if
+ there is just one update), and there will be no need for other
+ synchronization mechanism to coordinate the accesses.
+
+The following APIs must be used before RCU is used in a thread:
+
+ void rcu_register_thread(void);
+
+ Mark a thread as taking part in the RCU mechanism. Such a thread
+ will have to report quiescent points regularly, either manually
+ or through the QemuCond/QemuSemaphore/QemuEvent APIs.
+
+ void rcu_unregister_thread(void);
+
+ Mark a thread as not taking part anymore in the RCU mechanism.
+ It is not a problem if such a thread reports quiescent points,
+ either manually or by using the QemuCond/QemuSemaphore/QemuEvent
+ APIs.
+
+Note that these APIs are relatively heavyweight, and should _not_ be
+nested.
+
+
+DIFFERENCES WITH LINUX
+======================
+
+- Waiting on a mutex is possible, though discouraged, within an RCU critical
+ section. This is because spinlocks are rarely (if ever) used in userspace
+ programming; not allowing this would prevent upgrading an RCU read-side
+ critical section to become an updater.
+
+- atomic_rcu_read and atomic_rcu_set replace rcu_dereference and
+ rcu_assign_pointer. They take a _pointer_ to the variable being accessed.
+
+
+RCU PATTERNS
+============
+
+Many patterns using read-writer locks translate directly to RCU, with
+the advantages of higher scalability and deadlock immunity.
+
+In general, RCU can be used whenever it is possible to create a new
+"version" of a data structure every time the updater runs. This may
+sound like a very strict restriction, however:
+
+- the updater does not mean "everything that writes to a data structure",
+ but rather "everything that involves a reclamation step". See the
+ array example below
+
+- in some cases, creating a new version of a data structure may actually
+ be very cheap. For example, modifying the "next" pointer of a singly
+ linked list is effectively creating a new version of the list.
+
+Here are some frequently-used RCU idioms that are worth noting.
+
+
+RCU list processing
+-------------------
+
+TBD (not yet used in QEMU)
+
+
+RCU reference counting
+----------------------
+
+Because grace periods are not allowed to complete while there is an RCU
+read-side critical section in progress, the RCU read-side primitives
+may be used as a restricted reference-counting mechanism. For example,
+consider the following code fragment:
+
+ rcu_read_lock();
+ p = atomic_rcu_read(&foo);
+ /* do something with p. */
+ rcu_read_unlock();
+
+The RCU read-side critical section ensures that the value of "p" remains
+valid until after the rcu_read_unlock(). In some sense, it is acquiring
+a reference to p that is later released when the critical section ends.
+The write side looks simply like this (with appropriate locking):
+
+ qemu_mutex_lock(&foo_mutex);
+ old = foo;
+ atomic_rcu_set(&foo, new);
+ qemu_mutex_unlock(&foo_mutex);
+ synchronize_rcu();
+ free(old);
+
+Note that the same idiom would be possible with reader/writer
+locks:
+
+ read_lock(&foo_rwlock); write_mutex_lock(&foo_rwlock);
+ p = foo; p = foo;
+ /* do something with p. */ foo = new;
+ read_unlock(&foo_rwlock); free(p);
+ write_mutex_unlock(&foo_rwlock);
+ free(p);
+
+
+RCU resizable arrays
+--------------------
+
+Resizable arrays can be used with RCU. The expensive RCU synchronization
+only needs to take place when the array is resized. The two items to
+take care of are:
+
+- ensuring that the old version of the array is available between removal
+ and reclamation;
+
+- avoiding mismatches in the read side between the array data and the
+ array size.
+
+The first problem is avoided simply by not using realloc. Instead,
+each resize will allocate a new array and copy the old data into it.
+The second problem would arise if the size and the data pointers were
+two members of a larger struct:
+
+ struct mystuff {
+ ...
+ int data_size;
+ int data_alloc;
+ T *data;
+ ...
+ };
+
+Instead, we store the size of the array with the array itself:
+
+ struct arr {
+ int size;
+ int alloc;
+ T data[];
+ };
+ struct arr *global_array;
+
+ read side:
+ rcu_read_lock();
+ struct arr *array = atomic_rcu_read(&global_array);
+ x = i < array->size ? array->data[i] : -1;
+ rcu_read_unlock();
+ return x;
+
+ write side (running under a lock):
+ if (global_array->size == global_array->alloc) {
+ /* Creating a new version. */
+ new_array = g_malloc(sizeof(struct arr) +
+ global_array->alloc * 2 * sizeof(T));
+ new_array->size = global_array->size;
+ new_array->alloc = global_array->alloc * 2;
+ memcpy(new_array->data, global_array->data,
+ global_array->alloc * sizeof(T));
+
+ /* Removal phase. */
+ old_array = global_array;
+ atomic_rcu_set(&new_array->data, new_array);
+ synchronize_rcu();
+
+ /* Reclamation phase. */
+ free(old_array);
+ }
+
+
+SOURCES
+=======
+
+* Documentation/RCU/ from the Linux kernel
diff --git a/hw/9pfs/virtio-9p-synth.c b/hw/9pfs/virtio-9p-synth.c
index 71262bccd2..e75aa8772e 100644
--- a/hw/9pfs/virtio-9p-synth.c
+++ b/hw/9pfs/virtio-9p-synth.c
@@ -17,6 +17,7 @@
#include "virtio-9p-xattr.h"
#include "fsdev/qemu-fsdev.h"
#include "virtio-9p-synth.h"
+#include "qemu/rcu.h"
#include <sys/stat.h>
diff --git a/include/qemu/atomic.h b/include/qemu/atomic.h
index 93c2ae2f37..98e05ca875 100644
--- a/include/qemu/atomic.h
+++ b/include/qemu/atomic.h
@@ -129,6 +129,67 @@
#define atomic_set(ptr, i) ((*(__typeof__(*ptr) volatile*) (ptr)) = (i))
#endif
+/**
+ * atomic_rcu_read - reads a RCU-protected pointer to a local variable
+ * into a RCU read-side critical section. The pointer can later be safely
+ * dereferenced within the critical section.
+ *
+ * This ensures that the pointer copy is invariant thorough the whole critical
+ * section.
+ *
+ * Inserts memory barriers on architectures that require them (currently only
+ * Alpha) and documents which pointers are protected by RCU.
+ *
+ * Unless the __ATOMIC_CONSUME memory order is available, atomic_rcu_read also
+ * includes a compiler barrier to ensure that value-speculative optimizations
+ * (e.g. VSS: Value Speculation Scheduling) does not perform the data read
+ * before the pointer read by speculating the value of the pointer. On new
+ * enough compilers, atomic_load takes care of such concern about
+ * dependency-breaking optimizations.
+ *
+ * Should match atomic_rcu_set(), atomic_xchg(), atomic_cmpxchg().
+ */
+#ifndef atomic_rcu_read
+#ifdef __ATOMIC_CONSUME
+#define atomic_rcu_read(ptr) ({ \
+ typeof(*ptr) _val; \
+ __atomic_load(ptr, &_val, __ATOMIC_CONSUME); \
+ _val; \
+})
+#else
+#define atomic_rcu_read(ptr) ({ \
+ typeof(*ptr) _val = atomic_read(ptr); \
+ smp_read_barrier_depends(); \
+ _val; \
+})
+#endif
+#endif
+
+/**
+ * atomic_rcu_set - assigns (publicizes) a pointer to a new data structure
+ * meant to be read by RCU read-side critical sections.
+ *
+ * Documents which pointers will be dereferenced by RCU read-side critical
+ * sections and adds the required memory barriers on architectures requiring
+ * them. It also makes sure the compiler does not reorder code initializing the
+ * data structure before its publication.
+ *
+ * Should match atomic_rcu_read().
+ */
+#ifndef atomic_rcu_set
+#ifdef __ATOMIC_RELEASE
+#define atomic_rcu_set(ptr, i) do { \
+ typeof(*ptr) _val = (i); \
+ __atomic_store(ptr, &_val, __ATOMIC_RELEASE); \
+} while(0)
+#else
+#define atomic_rcu_set(ptr, i) do { \
+ smp_wmb(); \
+ atomic_set(ptr, i); \
+} while (0)
+#endif
+#endif
+
/* These have the same semantics as Java volatile variables.
* See http://gee.cs.oswego.edu/dl/jmm/cookbook.html:
* "1. Issue a StoreStore barrier (wmb) before each volatile store."
diff --git a/include/qemu/queue.h b/include/qemu/queue.h
index a98eb3ad79..c602797652 100644
--- a/include/qemu/queue.h
+++ b/include/qemu/queue.h
@@ -104,6 +104,19 @@ struct { \
(head)->lh_first = NULL; \
} while (/*CONSTCOND*/0)
+#define QLIST_SWAP(dstlist, srclist, field) do { \
+ void *tmplist; \
+ tmplist = (srclist)->lh_first; \
+ (srclist)->lh_first = (dstlist)->lh_first; \
+ if ((srclist)->lh_first != NULL) { \
+ (srclist)->lh_first->field.le_prev = &(srclist)->lh_first; \
+ } \
+ (dstlist)->lh_first = tmplist; \
+ if ((dstlist)->lh_first != NULL) { \
+ (dstlist)->lh_first->field.le_prev = &(dstlist)->lh_first; \
+ } \
+} while (/*CONSTCOND*/0)
+
#define QLIST_INSERT_AFTER(listelm, elm, field) do { \
if (((elm)->field.le_next = (listelm)->field.le_next) != NULL) \
(listelm)->field.le_next->field.le_prev = \
diff --git a/include/qemu/rcu.h b/include/qemu/rcu.h
new file mode 100644
index 0000000000..cfef36e25a
--- /dev/null
+++ b/include/qemu/rcu.h
@@ -0,0 +1,112 @@
+#ifndef QEMU_RCU_H
+#define QEMU_RCU_H
+
+/*
+ * urcu-mb.h
+ *
+ * Userspace RCU header with explicit memory barrier.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * IBM's contributions to this file may be relicensed under LGPLv2 or later.
+ */
+
+#include <stdlib.h>
+#include <assert.h>
+#include <limits.h>
+#include <unistd.h>
+#include <stdint.h>
+#include <stdbool.h>
+#include <glib.h>
+
+#include "qemu/compiler.h"
+#include "qemu/thread.h"
+#include "qemu/queue.h"
+#include "qemu/atomic.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/*
+ * Important !
+ *
+ * Each thread containing read-side critical sections must be registered
+ * with rcu_register_thread() before calling rcu_read_lock().
+ * rcu_unregister_thread() should be called before the thread exits.
+ */
+
+#ifdef DEBUG_RCU
+#define rcu_assert(args...) assert(args)
+#else
+#define rcu_assert(args...)
+#endif
+
+/*
+ * Global quiescent period counter with low-order bits unused.
+ * Using a int rather than a char to eliminate false register dependencies
+ * causing stalls on some architectures.
+ */
+extern unsigned long rcu_gp_ctr;
+
+extern QemuEvent rcu_gp_event;
+
+struct rcu_reader_data {
+ /* Data used by both reader and synchronize_rcu() */
+ unsigned long ctr;
+ bool waiting;
+
+ /* Data used for registry, protected by rcu_gp_lock */
+ QLIST_ENTRY(rcu_reader_data) node;
+};
+
+extern __thread struct rcu_reader_data rcu_reader;
+
+static inline void rcu_read_lock(void)
+{
+ struct rcu_reader_data *p_rcu_reader = &rcu_reader;
+
+ unsigned ctr = atomic_read(&rcu_gp_ctr);
+ atomic_xchg(&p_rcu_reader->ctr, ctr);
+ if (atomic_read(&p_rcu_reader->waiting)) {
+ atomic_set(&p_rcu_reader->waiting, false);
+ qemu_event_set(&rcu_gp_event);
+ }
+}
+
+static inline void rcu_read_unlock(void)
+{
+ struct rcu_reader_data *p_rcu_reader = &rcu_reader;
+
+ atomic_xchg(&p_rcu_reader->ctr, 0);
+ if (atomic_read(&p_rcu_reader->waiting)) {
+ atomic_set(&p_rcu_reader->waiting, false);
+ qemu_event_set(&rcu_gp_event);
+ }
+}
+
+extern void synchronize_rcu(void);
+
+/*
+ * Reader thread registration.
+ */
+extern void rcu_register_thread(void);
+extern void rcu_unregister_thread(void);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* QEMU_RCU_H */
diff --git a/include/qemu/thread.h b/include/qemu/thread.h
index e89fdc9785..5114ec8e79 100644
--- a/include/qemu/thread.h
+++ b/include/qemu/thread.h
@@ -25,9 +25,6 @@ void qemu_mutex_lock(QemuMutex *mutex);
int qemu_mutex_trylock(QemuMutex *mutex);
void qemu_mutex_unlock(QemuMutex *mutex);
-#define rcu_read_lock() do { } while (0)
-#define rcu_read_unlock() do { } while (0)
-
void qemu_cond_init(QemuCond *cond);
void qemu_cond_destroy(QemuCond *cond);
diff --git a/util/Makefile.objs b/util/Makefile.objs
index 93007e2f56..ceaba30939 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -17,3 +17,4 @@ util-obj-y += throttle.o
util-obj-y += getauxval.o
util-obj-y += readline.o
util-obj-y += rfifolock.o
+util-obj-y += rcu.o
diff --git a/util/rcu.c b/util/rcu.c
new file mode 100644
index 0000000000..1f737d5447
--- /dev/null
+++ b/util/rcu.c
@@ -0,0 +1,172 @@
+/*
+ * urcu-mb.c
+ *
+ * Userspace RCU library with explicit memory barriers
+ *
+ * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
+ * Copyright 2015 Red Hat, Inc.
+ *
+ * Ported to QEMU by Paolo Bonzini <pbonzini@redhat.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * IBM's contributions to this file may be relicensed under LGPLv2 or later.
+ */
+
+#include <stdio.h>
+#include <assert.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <errno.h>
+#include "qemu/rcu.h"
+#include "qemu/atomic.h"
+
+/*
+ * Global grace period counter. Bit 0 is always one in rcu_gp_ctr.
+ * Bits 1 and above are defined in synchronize_rcu.
+ */
+#define RCU_GP_LOCKED (1UL << 0)
+#define RCU_GP_CTR (1UL << 1)
+
+unsigned long rcu_gp_ctr = RCU_GP_LOCKED;
+
+QemuEvent rcu_gp_event;
+static QemuMutex rcu_gp_lock;
+
+/*
+ * Check whether a quiescent state was crossed between the beginning of
+ * update_counter_and_wait and now.
+ */
+static inline int rcu_gp_ongoing(unsigned long *ctr)
+{
+ unsigned long v;
+
+ v = atomic_read(ctr);
+ return v && (v != rcu_gp_ctr);
+}
+
+/* Written to only by each individual reader. Read by both the reader and the
+ * writers.
+ */
+__thread struct rcu_reader_data rcu_reader;
+
+/* Protected by rcu_gp_lock. */
+typedef QLIST_HEAD(, rcu_reader_data) ThreadList;
+static ThreadList registry = QLIST_HEAD_INITIALIZER(registry);
+
+/* Wait for previous parity/grace period to be empty of readers. */
+static void wait_for_readers(void)
+{
+ ThreadList qsreaders = QLIST_HEAD_INITIALIZER(qsreaders);
+ struct rcu_reader_data *index, *tmp;
+
+ for (;;) {
+ /* We want to be notified of changes made to rcu_gp_ongoing
+ * while we walk the list.
+ */
+ qemu_event_reset(&rcu_gp_event);
+
+ /* Instead of using atomic_mb_set for index->waiting, and
+ * atomic_mb_read for index->ctr, memory barriers are placed
+ * manually since writes to different threads are independent.
+ * atomic_mb_set has a smp_wmb before...
+ */
+ smp_wmb();
+ QLIST_FOREACH(index, &registry, node) {
+ atomic_set(&index->waiting, true);
+ }
+
+ /* ... and a smp_mb after. */
+ smp_mb();
+
+ QLIST_FOREACH_SAFE(index, &registry, node, tmp) {
+ if (!rcu_gp_ongoing(&index->ctr)) {
+ QLIST_REMOVE(index, node);
+ QLIST_INSERT_HEAD(&qsreaders, index, node);
+
+ /* No need for mb_set here, worst of all we
+ * get some extra futex wakeups.
+ */
+ atomic_set(&index->waiting, false);
+ }
+ }
+
+ /* atomic_mb_read has smp_rmb after. */
+ smp_rmb();
+
+ if (QLIST_EMPTY(&registry)) {
+ break;
+ }
+
+ /* Wait for one thread to report a quiescent state and
+ * try again.
+ */
+ qemu_event_wait(&rcu_gp_event);
+ }
+
+ /* put back the reader list in the registry */
+ QLIST_SWAP(&registry, &qsreaders, node);
+}
+
+void synchronize_rcu(void)
+{
+ qemu_mutex_lock(&rcu_gp_lock);
+
+ if (!QLIST_EMPTY(&registry)) {
+ /* In either case, the atomic_mb_set below blocks stores that free
+ * old RCU-protected pointers.
+ */
+ if (sizeof(rcu_gp_ctr) < 8) {
+ /* For architectures with 32-bit longs, a two-subphases algorithm
+ * ensures we do not encounter overflow bugs.
+ *
+ * Switch parity: 0 -> 1, 1 -> 0.
+ */
+ atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
+ wait_for_readers();
+ atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
+ } else {
+ /* Increment current grace period. */
+ atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
+ }
+
+ wait_for_readers();
+ }
+
+ qemu_mutex_unlock(&rcu_gp_lock);
+}
+
+void rcu_register_thread(void)
+{
+ assert(rcu_reader.ctr == 0);
+ qemu_mutex_lock(&rcu_gp_lock);
+ QLIST_INSERT_HEAD(&registry, &rcu_reader, node);
+ qemu_mutex_unlock(&rcu_gp_lock);
+}
+
+void rcu_unregister_thread(void)
+{
+ qemu_mutex_lock(&rcu_gp_lock);
+ QLIST_REMOVE(&rcu_reader, node);
+ qemu_mutex_unlock(&rcu_gp_lock);
+}
+
+static void __attribute__((__constructor__)) rcu_init(void)
+{
+ qemu_mutex_init(&rcu_gp_lock);
+ qemu_event_init(&rcu_gp_event, true);
+ rcu_register_thread();
+}