diff options
Diffstat (limited to 'nbd/server.c')
-rw-r--r-- | nbd/server.c | 208 |
1 files changed, 162 insertions, 46 deletions
diff --git a/nbd/server.c b/nbd/server.c index 895cf0a752..941832f178 100644 --- a/nbd/server.c +++ b/nbd/server.c @@ -122,26 +122,28 @@ struct NBDMetaContexts { }; struct NBDClient { - int refcount; + int refcount; /* atomic */ void (*close_fn)(NBDClient *client, bool negotiated); + QemuMutex lock; + NBDExport *exp; QCryptoTLSCreds *tlscreds; char *tlsauthz; QIOChannelSocket *sioc; /* The underlying data channel */ QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */ - Coroutine *recv_coroutine; + Coroutine *recv_coroutine; /* protected by lock */ CoMutex send_lock; Coroutine *send_coroutine; - bool read_yielding; - bool quiescing; + bool read_yielding; /* protected by lock */ + bool quiescing; /* protected by lock */ QTAILQ_ENTRY(NBDClient) next; - int nb_requests; - bool closing; + int nb_requests; /* protected by lock */ + bool closing; /* protected by lock */ uint32_t check_align; /* If non-zero, check for aligned client requests */ @@ -1415,11 +1417,18 @@ nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp) len = qio_channel_readv(client->ioc, &iov, 1, errp); if (len == QIO_CHANNEL_ERR_BLOCK) { - client->read_yielding = true; + WITH_QEMU_LOCK_GUARD(&client->lock) { + client->read_yielding = true; + + /* Prompt main loop thread to re-run nbd_drained_poll() */ + aio_wait_kick(); + } qio_channel_yield(client->ioc, G_IO_IN); - client->read_yielding = false; - if (client->quiescing) { - return -EAGAIN; + WITH_QEMU_LOCK_GUARD(&client->lock) { + client->read_yielding = false; + if (client->quiescing) { + return -EAGAIN; + } } continue; } else if (len < 0) { @@ -1501,14 +1510,17 @@ static int coroutine_fn nbd_receive_request(NBDClient *client, NBDRequest *reque #define MAX_NBD_REQUESTS 16 +/* Runs in export AioContext and main loop thread */ void nbd_client_get(NBDClient *client) { - client->refcount++; + qatomic_inc(&client->refcount); } void nbd_client_put(NBDClient *client) { - if (--client->refcount == 0) { + assert(qemu_in_main_thread()); + + if (qatomic_fetch_dec(&client->refcount) == 1) { /* The last reference should be dropped by client->close, * which is called by client_close. */ @@ -1525,17 +1537,47 @@ void nbd_client_put(NBDClient *client) blk_exp_unref(&client->exp->common); } g_free(client->contexts.bitmaps); + qemu_mutex_destroy(&client->lock); g_free(client); } } +/* + * Tries to release the reference to @client, but only if other references + * remain. This is an optimization for the common case where we want to avoid + * the expense of scheduling nbd_client_put() in the main loop thread. + * + * Returns true upon success or false if the reference was not released because + * it is the last reference. + */ +static bool nbd_client_put_nonzero(NBDClient *client) +{ + int old = qatomic_read(&client->refcount); + int expected; + + do { + if (old == 1) { + return false; + } + + expected = old; + old = qatomic_cmpxchg(&client->refcount, expected, expected - 1); + } while (old != expected); + + return true; +} + static void client_close(NBDClient *client, bool negotiated) { - if (client->closing) { - return; - } + assert(qemu_in_main_thread()); + + WITH_QEMU_LOCK_GUARD(&client->lock) { + if (client->closing) { + return; + } - client->closing = true; + client->closing = true; + } /* Force requests to finish. They will drop their own references, * then we'll close the socket and free the NBDClient. @@ -1549,6 +1591,7 @@ static void client_close(NBDClient *client, bool negotiated) } } +/* Runs in export AioContext with client->lock held */ static NBDRequestData *nbd_request_get(NBDClient *client) { NBDRequestData *req; @@ -1557,11 +1600,11 @@ static NBDRequestData *nbd_request_get(NBDClient *client) client->nb_requests++; req = g_new0(NBDRequestData, 1); - nbd_client_get(client); req->client = client; return req; } +/* Runs in export AioContext with client->lock held */ static void nbd_request_put(NBDRequestData *req) { NBDClient *client = req->client; @@ -1578,8 +1621,6 @@ static void nbd_request_put(NBDRequestData *req) } nbd_client_receive_next_request(client); - - nbd_client_put(client); } static void blk_aio_attached(AioContext *ctx, void *opaque) @@ -1587,14 +1628,18 @@ static void blk_aio_attached(AioContext *ctx, void *opaque) NBDExport *exp = opaque; NBDClient *client; + assert(qemu_in_main_thread()); + trace_nbd_blk_aio_attached(exp->name, ctx); exp->common.ctx = ctx; QTAILQ_FOREACH(client, &exp->clients, next) { - assert(client->nb_requests == 0); - assert(client->recv_coroutine == NULL); - assert(client->send_coroutine == NULL); + WITH_QEMU_LOCK_GUARD(&client->lock) { + assert(client->nb_requests == 0); + assert(client->recv_coroutine == NULL); + assert(client->send_coroutine == NULL); + } } } @@ -1602,6 +1647,8 @@ static void blk_aio_detach(void *opaque) { NBDExport *exp = opaque; + assert(qemu_in_main_thread()); + trace_nbd_blk_aio_detach(exp->name, exp->common.ctx); exp->common.ctx = NULL; @@ -1612,8 +1659,12 @@ static void nbd_drained_begin(void *opaque) NBDExport *exp = opaque; NBDClient *client; + assert(qemu_in_main_thread()); + QTAILQ_FOREACH(client, &exp->clients, next) { - client->quiescing = true; + WITH_QEMU_LOCK_GUARD(&client->lock) { + client->quiescing = true; + } } } @@ -1622,28 +1673,48 @@ static void nbd_drained_end(void *opaque) NBDExport *exp = opaque; NBDClient *client; + assert(qemu_in_main_thread()); + QTAILQ_FOREACH(client, &exp->clients, next) { - client->quiescing = false; - nbd_client_receive_next_request(client); + WITH_QEMU_LOCK_GUARD(&client->lock) { + client->quiescing = false; + nbd_client_receive_next_request(client); + } } } +/* Runs in export AioContext */ +static void nbd_wake_read_bh(void *opaque) +{ + NBDClient *client = opaque; + qio_channel_wake_read(client->ioc); +} + static bool nbd_drained_poll(void *opaque) { NBDExport *exp = opaque; NBDClient *client; + assert(qemu_in_main_thread()); + QTAILQ_FOREACH(client, &exp->clients, next) { - if (client->nb_requests != 0) { - /* - * If there's a coroutine waiting for a request on nbd_read_eof() - * enter it here so we don't depend on the client to wake it up. - */ - if (client->recv_coroutine != NULL && client->read_yielding) { - qio_channel_wake_read(client->ioc); - } + WITH_QEMU_LOCK_GUARD(&client->lock) { + if (client->nb_requests != 0) { + /* + * If there's a coroutine waiting for a request on nbd_read_eof() + * enter it here so we don't depend on the client to wake it up. + * + * Schedule a BH in the export AioContext to avoid missing the + * wake up due to the race between qio_channel_wake_read() and + * qio_channel_yield(). + */ + if (client->recv_coroutine != NULL && client->read_yielding) { + aio_bh_schedule_oneshot(nbd_export_aio_context(client->exp), + nbd_wake_read_bh, client); + } - return true; + return true; + } } } @@ -1654,6 +1725,8 @@ static void nbd_eject_notifier(Notifier *n, void *data) { NBDExport *exp = container_of(n, NBDExport, eject_notifier); + assert(qemu_in_main_thread()); + blk_exp_request_shutdown(&exp->common); } @@ -2539,7 +2612,6 @@ static int coroutine_fn nbd_co_receive_request(NBDRequestData *req, int ret; g_assert(qemu_in_coroutine()); - assert(client->recv_coroutine == qemu_coroutine_self()); ret = nbd_receive_request(client, request, errp); if (ret < 0) { return ret; @@ -2936,15 +3008,23 @@ static coroutine_fn int nbd_handle_request(NBDClient *client, static coroutine_fn void nbd_trip(void *opaque) { NBDClient *client = opaque; - NBDRequestData *req; + NBDRequestData *req = NULL; NBDRequest request = { 0 }; /* GCC thinks it can be used uninitialized */ int ret; Error *local_err = NULL; + /* + * Note that nbd_client_put() and client_close() must be called from the + * main loop thread. Use aio_co_reschedule_self() to switch AioContext + * before calling these functions. + */ + trace_nbd_trip(); + + qemu_mutex_lock(&client->lock); + if (client->closing) { - nbd_client_put(client); - return; + goto done; } if (client->quiescing) { @@ -2952,14 +3032,27 @@ static coroutine_fn void nbd_trip(void *opaque) * We're switching between AIO contexts. Don't attempt to receive a new * request and kick the main context which may be waiting for us. */ - nbd_client_put(client); client->recv_coroutine = NULL; aio_wait_kick(); - return; + goto done; } req = nbd_request_get(client); - ret = nbd_co_receive_request(req, &request, &local_err); + + /* + * nbd_co_receive_request() returns -EAGAIN when nbd_drained_begin() has + * set client->quiescing but by the time we get back nbd_drained_end() may + * have already cleared client->quiescing. In that case we try again + * because nothing else will spawn an nbd_trip() coroutine until we set + * client->recv_coroutine = NULL further down. + */ + do { + assert(client->recv_coroutine == qemu_coroutine_self()); + qemu_mutex_unlock(&client->lock); + ret = nbd_co_receive_request(req, &request, &local_err); + qemu_mutex_lock(&client->lock); + } while (ret == -EAGAIN && !client->quiescing); + client->recv_coroutine = NULL; if (client->closing) { @@ -2971,15 +3064,16 @@ static coroutine_fn void nbd_trip(void *opaque) } if (ret == -EAGAIN) { - assert(client->quiescing); goto done; } nbd_client_receive_next_request(client); + if (ret == -EIO) { goto disconnect; } + qemu_mutex_unlock(&client->lock); qio_channel_set_cork(client->ioc, true); if (ret < 0) { @@ -2999,6 +3093,10 @@ static coroutine_fn void nbd_trip(void *opaque) g_free(request.contexts->bitmaps); g_free(request.contexts); } + + qio_channel_set_cork(client->ioc, false); + qemu_mutex_lock(&client->lock); + if (ret < 0) { error_prepend(&local_err, "Failed to send reply: "); goto disconnect; @@ -3013,21 +3111,36 @@ static coroutine_fn void nbd_trip(void *opaque) goto disconnect; } - qio_channel_set_cork(client->ioc, false); done: - nbd_request_put(req); - nbd_client_put(client); + if (req) { + nbd_request_put(req); + } + + qemu_mutex_unlock(&client->lock); + + if (!nbd_client_put_nonzero(client)) { + aio_co_reschedule_self(qemu_get_aio_context()); + nbd_client_put(client); + } return; disconnect: if (local_err) { error_reportf_err(local_err, "Disconnect client, due to: "); } + nbd_request_put(req); + qemu_mutex_unlock(&client->lock); + + aio_co_reschedule_self(qemu_get_aio_context()); client_close(client, true); nbd_client_put(client); } +/* + * Runs in export AioContext and main loop thread. Caller must hold + * client->lock. + */ static void nbd_client_receive_next_request(NBDClient *client) { if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS && @@ -3053,7 +3166,9 @@ static coroutine_fn void nbd_co_client_start(void *opaque) return; } - nbd_client_receive_next_request(client); + WITH_QEMU_LOCK_GUARD(&client->lock) { + nbd_client_receive_next_request(client); + } } /* @@ -3070,6 +3185,7 @@ void nbd_client_new(QIOChannelSocket *sioc, Coroutine *co; client = g_new0(NBDClient, 1); + qemu_mutex_init(&client->lock); client->refcount = 1; client->tlscreds = tlscreds; if (tlscreds) { |