diff options
author | Dr. David Alan Gilbert <dgilbert@redhat.com> | 2015-11-05 18:10:49 +0000 |
---|---|---|
committer | Juan Quintela <quintela@redhat.com> | 2015-11-10 15:00:26 +0100 |
commit | 70b2047774231ba2cb672eb936e12f603fa644aa (patch) | |
tree | 357fdd77c28d103fb71c70838f67c7b1d78f9bf8 | |
parent | f6844b99ce08e836d509ec4be2fbfc5ab13ac18f (diff) |
Return path: Source handling of return path
Open a return path, and handle messages that are received upon it.
Signed-off-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
Reviewed-by: Juan Quintela <quintela@redhat.com>
Signed-off-by: Juan Quintela <quintela@redhat.com>
-rw-r--r-- | include/migration/migration.h | 8 | ||||
-rw-r--r-- | migration/migration.c | 159 | ||||
-rw-r--r-- | trace-events | 10 |
3 files changed, 175 insertions, 2 deletions
diff --git a/include/migration/migration.h b/include/migration/migration.h index 3ce3fda6b7..571466beb2 100644 --- a/include/migration/migration.h +++ b/include/migration/migration.h @@ -80,6 +80,14 @@ struct MigrationState int state; MigrationParams params; + + /* State related to return path */ + struct { + QEMUFile *from_dst_file; + QemuThread rp_thread; + bool error; + } rp_state; + double mbps; int64_t total_time; int64_t downtime; diff --git a/migration/migration.c b/migration/migration.c index 4317bab291..295deb8a2d 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -696,6 +696,11 @@ static void migrate_fd_cancel(MigrationState *s) QEMUFile *f = migrate_get_current()->file; trace_migrate_fd_cancel(); + if (s->rp_state.from_dst_file) { + /* shutdown the rp socket, so causing the rp thread to shutdown */ + qemu_file_shutdown(s->rp_state.from_dst_file); + } + do { old_state = s->state; if (!migration_is_setup_or_active(old_state)) { @@ -1030,6 +1035,154 @@ int64_t migrate_xbzrle_cache_size(void) return s->xbzrle_cache_size; } +/* migration thread support */ +/* + * Something bad happened to the RP stream, mark an error + * The caller shall print or trace something to indicate why + */ +static void mark_source_rp_bad(MigrationState *s) +{ + s->rp_state.error = true; +} + +static struct rp_cmd_args { + ssize_t len; /* -1 = variable */ + const char *name; +} rp_cmd_args[] = { + [MIG_RP_MSG_INVALID] = { .len = -1, .name = "INVALID" }, + [MIG_RP_MSG_SHUT] = { .len = 4, .name = "SHUT" }, + [MIG_RP_MSG_PONG] = { .len = 4, .name = "PONG" }, + [MIG_RP_MSG_MAX] = { .len = -1, .name = "MAX" }, +}; + +/* + * Handles messages sent on the return path towards the source VM + * + */ +static void *source_return_path_thread(void *opaque) +{ + MigrationState *ms = opaque; + QEMUFile *rp = ms->rp_state.from_dst_file; + uint16_t header_len, header_type; + const int max_len = 512; + uint8_t buf[max_len]; + uint32_t tmp32, sibling_error; + int res; + + trace_source_return_path_thread_entry(); + while (!ms->rp_state.error && !qemu_file_get_error(rp) && + migration_is_setup_or_active(ms->state)) { + trace_source_return_path_thread_loop_top(); + header_type = qemu_get_be16(rp); + header_len = qemu_get_be16(rp); + + if (header_type >= MIG_RP_MSG_MAX || + header_type == MIG_RP_MSG_INVALID) { + error_report("RP: Received invalid message 0x%04x length 0x%04x", + header_type, header_len); + mark_source_rp_bad(ms); + goto out; + } + + if ((rp_cmd_args[header_type].len != -1 && + header_len != rp_cmd_args[header_type].len) || + header_len > max_len) { + error_report("RP: Received '%s' message (0x%04x) with" + "incorrect length %d expecting %zu", + rp_cmd_args[header_type].name, header_type, header_len, + (size_t)rp_cmd_args[header_type].len); + mark_source_rp_bad(ms); + goto out; + } + + /* We know we've got a valid header by this point */ + res = qemu_get_buffer(rp, buf, header_len); + if (res != header_len) { + error_report("RP: Failed reading data for message 0x%04x" + " read %d expected %d", + header_type, res, header_len); + mark_source_rp_bad(ms); + goto out; + } + + /* OK, we have the message and the data */ + switch (header_type) { + case MIG_RP_MSG_SHUT: + sibling_error = be32_to_cpup((uint32_t *)buf); + trace_source_return_path_thread_shut(sibling_error); + if (sibling_error) { + error_report("RP: Sibling indicated error %d", sibling_error); + mark_source_rp_bad(ms); + } + /* + * We'll let the main thread deal with closing the RP + * we could do a shutdown(2) on it, but we're the only user + * anyway, so there's nothing gained. + */ + goto out; + + case MIG_RP_MSG_PONG: + tmp32 = be32_to_cpup((uint32_t *)buf); + trace_source_return_path_thread_pong(tmp32); + break; + + default: + break; + } + } + if (rp && qemu_file_get_error(rp)) { + trace_source_return_path_thread_bad_end(); + mark_source_rp_bad(ms); + } + + trace_source_return_path_thread_end(); +out: + ms->rp_state.from_dst_file = NULL; + qemu_fclose(rp); + return NULL; +} + +__attribute__ (( unused )) /* Until later in patch series */ +static int open_return_path_on_source(MigrationState *ms) +{ + + ms->rp_state.from_dst_file = qemu_file_get_return_path(ms->file); + if (!ms->rp_state.from_dst_file) { + return -1; + } + + trace_open_return_path_on_source(); + qemu_thread_create(&ms->rp_state.rp_thread, "return path", + source_return_path_thread, ms, QEMU_THREAD_JOINABLE); + + trace_open_return_path_on_source_continue(); + + return 0; +} + +__attribute__ (( unused )) /* Until later in patch series */ +/* Returns 0 if the RP was ok, otherwise there was an error on the RP */ +static int await_return_path_close_on_source(MigrationState *ms) +{ + /* + * If this is a normal exit then the destination will send a SHUT and the + * rp_thread will exit, however if there's an error we need to cause + * it to exit. + */ + if (qemu_file_get_error(ms->file) && ms->rp_state.from_dst_file) { + /* + * shutdown(2), if we have it, will cause it to unblock if it's stuck + * waiting for the destination. + */ + qemu_file_shutdown(ms->rp_state.from_dst_file); + mark_source_rp_bad(ms); + } + trace_await_return_path_close_on_source_joining(); + qemu_thread_join(&ms->rp_state.rp_thread); + trace_await_return_path_close_on_source_close(); + return ms->rp_state.error; +} + /** * migration_completion: Used by migration_thread when there's not much left. * The caller 'breaks' the loop when this returns. @@ -1074,8 +1227,10 @@ fail: migrate_set_state(s, MIGRATION_STATUS_ACTIVE, MIGRATION_STATUS_FAILED); } -/* migration thread support */ - +/* + * Master migration thread on the source VM. + * It drives the migration and pumps the data down the outgoing channel. + */ static void *migration_thread(void *opaque) { MigrationState *s = opaque; diff --git a/trace-events b/trace-events index ca5909bd08..d61fcf9619 100644 --- a/trace-events +++ b/trace-events @@ -1426,12 +1426,22 @@ flic_no_device_api(int err) "flic: no Device Contral API support %d" flic_reset_failed(int err) "flic: reset failed %d" # migration.c +await_return_path_close_on_source_close(void) "" +await_return_path_close_on_source_joining(void) "" migrate_set_state(int new_state) "new state %d" migrate_fd_cleanup(void) "" migrate_fd_error(void) "" migrate_fd_cancel(void) "" migrate_pending(uint64_t size, uint64_t max) "pending size %" PRIu64 " max %" PRIu64 migrate_send_rp_message(int msg_type, uint16_t len) "%d: len %d" +open_return_path_on_source(void) "" +open_return_path_on_source_continue(void) "" +source_return_path_thread_bad_end(void) "" +source_return_path_thread_end(void) "" +source_return_path_thread_entry(void) "" +source_return_path_thread_loop_top(void) "" +source_return_path_thread_pong(uint32_t val) "%x" +source_return_path_thread_shut(uint32_t val) "%x" migrate_transferred(uint64_t tranferred, uint64_t time_spent, double bandwidth, uint64_t size) "transferred %" PRIu64 " time_spent %" PRIu64 " bandwidth %g max_size %" PRId64 migrate_state_too_big(void) "" migrate_global_state_post_load(const char *state) "loaded state: %s" |