diff options
-rw-r--r-- | nbd.c | 72 |
1 files changed, 53 insertions, 19 deletions
@@ -18,6 +18,7 @@ #include "block/nbd.h" #include "block/block.h" +#include "block/block_int.h" #include "block/coroutine.h" @@ -107,6 +108,8 @@ struct NBDExport { uint32_t nbdflags; QTAILQ_HEAD(, NBDClient) clients; QTAILQ_ENTRY(NBDExport) next; + + AioContext *ctx; }; static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports); @@ -123,6 +126,8 @@ struct NBDClient { CoMutex send_lock; Coroutine *send_coroutine; + bool can_read; + QTAILQ_ENTRY(NBDClient) next; int nb_requests; bool closing; @@ -130,6 +135,10 @@ struct NBDClient { /* That's all folks */ +static void nbd_set_handlers(NBDClient *client); +static void nbd_unset_handlers(NBDClient *client); +static void nbd_update_can_read(NBDClient *client); + ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read) { size_t offset = 0; @@ -862,7 +871,7 @@ void nbd_client_put(NBDClient *client) */ assert(client->closing); - qemu_set_fd_handler2(client->sock, NULL, NULL, NULL, NULL); + nbd_unset_handlers(client); close(client->sock); client->sock = -1; if (client->exp) { @@ -898,6 +907,7 @@ static NBDRequest *nbd_request_get(NBDClient *client) assert(client->nb_requests <= MAX_NBD_REQUESTS - 1); client->nb_requests++; + nbd_update_can_read(client); req = g_slice_new0(NBDRequest); nbd_client_get(client); @@ -914,9 +924,8 @@ static void nbd_request_put(NBDRequest *req) } g_slice_free(NBDRequest, req); - if (client->nb_requests-- == MAX_NBD_REQUESTS) { - qemu_notify_event(); - } + client->nb_requests--; + nbd_update_can_read(client); nbd_client_put(client); } @@ -932,6 +941,7 @@ NBDExport *nbd_export_new(BlockDriverState *bs, off_t dev_offset, exp->nbdflags = nbdflags; exp->size = size == -1 ? bdrv_getlength(bs) : size; exp->close = close; + exp->ctx = bdrv_get_aio_context(bs); bdrv_ref(bs); return exp; } @@ -1023,10 +1033,6 @@ void nbd_export_close_all(void) } } -static int nbd_can_read(void *opaque); -static void nbd_read(void *opaque); -static void nbd_restart_write(void *opaque); - static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply, int len) { @@ -1035,9 +1041,8 @@ static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply, ssize_t rc, ret; qemu_co_mutex_lock(&client->send_lock); - qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, - nbd_restart_write, client); client->send_coroutine = qemu_coroutine_self(); + nbd_set_handlers(client); if (!len) { rc = nbd_send_reply(csock, reply); @@ -1054,7 +1059,7 @@ static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply, } client->send_coroutine = NULL; - qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client); + nbd_set_handlers(client); qemu_co_mutex_unlock(&client->send_lock); return rc; } @@ -1067,6 +1072,8 @@ static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *reque ssize_t rc; client->recv_coroutine = qemu_coroutine_self(); + nbd_update_can_read(client); + rc = nbd_receive_request(csock, request); if (rc < 0) { if (rc != -EAGAIN) { @@ -1108,6 +1115,8 @@ static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *reque out: client->recv_coroutine = NULL; + nbd_update_can_read(client); + return rc; } @@ -1259,13 +1268,6 @@ out: nbd_client_close(client); } -static int nbd_can_read(void *opaque) -{ - NBDClient *client = opaque; - - return client->recv_coroutine || client->nb_requests < MAX_NBD_REQUESTS; -} - static void nbd_read(void *opaque) { NBDClient *client = opaque; @@ -1284,6 +1286,37 @@ static void nbd_restart_write(void *opaque) qemu_coroutine_enter(client->send_coroutine, NULL); } +static void nbd_set_handlers(NBDClient *client) +{ + if (client->exp && client->exp->ctx) { + aio_set_fd_handler(client->exp->ctx, client->sock, + client->can_read ? nbd_read : NULL, + client->send_coroutine ? nbd_restart_write : NULL, + client); + } +} + +static void nbd_unset_handlers(NBDClient *client) +{ + if (client->exp && client->exp->ctx) { + aio_set_fd_handler(client->exp->ctx, client->sock, NULL, NULL, NULL); + } +} + +static void nbd_update_can_read(NBDClient *client) +{ + bool can_read = client->recv_coroutine || + client->nb_requests < MAX_NBD_REQUESTS; + + if (can_read != client->can_read) { + client->can_read = can_read; + nbd_set_handlers(client); + + /* There is no need to invoke aio_notify(), since aio_set_fd_handler() + * in nbd_set_handlers() will have taken care of that */ + } +} + NBDClient *nbd_client_new(NBDExport *exp, int csock, void (*close)(NBDClient *)) { @@ -1292,13 +1325,14 @@ NBDClient *nbd_client_new(NBDExport *exp, int csock, client->refcount = 1; client->exp = exp; client->sock = csock; + client->can_read = true; if (nbd_send_negotiate(client)) { g_free(client); return NULL; } client->close = close; qemu_co_mutex_init(&client->send_lock); - qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client); + nbd_set_handlers(client); if (exp) { QTAILQ_INSERT_TAIL(&exp->clients, client, next); |