aboutsummaryrefslogtreecommitdiff
path: root/aio-win32.c
diff options
context:
space:
mode:
Diffstat (limited to 'aio-win32.c')
-rw-r--r--aio-win32.c150
1 files changed, 145 insertions, 5 deletions
diff --git a/aio-win32.c b/aio-win32.c
index 45422704a4..61e3d2ddfe 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -22,12 +22,80 @@
struct AioHandler {
EventNotifier *e;
+ IOHandler *io_read;
+ IOHandler *io_write;
EventNotifierHandler *io_notify;
GPollFD pfd;
int deleted;
+ void *opaque;
QLIST_ENTRY(AioHandler) node;
};
+void aio_set_fd_handler(AioContext *ctx,
+ int fd,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ void *opaque)
+{
+ /* fd is a SOCKET in our case */
+ AioHandler *node;
+
+ QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+ if (node->pfd.fd == fd && !node->deleted) {
+ break;
+ }
+ }
+
+ /* Are we deleting the fd handler? */
+ if (!io_read && !io_write) {
+ if (node) {
+ /* If the lock is held, just mark the node as deleted */
+ if (ctx->walking_handlers) {
+ node->deleted = 1;
+ node->pfd.revents = 0;
+ } else {
+ /* Otherwise, delete it for real. We can't just mark it as
+ * deleted because deleted nodes are only cleaned up after
+ * releasing the walking_handlers lock.
+ */
+ QLIST_REMOVE(node, node);
+ g_free(node);
+ }
+ }
+ } else {
+ HANDLE event;
+
+ if (node == NULL) {
+ /* Alloc and insert if it's not already there */
+ node = g_malloc0(sizeof(AioHandler));
+ node->pfd.fd = fd;
+ QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+ }
+
+ node->pfd.events = 0;
+ if (node->io_read) {
+ node->pfd.events |= G_IO_IN;
+ }
+ if (node->io_write) {
+ node->pfd.events |= G_IO_OUT;
+ }
+
+ node->e = &ctx->notifier;
+
+ /* Update handler with latest information */
+ node->opaque = opaque;
+ node->io_read = io_read;
+ node->io_write = io_write;
+
+ event = event_notifier_get_handle(&ctx->notifier);
+ WSAEventSelect(node->pfd.fd, event,
+ FD_READ | FD_ACCEPT | FD_CLOSE |
+ FD_CONNECT | FD_WRITE | FD_OOB);
+ }
+
+ aio_notify(ctx);
+}
+
void aio_set_event_notifier(AioContext *ctx,
EventNotifier *e,
EventNotifierHandler *io_notify)
@@ -78,7 +146,39 @@ void aio_set_event_notifier(AioContext *ctx,
bool aio_prepare(AioContext *ctx)
{
- return false;
+ static struct timeval tv0;
+ AioHandler *node;
+ bool have_select_revents = false;
+ fd_set rfds, wfds;
+
+ /* fill fd sets */
+ FD_ZERO(&rfds);
+ FD_ZERO(&wfds);
+ QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+ if (node->io_read) {
+ FD_SET ((SOCKET)node->pfd.fd, &rfds);
+ }
+ if (node->io_write) {
+ FD_SET ((SOCKET)node->pfd.fd, &wfds);
+ }
+ }
+
+ if (select(0, &rfds, &wfds, NULL, &tv0) > 0) {
+ QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+ node->pfd.revents = 0;
+ if (FD_ISSET(node->pfd.fd, &rfds)) {
+ node->pfd.revents |= G_IO_IN;
+ have_select_revents = true;
+ }
+
+ if (FD_ISSET(node->pfd.fd, &wfds)) {
+ node->pfd.revents |= G_IO_OUT;
+ have_select_revents = true;
+ }
+ }
+ }
+
+ return have_select_revents;
}
bool aio_pending(AioContext *ctx)
@@ -89,6 +189,13 @@ bool aio_pending(AioContext *ctx)
if (node->pfd.revents && node->io_notify) {
return true;
}
+
+ if ((node->pfd.revents & G_IO_IN) && node->io_read) {
+ return true;
+ }
+ if ((node->pfd.revents & G_IO_OUT) && node->io_write) {
+ return true;
+ }
}
return false;
@@ -106,11 +213,12 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
node = QLIST_FIRST(&ctx->aio_handlers);
while (node) {
AioHandler *tmp;
+ int revents = node->pfd.revents;
ctx->walking_handlers++;
if (!node->deleted &&
- (node->pfd.revents || event_notifier_get_handle(node->e) == event) &&
+ (revents || event_notifier_get_handle(node->e) == event) &&
node->io_notify) {
node->pfd.revents = 0;
node->io_notify(node->e);
@@ -121,6 +229,28 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
}
}
+ if (!node->deleted &&
+ (node->io_read || node->io_write)) {
+ node->pfd.revents = 0;
+ if ((revents & G_IO_IN) && node->io_read) {
+ node->io_read(node->opaque);
+ progress = true;
+ }
+ if ((revents & G_IO_OUT) && node->io_write) {
+ node->io_write(node->opaque);
+ progress = true;
+ }
+
+ /* if the next select() will return an event, we have progressed */
+ if (event == event_notifier_get_handle(&ctx->notifier)) {
+ WSANETWORKEVENTS ev;
+ WSAEnumNetworkEvents(node->pfd.fd, event, &ev);
+ if (ev.lNetworkEvents) {
+ progress = true;
+ }
+ }
+ }
+
tmp = node;
node = QLIST_NEXT(node, node);
@@ -149,10 +279,15 @@ bool aio_poll(AioContext *ctx, bool blocking)
{
AioHandler *node;
HANDLE events[MAXIMUM_WAIT_OBJECTS + 1];
- bool was_dispatching, progress, first;
+ bool was_dispatching, progress, have_select_revents, first;
int count;
int timeout;
+ if (aio_prepare(ctx)) {
+ blocking = false;
+ have_select_revents = true;
+ }
+
was_dispatching = ctx->dispatching;
progress = false;
@@ -183,6 +318,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
/* wait until next event */
while (count > 0) {
+ HANDLE event;
int ret;
timeout = blocking
@@ -196,13 +332,17 @@ bool aio_poll(AioContext *ctx, bool blocking)
first = false;
/* if we have any signaled events, dispatch event */
- if ((DWORD) (ret - WAIT_OBJECT_0) >= count) {
+ event = NULL;
+ if ((DWORD) (ret - WAIT_OBJECT_0) < count) {
+ event = events[ret - WAIT_OBJECT_0];
+ } else if (!have_select_revents) {
break;
}
+ have_select_revents = false;
blocking = false;
- progress |= aio_dispatch_handlers(ctx, events[ret - WAIT_OBJECT_0]);
+ progress |= aio_dispatch_handlers(ctx, event);
/* Try again, but only call each handler once. */
events[ret - WAIT_OBJECT_0] = events[--count];