From dbb44504c28683a4428308f72b9f61733fcda007 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20P=2E=20Berrang=C3=A9?= Date: Mon, 11 Feb 2019 18:24:28 +0000 Subject: io: add qio_task_wait_thread to join with a background thread MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add the ability for a caller to wait for completion of the background thread to synchronously dispatch its result, without needing to wait for the main loop to run the idle callback. This method needs very careful usage to avoid a dangerous race condition with the free'ing of the task. The completion callback is normally invoked from an idle callback registered with the main loop context. The qio_task_wait_thread method must only be called if the completion callback has not yet run. The only safe way to achieve this is to run the qio_task_wait_thread method from the thread that executes the main loop. It is generally a bad idea to use this method since it will block execution of the main loop, however, the design of the character devices and its usage from vhostuser already requires blocking execution. Signed-off-by: Daniel P. Berrangé Message-Id: <20190211182442.8542-3-berrange@redhat.com> Signed-off-by: Marc-André Lureau --- io/task.c | 41 +++++++++++++++++++++++++++++++++++++---- io/trace-events | 2 ++ 2 files changed, 39 insertions(+), 4 deletions(-) (limited to 'io') diff --git a/io/task.c b/io/task.c index 396866b10f..64c4c7126a 100644 --- a/io/task.c +++ b/io/task.c @@ -29,6 +29,7 @@ struct QIOTaskThreadData { gpointer opaque; GDestroyNotify destroy; GMainContext *context; + GSource *completion; }; @@ -40,6 +41,8 @@ struct QIOTask { Error *err; gpointer result; GDestroyNotify destroyResult; + QemuMutex thread_lock; + QemuCond thread_cond; struct QIOTaskThreadData *thread; }; @@ -58,6 +61,8 @@ QIOTask *qio_task_new(Object *source, task->func = func; task->opaque = opaque; task->destroy = destroy; + qemu_mutex_init(&task->thread_lock); + qemu_cond_init(&task->thread_cond); trace_qio_task_new(task, source, func, opaque); @@ -66,6 +71,7 @@ QIOTask *qio_task_new(Object *source, static void qio_task_free(QIOTask *task) { + qemu_mutex_lock(&task->thread_lock); if (task->thread) { if (task->thread->destroy) { task->thread->destroy(task->thread->opaque); @@ -89,6 +95,10 @@ static void qio_task_free(QIOTask *task) } object_unref(task->source); + qemu_mutex_unlock(&task->thread_lock); + qemu_mutex_destroy(&task->thread_lock); + qemu_cond_destroy(&task->thread_cond); + g_free(task); } @@ -107,7 +117,6 @@ static gboolean qio_task_thread_result(gpointer opaque) static gpointer qio_task_thread_worker(gpointer opaque) { QIOTask *task = opaque; - GSource *idle; trace_qio_task_thread_run(task); @@ -120,9 +129,17 @@ static gpointer qio_task_thread_worker(gpointer opaque) */ trace_qio_task_thread_exit(task); - idle = g_idle_source_new(); - g_source_set_callback(idle, qio_task_thread_result, task, NULL); - g_source_attach(idle, task->thread->context); + qemu_mutex_lock(&task->thread_lock); + + task->thread->completion = g_idle_source_new(); + g_source_set_callback(task->thread->completion, + qio_task_thread_result, task, NULL); + g_source_attach(task->thread->completion, + task->thread->context); + trace_qio_task_thread_source_attach(task, task->thread->completion); + + qemu_cond_signal(&task->thread_cond); + qemu_mutex_unlock(&task->thread_lock); return NULL; } @@ -157,6 +174,22 @@ void qio_task_run_in_thread(QIOTask *task, } +void qio_task_wait_thread(QIOTask *task) +{ + qemu_mutex_lock(&task->thread_lock); + g_assert(task->thread != NULL); + while (task->thread->completion == NULL) { + qemu_cond_wait(&task->thread_cond, &task->thread_lock); + } + + trace_qio_task_thread_source_cancel(task, task->thread->completion); + g_source_destroy(task->thread->completion); + qemu_mutex_unlock(&task->thread_lock); + + qio_task_thread_result(task); +} + + void qio_task_complete(QIOTask *task) { task->func(task, task->opaque); diff --git a/io/trace-events b/io/trace-events index f70bad7cbe..07a7bbec6a 100644 --- a/io/trace-events +++ b/io/trace-events @@ -7,6 +7,8 @@ qio_task_thread_start(void *task, void *worker, void *opaque) "Task thread start qio_task_thread_run(void *task) "Task thread run task=%p" qio_task_thread_exit(void *task) "Task thread exit task=%p" qio_task_thread_result(void *task) "Task thread result task=%p" +qio_task_thread_source_attach(void *task, void *source) "Task thread source attach task=%p source=%p" +qio_task_thread_source_cancel(void *task, void *source) "Task thread source cancel task=%p source=%p" # io/channel-socket.c qio_channel_socket_new(void *ioc) "Socket new ioc=%p" -- cgit v1.2.3