diff options
author | Peter Maydell <peter.maydell@linaro.org> | 2021-01-13 14:19:24 +0000 |
---|---|---|
committer | Peter Maydell <peter.maydell@linaro.org> | 2021-01-13 14:19:24 +0000 |
commit | 45240eed4f064576d589ea60ebadf3c11d7ab891 (patch) | |
tree | 9384e23478ab00d726eeb0a7e403f97284262933 | |
parent | 6b63d126121a9535784003924fcb67f574a6afc0 (diff) | |
parent | 91d48e520a4a4f72e97aeb333029694f5d57cc93 (diff) |
Merge remote-tracking branch 'remotes/armbru/tags/pull-yank-2021-01-13' into staging
Yank patches patches for 2021-01-13
# gpg: Signature made Wed 13 Jan 2021 09:25:46 GMT
# gpg: using RSA key 354BC8B3D7EB2A6B68674E5F3870B400EB918653
# gpg: issuer "armbru@redhat.com"
# gpg: Good signature from "Markus Armbruster <armbru@redhat.com>" [full]
# gpg: aka "Markus Armbruster <armbru@pond.sub.org>" [full]
# Primary key fingerprint: 354B C8B3 D7EB 2A6B 6867 4E5F 3870 B400 EB91 8653
* remotes/armbru/tags/pull-yank-2021-01-13:
tests/test-char.c: Wait for the chardev to connect in char_socket_client_dupid_test
io: Document qmp oob suitability of qio_channel_shutdown and io_shutdown
io/channel-tls.c: make qio_channel_tls_shutdown thread-safe
migration: Add yank feature
chardev/char-socket.c: Add yank feature
block/nbd.c: Add yank feature
Introduce yank feature
Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
-rw-r--r-- | MAINTAINERS | 7 | ||||
-rw-r--r-- | block/nbd.c | 153 | ||||
-rw-r--r-- | chardev/char-socket.c | 34 | ||||
-rw-r--r-- | include/io/channel.h | 5 | ||||
-rw-r--r-- | include/qemu/yank.h | 97 | ||||
-rw-r--r-- | io/channel-tls.c | 6 | ||||
-rw-r--r-- | migration/channel.c | 13 | ||||
-rw-r--r-- | migration/migration.c | 22 | ||||
-rw-r--r-- | migration/multifd.c | 10 | ||||
-rw-r--r-- | migration/qemu-file-channel.c | 7 | ||||
-rw-r--r-- | migration/savevm.c | 5 | ||||
-rw-r--r-- | qapi/meson.build | 1 | ||||
-rw-r--r-- | qapi/qapi-schema.json | 1 | ||||
-rw-r--r-- | qapi/yank.json | 119 | ||||
-rw-r--r-- | tests/test-char.c | 1 | ||||
-rw-r--r-- | util/meson.build | 1 | ||||
-rw-r--r-- | util/yank.c | 207 |
17 files changed, 625 insertions, 64 deletions
diff --git a/MAINTAINERS b/MAINTAINERS index 4d9df874a1..cb0656aec3 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -2732,6 +2732,13 @@ F: util/uuid.c F: include/qemu/uuid.h F: tests/test-uuid.c +Yank feature +M: Lukas Straub <lukasstraub2@web.de> +S: Odd fixes +F: util/yank.c +F: include/qemu/yank.h +F: qapi/yank.json + COLO Framework M: zhanghailiang <zhang.zhanghailiang@huawei.com> S: Maintained diff --git a/block/nbd.c b/block/nbd.c index 242a258f3a..42e10c7c93 100644 --- a/block/nbd.c +++ b/block/nbd.c @@ -35,6 +35,7 @@ #include "qemu/option.h" #include "qemu/cutils.h" #include "qemu/main-loop.h" +#include "qemu/atomic.h" #include "qapi/qapi-visit-sockets.h" #include "qapi/qmp/qstring.h" @@ -44,6 +45,8 @@ #include "block/nbd.h" #include "block/block_int.h" +#include "qemu/yank.h" + #define EN_OPTSTR ":exportname=" #define MAX_NBD_REQUESTS 16 @@ -141,14 +144,13 @@ typedef struct BDRVNBDState { NBDConnectThread *connect_thread; } BDRVNBDState; -static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr, - Error **errp); -static QIOChannelSocket *nbd_co_establish_connection(BlockDriverState *bs, - Error **errp); +static int nbd_establish_connection(BlockDriverState *bs, SocketAddress *saddr, + Error **errp); +static int nbd_co_establish_connection(BlockDriverState *bs, Error **errp); static void nbd_co_establish_connection_cancel(BlockDriverState *bs, bool detach); -static int nbd_client_handshake(BlockDriverState *bs, QIOChannelSocket *sioc, - Error **errp); +static int nbd_client_handshake(BlockDriverState *bs, Error **errp); +static void nbd_yank(void *opaque); static void nbd_clear_bdrvstate(BDRVNBDState *s) { @@ -166,12 +168,12 @@ static void nbd_clear_bdrvstate(BDRVNBDState *s) static void nbd_channel_error(BDRVNBDState *s, int ret) { if (ret == -EIO) { - if (s->state == NBD_CLIENT_CONNECTED) { + if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) { s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT : NBD_CLIENT_CONNECTING_NOWAIT; } } else { - if (s->state == NBD_CLIENT_CONNECTED) { + if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) { qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); } s->state = NBD_CLIENT_QUIT; @@ -203,7 +205,7 @@ static void reconnect_delay_timer_cb(void *opaque) { BDRVNBDState *s = opaque; - if (s->state == NBD_CLIENT_CONNECTING_WAIT) { + if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) { s->state = NBD_CLIENT_CONNECTING_NOWAIT; while (qemu_co_enter_next(&s->free_sema, NULL)) { /* Resume all queued requests */ @@ -215,7 +217,7 @@ static void reconnect_delay_timer_cb(void *opaque) static void reconnect_delay_timer_init(BDRVNBDState *s, uint64_t expire_time_ns) { - if (s->state != NBD_CLIENT_CONNECTING_WAIT) { + if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTING_WAIT) { return; } @@ -260,7 +262,7 @@ static void nbd_client_attach_aio_context(BlockDriverState *bs, * s->connection_co is either yielded from nbd_receive_reply or from * nbd_co_reconnect_loop() */ - if (s->state == NBD_CLIENT_CONNECTED) { + if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) { qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context); } @@ -286,7 +288,7 @@ static void coroutine_fn nbd_client_co_drain_begin(BlockDriverState *bs) reconnect_delay_timer_del(s); - if (s->state == NBD_CLIENT_CONNECTING_WAIT) { + if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) { s->state = NBD_CLIENT_CONNECTING_NOWAIT; qemu_co_queue_restart_all(&s->free_sema); } @@ -337,13 +339,14 @@ static void nbd_teardown_connection(BlockDriverState *bs) static bool nbd_client_connecting(BDRVNBDState *s) { - return s->state == NBD_CLIENT_CONNECTING_WAIT || - s->state == NBD_CLIENT_CONNECTING_NOWAIT; + NBDClientState state = qatomic_load_acquire(&s->state); + return state == NBD_CLIENT_CONNECTING_WAIT || + state == NBD_CLIENT_CONNECTING_NOWAIT; } static bool nbd_client_connecting_wait(BDRVNBDState *s) { - return s->state == NBD_CLIENT_CONNECTING_WAIT; + return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT; } static void connect_bh(void *opaque) @@ -423,12 +426,12 @@ static void *connect_thread_func(void *opaque) return NULL; } -static QIOChannelSocket *coroutine_fn +static int coroutine_fn nbd_co_establish_connection(BlockDriverState *bs, Error **errp) { + int ret; QemuThread thread; BDRVNBDState *s = bs->opaque; - QIOChannelSocket *res; NBDConnectThread *thr = s->connect_thread; qemu_mutex_lock(&thr->mutex); @@ -445,10 +448,12 @@ nbd_co_establish_connection(BlockDriverState *bs, Error **errp) case CONNECT_THREAD_SUCCESS: /* Previous attempt finally succeeded in background */ thr->state = CONNECT_THREAD_NONE; - res = thr->sioc; + s->sioc = thr->sioc; thr->sioc = NULL; + yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name), + nbd_yank, bs); qemu_mutex_unlock(&thr->mutex); - return res; + return 0; case CONNECT_THREAD_RUNNING: /* Already running, will wait */ break; @@ -480,8 +485,13 @@ nbd_co_establish_connection(BlockDriverState *bs, Error **errp) thr->state = CONNECT_THREAD_NONE; error_propagate(errp, thr->err); thr->err = NULL; - res = thr->sioc; + s->sioc = thr->sioc; thr->sioc = NULL; + if (s->sioc) { + yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name), + nbd_yank, bs); + } + ret = (s->sioc ? 0 : -1); break; case CONNECT_THREAD_RUNNING: case CONNECT_THREAD_RUNNING_DETACHED: @@ -490,7 +500,7 @@ nbd_co_establish_connection(BlockDriverState *bs, Error **errp) * failed. Still connect thread is executing in background, and its * result may be used for next connection attempt. */ - res = NULL; + ret = -1; error_setg(errp, "Connection attempt cancelled by other operation"); break; @@ -507,7 +517,7 @@ nbd_co_establish_connection(BlockDriverState *bs, Error **errp) qemu_mutex_unlock(&thr->mutex); - return res; + return ret; } /* @@ -560,7 +570,6 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s) { int ret; Error *local_err = NULL; - QIOChannelSocket *sioc; if (!nbd_client_connecting(s)) { return; @@ -593,21 +602,22 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s) /* Finalize previous connection if any */ if (s->ioc) { qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc)); + yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name), + nbd_yank, s->bs); object_unref(OBJECT(s->sioc)); s->sioc = NULL; object_unref(OBJECT(s->ioc)); s->ioc = NULL; } - sioc = nbd_co_establish_connection(s->bs, &local_err); - if (!sioc) { + if (nbd_co_establish_connection(s->bs, &local_err) < 0) { ret = -ECONNREFUSED; goto out; } bdrv_dec_in_flight(s->bs); - ret = nbd_client_handshake(s->bs, sioc, &local_err); + ret = nbd_client_handshake(s->bs, &local_err); if (s->drained) { s->wait_drained_end = true; @@ -639,7 +649,7 @@ static coroutine_fn void nbd_co_reconnect_loop(BDRVNBDState *s) uint64_t timeout = 1 * NANOSECONDS_PER_SECOND; uint64_t max_timeout = 16 * NANOSECONDS_PER_SECOND; - if (s->state == NBD_CLIENT_CONNECTING_WAIT) { + if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) { reconnect_delay_timer_init(s, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + s->reconnect_delay * NANOSECONDS_PER_SECOND); } @@ -682,7 +692,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque) int ret = 0; Error *local_err = NULL; - while (s->state != NBD_CLIENT_QUIT) { + while (qatomic_load_acquire(&s->state) != NBD_CLIENT_QUIT) { /* * The NBD client can only really be considered idle when it has * yielded from qio_channel_readv_all_eof(), waiting for data. This is @@ -697,7 +707,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque) nbd_co_reconnect_loop(s); } - if (s->state != NBD_CLIENT_CONNECTED) { + if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) { continue; } @@ -752,6 +762,8 @@ static coroutine_fn void nbd_connection_entry(void *opaque) s->connection_co = NULL; if (s->ioc) { qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc)); + yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name), + nbd_yank, s->bs); object_unref(OBJECT(s->sioc)); s->sioc = NULL; object_unref(OBJECT(s->ioc)); @@ -776,7 +788,7 @@ static int nbd_co_send_request(BlockDriverState *bs, qemu_co_queue_wait(&s->free_sema, &s->send_mutex); } - if (s->state != NBD_CLIENT_CONNECTED) { + if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) { rc = -EIO; goto err; } @@ -803,7 +815,8 @@ static int nbd_co_send_request(BlockDriverState *bs, if (qiov) { qio_channel_set_cork(s->ioc, true); rc = nbd_send_request(s->ioc, request); - if (rc >= 0 && s->state == NBD_CLIENT_CONNECTED) { + if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED && + rc >= 0) { if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov, NULL) < 0) { rc = -EIO; @@ -1128,7 +1141,7 @@ static coroutine_fn int nbd_co_do_receive_one_chunk( s->requests[i].receiving = true; qemu_coroutine_yield(); s->requests[i].receiving = false; - if (s->state != NBD_CLIENT_CONNECTED) { + if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) { error_setg(errp, "Connection closed"); return -EIO; } @@ -1287,7 +1300,7 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s, NBDReply local_reply; NBDStructuredReplyChunk *chunk; Error *local_err = NULL; - if (s->state != NBD_CLIENT_CONNECTED) { + if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) { error_setg(&local_err, "Connection closed"); nbd_iter_channel_error(iter, -EIO, &local_err); goto break_loop; @@ -1312,7 +1325,8 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s, } /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */ - if (nbd_reply_is_simple(reply) || s->state != NBD_CLIENT_CONNECTED) { + if (nbd_reply_is_simple(reply) || + qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) { goto break_loop; } @@ -1744,6 +1758,15 @@ static int nbd_client_reopen_prepare(BDRVReopenState *state, return 0; } +static void nbd_yank(void *opaque) +{ + BlockDriverState *bs = opaque; + BDRVNBDState *s = (BDRVNBDState *)bs->opaque; + + qatomic_store_release(&s->state, NBD_CLIENT_QUIT); + qio_channel_shutdown(QIO_CHANNEL(s->sioc), QIO_CHANNEL_SHUTDOWN_BOTH, NULL); +} + static void nbd_client_close(BlockDriverState *bs) { BDRVNBDState *s = (BDRVNBDState *)bs->opaque; @@ -1756,52 +1779,53 @@ static void nbd_client_close(BlockDriverState *bs) nbd_teardown_connection(bs); } -static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr, - Error **errp) +static int nbd_establish_connection(BlockDriverState *bs, + SocketAddress *saddr, + Error **errp) { ERRP_GUARD(); - QIOChannelSocket *sioc; + BDRVNBDState *s = (BDRVNBDState *)bs->opaque; - sioc = qio_channel_socket_new(); - qio_channel_set_name(QIO_CHANNEL(sioc), "nbd-client"); + s->sioc = qio_channel_socket_new(); + qio_channel_set_name(QIO_CHANNEL(s->sioc), "nbd-client"); - qio_channel_socket_connect_sync(sioc, saddr, errp); + qio_channel_socket_connect_sync(s->sioc, saddr, errp); if (*errp) { - object_unref(OBJECT(sioc)); - return NULL; + object_unref(OBJECT(s->sioc)); + s->sioc = NULL; + return -1; } - qio_channel_set_delay(QIO_CHANNEL(sioc), false); + yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name), nbd_yank, bs); + qio_channel_set_delay(QIO_CHANNEL(s->sioc), false); - return sioc; + return 0; } -/* nbd_client_handshake takes ownership on sioc. On failure it is unref'ed. */ -static int nbd_client_handshake(BlockDriverState *bs, QIOChannelSocket *sioc, - Error **errp) +/* nbd_client_handshake takes ownership on s->sioc. On failure it's unref'ed. */ +static int nbd_client_handshake(BlockDriverState *bs, Error **errp) { BDRVNBDState *s = (BDRVNBDState *)bs->opaque; AioContext *aio_context = bdrv_get_aio_context(bs); int ret; trace_nbd_client_handshake(s->export); - - s->sioc = sioc; - - qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL); - qio_channel_attach_aio_context(QIO_CHANNEL(sioc), aio_context); + qio_channel_set_blocking(QIO_CHANNEL(s->sioc), false, NULL); + qio_channel_attach_aio_context(QIO_CHANNEL(s->sioc), aio_context); s->info.request_sizes = true; s->info.structured_reply = true; s->info.base_allocation = true; s->info.x_dirty_bitmap = g_strdup(s->x_dirty_bitmap); s->info.name = g_strdup(s->export ?: ""); - ret = nbd_receive_negotiate(aio_context, QIO_CHANNEL(sioc), s->tlscreds, + ret = nbd_receive_negotiate(aio_context, QIO_CHANNEL(s->sioc), s->tlscreds, s->hostname, &s->ioc, &s->info, errp); g_free(s->info.x_dirty_bitmap); g_free(s->info.name); if (ret < 0) { - object_unref(OBJECT(sioc)); + yank_unregister_function(BLOCKDEV_YANK_INSTANCE(bs->node_name), + nbd_yank, bs); + object_unref(OBJECT(s->sioc)); s->sioc = NULL; return ret; } @@ -1834,7 +1858,7 @@ static int nbd_client_handshake(BlockDriverState *bs, QIOChannelSocket *sioc, } if (!s->ioc) { - s->ioc = QIO_CHANNEL(sioc); + s->ioc = QIO_CHANNEL(s->sioc); object_ref(OBJECT(s->ioc)); } @@ -1850,9 +1874,11 @@ static int nbd_client_handshake(BlockDriverState *bs, QIOChannelSocket *sioc, { NBDRequest request = { .type = NBD_CMD_DISC }; - nbd_send_request(s->ioc ?: QIO_CHANNEL(sioc), &request); + nbd_send_request(s->ioc ?: QIO_CHANNEL(s->sioc), &request); - object_unref(OBJECT(sioc)); + yank_unregister_function(BLOCKDEV_YANK_INSTANCE(bs->node_name), + nbd_yank, bs); + object_unref(OBJECT(s->sioc)); s->sioc = NULL; return ret; @@ -2244,7 +2270,6 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags, { int ret; BDRVNBDState *s = (BDRVNBDState *)bs->opaque; - QIOChannelSocket *sioc; ret = nbd_process_options(bs, options, errp); if (ret < 0) { @@ -2255,17 +2280,22 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags, qemu_co_mutex_init(&s->send_mutex); qemu_co_queue_init(&s->free_sema); + if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) { + return -EEXIST; + } + /* * establish TCP connection, return error if it fails * TODO: Configurable retry-until-timeout behaviour. */ - sioc = nbd_establish_connection(s->saddr, errp); - if (!sioc) { + if (nbd_establish_connection(bs, s->saddr, errp) < 0) { + yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name)); return -ECONNREFUSED; } - ret = nbd_client_handshake(bs, sioc, errp); + ret = nbd_client_handshake(bs, errp); if (ret < 0) { + yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name)); nbd_clear_bdrvstate(s); return ret; } @@ -2325,6 +2355,7 @@ static void nbd_close(BlockDriverState *bs) BDRVNBDState *s = bs->opaque; nbd_client_close(bs); + yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name)); nbd_clear_bdrvstate(s); } diff --git a/chardev/char-socket.c b/chardev/char-socket.c index 213a4c8dd0..8a707d766c 100644 --- a/chardev/char-socket.c +++ b/chardev/char-socket.c @@ -34,6 +34,7 @@ #include "qapi/error.h" #include "qapi/clone-visitor.h" #include "qapi/qapi-visit-sockets.h" +#include "qemu/yank.h" #include "chardev/char-io.h" #include "qom/object.h" @@ -70,6 +71,7 @@ struct SocketChardev { size_t read_msgfds_num; int *write_msgfds; size_t write_msgfds_num; + bool registered_yank; SocketAddress *addr; bool is_listen; @@ -415,6 +417,12 @@ static void tcp_chr_free_connection(Chardev *chr) tcp_set_msgfds(chr, NULL, 0); remove_fd_in_watch(chr); + if (s->state == TCP_CHARDEV_STATE_CONNECTING + || s->state == TCP_CHARDEV_STATE_CONNECTED) { + yank_unregister_function(CHARDEV_YANK_INSTANCE(chr->label), + yank_generic_iochannel, + QIO_CHANNEL(s->sioc)); + } object_unref(OBJECT(s->sioc)); s->sioc = NULL; object_unref(OBJECT(s->ioc)); @@ -932,6 +940,9 @@ static int tcp_chr_add_client(Chardev *chr, int fd) } tcp_chr_change_state(s, TCP_CHARDEV_STATE_CONNECTING); tcp_chr_set_client_ioc_name(chr, sioc); + yank_register_function(CHARDEV_YANK_INSTANCE(chr->label), + yank_generic_iochannel, + QIO_CHANNEL(sioc)); ret = tcp_chr_new_client(chr, sioc); object_unref(OBJECT(sioc)); return ret; @@ -946,6 +957,9 @@ static void tcp_chr_accept(QIONetListener *listener, tcp_chr_change_state(s, TCP_CHARDEV_STATE_CONNECTING); tcp_chr_set_client_ioc_name(chr, cioc); + yank_register_function(CHARDEV_YANK_INSTANCE(chr->label), + yank_generic_iochannel, + QIO_CHANNEL(cioc)); tcp_chr_new_client(chr, cioc); } @@ -961,6 +975,9 @@ static int tcp_chr_connect_client_sync(Chardev *chr, Error **errp) object_unref(OBJECT(sioc)); return -1; } + yank_register_function(CHARDEV_YANK_INSTANCE(chr->label), + yank_generic_iochannel, + QIO_CHANNEL(sioc)); tcp_chr_new_client(chr, sioc); object_unref(OBJECT(sioc)); return 0; @@ -976,6 +993,9 @@ static void tcp_chr_accept_server_sync(Chardev *chr) tcp_chr_change_state(s, TCP_CHARDEV_STATE_CONNECTING); sioc = qio_net_listener_wait_client(s->listener); tcp_chr_set_client_ioc_name(chr, sioc); + yank_register_function(CHARDEV_YANK_INSTANCE(chr->label), + yank_generic_iochannel, + QIO_CHANNEL(sioc)); tcp_chr_new_client(chr, sioc); object_unref(OBJECT(sioc)); } @@ -1086,6 +1106,9 @@ static void char_socket_finalize(Object *obj) object_unref(OBJECT(s->tls_creds)); } g_free(s->tls_authz); + if (s->registered_yank) { + yank_unregister_instance(CHARDEV_YANK_INSTANCE(chr->label)); + } qemu_chr_be_event(chr, CHR_EVENT_CLOSED); } @@ -1101,6 +1124,9 @@ static void qemu_chr_socket_connected(QIOTask *task, void *opaque) if (qio_task_propagate_error(task, &err)) { tcp_chr_change_state(s, TCP_CHARDEV_STATE_DISCONNECTED); + yank_unregister_function(CHARDEV_YANK_INSTANCE(chr->label), + yank_generic_iochannel, + QIO_CHANNEL(sioc)); check_report_connect_error(chr, err); goto cleanup; } @@ -1134,6 +1160,9 @@ static void tcp_chr_connect_client_async(Chardev *chr) tcp_chr_change_state(s, TCP_CHARDEV_STATE_CONNECTING); sioc = qio_channel_socket_new(); tcp_chr_set_client_ioc_name(chr, sioc); + yank_register_function(CHARDEV_YANK_INSTANCE(chr->label), + yank_generic_iochannel, + QIO_CHANNEL(sioc)); /* * Normally code would use the qio_channel_socket_connect_async * method which uses a QIOTask + qio_task_set_error internally @@ -1376,6 +1405,11 @@ static void qmp_chardev_open_socket(Chardev *chr, qemu_chr_set_feature(chr, QEMU_CHAR_FEATURE_FD_PASS); } + if (!yank_register_instance(CHARDEV_YANK_INSTANCE(chr->label), errp)) { + return; + } + s->registered_yank = true; + /* be isn't opened until we get a connection */ *be_opened = false; diff --git a/include/io/channel.h b/include/io/channel.h index 4d6fe45f63..ab9ea77959 100644 --- a/include/io/channel.h +++ b/include/io/channel.h @@ -92,7 +92,8 @@ struct QIOChannel { * provide additional optional features. * * Consult the corresponding public API docs for a description - * of the semantics of each callback + * of the semantics of each callback. io_shutdown in particular + * must be thread-safe, terminate quickly and must not block. */ struct QIOChannelClass { ObjectClass parent; @@ -510,6 +511,8 @@ int qio_channel_close(QIOChannel *ioc, * QIO_CHANNEL_FEATURE_SHUTDOWN prior to calling * this method. * + * This function is thread-safe, terminates quickly and does not block. + * * Returns: 0 on success, -1 on error */ int qio_channel_shutdown(QIOChannel *ioc, diff --git a/include/qemu/yank.h b/include/qemu/yank.h new file mode 100644 index 0000000000..5b93c70cbf --- /dev/null +++ b/include/qemu/yank.h @@ -0,0 +1,97 @@ +/* + * QEMU yank feature + * + * Copyright (c) Lukas Straub <lukasstraub2@web.de> + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + */ + +#ifndef YANK_H +#define YANK_H + +#include "qapi/qapi-types-yank.h" + +typedef void (YankFn)(void *opaque); + +/** + * yank_register_instance: Register a new instance. + * + * This registers a new instance for yanking. Must be called before any yank + * function is registered for this instance. + * + * This function is thread-safe. + * + * @instance: The instance. + * @errp: Error object. + * + * Returns true on success or false if an error occured. + */ +bool yank_register_instance(const YankInstance *instance, Error **errp); + +/** + * yank_unregister_instance: Unregister a instance. + * + * This unregisters a instance. Must be called only after every yank function + * of the instance has been unregistered. + * + * This function is thread-safe. + * + * @instance: The instance. + */ +void yank_unregister_instance(const YankInstance *instance); + +/** + * yank_register_function: Register a yank function + * + * This registers a yank function. All limitations of qmp oob commands apply + * to the yank function as well. See docs/devel/qapi-code-gen.txt under + * "An OOB-capable command handler must satisfy the following conditions". + * + * This function is thread-safe. + * + * @instance: The instance. + * @func: The yank function. + * @opaque: Will be passed to the yank function. + */ +void yank_register_function(const YankInstance *instance, + YankFn *func, + void *opaque); + +/** + * yank_unregister_function: Unregister a yank function + * + * This unregisters a yank function. + * + * This function is thread-safe. + * + * @instance: The instance. + * @func: func that was passed to yank_register_function. + * @opaque: opaque that was passed to yank_register_function. + */ +void yank_unregister_function(const YankInstance *instance, + YankFn *func, + void *opaque); + +/** + * yank_generic_iochannel: Generic yank function for iochannel + * + * This is a generic yank function which will call qio_channel_shutdown on the + * provided QIOChannel. + * + * @opaque: QIOChannel to shutdown + */ +void yank_generic_iochannel(void *opaque); + +#define BLOCKDEV_YANK_INSTANCE(the_node_name) (&(YankInstance) { \ + .type = YANK_INSTANCE_TYPE_BLOCK_NODE, \ + .u.block_node.node_name = (the_node_name) }) + +#define CHARDEV_YANK_INSTANCE(the_id) (&(YankInstance) { \ + .type = YANK_INSTANCE_TYPE_CHARDEV, \ + .u.chardev.id = (the_id) }) + +#define MIGRATION_YANK_INSTANCE (&(YankInstance) { \ + .type = YANK_INSTANCE_TYPE_MIGRATION }) + +#endif diff --git a/io/channel-tls.c b/io/channel-tls.c index 388f019977..2ae1b92fc0 100644 --- a/io/channel-tls.c +++ b/io/channel-tls.c @@ -23,6 +23,7 @@ #include "qemu/module.h" #include "io/channel-tls.h" #include "trace.h" +#include "qemu/atomic.h" static ssize_t qio_channel_tls_write_handler(const char *buf, @@ -277,7 +278,8 @@ static ssize_t qio_channel_tls_readv(QIOChannel *ioc, return QIO_CHANNEL_ERR_BLOCK; } } else if (errno == ECONNABORTED && - (tioc->shutdown & QIO_CHANNEL_SHUTDOWN_READ)) { + (qatomic_load_acquire(&tioc->shutdown) & + QIO_CHANNEL_SHUTDOWN_READ)) { return 0; } @@ -361,7 +363,7 @@ static int qio_channel_tls_shutdown(QIOChannel *ioc, { QIOChannelTLS *tioc = QIO_CHANNEL_TLS(ioc); - tioc->shutdown |= how; + qatomic_or(&tioc->shutdown, how); return qio_channel_shutdown(tioc->master, how, errp); } diff --git a/migration/channel.c b/migration/channel.c index 8a783baa0b..35fe234e9c 100644 --- a/migration/channel.c +++ b/migration/channel.c @@ -18,6 +18,8 @@ #include "trace.h" #include "qapi/error.h" #include "io/channel-tls.h" +#include "io/channel-socket.h" +#include "qemu/yank.h" /** * @migration_channel_process_incoming - Create new incoming migration channel @@ -35,6 +37,11 @@ void migration_channel_process_incoming(QIOChannel *ioc) trace_migration_set_incoming_channel( ioc, object_get_typename(OBJECT(ioc))); + if (object_dynamic_cast(OBJECT(ioc), TYPE_QIO_CHANNEL_SOCKET)) { + yank_register_function(MIGRATION_YANK_INSTANCE, yank_generic_iochannel, + QIO_CHANNEL(ioc)); + } + if (s->parameters.tls_creds && *s->parameters.tls_creds && !object_dynamic_cast(OBJECT(ioc), @@ -67,6 +74,12 @@ void migration_channel_connect(MigrationState *s, ioc, object_get_typename(OBJECT(ioc)), hostname, error); if (!error) { + if (object_dynamic_cast(OBJECT(ioc), TYPE_QIO_CHANNEL_SOCKET)) { + yank_register_function(MIGRATION_YANK_INSTANCE, + yank_generic_iochannel, + QIO_CHANNEL(ioc)); + } + if (s->parameters.tls_creds && *s->parameters.tls_creds && !object_dynamic_cast(OBJECT(ioc), diff --git a/migration/migration.c b/migration/migration.c index a5da718baa..d5136419bf 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -57,6 +57,7 @@ #include "net/announce.h" #include "qemu/queue.h" #include "multifd.h" +#include "qemu/yank.h" #ifdef CONFIG_VFIO #include "hw/vfio/vfio-common.h" @@ -255,6 +256,8 @@ void migration_incoming_state_destroy(void) qapi_free_SocketAddressList(mis->socket_address_list); mis->socket_address_list = NULL; } + + yank_unregister_instance(MIGRATION_YANK_INSTANCE); } static void migrate_generate_event(int new_state) @@ -416,6 +419,10 @@ static void qemu_start_incoming_migration(const char *uri, Error **errp) { const char *p = NULL; + if (!yank_register_instance(MIGRATION_YANK_INSTANCE, errp)) { + return; + } + qapi_event_send_migration(MIGRATION_STATUS_SETUP); if (strstart(uri, "tcp:", &p) || strstart(uri, "unix:", NULL) || @@ -430,6 +437,7 @@ static void qemu_start_incoming_migration(const char *uri, Error **errp) } else if (strstart(uri, "fd:", &p)) { fd_start_incoming_migration(p, errp); } else { + yank_unregister_instance(MIGRATION_YANK_INSTANCE); error_setg(errp, "unknown migration protocol: %s", uri); } } @@ -1731,6 +1739,7 @@ static void migrate_fd_cleanup(MigrationState *s) } notifier_list_notify(&migration_state_notifiers, s); block_cleanup_parameters(s); + yank_unregister_instance(MIGRATION_YANK_INSTANCE); } static void migrate_fd_cleanup_schedule(MigrationState *s) @@ -2005,6 +2014,7 @@ void qmp_migrate_recover(const char *uri, Error **errp) * only re-setup the migration stream and poke existing migration * to continue using that newly established channel. */ + yank_unregister_instance(MIGRATION_YANK_INSTANCE); qemu_start_incoming_migration(uri, errp); } @@ -2148,6 +2158,12 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk, return; } + if (!(has_resume && resume)) { + if (!yank_register_instance(MIGRATION_YANK_INSTANCE, errp)) { + return; + } + } + if (strstart(uri, "tcp:", &p) || strstart(uri, "unix:", NULL) || strstart(uri, "vsock:", NULL)) { @@ -2161,6 +2177,9 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk, } else if (strstart(uri, "fd:", &p)) { fd_start_outgoing_migration(s, p, &local_err); } else { + if (!(has_resume && resume)) { + yank_unregister_instance(MIGRATION_YANK_INSTANCE); + } error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "uri", "a valid migration protocol"); migrate_set_state(&s->state, MIGRATION_STATUS_SETUP, @@ -2170,6 +2189,9 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk, } if (local_err) { + if (!(has_resume && resume)) { + yank_unregister_instance(MIGRATION_YANK_INSTANCE); + } migrate_fd_error(s, local_err); error_propagate(errp, local_err); return; diff --git a/migration/multifd.c b/migration/multifd.c index 45c690aa11..1a1e589064 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -25,6 +25,9 @@ #include "trace.h" #include "multifd.h" +#include "qemu/yank.h" +#include "io/channel-socket.h" + /* Multiple fd's */ #define MULTIFD_MAGIC 0x11223344U @@ -974,6 +977,13 @@ int multifd_load_cleanup(Error **errp) for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; + if (object_dynamic_cast(OBJECT(p->c), TYPE_QIO_CHANNEL_SOCKET) + && OBJECT(p->c)->ref == 1) { + yank_unregister_function(MIGRATION_YANK_INSTANCE, + yank_generic_iochannel, + QIO_CHANNEL(p->c)); + } + object_unref(OBJECT(p->c)); p->c = NULL; qemu_mutex_destroy(&p->mutex); diff --git a/migration/qemu-file-channel.c b/migration/qemu-file-channel.c index d2ce32f4b9..afc3a7f642 100644 --- a/migration/qemu-file-channel.c +++ b/migration/qemu-file-channel.c @@ -27,6 +27,7 @@ #include "qemu-file.h" #include "io/channel-socket.h" #include "qemu/iov.h" +#include "qemu/yank.h" static ssize_t channel_writev_buffer(void *opaque, @@ -104,6 +105,12 @@ static int channel_close(void *opaque, Error **errp) int ret; QIOChannel *ioc = QIO_CHANNEL(opaque); ret = qio_channel_close(ioc, errp); + if (object_dynamic_cast(OBJECT(ioc), TYPE_QIO_CHANNEL_SOCKET) + && OBJECT(ioc)->ref == 1) { + yank_unregister_function(MIGRATION_YANK_INSTANCE, + yank_generic_iochannel, + QIO_CHANNEL(ioc)); + } object_unref(OBJECT(ioc)); return ret; } diff --git a/migration/savevm.c b/migration/savevm.c index 27e842812e..4f3b69ecfc 100644 --- a/migration/savevm.c +++ b/migration/savevm.c @@ -62,6 +62,7 @@ #include "migration/colo.h" #include "qemu/bitmap.h" #include "net/announce.h" +#include "qemu/yank.h" const unsigned int postcopy_ram_discard_version; @@ -3006,6 +3007,10 @@ int load_snapshot(const char *name, Error **errp) qemu_system_reset(SHUTDOWN_CAUSE_NONE); mis->from_src_file = f; + if (!yank_register_instance(MIGRATION_YANK_INSTANCE, errp)) { + ret = -EINVAL; + goto err_drain; + } aio_context_acquire(aio_context); ret = qemu_loadvm_state(f); migration_incoming_state_destroy(); diff --git a/qapi/meson.build b/qapi/meson.build index 0e98146f1f..ab68e7900e 100644 --- a/qapi/meson.build +++ b/qapi/meson.build @@ -47,6 +47,7 @@ qapi_all_modules = [ 'trace', 'transaction', 'ui', + 'yank', ] qapi_storage_daemon_modules = [ diff --git a/qapi/qapi-schema.json b/qapi/qapi-schema.json index 0b444b76d2..3441c9a9ae 100644 --- a/qapi/qapi-schema.json +++ b/qapi/qapi-schema.json @@ -86,6 +86,7 @@ { 'include': 'machine.json' } { 'include': 'machine-target.json' } { 'include': 'replay.json' } +{ 'include': 'yank.json' } { 'include': 'misc.json' } { 'include': 'misc-target.json' } { 'include': 'audio.json' } diff --git a/qapi/yank.json b/qapi/yank.json new file mode 100644 index 0000000000..167a775594 --- /dev/null +++ b/qapi/yank.json @@ -0,0 +1,119 @@ +# -*- Mode: Python -*- +# vim: filetype=python +# + +## +# = Yank feature +## + +## +# @YankInstanceType: +# +# An enumeration of yank instance types. See @YankInstance for more +# information. +# +# Since: 6.0 +## +{ 'enum': 'YankInstanceType', + 'data': [ 'block-node', 'chardev', 'migration' ] } + +## +# @YankInstanceBlockNode: +# +# Specifies which block graph node to yank. See @YankInstance for more +# information. +# +# @node-name: the name of the block graph node +# +# Since: 6.0 +## +{ 'struct': 'YankInstanceBlockNode', + 'data': { 'node-name': 'str' } } + +## +# @YankInstanceChardev: +# +# Specifies which character device to yank. See @YankInstance for more +# information. +# +# @id: the chardev's ID +# +# Since: 6.0 +## +{ 'struct': 'YankInstanceChardev', + 'data': { 'id': 'str' } } + +## +# @YankInstance: +# +# A yank instance can be yanked with the @yank qmp command to recover from a +# hanging QEMU. +# +# Currently implemented yank instances: +# - nbd block device: +# Yanking it will shut down the connection to the nbd server without +# attempting to reconnect. +# - socket chardev: +# Yanking it will shut down the connected socket. +# - migration: +# Yanking it will shut down all migration connections. Unlike +# @migrate_cancel, it will not notify the migration process, so migration +# will go into @failed state, instead of @cancelled state. @yank should be +# used to recover from hangs. +# +# Since: 6.0 +## +{ 'union': 'YankInstance', + 'base': { 'type': 'YankInstanceType' }, + 'discriminator': 'type', + 'data': { + 'block-node': 'YankInstanceBlockNode', + 'chardev': 'YankInstanceChardev' } } + +## +# @yank: +# +# Try to recover from hanging QEMU by yanking the specified instances. See +# @YankInstance for more information. +# +# Takes a list of @YankInstance as argument. +# +# Returns: - Nothing on success +# - @DeviceNotFound error, if any of the YankInstances doesn't exist +# +# Example: +# +# -> { "execute": "yank", +# "arguments": { +# "instances": [ +# { "type": "block-node", +# "node-name": "nbd0" } +# ] } } +# <- { "return": {} } +# +# Since: 6.0 +## +{ 'command': 'yank', + 'data': { 'instances': ['YankInstance'] }, + 'allow-oob': true } + +## +# @query-yank: +# +# Query yank instances. See @YankInstance for more information. +# +# Returns: list of @YankInstance +# +# Example: +# +# -> { "execute": "query-yank" } +# <- { "return": [ +# { "type": "block-node", +# "node-name": "nbd0" } +# ] } +# +# Since: 6.0 +## +{ 'command': 'query-yank', + 'returns': ['YankInstance'], + 'allow-oob': true } diff --git a/tests/test-char.c b/tests/test-char.c index 06102977b6..469d25989c 100644 --- a/tests/test-char.c +++ b/tests/test-char.c @@ -937,6 +937,7 @@ static void char_socket_client_dupid_test(gconstpointer opaque) g_assert_nonnull(opts); chr1 = qemu_chr_new_from_opts(opts, NULL, &error_abort); g_assert_nonnull(chr1); + qemu_chr_wait_connected(chr1, &error_abort); chr2 = qemu_chr_new_from_opts(opts, NULL, &local_err); g_assert_null(chr2); diff --git a/util/meson.build b/util/meson.build index a3dfc0f966..540a605b78 100644 --- a/util/meson.build +++ b/util/meson.build @@ -50,6 +50,7 @@ endif if have_system util_ss.add(when: 'CONFIG_GIO', if_true: [files('dbus.c'), gio]) + util_ss.add(files('yank.c')) endif if have_block diff --git a/util/yank.c b/util/yank.c new file mode 100644 index 0000000000..fc08f65209 --- /dev/null +++ b/util/yank.c @@ -0,0 +1,207 @@ +/* + * QEMU yank feature + * + * Copyright (c) Lukas Straub <lukasstraub2@web.de> + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + */ + +#include "qemu/osdep.h" +#include "qapi/error.h" +#include "qemu/thread.h" +#include "qemu/queue.h" +#include "qemu/lockable.h" +#include "qapi/qapi-commands-yank.h" +#include "qapi/qapi-visit-yank.h" +#include "qapi/clone-visitor.h" +#include "io/channel.h" +#include "qemu/yank.h" + +struct YankFuncAndParam { + YankFn *func; + void *opaque; + QLIST_ENTRY(YankFuncAndParam) next; +}; + +struct YankInstanceEntry { + YankInstance *instance; + QLIST_HEAD(, YankFuncAndParam) yankfns; + QLIST_ENTRY(YankInstanceEntry) next; +}; + +typedef struct YankFuncAndParam YankFuncAndParam; +typedef struct YankInstanceEntry YankInstanceEntry; + +/* + * This lock protects the yank_instance_list below. Because it's taken by + * OOB-capable commands, it must be "fast", i.e. it may only be held for a + * bounded, short time. See docs/devel/qapi-code-gen.txt for additional + * information. + */ +static QemuMutex yank_lock; + +static QLIST_HEAD(, YankInstanceEntry) yank_instance_list + = QLIST_HEAD_INITIALIZER(yank_instance_list); + +static bool yank_instance_equal(const YankInstance *a, const YankInstance *b) +{ + if (a->type != b->type) { + return false; + } + + switch (a->type) { + case YANK_INSTANCE_TYPE_BLOCK_NODE: + return g_str_equal(a->u.block_node.node_name, + b->u.block_node.node_name); + + case YANK_INSTANCE_TYPE_CHARDEV: + return g_str_equal(a->u.chardev.id, b->u.chardev.id); + + case YANK_INSTANCE_TYPE_MIGRATION: + return true; + + default: + abort(); + } +} + +static YankInstanceEntry *yank_find_entry(const YankInstance *instance) +{ + YankInstanceEntry *entry; + + QLIST_FOREACH(entry, &yank_instance_list, next) { + if (yank_instance_equal(entry->instance, instance)) { + return entry; + } + } + return NULL; +} + +bool yank_register_instance(const YankInstance *instance, Error **errp) +{ + YankInstanceEntry *entry; + + QEMU_LOCK_GUARD(&yank_lock); + + if (yank_find_entry(instance)) { + error_setg(errp, "duplicate yank instance"); + return false; + } + + entry = g_new0(YankInstanceEntry, 1); + entry->instance = QAPI_CLONE(YankInstance, instance); + QLIST_INIT(&entry->yankfns); + QLIST_INSERT_HEAD(&yank_instance_list, entry, next); + + return true; +} + +void yank_unregister_instance(const YankInstance *instance) +{ + YankInstanceEntry *entry; + + QEMU_LOCK_GUARD(&yank_lock); + entry = yank_find_entry(instance); + assert(entry); + + assert(QLIST_EMPTY(&entry->yankfns)); + QLIST_REMOVE(entry, next); + qapi_free_YankInstance(entry->instance); + g_free(entry); +} + +void yank_register_function(const YankInstance *instance, + YankFn *func, + void *opaque) +{ + YankInstanceEntry *entry; + YankFuncAndParam *func_entry; + + QEMU_LOCK_GUARD(&yank_lock); + entry = yank_find_entry(instance); + assert(entry); + + func_entry = g_new0(YankFuncAndParam, 1); + func_entry->func = func; + func_entry->opaque = opaque; + + QLIST_INSERT_HEAD(&entry->yankfns, func_entry, next); +} + +void yank_unregister_function(const YankInstance *instance, + YankFn *func, + void *opaque) +{ + YankInstanceEntry *entry; + YankFuncAndParam *func_entry; + + QEMU_LOCK_GUARD(&yank_lock); + entry = yank_find_entry(instance); + assert(entry); + + QLIST_FOREACH(func_entry, &entry->yankfns, next) { + if (func_entry->func == func && func_entry->opaque == opaque) { + QLIST_REMOVE(func_entry, next); + g_free(func_entry); + return; + } + } + + abort(); +} + +void yank_generic_iochannel(void *opaque) +{ + QIOChannel *ioc = QIO_CHANNEL(opaque); + + qio_channel_shutdown(ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); +} + +void qmp_yank(YankInstanceList *instances, + Error **errp) +{ + YankInstanceList *tail; + YankInstanceEntry *entry; + YankFuncAndParam *func_entry; + + QEMU_LOCK_GUARD(&yank_lock); + for (tail = instances; tail; tail = tail->next) { + entry = yank_find_entry(tail->value); + if (!entry) { + error_set(errp, ERROR_CLASS_DEVICE_NOT_FOUND, "Instance not found"); + return; + } + } + for (tail = instances; tail; tail = tail->next) { + entry = yank_find_entry(tail->value); + assert(entry); + QLIST_FOREACH(func_entry, &entry->yankfns, next) { + func_entry->func(func_entry->opaque); + } + } +} + +YankInstanceList *qmp_query_yank(Error **errp) +{ + YankInstanceEntry *entry; + YankInstanceList *ret; + + ret = NULL; + + QEMU_LOCK_GUARD(&yank_lock); + QLIST_FOREACH(entry, &yank_instance_list, next) { + YankInstanceList *new_entry; + new_entry = g_new0(YankInstanceList, 1); + new_entry->value = QAPI_CLONE(YankInstance, entry->instance); + new_entry->next = ret; + ret = new_entry; + } + + return ret; +} + +static void __attribute__((__constructor__)) yank_init(void) +{ + qemu_mutex_init(&yank_lock); +} |