diff options
Diffstat (limited to 'io')
-rw-r--r-- | io/task.c | 41 | ||||
-rw-r--r-- | io/trace-events | 2 |
2 files changed, 39 insertions, 4 deletions
@@ -29,6 +29,7 @@ struct QIOTaskThreadData { gpointer opaque; GDestroyNotify destroy; GMainContext *context; + GSource *completion; }; @@ -40,6 +41,8 @@ struct QIOTask { Error *err; gpointer result; GDestroyNotify destroyResult; + QemuMutex thread_lock; + QemuCond thread_cond; struct QIOTaskThreadData *thread; }; @@ -58,6 +61,8 @@ QIOTask *qio_task_new(Object *source, task->func = func; task->opaque = opaque; task->destroy = destroy; + qemu_mutex_init(&task->thread_lock); + qemu_cond_init(&task->thread_cond); trace_qio_task_new(task, source, func, opaque); @@ -66,6 +71,7 @@ QIOTask *qio_task_new(Object *source, static void qio_task_free(QIOTask *task) { + qemu_mutex_lock(&task->thread_lock); if (task->thread) { if (task->thread->destroy) { task->thread->destroy(task->thread->opaque); @@ -89,6 +95,10 @@ static void qio_task_free(QIOTask *task) } object_unref(task->source); + qemu_mutex_unlock(&task->thread_lock); + qemu_mutex_destroy(&task->thread_lock); + qemu_cond_destroy(&task->thread_cond); + g_free(task); } @@ -107,7 +117,6 @@ static gboolean qio_task_thread_result(gpointer opaque) static gpointer qio_task_thread_worker(gpointer opaque) { QIOTask *task = opaque; - GSource *idle; trace_qio_task_thread_run(task); @@ -120,9 +129,17 @@ static gpointer qio_task_thread_worker(gpointer opaque) */ trace_qio_task_thread_exit(task); - idle = g_idle_source_new(); - g_source_set_callback(idle, qio_task_thread_result, task, NULL); - g_source_attach(idle, task->thread->context); + qemu_mutex_lock(&task->thread_lock); + + task->thread->completion = g_idle_source_new(); + g_source_set_callback(task->thread->completion, + qio_task_thread_result, task, NULL); + g_source_attach(task->thread->completion, + task->thread->context); + trace_qio_task_thread_source_attach(task, task->thread->completion); + + qemu_cond_signal(&task->thread_cond); + qemu_mutex_unlock(&task->thread_lock); return NULL; } @@ -157,6 +174,22 @@ void qio_task_run_in_thread(QIOTask *task, } +void qio_task_wait_thread(QIOTask *task) +{ + qemu_mutex_lock(&task->thread_lock); + g_assert(task->thread != NULL); + while (task->thread->completion == NULL) { + qemu_cond_wait(&task->thread_cond, &task->thread_lock); + } + + trace_qio_task_thread_source_cancel(task, task->thread->completion); + g_source_destroy(task->thread->completion); + qemu_mutex_unlock(&task->thread_lock); + + qio_task_thread_result(task); +} + + void qio_task_complete(QIOTask *task) { task->func(task, task->opaque); diff --git a/io/trace-events b/io/trace-events index f70bad7cbe..07a7bbec6a 100644 --- a/io/trace-events +++ b/io/trace-events @@ -7,6 +7,8 @@ qio_task_thread_start(void *task, void *worker, void *opaque) "Task thread start qio_task_thread_run(void *task) "Task thread run task=%p" qio_task_thread_exit(void *task) "Task thread exit task=%p" qio_task_thread_result(void *task) "Task thread result task=%p" +qio_task_thread_source_attach(void *task, void *source) "Task thread source attach task=%p source=%p" +qio_task_thread_source_cancel(void *task, void *source) "Task thread source cancel task=%p source=%p" # io/channel-socket.c qio_channel_socket_new(void *ioc) "Socket new ioc=%p" |