aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/migration/migration.h8
-rw-r--r--migration/migration.c159
-rw-r--r--trace-events10
3 files changed, 175 insertions, 2 deletions
diff --git a/include/migration/migration.h b/include/migration/migration.h
index 3ce3fda6b7..571466beb2 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -80,6 +80,14 @@ struct MigrationState
int state;
MigrationParams params;
+
+ /* State related to return path */
+ struct {
+ QEMUFile *from_dst_file;
+ QemuThread rp_thread;
+ bool error;
+ } rp_state;
+
double mbps;
int64_t total_time;
int64_t downtime;
diff --git a/migration/migration.c b/migration/migration.c
index 4317bab291..295deb8a2d 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -696,6 +696,11 @@ static void migrate_fd_cancel(MigrationState *s)
QEMUFile *f = migrate_get_current()->file;
trace_migrate_fd_cancel();
+ if (s->rp_state.from_dst_file) {
+ /* shutdown the rp socket, so causing the rp thread to shutdown */
+ qemu_file_shutdown(s->rp_state.from_dst_file);
+ }
+
do {
old_state = s->state;
if (!migration_is_setup_or_active(old_state)) {
@@ -1030,6 +1035,154 @@ int64_t migrate_xbzrle_cache_size(void)
return s->xbzrle_cache_size;
}
+/* migration thread support */
+/*
+ * Something bad happened to the RP stream, mark an error
+ * The caller shall print or trace something to indicate why
+ */
+static void mark_source_rp_bad(MigrationState *s)
+{
+ s->rp_state.error = true;
+}
+
+static struct rp_cmd_args {
+ ssize_t len; /* -1 = variable */
+ const char *name;
+} rp_cmd_args[] = {
+ [MIG_RP_MSG_INVALID] = { .len = -1, .name = "INVALID" },
+ [MIG_RP_MSG_SHUT] = { .len = 4, .name = "SHUT" },
+ [MIG_RP_MSG_PONG] = { .len = 4, .name = "PONG" },
+ [MIG_RP_MSG_MAX] = { .len = -1, .name = "MAX" },
+};
+
+/*
+ * Handles messages sent on the return path towards the source VM
+ *
+ */
+static void *source_return_path_thread(void *opaque)
+{
+ MigrationState *ms = opaque;
+ QEMUFile *rp = ms->rp_state.from_dst_file;
+ uint16_t header_len, header_type;
+ const int max_len = 512;
+ uint8_t buf[max_len];
+ uint32_t tmp32, sibling_error;
+ int res;
+
+ trace_source_return_path_thread_entry();
+ while (!ms->rp_state.error && !qemu_file_get_error(rp) &&
+ migration_is_setup_or_active(ms->state)) {
+ trace_source_return_path_thread_loop_top();
+ header_type = qemu_get_be16(rp);
+ header_len = qemu_get_be16(rp);
+
+ if (header_type >= MIG_RP_MSG_MAX ||
+ header_type == MIG_RP_MSG_INVALID) {
+ error_report("RP: Received invalid message 0x%04x length 0x%04x",
+ header_type, header_len);
+ mark_source_rp_bad(ms);
+ goto out;
+ }
+
+ if ((rp_cmd_args[header_type].len != -1 &&
+ header_len != rp_cmd_args[header_type].len) ||
+ header_len > max_len) {
+ error_report("RP: Received '%s' message (0x%04x) with"
+ "incorrect length %d expecting %zu",
+ rp_cmd_args[header_type].name, header_type, header_len,
+ (size_t)rp_cmd_args[header_type].len);
+ mark_source_rp_bad(ms);
+ goto out;
+ }
+
+ /* We know we've got a valid header by this point */
+ res = qemu_get_buffer(rp, buf, header_len);
+ if (res != header_len) {
+ error_report("RP: Failed reading data for message 0x%04x"
+ " read %d expected %d",
+ header_type, res, header_len);
+ mark_source_rp_bad(ms);
+ goto out;
+ }
+
+ /* OK, we have the message and the data */
+ switch (header_type) {
+ case MIG_RP_MSG_SHUT:
+ sibling_error = be32_to_cpup((uint32_t *)buf);
+ trace_source_return_path_thread_shut(sibling_error);
+ if (sibling_error) {
+ error_report("RP: Sibling indicated error %d", sibling_error);
+ mark_source_rp_bad(ms);
+ }
+ /*
+ * We'll let the main thread deal with closing the RP
+ * we could do a shutdown(2) on it, but we're the only user
+ * anyway, so there's nothing gained.
+ */
+ goto out;
+
+ case MIG_RP_MSG_PONG:
+ tmp32 = be32_to_cpup((uint32_t *)buf);
+ trace_source_return_path_thread_pong(tmp32);
+ break;
+
+ default:
+ break;
+ }
+ }
+ if (rp && qemu_file_get_error(rp)) {
+ trace_source_return_path_thread_bad_end();
+ mark_source_rp_bad(ms);
+ }
+
+ trace_source_return_path_thread_end();
+out:
+ ms->rp_state.from_dst_file = NULL;
+ qemu_fclose(rp);
+ return NULL;
+}
+
+__attribute__ (( unused )) /* Until later in patch series */
+static int open_return_path_on_source(MigrationState *ms)
+{
+
+ ms->rp_state.from_dst_file = qemu_file_get_return_path(ms->file);
+ if (!ms->rp_state.from_dst_file) {
+ return -1;
+ }
+
+ trace_open_return_path_on_source();
+ qemu_thread_create(&ms->rp_state.rp_thread, "return path",
+ source_return_path_thread, ms, QEMU_THREAD_JOINABLE);
+
+ trace_open_return_path_on_source_continue();
+
+ return 0;
+}
+
+__attribute__ (( unused )) /* Until later in patch series */
+/* Returns 0 if the RP was ok, otherwise there was an error on the RP */
+static int await_return_path_close_on_source(MigrationState *ms)
+{
+ /*
+ * If this is a normal exit then the destination will send a SHUT and the
+ * rp_thread will exit, however if there's an error we need to cause
+ * it to exit.
+ */
+ if (qemu_file_get_error(ms->file) && ms->rp_state.from_dst_file) {
+ /*
+ * shutdown(2), if we have it, will cause it to unblock if it's stuck
+ * waiting for the destination.
+ */
+ qemu_file_shutdown(ms->rp_state.from_dst_file);
+ mark_source_rp_bad(ms);
+ }
+ trace_await_return_path_close_on_source_joining();
+ qemu_thread_join(&ms->rp_state.rp_thread);
+ trace_await_return_path_close_on_source_close();
+ return ms->rp_state.error;
+}
+
/**
* migration_completion: Used by migration_thread when there's not much left.
* The caller 'breaks' the loop when this returns.
@@ -1074,8 +1227,10 @@ fail:
migrate_set_state(s, MIGRATION_STATUS_ACTIVE, MIGRATION_STATUS_FAILED);
}
-/* migration thread support */
-
+/*
+ * Master migration thread on the source VM.
+ * It drives the migration and pumps the data down the outgoing channel.
+ */
static void *migration_thread(void *opaque)
{
MigrationState *s = opaque;
diff --git a/trace-events b/trace-events
index ca5909bd08..d61fcf9619 100644
--- a/trace-events
+++ b/trace-events
@@ -1426,12 +1426,22 @@ flic_no_device_api(int err) "flic: no Device Contral API support %d"
flic_reset_failed(int err) "flic: reset failed %d"
# migration.c
+await_return_path_close_on_source_close(void) ""
+await_return_path_close_on_source_joining(void) ""
migrate_set_state(int new_state) "new state %d"
migrate_fd_cleanup(void) ""
migrate_fd_error(void) ""
migrate_fd_cancel(void) ""
migrate_pending(uint64_t size, uint64_t max) "pending size %" PRIu64 " max %" PRIu64
migrate_send_rp_message(int msg_type, uint16_t len) "%d: len %d"
+open_return_path_on_source(void) ""
+open_return_path_on_source_continue(void) ""
+source_return_path_thread_bad_end(void) ""
+source_return_path_thread_end(void) ""
+source_return_path_thread_entry(void) ""
+source_return_path_thread_loop_top(void) ""
+source_return_path_thread_pong(uint32_t val) "%x"
+source_return_path_thread_shut(uint32_t val) "%x"
migrate_transferred(uint64_t tranferred, uint64_t time_spent, double bandwidth, uint64_t size) "transferred %" PRIu64 " time_spent %" PRIu64 " bandwidth %g max_size %" PRId64
migrate_state_too_big(void) ""
migrate_global_state_post_load(const char *state) "loaded state: %s"