diff options
author | Paolo Bonzini <pbonzini@redhat.com> | 2017-02-13 14:52:24 +0100 |
---|---|---|
committer | Stefan Hajnoczi <stefanha@redhat.com> | 2017-02-21 11:14:08 +0000 |
commit | ff82911cd3f69f028f2537825c9720ff78bc3f19 (patch) | |
tree | 68ea48537278208b0246e42061fcf2ac9b22cf1d /nbd | |
parent | c4c497d27f0be8552ae244e76ba2bce66bd2443e (diff) |
nbd: convert to use qio_channel_yield
In the client, read the reply headers from a coroutine, switching the
read side between the "read header" coroutine and the I/O coroutine that
reads the body of the reply.
In the server, if the server can read more requests it will create a new
"read request" coroutine as soon as a request has been read. Otherwise,
the new coroutine is created in nbd_request_put.
Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
Reviewed-by: Fam Zheng <famz@redhat.com>
Reviewed-by: Daniel P. Berrange <berrange@redhat.com>
Message-id: 20170213135235.12274-8-pbonzini@redhat.com
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
Diffstat (limited to 'nbd')
-rw-r--r-- | nbd/client.c | 2 | ||||
-rw-r--r-- | nbd/common.c | 9 | ||||
-rw-r--r-- | nbd/server.c | 94 |
3 files changed, 30 insertions, 75 deletions
diff --git a/nbd/client.c b/nbd/client.c index ffb0743bce..5c9dee37fa 100644 --- a/nbd/client.c +++ b/nbd/client.c @@ -778,7 +778,7 @@ ssize_t nbd_receive_reply(QIOChannel *ioc, NBDReply *reply) ssize_t ret; ret = read_sync(ioc, buf, sizeof(buf)); - if (ret < 0) { + if (ret <= 0) { return ret; } diff --git a/nbd/common.c b/nbd/common.c index a5f39ea58e..dccbb8e9de 100644 --- a/nbd/common.c +++ b/nbd/common.c @@ -43,14 +43,7 @@ ssize_t nbd_wr_syncv(QIOChannel *ioc, } if (len == QIO_CHANNEL_ERR_BLOCK) { if (qemu_in_coroutine()) { - /* XXX figure out if we can create a variant on - * qio_channel_yield() that works with AIO contexts - * and consider using that in this branch */ - qemu_coroutine_yield(); - } else if (done) { - /* XXX this is needed by nbd_reply_ready. */ - qio_channel_wait(ioc, - do_read ? G_IO_IN : G_IO_OUT); + qio_channel_yield(ioc, do_read ? G_IO_IN : G_IO_OUT); } else { return -EAGAIN; } diff --git a/nbd/server.c b/nbd/server.c index efe5cb82c9..ac92fa0727 100644 --- a/nbd/server.c +++ b/nbd/server.c @@ -95,8 +95,6 @@ struct NBDClient { CoMutex send_lock; Coroutine *send_coroutine; - bool can_read; - QTAILQ_ENTRY(NBDClient) next; int nb_requests; bool closing; @@ -104,9 +102,7 @@ struct NBDClient { /* That's all folks */ -static void nbd_set_handlers(NBDClient *client); -static void nbd_unset_handlers(NBDClient *client); -static void nbd_update_can_read(NBDClient *client); +static void nbd_client_receive_next_request(NBDClient *client); static gboolean nbd_negotiate_continue(QIOChannel *ioc, GIOCondition condition, @@ -785,7 +781,7 @@ void nbd_client_put(NBDClient *client) */ assert(client->closing); - nbd_unset_handlers(client); + qio_channel_detach_aio_context(client->ioc); object_unref(OBJECT(client->sioc)); object_unref(OBJECT(client->ioc)); if (client->tlscreds) { @@ -826,7 +822,6 @@ static NBDRequestData *nbd_request_get(NBDClient *client) assert(client->nb_requests <= MAX_NBD_REQUESTS - 1); client->nb_requests++; - nbd_update_can_read(client); req = g_new0(NBDRequestData, 1); nbd_client_get(client); @@ -844,7 +839,8 @@ static void nbd_request_put(NBDRequestData *req) g_free(req); client->nb_requests--; - nbd_update_can_read(client); + nbd_client_receive_next_request(client); + nbd_client_put(client); } @@ -858,7 +854,13 @@ static void blk_aio_attached(AioContext *ctx, void *opaque) exp->ctx = ctx; QTAILQ_FOREACH(client, &exp->clients, next) { - nbd_set_handlers(client); + qio_channel_attach_aio_context(client->ioc, ctx); + if (client->recv_coroutine) { + aio_co_schedule(ctx, client->recv_coroutine); + } + if (client->send_coroutine) { + aio_co_schedule(ctx, client->send_coroutine); + } } } @@ -870,7 +872,7 @@ static void blk_aio_detach(void *opaque) TRACE("Export %s: Detaching clients from AIO context %p\n", exp->name, exp->ctx); QTAILQ_FOREACH(client, &exp->clients, next) { - nbd_unset_handlers(client); + qio_channel_detach_aio_context(client->ioc); } exp->ctx = NULL; @@ -1045,7 +1047,6 @@ static ssize_t nbd_co_send_reply(NBDRequestData *req, NBDReply *reply, g_assert(qemu_in_coroutine()); qemu_co_mutex_lock(&client->send_lock); client->send_coroutine = qemu_coroutine_self(); - nbd_set_handlers(client); if (!len) { rc = nbd_send_reply(client->ioc, reply); @@ -1062,7 +1063,6 @@ static ssize_t nbd_co_send_reply(NBDRequestData *req, NBDReply *reply, } client->send_coroutine = NULL; - nbd_set_handlers(client); qemu_co_mutex_unlock(&client->send_lock); return rc; } @@ -1079,9 +1079,7 @@ static ssize_t nbd_co_receive_request(NBDRequestData *req, ssize_t rc; g_assert(qemu_in_coroutine()); - client->recv_coroutine = qemu_coroutine_self(); - nbd_update_can_read(client); - + assert(client->recv_coroutine == qemu_coroutine_self()); rc = nbd_receive_request(client->ioc, request); if (rc < 0) { if (rc != -EAGAIN) { @@ -1163,23 +1161,25 @@ static ssize_t nbd_co_receive_request(NBDRequestData *req, out: client->recv_coroutine = NULL; - nbd_update_can_read(client); + nbd_client_receive_next_request(client); return rc; } -static void nbd_trip(void *opaque) +/* Owns a reference to the NBDClient passed as opaque. */ +static coroutine_fn void nbd_trip(void *opaque) { NBDClient *client = opaque; NBDExport *exp = client->exp; NBDRequestData *req; - NBDRequest request; + NBDRequest request = { 0 }; /* GCC thinks it can be used uninitialized */ NBDReply reply; ssize_t ret; int flags; TRACE("Reading request."); if (client->closing) { + nbd_client_put(client); return; } @@ -1338,60 +1338,21 @@ static void nbd_trip(void *opaque) done: nbd_request_put(req); + nbd_client_put(client); return; out: nbd_request_put(req); client_close(client); + nbd_client_put(client); } -static void nbd_read(void *opaque) -{ - NBDClient *client = opaque; - - if (client->recv_coroutine) { - qemu_coroutine_enter(client->recv_coroutine); - } else { - qemu_coroutine_enter(qemu_coroutine_create(nbd_trip, client)); - } -} - -static void nbd_restart_write(void *opaque) -{ - NBDClient *client = opaque; - - qemu_coroutine_enter(client->send_coroutine); -} - -static void nbd_set_handlers(NBDClient *client) -{ - if (client->exp && client->exp->ctx) { - aio_set_fd_handler(client->exp->ctx, client->sioc->fd, true, - client->can_read ? nbd_read : NULL, - client->send_coroutine ? nbd_restart_write : NULL, - NULL, client); - } -} - -static void nbd_unset_handlers(NBDClient *client) -{ - if (client->exp && client->exp->ctx) { - aio_set_fd_handler(client->exp->ctx, client->sioc->fd, true, NULL, - NULL, NULL, NULL); - } -} - -static void nbd_update_can_read(NBDClient *client) +static void nbd_client_receive_next_request(NBDClient *client) { - bool can_read = client->recv_coroutine || - client->nb_requests < MAX_NBD_REQUESTS; - - if (can_read != client->can_read) { - client->can_read = can_read; - nbd_set_handlers(client); - - /* There is no need to invoke aio_notify(), since aio_set_fd_handler() - * in nbd_set_handlers() will have taken care of that */ + if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) { + nbd_client_get(client); + client->recv_coroutine = qemu_coroutine_create(nbd_trip, client); + aio_co_schedule(client->exp->ctx, client->recv_coroutine); } } @@ -1409,11 +1370,13 @@ static coroutine_fn void nbd_co_client_start(void *opaque) goto out; } qemu_co_mutex_init(&client->send_lock); - nbd_set_handlers(client); if (exp) { QTAILQ_INSERT_TAIL(&exp->clients, client, next); } + + nbd_client_receive_next_request(client); + out: g_free(data); } @@ -1439,7 +1402,6 @@ void nbd_client_new(NBDExport *exp, object_ref(OBJECT(client->sioc)); client->ioc = QIO_CHANNEL(sioc); object_ref(OBJECT(client->ioc)); - client->can_read = true; client->close = close_fn; data->client = client; |