diff options
Diffstat (limited to 'util')
-rw-r--r-- | util/rcu.c | 119 |
1 files changed, 119 insertions, 0 deletions
diff --git a/util/rcu.c b/util/rcu.c index 1f737d5447..c9c3e6e4ab 100644 --- a/util/rcu.c +++ b/util/rcu.c @@ -26,6 +26,7 @@ * IBM's contributions to this file may be relicensed under LGPLv2 or later. */ +#include "qemu-common.h" #include <stdio.h> #include <assert.h> #include <stdlib.h> @@ -33,6 +34,7 @@ #include <errno.h> #include "qemu/rcu.h" #include "qemu/atomic.h" +#include "qemu/thread.h" /* * Global grace period counter. Bit 0 is always one in rcu_gp_ctr. @@ -149,6 +151,116 @@ void synchronize_rcu(void) qemu_mutex_unlock(&rcu_gp_lock); } + +#define RCU_CALL_MIN_SIZE 30 + +/* Multi-producer, single-consumer queue based on urcu/static/wfqueue.h + * from liburcu. Note that head is only used by the consumer. + */ +static struct rcu_head dummy; +static struct rcu_head *head = &dummy, **tail = &dummy.next; +static int rcu_call_count; +static QemuEvent rcu_call_ready_event; + +static void enqueue(struct rcu_head *node) +{ + struct rcu_head **old_tail; + + node->next = NULL; + old_tail = atomic_xchg(&tail, &node->next); + atomic_mb_set(old_tail, node); +} + +static struct rcu_head *try_dequeue(void) +{ + struct rcu_head *node, *next; + +retry: + /* Test for an empty list, which we do not expect. Note that for + * the consumer head and tail are always consistent. The head + * is consistent because only the consumer reads/writes it. + * The tail, because it is the first step in the enqueuing. + * It is only the next pointers that might be inconsistent. + */ + if (head == &dummy && atomic_mb_read(&tail) == &dummy.next) { + abort(); + } + + /* If the head node has NULL in its next pointer, the value is + * wrong and we need to wait until its enqueuer finishes the update. + */ + node = head; + next = atomic_mb_read(&head->next); + if (!next) { + return NULL; + } + + /* Since we are the sole consumer, and we excluded the empty case + * above, the queue will always have at least two nodes: the + * dummy node, and the one being removed. So we do not need to update + * the tail pointer. + */ + head = next; + + /* If we dequeued the dummy node, add it back at the end and retry. */ + if (node == &dummy) { + enqueue(node); + goto retry; + } + + return node; +} + +static void *call_rcu_thread(void *opaque) +{ + struct rcu_head *node; + + for (;;) { + int tries = 0; + int n = atomic_read(&rcu_call_count); + + /* Heuristically wait for a decent number of callbacks to pile up. + * Fetch rcu_call_count now, we only must process elements that were + * added before synchronize_rcu() starts. + */ + while (n < RCU_CALL_MIN_SIZE && ++tries <= 5) { + g_usleep(100000); + qemu_event_reset(&rcu_call_ready_event); + n = atomic_read(&rcu_call_count); + if (n < RCU_CALL_MIN_SIZE) { + qemu_event_wait(&rcu_call_ready_event); + n = atomic_read(&rcu_call_count); + } + } + + atomic_sub(&rcu_call_count, n); + synchronize_rcu(); + while (n > 0) { + node = try_dequeue(); + while (!node) { + qemu_event_reset(&rcu_call_ready_event); + node = try_dequeue(); + if (!node) { + qemu_event_wait(&rcu_call_ready_event); + node = try_dequeue(); + } + } + + n--; + node->func(node); + } + } + abort(); +} + +void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node)) +{ + node->func = func; + enqueue(node); + atomic_inc(&rcu_call_count); + qemu_event_set(&rcu_call_ready_event); +} + void rcu_register_thread(void) { assert(rcu_reader.ctr == 0); @@ -166,7 +278,14 @@ void rcu_unregister_thread(void) static void __attribute__((__constructor__)) rcu_init(void) { + QemuThread thread; + qemu_mutex_init(&rcu_gp_lock); qemu_event_init(&rcu_gp_event, true); + + qemu_event_init(&rcu_call_ready_event, false); + qemu_thread_create(&thread, "call_rcu", call_rcu_thread, + NULL, QEMU_THREAD_DETACHED); + rcu_register_thread(); } |