diff options
-rw-r--r-- | docs/devel/migration.rst | 14 | ||||
-rw-r--r-- | hmp.c | 15 | ||||
-rw-r--r-- | migration/migration.c | 51 | ||||
-rw-r--r-- | migration/migration.h | 13 | ||||
-rw-r--r-- | migration/postcopy-ram.c | 268 | ||||
-rw-r--r-- | migration/qemu-file.c | 43 | ||||
-rw-r--r-- | migration/qemu-file.h | 6 | ||||
-rw-r--r-- | migration/ram.c | 482 | ||||
-rw-r--r-- | migration/trace-events | 6 | ||||
-rw-r--r-- | qapi/migration.json | 17 | ||||
-rw-r--r-- | tests/migration-test.c | 16 |
11 files changed, 716 insertions, 215 deletions
diff --git a/docs/devel/migration.rst b/docs/devel/migration.rst index e32b087f6e..9342a8af06 100644 --- a/docs/devel/migration.rst +++ b/docs/devel/migration.rst @@ -401,6 +401,20 @@ will now cause the transition from precopy to postcopy. It can be issued immediately after migration is started or any time later on. Issuing it after the end of a migration is harmless. +Blocktime is a postcopy live migration metric, intended to show how +long the vCPU was in state of interruptable sleep due to pagefault. +That metric is calculated both for all vCPUs as overlapped value, and +separately for each vCPU. These values are calculated on destination +side. To enable postcopy blocktime calculation, enter following +command on destination monitor: + +``migrate_set_capability postcopy-blocktime on`` + +Postcopy blocktime can be retrieved by query-migrate qmp command. +postcopy-blocktime value of qmp command will show overlapped blocking +time for all vCPU, postcopy-vcpu-blocktime will show list of blocking +time per vCPU. + .. note:: During the postcopy phase, the bandwidth limits set using ``migrate_set_speed`` is ignored (to avoid delaying requested pages that @@ -274,6 +274,21 @@ void hmp_info_migrate(Monitor *mon, const QDict *qdict) info->cpu_throttle_percentage); } + if (info->has_postcopy_blocktime) { + monitor_printf(mon, "postcopy blocktime: %u\n", + info->postcopy_blocktime); + } + + if (info->has_postcopy_vcpu_blocktime) { + Visitor *v; + char *str; + v = string_output_visitor_new(false, &str); + visit_type_uint32List(v, NULL, &info->postcopy_vcpu_blocktime, NULL); + visit_complete(v, &str); + monitor_printf(mon, "postcopy vcpu blocktime: %s\n", str); + g_free(str); + visit_free(v); + } qapi_free_MigrationInfo(info); qapi_free_MigrationCapabilityStatusList(caps); } diff --git a/migration/migration.c b/migration/migration.c index 52a5092add..0bdb28e144 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -630,14 +630,15 @@ static void populate_disk_info(MigrationInfo *info) } } -MigrationInfo *qmp_query_migrate(Error **errp) +static void fill_source_migration_info(MigrationInfo *info) { - MigrationInfo *info = g_malloc0(sizeof(*info)); MigrationState *s = migrate_get_current(); switch (s->state) { case MIGRATION_STATUS_NONE: /* no migration has happened ever */ + /* do not overwrite destination migration status */ + return; break; case MIGRATION_STATUS_SETUP: info->has_status = true; @@ -688,8 +689,6 @@ MigrationInfo *qmp_query_migrate(Error **errp) break; } info->status = s->state; - - return info; } /** @@ -753,6 +752,41 @@ static bool migrate_caps_check(bool *cap_list, return true; } +static void fill_destination_migration_info(MigrationInfo *info) +{ + MigrationIncomingState *mis = migration_incoming_get_current(); + + switch (mis->state) { + case MIGRATION_STATUS_NONE: + return; + break; + case MIGRATION_STATUS_SETUP: + case MIGRATION_STATUS_CANCELLING: + case MIGRATION_STATUS_CANCELLED: + case MIGRATION_STATUS_ACTIVE: + case MIGRATION_STATUS_POSTCOPY_ACTIVE: + case MIGRATION_STATUS_FAILED: + case MIGRATION_STATUS_COLO: + info->has_status = true; + break; + case MIGRATION_STATUS_COMPLETED: + info->has_status = true; + fill_destination_postcopy_migration_info(info); + break; + } + info->status = mis->state; +} + +MigrationInfo *qmp_query_migrate(Error **errp) +{ + MigrationInfo *info = g_malloc0(sizeof(*info)); + + fill_destination_migration_info(info); + fill_source_migration_info(info); + + return info; +} + void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params, Error **errp) { @@ -1541,6 +1575,15 @@ bool migrate_zero_blocks(void) return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS]; } +bool migrate_postcopy_blocktime(void) +{ + MigrationState *s; + + s = migrate_get_current(); + + return s->enabled_capabilities[MIGRATION_CAPABILITY_POSTCOPY_BLOCKTIME]; +} + bool migrate_use_compression(void) { MigrationState *s; diff --git a/migration/migration.h b/migration/migration.h index 8d2f320c48..7c69598c54 100644 --- a/migration/migration.h +++ b/migration/migration.h @@ -22,6 +22,8 @@ #include "hw/qdev.h" #include "io/channel.h" +struct PostcopyBlocktimeContext; + /* State for the incoming migration */ struct MigrationIncomingState { QEMUFile *from_src_file; @@ -65,10 +67,20 @@ struct MigrationIncomingState { /* The coroutine we should enter (back) after failover */ Coroutine *migration_incoming_co; QemuSemaphore colo_incoming_sem; + + /* + * PostcopyBlocktimeContext to keep information for postcopy + * live migration, to calculate vCPU block time + * */ + struct PostcopyBlocktimeContext *blocktime_ctx; }; MigrationIncomingState *migration_incoming_get_current(void); void migration_incoming_state_destroy(void); +/* + * Functions to work with blocktime context + */ +void fill_destination_postcopy_migration_info(MigrationInfo *info); #define TYPE_MIGRATION "migration" @@ -230,6 +242,7 @@ int migrate_compress_level(void); int migrate_compress_threads(void); int migrate_decompress_threads(void); bool migrate_use_events(void); +bool migrate_postcopy_blocktime(void); /* Sending on the return path - generic and then for each message type */ void migrate_send_rp_shut(MigrationIncomingState *mis, diff --git a/migration/postcopy-ram.c b/migration/postcopy-ram.c index 4a0b33b373..8ceeaa2a93 100644 --- a/migration/postcopy-ram.c +++ b/migration/postcopy-ram.c @@ -90,6 +90,103 @@ int postcopy_notify(enum PostcopyNotifyReason reason, Error **errp) #include <sys/eventfd.h> #include <linux/userfaultfd.h> +typedef struct PostcopyBlocktimeContext { + /* time when page fault initiated per vCPU */ + uint32_t *page_fault_vcpu_time; + /* page address per vCPU */ + uintptr_t *vcpu_addr; + uint32_t total_blocktime; + /* blocktime per vCPU */ + uint32_t *vcpu_blocktime; + /* point in time when last page fault was initiated */ + uint32_t last_begin; + /* number of vCPU are suspended */ + int smp_cpus_down; + uint64_t start_time; + + /* + * Handler for exit event, necessary for + * releasing whole blocktime_ctx + */ + Notifier exit_notifier; +} PostcopyBlocktimeContext; + +static void destroy_blocktime_context(struct PostcopyBlocktimeContext *ctx) +{ + g_free(ctx->page_fault_vcpu_time); + g_free(ctx->vcpu_addr); + g_free(ctx->vcpu_blocktime); + g_free(ctx); +} + +static void migration_exit_cb(Notifier *n, void *data) +{ + PostcopyBlocktimeContext *ctx = container_of(n, PostcopyBlocktimeContext, + exit_notifier); + destroy_blocktime_context(ctx); +} + +static struct PostcopyBlocktimeContext *blocktime_context_new(void) +{ + PostcopyBlocktimeContext *ctx = g_new0(PostcopyBlocktimeContext, 1); + ctx->page_fault_vcpu_time = g_new0(uint32_t, smp_cpus); + ctx->vcpu_addr = g_new0(uintptr_t, smp_cpus); + ctx->vcpu_blocktime = g_new0(uint32_t, smp_cpus); + + ctx->exit_notifier.notify = migration_exit_cb; + ctx->start_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME); + qemu_add_exit_notifier(&ctx->exit_notifier); + return ctx; +} + +static uint32List *get_vcpu_blocktime_list(PostcopyBlocktimeContext *ctx) +{ + uint32List *list = NULL, *entry = NULL; + int i; + + for (i = smp_cpus - 1; i >= 0; i--) { + entry = g_new0(uint32List, 1); + entry->value = ctx->vcpu_blocktime[i]; + entry->next = list; + list = entry; + } + + return list; +} + +/* + * This function just populates MigrationInfo from postcopy's + * blocktime context. It will not populate MigrationInfo, + * unless postcopy-blocktime capability was set. + * + * @info: pointer to MigrationInfo to populate + */ +void fill_destination_postcopy_migration_info(MigrationInfo *info) +{ + MigrationIncomingState *mis = migration_incoming_get_current(); + PostcopyBlocktimeContext *bc = mis->blocktime_ctx; + + if (!bc) { + return; + } + + info->has_postcopy_blocktime = true; + info->postcopy_blocktime = bc->total_blocktime; + info->has_postcopy_vcpu_blocktime = true; + info->postcopy_vcpu_blocktime = get_vcpu_blocktime_list(bc); +} + +static uint32_t get_postcopy_total_blocktime(void) +{ + MigrationIncomingState *mis = migration_incoming_get_current(); + PostcopyBlocktimeContext *bc = mis->blocktime_ctx; + + if (!bc) { + return 0; + } + + return bc->total_blocktime; +} /** * receive_ufd_features: check userfault fd features, to request only supported @@ -182,6 +279,19 @@ static bool ufd_check_and_apply(int ufd, MigrationIncomingState *mis) } } +#ifdef UFFD_FEATURE_THREAD_ID + if (migrate_postcopy_blocktime() && mis && + UFFD_FEATURE_THREAD_ID & supported_features) { + /* kernel supports that feature */ + /* don't create blocktime_context if it exists */ + if (!mis->blocktime_ctx) { + mis->blocktime_ctx = blocktime_context_new(); + } + + asked_features |= UFFD_FEATURE_THREAD_ID; + } +#endif + /* * request features, even if asked_features is 0, due to * kernel expects UFFD_API before UFFDIO_REGISTER, per @@ -451,6 +561,9 @@ int postcopy_ram_incoming_cleanup(MigrationIncomingState *mis) munmap(mis->postcopy_tmp_zero_page, mis->largest_page_size); mis->postcopy_tmp_zero_page = NULL; } + trace_postcopy_ram_incoming_cleanup_blocktime( + get_postcopy_total_blocktime()); + trace_postcopy_ram_incoming_cleanup_exit(); return 0; } @@ -575,6 +688,148 @@ int postcopy_request_shared_page(struct PostCopyFD *pcfd, RAMBlock *rb, return 0; } +static int get_mem_fault_cpu_index(uint32_t pid) +{ + CPUState *cpu_iter; + + CPU_FOREACH(cpu_iter) { + if (cpu_iter->thread_id == pid) { + trace_get_mem_fault_cpu_index(cpu_iter->cpu_index, pid); + return cpu_iter->cpu_index; + } + } + trace_get_mem_fault_cpu_index(-1, pid); + return -1; +} + +static uint32_t get_low_time_offset(PostcopyBlocktimeContext *dc) +{ + int64_t start_time_offset = qemu_clock_get_ms(QEMU_CLOCK_REALTIME) - + dc->start_time; + return start_time_offset < 1 ? 1 : start_time_offset & UINT32_MAX; +} + +/* + * This function is being called when pagefault occurs. It + * tracks down vCPU blocking time. + * + * @addr: faulted host virtual address + * @ptid: faulted process thread id + * @rb: ramblock appropriate to addr + */ +static void mark_postcopy_blocktime_begin(uintptr_t addr, uint32_t ptid, + RAMBlock *rb) +{ + int cpu, already_received; + MigrationIncomingState *mis = migration_incoming_get_current(); + PostcopyBlocktimeContext *dc = mis->blocktime_ctx; + uint32_t low_time_offset; + + if (!dc || ptid == 0) { + return; + } + cpu = get_mem_fault_cpu_index(ptid); + if (cpu < 0) { + return; + } + + low_time_offset = get_low_time_offset(dc); + if (dc->vcpu_addr[cpu] == 0) { + atomic_inc(&dc->smp_cpus_down); + } + + atomic_xchg(&dc->last_begin, low_time_offset); + atomic_xchg(&dc->page_fault_vcpu_time[cpu], low_time_offset); + atomic_xchg(&dc->vcpu_addr[cpu], addr); + + /* check it here, not at the begining of the function, + * due to, check could accur early than bitmap_set in + * qemu_ufd_copy_ioctl */ + already_received = ramblock_recv_bitmap_test(rb, (void *)addr); + if (already_received) { + atomic_xchg(&dc->vcpu_addr[cpu], 0); + atomic_xchg(&dc->page_fault_vcpu_time[cpu], 0); + atomic_dec(&dc->smp_cpus_down); + } + trace_mark_postcopy_blocktime_begin(addr, dc, dc->page_fault_vcpu_time[cpu], + cpu, already_received); +} + +/* + * This function just provide calculated blocktime per cpu and trace it. + * Total blocktime is calculated in mark_postcopy_blocktime_end. + * + * + * Assume we have 3 CPU + * + * S1 E1 S1 E1 + * -----***********------------xxx***************------------------------> CPU1 + * + * S2 E2 + * ------------****************xxx---------------------------------------> CPU2 + * + * S3 E3 + * ------------------------****xxx********-------------------------------> CPU3 + * + * We have sequence S1,S2,E1,S3,S1,E2,E3,E1 + * S2,E1 - doesn't match condition due to sequence S1,S2,E1 doesn't include CPU3 + * S3,S1,E2 - sequence includes all CPUs, in this case overlap will be S1,E2 - + * it's a part of total blocktime. + * S1 - here is last_begin + * Legend of the picture is following: + * * - means blocktime per vCPU + * x - means overlapped blocktime (total blocktime) + * + * @addr: host virtual address + */ +static void mark_postcopy_blocktime_end(uintptr_t addr) +{ + MigrationIncomingState *mis = migration_incoming_get_current(); + PostcopyBlocktimeContext *dc = mis->blocktime_ctx; + int i, affected_cpu = 0; + bool vcpu_total_blocktime = false; + uint32_t read_vcpu_time, low_time_offset; + + if (!dc) { + return; + } + + low_time_offset = get_low_time_offset(dc); + /* lookup cpu, to clear it, + * that algorithm looks straighforward, but it's not + * optimal, more optimal algorithm is keeping tree or hash + * where key is address value is a list of */ + for (i = 0; i < smp_cpus; i++) { + uint32_t vcpu_blocktime = 0; + + read_vcpu_time = atomic_fetch_add(&dc->page_fault_vcpu_time[i], 0); + if (atomic_fetch_add(&dc->vcpu_addr[i], 0) != addr || + read_vcpu_time == 0) { + continue; + } + atomic_xchg(&dc->vcpu_addr[i], 0); + vcpu_blocktime = low_time_offset - read_vcpu_time; + affected_cpu += 1; + /* we need to know is that mark_postcopy_end was due to + * faulted page, another possible case it's prefetched + * page and in that case we shouldn't be here */ + if (!vcpu_total_blocktime && + atomic_fetch_add(&dc->smp_cpus_down, 0) == smp_cpus) { + vcpu_total_blocktime = true; + } + /* continue cycle, due to one page could affect several vCPUs */ + dc->vcpu_blocktime[i] += vcpu_blocktime; + } + + atomic_sub(&dc->smp_cpus_down, affected_cpu); + if (vcpu_total_blocktime) { + dc->total_blocktime += low_time_offset - atomic_fetch_add( + &dc->last_begin, 0); + } + trace_mark_postcopy_blocktime_end(addr, dc, dc->total_blocktime, + affected_cpu); +} + /* * Handle faults detected by the USERFAULT markings */ @@ -681,7 +936,12 @@ static void *postcopy_ram_fault_thread(void *opaque) rb_offset &= ~(qemu_ram_pagesize(rb) - 1); trace_postcopy_ram_fault_thread_request(msg.arg.pagefault.address, qemu_ram_get_idstr(rb), - rb_offset); + rb_offset, + msg.arg.pagefault.feat.ptid); + mark_postcopy_blocktime_begin( + (uintptr_t)(msg.arg.pagefault.address), + msg.arg.pagefault.feat.ptid, rb); + /* * Send the request to the source - we want to request one * of our host page sizes (which is >= TPS) @@ -829,6 +1089,8 @@ static int qemu_ufd_copy_ioctl(int userfault_fd, void *host_addr, if (!ret) { ramblock_recv_bitmap_set_range(rb, host_addr, pagesize / qemu_target_page_size()); + mark_postcopy_blocktime_end((uintptr_t)host_addr); + } return ret; } @@ -947,6 +1209,10 @@ void *postcopy_get_tmp_page(MigrationIncomingState *mis) #else /* No target OS support, stubs just fail */ +void fill_destination_postcopy_migration_info(MigrationInfo *info) +{ +} + bool postcopy_ram_supported_by_host(MigrationIncomingState *mis) { error_report("%s: No OS support", __func__); diff --git a/migration/qemu-file.c b/migration/qemu-file.c index bb63c779cc..0463f4c321 100644 --- a/migration/qemu-file.c +++ b/migration/qemu-file.c @@ -658,8 +658,32 @@ uint64_t qemu_get_be64(QEMUFile *f) return v; } -/* Compress size bytes of data start at p with specific compression - * level and store the compressed data to the buffer of f. +/* return the size after compression, or negative value on error */ +static int qemu_compress_data(z_stream *stream, uint8_t *dest, size_t dest_len, + const uint8_t *source, size_t source_len) +{ + int err; + + err = deflateReset(stream); + if (err != Z_OK) { + return -1; + } + + stream->avail_in = source_len; + stream->next_in = (uint8_t *)source; + stream->avail_out = dest_len; + stream->next_out = dest; + + err = deflate(stream, Z_FINISH); + if (err != Z_STREAM_END) { + return -1; + } + + return stream->next_out - dest; +} + +/* Compress size bytes of data start at p and store the compressed + * data to the buffer of f. * * When f is not writable, return -1 if f has no space to save the * compressed data. @@ -667,9 +691,8 @@ uint64_t qemu_get_be64(QEMUFile *f) * do fflush first, if f still has no space to save the compressed * data, return -1. */ - -ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size, - int level) +ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream, + const uint8_t *p, size_t size) { ssize_t blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t); @@ -683,11 +706,13 @@ ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size, return -1; } } - if (compress2(f->buf + f->buf_index + sizeof(int32_t), (uLongf *)&blen, - (Bytef *)p, size, level) != Z_OK) { - error_report("Compress Failed!"); - return 0; + + blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t), + blen, p, size); + if (blen < 0) { + return -1; } + qemu_put_be32(f, blen); if (f->ops->writev_buffer) { add_to_iovec(f, f->buf + f->buf_index, blen, false); diff --git a/migration/qemu-file.h b/migration/qemu-file.h index f4f356ab12..2ccfcfb2a8 100644 --- a/migration/qemu-file.h +++ b/migration/qemu-file.h @@ -25,6 +25,8 @@ #ifndef MIGRATION_QEMU_FILE_H #define MIGRATION_QEMU_FILE_H +#include <zlib.h> + /* Read a chunk of data from a file at the given position. The pos argument * can be ignored if the file is only be used for streaming. The number of * bytes actually read should be returned. @@ -132,8 +134,8 @@ bool qemu_file_is_writable(QEMUFile *f); size_t qemu_peek_buffer(QEMUFile *f, uint8_t **buf, size_t size, size_t offset); size_t qemu_get_buffer_in_place(QEMUFile *f, uint8_t **buf, size_t size); -ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size, - int level); +ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream, + const uint8_t *p, size_t size); int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src); /* diff --git a/migration/ram.c b/migration/ram.c index 0e90efa092..912810c18e 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -269,6 +269,10 @@ struct CompressParam { QemuCond cond; RAMBlock *block; ram_addr_t offset; + + /* internally used fields */ + z_stream stream; + uint8_t *originbuf; }; typedef struct CompressParam CompressParam; @@ -280,6 +284,7 @@ struct DecompressParam { void *des; uint8_t *compbuf; int len; + z_stream stream; }; typedef struct DecompressParam DecompressParam; @@ -294,13 +299,14 @@ static QemuCond comp_done_cond; /* The empty QEMUFileOps will be used by file in CompressParam */ static const QEMUFileOps empty_ops = { }; +static QEMUFile *decomp_file; static DecompressParam *decomp_param; static QemuThread *decompress_threads; static QemuMutex decomp_done_lock; static QemuCond decomp_done_cond; -static int do_compress_ram_page(QEMUFile *f, RAMBlock *block, - ram_addr_t offset); +static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, + ram_addr_t offset, uint8_t *source_buf); static void *do_data_compress(void *opaque) { @@ -316,7 +322,8 @@ static void *do_data_compress(void *opaque) param->block = NULL; qemu_mutex_unlock(¶m->mutex); - do_compress_ram_page(param->file, block, offset); + do_compress_ram_page(param->file, ¶m->stream, block, offset, + param->originbuf); qemu_mutex_lock(&comp_done_lock); param->done = true; @@ -357,10 +364,20 @@ static void compress_threads_save_cleanup(void) terminate_compression_threads(); thread_count = migrate_compress_threads(); for (i = 0; i < thread_count; i++) { + /* + * we use it as a indicator which shows if the thread is + * properly init'd or not + */ + if (!comp_param[i].file) { + break; + } qemu_thread_join(compress_threads + i); - qemu_fclose(comp_param[i].file); qemu_mutex_destroy(&comp_param[i].mutex); qemu_cond_destroy(&comp_param[i].cond); + deflateEnd(&comp_param[i].stream); + g_free(comp_param[i].originbuf); + qemu_fclose(comp_param[i].file); + comp_param[i].file = NULL; } qemu_mutex_destroy(&comp_done_lock); qemu_cond_destroy(&comp_done_cond); @@ -370,12 +387,12 @@ static void compress_threads_save_cleanup(void) comp_param = NULL; } -static void compress_threads_save_setup(void) +static int compress_threads_save_setup(void) { int i, thread_count; if (!migrate_use_compression()) { - return; + return 0; } thread_count = migrate_compress_threads(); compress_threads = g_new0(QemuThread, thread_count); @@ -383,6 +400,17 @@ static void compress_threads_save_setup(void) qemu_cond_init(&comp_done_cond); qemu_mutex_init(&comp_done_lock); for (i = 0; i < thread_count; i++) { + comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE); + if (!comp_param[i].originbuf) { + goto exit; + } + + if (deflateInit(&comp_param[i].stream, + migrate_compress_level()) != Z_OK) { + g_free(comp_param[i].originbuf); + goto exit; + } + /* comp_param[i].file is just used as a dummy buffer to save data, * set its ops to empty. */ @@ -395,6 +423,11 @@ static void compress_threads_save_setup(void) do_data_compress, comp_param + i, QEMU_THREAD_JOINABLE); } + return 0; + +exit: + compress_threads_save_cleanup(); + return -1; } /* Multiple fd's */ @@ -941,6 +974,72 @@ static void ram_release_pages(const char *rbname, uint64_t offset, int pages) ram_discard_range(rbname, offset, pages << TARGET_PAGE_BITS); } +/* + * @pages: the number of pages written by the control path, + * < 0 - error + * > 0 - number of pages written + * + * Return true if the pages has been saved, otherwise false is returned. + */ +static bool control_save_page(RAMState *rs, RAMBlock *block, ram_addr_t offset, + int *pages) +{ + uint64_t bytes_xmit = 0; + int ret; + + *pages = -1; + ret = ram_control_save_page(rs->f, block->offset, offset, TARGET_PAGE_SIZE, + &bytes_xmit); + if (ret == RAM_SAVE_CONTROL_NOT_SUPP) { + return false; + } + + if (bytes_xmit) { + ram_counters.transferred += bytes_xmit; + *pages = 1; + } + + if (ret == RAM_SAVE_CONTROL_DELAYED) { + return true; + } + + if (bytes_xmit > 0) { + ram_counters.normal++; + } else if (bytes_xmit == 0) { + ram_counters.duplicate++; + } + + return true; +} + +/* + * directly send the page to the stream + * + * Returns the number of pages written. + * + * @rs: current RAM state + * @block: block that contains the page we want to send + * @offset: offset inside the block for the page + * @buf: the page to be sent + * @async: send to page asyncly + */ +static int save_normal_page(RAMState *rs, RAMBlock *block, ram_addr_t offset, + uint8_t *buf, bool async) +{ + ram_counters.transferred += save_page_header(rs, rs->f, block, + offset | RAM_SAVE_FLAG_PAGE); + if (async) { + qemu_put_buffer_async(rs->f, buf, TARGET_PAGE_SIZE, + migrate_release_ram() & + migration_in_postcopy()); + } else { + qemu_put_buffer(rs->f, buf, TARGET_PAGE_SIZE); + } + ram_counters.transferred += TARGET_PAGE_SIZE; + ram_counters.normal++; + return 1; +} + /** * ram_save_page: send the given page to the stream * @@ -957,73 +1056,31 @@ static void ram_release_pages(const char *rbname, uint64_t offset, int pages) static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage) { int pages = -1; - uint64_t bytes_xmit; - ram_addr_t current_addr; uint8_t *p; - int ret; bool send_async = true; RAMBlock *block = pss->block; ram_addr_t offset = pss->page << TARGET_PAGE_BITS; + ram_addr_t current_addr = block->offset + offset; p = block->host + offset; trace_ram_save_page(block->idstr, (uint64_t)offset, p); - /* In doubt sent page as normal */ - bytes_xmit = 0; - ret = ram_control_save_page(rs->f, block->offset, - offset, TARGET_PAGE_SIZE, &bytes_xmit); - if (bytes_xmit) { - ram_counters.transferred += bytes_xmit; - pages = 1; - } - XBZRLE_cache_lock(); - - current_addr = block->offset + offset; - - if (ret != RAM_SAVE_CONTROL_NOT_SUPP) { - if (ret != RAM_SAVE_CONTROL_DELAYED) { - if (bytes_xmit > 0) { - ram_counters.normal++; - } else if (bytes_xmit == 0) { - ram_counters.duplicate++; - } - } - } else { - pages = save_zero_page(rs, block, offset); - if (pages > 0) { - /* Must let xbzrle know, otherwise a previous (now 0'd) cached - * page would be stale + if (!rs->ram_bulk_stage && !migration_in_postcopy() && + migrate_use_xbzrle()) { + pages = save_xbzrle_page(rs, &p, current_addr, block, + offset, last_stage); + if (!last_stage) { + /* Can't send this cached data async, since the cache page + * might get updated before it gets to the wire */ - xbzrle_cache_zero_page(rs, current_addr); - ram_release_pages(block->idstr, offset, pages); - } else if (!rs->ram_bulk_stage && - !migration_in_postcopy() && migrate_use_xbzrle()) { - pages = save_xbzrle_page(rs, &p, current_addr, block, - offset, last_stage); - if (!last_stage) { - /* Can't send this cached data async, since the cache page - * might get updated before it gets to the wire - */ - send_async = false; - } + send_async = false; } } /* XBZRLE overflow or normal page */ if (pages == -1) { - ram_counters.transferred += - save_page_header(rs, rs->f, block, offset | RAM_SAVE_FLAG_PAGE); - if (send_async) { - qemu_put_buffer_async(rs->f, p, TARGET_PAGE_SIZE, - migrate_release_ram() & - migration_in_postcopy()); - } else { - qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE); - } - ram_counters.transferred += TARGET_PAGE_SIZE; - pages = 1; - ram_counters.normal++; + pages = save_normal_page(rs, block, offset, p, send_async); } XBZRLE_cache_unlock(); @@ -1031,8 +1088,8 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage) return pages; } -static int do_compress_ram_page(QEMUFile *f, RAMBlock *block, - ram_addr_t offset) +static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, + ram_addr_t offset, uint8_t *source_buf) { RAMState *rs = ram_state; int bytes_sent, blen; @@ -1040,8 +1097,14 @@ static int do_compress_ram_page(QEMUFile *f, RAMBlock *block, bytes_sent = save_page_header(rs, f, block, offset | RAM_SAVE_FLAG_COMPRESS_PAGE); - blen = qemu_put_compression_data(f, p, TARGET_PAGE_SIZE, - migrate_compress_level()); + + /* + * copy it to a internal buffer to avoid it being modified by VM + * so that we can catch up the error during compression and + * decompression + */ + memcpy(source_buf, p, TARGET_PAGE_SIZE); + blen = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE); if (blen < 0) { bytes_sent = 0; qemu_file_set_error(migrate_get_current()->to_dst_file, blen); @@ -1122,83 +1185,6 @@ static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block, } /** - * ram_save_compressed_page: compress the given page and send it to the stream - * - * Returns the number of pages written. - * - * @rs: current RAM state - * @block: block that contains the page we want to send - * @offset: offset inside the block for the page - * @last_stage: if we are at the completion stage - */ -static int ram_save_compressed_page(RAMState *rs, PageSearchStatus *pss, - bool last_stage) -{ - int pages = -1; - uint64_t bytes_xmit = 0; - uint8_t *p; - int ret, blen; - RAMBlock *block = pss->block; - ram_addr_t offset = pss->page << TARGET_PAGE_BITS; - - p = block->host + offset; - - ret = ram_control_save_page(rs->f, block->offset, - offset, TARGET_PAGE_SIZE, &bytes_xmit); - if (bytes_xmit) { - ram_counters.transferred += bytes_xmit; - pages = 1; - } - if (ret != RAM_SAVE_CONTROL_NOT_SUPP) { - if (ret != RAM_SAVE_CONTROL_DELAYED) { - if (bytes_xmit > 0) { - ram_counters.normal++; - } else if (bytes_xmit == 0) { - ram_counters.duplicate++; - } - } - } else { - /* When starting the process of a new block, the first page of - * the block should be sent out before other pages in the same - * block, and all the pages in last block should have been sent - * out, keeping this order is important, because the 'cont' flag - * is used to avoid resending the block name. - */ - if (block != rs->last_sent_block) { - flush_compressed_data(rs); - pages = save_zero_page(rs, block, offset); - if (pages == -1) { - /* Make sure the first page is sent out before other pages */ - bytes_xmit = save_page_header(rs, rs->f, block, offset | - RAM_SAVE_FLAG_COMPRESS_PAGE); - blen = qemu_put_compression_data(rs->f, p, TARGET_PAGE_SIZE, - migrate_compress_level()); - if (blen > 0) { - ram_counters.transferred += bytes_xmit + blen; - ram_counters.normal++; - pages = 1; - } else { - qemu_file_set_error(rs->f, blen); - error_report("compressed data failed!"); - } - } - if (pages > 0) { - ram_release_pages(block->idstr, offset, pages); - } - } else { - pages = save_zero_page(rs, block, offset); - if (pages == -1) { - pages = compress_page_with_multi_thread(rs, block, offset); - } else { - ram_release_pages(block->idstr, offset, pages); - } - } - } - - return pages; -} - -/** * find_dirty_block: find the next dirty page and update any state * associated with the search process. * @@ -1434,44 +1420,80 @@ err: return -1; } +static bool save_page_use_compression(RAMState *rs) +{ + if (!migrate_use_compression()) { + return false; + } + + /* + * If xbzrle is on, stop using the data compression after first + * round of migration even if compression is enabled. In theory, + * xbzrle can do better than compression. + */ + if (rs->ram_bulk_stage || !migrate_use_xbzrle()) { + return true; + } + + return false; +} + /** * ram_save_target_page: save one target page * * Returns the number of pages written * * @rs: current RAM state - * @ms: current migration state * @pss: data about the page we want to send * @last_stage: if we are at the completion stage */ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss, bool last_stage) { - int res = 0; + RAMBlock *block = pss->block; + ram_addr_t offset = pss->page << TARGET_PAGE_BITS; + int res; - /* Check the pages is dirty and if it is send it */ - if (migration_bitmap_clear_dirty(rs, pss->block, pss->page)) { - /* - * If xbzrle is on, stop using the data compression after first - * round of migration even if compression is enabled. In theory, - * xbzrle can do better than compression. + if (control_save_page(rs, block, offset, &res)) { + return res; + } + + /* + * When starting the process of a new block, the first page of + * the block should be sent out before other pages in the same + * block, and all the pages in last block should have been sent + * out, keeping this order is important, because the 'cont' flag + * is used to avoid resending the block name. + */ + if (block != rs->last_sent_block && save_page_use_compression(rs)) { + flush_compressed_data(rs); + } + + res = save_zero_page(rs, block, offset); + if (res > 0) { + /* Must let xbzrle know, otherwise a previous (now 0'd) cached + * page would be stale */ - if (migrate_use_compression() && - (rs->ram_bulk_stage || !migrate_use_xbzrle())) { - res = ram_save_compressed_page(rs, pss, last_stage); - } else { - res = ram_save_page(rs, pss, last_stage); + if (!save_page_use_compression(rs)) { + XBZRLE_cache_lock(); + xbzrle_cache_zero_page(rs, block->offset + offset); + XBZRLE_cache_unlock(); } + ram_release_pages(block->idstr, offset, res); + return res; + } - if (res < 0) { - return res; - } - if (pss->block->unsentmap) { - clear_bit(pss->page, pss->block->unsentmap); - } + /* + * Make sure the first page is sent out before other pages. + * + * we post it as normal page as compression will take much + * CPU resource. + */ + if (block == rs->last_sent_block && save_page_use_compression(rs)) { + res = compress_page_with_multi_thread(rs, block, offset); } - return res; + return ram_save_page(rs, pss, last_stage); } /** @@ -1500,12 +1522,22 @@ static int ram_save_host_page(RAMState *rs, PageSearchStatus *pss, qemu_ram_pagesize(pss->block) >> TARGET_PAGE_BITS; do { + /* Check the pages is dirty and if it is send it */ + if (!migration_bitmap_clear_dirty(rs, pss->block, pss->page)) { + pss->page++; + continue; + } + tmppages = ram_save_target_page(rs, pss, last_stage); if (tmppages < 0) { return tmppages; } pages += tmppages; + if (pss->block->unsentmap) { + clear_bit(pss->page, pss->block->unsentmap); + } + pss->page++; } while ((pss->page & (pagesize_bits - 1)) && offset_in_ramblock(pss->block, pss->page << TARGET_PAGE_BITS)); @@ -2214,9 +2246,14 @@ static int ram_save_setup(QEMUFile *f, void *opaque) RAMState **rsp = opaque; RAMBlock *block; + if (compress_threads_save_setup()) { + return -1; + } + /* migration has already setup the bitmap, reuse it. */ if (!migration_in_colo_state()) { if (ram_init_all(rsp) != 0) { + compress_threads_save_cleanup(); return -1; } } @@ -2236,7 +2273,6 @@ static int ram_save_setup(QEMUFile *f, void *opaque) } rcu_read_unlock(); - compress_threads_save_setup(); ram_control_before_iterate(f, RAM_CONTROL_SETUP); ram_control_after_iterate(f, RAM_CONTROL_SETUP); @@ -2501,12 +2537,37 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size) } } +/* return the size after decompression, or negative value on error */ +static int +qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len, + const uint8_t *source, size_t source_len) +{ + int err; + + err = inflateReset(stream); + if (err != Z_OK) { + return -1; + } + + stream->avail_in = source_len; + stream->next_in = (uint8_t *)source; + stream->avail_out = dest_len; + stream->next_out = dest; + + err = inflate(stream, Z_NO_FLUSH); + if (err != Z_STREAM_END) { + return -1; + } + + return stream->total_out; +} + static void *do_data_decompress(void *opaque) { DecompressParam *param = opaque; unsigned long pagesize; uint8_t *des; - int len; + int len, ret; qemu_mutex_lock(¶m->mutex); while (!param->quit) { @@ -2517,13 +2578,13 @@ static void *do_data_decompress(void *opaque) qemu_mutex_unlock(¶m->mutex); pagesize = TARGET_PAGE_SIZE; - /* uncompress() will return failed in some case, especially - * when the page is dirted when doing the compression, it's - * not a problem because the dirty page will be retransferred - * and uncompress() won't break the data in other pages. - */ - uncompress((Bytef *)des, &pagesize, - (const Bytef *)param->compbuf, len); + + ret = qemu_uncompress_data(¶m->stream, des, pagesize, + param->compbuf, len); + if (ret < 0) { + error_report("decompress data failed"); + qemu_file_set_error(decomp_file, ret); + } qemu_mutex_lock(&decomp_done_lock); param->done = true; @@ -2540,12 +2601,12 @@ static void *do_data_decompress(void *opaque) return NULL; } -static void wait_for_decompress_done(void) +static int wait_for_decompress_done(void) { int idx, thread_count; if (!migrate_use_compression()) { - return; + return 0; } thread_count = migrate_decompress_threads(); @@ -2556,30 +2617,7 @@ static void wait_for_decompress_done(void) } } qemu_mutex_unlock(&decomp_done_lock); -} - -static void compress_threads_load_setup(void) -{ - int i, thread_count; - - if (!migrate_use_compression()) { - return; - } - thread_count = migrate_decompress_threads(); - decompress_threads = g_new0(QemuThread, thread_count); - decomp_param = g_new0(DecompressParam, thread_count); - qemu_mutex_init(&decomp_done_lock); - qemu_cond_init(&decomp_done_cond); - for (i = 0; i < thread_count; i++) { - qemu_mutex_init(&decomp_param[i].mutex); - qemu_cond_init(&decomp_param[i].cond); - decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE)); - decomp_param[i].done = true; - decomp_param[i].quit = false; - qemu_thread_create(decompress_threads + i, "decompress", - do_data_decompress, decomp_param + i, - QEMU_THREAD_JOINABLE); - } + return qemu_file_get_error(decomp_file); } static void compress_threads_load_cleanup(void) @@ -2591,21 +2629,70 @@ static void compress_threads_load_cleanup(void) } thread_count = migrate_decompress_threads(); for (i = 0; i < thread_count; i++) { + /* + * we use it as a indicator which shows if the thread is + * properly init'd or not + */ + if (!decomp_param[i].compbuf) { + break; + } + qemu_mutex_lock(&decomp_param[i].mutex); decomp_param[i].quit = true; qemu_cond_signal(&decomp_param[i].cond); qemu_mutex_unlock(&decomp_param[i].mutex); } for (i = 0; i < thread_count; i++) { + if (!decomp_param[i].compbuf) { + break; + } + qemu_thread_join(decompress_threads + i); qemu_mutex_destroy(&decomp_param[i].mutex); qemu_cond_destroy(&decomp_param[i].cond); + inflateEnd(&decomp_param[i].stream); g_free(decomp_param[i].compbuf); + decomp_param[i].compbuf = NULL; } g_free(decompress_threads); g_free(decomp_param); decompress_threads = NULL; decomp_param = NULL; + decomp_file = NULL; +} + +static int compress_threads_load_setup(QEMUFile *f) +{ + int i, thread_count; + + if (!migrate_use_compression()) { + return 0; + } + + thread_count = migrate_decompress_threads(); + decompress_threads = g_new0(QemuThread, thread_count); + decomp_param = g_new0(DecompressParam, thread_count); + qemu_mutex_init(&decomp_done_lock); + qemu_cond_init(&decomp_done_cond); + decomp_file = f; + for (i = 0; i < thread_count; i++) { + if (inflateInit(&decomp_param[i].stream) != Z_OK) { + goto exit; + } + + decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE)); + qemu_mutex_init(&decomp_param[i].mutex); + qemu_cond_init(&decomp_param[i].cond); + decomp_param[i].done = true; + decomp_param[i].quit = false; + qemu_thread_create(decompress_threads + i, "decompress", + do_data_decompress, decomp_param + i, + QEMU_THREAD_JOINABLE); + } + return 0; +exit: + compress_threads_load_cleanup(); + return -1; } static void decompress_data_with_multi_threads(QEMUFile *f, @@ -2647,8 +2734,11 @@ static void decompress_data_with_multi_threads(QEMUFile *f, */ static int ram_load_setup(QEMUFile *f, void *opaque) { + if (compress_threads_load_setup(f)) { + return -1; + } + xbzrle_load_setup(); - compress_threads_load_setup(); ramblock_recv_map_init(); return 0; } @@ -2999,7 +3089,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id) } } - wait_for_decompress_done(); + ret |= wait_for_decompress_done(); rcu_read_unlock(); trace_ram_load_complete(ret, seq_iter); return ret; diff --git a/migration/trace-events b/migration/trace-events index a180d7b008..d6be74b7a7 100644 --- a/migration/trace-events +++ b/migration/trace-events @@ -115,6 +115,8 @@ process_incoming_migration_co_end(int ret, int ps) "ret=%d postcopy-state=%d" process_incoming_migration_co_postcopy_end_main(void) "" migration_set_incoming_channel(void *ioc, const char *ioctype) "ioc=%p ioctype=%s" migration_set_outgoing_channel(void *ioc, const char *ioctype, const char *hostname, void *err) "ioc=%p ioctype=%s hostname=%s err=%p" +mark_postcopy_blocktime_begin(uint64_t addr, void *dd, uint32_t time, int cpu, int received) "addr: 0x%" PRIx64 ", dd: %p, time: %u, cpu: %d, already_received: %d" +mark_postcopy_blocktime_end(uint64_t addr, void *dd, uint32_t time, int affected_cpu) "addr: 0x%" PRIx64 ", dd: %p, time: %u, affected_cpu: %d" # migration/rdma.c qemu_rdma_accept_incoming_migration(void) "" @@ -193,11 +195,12 @@ postcopy_ram_fault_thread_exit(void) "" postcopy_ram_fault_thread_fds_core(int baseufd, int quitfd) "ufd: %d quitfd: %d" postcopy_ram_fault_thread_fds_extra(size_t index, const char *name, int fd) "%zd/%s: %d" postcopy_ram_fault_thread_quit(void) "" -postcopy_ram_fault_thread_request(uint64_t hostaddr, const char *ramblock, size_t offset) "Request for HVA=0x%" PRIx64 " rb=%s offset=0x%zx" +postcopy_ram_fault_thread_request(uint64_t hostaddr, const char *ramblock, size_t offset, uint32_t pid) "Request for HVA=0x%" PRIx64 " rb=%s offset=0x%zx pid=%u" postcopy_ram_incoming_cleanup_closeuf(void) "" postcopy_ram_incoming_cleanup_entry(void) "" postcopy_ram_incoming_cleanup_exit(void) "" postcopy_ram_incoming_cleanup_join(void) "" +postcopy_ram_incoming_cleanup_blocktime(uint64_t total) "total blocktime %" PRIu64 postcopy_request_shared_page(const char *sharer, const char *rb, uint64_t rb_offset) "for %s in %s offset 0x%"PRIx64 postcopy_request_shared_page_present(const char *sharer, const char *rb, uint64_t rb_offset) "%s already %s offset 0x%"PRIx64 postcopy_wake_shared(uint64_t client_addr, const char *rb) "at 0x%"PRIx64" in %s" @@ -206,6 +209,7 @@ save_xbzrle_page_skipping(void) "" save_xbzrle_page_overflow(void) "" ram_save_iterate_big_wait(uint64_t milliconds, int iterations) "big wait: %" PRIu64 " milliseconds, %d iterations" ram_load_complete(int ret, uint64_t seq_iter) "exit_code %d seq iteration %" PRIu64 +get_mem_fault_cpu_index(int cpu, uint32_t pid) "cpu: %d, pid: %u" # migration/exec.c migration_exec_outgoing(const char *cmd) "cmd=%s" diff --git a/qapi/migration.json b/qapi/migration.json index 9d0bf82cf4..f3974c6807 100644 --- a/qapi/migration.json +++ b/qapi/migration.json @@ -156,6 +156,13 @@ # @status is 'failed'. Clients should not attempt to parse the # error strings. (Since 2.7) # +# @postcopy-blocktime: total time when all vCPU were blocked during postcopy +# live migration (Since 2.13) +# +# @postcopy-vcpu-blocktime: list of the postcopy blocktime per vCPU (Since 2.13) +# + +# # Since: 0.14.0 ## { 'struct': 'MigrationInfo', @@ -167,7 +174,9 @@ '*downtime': 'int', '*setup-time': 'int', '*cpu-throttle-percentage': 'int', - '*error-desc': 'str'} } + '*error-desc': 'str', + '*postcopy-blocktime' : 'uint32', + '*postcopy-vcpu-blocktime': ['uint32']} } ## # @query-migrate: @@ -354,16 +363,20 @@ # # @x-multifd: Use more than one fd for migration (since 2.11) # +# # @dirty-bitmaps: If enabled, QEMU will migrate named dirty bitmaps. # (since 2.12) # +# @postcopy-blocktime: Calculate downtime for postcopy live migration +# (since 2.13) +# # Since: 1.2 ## { 'enum': 'MigrationCapability', 'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks', 'compress', 'events', 'postcopy-ram', 'x-colo', 'release-ram', 'block', 'return-path', 'pause-before-switchover', 'x-multifd', - 'dirty-bitmaps' ] } + 'dirty-bitmaps', 'postcopy-blocktime' ] } ## # @MigrationCapabilityStatus: diff --git a/tests/migration-test.c b/tests/migration-test.c index 422bf1afdf..dde7c464c3 100644 --- a/tests/migration-test.c +++ b/tests/migration-test.c @@ -26,6 +26,7 @@ const unsigned start_address = 1024 * 1024; const unsigned end_address = 100 * 1024 * 1024; bool got_stop; +static bool uffd_feature_thread_id; #if defined(__linux__) #include <sys/syscall.h> @@ -55,6 +56,7 @@ static bool ufd_version_check(void) g_test_message("Skipping test: UFFDIO_API failed"); return false; } + uffd_feature_thread_id = api_struct.features & UFFD_FEATURE_THREAD_ID; ioctl_mask = (__u64)1 << _UFFDIO_REGISTER | (__u64)1 << _UFFDIO_UNREGISTER; @@ -223,6 +225,16 @@ static uint64_t get_migration_pass(QTestState *who) return result; } +static void read_blocktime(QTestState *who) +{ + QDict *rsp, *rsp_return; + + rsp = wait_command(who, "{ 'execute': 'query-migrate' }"); + rsp_return = qdict_get_qdict(rsp, "return"); + g_assert(qdict_haskey(rsp_return, "postcopy-blocktime")); + QDECREF(rsp); +} + static void wait_for_migration_complete(QTestState *who) { while (true) { @@ -533,6 +545,7 @@ static void test_migrate(void) migrate_set_capability(from, "postcopy-ram", "true"); migrate_set_capability(to, "postcopy-ram", "true"); + migrate_set_capability(to, "postcopy-blocktime", "true"); /* We want to pick a speed slow enough that the test completes * quickly, but that it doesn't complete precopy even on a slow @@ -559,6 +572,9 @@ static void test_migrate(void) wait_for_serial("dest_serial"); wait_for_migration_complete(from); + if (uffd_feature_thread_id) { + read_blocktime(to); + } g_free(uri); test_migrate_end(from, to, true); |