aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRichard Henderson <richard.henderson@linaro.org>2022-05-09 11:07:04 -0700
committerRichard Henderson <richard.henderson@linaro.org>2022-05-09 11:07:04 -0700
commit178bacb66d98d9ee7a702b9f2a4dfcd88b72a9ab (patch)
treeab363c70e86292f400bea30bcf207eac414d80be
parentb0c3c60366ed43eb1569eb18c10df6eb993534c3 (diff)
parent3dc584abeef0e1277c2de8c1c1974cb49444eb0a (diff)
Merge tag 'block-pull-request' of https://gitlab.com/stefanha/qemu into staging
Pull request - Add new thread-pool-min/thread-pool-max parameters to control the thread pool used for async I/O. - Fix virtio-scsi IOThread 100% CPU consumption QEMU 7.0 regression. # -----BEGIN PGP SIGNATURE----- # # iQEzBAABCAAdFiEEhpWov9P5fNqsNXdanKSrs4Grc8gFAmJ5DqgACgkQnKSrs4Gr # c8iAqAf/WEJzEso0Hu3UUYJi2lAXpLxWPjoNBlPdQlKIJ/I0zQIF0P7GeCifF+0l # iMjgBv0ofyAuV47gaTJlVrAR75+hJ/IXNDhnu3UuvNWfVOqvksgw6kuHkMo9A2hC # 4tIHEU9J8jbQSSdQTaZR8Zj4FX1/zcxMBAXT3YO3De6zo78RatBTuNP4dsZzt8bI # Qs1a4A0p2ScNXK8EcF4QwAWfoxu9OPPzN52DBCNxcIcnn0SUab4NbDxzpRV4ZhDP # 08WoafI5O+2Kb36QysJN01LqajHrClG/fozrPzBLq5aZUK3xewJGB1hEdGTLkkmz # NJNBg5Ldszwj4PDZ1dFU3/03aigb3g== # =t5eR # -----END PGP SIGNATURE----- # gpg: Signature made Mon 09 May 2022 05:52:56 AM PDT # gpg: using RSA key 8695A8BFD3F97CDAAC35775A9CA4ABB381AB73C8 # gpg: Good signature from "Stefan Hajnoczi <stefanha@redhat.com>" [full] # gpg: aka "Stefan Hajnoczi <stefanha@gmail.com>" [full] * tag 'block-pull-request' of https://gitlab.com/stefanha/qemu: virtio-scsi: move request-related items from .h to .c virtio-scsi: clean up virtio_scsi_handle_cmd_vq() virtio-scsi: clean up virtio_scsi_handle_ctrl_vq() virtio-scsi: clean up virtio_scsi_handle_event_vq() virtio-scsi: don't waste CPU polling the event virtqueue virtio-scsi: fix ctrl and event handler functions in dataplane mode util/event-loop-base: Introduce options to set the thread pool size util/main-loop: Introduce the main loop into QOM Introduce event-loop-base abstract class Signed-off-by: Richard Henderson <richard.henderson@linaro.org>
-rw-r--r--event-loop-base.c140
-rw-r--r--hw/scsi/virtio-scsi-dataplane.c2
-rw-r--r--hw/scsi/virtio-scsi.c101
-rw-r--r--hw/virtio/virtio.c13
-rw-r--r--include/block/aio.h10
-rw-r--r--include/block/thread-pool.h3
-rw-r--r--include/hw/virtio/virtio-scsi.h43
-rw-r--r--include/hw/virtio/virtio.h1
-rw-r--r--include/qemu/main-loop.h10
-rw-r--r--include/sysemu/event-loop-base.h41
-rw-r--r--include/sysemu/iothread.h6
-rw-r--r--iothread.c68
-rw-r--r--meson.build26
-rw-r--r--qapi/qom.json43
-rw-r--r--util/aio-posix.c1
-rw-r--r--util/async.c20
-rw-r--r--util/main-loop.c65
-rw-r--r--util/thread-pool.c55
18 files changed, 505 insertions, 143 deletions
diff --git a/event-loop-base.c b/event-loop-base.c
new file mode 100644
index 0000000000..d5be4dc6fc
--- /dev/null
+++ b/event-loop-base.c
@@ -0,0 +1,140 @@
+/*
+ * QEMU event-loop base
+ *
+ * Copyright (C) 2022 Red Hat Inc
+ *
+ * Authors:
+ * Stefan Hajnoczi <stefanha@redhat.com>
+ * Nicolas Saenz Julienne <nsaenzju@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "qom/object_interfaces.h"
+#include "qapi/error.h"
+#include "block/thread-pool.h"
+#include "sysemu/event-loop-base.h"
+
+typedef struct {
+ const char *name;
+ ptrdiff_t offset; /* field's byte offset in EventLoopBase struct */
+} EventLoopBaseParamInfo;
+
+static void event_loop_base_instance_init(Object *obj)
+{
+ EventLoopBase *base = EVENT_LOOP_BASE(obj);
+
+ base->thread_pool_max = THREAD_POOL_MAX_THREADS_DEFAULT;
+}
+
+static EventLoopBaseParamInfo aio_max_batch_info = {
+ "aio-max-batch", offsetof(EventLoopBase, aio_max_batch),
+};
+static EventLoopBaseParamInfo thread_pool_min_info = {
+ "thread-pool-min", offsetof(EventLoopBase, thread_pool_min),
+};
+static EventLoopBaseParamInfo thread_pool_max_info = {
+ "thread-pool-max", offsetof(EventLoopBase, thread_pool_max),
+};
+
+static void event_loop_base_get_param(Object *obj, Visitor *v,
+ const char *name, void *opaque, Error **errp)
+{
+ EventLoopBase *event_loop_base = EVENT_LOOP_BASE(obj);
+ EventLoopBaseParamInfo *info = opaque;
+ int64_t *field = (void *)event_loop_base + info->offset;
+
+ visit_type_int64(v, name, field, errp);
+}
+
+static void event_loop_base_set_param(Object *obj, Visitor *v,
+ const char *name, void *opaque, Error **errp)
+{
+ EventLoopBaseClass *bc = EVENT_LOOP_BASE_GET_CLASS(obj);
+ EventLoopBase *base = EVENT_LOOP_BASE(obj);
+ EventLoopBaseParamInfo *info = opaque;
+ int64_t *field = (void *)base + info->offset;
+ int64_t value;
+
+ if (!visit_type_int64(v, name, &value, errp)) {
+ return;
+ }
+
+ if (value < 0) {
+ error_setg(errp, "%s value must be in range [0, %" PRId64 "]",
+ info->name, INT64_MAX);
+ return;
+ }
+
+ *field = value;
+
+ if (bc->update_params) {
+ bc->update_params(base, errp);
+ }
+
+ return;
+}
+
+static void event_loop_base_complete(UserCreatable *uc, Error **errp)
+{
+ EventLoopBaseClass *bc = EVENT_LOOP_BASE_GET_CLASS(uc);
+ EventLoopBase *base = EVENT_LOOP_BASE(uc);
+
+ if (bc->init) {
+ bc->init(base, errp);
+ }
+}
+
+static bool event_loop_base_can_be_deleted(UserCreatable *uc)
+{
+ EventLoopBaseClass *bc = EVENT_LOOP_BASE_GET_CLASS(uc);
+ EventLoopBase *backend = EVENT_LOOP_BASE(uc);
+
+ if (bc->can_be_deleted) {
+ return bc->can_be_deleted(backend);
+ }
+
+ return true;
+}
+
+static void event_loop_base_class_init(ObjectClass *klass, void *class_data)
+{
+ UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass);
+ ucc->complete = event_loop_base_complete;
+ ucc->can_be_deleted = event_loop_base_can_be_deleted;
+
+ object_class_property_add(klass, "aio-max-batch", "int",
+ event_loop_base_get_param,
+ event_loop_base_set_param,
+ NULL, &aio_max_batch_info);
+ object_class_property_add(klass, "thread-pool-min", "int",
+ event_loop_base_get_param,
+ event_loop_base_set_param,
+ NULL, &thread_pool_min_info);
+ object_class_property_add(klass, "thread-pool-max", "int",
+ event_loop_base_get_param,
+ event_loop_base_set_param,
+ NULL, &thread_pool_max_info);
+}
+
+static const TypeInfo event_loop_base_info = {
+ .name = TYPE_EVENT_LOOP_BASE,
+ .parent = TYPE_OBJECT,
+ .instance_size = sizeof(EventLoopBase),
+ .instance_init = event_loop_base_instance_init,
+ .class_size = sizeof(EventLoopBaseClass),
+ .class_init = event_loop_base_class_init,
+ .abstract = true,
+ .interfaces = (InterfaceInfo[]) {
+ { TYPE_USER_CREATABLE },
+ { }
+ }
+};
+
+static void register_types(void)
+{
+ type_register_static(&event_loop_base_info);
+}
+type_init(register_types);
diff --git a/hw/scsi/virtio-scsi-dataplane.c b/hw/scsi/virtio-scsi-dataplane.c
index 29575cbaf6..8bb6e6acfc 100644
--- a/hw/scsi/virtio-scsi-dataplane.c
+++ b/hw/scsi/virtio-scsi-dataplane.c
@@ -138,7 +138,7 @@ int virtio_scsi_dataplane_start(VirtIODevice *vdev)
aio_context_acquire(s->ctx);
virtio_queue_aio_attach_host_notifier(vs->ctrl_vq, s->ctx);
- virtio_queue_aio_attach_host_notifier(vs->event_vq, s->ctx);
+ virtio_queue_aio_attach_host_notifier_no_poll(vs->event_vq, s->ctx);
for (i = 0; i < vs->conf.num_queues; i++) {
virtio_queue_aio_attach_host_notifier(vs->cmd_vqs[i], s->ctx);
diff --git a/hw/scsi/virtio-scsi.c b/hw/scsi/virtio-scsi.c
index 34a968ecfb..db54d104be 100644
--- a/hw/scsi/virtio-scsi.c
+++ b/hw/scsi/virtio-scsi.c
@@ -29,6 +29,43 @@
#include "hw/virtio/virtio-access.h"
#include "trace.h"
+typedef struct VirtIOSCSIReq {
+ /*
+ * Note:
+ * - fields up to resp_iov are initialized by virtio_scsi_init_req;
+ * - fields starting at vring are zeroed by virtio_scsi_init_req.
+ */
+ VirtQueueElement elem;
+
+ VirtIOSCSI *dev;
+ VirtQueue *vq;
+ QEMUSGList qsgl;
+ QEMUIOVector resp_iov;
+
+ union {
+ /* Used for two-stage request submission */
+ QTAILQ_ENTRY(VirtIOSCSIReq) next;
+
+ /* Used for cancellation of request during TMFs */
+ int remaining;
+ };
+
+ SCSIRequest *sreq;
+ size_t resp_size;
+ enum SCSIXferMode mode;
+ union {
+ VirtIOSCSICmdResp cmd;
+ VirtIOSCSICtrlTMFResp tmf;
+ VirtIOSCSICtrlANResp an;
+ VirtIOSCSIEvent event;
+ } resp;
+ union {
+ VirtIOSCSICmdReq cmd;
+ VirtIOSCSICtrlTMFReq tmf;
+ VirtIOSCSICtrlANReq an;
+ } req;
+} VirtIOSCSIReq;
+
static inline int virtio_scsi_get_lun(uint8_t *lun)
{
return ((lun[2] << 8) | lun[3]) & 0x3FFF;
@@ -45,7 +82,7 @@ static inline SCSIDevice *virtio_scsi_device_get(VirtIOSCSI *s, uint8_t *lun)
return scsi_device_get(&s->bus, 0, lun[1], virtio_scsi_get_lun(lun));
}
-void virtio_scsi_init_req(VirtIOSCSI *s, VirtQueue *vq, VirtIOSCSIReq *req)
+static void virtio_scsi_init_req(VirtIOSCSI *s, VirtQueue *vq, VirtIOSCSIReq *req)
{
VirtIODevice *vdev = VIRTIO_DEVICE(s);
const size_t zero_skip =
@@ -58,7 +95,7 @@ void virtio_scsi_init_req(VirtIOSCSI *s, VirtQueue *vq, VirtIOSCSIReq *req)
memset((uint8_t *)req + zero_skip, 0, sizeof(*req) - zero_skip);
}
-void virtio_scsi_free_req(VirtIOSCSIReq *req)
+static void virtio_scsi_free_req(VirtIOSCSIReq *req)
{
qemu_iovec_destroy(&req->resp_iov);
qemu_sglist_destroy(&req->qsgl);
@@ -460,28 +497,41 @@ static void virtio_scsi_handle_ctrl_req(VirtIOSCSI *s, VirtIOSCSIReq *req)
}
}
-bool virtio_scsi_handle_ctrl_vq(VirtIOSCSI *s, VirtQueue *vq)
+static void virtio_scsi_handle_ctrl_vq(VirtIOSCSI *s, VirtQueue *vq)
{
VirtIOSCSIReq *req;
- bool progress = false;
while ((req = virtio_scsi_pop_req(s, vq))) {
- progress = true;
virtio_scsi_handle_ctrl_req(s, req);
}
- return progress;
+}
+
+/*
+ * If dataplane is configured but not yet started, do so now and return true on
+ * success.
+ *
+ * Dataplane is started by the core virtio code but virtqueue handler functions
+ * can also be invoked when a guest kicks before DRIVER_OK, so this helper
+ * function helps us deal with manually starting ioeventfd in that case.
+ */
+static bool virtio_scsi_defer_to_dataplane(VirtIOSCSI *s)
+{
+ if (!s->ctx || s->dataplane_started) {
+ return false;
+ }
+
+ virtio_device_start_ioeventfd(&s->parent_obj.parent_obj);
+ return !s->dataplane_fenced;
}
static void virtio_scsi_handle_ctrl(VirtIODevice *vdev, VirtQueue *vq)
{
VirtIOSCSI *s = (VirtIOSCSI *)vdev;
- if (s->ctx) {
- virtio_device_start_ioeventfd(vdev);
- if (!s->dataplane_fenced) {
- return;
- }
+ if (virtio_scsi_defer_to_dataplane(s)) {
+ return;
}
+
virtio_scsi_acquire(s);
virtio_scsi_handle_ctrl_vq(s, vq);
virtio_scsi_release(s);
@@ -672,12 +722,11 @@ static void virtio_scsi_handle_cmd_req_submit(VirtIOSCSI *s, VirtIOSCSIReq *req)
scsi_req_unref(sreq);
}
-bool virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq)
+static void virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq)
{
VirtIOSCSIReq *req, *next;
int ret = 0;
bool suppress_notifications = virtio_queue_get_notification(vq);
- bool progress = false;
QTAILQ_HEAD(, VirtIOSCSIReq) reqs = QTAILQ_HEAD_INITIALIZER(reqs);
@@ -687,7 +736,6 @@ bool virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq)
}
while ((req = virtio_scsi_pop_req(s, vq))) {
- progress = true;
ret = virtio_scsi_handle_cmd_req_prepare(s, req);
if (!ret) {
QTAILQ_INSERT_TAIL(&reqs, req, next);
@@ -712,7 +760,6 @@ bool virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq)
QTAILQ_FOREACH_SAFE(req, &reqs, next, next) {
virtio_scsi_handle_cmd_req_submit(s, req);
}
- return progress;
}
static void virtio_scsi_handle_cmd(VirtIODevice *vdev, VirtQueue *vq)
@@ -720,12 +767,10 @@ static void virtio_scsi_handle_cmd(VirtIODevice *vdev, VirtQueue *vq)
/* use non-QOM casts in the data path */
VirtIOSCSI *s = (VirtIOSCSI *)vdev;
- if (s->ctx && !s->dataplane_started) {
- virtio_device_start_ioeventfd(vdev);
- if (!s->dataplane_fenced) {
- return;
- }
+ if (virtio_scsi_defer_to_dataplane(s)) {
+ return;
}
+
virtio_scsi_acquire(s);
virtio_scsi_handle_cmd_vq(s, vq);
virtio_scsi_release(s);
@@ -793,8 +838,8 @@ static void virtio_scsi_reset(VirtIODevice *vdev)
s->events_dropped = false;
}
-void virtio_scsi_push_event(VirtIOSCSI *s, SCSIDevice *dev,
- uint32_t event, uint32_t reason)
+static void virtio_scsi_push_event(VirtIOSCSI *s, SCSIDevice *dev,
+ uint32_t event, uint32_t reason)
{
VirtIOSCSICommon *vs = VIRTIO_SCSI_COMMON(s);
VirtIOSCSIReq *req;
@@ -842,25 +887,21 @@ void virtio_scsi_push_event(VirtIOSCSI *s, SCSIDevice *dev,
virtio_scsi_complete_req(req);
}
-bool virtio_scsi_handle_event_vq(VirtIOSCSI *s, VirtQueue *vq)
+static void virtio_scsi_handle_event_vq(VirtIOSCSI *s, VirtQueue *vq)
{
if (s->events_dropped) {
virtio_scsi_push_event(s, NULL, VIRTIO_SCSI_T_NO_EVENT, 0);
- return true;
}
- return false;
}
static void virtio_scsi_handle_event(VirtIODevice *vdev, VirtQueue *vq)
{
VirtIOSCSI *s = VIRTIO_SCSI(vdev);
- if (s->ctx) {
- virtio_device_start_ioeventfd(vdev);
- if (!s->dataplane_fenced) {
- return;
- }
+ if (virtio_scsi_defer_to_dataplane(s)) {
+ return;
}
+
virtio_scsi_acquire(s);
virtio_scsi_handle_event_vq(s, vq);
virtio_scsi_release(s);
diff --git a/hw/virtio/virtio.c b/hw/virtio/virtio.c
index 9d637e043e..67a873f54a 100644
--- a/hw/virtio/virtio.c
+++ b/hw/virtio/virtio.c
@@ -3534,6 +3534,19 @@ void virtio_queue_aio_attach_host_notifier(VirtQueue *vq, AioContext *ctx)
virtio_queue_host_notifier_aio_poll_end);
}
+/*
+ * Same as virtio_queue_aio_attach_host_notifier() but without polling. Use
+ * this for rx virtqueues and similar cases where the virtqueue handler
+ * function does not pop all elements. When the virtqueue is left non-empty
+ * polling consumes CPU cycles and should not be used.
+ */
+void virtio_queue_aio_attach_host_notifier_no_poll(VirtQueue *vq, AioContext *ctx)
+{
+ aio_set_event_notifier(ctx, &vq->host_notifier, true,
+ virtio_queue_host_notifier_read,
+ NULL, NULL);
+}
+
void virtio_queue_aio_detach_host_notifier(VirtQueue *vq, AioContext *ctx)
{
aio_set_event_notifier(ctx, &vq->host_notifier, true, NULL, NULL, NULL);
diff --git a/include/block/aio.h b/include/block/aio.h
index 5634173b12..d128558f1d 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -192,6 +192,8 @@ struct AioContext {
QSLIST_HEAD(, Coroutine) scheduled_coroutines;
QEMUBH *co_schedule_bh;
+ int thread_pool_min;
+ int thread_pool_max;
/* Thread pool for performing work and receiving completion callbacks.
* Has its own locking.
*/
@@ -769,4 +771,12 @@ void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch,
Error **errp);
+/**
+ * aio_context_set_thread_pool_params:
+ * @ctx: the aio context
+ * @min: min number of threads to have readily available in the thread pool
+ * @min: max number of threads the thread pool can contain
+ */
+void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
+ int64_t max, Error **errp);
#endif
diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
index 7dd7d730a0..2020bcc92d 100644
--- a/include/block/thread-pool.h
+++ b/include/block/thread-pool.h
@@ -20,6 +20,8 @@
#include "block/block.h"
+#define THREAD_POOL_MAX_THREADS_DEFAULT 64
+
typedef int ThreadPoolFunc(void *opaque);
typedef struct ThreadPool ThreadPool;
@@ -33,5 +35,6 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
int coroutine_fn thread_pool_submit_co(ThreadPool *pool,
ThreadPoolFunc *func, void *arg);
void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg);
+void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx);
#endif
diff --git a/include/hw/virtio/virtio-scsi.h b/include/hw/virtio/virtio-scsi.h
index 0997313f0a..a36aad9c86 100644
--- a/include/hw/virtio/virtio-scsi.h
+++ b/include/hw/virtio/virtio-scsi.h
@@ -92,42 +92,6 @@ struct VirtIOSCSI {
uint32_t host_features;
};
-typedef struct VirtIOSCSIReq {
- /* Note:
- * - fields up to resp_iov are initialized by virtio_scsi_init_req;
- * - fields starting at vring are zeroed by virtio_scsi_init_req.
- * */
- VirtQueueElement elem;
-
- VirtIOSCSI *dev;
- VirtQueue *vq;
- QEMUSGList qsgl;
- QEMUIOVector resp_iov;
-
- union {
- /* Used for two-stage request submission */
- QTAILQ_ENTRY(VirtIOSCSIReq) next;
-
- /* Used for cancellation of request during TMFs */
- int remaining;
- };
-
- SCSIRequest *sreq;
- size_t resp_size;
- enum SCSIXferMode mode;
- union {
- VirtIOSCSICmdResp cmd;
- VirtIOSCSICtrlTMFResp tmf;
- VirtIOSCSICtrlANResp an;
- VirtIOSCSIEvent event;
- } resp;
- union {
- VirtIOSCSICmdReq cmd;
- VirtIOSCSICtrlTMFReq tmf;
- VirtIOSCSICtrlANReq an;
- } req;
-} VirtIOSCSIReq;
-
static inline void virtio_scsi_acquire(VirtIOSCSI *s)
{
if (s->ctx) {
@@ -149,13 +113,6 @@ void virtio_scsi_common_realize(DeviceState *dev,
Error **errp);
void virtio_scsi_common_unrealize(DeviceState *dev);
-bool virtio_scsi_handle_event_vq(VirtIOSCSI *s, VirtQueue *vq);
-bool virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq);
-bool virtio_scsi_handle_ctrl_vq(VirtIOSCSI *s, VirtQueue *vq);
-void virtio_scsi_init_req(VirtIOSCSI *s, VirtQueue *vq, VirtIOSCSIReq *req);
-void virtio_scsi_free_req(VirtIOSCSIReq *req);
-void virtio_scsi_push_event(VirtIOSCSI *s, SCSIDevice *dev,
- uint32_t event, uint32_t reason);
void virtio_scsi_dataplane_setup(VirtIOSCSI *s, Error **errp);
int virtio_scsi_dataplane_start(VirtIODevice *s);
diff --git a/include/hw/virtio/virtio.h b/include/hw/virtio/virtio.h
index b31c4507f5..b62a35fdca 100644
--- a/include/hw/virtio/virtio.h
+++ b/include/hw/virtio/virtio.h
@@ -317,6 +317,7 @@ EventNotifier *virtio_queue_get_host_notifier(VirtQueue *vq);
void virtio_queue_set_host_notifier_enabled(VirtQueue *vq, bool enabled);
void virtio_queue_host_notifier_read(EventNotifier *n);
void virtio_queue_aio_attach_host_notifier(VirtQueue *vq, AioContext *ctx);
+void virtio_queue_aio_attach_host_notifier_no_poll(VirtQueue *vq, AioContext *ctx);
void virtio_queue_aio_detach_host_notifier(VirtQueue *vq, AioContext *ctx);
VirtQueue *virtio_vector_first_queue(VirtIODevice *vdev, uint16_t vector);
VirtQueue *virtio_vector_next_queue(VirtQueue *vq);
diff --git a/include/qemu/main-loop.h b/include/qemu/main-loop.h
index 89bd9edefb..5518845299 100644
--- a/include/qemu/main-loop.h
+++ b/include/qemu/main-loop.h
@@ -26,9 +26,19 @@
#define QEMU_MAIN_LOOP_H
#include "block/aio.h"
+#include "qom/object.h"
+#include "sysemu/event-loop-base.h"
#define SIG_IPI SIGUSR1
+#define TYPE_MAIN_LOOP "main-loop"
+OBJECT_DECLARE_TYPE(MainLoop, MainLoopClass, MAIN_LOOP)
+
+struct MainLoop {
+ EventLoopBase parent_obj;
+};
+typedef struct MainLoop MainLoop;
+
/**
* qemu_init_main_loop: Set up the process so that it can run the main loop.
*
diff --git a/include/sysemu/event-loop-base.h b/include/sysemu/event-loop-base.h
new file mode 100644
index 0000000000..2748bf6ae1
--- /dev/null
+++ b/include/sysemu/event-loop-base.h
@@ -0,0 +1,41 @@
+/*
+ * QEMU event-loop backend
+ *
+ * Copyright (C) 2022 Red Hat Inc
+ *
+ * Authors:
+ * Nicolas Saenz Julienne <nsaenzju@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+#ifndef QEMU_EVENT_LOOP_BASE_H
+#define QEMU_EVENT_LOOP_BASE_H
+
+#include "qom/object.h"
+#include "block/aio.h"
+#include "qemu/typedefs.h"
+
+#define TYPE_EVENT_LOOP_BASE "event-loop-base"
+OBJECT_DECLARE_TYPE(EventLoopBase, EventLoopBaseClass,
+ EVENT_LOOP_BASE)
+
+struct EventLoopBaseClass {
+ ObjectClass parent_class;
+
+ void (*init)(EventLoopBase *base, Error **errp);
+ void (*update_params)(EventLoopBase *base, Error **errp);
+ bool (*can_be_deleted)(EventLoopBase *base);
+};
+
+struct EventLoopBase {
+ Object parent;
+
+ /* AioContext AIO engine parameters */
+ int64_t aio_max_batch;
+
+ /* AioContext thread pool parameters */
+ int64_t thread_pool_min;
+ int64_t thread_pool_max;
+};
+#endif
diff --git a/include/sysemu/iothread.h b/include/sysemu/iothread.h
index 7f714bd136..8f8601d6ab 100644
--- a/include/sysemu/iothread.h
+++ b/include/sysemu/iothread.h
@@ -17,11 +17,12 @@
#include "block/aio.h"
#include "qemu/thread.h"
#include "qom/object.h"
+#include "sysemu/event-loop-base.h"
#define TYPE_IOTHREAD "iothread"
struct IOThread {
- Object parent_obj;
+ EventLoopBase parent_obj;
QemuThread thread;
AioContext *ctx;
@@ -37,9 +38,6 @@ struct IOThread {
int64_t poll_max_ns;
int64_t poll_grow;
int64_t poll_shrink;
-
- /* AioContext AIO engine parameters */
- int64_t aio_max_batch;
};
typedef struct IOThread IOThread;
diff --git a/iothread.c b/iothread.c
index 0f98af0f2a..529194a566 100644
--- a/iothread.c
+++ b/iothread.c
@@ -17,6 +17,7 @@
#include "qemu/module.h"
#include "block/aio.h"
#include "block/block.h"
+#include "sysemu/event-loop-base.h"
#include "sysemu/iothread.h"
#include "qapi/error.h"
#include "qapi/qapi-commands-misc.h"
@@ -152,10 +153,15 @@ static void iothread_init_gcontext(IOThread *iothread)
iothread->main_loop = g_main_loop_new(iothread->worker_context, TRUE);
}
-static void iothread_set_aio_context_params(IOThread *iothread, Error **errp)
+static void iothread_set_aio_context_params(EventLoopBase *base, Error **errp)
{
+ IOThread *iothread = IOTHREAD(base);
ERRP_GUARD();
+ if (!iothread->ctx) {
+ return;
+ }
+
aio_context_set_poll_params(iothread->ctx,
iothread->poll_max_ns,
iothread->poll_grow,
@@ -166,14 +172,18 @@ static void iothread_set_aio_context_params(IOThread *iothread, Error **errp)
}
aio_context_set_aio_params(iothread->ctx,
- iothread->aio_max_batch,
+ iothread->parent_obj.aio_max_batch,
errp);
+
+ aio_context_set_thread_pool_params(iothread->ctx, base->thread_pool_min,
+ base->thread_pool_max, errp);
}
-static void iothread_complete(UserCreatable *obj, Error **errp)
+
+static void iothread_init(EventLoopBase *base, Error **errp)
{
Error *local_error = NULL;
- IOThread *iothread = IOTHREAD(obj);
+ IOThread *iothread = IOTHREAD(base);
char *thread_name;
iothread->stopping = false;
@@ -189,7 +199,7 @@ static void iothread_complete(UserCreatable *obj, Error **errp)
*/
iothread_init_gcontext(iothread);
- iothread_set_aio_context_params(iothread, &local_error);
+ iothread_set_aio_context_params(base, &local_error);
if (local_error) {
error_propagate(errp, local_error);
aio_context_unref(iothread->ctx);
@@ -201,7 +211,7 @@ static void iothread_complete(UserCreatable *obj, Error **errp)
* to inherit.
*/
thread_name = g_strdup_printf("IO %s",
- object_get_canonical_path_component(OBJECT(obj)));
+ object_get_canonical_path_component(OBJECT(base)));
qemu_thread_create(&iothread->thread, thread_name, iothread_run,
iothread, QEMU_THREAD_JOINABLE);
g_free(thread_name);
@@ -226,9 +236,6 @@ static IOThreadParamInfo poll_grow_info = {
static IOThreadParamInfo poll_shrink_info = {
"poll-shrink", offsetof(IOThread, poll_shrink),
};
-static IOThreadParamInfo aio_max_batch_info = {
- "aio-max-batch", offsetof(IOThread, aio_max_batch),
-};
static void iothread_get_param(Object *obj, Visitor *v,
const char *name, IOThreadParamInfo *info, Error **errp)
@@ -288,35 +295,12 @@ static void iothread_set_poll_param(Object *obj, Visitor *v,
}
}
-static void iothread_get_aio_param(Object *obj, Visitor *v,
- const char *name, void *opaque, Error **errp)
-{
- IOThreadParamInfo *info = opaque;
-
- iothread_get_param(obj, v, name, info, errp);
-}
-
-static void iothread_set_aio_param(Object *obj, Visitor *v,
- const char *name, void *opaque, Error **errp)
-{
- IOThread *iothread = IOTHREAD(obj);
- IOThreadParamInfo *info = opaque;
-
- if (!iothread_set_param(obj, v, name, info, errp)) {
- return;
- }
-
- if (iothread->ctx) {
- aio_context_set_aio_params(iothread->ctx,
- iothread->aio_max_batch,
- errp);
- }
-}
-
static void iothread_class_init(ObjectClass *klass, void *class_data)
{
- UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass);
- ucc->complete = iothread_complete;
+ EventLoopBaseClass *bc = EVENT_LOOP_BASE_CLASS(klass);
+
+ bc->init = iothread_init;
+ bc->update_params = iothread_set_aio_context_params;
object_class_property_add(klass, "poll-max-ns", "int",
iothread_get_poll_param,
@@ -330,23 +314,15 @@ static void iothread_class_init(ObjectClass *klass, void *class_data)
iothread_get_poll_param,
iothread_set_poll_param,
NULL, &poll_shrink_info);
- object_class_property_add(klass, "aio-max-batch", "int",
- iothread_get_aio_param,
- iothread_set_aio_param,
- NULL, &aio_max_batch_info);
}
static const TypeInfo iothread_info = {
.name = TYPE_IOTHREAD,
- .parent = TYPE_OBJECT,
+ .parent = TYPE_EVENT_LOOP_BASE,
.class_init = iothread_class_init,
.instance_size = sizeof(IOThread),
.instance_init = iothread_instance_init,
.instance_finalize = iothread_instance_finalize,
- .interfaces = (InterfaceInfo[]) {
- {TYPE_USER_CREATABLE},
- {}
- },
};
static void iothread_register_types(void)
@@ -383,7 +359,7 @@ static int query_one_iothread(Object *object, void *opaque)
info->poll_max_ns = iothread->poll_max_ns;
info->poll_grow = iothread->poll_grow;
info->poll_shrink = iothread->poll_shrink;
- info->aio_max_batch = iothread->aio_max_batch;
+ info->aio_max_batch = iothread->parent_obj.aio_max_batch;
QAPI_LIST_APPEND(*tail, info);
return 0;
diff --git a/meson.build b/meson.build
index fa672e57bc..864e97945f 100644
--- a/meson.build
+++ b/meson.build
@@ -3025,6 +3025,7 @@ subdir('qom')
subdir('authz')
subdir('crypto')
subdir('ui')
+subdir('hw')
if enable_modules
@@ -3032,6 +3033,18 @@ if enable_modules
modulecommon = declare_dependency(link_whole: libmodulecommon, compile_args: '-DBUILD_DSO')
endif
+qom_ss = qom_ss.apply(config_host, strict: false)
+libqom = static_library('qom', qom_ss.sources() + genh,
+ dependencies: [qom_ss.dependencies()],
+ name_suffix: 'fa')
+qom = declare_dependency(link_whole: libqom)
+
+event_loop_base = files('event-loop-base.c')
+event_loop_base = static_library('event-loop-base', sources: event_loop_base + genh,
+ build_by_default: true)
+event_loop_base = declare_dependency(link_whole: event_loop_base,
+ dependencies: [qom])
+
stub_ss = stub_ss.apply(config_all, strict: false)
util_ss.add_all(trace_ss)
@@ -3040,7 +3053,8 @@ libqemuutil = static_library('qemuutil',
sources: util_ss.sources() + stub_ss.sources() + genh,
dependencies: [util_ss.dependencies(), libm, threads, glib, socket, malloc, pixman])
qemuutil = declare_dependency(link_with: libqemuutil,
- sources: genh + version_res)
+ sources: genh + version_res,
+ dependencies: [event_loop_base])
if have_system or have_user
decodetree = generator(find_program('scripts/decodetree.py'),
@@ -3118,7 +3132,6 @@ subdir('monitor')
subdir('net')
subdir('replay')
subdir('semihosting')
-subdir('hw')
subdir('tcg')
subdir('fpu')
subdir('accel')
@@ -3243,13 +3256,6 @@ qemu_syms = custom_target('qemu.syms', output: 'qemu.syms',
capture: true,
command: [undefsym, nm, '@INPUT@'])
-qom_ss = qom_ss.apply(config_host, strict: false)
-libqom = static_library('qom', qom_ss.sources() + genh,
- dependencies: [qom_ss.dependencies()],
- name_suffix: 'fa')
-
-qom = declare_dependency(link_whole: libqom)
-
authz_ss = authz_ss.apply(config_host, strict: false)
libauthz = static_library('authz', authz_ss.sources() + genh,
dependencies: [authz_ss.dependencies()],
@@ -3302,7 +3308,7 @@ libblockdev = static_library('blockdev', blockdev_ss.sources() + genh,
build_by_default: false)
blockdev = declare_dependency(link_whole: [libblockdev],
- dependencies: [block])
+ dependencies: [block, event_loop_base])
qmp_ss = qmp_ss.apply(config_host, strict: false)
libqmp = static_library('qmp', qmp_ss.sources() + genh,
diff --git a/qapi/qom.json b/qapi/qom.json
index eeb5395ff3..6a653c6636 100644
--- a/qapi/qom.json
+++ b/qapi/qom.json
@@ -500,6 +500,28 @@
'*grab-toggle': 'GrabToggleKeys' } }
##
+# @EventLoopBaseProperties:
+#
+# Common properties for event loops
+#
+# @aio-max-batch: maximum number of requests in a batch for the AIO engine,
+# 0 means that the engine will use its default.
+# (default: 0)
+#
+# @thread-pool-min: minimum number of threads reserved in the thread pool
+# (default:0)
+#
+# @thread-pool-max: maximum number of threads the thread pool can contain
+# (default:64)
+#
+# Since: 7.1
+##
+{ 'struct': 'EventLoopBaseProperties',
+ 'data': { '*aio-max-batch': 'int',
+ '*thread-pool-min': 'int',
+ '*thread-pool-max': 'int' } }
+
+##
# @IothreadProperties:
#
# Properties for iothread objects.
@@ -516,17 +538,26 @@
# algorithm detects it is spending too long polling without
# encountering events. 0 selects a default behaviour (default: 0)
#
-# @aio-max-batch: maximum number of requests in a batch for the AIO engine,
-# 0 means that the engine will use its default
-# (default:0, since 6.1)
+# The @aio-max-batch option is available since 6.1.
#
# Since: 2.0
##
{ 'struct': 'IothreadProperties',
+ 'base': 'EventLoopBaseProperties',
'data': { '*poll-max-ns': 'int',
'*poll-grow': 'int',
- '*poll-shrink': 'int',
- '*aio-max-batch': 'int' } }
+ '*poll-shrink': 'int' } }
+
+##
+# @MainLoopProperties:
+#
+# Properties for the main-loop object.
+#
+# Since: 7.1
+##
+{ 'struct': 'MainLoopProperties',
+ 'base': 'EventLoopBaseProperties',
+ 'data': {} }
##
# @MemoryBackendProperties:
@@ -818,6 +849,7 @@
{ 'name': 'input-linux',
'if': 'CONFIG_LINUX' },
'iothread',
+ 'main-loop',
{ 'name': 'memory-backend-epc',
'if': 'CONFIG_LINUX' },
'memory-backend-file',
@@ -883,6 +915,7 @@
'input-linux': { 'type': 'InputLinuxProperties',
'if': 'CONFIG_LINUX' },
'iothread': 'IothreadProperties',
+ 'main-loop': 'MainLoopProperties',
'memory-backend-epc': { 'type': 'MemoryBackendEpcProperties',
'if': 'CONFIG_LINUX' },
'memory-backend-file': 'MemoryBackendFileProperties',
diff --git a/util/aio-posix.c b/util/aio-posix.c
index be0182a3c6..731f3826c0 100644
--- a/util/aio-posix.c
+++ b/util/aio-posix.c
@@ -15,6 +15,7 @@
#include "qemu/osdep.h"
#include "block/block.h"
+#include "block/thread-pool.h"
#include "qemu/main-loop.h"
#include "qemu/rcu.h"
#include "qemu/rcu_queue.h"
diff --git a/util/async.c b/util/async.c
index 2ea1172f3e..554ba70cca 100644
--- a/util/async.c
+++ b/util/async.c
@@ -563,6 +563,9 @@ AioContext *aio_context_new(Error **errp)
ctx->aio_max_batch = 0;
+ ctx->thread_pool_min = 0;
+ ctx->thread_pool_max = THREAD_POOL_MAX_THREADS_DEFAULT;
+
return ctx;
fail:
g_source_destroy(&ctx->source);
@@ -696,3 +699,20 @@ void qemu_set_current_aio_context(AioContext *ctx)
assert(!get_my_aiocontext());
set_my_aiocontext(ctx);
}
+
+void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
+ int64_t max, Error **errp)
+{
+
+ if (min > max || !max || min > INT_MAX || max > INT_MAX) {
+ error_setg(errp, "bad thread-pool-min/thread-pool-max values");
+ return;
+ }
+
+ ctx->thread_pool_min = min;
+ ctx->thread_pool_max = max;
+
+ if (ctx->thread_pool) {
+ thread_pool_update_params(ctx->thread_pool, ctx);
+ }
+}
diff --git a/util/main-loop.c b/util/main-loop.c
index 9afac10dff..f00a25451b 100644
--- a/util/main-loop.c
+++ b/util/main-loop.c
@@ -30,9 +30,11 @@
#include "sysemu/replay.h"
#include "qemu/main-loop.h"
#include "block/aio.h"
+#include "block/thread-pool.h"
#include "qemu/error-report.h"
#include "qemu/queue.h"
#include "qemu/compiler.h"
+#include "qom/object.h"
#ifndef _WIN32
#include <sys/wait.h>
@@ -184,6 +186,69 @@ int qemu_init_main_loop(Error **errp)
return 0;
}
+static void main_loop_update_params(EventLoopBase *base, Error **errp)
+{
+ ERRP_GUARD();
+
+ if (!qemu_aio_context) {
+ error_setg(errp, "qemu aio context not ready");
+ return;
+ }
+
+ aio_context_set_aio_params(qemu_aio_context, base->aio_max_batch, errp);
+ if (*errp) {
+ return;
+ }
+
+ aio_context_set_thread_pool_params(qemu_aio_context, base->thread_pool_min,
+ base->thread_pool_max, errp);
+}
+
+MainLoop *mloop;
+
+static void main_loop_init(EventLoopBase *base, Error **errp)
+{
+ MainLoop *m = MAIN_LOOP(base);
+
+ if (mloop) {
+ error_setg(errp, "only one main-loop instance allowed");
+ return;
+ }
+
+ main_loop_update_params(base, errp);
+
+ mloop = m;
+ return;
+}
+
+static bool main_loop_can_be_deleted(EventLoopBase *base)
+{
+ return false;
+}
+
+static void main_loop_class_init(ObjectClass *oc, void *class_data)
+{
+ EventLoopBaseClass *bc = EVENT_LOOP_BASE_CLASS(oc);
+
+ bc->init = main_loop_init;
+ bc->update_params = main_loop_update_params;
+ bc->can_be_deleted = main_loop_can_be_deleted;
+}
+
+static const TypeInfo main_loop_info = {
+ .name = TYPE_MAIN_LOOP,
+ .parent = TYPE_EVENT_LOOP_BASE,
+ .class_init = main_loop_class_init,
+ .instance_size = sizeof(MainLoop),
+};
+
+static void main_loop_register_types(void)
+{
+ type_register_static(&main_loop_info);
+}
+
+type_init(main_loop_register_types)
+
static int max_priority;
#ifndef _WIN32
diff --git a/util/thread-pool.c b/util/thread-pool.c
index d763cea505..196835b4d3 100644
--- a/util/thread-pool.c
+++ b/util/thread-pool.c
@@ -58,7 +58,6 @@ struct ThreadPool {
QemuMutex lock;
QemuCond worker_stopped;
QemuSemaphore sem;
- int max_threads;
QEMUBH *new_thread_bh;
/* The following variables are only accessed from one AioContext. */
@@ -71,8 +70,27 @@ struct ThreadPool {
int new_threads; /* backlog of threads we need to create */
int pending_threads; /* threads created but not running yet */
bool stopping;
+ int min_threads;
+ int max_threads;
};
+static inline bool back_to_sleep(ThreadPool *pool, int ret)
+{
+ /*
+ * The semaphore timed out, we should exit the loop except when:
+ * - There is work to do, we raced with the signal.
+ * - The max threads threshold just changed, we raced with the signal.
+ * - The thread pool forces a minimum number of readily available threads.
+ */
+ if (ret == -1 && (!QTAILQ_EMPTY(&pool->request_list) ||
+ pool->cur_threads > pool->max_threads ||
+ pool->cur_threads <= pool->min_threads)) {
+ return true;
+ }
+
+ return false;
+}
+
static void *worker_thread(void *opaque)
{
ThreadPool *pool = opaque;
@@ -91,8 +109,9 @@ static void *worker_thread(void *opaque)
ret = qemu_sem_timedwait(&pool->sem, 10000);
qemu_mutex_lock(&pool->lock);
pool->idle_threads--;
- } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
- if (ret == -1 || pool->stopping) {
+ } while (back_to_sleep(pool, ret));
+ if (ret == -1 || pool->stopping ||
+ pool->cur_threads > pool->max_threads) {
break;
}
@@ -294,6 +313,33 @@ void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
thread_pool_submit_aio(pool, func, arg, NULL, NULL);
}
+void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
+{
+ qemu_mutex_lock(&pool->lock);
+
+ pool->min_threads = ctx->thread_pool_min;
+ pool->max_threads = ctx->thread_pool_max;
+
+ /*
+ * We either have to:
+ * - Increase the number available of threads until over the min_threads
+ * threshold.
+ * - Decrease the number of available threads until under the max_threads
+ * threshold.
+ * - Do nothing. The current number of threads fall in between the min and
+ * max thresholds. We'll let the pool manage itself.
+ */
+ for (int i = pool->cur_threads; i < pool->min_threads; i++) {
+ spawn_thread(pool);
+ }
+
+ for (int i = pool->cur_threads; i > pool->max_threads; i--) {
+ qemu_sem_post(&pool->sem);
+ }
+
+ qemu_mutex_unlock(&pool->lock);
+}
+
static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
{
if (!ctx) {
@@ -306,11 +352,12 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
qemu_mutex_init(&pool->lock);
qemu_cond_init(&pool->worker_stopped);
qemu_sem_init(&pool->sem, 0);
- pool->max_threads = 64;
pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
QLIST_INIT(&pool->head);
QTAILQ_INIT(&pool->request_list);
+
+ thread_pool_update_params(pool, ctx);
}
ThreadPool *thread_pool_new(AioContext *ctx)