aboutsummaryrefslogtreecommitdiff
path: root/posix-aio-compat.c
diff options
context:
space:
mode:
authorPaolo Bonzini <pbonzini@redhat.com>2012-05-24 18:03:13 +0200
committerPaolo Bonzini <pbonzini@redhat.com>2012-10-31 10:38:12 +0100
commit47e6b251a5e9a47c406f2f2c0b01bb88854c98ec (patch)
tree9c254f62be2c7e696f5831a557899dbdc4e16d54 /posix-aio-compat.c
parent19d092cf9ba3c01b0e22ef65c499ae7ddc28d0e8 (diff)
block: switch posix-aio-compat to threadpool
This is not meant for portability, but to remove code duplication. Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
Diffstat (limited to 'posix-aio-compat.c')
-rw-r--r--posix-aio-compat.c427
1 files changed, 39 insertions, 388 deletions
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 96e4daf505..4a1e3d3662 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -28,13 +28,12 @@
#include "sysemu.h"
#include "qemu-common.h"
#include "trace.h"
+#include "thread-pool.h"
#include "block_int.h"
#include "iov.h"
#include "block/raw-posix-aio.h"
-static void do_spawn_thread(void);
-
struct qemu_paiocb {
BlockDriverAIOCB common;
int aio_fildes;
@@ -46,82 +45,15 @@ struct qemu_paiocb {
size_t aio_nbytes;
#define aio_ioctl_cmd aio_nbytes /* for QEMU_AIO_IOCTL */
off_t aio_offset;
-
- QTAILQ_ENTRY(qemu_paiocb) node;
int aio_type;
- ssize_t ret;
- int active;
- struct qemu_paiocb *next;
};
-typedef struct PosixAioState {
- int rfd, wfd;
- struct qemu_paiocb *first_aio;
-} PosixAioState;
-
-
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static int new_threads = 0; /* backlog of threads we need to create */
-static int pending_threads = 0; /* threads created but not running yet */
-static QEMUBH *new_thread_bh;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
-
#ifdef CONFIG_PREADV
static int preadv_present = 1;
#else
static int preadv_present = 0;
#endif
-static void die2(int err, const char *what)
-{
- fprintf(stderr, "%s failed: %s\n", what, strerror(err));
- abort();
-}
-
-static void die(const char *what)
-{
- die2(errno, what);
-}
-
-static void mutex_lock(pthread_mutex_t *mutex)
-{
- int ret = pthread_mutex_lock(mutex);
- if (ret) die2(ret, "pthread_mutex_lock");
-}
-
-static void mutex_unlock(pthread_mutex_t *mutex)
-{
- int ret = pthread_mutex_unlock(mutex);
- if (ret) die2(ret, "pthread_mutex_unlock");
-}
-
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
- struct timespec *ts)
-{
- int ret = pthread_cond_timedwait(cond, mutex, ts);
- if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
- return ret;
-}
-
-static void cond_signal(pthread_cond_t *cond)
-{
- int ret = pthread_cond_signal(cond);
- if (ret) die2(ret, "pthread_cond_signal");
-}
-
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
- void *(*start_routine)(void*), void *arg)
-{
- int ret = pthread_create(thread, attr, start_routine, arg);
- if (ret) die2(ret, "pthread_create");
-}
-
static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
{
int ret;
@@ -310,286 +242,54 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
return nbytes;
}
-static void posix_aio_notify_event(void);
-
-static void *aio_thread(void *unused)
+static int aio_worker(void *arg)
{
- mutex_lock(&lock);
- pending_threads--;
- mutex_unlock(&lock);
- do_spawn_thread();
-
- while (1) {
- struct qemu_paiocb *aiocb;
- ssize_t ret = 0;
- qemu_timeval tv;
- struct timespec ts;
-
- qemu_gettimeofday(&tv);
- ts.tv_sec = tv.tv_sec + 10;
- ts.tv_nsec = 0;
-
- mutex_lock(&lock);
-
- while (QTAILQ_EMPTY(&request_list) &&
- !(ret == ETIMEDOUT)) {
- idle_threads++;
- ret = cond_timedwait(&cond, &lock, &ts);
- idle_threads--;
+ struct qemu_paiocb *aiocb = arg;
+ ssize_t ret = 0;
+
+ switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+ case QEMU_AIO_READ:
+ ret = handle_aiocb_rw(aiocb);
+ if (ret >= 0 && ret < aiocb->aio_nbytes && aiocb->common.bs->growable) {
+ /* A short read means that we have reached EOF. Pad the buffer
+ * with zeros for bytes after EOF. */
+ iov_memset(aiocb->aio_iov, aiocb->aio_niov, ret,
+ 0, aiocb->aio_nbytes - ret);
+
+ ret = aiocb->aio_nbytes;
}
-
- if (QTAILQ_EMPTY(&request_list))
- break;
-
- aiocb = QTAILQ_FIRST(&request_list);
- QTAILQ_REMOVE(&request_list, aiocb, node);
- aiocb->active = 1;
- mutex_unlock(&lock);
-
- switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
- case QEMU_AIO_READ:
- ret = handle_aiocb_rw(aiocb);
- if (ret >= 0 && ret < aiocb->aio_nbytes && aiocb->common.bs->growable) {
- /* A short read means that we have reached EOF. Pad the buffer
- * with zeros for bytes after EOF. */
- iov_memset(aiocb->aio_iov, aiocb->aio_niov, ret,
- 0, aiocb->aio_nbytes - ret);
-
- ret = aiocb->aio_nbytes;
- }
- break;
- case QEMU_AIO_WRITE:
- ret = handle_aiocb_rw(aiocb);
- break;
- case QEMU_AIO_FLUSH:
- ret = handle_aiocb_flush(aiocb);
- break;
- case QEMU_AIO_IOCTL:
- ret = handle_aiocb_ioctl(aiocb);
- break;
- default:
- fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+ if (ret == aiocb->aio_nbytes) {
+ ret = 0;
+ } else if (ret >= 0 && ret < aiocb->aio_nbytes) {
ret = -EINVAL;
- break;
}
-
- mutex_lock(&lock);
- aiocb->ret = ret;
- mutex_unlock(&lock);
-
- posix_aio_notify_event();
- }
-
- cur_threads--;
- mutex_unlock(&lock);
-
- return NULL;
-}
-
-static void do_spawn_thread(void)
-{
- sigset_t set, oldset;
-
- mutex_lock(&lock);
- if (!new_threads) {
- mutex_unlock(&lock);
- return;
- }
-
- new_threads--;
- pending_threads++;
-
- mutex_unlock(&lock);
-
- /* block all signals */
- if (sigfillset(&set)) die("sigfillset");
- if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
- thread_create(&thread_id, &attr, aio_thread, NULL);
-
- if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
-}
-
-static void spawn_thread_bh_fn(void *opaque)
-{
- do_spawn_thread();
-}
-
-static void spawn_thread(void)
-{
- cur_threads++;
- new_threads++;
- /* If there are threads being created, they will spawn new workers, so
- * we don't spend time creating many threads in a loop holding a mutex or
- * starving the current vcpu.
- *
- * If there are no idle threads, ask the main thread to create one, so we
- * inherit the correct affinity instead of the vcpu affinity.
- */
- if (!pending_threads) {
- qemu_bh_schedule(new_thread_bh);
- }
-}
-
-static void qemu_paio_submit(struct qemu_paiocb *aiocb)
-{
- aiocb->ret = -EINPROGRESS;
- aiocb->active = 0;
- mutex_lock(&lock);
- if (idle_threads == 0 && cur_threads < max_threads)
- spawn_thread();
- QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
- mutex_unlock(&lock);
- cond_signal(&cond);
-}
-
-static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
-{
- ssize_t ret;
-
- mutex_lock(&lock);
- ret = aiocb->ret;
- mutex_unlock(&lock);
-
- return ret;
-}
-
-static int qemu_paio_error(struct qemu_paiocb *aiocb)
-{
- ssize_t ret = qemu_paio_return(aiocb);
-
- if (ret < 0)
- ret = -ret;
- else
- ret = 0;
-
- return ret;
-}
-
-static void posix_aio_read(void *opaque)
-{
- PosixAioState *s = opaque;
- struct qemu_paiocb *acb, **pacb;
- int ret;
- ssize_t len;
-
- /* read all bytes from signal pipe */
- for (;;) {
- char bytes[16];
-
- len = read(s->rfd, bytes, sizeof(bytes));
- if (len == -1 && errno == EINTR)
- continue; /* try again */
- if (len == sizeof(bytes))
- continue; /* more to read */
break;
- }
-
- for(;;) {
- pacb = &s->first_aio;
- for(;;) {
- acb = *pacb;
- if (!acb)
- return;
-
- ret = qemu_paio_error(acb);
- if (ret == ECANCELED) {
- /* remove the request */
- *pacb = acb->next;
- qemu_aio_release(acb);
- } else if (ret != EINPROGRESS) {
- /* end of aio */
- if (ret == 0) {
- ret = qemu_paio_return(acb);
- if (ret == acb->aio_nbytes)
- ret = 0;
- else
- ret = -EINVAL;
- } else {
- ret = -ret;
- }
-
- trace_paio_complete(acb, acb->common.opaque, ret);
-
- /* remove the request */
- *pacb = acb->next;
- /* call the callback */
- acb->common.cb(acb->common.opaque, ret);
- qemu_aio_release(acb);
- break;
- } else {
- pacb = &acb->next;
- }
- }
- }
-}
-
-static int posix_aio_flush(void *opaque)
-{
- PosixAioState *s = opaque;
- return !!s->first_aio;
-}
-
-static PosixAioState *posix_aio_state;
-
-static void posix_aio_notify_event(void)
-{
- char byte = 0;
- ssize_t ret;
-
- ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
- if (ret < 0 && errno != EAGAIN)
- die("write()");
-}
-
-static void paio_remove(struct qemu_paiocb *acb)
-{
- struct qemu_paiocb **pacb;
-
- /* remove the callback from the queue */
- pacb = &posix_aio_state->first_aio;
- for(;;) {
- if (*pacb == NULL) {
- fprintf(stderr, "paio_remove: aio request not found!\n");
- break;
- } else if (*pacb == acb) {
- *pacb = acb->next;
- qemu_aio_release(acb);
- break;
+ case QEMU_AIO_WRITE:
+ ret = handle_aiocb_rw(aiocb);
+ if (ret == aiocb->aio_nbytes) {
+ ret = 0;
+ } else if (ret >= 0 && ret < aiocb->aio_nbytes) {
+ ret = -EINVAL;
}
- pacb = &(*pacb)->next;
- }
-}
-
-static void paio_cancel(BlockDriverAIOCB *blockacb)
-{
- struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
- int active = 0;
-
- trace_paio_cancel(acb, acb->common.opaque);
-
- mutex_lock(&lock);
- if (!acb->active) {
- QTAILQ_REMOVE(&request_list, acb, node);
- acb->ret = -ECANCELED;
- } else if (acb->ret == -EINPROGRESS) {
- active = 1;
- }
- mutex_unlock(&lock);
-
- if (active) {
- /* fail safe: if the aio could not be canceled, we wait for
- it */
- while (qemu_paio_error(acb) == EINPROGRESS)
- ;
+ break;
+ case QEMU_AIO_FLUSH:
+ ret = handle_aiocb_flush(aiocb);
+ break;
+ case QEMU_AIO_IOCTL:
+ ret = handle_aiocb_ioctl(aiocb);
+ break;
+ default:
+ fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+ ret = -EINVAL;
+ break;
}
- paio_remove(acb);
+ qemu_aio_release(aiocb);
+ return ret;
}
static AIOPool raw_aio_pool = {
.aiocb_size = sizeof(struct qemu_paiocb),
- .cancel = paio_cancel,
};
BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
@@ -609,12 +309,8 @@ BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
acb->aio_nbytes = nb_sectors * 512;
acb->aio_offset = sector_num * 512;
- acb->next = posix_aio_state->first_aio;
- posix_aio_state->first_aio = acb;
-
trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
- qemu_paio_submit(acb);
- return &acb->common;
+ return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
}
BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
@@ -630,50 +326,5 @@ BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
acb->aio_ioctl_buf = buf;
acb->aio_ioctl_cmd = req;
- acb->next = posix_aio_state->first_aio;
- posix_aio_state->first_aio = acb;
-
- qemu_paio_submit(acb);
- return &acb->common;
-}
-
-int paio_init(void)
-{
- PosixAioState *s;
- int fds[2];
- int ret;
-
- if (posix_aio_state)
- return 0;
-
- s = g_malloc(sizeof(PosixAioState));
-
- s->first_aio = NULL;
- if (qemu_pipe(fds) == -1) {
- fprintf(stderr, "failed to create pipe\n");
- g_free(s);
- return -1;
- }
-
- s->rfd = fds[0];
- s->wfd = fds[1];
-
- fcntl(s->rfd, F_SETFL, O_NONBLOCK);
- fcntl(s->wfd, F_SETFL, O_NONBLOCK);
-
- qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, s);
-
- ret = pthread_attr_init(&attr);
- if (ret)
- die2(ret, "pthread_attr_init");
-
- ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- if (ret)
- die2(ret, "pthread_attr_setdetachstate");
-
- QTAILQ_INIT(&request_list);
- new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL);
-
- posix_aio_state = s;
- return 0;
+ return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
}