author     Peter Maydell <peter.maydell@linaro.org>  2021-03-31 16:38:49 +0100
committer  Peter Maydell <peter.maydell@linaro.org>  2021-03-31 16:38:49 +0100
commit     1bd16067b652cce41a9214d0c62c73d5b45ab4b1 (patch)
tree       594b7ebcd1e1e348f5a25f7670af6e7d8d8072ac
parent     6ee55e1d10c25c2f6bf5ce2084ad2327e17affa5 (diff)
parent     b6489ac06695e257ea0a9841364577e247fdee30 (diff)
Merge remote-tracking branch 'remotes/stefanha-gitlab/tags/block-pull-request' into staging
Pull request
A fix for VDI image files and more generally for CoRwlock.
# gpg: Signature made Wed 31 Mar 2021 10:50:39 BST
# gpg: using RSA key 8695A8BFD3F97CDAAC35775A9CA4ABB381AB73C8
# gpg: Good signature from "Stefan Hajnoczi <stefanha@redhat.com>" [full]
# gpg: aka "Stefan Hajnoczi <stefanha@gmail.com>" [full]
# Primary key fingerprint: 8695 A8BF D3F9 7CDA AC35 775A 9CA4 ABB3 81AB 73C8
* remotes/stefanha-gitlab/tags/block-pull-request:
test-coroutine: Add rwlock downgrade test
test-coroutine: Add rwlock upgrade test
coroutine-lock: Reimplement CoRwlock to fix downgrade bug
coroutine-lock: Store the coroutine in the CoWaitRecord only once
block/vdi: Don't assume that blocks are larger than VdiHeader
block/vdi: When writing new bmap entry fails, don't leak the buffer
Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
-rw-r--r--  block/vdi.c                 |  11
-rw-r--r--  include/qemu/coroutine.h    |  17
-rw-r--r--  tests/unit/test-coroutine.c | 161
-rw-r--r--  util/qemu-coroutine-lock.c  | 149
4 files changed, 274 insertions(+), 64 deletions(-)
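Before the patches themselves, a quick orientation: the sketch below shows how the reworked CoRwlock is meant to be used. The qemu_co_rwlock_* calls and the owners semantics (number of readers, or -1 while owned for writing) are taken from the patches that follow; the function name update_then_read, the rwlock variable, and the shared-state comments are illustrative only, not part of the series.

static CoRwlock rwlock;

/* Illustrative sketch: take the lock exclusively, publish an update,
 * then downgrade so that queued readers can run without a full
 * unlock/rdlock round trip. */
static void coroutine_fn update_then_read(void *opaque)
{
    qemu_co_rwlock_wrlock(&rwlock);     /* owners == -1: exclusive */
    /* ... modify shared state ... */
    qemu_co_rwlock_downgrade(&rwlock);  /* owners == 1: shared again */
    /* ... read shared state alongside other readers ... */
    qemu_co_rwlock_unlock(&rwlock);
}

The new tests in tests/unit/test-coroutine.c exercise exactly this pattern, plus the upgrade direction.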
diff --git a/block/vdi.c b/block/vdi.c
index 5627e7d764..548f8a057b 100644
--- a/block/vdi.c
+++ b/block/vdi.c
@@ -690,23 +690,26 @@ nonallocating_write:
 
     logout("finished data write\n");
     if (ret < 0) {
+        g_free(block);
         return ret;
     }
 
     if (block) {
         /* One or more new blocks were allocated. */
-        VdiHeader *header = (VdiHeader *) block;
+        VdiHeader *header;
         uint8_t *base;
         uint64_t offset;
         uint32_t n_sectors;
 
+        g_free(block);
+        header = g_malloc(sizeof(*header));
+
         logout("now writing modified header\n");
         assert(VDI_IS_ALLOCATED(bmap_first));
         *header = s->header;
         vdi_header_to_le(header);
-        ret = bdrv_pwrite(bs->file, 0, block, sizeof(VdiHeader));
-        g_free(block);
-        block = NULL;
+        ret = bdrv_pwrite(bs->file, 0, header, sizeof(*header));
+        g_free(header);
 
         if (ret < 0) {
             return ret;
diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index 84eab6e3bf..ce5b9c6851 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -237,11 +237,15 @@ bool qemu_co_enter_next_impl(CoQueue *queue, QemuLockable *lock);
 
 bool qemu_co_queue_empty(CoQueue *queue);
 
+typedef struct CoRwTicket CoRwTicket;
 typedef struct CoRwlock {
-    int pending_writer;
-    int reader;
     CoMutex mutex;
-    CoQueue queue;
+
+    /* Number of readers, or -1 if owned for writing. */
+    int owners;
+
+    /* Waiting coroutines. */
+    QSIMPLEQ_HEAD(, CoRwTicket) tickets;
 } CoRwlock;
 
 /**
@@ -260,10 +264,9 @@ void qemu_co_rwlock_rdlock(CoRwlock *lock);
 
 /**
  * Write Locks the CoRwlock from a reader. This is a bit more efficient than
 * @qemu_co_rwlock_unlock followed by a separate @qemu_co_rwlock_wrlock.
- * However, if the lock cannot be upgraded immediately, control is transferred
- * to the caller of the current coroutine. Also, @qemu_co_rwlock_upgrade
- * only overrides CoRwlock fairness if there are no concurrent readers, so
- * another writer might run while @qemu_co_rwlock_upgrade blocks.
+ * Note that if the lock cannot be upgraded immediately, control is transferred
+ * to the caller of the current coroutine; another writer might run while
+ * @qemu_co_rwlock_upgrade blocks.
  */
 void qemu_co_rwlock_upgrade(CoRwlock *lock);
diff --git a/tests/unit/test-coroutine.c b/tests/unit/test-coroutine.c
index e946d93a65..aa77a3bcb3 100644
--- a/tests/unit/test-coroutine.c
+++ b/tests/unit/test-coroutine.c
@@ -264,6 +264,165 @@ static void test_co_mutex_lockable(void)
     g_assert(QEMU_MAKE_LOCKABLE(null_pointer) == NULL);
 }
 
+static CoRwlock rwlock;
+
+/* Test that readers are properly sent back to the queue when upgrading,
+ * even if they are the sole readers. The test scenario is as follows:
+ *
+ *
+ * | c1           | c2         |
+ * |--------------+------------+
+ * | rdlock       |            |
+ * | yield        |            |
+ * |              | wrlock     |
+ * |              | <queued>   |
+ * | upgrade      |            |
+ * | <queued>     | <dequeued> |
+ * |              | unlock     |
+ * | <dequeued>   |            |
+ * | unlock       |            |
+ */
+
+static void coroutine_fn rwlock_yield_upgrade(void *opaque)
+{
+    qemu_co_rwlock_rdlock(&rwlock);
+    qemu_coroutine_yield();
+
+    qemu_co_rwlock_upgrade(&rwlock);
+    qemu_co_rwlock_unlock(&rwlock);
+
+    *(bool *)opaque = true;
+}
+
+static void coroutine_fn rwlock_wrlock_yield(void *opaque)
+{
+    qemu_co_rwlock_wrlock(&rwlock);
+    qemu_coroutine_yield();
+
+    qemu_co_rwlock_unlock(&rwlock);
+    *(bool *)opaque = true;
+}
+
+static void test_co_rwlock_upgrade(void)
+{
+    bool c1_done = false;
+    bool c2_done = false;
+    Coroutine *c1, *c2;
+
+    qemu_co_rwlock_init(&rwlock);
+    c1 = qemu_coroutine_create(rwlock_yield_upgrade, &c1_done);
+    c2 = qemu_coroutine_create(rwlock_wrlock_yield, &c2_done);
+
+    qemu_coroutine_enter(c1);
+    qemu_coroutine_enter(c2);
+
+    /* c1 now should go to sleep. */
+    qemu_coroutine_enter(c1);
+    g_assert(!c1_done);
+
+    qemu_coroutine_enter(c2);
+    g_assert(c1_done);
+    g_assert(c2_done);
+}
+
+static void coroutine_fn rwlock_rdlock_yield(void *opaque)
+{
+    qemu_co_rwlock_rdlock(&rwlock);
+    qemu_coroutine_yield();
+
+    qemu_co_rwlock_unlock(&rwlock);
+    qemu_coroutine_yield();
+
+    *(bool *)opaque = true;
+}
+
+static void coroutine_fn rwlock_wrlock_downgrade(void *opaque)
+{
+    qemu_co_rwlock_wrlock(&rwlock);
+
+    qemu_co_rwlock_downgrade(&rwlock);
+    qemu_co_rwlock_unlock(&rwlock);
+    *(bool *)opaque = true;
+}
+
+static void coroutine_fn rwlock_rdlock(void *opaque)
+{
+    qemu_co_rwlock_rdlock(&rwlock);
+
+    qemu_co_rwlock_unlock(&rwlock);
+    *(bool *)opaque = true;
+}
+
+static void coroutine_fn rwlock_wrlock(void *opaque)
+{
+    qemu_co_rwlock_wrlock(&rwlock);
+
+    qemu_co_rwlock_unlock(&rwlock);
+    *(bool *)opaque = true;
+}
+
+/*
+ * Check that downgrading a reader-writer lock does not cause a hang.
+ *
+ * Four coroutines are used to produce a situation where there are
+ * both reader and writer hopefuls waiting to acquire an rwlock that
+ * is held by a reader.
+ *
+ * The correct sequence of operations we aim to provoke can be
+ * represented as:
+ *
+ * | c1     | c2         | c3         | c4         |
+ * |--------+------------+------------+------------|
+ * | rdlock |            |            |            |
+ * | yield  |            |            |            |
+ * |        | wrlock     |            |            |
+ * |        | <queued>   |            |            |
+ * |        |            | rdlock     |            |
+ * |        |            | <queued>   |            |
+ * |        |            |            | wrlock     |
+ * |        |            |            | <queued>   |
+ * | unlock |            |            |            |
+ * | yield  |            |            |            |
+ * |        | <dequeued> |            |            |
+ * |        | downgrade  |            |            |
+ * |        |            | <dequeued> |            |
+ * |        |            | unlock     |            |
+ * |        | ...        |            |            |
+ * |        | unlock     |            |            |
+ * |        |            |            | <dequeued> |
+ * |        |            |            | unlock     |
+ */
+static void test_co_rwlock_downgrade(void)
+{
+    bool c1_done = false;
+    bool c2_done = false;
+    bool c3_done = false;
+    bool c4_done = false;
+    Coroutine *c1, *c2, *c3, *c4;
+
+    qemu_co_rwlock_init(&rwlock);
+
+    c1 = qemu_coroutine_create(rwlock_rdlock_yield, &c1_done);
+    c2 = qemu_coroutine_create(rwlock_wrlock_downgrade, &c2_done);
+    c3 = qemu_coroutine_create(rwlock_rdlock, &c3_done);
+    c4 = qemu_coroutine_create(rwlock_wrlock, &c4_done);
+
+    qemu_coroutine_enter(c1);
+    qemu_coroutine_enter(c2);
+    qemu_coroutine_enter(c3);
+    qemu_coroutine_enter(c4);
+
+    qemu_coroutine_enter(c1);
+
+    g_assert(c2_done);
+    g_assert(c3_done);
+    g_assert(c4_done);
+
+    qemu_coroutine_enter(c1);
+
+    g_assert(c1_done);
+}
+
 /*
  * Check that creation, enter, and return work
  */
@@ -501,6 +660,8 @@ int main(int argc, char **argv)
     g_test_add_func("/basic/order", test_order);
     g_test_add_func("/locking/co-mutex", test_co_mutex);
     g_test_add_func("/locking/co-mutex/lockable", test_co_mutex_lockable);
+    g_test_add_func("/locking/co-rwlock/upgrade", test_co_rwlock_upgrade);
+    g_test_add_func("/locking/co-rwlock/downgrade", test_co_rwlock_downgrade);
     if (g_test_perf()) {
         g_test_add_func("/perf/lifecycle", perf_lifecycle);
         g_test_add_func("/perf/nesting", perf_nesting);
diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index 5816bf8900..2669403839 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -204,7 +204,6 @@ static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
     unsigned old_handoff;
 
     trace_qemu_co_mutex_lock_entry(mutex, self);
-    w.co = self;
     push_waiter(mutex, &w);
 
     /* This is the "Responsibility Hand-Off" protocol; a lock() picks from
@@ -328,11 +327,51 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
     trace_qemu_co_mutex_unlock_return(mutex, self);
 }
 
+struct CoRwTicket {
+    bool read;
+    Coroutine *co;
+    QSIMPLEQ_ENTRY(CoRwTicket) next;
+};
+
 void qemu_co_rwlock_init(CoRwlock *lock)
 {
-    memset(lock, 0, sizeof(*lock));
-    qemu_co_queue_init(&lock->queue);
     qemu_co_mutex_init(&lock->mutex);
+    lock->owners = 0;
+    QSIMPLEQ_INIT(&lock->tickets);
+}
+
+/* Releases the internal CoMutex. */
+static void qemu_co_rwlock_maybe_wake_one(CoRwlock *lock)
+{
+    CoRwTicket *tkt = QSIMPLEQ_FIRST(&lock->tickets);
+    Coroutine *co = NULL;
+
+    /*
+     * Setting lock->owners here prevents rdlock and wrlock from
+     * sneaking in between unlock and wake.
+     */
+
+    if (tkt) {
+        if (tkt->read) {
+            if (lock->owners >= 0) {
+                lock->owners++;
+                co = tkt->co;
+            }
+        } else {
+            if (lock->owners == 0) {
+                lock->owners = -1;
+                co = tkt->co;
+            }
+        }
+    }
+
+    if (co) {
+        QSIMPLEQ_REMOVE_HEAD(&lock->tickets, next);
+        qemu_co_mutex_unlock(&lock->mutex);
+        aio_co_wake(co);
+    } else {
+        qemu_co_mutex_unlock(&lock->mutex);
+    }
 }
 
 void qemu_co_rwlock_rdlock(CoRwlock *lock)
@@ -341,13 +380,22 @@ void qemu_co_rwlock_rdlock(CoRwlock *lock)
 
     qemu_co_mutex_lock(&lock->mutex);
     /* For fairness, wait if a writer is in line. */
-    while (lock->pending_writer) {
-        qemu_co_queue_wait(&lock->queue, &lock->mutex);
+    if (lock->owners == 0 || (lock->owners > 0 && QSIMPLEQ_EMPTY(&lock->tickets))) {
+        lock->owners++;
+        qemu_co_mutex_unlock(&lock->mutex);
+    } else {
+        CoRwTicket my_ticket = { true, self };
+
+        QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
+        qemu_co_mutex_unlock(&lock->mutex);
+        qemu_coroutine_yield();
+        assert(lock->owners >= 1);
+
+        /* Possibly wake another reader, which will wake the next in line. */
+        qemu_co_mutex_lock(&lock->mutex);
+        qemu_co_rwlock_maybe_wake_one(lock);
     }
-    lock->reader++;
-    qemu_co_mutex_unlock(&lock->mutex);
 
-    /* The rest of the read-side critical section is run without the mutex. */
     self->locks_held++;
 }
 
@@ -356,69 +404,64 @@ void qemu_co_rwlock_unlock(CoRwlock *lock)
     Coroutine *self = qemu_coroutine_self();
 
     assert(qemu_in_coroutine());
-    if (!lock->reader) {
-        /* The critical section started in qemu_co_rwlock_wrlock. */
-        qemu_co_queue_restart_all(&lock->queue);
-    } else {
-        self->locks_held--;
+    self->locks_held--;
 
-        qemu_co_mutex_lock(&lock->mutex);
-        lock->reader--;
-        assert(lock->reader >= 0);
-        /* Wakeup only one waiting writer */
-        if (!lock->reader) {
-            qemu_co_queue_next(&lock->queue);
-        }
+    qemu_co_mutex_lock(&lock->mutex);
+    if (lock->owners > 0) {
+        lock->owners--;
+    } else {
+        assert(lock->owners == -1);
+        lock->owners = 0;
     }
-    qemu_co_mutex_unlock(&lock->mutex);
+
+    qemu_co_rwlock_maybe_wake_one(lock);
 }
 
 void qemu_co_rwlock_downgrade(CoRwlock *lock)
 {
-    Coroutine *self = qemu_coroutine_self();
-
-    /* lock->mutex critical section started in qemu_co_rwlock_wrlock or
-     * qemu_co_rwlock_upgrade.
-     */
-    assert(lock->reader == 0);
-    lock->reader++;
-    qemu_co_mutex_unlock(&lock->mutex);
+    qemu_co_mutex_lock(&lock->mutex);
+    assert(lock->owners == -1);
+    lock->owners = 1;
 
-    /* The rest of the read-side critical section is run without the mutex. */
-    self->locks_held++;
+    /* Possibly wake another reader, which will wake the next in line. */
+    qemu_co_rwlock_maybe_wake_one(lock);
 }
 
 void qemu_co_rwlock_wrlock(CoRwlock *lock)
 {
+    Coroutine *self = qemu_coroutine_self();
+
     qemu_co_mutex_lock(&lock->mutex);
-    lock->pending_writer++;
-    while (lock->reader) {
-        qemu_co_queue_wait(&lock->queue, &lock->mutex);
+    if (lock->owners == 0) {
+        lock->owners = -1;
+        qemu_co_mutex_unlock(&lock->mutex);
+    } else {
+        CoRwTicket my_ticket = { false, qemu_coroutine_self() };
+
+        QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
+        qemu_co_mutex_unlock(&lock->mutex);
+        qemu_coroutine_yield();
+        assert(lock->owners == -1);
     }
-    lock->pending_writer--;
 
-    /* The rest of the write-side critical section is run with
-     * the mutex taken, so that lock->reader remains zero.
-     * There is no need to update self->locks_held.
-     */
+    self->locks_held++;
 }
 
 void qemu_co_rwlock_upgrade(CoRwlock *lock)
 {
-    Coroutine *self = qemu_coroutine_self();
-
     qemu_co_mutex_lock(&lock->mutex);
-    assert(lock->reader > 0);
-    lock->reader--;
-    lock->pending_writer++;
-    while (lock->reader) {
-        qemu_co_queue_wait(&lock->queue, &lock->mutex);
-    }
-    lock->pending_writer--;
+    assert(lock->owners > 0);
+    /* For fairness, wait if a writer is in line. */
+    if (lock->owners == 1 && QSIMPLEQ_EMPTY(&lock->tickets)) {
+        lock->owners = -1;
+        qemu_co_mutex_unlock(&lock->mutex);
+    } else {
+        CoRwTicket my_ticket = { false, qemu_coroutine_self() };
 
-    /* The rest of the write-side critical section is run with
-     * the mutex taken, similar to qemu_co_rwlock_wrlock. Do
-     * not account for the lock twice in self->locks_held.
-     */
-    self->locks_held--;
+        lock->owners--;
+        QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
+        qemu_co_rwlock_maybe_wake_one(lock);
+        qemu_coroutine_yield();
+        assert(lock->owners == -1);
+    }
 }
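A closing note on the design: qemu_co_rwlock_maybe_wake_one wakes at most one ticket, yet a downgrade must let every queued reader in, and the new tests check that this does not hang. The resolution is the wake chain: a woken reader re-runs the wake rule itself (the "wake the next in line" comment in qemu_co_rwlock_rdlock). The snippet below is a toy, single-threaded model of the owners/tickets state machine, not QEMU code; the names want_read and maybe_wake_one, the plain FIFO arrays, and the 8-entry bound are invented for the illustration:

#include <stdbool.h>
#include <stdio.h>

/* Toy model: owners counts readers, or is -1 while a writer owns the
 * lock, as in the CoRwlock above.  Waiters sit in a FIFO of tickets;
 * true = wants to read, false = wants to write. */
static int owners;
static bool want_read[8];
static int head, tail;

/* The wake-one rule: grant the head ticket if it is compatible with
 * owners.  A granted reader immediately re-runs the rule, which the
 * loop models; a granted writer stops the chain. */
static void maybe_wake_one(void)
{
    while (head != tail) {
        if (want_read[head] && owners >= 0) {
            owners++;
        } else if (!want_read[head] && owners == 0) {
            owners = -1;
        } else {
            return;                      /* head ticket keeps waiting */
        }
        printf("granted %s, owners=%d\n",
               want_read[head] ? "read" : "write", owners);
        head++;
        if (owners == -1) {
            return;                      /* a writer excludes everyone */
        }
    }
}

int main(void)
{
    owners = -1;                 /* a writer holds the lock ... */
    want_read[tail++] = true;    /* ... while two readers ... */
    want_read[tail++] = true;
    want_read[tail++] = false;   /* ... and another writer wait */

    owners = 1;                  /* the writer downgrades to a reader */
    maybe_wake_one();            /* both readers get in; writer waits */

    owners = 0;                  /* model all three readers unlocking */
    maybe_wake_one();            /* now the queued writer is granted */
    return 0;
}

In the real code the chaining is cooperative rather than a loop: each reader woken in qemu_co_rwlock_rdlock reacquires the mutex and calls qemu_co_rwlock_maybe_wake_one again, and fairness falls out of the FIFO ticket order.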