diff options
-rw-r--r-- | block/export/vhost-user-blk-server.c | 9 | ||||
-rw-r--r-- | util/vhost-user-server.c | 243 | ||||
-rw-r--r-- | util/vhost-user-server.h | 29 |
3 files changed, 154 insertions, 127 deletions
diff --git a/block/export/vhost-user-blk-server.c b/block/export/vhost-user-blk-server.c index c8fa4ecba9..4d35232bf3 100644 --- a/block/export/vhost-user-blk-server.c +++ b/block/export/vhost-user-blk-server.c @@ -313,18 +313,13 @@ static const VuDevIface vu_block_iface = { static void blk_aio_attached(AioContext *ctx, void *opaque) { VuBlockDev *vub_dev = opaque; - aio_context_acquire(ctx); - vhost_user_server_set_aio_context(&vub_dev->vu_server, ctx); - aio_context_release(ctx); + vhost_user_server_attach_aio_context(&vub_dev->vu_server, ctx); } static void blk_aio_detach(void *opaque) { VuBlockDev *vub_dev = opaque; - AioContext *ctx = vub_dev->vu_server.ctx; - aio_context_acquire(ctx); - vhost_user_server_set_aio_context(&vub_dev->vu_server, NULL); - aio_context_release(ctx); + vhost_user_server_detach_aio_context(&vub_dev->vu_server); } static void diff --git a/util/vhost-user-server.c b/util/vhost-user-server.c index 981908fef0..c448800e58 100644 --- a/util/vhost-user-server.c +++ b/util/vhost-user-server.c @@ -9,8 +9,50 @@ */ #include "qemu/osdep.h" #include "qemu/main-loop.h" +#include "block/aio-wait.h" #include "vhost-user-server.h" +/* + * Theory of operation: + * + * VuServer is started and stopped by vhost_user_server_start() and + * vhost_user_server_stop() from the main loop thread. Starting the server + * opens a vhost-user UNIX domain socket and listens for incoming connections. + * Only one connection is allowed at a time. + * + * The connection is handled by the vu_client_trip() coroutine in the + * VuServer->ctx AioContext. The coroutine consists of a vu_dispatch() loop + * where libvhost-user calls vu_message_read() to receive the next vhost-user + * protocol messages over the UNIX domain socket. + * + * When virtqueues are set up libvhost-user calls set_watch() to monitor kick + * fds. These fds are also handled in the VuServer->ctx AioContext. + * + * Both vu_client_trip() and kick fd monitoring can be stopped by shutting down + * the socket connection. Shutting down the socket connection causes + * vu_message_read() to fail since no more data can be received from the socket. + * After vu_dispatch() fails, vu_client_trip() calls vu_deinit() to stop + * libvhost-user before terminating the coroutine. vu_deinit() calls + * remove_watch() to stop monitoring kick fds and this stops virtqueue + * processing. + * + * When vu_client_trip() has finished cleaning up it schedules a BH in the main + * loop thread to accept the next client connection. + * + * When libvhost-user detects an error it calls panic_cb() and sets the + * dev->broken flag. Both vu_client_trip() and kick fd processing stop when + * the dev->broken flag is set. + * + * It is possible to switch AioContexts using + * vhost_user_server_detach_aio_context() and + * vhost_user_server_attach_aio_context(). They stop monitoring fds in the old + * AioContext and resume monitoring in the new AioContext. The vu_client_trip() + * coroutine remains in a yielded state during the switch. This is made + * possible by QIOChannel's support for spurious coroutine re-entry in + * qio_channel_yield(). The coroutine will restart I/O when re-entered from the + * new AioContext. + */ + static void vmsg_close_fds(VhostUserMsg *vmsg) { int i; @@ -27,68 +69,9 @@ static void vmsg_unblock_fds(VhostUserMsg *vmsg) } } -static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc, - gpointer opaque); - -static void close_client(VuServer *server) -{ - /* - * Before closing the client - * - * 1. Let vu_client_trip stop processing new vhost-user msg - * - * 2. remove kick_handler - * - * 3. wait for the kick handler to be finished - * - * 4. wait for the current vhost-user msg to be finished processing - */ - - QIOChannelSocket *sioc = server->sioc; - /* When this is set vu_client_trip will stop new processing vhost-user message */ - server->sioc = NULL; - - while (server->processing_msg) { - if (server->ioc->read_coroutine) { - server->ioc->read_coroutine = NULL; - qio_channel_set_aio_fd_handler(server->ioc, server->ioc->ctx, NULL, - NULL, server->ioc); - server->processing_msg = false; - } - } - - vu_deinit(&server->vu_dev); - - /* vu_deinit() should have called remove_watch() */ - assert(QTAILQ_EMPTY(&server->vu_fd_watches)); - - object_unref(OBJECT(sioc)); - object_unref(OBJECT(server->ioc)); -} - static void panic_cb(VuDev *vu_dev, const char *buf) { - VuServer *server = container_of(vu_dev, VuServer, vu_dev); - - /* avoid while loop in close_client */ - server->processing_msg = false; - - if (buf) { - error_report("vu_panic: %s", buf); - } - - if (server->sioc) { - close_client(server); - } - - /* - * Set the callback function for network listener so another - * vhost-user client can connect to this server - */ - qio_net_listener_set_client_func(server->listener, - vu_accept, - server, - NULL); + error_report("vu_panic: %s", buf); } static bool coroutine_fn @@ -185,28 +168,31 @@ fail: return false; } - -static void vu_client_start(VuServer *server); static coroutine_fn void vu_client_trip(void *opaque) { VuServer *server = opaque; + VuDev *vu_dev = &server->vu_dev; - while (!server->aio_context_changed && server->sioc) { - server->processing_msg = true; - vu_dispatch(&server->vu_dev); - server->processing_msg = false; + while (!vu_dev->broken && vu_dispatch(vu_dev)) { + /* Keep running */ } - if (server->aio_context_changed && server->sioc) { - server->aio_context_changed = false; - vu_client_start(server); - } -} + vu_deinit(vu_dev); -static void vu_client_start(VuServer *server) -{ - server->co_trip = qemu_coroutine_create(vu_client_trip, server); - aio_co_enter(server->ctx, server->co_trip); + /* vu_deinit() should have called remove_watch() */ + assert(QTAILQ_EMPTY(&server->vu_fd_watches)); + + object_unref(OBJECT(server->sioc)); + server->sioc = NULL; + + object_unref(OBJECT(server->ioc)); + server->ioc = NULL; + + server->co_trip = NULL; + if (server->restart_listener_bh) { + qemu_bh_schedule(server->restart_listener_bh); + } + aio_wait_kick(); } /* @@ -219,11 +205,17 @@ static void vu_client_start(VuServer *server) static void kick_handler(void *opaque) { VuFdWatch *vu_fd_watch = opaque; - vu_fd_watch->processing = true; - vu_fd_watch->cb(vu_fd_watch->vu_dev, 0, vu_fd_watch->pvt); - vu_fd_watch->processing = false; -} + VuDev *vu_dev = vu_fd_watch->vu_dev; + vu_fd_watch->cb(vu_dev, 0, vu_fd_watch->pvt); + + /* Stop vu_client_trip() if an error occurred in vu_fd_watch->cb() */ + if (vu_dev->broken) { + VuServer *server = container_of(vu_dev, VuServer, vu_dev); + + qio_channel_shutdown(server->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); + } +} static VuFdWatch *find_vu_fd_watch(VuServer *server, int fd) { @@ -319,61 +311,94 @@ static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc, qio_channel_set_name(QIO_CHANNEL(sioc), "vhost-user client"); server->ioc = QIO_CHANNEL(sioc); object_ref(OBJECT(server->ioc)); - qio_channel_attach_aio_context(server->ioc, server->ctx); + + /* TODO vu_message_write() spins if non-blocking! */ qio_channel_set_blocking(server->ioc, false, NULL); - vu_client_start(server); -} + server->co_trip = qemu_coroutine_create(vu_client_trip, server); + + aio_context_acquire(server->ctx); + vhost_user_server_attach_aio_context(server, server->ctx); + aio_context_release(server->ctx); +} void vhost_user_server_stop(VuServer *server) { + aio_context_acquire(server->ctx); + + qemu_bh_delete(server->restart_listener_bh); + server->restart_listener_bh = NULL; + if (server->sioc) { - close_client(server); + VuFdWatch *vu_fd_watch; + + QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) { + aio_set_fd_handler(server->ctx, vu_fd_watch->fd, true, + NULL, NULL, NULL, vu_fd_watch); + } + + qio_channel_shutdown(server->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); + + AIO_WAIT_WHILE(server->ctx, server->co_trip); } + aio_context_release(server->ctx); + if (server->listener) { qio_net_listener_disconnect(server->listener); object_unref(OBJECT(server->listener)); } +} + +/* + * Allow the next client to connect to the server. Called from a BH in the main + * loop. + */ +static void restart_listener_bh(void *opaque) +{ + VuServer *server = opaque; + qio_net_listener_set_client_func(server->listener, vu_accept, server, + NULL); } -void vhost_user_server_set_aio_context(VuServer *server, AioContext *ctx) +/* Called with ctx acquired */ +void vhost_user_server_attach_aio_context(VuServer *server, AioContext *ctx) { - VuFdWatch *vu_fd_watch, *next; - void *opaque = NULL; - IOHandler *io_read = NULL; - bool attach; + VuFdWatch *vu_fd_watch; - server->ctx = ctx ? ctx : qemu_get_aio_context(); + server->ctx = ctx; if (!server->sioc) { - /* not yet serving any client*/ return; } - if (ctx) { - qio_channel_attach_aio_context(server->ioc, ctx); - server->aio_context_changed = true; - io_read = kick_handler; - attach = true; - } else { - qio_channel_detach_aio_context(server->ioc); - /* server->ioc->ctx keeps the old AioConext */ - ctx = server->ioc->ctx; - attach = false; + qio_channel_attach_aio_context(server->ioc, ctx); + + QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) { + aio_set_fd_handler(ctx, vu_fd_watch->fd, true, kick_handler, NULL, + NULL, vu_fd_watch); } - QTAILQ_FOREACH_SAFE(vu_fd_watch, &server->vu_fd_watches, next, next) { - if (vu_fd_watch->cb) { - opaque = attach ? vu_fd_watch : NULL; - aio_set_fd_handler(ctx, vu_fd_watch->fd, true, - io_read, NULL, NULL, - opaque); + aio_co_schedule(ctx, server->co_trip); +} + +/* Called with server->ctx acquired */ +void vhost_user_server_detach_aio_context(VuServer *server) +{ + if (server->sioc) { + VuFdWatch *vu_fd_watch; + + QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) { + aio_set_fd_handler(server->ctx, vu_fd_watch->fd, true, + NULL, NULL, NULL, vu_fd_watch); } + + qio_channel_detach_aio_context(server->ioc); } -} + server->ctx = NULL; +} bool vhost_user_server_start(VuServer *server, SocketAddress *socket_addr, @@ -382,6 +407,7 @@ bool vhost_user_server_start(VuServer *server, const VuDevIface *vu_iface, Error **errp) { + QEMUBH *bh; QIONetListener *listener = qio_net_listener_new(); if (qio_net_listener_open_sync(listener, socket_addr, 1, errp) < 0) { @@ -389,9 +415,12 @@ bool vhost_user_server_start(VuServer *server, return false; } + bh = qemu_bh_new(restart_listener_bh, server); + /* zero out unspecified fields */ *server = (VuServer) { .listener = listener, + .restart_listener_bh = bh, .vu_iface = vu_iface, .max_queues = max_queues, .ctx = ctx, diff --git a/util/vhost-user-server.h b/util/vhost-user-server.h index 92177fc911..0da4c2cc4c 100644 --- a/util/vhost-user-server.h +++ b/util/vhost-user-server.h @@ -19,34 +19,36 @@ #include "qapi/error.h" #include "standard-headers/linux/virtio_blk.h" +/* A kick fd that we monitor on behalf of libvhost-user */ typedef struct VuFdWatch { VuDev *vu_dev; int fd; /*kick fd*/ void *pvt; vu_watch_cb cb; - bool processing; QTAILQ_ENTRY(VuFdWatch) next; } VuFdWatch; -typedef struct VuServer VuServer; - -struct VuServer { +/** + * VuServer: + * A vhost-user server instance with user-defined VuDevIface callbacks. + * Vhost-user device backends can be implemented using VuServer. VuDevIface + * callbacks and virtqueue kicks run in the given AioContext. + */ +typedef struct { QIONetListener *listener; + QEMUBH *restart_listener_bh; AioContext *ctx; int max_queues; const VuDevIface *vu_iface; + + /* Protected by ctx lock */ VuDev vu_dev; QIOChannel *ioc; /* The I/O channel with the client */ QIOChannelSocket *sioc; /* The underlying data channel with the client */ - /* IOChannel for fd provided via VHOST_USER_SET_SLAVE_REQ_FD */ - QIOChannel *ioc_slave; - QIOChannelSocket *sioc_slave; - Coroutine *co_trip; /* coroutine for processing VhostUserMsg */ QTAILQ_HEAD(, VuFdWatch) vu_fd_watches; - /* restart coroutine co_trip if AIOContext is changed */ - bool aio_context_changed; - bool processing_msg; -}; + + Coroutine *co_trip; /* coroutine for processing VhostUserMsg */ +} VuServer; bool vhost_user_server_start(VuServer *server, SocketAddress *unix_socket, @@ -57,6 +59,7 @@ bool vhost_user_server_start(VuServer *server, void vhost_user_server_stop(VuServer *server); -void vhost_user_server_set_aio_context(VuServer *server, AioContext *ctx); +void vhost_user_server_attach_aio_context(VuServer *server, AioContext *ctx); +void vhost_user_server_detach_aio_context(VuServer *server); #endif /* VHOST_USER_SERVER_H */ |