diff options
-rwxr-xr-x | configure | 5 | ||||
-rw-r--r-- | include/block/aio.h | 9 | ||||
-rw-r--r-- | util/Makefile.objs | 1 | ||||
-rw-r--r-- | util/aio-posix.c | 20 | ||||
-rw-r--r-- | util/aio-posix.h | 20 | ||||
-rw-r--r-- | util/fdmon-io_uring.c | 326 |
6 files changed, 376 insertions, 5 deletions
@@ -4093,6 +4093,11 @@ if test "$linux_io_uring" != "no" ; then linux_io_uring_cflags=$($pkg_config --cflags liburing) linux_io_uring_libs=$($pkg_config --libs liburing) linux_io_uring=yes + + # io_uring is used in libqemuutil.a where per-file -libs variables are not + # seen by programs linking the archive. It's not ideal, but just add the + # library dependency globally. + LIBS="$linux_io_uring_libs $LIBS" else if test "$linux_io_uring" = "yes" ; then feature_not_found "linux io_uring" "Install liburing devel" diff --git a/include/block/aio.h b/include/block/aio.h index bd76b08f1a..83fc9b844d 100644 --- a/include/block/aio.h +++ b/include/block/aio.h @@ -14,6 +14,9 @@ #ifndef QEMU_AIO_H #define QEMU_AIO_H +#ifdef CONFIG_LINUX_IO_URING +#include <liburing.h> +#endif #include "qemu/queue.h" #include "qemu/event_notifier.h" #include "qemu/thread.h" @@ -96,6 +99,8 @@ struct BHListSlice { QSIMPLEQ_ENTRY(BHListSlice) next; }; +typedef QSLIST_HEAD(, AioHandler) AioHandlerSList; + struct AioContext { GSource source; @@ -181,6 +186,10 @@ struct AioContext { * locking. */ struct LuringState *linux_io_uring; + + /* State for file descriptor monitoring using Linux io_uring */ + struct io_uring fdmon_io_uring; + AioHandlerSList submit_list; #endif /* TimerLists for calling timers - one per clock type. Has its own diff --git a/util/Makefile.objs b/util/Makefile.objs index 6439077a68..6718a38b61 100644 --- a/util/Makefile.objs +++ b/util/Makefile.objs @@ -7,6 +7,7 @@ util-obj-$(call lnot,$(CONFIG_ATOMIC64)) += atomic64.o util-obj-$(CONFIG_POSIX) += aio-posix.o util-obj-$(CONFIG_POSIX) += fdmon-poll.o util-obj-$(CONFIG_EPOLL_CREATE1) += fdmon-epoll.o +util-obj-$(CONFIG_LINUX_IO_URING) += fdmon-io_uring.o util-obj-$(CONFIG_POSIX) += compatfd.o util-obj-$(CONFIG_POSIX) += event_notifier-posix.o util-obj-$(CONFIG_POSIX) += mmap-alloc.o diff --git a/util/aio-posix.c b/util/aio-posix.c index 028b2abded..ffd9cc381b 100644 --- a/util/aio-posix.c +++ b/util/aio-posix.c @@ -57,10 +57,16 @@ static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node) g_source_remove_poll(&ctx->source, &node->pfd); } + node->pfd.revents = 0; + + /* If the fd monitor has already marked it deleted, leave it alone */ + if (QLIST_IS_INSERTED(node, node_deleted)) { + return false; + } + /* If a read is in progress, just mark the node as deleted */ if (qemu_lockcnt_count(&ctx->list_lock)) { QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted); - node->pfd.revents = 0; return false; } /* Otherwise, delete it for real. We can't just mark it as @@ -126,9 +132,6 @@ void aio_set_fd_handler(AioContext *ctx, QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node); } - if (node) { - deleted = aio_remove_fd_handler(ctx, node); - } /* No need to order poll_disable_cnt writes against other updates; * the counter is only used to avoid wasting time and latency on @@ -140,6 +143,9 @@ void aio_set_fd_handler(AioContext *ctx, atomic_read(&ctx->poll_disable_cnt) + poll_disable_change); ctx->fdmon_ops->update(ctx, node, new_node); + if (node) { + deleted = aio_remove_fd_handler(ctx, node); + } qemu_lockcnt_unlock(&ctx->list_lock); aio_notify(ctx); @@ -565,11 +571,17 @@ void aio_context_setup(AioContext *ctx) ctx->fdmon_ops = &fdmon_poll_ops; ctx->epollfd = -1; + /* Use the fastest fd monitoring implementation if available */ + if (fdmon_io_uring_setup(ctx)) { + return; + } + fdmon_epoll_setup(ctx); } void aio_context_destroy(AioContext *ctx) { + fdmon_io_uring_destroy(ctx); fdmon_epoll_disable(ctx); } diff --git a/util/aio-posix.h b/util/aio-posix.h index 97899d0fbc..55fc771327 100644 --- a/util/aio-posix.h +++ b/util/aio-posix.h @@ -27,10 +27,14 @@ struct AioHandler { IOHandler *io_poll_begin; IOHandler *io_poll_end; void *opaque; - bool is_external; QLIST_ENTRY(AioHandler) node; QLIST_ENTRY(AioHandler) node_ready; /* only used during aio_poll() */ QLIST_ENTRY(AioHandler) node_deleted; +#ifdef CONFIG_LINUX_IO_URING + QSLIST_ENTRY(AioHandler) node_submitted; + unsigned flags; /* see fdmon-io_uring.c */ +#endif + bool is_external; }; /* Add a handler to a ready list */ @@ -58,4 +62,18 @@ static inline void fdmon_epoll_disable(AioContext *ctx) } #endif /* !CONFIG_EPOLL_CREATE1 */ +#ifdef CONFIG_LINUX_IO_URING +bool fdmon_io_uring_setup(AioContext *ctx); +void fdmon_io_uring_destroy(AioContext *ctx); +#else +static inline bool fdmon_io_uring_setup(AioContext *ctx) +{ + return false; +} + +static inline void fdmon_io_uring_destroy(AioContext *ctx) +{ +} +#endif /* !CONFIG_LINUX_IO_URING */ + #endif /* AIO_POSIX_H */ diff --git a/util/fdmon-io_uring.c b/util/fdmon-io_uring.c new file mode 100644 index 0000000000..fb99b4b61e --- /dev/null +++ b/util/fdmon-io_uring.c @@ -0,0 +1,326 @@ +/* SPDX-License-Identifier: GPL-2.0-or-later */ +/* + * Linux io_uring file descriptor monitoring + * + * The Linux io_uring API supports file descriptor monitoring with a few + * advantages over existing APIs like poll(2) and epoll(7): + * + * 1. Userspace polling of events is possible because the completion queue (cq + * ring) is shared between the kernel and userspace. This allows + * applications that rely on userspace polling to also monitor file + * descriptors in the same userspace polling loop. + * + * 2. Submission and completion is batched and done together in a single system + * call. This minimizes the number of system calls. + * + * 3. File descriptor monitoring is O(1) like epoll(7) so it scales better than + * poll(2). + * + * 4. Nanosecond timeouts are supported so it requires fewer syscalls than + * epoll(7). + * + * This code only monitors file descriptors and does not do asynchronous disk + * I/O. Implementing disk I/O efficiently has other requirements and should + * use a separate io_uring so it does not make sense to unify the code. + * + * File descriptor monitoring is implemented using the following operations: + * + * 1. IORING_OP_POLL_ADD - adds a file descriptor to be monitored. + * 2. IORING_OP_POLL_REMOVE - removes a file descriptor being monitored. When + * the poll mask changes for a file descriptor it is first removed and then + * re-added with the new poll mask, so this operation is also used as part + * of modifying an existing monitored file descriptor. + * 3. IORING_OP_TIMEOUT - added every time a blocking syscall is made to wait + * for events. This operation self-cancels if another event completes + * before the timeout. + * + * io_uring calls the submission queue the "sq ring" and the completion queue + * the "cq ring". Ring entries are called "sqe" and "cqe", respectively. + * + * The code is structured so that sq/cq rings are only modified within + * fdmon_io_uring_wait(). Changes to AioHandlers are made by enqueuing them on + * ctx->submit_list so that fdmon_io_uring_wait() can submit IORING_OP_POLL_ADD + * and/or IORING_OP_POLL_REMOVE sqes for them. + */ + +#include "qemu/osdep.h" +#include <poll.h> +#include "qemu/rcu_queue.h" +#include "aio-posix.h" + +enum { + FDMON_IO_URING_ENTRIES = 128, /* sq/cq ring size */ + + /* AioHandler::flags */ + FDMON_IO_URING_PENDING = (1 << 0), + FDMON_IO_URING_ADD = (1 << 1), + FDMON_IO_URING_REMOVE = (1 << 2), +}; + +static inline int poll_events_from_pfd(int pfd_events) +{ + return (pfd_events & G_IO_IN ? POLLIN : 0) | + (pfd_events & G_IO_OUT ? POLLOUT : 0) | + (pfd_events & G_IO_HUP ? POLLHUP : 0) | + (pfd_events & G_IO_ERR ? POLLERR : 0); +} + +static inline int pfd_events_from_poll(int poll_events) +{ + return (poll_events & POLLIN ? G_IO_IN : 0) | + (poll_events & POLLOUT ? G_IO_OUT : 0) | + (poll_events & POLLHUP ? G_IO_HUP : 0) | + (poll_events & POLLERR ? G_IO_ERR : 0); +} + +/* + * Returns an sqe for submitting a request. Only be called within + * fdmon_io_uring_wait(). + */ +static struct io_uring_sqe *get_sqe(AioContext *ctx) +{ + struct io_uring *ring = &ctx->fdmon_io_uring; + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); + int ret; + + if (likely(sqe)) { + return sqe; + } + + /* No free sqes left, submit pending sqes first */ + ret = io_uring_submit(ring); + assert(ret > 1); + sqe = io_uring_get_sqe(ring); + assert(sqe); + return sqe; +} + +/* Atomically enqueue an AioHandler for sq ring submission */ +static void enqueue(AioHandlerSList *head, AioHandler *node, unsigned flags) +{ + unsigned old_flags; + + old_flags = atomic_fetch_or(&node->flags, FDMON_IO_URING_PENDING | flags); + if (!(old_flags & FDMON_IO_URING_PENDING)) { + QSLIST_INSERT_HEAD_ATOMIC(head, node, node_submitted); + } +} + +/* Dequeue an AioHandler for sq ring submission. Called by fill_sq_ring(). */ +static AioHandler *dequeue(AioHandlerSList *head, unsigned *flags) +{ + AioHandler *node = QSLIST_FIRST(head); + + if (!node) { + return NULL; + } + + /* Doesn't need to be atomic since fill_sq_ring() moves the list */ + QSLIST_REMOVE_HEAD(head, node_submitted); + + /* + * Don't clear FDMON_IO_URING_REMOVE. It's sticky so it can serve two + * purposes: telling fill_sq_ring() to submit IORING_OP_POLL_REMOVE and + * telling process_cqe() to delete the AioHandler when its + * IORING_OP_POLL_ADD completes. + */ + *flags = atomic_fetch_and(&node->flags, ~(FDMON_IO_URING_PENDING | + FDMON_IO_URING_ADD)); + return node; +} + +static void fdmon_io_uring_update(AioContext *ctx, + AioHandler *old_node, + AioHandler *new_node) +{ + if (new_node) { + enqueue(&ctx->submit_list, new_node, FDMON_IO_URING_ADD); + } + + if (old_node) { + /* + * Deletion is tricky because IORING_OP_POLL_ADD and + * IORING_OP_POLL_REMOVE are async. We need to wait for the original + * IORING_OP_POLL_ADD to complete before this handler can be freed + * safely. + * + * It's possible that the file descriptor becomes ready and the + * IORING_OP_POLL_ADD cqe is enqueued before IORING_OP_POLL_REMOVE is + * submitted, too. + * + * Mark this handler deleted right now but don't place it on + * ctx->deleted_aio_handlers yet. Instead, manually fudge the list + * entry to make QLIST_IS_INSERTED() think this handler has been + * inserted and other code recognizes this AioHandler as deleted. + * + * Once the original IORING_OP_POLL_ADD completes we enqueue the + * handler on the real ctx->deleted_aio_handlers list to be freed. + */ + assert(!QLIST_IS_INSERTED(old_node, node_deleted)); + old_node->node_deleted.le_prev = &old_node->node_deleted.le_next; + + enqueue(&ctx->submit_list, old_node, FDMON_IO_URING_REMOVE); + } +} + +static void add_poll_add_sqe(AioContext *ctx, AioHandler *node) +{ + struct io_uring_sqe *sqe = get_sqe(ctx); + int events = poll_events_from_pfd(node->pfd.events); + + io_uring_prep_poll_add(sqe, node->pfd.fd, events); + io_uring_sqe_set_data(sqe, node); +} + +static void add_poll_remove_sqe(AioContext *ctx, AioHandler *node) +{ + struct io_uring_sqe *sqe = get_sqe(ctx); + + io_uring_prep_poll_remove(sqe, node); +} + +/* Add a timeout that self-cancels when another cqe becomes ready */ +static void add_timeout_sqe(AioContext *ctx, int64_t ns) +{ + struct io_uring_sqe *sqe; + struct __kernel_timespec ts = { + .tv_sec = ns / NANOSECONDS_PER_SECOND, + .tv_nsec = ns % NANOSECONDS_PER_SECOND, + }; + + sqe = get_sqe(ctx); + io_uring_prep_timeout(sqe, &ts, 1, 0); +} + +/* Add sqes from ctx->submit_list for submission */ +static void fill_sq_ring(AioContext *ctx) +{ + AioHandlerSList submit_list; + AioHandler *node; + unsigned flags; + + QSLIST_MOVE_ATOMIC(&submit_list, &ctx->submit_list); + + while ((node = dequeue(&submit_list, &flags))) { + /* Order matters, just in case both flags were set */ + if (flags & FDMON_IO_URING_ADD) { + add_poll_add_sqe(ctx, node); + } + if (flags & FDMON_IO_URING_REMOVE) { + add_poll_remove_sqe(ctx, node); + } + } +} + +/* Returns true if a handler became ready */ +static bool process_cqe(AioContext *ctx, + AioHandlerList *ready_list, + struct io_uring_cqe *cqe) +{ + AioHandler *node = io_uring_cqe_get_data(cqe); + unsigned flags; + + /* poll_timeout and poll_remove have a zero user_data field */ + if (!node) { + return false; + } + + /* + * Deletion can only happen when IORING_OP_POLL_ADD completes. If we race + * with enqueue() here then we can safely clear the FDMON_IO_URING_REMOVE + * bit before IORING_OP_POLL_REMOVE is submitted. + */ + flags = atomic_fetch_and(&node->flags, ~FDMON_IO_URING_REMOVE); + if (flags & FDMON_IO_URING_REMOVE) { + QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted); + return false; + } + + aio_add_ready_handler(ready_list, node, pfd_events_from_poll(cqe->res)); + + /* IORING_OP_POLL_ADD is one-shot so we must re-arm it */ + add_poll_add_sqe(ctx, node); + return true; +} + +static int process_cq_ring(AioContext *ctx, AioHandlerList *ready_list) +{ + struct io_uring *ring = &ctx->fdmon_io_uring; + struct io_uring_cqe *cqe; + unsigned num_cqes = 0; + unsigned num_ready = 0; + unsigned head; + + io_uring_for_each_cqe(ring, head, cqe) { + if (process_cqe(ctx, ready_list, cqe)) { + num_ready++; + } + + num_cqes++; + } + + io_uring_cq_advance(ring, num_cqes); + return num_ready; +} + +static int fdmon_io_uring_wait(AioContext *ctx, AioHandlerList *ready_list, + int64_t timeout) +{ + unsigned wait_nr = 1; /* block until at least one cqe is ready */ + int ret; + + /* Fall back while external clients are disabled */ + if (atomic_read(&ctx->external_disable_cnt)) { + return fdmon_poll_ops.wait(ctx, ready_list, timeout); + } + + if (timeout == 0) { + wait_nr = 0; /* non-blocking */ + } else if (timeout > 0) { + add_timeout_sqe(ctx, timeout); + } + + fill_sq_ring(ctx); + + ret = io_uring_submit_and_wait(&ctx->fdmon_io_uring, wait_nr); + assert(ret >= 0); + + return process_cq_ring(ctx, ready_list); +} + +static const FDMonOps fdmon_io_uring_ops = { + .update = fdmon_io_uring_update, + .wait = fdmon_io_uring_wait, +}; + +bool fdmon_io_uring_setup(AioContext *ctx) +{ + int ret; + + ret = io_uring_queue_init(FDMON_IO_URING_ENTRIES, &ctx->fdmon_io_uring, 0); + if (ret != 0) { + return false; + } + + QSLIST_INIT(&ctx->submit_list); + ctx->fdmon_ops = &fdmon_io_uring_ops; + return true; +} + +void fdmon_io_uring_destroy(AioContext *ctx) +{ + if (ctx->fdmon_ops == &fdmon_io_uring_ops) { + AioHandler *node; + + io_uring_queue_exit(&ctx->fdmon_io_uring); + + /* No need to submit these anymore, just free them. */ + while ((node = QSLIST_FIRST_RCU(&ctx->submit_list))) { + QSLIST_REMOVE_HEAD_RCU(&ctx->submit_list, node_submitted); + QLIST_REMOVE(node, node); + g_free(node); + } + + ctx->fdmon_ops = &fdmon_poll_ops; + } +} |