diff options
Diffstat (limited to 'nbd.c')
-rw-r--r-- | nbd.c | 105 |
1 files changed, 85 insertions, 20 deletions
@@ -18,6 +18,7 @@ #include "block/nbd.h" #include "block/block.h" +#include "block/block_int.h" #include "block/coroutine.h" @@ -107,6 +108,8 @@ struct NBDExport { uint32_t nbdflags; QTAILQ_HEAD(, NBDClient) clients; QTAILQ_ENTRY(NBDExport) next; + + AioContext *ctx; }; static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports); @@ -123,6 +126,8 @@ struct NBDClient { CoMutex send_lock; Coroutine *send_coroutine; + bool can_read; + QTAILQ_ENTRY(NBDClient) next; int nb_requests; bool closing; @@ -130,6 +135,10 @@ struct NBDClient { /* That's all folks */ +static void nbd_set_handlers(NBDClient *client); +static void nbd_unset_handlers(NBDClient *client); +static void nbd_update_can_read(NBDClient *client); + ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read) { size_t offset = 0; @@ -156,7 +165,7 @@ ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read) err = socket_error(); /* recoverable error */ - if (err == EINTR || (offset > 0 && err == EAGAIN)) { + if (err == EINTR || (offset > 0 && (err == EAGAIN || err == EWOULDBLOCK))) { continue; } @@ -862,7 +871,7 @@ void nbd_client_put(NBDClient *client) */ assert(client->closing); - qemu_set_fd_handler2(client->sock, NULL, NULL, NULL, NULL); + nbd_unset_handlers(client); close(client->sock); client->sock = -1; if (client->exp) { @@ -898,6 +907,7 @@ static NBDRequest *nbd_request_get(NBDClient *client) assert(client->nb_requests <= MAX_NBD_REQUESTS - 1); client->nb_requests++; + nbd_update_can_read(client); req = g_slice_new0(NBDRequest); nbd_client_get(client); @@ -914,12 +924,39 @@ static void nbd_request_put(NBDRequest *req) } g_slice_free(NBDRequest, req); - if (client->nb_requests-- == MAX_NBD_REQUESTS) { - qemu_notify_event(); - } + client->nb_requests--; + nbd_update_can_read(client); nbd_client_put(client); } +static void bs_aio_attached(AioContext *ctx, void *opaque) +{ + NBDExport *exp = opaque; + NBDClient *client; + + TRACE("Export %s: Attaching clients to AIO context %p\n", exp->name, ctx); + + exp->ctx = ctx; + + QTAILQ_FOREACH(client, &exp->clients, next) { + nbd_set_handlers(client); + } +} + +static void bs_aio_detach(void *opaque) +{ + NBDExport *exp = opaque; + NBDClient *client; + + TRACE("Export %s: Detaching clients from AIO context %p\n", exp->name, exp->ctx); + + QTAILQ_FOREACH(client, &exp->clients, next) { + nbd_unset_handlers(client); + } + + exp->ctx = NULL; +} + NBDExport *nbd_export_new(BlockDriverState *bs, off_t dev_offset, off_t size, uint32_t nbdflags, void (*close)(NBDExport *)) @@ -932,7 +969,9 @@ NBDExport *nbd_export_new(BlockDriverState *bs, off_t dev_offset, exp->nbdflags = nbdflags; exp->size = size == -1 ? bdrv_getlength(bs) : size; exp->close = close; + exp->ctx = bdrv_get_aio_context(bs); bdrv_ref(bs); + bdrv_add_aio_context_notifier(bs, bs_aio_attached, bs_aio_detach, exp); return exp; } @@ -980,6 +1019,8 @@ void nbd_export_close(NBDExport *exp) nbd_export_set_name(exp, NULL); nbd_export_put(exp); if (exp->bs) { + bdrv_remove_aio_context_notifier(exp->bs, bs_aio_attached, + bs_aio_detach, exp); bdrv_unref(exp->bs); exp->bs = NULL; } @@ -1023,10 +1064,6 @@ void nbd_export_close_all(void) } } -static int nbd_can_read(void *opaque); -static void nbd_read(void *opaque); -static void nbd_restart_write(void *opaque); - static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply, int len) { @@ -1035,9 +1072,8 @@ static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply, ssize_t rc, ret; qemu_co_mutex_lock(&client->send_lock); - qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, - nbd_restart_write, client); client->send_coroutine = qemu_coroutine_self(); + nbd_set_handlers(client); if (!len) { rc = nbd_send_reply(csock, reply); @@ -1054,7 +1090,7 @@ static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply, } client->send_coroutine = NULL; - qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client); + nbd_set_handlers(client); qemu_co_mutex_unlock(&client->send_lock); return rc; } @@ -1067,6 +1103,8 @@ static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *reque ssize_t rc; client->recv_coroutine = qemu_coroutine_self(); + nbd_update_can_read(client); + rc = nbd_receive_request(csock, request); if (rc < 0) { if (rc != -EAGAIN) { @@ -1108,6 +1146,8 @@ static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *reque out: client->recv_coroutine = NULL; + nbd_update_can_read(client); + return rc; } @@ -1259,13 +1299,6 @@ out: nbd_client_close(client); } -static int nbd_can_read(void *opaque) -{ - NBDClient *client = opaque; - - return client->recv_coroutine || client->nb_requests < MAX_NBD_REQUESTS; -} - static void nbd_read(void *opaque) { NBDClient *client = opaque; @@ -1284,6 +1317,37 @@ static void nbd_restart_write(void *opaque) qemu_coroutine_enter(client->send_coroutine, NULL); } +static void nbd_set_handlers(NBDClient *client) +{ + if (client->exp && client->exp->ctx) { + aio_set_fd_handler(client->exp->ctx, client->sock, + client->can_read ? nbd_read : NULL, + client->send_coroutine ? nbd_restart_write : NULL, + client); + } +} + +static void nbd_unset_handlers(NBDClient *client) +{ + if (client->exp && client->exp->ctx) { + aio_set_fd_handler(client->exp->ctx, client->sock, NULL, NULL, NULL); + } +} + +static void nbd_update_can_read(NBDClient *client) +{ + bool can_read = client->recv_coroutine || + client->nb_requests < MAX_NBD_REQUESTS; + + if (can_read != client->can_read) { + client->can_read = can_read; + nbd_set_handlers(client); + + /* There is no need to invoke aio_notify(), since aio_set_fd_handler() + * in nbd_set_handlers() will have taken care of that */ + } +} + NBDClient *nbd_client_new(NBDExport *exp, int csock, void (*close)(NBDClient *)) { @@ -1292,13 +1356,14 @@ NBDClient *nbd_client_new(NBDExport *exp, int csock, client->refcount = 1; client->exp = exp; client->sock = csock; + client->can_read = true; if (nbd_send_negotiate(client)) { g_free(client); return NULL; } client->close = close; qemu_co_mutex_init(&client->send_lock); - qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client); + nbd_set_handlers(client); if (exp) { QTAILQ_INSERT_TAIL(&exp->clients, client, next); |