diff options
author | Peter Maydell <peter.maydell@linaro.org> | 2019-08-16 15:53:37 +0100 |
---|---|---|
committer | Peter Maydell <peter.maydell@linaro.org> | 2019-08-16 15:53:37 +0100 |
commit | c6a2225a5a5e272edc0304845e32ea436ea1043a (patch) | |
tree | 4bd7a214066714ef048ac30b4dedd5b8f6b31458 | |
parent | 95a9457fd44ad97c518858a4e1586a5498f9773c (diff) | |
parent | 8f071c9db506e03abcb1b76ec6d3d2f9488cc3b3 (diff) |
Merge remote-tracking branch 'remotes/ericb/tags/pull-nbd-2019-08-15' into staging
nbd patches for 2019-08-15
- Addition of InetSocketAddress keep-alive
- Addition of BDRV_REQ_PREFETCH for more efficient copy-on-read
- Initial refactoring in preparation of NBD reconnect
# gpg: Signature made Thu 15 Aug 2019 19:28:41 BST
# gpg: using RSA key A7A16B4A2527436A
# gpg: Good signature from "Eric Blake <eblake@redhat.com>" [full]
# gpg: aka "Eric Blake (Free Software Programmer) <ebb9@byu.net>" [full]
# gpg: aka "[jpeg image of size 6874]" [full]
# Primary key fingerprint: 71C2 CC22 B1C4 6029 27D2 F3AA A7A1 6B4A 2527 436A
* remotes/ericb/tags/pull-nbd-2019-08-15:
block/nbd: refactor nbd connection parameters
block/nbd: add cmdline and qapi parameter reconnect-delay
block/nbd: move from quit to state
block/nbd: use non-blocking io channel for nbd negotiation
block/nbd: split connection_co start out of nbd_client_connect
nbd: improve CMD_CACHE: use BDRV_REQ_PREFETCH
block/stream: use BDRV_REQ_PREFETCH
block: implement BDRV_REQ_PREFETCH
qapi: Add InetSocketAddress member keep-alive
Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
-rw-r--r-- | block/io.c | 18 | ||||
-rw-r--r-- | block/nbd.c | 195 | ||||
-rw-r--r-- | block/stream.c | 24 | ||||
-rw-r--r-- | include/block/block.h | 8 | ||||
-rw-r--r-- | include/block/nbd.h | 3 | ||||
-rw-r--r-- | nbd/client.c | 16 | ||||
-rw-r--r-- | nbd/server.c | 43 | ||||
-rw-r--r-- | qapi/block-core.json | 11 | ||||
-rw-r--r-- | qapi/sockets.json | 6 | ||||
-rw-r--r-- | qemu-nbd.c | 2 | ||||
-rw-r--r-- | util/qemu-sockets.c | 28 |
11 files changed, 233 insertions, 121 deletions
diff --git a/block/io.c b/block/io.c index 17a243cde9..56bbf195bb 100644 --- a/block/io.c +++ b/block/io.c @@ -1168,7 +1168,8 @@ bdrv_driver_pwritev_compressed(BlockDriverState *bs, uint64_t offset, } static int coroutine_fn bdrv_co_do_copy_on_readv(BdrvChild *child, - int64_t offset, unsigned int bytes, QEMUIOVector *qiov) + int64_t offset, unsigned int bytes, QEMUIOVector *qiov, + int flags) { BlockDriverState *bs = child->bs; @@ -1279,9 +1280,11 @@ static int coroutine_fn bdrv_co_do_copy_on_readv(BdrvChild *child, goto err; } - qemu_iovec_from_buf(qiov, progress, bounce_buffer + skip_bytes, - pnum - skip_bytes); - } else { + if (!(flags & BDRV_REQ_PREFETCH)) { + qemu_iovec_from_buf(qiov, progress, bounce_buffer + skip_bytes, + pnum - skip_bytes); + } + } else if (!(flags & BDRV_REQ_PREFETCH)) { /* Read directly into the destination */ qemu_iovec_init(&local_qiov, qiov->niov); qemu_iovec_concat(&local_qiov, qiov, progress, pnum - skip_bytes); @@ -1332,7 +1335,8 @@ static int coroutine_fn bdrv_aligned_preadv(BdrvChild *child, * potential fallback support, if we ever implement any read flags * to pass through to drivers. For now, there aren't any * passthrough flags. */ - assert(!(flags & ~(BDRV_REQ_NO_SERIALISING | BDRV_REQ_COPY_ON_READ))); + assert(!(flags & ~(BDRV_REQ_NO_SERIALISING | BDRV_REQ_COPY_ON_READ | + BDRV_REQ_PREFETCH))); /* Handle Copy on Read and associated serialisation */ if (flags & BDRV_REQ_COPY_ON_READ) { @@ -1360,7 +1364,9 @@ static int coroutine_fn bdrv_aligned_preadv(BdrvChild *child, } if (!ret || pnum != bytes) { - ret = bdrv_co_do_copy_on_readv(child, offset, bytes, qiov); + ret = bdrv_co_do_copy_on_readv(child, offset, bytes, qiov, flags); + goto out; + } else if (flags & BDRV_REQ_PREFETCH) { goto out; } } diff --git a/block/nbd.c b/block/nbd.c index 56b1c6ec74..beed46fb34 100644 --- a/block/nbd.c +++ b/block/nbd.c @@ -54,6 +54,11 @@ typedef struct { bool receiving; /* waiting for connection_co? */ } NBDClientRequest; +typedef enum NBDClientState { + NBD_CLIENT_CONNECTED, + NBD_CLIENT_QUIT +} NBDClientState; + typedef struct BDRVNBDState { QIOChannelSocket *sioc; /* The master data channel */ QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */ @@ -63,17 +68,27 @@ typedef struct BDRVNBDState { CoQueue free_sema; Coroutine *connection_co; int in_flight; + NBDClientState state; NBDClientRequest requests[MAX_NBD_REQUESTS]; NBDReply reply; BlockDriverState *bs; - bool quit; - /* For nbd_refresh_filename() */ + /* Connection parameters */ + uint32_t reconnect_delay; SocketAddress *saddr; char *export, *tlscredsid; + QCryptoTLSCreds *tlscreds; + const char *hostname; + char *x_dirty_bitmap; } BDRVNBDState; +/* @ret will be used for reconnect in future */ +static void nbd_channel_error(BDRVNBDState *s, int ret) +{ + s->state = NBD_CLIENT_QUIT; +} + static void nbd_recv_coroutines_wake_all(BDRVNBDState *s) { int i; @@ -152,7 +167,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque) int ret = 0; Error *local_err = NULL; - while (!s->quit) { + while (s->state != NBD_CLIENT_QUIT) { /* * The NBD client can only really be considered idle when it has * yielded from qio_channel_readv_all_eof(), waiting for data. This is @@ -170,6 +185,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque) error_free(local_err); } if (ret <= 0) { + nbd_channel_error(s, ret ? ret : -EIO); break; } @@ -184,6 +200,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque) !s->requests[i].receiving || (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply)) { + nbd_channel_error(s, -EINVAL); break; } @@ -203,7 +220,6 @@ static coroutine_fn void nbd_connection_entry(void *opaque) qemu_coroutine_yield(); } - s->quit = true; nbd_recv_coroutines_wake_all(s); bdrv_dec_in_flight(s->bs); @@ -216,12 +232,18 @@ static int nbd_co_send_request(BlockDriverState *bs, QEMUIOVector *qiov) { BDRVNBDState *s = (BDRVNBDState *)bs->opaque; - int rc, i; + int rc, i = -1; qemu_co_mutex_lock(&s->send_mutex); while (s->in_flight == MAX_NBD_REQUESTS) { qemu_co_queue_wait(&s->free_sema, &s->send_mutex); } + + if (s->state != NBD_CLIENT_CONNECTED) { + rc = -EIO; + goto err; + } + s->in_flight++; for (i = 0; i < MAX_NBD_REQUESTS; i++) { @@ -239,16 +261,12 @@ static int nbd_co_send_request(BlockDriverState *bs, request->handle = INDEX_TO_HANDLE(s, i); - if (s->quit) { - rc = -EIO; - goto err; - } assert(s->ioc); if (qiov) { qio_channel_set_cork(s->ioc, true); rc = nbd_send_request(s->ioc, request); - if (rc >= 0 && !s->quit) { + if (rc >= 0 && s->state == NBD_CLIENT_CONNECTED) { if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov, NULL) < 0) { rc = -EIO; @@ -263,9 +281,11 @@ static int nbd_co_send_request(BlockDriverState *bs, err: if (rc < 0) { - s->quit = true; - s->requests[i].coroutine = NULL; - s->in_flight--; + nbd_channel_error(s, rc); + if (i != -1) { + s->requests[i].coroutine = NULL; + s->in_flight--; + } qemu_co_queue_next(&s->free_sema); } qemu_co_mutex_unlock(&s->send_mutex); @@ -557,7 +577,7 @@ static coroutine_fn int nbd_co_do_receive_one_chunk( s->requests[i].receiving = true; qemu_coroutine_yield(); s->requests[i].receiving = false; - if (s->quit) { + if (s->state != NBD_CLIENT_CONNECTED) { error_setg(errp, "Connection closed"); return -EIO; } @@ -642,7 +662,7 @@ static coroutine_fn int nbd_co_receive_one_chunk( if (ret < 0) { memset(reply, 0, sizeof(*reply)); - s->quit = true; + nbd_channel_error(s, ret); } else { /* For assert at loop start in nbd_connection_entry */ *reply = s->reply; @@ -710,7 +730,7 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s, NBDReply local_reply; NBDStructuredReplyChunk *chunk; Error *local_err = NULL; - if (s->quit) { + if (s->state != NBD_CLIENT_CONNECTED) { error_setg(&local_err, "Connection closed"); nbd_iter_channel_error(iter, -EIO, &local_err); goto break_loop; @@ -735,7 +755,7 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s, } /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */ - if (nbd_reply_is_simple(reply) || s->quit) { + if (nbd_reply_is_simple(reply) || s->state != NBD_CLIENT_CONNECTED) { goto break_loop; } @@ -809,14 +829,14 @@ static int nbd_co_receive_cmdread_reply(BDRVNBDState *s, uint64_t handle, ret = nbd_parse_offset_hole_payload(s, &reply.structured, payload, offset, qiov, &local_err); if (ret < 0) { - s->quit = true; + nbd_channel_error(s, ret); nbd_iter_channel_error(&iter, ret, &local_err); } break; default: if (!nbd_reply_type_is_error(chunk->type)) { /* not allowed reply type */ - s->quit = true; + nbd_channel_error(s, -EINVAL); error_setg(&local_err, "Unexpected reply type: %d (%s) for CMD_READ", chunk->type, nbd_reply_type_lookup(chunk->type)); @@ -854,7 +874,7 @@ static int nbd_co_receive_blockstatus_reply(BDRVNBDState *s, switch (chunk->type) { case NBD_REPLY_TYPE_BLOCK_STATUS: if (received) { - s->quit = true; + nbd_channel_error(s, -EINVAL); error_setg(&local_err, "Several BLOCK_STATUS chunks in reply"); nbd_iter_channel_error(&iter, -EINVAL, &local_err); } @@ -864,13 +884,13 @@ static int nbd_co_receive_blockstatus_reply(BDRVNBDState *s, payload, length, extent, &local_err); if (ret < 0) { - s->quit = true; + nbd_channel_error(s, ret); nbd_iter_channel_error(&iter, ret, &local_err); } break; default: if (!nbd_reply_type_is_error(chunk->type)) { - s->quit = true; + nbd_channel_error(s, -EINVAL); error_setg(&local_err, "Unexpected reply type: %d (%s) " "for CMD_BLOCK_STATUS", @@ -1167,47 +1187,43 @@ static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr, return sioc; } -static int nbd_client_connect(BlockDriverState *bs, - SocketAddress *saddr, - const char *export, - QCryptoTLSCreds *tlscreds, - const char *hostname, - const char *x_dirty_bitmap, - Error **errp) +static int nbd_client_connect(BlockDriverState *bs, Error **errp) { BDRVNBDState *s = (BDRVNBDState *)bs->opaque; + AioContext *aio_context = bdrv_get_aio_context(bs); int ret; /* * establish TCP connection, return error if it fails * TODO: Configurable retry-until-timeout behaviour. */ - QIOChannelSocket *sioc = nbd_establish_connection(saddr, errp); + QIOChannelSocket *sioc = nbd_establish_connection(s->saddr, errp); if (!sioc) { return -ECONNREFUSED; } /* NBD handshake */ - trace_nbd_client_connect(export); - qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL); + trace_nbd_client_connect(s->export); + qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL); + qio_channel_attach_aio_context(QIO_CHANNEL(sioc), aio_context); s->info.request_sizes = true; s->info.structured_reply = true; s->info.base_allocation = true; - s->info.x_dirty_bitmap = g_strdup(x_dirty_bitmap); - s->info.name = g_strdup(export ?: ""); - ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), tlscreds, hostname, - &s->ioc, &s->info, errp); + s->info.x_dirty_bitmap = g_strdup(s->x_dirty_bitmap); + s->info.name = g_strdup(s->export ?: ""); + ret = nbd_receive_negotiate(aio_context, QIO_CHANNEL(sioc), s->tlscreds, + s->hostname, &s->ioc, &s->info, errp); g_free(s->info.x_dirty_bitmap); g_free(s->info.name); if (ret < 0) { object_unref(OBJECT(sioc)); return ret; } - if (x_dirty_bitmap && !s->info.base_allocation) { + if (s->x_dirty_bitmap && !s->info.base_allocation) { error_setg(errp, "requested x-dirty-bitmap %s not found", - x_dirty_bitmap); + s->x_dirty_bitmap); ret = -EINVAL; goto fail; } @@ -1232,24 +1248,14 @@ static int nbd_client_connect(BlockDriverState *bs, object_ref(OBJECT(s->ioc)); } - /* - * Now that we're connected, set the socket to be non-blocking and - * kick the reply mechanism. - */ - qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL); - s->connection_co = qemu_coroutine_create(nbd_connection_entry, s); - bdrv_inc_in_flight(bs); - nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs)); - - trace_nbd_client_connect_success(export); + trace_nbd_client_connect_success(s->export); return 0; fail: /* - * We have connected, but must fail for other reasons. The - * connection is still blocking; send NBD_CMD_DISC as a courtesy - * to the server. + * We have connected, but must fail for other reasons. + * Send NBD_CMD_DISC as a courtesy to the server. */ { NBDRequest request = { .type = NBD_CMD_DISC }; @@ -1262,23 +1268,9 @@ static int nbd_client_connect(BlockDriverState *bs, } } -static int nbd_client_init(BlockDriverState *bs, - SocketAddress *saddr, - const char *export, - QCryptoTLSCreds *tlscreds, - const char *hostname, - const char *x_dirty_bitmap, - Error **errp) -{ - BDRVNBDState *s = (BDRVNBDState *)bs->opaque; - - s->bs = bs; - qemu_co_mutex_init(&s->send_mutex); - qemu_co_queue_init(&s->free_sema); - - return nbd_client_connect(bs, saddr, export, tlscreds, hostname, - x_dirty_bitmap, errp); -} +/* + * Parse nbd_open options + */ static int nbd_parse_uri(const char *filename, QDict *options) { @@ -1583,18 +1575,27 @@ static QemuOptsList nbd_runtime_opts = { .help = "experimental: expose named dirty bitmap in place of " "block status", }, + { + .name = "reconnect-delay", + .type = QEMU_OPT_NUMBER, + .help = "On an unexpected disconnect, the nbd client tries to " + "connect again until succeeding or encountering a serious " + "error. During the first @reconnect-delay seconds, all " + "requests are paused and will be rerun on a successful " + "reconnect. After that time, any delayed requests and all " + "future requests before a successful reconnect will " + "immediately fail. Default 0", + }, { /* end of list */ } }, }; -static int nbd_open(BlockDriverState *bs, QDict *options, int flags, - Error **errp) +static int nbd_process_options(BlockDriverState *bs, QDict *options, + Error **errp) { BDRVNBDState *s = bs->opaque; - QemuOpts *opts = NULL; + QemuOpts *opts; Error *local_err = NULL; - QCryptoTLSCreds *tlscreds = NULL; - const char *hostname = NULL; int ret = -EINVAL; opts = qemu_opts_create(&nbd_runtime_opts, NULL, 0, &error_abort); @@ -1619,8 +1620,8 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags, s->tlscredsid = g_strdup(qemu_opt_get(opts, "tls-creds")); if (s->tlscredsid) { - tlscreds = nbd_get_tls_creds(s->tlscredsid, errp); - if (!tlscreds) { + s->tlscreds = nbd_get_tls_creds(s->tlscredsid, errp); + if (!s->tlscreds) { goto error; } @@ -1629,18 +1630,17 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags, error_setg(errp, "TLS only supported over IP sockets"); goto error; } - hostname = s->saddr->u.inet.host; + s->hostname = s->saddr->u.inet.host; } - /* NBD handshake */ - ret = nbd_client_init(bs, s->saddr, s->export, tlscreds, hostname, - qemu_opt_get(opts, "x-dirty-bitmap"), errp); + s->x_dirty_bitmap = g_strdup(qemu_opt_get(opts, "x-dirty-bitmap")); + s->reconnect_delay = qemu_opt_get_number(opts, "reconnect-delay", 0); + + ret = 0; error: - if (tlscreds) { - object_unref(OBJECT(tlscreds)); - } if (ret < 0) { + object_unref(OBJECT(s->tlscreds)); qapi_free_SocketAddress(s->saddr); g_free(s->export); g_free(s->tlscredsid); @@ -1649,6 +1649,35 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags, return ret; } +static int nbd_open(BlockDriverState *bs, QDict *options, int flags, + Error **errp) +{ + int ret; + BDRVNBDState *s = (BDRVNBDState *)bs->opaque; + + ret = nbd_process_options(bs, options, errp); + if (ret < 0) { + return ret; + } + + s->bs = bs; + qemu_co_mutex_init(&s->send_mutex); + qemu_co_queue_init(&s->free_sema); + + ret = nbd_client_connect(bs, errp); + if (ret < 0) { + return ret; + } + /* successfully connected */ + s->state = NBD_CLIENT_CONNECTED; + + s->connection_co = qemu_coroutine_create(nbd_connection_entry, s); + bdrv_inc_in_flight(bs); + aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co); + + return 0; +} + static int nbd_co_flush(BlockDriverState *bs) { return nbd_client_co_flush(bs); @@ -1694,9 +1723,11 @@ static void nbd_close(BlockDriverState *bs) nbd_client_close(bs); + object_unref(OBJECT(s->tlscreds)); qapi_free_SocketAddress(s->saddr); g_free(s->export); g_free(s->tlscredsid); + g_free(s->x_dirty_bitmap); } static int64_t nbd_getlength(BlockDriverState *bs) diff --git a/block/stream.c b/block/stream.c index 6ac1e7bec4..0d3a6ac7c3 100644 --- a/block/stream.c +++ b/block/stream.c @@ -22,11 +22,11 @@ enum { /* - * Size of data buffer for populating the image file. This should be large - * enough to process multiple clusters in a single call, so that populating - * contiguous regions of the image is efficient. + * Maximum chunk size to feed to copy-on-read. This should be + * large enough to process multiple clusters in a single call, so + * that populating contiguous regions of the image is efficient. */ - STREAM_BUFFER_SIZE = 512 * 1024, /* in bytes */ + STREAM_CHUNK = 512 * 1024, /* in bytes */ }; typedef struct StreamBlockJob { @@ -39,13 +39,12 @@ typedef struct StreamBlockJob { } StreamBlockJob; static int coroutine_fn stream_populate(BlockBackend *blk, - int64_t offset, uint64_t bytes, - void *buf) + int64_t offset, uint64_t bytes) { assert(bytes < SIZE_MAX); - /* Copy-on-read the unallocated clusters */ - return blk_co_pread(blk, offset, bytes, buf, BDRV_REQ_COPY_ON_READ); + return blk_co_preadv(blk, offset, bytes, NULL, + BDRV_REQ_COPY_ON_READ | BDRV_REQ_PREFETCH); } static void stream_abort(Job *job) @@ -117,7 +116,6 @@ static int coroutine_fn stream_run(Job *job, Error **errp) int error = 0; int ret = 0; int64_t n = 0; /* bytes */ - void *buf; if (bs == s->bottom) { /* Nothing to stream */ @@ -130,8 +128,6 @@ static int coroutine_fn stream_run(Job *job, Error **errp) } job_progress_set_remaining(&s->common.job, len); - buf = qemu_blockalign(bs, STREAM_BUFFER_SIZE); - /* Turn on copy-on-read for the whole block device so that guest read * requests help us make progress. Only do this when copying the entire * backing chain since the copy-on-read operation does not take base into @@ -154,7 +150,7 @@ static int coroutine_fn stream_run(Job *job, Error **errp) copy = false; - ret = bdrv_is_allocated(bs, offset, STREAM_BUFFER_SIZE, &n); + ret = bdrv_is_allocated(bs, offset, STREAM_CHUNK, &n); if (ret == 1) { /* Allocated in the top, no need to copy. */ } else if (ret >= 0) { @@ -171,7 +167,7 @@ static int coroutine_fn stream_run(Job *job, Error **errp) } trace_stream_one_iteration(s, offset, n, ret); if (copy) { - ret = stream_populate(blk, offset, n, buf); + ret = stream_populate(blk, offset, n); } if (ret < 0) { BlockErrorAction action = @@ -202,8 +198,6 @@ static int coroutine_fn stream_run(Job *job, Error **errp) bdrv_disable_copy_on_read(bs); } - qemu_vfree(buf); - /* Do not remove the backing file if an error was there but ignored. */ return error; } diff --git a/include/block/block.h b/include/block/block.h index ae79b70e2d..89e40318cf 100644 --- a/include/block/block.h +++ b/include/block/block.h @@ -87,8 +87,14 @@ typedef enum { * fallback. */ BDRV_REQ_NO_FALLBACK = 0x100, + /* + * BDRV_REQ_PREFETCH may be used only together with BDRV_REQ_COPY_ON_READ + * on read request and means that caller doesn't really need data to be + * written to qiov parameter which may be NULL. + */ + BDRV_REQ_PREFETCH = 0x200, /* Mask of valid flags */ - BDRV_REQ_MASK = 0x1ff, + BDRV_REQ_MASK = 0x3ff, } BdrvRequestFlags; typedef struct BlockSizes { diff --git a/include/block/nbd.h b/include/block/nbd.h index bb9f5bc021..7b36d672f0 100644 --- a/include/block/nbd.h +++ b/include/block/nbd.h @@ -304,7 +304,8 @@ struct NBDExportInfo { }; typedef struct NBDExportInfo NBDExportInfo; -int nbd_receive_negotiate(QIOChannel *ioc, QCryptoTLSCreds *tlscreds, +int nbd_receive_negotiate(AioContext *aio_context, QIOChannel *ioc, + QCryptoTLSCreds *tlscreds, const char *hostname, QIOChannel **outioc, NBDExportInfo *info, Error **errp); void nbd_free_export_list(NBDExportInfo *info, int count); diff --git a/nbd/client.c b/nbd/client.c index d554ae353d..49bf9906f9 100644 --- a/nbd/client.c +++ b/nbd/client.c @@ -868,7 +868,8 @@ static int nbd_list_meta_contexts(QIOChannel *ioc, * 2: server is newstyle, but lacks structured replies * 3: server is newstyle and set up for structured replies */ -static int nbd_start_negotiate(QIOChannel *ioc, QCryptoTLSCreds *tlscreds, +static int nbd_start_negotiate(AioContext *aio_context, QIOChannel *ioc, + QCryptoTLSCreds *tlscreds, const char *hostname, QIOChannel **outioc, bool structured_reply, bool *zeroes, Error **errp) @@ -935,6 +936,10 @@ static int nbd_start_negotiate(QIOChannel *ioc, QCryptoTLSCreds *tlscreds, return -EINVAL; } ioc = *outioc; + if (aio_context) { + qio_channel_set_blocking(ioc, false, NULL); + qio_channel_attach_aio_context(ioc, aio_context); + } } else { error_setg(errp, "Server does not support STARTTLS"); return -EINVAL; @@ -999,7 +1004,8 @@ static int nbd_negotiate_finish_oldstyle(QIOChannel *ioc, NBDExportInfo *info, * Returns: negative errno: failure talking to server * 0: server is connected */ -int nbd_receive_negotiate(QIOChannel *ioc, QCryptoTLSCreds *tlscreds, +int nbd_receive_negotiate(AioContext *aio_context, QIOChannel *ioc, + QCryptoTLSCreds *tlscreds, const char *hostname, QIOChannel **outioc, NBDExportInfo *info, Error **errp) { @@ -1010,7 +1016,7 @@ int nbd_receive_negotiate(QIOChannel *ioc, QCryptoTLSCreds *tlscreds, assert(info->name); trace_nbd_receive_negotiate_name(info->name); - result = nbd_start_negotiate(ioc, tlscreds, hostname, outioc, + result = nbd_start_negotiate(aio_context, ioc, tlscreds, hostname, outioc, info->structured_reply, &zeroes, errp); info->structured_reply = false; @@ -1130,8 +1136,8 @@ int nbd_receive_export_list(QIOChannel *ioc, QCryptoTLSCreds *tlscreds, QIOChannel *sioc = NULL; *info = NULL; - result = nbd_start_negotiate(ioc, tlscreds, hostname, &sioc, true, NULL, - errp); + result = nbd_start_negotiate(NULL, ioc, tlscreds, hostname, &sioc, true, + NULL, errp); if (tlscreds && sioc) { ioc = sioc; } diff --git a/nbd/server.c b/nbd/server.c index dbd2ff8555..3eacb89875 100644 --- a/nbd/server.c +++ b/nbd/server.c @@ -2105,12 +2105,15 @@ static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request, return -EINVAL; } - req->data = blk_try_blockalign(client->exp->blk, request->len); - if (req->data == NULL) { - error_setg(errp, "No memory"); - return -ENOMEM; + if (request->type != NBD_CMD_CACHE) { + req->data = blk_try_blockalign(client->exp->blk, request->len); + if (req->data == NULL) { + error_setg(errp, "No memory"); + return -ENOMEM; + } } } + if (request->type == NBD_CMD_WRITE) { if (nbd_read(client->ioc, req->data, request->len, "CMD_WRITE data", errp) < 0) @@ -2195,7 +2198,7 @@ static coroutine_fn int nbd_do_cmd_read(NBDClient *client, NBDRequest *request, int ret; NBDExport *exp = client->exp; - assert(request->type == NBD_CMD_READ || request->type == NBD_CMD_CACHE); + assert(request->type == NBD_CMD_READ); /* XXX: NBD Protocol only documents use of FUA with WRITE */ if (request->flags & NBD_CMD_FLAG_FUA) { @@ -2207,7 +2210,7 @@ static coroutine_fn int nbd_do_cmd_read(NBDClient *client, NBDRequest *request, } if (client->structured_reply && !(request->flags & NBD_CMD_FLAG_DF) && - request->len && request->type != NBD_CMD_CACHE) + request->len) { return nbd_co_send_sparse_read(client, request->handle, request->from, data, request->len, errp); @@ -2215,7 +2218,7 @@ static coroutine_fn int nbd_do_cmd_read(NBDClient *client, NBDRequest *request, ret = blk_pread(exp->blk, request->from + exp->dev_offset, data, request->len); - if (ret < 0 || request->type == NBD_CMD_CACHE) { + if (ret < 0) { return nbd_send_generic_reply(client, request->handle, ret, "reading from file failed", errp); } @@ -2234,6 +2237,28 @@ static coroutine_fn int nbd_do_cmd_read(NBDClient *client, NBDRequest *request, } } +/* + * nbd_do_cmd_cache + * + * Handle NBD_CMD_CACHE request. + * Return -errno if sending fails. Other errors are reported directly to the + * client as an error reply. + */ +static coroutine_fn int nbd_do_cmd_cache(NBDClient *client, NBDRequest *request, + Error **errp) +{ + int ret; + NBDExport *exp = client->exp; + + assert(request->type == NBD_CMD_CACHE); + + ret = blk_co_preadv(exp->blk, request->from + exp->dev_offset, request->len, + NULL, BDRV_REQ_COPY_ON_READ | BDRV_REQ_PREFETCH); + + return nbd_send_generic_reply(client, request->handle, ret, + "caching data failed", errp); +} + /* Handle NBD request. * Return -errno if sending fails. Other errors are reported directly to the * client as an error reply. */ @@ -2247,8 +2272,10 @@ static coroutine_fn int nbd_handle_request(NBDClient *client, char *msg; switch (request->type) { - case NBD_CMD_READ: case NBD_CMD_CACHE: + return nbd_do_cmd_cache(client, request, errp); + + case NBD_CMD_READ: return nbd_do_cmd_read(client, request, data, errp); case NBD_CMD_WRITE: diff --git a/qapi/block-core.json b/qapi/block-core.json index 0d43d4f37c..f1e7701fbe 100644 --- a/qapi/block-core.json +++ b/qapi/block-core.json @@ -3860,13 +3860,22 @@ # traditional "base:allocation" block status (see # NBD_OPT_LIST_META_CONTEXT in the NBD protocol) (since 3.0) # +# @reconnect-delay: On an unexpected disconnect, the nbd client tries to +# connect again until succeeding or encountering a serious +# error. During the first @reconnect-delay seconds, all +# requests are paused and will be rerun on a successful +# reconnect. After that time, any delayed requests and all +# future requests before a successful reconnect will +# immediately fail. Default 0 (Since 4.2) +# # Since: 2.9 ## { 'struct': 'BlockdevOptionsNbd', 'data': { 'server': 'SocketAddress', '*export': 'str', '*tls-creds': 'str', - '*x-dirty-bitmap': 'str' } } + '*x-dirty-bitmap': 'str', + '*reconnect-delay': 'uint32' } } ## # @BlockdevOptionsRaw: diff --git a/qapi/sockets.json b/qapi/sockets.json index fc81d8d5e8..32375f3a36 100644 --- a/qapi/sockets.json +++ b/qapi/sockets.json @@ -53,6 +53,9 @@ # # @ipv6: whether to accept IPv6 addresses, default try both IPv4 and IPv6 # +# @keep-alive: enable keep-alive when connecting to this socket. Not supported +# for passive sockets. (Since 4.2) +# # Since: 1.3 ## { 'struct': 'InetSocketAddress', @@ -61,7 +64,8 @@ '*numeric': 'bool', '*to': 'uint16', '*ipv4': 'bool', - '*ipv6': 'bool' } } + '*ipv6': 'bool', + '*keep-alive': 'bool' } } ## # @UnixSocketAddress: diff --git a/qemu-nbd.c b/qemu-nbd.c index a8cb39e510..049645491d 100644 --- a/qemu-nbd.c +++ b/qemu-nbd.c @@ -362,7 +362,7 @@ static void *nbd_client_thread(void *arg) goto out; } - ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), + ret = nbd_receive_negotiate(NULL, QIO_CHANNEL(sioc), NULL, NULL, NULL, &info, &local_error); if (ret < 0) { if (local_error) { diff --git a/util/qemu-sockets.c b/util/qemu-sockets.c index a5092dbd12..e3a1666578 100644 --- a/util/qemu-sockets.c +++ b/util/qemu-sockets.c @@ -219,6 +219,12 @@ static int inet_listen_saddr(InetSocketAddress *saddr, bool socket_created = false; Error *err = NULL; + if (saddr->keep_alive) { + error_setg(errp, "keep-alive option is not supported for passive " + "sockets"); + return -1; + } + memset(&ai,0, sizeof(ai)); ai.ai_flags = AI_PASSIVE; if (saddr->has_numeric && saddr->numeric) { @@ -458,6 +464,19 @@ int inet_connect_saddr(InetSocketAddress *saddr, Error **errp) } freeaddrinfo(res); + + if (saddr->keep_alive) { + int val = 1; + int ret = qemu_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, + &val, sizeof(val)); + + if (ret < 0) { + error_setg_errno(errp, errno, "Unable to set KEEPALIVE"); + close(sock); + return -1; + } + } + return sock; } @@ -653,6 +672,15 @@ int inet_parse(InetSocketAddress *addr, const char *str, Error **errp) } addr->has_ipv6 = true; } + begin = strstr(optstr, ",keep-alive"); + if (begin) { + if (inet_parse_flag("keep-alive", begin + strlen(",keep-alive"), + &addr->keep_alive, errp) < 0) + { + return -1; + } + addr->has_keep_alive = true; + } return 0; } |