aboutsummaryrefslogtreecommitdiff
path: root/util/thread-pool.c
diff options
context:
space:
mode:
authorNicolas Saenz Julienne <nsaenzju@redhat.com>2022-04-25 09:57:23 +0200
committerStefan Hajnoczi <stefanha@redhat.com>2022-05-09 10:43:23 +0100
commit71ad4713cc1d7fca24388b828ef31ae6cb38a31c (patch)
treebaf49f521c85ffa4551b2cb73083562570b3c857 /util/thread-pool.c
parent70ac26b9e5ca8374bb3ef3f30b871726673c9f27 (diff)
util/event-loop-base: Introduce options to set the thread pool size
The thread pool regulates itself: when idle, it kills threads until empty, when in demand, it creates new threads until full. This behaviour doesn't play well with latency sensitive workloads where the price of creating a new thread is too high. For example, when paired with qemu's '-mlock', or using safety features like SafeStack, creating a new thread has been measured take multiple milliseconds. In order to mitigate this let's introduce a new 'EventLoopBase' property to set the thread pool size. The threads will be created during the pool's initialization or upon updating the property's value, remain available during its lifetime regardless of demand, and destroyed upon freeing it. A properly characterized workload will then be able to configure the pool to avoid any latency spikes. Signed-off-by: Nicolas Saenz Julienne <nsaenzju@redhat.com> Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com> Acked-by: Markus Armbruster <armbru@redhat.com> Message-id: 20220425075723.20019-4-nsaenzju@redhat.com Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
Diffstat (limited to 'util/thread-pool.c')
-rw-r--r--util/thread-pool.c55
1 files changed, 51 insertions, 4 deletions
diff --git a/util/thread-pool.c b/util/thread-pool.c
index d763cea505..196835b4d3 100644
--- a/util/thread-pool.c
+++ b/util/thread-pool.c
@@ -58,7 +58,6 @@ struct ThreadPool {
QemuMutex lock;
QemuCond worker_stopped;
QemuSemaphore sem;
- int max_threads;
QEMUBH *new_thread_bh;
/* The following variables are only accessed from one AioContext. */
@@ -71,8 +70,27 @@ struct ThreadPool {
int new_threads; /* backlog of threads we need to create */
int pending_threads; /* threads created but not running yet */
bool stopping;
+ int min_threads;
+ int max_threads;
};
+static inline bool back_to_sleep(ThreadPool *pool, int ret)
+{
+ /*
+ * The semaphore timed out, we should exit the loop except when:
+ * - There is work to do, we raced with the signal.
+ * - The max threads threshold just changed, we raced with the signal.
+ * - The thread pool forces a minimum number of readily available threads.
+ */
+ if (ret == -1 && (!QTAILQ_EMPTY(&pool->request_list) ||
+ pool->cur_threads > pool->max_threads ||
+ pool->cur_threads <= pool->min_threads)) {
+ return true;
+ }
+
+ return false;
+}
+
static void *worker_thread(void *opaque)
{
ThreadPool *pool = opaque;
@@ -91,8 +109,9 @@ static void *worker_thread(void *opaque)
ret = qemu_sem_timedwait(&pool->sem, 10000);
qemu_mutex_lock(&pool->lock);
pool->idle_threads--;
- } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
- if (ret == -1 || pool->stopping) {
+ } while (back_to_sleep(pool, ret));
+ if (ret == -1 || pool->stopping ||
+ pool->cur_threads > pool->max_threads) {
break;
}
@@ -294,6 +313,33 @@ void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
thread_pool_submit_aio(pool, func, arg, NULL, NULL);
}
+void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
+{
+ qemu_mutex_lock(&pool->lock);
+
+ pool->min_threads = ctx->thread_pool_min;
+ pool->max_threads = ctx->thread_pool_max;
+
+ /*
+ * We either have to:
+ * - Increase the number available of threads until over the min_threads
+ * threshold.
+ * - Decrease the number of available threads until under the max_threads
+ * threshold.
+ * - Do nothing. The current number of threads fall in between the min and
+ * max thresholds. We'll let the pool manage itself.
+ */
+ for (int i = pool->cur_threads; i < pool->min_threads; i++) {
+ spawn_thread(pool);
+ }
+
+ for (int i = pool->cur_threads; i > pool->max_threads; i--) {
+ qemu_sem_post(&pool->sem);
+ }
+
+ qemu_mutex_unlock(&pool->lock);
+}
+
static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
{
if (!ctx) {
@@ -306,11 +352,12 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
qemu_mutex_init(&pool->lock);
qemu_cond_init(&pool->worker_stopped);
qemu_sem_init(&pool->sem, 0);
- pool->max_threads = 64;
pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
QLIST_INIT(&pool->head);
QTAILQ_INIT(&pool->request_list);
+
+ thread_pool_update_params(pool, ctx);
}
ThreadPool *thread_pool_new(AioContext *ctx)