aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/devel/migration.rst15
-rw-r--r--hmp.c16
-rw-r--r--include/qemu/queue.h1
-rw-r--r--migration/colo.c2
-rw-r--r--migration/migration.c49
-rw-r--r--migration/migration.h2
-rw-r--r--migration/postcopy-ram.c2
-rw-r--r--migration/qemu-file-channel.c12
-rw-r--r--migration/qemu-file.c8
-rw-r--r--migration/ram.c202
-rw-r--r--migration/rdma.c423
-rw-r--r--migration/savevm.c3
-rw-r--r--migration/vmstate.c6
-rw-r--r--qapi/migration.json64
14 files changed, 679 insertions, 126 deletions
diff --git a/docs/devel/migration.rst b/docs/devel/migration.rst
index 6ed3fce061..687570754d 100644
--- a/docs/devel/migration.rst
+++ b/docs/devel/migration.rst
@@ -240,10 +240,13 @@ should succeed even with the data missing. To support this the
subsection can be connected to a device property and from there
to a versioned machine type.
-One important note is that the post_load() function is called "after"
-loading all subsections, because a newer subsection could change same
-value that it uses. A flag, and the combination of pre_load and post_load
-can be used to detect whether a subsection was loaded, and to
+The 'pre_load' and 'post_load' functions on subsections are only
+called if the subsection is loaded.
+
+One important note is that the outer post_load() function is called "after"
+loading all subsections, because a newer subsection could change the same
+value that it uses. A flag, and the combination of outer pre_load and
+post_load can be used to detect whether a subsection was loaded, and to
fall back on default behaviour when the subsection isn't present.
Example:
@@ -315,8 +318,8 @@ For example:
the property to false.
c) Add a static bool support_foo function that tests the property.
d) Add a subsection with a .needed set to the support_foo function
- e) (potentially) Add a pre_load that sets up a default value for 'foo'
- to be used if the subsection isn't loaded.
+ e) (potentially) Add an outer pre_load that sets up a default value
+ for 'foo' to be used if the subsection isn't loaded.
Now that subsection will not be generated when using an older
machine type and the migration stream will be accepted by older
diff --git a/hmp.c b/hmp.c
index d94a47f7c7..4975fa56b0 100644
--- a/hmp.c
+++ b/hmp.c
@@ -327,6 +327,10 @@ void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict)
monitor_printf(mon, "%s: %u\n",
MigrationParameter_str(MIGRATION_PARAMETER_COMPRESS_THREADS),
params->compress_threads);
+ assert(params->has_compress_wait_thread);
+ monitor_printf(mon, "%s: %s\n",
+ MigrationParameter_str(MIGRATION_PARAMETER_COMPRESS_WAIT_THREAD),
+ params->compress_wait_thread ? "on" : "off");
assert(params->has_decompress_threads);
monitor_printf(mon, "%s: %u\n",
MigrationParameter_str(MIGRATION_PARAMETER_DECOMPRESS_THREADS),
@@ -339,6 +343,10 @@ void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict)
monitor_printf(mon, "%s: %u\n",
MigrationParameter_str(MIGRATION_PARAMETER_CPU_THROTTLE_INCREMENT),
params->cpu_throttle_increment);
+ assert(params->has_max_cpu_throttle);
+ monitor_printf(mon, "%s: %u\n",
+ MigrationParameter_str(MIGRATION_PARAMETER_MAX_CPU_THROTTLE),
+ params->max_cpu_throttle);
assert(params->has_tls_creds);
monitor_printf(mon, "%s: '%s'\n",
MigrationParameter_str(MIGRATION_PARAMETER_TLS_CREDS),
@@ -1647,6 +1655,10 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
p->has_compress_threads = true;
visit_type_int(v, param, &p->compress_threads, &err);
break;
+ case MIGRATION_PARAMETER_COMPRESS_WAIT_THREAD:
+ p->has_compress_wait_thread = true;
+ visit_type_bool(v, param, &p->compress_wait_thread, &err);
+ break;
case MIGRATION_PARAMETER_DECOMPRESS_THREADS:
p->has_decompress_threads = true;
visit_type_int(v, param, &p->decompress_threads, &err);
@@ -1659,6 +1671,10 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
p->has_cpu_throttle_increment = true;
visit_type_int(v, param, &p->cpu_throttle_increment, &err);
break;
+ case MIGRATION_PARAMETER_MAX_CPU_THROTTLE:
+ p->has_max_cpu_throttle = true;
+ visit_type_int(v, param, &p->max_cpu_throttle, &err);
+ break;
case MIGRATION_PARAMETER_TLS_CREDS:
p->has_tls_creds = true;
p->tls_creds = g_new0(StrOrNull, 1);
diff --git a/include/qemu/queue.h b/include/qemu/queue.h
index 59fd1203a1..ac418efc43 100644
--- a/include/qemu/queue.h
+++ b/include/qemu/queue.h
@@ -341,6 +341,7 @@ struct { \
/*
* Simple queue access methods.
*/
+#define QSIMPLEQ_EMPTY_ATOMIC(head) (atomic_read(&((head)->sqh_first)) == NULL)
#define QSIMPLEQ_EMPTY(head) ((head)->sqh_first == NULL)
#define QSIMPLEQ_FIRST(head) ((head)->sqh_first)
#define QSIMPLEQ_NEXT(elm, field) ((elm)->field.sqe_next)
diff --git a/migration/colo.c b/migration/colo.c
index 4381067ed4..88936f5962 100644
--- a/migration/colo.c
+++ b/migration/colo.c
@@ -534,6 +534,7 @@ void *colo_process_incoming_thread(void *opaque)
uint64_t value;
Error *local_err = NULL;
+ rcu_register_thread();
qemu_sem_init(&mis->colo_incoming_sem, 0);
migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
@@ -666,5 +667,6 @@ out:
}
migration_incoming_exit_colo();
+ rcu_unregister_thread();
return NULL;
}
diff --git a/migration/migration.c b/migration/migration.c
index b7d9854bda..4b316ec343 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -71,6 +71,7 @@
/* Define default autoconverge cpu throttle migration parameters */
#define DEFAULT_MIGRATE_CPU_THROTTLE_INITIAL 20
#define DEFAULT_MIGRATE_CPU_THROTTLE_INCREMENT 10
+#define DEFAULT_MIGRATE_MAX_CPU_THROTTLE 99
/* Migration XBZRLE default cache size */
#define DEFAULT_MIGRATE_XBZRLE_CACHE_SIZE (64 * 1024 * 1024)
@@ -389,6 +390,7 @@ static void process_incoming_migration_co(void *opaque)
int ret;
assert(mis->from_src_file);
+ mis->migration_incoming_co = qemu_coroutine_self();
mis->largest_page_size = qemu_ram_pagesize_largest();
postcopy_state_set(POSTCOPY_INCOMING_NONE);
migrate_set_state(&mis->state, MIGRATION_STATUS_NONE,
@@ -418,7 +420,6 @@ static void process_incoming_migration_co(void *opaque)
/* we get COLO info, and know if we are in COLO mode */
if (!ret && migration_incoming_enable_colo()) {
- mis->migration_incoming_co = qemu_coroutine_self();
qemu_thread_create(&mis->colo_incoming_thread, "COLO incoming",
colo_process_incoming_thread, mis, QEMU_THREAD_JOINABLE);
mis->have_colo_incoming_thread = true;
@@ -442,6 +443,7 @@ static void process_incoming_migration_co(void *opaque)
}
mis->bh = qemu_bh_new(process_incoming_migration_bh, mis);
qemu_bh_schedule(mis->bh);
+ mis->migration_incoming_co = NULL;
}
static void migration_incoming_setup(QEMUFile *f)
@@ -671,6 +673,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
params->compress_level = s->parameters.compress_level;
params->has_compress_threads = true;
params->compress_threads = s->parameters.compress_threads;
+ params->has_compress_wait_thread = true;
+ params->compress_wait_thread = s->parameters.compress_wait_thread;
params->has_decompress_threads = true;
params->decompress_threads = s->parameters.decompress_threads;
params->has_cpu_throttle_initial = true;
@@ -697,6 +701,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
params->xbzrle_cache_size = s->parameters.xbzrle_cache_size;
params->has_max_postcopy_bandwidth = true;
params->max_postcopy_bandwidth = s->parameters.max_postcopy_bandwidth;
+ params->has_max_cpu_throttle = true;
+ params->max_cpu_throttle = s->parameters.max_cpu_throttle;
return params;
}
@@ -1043,6 +1049,15 @@ static bool migrate_params_check(MigrationParameters *params, Error **errp)
return false;
}
+ if (params->has_max_cpu_throttle &&
+ (params->max_cpu_throttle < params->cpu_throttle_initial ||
+ params->max_cpu_throttle > 99)) {
+ error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
+ "max_cpu_throttle",
+ "an integer in the range of cpu_throttle_initial to 99");
+ return false;
+ }
+
return true;
}
@@ -1061,6 +1076,10 @@ static void migrate_params_test_apply(MigrateSetParameters *params,
dest->compress_threads = params->compress_threads;
}
+ if (params->has_compress_wait_thread) {
+ dest->compress_wait_thread = params->compress_wait_thread;
+ }
+
if (params->has_decompress_threads) {
dest->decompress_threads = params->decompress_threads;
}
@@ -1110,6 +1129,9 @@ static void migrate_params_test_apply(MigrateSetParameters *params,
if (params->has_max_postcopy_bandwidth) {
dest->max_postcopy_bandwidth = params->max_postcopy_bandwidth;
}
+ if (params->has_max_cpu_throttle) {
+ dest->max_cpu_throttle = params->max_cpu_throttle;
+ }
}
static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
@@ -1126,6 +1148,10 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
s->parameters.compress_threads = params->compress_threads;
}
+ if (params->has_compress_wait_thread) {
+ s->parameters.compress_wait_thread = params->compress_wait_thread;
+ }
+
if (params->has_decompress_threads) {
s->parameters.decompress_threads = params->decompress_threads;
}
@@ -1185,6 +1211,9 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
if (params->has_max_postcopy_bandwidth) {
s->parameters.max_postcopy_bandwidth = params->max_postcopy_bandwidth;
}
+ if (params->has_max_cpu_throttle) {
+ s->parameters.max_cpu_throttle = params->max_cpu_throttle;
+ }
}
void qmp_migrate_set_parameters(MigrateSetParameters *params, Error **errp)
@@ -1871,6 +1900,15 @@ int migrate_compress_threads(void)
return s->parameters.compress_threads;
}
+int migrate_compress_wait_thread(void)
+{
+ MigrationState *s;
+
+ s = migrate_get_current();
+
+ return s->parameters.compress_wait_thread;
+}
+
int migrate_decompress_threads(void)
{
MigrationState *s;
@@ -1962,7 +2000,6 @@ static int64_t migrate_max_postcopy_bandwidth(void)
return s->parameters.max_postcopy_bandwidth;
}
-
bool migrate_use_block(void)
{
MigrationState *s;
@@ -2104,6 +2141,7 @@ static void *source_return_path_thread(void *opaque)
int res;
trace_source_return_path_thread_entry();
+ rcu_register_thread();
retry:
while (!ms->rp_state.error && !qemu_file_get_error(rp) &&
@@ -2243,6 +2281,7 @@ out:
trace_source_return_path_thread_end();
ms->rp_state.from_dst_file = NULL;
qemu_fclose(rp);
+ rcu_unregister_thread();
return NULL;
}
@@ -3131,6 +3170,8 @@ static Property migration_properties[] = {
DEFINE_PROP_UINT8("x-compress-threads", MigrationState,
parameters.compress_threads,
DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT),
+ DEFINE_PROP_BOOL("x-compress-wait-thread", MigrationState,
+ parameters.compress_wait_thread, true),
DEFINE_PROP_UINT8("x-decompress-threads", MigrationState,
parameters.decompress_threads,
DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT),
@@ -3160,6 +3201,9 @@ static Property migration_properties[] = {
DEFINE_PROP_SIZE("max-postcopy-bandwidth", MigrationState,
parameters.max_postcopy_bandwidth,
DEFAULT_MIGRATE_MAX_POSTCOPY_BANDWIDTH),
+ DEFINE_PROP_UINT8("max-cpu-throttle", MigrationState,
+ parameters.max_cpu_throttle,
+ DEFAULT_MIGRATE_MAX_CPU_THROTTLE),
/* Migration capabilities */
DEFINE_PROP_MIG_CAP("x-xbzrle", MIGRATION_CAPABILITY_XBZRLE),
@@ -3230,6 +3274,7 @@ static void migration_instance_init(Object *obj)
params->has_x_multifd_page_count = true;
params->has_xbzrle_cache_size = true;
params->has_max_postcopy_bandwidth = true;
+ params->has_max_cpu_throttle = true;
qemu_sem_init(&ms->postcopy_pause_sem, 0);
qemu_sem_init(&ms->postcopy_pause_rp_sem, 0);
diff --git a/migration/migration.h b/migration/migration.h
index 64a7b33735..f7813f8261 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -266,11 +266,13 @@ bool migrate_colo_enabled(void);
bool migrate_use_block(void);
bool migrate_use_block_incremental(void);
+int migrate_max_cpu_throttle(void);
bool migrate_use_return_path(void);
bool migrate_use_compression(void);
int migrate_compress_level(void);
int migrate_compress_threads(void);
+int migrate_compress_wait_thread(void);
int migrate_decompress_threads(void);
bool migrate_use_events(void);
bool migrate_postcopy_blocktime(void);
diff --git a/migration/postcopy-ram.c b/migration/postcopy-ram.c
index 932f188949..3952d78e6b 100644
--- a/migration/postcopy-ram.c
+++ b/migration/postcopy-ram.c
@@ -853,6 +853,7 @@ static void *postcopy_ram_fault_thread(void *opaque)
RAMBlock *rb = NULL;
trace_postcopy_ram_fault_thread_entry();
+ rcu_register_thread();
mis->last_rb = NULL; /* last RAMBlock we sent part of */
qemu_sem_post(&mis->fault_thread_sem);
@@ -1059,6 +1060,7 @@ retry:
}
}
}
+ rcu_unregister_thread();
trace_postcopy_ram_fault_thread_exit();
g_free(pfd);
return NULL;
diff --git a/migration/qemu-file-channel.c b/migration/qemu-file-channel.c
index e202d73834..8e639eb496 100644
--- a/migration/qemu-file-channel.c
+++ b/migration/qemu-file-channel.c
@@ -49,7 +49,11 @@ static ssize_t channel_writev_buffer(void *opaque,
ssize_t len;
len = qio_channel_writev(ioc, local_iov, nlocal_iov, NULL);
if (len == QIO_CHANNEL_ERR_BLOCK) {
- qio_channel_wait(ioc, G_IO_OUT);
+ if (qemu_in_coroutine()) {
+ qio_channel_yield(ioc, G_IO_OUT);
+ } else {
+ qio_channel_wait(ioc, G_IO_OUT);
+ }
continue;
}
if (len < 0) {
@@ -80,7 +84,11 @@ static ssize_t channel_get_buffer(void *opaque,
ret = qio_channel_read(ioc, (char *)buf, size, NULL);
if (ret < 0) {
if (ret == QIO_CHANNEL_ERR_BLOCK) {
- qio_channel_yield(ioc, G_IO_IN);
+ if (qemu_in_coroutine()) {
+ qio_channel_yield(ioc, G_IO_IN);
+ } else {
+ qio_channel_wait(ioc, G_IO_IN);
+ }
} else {
/* XXX handle Error * object */
return -EIO;
diff --git a/migration/qemu-file.c b/migration/qemu-file.c
index 0463f4c321..977b9ae07c 100644
--- a/migration/qemu-file.c
+++ b/migration/qemu-file.c
@@ -253,8 +253,12 @@ size_t ram_control_save_page(QEMUFile *f, ram_addr_t block_offset,
if (f->hooks && f->hooks->save_page) {
int ret = f->hooks->save_page(f, f->opaque, block_offset,
offset, size, bytes_sent);
- f->bytes_xfer += size;
- if (ret != RAM_SAVE_CONTROL_DELAYED) {
+ if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
+ f->bytes_xfer += size;
+ }
+
+ if (ret != RAM_SAVE_CONTROL_DELAYED &&
+ ret != RAM_SAVE_CONTROL_NOT_SUPP) {
if (bytes_sent && *bytes_sent > 0) {
qemu_update_position(f, *bytes_sent);
} else if (ret < 0) {
diff --git a/migration/ram.c b/migration/ram.c
index fa79d0a5b9..79c89425a3 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -341,6 +341,7 @@ typedef struct PageSearchStatus PageSearchStatus;
struct CompressParam {
bool done;
bool quit;
+ bool zero_page;
QEMUFile *file;
QemuMutex mutex;
QemuCond cond;
@@ -382,14 +383,15 @@ static QemuThread *decompress_threads;
static QemuMutex decomp_done_lock;
static QemuCond decomp_done_cond;
-static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
- ram_addr_t offset, uint8_t *source_buf);
+static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
+ ram_addr_t offset, uint8_t *source_buf);
static void *do_data_compress(void *opaque)
{
CompressParam *param = opaque;
RAMBlock *block;
ram_addr_t offset;
+ bool zero_page;
qemu_mutex_lock(&param->mutex);
while (!param->quit) {
@@ -399,11 +401,12 @@ static void *do_data_compress(void *opaque)
param->block = NULL;
qemu_mutex_unlock(&param->mutex);
- do_compress_ram_page(param->file, &param->stream, block, offset,
- param->originbuf);
+ zero_page = do_compress_ram_page(param->file, &param->stream,
+ block, offset, param->originbuf);
qemu_mutex_lock(&comp_done_lock);
param->done = true;
+ param->zero_page = zero_page;
qemu_cond_signal(&comp_done_cond);
qemu_mutex_unlock(&comp_done_lock);
@@ -989,6 +992,7 @@ static void *multifd_send_thread(void *opaque)
int ret;
trace_multifd_send_thread_start(p->id);
+ rcu_register_thread();
if (multifd_send_initial_packet(p, &local_err) < 0) {
goto out;
@@ -1051,6 +1055,7 @@ out:
p->running = false;
qemu_mutex_unlock(&p->mutex);
+ rcu_unregister_thread();
trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);
return NULL;
@@ -1220,6 +1225,7 @@ static void *multifd_recv_thread(void *opaque)
int ret;
trace_multifd_recv_thread_start(p->id);
+ rcu_register_thread();
while (true) {
uint32_t used;
@@ -1266,6 +1272,7 @@ static void *multifd_recv_thread(void *opaque)
p->running = false;
qemu_mutex_unlock(&p->mutex);
+ rcu_unregister_thread();
trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);
return NULL;
@@ -1391,13 +1398,15 @@ static void mig_throttle_guest_down(void)
MigrationState *s = migrate_get_current();
uint64_t pct_initial = s->parameters.cpu_throttle_initial;
uint64_t pct_icrement = s->parameters.cpu_throttle_increment;
+ int pct_max = s->parameters.max_cpu_throttle;
/* We have not started throttling yet. Let's start it. */
if (!cpu_throttle_active()) {
cpu_throttle_set(pct_initial);
} else {
/* Throttling already on, just increase the rate */
- cpu_throttle_set(cpu_throttle_get_percentage() + pct_icrement);
+ cpu_throttle_set(MIN(cpu_throttle_get_percentage() + pct_icrement,
+ pct_max));
}
}
@@ -1666,6 +1675,31 @@ static void migration_bitmap_sync(RAMState *rs)
}
/**
+ * save_zero_page_to_file: send the zero page to the file
+ *
+ * Returns the size of data written to the file, 0 means the page is not
+ * a zero page
+ *
+ * @rs: current RAM state
+ * @file: the file where the data is saved
+ * @block: block that contains the page we want to send
+ * @offset: offset inside the block for the page
+ */
+static int save_zero_page_to_file(RAMState *rs, QEMUFile *file,
+ RAMBlock *block, ram_addr_t offset)
+{
+ uint8_t *p = block->host + offset;
+ int len = 0;
+
+ if (is_zero_range(p, TARGET_PAGE_SIZE)) {
+ len += save_page_header(rs, file, block, offset | RAM_SAVE_FLAG_ZERO);
+ qemu_put_byte(file, 0);
+ len += 1;
+ }
+ return len;
+}
+
+/**
* save_zero_page: send the zero page to the stream
*
* Returns the number of pages written.
@@ -1676,19 +1710,14 @@ static void migration_bitmap_sync(RAMState *rs)
*/
static int save_zero_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
{
- uint8_t *p = block->host + offset;
- int pages = -1;
+ int len = save_zero_page_to_file(rs, rs->f, block, offset);
- if (is_zero_range(p, TARGET_PAGE_SIZE)) {
+ if (len) {
ram_counters.duplicate++;
- ram_counters.transferred +=
- save_page_header(rs, rs->f, block, offset | RAM_SAVE_FLAG_ZERO);
- qemu_put_byte(rs->f, 0);
- ram_counters.transferred += 1;
- pages = 1;
+ ram_counters.transferred += len;
+ return 1;
}
-
- return pages;
+ return -1;
}
static void ram_release_pages(const char *rbname, uint64_t offset, int pages)
@@ -1823,15 +1852,20 @@ static int ram_save_multifd_page(RAMState *rs, RAMBlock *block,
return 1;
}
-static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
- ram_addr_t offset, uint8_t *source_buf)
+static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
+ ram_addr_t offset, uint8_t *source_buf)
{
RAMState *rs = ram_state;
- int bytes_sent, blen;
uint8_t *p = block->host + (offset & TARGET_PAGE_MASK);
+ bool zero_page = false;
+ int ret;
+
+ if (save_zero_page_to_file(rs, f, block, offset)) {
+ zero_page = true;
+ goto exit;
+ }
- bytes_sent = save_page_header(rs, f, block, offset |
- RAM_SAVE_FLAG_COMPRESS_PAGE);
+ save_page_header(rs, f, block, offset | RAM_SAVE_FLAG_COMPRESS_PAGE);
/*
* copy it to a internal buffer to avoid it being modified by VM
@@ -1839,17 +1873,25 @@ static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
* decompression
*/
memcpy(source_buf, p, TARGET_PAGE_SIZE);
- blen = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
- if (blen < 0) {
- bytes_sent = 0;
- qemu_file_set_error(migrate_get_current()->to_dst_file, blen);
+ ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
+ if (ret < 0) {
+ qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
error_report("compressed data failed!");
- } else {
- bytes_sent += blen;
- ram_release_pages(block->idstr, offset & TARGET_PAGE_MASK, 1);
+ return false;
}
- return bytes_sent;
+exit:
+ ram_release_pages(block->idstr, offset & TARGET_PAGE_MASK, 1);
+ return zero_page;
+}
+
+static void
+update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
+{
+ if (param->zero_page) {
+ ram_counters.duplicate++;
+ }
+ ram_counters.transferred += bytes_xmit;
}
static void flush_compressed_data(RAMState *rs)
@@ -1873,7 +1915,12 @@ static void flush_compressed_data(RAMState *rs)
qemu_mutex_lock(&comp_param[idx].mutex);
if (!comp_param[idx].quit) {
len = qemu_put_qemu_file(rs->f, comp_param[idx].file);
- ram_counters.transferred += len;
+ /*
+ * it's safe to fetch zero_page without holding comp_done_lock
+ * as there is no further request submitted to the thread,
+ * i.e, the thread should be waiting for a request at this point.
+ */
+ update_compress_thread_counts(&comp_param[idx], len);
}
qemu_mutex_unlock(&comp_param[idx].mutex);
}
@@ -1890,30 +1937,33 @@ static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
ram_addr_t offset)
{
int idx, thread_count, bytes_xmit = -1, pages = -1;
+ bool wait = migrate_compress_wait_thread();
thread_count = migrate_compress_threads();
qemu_mutex_lock(&comp_done_lock);
- while (true) {
- for (idx = 0; idx < thread_count; idx++) {
- if (comp_param[idx].done) {
- comp_param[idx].done = false;
- bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
- qemu_mutex_lock(&comp_param[idx].mutex);
- set_compress_params(&comp_param[idx], block, offset);
- qemu_cond_signal(&comp_param[idx].cond);
- qemu_mutex_unlock(&comp_param[idx].mutex);
- pages = 1;
- ram_counters.normal++;
- ram_counters.transferred += bytes_xmit;
- break;
- }
- }
- if (pages > 0) {
+retry:
+ for (idx = 0; idx < thread_count; idx++) {
+ if (comp_param[idx].done) {
+ comp_param[idx].done = false;
+ bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
+ qemu_mutex_lock(&comp_param[idx].mutex);
+ set_compress_params(&comp_param[idx], block, offset);
+ qemu_cond_signal(&comp_param[idx].cond);
+ qemu_mutex_unlock(&comp_param[idx].mutex);
+ pages = 1;
+ update_compress_thread_counts(&comp_param[idx], bytes_xmit);
break;
- } else {
- qemu_cond_wait(&comp_done_cond, &comp_done_lock);
}
}
+
+ /*
+ * wait for the free thread if the user specifies 'compress-wait-thread',
+ * otherwise we will post the page out in the main thread as normal page.
+ */
+ if (pages < 0 && wait) {
+ qemu_cond_wait(&comp_done_cond, &comp_done_lock);
+ goto retry;
+ }
qemu_mutex_unlock(&comp_done_lock);
return pages;
@@ -1983,6 +2033,10 @@ static RAMBlock *unqueue_page(RAMState *rs, ram_addr_t *offset)
{
RAMBlock *block = NULL;
+ if (QSIMPLEQ_EMPTY_ATOMIC(&rs->src_page_requests)) {
+ return NULL;
+ }
+
qemu_mutex_lock(&rs->src_page_req_mutex);
if (!QSIMPLEQ_EMPTY(&rs->src_page_requests)) {
struct RAMSrcPageRequest *entry =
@@ -2175,6 +2229,39 @@ static bool save_page_use_compression(RAMState *rs)
return false;
}
+/*
+ * try to compress the page before posting it out, return true if the page
+ * has been properly handled by compression, otherwise needs other
+ * paths to handle it
+ */
+static bool save_compress_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
+{
+ if (!save_page_use_compression(rs)) {
+ return false;
+ }
+
+ /*
+ * When starting the process of a new block, the first page of
+ * the block should be sent out before other pages in the same
+ * block, and all the pages in last block should have been sent
+ * out, keeping this order is important, because the 'cont' flag
+ * is used to avoid resending the block name.
+ *
+ * We post the fist page as normal page as compression will take
+ * much CPU resource.
+ */
+ if (block != rs->last_sent_block) {
+ flush_compressed_data(rs);
+ return false;
+ }
+
+ if (compress_page_with_multi_thread(rs, block, offset) > 0) {
+ return true;
+ }
+
+ return false;
+}
+
/**
* ram_save_target_page: save one target page
*
@@ -2195,15 +2282,8 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
return res;
}
- /*
- * When starting the process of a new block, the first page of
- * the block should be sent out before other pages in the same
- * block, and all the pages in last block should have been sent
- * out, keeping this order is important, because the 'cont' flag
- * is used to avoid resending the block name.
- */
- if (block != rs->last_sent_block && save_page_use_compression(rs)) {
- flush_compressed_data(rs);
+ if (save_compress_page(rs, block, offset)) {
+ return 1;
}
res = save_zero_page(rs, block, offset);
@@ -2221,14 +2301,10 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
}
/*
- * Make sure the first page is sent out before other pages.
- *
- * we post it as normal page as compression will take much
- * CPU resource.
+ * do not use multifd for compression as the first page in the new
+ * block should be posted out before sending the compressed page
*/
- if (block == rs->last_sent_block && save_page_use_compression(rs)) {
- return compress_page_with_multi_thread(rs, block, offset);
- } else if (migrate_use_multifd()) {
+ if (!save_page_use_compression(rs) && migrate_use_multifd()) {
return ram_save_multifd_page(rs, block, offset);
}
diff --git a/migration/rdma.c b/migration/rdma.c
index 8bd7159059..ae07515e83 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -86,6 +86,7 @@ static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
" to abort!"); \
rdma->error_reported = 1; \
} \
+ rcu_read_unlock(); \
return rdma->error_state; \
} \
} while (0)
@@ -387,6 +388,10 @@ typedef struct RDMAContext {
uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
GHashTable *blockmap;
+
+ /* the RDMAContext for return path */
+ struct RDMAContext *return_path;
+ bool is_return_path;
} RDMAContext;
#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
@@ -398,7 +403,8 @@ typedef struct QIOChannelRDMA QIOChannelRDMA;
struct QIOChannelRDMA {
QIOChannel parent;
- RDMAContext *rdma;
+ RDMAContext *rdmain;
+ RDMAContext *rdmaout;
QEMUFile *file;
bool blocking; /* XXX we don't actually honour this yet */
};
@@ -1483,27 +1489,56 @@ static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
*/
static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
{
+ struct rdma_cm_event *cm_event;
+ int ret = -1;
+
/*
* Coroutine doesn't start until migration_fd_process_incoming()
* so don't yield unless we know we're running inside of a coroutine.
*/
- if (rdma->migration_started_on_destination) {
+ if (rdma->migration_started_on_destination &&
+ migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) {
yield_until_fd_readable(rdma->comp_channel->fd);
} else {
/* This is the source side, we're in a separate thread
* or destination prior to migration_fd_process_incoming()
+ * after postcopy, the destination also in a seprate thread.
* we can't yield; so we have to poll the fd.
* But we need to be able to handle 'cancel' or an error
* without hanging forever.
*/
while (!rdma->error_state && !rdma->received_error) {
- GPollFD pfds[1];
+ GPollFD pfds[2];
pfds[0].fd = rdma->comp_channel->fd;
pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
+ pfds[0].revents = 0;
+
+ pfds[1].fd = rdma->channel->fd;
+ pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
+ pfds[1].revents = 0;
+
/* 0.1s timeout, should be fine for a 'cancel' */
- switch (qemu_poll_ns(pfds, 1, 100 * 1000 * 1000)) {
+ switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) {
+ case 2:
case 1: /* fd active */
- return 0;
+ if (pfds[0].revents) {
+ return 0;
+ }
+
+ if (pfds[1].revents) {
+ ret = rdma_get_cm_event(rdma->channel, &cm_event);
+ if (!ret) {
+ rdma_ack_cm_event(cm_event);
+ }
+
+ error_report("receive cm event while wait comp channel,"
+ "cm event is %d", cm_event->event);
+ if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
+ cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
+ return -EPIPE;
+ }
+ }
+ break;
case 0: /* Timeout, go around again */
break;
@@ -2323,10 +2358,22 @@ static void qemu_rdma_cleanup(RDMAContext *rdma)
rdma_destroy_id(rdma->cm_id);
rdma->cm_id = NULL;
}
+
+ /* the destination side, listen_id and channel is shared */
if (rdma->listen_id) {
- rdma_destroy_id(rdma->listen_id);
+ if (!rdma->is_return_path) {
+ rdma_destroy_id(rdma->listen_id);
+ }
rdma->listen_id = NULL;
+
+ if (rdma->channel) {
+ if (!rdma->is_return_path) {
+ rdma_destroy_event_channel(rdma->channel);
+ }
+ rdma->channel = NULL;
+ }
}
+
if (rdma->channel) {
rdma_destroy_event_channel(rdma->channel);
rdma->channel = NULL;
@@ -2555,6 +2602,25 @@ err_dest_init_create_listen_id:
}
+static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path,
+ RDMAContext *rdma)
+{
+ int idx;
+
+ for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
+ rdma_return_path->wr_data[idx].control_len = 0;
+ rdma_return_path->wr_data[idx].control_curr = NULL;
+ }
+
+ /*the CM channel and CM id is shared*/
+ rdma_return_path->channel = rdma->channel;
+ rdma_return_path->listen_id = rdma->listen_id;
+
+ rdma->return_path = rdma_return_path;
+ rdma_return_path->return_path = rdma;
+ rdma_return_path->is_return_path = true;
+}
+
static void *qemu_rdma_data_init(const char *host_port, Error **errp)
{
RDMAContext *rdma = NULL;
@@ -2595,12 +2661,20 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
QEMUFile *f = rioc->file;
- RDMAContext *rdma = rioc->rdma;
+ RDMAContext *rdma;
int ret;
ssize_t done = 0;
size_t i;
size_t len = 0;
+ rcu_read_lock();
+ rdma = atomic_rcu_read(&rioc->rdmaout);
+
+ if (!rdma) {
+ rcu_read_unlock();
+ return -EIO;
+ }
+
CHECK_ERROR_STATE();
/*
@@ -2610,6 +2684,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
ret = qemu_rdma_write_flush(f, rdma);
if (ret < 0) {
rdma->error_state = ret;
+ rcu_read_unlock();
return ret;
}
@@ -2629,6 +2704,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
if (ret < 0) {
rdma->error_state = ret;
+ rcu_read_unlock();
return ret;
}
@@ -2637,6 +2713,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
}
}
+ rcu_read_unlock();
return done;
}
@@ -2670,12 +2747,20 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
Error **errp)
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
- RDMAContext *rdma = rioc->rdma;
+ RDMAContext *rdma;
RDMAControlHeader head;
int ret = 0;
ssize_t i;
size_t done = 0;
+ rcu_read_lock();
+ rdma = atomic_rcu_read(&rioc->rdmain);
+
+ if (!rdma) {
+ rcu_read_unlock();
+ return -EIO;
+ }
+
CHECK_ERROR_STATE();
for (i = 0; i < niov; i++) {
@@ -2687,7 +2772,7 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
* were given and dish out the bytes until we run
* out of bytes.
*/
- ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
+ ret = qemu_rdma_fill(rdma, data, want, 0);
done += ret;
want -= ret;
/* Got what we needed, so go to next iovec */
@@ -2709,25 +2794,28 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
if (ret < 0) {
rdma->error_state = ret;
+ rcu_read_unlock();
return ret;
}
/*
* SEND was received with new bytes, now try again.
*/
- ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
+ ret = qemu_rdma_fill(rdma, data, want, 0);
done += ret;
want -= ret;
/* Still didn't get enough, so lets just return */
if (want) {
if (done == 0) {
+ rcu_read_unlock();
return QIO_CHANNEL_ERR_BLOCK;
} else {
break;
}
}
}
+ rcu_read_unlock();
return done;
}
@@ -2779,15 +2867,29 @@ qio_channel_rdma_source_prepare(GSource *source,
gint *timeout)
{
QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
- RDMAContext *rdma = rsource->rioc->rdma;
+ RDMAContext *rdma;
GIOCondition cond = 0;
*timeout = -1;
+ rcu_read_lock();
+ if (rsource->condition == G_IO_IN) {
+ rdma = atomic_rcu_read(&rsource->rioc->rdmain);
+ } else {
+ rdma = atomic_rcu_read(&rsource->rioc->rdmaout);
+ }
+
+ if (!rdma) {
+ error_report("RDMAContext is NULL when prepare Gsource");
+ rcu_read_unlock();
+ return FALSE;
+ }
+
if (rdma->wr_data[0].control_len) {
cond |= G_IO_IN;
}
cond |= G_IO_OUT;
+ rcu_read_unlock();
return cond & rsource->condition;
}
@@ -2795,14 +2897,28 @@ static gboolean
qio_channel_rdma_source_check(GSource *source)
{
QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
- RDMAContext *rdma = rsource->rioc->rdma;
+ RDMAContext *rdma;
GIOCondition cond = 0;
+ rcu_read_lock();
+ if (rsource->condition == G_IO_IN) {
+ rdma = atomic_rcu_read(&rsource->rioc->rdmain);
+ } else {
+ rdma = atomic_rcu_read(&rsource->rioc->rdmaout);
+ }
+
+ if (!rdma) {
+ error_report("RDMAContext is NULL when check Gsource");
+ rcu_read_unlock();
+ return FALSE;
+ }
+
if (rdma->wr_data[0].control_len) {
cond |= G_IO_IN;
}
cond |= G_IO_OUT;
+ rcu_read_unlock();
return cond & rsource->condition;
}
@@ -2813,14 +2929,28 @@ qio_channel_rdma_source_dispatch(GSource *source,
{
QIOChannelFunc func = (QIOChannelFunc)callback;
QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
- RDMAContext *rdma = rsource->rioc->rdma;
+ RDMAContext *rdma;
GIOCondition cond = 0;
+ rcu_read_lock();
+ if (rsource->condition == G_IO_IN) {
+ rdma = atomic_rcu_read(&rsource->rioc->rdmain);
+ } else {
+ rdma = atomic_rcu_read(&rsource->rioc->rdmaout);
+ }
+
+ if (!rdma) {
+ error_report("RDMAContext is NULL when dispatch Gsource");
+ rcu_read_unlock();
+ return FALSE;
+ }
+
if (rdma->wr_data[0].control_len) {
cond |= G_IO_IN;
}
cond |= G_IO_OUT;
+ rcu_read_unlock();
return (*func)(QIO_CHANNEL(rsource->rioc),
(cond & rsource->condition),
user_data);
@@ -2860,20 +2990,91 @@ static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
return source;
}
+static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
+ AioContext *ctx,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ void *opaque)
+{
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+ if (io_read) {
+ aio_set_fd_handler(ctx, rioc->rdmain->comp_channel->fd,
+ false, io_read, io_write, NULL, opaque);
+ } else {
+ aio_set_fd_handler(ctx, rioc->rdmaout->comp_channel->fd,
+ false, io_read, io_write, NULL, opaque);
+ }
+}
static int qio_channel_rdma_close(QIOChannel *ioc,
Error **errp)
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+ RDMAContext *rdmain, *rdmaout;
trace_qemu_rdma_close();
- if (rioc->rdma) {
- if (!rioc->rdma->error_state) {
- rioc->rdma->error_state = qemu_file_get_error(rioc->file);
+
+ rdmain = rioc->rdmain;
+ if (rdmain) {
+ atomic_rcu_set(&rioc->rdmain, NULL);
+ }
+
+ rdmaout = rioc->rdmaout;
+ if (rdmaout) {
+ atomic_rcu_set(&rioc->rdmaout, NULL);
+ }
+
+ synchronize_rcu();
+
+ if (rdmain) {
+ qemu_rdma_cleanup(rdmain);
+ }
+
+ if (rdmaout) {
+ qemu_rdma_cleanup(rdmaout);
+ }
+
+ g_free(rdmain);
+ g_free(rdmaout);
+
+ return 0;
+}
+
+static int
+qio_channel_rdma_shutdown(QIOChannel *ioc,
+ QIOChannelShutdown how,
+ Error **errp)
+{
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+ RDMAContext *rdmain, *rdmaout;
+
+ rcu_read_lock();
+
+ rdmain = atomic_rcu_read(&rioc->rdmain);
+ rdmaout = atomic_rcu_read(&rioc->rdmain);
+
+ switch (how) {
+ case QIO_CHANNEL_SHUTDOWN_READ:
+ if (rdmain) {
+ rdmain->error_state = -1;
+ }
+ break;
+ case QIO_CHANNEL_SHUTDOWN_WRITE:
+ if (rdmaout) {
+ rdmaout->error_state = -1;
}
- qemu_rdma_cleanup(rioc->rdma);
- g_free(rioc->rdma);
- rioc->rdma = NULL;
+ break;
+ case QIO_CHANNEL_SHUTDOWN_BOTH:
+ default:
+ if (rdmain) {
+ rdmain->error_state = -1;
+ }
+ if (rdmaout) {
+ rdmaout->error_state = -1;
+ }
+ break;
}
+
+ rcu_read_unlock();
return 0;
}
@@ -2916,11 +3117,24 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
size_t size, uint64_t *bytes_sent)
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
- RDMAContext *rdma = rioc->rdma;
+ RDMAContext *rdma;
int ret;
+ rcu_read_lock();
+ rdma = atomic_rcu_read(&rioc->rdmaout);
+
+ if (!rdma) {
+ rcu_read_unlock();
+ return -EIO;
+ }
+
CHECK_ERROR_STATE();
+ if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
+ rcu_read_unlock();
+ return RAM_SAVE_CONTROL_NOT_SUPP;
+ }
+
qemu_fflush(f);
if (size > 0) {
@@ -3002,12 +3216,45 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
}
}
+ rcu_read_unlock();
return RAM_SAVE_CONTROL_DELAYED;
err:
rdma->error_state = ret;
+ rcu_read_unlock();
return ret;
}
+static void rdma_accept_incoming_migration(void *opaque);
+
+static void rdma_cm_poll_handler(void *opaque)
+{
+ RDMAContext *rdma = opaque;
+ int ret;
+ struct rdma_cm_event *cm_event;
+ MigrationIncomingState *mis = migration_incoming_get_current();
+
+ ret = rdma_get_cm_event(rdma->channel, &cm_event);
+ if (ret) {
+ error_report("get_cm_event failed %d", errno);
+ return;
+ }
+ rdma_ack_cm_event(cm_event);
+
+ if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
+ cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
+ error_report("receive cm event, cm event is %d", cm_event->event);
+ rdma->error_state = -EPIPE;
+ if (rdma->return_path) {
+ rdma->return_path->error_state = -EPIPE;
+ }
+
+ if (mis->migration_incoming_co) {
+ qemu_coroutine_enter(mis->migration_incoming_co);
+ }
+ return;
+ }
+}
+
static int qemu_rdma_accept(RDMAContext *rdma)
{
RDMACapabilities cap;
@@ -3102,7 +3349,15 @@ static int qemu_rdma_accept(RDMAContext *rdma)
}
}
- qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
+ /* Accept the second connection request for return path */
+ if (migrate_postcopy() && !rdma->is_return_path) {
+ qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
+ NULL,
+ (void *)(intptr_t)rdma->return_path);
+ } else {
+ qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
+ NULL, rdma);
+ }
ret = rdma_accept(rdma->cm_id, &conn_param);
if (ret) {
@@ -3171,8 +3426,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
.repeat = 1 };
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
- RDMAContext *rdma = rioc->rdma;
- RDMALocalBlocks *local = &rdma->local_ram_blocks;
+ RDMAContext *rdma;
+ RDMALocalBlocks *local;
RDMAControlHeader head;
RDMARegister *reg, *registers;
RDMACompress *comp;
@@ -3185,8 +3440,17 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
int count = 0;
int i = 0;
+ rcu_read_lock();
+ rdma = atomic_rcu_read(&rioc->rdmain);
+
+ if (!rdma) {
+ rcu_read_unlock();
+ return -EIO;
+ }
+
CHECK_ERROR_STATE();
+ local = &rdma->local_ram_blocks;
do {
trace_qemu_rdma_registration_handle_wait();
@@ -3420,6 +3684,7 @@ out:
if (ret < 0) {
rdma->error_state = ret;
}
+ rcu_read_unlock();
return ret;
}
@@ -3433,10 +3698,18 @@ out:
static int
rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
{
- RDMAContext *rdma = rioc->rdma;
+ RDMAContext *rdma;
int curr;
int found = -1;
+ rcu_read_lock();
+ rdma = atomic_rcu_read(&rioc->rdmain);
+
+ if (!rdma) {
+ rcu_read_unlock();
+ return -EIO;
+ }
+
/* Find the matching RAMBlock in our local list */
for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
@@ -3447,6 +3720,7 @@ rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
if (found == -1) {
error_report("RAMBlock '%s' not found on destination", name);
+ rcu_read_unlock();
return -ENOENT;
}
@@ -3454,6 +3728,7 @@ rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
trace_rdma_block_notification_handle(name, rdma->next_src_index);
rdma->next_src_index++;
+ rcu_read_unlock();
return 0;
}
@@ -3476,14 +3751,27 @@ static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
uint64_t flags, void *data)
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
- RDMAContext *rdma = rioc->rdma;
+ RDMAContext *rdma;
+
+ rcu_read_lock();
+ rdma = atomic_rcu_read(&rioc->rdmaout);
+ if (!rdma) {
+ rcu_read_unlock();
+ return -EIO;
+ }
CHECK_ERROR_STATE();
+ if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
+ rcu_read_unlock();
+ return 0;
+ }
+
trace_qemu_rdma_registration_start(flags);
qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
qemu_fflush(f);
+ rcu_read_unlock();
return 0;
}
@@ -3496,12 +3784,24 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
{
Error *local_err = NULL, **errp = &local_err;
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
- RDMAContext *rdma = rioc->rdma;
+ RDMAContext *rdma;
RDMAControlHeader head = { .len = 0, .repeat = 1 };
int ret = 0;
+ rcu_read_lock();
+ rdma = atomic_rcu_read(&rioc->rdmaout);
+ if (!rdma) {
+ rcu_read_unlock();
+ return -EIO;
+ }
+
CHECK_ERROR_STATE();
+ if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
+ rcu_read_unlock();
+ return 0;
+ }
+
qemu_fflush(f);
ret = qemu_rdma_drain_cq(f, rdma);
@@ -3530,6 +3830,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
qemu_rdma_reg_whole_ram_blocks : NULL);
if (ret < 0) {
ERROR(errp, "receiving remote info!");
+ rcu_read_unlock();
return ret;
}
@@ -3553,6 +3854,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
"not identical on both the source and destination.",
local->nb_blocks, nb_dest_blocks);
rdma->error_state = -EINVAL;
+ rcu_read_unlock();
return -EINVAL;
}
@@ -3569,6 +3871,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
local->block[i].length,
rdma->dest_blocks[i].length);
rdma->error_state = -EINVAL;
+ rcu_read_unlock();
return -EINVAL;
}
local->block[i].remote_host_addr =
@@ -3586,9 +3889,11 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
goto err;
}
+ rcu_read_unlock();
return 0;
err:
rdma->error_state = ret;
+ rcu_read_unlock();
return ret;
}
@@ -3606,10 +3911,15 @@ static const QEMUFileHooks rdma_write_hooks = {
static void qio_channel_rdma_finalize(Object *obj)
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
- if (rioc->rdma) {
- qemu_rdma_cleanup(rioc->rdma);
- g_free(rioc->rdma);
- rioc->rdma = NULL;
+ if (rioc->rdmain) {
+ qemu_rdma_cleanup(rioc->rdmain);
+ g_free(rioc->rdmain);
+ rioc->rdmain = NULL;
+ }
+ if (rioc->rdmaout) {
+ qemu_rdma_cleanup(rioc->rdmaout);
+ g_free(rioc->rdmaout);
+ rioc->rdmaout = NULL;
}
}
@@ -3623,6 +3933,8 @@ static void qio_channel_rdma_class_init(ObjectClass *klass,
ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
ioc_klass->io_close = qio_channel_rdma_close;
ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
+ ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler;
+ ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
}
static const TypeInfo qio_channel_rdma_info = {
@@ -3649,13 +3961,16 @@ static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
}
rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
- rioc->rdma = rdma;
if (mode[0] == 'w') {
rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
+ rioc->rdmaout = rdma;
+ rioc->rdmain = rdma->return_path;
qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
} else {
rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
+ rioc->rdmain = rdma;
+ rioc->rdmaout = rdma->return_path;
qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
}
@@ -3679,6 +3994,10 @@ static void rdma_accept_incoming_migration(void *opaque)
trace_qemu_rdma_accept_incoming_migration_accepted();
+ if (rdma->is_return_path) {
+ return;
+ }
+
f = qemu_fopen_rdma(rdma, "rb");
if (f == NULL) {
ERROR(errp, "could not qemu_fopen_rdma!");
@@ -3693,7 +4012,7 @@ static void rdma_accept_incoming_migration(void *opaque)
void rdma_start_incoming_migration(const char *host_port, Error **errp)
{
int ret;
- RDMAContext *rdma;
+ RDMAContext *rdma, *rdma_return_path;
Error *local_err = NULL;
trace_rdma_start_incoming_migration();
@@ -3720,12 +4039,24 @@ void rdma_start_incoming_migration(const char *host_port, Error **errp)
trace_rdma_start_incoming_migration_after_rdma_listen();
+ /* initialize the RDMAContext for return path */
+ if (migrate_postcopy()) {
+ rdma_return_path = qemu_rdma_data_init(host_port, &local_err);
+
+ if (rdma_return_path == NULL) {
+ goto err;
+ }
+
+ qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
+ }
+
qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
NULL, (void *)(intptr_t)rdma);
return;
err:
error_propagate(errp, local_err);
g_free(rdma);
+ g_free(rdma_return_path);
}
void rdma_start_outgoing_migration(void *opaque,
@@ -3733,6 +4064,7 @@ void rdma_start_outgoing_migration(void *opaque,
{
MigrationState *s = opaque;
RDMAContext *rdma = qemu_rdma_data_init(host_port, errp);
+ RDMAContext *rdma_return_path = NULL;
int ret = 0;
if (rdma == NULL) {
@@ -3753,6 +4085,32 @@ void rdma_start_outgoing_migration(void *opaque,
goto err;
}
+ /* RDMA postcopy need a seprate queue pair for return path */
+ if (migrate_postcopy()) {
+ rdma_return_path = qemu_rdma_data_init(host_port, errp);
+
+ if (rdma_return_path == NULL) {
+ goto err;
+ }
+
+ ret = qemu_rdma_source_init(rdma_return_path,
+ s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
+
+ if (ret) {
+ goto err;
+ }
+
+ ret = qemu_rdma_connect(rdma_return_path, errp);
+
+ if (ret) {
+ goto err;
+ }
+
+ rdma->return_path = rdma_return_path;
+ rdma_return_path->return_path = rdma;
+ rdma_return_path->is_return_path = true;
+ }
+
trace_rdma_start_outgoing_migration_after_rdma_connect();
s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
@@ -3760,4 +4118,5 @@ void rdma_start_outgoing_migration(void *opaque,
return;
err:
g_free(rdma);
+ g_free(rdma_return_path);
}
diff --git a/migration/savevm.c b/migration/savevm.c
index 7f92567a10..13e51f0e34 100644
--- a/migration/savevm.c
+++ b/migration/savevm.c
@@ -1622,6 +1622,7 @@ static void *postcopy_ram_listen_thread(void *opaque)
qemu_sem_post(&mis->listen_thread_sem);
trace_postcopy_ram_listen_thread_start();
+ rcu_register_thread();
/*
* Because we're a thread and not a coroutine we can't yield
* in qemu_file, and thus we must be blocking now.
@@ -1662,6 +1663,7 @@ static void *postcopy_ram_listen_thread(void *opaque)
* to leave the guest running and fire MCEs for pages that never
* arrived as a desperate recovery step.
*/
+ rcu_unregister_thread();
exit(EXIT_FAILURE);
}
@@ -1676,6 +1678,7 @@ static void *postcopy_ram_listen_thread(void *opaque)
migration_incoming_state_destroy();
qemu_loadvm_state_cleanup();
+ rcu_unregister_thread();
return NULL;
}
diff --git a/migration/vmstate.c b/migration/vmstate.c
index 6b9079bb51..0bc240a317 100644
--- a/migration/vmstate.c
+++ b/migration/vmstate.c
@@ -418,7 +418,7 @@ int vmstate_save_state_v(QEMUFile *f, const VMStateDescription *vmsd,
static const VMStateDescription *
vmstate_get_subsection(const VMStateDescription **sub, char *idstr)
{
- while (sub && *sub && (*sub)->needed) {
+ while (sub && *sub) {
if (strcmp(idstr, (*sub)->name) == 0) {
return *sub;
}
@@ -486,8 +486,8 @@ static int vmstate_subsection_save(QEMUFile *f, const VMStateDescription *vmsd,
int ret = 0;
trace_vmstate_subsection_save_top(vmsd->name);
- while (sub && *sub && (*sub)->needed) {
- if ((*sub)->needed(opaque)) {
+ while (sub && *sub) {
+ if (vmstate_save_needed(*sub, opaque)) {
const VMStateDescription *vmsdsub = *sub;
uint8_t len;
diff --git a/qapi/migration.json b/qapi/migration.json
index 186e8a7303..f62d3f9a4b 100644
--- a/qapi/migration.json
+++ b/qapi/migration.json
@@ -212,13 +212,13 @@
# -> { "execute": "query-migrate" }
# <- { "return": {
# "status": "completed",
+# "total-time":12345,
+# "setup-time":12345,
+# "downtime":12345,
# "ram":{
# "transferred":123,
# "remaining":123,
# "total":246,
-# "total-time":12345,
-# "setup-time":12345,
-# "downtime":12345,
# "duplicate":123,
# "normal":123,
# "normal-bytes":123456,
@@ -238,13 +238,13 @@
# <- {
# "return":{
# "status":"active",
+# "total-time":12345,
+# "setup-time":12345,
+# "expected-downtime":12345,
# "ram":{
# "transferred":123,
# "remaining":123,
# "total":246,
-# "total-time":12345,
-# "setup-time":12345,
-# "expected-downtime":12345,
# "duplicate":123,
# "normal":123,
# "normal-bytes":123456,
@@ -259,13 +259,13 @@
# <- {
# "return":{
# "status":"active",
+# "total-time":12345,
+# "setup-time":12345,
+# "expected-downtime":12345,
# "ram":{
# "total":1057024,
# "remaining":1053304,
# "transferred":3720,
-# "total-time":12345,
-# "setup-time":12345,
-# "expected-downtime":12345,
# "duplicate":123,
# "normal":123,
# "normal-bytes":123456,
@@ -285,14 +285,13 @@
# <- {
# "return":{
# "status":"active",
-# "capabilities" : [ { "capability": "xbzrle", "state" : true } ],
+# "total-time":12345,
+# "setup-time":12345,
+# "expected-downtime":12345,
# "ram":{
# "total":1057024,
# "remaining":1053304,
# "transferred":3720,
-# "total-time":12345,
-# "setup-time":12345,
-# "expected-downtime":12345,
# "duplicate":10,
# "normal":3333,
# "normal-bytes":3412992,
@@ -462,6 +461,11 @@
# @compress-threads: Set compression thread count to be used in live migration,
# the compression thread count is an integer between 1 and 255.
#
+# @compress-wait-thread: Controls behavior when all compression threads are
+# currently busy. If true (default), wait for a free
+# compression thread to become available; otherwise,
+# send the page uncompressed. (Since 3.1)
+#
# @decompress-threads: Set decompression thread count to be used in live
# migration, the decompression thread count is an integer between 1
# and 255. Usually, decompression is at least 4 times as fast as
@@ -523,15 +527,20 @@
# @max-postcopy-bandwidth: Background transfer bandwidth during postcopy.
# Defaults to 0 (unlimited). In bytes per second.
# (Since 3.0)
+#
+# @max-cpu-throttle: maximum cpu throttle percentage.
+# Defaults to 99. (Since 3.1)
# Since: 2.4
##
{ 'enum': 'MigrationParameter',
'data': ['compress-level', 'compress-threads', 'decompress-threads',
+ 'compress-wait-thread',
'cpu-throttle-initial', 'cpu-throttle-increment',
'tls-creds', 'tls-hostname', 'max-bandwidth',
'downtime-limit', 'x-checkpoint-delay', 'block-incremental',
'x-multifd-channels', 'x-multifd-page-count',
- 'xbzrle-cache-size', 'max-postcopy-bandwidth' ] }
+ 'xbzrle-cache-size', 'max-postcopy-bandwidth',
+ 'max-cpu-throttle' ] }
##
# @MigrateSetParameters:
@@ -540,6 +549,11 @@
#
# @compress-threads: compression thread count
#
+# @compress-wait-thread: Controls behavior when all compression threads are
+# currently busy. If true (default), wait for a free
+# compression thread to become available; otherwise,
+# send the page uncompressed. (Since 3.1)
+#
# @decompress-threads: decompression thread count
#
# @cpu-throttle-initial: Initial percentage of time guest cpus are
@@ -603,6 +617,10 @@
# @max-postcopy-bandwidth: Background transfer bandwidth during postcopy.
# Defaults to 0 (unlimited). In bytes per second.
# (Since 3.0)
+#
+# @max-cpu-throttle: maximum cpu throttle percentage.
+# The default value is 99. (Since 3.1)
+#
# Since: 2.4
##
# TODO either fuse back into MigrationParameters, or make
@@ -610,6 +628,7 @@
{ 'struct': 'MigrateSetParameters',
'data': { '*compress-level': 'int',
'*compress-threads': 'int',
+ '*compress-wait-thread': 'bool',
'*decompress-threads': 'int',
'*cpu-throttle-initial': 'int',
'*cpu-throttle-increment': 'int',
@@ -622,7 +641,8 @@
'*x-multifd-channels': 'int',
'*x-multifd-page-count': 'int',
'*xbzrle-cache-size': 'size',
- '*max-postcopy-bandwidth': 'size' } }
+ '*max-postcopy-bandwidth': 'size',
+ '*max-cpu-throttle': 'int' } }
##
# @migrate-set-parameters:
@@ -649,6 +669,11 @@
#
# @compress-threads: compression thread count
#
+# @compress-wait-thread: Controls behavior when all compression threads are
+# currently busy. If true (default), wait for a free
+# compression thread to become available; otherwise,
+# send the page uncompressed. (Since 3.1)
+#
# @decompress-threads: decompression thread count
#
# @cpu-throttle-initial: Initial percentage of time guest cpus are
@@ -709,11 +734,17 @@
# @max-postcopy-bandwidth: Background transfer bandwidth during postcopy.
# Defaults to 0 (unlimited). In bytes per second.
# (Since 3.0)
+#
+# @max-cpu-throttle: maximum cpu throttle percentage.
+# Defaults to 99.
+# (Since 3.1)
+#
# Since: 2.4
##
{ 'struct': 'MigrationParameters',
'data': { '*compress-level': 'uint8',
'*compress-threads': 'uint8',
+ '*compress-wait-thread': 'bool',
'*decompress-threads': 'uint8',
'*cpu-throttle-initial': 'uint8',
'*cpu-throttle-increment': 'uint8',
@@ -726,7 +757,8 @@
'*x-multifd-channels': 'uint8',
'*x-multifd-page-count': 'uint32',
'*xbzrle-cache-size': 'size',
- '*max-postcopy-bandwidth': 'size' } }
+ '*max-postcopy-bandwidth': 'size',
+ '*max-cpu-throttle':'uint8'} }
##
# @query-migrate-parameters: