diff options
author | Paolo Bonzini <pbonzini@redhat.com> | 2014-07-09 11:53:10 +0200 |
---|---|---|
committer | Stefan Hajnoczi <stefanha@redhat.com> | 2014-08-29 10:46:58 +0100 |
commit | b493317d344357f7ac56606246d09b5604e54ab6 (patch) | |
tree | 70d229bfc876df17b21218be917394dd384cdb8a | |
parent | 79d9b6566b90efac072720f37a1b57d73f539264 (diff) |
aio-win32: add support for sockets
Uses the same select/WSAEventSelect scheme as main-loop.c.
WSAEventSelect() is edge-triggered, so it cannot be used
directly, but it is still used as a way to exit from a
blocking g_poll().
Before g_poll() is called, we poll sockets with a non-blocking
select() to achieve the level-triggered semantics we require:
if a socket is ready, the g_poll() is made non-blocking too.
Based on a patch from Or Goshen.
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
-rw-r--r-- | aio-win32.c | 150 | ||||
-rw-r--r-- | block/Makefile.objs | 2 | ||||
-rw-r--r-- | include/block/aio.h | 2 |
3 files changed, 145 insertions, 9 deletions
diff --git a/aio-win32.c b/aio-win32.c index 45422704a4..61e3d2ddfe 100644 --- a/aio-win32.c +++ b/aio-win32.c @@ -22,12 +22,80 @@ struct AioHandler { EventNotifier *e; + IOHandler *io_read; + IOHandler *io_write; EventNotifierHandler *io_notify; GPollFD pfd; int deleted; + void *opaque; QLIST_ENTRY(AioHandler) node; }; +void aio_set_fd_handler(AioContext *ctx, + int fd, + IOHandler *io_read, + IOHandler *io_write, + void *opaque) +{ + /* fd is a SOCKET in our case */ + AioHandler *node; + + QLIST_FOREACH(node, &ctx->aio_handlers, node) { + if (node->pfd.fd == fd && !node->deleted) { + break; + } + } + + /* Are we deleting the fd handler? */ + if (!io_read && !io_write) { + if (node) { + /* If the lock is held, just mark the node as deleted */ + if (ctx->walking_handlers) { + node->deleted = 1; + node->pfd.revents = 0; + } else { + /* Otherwise, delete it for real. We can't just mark it as + * deleted because deleted nodes are only cleaned up after + * releasing the walking_handlers lock. + */ + QLIST_REMOVE(node, node); + g_free(node); + } + } + } else { + HANDLE event; + + if (node == NULL) { + /* Alloc and insert if it's not already there */ + node = g_malloc0(sizeof(AioHandler)); + node->pfd.fd = fd; + QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node); + } + + node->pfd.events = 0; + if (node->io_read) { + node->pfd.events |= G_IO_IN; + } + if (node->io_write) { + node->pfd.events |= G_IO_OUT; + } + + node->e = &ctx->notifier; + + /* Update handler with latest information */ + node->opaque = opaque; + node->io_read = io_read; + node->io_write = io_write; + + event = event_notifier_get_handle(&ctx->notifier); + WSAEventSelect(node->pfd.fd, event, + FD_READ | FD_ACCEPT | FD_CLOSE | + FD_CONNECT | FD_WRITE | FD_OOB); + } + + aio_notify(ctx); +} + void aio_set_event_notifier(AioContext *ctx, EventNotifier *e, EventNotifierHandler *io_notify) @@ -78,7 +146,39 @@ void aio_set_event_notifier(AioContext *ctx, bool aio_prepare(AioContext *ctx) { - return false; + static struct timeval tv0; + AioHandler *node; + bool have_select_revents = false; + fd_set rfds, wfds; + + /* fill fd sets */ + FD_ZERO(&rfds); + FD_ZERO(&wfds); + QLIST_FOREACH(node, &ctx->aio_handlers, node) { + if (node->io_read) { + FD_SET ((SOCKET)node->pfd.fd, &rfds); + } + if (node->io_write) { + FD_SET ((SOCKET)node->pfd.fd, &wfds); + } + } + + if (select(0, &rfds, &wfds, NULL, &tv0) > 0) { + QLIST_FOREACH(node, &ctx->aio_handlers, node) { + node->pfd.revents = 0; + if (FD_ISSET(node->pfd.fd, &rfds)) { + node->pfd.revents |= G_IO_IN; + have_select_revents = true; + } + + if (FD_ISSET(node->pfd.fd, &wfds)) { + node->pfd.revents |= G_IO_OUT; + have_select_revents = true; + } + } + } + + return have_select_revents; } bool aio_pending(AioContext *ctx) @@ -89,6 +189,13 @@ bool aio_pending(AioContext *ctx) if (node->pfd.revents && node->io_notify) { return true; } + + if ((node->pfd.revents & G_IO_IN) && node->io_read) { + return true; + } + if ((node->pfd.revents & G_IO_OUT) && node->io_write) { + return true; + } } return false; @@ -106,11 +213,12 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event) node = QLIST_FIRST(&ctx->aio_handlers); while (node) { AioHandler *tmp; + int revents = node->pfd.revents; ctx->walking_handlers++; if (!node->deleted && - (node->pfd.revents || event_notifier_get_handle(node->e) == event) && + (revents || event_notifier_get_handle(node->e) == event) && node->io_notify) { node->pfd.revents = 0; node->io_notify(node->e); @@ -121,6 +229,28 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event) } } + if (!node->deleted && + (node->io_read || node->io_write)) { + node->pfd.revents = 0; + if ((revents & G_IO_IN) && node->io_read) { + node->io_read(node->opaque); + progress = true; + } + if ((revents & G_IO_OUT) && node->io_write) { + node->io_write(node->opaque); + progress = true; + } + + /* if the next select() will return an event, we have progressed */ + if (event == event_notifier_get_handle(&ctx->notifier)) { + WSANETWORKEVENTS ev; + WSAEnumNetworkEvents(node->pfd.fd, event, &ev); + if (ev.lNetworkEvents) { + progress = true; + } + } + } + tmp = node; node = QLIST_NEXT(node, node); @@ -149,10 +279,15 @@ bool aio_poll(AioContext *ctx, bool blocking) { AioHandler *node; HANDLE events[MAXIMUM_WAIT_OBJECTS + 1]; - bool was_dispatching, progress, first; + bool was_dispatching, progress, have_select_revents, first; int count; int timeout; + if (aio_prepare(ctx)) { + blocking = false; + have_select_revents = true; + } + was_dispatching = ctx->dispatching; progress = false; @@ -183,6 +318,7 @@ bool aio_poll(AioContext *ctx, bool blocking) /* wait until next event */ while (count > 0) { + HANDLE event; int ret; timeout = blocking @@ -196,13 +332,17 @@ bool aio_poll(AioContext *ctx, bool blocking) first = false; /* if we have any signaled events, dispatch event */ - if ((DWORD) (ret - WAIT_OBJECT_0) >= count) { + event = NULL; + if ((DWORD) (ret - WAIT_OBJECT_0) < count) { + event = events[ret - WAIT_OBJECT_0]; + } else if (!have_select_revents) { break; } + have_select_revents = false; blocking = false; - progress |= aio_dispatch_handlers(ctx, events[ret - WAIT_OBJECT_0]); + progress |= aio_dispatch_handlers(ctx, event); /* Try again, but only call each handler once. */ events[ret - WAIT_OBJECT_0] = events[--count]; diff --git a/block/Makefile.objs b/block/Makefile.objs index 858d2b387b..f45f9399aa 100644 --- a/block/Makefile.objs +++ b/block/Makefile.objs @@ -10,7 +10,6 @@ block-obj-$(CONFIG_WIN32) += raw-win32.o win32-aio.o block-obj-$(CONFIG_POSIX) += raw-posix.o block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o -ifeq ($(CONFIG_POSIX),y) block-obj-y += nbd.o nbd-client.o sheepdog.o block-obj-$(CONFIG_LIBISCSI) += iscsi.o block-obj-$(CONFIG_LIBNFS) += nfs.o @@ -19,7 +18,6 @@ block-obj-$(CONFIG_RBD) += rbd.o block-obj-$(CONFIG_GLUSTERFS) += gluster.o block-obj-$(CONFIG_ARCHIPELAGO) += archipelago.o block-obj-$(CONFIG_LIBSSH2) += ssh.o -endif common-obj-y += stream.o common-obj-y += commit.o diff --git a/include/block/aio.h b/include/block/aio.h index ef4197b861..4603c0f066 100644 --- a/include/block/aio.h +++ b/include/block/aio.h @@ -239,7 +239,6 @@ bool aio_dispatch(AioContext *ctx); */ bool aio_poll(AioContext *ctx, bool blocking); -#ifdef CONFIG_POSIX /* Register a file descriptor and associated callbacks. Behaves very similarly * to qemu_set_fd_handler2. Unlike qemu_set_fd_handler2, these callbacks will * be invoked when using aio_poll(). @@ -252,7 +251,6 @@ void aio_set_fd_handler(AioContext *ctx, IOHandler *io_read, IOHandler *io_write, void *opaque); -#endif /* Register an event notifier and associated callbacks. Behaves very similarly * to event_notifier_set_handler. Unlike event_notifier_set_handler, these callbacks |