commit 4a83bf2f339d4b63ecd5ef48b9816c3b7ee24553
tree 17521dfbeca338210998624e41446841d956e7bb
parent 18896679aa982609b31b4a7d357a6542e59af093
parent ca273df3014930f4ecb6d3ba24aa778354634b56
author Peter Maydell <peter.maydell@linaro.org> 2018-06-28 15:31:42 +0100
committer Peter Maydell <peter.maydell@linaro.org> 2018-06-28 15:31:42 +0100
Merge remote-tracking branch 'remotes/juanquintela/tags/migration/20180627' into staging
migration/next for 20180627
# gpg: Signature made Wed 27 Jun 2018 13:53:53 BST
# gpg: using RSA key F487EF185872D723
# gpg: Good signature from "Juan Quintela <quintela@redhat.com>"
# gpg: aka "Juan Quintela <quintela@trasno.org>"
# Primary key fingerprint: 1899 FF8E DEBF 58CC EE03 4B82 F487 EF18 5872 D723
* remotes/juanquintela/tags/migration/20180627:
  migration: fix crash when incoming client channel setup fails
postcopy: drop ram_pages parameter from postcopy_ram_incoming_init()
migration: Stop sending whole pages through main channel
migration: Remove not needed semaphore and quit
migration: Wait for blocking IO
migration: Start sending messages
migration: Create ram_save_multifd_page
migration: Create multifd_bytes ram_counter
migration: Synchronize multifd threads with main thread
migration: Add block where to send/receive packets
migration: Multifd channels always wait on the sem
migration: Add multifd traces for start/end thread
migration: Abstract the number of bytes sent
migration: Calculate mbps only during transfer time
migration: Create multifd packet
migration: Create multipage support
Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
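
The design that the bulk of this series implements is easiest to see in miniature. Each send channel sleeps on a semaphore; the migration thread fills a batch of pages, swaps that batch with the idle channel's one under the channel mutex, bumps pending_job, and posts the semaphore, while a channels_ready semaphore throttles the producer. The following standalone C sketch distills that handoff under simplified assumptions (plain pthreads and POSIX semaphores, a toy Pages struct, fixed channel and batch counts); it illustrates the pattern, it is not QEMU code.

#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>
#include <stdio.h>

#define BATCH    4                 /* pages per batch: an assumed toy value */
#define CHANNELS 2

typedef struct {
    int used;                      /* pages filled so far */
} Pages;

typedef struct {
    pthread_mutex_t mutex;         /* protects pending_job, quit, pages */
    sem_t sem;                     /* wakes the channel thread */
    int pending_job;
    bool quit;
    Pages *pages;                  /* batch currently owned by this channel */
} Channel;

static Channel ch[CHANNELS];
static Pages batches[CHANNELS + 1];
static Pages *main_batch;          /* batch owned by the migration thread */
static sem_t channels_ready;       /* counts idle channels */

static void *channel_thread(void *opaque)
{
    Channel *c = opaque;

    for (;;) {
        sem_wait(&c->sem);
        pthread_mutex_lock(&c->mutex);
        if (c->pending_job) {
            printf("channel %ld: sending %d pages\n",
                   (long)(c - ch), c->pages->used);
            c->pages->used = 0;
            c->pending_job--;
            pthread_mutex_unlock(&c->mutex);
            sem_post(&channels_ready);        /* idle again */
        } else if (c->quit) {
            pthread_mutex_unlock(&c->mutex);
            break;
        } else {
            pthread_mutex_unlock(&c->mutex);  /* spurious wakeup */
        }
    }
    return NULL;
}

/* Migration thread: hand the full main batch to the first idle channel. */
static void send_pages(void)
{
    sem_wait(&channels_ready);
    for (int i = 0; ; i = (i + 1) % CHANNELS) {
        Channel *c = &ch[i];

        pthread_mutex_lock(&c->mutex);
        if (!c->pending_job) {
            Pages *tmp = c->pages;     /* swap batches instead of copying: */
            c->pages = main_batch;     /* ownership moves with the pointer */
            main_batch = tmp;
            c->pending_job++;
            pthread_mutex_unlock(&c->mutex);
            sem_post(&c->sem);
            return;
        }
        pthread_mutex_unlock(&c->mutex);
    }
}

int main(void)
{
    pthread_t tid[CHANNELS];

    main_batch = &batches[CHANNELS];
    sem_init(&channels_ready, 0, 0);
    for (int i = 0; i < CHANNELS; i++) {
        pthread_mutex_init(&ch[i].mutex, NULL);
        sem_init(&ch[i].sem, 0, 0);
        ch[i].pages = &batches[i];
        pthread_create(&tid[i], NULL, channel_thread, &ch[i]);
        sem_post(&channels_ready);     /* every channel starts out idle */
    }
    for (int n = 0; n < 3; n++) {      /* pretend we filled three batches */
        main_batch->used = BATCH;
        send_pages();
    }
    for (int i = 0; i < CHANNELS; i++) {
        pthread_mutex_lock(&ch[i].mutex);
        ch[i].quit = true;
        pthread_mutex_unlock(&ch[i].mutex);
        sem_post(&ch[i].sem);
        pthread_join(tid[i], NULL);
    }
    return 0;
}

The swap is the point: no page data is copied between threads, and ownership of each Pages struct stays unambiguous, which is exactly the invariant the comment block added to migration/ram.c below spells out.
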
 exec.c                   |   2
 hmp.c                    |   2
 include/exec/ram_addr.h  |   1
 migration/migration.c    |  24
 migration/postcopy-ram.c |   4
 migration/postcopy-ram.h |   2
 migration/ram.c          | 493
 migration/trace-events   |  12
 qapi/migration.json      |   5
 9 files changed, 515 insertions(+), 30 deletions(-)
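
Each multifd write in the diff below is a fixed header (MultiFDPacket_t) followed by the raw page contents sent with qio_channel_writev_all(). As a quick sanity check of the packet_len arithmetic used in multifd_save_setup() and multifd_load_setup(), here is a standalone computation; the page count of 16 and the 4 KiB page size are assumed example values, not taken from this diff.

#include <stdint.h>
#include <stdio.h>

/* Mirror of the MultiFDPacket_t wire header added in migration/ram.c
 * below: 28 bytes of fixed fields, a 256-byte RAMBlock name, then one
 * 64-bit offset per page. */
typedef struct {
    uint32_t magic;
    uint32_t version;
    uint32_t flags;
    uint32_t size;
    uint32_t used;
    uint64_t packet_num;
    char ramblock[256];
    uint64_t offset[];
} __attribute__((packed)) MultiFDPacket_t;

int main(void)
{
    uint32_t page_count = 16;      /* assumed example, not from this diff */
    uint32_t page_size = 4096;     /* assumed TARGET_PAGE_SIZE */
    size_t packet_len = sizeof(MultiFDPacket_t)
                      + sizeof(uint64_t) * page_count;

    /* prints: header 412 bytes, payload 65536 bytes per full packet */
    printf("header %zu bytes, payload %u bytes per full packet\n",
           packet_len, page_count * page_size);
    return 0;
}
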
diff --git a/exec.c b/exec.c
--- a/exec.c
+++ b/exec.c
@@ -1930,7 +1930,7 @@ static ram_addr_t find_ram_offset(ram_addr_t size)
     return offset;
 }
 
-unsigned long last_ram_page(void)
+static unsigned long last_ram_page(void)
 {
     RAMBlock *block;
     ram_addr_t last = 0;
diff --git a/hmp.c b/hmp.c
--- a/hmp.c
+++ b/hmp.c
@@ -234,6 +234,8 @@ void hmp_info_migrate(Monitor *mon, const QDict *qdict)
                        info->ram->dirty_sync_count);
         monitor_printf(mon, "page size: %" PRIu64 " kbytes\n",
                        info->ram->page_size >> 10);
+        monitor_printf(mon, "multifd bytes: %" PRIu64 " kbytes\n",
+                       info->ram->multifd_bytes >> 10);
         if (info->ram->dirty_pages_rate) {
             monitor_printf(mon, "dirty pages rate: %" PRIu64 " pages\n",
                            info->ram->dirty_pages_rate);
diff --git a/include/exec/ram_addr.h b/include/exec/ram_addr.h
index cf2446a176..33c361cad5 100644
--- a/include/exec/ram_addr.h
+++ b/include/exec/ram_addr.h
@@ -71,7 +71,6 @@ static inline unsigned long int ramblock_recv_bitmap_offset(void *host_addr,
 }
 
 long qemu_getrampagesize(void);
-unsigned long last_ram_page(void);
 RAMBlock *qemu_ram_alloc_from_file(ram_addr_t size, MemoryRegion *mr,
                                    bool share, const char *mem_path,
                                    Error **errp);
diff --git a/migration/migration.c b/migration/migration.c
index e1eaa97df4..94d71f8b24 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -518,11 +518,12 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
  */
 bool migration_has_all_channels(void)
 {
+    MigrationIncomingState *mis = migration_incoming_get_current();
     bool all_channels;
 
     all_channels = multifd_recv_all_channels_created();
 
-    return all_channels;
+    return all_channels && mis->from_src_file != NULL;
 }
 
 /*
@@ -708,6 +709,7 @@ static void populate_ram_info(MigrationInfo *info, MigrationState *s)
     info->ram->dirty_sync_count = ram_counters.dirty_sync_count;
     info->ram->postcopy_requests = ram_counters.postcopy_requests;
     info->ram->page_size = qemu_target_page_size();
+    info->ram->multifd_bytes = ram_counters.multifd_bytes;
 
     if (migrate_use_xbzrle()) {
         info->has_xbzrle_cache = true;
@@ -2704,10 +2706,17 @@ static MigThrError migration_detect_error(MigrationState *s)
     }
 }
 
+/* How many bytes have we transferred since the beginning of the migration */
+static uint64_t migration_total_bytes(MigrationState *s)
+{
+    return qemu_ftell(s->to_dst_file) + ram_counters.multifd_bytes;
+}
+
 static void migration_calculate_complete(MigrationState *s)
 {
-    uint64_t bytes = qemu_ftell(s->to_dst_file);
+    uint64_t bytes = migration_total_bytes(s);
     int64_t end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+    int64_t transfer_time;
 
     s->total_time = end_time - s->start_time;
     if (!s->downtime) {
@@ -2718,8 +2727,9 @@ static void migration_calculate_complete(MigrationState *s)
         s->downtime = end_time - s->downtime_start;
     }
 
-    if (s->total_time) {
-        s->mbps = ((double) bytes * 8.0) / s->total_time / 1000;
+    transfer_time = s->total_time - s->setup_time;
+    if (transfer_time) {
+        s->mbps = ((double) bytes * 8.0) / transfer_time / 1000;
     }
 }
 
@@ -2727,13 +2737,15 @@ static void migration_update_counters(MigrationState *s,
                                       int64_t current_time)
 {
     uint64_t transferred, time_spent;
+    uint64_t current_bytes; /* bytes transferred since the beginning */
     double bandwidth;
 
     if (current_time < s->iteration_start_time + BUFFER_DELAY) {
         return;
     }
 
-    transferred = qemu_ftell(s->to_dst_file) - s->iteration_initial_bytes;
+    current_bytes = migration_total_bytes(s);
+    transferred = current_bytes - s->iteration_initial_bytes;
     time_spent = current_time - s->iteration_start_time;
     bandwidth = (double)transferred / time_spent;
     s->threshold_size = bandwidth * s->parameters.downtime_limit;
@@ -2752,7 +2764,7 @@ static void migration_update_counters(MigrationState *s,
     qemu_file_reset_rate_limit(s->to_dst_file);
 
     s->iteration_start_time = current_time;
-    s->iteration_initial_bytes = qemu_ftell(s->to_dst_file);
+    s->iteration_initial_bytes = current_bytes;
 
     trace_migrate_transferred(transferred, time_spent, bandwidth,
                               s->threshold_size);
diff --git a/migration/postcopy-ram.c b/migration/postcopy-ram.c
index 48e51556a7..932f188949 100644
--- a/migration/postcopy-ram.c
+++ b/migration/postcopy-ram.c
@@ -500,7 +500,7 @@ static int cleanup_range(const char *block_name, void *host_addr,
  * postcopy later; must be called prior to any precopy.
  * called from arch_init's similarly named ram_postcopy_incoming_init
  */
-int postcopy_ram_incoming_init(MigrationIncomingState *mis, size_t ram_pages)
+int postcopy_ram_incoming_init(MigrationIncomingState *mis)
 {
     if (qemu_ram_foreach_migratable_block(init_range, NULL)) {
         return -1;
@@ -1265,7 +1265,7 @@ bool postcopy_ram_supported_by_host(MigrationIncomingState *mis)
     return false;
 }
 
-int postcopy_ram_incoming_init(MigrationIncomingState *mis, size_t ram_pages)
+int postcopy_ram_incoming_init(MigrationIncomingState *mis)
 {
     error_report("postcopy_ram_incoming_init: No OS support");
     return -1;
diff --git a/migration/postcopy-ram.h b/migration/postcopy-ram.h
index d900d9c34f..9d55536fd1 100644
--- a/migration/postcopy-ram.h
+++ b/migration/postcopy-ram.h
@@ -27,7 +27,7 @@ int postcopy_ram_enable_notify(MigrationIncomingState *mis);
  * postcopy later; must be called prior to any precopy.
  * called from ram.c's similarly named ram_postcopy_incoming_init
  */
-int postcopy_ram_incoming_init(MigrationIncomingState *mis, size_t ram_pages);
+int postcopy_ram_incoming_init(MigrationIncomingState *mis);
 
 /*
  * At the end of a migration where postcopy_ram_incoming_init was called.
diff --git a/migration/ram.c b/migration/ram.c
index cd5f55117d..1cd98d6398 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -55,6 +55,7 @@
 #include "sysemu/sysemu.h"
 #include "qemu/uuid.h"
 #include "savevm.h"
+#include "qemu/iov.h"
 
 /***********************************************************/
 /* ram save/restore */
@@ -510,6 +511,8 @@ exit:
 #define MULTIFD_MAGIC 0x11223344U
 #define MULTIFD_VERSION 1
 
+#define MULTIFD_FLAG_SYNC (1 << 0)
+
 typedef struct {
     uint32_t magic;
     uint32_t version;
@@ -518,6 +521,31 @@ typedef struct {
 } __attribute__((packed)) MultiFDInit_t;
 
 typedef struct {
+    uint32_t magic;
+    uint32_t version;
+    uint32_t flags;
+    uint32_t size;
+    uint32_t used;
+    uint64_t packet_num;
+    char ramblock[256];
+    uint64_t offset[];
+} __attribute__((packed)) MultiFDPacket_t;
+
+typedef struct {
+    /* number of used pages */
+    uint32_t used;
+    /* number of allocated pages */
+    uint32_t allocated;
+    /* global number of generated multifd packets */
+    uint64_t packet_num;
+    /* offset of each page */
+    ram_addr_t *offset;
+    /* pointer to each page */
+    struct iovec *iov;
+    RAMBlock *block;
+} MultiFDPages_t;
+
+typedef struct {
     /* this fields are not changed once the thread is created */
     /* channel number */
     uint8_t id;
@@ -535,6 +563,25 @@ typedef struct {
     bool running;
     /* should this thread finish */
     bool quit;
+    /* thread has work to do */
+    int pending_job;
+    /* array of pages to send */
+    MultiFDPages_t *pages;
+    /* packet allocated len */
+    uint32_t packet_len;
+    /* pointer to the packet */
+    MultiFDPacket_t *packet;
+    /* multifd flags for each packet */
+    uint32_t flags;
+    /* global number of generated multifd packets */
+    uint64_t packet_num;
+    /* thread local variables */
+    /* packets sent through this channel */
+    uint64_t num_packets;
+    /* pages sent through this channel */
+    uint64_t num_pages;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
 } MultiFDSendParams;
 
 typedef struct {
@@ -547,14 +594,27 @@ typedef struct {
     QemuThread thread;
     /* communication channel */
     QIOChannel *c;
-    /* sem where to wait for more work */
-    QemuSemaphore sem;
     /* this mutex protects the following parameters */
     QemuMutex mutex;
     /* is this channel thread running */
     bool running;
-    /* should this thread finish */
-    bool quit;
+    /* array of pages to receive */
+    MultiFDPages_t *pages;
+    /* packet allocated len */
+    uint32_t packet_len;
+    /* pointer to the packet */
+    MultiFDPacket_t *packet;
+    /* multifd flags for each packet */
+    uint32_t flags;
+    /* global number of generated multifd packets */
+    uint64_t packet_num;
+    /* thread local variables */
+    /* packets sent through this channel */
+    uint64_t num_packets;
+    /* pages sent through this channel */
+    uint64_t num_pages;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
 } MultiFDRecvParams;
 
 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
@@ -619,12 +679,211 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
     return msg.id;
 }
 
+static MultiFDPages_t *multifd_pages_init(size_t size)
+{
+    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
+
+    pages->allocated = size;
+    pages->iov = g_new0(struct iovec, size);
+    pages->offset = g_new0(ram_addr_t, size);
+
+    return pages;
+}
+
+static void multifd_pages_clear(MultiFDPages_t *pages)
+{
+    pages->used = 0;
+    pages->allocated = 0;
+    pages->packet_num = 0;
+    pages->block = NULL;
+    g_free(pages->iov);
+    pages->iov = NULL;
+    g_free(pages->offset);
+    pages->offset = NULL;
+    g_free(pages);
+}
+
+static void multifd_send_fill_packet(MultiFDSendParams *p)
+{
+    MultiFDPacket_t *packet = p->packet;
+    int i;
+
+    packet->magic = cpu_to_be32(MULTIFD_MAGIC);
+    packet->version = cpu_to_be32(MULTIFD_VERSION);
+    packet->flags = cpu_to_be32(p->flags);
+    packet->size = cpu_to_be32(migrate_multifd_page_count());
+    packet->used = cpu_to_be32(p->pages->used);
+    packet->packet_num = cpu_to_be64(p->packet_num);
+
+    if (p->pages->block) {
+        strncpy(packet->ramblock, p->pages->block->idstr, 256);
+    }
+
+    for (i = 0; i < p->pages->used; i++) {
+        packet->offset[i] = cpu_to_be64(p->pages->offset[i]);
+    }
+}
+
+static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
+{
+    MultiFDPacket_t *packet = p->packet;
+    RAMBlock *block;
+    int i;
+
+    be32_to_cpus(&packet->magic);
+    if (packet->magic != MULTIFD_MAGIC) {
+        error_setg(errp, "multifd: received packet "
+                   "magic %x and expected magic %x",
+                   packet->magic, MULTIFD_MAGIC);
+        return -1;
+    }
+
+    be32_to_cpus(&packet->version);
+    if (packet->version != MULTIFD_VERSION) {
+        error_setg(errp, "multifd: received packet "
+                   "version %d and expected version %d",
+                   packet->version, MULTIFD_VERSION);
+        return -1;
+    }
+
+    p->flags = be32_to_cpu(packet->flags);
+
+    be32_to_cpus(&packet->size);
+    if (packet->size > migrate_multifd_page_count()) {
+        error_setg(errp, "multifd: received packet "
+                   "with size %d and expected maximum size %d",
+                   packet->size, migrate_multifd_page_count());
+        return -1;
+    }
+
+    p->pages->used = be32_to_cpu(packet->used);
+    if (p->pages->used > packet->size) {
+        error_setg(errp, "multifd: received packet "
+                   "with size %d and expected maximum size %d",
+                   p->pages->used, packet->size);
+        return -1;
+    }
+
+    p->packet_num = be64_to_cpu(packet->packet_num);
+
+    if (p->pages->used) {
+        /* make sure that ramblock is 0 terminated */
+        packet->ramblock[255] = 0;
+        block = qemu_ram_block_by_name(packet->ramblock);
+        if (!block) {
+            error_setg(errp, "multifd: unknown ram block %s",
+                       packet->ramblock);
+            return -1;
+        }
+    }
+
+    for (i = 0; i < p->pages->used; i++) {
+        ram_addr_t offset = be64_to_cpu(packet->offset[i]);
+
+        if (offset > (block->used_length - TARGET_PAGE_SIZE)) {
+            error_setg(errp, "multifd: offset too long " RAM_ADDR_FMT
+                       " (max " RAM_ADDR_FMT ")",
+                       offset, block->max_length);
+            return -1;
+        }
+        p->pages->iov[i].iov_base = block->host + offset;
+        p->pages->iov[i].iov_len = TARGET_PAGE_SIZE;
+    }
+
+    return 0;
+}
+
 struct {
     MultiFDSendParams *params;
     /* number of created threads */
     int count;
+    /* array of pages to send */
+    MultiFDPages_t *pages;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
+    /* global number of generated multifd packets */
+    uint64_t packet_num;
+    /* send channels ready */
+    QemuSemaphore channels_ready;
 } *multifd_send_state;
 
+/*
+ * How do we use multifd_send_state->pages and channel->pages?
+ *
+ * We create a pages struct for each channel, and a main one.  Each
+ * time that we need to send a batch of pages we interchange the ones
+ * between multifd_send_state and the channel that is sending it.
+ * There are two reasons for that:
+ *    - to not have to do so many mallocs during migration
+ *    - to make it easier to know what to free at the end of migration
+ *
+ * This way we always know who is the owner of each "pages" struct,
+ * and we don't need any locking.  It belongs to the migration thread
+ * or to the channel thread.  Switching is safe because the migration
+ * thread is using the channel mutex when changing it, and the channel
+ * thread has to finish with its own, otherwise pending_job can't be
+ * false.
+ */
+
+static void multifd_send_pages(void)
+{
+    int i;
+    static int next_channel;
+    MultiFDSendParams *p = NULL; /* make gcc happy */
+    MultiFDPages_t *pages = multifd_send_state->pages;
+    uint64_t transferred;
+
+    qemu_sem_wait(&multifd_send_state->channels_ready);
+    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
+        p = &multifd_send_state->params[i];
+
+        qemu_mutex_lock(&p->mutex);
+        if (!p->pending_job) {
+            p->pending_job++;
+            next_channel = (i + 1) % migrate_multifd_channels();
+            break;
+        }
+        qemu_mutex_unlock(&p->mutex);
+    }
+    p->pages->used = 0;
+    p->packet_num = multifd_send_state->packet_num++;
+    p->pages->block = NULL;
+    multifd_send_state->pages = p->pages;
+    p->pages = pages;
+    transferred = pages->used * TARGET_PAGE_SIZE + p->packet_len;
+    ram_counters.multifd_bytes += transferred;
+    ram_counters.transferred += transferred;
+    qemu_mutex_unlock(&p->mutex);
+    qemu_sem_post(&p->sem);
+}
+
+static void multifd_queue_page(RAMBlock *block, ram_addr_t offset)
+{
+    MultiFDPages_t *pages = multifd_send_state->pages;
+
+    if (!pages->block) {
+        pages->block = block;
+    }
+
+    if (pages->block == block) {
+        pages->offset[pages->used] = offset;
+        pages->iov[pages->used].iov_base = block->host + offset;
+        pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
+        pages->used++;
+
+        if (pages->used < pages->allocated) {
+            return;
+        }
+    }
+
+    multifd_send_pages();
+
+    if (pages->block != block) {
+        multifd_queue_page(block, offset);
+    }
+}
+
 static void multifd_send_terminate_threads(Error *err)
 {
     int i;
@@ -670,33 +929,116 @@ int multifd_save_cleanup(Error **errp)
         p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
+        qemu_sem_destroy(&p->sem_sync);
         g_free(p->name);
         p->name = NULL;
-    }
+        multifd_pages_clear(p->pages);
+        p->pages = NULL;
+        p->packet_len = 0;
+        g_free(p->packet);
+        p->packet = NULL;
+    }
+    qemu_sem_destroy(&multifd_send_state->channels_ready);
+    qemu_sem_destroy(&multifd_send_state->sem_sync);
     g_free(multifd_send_state->params);
     multifd_send_state->params = NULL;
+    multifd_pages_clear(multifd_send_state->pages);
+    multifd_send_state->pages = NULL;
     g_free(multifd_send_state);
     multifd_send_state = NULL;
     return ret;
 }
 
+static void multifd_send_sync_main(void)
+{
+    int i;
+
+    if (!migrate_use_multifd()) {
+        return;
+    }
+    if (multifd_send_state->pages->used) {
+        multifd_send_pages();
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+
+        trace_multifd_send_sync_main_signal(p->id);
+
+        qemu_mutex_lock(&p->mutex);
+
+        p->packet_num = multifd_send_state->packet_num++;
+        p->flags |= MULTIFD_FLAG_SYNC;
+        p->pending_job++;
+        qemu_mutex_unlock(&p->mutex);
+        qemu_sem_post(&p->sem);
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+
+        trace_multifd_send_sync_main_wait(p->id);
+        qemu_sem_wait(&multifd_send_state->sem_sync);
+    }
+    trace_multifd_send_sync_main(multifd_send_state->packet_num);
+}
+
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
     Error *local_err = NULL;
+    int ret;
+
+    trace_multifd_send_thread_start(p->id);
 
     if (multifd_send_initial_packet(p, &local_err) < 0) {
         goto out;
     }
+    /* initial packet */
+    p->num_packets = 1;
 
     while (true) {
+        qemu_sem_wait(&p->sem);
         qemu_mutex_lock(&p->mutex);
-        if (p->quit) {
+
+        if (p->pending_job) {
+            uint32_t used = p->pages->used;
+            uint64_t packet_num = p->packet_num;
+            uint32_t flags = p->flags;
+
+            multifd_send_fill_packet(p);
+            p->flags = 0;
+            p->num_packets++;
+            p->num_pages += used;
+            p->pages->used = 0;
+            qemu_mutex_unlock(&p->mutex);
+
+            trace_multifd_send(p->id, packet_num, used, flags);
+
+            ret = qio_channel_write_all(p->c, (void *)p->packet,
+                                        p->packet_len, &local_err);
+            if (ret != 0) {
+                break;
+            }
+
+            ret = qio_channel_writev_all(p->c, p->pages->iov, used, &local_err);
+            if (ret != 0) {
+                break;
+            }
+
+            qemu_mutex_lock(&p->mutex);
+            p->pending_job--;
+            qemu_mutex_unlock(&p->mutex);
+
+            if (flags & MULTIFD_FLAG_SYNC) {
+                qemu_sem_post(&multifd_send_state->sem_sync);
+            }
+            qemu_sem_post(&multifd_send_state->channels_ready);
+        } else if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;
+        } else {
+            qemu_mutex_unlock(&p->mutex);
+            /* sometimes there are spurious wakeups */
         }
-        qemu_mutex_unlock(&p->mutex);
-        qemu_sem_wait(&p->sem);
     }
 
 out:
@@ -708,6 +1050,8 @@ out:
     p->running = false;
     qemu_mutex_unlock(&p->mutex);
 
+    trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);
+
     return NULL;
 }
 
@@ -735,6 +1079,7 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
 int multifd_save_setup(void)
 {
     int thread_count;
+    uint32_t page_count = migrate_multifd_page_count();
     uint8_t i;
 
     if (!migrate_use_multifd()) {
@@ -744,13 +1089,23 @@ int multifd_save_setup(void)
     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
     atomic_set(&multifd_send_state->count, 0);
+    multifd_send_state->pages = multifd_pages_init(page_count);
+    qemu_sem_init(&multifd_send_state->sem_sync, 0);
+    qemu_sem_init(&multifd_send_state->channels_ready, 0);
+
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
+        qemu_sem_init(&p->sem_sync, 0);
         p->quit = false;
+        p->pending_job = 0;
         p->id = i;
+        p->pages = multifd_pages_init(page_count);
+        p->packet_len = sizeof(MultiFDPacket_t)
+                      + sizeof(ram_addr_t) * page_count;
+        p->packet = g_malloc0(p->packet_len);
         p->name = g_strdup_printf("multifdsend_%d", i);
         socket_send_channel_create(multifd_new_send_channel_async, p);
     }
@@ -761,6 +1116,10 @@ struct {
     MultiFDRecvParams *params;
     /* number of created threads */
     int count;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
+    /* global number of generated multifd packets */
+    uint64_t packet_num;
 } *multifd_recv_state;
 
 static void multifd_recv_terminate_threads(Error *err)
@@ -781,8 +1140,11 @@ static void multifd_recv_terminate_threads(Error *err)
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         qemu_mutex_lock(&p->mutex);
-        p->quit = true;
-        qemu_sem_post(&p->sem);
+        /* We could arrive here for two reasons:
+           - normal quit, i.e. everything went fine, just finished
+           - error quit: We close the channels so the channel threads
+             finish the qio_channel_read_all_eof() */
+        qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
         qemu_mutex_unlock(&p->mutex);
     }
 }
@@ -805,10 +1167,16 @@ int multifd_load_cleanup(Error **errp)
         object_unref(OBJECT(p->c));
         p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
-        qemu_sem_destroy(&p->sem);
+        qemu_sem_destroy(&p->sem_sync);
        g_free(p->name);
         p->name = NULL;
+        multifd_pages_clear(p->pages);
+        p->pages = NULL;
+        p->packet_len = 0;
+        g_free(p->packet);
+        p->packet = NULL;
     }
+    qemu_sem_destroy(&multifd_recv_state->sem_sync);
     g_free(multifd_recv_state->params);
     multifd_recv_state->params = NULL;
     g_free(multifd_recv_state);
@@ -817,30 +1185,95 @@ int multifd_load_cleanup(Error **errp)
     return ret;
 }
 
+static void multifd_recv_sync_main(void)
+{
+    int i;
+
+    if (!migrate_use_multifd()) {
+        return;
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        trace_multifd_recv_sync_main_wait(p->id);
+        qemu_sem_wait(&multifd_recv_state->sem_sync);
+        qemu_mutex_lock(&p->mutex);
+        if (multifd_recv_state->packet_num < p->packet_num) {
+            multifd_recv_state->packet_num = p->packet_num;
+        }
+        qemu_mutex_unlock(&p->mutex);
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        trace_multifd_recv_sync_main_signal(p->id);
+        qemu_sem_post(&p->sem_sync);
+    }
+    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
+}
+
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
+    Error *local_err = NULL;
+    int ret;
+
+    trace_multifd_recv_thread_start(p->id);
 
     while (true) {
+        uint32_t used;
+        uint32_t flags;
+
+        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
+                                       p->packet_len, &local_err);
+        if (ret == 0) {   /* EOF */
+            break;
+        }
+        if (ret == -1) {   /* Error */
+            break;
+        }
+
         qemu_mutex_lock(&p->mutex);
-        if (p->quit) {
+        ret = multifd_recv_unfill_packet(p, &local_err);
+        if (ret) {
             qemu_mutex_unlock(&p->mutex);
             break;
         }
+
+        used = p->pages->used;
+        flags = p->flags;
+        trace_multifd_recv(p->id, p->packet_num, used, flags);
+        p->num_packets++;
+        p->num_pages += used;
         qemu_mutex_unlock(&p->mutex);
-        qemu_sem_wait(&p->sem);
+
+        ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
+        if (ret != 0) {
+            break;
+        }
+
+        if (flags & MULTIFD_FLAG_SYNC) {
+            qemu_sem_post(&multifd_recv_state->sem_sync);
+            qemu_sem_wait(&p->sem_sync);
+        }
     }
 
+    if (local_err) {
+        multifd_recv_terminate_threads(local_err);
+    }
     qemu_mutex_lock(&p->mutex);
     p->running = false;
     qemu_mutex_unlock(&p->mutex);
 
+    trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);
+
     return NULL;
 }
 
 int multifd_load_setup(void)
 {
     int thread_count;
+    uint32_t page_count = migrate_multifd_page_count();
     uint8_t i;
 
     if (!migrate_use_multifd()) {
@@ -850,13 +1283,18 @@ int multifd_load_setup(void)
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
     atomic_set(&multifd_recv_state->count, 0);
+    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
+
     for (i = 0; i < thread_count; i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         qemu_mutex_init(&p->mutex);
-        qemu_sem_init(&p->sem, 0);
-        p->quit = false;
+        qemu_sem_init(&p->sem_sync, 0);
         p->id = i;
+        p->pages = multifd_pages_init(page_count);
+        p->packet_len = sizeof(MultiFDPacket_t)
+                      + sizeof(ram_addr_t) * page_count;
+        p->packet = g_malloc0(p->packet_len);
         p->name = g_strdup_printf("multifdrecv_%d", i);
     }
     return 0;
@@ -894,6 +1332,8 @@ void multifd_recv_new_channel(QIOChannel *ioc)
     }
     p->c = ioc;
     object_ref(OBJECT(ioc));
+    /* initial packet */
+    p->num_packets = 1;
 
     p->running = true;
     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                        QEMU_THREAD_JOINABLE);
@@ -1374,6 +1814,15 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
     return pages;
 }
 
+static int ram_save_multifd_page(RAMState *rs, RAMBlock *block,
+                                 ram_addr_t offset)
+{
+    multifd_queue_page(block, offset);
+    ram_counters.normal++;
+
+    return 1;
+}
+
 static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
                                 ram_addr_t offset, uint8_t *source_buf)
 {
@@ -1779,6 +2228,8 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
      */
     if (block == rs->last_sent_block && save_page_use_compression(rs)) {
         return compress_page_with_multi_thread(rs, block, offset);
+    } else if (migrate_use_multifd()) {
+        return ram_save_multifd_page(rs, block, offset);
     }
 
     return ram_save_page(rs, pss, last_stage);
@@ -2605,7 +3056,9 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
     ram_control_before_iterate(f, RAM_CONTROL_SETUP);
     ram_control_after_iterate(f, RAM_CONTROL_SETUP);
 
+    multifd_send_sync_main();
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
+    qemu_fflush(f);
 
     return 0;
 }
@@ -2685,8 +3138,10 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
      */
     ram_control_after_iterate(f, RAM_CONTROL_ROUND);
 
+    multifd_send_sync_main();
 out:
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
+    qemu_fflush(f);
     ram_counters.transferred += 8;
 
     ret = qemu_file_get_error(f);
@@ -2738,7 +3193,9 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
 
     rcu_read_unlock();
 
+    multifd_send_sync_main();
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
+    qemu_fflush(f);
 
     return 0;
 }
@@ -3107,9 +3564,7 @@ static int ram_load_cleanup(void *opaque)
  */
 int ram_postcopy_incoming_init(MigrationIncomingState *mis)
 {
-    unsigned long ram_pages = last_ram_page();
-
-    return postcopy_ram_incoming_init(mis, ram_pages);
+    return postcopy_ram_incoming_init(mis);
 }
 
 /**
@@ -3227,6 +3682,7 @@ static int ram_load_postcopy(QEMUFile *f)
             break;
         case RAM_SAVE_FLAG_EOS:
             /* normal exit */
+            multifd_recv_sync_main();
             break;
         default:
             error_report("Unknown combination of migration flags: %#x"
@@ -3415,6 +3871,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
             break;
         case RAM_SAVE_FLAG_EOS:
             /* normal exit */
+            multifd_recv_sync_main();
             break;
         default:
             if (flags & RAM_SAVE_FLAG_HOOK) {
diff --git a/migration/trace-events b/migration/trace-events
index 7ea522e453..9430f3cbe0 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -76,6 +76,18 @@ get_queued_page_not_dirty(const char *block_name, uint64_t tmp_offset, unsigned
 migration_bitmap_sync_start(void) ""
 migration_bitmap_sync_end(uint64_t dirty_pages) "dirty_pages %" PRIu64
 migration_throttle(void) ""
+multifd_recv(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags) "channel %d packet number %" PRIu64 " pages %d flags 0x%x"
+multifd_recv_sync_main(long packet_num) "packet num %ld"
+multifd_recv_sync_main_signal(uint8_t id) "channel %d"
+multifd_recv_sync_main_wait(uint8_t id) "channel %d"
+multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %d packets %" PRIu64 " pages %" PRIu64
+multifd_recv_thread_start(uint8_t id) "%d"
+multifd_send(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags) "channel %d packet_num %" PRIu64 " pages %d flags 0x%x"
+multifd_send_sync_main(long packet_num) "packet num %ld"
+multifd_send_sync_main_signal(uint8_t id) "channel %d"
+multifd_send_sync_main_wait(uint8_t id) "channel %d"
+multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %d packets %" PRIu64 " pages %" PRIu64
+multifd_send_thread_start(uint8_t id) "%d"
 ram_discard_range(const char *rbname, uint64_t start, size_t len) "%s: start: %" PRIx64 " %zx"
 ram_load_loop(const char *rbname, uint64_t addr, int flags, void *host) "%s: addr: 0x%" PRIx64 " flags: 0x%x host: %p"
 ram_load_postcopy_loop(uint64_t addr, int flags) "@%" PRIx64 " %x"
diff --git a/qapi/migration.json b/qapi/migration.json
index 1b4c1db670..186e8a7303 100644
--- a/qapi/migration.json
+++ b/qapi/migration.json
@@ -39,6 +39,8 @@
 # @page-size: The number of bytes per page for the various page-based
 #             statistics (since 2.10)
 #
+# @multifd-bytes: The number of bytes sent through multifd (since 3.0)
+#
 # Since: 0.14.0
 ##
 { 'struct': 'MigrationStats',
@@ -46,7 +48,8 @@
            'duplicate': 'int', 'skipped': 'int', 'normal': 'int',
            'normal-bytes': 'int', 'dirty-pages-rate' : 'int',
            'mbps' : 'number', 'dirty-sync-count' : 'int',
-           'postcopy-requests' : 'int', 'page-size' : 'int' } }
+           'postcopy-requests' : 'int', 'page-size' : 'int',
+           'multifd-bytes' : 'uint64' } }
 
 ##
 # @XBZRLECacheStats:
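
Two accounting changes merged here interact: migration_total_bytes() now adds ram_counters.multifd_bytes to the QEMUFile position, and the final bandwidth divides by transfer time (total time minus setup time) instead of total time. A standalone check of the unit arithmetic, with made-up sample numbers, shows why both the units and the setup-time exclusion matter:

#include <stdint.h>
#include <stdio.h>

/*
 * The mbps formula from migration_calculate_complete() above:
 * bits / milliseconds = kbit/s, so the extra /1000 yields Mbit/s.
 * All values below are invented for illustration.
 */
int main(void)
{
    uint64_t bytes = 2ULL * 1024 * 1024 * 1024;  /* 2 GiB transferred */
    int64_t total_time = 20500;                  /* ms, start to finish */
    int64_t setup_time = 500;                    /* ms spent in setup */
    int64_t transfer_time = total_time - setup_time;

    double mbps = ((double) bytes * 8.0) / transfer_time / 1000;

    /* prints: 859.0 Mbit/s (vs 838.0 using total time) */
    printf("%.1f Mbit/s (vs %.1f using total time)\n",
           mbps, ((double) bytes * 8.0) / total_time / 1000);
    return 0;
}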