aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/io/task.h29
-rw-r--r--io/task.c41
-rw-r--r--io/trace-events2
3 files changed, 67 insertions, 5 deletions
diff --git a/include/io/task.h b/include/io/task.h
index 9e09b95b2e..57d8ba835e 100644
--- a/include/io/task.h
+++ b/include/io/task.h
@@ -232,7 +232,8 @@ QIOTask *qio_task_new(Object *source,
*
* Run a task in a background thread. When @worker
* returns it will call qio_task_complete() in
- * the event thread context that provided.
+ * the thread that is running the main loop associated
+ * with @context.
*/
void qio_task_run_in_thread(QIOTask *task,
QIOTaskWorker worker,
@@ -240,6 +241,32 @@ void qio_task_run_in_thread(QIOTask *task,
GDestroyNotify destroy,
GMainContext *context);
+
+/**
+ * qio_task_wait_thread:
+ * @task: the task struct
+ *
+ * Wait for completion of a task that was previously
+ * invoked using qio_task_run_in_thread. This MUST
+ * ONLY be invoked if the task has not already
+ * completed, since after the completion callback
+ * is invoked, @task will have been freed.
+ *
+ * To avoid racing with execution of the completion
+ * callback provided with qio_task_new, this method
+ * MUST ONLY be invoked from the thread that is
+ * running the main loop associated with @context
+ * parameter to qio_task_run_in_thread.
+ *
+ * When the thread has completed, the completion
+ * callback provided to qio_task_new will be invoked.
+ * When that callback returns @task will be freed,
+ * so @task must not be referenced after this
+ * method completes.
+ */
+void qio_task_wait_thread(QIOTask *task);
+
+
/**
* qio_task_complete:
* @task: the task struct
diff --git a/io/task.c b/io/task.c
index 396866b10f..64c4c7126a 100644
--- a/io/task.c
+++ b/io/task.c
@@ -29,6 +29,7 @@ struct QIOTaskThreadData {
gpointer opaque;
GDestroyNotify destroy;
GMainContext *context;
+ GSource *completion;
};
@@ -40,6 +41,8 @@ struct QIOTask {
Error *err;
gpointer result;
GDestroyNotify destroyResult;
+ QemuMutex thread_lock;
+ QemuCond thread_cond;
struct QIOTaskThreadData *thread;
};
@@ -58,6 +61,8 @@ QIOTask *qio_task_new(Object *source,
task->func = func;
task->opaque = opaque;
task->destroy = destroy;
+ qemu_mutex_init(&task->thread_lock);
+ qemu_cond_init(&task->thread_cond);
trace_qio_task_new(task, source, func, opaque);
@@ -66,6 +71,7 @@ QIOTask *qio_task_new(Object *source,
static void qio_task_free(QIOTask *task)
{
+ qemu_mutex_lock(&task->thread_lock);
if (task->thread) {
if (task->thread->destroy) {
task->thread->destroy(task->thread->opaque);
@@ -89,6 +95,10 @@ static void qio_task_free(QIOTask *task)
}
object_unref(task->source);
+ qemu_mutex_unlock(&task->thread_lock);
+ qemu_mutex_destroy(&task->thread_lock);
+ qemu_cond_destroy(&task->thread_cond);
+
g_free(task);
}
@@ -107,7 +117,6 @@ static gboolean qio_task_thread_result(gpointer opaque)
static gpointer qio_task_thread_worker(gpointer opaque)
{
QIOTask *task = opaque;
- GSource *idle;
trace_qio_task_thread_run(task);
@@ -120,9 +129,17 @@ static gpointer qio_task_thread_worker(gpointer opaque)
*/
trace_qio_task_thread_exit(task);
- idle = g_idle_source_new();
- g_source_set_callback(idle, qio_task_thread_result, task, NULL);
- g_source_attach(idle, task->thread->context);
+ qemu_mutex_lock(&task->thread_lock);
+
+ task->thread->completion = g_idle_source_new();
+ g_source_set_callback(task->thread->completion,
+ qio_task_thread_result, task, NULL);
+ g_source_attach(task->thread->completion,
+ task->thread->context);
+ trace_qio_task_thread_source_attach(task, task->thread->completion);
+
+ qemu_cond_signal(&task->thread_cond);
+ qemu_mutex_unlock(&task->thread_lock);
return NULL;
}
@@ -157,6 +174,22 @@ void qio_task_run_in_thread(QIOTask *task,
}
+void qio_task_wait_thread(QIOTask *task)
+{
+ qemu_mutex_lock(&task->thread_lock);
+ g_assert(task->thread != NULL);
+ while (task->thread->completion == NULL) {
+ qemu_cond_wait(&task->thread_cond, &task->thread_lock);
+ }
+
+ trace_qio_task_thread_source_cancel(task, task->thread->completion);
+ g_source_destroy(task->thread->completion);
+ qemu_mutex_unlock(&task->thread_lock);
+
+ qio_task_thread_result(task);
+}
+
+
void qio_task_complete(QIOTask *task)
{
task->func(task, task->opaque);
diff --git a/io/trace-events b/io/trace-events
index f70bad7cbe..07a7bbec6a 100644
--- a/io/trace-events
+++ b/io/trace-events
@@ -7,6 +7,8 @@ qio_task_thread_start(void *task, void *worker, void *opaque) "Task thread start
qio_task_thread_run(void *task) "Task thread run task=%p"
qio_task_thread_exit(void *task) "Task thread exit task=%p"
qio_task_thread_result(void *task) "Task thread result task=%p"
+qio_task_thread_source_attach(void *task, void *source) "Task thread source attach task=%p source=%p"
+qio_task_thread_source_cancel(void *task, void *source) "Task thread source cancel task=%p source=%p"
# io/channel-socket.c
qio_channel_socket_new(void *ioc) "Socket new ioc=%p"