-rw-r--r--  include/block/thread-pool.h |  5
-rw-r--r--  thread-pool.c               | 52
2 files changed, 53 insertions, 4 deletions
diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
index 200703e35f..e1453c685d 100644
--- a/include/block/thread-pool.h
+++ b/include/block/thread-pool.h
@@ -26,6 +26,11 @@
 
 typedef int ThreadPoolFunc(void *opaque);
 
+typedef struct ThreadPool ThreadPool;
+
+ThreadPool *thread_pool_new(struct AioContext *ctx);
+void thread_pool_free(ThreadPool *pool);
+
 BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
         BlockDriverCompletionFunc *cb, void *opaque);
 int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
diff --git a/thread-pool.c b/thread-pool.c
index a0aecd08fe..d1e4570829 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -24,8 +24,6 @@
 #include "qemu/event_notifier.h"
 #include "block/thread-pool.h"
 
-typedef struct ThreadPool ThreadPool;
-
 static void do_spawn_thread(ThreadPool *pool);
 
 typedef struct ThreadPoolElement ThreadPoolElement;
@@ -59,8 +57,10 @@ struct ThreadPoolElement {
 
 struct ThreadPool {
     EventNotifier notifier;
+    AioContext *ctx;
     QemuMutex lock;
     QemuCond check_cancel;
+    QemuCond worker_stopped;
     QemuSemaphore sem;
     int max_threads;
     QEMUBH *new_thread_bh;
@@ -75,6 +75,7 @@ struct ThreadPool {
     int new_threads;     /* backlog of threads we need to create */
     int pending_threads; /* threads created but not running yet */
     int pending_cancellations; /* whether we need a cond_broadcast */
+    bool stopping;
 };
 
 /* Currently there is only one thread pool instance. */
@@ -88,7 +89,7 @@ static void *worker_thread(void *opaque)
     pool->pending_threads--;
     do_spawn_thread(pool);
 
-    while (1) {
+    while (!pool->stopping) {
         ThreadPoolElement *req;
         int ret;
 
@@ -99,7 +100,7 @@
             qemu_mutex_lock(&pool->lock);
             pool->idle_threads--;
         } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
-        if (ret == -1) {
+        if (ret == -1 || pool->stopping) {
             break;
         }
 
@@ -124,6 +125,7 @@
     }
 
     pool->cur_threads--;
+    qemu_cond_signal(&pool->worker_stopped);
     qemu_mutex_unlock(&pool->lock);
     return NULL;
 }
@@ -298,8 +300,10 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
 
     memset(pool, 0, sizeof(*pool));
     event_notifier_init(&pool->notifier, false);
+    pool->ctx = ctx;
     qemu_mutex_init(&pool->lock);
     qemu_cond_init(&pool->check_cancel);
+    qemu_cond_init(&pool->worker_stopped);
     qemu_sem_init(&pool->sem, 0);
     pool->max_threads = 64;
     pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
@@ -311,6 +315,46 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
                            thread_pool_active);
 }
 
+ThreadPool *thread_pool_new(AioContext *ctx)
+{
+    ThreadPool *pool = g_new(ThreadPool, 1);
+    thread_pool_init_one(pool, ctx);
+    return pool;
+}
+
+void thread_pool_free(ThreadPool *pool)
+{
+    if (!pool) {
+        return;
+    }
+
+    assert(QLIST_EMPTY(&pool->head));
+
+    qemu_mutex_lock(&pool->lock);
+
+    /* Stop new threads from spawning */
+    qemu_bh_delete(pool->new_thread_bh);
+    pool->cur_threads -= pool->new_threads;
+    pool->new_threads = 0;
+
+    /* Wait for worker threads to terminate */
+    pool->stopping = true;
+    while (pool->cur_threads > 0) {
+        qemu_sem_post(&pool->sem);
+        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
+    }
+
+    qemu_mutex_unlock(&pool->lock);
+
+    aio_set_event_notifier(pool->ctx, &pool->notifier, NULL, NULL);
+    qemu_sem_destroy(&pool->sem);
+    qemu_cond_destroy(&pool->check_cancel);
+    qemu_cond_destroy(&pool->worker_stopped);
+    qemu_mutex_destroy(&pool->lock);
+    event_notifier_cleanup(&pool->notifier);
+    g_free(pool);
+}
+
 static void thread_pool_init(void)
 {
     thread_pool_init_one(&global_pool, NULL);
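
For orientation, a minimal usage sketch of the new API follows (it is not part of the patch): it creates a private ThreadPool bound to the main loop's AioContext and tears it down again. The caller name example_pool_lifecycle is hypothetical, and the sketch assumes qemu_get_aio_context() from "qemu/main-loop.h", which this series relies on elsewhere; everything else comes from the patched header.

    /* Usage sketch only -- not part of the patch.  example_pool_lifecycle is a
     * hypothetical caller; the pool API is the one declared in the header above. */
    #include "qemu/main-loop.h"      /* qemu_get_aio_context() -- assumed available */
    #include "block/thread-pool.h"   /* thread_pool_new(), thread_pool_free() */

    static void example_pool_lifecycle(void)
    {
        AioContext *ctx = qemu_get_aio_context();   /* main loop's AioContext */
        ThreadPool *pool = thread_pool_new(ctx);    /* per-AioContext pool */

        /* ... submit work and wait for it to complete here;
         * thread_pool_free() asserts that pool->head is empty,
         * i.e. that no requests are still in flight ... */

        thread_pool_free(pool);   /* wakes idle workers and waits for them to exit */
    }

The teardown protocol is visible in thread_pool_free() above: it deletes the spawn BH so no new workers appear, sets pool->stopping, posts the semaphore so idle workers wake up, and waits on worker_stopped until cur_threads reaches zero, which is why worker_thread() now loops on !pool->stopping and signals worker_stopped before returning.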