diff options
Diffstat (limited to 'migration')
-rw-r--r-- | migration/colo.c | 2 | ||||
-rw-r--r-- | migration/migration.c | 49 | ||||
-rw-r--r-- | migration/migration.h | 2 | ||||
-rw-r--r-- | migration/postcopy-ram.c | 2 | ||||
-rw-r--r-- | migration/qemu-file-channel.c | 12 | ||||
-rw-r--r-- | migration/qemu-file.c | 8 | ||||
-rw-r--r-- | migration/ram.c | 202 | ||||
-rw-r--r-- | migration/rdma.c | 423 | ||||
-rw-r--r-- | migration/savevm.c | 3 | ||||
-rw-r--r-- | migration/vmstate.c | 6 |
10 files changed, 605 insertions, 104 deletions
diff --git a/migration/colo.c b/migration/colo.c index 4381067ed4..88936f5962 100644 --- a/migration/colo.c +++ b/migration/colo.c @@ -534,6 +534,7 @@ void *colo_process_incoming_thread(void *opaque) uint64_t value; Error *local_err = NULL; + rcu_register_thread(); qemu_sem_init(&mis->colo_incoming_sem, 0); migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE, @@ -666,5 +667,6 @@ out: } migration_incoming_exit_colo(); + rcu_unregister_thread(); return NULL; } diff --git a/migration/migration.c b/migration/migration.c index b7d9854bda..4b316ec343 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -71,6 +71,7 @@ /* Define default autoconverge cpu throttle migration parameters */ #define DEFAULT_MIGRATE_CPU_THROTTLE_INITIAL 20 #define DEFAULT_MIGRATE_CPU_THROTTLE_INCREMENT 10 +#define DEFAULT_MIGRATE_MAX_CPU_THROTTLE 99 /* Migration XBZRLE default cache size */ #define DEFAULT_MIGRATE_XBZRLE_CACHE_SIZE (64 * 1024 * 1024) @@ -389,6 +390,7 @@ static void process_incoming_migration_co(void *opaque) int ret; assert(mis->from_src_file); + mis->migration_incoming_co = qemu_coroutine_self(); mis->largest_page_size = qemu_ram_pagesize_largest(); postcopy_state_set(POSTCOPY_INCOMING_NONE); migrate_set_state(&mis->state, MIGRATION_STATUS_NONE, @@ -418,7 +420,6 @@ static void process_incoming_migration_co(void *opaque) /* we get COLO info, and know if we are in COLO mode */ if (!ret && migration_incoming_enable_colo()) { - mis->migration_incoming_co = qemu_coroutine_self(); qemu_thread_create(&mis->colo_incoming_thread, "COLO incoming", colo_process_incoming_thread, mis, QEMU_THREAD_JOINABLE); mis->have_colo_incoming_thread = true; @@ -442,6 +443,7 @@ static void process_incoming_migration_co(void *opaque) } mis->bh = qemu_bh_new(process_incoming_migration_bh, mis); qemu_bh_schedule(mis->bh); + mis->migration_incoming_co = NULL; } static void migration_incoming_setup(QEMUFile *f) @@ -671,6 +673,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp) params->compress_level = s->parameters.compress_level; params->has_compress_threads = true; params->compress_threads = s->parameters.compress_threads; + params->has_compress_wait_thread = true; + params->compress_wait_thread = s->parameters.compress_wait_thread; params->has_decompress_threads = true; params->decompress_threads = s->parameters.decompress_threads; params->has_cpu_throttle_initial = true; @@ -697,6 +701,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp) params->xbzrle_cache_size = s->parameters.xbzrle_cache_size; params->has_max_postcopy_bandwidth = true; params->max_postcopy_bandwidth = s->parameters.max_postcopy_bandwidth; + params->has_max_cpu_throttle = true; + params->max_cpu_throttle = s->parameters.max_cpu_throttle; return params; } @@ -1043,6 +1049,15 @@ static bool migrate_params_check(MigrationParameters *params, Error **errp) return false; } + if (params->has_max_cpu_throttle && + (params->max_cpu_throttle < params->cpu_throttle_initial || + params->max_cpu_throttle > 99)) { + error_setg(errp, QERR_INVALID_PARAMETER_VALUE, + "max_cpu_throttle", + "an integer in the range of cpu_throttle_initial to 99"); + return false; + } + return true; } @@ -1061,6 +1076,10 @@ static void migrate_params_test_apply(MigrateSetParameters *params, dest->compress_threads = params->compress_threads; } + if (params->has_compress_wait_thread) { + dest->compress_wait_thread = params->compress_wait_thread; + } + if (params->has_decompress_threads) { dest->decompress_threads = params->decompress_threads; } @@ -1110,6 +1129,9 @@ static void migrate_params_test_apply(MigrateSetParameters *params, if (params->has_max_postcopy_bandwidth) { dest->max_postcopy_bandwidth = params->max_postcopy_bandwidth; } + if (params->has_max_cpu_throttle) { + dest->max_cpu_throttle = params->max_cpu_throttle; + } } static void migrate_params_apply(MigrateSetParameters *params, Error **errp) @@ -1126,6 +1148,10 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp) s->parameters.compress_threads = params->compress_threads; } + if (params->has_compress_wait_thread) { + s->parameters.compress_wait_thread = params->compress_wait_thread; + } + if (params->has_decompress_threads) { s->parameters.decompress_threads = params->decompress_threads; } @@ -1185,6 +1211,9 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp) if (params->has_max_postcopy_bandwidth) { s->parameters.max_postcopy_bandwidth = params->max_postcopy_bandwidth; } + if (params->has_max_cpu_throttle) { + s->parameters.max_cpu_throttle = params->max_cpu_throttle; + } } void qmp_migrate_set_parameters(MigrateSetParameters *params, Error **errp) @@ -1871,6 +1900,15 @@ int migrate_compress_threads(void) return s->parameters.compress_threads; } +int migrate_compress_wait_thread(void) +{ + MigrationState *s; + + s = migrate_get_current(); + + return s->parameters.compress_wait_thread; +} + int migrate_decompress_threads(void) { MigrationState *s; @@ -1962,7 +2000,6 @@ static int64_t migrate_max_postcopy_bandwidth(void) return s->parameters.max_postcopy_bandwidth; } - bool migrate_use_block(void) { MigrationState *s; @@ -2104,6 +2141,7 @@ static void *source_return_path_thread(void *opaque) int res; trace_source_return_path_thread_entry(); + rcu_register_thread(); retry: while (!ms->rp_state.error && !qemu_file_get_error(rp) && @@ -2243,6 +2281,7 @@ out: trace_source_return_path_thread_end(); ms->rp_state.from_dst_file = NULL; qemu_fclose(rp); + rcu_unregister_thread(); return NULL; } @@ -3131,6 +3170,8 @@ static Property migration_properties[] = { DEFINE_PROP_UINT8("x-compress-threads", MigrationState, parameters.compress_threads, DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT), + DEFINE_PROP_BOOL("x-compress-wait-thread", MigrationState, + parameters.compress_wait_thread, true), DEFINE_PROP_UINT8("x-decompress-threads", MigrationState, parameters.decompress_threads, DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT), @@ -3160,6 +3201,9 @@ static Property migration_properties[] = { DEFINE_PROP_SIZE("max-postcopy-bandwidth", MigrationState, parameters.max_postcopy_bandwidth, DEFAULT_MIGRATE_MAX_POSTCOPY_BANDWIDTH), + DEFINE_PROP_UINT8("max-cpu-throttle", MigrationState, + parameters.max_cpu_throttle, + DEFAULT_MIGRATE_MAX_CPU_THROTTLE), /* Migration capabilities */ DEFINE_PROP_MIG_CAP("x-xbzrle", MIGRATION_CAPABILITY_XBZRLE), @@ -3230,6 +3274,7 @@ static void migration_instance_init(Object *obj) params->has_x_multifd_page_count = true; params->has_xbzrle_cache_size = true; params->has_max_postcopy_bandwidth = true; + params->has_max_cpu_throttle = true; qemu_sem_init(&ms->postcopy_pause_sem, 0); qemu_sem_init(&ms->postcopy_pause_rp_sem, 0); diff --git a/migration/migration.h b/migration/migration.h index 64a7b33735..f7813f8261 100644 --- a/migration/migration.h +++ b/migration/migration.h @@ -266,11 +266,13 @@ bool migrate_colo_enabled(void); bool migrate_use_block(void); bool migrate_use_block_incremental(void); +int migrate_max_cpu_throttle(void); bool migrate_use_return_path(void); bool migrate_use_compression(void); int migrate_compress_level(void); int migrate_compress_threads(void); +int migrate_compress_wait_thread(void); int migrate_decompress_threads(void); bool migrate_use_events(void); bool migrate_postcopy_blocktime(void); diff --git a/migration/postcopy-ram.c b/migration/postcopy-ram.c index c2e387ed44..853d8b32ca 100644 --- a/migration/postcopy-ram.c +++ b/migration/postcopy-ram.c @@ -867,6 +867,7 @@ static void *postcopy_ram_fault_thread(void *opaque) RAMBlock *rb = NULL; trace_postcopy_ram_fault_thread_entry(); + rcu_register_thread(); mis->last_rb = NULL; /* last RAMBlock we sent part of */ qemu_sem_post(&mis->fault_thread_sem); @@ -1073,6 +1074,7 @@ retry: } } } + rcu_unregister_thread(); trace_postcopy_ram_fault_thread_exit(); g_free(pfd); return NULL; diff --git a/migration/qemu-file-channel.c b/migration/qemu-file-channel.c index e202d73834..8e639eb496 100644 --- a/migration/qemu-file-channel.c +++ b/migration/qemu-file-channel.c @@ -49,7 +49,11 @@ static ssize_t channel_writev_buffer(void *opaque, ssize_t len; len = qio_channel_writev(ioc, local_iov, nlocal_iov, NULL); if (len == QIO_CHANNEL_ERR_BLOCK) { - qio_channel_wait(ioc, G_IO_OUT); + if (qemu_in_coroutine()) { + qio_channel_yield(ioc, G_IO_OUT); + } else { + qio_channel_wait(ioc, G_IO_OUT); + } continue; } if (len < 0) { @@ -80,7 +84,11 @@ static ssize_t channel_get_buffer(void *opaque, ret = qio_channel_read(ioc, (char *)buf, size, NULL); if (ret < 0) { if (ret == QIO_CHANNEL_ERR_BLOCK) { - qio_channel_yield(ioc, G_IO_IN); + if (qemu_in_coroutine()) { + qio_channel_yield(ioc, G_IO_IN); + } else { + qio_channel_wait(ioc, G_IO_IN); + } } else { /* XXX handle Error * object */ return -EIO; diff --git a/migration/qemu-file.c b/migration/qemu-file.c index 0463f4c321..977b9ae07c 100644 --- a/migration/qemu-file.c +++ b/migration/qemu-file.c @@ -253,8 +253,12 @@ size_t ram_control_save_page(QEMUFile *f, ram_addr_t block_offset, if (f->hooks && f->hooks->save_page) { int ret = f->hooks->save_page(f, f->opaque, block_offset, offset, size, bytes_sent); - f->bytes_xfer += size; - if (ret != RAM_SAVE_CONTROL_DELAYED) { + if (ret != RAM_SAVE_CONTROL_NOT_SUPP) { + f->bytes_xfer += size; + } + + if (ret != RAM_SAVE_CONTROL_DELAYED && + ret != RAM_SAVE_CONTROL_NOT_SUPP) { if (bytes_sent && *bytes_sent > 0) { qemu_update_position(f, *bytes_sent); } else if (ret < 0) { diff --git a/migration/ram.c b/migration/ram.c index fa79d0a5b9..79c89425a3 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -341,6 +341,7 @@ typedef struct PageSearchStatus PageSearchStatus; struct CompressParam { bool done; bool quit; + bool zero_page; QEMUFile *file; QemuMutex mutex; QemuCond cond; @@ -382,14 +383,15 @@ static QemuThread *decompress_threads; static QemuMutex decomp_done_lock; static QemuCond decomp_done_cond; -static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, - ram_addr_t offset, uint8_t *source_buf); +static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, + ram_addr_t offset, uint8_t *source_buf); static void *do_data_compress(void *opaque) { CompressParam *param = opaque; RAMBlock *block; ram_addr_t offset; + bool zero_page; qemu_mutex_lock(¶m->mutex); while (!param->quit) { @@ -399,11 +401,12 @@ static void *do_data_compress(void *opaque) param->block = NULL; qemu_mutex_unlock(¶m->mutex); - do_compress_ram_page(param->file, ¶m->stream, block, offset, - param->originbuf); + zero_page = do_compress_ram_page(param->file, ¶m->stream, + block, offset, param->originbuf); qemu_mutex_lock(&comp_done_lock); param->done = true; + param->zero_page = zero_page; qemu_cond_signal(&comp_done_cond); qemu_mutex_unlock(&comp_done_lock); @@ -989,6 +992,7 @@ static void *multifd_send_thread(void *opaque) int ret; trace_multifd_send_thread_start(p->id); + rcu_register_thread(); if (multifd_send_initial_packet(p, &local_err) < 0) { goto out; @@ -1051,6 +1055,7 @@ out: p->running = false; qemu_mutex_unlock(&p->mutex); + rcu_unregister_thread(); trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages); return NULL; @@ -1220,6 +1225,7 @@ static void *multifd_recv_thread(void *opaque) int ret; trace_multifd_recv_thread_start(p->id); + rcu_register_thread(); while (true) { uint32_t used; @@ -1266,6 +1272,7 @@ static void *multifd_recv_thread(void *opaque) p->running = false; qemu_mutex_unlock(&p->mutex); + rcu_unregister_thread(); trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages); return NULL; @@ -1391,13 +1398,15 @@ static void mig_throttle_guest_down(void) MigrationState *s = migrate_get_current(); uint64_t pct_initial = s->parameters.cpu_throttle_initial; uint64_t pct_icrement = s->parameters.cpu_throttle_increment; + int pct_max = s->parameters.max_cpu_throttle; /* We have not started throttling yet. Let's start it. */ if (!cpu_throttle_active()) { cpu_throttle_set(pct_initial); } else { /* Throttling already on, just increase the rate */ - cpu_throttle_set(cpu_throttle_get_percentage() + pct_icrement); + cpu_throttle_set(MIN(cpu_throttle_get_percentage() + pct_icrement, + pct_max)); } } @@ -1666,6 +1675,31 @@ static void migration_bitmap_sync(RAMState *rs) } /** + * save_zero_page_to_file: send the zero page to the file + * + * Returns the size of data written to the file, 0 means the page is not + * a zero page + * + * @rs: current RAM state + * @file: the file where the data is saved + * @block: block that contains the page we want to send + * @offset: offset inside the block for the page + */ +static int save_zero_page_to_file(RAMState *rs, QEMUFile *file, + RAMBlock *block, ram_addr_t offset) +{ + uint8_t *p = block->host + offset; + int len = 0; + + if (is_zero_range(p, TARGET_PAGE_SIZE)) { + len += save_page_header(rs, file, block, offset | RAM_SAVE_FLAG_ZERO); + qemu_put_byte(file, 0); + len += 1; + } + return len; +} + +/** * save_zero_page: send the zero page to the stream * * Returns the number of pages written. @@ -1676,19 +1710,14 @@ static void migration_bitmap_sync(RAMState *rs) */ static int save_zero_page(RAMState *rs, RAMBlock *block, ram_addr_t offset) { - uint8_t *p = block->host + offset; - int pages = -1; + int len = save_zero_page_to_file(rs, rs->f, block, offset); - if (is_zero_range(p, TARGET_PAGE_SIZE)) { + if (len) { ram_counters.duplicate++; - ram_counters.transferred += - save_page_header(rs, rs->f, block, offset | RAM_SAVE_FLAG_ZERO); - qemu_put_byte(rs->f, 0); - ram_counters.transferred += 1; - pages = 1; + ram_counters.transferred += len; + return 1; } - - return pages; + return -1; } static void ram_release_pages(const char *rbname, uint64_t offset, int pages) @@ -1823,15 +1852,20 @@ static int ram_save_multifd_page(RAMState *rs, RAMBlock *block, return 1; } -static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, - ram_addr_t offset, uint8_t *source_buf) +static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, + ram_addr_t offset, uint8_t *source_buf) { RAMState *rs = ram_state; - int bytes_sent, blen; uint8_t *p = block->host + (offset & TARGET_PAGE_MASK); + bool zero_page = false; + int ret; + + if (save_zero_page_to_file(rs, f, block, offset)) { + zero_page = true; + goto exit; + } - bytes_sent = save_page_header(rs, f, block, offset | - RAM_SAVE_FLAG_COMPRESS_PAGE); + save_page_header(rs, f, block, offset | RAM_SAVE_FLAG_COMPRESS_PAGE); /* * copy it to a internal buffer to avoid it being modified by VM @@ -1839,17 +1873,25 @@ static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, * decompression */ memcpy(source_buf, p, TARGET_PAGE_SIZE); - blen = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE); - if (blen < 0) { - bytes_sent = 0; - qemu_file_set_error(migrate_get_current()->to_dst_file, blen); + ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE); + if (ret < 0) { + qemu_file_set_error(migrate_get_current()->to_dst_file, ret); error_report("compressed data failed!"); - } else { - bytes_sent += blen; - ram_release_pages(block->idstr, offset & TARGET_PAGE_MASK, 1); + return false; } - return bytes_sent; +exit: + ram_release_pages(block->idstr, offset & TARGET_PAGE_MASK, 1); + return zero_page; +} + +static void +update_compress_thread_counts(const CompressParam *param, int bytes_xmit) +{ + if (param->zero_page) { + ram_counters.duplicate++; + } + ram_counters.transferred += bytes_xmit; } static void flush_compressed_data(RAMState *rs) @@ -1873,7 +1915,12 @@ static void flush_compressed_data(RAMState *rs) qemu_mutex_lock(&comp_param[idx].mutex); if (!comp_param[idx].quit) { len = qemu_put_qemu_file(rs->f, comp_param[idx].file); - ram_counters.transferred += len; + /* + * it's safe to fetch zero_page without holding comp_done_lock + * as there is no further request submitted to the thread, + * i.e, the thread should be waiting for a request at this point. + */ + update_compress_thread_counts(&comp_param[idx], len); } qemu_mutex_unlock(&comp_param[idx].mutex); } @@ -1890,30 +1937,33 @@ static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block, ram_addr_t offset) { int idx, thread_count, bytes_xmit = -1, pages = -1; + bool wait = migrate_compress_wait_thread(); thread_count = migrate_compress_threads(); qemu_mutex_lock(&comp_done_lock); - while (true) { - for (idx = 0; idx < thread_count; idx++) { - if (comp_param[idx].done) { - comp_param[idx].done = false; - bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file); - qemu_mutex_lock(&comp_param[idx].mutex); - set_compress_params(&comp_param[idx], block, offset); - qemu_cond_signal(&comp_param[idx].cond); - qemu_mutex_unlock(&comp_param[idx].mutex); - pages = 1; - ram_counters.normal++; - ram_counters.transferred += bytes_xmit; - break; - } - } - if (pages > 0) { +retry: + for (idx = 0; idx < thread_count; idx++) { + if (comp_param[idx].done) { + comp_param[idx].done = false; + bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file); + qemu_mutex_lock(&comp_param[idx].mutex); + set_compress_params(&comp_param[idx], block, offset); + qemu_cond_signal(&comp_param[idx].cond); + qemu_mutex_unlock(&comp_param[idx].mutex); + pages = 1; + update_compress_thread_counts(&comp_param[idx], bytes_xmit); break; - } else { - qemu_cond_wait(&comp_done_cond, &comp_done_lock); } } + + /* + * wait for the free thread if the user specifies 'compress-wait-thread', + * otherwise we will post the page out in the main thread as normal page. + */ + if (pages < 0 && wait) { + qemu_cond_wait(&comp_done_cond, &comp_done_lock); + goto retry; + } qemu_mutex_unlock(&comp_done_lock); return pages; @@ -1983,6 +2033,10 @@ static RAMBlock *unqueue_page(RAMState *rs, ram_addr_t *offset) { RAMBlock *block = NULL; + if (QSIMPLEQ_EMPTY_ATOMIC(&rs->src_page_requests)) { + return NULL; + } + qemu_mutex_lock(&rs->src_page_req_mutex); if (!QSIMPLEQ_EMPTY(&rs->src_page_requests)) { struct RAMSrcPageRequest *entry = @@ -2175,6 +2229,39 @@ static bool save_page_use_compression(RAMState *rs) return false; } +/* + * try to compress the page before posting it out, return true if the page + * has been properly handled by compression, otherwise needs other + * paths to handle it + */ +static bool save_compress_page(RAMState *rs, RAMBlock *block, ram_addr_t offset) +{ + if (!save_page_use_compression(rs)) { + return false; + } + + /* + * When starting the process of a new block, the first page of + * the block should be sent out before other pages in the same + * block, and all the pages in last block should have been sent + * out, keeping this order is important, because the 'cont' flag + * is used to avoid resending the block name. + * + * We post the fist page as normal page as compression will take + * much CPU resource. + */ + if (block != rs->last_sent_block) { + flush_compressed_data(rs); + return false; + } + + if (compress_page_with_multi_thread(rs, block, offset) > 0) { + return true; + } + + return false; +} + /** * ram_save_target_page: save one target page * @@ -2195,15 +2282,8 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss, return res; } - /* - * When starting the process of a new block, the first page of - * the block should be sent out before other pages in the same - * block, and all the pages in last block should have been sent - * out, keeping this order is important, because the 'cont' flag - * is used to avoid resending the block name. - */ - if (block != rs->last_sent_block && save_page_use_compression(rs)) { - flush_compressed_data(rs); + if (save_compress_page(rs, block, offset)) { + return 1; } res = save_zero_page(rs, block, offset); @@ -2221,14 +2301,10 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss, } /* - * Make sure the first page is sent out before other pages. - * - * we post it as normal page as compression will take much - * CPU resource. + * do not use multifd for compression as the first page in the new + * block should be posted out before sending the compressed page */ - if (block == rs->last_sent_block && save_page_use_compression(rs)) { - return compress_page_with_multi_thread(rs, block, offset); - } else if (migrate_use_multifd()) { + if (!save_page_use_compression(rs) && migrate_use_multifd()) { return ram_save_multifd_page(rs, block, offset); } diff --git a/migration/rdma.c b/migration/rdma.c index 8bd7159059..ae07515e83 100644 --- a/migration/rdma.c +++ b/migration/rdma.c @@ -86,6 +86,7 @@ static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL; " to abort!"); \ rdma->error_reported = 1; \ } \ + rcu_read_unlock(); \ return rdma->error_state; \ } \ } while (0) @@ -387,6 +388,10 @@ typedef struct RDMAContext { uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX]; GHashTable *blockmap; + + /* the RDMAContext for return path */ + struct RDMAContext *return_path; + bool is_return_path; } RDMAContext; #define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma" @@ -398,7 +403,8 @@ typedef struct QIOChannelRDMA QIOChannelRDMA; struct QIOChannelRDMA { QIOChannel parent; - RDMAContext *rdma; + RDMAContext *rdmain; + RDMAContext *rdmaout; QEMUFile *file; bool blocking; /* XXX we don't actually honour this yet */ }; @@ -1483,27 +1489,56 @@ static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out, */ static int qemu_rdma_wait_comp_channel(RDMAContext *rdma) { + struct rdma_cm_event *cm_event; + int ret = -1; + /* * Coroutine doesn't start until migration_fd_process_incoming() * so don't yield unless we know we're running inside of a coroutine. */ - if (rdma->migration_started_on_destination) { + if (rdma->migration_started_on_destination && + migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) { yield_until_fd_readable(rdma->comp_channel->fd); } else { /* This is the source side, we're in a separate thread * or destination prior to migration_fd_process_incoming() + * after postcopy, the destination also in a seprate thread. * we can't yield; so we have to poll the fd. * But we need to be able to handle 'cancel' or an error * without hanging forever. */ while (!rdma->error_state && !rdma->received_error) { - GPollFD pfds[1]; + GPollFD pfds[2]; pfds[0].fd = rdma->comp_channel->fd; pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR; + pfds[0].revents = 0; + + pfds[1].fd = rdma->channel->fd; + pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR; + pfds[1].revents = 0; + /* 0.1s timeout, should be fine for a 'cancel' */ - switch (qemu_poll_ns(pfds, 1, 100 * 1000 * 1000)) { + switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) { + case 2: case 1: /* fd active */ - return 0; + if (pfds[0].revents) { + return 0; + } + + if (pfds[1].revents) { + ret = rdma_get_cm_event(rdma->channel, &cm_event); + if (!ret) { + rdma_ack_cm_event(cm_event); + } + + error_report("receive cm event while wait comp channel," + "cm event is %d", cm_event->event); + if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED || + cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) { + return -EPIPE; + } + } + break; case 0: /* Timeout, go around again */ break; @@ -2323,10 +2358,22 @@ static void qemu_rdma_cleanup(RDMAContext *rdma) rdma_destroy_id(rdma->cm_id); rdma->cm_id = NULL; } + + /* the destination side, listen_id and channel is shared */ if (rdma->listen_id) { - rdma_destroy_id(rdma->listen_id); + if (!rdma->is_return_path) { + rdma_destroy_id(rdma->listen_id); + } rdma->listen_id = NULL; + + if (rdma->channel) { + if (!rdma->is_return_path) { + rdma_destroy_event_channel(rdma->channel); + } + rdma->channel = NULL; + } } + if (rdma->channel) { rdma_destroy_event_channel(rdma->channel); rdma->channel = NULL; @@ -2555,6 +2602,25 @@ err_dest_init_create_listen_id: } +static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path, + RDMAContext *rdma) +{ + int idx; + + for (idx = 0; idx < RDMA_WRID_MAX; idx++) { + rdma_return_path->wr_data[idx].control_len = 0; + rdma_return_path->wr_data[idx].control_curr = NULL; + } + + /*the CM channel and CM id is shared*/ + rdma_return_path->channel = rdma->channel; + rdma_return_path->listen_id = rdma->listen_id; + + rdma->return_path = rdma_return_path; + rdma_return_path->return_path = rdma; + rdma_return_path->is_return_path = true; +} + static void *qemu_rdma_data_init(const char *host_port, Error **errp) { RDMAContext *rdma = NULL; @@ -2595,12 +2661,20 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc, { QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); QEMUFile *f = rioc->file; - RDMAContext *rdma = rioc->rdma; + RDMAContext *rdma; int ret; ssize_t done = 0; size_t i; size_t len = 0; + rcu_read_lock(); + rdma = atomic_rcu_read(&rioc->rdmaout); + + if (!rdma) { + rcu_read_unlock(); + return -EIO; + } + CHECK_ERROR_STATE(); /* @@ -2610,6 +2684,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc, ret = qemu_rdma_write_flush(f, rdma); if (ret < 0) { rdma->error_state = ret; + rcu_read_unlock(); return ret; } @@ -2629,6 +2704,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc, if (ret < 0) { rdma->error_state = ret; + rcu_read_unlock(); return ret; } @@ -2637,6 +2713,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc, } } + rcu_read_unlock(); return done; } @@ -2670,12 +2747,20 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc, Error **errp) { QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); - RDMAContext *rdma = rioc->rdma; + RDMAContext *rdma; RDMAControlHeader head; int ret = 0; ssize_t i; size_t done = 0; + rcu_read_lock(); + rdma = atomic_rcu_read(&rioc->rdmain); + + if (!rdma) { + rcu_read_unlock(); + return -EIO; + } + CHECK_ERROR_STATE(); for (i = 0; i < niov; i++) { @@ -2687,7 +2772,7 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc, * were given and dish out the bytes until we run * out of bytes. */ - ret = qemu_rdma_fill(rioc->rdma, data, want, 0); + ret = qemu_rdma_fill(rdma, data, want, 0); done += ret; want -= ret; /* Got what we needed, so go to next iovec */ @@ -2709,25 +2794,28 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc, if (ret < 0) { rdma->error_state = ret; + rcu_read_unlock(); return ret; } /* * SEND was received with new bytes, now try again. */ - ret = qemu_rdma_fill(rioc->rdma, data, want, 0); + ret = qemu_rdma_fill(rdma, data, want, 0); done += ret; want -= ret; /* Still didn't get enough, so lets just return */ if (want) { if (done == 0) { + rcu_read_unlock(); return QIO_CHANNEL_ERR_BLOCK; } else { break; } } } + rcu_read_unlock(); return done; } @@ -2779,15 +2867,29 @@ qio_channel_rdma_source_prepare(GSource *source, gint *timeout) { QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; - RDMAContext *rdma = rsource->rioc->rdma; + RDMAContext *rdma; GIOCondition cond = 0; *timeout = -1; + rcu_read_lock(); + if (rsource->condition == G_IO_IN) { + rdma = atomic_rcu_read(&rsource->rioc->rdmain); + } else { + rdma = atomic_rcu_read(&rsource->rioc->rdmaout); + } + + if (!rdma) { + error_report("RDMAContext is NULL when prepare Gsource"); + rcu_read_unlock(); + return FALSE; + } + if (rdma->wr_data[0].control_len) { cond |= G_IO_IN; } cond |= G_IO_OUT; + rcu_read_unlock(); return cond & rsource->condition; } @@ -2795,14 +2897,28 @@ static gboolean qio_channel_rdma_source_check(GSource *source) { QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; - RDMAContext *rdma = rsource->rioc->rdma; + RDMAContext *rdma; GIOCondition cond = 0; + rcu_read_lock(); + if (rsource->condition == G_IO_IN) { + rdma = atomic_rcu_read(&rsource->rioc->rdmain); + } else { + rdma = atomic_rcu_read(&rsource->rioc->rdmaout); + } + + if (!rdma) { + error_report("RDMAContext is NULL when check Gsource"); + rcu_read_unlock(); + return FALSE; + } + if (rdma->wr_data[0].control_len) { cond |= G_IO_IN; } cond |= G_IO_OUT; + rcu_read_unlock(); return cond & rsource->condition; } @@ -2813,14 +2929,28 @@ qio_channel_rdma_source_dispatch(GSource *source, { QIOChannelFunc func = (QIOChannelFunc)callback; QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; - RDMAContext *rdma = rsource->rioc->rdma; + RDMAContext *rdma; GIOCondition cond = 0; + rcu_read_lock(); + if (rsource->condition == G_IO_IN) { + rdma = atomic_rcu_read(&rsource->rioc->rdmain); + } else { + rdma = atomic_rcu_read(&rsource->rioc->rdmaout); + } + + if (!rdma) { + error_report("RDMAContext is NULL when dispatch Gsource"); + rcu_read_unlock(); + return FALSE; + } + if (rdma->wr_data[0].control_len) { cond |= G_IO_IN; } cond |= G_IO_OUT; + rcu_read_unlock(); return (*func)(QIO_CHANNEL(rsource->rioc), (cond & rsource->condition), user_data); @@ -2860,20 +2990,91 @@ static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc, return source; } +static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc, + AioContext *ctx, + IOHandler *io_read, + IOHandler *io_write, + void *opaque) +{ + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); + if (io_read) { + aio_set_fd_handler(ctx, rioc->rdmain->comp_channel->fd, + false, io_read, io_write, NULL, opaque); + } else { + aio_set_fd_handler(ctx, rioc->rdmaout->comp_channel->fd, + false, io_read, io_write, NULL, opaque); + } +} static int qio_channel_rdma_close(QIOChannel *ioc, Error **errp) { QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); + RDMAContext *rdmain, *rdmaout; trace_qemu_rdma_close(); - if (rioc->rdma) { - if (!rioc->rdma->error_state) { - rioc->rdma->error_state = qemu_file_get_error(rioc->file); + + rdmain = rioc->rdmain; + if (rdmain) { + atomic_rcu_set(&rioc->rdmain, NULL); + } + + rdmaout = rioc->rdmaout; + if (rdmaout) { + atomic_rcu_set(&rioc->rdmaout, NULL); + } + + synchronize_rcu(); + + if (rdmain) { + qemu_rdma_cleanup(rdmain); + } + + if (rdmaout) { + qemu_rdma_cleanup(rdmaout); + } + + g_free(rdmain); + g_free(rdmaout); + + return 0; +} + +static int +qio_channel_rdma_shutdown(QIOChannel *ioc, + QIOChannelShutdown how, + Error **errp) +{ + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); + RDMAContext *rdmain, *rdmaout; + + rcu_read_lock(); + + rdmain = atomic_rcu_read(&rioc->rdmain); + rdmaout = atomic_rcu_read(&rioc->rdmain); + + switch (how) { + case QIO_CHANNEL_SHUTDOWN_READ: + if (rdmain) { + rdmain->error_state = -1; + } + break; + case QIO_CHANNEL_SHUTDOWN_WRITE: + if (rdmaout) { + rdmaout->error_state = -1; } - qemu_rdma_cleanup(rioc->rdma); - g_free(rioc->rdma); - rioc->rdma = NULL; + break; + case QIO_CHANNEL_SHUTDOWN_BOTH: + default: + if (rdmain) { + rdmain->error_state = -1; + } + if (rdmaout) { + rdmaout->error_state = -1; + } + break; } + + rcu_read_unlock(); return 0; } @@ -2916,11 +3117,24 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque, size_t size, uint64_t *bytes_sent) { QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); - RDMAContext *rdma = rioc->rdma; + RDMAContext *rdma; int ret; + rcu_read_lock(); + rdma = atomic_rcu_read(&rioc->rdmaout); + + if (!rdma) { + rcu_read_unlock(); + return -EIO; + } + CHECK_ERROR_STATE(); + if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) { + rcu_read_unlock(); + return RAM_SAVE_CONTROL_NOT_SUPP; + } + qemu_fflush(f); if (size > 0) { @@ -3002,12 +3216,45 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque, } } + rcu_read_unlock(); return RAM_SAVE_CONTROL_DELAYED; err: rdma->error_state = ret; + rcu_read_unlock(); return ret; } +static void rdma_accept_incoming_migration(void *opaque); + +static void rdma_cm_poll_handler(void *opaque) +{ + RDMAContext *rdma = opaque; + int ret; + struct rdma_cm_event *cm_event; + MigrationIncomingState *mis = migration_incoming_get_current(); + + ret = rdma_get_cm_event(rdma->channel, &cm_event); + if (ret) { + error_report("get_cm_event failed %d", errno); + return; + } + rdma_ack_cm_event(cm_event); + + if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED || + cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) { + error_report("receive cm event, cm event is %d", cm_event->event); + rdma->error_state = -EPIPE; + if (rdma->return_path) { + rdma->return_path->error_state = -EPIPE; + } + + if (mis->migration_incoming_co) { + qemu_coroutine_enter(mis->migration_incoming_co); + } + return; + } +} + static int qemu_rdma_accept(RDMAContext *rdma) { RDMACapabilities cap; @@ -3102,7 +3349,15 @@ static int qemu_rdma_accept(RDMAContext *rdma) } } - qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL); + /* Accept the second connection request for return path */ + if (migrate_postcopy() && !rdma->is_return_path) { + qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration, + NULL, + (void *)(intptr_t)rdma->return_path); + } else { + qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler, + NULL, rdma); + } ret = rdma_accept(rdma->cm_id, &conn_param); if (ret) { @@ -3171,8 +3426,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque) RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT, .repeat = 1 }; QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); - RDMAContext *rdma = rioc->rdma; - RDMALocalBlocks *local = &rdma->local_ram_blocks; + RDMAContext *rdma; + RDMALocalBlocks *local; RDMAControlHeader head; RDMARegister *reg, *registers; RDMACompress *comp; @@ -3185,8 +3440,17 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque) int count = 0; int i = 0; + rcu_read_lock(); + rdma = atomic_rcu_read(&rioc->rdmain); + + if (!rdma) { + rcu_read_unlock(); + return -EIO; + } + CHECK_ERROR_STATE(); + local = &rdma->local_ram_blocks; do { trace_qemu_rdma_registration_handle_wait(); @@ -3420,6 +3684,7 @@ out: if (ret < 0) { rdma->error_state = ret; } + rcu_read_unlock(); return ret; } @@ -3433,10 +3698,18 @@ out: static int rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name) { - RDMAContext *rdma = rioc->rdma; + RDMAContext *rdma; int curr; int found = -1; + rcu_read_lock(); + rdma = atomic_rcu_read(&rioc->rdmain); + + if (!rdma) { + rcu_read_unlock(); + return -EIO; + } + /* Find the matching RAMBlock in our local list */ for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) { if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) { @@ -3447,6 +3720,7 @@ rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name) if (found == -1) { error_report("RAMBlock '%s' not found on destination", name); + rcu_read_unlock(); return -ENOENT; } @@ -3454,6 +3728,7 @@ rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name) trace_rdma_block_notification_handle(name, rdma->next_src_index); rdma->next_src_index++; + rcu_read_unlock(); return 0; } @@ -3476,14 +3751,27 @@ static int qemu_rdma_registration_start(QEMUFile *f, void *opaque, uint64_t flags, void *data) { QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); - RDMAContext *rdma = rioc->rdma; + RDMAContext *rdma; + + rcu_read_lock(); + rdma = atomic_rcu_read(&rioc->rdmaout); + if (!rdma) { + rcu_read_unlock(); + return -EIO; + } CHECK_ERROR_STATE(); + if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) { + rcu_read_unlock(); + return 0; + } + trace_qemu_rdma_registration_start(flags); qemu_put_be64(f, RAM_SAVE_FLAG_HOOK); qemu_fflush(f); + rcu_read_unlock(); return 0; } @@ -3496,12 +3784,24 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque, { Error *local_err = NULL, **errp = &local_err; QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); - RDMAContext *rdma = rioc->rdma; + RDMAContext *rdma; RDMAControlHeader head = { .len = 0, .repeat = 1 }; int ret = 0; + rcu_read_lock(); + rdma = atomic_rcu_read(&rioc->rdmaout); + if (!rdma) { + rcu_read_unlock(); + return -EIO; + } + CHECK_ERROR_STATE(); + if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) { + rcu_read_unlock(); + return 0; + } + qemu_fflush(f); ret = qemu_rdma_drain_cq(f, rdma); @@ -3530,6 +3830,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque, qemu_rdma_reg_whole_ram_blocks : NULL); if (ret < 0) { ERROR(errp, "receiving remote info!"); + rcu_read_unlock(); return ret; } @@ -3553,6 +3854,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque, "not identical on both the source and destination.", local->nb_blocks, nb_dest_blocks); rdma->error_state = -EINVAL; + rcu_read_unlock(); return -EINVAL; } @@ -3569,6 +3871,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque, local->block[i].length, rdma->dest_blocks[i].length); rdma->error_state = -EINVAL; + rcu_read_unlock(); return -EINVAL; } local->block[i].remote_host_addr = @@ -3586,9 +3889,11 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque, goto err; } + rcu_read_unlock(); return 0; err: rdma->error_state = ret; + rcu_read_unlock(); return ret; } @@ -3606,10 +3911,15 @@ static const QEMUFileHooks rdma_write_hooks = { static void qio_channel_rdma_finalize(Object *obj) { QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj); - if (rioc->rdma) { - qemu_rdma_cleanup(rioc->rdma); - g_free(rioc->rdma); - rioc->rdma = NULL; + if (rioc->rdmain) { + qemu_rdma_cleanup(rioc->rdmain); + g_free(rioc->rdmain); + rioc->rdmain = NULL; + } + if (rioc->rdmaout) { + qemu_rdma_cleanup(rioc->rdmaout); + g_free(rioc->rdmaout); + rioc->rdmaout = NULL; } } @@ -3623,6 +3933,8 @@ static void qio_channel_rdma_class_init(ObjectClass *klass, ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking; ioc_klass->io_close = qio_channel_rdma_close; ioc_klass->io_create_watch = qio_channel_rdma_create_watch; + ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler; + ioc_klass->io_shutdown = qio_channel_rdma_shutdown; } static const TypeInfo qio_channel_rdma_info = { @@ -3649,13 +3961,16 @@ static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode) } rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA)); - rioc->rdma = rdma; if (mode[0] == 'w') { rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc)); + rioc->rdmaout = rdma; + rioc->rdmain = rdma->return_path; qemu_file_set_hooks(rioc->file, &rdma_write_hooks); } else { rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc)); + rioc->rdmain = rdma; + rioc->rdmaout = rdma->return_path; qemu_file_set_hooks(rioc->file, &rdma_read_hooks); } @@ -3679,6 +3994,10 @@ static void rdma_accept_incoming_migration(void *opaque) trace_qemu_rdma_accept_incoming_migration_accepted(); + if (rdma->is_return_path) { + return; + } + f = qemu_fopen_rdma(rdma, "rb"); if (f == NULL) { ERROR(errp, "could not qemu_fopen_rdma!"); @@ -3693,7 +4012,7 @@ static void rdma_accept_incoming_migration(void *opaque) void rdma_start_incoming_migration(const char *host_port, Error **errp) { int ret; - RDMAContext *rdma; + RDMAContext *rdma, *rdma_return_path; Error *local_err = NULL; trace_rdma_start_incoming_migration(); @@ -3720,12 +4039,24 @@ void rdma_start_incoming_migration(const char *host_port, Error **errp) trace_rdma_start_incoming_migration_after_rdma_listen(); + /* initialize the RDMAContext for return path */ + if (migrate_postcopy()) { + rdma_return_path = qemu_rdma_data_init(host_port, &local_err); + + if (rdma_return_path == NULL) { + goto err; + } + + qemu_rdma_return_path_dest_init(rdma_return_path, rdma); + } + qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration, NULL, (void *)(intptr_t)rdma); return; err: error_propagate(errp, local_err); g_free(rdma); + g_free(rdma_return_path); } void rdma_start_outgoing_migration(void *opaque, @@ -3733,6 +4064,7 @@ void rdma_start_outgoing_migration(void *opaque, { MigrationState *s = opaque; RDMAContext *rdma = qemu_rdma_data_init(host_port, errp); + RDMAContext *rdma_return_path = NULL; int ret = 0; if (rdma == NULL) { @@ -3753,6 +4085,32 @@ void rdma_start_outgoing_migration(void *opaque, goto err; } + /* RDMA postcopy need a seprate queue pair for return path */ + if (migrate_postcopy()) { + rdma_return_path = qemu_rdma_data_init(host_port, errp); + + if (rdma_return_path == NULL) { + goto err; + } + + ret = qemu_rdma_source_init(rdma_return_path, + s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp); + + if (ret) { + goto err; + } + + ret = qemu_rdma_connect(rdma_return_path, errp); + + if (ret) { + goto err; + } + + rdma->return_path = rdma_return_path; + rdma_return_path->return_path = rdma; + rdma_return_path->is_return_path = true; + } + trace_rdma_start_outgoing_migration_after_rdma_connect(); s->to_dst_file = qemu_fopen_rdma(rdma, "wb"); @@ -3760,4 +4118,5 @@ void rdma_start_outgoing_migration(void *opaque, return; err: g_free(rdma); + g_free(rdma_return_path); } diff --git a/migration/savevm.c b/migration/savevm.c index 7f92567a10..13e51f0e34 100644 --- a/migration/savevm.c +++ b/migration/savevm.c @@ -1622,6 +1622,7 @@ static void *postcopy_ram_listen_thread(void *opaque) qemu_sem_post(&mis->listen_thread_sem); trace_postcopy_ram_listen_thread_start(); + rcu_register_thread(); /* * Because we're a thread and not a coroutine we can't yield * in qemu_file, and thus we must be blocking now. @@ -1662,6 +1663,7 @@ static void *postcopy_ram_listen_thread(void *opaque) * to leave the guest running and fire MCEs for pages that never * arrived as a desperate recovery step. */ + rcu_unregister_thread(); exit(EXIT_FAILURE); } @@ -1676,6 +1678,7 @@ static void *postcopy_ram_listen_thread(void *opaque) migration_incoming_state_destroy(); qemu_loadvm_state_cleanup(); + rcu_unregister_thread(); return NULL; } diff --git a/migration/vmstate.c b/migration/vmstate.c index 6b9079bb51..0bc240a317 100644 --- a/migration/vmstate.c +++ b/migration/vmstate.c @@ -418,7 +418,7 @@ int vmstate_save_state_v(QEMUFile *f, const VMStateDescription *vmsd, static const VMStateDescription * vmstate_get_subsection(const VMStateDescription **sub, char *idstr) { - while (sub && *sub && (*sub)->needed) { + while (sub && *sub) { if (strcmp(idstr, (*sub)->name) == 0) { return *sub; } @@ -486,8 +486,8 @@ static int vmstate_subsection_save(QEMUFile *f, const VMStateDescription *vmsd, int ret = 0; trace_vmstate_subsection_save_top(vmsd->name); - while (sub && *sub && (*sub)->needed) { - if ((*sub)->needed(opaque)) { + while (sub && *sub) { + if (vmstate_save_needed(*sub, opaque)) { const VMStateDescription *vmsdsub = *sub; uint8_t len; |