aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--block/rbd.c130
1 files changed, 22 insertions, 108 deletions
diff --git a/block/rbd.c b/block/rbd.c
index f453f04757..121fae221e 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -95,18 +95,13 @@ typedef struct RADOSCB {
#define RBD_FD_WRITE 1
typedef struct BDRVRBDState {
- int fds[2];
rados_t cluster;
rados_ioctx_t io_ctx;
rbd_image_t image;
char name[RBD_MAX_IMAGE_NAME_SIZE];
char *snap;
- int event_reader_pos;
- RADOSCB *event_rcb;
} BDRVRBDState;
-static void rbd_aio_bh_cb(void *opaque);
-
static int qemu_rbd_next_tok(char *dst, int dst_len,
char *src, char delim,
const char *name,
@@ -369,9 +364,8 @@ static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options,
}
/*
- * This aio completion is being called from qemu_rbd_aio_event_reader()
- * and runs in qemu context. It schedules a bh, but just in case the aio
- * was not cancelled before.
+ * This aio completion is being called from rbd_finish_bh() and runs in qemu
+ * BH context.
*/
static void qemu_rbd_complete_aio(RADOSCB *rcb)
{
@@ -401,36 +395,19 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
acb->ret = r;
}
}
- /* Note that acb->bh can be NULL in case where the aio was cancelled */
- acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
- qemu_bh_schedule(acb->bh);
- g_free(rcb);
-}
-/*
- * aio fd read handler. It runs in the qemu context and calls the
- * completion handling of completed rados aio operations.
- */
-static void qemu_rbd_aio_event_reader(void *opaque)
-{
- BDRVRBDState *s = opaque;
+ g_free(rcb);
- ssize_t ret;
+ if (acb->cmd == RBD_AIO_READ) {
+ qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
+ }
+ qemu_vfree(acb->bounce);
+ acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
+ acb->status = 0;
- do {
- char *p = (char *)&s->event_rcb;
-
- /* now read the rcb pointer that was sent from a non qemu thread */
- ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos,
- sizeof(s->event_rcb) - s->event_reader_pos);
- if (ret > 0) {
- s->event_reader_pos += ret;
- if (s->event_reader_pos == sizeof(s->event_rcb)) {
- s->event_reader_pos = 0;
- qemu_rbd_complete_aio(s->event_rcb);
- }
- }
- } while (ret < 0 && errno == EINTR);
+ if (!acb->cancelled) {
+ qemu_aio_release(acb);
+ }
}
/* TODO Convert to fine grained options */
@@ -538,23 +515,9 @@ static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
bs->read_only = (s->snap != NULL);
- s->event_reader_pos = 0;
- r = qemu_pipe(s->fds);
- if (r < 0) {
- error_report("error opening eventfd");
- goto failed;
- }
- fcntl(s->fds[0], F_SETFL, O_NONBLOCK);
- fcntl(s->fds[1], F_SETFL, O_NONBLOCK);
- qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader,
- NULL, s);
-
-
qemu_opts_del(opts);
return 0;
-failed:
- rbd_close(s->image);
failed_open:
rados_ioctx_destroy(s->io_ctx);
failed_shutdown:
@@ -569,10 +532,6 @@ static void qemu_rbd_close(BlockDriverState *bs)
{
BDRVRBDState *s = bs->opaque;
- close(s->fds[0]);
- close(s->fds[1]);
- qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL, NULL, NULL);
-
rbd_close(s->image);
rados_ioctx_destroy(s->io_ctx);
g_free(s->snap);
@@ -600,34 +559,11 @@ static const AIOCBInfo rbd_aiocb_info = {
.cancel = qemu_rbd_aio_cancel,
};
-static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb)
+static void rbd_finish_bh(void *opaque)
{
- int ret = 0;
- while (1) {
- fd_set wfd;
- int fd = s->fds[RBD_FD_WRITE];
-
- /* send the op pointer to the qemu thread that is responsible
- for the aio/op completion. Must do it in a qemu thread context */
- ret = write(fd, (void *)&rcb, sizeof(rcb));
- if (ret >= 0) {
- break;
- }
- if (errno == EINTR) {
- continue;
- }
- if (errno != EAGAIN) {
- break;
- }
-
- FD_ZERO(&wfd);
- FD_SET(fd, &wfd);
- do {
- ret = select(fd + 1, NULL, &wfd, NULL, NULL);
- } while (ret < 0 && errno == EINTR);
- }
-
- return ret;
+ RADOSCB *rcb = opaque;
+ qemu_bh_delete(rcb->acb->bh);
+ qemu_rbd_complete_aio(rcb);
}
/*
@@ -635,40 +571,18 @@ static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb)
*
* Note: this function is being called from a non qemu thread so
* we need to be careful about what we do here. Generally we only
- * write to the block notification pipe, and do the rest of the
- * io completion handling from qemu_rbd_aio_event_reader() which
- * runs in a qemu context.
+ * schedule a BH, and do the rest of the io completion handling
+ * from rbd_finish_bh() which runs in a qemu context.
*/
static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
{
- int ret;
+ RBDAIOCB *acb = rcb->acb;
+
rcb->ret = rbd_aio_get_return_value(c);
rbd_aio_release(c);
- ret = qemu_rbd_send_pipe(rcb->s, rcb);
- if (ret < 0) {
- error_report("failed writing to acb->s->fds");
- g_free(rcb);
- }
-}
-
-/* Callback when all queued rbd_aio requests are complete */
-static void rbd_aio_bh_cb(void *opaque)
-{
- RBDAIOCB *acb = opaque;
-
- if (acb->cmd == RBD_AIO_READ) {
- qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
- }
- qemu_vfree(acb->bounce);
- acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
- qemu_bh_delete(acb->bh);
- acb->bh = NULL;
- acb->status = 0;
-
- if (!acb->cancelled) {
- qemu_aio_release(acb);
- }
+ acb->bh = qemu_bh_new(rbd_finish_bh, rcb);
+ qemu_bh_schedule(acb->bh);
}
static int rbd_aio_discard_wrapper(rbd_image_t image,