aboutsummaryrefslogtreecommitdiff
path: root/migration
diff options
context:
space:
mode:
Diffstat (limited to 'migration')
-rw-r--r--migration/migration.c24
-rw-r--r--migration/postcopy-ram.c4
-rw-r--r--migration/postcopy-ram.h2
-rw-r--r--migration/ram.c493
-rw-r--r--migration/trace-events12
5 files changed, 508 insertions, 27 deletions
diff --git a/migration/migration.c b/migration/migration.c
index e1eaa97df4..94d71f8b24 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -518,11 +518,12 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
*/
bool migration_has_all_channels(void)
{
+ MigrationIncomingState *mis = migration_incoming_get_current();
bool all_channels;
all_channels = multifd_recv_all_channels_created();
- return all_channels;
+ return all_channels && mis->from_src_file != NULL;
}
/*
@@ -708,6 +709,7 @@ static void populate_ram_info(MigrationInfo *info, MigrationState *s)
info->ram->dirty_sync_count = ram_counters.dirty_sync_count;
info->ram->postcopy_requests = ram_counters.postcopy_requests;
info->ram->page_size = qemu_target_page_size();
+ info->ram->multifd_bytes = ram_counters.multifd_bytes;
if (migrate_use_xbzrle()) {
info->has_xbzrle_cache = true;
@@ -2704,10 +2706,17 @@ static MigThrError migration_detect_error(MigrationState *s)
}
}
+/* How many bytes have we transferred since the beggining of the migration */
+static uint64_t migration_total_bytes(MigrationState *s)
+{
+ return qemu_ftell(s->to_dst_file) + ram_counters.multifd_bytes;
+}
+
static void migration_calculate_complete(MigrationState *s)
{
- uint64_t bytes = qemu_ftell(s->to_dst_file);
+ uint64_t bytes = migration_total_bytes(s);
int64_t end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+ int64_t transfer_time;
s->total_time = end_time - s->start_time;
if (!s->downtime) {
@@ -2718,8 +2727,9 @@ static void migration_calculate_complete(MigrationState *s)
s->downtime = end_time - s->downtime_start;
}
- if (s->total_time) {
- s->mbps = ((double) bytes * 8.0) / s->total_time / 1000;
+ transfer_time = s->total_time - s->setup_time;
+ if (transfer_time) {
+ s->mbps = ((double) bytes * 8.0) / transfer_time / 1000;
}
}
@@ -2727,13 +2737,15 @@ static void migration_update_counters(MigrationState *s,
int64_t current_time)
{
uint64_t transferred, time_spent;
+ uint64_t current_bytes; /* bytes transferred since the beginning */
double bandwidth;
if (current_time < s->iteration_start_time + BUFFER_DELAY) {
return;
}
- transferred = qemu_ftell(s->to_dst_file) - s->iteration_initial_bytes;
+ current_bytes = migration_total_bytes(s);
+ transferred = current_bytes - s->iteration_initial_bytes;
time_spent = current_time - s->iteration_start_time;
bandwidth = (double)transferred / time_spent;
s->threshold_size = bandwidth * s->parameters.downtime_limit;
@@ -2752,7 +2764,7 @@ static void migration_update_counters(MigrationState *s,
qemu_file_reset_rate_limit(s->to_dst_file);
s->iteration_start_time = current_time;
- s->iteration_initial_bytes = qemu_ftell(s->to_dst_file);
+ s->iteration_initial_bytes = current_bytes;
trace_migrate_transferred(transferred, time_spent,
bandwidth, s->threshold_size);
diff --git a/migration/postcopy-ram.c b/migration/postcopy-ram.c
index 48e51556a7..932f188949 100644
--- a/migration/postcopy-ram.c
+++ b/migration/postcopy-ram.c
@@ -500,7 +500,7 @@ static int cleanup_range(const char *block_name, void *host_addr,
* postcopy later; must be called prior to any precopy.
* called from arch_init's similarly named ram_postcopy_incoming_init
*/
-int postcopy_ram_incoming_init(MigrationIncomingState *mis, size_t ram_pages)
+int postcopy_ram_incoming_init(MigrationIncomingState *mis)
{
if (qemu_ram_foreach_migratable_block(init_range, NULL)) {
return -1;
@@ -1265,7 +1265,7 @@ bool postcopy_ram_supported_by_host(MigrationIncomingState *mis)
return false;
}
-int postcopy_ram_incoming_init(MigrationIncomingState *mis, size_t ram_pages)
+int postcopy_ram_incoming_init(MigrationIncomingState *mis)
{
error_report("postcopy_ram_incoming_init: No OS support");
return -1;
diff --git a/migration/postcopy-ram.h b/migration/postcopy-ram.h
index d900d9c34f..9d55536fd1 100644
--- a/migration/postcopy-ram.h
+++ b/migration/postcopy-ram.h
@@ -27,7 +27,7 @@ int postcopy_ram_enable_notify(MigrationIncomingState *mis);
* postcopy later; must be called prior to any precopy.
* called from ram.c's similarly named ram_postcopy_incoming_init
*/
-int postcopy_ram_incoming_init(MigrationIncomingState *mis, size_t ram_pages);
+int postcopy_ram_incoming_init(MigrationIncomingState *mis);
/*
* At the end of a migration where postcopy_ram_incoming_init was called.
diff --git a/migration/ram.c b/migration/ram.c
index cd5f55117d..1cd98d6398 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -55,6 +55,7 @@
#include "sysemu/sysemu.h"
#include "qemu/uuid.h"
#include "savevm.h"
+#include "qemu/iov.h"
/***********************************************************/
/* ram save/restore */
@@ -510,6 +511,8 @@ exit:
#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1
+#define MULTIFD_FLAG_SYNC (1 << 0)
+
typedef struct {
uint32_t magic;
uint32_t version;
@@ -518,6 +521,31 @@ typedef struct {
} __attribute__((packed)) MultiFDInit_t;
typedef struct {
+ uint32_t magic;
+ uint32_t version;
+ uint32_t flags;
+ uint32_t size;
+ uint32_t used;
+ uint64_t packet_num;
+ char ramblock[256];
+ uint64_t offset[];
+} __attribute__((packed)) MultiFDPacket_t;
+
+typedef struct {
+ /* number of used pages */
+ uint32_t used;
+ /* number of allocated pages */
+ uint32_t allocated;
+ /* global number of generated multifd packets */
+ uint64_t packet_num;
+ /* offset of each page */
+ ram_addr_t *offset;
+ /* pointer to each page */
+ struct iovec *iov;
+ RAMBlock *block;
+} MultiFDPages_t;
+
+typedef struct {
/* this fields are not changed once the thread is created */
/* channel number */
uint8_t id;
@@ -535,6 +563,25 @@ typedef struct {
bool running;
/* should this thread finish */
bool quit;
+ /* thread has work to do */
+ int pending_job;
+ /* array of pages to sent */
+ MultiFDPages_t *pages;
+ /* packet allocated len */
+ uint32_t packet_len;
+ /* pointer to the packet */
+ MultiFDPacket_t *packet;
+ /* multifd flags for each packet */
+ uint32_t flags;
+ /* global number of generated multifd packets */
+ uint64_t packet_num;
+ /* thread local variables */
+ /* packets sent through this channel */
+ uint64_t num_packets;
+ /* pages sent through this channel */
+ uint64_t num_pages;
+ /* syncs main thread and channels */
+ QemuSemaphore sem_sync;
} MultiFDSendParams;
typedef struct {
@@ -547,14 +594,27 @@ typedef struct {
QemuThread thread;
/* communication channel */
QIOChannel *c;
- /* sem where to wait for more work */
- QemuSemaphore sem;
/* this mutex protects the following parameters */
QemuMutex mutex;
/* is this channel thread running */
bool running;
- /* should this thread finish */
- bool quit;
+ /* array of pages to receive */
+ MultiFDPages_t *pages;
+ /* packet allocated len */
+ uint32_t packet_len;
+ /* pointer to the packet */
+ MultiFDPacket_t *packet;
+ /* multifd flags for each packet */
+ uint32_t flags;
+ /* global number of generated multifd packets */
+ uint64_t packet_num;
+ /* thread local variables */
+ /* packets sent through this channel */
+ uint64_t num_packets;
+ /* pages sent through this channel */
+ uint64_t num_pages;
+ /* syncs main thread and channels */
+ QemuSemaphore sem_sync;
} MultiFDRecvParams;
static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
@@ -619,12 +679,211 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
return msg.id;
}
+static MultiFDPages_t *multifd_pages_init(size_t size)
+{
+ MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
+
+ pages->allocated = size;
+ pages->iov = g_new0(struct iovec, size);
+ pages->offset = g_new0(ram_addr_t, size);
+
+ return pages;
+}
+
+static void multifd_pages_clear(MultiFDPages_t *pages)
+{
+ pages->used = 0;
+ pages->allocated = 0;
+ pages->packet_num = 0;
+ pages->block = NULL;
+ g_free(pages->iov);
+ pages->iov = NULL;
+ g_free(pages->offset);
+ pages->offset = NULL;
+ g_free(pages);
+}
+
+static void multifd_send_fill_packet(MultiFDSendParams *p)
+{
+ MultiFDPacket_t *packet = p->packet;
+ int i;
+
+ packet->magic = cpu_to_be32(MULTIFD_MAGIC);
+ packet->version = cpu_to_be32(MULTIFD_VERSION);
+ packet->flags = cpu_to_be32(p->flags);
+ packet->size = cpu_to_be32(migrate_multifd_page_count());
+ packet->used = cpu_to_be32(p->pages->used);
+ packet->packet_num = cpu_to_be64(p->packet_num);
+
+ if (p->pages->block) {
+ strncpy(packet->ramblock, p->pages->block->idstr, 256);
+ }
+
+ for (i = 0; i < p->pages->used; i++) {
+ packet->offset[i] = cpu_to_be64(p->pages->offset[i]);
+ }
+}
+
+static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
+{
+ MultiFDPacket_t *packet = p->packet;
+ RAMBlock *block;
+ int i;
+
+ be32_to_cpus(&packet->magic);
+ if (packet->magic != MULTIFD_MAGIC) {
+ error_setg(errp, "multifd: received packet "
+ "magic %x and expected magic %x",
+ packet->magic, MULTIFD_MAGIC);
+ return -1;
+ }
+
+ be32_to_cpus(&packet->version);
+ if (packet->version != MULTIFD_VERSION) {
+ error_setg(errp, "multifd: received packet "
+ "version %d and expected version %d",
+ packet->version, MULTIFD_VERSION);
+ return -1;
+ }
+
+ p->flags = be32_to_cpu(packet->flags);
+
+ be32_to_cpus(&packet->size);
+ if (packet->size > migrate_multifd_page_count()) {
+ error_setg(errp, "multifd: received packet "
+ "with size %d and expected maximum size %d",
+ packet->size, migrate_multifd_page_count()) ;
+ return -1;
+ }
+
+ p->pages->used = be32_to_cpu(packet->used);
+ if (p->pages->used > packet->size) {
+ error_setg(errp, "multifd: received packet "
+ "with size %d and expected maximum size %d",
+ p->pages->used, packet->size) ;
+ return -1;
+ }
+
+ p->packet_num = be64_to_cpu(packet->packet_num);
+
+ if (p->pages->used) {
+ /* make sure that ramblock is 0 terminated */
+ packet->ramblock[255] = 0;
+ block = qemu_ram_block_by_name(packet->ramblock);
+ if (!block) {
+ error_setg(errp, "multifd: unknown ram block %s",
+ packet->ramblock);
+ return -1;
+ }
+ }
+
+ for (i = 0; i < p->pages->used; i++) {
+ ram_addr_t offset = be64_to_cpu(packet->offset[i]);
+
+ if (offset > (block->used_length - TARGET_PAGE_SIZE)) {
+ error_setg(errp, "multifd: offset too long " RAM_ADDR_FMT
+ " (max " RAM_ADDR_FMT ")",
+ offset, block->max_length);
+ return -1;
+ }
+ p->pages->iov[i].iov_base = block->host + offset;
+ p->pages->iov[i].iov_len = TARGET_PAGE_SIZE;
+ }
+
+ return 0;
+}
+
struct {
MultiFDSendParams *params;
/* number of created threads */
int count;
+ /* array of pages to sent */
+ MultiFDPages_t *pages;
+ /* syncs main thread and channels */
+ QemuSemaphore sem_sync;
+ /* global number of generated multifd packets */
+ uint64_t packet_num;
+ /* send channels ready */
+ QemuSemaphore channels_ready;
} *multifd_send_state;
+/*
+ * How we use multifd_send_state->pages and channel->pages?
+ *
+ * We create a pages for each channel, and a main one. Each time that
+ * we need to send a batch of pages we interchange the ones between
+ * multifd_send_state and the channel that is sending it. There are
+ * two reasons for that:
+ * - to not have to do so many mallocs during migration
+ * - to make easier to know what to free at the end of migration
+ *
+ * This way we always know who is the owner of each "pages" struct,
+ * and we don't need any loocking. It belongs to the migration thread
+ * or to the channel thread. Switching is safe because the migration
+ * thread is using the channel mutex when changing it, and the channel
+ * have to had finish with its own, otherwise pending_job can't be
+ * false.
+ */
+
+static void multifd_send_pages(void)
+{
+ int i;
+ static int next_channel;
+ MultiFDSendParams *p = NULL; /* make happy gcc */
+ MultiFDPages_t *pages = multifd_send_state->pages;
+ uint64_t transferred;
+
+ qemu_sem_wait(&multifd_send_state->channels_ready);
+ for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
+ p = &multifd_send_state->params[i];
+
+ qemu_mutex_lock(&p->mutex);
+ if (!p->pending_job) {
+ p->pending_job++;
+ next_channel = (i + 1) % migrate_multifd_channels();
+ break;
+ }
+ qemu_mutex_unlock(&p->mutex);
+ }
+ p->pages->used = 0;
+
+ p->packet_num = multifd_send_state->packet_num++;
+ p->pages->block = NULL;
+ multifd_send_state->pages = p->pages;
+ p->pages = pages;
+ transferred = pages->used * TARGET_PAGE_SIZE + p->packet_len;
+ ram_counters.multifd_bytes += transferred;
+ ram_counters.transferred += transferred;;
+ qemu_mutex_unlock(&p->mutex);
+ qemu_sem_post(&p->sem);
+}
+
+static void multifd_queue_page(RAMBlock *block, ram_addr_t offset)
+{
+ MultiFDPages_t *pages = multifd_send_state->pages;
+
+ if (!pages->block) {
+ pages->block = block;
+ }
+
+ if (pages->block == block) {
+ pages->offset[pages->used] = offset;
+ pages->iov[pages->used].iov_base = block->host + offset;
+ pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
+ pages->used++;
+
+ if (pages->used < pages->allocated) {
+ return;
+ }
+ }
+
+ multifd_send_pages();
+
+ if (pages->block != block) {
+ multifd_queue_page(block, offset);
+ }
+}
+
static void multifd_send_terminate_threads(Error *err)
{
int i;
@@ -670,33 +929,116 @@ int multifd_save_cleanup(Error **errp)
p->c = NULL;
qemu_mutex_destroy(&p->mutex);
qemu_sem_destroy(&p->sem);
+ qemu_sem_destroy(&p->sem_sync);
g_free(p->name);
p->name = NULL;
- }
+ multifd_pages_clear(p->pages);
+ p->pages = NULL;
+ p->packet_len = 0;
+ g_free(p->packet);
+ p->packet = NULL;
+ }
+ qemu_sem_destroy(&multifd_send_state->channels_ready);
+ qemu_sem_destroy(&multifd_send_state->sem_sync);
g_free(multifd_send_state->params);
multifd_send_state->params = NULL;
+ multifd_pages_clear(multifd_send_state->pages);
+ multifd_send_state->pages = NULL;
g_free(multifd_send_state);
multifd_send_state = NULL;
return ret;
}
+static void multifd_send_sync_main(void)
+{
+ int i;
+
+ if (!migrate_use_multifd()) {
+ return;
+ }
+ if (multifd_send_state->pages->used) {
+ multifd_send_pages();
+ }
+ for (i = 0; i < migrate_multifd_channels(); i++) {
+ MultiFDSendParams *p = &multifd_send_state->params[i];
+
+ trace_multifd_send_sync_main_signal(p->id);
+
+ qemu_mutex_lock(&p->mutex);
+
+ p->packet_num = multifd_send_state->packet_num++;
+ p->flags |= MULTIFD_FLAG_SYNC;
+ p->pending_job++;
+ qemu_mutex_unlock(&p->mutex);
+ qemu_sem_post(&p->sem);
+ }
+ for (i = 0; i < migrate_multifd_channels(); i++) {
+ MultiFDSendParams *p = &multifd_send_state->params[i];
+
+ trace_multifd_send_sync_main_wait(p->id);
+ qemu_sem_wait(&multifd_send_state->sem_sync);
+ }
+ trace_multifd_send_sync_main(multifd_send_state->packet_num);
+}
+
static void *multifd_send_thread(void *opaque)
{
MultiFDSendParams *p = opaque;
Error *local_err = NULL;
+ int ret;
+
+ trace_multifd_send_thread_start(p->id);
if (multifd_send_initial_packet(p, &local_err) < 0) {
goto out;
}
+ /* initial packet */
+ p->num_packets = 1;
while (true) {
+ qemu_sem_wait(&p->sem);
qemu_mutex_lock(&p->mutex);
- if (p->quit) {
+
+ if (p->pending_job) {
+ uint32_t used = p->pages->used;
+ uint64_t packet_num = p->packet_num;
+ uint32_t flags = p->flags;
+
+ multifd_send_fill_packet(p);
+ p->flags = 0;
+ p->num_packets++;
+ p->num_pages += used;
+ p->pages->used = 0;
+ qemu_mutex_unlock(&p->mutex);
+
+ trace_multifd_send(p->id, packet_num, used, flags);
+
+ ret = qio_channel_write_all(p->c, (void *)p->packet,
+ p->packet_len, &local_err);
+ if (ret != 0) {
+ break;
+ }
+
+ ret = qio_channel_writev_all(p->c, p->pages->iov, used, &local_err);
+ if (ret != 0) {
+ break;
+ }
+
+ qemu_mutex_lock(&p->mutex);
+ p->pending_job--;
+ qemu_mutex_unlock(&p->mutex);
+
+ if (flags & MULTIFD_FLAG_SYNC) {
+ qemu_sem_post(&multifd_send_state->sem_sync);
+ }
+ qemu_sem_post(&multifd_send_state->channels_ready);
+ } else if (p->quit) {
qemu_mutex_unlock(&p->mutex);
break;
+ } else {
+ qemu_mutex_unlock(&p->mutex);
+ /* sometimes there are spurious wakeups */
}
- qemu_mutex_unlock(&p->mutex);
- qemu_sem_wait(&p->sem);
}
out:
@@ -708,6 +1050,8 @@ out:
p->running = false;
qemu_mutex_unlock(&p->mutex);
+ trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);
+
return NULL;
}
@@ -735,6 +1079,7 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
int multifd_save_setup(void)
{
int thread_count;
+ uint32_t page_count = migrate_multifd_page_count();
uint8_t i;
if (!migrate_use_multifd()) {
@@ -744,13 +1089,23 @@ int multifd_save_setup(void)
multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
atomic_set(&multifd_send_state->count, 0);
+ multifd_send_state->pages = multifd_pages_init(page_count);
+ qemu_sem_init(&multifd_send_state->sem_sync, 0);
+ qemu_sem_init(&multifd_send_state->channels_ready, 0);
+
for (i = 0; i < thread_count; i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
qemu_mutex_init(&p->mutex);
qemu_sem_init(&p->sem, 0);
+ qemu_sem_init(&p->sem_sync, 0);
p->quit = false;
+ p->pending_job = 0;
p->id = i;
+ p->pages = multifd_pages_init(page_count);
+ p->packet_len = sizeof(MultiFDPacket_t)
+ + sizeof(ram_addr_t) * page_count;
+ p->packet = g_malloc0(p->packet_len);
p->name = g_strdup_printf("multifdsend_%d", i);
socket_send_channel_create(multifd_new_send_channel_async, p);
}
@@ -761,6 +1116,10 @@ struct {
MultiFDRecvParams *params;
/* number of created threads */
int count;
+ /* syncs main thread and channels */
+ QemuSemaphore sem_sync;
+ /* global number of generated multifd packets */
+ uint64_t packet_num;
} *multifd_recv_state;
static void multifd_recv_terminate_threads(Error *err)
@@ -781,8 +1140,11 @@ static void multifd_recv_terminate_threads(Error *err)
MultiFDRecvParams *p = &multifd_recv_state->params[i];
qemu_mutex_lock(&p->mutex);
- p->quit = true;
- qemu_sem_post(&p->sem);
+ /* We could arrive here for two reasons:
+ - normal quit, i.e. everything went fine, just finished
+ - error quit: We close the channels so the channel threads
+ finish the qio_channel_read_all_eof() */
+ qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
qemu_mutex_unlock(&p->mutex);
}
}
@@ -805,10 +1167,16 @@ int multifd_load_cleanup(Error **errp)
object_unref(OBJECT(p->c));
p->c = NULL;
qemu_mutex_destroy(&p->mutex);
- qemu_sem_destroy(&p->sem);
+ qemu_sem_destroy(&p->sem_sync);
g_free(p->name);
p->name = NULL;
+ multifd_pages_clear(p->pages);
+ p->pages = NULL;
+ p->packet_len = 0;
+ g_free(p->packet);
+ p->packet = NULL;
}
+ qemu_sem_destroy(&multifd_recv_state->sem_sync);
g_free(multifd_recv_state->params);
multifd_recv_state->params = NULL;
g_free(multifd_recv_state);
@@ -817,30 +1185,95 @@ int multifd_load_cleanup(Error **errp)
return ret;
}
+static void multifd_recv_sync_main(void)
+{
+ int i;
+
+ if (!migrate_use_multifd()) {
+ return;
+ }
+ for (i = 0; i < migrate_multifd_channels(); i++) {
+ MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+ trace_multifd_recv_sync_main_wait(p->id);
+ qemu_sem_wait(&multifd_recv_state->sem_sync);
+ qemu_mutex_lock(&p->mutex);
+ if (multifd_recv_state->packet_num < p->packet_num) {
+ multifd_recv_state->packet_num = p->packet_num;
+ }
+ qemu_mutex_unlock(&p->mutex);
+ }
+ for (i = 0; i < migrate_multifd_channels(); i++) {
+ MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+ trace_multifd_recv_sync_main_signal(p->id);
+ qemu_sem_post(&p->sem_sync);
+ }
+ trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
+}
+
static void *multifd_recv_thread(void *opaque)
{
MultiFDRecvParams *p = opaque;
+ Error *local_err = NULL;
+ int ret;
+
+ trace_multifd_recv_thread_start(p->id);
while (true) {
+ uint32_t used;
+ uint32_t flags;
+
+ ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
+ p->packet_len, &local_err);
+ if (ret == 0) { /* EOF */
+ break;
+ }
+ if (ret == -1) { /* Error */
+ break;
+ }
+
qemu_mutex_lock(&p->mutex);
- if (p->quit) {
+ ret = multifd_recv_unfill_packet(p, &local_err);
+ if (ret) {
qemu_mutex_unlock(&p->mutex);
break;
}
+
+ used = p->pages->used;
+ flags = p->flags;
+ trace_multifd_recv(p->id, p->packet_num, used, flags);
+ p->num_packets++;
+ p->num_pages += used;
qemu_mutex_unlock(&p->mutex);
- qemu_sem_wait(&p->sem);
+
+ ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
+ if (ret != 0) {
+ break;
+ }
+
+ if (flags & MULTIFD_FLAG_SYNC) {
+ qemu_sem_post(&multifd_recv_state->sem_sync);
+ qemu_sem_wait(&p->sem_sync);
+ }
}
+ if (local_err) {
+ multifd_recv_terminate_threads(local_err);
+ }
qemu_mutex_lock(&p->mutex);
p->running = false;
qemu_mutex_unlock(&p->mutex);
+ trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);
+
return NULL;
}
int multifd_load_setup(void)
{
int thread_count;
+ uint32_t page_count = migrate_multifd_page_count();
uint8_t i;
if (!migrate_use_multifd()) {
@@ -850,13 +1283,18 @@ int multifd_load_setup(void)
multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
atomic_set(&multifd_recv_state->count, 0);
+ qemu_sem_init(&multifd_recv_state->sem_sync, 0);
+
for (i = 0; i < thread_count; i++) {
MultiFDRecvParams *p = &multifd_recv_state->params[i];
qemu_mutex_init(&p->mutex);
- qemu_sem_init(&p->sem, 0);
- p->quit = false;
+ qemu_sem_init(&p->sem_sync, 0);
p->id = i;
+ p->pages = multifd_pages_init(page_count);
+ p->packet_len = sizeof(MultiFDPacket_t)
+ + sizeof(ram_addr_t) * page_count;
+ p->packet = g_malloc0(p->packet_len);
p->name = g_strdup_printf("multifdrecv_%d", i);
}
return 0;
@@ -894,6 +1332,8 @@ void multifd_recv_new_channel(QIOChannel *ioc)
}
p->c = ioc;
object_ref(OBJECT(ioc));
+ /* initial packet */
+ p->num_packets = 1;
p->running = true;
qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
@@ -1374,6 +1814,15 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
return pages;
}
+static int ram_save_multifd_page(RAMState *rs, RAMBlock *block,
+ ram_addr_t offset)
+{
+ multifd_queue_page(block, offset);
+ ram_counters.normal++;
+
+ return 1;
+}
+
static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
ram_addr_t offset, uint8_t *source_buf)
{
@@ -1779,6 +2228,8 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
*/
if (block == rs->last_sent_block && save_page_use_compression(rs)) {
return compress_page_with_multi_thread(rs, block, offset);
+ } else if (migrate_use_multifd()) {
+ return ram_save_multifd_page(rs, block, offset);
}
return ram_save_page(rs, pss, last_stage);
@@ -2605,7 +3056,9 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
ram_control_before_iterate(f, RAM_CONTROL_SETUP);
ram_control_after_iterate(f, RAM_CONTROL_SETUP);
+ multifd_send_sync_main();
qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
+ qemu_fflush(f);
return 0;
}
@@ -2685,8 +3138,10 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
*/
ram_control_after_iterate(f, RAM_CONTROL_ROUND);
+ multifd_send_sync_main();
out:
qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
+ qemu_fflush(f);
ram_counters.transferred += 8;
ret = qemu_file_get_error(f);
@@ -2738,7 +3193,9 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
rcu_read_unlock();
+ multifd_send_sync_main();
qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
+ qemu_fflush(f);
return 0;
}
@@ -3107,9 +3564,7 @@ static int ram_load_cleanup(void *opaque)
*/
int ram_postcopy_incoming_init(MigrationIncomingState *mis)
{
- unsigned long ram_pages = last_ram_page();
-
- return postcopy_ram_incoming_init(mis, ram_pages);
+ return postcopy_ram_incoming_init(mis);
}
/**
@@ -3227,6 +3682,7 @@ static int ram_load_postcopy(QEMUFile *f)
break;
case RAM_SAVE_FLAG_EOS:
/* normal exit */
+ multifd_recv_sync_main();
break;
default:
error_report("Unknown combination of migration flags: %#x"
@@ -3415,6 +3871,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
break;
case RAM_SAVE_FLAG_EOS:
/* normal exit */
+ multifd_recv_sync_main();
break;
default:
if (flags & RAM_SAVE_FLAG_HOOK) {
diff --git a/migration/trace-events b/migration/trace-events
index 7ea522e453..9430f3cbe0 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -76,6 +76,18 @@ get_queued_page_not_dirty(const char *block_name, uint64_t tmp_offset, unsigned
migration_bitmap_sync_start(void) ""
migration_bitmap_sync_end(uint64_t dirty_pages) "dirty_pages %" PRIu64
migration_throttle(void) ""
+multifd_recv(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags) "channel %d packet number %" PRIu64 " pages %d flags 0x%x"
+multifd_recv_sync_main(long packet_num) "packet num %ld"
+multifd_recv_sync_main_signal(uint8_t id) "channel %d"
+multifd_recv_sync_main_wait(uint8_t id) "channel %d"
+multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %d packets %" PRIu64 " pages %" PRIu64
+multifd_recv_thread_start(uint8_t id) "%d"
+multifd_send(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags) "channel %d packet_num %" PRIu64 " pages %d flags 0x%x"
+multifd_send_sync_main(long packet_num) "packet num %ld"
+multifd_send_sync_main_signal(uint8_t id) "channel %d"
+multifd_send_sync_main_wait(uint8_t id) "channel %d"
+multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %d packets %" PRIu64 " pages %" PRIu64
+multifd_send_thread_start(uint8_t id) "%d"
ram_discard_range(const char *rbname, uint64_t start, size_t len) "%s: start: %" PRIx64 " %zx"
ram_load_loop(const char *rbname, uint64_t addr, int flags, void *host) "%s: addr: 0x%" PRIx64 " flags: 0x%x host: %p"
ram_load_postcopy_loop(uint64_t addr, int flags) "@%" PRIx64 " %x"