aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--block/nbd-client.c117
-rw-r--r--block/nbd-client.h2
-rw-r--r--nbd/client.c2
-rw-r--r--nbd/common.c9
-rw-r--r--nbd/server.c94
5 files changed, 83 insertions, 141 deletions
diff --git a/block/nbd-client.c b/block/nbd-client.c
index 06f1532805..10fcc9e81d 100644
--- a/block/nbd-client.c
+++ b/block/nbd-client.c
@@ -33,8 +33,9 @@
#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs))
#define INDEX_TO_HANDLE(bs, index) ((index) ^ ((uint64_t)(intptr_t)bs))
-static void nbd_recv_coroutines_enter_all(NBDClientSession *s)
+static void nbd_recv_coroutines_enter_all(BlockDriverState *bs)
{
+ NBDClientSession *s = nbd_get_client_session(bs);
int i;
for (i = 0; i < MAX_NBD_REQUESTS; i++) {
@@ -42,6 +43,7 @@ static void nbd_recv_coroutines_enter_all(NBDClientSession *s)
qemu_coroutine_enter(s->recv_coroutine[i]);
}
}
+ BDRV_POLL_WHILE(bs, s->read_reply_co);
}
static void nbd_teardown_connection(BlockDriverState *bs)
@@ -56,7 +58,7 @@ static void nbd_teardown_connection(BlockDriverState *bs)
qio_channel_shutdown(client->ioc,
QIO_CHANNEL_SHUTDOWN_BOTH,
NULL);
- nbd_recv_coroutines_enter_all(client);
+ nbd_recv_coroutines_enter_all(bs);
nbd_client_detach_aio_context(bs);
object_unref(OBJECT(client->sioc));
@@ -65,54 +67,43 @@ static void nbd_teardown_connection(BlockDriverState *bs)
client->ioc = NULL;
}
-static void nbd_reply_ready(void *opaque)
+static coroutine_fn void nbd_read_reply_entry(void *opaque)
{
- BlockDriverState *bs = opaque;
- NBDClientSession *s = nbd_get_client_session(bs);
+ NBDClientSession *s = opaque;
uint64_t i;
int ret;
- if (!s->ioc) { /* Already closed */
- return;
- }
-
- if (s->reply.handle == 0) {
- /* No reply already in flight. Fetch a header. It is possible
- * that another thread has done the same thing in parallel, so
- * the socket is not readable anymore.
- */
+ for (;;) {
+ assert(s->reply.handle == 0);
ret = nbd_receive_reply(s->ioc, &s->reply);
- if (ret == -EAGAIN) {
- return;
- }
if (ret < 0) {
- s->reply.handle = 0;
- goto fail;
+ break;
}
- }
- /* There's no need for a mutex on the receive side, because the
- * handler acts as a synchronization point and ensures that only
- * one coroutine is called until the reply finishes. */
- i = HANDLE_TO_INDEX(s, s->reply.handle);
- if (i >= MAX_NBD_REQUESTS) {
- goto fail;
- }
+ /* There's no need for a mutex on the receive side, because the
+ * handler acts as a synchronization point and ensures that only
+ * one coroutine is called until the reply finishes.
+ */
+ i = HANDLE_TO_INDEX(s, s->reply.handle);
+ if (i >= MAX_NBD_REQUESTS || !s->recv_coroutine[i]) {
+ break;
+ }
- if (s->recv_coroutine[i]) {
- qemu_coroutine_enter(s->recv_coroutine[i]);
- return;
+ /* We're woken up by the recv_coroutine itself. Note that there
+ * is no race between yielding and reentering read_reply_co. This
+ * is because:
+ *
+ * - if recv_coroutine[i] runs on the same AioContext, it is only
+ * entered after we yield
+ *
+ * - if recv_coroutine[i] runs on a different AioContext, reentering
+ * read_reply_co happens through a bottom half, which can only
+ * run after we yield.
+ */
+ aio_co_wake(s->recv_coroutine[i]);
+ qemu_coroutine_yield();
}
-
-fail:
- nbd_teardown_connection(bs);
-}
-
-static void nbd_restart_write(void *opaque)
-{
- BlockDriverState *bs = opaque;
-
- qemu_coroutine_enter(nbd_get_client_session(bs)->send_coroutine);
+ s->read_reply_co = NULL;
}
static int nbd_co_send_request(BlockDriverState *bs,
@@ -120,7 +111,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
QEMUIOVector *qiov)
{
NBDClientSession *s = nbd_get_client_session(bs);
- AioContext *aio_context;
int rc, ret, i;
qemu_co_mutex_lock(&s->send_mutex);
@@ -141,11 +131,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
return -EPIPE;
}
- s->send_coroutine = qemu_coroutine_self();
- aio_context = bdrv_get_aio_context(bs);
-
- aio_set_fd_handler(aio_context, s->sioc->fd, false,
- nbd_reply_ready, nbd_restart_write, NULL, bs);
if (qiov) {
qio_channel_set_cork(s->ioc, true);
rc = nbd_send_request(s->ioc, request);
@@ -160,9 +145,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
} else {
rc = nbd_send_request(s->ioc, request);
}
- aio_set_fd_handler(aio_context, s->sioc->fd, false,
- nbd_reply_ready, NULL, NULL, bs);
- s->send_coroutine = NULL;
qemu_co_mutex_unlock(&s->send_mutex);
return rc;
}
@@ -174,8 +156,7 @@ static void nbd_co_receive_reply(NBDClientSession *s,
{
int ret;
- /* Wait until we're woken up by the read handler. TODO: perhaps
- * peek at the next reply and avoid yielding if it's ours? */
+ /* Wait until we're woken up by nbd_read_reply_entry. */
qemu_coroutine_yield();
*reply = s->reply;
if (reply->handle != request->handle ||
@@ -209,13 +190,19 @@ static void nbd_coroutine_start(NBDClientSession *s,
/* s->recv_coroutine[i] is set as soon as we get the send_lock. */
}
-static void nbd_coroutine_end(NBDClientSession *s,
+static void nbd_coroutine_end(BlockDriverState *bs,
NBDRequest *request)
{
+ NBDClientSession *s = nbd_get_client_session(bs);
int i = HANDLE_TO_INDEX(s, request->handle);
+
s->recv_coroutine[i] = NULL;
- if (s->in_flight-- == MAX_NBD_REQUESTS) {
- qemu_co_queue_next(&s->free_sema);
+ s->in_flight--;
+ qemu_co_queue_next(&s->free_sema);
+
+ /* Kick the read_reply_co to get the next reply. */
+ if (s->read_reply_co) {
+ aio_co_wake(s->read_reply_co);
}
}
@@ -241,7 +228,7 @@ int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
} else {
nbd_co_receive_reply(client, &request, &reply, qiov);
}
- nbd_coroutine_end(client, &request);
+ nbd_coroutine_end(bs, &request);
return -reply.error;
}
@@ -271,7 +258,7 @@ int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
} else {
nbd_co_receive_reply(client, &request, &reply, NULL);
}
- nbd_coroutine_end(client, &request);
+ nbd_coroutine_end(bs, &request);
return -reply.error;
}
@@ -306,7 +293,7 @@ int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
} else {
nbd_co_receive_reply(client, &request, &reply, NULL);
}
- nbd_coroutine_end(client, &request);
+ nbd_coroutine_end(bs, &request);
return -reply.error;
}
@@ -331,7 +318,7 @@ int nbd_client_co_flush(BlockDriverState *bs)
} else {
nbd_co_receive_reply(client, &request, &reply, NULL);
}
- nbd_coroutine_end(client, &request);
+ nbd_coroutine_end(bs, &request);
return -reply.error;
}
@@ -357,23 +344,23 @@ int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int count)
} else {
nbd_co_receive_reply(client, &request, &reply, NULL);
}
- nbd_coroutine_end(client, &request);
+ nbd_coroutine_end(bs, &request);
return -reply.error;
}
void nbd_client_detach_aio_context(BlockDriverState *bs)
{
- aio_set_fd_handler(bdrv_get_aio_context(bs),
- nbd_get_client_session(bs)->sioc->fd,
- false, NULL, NULL, NULL, NULL);
+ NBDClientSession *client = nbd_get_client_session(bs);
+ qio_channel_detach_aio_context(QIO_CHANNEL(client->sioc));
}
void nbd_client_attach_aio_context(BlockDriverState *bs,
AioContext *new_context)
{
- aio_set_fd_handler(new_context, nbd_get_client_session(bs)->sioc->fd,
- false, nbd_reply_ready, NULL, NULL, bs);
+ NBDClientSession *client = nbd_get_client_session(bs);
+ qio_channel_attach_aio_context(QIO_CHANNEL(client->sioc), new_context);
+ aio_co_schedule(new_context, client->read_reply_co);
}
void nbd_client_close(BlockDriverState *bs)
@@ -434,7 +421,7 @@ int nbd_client_init(BlockDriverState *bs,
/* Now that we're connected, set the socket to be non-blocking and
* kick the reply mechanism. */
qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
-
+ client->read_reply_co = qemu_coroutine_create(nbd_read_reply_entry, client);
nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));
logout("Established connection with NBD server\n");
diff --git a/block/nbd-client.h b/block/nbd-client.h
index f8d6006849..8cdfc92e94 100644
--- a/block/nbd-client.h
+++ b/block/nbd-client.h
@@ -25,7 +25,7 @@ typedef struct NBDClientSession {
CoMutex send_mutex;
CoQueue free_sema;
- Coroutine *send_coroutine;
+ Coroutine *read_reply_co;
int in_flight;
Coroutine *recv_coroutine[MAX_NBD_REQUESTS];
diff --git a/nbd/client.c b/nbd/client.c
index ffb0743bce..5c9dee37fa 100644
--- a/nbd/client.c
+++ b/nbd/client.c
@@ -778,7 +778,7 @@ ssize_t nbd_receive_reply(QIOChannel *ioc, NBDReply *reply)
ssize_t ret;
ret = read_sync(ioc, buf, sizeof(buf));
- if (ret < 0) {
+ if (ret <= 0) {
return ret;
}
diff --git a/nbd/common.c b/nbd/common.c
index a5f39ea58e..dccbb8e9de 100644
--- a/nbd/common.c
+++ b/nbd/common.c
@@ -43,14 +43,7 @@ ssize_t nbd_wr_syncv(QIOChannel *ioc,
}
if (len == QIO_CHANNEL_ERR_BLOCK) {
if (qemu_in_coroutine()) {
- /* XXX figure out if we can create a variant on
- * qio_channel_yield() that works with AIO contexts
- * and consider using that in this branch */
- qemu_coroutine_yield();
- } else if (done) {
- /* XXX this is needed by nbd_reply_ready. */
- qio_channel_wait(ioc,
- do_read ? G_IO_IN : G_IO_OUT);
+ qio_channel_yield(ioc, do_read ? G_IO_IN : G_IO_OUT);
} else {
return -EAGAIN;
}
diff --git a/nbd/server.c b/nbd/server.c
index efe5cb82c9..ac92fa0727 100644
--- a/nbd/server.c
+++ b/nbd/server.c
@@ -95,8 +95,6 @@ struct NBDClient {
CoMutex send_lock;
Coroutine *send_coroutine;
- bool can_read;
-
QTAILQ_ENTRY(NBDClient) next;
int nb_requests;
bool closing;
@@ -104,9 +102,7 @@ struct NBDClient {
/* That's all folks */
-static void nbd_set_handlers(NBDClient *client);
-static void nbd_unset_handlers(NBDClient *client);
-static void nbd_update_can_read(NBDClient *client);
+static void nbd_client_receive_next_request(NBDClient *client);
static gboolean nbd_negotiate_continue(QIOChannel *ioc,
GIOCondition condition,
@@ -785,7 +781,7 @@ void nbd_client_put(NBDClient *client)
*/
assert(client->closing);
- nbd_unset_handlers(client);
+ qio_channel_detach_aio_context(client->ioc);
object_unref(OBJECT(client->sioc));
object_unref(OBJECT(client->ioc));
if (client->tlscreds) {
@@ -826,7 +822,6 @@ static NBDRequestData *nbd_request_get(NBDClient *client)
assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
client->nb_requests++;
- nbd_update_can_read(client);
req = g_new0(NBDRequestData, 1);
nbd_client_get(client);
@@ -844,7 +839,8 @@ static void nbd_request_put(NBDRequestData *req)
g_free(req);
client->nb_requests--;
- nbd_update_can_read(client);
+ nbd_client_receive_next_request(client);
+
nbd_client_put(client);
}
@@ -858,7 +854,13 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
exp->ctx = ctx;
QTAILQ_FOREACH(client, &exp->clients, next) {
- nbd_set_handlers(client);
+ qio_channel_attach_aio_context(client->ioc, ctx);
+ if (client->recv_coroutine) {
+ aio_co_schedule(ctx, client->recv_coroutine);
+ }
+ if (client->send_coroutine) {
+ aio_co_schedule(ctx, client->send_coroutine);
+ }
}
}
@@ -870,7 +872,7 @@ static void blk_aio_detach(void *opaque)
TRACE("Export %s: Detaching clients from AIO context %p\n", exp->name, exp->ctx);
QTAILQ_FOREACH(client, &exp->clients, next) {
- nbd_unset_handlers(client);
+ qio_channel_detach_aio_context(client->ioc);
}
exp->ctx = NULL;
@@ -1045,7 +1047,6 @@ static ssize_t nbd_co_send_reply(NBDRequestData *req, NBDReply *reply,
g_assert(qemu_in_coroutine());
qemu_co_mutex_lock(&client->send_lock);
client->send_coroutine = qemu_coroutine_self();
- nbd_set_handlers(client);
if (!len) {
rc = nbd_send_reply(client->ioc, reply);
@@ -1062,7 +1063,6 @@ static ssize_t nbd_co_send_reply(NBDRequestData *req, NBDReply *reply,
}
client->send_coroutine = NULL;
- nbd_set_handlers(client);
qemu_co_mutex_unlock(&client->send_lock);
return rc;
}
@@ -1079,9 +1079,7 @@ static ssize_t nbd_co_receive_request(NBDRequestData *req,
ssize_t rc;
g_assert(qemu_in_coroutine());
- client->recv_coroutine = qemu_coroutine_self();
- nbd_update_can_read(client);
-
+ assert(client->recv_coroutine == qemu_coroutine_self());
rc = nbd_receive_request(client->ioc, request);
if (rc < 0) {
if (rc != -EAGAIN) {
@@ -1163,23 +1161,25 @@ static ssize_t nbd_co_receive_request(NBDRequestData *req,
out:
client->recv_coroutine = NULL;
- nbd_update_can_read(client);
+ nbd_client_receive_next_request(client);
return rc;
}
-static void nbd_trip(void *opaque)
+/* Owns a reference to the NBDClient passed as opaque. */
+static coroutine_fn void nbd_trip(void *opaque)
{
NBDClient *client = opaque;
NBDExport *exp = client->exp;
NBDRequestData *req;
- NBDRequest request;
+ NBDRequest request = { 0 }; /* GCC thinks it can be used uninitialized */
NBDReply reply;
ssize_t ret;
int flags;
TRACE("Reading request.");
if (client->closing) {
+ nbd_client_put(client);
return;
}
@@ -1338,60 +1338,21 @@ static void nbd_trip(void *opaque)
done:
nbd_request_put(req);
+ nbd_client_put(client);
return;
out:
nbd_request_put(req);
client_close(client);
+ nbd_client_put(client);
}
-static void nbd_read(void *opaque)
-{
- NBDClient *client = opaque;
-
- if (client->recv_coroutine) {
- qemu_coroutine_enter(client->recv_coroutine);
- } else {
- qemu_coroutine_enter(qemu_coroutine_create(nbd_trip, client));
- }
-}
-
-static void nbd_restart_write(void *opaque)
-{
- NBDClient *client = opaque;
-
- qemu_coroutine_enter(client->send_coroutine);
-}
-
-static void nbd_set_handlers(NBDClient *client)
-{
- if (client->exp && client->exp->ctx) {
- aio_set_fd_handler(client->exp->ctx, client->sioc->fd, true,
- client->can_read ? nbd_read : NULL,
- client->send_coroutine ? nbd_restart_write : NULL,
- NULL, client);
- }
-}
-
-static void nbd_unset_handlers(NBDClient *client)
-{
- if (client->exp && client->exp->ctx) {
- aio_set_fd_handler(client->exp->ctx, client->sioc->fd, true, NULL,
- NULL, NULL, NULL);
- }
-}
-
-static void nbd_update_can_read(NBDClient *client)
+static void nbd_client_receive_next_request(NBDClient *client)
{
- bool can_read = client->recv_coroutine ||
- client->nb_requests < MAX_NBD_REQUESTS;
-
- if (can_read != client->can_read) {
- client->can_read = can_read;
- nbd_set_handlers(client);
-
- /* There is no need to invoke aio_notify(), since aio_set_fd_handler()
- * in nbd_set_handlers() will have taken care of that */
+ if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) {
+ nbd_client_get(client);
+ client->recv_coroutine = qemu_coroutine_create(nbd_trip, client);
+ aio_co_schedule(client->exp->ctx, client->recv_coroutine);
}
}
@@ -1409,11 +1370,13 @@ static coroutine_fn void nbd_co_client_start(void *opaque)
goto out;
}
qemu_co_mutex_init(&client->send_lock);
- nbd_set_handlers(client);
if (exp) {
QTAILQ_INSERT_TAIL(&exp->clients, client, next);
}
+
+ nbd_client_receive_next_request(client);
+
out:
g_free(data);
}
@@ -1439,7 +1402,6 @@ void nbd_client_new(NBDExport *exp,
object_ref(OBJECT(client->sioc));
client->ioc = QIO_CHANNEL(sioc);
object_ref(OBJECT(client->ioc));
- client->can_read = true;
client->close = close_fn;
data->client = client;