aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaolo Bonzini <pbonzini@redhat.com>2014-07-09 11:53:10 +0200
committerStefan Hajnoczi <stefanha@redhat.com>2014-08-29 10:46:58 +0100
commitb493317d344357f7ac56606246d09b5604e54ab6 (patch)
tree70d229bfc876df17b21218be917394dd384cdb8a
parent79d9b6566b90efac072720f37a1b57d73f539264 (diff)
aio-win32: add support for sockets
Uses the same select/WSAEventSelect scheme as main-loop.c. WSAEventSelect() is edge-triggered, so it cannot be used directly, but it is still used as a way to exit from a blocking g_poll(). Before g_poll() is called, we poll sockets with a non-blocking select() to achieve the level-triggered semantics we require: if a socket is ready, the g_poll() is made non-blocking too. Based on a patch from Or Goshen. Signed-off-by: Paolo Bonzini <pbonzini@redhat.com> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
-rw-r--r--aio-win32.c150
-rw-r--r--block/Makefile.objs2
-rw-r--r--include/block/aio.h2
3 files changed, 145 insertions, 9 deletions
diff --git a/aio-win32.c b/aio-win32.c
index 45422704a4..61e3d2ddfe 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -22,12 +22,80 @@
struct AioHandler {
EventNotifier *e;
+ IOHandler *io_read;
+ IOHandler *io_write;
EventNotifierHandler *io_notify;
GPollFD pfd;
int deleted;
+ void *opaque;
QLIST_ENTRY(AioHandler) node;
};
+void aio_set_fd_handler(AioContext *ctx,
+ int fd,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ void *opaque)
+{
+ /* fd is a SOCKET in our case */
+ AioHandler *node;
+
+ QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+ if (node->pfd.fd == fd && !node->deleted) {
+ break;
+ }
+ }
+
+ /* Are we deleting the fd handler? */
+ if (!io_read && !io_write) {
+ if (node) {
+ /* If the lock is held, just mark the node as deleted */
+ if (ctx->walking_handlers) {
+ node->deleted = 1;
+ node->pfd.revents = 0;
+ } else {
+ /* Otherwise, delete it for real. We can't just mark it as
+ * deleted because deleted nodes are only cleaned up after
+ * releasing the walking_handlers lock.
+ */
+ QLIST_REMOVE(node, node);
+ g_free(node);
+ }
+ }
+ } else {
+ HANDLE event;
+
+ if (node == NULL) {
+ /* Alloc and insert if it's not already there */
+ node = g_malloc0(sizeof(AioHandler));
+ node->pfd.fd = fd;
+ QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+ }
+
+ node->pfd.events = 0;
+ if (node->io_read) {
+ node->pfd.events |= G_IO_IN;
+ }
+ if (node->io_write) {
+ node->pfd.events |= G_IO_OUT;
+ }
+
+ node->e = &ctx->notifier;
+
+ /* Update handler with latest information */
+ node->opaque = opaque;
+ node->io_read = io_read;
+ node->io_write = io_write;
+
+ event = event_notifier_get_handle(&ctx->notifier);
+ WSAEventSelect(node->pfd.fd, event,
+ FD_READ | FD_ACCEPT | FD_CLOSE |
+ FD_CONNECT | FD_WRITE | FD_OOB);
+ }
+
+ aio_notify(ctx);
+}
+
void aio_set_event_notifier(AioContext *ctx,
EventNotifier *e,
EventNotifierHandler *io_notify)
@@ -78,7 +146,39 @@ void aio_set_event_notifier(AioContext *ctx,
bool aio_prepare(AioContext *ctx)
{
- return false;
+ static struct timeval tv0;
+ AioHandler *node;
+ bool have_select_revents = false;
+ fd_set rfds, wfds;
+
+ /* fill fd sets */
+ FD_ZERO(&rfds);
+ FD_ZERO(&wfds);
+ QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+ if (node->io_read) {
+ FD_SET ((SOCKET)node->pfd.fd, &rfds);
+ }
+ if (node->io_write) {
+ FD_SET ((SOCKET)node->pfd.fd, &wfds);
+ }
+ }
+
+ if (select(0, &rfds, &wfds, NULL, &tv0) > 0) {
+ QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+ node->pfd.revents = 0;
+ if (FD_ISSET(node->pfd.fd, &rfds)) {
+ node->pfd.revents |= G_IO_IN;
+ have_select_revents = true;
+ }
+
+ if (FD_ISSET(node->pfd.fd, &wfds)) {
+ node->pfd.revents |= G_IO_OUT;
+ have_select_revents = true;
+ }
+ }
+ }
+
+ return have_select_revents;
}
bool aio_pending(AioContext *ctx)
@@ -89,6 +189,13 @@ bool aio_pending(AioContext *ctx)
if (node->pfd.revents && node->io_notify) {
return true;
}
+
+ if ((node->pfd.revents & G_IO_IN) && node->io_read) {
+ return true;
+ }
+ if ((node->pfd.revents & G_IO_OUT) && node->io_write) {
+ return true;
+ }
}
return false;
@@ -106,11 +213,12 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
node = QLIST_FIRST(&ctx->aio_handlers);
while (node) {
AioHandler *tmp;
+ int revents = node->pfd.revents;
ctx->walking_handlers++;
if (!node->deleted &&
- (node->pfd.revents || event_notifier_get_handle(node->e) == event) &&
+ (revents || event_notifier_get_handle(node->e) == event) &&
node->io_notify) {
node->pfd.revents = 0;
node->io_notify(node->e);
@@ -121,6 +229,28 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
}
}
+ if (!node->deleted &&
+ (node->io_read || node->io_write)) {
+ node->pfd.revents = 0;
+ if ((revents & G_IO_IN) && node->io_read) {
+ node->io_read(node->opaque);
+ progress = true;
+ }
+ if ((revents & G_IO_OUT) && node->io_write) {
+ node->io_write(node->opaque);
+ progress = true;
+ }
+
+ /* if the next select() will return an event, we have progressed */
+ if (event == event_notifier_get_handle(&ctx->notifier)) {
+ WSANETWORKEVENTS ev;
+ WSAEnumNetworkEvents(node->pfd.fd, event, &ev);
+ if (ev.lNetworkEvents) {
+ progress = true;
+ }
+ }
+ }
+
tmp = node;
node = QLIST_NEXT(node, node);
@@ -149,10 +279,15 @@ bool aio_poll(AioContext *ctx, bool blocking)
{
AioHandler *node;
HANDLE events[MAXIMUM_WAIT_OBJECTS + 1];
- bool was_dispatching, progress, first;
+ bool was_dispatching, progress, have_select_revents, first;
int count;
int timeout;
+ if (aio_prepare(ctx)) {
+ blocking = false;
+ have_select_revents = true;
+ }
+
was_dispatching = ctx->dispatching;
progress = false;
@@ -183,6 +318,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
/* wait until next event */
while (count > 0) {
+ HANDLE event;
int ret;
timeout = blocking
@@ -196,13 +332,17 @@ bool aio_poll(AioContext *ctx, bool blocking)
first = false;
/* if we have any signaled events, dispatch event */
- if ((DWORD) (ret - WAIT_OBJECT_0) >= count) {
+ event = NULL;
+ if ((DWORD) (ret - WAIT_OBJECT_0) < count) {
+ event = events[ret - WAIT_OBJECT_0];
+ } else if (!have_select_revents) {
break;
}
+ have_select_revents = false;
blocking = false;
- progress |= aio_dispatch_handlers(ctx, events[ret - WAIT_OBJECT_0]);
+ progress |= aio_dispatch_handlers(ctx, event);
/* Try again, but only call each handler once. */
events[ret - WAIT_OBJECT_0] = events[--count];
diff --git a/block/Makefile.objs b/block/Makefile.objs
index 858d2b387b..f45f9399aa 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -10,7 +10,6 @@ block-obj-$(CONFIG_WIN32) += raw-win32.o win32-aio.o
block-obj-$(CONFIG_POSIX) += raw-posix.o
block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
-ifeq ($(CONFIG_POSIX),y)
block-obj-y += nbd.o nbd-client.o sheepdog.o
block-obj-$(CONFIG_LIBISCSI) += iscsi.o
block-obj-$(CONFIG_LIBNFS) += nfs.o
@@ -19,7 +18,6 @@ block-obj-$(CONFIG_RBD) += rbd.o
block-obj-$(CONFIG_GLUSTERFS) += gluster.o
block-obj-$(CONFIG_ARCHIPELAGO) += archipelago.o
block-obj-$(CONFIG_LIBSSH2) += ssh.o
-endif
common-obj-y += stream.o
common-obj-y += commit.o
diff --git a/include/block/aio.h b/include/block/aio.h
index ef4197b861..4603c0f066 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -239,7 +239,6 @@ bool aio_dispatch(AioContext *ctx);
*/
bool aio_poll(AioContext *ctx, bool blocking);
-#ifdef CONFIG_POSIX
/* Register a file descriptor and associated callbacks. Behaves very similarly
* to qemu_set_fd_handler2. Unlike qemu_set_fd_handler2, these callbacks will
* be invoked when using aio_poll().
@@ -252,7 +251,6 @@ void aio_set_fd_handler(AioContext *ctx,
IOHandler *io_read,
IOHandler *io_write,
void *opaque);
-#endif
/* Register an event notifier and associated callbacks. Behaves very similarly
* to event_notifier_set_handler. Unlike event_notifier_set_handler, these callbacks