author     Peter Maydell <peter.maydell@linaro.org>  2017-02-21 11:58:03 +0000
committer  Peter Maydell <peter.maydell@linaro.org>  2017-02-21 11:58:03 +0000
commit     a0775e28cd6cae7eae248f74db7bc4a03da20c6b (patch)
tree       6d141e7710855c40fdaf81a4c2731995782e443c /include
parent     b856256179f14c33a513d0b9cc3e4be355b95f43 (diff)
parent     a7b91d35bab97a2d3e779d0c64c9b837b52a6cf7 (diff)
Merge remote-tracking branch 'remotes/stefanha/tags/block-pull-request' into staging
Pull request
v2:
* Rebased to resolve scsi conflicts
# gpg: Signature made Tue 21 Feb 2017 11:56:24 GMT
# gpg: using RSA key 0x9CA4ABB381AB73C8
# gpg: Good signature from "Stefan Hajnoczi <stefanha@redhat.com>"
# gpg: aka "Stefan Hajnoczi <stefanha@gmail.com>"
# Primary key fingerprint: 8695 A8BF D3F9 7CDA AC35 775A 9CA4 ABB3 81AB 73C8
* remotes/stefanha/tags/block-pull-request: (24 commits)
coroutine-lock: make CoRwlock thread-safe and fair
coroutine-lock: add mutex argument to CoQueue APIs
coroutine-lock: place CoMutex before CoQueue in header
test-aio-multithread: add performance comparison with thread-based mutexes
coroutine-lock: add limited spinning to CoMutex
coroutine-lock: make CoMutex thread-safe
block: document fields protected by AioContext lock
async: remove unnecessary inc/dec pairs
aio-posix: partially inline aio_dispatch into aio_poll
block: explicitly acquire aiocontext in aio callbacks that need it
block: explicitly acquire aiocontext in bottom halves that need it
block: explicitly acquire aiocontext in callbacks that need it
block: explicitly acquire aiocontext in timers that need it
aio: push aio_context_acquire/release down to dispatching
qed: introduce qed_aio_start_io and qed_aio_next_io_cb
blkdebug: reschedule coroutine on the AioContext it is running on
coroutine-lock: reschedule coroutine on the AioContext it was running on
nbd: convert to use qio_channel_yield
io: make qio_channel_yield aware of AioContexts
io: add methods to set I/O handlers on AioContext
...
Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
Diffstat (limited to 'include')
-rw-r--r--  include/block/aio.h             38
-rw-r--r--  include/block/block_int.h       64
-rw-r--r--  include/io/channel.h            72
-rw-r--r--  include/qemu/coroutine.h        84
-rw-r--r--  include/qemu/coroutine_int.h    11
-rw-r--r--  include/sysemu/block-backend.h  14
6 files changed, 213 insertions, 70 deletions
diff --git a/include/block/aio.h b/include/block/aio.h
index 7df271d2b9..677b6ffc25 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -47,6 +47,7 @@ typedef void QEMUBHFunc(void *opaque);
 typedef bool AioPollFn(void *opaque);
 typedef void IOHandler(void *opaque);
 
+struct Coroutine;
 struct ThreadPool;
 struct LinuxAioState;
 
@@ -108,6 +109,9 @@ struct AioContext {
     bool notified;
     EventNotifier notifier;
 
+    QSLIST_HEAD(, Coroutine) scheduled_coroutines;
+    QEMUBH *co_schedule_bh;
+
     /* Thread pool for performing work and receiving completion callbacks.
      * Has its own locking.
      */
@@ -306,12 +310,8 @@ bool aio_pending(AioContext *ctx);
 /* Dispatch any pending callbacks from the GSource attached to the AioContext.
  *
  * This is used internally in the implementation of the GSource.
- *
- * @dispatch_fds: true to process fds, false to skip them
- *                (can be used as an optimization by callers that know there
- *                are no fds ready)
  */
-bool aio_dispatch(AioContext *ctx, bool dispatch_fds);
+void aio_dispatch(AioContext *ctx);
 
 /* Progress in completing AIO work to occur.  This can issue new pending
  * aio as a result of executing I/O completion or bh callbacks.
@@ -483,6 +483,34 @@ static inline bool aio_node_check(AioContext *ctx, bool is_external)
 }
 
 /**
+ * aio_co_schedule:
+ * @ctx: the aio context
+ * @co: the coroutine
+ *
+ * Start a coroutine on a remote AioContext.
+ *
+ * The coroutine must not be entered by anyone else while aio_co_schedule()
+ * is active.  In addition the coroutine must have yielded unless ctx
+ * is the context in which the coroutine is running (i.e. the value of
+ * qemu_get_current_aio_context() from the coroutine itself).
+ */
+void aio_co_schedule(AioContext *ctx, struct Coroutine *co);
+
+/**
+ * aio_co_wake:
+ * @co: the coroutine
+ *
+ * Restart a coroutine on the AioContext where it was running last, thus
+ * preventing coroutines from jumping from one context to another when they
+ * go to sleep.
+ *
+ * aio_co_wake may be executed either in coroutine or non-coroutine
+ * context.  The coroutine must not be entered by anyone else while
+ * aio_co_wake() is active.
+ */
+void aio_co_wake(struct Coroutine *co);
+
+/**
  * Return the AioContext whose event loop runs in the current thread.
  *
  * If called from an IOThread this will be the IOThread's AioContext.  If
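The aio_co_schedule()/aio_co_wake() pair above is the primitive the rest of this series builds on. A minimal sketch of the intended usage, assuming a coroutine that has already yielded; the sleeper variable, sleeper_entry() and wake_on() are hypothetical, only the aio_*() and qemu_coroutine_*() calls are real QEMU APIs:

    #include "qemu/osdep.h"
    #include "qemu/coroutine.h"
    #include "block/aio.h"

    static Coroutine *sleeper;   /* hypothetical: published by the coroutine */

    static void coroutine_fn sleeper_entry(void *opaque)
    {
        sleeper = qemu_coroutine_self();
        qemu_coroutine_yield();             /* sleep until woken */
        /* Resumes here, on whichever AioContext woke it. */
    }

    /* Hypothetical helper; must only run after sleeper_entry() has yielded,
     * because aio_co_schedule() requires that nobody else enters the
     * coroutine while the reschedule is in flight. */
    static void wake_on(AioContext *target_ctx)
    {
        if (target_ctx) {
            aio_co_schedule(target_ctx, sleeper);  /* pick the context */
        } else {
            aio_co_wake(sleeper);   /* reuse the context it last ran in */
        }
    }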
diff --git a/include/block/block_int.h b/include/block/block_int.h
index 2d92d7edfe..1670941da9 100644
--- a/include/block/block_int.h
+++ b/include/block/block_int.h
@@ -430,8 +430,9 @@ struct BdrvChild {
  * copied as well.
  */
 struct BlockDriverState {
-    int64_t total_sectors; /* if we are reading a disk image, give its
-                              size in sectors */
+    /* Protected by big QEMU lock or read-only after opening.  No special
+     * locking needed during I/O...
+     */
     int open_flags; /* flags used to open the file, re-used for re-open */
     bool read_only; /* if true, the media is read only */
     bool encrypted; /* if true, the media is encrypted */
@@ -439,14 +440,6 @@ struct BlockDriverState {
     bool sg;        /* if true, the device is a /dev/sg* */
     bool probed;    /* if true, format was probed rather than specified */
 
-    int copy_on_read; /* if nonzero, copy read backing sectors into image.
-                         note this is a reference count */
-
-    CoQueue flush_queue;            /* Serializing flush queue */
-    bool active_flush_req;          /* Flush request in flight? */
-    unsigned int write_gen;         /* Current data generation */
-    unsigned int flushed_gen;       /* Flushed write generation */
-
     BlockDriver *drv; /* NULL means no media */
     void *opaque;
 
@@ -468,18 +461,6 @@ struct BlockDriverState {
     BdrvChild *backing;
     BdrvChild *file;
 
-    /* Callback before write request is processed */
-    NotifierWithReturnList before_write_notifiers;
-
-    /* number of in-flight requests; overall and serialising */
-    unsigned int in_flight;
-    unsigned int serialising_in_flight;
-
-    bool wakeup;
-
-    /* Offset after the highest byte written to */
-    uint64_t wr_highest_offset;
-
     /* I/O Limits */
     BlockLimits bl;
 
@@ -497,11 +478,8 @@ struct BlockDriverState {
     QTAILQ_ENTRY(BlockDriverState) bs_list;
     /* element of the list of monitor-owned BDS */
     QTAILQ_ENTRY(BlockDriverState) monitor_list;
-    QLIST_HEAD(, BdrvDirtyBitmap) dirty_bitmaps;
     int refcnt;
 
-    QLIST_HEAD(, BdrvTrackedRequest) tracked_requests;
-
     /* operation blockers */
     QLIST_HEAD(, BdrvOpBlocker) op_blockers[BLOCK_OP_TYPE_MAX];
 
@@ -522,6 +500,31 @@ struct BlockDriverState {
     /* The error object in use for blocking operations on backing_hd */
     Error *backing_blocker;
 
+    /* Protected by AioContext lock */
+
+    /* If true, copy read backing sectors into image.  Can be >1 if more
+     * than one client has requested copy-on-read.
+     */
+    int copy_on_read;
+
+    /* If we are reading a disk image, give its size in sectors.
+     * Generally read-only; it is written to by load_vmstate and save_vmstate,
+     * but the block layer is quiescent during those.
+     */
+    int64_t total_sectors;
+
+    /* Callback before write request is processed */
+    NotifierWithReturnList before_write_notifiers;
+
+    /* number of in-flight requests; overall and serialising */
+    unsigned int in_flight;
+    unsigned int serialising_in_flight;
+
+    bool wakeup;
+
+    /* Offset after the highest byte written to */
+    uint64_t wr_highest_offset;
+
     /* threshold limit for writes, in bytes. "High water mark". */
     uint64_t write_threshold_offset;
     NotifierWithReturn write_threshold_notifier;
@@ -529,6 +532,17 @@ struct BlockDriverState {
     /* counter for nested bdrv_io_plug */
     unsigned io_plugged;
 
+    QLIST_HEAD(, BdrvTrackedRequest) tracked_requests;
+    CoQueue flush_queue;            /* Serializing flush queue */
+    bool active_flush_req;          /* Flush request in flight? */
+    unsigned int write_gen;         /* Current data generation */
+    unsigned int flushed_gen;       /* Flushed write generation */
+
+    QLIST_HEAD(, BdrvDirtyBitmap) dirty_bitmaps;
+
+    /* do we need to tell the guest if we have a volatile write cache? */
+    int enable_write_cache;
+
     int quiesce_counter;
 };
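The BlockDriverState reshuffle above is purely documentary: fields are regrouped under comments naming the lock that protects them. For code running outside the BDS's home AioContext, the access pattern implied by the new "Protected by AioContext lock" group looks roughly like this sketch; the helper name is hypothetical, while bdrv_get_aio_context() and aio_context_acquire()/aio_context_release() are existing QEMU APIs:

    #include "qemu/osdep.h"
    #include "block/block_int.h"

    /* Hypothetical helper: sample an AioContext-protected field from a
     * thread that is not running the BDS's event loop. */
    static unsigned int bs_sample_in_flight(BlockDriverState *bs)
    {
        AioContext *ctx = bdrv_get_aio_context(bs);
        unsigned int n;

        aio_context_acquire(ctx);
        n = bs->in_flight;   /* in the "Protected by AioContext lock" group */
        aio_context_release(ctx);
        return n;
    }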
diff --git a/include/io/channel.h b/include/io/channel.h
index 32a9470794..5d48906998 100644
--- a/include/io/channel.h
+++ b/include/io/channel.h
@@ -23,6 +23,8 @@
 
 #include "qemu-common.h"
 #include "qom/object.h"
+#include "qemu/coroutine.h"
+#include "block/aio.h"
 
 #define TYPE_QIO_CHANNEL "qio-channel"
 #define QIO_CHANNEL(obj)                                    \
@@ -80,6 +82,9 @@ struct QIOChannel {
     Object parent;
     unsigned int features; /* bitmask of QIOChannelFeatures */
     char *name;
+    AioContext *ctx;
+    Coroutine *read_coroutine;
+    Coroutine *write_coroutine;
 #ifdef _WIN32
     HANDLE event; /* For use with GSource on Win32 */
 #endif
@@ -132,6 +137,11 @@ struct QIOChannelClass {
                        off_t offset,
                        int whence,
                        Error **errp);
+    void (*io_set_aio_fd_handler)(QIOChannel *ioc,
+                                  AioContext *ctx,
+                                  IOHandler *io_read,
+                                  IOHandler *io_write,
+                                  void *opaque);
 };
 
 /* General I/O handling functions */
@@ -497,13 +507,50 @@ guint qio_channel_add_watch(QIOChannel *ioc,
 
 /**
+ * qio_channel_attach_aio_context:
+ * @ioc: the channel object
+ * @ctx: the #AioContext to set the handlers on
+ *
+ * Request that qio_channel_yield() sets I/O handlers on
+ * the given #AioContext.  If @ctx is %NULL, qio_channel_yield()
+ * uses QEMU's main thread event loop.
+ *
+ * You can move a #QIOChannel from one #AioContext to another even if
+ * I/O handlers are set for a coroutine.  However, #QIOChannel provides
+ * no synchronization between the calls to qio_channel_yield() and
+ * qio_channel_attach_aio_context().
+ *
+ * Therefore you should first call qio_channel_detach_aio_context()
+ * to ensure that the coroutine is not entered concurrently.  Then,
+ * while the coroutine has yielded, call qio_channel_attach_aio_context(),
+ * and then aio_co_schedule() to place the coroutine on the new
+ * #AioContext.  The calls to qio_channel_detach_aio_context()
+ * and qio_channel_attach_aio_context() should be protected with
+ * aio_context_acquire() and aio_context_release().
+ */
+void qio_channel_attach_aio_context(QIOChannel *ioc,
+                                    AioContext *ctx);
+
+/**
+ * qio_channel_detach_aio_context:
+ * @ioc: the channel object
+ *
+ * Disable any I/O handlers set by qio_channel_yield().  With the
+ * help of aio_co_schedule(), this allows moving a coroutine that was
+ * paused by qio_channel_yield() to another context.
+ */
+void qio_channel_detach_aio_context(QIOChannel *ioc);
+
+/**
  * qio_channel_yield:
  * @ioc: the channel object
  * @condition: the I/O condition to wait for
  *
- * Yields execution from the current coroutine until
- * the condition indicated by @condition becomes
- * available.
+ * Yields execution from the current coroutine until the condition
+ * indicated by @condition becomes available.  @condition must
+ * be either %G_IO_IN or %G_IO_OUT; it cannot contain both.  In
+ * addition, no two coroutines can be waiting on the same condition
+ * and channel at the same time.
  *
  * This must only be called from coroutine context
  */
@@ -525,4 +572,23 @@ void qio_channel_yield(QIOChannel *ioc,
 void qio_channel_wait(QIOChannel *ioc,
                       GIOCondition condition);
 
+/**
+ * qio_channel_set_aio_fd_handler:
+ * @ioc: the channel object
+ * @ctx: the AioContext to set the handlers on
+ * @io_read: the read handler
+ * @io_write: the write handler
+ * @opaque: the opaque value passed to the handler
+ *
+ * This is used internally by qio_channel_yield().  It can
+ * be used by channel implementations to forward the handlers
+ * to another channel (e.g. from #QIOChannelTLS to the
+ * underlying socket).
+ */
+void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
+                                    AioContext *ctx,
+                                    IOHandler *io_read,
+                                    IOHandler *io_write,
+                                    void *opaque);
+
 #endif /* QIO_CHANNEL_H */
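The doc comment for qio_channel_attach_aio_context() above spells out a detach/attach/reschedule protocol. As a sketch, a hypothetical move_channel() helper that follows it to the letter (all of the functions it calls come from this series or are existing QEMU APIs):

    #include "qemu/osdep.h"
    #include "io/channel.h"
    #include "block/aio.h"

    /* Hypothetical helper: move a channel whose coroutine is currently
     * yielded inside qio_channel_yield() from old_ctx to new_ctx. */
    static void move_channel(QIOChannel *ioc, Coroutine *co,
                             AioContext *old_ctx, AioContext *new_ctx)
    {
        aio_context_acquire(old_ctx);
        qio_channel_detach_aio_context(ioc); /* coroutine can no longer be entered */
        aio_context_release(old_ctx);

        aio_context_acquire(new_ctx);
        qio_channel_attach_aio_context(ioc, new_ctx);
        aio_co_schedule(new_ctx, co);        /* resume it on the new context */
        aio_context_release(new_ctx);
    }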
diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index 12584ed1b7..e60beaff81 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -112,11 +112,56 @@ bool qemu_in_coroutine(void);
  */
 bool qemu_coroutine_entered(Coroutine *co);
 
+/**
+ * Provides a mutex that can be used to synchronise coroutines
+ */
+struct CoWaitRecord;
+typedef struct CoMutex {
+    /* Count of pending lockers; 0 for a free mutex, 1 for an
+     * uncontended mutex.
+     */
+    unsigned locked;
+
+    /* Context that is holding the lock.  Useful to avoid spinning
+     * when two coroutines on the same AioContext try to get the lock. :)
+     */
+    AioContext *ctx;
+
+    /* A queue of waiters.  Elements are added atomically in front of
+     * from_push.  to_pop is only populated, and popped from, by whoever
+     * is in charge of the next wakeup.  This can be an unlocker or,
+     * through the handoff protocol, a locker that is about to go to sleep.
+     */
+    QSLIST_HEAD(, CoWaitRecord) from_push, to_pop;
+
+    unsigned handoff, sequence;
+
+    Coroutine *holder;
+} CoMutex;
+
+/**
+ * Initialises a CoMutex. This must be called before any other operation is used
+ * on the CoMutex.
+ */
+void qemu_co_mutex_init(CoMutex *mutex);
+
+/**
+ * Locks the mutex. If the lock cannot be taken immediately, control is
+ * transferred to the caller of the current coroutine.
+ */
+void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex);
+
+/**
+ * Unlocks the mutex and schedules the next coroutine that was waiting for this
+ * lock to be run.
+ */
+void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex);
+
 
 /**
  * CoQueues are a mechanism to queue coroutines in order to continue executing
- * them later. They provide the fundamental primitives on which coroutine locks
- * are built.
+ * them later.  They are similar to condition variables, but they need help
+ * from an external mutex in order to maintain thread-safety.
  */
 typedef struct CoQueue {
     QSIMPLEQ_HEAD(, Coroutine) entries;
@@ -130,9 +175,10 @@ void qemu_co_queue_init(CoQueue *queue);
 
 /**
  * Adds the current coroutine to the CoQueue and transfers control to the
- * caller of the coroutine.
+ * caller of the coroutine.  The mutex is unlocked during the wait and
+ * locked again afterwards.
  */
-void coroutine_fn qemu_co_queue_wait(CoQueue *queue);
+void coroutine_fn qemu_co_queue_wait(CoQueue *queue, CoMutex *mutex);
 
 /**
  * Restarts the next coroutine in the CoQueue and removes it from the queue.
@@ -157,36 +203,10 @@ bool qemu_co_enter_next(CoQueue *queue);
 bool qemu_co_queue_empty(CoQueue *queue);
 
 
-/**
- * Provides a mutex that can be used to synchronise coroutines
- */
-typedef struct CoMutex {
-    bool locked;
-    Coroutine *holder;
-    CoQueue queue;
-} CoMutex;
-
-/**
- * Initialises a CoMutex. This must be called before any other operation is used
- * on the CoMutex.
- */
-void qemu_co_mutex_init(CoMutex *mutex);
-
-/**
- * Locks the mutex. If the lock cannot be taken immediately, control is
- * transferred to the caller of the current coroutine.
- */
-void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex);
-
-/**
- * Unlocks the mutex and schedules the next coroutine that was waiting for this
- * lock to be run.
- */
-void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex);
-
 typedef struct CoRwlock {
-    bool writer;
+    int pending_writer;
     int reader;
+    CoMutex mutex;
     CoQueue queue;
 } CoRwlock;
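The CoQueue API change above turns CoQueue into a condition-variable-like primitive: qemu_co_queue_wait() now takes the CoMutex, drops it while sleeping, and reacquires it before returning. A sketch of the resulting pattern; the CoEvent type and co_event_*() helpers are hypothetical, the qemu_co_*() calls are the real APIs:

    #include "qemu/osdep.h"
    #include "qemu/coroutine.h"

    typedef struct CoEvent {     /* hypothetical one-shot event */
        CoMutex lock;
        CoQueue waiters;
        bool ready;
    } CoEvent;

    static void co_event_init(CoEvent *ev)
    {
        qemu_co_mutex_init(&ev->lock);
        qemu_co_queue_init(&ev->waiters);
        ev->ready = false;
    }

    static void coroutine_fn co_event_wait(CoEvent *ev)
    {
        qemu_co_mutex_lock(&ev->lock);
        while (!ev->ready) {
            /* Unlocks ev->lock while waiting, relocks it on wakeup. */
            qemu_co_queue_wait(&ev->waiters, &ev->lock);
        }
        qemu_co_mutex_unlock(&ev->lock);
    }

    static void coroutine_fn co_event_set(CoEvent *ev)
    {
        qemu_co_mutex_lock(&ev->lock);
        ev->ready = true;
        qemu_co_queue_restart_all(&ev->waiters);
        qemu_co_mutex_unlock(&ev->lock);
    }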
diff --git a/include/qemu/coroutine_int.h b/include/qemu/coroutine_int.h
index 14d4f1d1f2..cb98892bba 100644
--- a/include/qemu/coroutine_int.h
+++ b/include/qemu/coroutine_int.h
@@ -40,12 +40,21 @@ struct Coroutine {
     CoroutineEntry *entry;
     void *entry_arg;
     Coroutine *caller;
+
+    /* Only used when the coroutine has terminated. */
     QSLIST_ENTRY(Coroutine) pool_next;
+
     size_t locks_held;
 
-    /* Coroutines that should be woken up when we yield or terminate */
+    /* Coroutines that should be woken up when we yield or terminate.
+     * Only used when the coroutine is running.
+     */
     QSIMPLEQ_HEAD(, Coroutine) co_queue_wakeup;
+
+    /* Only used when the coroutine has yielded. */
+    AioContext *ctx;
     QSIMPLEQ_ENTRY(Coroutine) co_queue_next;
+    QSLIST_ENTRY(Coroutine) co_scheduled_next;
 };
 
 Coroutine *qemu_coroutine_new(void);
diff --git a/include/sysemu/block-backend.h b/include/sysemu/block-backend.h
index 6444e41d39..f365a51acf 100644
--- a/include/sysemu/block-backend.h
+++ b/include/sysemu/block-backend.h
@@ -64,14 +64,20 @@ typedef struct BlockDevOps {
  * fields that must be public.  This is in particular for QLIST_ENTRY() and
  * friends so that BlockBackends can be kept in lists outside block-backend.c
  */
 typedef struct BlockBackendPublic {
-    /* I/O throttling.
-     * throttle_state tells us if this BlockBackend has I/O limits configured.
-     * io_limits_disabled tells us if they are currently being enforced */
+    /* I/O throttling has its own locking, but also some fields are
+     * protected by the AioContext lock.
+     */
+
+    /* Protected by AioContext lock. */
     CoQueue throttled_reqs[2];
+
+    /* Nonzero if the I/O limits are currently being ignored; generally
+     * it is zero. */
     unsigned int io_limits_disabled;
 
     /* The following fields are protected by the ThrottleGroup lock.
-     * See the ThrottleGroup documentation for details. */
+     * See the ThrottleGroup documentation for details.
+     * throttle_state tells us if I/O limits are configured. */
     ThrottleState *throttle_state;
     ThrottleTimers throttle_timers;
     unsigned pending_reqs[2];
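Finally, the CoRwlock rework in qemu/coroutine.h above (a pending_writer count plus an embedded CoMutex) leaves the lock's call surface unchanged, so existing callers keep working. A hypothetical reader/writer pair using the long-standing API; qemu_co_rwlock_init/rdlock/wrlock/unlock are existing QEMU functions, the table_* names are made up:

    #include "qemu/osdep.h"
    #include "qemu/coroutine.h"

    static CoRwlock table_lock;  /* call qemu_co_rwlock_init(&table_lock) once */
    static int table_version;

    static void coroutine_fn reader_co(void *opaque)
    {
        qemu_co_rwlock_rdlock(&table_lock);
        int v = table_version;   /* any number of readers may hold the lock */
        (void)v;
        qemu_co_rwlock_unlock(&table_lock);
    }

    static void coroutine_fn writer_co(void *opaque)
    {
        qemu_co_rwlock_wrlock(&table_lock);
        table_version++;         /* writers get exclusive access */
        qemu_co_rwlock_unlock(&table_lock);
    }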