From 34c55a94b14323c06791395801aadaf5606761dc Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Tue, 10 Apr 2018 23:35:15 +0200 Subject: migration: Create multipage support We only create/destry the page list here. We will use it later. Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert --- migration/ram.c | 57 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/migration/ram.c b/migration/ram.c index cd5f55117d..ed4401ee46 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -517,6 +517,20 @@ typedef struct { uint8_t id; } __attribute__((packed)) MultiFDInit_t; +typedef struct { + /* number of used pages */ + uint32_t used; + /* number of allocated pages */ + uint32_t allocated; + /* global number of generated multifd packets */ + uint64_t packet_num; + /* offset of each page */ + ram_addr_t *offset; + /* pointer to each page */ + struct iovec *iov; + RAMBlock *block; +} MultiFDPages_t; + typedef struct { /* this fields are not changed once the thread is created */ /* channel number */ @@ -535,6 +549,8 @@ typedef struct { bool running; /* should this thread finish */ bool quit; + /* array of pages to sent */ + MultiFDPages_t *pages; } MultiFDSendParams; typedef struct { @@ -555,6 +571,8 @@ typedef struct { bool running; /* should this thread finish */ bool quit; + /* array of pages to receive */ + MultiFDPages_t *pages; } MultiFDRecvParams; static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) @@ -619,10 +637,36 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) return msg.id; } +static MultiFDPages_t *multifd_pages_init(size_t size) +{ + MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1); + + pages->allocated = size; + pages->iov = g_new0(struct iovec, size); + pages->offset = g_new0(ram_addr_t, size); + + return pages; +} + +static void multifd_pages_clear(MultiFDPages_t *pages) +{ + pages->used = 0; + pages->allocated = 0; + pages->packet_num = 0; + pages->block = NULL; + g_free(pages->iov); + pages->iov = NULL; + g_free(pages->offset); + pages->offset = NULL; + g_free(pages); +} + struct { MultiFDSendParams *params; /* number of created threads */ int count; + /* array of pages to sent */ + MultiFDPages_t *pages; } *multifd_send_state; static void multifd_send_terminate_threads(Error *err) @@ -672,9 +716,13 @@ int multifd_save_cleanup(Error **errp) qemu_sem_destroy(&p->sem); g_free(p->name); p->name = NULL; + multifd_pages_clear(p->pages); + p->pages = NULL; } g_free(multifd_send_state->params); multifd_send_state->params = NULL; + multifd_pages_clear(multifd_send_state->pages); + multifd_send_state->pages = NULL; g_free(multifd_send_state); multifd_send_state = NULL; return ret; @@ -735,6 +783,7 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) int multifd_save_setup(void) { int thread_count; + uint32_t page_count = migrate_multifd_page_count(); uint8_t i; if (!migrate_use_multifd()) { @@ -744,6 +793,8 @@ int multifd_save_setup(void) multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); atomic_set(&multifd_send_state->count, 0); + multifd_send_state->pages = multifd_pages_init(page_count); + for (i = 0; i < thread_count; i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; @@ -751,6 +802,7 @@ int multifd_save_setup(void) qemu_sem_init(&p->sem, 0); p->quit = false; p->id = i; + p->pages = multifd_pages_init(page_count); p->name = g_strdup_printf("multifdsend_%d", i); socket_send_channel_create(multifd_new_send_channel_async, p); } @@ -808,6 +860,8 @@ int multifd_load_cleanup(Error **errp) qemu_sem_destroy(&p->sem); g_free(p->name); p->name = NULL; + multifd_pages_clear(p->pages); + p->pages = NULL; } g_free(multifd_recv_state->params); multifd_recv_state->params = NULL; @@ -841,6 +895,7 @@ static void *multifd_recv_thread(void *opaque) int multifd_load_setup(void) { int thread_count; + uint32_t page_count = migrate_multifd_page_count(); uint8_t i; if (!migrate_use_multifd()) { @@ -850,6 +905,7 @@ int multifd_load_setup(void) multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); atomic_set(&multifd_recv_state->count, 0); + for (i = 0; i < thread_count; i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; @@ -857,6 +913,7 @@ int multifd_load_setup(void) qemu_sem_init(&p->sem, 0); p->quit = false; p->id = i; + p->pages = multifd_pages_init(page_count); p->name = g_strdup_printf("multifdrecv_%d", i); } return 0; -- cgit v1.2.3 From 2a26c979b15dfedc1d7f4df7ed7e853d768e2850 Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Wed, 4 Apr 2018 11:26:58 +0200 Subject: migration: Create multifd packet We still don't put anything there. Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert -- fix magic (dave) check offset/ramblock (dave) s/seq/packet_num/ and make it 64bit --- migration/ram.c | 145 +++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 144 insertions(+), 1 deletion(-) diff --git a/migration/ram.c b/migration/ram.c index ed4401ee46..fd144fdf9a 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -517,6 +517,17 @@ typedef struct { uint8_t id; } __attribute__((packed)) MultiFDInit_t; +typedef struct { + uint32_t magic; + uint32_t version; + uint32_t flags; + uint32_t size; + uint32_t used; + uint64_t packet_num; + char ramblock[256]; + uint64_t offset[]; +} __attribute__((packed)) MultiFDPacket_t; + typedef struct { /* number of used pages */ uint32_t used; @@ -551,6 +562,14 @@ typedef struct { bool quit; /* array of pages to sent */ MultiFDPages_t *pages; + /* packet allocated len */ + uint32_t packet_len; + /* pointer to the packet */ + MultiFDPacket_t *packet; + /* multifd flags for each packet */ + uint32_t flags; + /* global number of generated multifd packets */ + uint64_t packet_num; } MultiFDSendParams; typedef struct { @@ -573,6 +592,14 @@ typedef struct { bool quit; /* array of pages to receive */ MultiFDPages_t *pages; + /* packet allocated len */ + uint32_t packet_len; + /* pointer to the packet */ + MultiFDPacket_t *packet; + /* multifd flags for each packet */ + uint32_t flags; + /* global number of generated multifd packets */ + uint64_t packet_num; } MultiFDRecvParams; static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) @@ -661,6 +688,99 @@ static void multifd_pages_clear(MultiFDPages_t *pages) g_free(pages); } +static void multifd_send_fill_packet(MultiFDSendParams *p) +{ + MultiFDPacket_t *packet = p->packet; + int i; + + packet->magic = cpu_to_be32(MULTIFD_MAGIC); + packet->version = cpu_to_be32(MULTIFD_VERSION); + packet->flags = cpu_to_be32(p->flags); + packet->size = cpu_to_be32(migrate_multifd_page_count()); + packet->used = cpu_to_be32(p->pages->used); + packet->packet_num = cpu_to_be64(p->packet_num); + + if (p->pages->block) { + strncpy(packet->ramblock, p->pages->block->idstr, 256); + } + + for (i = 0; i < p->pages->used; i++) { + packet->offset[i] = cpu_to_be64(p->pages->offset[i]); + } +} + +static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) +{ + MultiFDPacket_t *packet = p->packet; + RAMBlock *block; + int i; + + /* ToDo: We can't use it until we haven't received a message */ + return 0; + + be32_to_cpus(&packet->magic); + if (packet->magic != MULTIFD_MAGIC) { + error_setg(errp, "multifd: received packet " + "magic %x and expected magic %x", + packet->magic, MULTIFD_MAGIC); + return -1; + } + + be32_to_cpus(&packet->version); + if (packet->version != MULTIFD_VERSION) { + error_setg(errp, "multifd: received packet " + "version %d and expected version %d", + packet->version, MULTIFD_VERSION); + return -1; + } + + p->flags = be32_to_cpu(packet->flags); + + be32_to_cpus(&packet->size); + if (packet->size > migrate_multifd_page_count()) { + error_setg(errp, "multifd: received packet " + "with size %d and expected maximum size %d", + packet->size, migrate_multifd_page_count()) ; + return -1; + } + + p->pages->used = be32_to_cpu(packet->used); + if (p->pages->used > packet->size) { + error_setg(errp, "multifd: received packet " + "with size %d and expected maximum size %d", + p->pages->used, packet->size) ; + return -1; + } + + p->packet_num = be64_to_cpu(packet->packet_num); + + if (p->pages->used) { + /* make sure that ramblock is 0 terminated */ + packet->ramblock[255] = 0; + block = qemu_ram_block_by_name(packet->ramblock); + if (!block) { + error_setg(errp, "multifd: unknown ram block %s", + packet->ramblock); + return -1; + } + } + + for (i = 0; i < p->pages->used; i++) { + ram_addr_t offset = be64_to_cpu(packet->offset[i]); + + if (offset > (block->used_length - TARGET_PAGE_SIZE)) { + error_setg(errp, "multifd: offset too long " RAM_ADDR_FMT + " (max " RAM_ADDR_FMT ")", + offset, block->max_length); + return -1; + } + p->pages->iov[i].iov_base = block->host + offset; + p->pages->iov[i].iov_len = TARGET_PAGE_SIZE; + } + + return 0; +} + struct { MultiFDSendParams *params; /* number of created threads */ @@ -718,6 +838,9 @@ int multifd_save_cleanup(Error **errp) p->name = NULL; multifd_pages_clear(p->pages); p->pages = NULL; + p->packet_len = 0; + g_free(p->packet); + p->packet = NULL; } g_free(multifd_send_state->params); multifd_send_state->params = NULL; @@ -739,6 +862,7 @@ static void *multifd_send_thread(void *opaque) while (true) { qemu_mutex_lock(&p->mutex); + multifd_send_fill_packet(p); if (p->quit) { qemu_mutex_unlock(&p->mutex); break; @@ -803,6 +927,9 @@ int multifd_save_setup(void) p->quit = false; p->id = i; p->pages = multifd_pages_init(page_count); + p->packet_len = sizeof(MultiFDPacket_t) + + sizeof(ram_addr_t) * page_count; + p->packet = g_malloc0(p->packet_len); p->name = g_strdup_printf("multifdsend_%d", i); socket_send_channel_create(multifd_new_send_channel_async, p); } @@ -862,6 +989,9 @@ int multifd_load_cleanup(Error **errp) p->name = NULL; multifd_pages_clear(p->pages); p->pages = NULL; + p->packet_len = 0; + g_free(p->packet); + p->packet = NULL; } g_free(multifd_recv_state->params); multifd_recv_state->params = NULL; @@ -874,10 +1004,20 @@ int multifd_load_cleanup(Error **errp) static void *multifd_recv_thread(void *opaque) { MultiFDRecvParams *p = opaque; + Error *local_err = NULL; + int ret; while (true) { qemu_mutex_lock(&p->mutex); - if (p->quit) { + if (false) { + /* ToDo: Packet reception goes here */ + + ret = multifd_recv_unfill_packet(p, &local_err); + qemu_mutex_unlock(&p->mutex); + if (ret) { + break; + } + } else if (p->quit) { qemu_mutex_unlock(&p->mutex); break; } @@ -914,6 +1054,9 @@ int multifd_load_setup(void) p->quit = false; p->id = i; p->pages = multifd_pages_init(page_count); + p->packet_len = sizeof(MultiFDPacket_t) + + sizeof(ram_addr_t) * page_count; + p->packet = g_malloc0(p->packet_len); p->name = g_strdup_printf("multifdrecv_%d", i); } return 0; -- cgit v1.2.3 From 6cde6fbe2b2ad672029ce9737659aa34f581c07a Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Tue, 26 Jun 2018 15:26:35 +0200 Subject: migration: Calculate mbps only during transfer time We used to include in this calculation the setup time, but that can be quite big in rdma or multifd. Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert --- migration/migration.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/migration/migration.c b/migration/migration.c index e1eaa97df4..d3e6da9bfe 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -2708,6 +2708,7 @@ static void migration_calculate_complete(MigrationState *s) { uint64_t bytes = qemu_ftell(s->to_dst_file); int64_t end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME); + int64_t transfer_time; s->total_time = end_time - s->start_time; if (!s->downtime) { @@ -2718,8 +2719,9 @@ static void migration_calculate_complete(MigrationState *s) s->downtime = end_time - s->downtime_start; } - if (s->total_time) { - s->mbps = ((double) bytes * 8.0) / s->total_time / 1000; + transfer_time = s->total_time - s->setup_time; + if (transfer_time) { + s->mbps = ((double) bytes * 8.0) / transfer_time / 1000; } } -- cgit v1.2.3 From 0c8f0efdd461b26f64fdf8785c333558315bc297 Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Tue, 26 Jun 2018 15:38:00 +0200 Subject: migration: Abstract the number of bytes sent Right now we use the "position" inside the QEMUFile, but things like RDMA already do weird things to be able to maintain that counter right, and multifd will have some similar problems. Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert --- migration/migration.c | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/migration/migration.c b/migration/migration.c index d3e6da9bfe..264f3ce84e 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -2704,9 +2704,15 @@ static MigThrError migration_detect_error(MigrationState *s) } } +/* How many bytes have we transferred since the beggining of the migration */ +static uint64_t migration_total_bytes(MigrationState *s) +{ + return qemu_ftell(s->to_dst_file); +} + static void migration_calculate_complete(MigrationState *s) { - uint64_t bytes = qemu_ftell(s->to_dst_file); + uint64_t bytes = migration_total_bytes(s); int64_t end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME); int64_t transfer_time; @@ -2729,13 +2735,15 @@ static void migration_update_counters(MigrationState *s, int64_t current_time) { uint64_t transferred, time_spent; + uint64_t current_bytes; /* bytes transferred since the beginning */ double bandwidth; if (current_time < s->iteration_start_time + BUFFER_DELAY) { return; } - transferred = qemu_ftell(s->to_dst_file) - s->iteration_initial_bytes; + current_bytes = migration_total_bytes(s); + transferred = current_bytes - s->iteration_initial_bytes; time_spent = current_time - s->iteration_start_time; bandwidth = (double)transferred / time_spent; s->threshold_size = bandwidth * s->parameters.downtime_limit; @@ -2754,7 +2762,7 @@ static void migration_update_counters(MigrationState *s, qemu_file_reset_rate_limit(s->to_dst_file); s->iteration_start_time = current_time; - s->iteration_initial_bytes = qemu_ftell(s->to_dst_file); + s->iteration_initial_bytes = current_bytes; trace_migrate_transferred(transferred, time_spent, bandwidth, s->threshold_size); -- cgit v1.2.3 From 408ea6ae4c8f94621dff64d69c7fa3f6e84ef2fb Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Fri, 6 Apr 2018 18:28:59 +0200 Subject: migration: Add multifd traces for start/end thread We want to know how many pages/packets each channel has sent. Add counters for those. Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert -- sort trace-events (dave) --- migration/ram.c | 22 ++++++++++++++++++++++ migration/trace-events | 4 ++++ 2 files changed, 26 insertions(+) diff --git a/migration/ram.c b/migration/ram.c index fd144fdf9a..5c040e3ae5 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -570,6 +570,11 @@ typedef struct { uint32_t flags; /* global number of generated multifd packets */ uint64_t packet_num; + /* thread local variables */ + /* packets sent through this channel */ + uint64_t num_packets; + /* pages sent through this channel */ + uint64_t num_pages; } MultiFDSendParams; typedef struct { @@ -600,6 +605,11 @@ typedef struct { uint32_t flags; /* global number of generated multifd packets */ uint64_t packet_num; + /* thread local variables */ + /* packets sent through this channel */ + uint64_t num_packets; + /* pages sent through this channel */ + uint64_t num_pages; } MultiFDRecvParams; static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) @@ -856,9 +866,13 @@ static void *multifd_send_thread(void *opaque) MultiFDSendParams *p = opaque; Error *local_err = NULL; + trace_multifd_send_thread_start(p->id); + if (multifd_send_initial_packet(p, &local_err) < 0) { goto out; } + /* initial packet */ + p->num_packets = 1; while (true) { qemu_mutex_lock(&p->mutex); @@ -880,6 +894,8 @@ out: p->running = false; qemu_mutex_unlock(&p->mutex); + trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages); + return NULL; } @@ -1007,6 +1023,8 @@ static void *multifd_recv_thread(void *opaque) Error *local_err = NULL; int ret; + trace_multifd_recv_thread_start(p->id); + while (true) { qemu_mutex_lock(&p->mutex); if (false) { @@ -1029,6 +1047,8 @@ static void *multifd_recv_thread(void *opaque) p->running = false; qemu_mutex_unlock(&p->mutex); + trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages); + return NULL; } @@ -1094,6 +1114,8 @@ void multifd_recv_new_channel(QIOChannel *ioc) } p->c = ioc; object_ref(OBJECT(ioc)); + /* initial packet */ + p->num_packets = 1; p->running = true; qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, diff --git a/migration/trace-events b/migration/trace-events index 3f67758893..a80aaa8b3f 100644 --- a/migration/trace-events +++ b/migration/trace-events @@ -76,6 +76,10 @@ get_queued_page_not_dirty(const char *block_name, uint64_t tmp_offset, unsigned migration_bitmap_sync_start(void) "" migration_bitmap_sync_end(uint64_t dirty_pages) "dirty_pages %" PRIu64 migration_throttle(void) "" +multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %d packets %" PRIu64 " pages %" PRIu64 +multifd_recv_thread_start(uint8_t id) "%d" +multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %d packets %" PRIu64 " pages %" PRIu64 +multifd_send_thread_start(uint8_t id) "%d" ram_discard_range(const char *rbname, uint64_t start, size_t len) "%s: start: %" PRIx64 " %zx" ram_load_loop(const char *rbname, uint64_t addr, int flags, void *host) "%s: addr: 0x%" PRIx64 " flags: 0x%x host: %p" ram_load_postcopy_loop(uint64_t addr, int flags) "@%" PRIx64 " %x" -- cgit v1.2.3 From d82628e4bde23aaf50273cc835c8ce28b43590bd Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Wed, 11 Apr 2018 02:44:24 +0200 Subject: migration: Multifd channels always wait on the sem Either for quit, sync or packet, we first wake them. Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert --- migration/ram.c | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index 5c040e3ae5..45d6c43bfe 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -875,6 +875,7 @@ static void *multifd_send_thread(void *opaque) p->num_packets = 1; while (true) { + qemu_sem_wait(&p->sem); qemu_mutex_lock(&p->mutex); multifd_send_fill_packet(p); if (p->quit) { @@ -882,7 +883,9 @@ static void *multifd_send_thread(void *opaque) break; } qemu_mutex_unlock(&p->mutex); - qemu_sem_wait(&p->sem); + /* this is impossible */ + error_setg(&local_err, "multifd_send_thread: Unknown command"); + break; } out: @@ -1026,6 +1029,7 @@ static void *multifd_recv_thread(void *opaque) trace_multifd_recv_thread_start(p->id); while (true) { + qemu_sem_wait(&p->sem); qemu_mutex_lock(&p->mutex); if (false) { /* ToDo: Packet reception goes here */ @@ -1040,9 +1044,14 @@ static void *multifd_recv_thread(void *opaque) break; } qemu_mutex_unlock(&p->mutex); - qemu_sem_wait(&p->sem); + /* this is impossible */ + error_setg(&local_err, "multifd_recv_thread: Unknown command"); + break; } + if (local_err) { + multifd_recv_terminate_threads(local_err); + } qemu_mutex_lock(&p->mutex); p->running = false; qemu_mutex_unlock(&p->mutex); -- cgit v1.2.3 From 0beb5ed3279abf80d0475ca35f48041b02a9851a Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Wed, 11 Apr 2018 03:02:10 +0200 Subject: migration: Add block where to send/receive packets Once there add tracepoints. Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert --- migration/ram.c | 49 ++++++++++++++++++++++++++++++++++++++++++++----- migration/trace-events | 2 ++ 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index 45d6c43bfe..76410f9de8 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -560,6 +560,8 @@ typedef struct { bool running; /* should this thread finish */ bool quit; + /* thread has work to do */ + int pending_job; /* array of pages to sent */ MultiFDPages_t *pages; /* packet allocated len */ @@ -595,6 +597,8 @@ typedef struct { bool running; /* should this thread finish */ bool quit; + /* thread has work to do */ + bool pending_job; /* array of pages to receive */ MultiFDPages_t *pages; /* packet allocated len */ @@ -877,8 +881,28 @@ static void *multifd_send_thread(void *opaque) while (true) { qemu_sem_wait(&p->sem); qemu_mutex_lock(&p->mutex); - multifd_send_fill_packet(p); - if (p->quit) { + + if (p->pending_job) { + uint32_t used = p->pages->used; + uint64_t packet_num = p->packet_num; + uint32_t flags = p->flags; + + multifd_send_fill_packet(p); + p->flags = 0; + p->num_packets++; + p->num_pages += used; + p->pages->used = 0; + qemu_mutex_unlock(&p->mutex); + + trace_multifd_send(p->id, packet_num, used, flags); + + /* ToDo: send packet here */ + + qemu_mutex_lock(&p->mutex); + p->pending_job--; + qemu_mutex_unlock(&p->mutex); + continue; + } else if (p->quit) { qemu_mutex_unlock(&p->mutex); break; } @@ -944,6 +968,7 @@ int multifd_save_setup(void) qemu_mutex_init(&p->mutex); qemu_sem_init(&p->sem, 0); p->quit = false; + p->pending_job = 0; p->id = i; p->pages = multifd_pages_init(page_count); p->packet_len = sizeof(MultiFDPacket_t) @@ -1031,14 +1056,27 @@ static void *multifd_recv_thread(void *opaque) while (true) { qemu_sem_wait(&p->sem); qemu_mutex_lock(&p->mutex); - if (false) { - /* ToDo: Packet reception goes here */ + if (p->pending_job) { + uint32_t used; + uint32_t flags; + qemu_mutex_unlock(&p->mutex); + + /* ToDo: recv packet here */ + qemu_mutex_lock(&p->mutex); ret = multifd_recv_unfill_packet(p, &local_err); - qemu_mutex_unlock(&p->mutex); if (ret) { + qemu_mutex_unlock(&p->mutex); break; } + + used = p->pages->used; + flags = p->flags; + trace_multifd_recv(p->id, p->packet_num, used, flags); + p->pending_job = false; + p->num_packets++; + p->num_pages += used; + qemu_mutex_unlock(&p->mutex); } else if (p->quit) { qemu_mutex_unlock(&p->mutex); break; @@ -1081,6 +1119,7 @@ int multifd_load_setup(void) qemu_mutex_init(&p->mutex); qemu_sem_init(&p->sem, 0); p->quit = false; + p->pending_job = false; p->id = i; p->pages = multifd_pages_init(page_count); p->packet_len = sizeof(MultiFDPacket_t) diff --git a/migration/trace-events b/migration/trace-events index a80aaa8b3f..4aad26feed 100644 --- a/migration/trace-events +++ b/migration/trace-events @@ -76,8 +76,10 @@ get_queued_page_not_dirty(const char *block_name, uint64_t tmp_offset, unsigned migration_bitmap_sync_start(void) "" migration_bitmap_sync_end(uint64_t dirty_pages) "dirty_pages %" PRIu64 migration_throttle(void) "" +multifd_recv(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags) "channel %d packet number %" PRIu64 " pages %d flags 0x%x" multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %d packets %" PRIu64 " pages %" PRIu64 multifd_recv_thread_start(uint8_t id) "%d" +multifd_send(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags) "channel %d packet_num %" PRIu64 " pages %d flags 0x%x" multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %d packets %" PRIu64 " pages %" PRIu64 multifd_send_thread_start(uint8_t id) "%d" ram_discard_range(const char *rbname, uint64_t start, size_t len) "%s: start: %" PRIx64 " %zx" -- cgit v1.2.3 From 6df264ac5a7f532adf718b55d5ef713247d857b1 Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Wed, 28 Feb 2018 09:10:07 +0100 Subject: migration: Synchronize multifd threads with main thread We synchronize all threads each RAM_SAVE_FLAG_EOS. Bitmap synchronizations don't happen inside a ram section, so we are safe about two channels trying to overwrite the same memory. Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert -- seq needs to be atomic now, will also be accessed from main thread. Fix the if (true || ...) leftover We are back to non-atomics --- migration/ram.c | 145 +++++++++++++++++++++++++++++++++++++++---------- migration/trace-events | 6 ++ 2 files changed, 121 insertions(+), 30 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index 76410f9de8..77c66a4391 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -510,6 +510,8 @@ exit: #define MULTIFD_MAGIC 0x11223344U #define MULTIFD_VERSION 1 +#define MULTIFD_FLAG_SYNC (1 << 0) + typedef struct { uint32_t magic; uint32_t version; @@ -577,6 +579,8 @@ typedef struct { uint64_t num_packets; /* pages sent through this channel */ uint64_t num_pages; + /* syncs main thread and channels */ + QemuSemaphore sem_sync; } MultiFDSendParams; typedef struct { @@ -614,6 +618,8 @@ typedef struct { uint64_t num_packets; /* pages sent through this channel */ uint64_t num_pages; + /* syncs main thread and channels */ + QemuSemaphore sem_sync; } MultiFDRecvParams; static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) @@ -801,6 +807,10 @@ struct { int count; /* array of pages to sent */ MultiFDPages_t *pages; + /* syncs main thread and channels */ + QemuSemaphore sem_sync; + /* global number of generated multifd packets */ + uint64_t packet_num; } *multifd_send_state; static void multifd_send_terminate_threads(Error *err) @@ -848,6 +858,7 @@ int multifd_save_cleanup(Error **errp) p->c = NULL; qemu_mutex_destroy(&p->mutex); qemu_sem_destroy(&p->sem); + qemu_sem_destroy(&p->sem_sync); g_free(p->name); p->name = NULL; multifd_pages_clear(p->pages); @@ -856,6 +867,7 @@ int multifd_save_cleanup(Error **errp) g_free(p->packet); p->packet = NULL; } + qemu_sem_destroy(&multifd_send_state->sem_sync); g_free(multifd_send_state->params); multifd_send_state->params = NULL; multifd_pages_clear(multifd_send_state->pages); @@ -865,6 +877,33 @@ int multifd_save_cleanup(Error **errp) return ret; } +static void multifd_send_sync_main(void) +{ + int i; + + if (!migrate_use_multifd()) { + return; + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + trace_multifd_send_sync_main_signal(p->id); + + qemu_mutex_lock(&p->mutex); + p->flags |= MULTIFD_FLAG_SYNC; + p->pending_job++; + qemu_mutex_unlock(&p->mutex); + qemu_sem_post(&p->sem); + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + trace_multifd_send_sync_main_wait(p->id); + qemu_sem_wait(&multifd_send_state->sem_sync); + } + trace_multifd_send_sync_main(multifd_send_state->packet_num); +} + static void *multifd_send_thread(void *opaque) { MultiFDSendParams *p = opaque; @@ -901,15 +940,17 @@ static void *multifd_send_thread(void *opaque) qemu_mutex_lock(&p->mutex); p->pending_job--; qemu_mutex_unlock(&p->mutex); - continue; + + if (flags & MULTIFD_FLAG_SYNC) { + qemu_sem_post(&multifd_send_state->sem_sync); + } } else if (p->quit) { qemu_mutex_unlock(&p->mutex); break; + } else { + qemu_mutex_unlock(&p->mutex); + /* sometimes there are spurious wakeups */ } - qemu_mutex_unlock(&p->mutex); - /* this is impossible */ - error_setg(&local_err, "multifd_send_thread: Unknown command"); - break; } out: @@ -961,12 +1002,14 @@ int multifd_save_setup(void) multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); atomic_set(&multifd_send_state->count, 0); multifd_send_state->pages = multifd_pages_init(page_count); + qemu_sem_init(&multifd_send_state->sem_sync, 0); for (i = 0; i < thread_count; i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; qemu_mutex_init(&p->mutex); qemu_sem_init(&p->sem, 0); + qemu_sem_init(&p->sem_sync, 0); p->quit = false; p->pending_job = 0; p->id = i; @@ -984,6 +1027,10 @@ struct { MultiFDRecvParams *params; /* number of created threads */ int count; + /* syncs main thread and channels */ + QemuSemaphore sem_sync; + /* global number of generated multifd packets */ + uint64_t packet_num; } *multifd_recv_state; static void multifd_recv_terminate_threads(Error *err) @@ -1029,6 +1076,7 @@ int multifd_load_cleanup(Error **errp) p->c = NULL; qemu_mutex_destroy(&p->mutex); qemu_sem_destroy(&p->sem); + qemu_sem_destroy(&p->sem_sync); g_free(p->name); p->name = NULL; multifd_pages_clear(p->pages); @@ -1037,6 +1085,7 @@ int multifd_load_cleanup(Error **errp) g_free(p->packet); p->packet = NULL; } + qemu_sem_destroy(&multifd_recv_state->sem_sync); g_free(multifd_recv_state->params); multifd_recv_state->params = NULL; g_free(multifd_recv_state); @@ -1045,6 +1094,42 @@ int multifd_load_cleanup(Error **errp) return ret; } +static void multifd_recv_sync_main(void) +{ + int i; + + if (!migrate_use_multifd()) { + return; + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + trace_multifd_recv_sync_main_signal(p->id); + qemu_mutex_lock(&p->mutex); + p->pending_job = true; + qemu_mutex_unlock(&p->mutex); + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + trace_multifd_recv_sync_main_wait(p->id); + qemu_sem_wait(&multifd_recv_state->sem_sync); + qemu_mutex_lock(&p->mutex); + if (multifd_recv_state->packet_num < p->packet_num) { + multifd_recv_state->packet_num = p->packet_num; + } + qemu_mutex_unlock(&p->mutex); + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + trace_multifd_recv_sync_main_signal(p->id); + + qemu_sem_post(&p->sem_sync); + } + trace_multifd_recv_sync_main(multifd_recv_state->packet_num); +} + static void *multifd_recv_thread(void *opaque) { MultiFDRecvParams *p = opaque; @@ -1054,37 +1139,30 @@ static void *multifd_recv_thread(void *opaque) trace_multifd_recv_thread_start(p->id); while (true) { - qemu_sem_wait(&p->sem); - qemu_mutex_lock(&p->mutex); - if (p->pending_job) { - uint32_t used; - uint32_t flags; - qemu_mutex_unlock(&p->mutex); + uint32_t used; + uint32_t flags; - /* ToDo: recv packet here */ + /* ToDo: recv packet here */ - qemu_mutex_lock(&p->mutex); - ret = multifd_recv_unfill_packet(p, &local_err); - if (ret) { - qemu_mutex_unlock(&p->mutex); - break; - } - - used = p->pages->used; - flags = p->flags; - trace_multifd_recv(p->id, p->packet_num, used, flags); - p->pending_job = false; - p->num_packets++; - p->num_pages += used; - qemu_mutex_unlock(&p->mutex); - } else if (p->quit) { + qemu_mutex_lock(&p->mutex); + ret = multifd_recv_unfill_packet(p, &local_err); + if (ret) { qemu_mutex_unlock(&p->mutex); break; } + + used = p->pages->used; + flags = p->flags; + trace_multifd_recv(p->id, p->packet_num, used, flags); + p->pending_job = false; + p->num_packets++; + p->num_pages += used; qemu_mutex_unlock(&p->mutex); - /* this is impossible */ - error_setg(&local_err, "multifd_recv_thread: Unknown command"); - break; + + if (flags & MULTIFD_FLAG_SYNC) { + qemu_sem_post(&multifd_recv_state->sem_sync); + qemu_sem_wait(&p->sem_sync); + } } if (local_err) { @@ -1112,12 +1190,14 @@ int multifd_load_setup(void) multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); atomic_set(&multifd_recv_state->count, 0); + qemu_sem_init(&multifd_recv_state->sem_sync, 0); for (i = 0; i < thread_count; i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; qemu_mutex_init(&p->mutex); qemu_sem_init(&p->sem, 0); + qemu_sem_init(&p->sem_sync, 0); p->quit = false; p->pending_job = false; p->id = i; @@ -2875,6 +2955,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque) ram_control_before_iterate(f, RAM_CONTROL_SETUP); ram_control_after_iterate(f, RAM_CONTROL_SETUP); + multifd_send_sync_main(); qemu_put_be64(f, RAM_SAVE_FLAG_EOS); return 0; @@ -2955,6 +3036,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque) */ ram_control_after_iterate(f, RAM_CONTROL_ROUND); + multifd_send_sync_main(); out: qemu_put_be64(f, RAM_SAVE_FLAG_EOS); ram_counters.transferred += 8; @@ -3008,6 +3090,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque) rcu_read_unlock(); + multifd_send_sync_main(); qemu_put_be64(f, RAM_SAVE_FLAG_EOS); return 0; @@ -3497,6 +3580,7 @@ static int ram_load_postcopy(QEMUFile *f) break; case RAM_SAVE_FLAG_EOS: /* normal exit */ + multifd_recv_sync_main(); break; default: error_report("Unknown combination of migration flags: %#x" @@ -3685,6 +3769,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id) break; case RAM_SAVE_FLAG_EOS: /* normal exit */ + multifd_recv_sync_main(); break; default: if (flags & RAM_SAVE_FLAG_HOOK) { diff --git a/migration/trace-events b/migration/trace-events index 4aad26feed..8b9edfbfef 100644 --- a/migration/trace-events +++ b/migration/trace-events @@ -77,9 +77,15 @@ migration_bitmap_sync_start(void) "" migration_bitmap_sync_end(uint64_t dirty_pages) "dirty_pages %" PRIu64 migration_throttle(void) "" multifd_recv(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags) "channel %d packet number %" PRIu64 " pages %d flags 0x%x" +multifd_recv_sync_main(long packet_num) "packet num %ld" +multifd_recv_sync_main_signal(uint8_t id) "channel %d" +multifd_recv_sync_main_wait(uint8_t id) "channel %d" multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %d packets %" PRIu64 " pages %" PRIu64 multifd_recv_thread_start(uint8_t id) "%d" multifd_send(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags) "channel %d packet_num %" PRIu64 " pages %d flags 0x%x" +multifd_send_sync_main(long packet_num) "packet num %ld" +multifd_send_sync_main_signal(uint8_t id) "channel %d" +multifd_send_sync_main_wait(uint8_t id) "channel %d" multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %d packets %" PRIu64 " pages %" PRIu64 multifd_send_thread_start(uint8_t id) "%d" ram_discard_range(const char *rbname, uint64_t start, size_t len) "%s: start: %" PRIx64 " %zx" -- cgit v1.2.3 From a61c45bd227951cd792f22b00d4e502c01ae8d9b Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Tue, 26 Jun 2018 15:20:11 +0200 Subject: migration: Create multifd_bytes ram_counter This will include how many bytes they are sent through multifd. Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert --- hmp.c | 2 ++ migration/migration.c | 1 + qapi/migration.json | 5 ++++- 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/hmp.c b/hmp.c index f601099f90..0da0b0ac33 100644 --- a/hmp.c +++ b/hmp.c @@ -234,6 +234,8 @@ void hmp_info_migrate(Monitor *mon, const QDict *qdict) info->ram->dirty_sync_count); monitor_printf(mon, "page size: %" PRIu64 " kbytes\n", info->ram->page_size >> 10); + monitor_printf(mon, "multifd bytes: %" PRIu64 " kbytes\n", + info->ram->multifd_bytes >> 10); if (info->ram->dirty_pages_rate) { monitor_printf(mon, "dirty pages rate: %" PRIu64 " pages\n", diff --git a/migration/migration.c b/migration/migration.c index 264f3ce84e..2680ba2d47 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -708,6 +708,7 @@ static void populate_ram_info(MigrationInfo *info, MigrationState *s) info->ram->dirty_sync_count = ram_counters.dirty_sync_count; info->ram->postcopy_requests = ram_counters.postcopy_requests; info->ram->page_size = qemu_target_page_size(); + info->ram->multifd_bytes = ram_counters.multifd_bytes; if (migrate_use_xbzrle()) { info->has_xbzrle_cache = true; diff --git a/qapi/migration.json b/qapi/migration.json index 1b4c1db670..186e8a7303 100644 --- a/qapi/migration.json +++ b/qapi/migration.json @@ -39,6 +39,8 @@ # @page-size: The number of bytes per page for the various page-based # statistics (since 2.10) # +# @multifd-bytes: The number of bytes sent through multifd (since 3.0) +# # Since: 0.14.0 ## { 'struct': 'MigrationStats', @@ -46,7 +48,8 @@ 'duplicate': 'int', 'skipped': 'int', 'normal': 'int', 'normal-bytes': 'int', 'dirty-pages-rate' : 'int', 'mbps' : 'number', 'dirty-sync-count' : 'int', - 'postcopy-requests' : 'int', 'page-size' : 'int' } } + 'postcopy-requests' : 'int', 'page-size' : 'int', + 'multifd-bytes' : 'uint64' } } ## # @XBZRLECacheStats: -- cgit v1.2.3 From b9ee2f7d701457f205a3329ea67b33e0b63a0f43 Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Fri, 15 Jan 2016 11:40:13 +0100 Subject: migration: Create ram_save_multifd_page The function still don't use multifd, but we have simplified ram_save_page, xbzrle and RDMA stuff is gone. We have added a new counter. Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert -- Add last_page parameter Add commets for done and address Remove multifd field, it is the same than normal pages Merge next patch, now we send multiple pages at a time Remove counter for multifd pages, it is identical to normal pages Use iovec's instead of creating the equivalent. Clear memory used by pages (dave) Use g_new0(danp) define MULTIFD_CONTINUE now pages member is a pointer Fix off-by-one in number of pages in one packet Remove RAM_SAVE_FLAG_MULTIFD_PAGE s/multifd_pages_t/MultiFDPages_t/ add comment explaining what it means --- migration/migration.c | 2 +- migration/ram.c | 107 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 1 deletion(-) diff --git a/migration/migration.c b/migration/migration.c index 2680ba2d47..d075c27886 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -2708,7 +2708,7 @@ static MigThrError migration_detect_error(MigrationState *s) /* How many bytes have we transferred since the beggining of the migration */ static uint64_t migration_total_bytes(MigrationState *s) { - return qemu_ftell(s->to_dst_file); + return qemu_ftell(s->to_dst_file) + ram_counters.multifd_bytes; } static void migration_calculate_complete(MigrationState *s) diff --git a/migration/ram.c b/migration/ram.c index 77c66a4391..050e2f2000 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -55,6 +55,7 @@ #include "sysemu/sysemu.h" #include "qemu/uuid.h" #include "savevm.h" +#include "qemu/iov.h" /***********************************************************/ /* ram save/restore */ @@ -811,8 +812,87 @@ struct { QemuSemaphore sem_sync; /* global number of generated multifd packets */ uint64_t packet_num; + /* send channels ready */ + QemuSemaphore channels_ready; } *multifd_send_state; +/* + * How we use multifd_send_state->pages and channel->pages? + * + * We create a pages for each channel, and a main one. Each time that + * we need to send a batch of pages we interchange the ones between + * multifd_send_state and the channel that is sending it. There are + * two reasons for that: + * - to not have to do so many mallocs during migration + * - to make easier to know what to free at the end of migration + * + * This way we always know who is the owner of each "pages" struct, + * and we don't need any loocking. It belongs to the migration thread + * or to the channel thread. Switching is safe because the migration + * thread is using the channel mutex when changing it, and the channel + * have to had finish with its own, otherwise pending_job can't be + * false. + */ + +static void multifd_send_pages(void) +{ + int i; + static int next_channel; + MultiFDSendParams *p = NULL; /* make happy gcc */ + MultiFDPages_t *pages = multifd_send_state->pages; + uint64_t transferred; + + qemu_sem_wait(&multifd_send_state->channels_ready); + for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { + p = &multifd_send_state->params[i]; + + qemu_mutex_lock(&p->mutex); + if (!p->pending_job) { + p->pending_job++; + next_channel = (i + 1) % migrate_multifd_channels(); + break; + } + qemu_mutex_unlock(&p->mutex); + } + p->pages->used = 0; + + p->packet_num = multifd_send_state->packet_num++; + p->pages->block = NULL; + multifd_send_state->pages = p->pages; + p->pages = pages; + transferred = pages->used * TARGET_PAGE_SIZE + p->packet_len; + ram_counters.multifd_bytes += transferred; + ram_counters.transferred += transferred;; + qemu_mutex_unlock(&p->mutex); + qemu_sem_post(&p->sem); +} + +static void multifd_queue_page(RAMBlock *block, ram_addr_t offset) +{ + MultiFDPages_t *pages = multifd_send_state->pages; + + if (!pages->block) { + pages->block = block; + } + + if (pages->block == block) { + pages->offset[pages->used] = offset; + pages->iov[pages->used].iov_base = block->host + offset; + pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE; + pages->used++; + + if (pages->used < pages->allocated) { + return; + } + } + + multifd_send_pages(); + + if (pages->block != block) { + multifd_queue_page(block, offset); + } +} + static void multifd_send_terminate_threads(Error *err) { int i; @@ -867,6 +947,7 @@ int multifd_save_cleanup(Error **errp) g_free(p->packet); p->packet = NULL; } + qemu_sem_destroy(&multifd_send_state->channels_ready); qemu_sem_destroy(&multifd_send_state->sem_sync); g_free(multifd_send_state->params); multifd_send_state->params = NULL; @@ -884,12 +965,17 @@ static void multifd_send_sync_main(void) if (!migrate_use_multifd()) { return; } + if (multifd_send_state->pages->used) { + multifd_send_pages(); + } for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; trace_multifd_send_sync_main_signal(p->id); qemu_mutex_lock(&p->mutex); + + p->packet_num = multifd_send_state->packet_num++; p->flags |= MULTIFD_FLAG_SYNC; p->pending_job++; qemu_mutex_unlock(&p->mutex); @@ -944,6 +1030,7 @@ static void *multifd_send_thread(void *opaque) if (flags & MULTIFD_FLAG_SYNC) { qemu_sem_post(&multifd_send_state->sem_sync); } + qemu_sem_post(&multifd_send_state->channels_ready); } else if (p->quit) { qemu_mutex_unlock(&p->mutex); break; @@ -1003,6 +1090,7 @@ int multifd_save_setup(void) atomic_set(&multifd_send_state->count, 0); multifd_send_state->pages = multifd_pages_init(page_count); qemu_sem_init(&multifd_send_state->sem_sync, 0); + qemu_sem_init(&multifd_send_state->channels_ready, 0); for (i = 0; i < thread_count; i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; @@ -1724,6 +1812,23 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage) return pages; } +static int ram_save_multifd_page(RAMState *rs, RAMBlock *block, + ram_addr_t offset) +{ + uint8_t *p; + + p = block->host + offset; + + ram_counters.transferred += save_page_header(rs, rs->f, block, + offset | RAM_SAVE_FLAG_PAGE); + multifd_queue_page(block, offset); + qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE); + ram_counters.transferred += TARGET_PAGE_SIZE; + ram_counters.normal++; + + return 1; +} + static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, ram_addr_t offset, uint8_t *source_buf) { @@ -2129,6 +2234,8 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss, */ if (block == rs->last_sent_block && save_page_use_compression(rs)) { return compress_page_with_multi_thread(rs, block, offset); + } else if (migrate_use_multifd()) { + return ram_save_multifd_page(rs, block, offset); } return ram_save_page(rs, pss, last_stage); -- cgit v1.2.3 From 8b2db7f5fd9566ab9cf22b02b623c51b2075a60e Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Wed, 11 Apr 2018 12:36:13 +0200 Subject: migration: Start sending messages Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert --- migration/ram.c | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index 050e2f2000..b9c8f65059 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -736,9 +736,6 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) RAMBlock *block; int i; - /* ToDo: We can't use it until we haven't received a message */ - return 0; - be32_to_cpus(&packet->magic); if (packet->magic != MULTIFD_MAGIC) { error_setg(errp, "multifd: received packet " @@ -994,6 +991,7 @@ static void *multifd_send_thread(void *opaque) { MultiFDSendParams *p = opaque; Error *local_err = NULL; + int ret; trace_multifd_send_thread_start(p->id); @@ -1021,7 +1019,16 @@ static void *multifd_send_thread(void *opaque) trace_multifd_send(p->id, packet_num, used, flags); - /* ToDo: send packet here */ + ret = qio_channel_write_all(p->c, (void *)p->packet, + p->packet_len, &local_err); + if (ret != 0) { + break; + } + + ret = qio_channel_writev_all(p->c, p->pages->iov, used, &local_err); + if (ret != 0) { + break; + } qemu_mutex_lock(&p->mutex); p->pending_job--; @@ -1230,7 +1237,14 @@ static void *multifd_recv_thread(void *opaque) uint32_t used; uint32_t flags; - /* ToDo: recv packet here */ + ret = qio_channel_read_all_eof(p->c, (void *)p->packet, + p->packet_len, &local_err); + if (ret == 0) { /* EOF */ + break; + } + if (ret == -1) { /* Error */ + break; + } qemu_mutex_lock(&p->mutex); ret = multifd_recv_unfill_packet(p, &local_err); @@ -1247,6 +1261,11 @@ static void *multifd_recv_thread(void *opaque) p->num_pages += used; qemu_mutex_unlock(&p->mutex); + ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err); + if (ret != 0) { + break; + } + if (flags & MULTIFD_FLAG_SYNC) { qemu_sem_post(&multifd_recv_state->sem_sync); qemu_sem_wait(&p->sem_sync); -- cgit v1.2.3 From 4d22c148c989f80e4377233bed886e96d85ef645 Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Wed, 18 Apr 2018 00:38:17 +0200 Subject: migration: Wait for blocking IO We have three conditions here: - channel fails -> error - we have to quit: we close the channel and reads fails - normal read that success, we are in bussiness So forget the complications of waiting in a semaphore. Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert --- migration/ram.c | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index b9c8f65059..5d38d699f3 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -602,8 +602,6 @@ typedef struct { bool running; /* should this thread finish */ bool quit; - /* thread has work to do */ - bool pending_job; /* array of pages to receive */ MultiFDPages_t *pages; /* packet allocated len */ @@ -1199,14 +1197,6 @@ static void multifd_recv_sync_main(void) for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; - trace_multifd_recv_sync_main_signal(p->id); - qemu_mutex_lock(&p->mutex); - p->pending_job = true; - qemu_mutex_unlock(&p->mutex); - } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - trace_multifd_recv_sync_main_wait(p->id); qemu_sem_wait(&multifd_recv_state->sem_sync); qemu_mutex_lock(&p->mutex); @@ -1219,7 +1209,6 @@ static void multifd_recv_sync_main(void) MultiFDRecvParams *p = &multifd_recv_state->params[i]; trace_multifd_recv_sync_main_signal(p->id); - qemu_sem_post(&p->sem_sync); } trace_multifd_recv_sync_main(multifd_recv_state->packet_num); @@ -1256,7 +1245,6 @@ static void *multifd_recv_thread(void *opaque) used = p->pages->used; flags = p->flags; trace_multifd_recv(p->id, p->packet_num, used, flags); - p->pending_job = false; p->num_packets++; p->num_pages += used; qemu_mutex_unlock(&p->mutex); @@ -1306,7 +1294,6 @@ int multifd_load_setup(void) qemu_sem_init(&p->sem, 0); qemu_sem_init(&p->sem_sync, 0); p->quit = false; - p->pending_job = false; p->id = i; p->pages = multifd_pages_init(page_count); p->packet_len = sizeof(MultiFDPacket_t) -- cgit v1.2.3 From 7a5cc33c4878e998e1e2d7c9092dddf963aeb795 Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Wed, 18 Apr 2018 00:49:19 +0200 Subject: migration: Remove not needed semaphore and quit We know quit with shutdwon in the QIO. Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert -- Add comment Use shutdown() instead of unref() --- migration/ram.c | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index 5d38d699f3..61f7313093 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -594,14 +594,10 @@ typedef struct { QemuThread thread; /* communication channel */ QIOChannel *c; - /* sem where to wait for more work */ - QemuSemaphore sem; /* this mutex protects the following parameters */ QemuMutex mutex; /* is this channel thread running */ bool running; - /* should this thread finish */ - bool quit; /* array of pages to receive */ MultiFDPages_t *pages; /* packet allocated len */ @@ -1144,8 +1140,11 @@ static void multifd_recv_terminate_threads(Error *err) MultiFDRecvParams *p = &multifd_recv_state->params[i]; qemu_mutex_lock(&p->mutex); - p->quit = true; - qemu_sem_post(&p->sem); + /* We could arrive here for two reasons: + - normal quit, i.e. everything went fine, just finished + - error quit: We close the channels so the channel threads + finish the qio_channel_read_all_eof() */ + qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); qemu_mutex_unlock(&p->mutex); } } @@ -1168,7 +1167,6 @@ int multifd_load_cleanup(Error **errp) object_unref(OBJECT(p->c)); p->c = NULL; qemu_mutex_destroy(&p->mutex); - qemu_sem_destroy(&p->sem); qemu_sem_destroy(&p->sem_sync); g_free(p->name); p->name = NULL; @@ -1291,9 +1289,7 @@ int multifd_load_setup(void) MultiFDRecvParams *p = &multifd_recv_state->params[i]; qemu_mutex_init(&p->mutex); - qemu_sem_init(&p->sem, 0); qemu_sem_init(&p->sem_sync, 0); - p->quit = false; p->id = i; p->pages = multifd_pages_init(page_count); p->packet_len = sizeof(MultiFDPacket_t) -- cgit v1.2.3 From 35374cbdff2bf6b6fb7caae5ea8aaa3b0f53f4ca Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Wed, 18 Apr 2018 10:13:21 +0200 Subject: migration: Stop sending whole pages through main channel We have to flush() the QEMUFile because now we sent really few data through that channel. Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert --- migration/ram.c | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index 61f7313093..7d23b472cb 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -1817,15 +1817,7 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage) static int ram_save_multifd_page(RAMState *rs, RAMBlock *block, ram_addr_t offset) { - uint8_t *p; - - p = block->host + offset; - - ram_counters.transferred += save_page_header(rs, rs->f, block, - offset | RAM_SAVE_FLAG_PAGE); multifd_queue_page(block, offset); - qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE); - ram_counters.transferred += TARGET_PAGE_SIZE; ram_counters.normal++; return 1; @@ -3066,6 +3058,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque) multifd_send_sync_main(); qemu_put_be64(f, RAM_SAVE_FLAG_EOS); + qemu_fflush(f); return 0; } @@ -3148,6 +3141,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque) multifd_send_sync_main(); out: qemu_put_be64(f, RAM_SAVE_FLAG_EOS); + qemu_fflush(f); ram_counters.transferred += 8; ret = qemu_file_get_error(f); @@ -3201,6 +3195,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque) multifd_send_sync_main(); qemu_put_be64(f, RAM_SAVE_FLAG_EOS); + qemu_fflush(f); return 0; } -- cgit v1.2.3 From c136180c901dd84d36577edc9b7f377fb76b832d Mon Sep 17 00:00:00 2001 From: David Hildenbrand Date: Wed, 20 Jun 2018 22:27:36 +0200 Subject: postcopy: drop ram_pages parameter from postcopy_ram_incoming_init() Not needed. Don't expose last_ram_page(). Signed-off-by: David Hildenbrand Message-Id: <20180620202736.21399-1-david@redhat.com> Reviewed-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert Signed-off-by: Juan Quintela --- exec.c | 2 +- include/exec/ram_addr.h | 1 - migration/postcopy-ram.c | 4 ++-- migration/postcopy-ram.h | 2 +- migration/ram.c | 4 +--- 5 files changed, 5 insertions(+), 8 deletions(-) diff --git a/exec.c b/exec.c index 88edb59060..1e37f7586b 100644 --- a/exec.c +++ b/exec.c @@ -1930,7 +1930,7 @@ static ram_addr_t find_ram_offset(ram_addr_t size) return offset; } -unsigned long last_ram_page(void) +static unsigned long last_ram_page(void) { RAMBlock *block; ram_addr_t last = 0; diff --git a/include/exec/ram_addr.h b/include/exec/ram_addr.h index cf2446a176..33c361cad5 100644 --- a/include/exec/ram_addr.h +++ b/include/exec/ram_addr.h @@ -71,7 +71,6 @@ static inline unsigned long int ramblock_recv_bitmap_offset(void *host_addr, } long qemu_getrampagesize(void); -unsigned long last_ram_page(void); RAMBlock *qemu_ram_alloc_from_file(ram_addr_t size, MemoryRegion *mr, bool share, const char *mem_path, Error **errp); diff --git a/migration/postcopy-ram.c b/migration/postcopy-ram.c index 48e51556a7..932f188949 100644 --- a/migration/postcopy-ram.c +++ b/migration/postcopy-ram.c @@ -500,7 +500,7 @@ static int cleanup_range(const char *block_name, void *host_addr, * postcopy later; must be called prior to any precopy. * called from arch_init's similarly named ram_postcopy_incoming_init */ -int postcopy_ram_incoming_init(MigrationIncomingState *mis, size_t ram_pages) +int postcopy_ram_incoming_init(MigrationIncomingState *mis) { if (qemu_ram_foreach_migratable_block(init_range, NULL)) { return -1; @@ -1265,7 +1265,7 @@ bool postcopy_ram_supported_by_host(MigrationIncomingState *mis) return false; } -int postcopy_ram_incoming_init(MigrationIncomingState *mis, size_t ram_pages) +int postcopy_ram_incoming_init(MigrationIncomingState *mis) { error_report("postcopy_ram_incoming_init: No OS support"); return -1; diff --git a/migration/postcopy-ram.h b/migration/postcopy-ram.h index d900d9c34f..9d55536fd1 100644 --- a/migration/postcopy-ram.h +++ b/migration/postcopy-ram.h @@ -27,7 +27,7 @@ int postcopy_ram_enable_notify(MigrationIncomingState *mis); * postcopy later; must be called prior to any precopy. * called from ram.c's similarly named ram_postcopy_incoming_init */ -int postcopy_ram_incoming_init(MigrationIncomingState *mis, size_t ram_pages); +int postcopy_ram_incoming_init(MigrationIncomingState *mis); /* * At the end of a migration where postcopy_ram_incoming_init was called. diff --git a/migration/ram.c b/migration/ram.c index 7d23b472cb..1cd98d6398 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -3564,9 +3564,7 @@ static int ram_load_cleanup(void *opaque) */ int ram_postcopy_incoming_init(MigrationIncomingState *mis) { - unsigned long ram_pages = last_ram_page(); - - return postcopy_ram_incoming_init(mis, ram_pages); + return postcopy_ram_incoming_init(mis); } /** -- cgit v1.2.3 From ca273df3014930f4ecb6d3ba24aa778354634b56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20P=2E=20Berrang=C3=A9?= Date: Tue, 19 Jun 2018 17:35:52 +0100 Subject: migration: fix crash in when incoming client channel setup fails MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The way we determine if we can start the incoming migration was changed to use migration_has_all_channels() in: commit 428d89084c709e568f9cd301c2f6416a54c53d6d Author: Juan Quintela Date: Mon Jul 24 13:06:25 2017 +0200 migration: Create migration_has_all_channels This method in turn calls multifd_recv_all_channels_created() which is hardcoded to always return 'true' when multifd is not in use. This is a latent bug... ...activated in a following commit where that return result ends up acting as the flag to indicate whether it is possible to start processing the migration: commit 36c2f8be2c4eb0003ac77a14910842b7ddd7337e Author: Juan Quintela Date: Wed Mar 7 08:40:52 2018 +0100 migration: Delay start of migration main routines This means that if channel initialization fails with normal migration, it'll never notice and attempt to start the incoming migration regardless and crash on a NULL pointer. This can be seen, for example, if a client connects to a server requiring TLS, but has an invalid x509 certificate: qemu-system-x86_64: The certificate hasn't got a known issuer qemu-system-x86_64: migration/migration.c:386: process_incoming_migration_co: Assertion `mis->from_src_file' failed. #0 0x00007fffebd24f2b in raise () at /lib64/libc.so.6 #1 0x00007fffebd0f561 in abort () at /lib64/libc.so.6 #2 0x00007fffebd0f431 in _nl_load_domain.cold.0 () at /lib64/libc.so.6 #3 0x00007fffebd1d692 in () at /lib64/libc.so.6 #4 0x0000555555ad027e in process_incoming_migration_co (opaque=) at migration/migration.c:386 #5 0x0000555555c45e8b in coroutine_trampoline (i0=, i1=) at util/coroutine-ucontext.c:116 #6 0x00007fffebd3a6a0 in __start_context () at /lib64/libc.so.6 #7 0x0000000000000000 in () To handle the non-multifd case, we check whether mis->from_src_file is non-NULL. With this in place, the migration server drops the rejected client and stays around waiting for another, hopefully valid, client to arrive. Signed-off-by: Daniel P. Berrangé Message-Id: <20180619163552.18206-1-berrange@redhat.com> Reviewed-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert Signed-off-by: Juan Quintela --- migration/migration.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/migration/migration.c b/migration/migration.c index d075c27886..94d71f8b24 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -518,11 +518,12 @@ void migration_ioc_process_incoming(QIOChannel *ioc) */ bool migration_has_all_channels(void) { + MigrationIncomingState *mis = migration_incoming_get_current(); bool all_channels; all_channels = multifd_recv_all_channels_created(); - return all_channels; + return all_channels && mis->from_src_file != NULL; } /* -- cgit v1.2.3