diff options
author | Peter Maydell <peter.maydell@linaro.org> | 2015-05-07 18:22:03 +0100 |
---|---|---|
committer | Peter Maydell <peter.maydell@linaro.org> | 2015-05-07 18:22:03 +0100 |
commit | 838686357b1a175e9a32569700a153b207a9e10f (patch) | |
tree | 84678cfdd3692484aeb59375e002bb9f32bef10a /arch_init.c | |
parent | 38003aee196a96edccd4d64471beb1b67e9b2b17 (diff) | |
parent | 362ba4e3ee801e8f5e28d72d0009547384222927 (diff) |
Merge remote-tracking branch 'remotes/juanquintela/tags/migration/20150507-1' into staging
migration/next for 20150507
# gpg: Signature made Thu May 7 17:42:19 2015 BST using RSA key ID 5872D723
# gpg: Good signature from "Juan Quintela <quintela@redhat.com>"
# gpg: aka "Juan Quintela <quintela@trasno.org>"
* remotes/juanquintela/tags/migration/20150507-1:
migration: Fix migration state update issue
migration: avoid divide by zero in xbzrle cache miss rate
migration: Add hmp interface to set and query parameters
migration: Add qmp commands to set and query parameters
migration: Use an array instead of 3 parameters
migration: Add interface to control compression
migration: Add the core code for decompression
migration: Make compression co-work with xbzrle
migration: Add the core code of multi-thread compression
migration: Split save_zero_page from ram_save_page
arch_init: Add and free data struct for decompression
arch_init: Alloc and free data struct for compression
qemu-file: Add compression functions to QEMUFile
migration: Add the framework of multi-thread decompression
migration: Add the framework of multi-thread compression
docs: Add a doc about multiple thread compression
Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
Diffstat (limited to 'arch_init.c')
-rw-r--r-- | arch_init.c | 517 |
1 files changed, 491 insertions, 26 deletions
diff --git a/arch_init.c b/arch_init.c index 4c8fceed95..23d3feba44 100644 --- a/arch_init.c +++ b/arch_init.c @@ -24,6 +24,7 @@ #include <stdint.h> #include <stdarg.h> #include <stdlib.h> +#include <zlib.h> #ifndef _WIN32 #include <sys/types.h> #include <sys/mman.h> @@ -127,6 +128,7 @@ static uint64_t bitmap_sync_count; #define RAM_SAVE_FLAG_CONTINUE 0x20 #define RAM_SAVE_FLAG_XBZRLE 0x40 /* 0x80 is reserved in migration.h start with 0x100 next */ +#define RAM_SAVE_FLAG_COMPRESS_PAGE 0x100 static struct defconfig_file { const char *filename; @@ -316,6 +318,147 @@ static uint64_t migration_dirty_pages; static uint32_t last_version; static bool ram_bulk_stage; +struct CompressParam { + bool start; + bool done; + QEMUFile *file; + QemuMutex mutex; + QemuCond cond; + RAMBlock *block; + ram_addr_t offset; +}; +typedef struct CompressParam CompressParam; + +struct DecompressParam { + bool start; + QemuMutex mutex; + QemuCond cond; + void *des; + uint8 *compbuf; + int len; +}; +typedef struct DecompressParam DecompressParam; + +static CompressParam *comp_param; +static QemuThread *compress_threads; +/* comp_done_cond is used to wake up the migration thread when + * one of the compression threads has finished the compression. + * comp_done_lock is used to co-work with comp_done_cond. + */ +static QemuMutex *comp_done_lock; +static QemuCond *comp_done_cond; +/* The empty QEMUFileOps will be used by file in CompressParam */ +static const QEMUFileOps empty_ops = { }; + +static bool compression_switch; +static bool quit_comp_thread; +static bool quit_decomp_thread; +static DecompressParam *decomp_param; +static QemuThread *decompress_threads; +static uint8_t *compressed_data_buf; + +static int do_compress_ram_page(CompressParam *param); + +static void *do_data_compress(void *opaque) +{ + CompressParam *param = opaque; + + while (!quit_comp_thread) { + qemu_mutex_lock(¶m->mutex); + /* Re-check the quit_comp_thread in case of + * terminate_compression_threads is called just before + * qemu_mutex_lock(¶m->mutex) and after + * while(!quit_comp_thread), re-check it here can make + * sure the compression thread terminate as expected. + */ + while (!param->start && !quit_comp_thread) { + qemu_cond_wait(¶m->cond, ¶m->mutex); + } + if (!quit_comp_thread) { + do_compress_ram_page(param); + } + param->start = false; + qemu_mutex_unlock(¶m->mutex); + + qemu_mutex_lock(comp_done_lock); + param->done = true; + qemu_cond_signal(comp_done_cond); + qemu_mutex_unlock(comp_done_lock); + } + + return NULL; +} + +static inline void terminate_compression_threads(void) +{ + int idx, thread_count; + + thread_count = migrate_compress_threads(); + quit_comp_thread = true; + for (idx = 0; idx < thread_count; idx++) { + qemu_mutex_lock(&comp_param[idx].mutex); + qemu_cond_signal(&comp_param[idx].cond); + qemu_mutex_unlock(&comp_param[idx].mutex); + } +} + +void migrate_compress_threads_join(void) +{ + int i, thread_count; + + if (!migrate_use_compression()) { + return; + } + terminate_compression_threads(); + thread_count = migrate_compress_threads(); + for (i = 0; i < thread_count; i++) { + qemu_thread_join(compress_threads + i); + qemu_fclose(comp_param[i].file); + qemu_mutex_destroy(&comp_param[i].mutex); + qemu_cond_destroy(&comp_param[i].cond); + } + qemu_mutex_destroy(comp_done_lock); + qemu_cond_destroy(comp_done_cond); + g_free(compress_threads); + g_free(comp_param); + g_free(comp_done_cond); + g_free(comp_done_lock); + compress_threads = NULL; + comp_param = NULL; + comp_done_cond = NULL; + comp_done_lock = NULL; +} + +void migrate_compress_threads_create(void) +{ + int i, thread_count; + + if (!migrate_use_compression()) { + return; + } + quit_comp_thread = false; + compression_switch = true; + thread_count = migrate_compress_threads(); + compress_threads = g_new0(QemuThread, thread_count); + comp_param = g_new0(CompressParam, thread_count); + comp_done_cond = g_new0(QemuCond, 1); + comp_done_lock = g_new0(QemuMutex, 1); + qemu_cond_init(comp_done_cond); + qemu_mutex_init(comp_done_lock); + for (i = 0; i < thread_count; i++) { + /* com_param[i].file is just used as a dummy buffer to save data, set + * it's ops to empty. + */ + comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops); + comp_param[i].done = true; + qemu_mutex_init(&comp_param[i].mutex); + qemu_cond_init(&comp_param[i].cond); + qemu_thread_create(compress_threads + i, "compress", + do_data_compress, comp_param + i, + QEMU_THREAD_JOINABLE); + } +} + /** * save_page_header: Write page header to wire * @@ -520,12 +663,16 @@ static void migration_bitmap_sync_range(ram_addr_t start, ram_addr_t length) static int64_t start_time; static int64_t bytes_xfer_prev; static int64_t num_dirty_pages_period; +static uint64_t xbzrle_cache_miss_prev; +static uint64_t iterations_prev; static void migration_bitmap_sync_init(void) { start_time = 0; bytes_xfer_prev = 0; num_dirty_pages_period = 0; + xbzrle_cache_miss_prev = 0; + iterations_prev = 0; } /* Called with iothread lock held, to protect ram_list.dirty_memory[] */ @@ -536,8 +683,6 @@ static void migration_bitmap_sync(void) MigrationState *s = migrate_get_current(); int64_t end_time; int64_t bytes_xfer_now; - static uint64_t xbzrle_cache_miss_prev; - static uint64_t iterations_prev; bitmap_sync_count++; @@ -585,7 +730,7 @@ static void migration_bitmap_sync(void) mig_throttle_on = false; } if (migrate_use_xbzrle()) { - if (iterations_prev != 0) { + if (iterations_prev != acct_info.iterations) { acct_info.xbzrle_cache_miss_rate = (double)(acct_info.xbzrle_cache_miss - xbzrle_cache_miss_prev) / @@ -599,8 +744,36 @@ static void migration_bitmap_sync(void) s->dirty_bytes_rate = s->dirty_pages_rate * TARGET_PAGE_SIZE; start_time = end_time; num_dirty_pages_period = 0; - s->dirty_sync_count = bitmap_sync_count; } + s->dirty_sync_count = bitmap_sync_count; +} + +/** + * save_zero_page: Send the zero page to the stream + * + * Returns: Number of pages written. + * + * @f: QEMUFile where to send the data + * @block: block that contains the page we want to send + * @offset: offset inside the block for the page + * @p: pointer to the page + * @bytes_transferred: increase it with the number of transferred bytes + */ +static int save_zero_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset, + uint8_t *p, uint64_t *bytes_transferred) +{ + int pages = -1; + + if (is_zero_range(p, TARGET_PAGE_SIZE)) { + acct_info.dup_pages++; + *bytes_transferred += save_page_header(f, block, + offset | RAM_SAVE_FLAG_COMPRESS); + qemu_put_byte(f, 0); + *bytes_transferred += 1; + pages = 1; + } + + return pages; } /** @@ -651,25 +824,22 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset, acct_info.dup_pages++; } } - } else if (is_zero_range(p, TARGET_PAGE_SIZE)) { - acct_info.dup_pages++; - *bytes_transferred += save_page_header(f, block, - offset | RAM_SAVE_FLAG_COMPRESS); - qemu_put_byte(f, 0); - *bytes_transferred += 1; - pages = 1; - /* Must let xbzrle know, otherwise a previous (now 0'd) cached - * page would be stale - */ - xbzrle_cache_zero_page(current_addr); - } else if (!ram_bulk_stage && migrate_use_xbzrle()) { - pages = save_xbzrle_page(f, &p, current_addr, block, - offset, last_stage, bytes_transferred); - if (!last_stage) { - /* Can't send this cached data async, since the cache page - * might get updated before it gets to the wire + } else { + pages = save_zero_page(f, block, offset, p, bytes_transferred); + if (pages > 0) { + /* Must let xbzrle know, otherwise a previous (now 0'd) cached + * page would be stale */ - send_async = false; + xbzrle_cache_zero_page(current_addr); + } else if (!ram_bulk_stage && migrate_use_xbzrle()) { + pages = save_xbzrle_page(f, &p, current_addr, block, + offset, last_stage, bytes_transferred); + if (!last_stage) { + /* Can't send this cached data async, since the cache page + * might get updated before it gets to the wire + */ + send_async = false; + } } } @@ -692,6 +862,178 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset, return pages; } +static int do_compress_ram_page(CompressParam *param) +{ + int bytes_sent, blen; + uint8_t *p; + RAMBlock *block = param->block; + ram_addr_t offset = param->offset; + + p = memory_region_get_ram_ptr(block->mr) + (offset & TARGET_PAGE_MASK); + + bytes_sent = save_page_header(param->file, block, offset | + RAM_SAVE_FLAG_COMPRESS_PAGE); + blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE, + migrate_compress_level()); + bytes_sent += blen; + + return bytes_sent; +} + +static inline void start_compression(CompressParam *param) +{ + param->done = false; + qemu_mutex_lock(¶m->mutex); + param->start = true; + qemu_cond_signal(¶m->cond); + qemu_mutex_unlock(¶m->mutex); +} + +static inline void start_decompression(DecompressParam *param) +{ + qemu_mutex_lock(¶m->mutex); + param->start = true; + qemu_cond_signal(¶m->cond); + qemu_mutex_unlock(¶m->mutex); +} + +static uint64_t bytes_transferred; + +static void flush_compressed_data(QEMUFile *f) +{ + int idx, len, thread_count; + + if (!migrate_use_compression()) { + return; + } + thread_count = migrate_compress_threads(); + for (idx = 0; idx < thread_count; idx++) { + if (!comp_param[idx].done) { + qemu_mutex_lock(comp_done_lock); + while (!comp_param[idx].done && !quit_comp_thread) { + qemu_cond_wait(comp_done_cond, comp_done_lock); + } + qemu_mutex_unlock(comp_done_lock); + } + if (!quit_comp_thread) { + len = qemu_put_qemu_file(f, comp_param[idx].file); + bytes_transferred += len; + } + } +} + +static inline void set_compress_params(CompressParam *param, RAMBlock *block, + ram_addr_t offset) +{ + param->block = block; + param->offset = offset; +} + +static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block, + ram_addr_t offset, + uint64_t *bytes_transferred) +{ + int idx, thread_count, bytes_xmit = -1, pages = -1; + + thread_count = migrate_compress_threads(); + qemu_mutex_lock(comp_done_lock); + while (true) { + for (idx = 0; idx < thread_count; idx++) { + if (comp_param[idx].done) { + bytes_xmit = qemu_put_qemu_file(f, comp_param[idx].file); + set_compress_params(&comp_param[idx], block, offset); + start_compression(&comp_param[idx]); + pages = 1; + acct_info.norm_pages++; + *bytes_transferred += bytes_xmit; + break; + } + } + if (pages > 0) { + break; + } else { + qemu_cond_wait(comp_done_cond, comp_done_lock); + } + } + qemu_mutex_unlock(comp_done_lock); + + return pages; +} + +/** + * ram_save_compressed_page: compress the given page and send it to the stream + * + * Returns: Number of pages written. + * + * @f: QEMUFile where to send the data + * @block: block that contains the page we want to send + * @offset: offset inside the block for the page + * @last_stage: if we are at the completion stage + * @bytes_transferred: increase it with the number of transferred bytes + */ +static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block, + ram_addr_t offset, bool last_stage, + uint64_t *bytes_transferred) +{ + int pages = -1; + uint64_t bytes_xmit; + MemoryRegion *mr = block->mr; + uint8_t *p; + int ret; + + p = memory_region_get_ram_ptr(mr) + offset; + + bytes_xmit = 0; + ret = ram_control_save_page(f, block->offset, + offset, TARGET_PAGE_SIZE, &bytes_xmit); + if (bytes_xmit) { + *bytes_transferred += bytes_xmit; + pages = 1; + } + if (block == last_sent_block) { + offset |= RAM_SAVE_FLAG_CONTINUE; + } + if (ret != RAM_SAVE_CONTROL_NOT_SUPP) { + if (ret != RAM_SAVE_CONTROL_DELAYED) { + if (bytes_xmit > 0) { + acct_info.norm_pages++; + } else if (bytes_xmit == 0) { + acct_info.dup_pages++; + } + } + } else { + /* When starting the process of a new block, the first page of + * the block should be sent out before other pages in the same + * block, and all the pages in last block should have been sent + * out, keeping this order is important, because the 'cont' flag + * is used to avoid resending the block name. + */ + if (block != last_sent_block) { + flush_compressed_data(f); + pages = save_zero_page(f, block, offset, p, bytes_transferred); + if (pages == -1) { + set_compress_params(&comp_param[0], block, offset); + /* Use the qemu thread to compress the data to make sure the + * first page is sent out before other pages + */ + bytes_xmit = do_compress_ram_page(&comp_param[0]); + acct_info.norm_pages++; + qemu_put_qemu_file(f, comp_param[0].file); + *bytes_transferred += bytes_xmit; + pages = 1; + } + } else { + pages = save_zero_page(f, block, offset, p, bytes_transferred); + if (pages == -1) { + pages = compress_page_with_multi_thread(f, block, offset, + bytes_transferred); + } + } + } + + return pages; +} + /** * ram_find_and_save_block: Finds a dirty page and sends it to f * @@ -731,10 +1073,22 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage, block = QLIST_FIRST_RCU(&ram_list.blocks); complete_round = true; ram_bulk_stage = false; + if (migrate_use_xbzrle()) { + /* If xbzrle is on, stop using the data compression at this + * point. In theory, xbzrle can do better than compression. + */ + flush_compressed_data(f); + compression_switch = false; + } } } else { - pages = ram_save_page(f, block, offset, last_stage, - bytes_transferred); + if (compression_switch && migrate_use_compression()) { + pages = ram_save_compressed_page(f, block, offset, last_stage, + bytes_transferred); + } else { + pages = ram_save_page(f, block, offset, last_stage, + bytes_transferred); + } /* if page is unmodified, continue to the next */ if (pages > 0) { @@ -750,8 +1104,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage, return pages; } -static uint64_t bytes_transferred; - void acct_update_position(QEMUFile *f, size_t size, bool zero) { uint64_t pages = size / TARGET_PAGE_SIZE; @@ -965,6 +1317,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque) } i++; } + flush_compressed_data(f); rcu_read_unlock(); /* @@ -1006,6 +1359,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque) } } + flush_compressed_data(f); ram_control_after_iterate(f, RAM_CONTROL_FINISH); migration_end(); @@ -1113,10 +1467,104 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size) } } +static void *do_data_decompress(void *opaque) +{ + DecompressParam *param = opaque; + unsigned long pagesize; + + while (!quit_decomp_thread) { + qemu_mutex_lock(¶m->mutex); + while (!param->start && !quit_decomp_thread) { + qemu_cond_wait(¶m->cond, ¶m->mutex); + pagesize = TARGET_PAGE_SIZE; + if (!quit_decomp_thread) { + /* uncompress() will return failed in some case, especially + * when the page is dirted when doing the compression, it's + * not a problem because the dirty page will be retransferred + * and uncompress() won't break the data in other pages. + */ + uncompress((Bytef *)param->des, &pagesize, + (const Bytef *)param->compbuf, param->len); + } + param->start = false; + } + qemu_mutex_unlock(¶m->mutex); + } + + return NULL; +} + +void migrate_decompress_threads_create(void) +{ + int i, thread_count; + + thread_count = migrate_decompress_threads(); + decompress_threads = g_new0(QemuThread, thread_count); + decomp_param = g_new0(DecompressParam, thread_count); + compressed_data_buf = g_malloc0(compressBound(TARGET_PAGE_SIZE)); + quit_decomp_thread = false; + for (i = 0; i < thread_count; i++) { + qemu_mutex_init(&decomp_param[i].mutex); + qemu_cond_init(&decomp_param[i].cond); + decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE)); + qemu_thread_create(decompress_threads + i, "decompress", + do_data_decompress, decomp_param + i, + QEMU_THREAD_JOINABLE); + } +} + +void migrate_decompress_threads_join(void) +{ + int i, thread_count; + + quit_decomp_thread = true; + thread_count = migrate_decompress_threads(); + for (i = 0; i < thread_count; i++) { + qemu_mutex_lock(&decomp_param[i].mutex); + qemu_cond_signal(&decomp_param[i].cond); + qemu_mutex_unlock(&decomp_param[i].mutex); + } + for (i = 0; i < thread_count; i++) { + qemu_thread_join(decompress_threads + i); + qemu_mutex_destroy(&decomp_param[i].mutex); + qemu_cond_destroy(&decomp_param[i].cond); + g_free(decomp_param[i].compbuf); + } + g_free(decompress_threads); + g_free(decomp_param); + g_free(compressed_data_buf); + decompress_threads = NULL; + decomp_param = NULL; + compressed_data_buf = NULL; +} + +static void decompress_data_with_multi_threads(uint8_t *compbuf, + void *host, int len) +{ + int idx, thread_count; + + thread_count = migrate_decompress_threads(); + while (true) { + for (idx = 0; idx < thread_count; idx++) { + if (!decomp_param[idx].start) { + memcpy(decomp_param[idx].compbuf, compbuf, len); + decomp_param[idx].des = host; + decomp_param[idx].len = len; + start_decompression(&decomp_param[idx]); + break; + } + } + if (idx < thread_count) { + break; + } + } +} + static int ram_load(QEMUFile *f, void *opaque, int version_id) { int flags = 0, ret = 0; static uint64_t seq_iter; + int len = 0; seq_iter++; @@ -1196,6 +1644,23 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id) } qemu_get_buffer(f, host, TARGET_PAGE_SIZE); break; + case RAM_SAVE_FLAG_COMPRESS_PAGE: + host = host_from_stream_offset(f, addr, flags); + if (!host) { + error_report("Invalid RAM offset " RAM_ADDR_FMT, addr); + ret = -EINVAL; + break; + } + + len = qemu_get_be32(f); + if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) { + error_report("Invalid compressed data length: %d", len); + ret = -EINVAL; + break; + } + qemu_get_buffer(f, compressed_data_buf, len); + decompress_data_with_multi_threads(compressed_data_buf, host, len); + break; case RAM_SAVE_FLAG_XBZRLE: host = host_from_stream_offset(f, addr, flags); if (!host) { |