Diffstat (limited to 'migration')
 -rw-r--r--  migration/colo.c                2
 -rw-r--r--  migration/migration.c          49
 -rw-r--r--  migration/migration.h           2
 -rw-r--r--  migration/postcopy-ram.c        2
 -rw-r--r--  migration/qemu-file-channel.c  12
 -rw-r--r--  migration/qemu-file.c           8
 -rw-r--r--  migration/ram.c               202
 -rw-r--r--  migration/rdma.c              423
 -rw-r--r--  migration/savevm.c              3
 -rw-r--r--  migration/vmstate.c             6
 10 files changed, 605 insertions(+), 104 deletions(-)
diff --git a/migration/colo.c b/migration/colo.c
index 4381067ed4..88936f5962 100644
--- a/migration/colo.c
+++ b/migration/colo.c
@@ -534,6 +534,7 @@ void *colo_process_incoming_thread(void *opaque)
uint64_t value;
Error *local_err = NULL;
+ rcu_register_thread();
qemu_sem_init(&mis->colo_incoming_sem, 0);
migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
@@ -666,5 +667,6 @@ out:
}
migration_incoming_exit_colo();
+ rcu_unregister_thread();
return NULL;
}
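The rcu_register_thread()/rcu_unregister_thread() bracketing added here recurs in every thread this series touches (COLO incoming, source return path, postcopy fault and listen threads, multifd send/recv). A minimal sketch of the pattern, with a hypothetical thread body:

#include "qemu/osdep.h"
#include "qemu/rcu.h"

static void *example_migration_thread(void *opaque)
{
    rcu_register_thread();      /* join the RCU thread registry */

    /* ... work that may enter rcu_read_lock()/rcu_read_unlock()
     * sections, e.g. walking the RAMBlock list ... */

    rcu_unregister_thread();    /* leave the registry before exiting */
    return NULL;
}

Without the registration, a grace period could be declared over while the thread is still inside a read-side critical section.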
diff --git a/migration/migration.c b/migration/migration.c
index b7d9854bda..4b316ec343 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -71,6 +71,7 @@
/* Define default autoconverge cpu throttle migration parameters */
#define DEFAULT_MIGRATE_CPU_THROTTLE_INITIAL 20
#define DEFAULT_MIGRATE_CPU_THROTTLE_INCREMENT 10
+#define DEFAULT_MIGRATE_MAX_CPU_THROTTLE 99
/* Migration XBZRLE default cache size */
#define DEFAULT_MIGRATE_XBZRLE_CACHE_SIZE (64 * 1024 * 1024)
@@ -389,6 +390,7 @@ static void process_incoming_migration_co(void *opaque)
int ret;
assert(mis->from_src_file);
+ mis->migration_incoming_co = qemu_coroutine_self();
mis->largest_page_size = qemu_ram_pagesize_largest();
postcopy_state_set(POSTCOPY_INCOMING_NONE);
migrate_set_state(&mis->state, MIGRATION_STATUS_NONE,
@@ -418,7 +420,6 @@ static void process_incoming_migration_co(void *opaque)
/* we get COLO info, and know if we are in COLO mode */
if (!ret && migration_incoming_enable_colo()) {
- mis->migration_incoming_co = qemu_coroutine_self();
qemu_thread_create(&mis->colo_incoming_thread, "COLO incoming",
colo_process_incoming_thread, mis, QEMU_THREAD_JOINABLE);
mis->have_colo_incoming_thread = true;
@@ -442,6 +443,7 @@ static void process_incoming_migration_co(void *opaque)
}
mis->bh = qemu_bh_new(process_incoming_migration_bh, mis);
qemu_bh_schedule(mis->bh);
+ mis->migration_incoming_co = NULL;
}
static void migration_incoming_setup(QEMUFile *f)
@@ -671,6 +673,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
params->compress_level = s->parameters.compress_level;
params->has_compress_threads = true;
params->compress_threads = s->parameters.compress_threads;
+ params->has_compress_wait_thread = true;
+ params->compress_wait_thread = s->parameters.compress_wait_thread;
params->has_decompress_threads = true;
params->decompress_threads = s->parameters.decompress_threads;
params->has_cpu_throttle_initial = true;
@@ -697,6 +701,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
params->xbzrle_cache_size = s->parameters.xbzrle_cache_size;
params->has_max_postcopy_bandwidth = true;
params->max_postcopy_bandwidth = s->parameters.max_postcopy_bandwidth;
+ params->has_max_cpu_throttle = true;
+ params->max_cpu_throttle = s->parameters.max_cpu_throttle;
return params;
}
@@ -1043,6 +1049,15 @@ static bool migrate_params_check(MigrationParameters *params, Error **errp)
return false;
}
+ if (params->has_max_cpu_throttle &&
+ (params->max_cpu_throttle < params->cpu_throttle_initial ||
+ params->max_cpu_throttle > 99)) {
+ error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
+ "max_cpu_throttle",
+ "an integer in the range of cpu_throttle_initial to 99");
+ return false;
+ }
+
return true;
}
@@ -1061,6 +1076,10 @@ static void migrate_params_test_apply(MigrateSetParameters *params,
dest->compress_threads = params->compress_threads;
}
+ if (params->has_compress_wait_thread) {
+ dest->compress_wait_thread = params->compress_wait_thread;
+ }
+
if (params->has_decompress_threads) {
dest->decompress_threads = params->decompress_threads;
}
@@ -1110,6 +1129,9 @@ static void migrate_params_test_apply(MigrateSetParameters *params,
if (params->has_max_postcopy_bandwidth) {
dest->max_postcopy_bandwidth = params->max_postcopy_bandwidth;
}
+ if (params->has_max_cpu_throttle) {
+ dest->max_cpu_throttle = params->max_cpu_throttle;
+ }
}
static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
@@ -1126,6 +1148,10 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
s->parameters.compress_threads = params->compress_threads;
}
+ if (params->has_compress_wait_thread) {
+ s->parameters.compress_wait_thread = params->compress_wait_thread;
+ }
+
if (params->has_decompress_threads) {
s->parameters.decompress_threads = params->decompress_threads;
}
@@ -1185,6 +1211,9 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
if (params->has_max_postcopy_bandwidth) {
s->parameters.max_postcopy_bandwidth = params->max_postcopy_bandwidth;
}
+ if (params->has_max_cpu_throttle) {
+ s->parameters.max_cpu_throttle = params->max_cpu_throttle;
+ }
}
void qmp_migrate_set_parameters(MigrateSetParameters *params, Error **errp)
@@ -1871,6 +1900,15 @@ int migrate_compress_threads(void)
return s->parameters.compress_threads;
}
+int migrate_compress_wait_thread(void)
+{
+ MigrationState *s;
+
+ s = migrate_get_current();
+
+ return s->parameters.compress_wait_thread;
+}
+
int migrate_decompress_threads(void)
{
MigrationState *s;
@@ -1962,7 +2000,6 @@ static int64_t migrate_max_postcopy_bandwidth(void)
return s->parameters.max_postcopy_bandwidth;
}
-
bool migrate_use_block(void)
{
MigrationState *s;
@@ -2104,6 +2141,7 @@ static void *source_return_path_thread(void *opaque)
int res;
trace_source_return_path_thread_entry();
+ rcu_register_thread();
retry:
while (!ms->rp_state.error && !qemu_file_get_error(rp) &&
@@ -2243,6 +2281,7 @@ out:
trace_source_return_path_thread_end();
ms->rp_state.from_dst_file = NULL;
qemu_fclose(rp);
+ rcu_unregister_thread();
return NULL;
}
@@ -3131,6 +3170,8 @@ static Property migration_properties[] = {
DEFINE_PROP_UINT8("x-compress-threads", MigrationState,
parameters.compress_threads,
DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT),
+ DEFINE_PROP_BOOL("x-compress-wait-thread", MigrationState,
+ parameters.compress_wait_thread, true),
DEFINE_PROP_UINT8("x-decompress-threads", MigrationState,
parameters.decompress_threads,
DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT),
@@ -3160,6 +3201,9 @@ static Property migration_properties[] = {
DEFINE_PROP_SIZE("max-postcopy-bandwidth", MigrationState,
parameters.max_postcopy_bandwidth,
DEFAULT_MIGRATE_MAX_POSTCOPY_BANDWIDTH),
+ DEFINE_PROP_UINT8("max-cpu-throttle", MigrationState,
+ parameters.max_cpu_throttle,
+ DEFAULT_MIGRATE_MAX_CPU_THROTTLE),
/* Migration capabilities */
DEFINE_PROP_MIG_CAP("x-xbzrle", MIGRATION_CAPABILITY_XBZRLE),
@@ -3230,6 +3274,7 @@ static void migration_instance_init(Object *obj)
params->has_x_multifd_page_count = true;
params->has_xbzrle_cache_size = true;
params->has_max_postcopy_bandwidth = true;
+ params->has_max_cpu_throttle = true;
qemu_sem_init(&ms->postcopy_pause_sem, 0);
qemu_sem_init(&ms->postcopy_pause_rp_sem, 0);
diff --git a/migration/migration.h b/migration/migration.h
index 64a7b33735..f7813f8261 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -266,11 +266,13 @@ bool migrate_colo_enabled(void);
bool migrate_use_block(void);
bool migrate_use_block_incremental(void);
+int migrate_max_cpu_throttle(void);
bool migrate_use_return_path(void);
bool migrate_use_compression(void);
int migrate_compress_level(void);
int migrate_compress_threads(void);
+int migrate_compress_wait_thread(void);
int migrate_decompress_threads(void);
bool migrate_use_events(void);
bool migrate_postcopy_blocktime(void);
diff --git a/migration/postcopy-ram.c b/migration/postcopy-ram.c
index c2e387ed44..853d8b32ca 100644
--- a/migration/postcopy-ram.c
+++ b/migration/postcopy-ram.c
@@ -867,6 +867,7 @@ static void *postcopy_ram_fault_thread(void *opaque)
RAMBlock *rb = NULL;
trace_postcopy_ram_fault_thread_entry();
+ rcu_register_thread();
mis->last_rb = NULL; /* last RAMBlock we sent part of */
qemu_sem_post(&mis->fault_thread_sem);
@@ -1073,6 +1074,7 @@ retry:
}
}
}
+ rcu_unregister_thread();
trace_postcopy_ram_fault_thread_exit();
g_free(pfd);
return NULL;
diff --git a/migration/qemu-file-channel.c b/migration/qemu-file-channel.c
index e202d73834..8e639eb496 100644
--- a/migration/qemu-file-channel.c
+++ b/migration/qemu-file-channel.c
@@ -49,7 +49,11 @@ static ssize_t channel_writev_buffer(void *opaque,
ssize_t len;
len = qio_channel_writev(ioc, local_iov, nlocal_iov, NULL);
if (len == QIO_CHANNEL_ERR_BLOCK) {
- qio_channel_wait(ioc, G_IO_OUT);
+ if (qemu_in_coroutine()) {
+ qio_channel_yield(ioc, G_IO_OUT);
+ } else {
+ qio_channel_wait(ioc, G_IO_OUT);
+ }
continue;
}
if (len < 0) {
@@ -80,7 +84,11 @@ static ssize_t channel_get_buffer(void *opaque,
ret = qio_channel_read(ioc, (char *)buf, size, NULL);
if (ret < 0) {
if (ret == QIO_CHANNEL_ERR_BLOCK) {
- qio_channel_yield(ioc, G_IO_IN);
+ if (qemu_in_coroutine()) {
+ qio_channel_yield(ioc, G_IO_IN);
+ } else {
+ qio_channel_wait(ioc, G_IO_IN);
+ }
} else {
/* XXX handle Error * object */
return -EIO;
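Both hunks converge on the same dispatch, which lets these qemu-file channel callbacks run either inside the incoming-migration coroutine or from a plain thread such as the return-path thread. A minimal sketch (hypothetical helper name):

#include "qemu/osdep.h"
#include "qemu/coroutine.h"
#include "io/channel.h"

static void example_wait_for_io(QIOChannel *ioc, GIOCondition cond)
{
    if (qemu_in_coroutine()) {
        qio_channel_yield(ioc, cond);   /* yield; resumed when ready */
    } else {
        qio_channel_wait(ioc, cond);    /* block this thread in poll */
    }
}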
diff --git a/migration/qemu-file.c b/migration/qemu-file.c
index 0463f4c321..977b9ae07c 100644
--- a/migration/qemu-file.c
+++ b/migration/qemu-file.c
@@ -253,8 +253,12 @@ size_t ram_control_save_page(QEMUFile *f, ram_addr_t block_offset,
if (f->hooks && f->hooks->save_page) {
int ret = f->hooks->save_page(f, f->opaque, block_offset,
offset, size, bytes_sent);
- f->bytes_xfer += size;
- if (ret != RAM_SAVE_CONTROL_DELAYED) {
+ if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
+ f->bytes_xfer += size;
+ }
+
+ if (ret != RAM_SAVE_CONTROL_DELAYED &&
+ ret != RAM_SAVE_CONTROL_NOT_SUPP) {
if (bytes_sent && *bytes_sent > 0) {
qemu_update_position(f, *bytes_sent);
} else if (ret < 0) {
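A sketch of the contract this hunk enforces for the hook's return value (hypothetical helper; the RAM_SAVE_CONTROL_* constants are the real ones from migration/qemu-file.h):

#include "qemu/osdep.h"
#include "qemu-file.h"

static void example_account_hook_result(QEMUFile *f, int ret,
                                        uint64_t *bytes_sent)
{
    if (ret == RAM_SAVE_CONTROL_NOT_SUPP) {
        /* hook opted out (e.g. RDMA during postcopy): count nothing,
         * the caller sends the page through the normal path instead */
        return;
    }
    if (ret == RAM_SAVE_CONTROL_DELAYED) {
        /* page is in flight; bytes are accounted when it completes */
        return;
    }
    if (bytes_sent && *bytes_sent > 0) {
        qemu_update_position(f, *bytes_sent);   /* sent synchronously */
    }
}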
diff --git a/migration/ram.c b/migration/ram.c
index fa79d0a5b9..79c89425a3 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -341,6 +341,7 @@ typedef struct PageSearchStatus PageSearchStatus;
struct CompressParam {
bool done;
bool quit;
+ bool zero_page;
QEMUFile *file;
QemuMutex mutex;
QemuCond cond;
@@ -382,14 +383,15 @@ static QemuThread *decompress_threads;
static QemuMutex decomp_done_lock;
static QemuCond decomp_done_cond;
-static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
- ram_addr_t offset, uint8_t *source_buf);
+static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
+ ram_addr_t offset, uint8_t *source_buf);
static void *do_data_compress(void *opaque)
{
CompressParam *param = opaque;
RAMBlock *block;
ram_addr_t offset;
+ bool zero_page;
qemu_mutex_lock(&param->mutex);
while (!param->quit) {
@@ -399,11 +401,12 @@ static void *do_data_compress(void *opaque)
param->block = NULL;
qemu_mutex_unlock(&param->mutex);
- do_compress_ram_page(param->file, &param->stream, block, offset,
- param->originbuf);
+ zero_page = do_compress_ram_page(param->file, &param->stream,
+ block, offset, param->originbuf);
qemu_mutex_lock(&comp_done_lock);
param->done = true;
+ param->zero_page = zero_page;
qemu_cond_signal(&comp_done_cond);
qemu_mutex_unlock(&comp_done_lock);
@@ -989,6 +992,7 @@ static void *multifd_send_thread(void *opaque)
int ret;
trace_multifd_send_thread_start(p->id);
+ rcu_register_thread();
if (multifd_send_initial_packet(p, &local_err) < 0) {
goto out;
@@ -1051,6 +1055,7 @@ out:
p->running = false;
qemu_mutex_unlock(&p->mutex);
+ rcu_unregister_thread();
trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);
return NULL;
@@ -1220,6 +1225,7 @@ static void *multifd_recv_thread(void *opaque)
int ret;
trace_multifd_recv_thread_start(p->id);
+ rcu_register_thread();
while (true) {
uint32_t used;
@@ -1266,6 +1272,7 @@ static void *multifd_recv_thread(void *opaque)
p->running = false;
qemu_mutex_unlock(&p->mutex);
+ rcu_unregister_thread();
trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);
return NULL;
@@ -1391,13 +1398,15 @@ static void mig_throttle_guest_down(void)
MigrationState *s = migrate_get_current();
uint64_t pct_initial = s->parameters.cpu_throttle_initial;
uint64_t pct_icrement = s->parameters.cpu_throttle_increment;
+ int pct_max = s->parameters.max_cpu_throttle;
/* We have not started throttling yet. Let's start it. */
if (!cpu_throttle_active()) {
cpu_throttle_set(pct_initial);
} else {
/* Throttling already on, just increase the rate */
- cpu_throttle_set(cpu_throttle_get_percentage() + pct_icrement);
+ cpu_throttle_set(MIN(cpu_throttle_get_percentage() + pct_icrement,
+ pct_max));
}
}
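The MIN() clamp makes the throttle saturate at max-cpu-throttle instead of growing without bound. A worked sketch (hypothetical helper) using the defaults defined in this series (initial 20, increment 10, max 99): successive calls yield 20, 30, ..., 90, 99, then hold at 99.

#include "qemu/osdep.h"   /* MIN() */

static int example_next_throttle_pct(int current, int increment, int max)
{
    return MIN(current + increment, max);
}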
@@ -1666,6 +1675,31 @@ static void migration_bitmap_sync(RAMState *rs)
}
/**
+ * save_zero_page_to_file: send the zero page to the file
+ *
+ * Returns the size of the data written to the file, or 0 if the page
+ * is not a zero page
+ *
+ * @rs: current RAM state
+ * @file: the file where the data is saved
+ * @block: block that contains the page we want to send
+ * @offset: offset inside the block for the page
+ */
+static int save_zero_page_to_file(RAMState *rs, QEMUFile *file,
+ RAMBlock *block, ram_addr_t offset)
+{
+ uint8_t *p = block->host + offset;
+ int len = 0;
+
+ if (is_zero_range(p, TARGET_PAGE_SIZE)) {
+ len += save_page_header(rs, file, block, offset | RAM_SAVE_FLAG_ZERO);
+ qemu_put_byte(file, 0);
+ len += 1;
+ }
+ return len;
+}
+
+/**
* save_zero_page: send the zero page to the stream
*
* Returns the number of pages written.
@@ -1676,19 +1710,14 @@ static void migration_bitmap_sync(RAMState *rs)
*/
static int save_zero_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
{
- uint8_t *p = block->host + offset;
- int pages = -1;
+ int len = save_zero_page_to_file(rs, rs->f, block, offset);
- if (is_zero_range(p, TARGET_PAGE_SIZE)) {
+ if (len) {
ram_counters.duplicate++;
- ram_counters.transferred +=
- save_page_header(rs, rs->f, block, offset | RAM_SAVE_FLAG_ZERO);
- qemu_put_byte(rs->f, 0);
- ram_counters.transferred += 1;
- pages = 1;
+ ram_counters.transferred += len;
+ return 1;
}
-
- return pages;
+ return -1;
}
static void ram_release_pages(const char *rbname, uint64_t offset, int pages)
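For reference, a naive stand-in for the zero test that save_zero_page_to_file() relies on; QEMU's is_zero_range() actually wraps the vectorized buffer_is_zero(), so this loop is illustrative only:

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

static bool example_page_is_zero(const uint8_t *p, size_t len)
{
    for (size_t i = 0; i < len; i++) {
        if (p[i]) {
            return false;      /* first non-zero byte ends the scan */
        }
    }
    return true;               /* all len bytes were zero */
}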
@@ -1823,15 +1852,20 @@ static int ram_save_multifd_page(RAMState *rs, RAMBlock *block,
return 1;
}
-static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
- ram_addr_t offset, uint8_t *source_buf)
+static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
+ ram_addr_t offset, uint8_t *source_buf)
{
RAMState *rs = ram_state;
- int bytes_sent, blen;
uint8_t *p = block->host + (offset & TARGET_PAGE_MASK);
+ bool zero_page = false;
+ int ret;
+
+ if (save_zero_page_to_file(rs, f, block, offset)) {
+ zero_page = true;
+ goto exit;
+ }
- bytes_sent = save_page_header(rs, f, block, offset |
- RAM_SAVE_FLAG_COMPRESS_PAGE);
+ save_page_header(rs, f, block, offset | RAM_SAVE_FLAG_COMPRESS_PAGE);
/*
* copy it to a internal buffer to avoid it being modified by VM
@@ -1839,17 +1873,25 @@ static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
* decompression
*/
memcpy(source_buf, p, TARGET_PAGE_SIZE);
- blen = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
- if (blen < 0) {
- bytes_sent = 0;
- qemu_file_set_error(migrate_get_current()->to_dst_file, blen);
+ ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
+ if (ret < 0) {
+ qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
error_report("compressed data failed!");
- } else {
- bytes_sent += blen;
- ram_release_pages(block->idstr, offset & TARGET_PAGE_MASK, 1);
+ return false;
}
- return bytes_sent;
+exit:
+ ram_release_pages(block->idstr, offset & TARGET_PAGE_MASK, 1);
+ return zero_page;
+}
+
+static void
+update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
+{
+ if (param->zero_page) {
+ ram_counters.duplicate++;
+ }
+ ram_counters.transferred += bytes_xmit;
}
static void flush_compressed_data(RAMState *rs)
@@ -1873,7 +1915,12 @@ static void flush_compressed_data(RAMState *rs)
qemu_mutex_lock(&comp_param[idx].mutex);
if (!comp_param[idx].quit) {
len = qemu_put_qemu_file(rs->f, comp_param[idx].file);
- ram_counters.transferred += len;
+ /*
+ * It's safe to fetch zero_page without holding comp_done_lock,
+ * as there is no further request submitted to the thread,
+ * i.e., the thread should be waiting for a request at this point.
+ */
+ update_compress_thread_counts(&comp_param[idx], len);
}
qemu_mutex_unlock(&comp_param[idx].mutex);
}
@@ -1890,30 +1937,33 @@ static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
ram_addr_t offset)
{
int idx, thread_count, bytes_xmit = -1, pages = -1;
+ bool wait = migrate_compress_wait_thread();
thread_count = migrate_compress_threads();
qemu_mutex_lock(&comp_done_lock);
- while (true) {
- for (idx = 0; idx < thread_count; idx++) {
- if (comp_param[idx].done) {
- comp_param[idx].done = false;
- bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
- qemu_mutex_lock(&comp_param[idx].mutex);
- set_compress_params(&comp_param[idx], block, offset);
- qemu_cond_signal(&comp_param[idx].cond);
- qemu_mutex_unlock(&comp_param[idx].mutex);
- pages = 1;
- ram_counters.normal++;
- ram_counters.transferred += bytes_xmit;
- break;
- }
- }
- if (pages > 0) {
+retry:
+ for (idx = 0; idx < thread_count; idx++) {
+ if (comp_param[idx].done) {
+ comp_param[idx].done = false;
+ bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
+ qemu_mutex_lock(&comp_param[idx].mutex);
+ set_compress_params(&comp_param[idx], block, offset);
+ qemu_cond_signal(&comp_param[idx].cond);
+ qemu_mutex_unlock(&comp_param[idx].mutex);
+ pages = 1;
+ update_compress_thread_counts(&comp_param[idx], bytes_xmit);
break;
- } else {
- qemu_cond_wait(&comp_done_cond, &comp_done_lock);
}
}
+
+ /*
+ * Wait for a free thread if the user specifies 'compress-wait-thread';
+ * otherwise we post the page out in the main thread as a normal page.
+ */
+ if (pages < 0 && wait) {
+ qemu_cond_wait(&comp_done_cond, &comp_done_lock);
+ goto retry;
+ }
qemu_mutex_unlock(&comp_done_lock);
return pages;
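The rewritten loop separates the scan from the wait. A minimal sketch of the resulting policy (hypothetical helper; find_done_thread() stands in for the comp_param[].done scan, the comment for the comp_done_cond wait):

#include <stdbool.h>

static int example_compress_or_fallback(bool compress_wait_thread,
                                        bool (*find_done_thread)(void))
{
    while (!find_done_thread()) {
        if (!compress_wait_thread) {
            return -1;   /* caller sends the page as a normal page */
        }
        /* qemu_cond_wait(&comp_done_cond, &comp_done_lock); */
    }
    return 1;            /* page handed to a free compression thread */
}

With the x-compress-wait-thread property added later in this patch, setting it to false presumably trades some compression ratio for lower per-page latency when all compression threads are busy.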
@@ -1983,6 +2033,10 @@ static RAMBlock *unqueue_page(RAMState *rs, ram_addr_t *offset)
{
RAMBlock *block = NULL;
+ if (QSIMPLEQ_EMPTY_ATOMIC(&rs->src_page_requests)) {
+ return NULL;
+ }
+
qemu_mutex_lock(&rs->src_page_req_mutex);
if (!QSIMPLEQ_EMPTY(&rs->src_page_requests)) {
struct RAMSrcPageRequest *entry =
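This is the classic double-checked pattern: a racy, lock-free emptiness probe skips the mutex in the common empty case, and correctness is preserved because the queue is re-checked under the lock. A generic sketch with hypothetical callbacks:

#include <stdbool.h>

static bool example_try_pop(bool (*empty_racy)(void),
                            void (*lock)(void), void (*unlock)(void),
                            bool (*pop_locked)(void))
{
    if (empty_racy()) {     /* may miss a concurrent insert; the */
        return false;       /* next call will observe it         */
    }
    lock();
    bool popped = pop_locked();   /* re-checks under the lock */
    unlock();
    return popped;
}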
@@ -2175,6 +2229,39 @@ static bool save_page_use_compression(RAMState *rs)
return false;
}
+/*
+ * Try to compress the page before posting it out; return true if the page
+ * has been properly handled by compression, otherwise it needs to be
+ * handled by other paths.
+ */
+static bool save_compress_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
+{
+ if (!save_page_use_compression(rs)) {
+ return false;
+ }
+
+ /*
+ * When starting the process of a new block, the first page of
+ * the block should be sent out before other pages in the same
+ * block, and all the pages in the last block should have been sent
+ * out. Keeping this order is important, because the 'cont' flag
+ * is used to avoid resending the block name.
+ *
+ * We post the first page as a normal page because compression will
+ * take much CPU resource.
+ */
+ if (block != rs->last_sent_block) {
+ flush_compressed_data(rs);
+ return false;
+ }
+
+ if (compress_page_with_multi_thread(rs, block, offset) > 0) {
+ return true;
+ }
+
+ return false;
+}
+
/**
* ram_save_target_page: save one target page
*
@@ -2195,15 +2282,8 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
return res;
}
- /*
- * When starting the process of a new block, the first page of
- * the block should be sent out before other pages in the same
- * block, and all the pages in last block should have been sent
- * out, keeping this order is important, because the 'cont' flag
- * is used to avoid resending the block name.
- */
- if (block != rs->last_sent_block && save_page_use_compression(rs)) {
- flush_compressed_data(rs);
+ if (save_compress_page(rs, block, offset)) {
+ return 1;
}
res = save_zero_page(rs, block, offset);
@@ -2221,14 +2301,10 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
}
/*
- * Make sure the first page is sent out before other pages.
- *
- * we post it as normal page as compression will take much
- * CPU resource.
+ * Do not use multifd together with compression: the first page in a
+ * new block must be posted out before the compressed pages are sent.
*/
- if (block == rs->last_sent_block && save_page_use_compression(rs)) {
- return compress_page_with_multi_thread(rs, block, offset);
- } else if (migrate_use_multifd()) {
+ if (!save_page_use_compression(rs) && migrate_use_multifd()) {
return ram_save_multifd_page(rs, block, offset);
}
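A condensed, hypothetical view of the decision order ram_save_target_page() ends up with after this series (the flags stand in for the helpers' results):

#include <stdbool.h>

static int example_page_dispatch(bool hook_handled, bool compressed,
                                 bool zero, bool use_multifd)
{
    if (hook_handled) {   /* ram_control_save_page(), unless NOT_SUPP */
        return 0;
    }
    if (compressed) {     /* save_compress_page(): non-first pages    */
        return 1;
    }
    if (zero) {           /* save_zero_page() sent the marker byte    */
        return 2;
    }
    /* multifd only when compression is off, else the normal path */
    return use_multifd ? 3 : 4;
}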
diff --git a/migration/rdma.c b/migration/rdma.c
index 8bd7159059..ae07515e83 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -86,6 +86,7 @@ static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
" to abort!"); \
rdma->error_reported = 1; \
} \
+ rcu_read_unlock(); \
return rdma->error_state; \
} \
} while (0)
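The rcu_read_unlock() is needed because CHECK_ERROR_STATE() can return early, and every RDMA I/O entry point now invokes it from inside a read-side critical section. A sketch of the pattern those entry points adopt (pattern only; the types and macro are this file's):

static ssize_t example_rdma_io_path(QIOChannelRDMA *rioc)
{
    RDMAContext *rdma;

    rcu_read_lock();
    rdma = atomic_rcu_read(&rioc->rdmaout);   /* or &rioc->rdmain */
    if (!rdma) {
        rcu_read_unlock();
        return -EIO;
    }
    CHECK_ERROR_STATE();   /* may return early, dropping the lock */

    /* ... I/O against rdma ... */

    rcu_read_unlock();
    return 0;
}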
@@ -387,6 +388,10 @@ typedef struct RDMAContext {
uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
GHashTable *blockmap;
+
+ /* the RDMAContext for return path */
+ struct RDMAContext *return_path;
+ bool is_return_path;
} RDMAContext;
#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
@@ -398,7 +403,8 @@ typedef struct QIOChannelRDMA QIOChannelRDMA;
struct QIOChannelRDMA {
QIOChannel parent;
- RDMAContext *rdma;
+ RDMAContext *rdmain;
+ RDMAContext *rdmaout;
QEMUFile *file;
bool blocking; /* XXX we don't actually honour this yet */
};
@@ -1483,27 +1489,56 @@ static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
*/
static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
{
+ struct rdma_cm_event *cm_event;
+ int ret = -1;
+
/*
* Coroutine doesn't start until migration_fd_process_incoming()
* so don't yield unless we know we're running inside of a coroutine.
*/
- if (rdma->migration_started_on_destination) {
+ if (rdma->migration_started_on_destination &&
+ migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) {
yield_until_fd_readable(rdma->comp_channel->fd);
} else {
/* This is the source side, we're in a separate thread
* or destination prior to migration_fd_process_incoming()
+ * (after postcopy, the destination is also in a separate thread);
* we can't yield; so we have to poll the fd.
* But we need to be able to handle 'cancel' or an error
* without hanging forever.
*/
while (!rdma->error_state && !rdma->received_error) {
- GPollFD pfds[1];
+ GPollFD pfds[2];
pfds[0].fd = rdma->comp_channel->fd;
pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
+ pfds[0].revents = 0;
+
+ pfds[1].fd = rdma->channel->fd;
+ pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
+ pfds[1].revents = 0;
+
/* 0.1s timeout, should be fine for a 'cancel' */
- switch (qemu_poll_ns(pfds, 1, 100 * 1000 * 1000)) {
+ switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) {
+ case 2:
case 1: /* fd active */
- return 0;
+ if (pfds[0].revents) {
+ return 0;
+ }
+
+ if (pfds[1].revents) {
+ ret = rdma_get_cm_event(rdma->channel, &cm_event);
+ if (ret) {
+ error_report("failed to get CM event while waiting "
+ "on the completion channel");
+ return -EPIPE;
+ }
+
+ error_report("received CM event %d while waiting on the "
+ "completion channel", cm_event->event);
+ if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
+ cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
+ rdma_ack_cm_event(cm_event);
+ return -EPIPE;
+ }
+ rdma_ack_cm_event(cm_event);
+ }
+ break;
case 0: /* Timeout, go around again */
break;
@@ -2323,10 +2358,22 @@ static void qemu_rdma_cleanup(RDMAContext *rdma)
rdma_destroy_id(rdma->cm_id);
rdma->cm_id = NULL;
}
+
+ /* On the destination side, listen_id and channel are shared */
if (rdma->listen_id) {
- rdma_destroy_id(rdma->listen_id);
+ if (!rdma->is_return_path) {
+ rdma_destroy_id(rdma->listen_id);
+ }
rdma->listen_id = NULL;
+
+ if (rdma->channel) {
+ if (!rdma->is_return_path) {
+ rdma_destroy_event_channel(rdma->channel);
+ }
+ rdma->channel = NULL;
+ }
}
+
if (rdma->channel) {
rdma_destroy_event_channel(rdma->channel);
rdma->channel = NULL;
@@ -2555,6 +2602,25 @@ err_dest_init_create_listen_id:
}
+static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path,
+ RDMAContext *rdma)
+{
+ int idx;
+
+ for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
+ rdma_return_path->wr_data[idx].control_len = 0;
+ rdma_return_path->wr_data[idx].control_curr = NULL;
+ }
+
+ /* the CM channel and CM id are shared */
+ rdma_return_path->channel = rdma->channel;
+ rdma_return_path->listen_id = rdma->listen_id;
+
+ rdma->return_path = rdma_return_path;
+ rdma_return_path->return_path = rdma;
+ rdma_return_path->is_return_path = true;
+}
+
static void *qemu_rdma_data_init(const char *host_port, Error **errp)
{
RDMAContext *rdma = NULL;
@@ -2595,12 +2661,20 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
QEMUFile *f = rioc->file;
- RDMAContext *rdma = rioc->rdma;
+ RDMAContext *rdma;
int ret;
ssize_t done = 0;
size_t i;
size_t len = 0;
+ rcu_read_lock();
+ rdma = atomic_rcu_read(&rioc->rdmaout);
+
+ if (!rdma) {
+ rcu_read_unlock();
+ return -EIO;
+ }
+
CHECK_ERROR_STATE();
/*
@@ -2610,6 +2684,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
ret = qemu_rdma_write_flush(f, rdma);
if (ret < 0) {
rdma->error_state = ret;
+ rcu_read_unlock();
return ret;
}
@@ -2629,6 +2704,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
if (ret < 0) {
rdma->error_state = ret;
+ rcu_read_unlock();
return ret;
}
@@ -2637,6 +2713,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
}
}
+ rcu_read_unlock();
return done;
}
@@ -2670,12 +2747,20 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
Error **errp)
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
- RDMAContext *rdma = rioc->rdma;
+ RDMAContext *rdma;
RDMAControlHeader head;
int ret = 0;
ssize_t i;
size_t done = 0;
+ rcu_read_lock();
+ rdma = atomic_rcu_read(&rioc->rdmain);
+
+ if (!rdma) {
+ rcu_read_unlock();
+ return -EIO;
+ }
+
CHECK_ERROR_STATE();
for (i = 0; i < niov; i++) {
@@ -2687,7 +2772,7 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
* were given and dish out the bytes until we run
* out of bytes.
*/
- ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
+ ret = qemu_rdma_fill(rdma, data, want, 0);
done += ret;
want -= ret;
/* Got what we needed, so go to next iovec */
@@ -2709,25 +2794,28 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
if (ret < 0) {
rdma->error_state = ret;
+ rcu_read_unlock();
return ret;
}
/*
* SEND was received with new bytes, now try again.
*/
- ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
+ ret = qemu_rdma_fill(rdma, data, want, 0);
done += ret;
want -= ret;
/* Still didn't get enough, so lets just return */
if (want) {
if (done == 0) {
+ rcu_read_unlock();
return QIO_CHANNEL_ERR_BLOCK;
} else {
break;
}
}
}
+ rcu_read_unlock();
return done;
}
@@ -2779,15 +2867,29 @@ qio_channel_rdma_source_prepare(GSource *source,
gint *timeout)
{
QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
- RDMAContext *rdma = rsource->rioc->rdma;
+ RDMAContext *rdma;
GIOCondition cond = 0;
*timeout = -1;
+ rcu_read_lock();
+ if (rsource->condition == G_IO_IN) {
+ rdma = atomic_rcu_read(&rsource->rioc->rdmain);
+ } else {
+ rdma = atomic_rcu_read(&rsource->rioc->rdmaout);
+ }
+
+ if (!rdma) {
+ error_report("RDMAContext is NULL when prepare Gsource");
+ rcu_read_unlock();
+ return FALSE;
+ }
+
if (rdma->wr_data[0].control_len) {
cond |= G_IO_IN;
}
cond |= G_IO_OUT;
+ rcu_read_unlock();
return cond & rsource->condition;
}
@@ -2795,14 +2897,28 @@ static gboolean
qio_channel_rdma_source_check(GSource *source)
{
QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
- RDMAContext *rdma = rsource->rioc->rdma;
+ RDMAContext *rdma;
GIOCondition cond = 0;
+ rcu_read_lock();
+ if (rsource->condition == G_IO_IN) {
+ rdma = atomic_rcu_read(&rsource->rioc->rdmain);
+ } else {
+ rdma = atomic_rcu_read(&rsource->rioc->rdmaout);
+ }
+
+ if (!rdma) {
+ error_report("RDMAContext is NULL when check Gsource");
+ rcu_read_unlock();
+ return FALSE;
+ }
+
if (rdma->wr_data[0].control_len) {
cond |= G_IO_IN;
}
cond |= G_IO_OUT;
+ rcu_read_unlock();
return cond & rsource->condition;
}
@@ -2813,14 +2929,28 @@ qio_channel_rdma_source_dispatch(GSource *source,
{
QIOChannelFunc func = (QIOChannelFunc)callback;
QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
- RDMAContext *rdma = rsource->rioc->rdma;
+ RDMAContext *rdma;
GIOCondition cond = 0;
+ rcu_read_lock();
+ if (rsource->condition == G_IO_IN) {
+ rdma = atomic_rcu_read(&rsource->rioc->rdmain);
+ } else {
+ rdma = atomic_rcu_read(&rsource->rioc->rdmaout);
+ }
+
+ if (!rdma) {
+ error_report("RDMAContext is NULL when dispatch Gsource");
+ rcu_read_unlock();
+ return FALSE;
+ }
+
if (rdma->wr_data[0].control_len) {
cond |= G_IO_IN;
}
cond |= G_IO_OUT;
+ rcu_read_unlock();
return (*func)(QIO_CHANNEL(rsource->rioc),
(cond & rsource->condition),
user_data);
@@ -2860,20 +2990,91 @@ static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
return source;
}
+static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
+ AioContext *ctx,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ void *opaque)
+{
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+ if (io_read) {
+ aio_set_fd_handler(ctx, rioc->rdmain->comp_channel->fd,
+ false, io_read, io_write, NULL, opaque);
+ } else {
+ aio_set_fd_handler(ctx, rioc->rdmaout->comp_channel->fd,
+ false, io_read, io_write, NULL, opaque);
+ }
+}
static int qio_channel_rdma_close(QIOChannel *ioc,
Error **errp)
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+ RDMAContext *rdmain, *rdmaout;
trace_qemu_rdma_close();
- if (rioc->rdma) {
- if (!rioc->rdma->error_state) {
- rioc->rdma->error_state = qemu_file_get_error(rioc->file);
+
+ rdmain = rioc->rdmain;
+ if (rdmain) {
+ atomic_rcu_set(&rioc->rdmain, NULL);
+ }
+
+ rdmaout = rioc->rdmaout;
+ if (rdmaout) {
+ atomic_rcu_set(&rioc->rdmaout, NULL);
+ }
+
+ synchronize_rcu();
+
+ if (rdmain) {
+ qemu_rdma_cleanup(rdmain);
+ }
+
+ if (rdmaout) {
+ qemu_rdma_cleanup(rdmaout);
+ }
+
+ g_free(rdmain);
+ g_free(rdmaout);
+
+ return 0;
+}
+
+static int
+qio_channel_rdma_shutdown(QIOChannel *ioc,
+ QIOChannelShutdown how,
+ Error **errp)
+{
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+ RDMAContext *rdmain, *rdmaout;
+
+ rcu_read_lock();
+
+ rdmain = atomic_rcu_read(&rioc->rdmain);
+ rdmaout = atomic_rcu_read(&rioc->rdmain);
+
+ switch (how) {
+ case QIO_CHANNEL_SHUTDOWN_READ:
+ if (rdmain) {
+ rdmain->error_state = -1;
+ }
+ break;
+ case QIO_CHANNEL_SHUTDOWN_WRITE:
+ if (rdmaout) {
+ rdmaout->error_state = -1;
}
- qemu_rdma_cleanup(rioc->rdma);
- g_free(rioc->rdma);
- rioc->rdma = NULL;
+ break;
+ case QIO_CHANNEL_SHUTDOWN_BOTH:
+ default:
+ if (rdmain) {
+ rdmain->error_state = -1;
+ }
+ if (rdmaout) {
+ rdmaout->error_state = -1;
+ }
+ break;
}
+
+ rcu_read_unlock();
return 0;
}
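The close path above is the standard three-step RCU removal sequence. A generic sketch (hypothetical helper; RDMAContext is this file's type):

#include "qemu/osdep.h"
#include "qemu/atomic.h"
#include "qemu/rcu.h"

static void example_rcu_retire(RDMAContext **slot)
{
    RDMAContext *old = *slot;

    atomic_rcu_set(slot, NULL);   /* 1. unpublish: new readers see NULL */
    synchronize_rcu();            /* 2. wait for all current readers    */
    g_free(old);                  /* 3. reclaim; no reader can hold it  */
}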
@@ -2916,11 +3117,24 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
size_t size, uint64_t *bytes_sent)
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
- RDMAContext *rdma = rioc->rdma;
+ RDMAContext *rdma;
int ret;
+ rcu_read_lock();
+ rdma = atomic_rcu_read(&rioc->rdmaout);
+
+ if (!rdma) {
+ rcu_read_unlock();
+ return -EIO;
+ }
+
CHECK_ERROR_STATE();
+ if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
+ rcu_read_unlock();
+ return RAM_SAVE_CONTROL_NOT_SUPP;
+ }
+
qemu_fflush(f);
if (size > 0) {
@@ -3002,12 +3216,45 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
}
}
+ rcu_read_unlock();
return RAM_SAVE_CONTROL_DELAYED;
err:
rdma->error_state = ret;
+ rcu_read_unlock();
return ret;
}
+static void rdma_accept_incoming_migration(void *opaque);
+
+static void rdma_cm_poll_handler(void *opaque)
+{
+ RDMAContext *rdma = opaque;
+ int ret;
+ struct rdma_cm_event *cm_event;
+ MigrationIncomingState *mis = migration_incoming_get_current();
+
+ ret = rdma_get_cm_event(rdma->channel, &cm_event);
+ if (ret) {
+ error_report("get_cm_event failed %d", errno);
+ return;
+ }
+
+ if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
+ cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
+ error_report("received CM event %d", cm_event->event);
+ rdma_ack_cm_event(cm_event);
+ rdma->error_state = -EPIPE;
+ if (rdma->return_path) {
+ rdma->return_path->error_state = -EPIPE;
+ }
+
+ if (mis->migration_incoming_co) {
+ qemu_coroutine_enter(mis->migration_incoming_co);
+ }
+ return;
+ }
+
+ rdma_ack_cm_event(cm_event);
+}
+
static int qemu_rdma_accept(RDMAContext *rdma)
{
RDMACapabilities cap;
@@ -3102,7 +3349,15 @@ static int qemu_rdma_accept(RDMAContext *rdma)
}
}
- qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
+ /* Accept the second connection request for return path */
+ if (migrate_postcopy() && !rdma->is_return_path) {
+ qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
+ NULL,
+ (void *)(intptr_t)rdma->return_path);
+ } else {
+ qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
+ NULL, rdma);
+ }
ret = rdma_accept(rdma->cm_id, &conn_param);
if (ret) {
@@ -3171,8 +3426,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
.repeat = 1 };
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
- RDMAContext *rdma = rioc->rdma;
- RDMALocalBlocks *local = &rdma->local_ram_blocks;
+ RDMAContext *rdma;
+ RDMALocalBlocks *local;
RDMAControlHeader head;
RDMARegister *reg, *registers;
RDMACompress *comp;
@@ -3185,8 +3440,17 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
int count = 0;
int i = 0;
+ rcu_read_lock();
+ rdma = atomic_rcu_read(&rioc->rdmain);
+
+ if (!rdma) {
+ rcu_read_unlock();
+ return -EIO;
+ }
+
CHECK_ERROR_STATE();
+ local = &rdma->local_ram_blocks;
do {
trace_qemu_rdma_registration_handle_wait();
@@ -3420,6 +3684,7 @@ out:
if (ret < 0) {
rdma->error_state = ret;
}
+ rcu_read_unlock();
return ret;
}
@@ -3433,10 +3698,18 @@ out:
static int
rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
{
- RDMAContext *rdma = rioc->rdma;
+ RDMAContext *rdma;
int curr;
int found = -1;
+ rcu_read_lock();
+ rdma = atomic_rcu_read(&rioc->rdmain);
+
+ if (!rdma) {
+ rcu_read_unlock();
+ return -EIO;
+ }
+
/* Find the matching RAMBlock in our local list */
for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
@@ -3447,6 +3720,7 @@ rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
if (found == -1) {
error_report("RAMBlock '%s' not found on destination", name);
+ rcu_read_unlock();
return -ENOENT;
}
@@ -3454,6 +3728,7 @@ rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
trace_rdma_block_notification_handle(name, rdma->next_src_index);
rdma->next_src_index++;
+ rcu_read_unlock();
return 0;
}
@@ -3476,14 +3751,27 @@ static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
uint64_t flags, void *data)
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
- RDMAContext *rdma = rioc->rdma;
+ RDMAContext *rdma;
+
+ rcu_read_lock();
+ rdma = atomic_rcu_read(&rioc->rdmaout);
+ if (!rdma) {
+ rcu_read_unlock();
+ return -EIO;
+ }
CHECK_ERROR_STATE();
+ if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
+ rcu_read_unlock();
+ return 0;
+ }
+
trace_qemu_rdma_registration_start(flags);
qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
qemu_fflush(f);
+ rcu_read_unlock();
return 0;
}
@@ -3496,12 +3784,24 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
{
Error *local_err = NULL, **errp = &local_err;
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
- RDMAContext *rdma = rioc->rdma;
+ RDMAContext *rdma;
RDMAControlHeader head = { .len = 0, .repeat = 1 };
int ret = 0;
+ rcu_read_lock();
+ rdma = atomic_rcu_read(&rioc->rdmaout);
+ if (!rdma) {
+ rcu_read_unlock();
+ return -EIO;
+ }
+
CHECK_ERROR_STATE();
+ if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
+ rcu_read_unlock();
+ return 0;
+ }
+
qemu_fflush(f);
ret = qemu_rdma_drain_cq(f, rdma);
@@ -3530,6 +3830,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
qemu_rdma_reg_whole_ram_blocks : NULL);
if (ret < 0) {
ERROR(errp, "receiving remote info!");
+ rcu_read_unlock();
return ret;
}
@@ -3553,6 +3854,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
"not identical on both the source and destination.",
local->nb_blocks, nb_dest_blocks);
rdma->error_state = -EINVAL;
+ rcu_read_unlock();
return -EINVAL;
}
@@ -3569,6 +3871,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
local->block[i].length,
rdma->dest_blocks[i].length);
rdma->error_state = -EINVAL;
+ rcu_read_unlock();
return -EINVAL;
}
local->block[i].remote_host_addr =
@@ -3586,9 +3889,11 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
goto err;
}
+ rcu_read_unlock();
return 0;
err:
rdma->error_state = ret;
+ rcu_read_unlock();
return ret;
}
@@ -3606,10 +3911,15 @@ static const QEMUFileHooks rdma_write_hooks = {
static void qio_channel_rdma_finalize(Object *obj)
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
- if (rioc->rdma) {
- qemu_rdma_cleanup(rioc->rdma);
- g_free(rioc->rdma);
- rioc->rdma = NULL;
+ if (rioc->rdmain) {
+ qemu_rdma_cleanup(rioc->rdmain);
+ g_free(rioc->rdmain);
+ rioc->rdmain = NULL;
+ }
+ if (rioc->rdmaout) {
+ qemu_rdma_cleanup(rioc->rdmaout);
+ g_free(rioc->rdmaout);
+ rioc->rdmaout = NULL;
}
}
@@ -3623,6 +3933,8 @@ static void qio_channel_rdma_class_init(ObjectClass *klass,
ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
ioc_klass->io_close = qio_channel_rdma_close;
ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
+ ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler;
+ ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
}
static const TypeInfo qio_channel_rdma_info = {
@@ -3649,13 +3961,16 @@ static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
}
rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
- rioc->rdma = rdma;
if (mode[0] == 'w') {
rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
+ rioc->rdmaout = rdma;
+ rioc->rdmain = rdma->return_path;
qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
} else {
rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
+ rioc->rdmain = rdma;
+ rioc->rdmaout = rdma->return_path;
qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
}
@@ -3679,6 +3994,10 @@ static void rdma_accept_incoming_migration(void *opaque)
trace_qemu_rdma_accept_incoming_migration_accepted();
+ if (rdma->is_return_path) {
+ return;
+ }
+
f = qemu_fopen_rdma(rdma, "rb");
if (f == NULL) {
ERROR(errp, "could not qemu_fopen_rdma!");
@@ -3693,7 +4012,7 @@ static void rdma_accept_incoming_migration(void *opaque)
void rdma_start_incoming_migration(const char *host_port, Error **errp)
{
int ret;
- RDMAContext *rdma;
+ RDMAContext *rdma, *rdma_return_path = NULL;
Error *local_err = NULL;
trace_rdma_start_incoming_migration();
@@ -3720,12 +4039,24 @@ void rdma_start_incoming_migration(const char *host_port, Error **errp)
trace_rdma_start_incoming_migration_after_rdma_listen();
+ /* initialize the RDMAContext for return path */
+ if (migrate_postcopy()) {
+ rdma_return_path = qemu_rdma_data_init(host_port, &local_err);
+
+ if (rdma_return_path == NULL) {
+ goto err;
+ }
+
+ qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
+ }
+
qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
NULL, (void *)(intptr_t)rdma);
return;
err:
error_propagate(errp, local_err);
g_free(rdma);
+ g_free(rdma_return_path);
}
void rdma_start_outgoing_migration(void *opaque,
@@ -3733,6 +4064,7 @@ void rdma_start_outgoing_migration(void *opaque,
{
MigrationState *s = opaque;
RDMAContext *rdma = qemu_rdma_data_init(host_port, errp);
+ RDMAContext *rdma_return_path = NULL;
int ret = 0;
if (rdma == NULL) {
@@ -3753,6 +4085,32 @@ void rdma_start_outgoing_migration(void *opaque,
goto err;
}
+ /* RDMA postcopy needs a separate queue pair for the return path */
+ if (migrate_postcopy()) {
+ rdma_return_path = qemu_rdma_data_init(host_port, errp);
+
+ if (rdma_return_path == NULL) {
+ goto err;
+ }
+
+ ret = qemu_rdma_source_init(rdma_return_path,
+ s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
+
+ if (ret) {
+ goto err;
+ }
+
+ ret = qemu_rdma_connect(rdma_return_path, errp);
+
+ if (ret) {
+ goto err;
+ }
+
+ rdma->return_path = rdma_return_path;
+ rdma_return_path->return_path = rdma;
+ rdma_return_path->is_return_path = true;
+ }
+
trace_rdma_start_outgoing_migration_after_rdma_connect();
s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
@@ -3760,4 +4118,5 @@ void rdma_start_outgoing_migration(void *opaque,
return;
err:
g_free(rdma);
+ g_free(rdma_return_path);
}
diff --git a/migration/savevm.c b/migration/savevm.c
index 7f92567a10..13e51f0e34 100644
--- a/migration/savevm.c
+++ b/migration/savevm.c
@@ -1622,6 +1622,7 @@ static void *postcopy_ram_listen_thread(void *opaque)
qemu_sem_post(&mis->listen_thread_sem);
trace_postcopy_ram_listen_thread_start();
+ rcu_register_thread();
/*
* Because we're a thread and not a coroutine we can't yield
* in qemu_file, and thus we must be blocking now.
@@ -1662,6 +1663,7 @@ static void *postcopy_ram_listen_thread(void *opaque)
* to leave the guest running and fire MCEs for pages that never
* arrived as a desperate recovery step.
*/
+ rcu_unregister_thread();
exit(EXIT_FAILURE);
}
@@ -1676,6 +1678,7 @@ static void *postcopy_ram_listen_thread(void *opaque)
migration_incoming_state_destroy();
qemu_loadvm_state_cleanup();
+ rcu_unregister_thread();
return NULL;
}
diff --git a/migration/vmstate.c b/migration/vmstate.c
index 6b9079bb51..0bc240a317 100644
--- a/migration/vmstate.c
+++ b/migration/vmstate.c
@@ -418,7 +418,7 @@ int vmstate_save_state_v(QEMUFile *f, const VMStateDescription *vmsd,
static const VMStateDescription *
vmstate_get_subsection(const VMStateDescription **sub, char *idstr)
{
- while (sub && *sub && (*sub)->needed) {
+ while (sub && *sub) {
if (strcmp(idstr, (*sub)->name) == 0) {
return *sub;
}
@@ -486,8 +486,8 @@ static int vmstate_subsection_save(QEMUFile *f, const VMStateDescription *vmsd,
int ret = 0;
trace_vmstate_subsection_save_top(vmsd->name);
- while (sub && *sub && (*sub)->needed) {
- if ((*sub)->needed(opaque)) {
+ while (sub && *sub) {
+ if (vmstate_save_needed(*sub, opaque)) {
const VMStateDescription *vmsdsub = *sub;
uint8_t len;