aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/migration.txt4
-rw-r--r--hmp-commands.hx2
-rw-r--r--hmp.c57
-rw-r--r--hw/s390x/s390-skeys.c26
-rw-r--r--include/migration/migration.h26
-rw-r--r--include/migration/qemu-file.h57
-rw-r--r--include/qapi/error.h2
-rw-r--r--include/qemu/typedefs.h1
-rw-r--r--include/sysemu/sysemu.h2
-rw-r--r--io/channel-buffer.c1
-rw-r--r--migration/Makefile.objs7
-rw-r--r--migration/exec.c62
-rw-r--r--migration/fd.c75
-rw-r--r--migration/migration.c157
-rw-r--r--migration/qemu-file-buf.c464
-rw-r--r--migration/qemu-file-channel.c180
-rw-r--r--migration/qemu-file-internal.h53
-rw-r--r--migration/qemu-file-stdio.c196
-rw-r--r--migration/qemu-file-unix.c323
-rw-r--r--migration/qemu-file.c110
-rw-r--r--migration/ram.c6
-rw-r--r--migration/rdma.c380
-rw-r--r--migration/savevm.c63
-rw-r--r--migration/socket.c183
-rw-r--r--migration/tcp.c102
-rw-r--r--migration/tls.c161
-rw-r--r--migration/unix.c103
-rw-r--r--qapi-schema.json65
-rw-r--r--tests/Makefile6
-rw-r--r--tests/test-vmstate.c55
-rw-r--r--trace-events25
-rw-r--r--util/error.c2
32 files changed, 1281 insertions, 1675 deletions
diff --git a/docs/migration.txt b/docs/migration.txt
index 90209ab294..6503c17685 100644
--- a/docs/migration.txt
+++ b/docs/migration.txt
@@ -403,8 +403,8 @@ listen thread: --- page -- page -- page -- page -- page --
On receipt of CMD_PACKAGED (1)
All the data associated with the package - the ( ... ) section in the
-diagram - is read into memory (into a QEMUSizedBuffer), and the main thread
-recurses into qemu_loadvm_state_main to process the contents of the package (2)
+diagram - is read into memory, and the main thread recurses into
+qemu_loadvm_state_main to process the contents of the package (2)
which contains commands (3,6) and devices (4...)
On receipt of 'postcopy listen' - 3 -(i.e. the 1st command in the package)
diff --git a/hmp-commands.hx b/hmp-commands.hx
index 4f4f60a0df..98b4b1a82c 100644
--- a/hmp-commands.hx
+++ b/hmp-commands.hx
@@ -1008,7 +1008,7 @@ ETEXI
{
.name = "migrate_set_parameter",
- .args_type = "parameter:s,value:i",
+ .args_type = "parameter:s,value:s",
.params = "parameter value",
.help = "Set the parameter for migration",
.mhandler.cmd = hmp_migrate_set_parameter,
diff --git a/hmp.c b/hmp.c
index 9f9bcf9d83..a4b1d3d220 100644
--- a/hmp.c
+++ b/hmp.c
@@ -35,6 +35,7 @@
#include "block/qapi.h"
#include "qemu-io.h"
#include "qemu/cutils.h"
+#include "qemu/error-report.h"
#ifdef CONFIG_SPICE
#include <spice/enums.h>
@@ -168,8 +169,15 @@ void hmp_info_migrate(Monitor *mon, const QDict *qdict)
}
if (info->has_status) {
- monitor_printf(mon, "Migration status: %s\n",
+ monitor_printf(mon, "Migration status: %s",
MigrationStatus_lookup[info->status]);
+ if (info->status == MIGRATION_STATUS_FAILED &&
+ info->has_error_desc) {
+ monitor_printf(mon, " (%s)\n", info->error_desc);
+ } else {
+ monitor_printf(mon, "\n");
+ }
+
monitor_printf(mon, "total time: %" PRIu64 " milliseconds\n",
info->total_time);
if (info->has_expected_downtime) {
@@ -286,6 +294,12 @@ void hmp_info_migrate_parameters(Monitor *mon, const QDict *qdict)
monitor_printf(mon, " %s: %" PRId64,
MigrationParameter_lookup[MIGRATION_PARAMETER_CPU_THROTTLE_INCREMENT],
params->cpu_throttle_increment);
+ monitor_printf(mon, " %s: '%s'",
+ MigrationParameter_lookup[MIGRATION_PARAMETER_TLS_CREDS],
+ params->tls_creds ? : "");
+ monitor_printf(mon, " %s: '%s'",
+ MigrationParameter_lookup[MIGRATION_PARAMETER_TLS_HOSTNAME],
+ params->tls_hostname ? : "");
monitor_printf(mon, "\n");
}
@@ -1235,13 +1249,17 @@ void hmp_migrate_set_capability(Monitor *mon, const QDict *qdict)
void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
{
const char *param = qdict_get_str(qdict, "parameter");
- int value = qdict_get_int(qdict, "value");
+ const char *valuestr = qdict_get_str(qdict, "value");
+ long valueint = 0;
Error *err = NULL;
bool has_compress_level = false;
bool has_compress_threads = false;
bool has_decompress_threads = false;
bool has_cpu_throttle_initial = false;
bool has_cpu_throttle_increment = false;
+ bool has_tls_creds = false;
+ bool has_tls_hostname = false;
+ bool use_int_value = false;
int i;
for (i = 0; i < MIGRATION_PARAMETER__MAX; i++) {
@@ -1249,25 +1267,46 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
switch (i) {
case MIGRATION_PARAMETER_COMPRESS_LEVEL:
has_compress_level = true;
+ use_int_value = true;
break;
case MIGRATION_PARAMETER_COMPRESS_THREADS:
has_compress_threads = true;
+ use_int_value = true;
break;
case MIGRATION_PARAMETER_DECOMPRESS_THREADS:
has_decompress_threads = true;
+ use_int_value = true;
break;
case MIGRATION_PARAMETER_CPU_THROTTLE_INITIAL:
has_cpu_throttle_initial = true;
+ use_int_value = true;
break;
case MIGRATION_PARAMETER_CPU_THROTTLE_INCREMENT:
has_cpu_throttle_increment = true;
break;
+ case MIGRATION_PARAMETER_TLS_CREDS:
+ has_tls_creds = true;
+ break;
+ case MIGRATION_PARAMETER_TLS_HOSTNAME:
+ has_tls_hostname = true;
+ break;
}
- qmp_migrate_set_parameters(has_compress_level, value,
- has_compress_threads, value,
- has_decompress_threads, value,
- has_cpu_throttle_initial, value,
- has_cpu_throttle_increment, value,
+
+ if (use_int_value) {
+ if (qemu_strtol(valuestr, NULL, 10, &valueint) < 0) {
+ error_setg(&err, "Unable to parse '%s' as an int",
+ valuestr);
+ goto cleanup;
+ }
+ }
+
+ qmp_migrate_set_parameters(has_compress_level, valueint,
+ has_compress_threads, valueint,
+ has_decompress_threads, valueint,
+ has_cpu_throttle_initial, valueint,
+ has_cpu_throttle_increment, valueint,
+ has_tls_creds, valuestr,
+ has_tls_hostname, valuestr,
&err);
break;
}
@@ -1277,6 +1316,7 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
error_setg(&err, QERR_INVALID_PARAMETER, param);
}
+ cleanup:
if (err) {
error_report_err(err);
}
@@ -1533,6 +1573,9 @@ static void hmp_migrate_status_cb(void *opaque)
if (status->is_block_migration) {
monitor_printf(status->mon, "\n");
}
+ if (info->has_error_desc) {
+ error_report("%s", info->error_desc);
+ }
monitor_resume(status->mon);
timer_del(status->timer);
g_free(status);
diff --git a/hw/s390x/s390-skeys.c b/hw/s390x/s390-skeys.c
index d772cfc7ea..e2d4e1af79 100644
--- a/hw/s390x/s390-skeys.c
+++ b/hw/s390x/s390-skeys.c
@@ -47,15 +47,11 @@ void s390_skeys_init(void)
qdev_init_nofail(DEVICE(obj));
}
-static void write_keys(QEMUFile *f, uint8_t *keys, uint64_t startgfn,
+static void write_keys(FILE *f, uint8_t *keys, uint64_t startgfn,
uint64_t count, Error **errp)
{
uint64_t curpage = startgfn;
uint64_t maxpage = curpage + count - 1;
- const char *fmt = "page=%03" PRIx64 ": key(%d) => ACC=%X, FP=%d, REF=%d,"
- " ch=%d, reserved=%d\n";
- char buf[128];
- int len;
for (; curpage <= maxpage; curpage++) {
uint8_t acc = (*keys & 0xF0) >> 4;
@@ -64,10 +60,9 @@ static void write_keys(QEMUFile *f, uint8_t *keys, uint64_t startgfn,
int ch = (*keys & 0x02);
int res = (*keys & 0x01);
- len = snprintf(buf, sizeof(buf), fmt, curpage,
- *keys, acc, fp, ref, ch, res);
- assert(len < sizeof(buf));
- qemu_put_buffer(f, (uint8_t *)buf, len);
+ fprintf(f, "page=%03" PRIx64 ": key(%d) => ACC=%X, FP=%d, REF=%d,"
+ " ch=%d, reserved=%d\n",
+ curpage, *keys, acc, fp, ref, ch, res);
keys++;
}
}
@@ -116,7 +111,8 @@ void qmp_dump_skeys(const char *filename, Error **errp)
vaddr cur_gfn = 0;
uint8_t *buf;
int ret;
- QEMUFile *f;
+ int fd;
+ FILE *f;
/* Quick check to see if guest is using storage keys*/
if (!skeyclass->skeys_enabled(ss)) {
@@ -125,8 +121,14 @@ void qmp_dump_skeys(const char *filename, Error **errp)
return;
}
- f = qemu_fopen(filename, "wb");
+ fd = qemu_open(filename, O_WRONLY | O_CREAT | O_TRUNC, 0600);
+ if (fd < 0) {
+ error_setg_file_open(errp, errno, filename);
+ return;
+ }
+ f = fdopen(fd, "wb");
if (!f) {
+ close(fd);
error_setg_file_open(errp, errno, filename);
return;
}
@@ -162,7 +164,7 @@ out_free:
error_propagate(errp, lerr);
g_free(buf);
out:
- qemu_fclose(f);
+ fclose(f);
}
static void qemu_s390_skeys_init(Object *obj)
diff --git a/include/migration/migration.h b/include/migration/migration.h
index 9e36a97fc5..13b12b7e87 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -135,9 +135,12 @@ struct MigrationState
QemuThread thread;
QEMUBH *cleanup_bh;
QEMUFile *to_dst_file;
- int parameters[MIGRATION_PARAMETER__MAX];
+
+ /* New style params from 'migrate-set-parameters' */
+ MigrationParameters parameters;
int state;
+ /* Old style params from 'migrate' command */
MigrationParams params;
/* State related to return path */
@@ -171,6 +174,9 @@ struct MigrationState
QSIMPLEQ_HEAD(src_page_requests, MigrationSrcPageRequest) src_page_requests;
/* The RAMBlock used in the last src_page_request */
RAMBlock *last_req_rb;
+
+ /* The last error that occurred */
+ Error *error;
};
void migrate_set_state(int *state, int old_state, int new_state);
@@ -179,6 +185,22 @@ void process_incoming_migration(QEMUFile *f);
void qemu_start_incoming_migration(const char *uri, Error **errp);
+void migration_set_incoming_channel(MigrationState *s,
+ QIOChannel *ioc);
+
+void migration_tls_set_incoming_channel(MigrationState *s,
+ QIOChannel *ioc,
+ Error **errp);
+
+void migration_set_outgoing_channel(MigrationState *s,
+ QIOChannel *ioc,
+ const char *hostname);
+
+void migration_tls_set_outgoing_channel(MigrationState *s,
+ QIOChannel *ioc,
+ const char *hostname,
+ Error **errp);
+
uint64_t migrate_max_downtime(void);
void exec_start_incoming_migration(const char *host_port, Error **errp);
@@ -201,7 +223,7 @@ void rdma_start_outgoing_migration(void *opaque, const char *host_port, Error **
void rdma_start_incoming_migration(const char *host_port, Error **errp);
-void migrate_fd_error(MigrationState *s);
+void migrate_fd_error(MigrationState *s, const Error *error);
void migrate_fd_connect(MigrationState *s);
diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h
index 3f6b4ed581..2409a98967 100644
--- a/include/migration/qemu-file.h
+++ b/include/migration/qemu-file.h
@@ -23,16 +23,11 @@
*/
#ifndef QEMU_FILE_H
#define QEMU_FILE_H 1
+#include "qemu-common.h"
#include "exec/cpu-common.h"
+#include "io/channel.h"
-/* This function writes a chunk of data to a file at the given position.
- * The pos argument can be ignored if the file is only being used for
- * streaming. The handler should try to write all of the data it can.
- */
-typedef ssize_t (QEMUFilePutBufferFunc)(void *opaque, const uint8_t *buf,
- int64_t pos, size_t size);
-
/* Read a chunk of data from a file at the given position. The pos argument
* can be ignored if the file is only be used for streaming. The number of
* bytes actually read should be returned.
@@ -53,8 +48,13 @@ typedef int (QEMUFileCloseFunc)(void *opaque);
*/
typedef int (QEMUFileGetFD)(void *opaque);
+/* Called to change the blocking mode of the file
+ */
+typedef int (QEMUFileSetBlocking)(void *opaque, bool enabled);
+
/*
- * This function writes an iovec to file.
+ * This function writes an iovec to file. The handler must write all
+ * of the data or return a negative errno value.
*/
typedef ssize_t (QEMUFileWritevBufferFunc)(void *opaque, struct iovec *iov,
int iovcnt, int64_t pos);
@@ -101,32 +101,25 @@ typedef QEMUFile *(QEMURetPathFunc)(void *opaque);
typedef int (QEMUFileShutdownFunc)(void *opaque, bool rd, bool wr);
typedef struct QEMUFileOps {
- QEMUFilePutBufferFunc *put_buffer;
QEMUFileGetBufferFunc *get_buffer;
QEMUFileCloseFunc *close;
- QEMUFileGetFD *get_fd;
+ QEMUFileSetBlocking *set_blocking;
QEMUFileWritevBufferFunc *writev_buffer;
- QEMURamHookFunc *before_ram_iterate;
- QEMURamHookFunc *after_ram_iterate;
- QEMURamHookFunc *hook_ram_load;
- QEMURamSaveFunc *save_page;
QEMURetPathFunc *get_return_path;
QEMUFileShutdownFunc *shut_down;
} QEMUFileOps;
-struct QEMUSizedBuffer {
- struct iovec *iov;
- size_t n_iov;
- size_t size; /* total allocated size in all iov's */
- size_t used; /* number of used bytes */
-};
+typedef struct QEMUFileHooks {
+ QEMURamHookFunc *before_ram_iterate;
+ QEMURamHookFunc *after_ram_iterate;
+ QEMURamHookFunc *hook_ram_load;
+ QEMURamSaveFunc *save_page;
+} QEMUFileHooks;
QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops);
-QEMUFile *qemu_fopen(const char *filename, const char *mode);
-QEMUFile *qemu_fdopen(int fd, const char *mode);
-QEMUFile *qemu_fopen_socket(int fd, const char *mode);
-QEMUFile *qemu_popen_cmd(const char *command, const char *mode);
-QEMUFile *qemu_bufopen(const char *mode, QEMUSizedBuffer *input);
+QEMUFile *qemu_fopen_channel_input(QIOChannel *ioc);
+QEMUFile *qemu_fopen_channel_output(QIOChannel *ioc);
+void qemu_file_set_hooks(QEMUFile *f, const QEMUFileHooks *hooks);
int qemu_get_fd(QEMUFile *f);
int qemu_fclose(QEMUFile *f);
int64_t qemu_ftell(QEMUFile *f);
@@ -141,20 +134,6 @@ void qemu_put_buffer_async(QEMUFile *f, const uint8_t *buf, size_t size);
bool qemu_file_mode_is_not_valid(const char *mode);
bool qemu_file_is_writable(QEMUFile *f);
-QEMUSizedBuffer *qsb_create(const uint8_t *buffer, size_t len);
-void qsb_free(QEMUSizedBuffer *);
-size_t qsb_set_length(QEMUSizedBuffer *qsb, size_t length);
-size_t qsb_get_length(const QEMUSizedBuffer *qsb);
-ssize_t qsb_get_buffer(const QEMUSizedBuffer *, off_t start, size_t count,
- uint8_t *buf);
-ssize_t qsb_write_at(QEMUSizedBuffer *qsb, const uint8_t *buf,
- off_t pos, size_t count);
-
-
-/*
- * For use on files opened with qemu_bufopen
- */
-const QEMUSizedBuffer *qemu_buf_get(QEMUFile *f);
static inline void qemu_put_ubyte(QEMUFile *f, unsigned int v)
{
diff --git a/include/qapi/error.h b/include/qapi/error.h
index 11be2327c0..0576659603 100644
--- a/include/qapi/error.h
+++ b/include/qapi/error.h
@@ -134,7 +134,7 @@ typedef enum ErrorClass {
/*
* Get @err's human-readable error message.
*/
-const char *error_get_pretty(Error *err);
+const char *error_get_pretty(const Error *err);
/*
* Get @err's error class.
diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
index 1dcf6f5d53..b113fcf156 100644
--- a/include/qemu/typedefs.h
+++ b/include/qemu/typedefs.h
@@ -82,7 +82,6 @@ typedef struct QemuOpt QemuOpt;
typedef struct QemuOpts QemuOpts;
typedef struct QemuOptsList QemuOptsList;
typedef struct QEMUSGList QEMUSGList;
-typedef struct QEMUSizedBuffer QEMUSizedBuffer;
typedef struct QEMUTimer QEMUTimer;
typedef struct QEMUTimerListGroup QEMUTimerListGroup;
typedef struct QObject QObject;
diff --git a/include/sysemu/sysemu.h b/include/sysemu/sysemu.h
index 618169c4d5..94281413d0 100644
--- a/include/sysemu/sysemu.h
+++ b/include/sysemu/sysemu.h
@@ -119,7 +119,7 @@ void qemu_savevm_command_send(QEMUFile *f, enum qemu_vm_cmd command,
uint16_t len, uint8_t *data);
void qemu_savevm_send_ping(QEMUFile *f, uint32_t value);
void qemu_savevm_send_open_return_path(QEMUFile *f);
-int qemu_savevm_send_packaged(QEMUFile *f, const QEMUSizedBuffer *qsb);
+int qemu_savevm_send_packaged(QEMUFile *f, const uint8_t *buf, size_t len);
void qemu_savevm_send_postcopy_advise(QEMUFile *f);
void qemu_savevm_send_postcopy_listen(QEMUFile *f);
void qemu_savevm_send_postcopy_run(QEMUFile *f);
diff --git a/io/channel-buffer.c b/io/channel-buffer.c
index 3e5117bf28..43d795976d 100644
--- a/io/channel-buffer.c
+++ b/io/channel-buffer.c
@@ -140,6 +140,7 @@ static int qio_channel_buffer_close(QIOChannel *ioc,
QIOChannelBuffer *bioc = QIO_CHANNEL_BUFFER(ioc);
g_free(bioc->data);
+ bioc->data = NULL;
bioc->capacity = bioc->usage = bioc->offset = 0;
return 0;
diff --git a/migration/Makefile.objs b/migration/Makefile.objs
index d25ff483eb..30ad945918 100644
--- a/migration/Makefile.objs
+++ b/migration/Makefile.objs
@@ -1,11 +1,12 @@
-common-obj-y += migration.o tcp.o
+common-obj-y += migration.o socket.o fd.o exec.o
+common-obj-y += tls.o
common-obj-y += vmstate.o
-common-obj-y += qemu-file.o qemu-file-buf.o qemu-file-unix.o qemu-file-stdio.o
+common-obj-y += qemu-file.o
+common-obj-y += qemu-file-channel.o
common-obj-y += xbzrle.o postcopy-ram.o
common-obj-y += qjson.o
common-obj-$(CONFIG_RDMA) += rdma.o
-common-obj-$(CONFIG_POSIX) += exec.o unix.o fd.o
common-obj-y += block.o
diff --git a/migration/exec.c b/migration/exec.c
index 559420969b..1515cc3319 100644
--- a/migration/exec.c
+++ b/migration/exec.c
@@ -3,10 +3,12 @@
*
* Copyright IBM, Corp. 2008
* Copyright Dell MessageOne 2008
+ * Copyright Red Hat, Inc. 2015-2016
*
* Authors:
* Anthony Liguori <aliguori@us.ibm.com>
* Charles Duffy <charles_duffy@messageone.com>
+ * Daniel P. Berrange <berrange@redhat.com>
*
* This work is licensed under the terms of the GNU GPL, version 2. See
* the COPYING file in the top-level directory.
@@ -18,53 +20,53 @@
#include "qemu/osdep.h"
#include "qapi/error.h"
#include "qemu-common.h"
-#include "qemu/sockets.h"
-#include "qemu/main-loop.h"
#include "migration/migration.h"
-#include "migration/qemu-file.h"
-#include "block/block.h"
-#include <sys/wait.h>
+#include "io/channel-command.h"
+#include "trace.h"
-//#define DEBUG_MIGRATION_EXEC
-
-#ifdef DEBUG_MIGRATION_EXEC
-#define DPRINTF(fmt, ...) \
- do { printf("migration-exec: " fmt, ## __VA_ARGS__); } while (0)
-#else
-#define DPRINTF(fmt, ...) \
- do { } while (0)
-#endif
void exec_start_outgoing_migration(MigrationState *s, const char *command, Error **errp)
{
- s->to_dst_file = qemu_popen_cmd(command, "w");
- if (s->to_dst_file == NULL) {
- error_setg_errno(errp, errno, "failed to popen the migration target");
+ QIOChannel *ioc;
+ const char *argv[] = { "/bin/sh", "-c", command, NULL };
+
+ trace_migration_exec_outgoing(command);
+ ioc = QIO_CHANNEL(qio_channel_command_new_spawn(argv,
+ O_WRONLY,
+ errp));
+ if (!ioc) {
return;
}
- migrate_fd_connect(s);
+ migration_set_outgoing_channel(s, ioc, NULL);
+ object_unref(OBJECT(ioc));
}
-static void exec_accept_incoming_migration(void *opaque)
+static gboolean exec_accept_incoming_migration(QIOChannel *ioc,
+ GIOCondition condition,
+ gpointer opaque)
{
- QEMUFile *f = opaque;
-
- qemu_set_fd_handler(qemu_get_fd(f), NULL, NULL, NULL);
- process_incoming_migration(f);
+ migration_set_incoming_channel(migrate_get_current(), ioc);
+ object_unref(OBJECT(ioc));
+ return FALSE; /* unregister */
}
void exec_start_incoming_migration(const char *command, Error **errp)
{
- QEMUFile *f;
+ QIOChannel *ioc;
+ const char *argv[] = { "/bin/sh", "-c", command, NULL };
- DPRINTF("Attempting to start an incoming migration\n");
- f = qemu_popen_cmd(command, "r");
- if(f == NULL) {
- error_setg_errno(errp, errno, "failed to popen the migration source");
+ trace_migration_exec_incoming(command);
+ ioc = QIO_CHANNEL(qio_channel_command_new_spawn(argv,
+ O_RDONLY,
+ errp));
+ if (!ioc) {
return;
}
- qemu_set_fd_handler(qemu_get_fd(f), exec_accept_incoming_migration, NULL,
- f);
+ qio_channel_add_watch(ioc,
+ G_IO_IN,
+ exec_accept_incoming_migration,
+ NULL,
+ NULL);
}
diff --git a/migration/fd.c b/migration/fd.c
index 3d788bb297..fc5c9eee02 100644
--- a/migration/fd.c
+++ b/migration/fd.c
@@ -1,10 +1,11 @@
/*
* QEMU live migration via generic fd
*
- * Copyright Red Hat, Inc. 2009
+ * Copyright Red Hat, Inc. 2009-2016
*
* Authors:
* Chris Lalancette <clalance@redhat.com>
+ * Daniel P. Berrange <berrange@redhat.com>
*
* This work is licensed under the terms of the GNU GPL, version 2. See
* the COPYING file in the top-level directory.
@@ -16,75 +17,57 @@
#include "qemu/osdep.h"
#include "qapi/error.h"
#include "qemu-common.h"
-#include "qemu/main-loop.h"
-#include "qemu/sockets.h"
#include "migration/migration.h"
#include "monitor/monitor.h"
-#include "migration/qemu-file.h"
-#include "block/block.h"
+#include "io/channel-util.h"
+#include "trace.h"
-//#define DEBUG_MIGRATION_FD
-
-#ifdef DEBUG_MIGRATION_FD
-#define DPRINTF(fmt, ...) \
- do { printf("migration-fd: " fmt, ## __VA_ARGS__); } while (0)
-#else
-#define DPRINTF(fmt, ...) \
- do { } while (0)
-#endif
-
-static bool fd_is_socket(int fd)
-{
- struct stat stat;
- int ret = fstat(fd, &stat);
- if (ret == -1) {
- /* When in doubt say no */
- return false;
- }
- return S_ISSOCK(stat.st_mode);
-}
void fd_start_outgoing_migration(MigrationState *s, const char *fdname, Error **errp)
{
+ QIOChannel *ioc;
int fd = monitor_get_fd(cur_mon, fdname, errp);
if (fd == -1) {
return;
}
- if (fd_is_socket(fd)) {
- s->to_dst_file = qemu_fopen_socket(fd, "wb");
- } else {
- s->to_dst_file = qemu_fdopen(fd, "wb");
+ trace_migration_fd_outgoing(fd);
+ ioc = qio_channel_new_fd(fd, errp);
+ if (!ioc) {
+ close(fd);
+ return;
}
- migrate_fd_connect(s);
+ migration_set_outgoing_channel(s, ioc, NULL);
+ object_unref(OBJECT(ioc));
}
-static void fd_accept_incoming_migration(void *opaque)
+static gboolean fd_accept_incoming_migration(QIOChannel *ioc,
+ GIOCondition condition,
+ gpointer opaque)
{
- QEMUFile *f = opaque;
-
- qemu_set_fd_handler(qemu_get_fd(f), NULL, NULL, NULL);
- process_incoming_migration(f);
+ migration_set_incoming_channel(migrate_get_current(), ioc);
+ object_unref(OBJECT(ioc));
+ return FALSE; /* unregister */
}
void fd_start_incoming_migration(const char *infd, Error **errp)
{
+ QIOChannel *ioc;
int fd;
- QEMUFile *f;
-
- DPRINTF("Attempting to start an incoming migration via fd\n");
fd = strtol(infd, NULL, 0);
- if (fd_is_socket(fd)) {
- f = qemu_fopen_socket(fd, "rb");
- } else {
- f = qemu_fdopen(fd, "rb");
- }
- if(f == NULL) {
- error_setg_errno(errp, errno, "failed to open the source descriptor");
+ trace_migration_fd_incoming(fd);
+
+ ioc = qio_channel_new_fd(fd, errp);
+ if (!ioc) {
+ close(fd);
return;
}
- qemu_set_fd_handler(fd, fd_accept_incoming_migration, NULL, f);
+ qio_channel_add_watch(ioc,
+ G_IO_IN,
+ fd_accept_incoming_migration,
+ NULL,
+ NULL);
}
diff --git a/migration/migration.c b/migration/migration.c
index f5327e8c0a..7ecbadee6f 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -34,6 +34,8 @@
#include "qom/cpu.h"
#include "exec/memory.h"
#include "exec/address-spaces.h"
+#include "io/channel-buffer.h"
+#include "io/channel-tls.h"
#define MAX_THROTTLE (32 << 20) /* Migration transfer speed throttling */
@@ -81,16 +83,13 @@ MigrationState *migrate_get_current(void)
.bandwidth_limit = MAX_THROTTLE,
.xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
.mbps = -1,
- .parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL] =
- DEFAULT_MIGRATE_COMPRESS_LEVEL,
- .parameters[MIGRATION_PARAMETER_COMPRESS_THREADS] =
- DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
- .parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS] =
- DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT,
- .parameters[MIGRATION_PARAMETER_CPU_THROTTLE_INITIAL] =
- DEFAULT_MIGRATE_CPU_THROTTLE_INITIAL,
- .parameters[MIGRATION_PARAMETER_CPU_THROTTLE_INCREMENT] =
- DEFAULT_MIGRATE_CPU_THROTTLE_INCREMENT,
+ .parameters = {
+ .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
+ .compress_threads = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
+ .decompress_threads = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT,
+ .cpu_throttle_initial = DEFAULT_MIGRATE_CPU_THROTTLE_INITIAL,
+ .cpu_throttle_increment = DEFAULT_MIGRATE_CPU_THROTTLE_INCREMENT,
+ },
};
if (!once) {
@@ -310,14 +309,12 @@ void qemu_start_incoming_migration(const char *uri, Error **errp)
} else if (strstart(uri, "rdma:", &p)) {
rdma_start_incoming_migration(p, errp);
#endif
-#if !defined(WIN32)
} else if (strstart(uri, "exec:", &p)) {
exec_start_incoming_migration(p, errp);
} else if (strstart(uri, "unix:", &p)) {
unix_start_incoming_migration(p, errp);
} else if (strstart(uri, "fd:", &p)) {
fd_start_incoming_migration(p, errp);
-#endif
} else {
error_setg(errp, "unknown migration protocol: %s", uri);
}
@@ -422,14 +419,60 @@ static void process_incoming_migration_co(void *opaque)
void process_incoming_migration(QEMUFile *f)
{
Coroutine *co = qemu_coroutine_create(process_incoming_migration_co);
- int fd = qemu_get_fd(f);
- assert(fd != -1);
migrate_decompress_threads_create();
- qemu_set_nonblock(fd);
+ qemu_file_set_blocking(f, false);
qemu_coroutine_enter(co, f);
}
+
+void migration_set_incoming_channel(MigrationState *s,
+ QIOChannel *ioc)
+{
+ trace_migration_set_incoming_channel(
+ ioc, object_get_typename(OBJECT(ioc)));
+
+ if (s->parameters.tls_creds &&
+ !object_dynamic_cast(OBJECT(ioc),
+ TYPE_QIO_CHANNEL_TLS)) {
+ Error *local_err = NULL;
+ migration_tls_set_incoming_channel(s, ioc, &local_err);
+ if (local_err) {
+ error_report_err(local_err);
+ }
+ } else {
+ QEMUFile *f = qemu_fopen_channel_input(ioc);
+ process_incoming_migration(f);
+ }
+}
+
+
+void migration_set_outgoing_channel(MigrationState *s,
+ QIOChannel *ioc,
+ const char *hostname)
+{
+ trace_migration_set_outgoing_channel(
+ ioc, object_get_typename(OBJECT(ioc)), hostname);
+
+ if (s->parameters.tls_creds &&
+ !object_dynamic_cast(OBJECT(ioc),
+ TYPE_QIO_CHANNEL_TLS)) {
+ Error *local_err = NULL;
+ migration_tls_set_outgoing_channel(s, ioc, hostname, &local_err);
+ if (local_err) {
+ migrate_fd_error(s, local_err);
+ error_free(local_err);
+ }
+ } else {
+ QEMUFile *f = qemu_fopen_channel_output(ioc);
+
+ s->to_dst_file = f;
+
+ migrate_fd_connect(s);
+ }
+}
+
+
/*
* Send a message on the return channel back to the source
* of the migration.
@@ -516,15 +559,13 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
MigrationState *s = migrate_get_current();
params = g_malloc0(sizeof(*params));
- params->compress_level = s->parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL];
- params->compress_threads =
- s->parameters[MIGRATION_PARAMETER_COMPRESS_THREADS];
- params->decompress_threads =
- s->parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS];
- params->cpu_throttle_initial =
- s->parameters[MIGRATION_PARAMETER_CPU_THROTTLE_INITIAL];
- params->cpu_throttle_increment =
- s->parameters[MIGRATION_PARAMETER_CPU_THROTTLE_INCREMENT];
+ params->compress_level = s->parameters.compress_level;
+ params->compress_threads = s->parameters.compress_threads;
+ params->decompress_threads = s->parameters.decompress_threads;
+ params->cpu_throttle_initial = s->parameters.cpu_throttle_initial;
+ params->cpu_throttle_increment = s->parameters.cpu_throttle_increment;
+ params->tls_creds = g_strdup(s->parameters.tls_creds);
+ params->tls_hostname = g_strdup(s->parameters.tls_hostname);
return params;
}
@@ -672,6 +713,10 @@ MigrationInfo *qmp_query_migrate(Error **errp)
break;
case MIGRATION_STATUS_FAILED:
info->has_status = true;
+ if (s->error) {
+ info->has_error_desc = true;
+ info->error_desc = g_strdup(error_get_pretty(s->error));
+ }
break;
case MIGRATION_STATUS_CANCELLED:
info->has_status = true;
@@ -721,7 +766,12 @@ void qmp_migrate_set_parameters(bool has_compress_level,
bool has_cpu_throttle_initial,
int64_t cpu_throttle_initial,
bool has_cpu_throttle_increment,
- int64_t cpu_throttle_increment, Error **errp)
+ int64_t cpu_throttle_increment,
+ bool has_tls_creds,
+ const char *tls_creds,
+ bool has_tls_hostname,
+ const char *tls_hostname,
+ Error **errp)
{
MigrationState *s = migrate_get_current();
@@ -758,26 +808,31 @@ void qmp_migrate_set_parameters(bool has_compress_level,
}
if (has_compress_level) {
- s->parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL] = compress_level;
+ s->parameters.compress_level = compress_level;
}
if (has_compress_threads) {
- s->parameters[MIGRATION_PARAMETER_COMPRESS_THREADS] = compress_threads;
+ s->parameters.compress_threads = compress_threads;
}
if (has_decompress_threads) {
- s->parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS] =
- decompress_threads;
+ s->parameters.decompress_threads = decompress_threads;
}
if (has_cpu_throttle_initial) {
- s->parameters[MIGRATION_PARAMETER_CPU_THROTTLE_INITIAL] =
- cpu_throttle_initial;
+ s->parameters.cpu_throttle_initial = cpu_throttle_initial;
}
-
if (has_cpu_throttle_increment) {
- s->parameters[MIGRATION_PARAMETER_CPU_THROTTLE_INCREMENT] =
- cpu_throttle_increment;
+ s->parameters.cpu_throttle_increment = cpu_throttle_increment;
+ }
+ if (has_tls_creds) {
+ g_free(s->parameters.tls_creds);
+ s->parameters.tls_creds = g_strdup(tls_creds);
+ }
+ if (has_tls_hostname) {
+ g_free(s->parameters.tls_hostname);
+ s->parameters.tls_hostname = g_strdup(tls_hostname);
}
}
+
void qmp_migrate_start_postcopy(Error **errp)
{
MigrationState *s = migrate_get_current();
@@ -844,12 +899,15 @@ static void migrate_fd_cleanup(void *opaque)
notifier_list_notify(&migration_state_notifiers, s);
}
-void migrate_fd_error(MigrationState *s)
+void migrate_fd_error(MigrationState *s, const Error *error)
{
- trace_migrate_fd_error();
+ trace_migrate_fd_error(error ? error_get_pretty(error) : "");
assert(s->to_dst_file == NULL);
migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
MIGRATION_STATUS_FAILED);
+ if (!s->error) {
+ s->error = error_copy(error);
+ }
notifier_list_notify(&migration_state_notifiers, s);
}
@@ -948,6 +1006,8 @@ MigrationState *migrate_init(const MigrationParams *params)
s->postcopy_after_devices = false;
s->migration_thread_running = false;
s->last_req_rb = NULL;
+ error_free(s->error);
+ s->error = NULL;
migrate_set_state(&s->state, MIGRATION_STATUS_NONE, MIGRATION_STATUS_SETUP);
@@ -1040,14 +1100,12 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk,
} else if (strstart(uri, "rdma:", &p)) {
rdma_start_outgoing_migration(s, p, &local_err);
#endif
-#if !defined(WIN32)
} else if (strstart(uri, "exec:", &p)) {
exec_start_outgoing_migration(s, p, &local_err);
} else if (strstart(uri, "unix:", &p)) {
unix_start_outgoing_migration(s, p, &local_err);
} else if (strstart(uri, "fd:", &p)) {
fd_start_outgoing_migration(s, p, &local_err);
-#endif
} else {
error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "uri",
"a valid migration protocol");
@@ -1057,7 +1115,7 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk,
}
if (local_err) {
- migrate_fd_error(s);
+ migrate_fd_error(s, local_err);
error_propagate(errp, local_err);
return;
}
@@ -1170,7 +1228,7 @@ int migrate_compress_level(void)
s = migrate_get_current();
- return s->parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL];
+ return s->parameters.compress_level;
}
int migrate_compress_threads(void)
@@ -1179,7 +1237,7 @@ int migrate_compress_threads(void)
s = migrate_get_current();
- return s->parameters[MIGRATION_PARAMETER_COMPRESS_THREADS];
+ return s->parameters.compress_threads;
}
int migrate_decompress_threads(void)
@@ -1188,7 +1246,7 @@ int migrate_decompress_threads(void)
s = migrate_get_current();
- return s->parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS];
+ return s->parameters.decompress_threads;
}
bool migrate_use_events(void)
@@ -1429,7 +1487,8 @@ static int await_return_path_close_on_source(MigrationState *ms)
static int postcopy_start(MigrationState *ms, bool *old_vm_running)
{
int ret;
- const QEMUSizedBuffer *qsb;
+ QIOChannelBuffer *bioc;
+ QEMUFile *fb;
int64_t time_at_stop = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
migrate_set_state(&ms->state, MIGRATION_STATUS_ACTIVE,
MIGRATION_STATUS_POSTCOPY_ACTIVE);
@@ -1488,11 +1547,9 @@ static int postcopy_start(MigrationState *ms, bool *old_vm_running)
* So we wrap the device state up in a package with a length at the start;
* to do this we use a qemu_buf to hold the whole of the device state.
*/
- QEMUFile *fb = qemu_bufopen("w", NULL);
- if (!fb) {
- error_report("Failed to create buffered file");
- goto fail;
- }
+ bioc = qio_channel_buffer_new(4096);
+ fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
+ object_unref(OBJECT(bioc));
/*
* Make sure the receiver can get incoming pages before we send the rest
@@ -1506,10 +1563,9 @@ static int postcopy_start(MigrationState *ms, bool *old_vm_running)
qemu_savevm_send_postcopy_run(fb);
/* <><> end of stuff going into the package */
- qsb = qemu_buf_get(fb);
/* Now send that blob */
- if (qemu_savevm_send_packaged(ms->to_dst_file, qsb)) {
+ if (qemu_savevm_send_packaged(ms->to_dst_file, bioc->data, bioc->usage)) {
goto fail_closefb;
}
qemu_fclose(fb);
@@ -1793,6 +1849,7 @@ void migrate_fd_connect(MigrationState *s)
s->expected_downtime = max_downtime/1000000;
s->cleanup_bh = qemu_bh_new(migrate_fd_cleanup, s);
+ qemu_file_set_blocking(s->to_dst_file, true);
qemu_file_set_rate_limit(s->to_dst_file,
s->bandwidth_limit / XFER_LIMIT_RATIO);
diff --git a/migration/qemu-file-buf.c b/migration/qemu-file-buf.c
deleted file mode 100644
index 7b8e78e99c..0000000000
--- a/migration/qemu-file-buf.c
+++ /dev/null
@@ -1,464 +0,0 @@
-/*
- * QEMU System Emulator
- *
- * Copyright (c) 2003-2008 Fabrice Bellard
- * Copyright (c) 2014 IBM Corp.
- *
- * Authors:
- * Stefan Berger <stefanb@linux.vnet.ibm.com>
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
- * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- * THE SOFTWARE.
- */
-#include "qemu/osdep.h"
-#include "qemu-common.h"
-#include "qemu/error-report.h"
-#include "qemu/iov.h"
-#include "qemu/sockets.h"
-#include "qemu/coroutine.h"
-#include "migration/migration.h"
-#include "migration/qemu-file.h"
-#include "migration/qemu-file-internal.h"
-#include "trace.h"
-
-#define QSB_CHUNK_SIZE (1 << 10)
-#define QSB_MAX_CHUNK_SIZE (16 * QSB_CHUNK_SIZE)
-
-/**
- * Create a QEMUSizedBuffer
- * This type of buffer uses scatter-gather lists internally and
- * can grow to any size. Any data array in the scatter-gather list
- * can hold different amount of bytes.
- *
- * @buffer: Optional buffer to copy into the QSB
- * @len: size of initial buffer; if @buffer is given, buffer must
- * hold at least len bytes
- *
- * Returns a pointer to a QEMUSizedBuffer or NULL on allocation failure
- */
-QEMUSizedBuffer *qsb_create(const uint8_t *buffer, size_t len)
-{
- QEMUSizedBuffer *qsb;
- size_t alloc_len, num_chunks, i, to_copy;
- size_t chunk_size = (len > QSB_MAX_CHUNK_SIZE)
- ? QSB_MAX_CHUNK_SIZE
- : QSB_CHUNK_SIZE;
-
- num_chunks = DIV_ROUND_UP(len ? len : QSB_CHUNK_SIZE, chunk_size);
- alloc_len = num_chunks * chunk_size;
-
- qsb = g_try_new0(QEMUSizedBuffer, 1);
- if (!qsb) {
- return NULL;
- }
-
- qsb->iov = g_try_new0(struct iovec, num_chunks);
- if (!qsb->iov) {
- g_free(qsb);
- return NULL;
- }
-
- qsb->n_iov = num_chunks;
-
- for (i = 0; i < num_chunks; i++) {
- qsb->iov[i].iov_base = g_try_malloc0(chunk_size);
- if (!qsb->iov[i].iov_base) {
- /* qsb_free is safe since g_free can cope with NULL */
- qsb_free(qsb);
- return NULL;
- }
-
- qsb->iov[i].iov_len = chunk_size;
- if (buffer) {
- to_copy = (len - qsb->used) > chunk_size
- ? chunk_size : (len - qsb->used);
- memcpy(qsb->iov[i].iov_base, &buffer[qsb->used], to_copy);
- qsb->used += to_copy;
- }
- }
-
- qsb->size = alloc_len;
-
- return qsb;
-}
-
-/**
- * Free the QEMUSizedBuffer
- *
- * @qsb: The QEMUSizedBuffer to free
- */
-void qsb_free(QEMUSizedBuffer *qsb)
-{
- size_t i;
-
- if (!qsb) {
- return;
- }
-
- for (i = 0; i < qsb->n_iov; i++) {
- g_free(qsb->iov[i].iov_base);
- }
- g_free(qsb->iov);
- g_free(qsb);
-}
-
-/**
- * Get the number of used bytes in the QEMUSizedBuffer
- *
- * @qsb: A QEMUSizedBuffer
- *
- * Returns the number of bytes currently used in this buffer
- */
-size_t qsb_get_length(const QEMUSizedBuffer *qsb)
-{
- return qsb->used;
-}
-
-/**
- * Set the length of the buffer; the primary usage of this
- * function is to truncate the number of used bytes in the buffer.
- * The size will not be extended beyond the current number of
- * allocated bytes in the QEMUSizedBuffer.
- *
- * @qsb: A QEMUSizedBuffer
- * @new_len: The new length of bytes in the buffer
- *
- * Returns the number of bytes the buffer was truncated or extended
- * to.
- */
-size_t qsb_set_length(QEMUSizedBuffer *qsb, size_t new_len)
-{
- if (new_len <= qsb->size) {
- qsb->used = new_len;
- } else {
- qsb->used = qsb->size;
- }
- return qsb->used;
-}
-
-/**
- * Get the iovec that holds the data for a given position @pos.
- *
- * @qsb: A QEMUSizedBuffer
- * @pos: The index of a byte in the buffer
- * @d_off: Pointer to an offset that this function will indicate
- * at what position within the returned iovec the byte
- * is to be found
- *
- * Returns the index of the iovec that holds the byte at the given
- * index @pos in the byte stream; a negative number if the iovec
- * for the given position @pos does not exist.
- */
-static ssize_t qsb_get_iovec(const QEMUSizedBuffer *qsb,
- off_t pos, off_t *d_off)
-{
- ssize_t i;
- off_t curr = 0;
-
- if (pos > qsb->used) {
- return -1;
- }
-
- for (i = 0; i < qsb->n_iov; i++) {
- if (curr + qsb->iov[i].iov_len > pos) {
- *d_off = pos - curr;
- return i;
- }
- curr += qsb->iov[i].iov_len;
- }
- return -1;
-}
-
-/*
- * Convert the QEMUSizedBuffer into a flat buffer.
- *
- * Note: If at all possible, try to avoid this function since it
- * may unnecessarily copy memory around.
- *
- * @qsb: pointer to QEMUSizedBuffer
- * @start: offset to start at
- * @count: number of bytes to copy
- * @buf: a pointer to a buffer to write into (at least @count bytes)
- *
- * Returns the number of bytes copied into the output buffer
- */
-ssize_t qsb_get_buffer(const QEMUSizedBuffer *qsb, off_t start,
- size_t count, uint8_t *buffer)
-{
- const struct iovec *iov;
- size_t to_copy, all_copy;
- ssize_t index;
- off_t s_off;
- off_t d_off = 0;
- char *s;
-
- if (start > qsb->used) {
- return 0;
- }
-
- all_copy = qsb->used - start;
- if (all_copy > count) {
- all_copy = count;
- } else {
- count = all_copy;
- }
-
- index = qsb_get_iovec(qsb, start, &s_off);
- if (index < 0) {
- return 0;
- }
-
- while (all_copy > 0) {
- iov = &qsb->iov[index];
-
- s = iov->iov_base;
-
- to_copy = iov->iov_len - s_off;
- if (to_copy > all_copy) {
- to_copy = all_copy;
- }
- memcpy(&buffer[d_off], &s[s_off], to_copy);
-
- d_off += to_copy;
- all_copy -= to_copy;
-
- s_off = 0;
- index++;
- }
-
- return count;
-}
-
-/**
- * Grow the QEMUSizedBuffer to the given size and allocate
- * memory for it.
- *
- * @qsb: A QEMUSizedBuffer
- * @new_size: The new size of the buffer
- *
- * Return:
- * a negative error code in case of memory allocation failure
- * or
- * the new size of the buffer. The returned size may be greater or equal
- * to @new_size.
- */
-static ssize_t qsb_grow(QEMUSizedBuffer *qsb, size_t new_size)
-{
- size_t needed_chunks, i;
-
- if (qsb->size < new_size) {
- struct iovec *new_iov;
- size_t size_diff = new_size - qsb->size;
- size_t chunk_size = (size_diff > QSB_MAX_CHUNK_SIZE)
- ? QSB_MAX_CHUNK_SIZE : QSB_CHUNK_SIZE;
-
- needed_chunks = DIV_ROUND_UP(size_diff, chunk_size);
-
- new_iov = g_try_new(struct iovec, qsb->n_iov + needed_chunks);
- if (new_iov == NULL) {
- return -ENOMEM;
- }
-
- /* Allocate new chunks as needed into new_iov */
- for (i = qsb->n_iov; i < qsb->n_iov + needed_chunks; i++) {
- new_iov[i].iov_base = g_try_malloc0(chunk_size);
- new_iov[i].iov_len = chunk_size;
- if (!new_iov[i].iov_base) {
- size_t j;
-
- /* Free previously allocated new chunks */
- for (j = qsb->n_iov; j < i; j++) {
- g_free(new_iov[j].iov_base);
- }
- g_free(new_iov);
-
- return -ENOMEM;
- }
- }
-
- /*
- * Now we can't get any allocation errors, copy over to new iov
- * and switch.
- */
- for (i = 0; i < qsb->n_iov; i++) {
- new_iov[i] = qsb->iov[i];
- }
-
- qsb->n_iov += needed_chunks;
- g_free(qsb->iov);
- qsb->iov = new_iov;
- qsb->size += (needed_chunks * chunk_size);
- }
-
- return qsb->size;
-}
-
-/**
- * Write into the QEMUSizedBuffer at a given position and a given
- * number of bytes. This function will automatically grow the
- * QEMUSizedBuffer.
- *
- * @qsb: A QEMUSizedBuffer
- * @source: A byte array to copy data from
- * @pos: The position within the @qsb to write data to
- * @size: The number of bytes to copy into the @qsb
- *
- * Returns @size or a negative error code in case of memory allocation failure,
- * or with an invalid 'pos'
- */
-ssize_t qsb_write_at(QEMUSizedBuffer *qsb, const uint8_t *source,
- off_t pos, size_t count)
-{
- ssize_t rc = qsb_grow(qsb, pos + count);
- size_t to_copy;
- size_t all_copy = count;
- const struct iovec *iov;
- ssize_t index;
- char *dest;
- off_t d_off, s_off = 0;
-
- if (rc < 0) {
- return rc;
- }
-
- if (pos + count > qsb->used) {
- qsb->used = pos + count;
- }
-
- index = qsb_get_iovec(qsb, pos, &d_off);
- if (index < 0) {
- return -EINVAL;
- }
-
- while (all_copy > 0) {
- iov = &qsb->iov[index];
-
- dest = iov->iov_base;
-
- to_copy = iov->iov_len - d_off;
- if (to_copy > all_copy) {
- to_copy = all_copy;
- }
-
- memcpy(&dest[d_off], &source[s_off], to_copy);
-
- s_off += to_copy;
- all_copy -= to_copy;
-
- d_off = 0;
- index++;
- }
-
- return count;
-}
-
-typedef struct QEMUBuffer {
- QEMUSizedBuffer *qsb;
- QEMUFile *file;
- bool qsb_allocated;
-} QEMUBuffer;
-
-static ssize_t buf_get_buffer(void *opaque, uint8_t *buf, int64_t pos,
- size_t size)
-{
- QEMUBuffer *s = opaque;
- ssize_t len = qsb_get_length(s->qsb) - pos;
-
- if (len <= 0) {
- return 0;
- }
-
- if (len > size) {
- len = size;
- }
- return qsb_get_buffer(s->qsb, pos, len, buf);
-}
-
-static ssize_t buf_put_buffer(void *opaque, const uint8_t *buf,
- int64_t pos, size_t size)
-{
- QEMUBuffer *s = opaque;
-
- return qsb_write_at(s->qsb, buf, pos, size);
-}
-
-static int buf_close(void *opaque)
-{
- QEMUBuffer *s = opaque;
-
- if (s->qsb_allocated) {
- qsb_free(s->qsb);
- }
-
- g_free(s);
-
- return 0;
-}
-
-const QEMUSizedBuffer *qemu_buf_get(QEMUFile *f)
-{
- QEMUBuffer *p;
-
- qemu_fflush(f);
-
- p = f->opaque;
-
- return p->qsb;
-}
-
-static const QEMUFileOps buf_read_ops = {
- .get_buffer = buf_get_buffer,
- .close = buf_close,
-};
-
-static const QEMUFileOps buf_write_ops = {
- .put_buffer = buf_put_buffer,
- .close = buf_close,
-};
-
-QEMUFile *qemu_bufopen(const char *mode, QEMUSizedBuffer *input)
-{
- QEMUBuffer *s;
-
- if (mode == NULL || (mode[0] != 'r' && mode[0] != 'w') ||
- mode[1] != '\0') {
- error_report("qemu_bufopen: Argument validity check failed");
- return NULL;
- }
-
- s = g_new0(QEMUBuffer, 1);
- s->qsb = input;
-
- if (s->qsb == NULL) {
- s->qsb = qsb_create(NULL, 0);
- s->qsb_allocated = true;
- }
- if (!s->qsb) {
- g_free(s);
- error_report("qemu_bufopen: qsb_create failed");
- return NULL;
- }
-
-
- if (mode[0] == 'r') {
- s->file = qemu_fopen_ops(s, &buf_read_ops);
- } else {
- s->file = qemu_fopen_ops(s, &buf_write_ops);
- }
- return s->file;
-}
diff --git a/migration/qemu-file-channel.c b/migration/qemu-file-channel.c
new file mode 100644
index 0000000000..45c13f1028
--- /dev/null
+++ b/migration/qemu-file-channel.c
@@ -0,0 +1,180 @@
+/*
+ * QEMUFile backend for QIOChannel objects
+ *
+ * Copyright (c) 2015-2016 Red Hat, Inc
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#include "qemu/osdep.h"
+#include "migration/qemu-file.h"
+#include "io/channel-socket.h"
+#include "qemu/iov.h"
+
+
+static ssize_t channel_writev_buffer(void *opaque,
+ struct iovec *iov,
+ int iovcnt,
+ int64_t pos)
+{
+ QIOChannel *ioc = QIO_CHANNEL(opaque);
+ ssize_t done = 0;
+ struct iovec *local_iov = g_new(struct iovec, iovcnt);
+ struct iovec *local_iov_head = local_iov;
+ unsigned int nlocal_iov = iovcnt;
+
+ nlocal_iov = iov_copy(local_iov, nlocal_iov,
+ iov, iovcnt,
+ 0, iov_size(iov, iovcnt));
+
+ while (nlocal_iov > 0) {
+ ssize_t len;
+ len = qio_channel_writev(ioc, local_iov, nlocal_iov, NULL);
+ if (len == QIO_CHANNEL_ERR_BLOCK) {
+ qio_channel_wait(ioc, G_IO_OUT);
+ continue;
+ }
+ if (len < 0) {
+ /* XXX handle Error objects */
+ done = -EIO;
+ goto cleanup;
+ }
+
+ iov_discard_front(&local_iov, &nlocal_iov, len);
+ done += len;
+ }
+
+ cleanup:
+ g_free(local_iov_head);
+ return done;
+}
+
+
+static ssize_t channel_get_buffer(void *opaque,
+ uint8_t *buf,
+ int64_t pos,
+ size_t size)
+{
+ QIOChannel *ioc = QIO_CHANNEL(opaque);
+ ssize_t ret;
+
+ do {
+ ret = qio_channel_read(ioc, (char *)buf, size, NULL);
+ if (ret < 0) {
+ if (ret == QIO_CHANNEL_ERR_BLOCK) {
+ qio_channel_yield(ioc, G_IO_IN);
+ } else {
+ /* XXX handle Error * object */
+ return -EIO;
+ }
+ }
+ } while (ret == QIO_CHANNEL_ERR_BLOCK);
+
+ return ret;
+}
+
+
+static int channel_close(void *opaque)
+{
+ QIOChannel *ioc = QIO_CHANNEL(opaque);
+ qio_channel_close(ioc, NULL);
+ object_unref(OBJECT(ioc));
+ return 0;
+}
+
+
+static int channel_shutdown(void *opaque,
+ bool rd,
+ bool wr)
+{
+ QIOChannel *ioc = QIO_CHANNEL(opaque);
+
+ if (qio_channel_has_feature(ioc,
+ QIO_CHANNEL_FEATURE_SHUTDOWN)) {
+ QIOChannelShutdown mode;
+ if (rd && wr) {
+ mode = QIO_CHANNEL_SHUTDOWN_BOTH;
+ } else if (rd) {
+ mode = QIO_CHANNEL_SHUTDOWN_READ;
+ } else {
+ mode = QIO_CHANNEL_SHUTDOWN_WRITE;
+ }
+ if (qio_channel_shutdown(ioc, mode, NULL) < 0) {
+ /* XXX handler Error * object */
+ return -EIO;
+ }
+ }
+ return 0;
+}
+
+
+static int channel_set_blocking(void *opaque,
+ bool enabled)
+{
+ QIOChannel *ioc = QIO_CHANNEL(opaque);
+
+ if (qio_channel_set_blocking(ioc, enabled, NULL) < 0) {
+ return -1;
+ }
+ return 0;
+}
+
+static QEMUFile *channel_get_input_return_path(void *opaque)
+{
+ QIOChannel *ioc = QIO_CHANNEL(opaque);
+
+ return qemu_fopen_channel_output(ioc);
+}
+
+static QEMUFile *channel_get_output_return_path(void *opaque)
+{
+ QIOChannel *ioc = QIO_CHANNEL(opaque);
+
+ return qemu_fopen_channel_input(ioc);
+}
+
+static const QEMUFileOps channel_input_ops = {
+ .get_buffer = channel_get_buffer,
+ .close = channel_close,
+ .shut_down = channel_shutdown,
+ .set_blocking = channel_set_blocking,
+ .get_return_path = channel_get_input_return_path,
+};
+
+
+static const QEMUFileOps channel_output_ops = {
+ .writev_buffer = channel_writev_buffer,
+ .close = channel_close,
+ .shut_down = channel_shutdown,
+ .set_blocking = channel_set_blocking,
+ .get_return_path = channel_get_output_return_path,
+};
+
+
+QEMUFile *qemu_fopen_channel_input(QIOChannel *ioc)
+{
+ object_ref(OBJECT(ioc));
+ return qemu_fopen_ops(ioc, &channel_input_ops);
+}
+
+QEMUFile *qemu_fopen_channel_output(QIOChannel *ioc)
+{
+ object_ref(OBJECT(ioc));
+ return qemu_fopen_ops(ioc, &channel_output_ops);
+}
diff --git a/migration/qemu-file-internal.h b/migration/qemu-file-internal.h
deleted file mode 100644
index d95e8538e7..0000000000
--- a/migration/qemu-file-internal.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * QEMU System Emulator
- *
- * Copyright (c) 2003-2008 Fabrice Bellard
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
- * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- * THE SOFTWARE.
- */
-
-#ifndef QEMU_FILE_INTERNAL_H
-#define QEMU_FILE_INTERNAL_H 1
-
-#include "qemu-common.h"
-#include "qemu/iov.h"
-
-#define IO_BUF_SIZE 32768
-#define MAX_IOV_SIZE MIN(IOV_MAX, 64)
-
-struct QEMUFile {
- const QEMUFileOps *ops;
- void *opaque;
-
- int64_t bytes_xfer;
- int64_t xfer_limit;
-
- int64_t pos; /* start of buffer when writing, end of buffer
- when reading */
- int buf_index;
- int buf_size; /* 0 when writing */
- uint8_t buf[IO_BUF_SIZE];
-
- struct iovec iov[MAX_IOV_SIZE];
- unsigned int iovcnt;
-
- int last_error;
-};
-
-#endif
diff --git a/migration/qemu-file-stdio.c b/migration/qemu-file-stdio.c
deleted file mode 100644
index f402e8f708..0000000000
--- a/migration/qemu-file-stdio.c
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * QEMU System Emulator
- *
- * Copyright (c) 2003-2008 Fabrice Bellard
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
- * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- * THE SOFTWARE.
- */
-#include "qemu/osdep.h"
-#include "qemu-common.h"
-#include "qemu/coroutine.h"
-#include "migration/qemu-file.h"
-
-typedef struct QEMUFileStdio {
- FILE *stdio_file;
- QEMUFile *file;
-} QEMUFileStdio;
-
-static int stdio_get_fd(void *opaque)
-{
- QEMUFileStdio *s = opaque;
-
- return fileno(s->stdio_file);
-}
-
-static ssize_t stdio_put_buffer(void *opaque, const uint8_t *buf, int64_t pos,
- size_t size)
-{
- QEMUFileStdio *s = opaque;
- size_t res;
-
- res = fwrite(buf, 1, size, s->stdio_file);
-
- if (res != size) {
- return -errno;
- }
- return res;
-}
-
-static ssize_t stdio_get_buffer(void *opaque, uint8_t *buf, int64_t pos,
- size_t size)
-{
- QEMUFileStdio *s = opaque;
- FILE *fp = s->stdio_file;
- ssize_t bytes;
-
- for (;;) {
- clearerr(fp);
- bytes = fread(buf, 1, size, fp);
- if (bytes != 0 || !ferror(fp)) {
- break;
- }
- if (errno == EAGAIN) {
- yield_until_fd_readable(fileno(fp));
- } else if (errno != EINTR) {
- break;
- }
- }
- return bytes;
-}
-
-static int stdio_pclose(void *opaque)
-{
- QEMUFileStdio *s = opaque;
- int ret;
- ret = pclose(s->stdio_file);
- if (ret == -1) {
- ret = -errno;
- } else if (!WIFEXITED(ret) || WEXITSTATUS(ret) != 0) {
- /* close succeeded, but non-zero exit code: */
- ret = -EIO; /* fake errno value */
- }
- g_free(s);
- return ret;
-}
-
-static int stdio_fclose(void *opaque)
-{
- QEMUFileStdio *s = opaque;
- int ret = 0;
-
- if (qemu_file_is_writable(s->file)) {
- int fd = fileno(s->stdio_file);
- struct stat st;
-
- ret = fstat(fd, &st);
- if (ret == 0 && S_ISREG(st.st_mode)) {
- /*
- * If the file handle is a regular file make sure the
- * data is flushed to disk before signaling success.
- */
- ret = fsync(fd);
- if (ret != 0) {
- ret = -errno;
- return ret;
- }
- }
- }
- if (fclose(s->stdio_file) == EOF) {
- ret = -errno;
- }
- g_free(s);
- return ret;
-}
-
-static const QEMUFileOps stdio_pipe_read_ops = {
- .get_fd = stdio_get_fd,
- .get_buffer = stdio_get_buffer,
- .close = stdio_pclose
-};
-
-static const QEMUFileOps stdio_pipe_write_ops = {
- .get_fd = stdio_get_fd,
- .put_buffer = stdio_put_buffer,
- .close = stdio_pclose
-};
-
-QEMUFile *qemu_popen_cmd(const char *command, const char *mode)
-{
- FILE *stdio_file;
- QEMUFileStdio *s;
-
- if (mode == NULL || (mode[0] != 'r' && mode[0] != 'w') || mode[1] != 0) {
- fprintf(stderr, "qemu_popen: Argument validity check failed\n");
- return NULL;
- }
-
- stdio_file = popen(command, mode);
- if (stdio_file == NULL) {
- return NULL;
- }
-
- s = g_new0(QEMUFileStdio, 1);
-
- s->stdio_file = stdio_file;
-
- if (mode[0] == 'r') {
- s->file = qemu_fopen_ops(s, &stdio_pipe_read_ops);
- } else {
- s->file = qemu_fopen_ops(s, &stdio_pipe_write_ops);
- }
- return s->file;
-}
-
-static const QEMUFileOps stdio_file_read_ops = {
- .get_fd = stdio_get_fd,
- .get_buffer = stdio_get_buffer,
- .close = stdio_fclose
-};
-
-static const QEMUFileOps stdio_file_write_ops = {
- .get_fd = stdio_get_fd,
- .put_buffer = stdio_put_buffer,
- .close = stdio_fclose
-};
-
-QEMUFile *qemu_fopen(const char *filename, const char *mode)
-{
- QEMUFileStdio *s;
-
- if (qemu_file_mode_is_not_valid(mode)) {
- return NULL;
- }
-
- s = g_new0(QEMUFileStdio, 1);
-
- s->stdio_file = fopen(filename, mode);
- if (!s->stdio_file) {
- goto fail;
- }
-
- if (mode[0] == 'w') {
- s->file = qemu_fopen_ops(s, &stdio_file_write_ops);
- } else {
- s->file = qemu_fopen_ops(s, &stdio_file_read_ops);
- }
- return s->file;
-fail:
- g_free(s);
- return NULL;
-}
diff --git a/migration/qemu-file-unix.c b/migration/qemu-file-unix.c
deleted file mode 100644
index 4474e18ff8..0000000000
--- a/migration/qemu-file-unix.c
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * QEMU System Emulator
- *
- * Copyright (c) 2003-2008 Fabrice Bellard
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
- * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- * THE SOFTWARE.
- */
-#include "qemu/osdep.h"
-#include "qemu-common.h"
-#include "qemu/error-report.h"
-#include "qemu/iov.h"
-#include "qemu/sockets.h"
-#include "qemu/coroutine.h"
-#include "migration/qemu-file.h"
-#include "migration/qemu-file-internal.h"
-
-typedef struct QEMUFileSocket {
- int fd;
- QEMUFile *file;
-} QEMUFileSocket;
-
-static ssize_t socket_writev_buffer(void *opaque, struct iovec *iov, int iovcnt,
- int64_t pos)
-{
- QEMUFileSocket *s = opaque;
- ssize_t len;
- ssize_t size = iov_size(iov, iovcnt);
- ssize_t offset = 0;
- int err;
-
- while (size > 0) {
- len = iov_send(s->fd, iov, iovcnt, offset, size);
-
- if (len > 0) {
- size -= len;
- offset += len;
- }
-
- if (size > 0) {
- if (errno != EAGAIN && errno != EWOULDBLOCK) {
- error_report("socket_writev_buffer: Got err=%d for (%zu/%zu)",
- errno, (size_t)size, (size_t)len);
- /*
- * If I've already sent some but only just got the error, I
- * could return the amount validly sent so far and wait for the
- * next call to report the error, but I'd rather flag the error
- * immediately.
- */
- return -errno;
- }
-
- /* Emulate blocking */
- GPollFD pfd;
-
- pfd.fd = s->fd;
- pfd.events = G_IO_OUT | G_IO_ERR;
- pfd.revents = 0;
- TFR(err = g_poll(&pfd, 1, -1 /* no timeout */));
- /* Errors other than EINTR intentionally ignored */
- }
- }
-
- return offset;
-}
-
-static int socket_get_fd(void *opaque)
-{
- QEMUFileSocket *s = opaque;
-
- return s->fd;
-}
-
-static ssize_t socket_get_buffer(void *opaque, uint8_t *buf, int64_t pos,
- size_t size)
-{
- QEMUFileSocket *s = opaque;
- ssize_t len;
-
- for (;;) {
- len = qemu_recv(s->fd, buf, size, 0);
- if (len != -1) {
- break;
- }
- if (errno == EAGAIN) {
- yield_until_fd_readable(s->fd);
- } else if (errno != EINTR) {
- break;
- }
- }
-
- if (len == -1) {
- len = -errno;
- }
- return len;
-}
-
-static int socket_close(void *opaque)
-{
- QEMUFileSocket *s = opaque;
- closesocket(s->fd);
- g_free(s);
- return 0;
-}
-
-static int socket_shutdown(void *opaque, bool rd, bool wr)
-{
- QEMUFileSocket *s = opaque;
-
- if (shutdown(s->fd, rd ? (wr ? SHUT_RDWR : SHUT_RD) : SHUT_WR)) {
- return -errno;
- } else {
- return 0;
- }
-}
-
-static int socket_return_close(void *opaque)
-{
- QEMUFileSocket *s = opaque;
- /*
- * Note: We don't close the socket, that should be done by the forward
- * path.
- */
- g_free(s);
- return 0;
-}
-
-static const QEMUFileOps socket_return_read_ops = {
- .get_fd = socket_get_fd,
- .get_buffer = socket_get_buffer,
- .close = socket_return_close,
- .shut_down = socket_shutdown,
-};
-
-static const QEMUFileOps socket_return_write_ops = {
- .get_fd = socket_get_fd,
- .writev_buffer = socket_writev_buffer,
- .close = socket_return_close,
- .shut_down = socket_shutdown,
-};
-
-/*
- * Give a QEMUFile* off the same socket but data in the opposite
- * direction.
- */
-static QEMUFile *socket_get_return_path(void *opaque)
-{
- QEMUFileSocket *forward = opaque;
- QEMUFileSocket *reverse;
-
- if (qemu_file_get_error(forward->file)) {
- /* If the forward file is in error, don't try and open a return */
- return NULL;
- }
-
- reverse = g_malloc0(sizeof(QEMUFileSocket));
- reverse->fd = forward->fd;
- /* I don't think there's a better way to tell which direction 'this' is */
- if (forward->file->ops->get_buffer != NULL) {
- /* being called from the read side, so we need to be able to write */
- return qemu_fopen_ops(reverse, &socket_return_write_ops);
- } else {
- return qemu_fopen_ops(reverse, &socket_return_read_ops);
- }
-}
-
-static ssize_t unix_writev_buffer(void *opaque, struct iovec *iov, int iovcnt,
- int64_t pos)
-{
- QEMUFileSocket *s = opaque;
- ssize_t len, offset;
- ssize_t size = iov_size(iov, iovcnt);
- ssize_t total = 0;
-
- assert(iovcnt > 0);
- offset = 0;
- while (size > 0) {
- /* Find the next start position; skip all full-sized vector elements */
- while (offset >= iov[0].iov_len) {
- offset -= iov[0].iov_len;
- iov++, iovcnt--;
- }
-
- /* skip `offset' bytes from the (now) first element, undo it on exit */
- assert(iovcnt > 0);
- iov[0].iov_base += offset;
- iov[0].iov_len -= offset;
-
- do {
- len = writev(s->fd, iov, iovcnt);
- } while (len == -1 && errno == EINTR);
- if (len == -1) {
- return -errno;
- }
-
- /* Undo the changes above */
- iov[0].iov_base -= offset;
- iov[0].iov_len += offset;
-
- /* Prepare for the next iteration */
- offset += len;
- total += len;
- size -= len;
- }
-
- return total;
-}
-
-static ssize_t unix_get_buffer(void *opaque, uint8_t *buf, int64_t pos,
- size_t size)
-{
- QEMUFileSocket *s = opaque;
- ssize_t len;
-
- for (;;) {
- len = read(s->fd, buf, size);
- if (len != -1) {
- break;
- }
- if (errno == EAGAIN) {
- yield_until_fd_readable(s->fd);
- } else if (errno != EINTR) {
- break;
- }
- }
-
- if (len == -1) {
- len = -errno;
- }
- return len;
-}
-
-static int unix_close(void *opaque)
-{
- QEMUFileSocket *s = opaque;
- close(s->fd);
- g_free(s);
- return 0;
-}
-
-static const QEMUFileOps unix_read_ops = {
- .get_fd = socket_get_fd,
- .get_buffer = unix_get_buffer,
- .close = unix_close
-};
-
-static const QEMUFileOps unix_write_ops = {
- .get_fd = socket_get_fd,
- .writev_buffer = unix_writev_buffer,
- .close = unix_close
-};
-
-QEMUFile *qemu_fdopen(int fd, const char *mode)
-{
- QEMUFileSocket *s;
-
- if (mode == NULL ||
- (mode[0] != 'r' && mode[0] != 'w') ||
- mode[1] != 'b' || mode[2] != 0) {
- fprintf(stderr, "qemu_fdopen: Argument validity check failed\n");
- return NULL;
- }
-
- s = g_new0(QEMUFileSocket, 1);
- s->fd = fd;
-
- if (mode[0] == 'r') {
- s->file = qemu_fopen_ops(s, &unix_read_ops);
- } else {
- s->file = qemu_fopen_ops(s, &unix_write_ops);
- }
- return s->file;
-}
-
-static const QEMUFileOps socket_read_ops = {
- .get_fd = socket_get_fd,
- .get_buffer = socket_get_buffer,
- .close = socket_close,
- .shut_down = socket_shutdown,
- .get_return_path = socket_get_return_path
-};
-
-static const QEMUFileOps socket_write_ops = {
- .get_fd = socket_get_fd,
- .writev_buffer = socket_writev_buffer,
- .close = socket_close,
- .shut_down = socket_shutdown,
- .get_return_path = socket_get_return_path
-};
-
-QEMUFile *qemu_fopen_socket(int fd, const char *mode)
-{
- QEMUFileSocket *s;
-
- if (qemu_file_mode_is_not_valid(mode)) {
- return NULL;
- }
-
- s = g_new0(QEMUFileSocket, 1);
- s->fd = fd;
- if (mode[0] == 'w') {
- qemu_set_block(s->fd);
- s->file = qemu_fopen_ops(s, &socket_write_ops);
- } else {
- s->file = qemu_fopen_ops(s, &socket_read_ops);
- }
- return s->file;
-}
diff --git a/migration/qemu-file.c b/migration/qemu-file.c
index 6f4a1299b3..8aea1c7094 100644
--- a/migration/qemu-file.c
+++ b/migration/qemu-file.c
@@ -30,9 +30,31 @@
#include "qemu/coroutine.h"
#include "migration/migration.h"
#include "migration/qemu-file.h"
-#include "migration/qemu-file-internal.h"
#include "trace.h"
+#define IO_BUF_SIZE 32768
+#define MAX_IOV_SIZE MIN(IOV_MAX, 64)
+
+struct QEMUFile {
+ const QEMUFileOps *ops;
+ const QEMUFileHooks *hooks;
+ void *opaque;
+
+ int64_t bytes_xfer;
+ int64_t xfer_limit;
+
+ int64_t pos; /* start of buffer when writing, end of buffer
+ when reading */
+ int buf_index;
+ int buf_size; /* 0 when writing */
+ uint8_t buf[IO_BUF_SIZE];
+
+ struct iovec iov[MAX_IOV_SIZE];
+ unsigned int iovcnt;
+
+ int last_error;
+};
+
/*
* Stop a file from being read/written - not all backing files can do this
* typically only sockets can.
@@ -80,6 +102,12 @@ QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops)
return f;
}
+
+void qemu_file_set_hooks(QEMUFile *f, const QEMUFileHooks *hooks)
+{
+ f->hooks = hooks;
+}
+
/*
* Get last error for stream f
*
@@ -101,48 +129,49 @@ void qemu_file_set_error(QEMUFile *f, int ret)
bool qemu_file_is_writable(QEMUFile *f)
{
- return f->ops->writev_buffer || f->ops->put_buffer;
+ return f->ops->writev_buffer;
}
/**
* Flushes QEMUFile buffer
*
* If there is writev_buffer QEMUFileOps it uses it otherwise uses
- * put_buffer ops.
+ * put_buffer ops. This will flush all pending data. If data was
+ * only partially flushed, it will set an error state.
*/
void qemu_fflush(QEMUFile *f)
{
ssize_t ret = 0;
+ ssize_t expect = 0;
if (!qemu_file_is_writable(f)) {
return;
}
- if (f->ops->writev_buffer) {
- if (f->iovcnt > 0) {
- ret = f->ops->writev_buffer(f->opaque, f->iov, f->iovcnt, f->pos);
- }
- } else {
- if (f->buf_index > 0) {
- ret = f->ops->put_buffer(f->opaque, f->buf, f->pos, f->buf_index);
- }
+ if (f->iovcnt > 0) {
+ expect = iov_size(f->iov, f->iovcnt);
+ ret = f->ops->writev_buffer(f->opaque, f->iov, f->iovcnt, f->pos);
}
+
if (ret >= 0) {
f->pos += ret;
}
+ /* We expect the QEMUFile write impl to send the full
+ * data set we requested, so sanity check that.
+ */
+ if (ret != expect) {
+ qemu_file_set_error(f, ret < 0 ? ret : -EIO);
+ }
f->buf_index = 0;
f->iovcnt = 0;
- if (ret < 0) {
- qemu_file_set_error(f, ret);
- }
}
void ram_control_before_iterate(QEMUFile *f, uint64_t flags)
{
int ret = 0;
- if (f->ops->before_ram_iterate) {
- ret = f->ops->before_ram_iterate(f, f->opaque, flags, NULL);
+ if (f->hooks && f->hooks->before_ram_iterate) {
+ ret = f->hooks->before_ram_iterate(f, f->opaque, flags, NULL);
if (ret < 0) {
qemu_file_set_error(f, ret);
}
@@ -153,8 +182,8 @@ void ram_control_after_iterate(QEMUFile *f, uint64_t flags)
{
int ret = 0;
- if (f->ops->after_ram_iterate) {
- ret = f->ops->after_ram_iterate(f, f->opaque, flags, NULL);
+ if (f->hooks && f->hooks->after_ram_iterate) {
+ ret = f->hooks->after_ram_iterate(f, f->opaque, flags, NULL);
if (ret < 0) {
qemu_file_set_error(f, ret);
}
@@ -165,8 +194,8 @@ void ram_control_load_hook(QEMUFile *f, uint64_t flags, void *data)
{
int ret = -EINVAL;
- if (f->ops->hook_ram_load) {
- ret = f->ops->hook_ram_load(f, f->opaque, flags, data);
+ if (f->hooks && f->hooks->hook_ram_load) {
+ ret = f->hooks->hook_ram_load(f, f->opaque, flags, data);
if (ret < 0) {
qemu_file_set_error(f, ret);
}
@@ -185,9 +214,9 @@ size_t ram_control_save_page(QEMUFile *f, ram_addr_t block_offset,
ram_addr_t offset, size_t size,
uint64_t *bytes_sent)
{
- if (f->ops->save_page) {
- int ret = f->ops->save_page(f, f->opaque, block_offset,
- offset, size, bytes_sent);
+ if (f->hooks && f->hooks->save_page) {
+ int ret = f->hooks->save_page(f, f->opaque, block_offset,
+ offset, size, bytes_sent);
if (ret != RAM_SAVE_CONTROL_DELAYED) {
if (bytes_sent && *bytes_sent > 0) {
@@ -239,14 +268,6 @@ static ssize_t qemu_fill_buffer(QEMUFile *f)
return len;
}
-int qemu_get_fd(QEMUFile *f)
-{
- if (f->ops->get_fd) {
- return f->ops->get_fd(f->opaque);
- }
- return -1;
-}
-
void qemu_update_position(QEMUFile *f, size_t size)
{
f->pos += size;
@@ -301,11 +322,6 @@ static void add_to_iovec(QEMUFile *f, const uint8_t *buf, size_t size)
void qemu_put_buffer_async(QEMUFile *f, const uint8_t *buf, size_t size)
{
- if (!f->ops->writev_buffer) {
- qemu_put_buffer(f, buf, size);
- return;
- }
-
if (f->last_error) {
return;
}
@@ -329,9 +345,7 @@ void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size)
}
memcpy(f->buf + f->buf_index, buf, l);
f->bytes_xfer += l;
- if (f->ops->writev_buffer) {
- add_to_iovec(f, f->buf + f->buf_index, l);
- }
+ add_to_iovec(f, f->buf + f->buf_index, l);
f->buf_index += l;
if (f->buf_index == IO_BUF_SIZE) {
qemu_fflush(f);
@@ -352,9 +366,7 @@ void qemu_put_byte(QEMUFile *f, int v)
f->buf[f->buf_index] = v;
f->bytes_xfer++;
- if (f->ops->writev_buffer) {
- add_to_iovec(f, f->buf + f->buf_index, 1);
- }
+ add_to_iovec(f, f->buf + f->buf_index, 1);
f->buf_index++;
if (f->buf_index == IO_BUF_SIZE) {
qemu_fflush(f);
@@ -518,12 +530,8 @@ int64_t qemu_ftell_fast(QEMUFile *f)
int64_t ret = f->pos;
int i;
- if (f->ops->writev_buffer) {
- for (i = 0; i < f->iovcnt; i++) {
- ret += f->iov[i].iov_len;
- }
- } else {
- ret += f->buf_index;
+ for (i = 0; i < f->iovcnt; i++) {
+ ret += f->iov[i].iov_len;
}
return ret;
@@ -670,9 +678,7 @@ size_t qemu_get_counted_string(QEMUFile *f, char buf[256])
*/
void qemu_file_set_blocking(QEMUFile *f, bool block)
{
- if (block) {
- qemu_set_block(qemu_get_fd(f));
- } else {
- qemu_set_nonblock(qemu_get_fd(f));
+ if (f->ops->set_blocking) {
+ f->ops->set_blocking(f->opaque, block);
}
}
diff --git a/migration/ram.c b/migration/ram.c
index 54e215128c..844ea4694f 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -429,10 +429,8 @@ static size_t save_page_header(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
static void mig_throttle_guest_down(void)
{
MigrationState *s = migrate_get_current();
- uint64_t pct_initial =
- s->parameters[MIGRATION_PARAMETER_CPU_THROTTLE_INITIAL];
- uint64_t pct_icrement =
- s->parameters[MIGRATION_PARAMETER_CPU_THROTTLE_INCREMENT];
+ uint64_t pct_initial = s->parameters.cpu_throttle_initial;
+ uint64_t pct_icrement = s->parameters.cpu_throttle_increment;
/* We have not started throttling yet. Let's start it. */
if (!cpu_throttle_active()) {
diff --git a/migration/rdma.c b/migration/rdma.c
index f6a9992b3e..51bafc702b 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -2,10 +2,12 @@
* RDMA protocol and interfaces
*
* Copyright IBM, Corp. 2010-2013
+ * Copyright Red Hat, Inc. 2015-2016
*
* Authors:
* Michael R. Hines <mrhines@us.ibm.com>
* Jiuxing Liu <jl@us.ibm.com>
+ * Daniel P. Berrange <berrange@redhat.com>
*
* This work is licensed under the terms of the GNU GPL, version 2 or
* later. See the COPYING file in the top-level directory.
@@ -374,14 +376,20 @@ typedef struct RDMAContext {
GHashTable *blockmap;
} RDMAContext;
-/*
- * Interface to the rest of the migration call stack.
- */
-typedef struct QEMUFileRDMA {
+#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
+#define QIO_CHANNEL_RDMA(obj) \
+ OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA)
+
+typedef struct QIOChannelRDMA QIOChannelRDMA;
+
+
+struct QIOChannelRDMA {
+ QIOChannel parent;
RDMAContext *rdma;
+ QEMUFile *file;
size_t len;
- void *file;
-} QEMUFileRDMA;
+ bool blocking; /* XXX we don't actually honour this yet */
+};
/*
* Main structure for IB Send/Recv control messages.
@@ -2518,15 +2526,19 @@ static void *qemu_rdma_data_init(const char *host_port, Error **errp)
* SEND messages for control only.
* VM's ram is handled with regular RDMA messages.
*/
-static ssize_t qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
- int64_t pos, size_t size)
-{
- QEMUFileRDMA *r = opaque;
- QEMUFile *f = r->file;
- RDMAContext *rdma = r->rdma;
- size_t remaining = size;
- uint8_t * data = (void *) buf;
+static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ Error **errp)
+{
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+ QEMUFile *f = rioc->file;
+ RDMAContext *rdma = rioc->rdma;
int ret;
+ ssize_t done = 0;
+ size_t i;
CHECK_ERROR_STATE();
@@ -2540,27 +2552,31 @@ static ssize_t qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
return ret;
}
- while (remaining) {
- RDMAControlHeader head;
+ for (i = 0; i < niov; i++) {
+ size_t remaining = iov[i].iov_len;
+ uint8_t * data = (void *)iov[i].iov_base;
+ while (remaining) {
+ RDMAControlHeader head;
- r->len = MIN(remaining, RDMA_SEND_INCREMENT);
- remaining -= r->len;
+ rioc->len = MIN(remaining, RDMA_SEND_INCREMENT);
+ remaining -= rioc->len;
- /* Guaranteed to fit due to RDMA_SEND_INCREMENT MIN above */
- head.len = (uint32_t)r->len;
- head.type = RDMA_CONTROL_QEMU_FILE;
+ head.len = rioc->len;
+ head.type = RDMA_CONTROL_QEMU_FILE;
- ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
+ ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
- if (ret < 0) {
- rdma->error_state = ret;
- return ret;
- }
+ if (ret < 0) {
+ rdma->error_state = ret;
+ return ret;
+ }
- data += r->len;
+ data += rioc->len;
+ done += rioc->len;
+ }
}
- return size;
+ return done;
}
static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
@@ -2585,41 +2601,74 @@ static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
* RDMA links don't use bytestreams, so we have to
* return bytes to QEMUFile opportunistically.
*/
-static ssize_t qemu_rdma_get_buffer(void *opaque, uint8_t *buf,
- int64_t pos, size_t size)
-{
- QEMUFileRDMA *r = opaque;
- RDMAContext *rdma = r->rdma;
+static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int **fds,
+ size_t *nfds,
+ Error **errp)
+{
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+ RDMAContext *rdma = rioc->rdma;
RDMAControlHeader head;
int ret = 0;
+ ssize_t i;
+ size_t done = 0;
CHECK_ERROR_STATE();
- /*
- * First, we hold on to the last SEND message we
- * were given and dish out the bytes until we run
- * out of bytes.
- */
- r->len = qemu_rdma_fill(r->rdma, buf, size, 0);
- if (r->len) {
- return r->len;
- }
+ for (i = 0; i < niov; i++) {
+ size_t want = iov[i].iov_len;
+ uint8_t *data = (void *)iov[i].iov_base;
- /*
- * Once we run out, we block and wait for another
- * SEND message to arrive.
- */
- ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
+ /*
+ * First, we hold on to the last SEND message we
+ * were given and dish out the bytes until we run
+ * out of bytes.
+ */
+ ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
+ done += ret;
+ want -= ret;
+ /* Got what we needed, so go to next iovec */
+ if (want == 0) {
+ continue;
+ }
- if (ret < 0) {
- rdma->error_state = ret;
- return ret;
- }
+ /* If we got any data so far, then don't wait
+ * for more, just return what we have */
+ if (done > 0) {
+ break;
+ }
- /*
- * SEND was received with new bytes, now try again.
- */
- return qemu_rdma_fill(r->rdma, buf, size, 0);
+
+ /* We've got nothing at all, so lets wait for
+ * more to arrive
+ */
+ ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
+
+ if (ret < 0) {
+ rdma->error_state = ret;
+ return ret;
+ }
+
+ /*
+ * SEND was received with new bytes, now try again.
+ */
+ ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
+ done += ret;
+ want -= ret;
+
+ /* Still didn't get enough, so lets just return */
+ if (want) {
+ if (done == 0) {
+ return QIO_CHANNEL_ERR_BLOCK;
+ } else {
+ break;
+ }
+ }
+ }
+ rioc->len = done;
+ return rioc->len;
}
/*
@@ -2646,15 +2695,122 @@ static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
return 0;
}
-static int qemu_rdma_close(void *opaque)
+
+static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
+ bool blocking,
+ Error **errp)
+{
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+ /* XXX we should make readv/writev actually honour this :-) */
+ rioc->blocking = blocking;
+ return 0;
+}
+
+
+typedef struct QIOChannelRDMASource QIOChannelRDMASource;
+struct QIOChannelRDMASource {
+ GSource parent;
+ QIOChannelRDMA *rioc;
+ GIOCondition condition;
+};
+
+static gboolean
+qio_channel_rdma_source_prepare(GSource *source,
+ gint *timeout)
+{
+ QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
+ RDMAContext *rdma = rsource->rioc->rdma;
+ GIOCondition cond = 0;
+ *timeout = -1;
+
+ if (rdma->wr_data[0].control_len) {
+ cond |= G_IO_IN;
+ }
+ cond |= G_IO_OUT;
+
+ return cond & rsource->condition;
+}
+
+static gboolean
+qio_channel_rdma_source_check(GSource *source)
+{
+ QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
+ RDMAContext *rdma = rsource->rioc->rdma;
+ GIOCondition cond = 0;
+
+ if (rdma->wr_data[0].control_len) {
+ cond |= G_IO_IN;
+ }
+ cond |= G_IO_OUT;
+
+ return cond & rsource->condition;
+}
+
+static gboolean
+qio_channel_rdma_source_dispatch(GSource *source,
+ GSourceFunc callback,
+ gpointer user_data)
+{
+ QIOChannelFunc func = (QIOChannelFunc)callback;
+ QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
+ RDMAContext *rdma = rsource->rioc->rdma;
+ GIOCondition cond = 0;
+
+ if (rdma->wr_data[0].control_len) {
+ cond |= G_IO_IN;
+ }
+ cond |= G_IO_OUT;
+
+ return (*func)(QIO_CHANNEL(rsource->rioc),
+ (cond & rsource->condition),
+ user_data);
+}
+
+static void
+qio_channel_rdma_source_finalize(GSource *source)
+{
+ QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
+
+ object_unref(OBJECT(ssource->rioc));
+}
+
+GSourceFuncs qio_channel_rdma_source_funcs = {
+ qio_channel_rdma_source_prepare,
+ qio_channel_rdma_source_check,
+ qio_channel_rdma_source_dispatch,
+ qio_channel_rdma_source_finalize
+};
+
+static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
+ GIOCondition condition)
+{
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+ QIOChannelRDMASource *ssource;
+ GSource *source;
+
+ source = g_source_new(&qio_channel_rdma_source_funcs,
+ sizeof(QIOChannelRDMASource));
+ ssource = (QIOChannelRDMASource *)source;
+
+ ssource->rioc = rioc;
+ object_ref(OBJECT(rioc));
+
+ ssource->condition = condition;
+
+ return source;
+}
+
+
+static int qio_channel_rdma_close(QIOChannel *ioc,
+ Error **errp)
{
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
trace_qemu_rdma_close();
- QEMUFileRDMA *r = opaque;
- if (r->rdma) {
- qemu_rdma_cleanup(r->rdma);
- g_free(r->rdma);
+ if (rioc->rdma) {
+ qemu_rdma_cleanup(rioc->rdma);
+ g_free(rioc->rdma);
+ rioc->rdma = NULL;
}
- g_free(r);
return 0;
}
@@ -2696,8 +2852,8 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
ram_addr_t block_offset, ram_addr_t offset,
size_t size, uint64_t *bytes_sent)
{
- QEMUFileRDMA *rfile = opaque;
- RDMAContext *rdma = rfile->rdma;
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+ RDMAContext *rdma = rioc->rdma;
int ret;
CHECK_ERROR_STATE();
@@ -2951,8 +3107,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
};
RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
.repeat = 1 };
- QEMUFileRDMA *rfile = opaque;
- RDMAContext *rdma = rfile->rdma;
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+ RDMAContext *rdma = rioc->rdma;
RDMALocalBlocks *local = &rdma->local_ram_blocks;
RDMAControlHeader head;
RDMARegister *reg, *registers;
@@ -3207,9 +3363,10 @@ out:
* We've already built our local RAMBlock list, but not yet sent the list to
* the source.
*/
-static int rdma_block_notification_handle(QEMUFileRDMA *rfile, const char *name)
+static int
+rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
{
- RDMAContext *rdma = rfile->rdma;
+ RDMAContext *rdma = rioc->rdma;
int curr;
int found = -1;
@@ -3251,8 +3408,8 @@ static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
uint64_t flags, void *data)
{
- QEMUFileRDMA *rfile = opaque;
- RDMAContext *rdma = rfile->rdma;
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+ RDMAContext *rdma = rioc->rdma;
CHECK_ERROR_STATE();
@@ -3271,8 +3428,8 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
uint64_t flags, void *data)
{
Error *local_err = NULL, **errp = &local_err;
- QEMUFileRDMA *rfile = opaque;
- RDMAContext *rdma = rfile->rdma;
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+ RDMAContext *rdma = rioc->rdma;
RDMAControlHeader head = { .len = 0, .repeat = 1 };
int ret = 0;
@@ -3368,47 +3525,74 @@ err:
return ret;
}
-static int qemu_rdma_get_fd(void *opaque)
-{
- QEMUFileRDMA *rfile = opaque;
- RDMAContext *rdma = rfile->rdma;
-
- return rdma->comp_channel->fd;
-}
-
-static const QEMUFileOps rdma_read_ops = {
- .get_buffer = qemu_rdma_get_buffer,
- .get_fd = qemu_rdma_get_fd,
- .close = qemu_rdma_close,
+static const QEMUFileHooks rdma_read_hooks = {
.hook_ram_load = rdma_load_hook,
};
-static const QEMUFileOps rdma_write_ops = {
- .put_buffer = qemu_rdma_put_buffer,
- .close = qemu_rdma_close,
+static const QEMUFileHooks rdma_write_hooks = {
.before_ram_iterate = qemu_rdma_registration_start,
.after_ram_iterate = qemu_rdma_registration_stop,
.save_page = qemu_rdma_save_page,
};
-static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
+
+static void qio_channel_rdma_finalize(Object *obj)
{
- QEMUFileRDMA *r;
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
+ if (rioc->rdma) {
+ qemu_rdma_cleanup(rioc->rdma);
+ g_free(rioc->rdma);
+ rioc->rdma = NULL;
+ }
+}
+
+static void qio_channel_rdma_class_init(ObjectClass *klass,
+ void *class_data G_GNUC_UNUSED)
+{
+ QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
+
+ ioc_klass->io_writev = qio_channel_rdma_writev;
+ ioc_klass->io_readv = qio_channel_rdma_readv;
+ ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
+ ioc_klass->io_close = qio_channel_rdma_close;
+ ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
+}
+
+static const TypeInfo qio_channel_rdma_info = {
+ .parent = TYPE_QIO_CHANNEL,
+ .name = TYPE_QIO_CHANNEL_RDMA,
+ .instance_size = sizeof(QIOChannelRDMA),
+ .instance_finalize = qio_channel_rdma_finalize,
+ .class_init = qio_channel_rdma_class_init,
+};
+
+static void qio_channel_rdma_register_types(void)
+{
+ type_register_static(&qio_channel_rdma_info);
+}
+
+type_init(qio_channel_rdma_register_types);
+
+static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
+{
+ QIOChannelRDMA *rioc;
if (qemu_file_mode_is_not_valid(mode)) {
return NULL;
}
- r = g_new0(QEMUFileRDMA, 1);
- r->rdma = rdma;
+ rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
+ rioc->rdma = rdma;
if (mode[0] == 'w') {
- r->file = qemu_fopen_ops(r, &rdma_write_ops);
+ rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
+ qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
} else {
- r->file = qemu_fopen_ops(r, &rdma_read_ops);
+ rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
+ qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
}
- return r->file;
+ return rioc->file;
}
static void rdma_accept_incoming_migration(void *opaque)
@@ -3481,16 +3665,14 @@ void rdma_start_outgoing_migration(void *opaque,
const char *host_port, Error **errp)
{
MigrationState *s = opaque;
- Error *local_err = NULL, **temp = &local_err;
- RDMAContext *rdma = qemu_rdma_data_init(host_port, &local_err);
+ RDMAContext *rdma = qemu_rdma_data_init(host_port, errp);
int ret = 0;
if (rdma == NULL) {
- ERROR(temp, "Failed to initialize RDMA data structures! %d", ret);
goto err;
}
- ret = qemu_rdma_source_init(rdma, &local_err,
+ ret = qemu_rdma_source_init(rdma, errp,
s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL]);
if (ret) {
@@ -3498,7 +3680,7 @@ void rdma_start_outgoing_migration(void *opaque,
}
trace_rdma_start_outgoing_migration_after_rdma_source_init();
- ret = qemu_rdma_connect(rdma, &local_err);
+ ret = qemu_rdma_connect(rdma, errp);
if (ret) {
goto err;
@@ -3510,7 +3692,5 @@ void rdma_start_outgoing_migration(void *opaque,
migrate_fd_connect(s);
return;
err:
- error_propagate(errp, local_err);
g_free(rdma);
- migrate_fd_error(s);
}
diff --git a/migration/savevm.c b/migration/savevm.c
index 65ce0c61a3..6c21231131 100644
--- a/migration/savevm.c
+++ b/migration/savevm.c
@@ -51,6 +51,8 @@
#include "block/snapshot.h"
#include "block/qapi.h"
#include "qemu/cutils.h"
+#include "io/channel-buffer.h"
+#include "io/channel-file.h"
#ifndef ETH_P_RARP
#define ETH_P_RARP 0x8035
@@ -158,13 +160,6 @@ static ssize_t block_writev_buffer(void *opaque, struct iovec *iov, int iovcnt,
return qiov.size;
}
-static ssize_t block_put_buffer(void *opaque, const uint8_t *buf,
- int64_t pos, size_t size)
-{
- bdrv_save_vmstate(opaque, buf, pos, size);
- return size;
-}
-
static ssize_t block_get_buffer(void *opaque, uint8_t *buf, int64_t pos,
size_t size)
{
@@ -182,7 +177,6 @@ static const QEMUFileOps bdrv_read_ops = {
};
static const QEMUFileOps bdrv_write_ops = {
- .put_buffer = block_put_buffer,
.writev_buffer = block_writev_buffer,
.close = bdrv_fclose
};
@@ -760,10 +754,8 @@ void qemu_savevm_send_open_return_path(QEMUFile *f)
* 0 on success
* -ve on error
*/
-int qemu_savevm_send_packaged(QEMUFile *f, const QEMUSizedBuffer *qsb)
+int qemu_savevm_send_packaged(QEMUFile *f, const uint8_t *buf, size_t len)
{
- size_t cur_iov;
- size_t len = qsb_get_length(qsb);
uint32_t tmp;
if (len > MAX_VM_CMD_PACKAGED_SIZE) {
@@ -777,18 +769,7 @@ int qemu_savevm_send_packaged(QEMUFile *f, const QEMUSizedBuffer *qsb)
trace_qemu_savevm_send_packaged();
qemu_savevm_command_send(f, MIG_CMD_PACKAGED, 4, (uint8_t *)&tmp);
- /* all the data follows (concatinating the iov's) */
- for (cur_iov = 0; cur_iov < qsb->n_iov; cur_iov++) {
- /* The iov entries are partially filled */
- size_t towrite = MIN(qsb->iov[cur_iov].iov_len, len);
- len -= towrite;
-
- if (!towrite) {
- break;
- }
-
- qemu_put_buffer(f, qsb->iov[cur_iov].iov_base, towrite);
- }
+ qemu_put_buffer(f, buf, len);
return 0;
}
@@ -1578,39 +1559,36 @@ static int loadvm_postcopy_handle_run(MigrationIncomingState *mis)
static int loadvm_handle_cmd_packaged(MigrationIncomingState *mis)
{
int ret;
- uint8_t *buffer;
- uint32_t length;
- QEMUSizedBuffer *qsb;
+ size_t length;
+ QIOChannelBuffer *bioc;
length = qemu_get_be32(mis->from_src_file);
trace_loadvm_handle_cmd_packaged(length);
if (length > MAX_VM_CMD_PACKAGED_SIZE) {
- error_report("Unreasonably large packaged state: %u", length);
+ error_report("Unreasonably large packaged state: %zu", length);
return -1;
}
- buffer = g_malloc0(length);
- ret = qemu_get_buffer(mis->from_src_file, buffer, (int)length);
+
+ bioc = qio_channel_buffer_new(length);
+ ret = qemu_get_buffer(mis->from_src_file,
+ bioc->data,
+ length);
if (ret != length) {
- g_free(buffer);
- error_report("CMD_PACKAGED: Buffer receive fail ret=%d length=%d",
+ object_unref(OBJECT(bioc));
+ error_report("CMD_PACKAGED: Buffer receive fail ret=%d length=%zu",
ret, length);
return (ret < 0) ? ret : -EAGAIN;
}
+ bioc->usage += length;
trace_loadvm_handle_cmd_packaged_received(ret);
- /* Setup a dummy QEMUFile that actually reads from the buffer */
- qsb = qsb_create(buffer, length);
- g_free(buffer); /* Because qsb_create copies */
- if (!qsb) {
- error_report("Unable to create qsb");
- }
- QEMUFile *packf = qemu_bufopen("r", qsb);
+ QEMUFile *packf = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
ret = qemu_loadvm_state_main(packf, mis);
trace_loadvm_handle_cmd_packaged_main(ret);
qemu_fclose(packf);
- qsb_free(qsb);
+ object_unref(OBJECT(bioc));
return ret;
}
@@ -2061,6 +2039,7 @@ void hmp_savevm(Monitor *mon, const QDict *qdict)
void qmp_xen_save_devices_state(const char *filename, Error **errp)
{
QEMUFile *f;
+ QIOChannelFile *ioc;
int saved_vm_running;
int ret;
@@ -2068,11 +2047,11 @@ void qmp_xen_save_devices_state(const char *filename, Error **errp)
vm_stop(RUN_STATE_SAVE_VM);
global_state_store_running();
- f = qemu_fopen(filename, "wb");
- if (!f) {
- error_setg_file_open(errp, errno, filename);
+ ioc = qio_channel_file_new_path(filename, O_WRONLY | O_CREAT, 0660, errp);
+ if (!ioc) {
goto the_end;
}
+ f = qemu_fopen_channel_output(QIO_CHANNEL(ioc));
ret = qemu_save_device_state(f);
qemu_fclose(f);
if (ret < 0) {
diff --git a/migration/socket.c b/migration/socket.c
new file mode 100644
index 0000000000..977a8d3c1d
--- /dev/null
+++ b/migration/socket.c
@@ -0,0 +1,183 @@
+/*
+ * QEMU live migration via Unix Domain Sockets
+ *
+ * Copyright Red Hat, Inc. 2009-2016
+ *
+ * Authors:
+ * Chris Lalancette <clalance@redhat.com>
+ * Daniel P. Berrange <berrange@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2. See
+ * the COPYING file in the top-level directory.
+ *
+ * Contributions after 2012-01-13 are licensed under the terms of the
+ * GNU GPL, version 2 or (at your option) any later version.
+ */
+
+#include "qemu/osdep.h"
+
+#include "qemu-common.h"
+#include "qemu/error-report.h"
+#include "qapi/error.h"
+#include "migration/migration.h"
+#include "migration/qemu-file.h"
+#include "io/channel-socket.h"
+#include "trace.h"
+
+
+static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
+{
+ InetSocketAddress *iaddr = inet_parse(host_port, errp);
+ SocketAddress *saddr;
+
+ if (!iaddr) {
+ return NULL;
+ }
+
+ saddr = g_new0(SocketAddress, 1);
+ saddr->type = SOCKET_ADDRESS_KIND_INET;
+ saddr->u.inet.data = iaddr;
+
+ return saddr;
+}
+
+
+static SocketAddress *unix_build_address(const char *path)
+{
+ SocketAddress *saddr;
+
+ saddr = g_new0(SocketAddress, 1);
+ saddr->type = SOCKET_ADDRESS_KIND_UNIX;
+ saddr->u.q_unix.data = g_new0(UnixSocketAddress, 1);
+ saddr->u.q_unix.data->path = g_strdup(path);
+
+ return saddr;
+}
+
+
+struct SocketConnectData {
+ MigrationState *s;
+ char *hostname;
+};
+
+static void socket_connect_data_free(void *opaque)
+{
+ struct SocketConnectData *data = opaque;
+ if (!data) {
+ return;
+ }
+ g_free(data->hostname);
+ g_free(data);
+}
+
+static void socket_outgoing_migration(Object *src,
+ Error *err,
+ gpointer opaque)
+{
+ struct SocketConnectData *data = opaque;
+ QIOChannel *sioc = QIO_CHANNEL(src);
+
+ if (err) {
+ trace_migration_socket_outgoing_error(error_get_pretty(err));
+ data->s->to_dst_file = NULL;
+ migrate_fd_error(data->s, err);
+ } else {
+ trace_migration_socket_outgoing_connected(data->hostname);
+ migration_set_outgoing_channel(data->s, sioc, data->hostname);
+ }
+ object_unref(src);
+}
+
+static void socket_start_outgoing_migration(MigrationState *s,
+ SocketAddress *saddr,
+ Error **errp)
+{
+ QIOChannelSocket *sioc = qio_channel_socket_new();
+ struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);
+ data->s = s;
+ if (saddr->type == SOCKET_ADDRESS_KIND_INET) {
+ data->hostname = g_strdup(saddr->u.inet.data->host);
+ }
+ qio_channel_socket_connect_async(sioc,
+ saddr,
+ socket_outgoing_migration,
+ data,
+ socket_connect_data_free);
+ qapi_free_SocketAddress(saddr);
+}
+
+void tcp_start_outgoing_migration(MigrationState *s,
+ const char *host_port,
+ Error **errp)
+{
+ SocketAddress *saddr = tcp_build_address(host_port, errp);
+ socket_start_outgoing_migration(s, saddr, errp);
+}
+
+void unix_start_outgoing_migration(MigrationState *s,
+ const char *path,
+ Error **errp)
+{
+ SocketAddress *saddr = unix_build_address(path);
+ socket_start_outgoing_migration(s, saddr, errp);
+}
+
+
+static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
+ GIOCondition condition,
+ gpointer opaque)
+{
+ QIOChannelSocket *sioc;
+ Error *err = NULL;
+
+ sioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(ioc),
+ &err);
+ if (!sioc) {
+ error_report("could not accept migration connection (%s)",
+ error_get_pretty(err));
+ goto out;
+ }
+
+ trace_migration_socket_incoming_accepted();
+
+ migration_set_incoming_channel(migrate_get_current(),
+ QIO_CHANNEL(sioc));
+ object_unref(OBJECT(sioc));
+
+out:
+ /* Close listening socket as its no longer needed */
+ qio_channel_close(ioc, NULL);
+ return FALSE; /* unregister */
+}
+
+
+static void socket_start_incoming_migration(SocketAddress *saddr,
+ Error **errp)
+{
+ QIOChannelSocket *listen_ioc = qio_channel_socket_new();
+
+ if (qio_channel_socket_listen_sync(listen_ioc, saddr, errp) < 0) {
+ object_unref(OBJECT(listen_ioc));
+ qapi_free_SocketAddress(saddr);
+ return;
+ }
+
+ qio_channel_add_watch(QIO_CHANNEL(listen_ioc),
+ G_IO_IN,
+ socket_accept_incoming_migration,
+ listen_ioc,
+ (GDestroyNotify)object_unref);
+ qapi_free_SocketAddress(saddr);
+}
+
+void tcp_start_incoming_migration(const char *host_port, Error **errp)
+{
+ SocketAddress *saddr = tcp_build_address(host_port, errp);
+ socket_start_incoming_migration(saddr, errp);
+}
+
+void unix_start_incoming_migration(const char *path, Error **errp)
+{
+ SocketAddress *saddr = unix_build_address(path);
+ socket_start_incoming_migration(saddr, errp);
+}
diff --git a/migration/tcp.c b/migration/tcp.c
deleted file mode 100644
index e1fa7f8f18..0000000000
--- a/migration/tcp.c
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * QEMU live migration
- *
- * Copyright IBM, Corp. 2008
- *
- * Authors:
- * Anthony Liguori <aliguori@us.ibm.com>
- *
- * This work is licensed under the terms of the GNU GPL, version 2. See
- * the COPYING file in the top-level directory.
- *
- * Contributions after 2012-01-13 are licensed under the terms of the
- * GNU GPL, version 2 or (at your option) any later version.
- */
-
-#include "qemu/osdep.h"
-
-#include "qemu-common.h"
-#include "qemu/error-report.h"
-#include "qemu/sockets.h"
-#include "migration/migration.h"
-#include "migration/qemu-file.h"
-#include "block/block.h"
-#include "qemu/main-loop.h"
-
-//#define DEBUG_MIGRATION_TCP
-
-#ifdef DEBUG_MIGRATION_TCP
-#define DPRINTF(fmt, ...) \
- do { printf("migration-tcp: " fmt, ## __VA_ARGS__); } while (0)
-#else
-#define DPRINTF(fmt, ...) \
- do { } while (0)
-#endif
-
-static void tcp_wait_for_connect(int fd, Error *err, void *opaque)
-{
- MigrationState *s = opaque;
-
- if (fd < 0) {
- DPRINTF("migrate connect error: %s\n", error_get_pretty(err));
- s->to_dst_file = NULL;
- migrate_fd_error(s);
- } else {
- DPRINTF("migrate connect success\n");
- s->to_dst_file = qemu_fopen_socket(fd, "wb");
- migrate_fd_connect(s);
- }
-}
-
-void tcp_start_outgoing_migration(MigrationState *s, const char *host_port, Error **errp)
-{
- inet_nonblocking_connect(host_port, tcp_wait_for_connect, s, errp);
-}
-
-static void tcp_accept_incoming_migration(void *opaque)
-{
- struct sockaddr_in addr;
- socklen_t addrlen = sizeof(addr);
- int s = (intptr_t)opaque;
- QEMUFile *f;
- int c;
-
- do {
- c = qemu_accept(s, (struct sockaddr *)&addr, &addrlen);
- } while (c < 0 && errno == EINTR);
- qemu_set_fd_handler(s, NULL, NULL, NULL);
- closesocket(s);
-
- DPRINTF("accepted migration\n");
-
- if (c < 0) {
- error_report("could not accept migration connection (%s)",
- strerror(errno));
- return;
- }
-
- f = qemu_fopen_socket(c, "rb");
- if (f == NULL) {
- error_report("could not qemu_fopen socket");
- goto out;
- }
-
- process_incoming_migration(f);
- return;
-
-out:
- closesocket(c);
-}
-
-void tcp_start_incoming_migration(const char *host_port, Error **errp)
-{
- int s;
-
- s = inet_listen(host_port, NULL, 256, SOCK_STREAM, 0, errp);
- if (s < 0) {
- return;
- }
-
- qemu_set_fd_handler(s, tcp_accept_incoming_migration, NULL,
- (void *)(intptr_t)s);
-}
diff --git a/migration/tls.c b/migration/tls.c
new file mode 100644
index 0000000000..75f959ff9c
--- /dev/null
+++ b/migration/tls.c
@@ -0,0 +1,161 @@
+/*
+ * QEMU migration TLS support
+ *
+ * Copyright (c) 2015 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include "qemu/osdep.h"
+#include "migration/migration.h"
+#include "io/channel-tls.h"
+#include "crypto/tlscreds.h"
+#include "qemu/error-report.h"
+#include "qapi/error.h"
+#include "trace.h"
+
+static QCryptoTLSCreds *
+migration_tls_get_creds(MigrationState *s,
+ QCryptoTLSCredsEndpoint endpoint,
+ Error **errp)
+{
+ Object *creds;
+ QCryptoTLSCreds *ret;
+
+ creds = object_resolve_path_component(
+ object_get_objects_root(), s->parameters.tls_creds);
+ if (!creds) {
+ error_setg(errp, "No TLS credentials with id '%s'",
+ s->parameters.tls_creds);
+ return NULL;
+ }
+ ret = (QCryptoTLSCreds *)object_dynamic_cast(
+ creds, TYPE_QCRYPTO_TLS_CREDS);
+ if (!ret) {
+ error_setg(errp, "Object with id '%s' is not TLS credentials",
+ s->parameters.tls_creds);
+ return NULL;
+ }
+ if (ret->endpoint != endpoint) {
+ error_setg(errp,
+ "Expected TLS credentials for a %s endpoint",
+ endpoint == QCRYPTO_TLS_CREDS_ENDPOINT_CLIENT ?
+ "client" : "server");
+ return NULL;
+ }
+
+ object_ref(OBJECT(ret));
+ return ret;
+}
+
+
+static void migration_tls_incoming_handshake(Object *src,
+ Error *err,
+ gpointer opaque)
+{
+ QIOChannel *ioc = QIO_CHANNEL(src);
+
+ if (err) {
+ trace_migration_tls_incoming_handshake_error(error_get_pretty(err));
+ error_report("%s", error_get_pretty(err));
+ } else {
+ trace_migration_tls_incoming_handshake_complete();
+ migration_set_incoming_channel(migrate_get_current(), ioc);
+ }
+ object_unref(OBJECT(ioc));
+}
+
+void migration_tls_set_incoming_channel(MigrationState *s,
+ QIOChannel *ioc,
+ Error **errp)
+{
+ QCryptoTLSCreds *creds;
+ QIOChannelTLS *tioc;
+
+ creds = migration_tls_get_creds(
+ s, QCRYPTO_TLS_CREDS_ENDPOINT_SERVER, errp);
+ if (!creds) {
+ return;
+ }
+
+ tioc = qio_channel_tls_new_server(
+ ioc, creds,
+ NULL, /* XXX pass ACL name */
+ errp);
+ if (!tioc) {
+ return;
+ }
+
+ trace_migration_tls_incoming_handshake_start();
+ qio_channel_tls_handshake(tioc,
+ migration_tls_incoming_handshake,
+ NULL,
+ NULL);
+}
+
+
+static void migration_tls_outgoing_handshake(Object *src,
+ Error *err,
+ gpointer opaque)
+{
+ MigrationState *s = opaque;
+ QIOChannel *ioc = QIO_CHANNEL(src);
+
+ if (err) {
+ trace_migration_tls_outgoing_handshake_error(error_get_pretty(err));
+ s->to_dst_file = NULL;
+ migrate_fd_error(s, err);
+ } else {
+ trace_migration_tls_outgoing_handshake_complete();
+ migration_set_outgoing_channel(s, ioc, NULL);
+ }
+ object_unref(OBJECT(ioc));
+}
+
+
+void migration_tls_set_outgoing_channel(MigrationState *s,
+ QIOChannel *ioc,
+ const char *hostname,
+ Error **errp)
+{
+ QCryptoTLSCreds *creds;
+ QIOChannelTLS *tioc;
+
+ creds = migration_tls_get_creds(
+ s, QCRYPTO_TLS_CREDS_ENDPOINT_CLIENT, errp);
+ if (!creds) {
+ return;
+ }
+
+ if (s->parameters.tls_hostname) {
+ hostname = s->parameters.tls_hostname;
+ }
+ if (!hostname) {
+ error_setg(errp, "No hostname available for TLS");
+ return;
+ }
+
+ tioc = qio_channel_tls_new_client(
+ ioc, creds, hostname, errp);
+ if (!tioc) {
+ return;
+ }
+
+ trace_migration_tls_outgoing_handshake_start(hostname);
+ qio_channel_tls_handshake(tioc,
+ migration_tls_outgoing_handshake,
+ s,
+ NULL);
+}
diff --git a/migration/unix.c b/migration/unix.c
deleted file mode 100644
index d9aac36b9a..0000000000
--- a/migration/unix.c
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * QEMU live migration via Unix Domain Sockets
- *
- * Copyright Red Hat, Inc. 2009
- *
- * Authors:
- * Chris Lalancette <clalance@redhat.com>
- *
- * This work is licensed under the terms of the GNU GPL, version 2. See
- * the COPYING file in the top-level directory.
- *
- * Contributions after 2012-01-13 are licensed under the terms of the
- * GNU GPL, version 2 or (at your option) any later version.
- */
-
-#include "qemu/osdep.h"
-
-#include "qemu-common.h"
-#include "qemu/error-report.h"
-#include "qemu/sockets.h"
-#include "qemu/main-loop.h"
-#include "migration/migration.h"
-#include "migration/qemu-file.h"
-#include "block/block.h"
-
-//#define DEBUG_MIGRATION_UNIX
-
-#ifdef DEBUG_MIGRATION_UNIX
-#define DPRINTF(fmt, ...) \
- do { printf("migration-unix: " fmt, ## __VA_ARGS__); } while (0)
-#else
-#define DPRINTF(fmt, ...) \
- do { } while (0)
-#endif
-
-static void unix_wait_for_connect(int fd, Error *err, void *opaque)
-{
- MigrationState *s = opaque;
-
- if (fd < 0) {
- DPRINTF("migrate connect error: %s\n", error_get_pretty(err));
- s->to_dst_file = NULL;
- migrate_fd_error(s);
- } else {
- DPRINTF("migrate connect success\n");
- s->to_dst_file = qemu_fopen_socket(fd, "wb");
- migrate_fd_connect(s);
- }
-}
-
-void unix_start_outgoing_migration(MigrationState *s, const char *path, Error **errp)
-{
- unix_nonblocking_connect(path, unix_wait_for_connect, s, errp);
-}
-
-static void unix_accept_incoming_migration(void *opaque)
-{
- struct sockaddr_un addr;
- socklen_t addrlen = sizeof(addr);
- int s = (intptr_t)opaque;
- QEMUFile *f;
- int c, err;
-
- do {
- c = qemu_accept(s, (struct sockaddr *)&addr, &addrlen);
- err = errno;
- } while (c < 0 && err == EINTR);
- qemu_set_fd_handler(s, NULL, NULL, NULL);
- close(s);
-
- DPRINTF("accepted migration\n");
-
- if (c < 0) {
- error_report("could not accept migration connection (%s)",
- strerror(err));
- return;
- }
-
- f = qemu_fopen_socket(c, "rb");
- if (f == NULL) {
- error_report("could not qemu_fopen socket");
- goto out;
- }
-
- process_incoming_migration(f);
- return;
-
-out:
- close(c);
-}
-
-void unix_start_incoming_migration(const char *path, Error **errp)
-{
- int s;
-
- s = unix_listen(path, NULL, 0, errp);
- if (s < 0) {
- return;
- }
-
- qemu_set_fd_handler(s, unix_accept_incoming_migration, NULL,
- (void *)(intptr_t)s);
-}
diff --git a/qapi-schema.json b/qapi-schema.json
index 9a322d1836..8483bdfcce 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -484,6 +484,10 @@
# throttled during auto-converge. This is only present when auto-converge
# has started throttling guest cpus. (Since 2.7)
#
+# @error-desc: #optional the human readable error description string, when
+# @status is 'failed'. Clients should not attempt to parse the
+# error strings. (Since 2.6)
+#
# Since: 0.14.0
##
{ 'struct': 'MigrationInfo',
@@ -494,7 +498,8 @@
'*expected-downtime': 'int',
'*downtime': 'int',
'*setup-time': 'int',
- '*cpu-throttle-percentage': 'int'} }
+ '*cpu-throttle-percentage': 'int',
+ '*error-desc': 'str'} }
##
# @query-migrate
@@ -612,11 +617,28 @@
# @cpu-throttle-increment: throttle percentage increase each time
# auto-converge detects that migration is not making
# progress. The default value is 10. (Since 2.7)
+#
+# @tls-creds: ID of the 'tls-creds' object that provides credentials for
+# establishing a TLS connection over the migration data channel.
+# On the outgoing side of the migration, the credentials must
+# be for a 'client' endpoint, while for the incoming side the
+# credentials must be for a 'server' endpoint. Setting this
+# will enable TLS for all migrations. The default is unset,
+# resulting in unsecured migration at the QEMU level. (Since 2.7)
+#
+# @tls-hostname: hostname of the target host for the migration. This is
+# required when using x509 based TLS credentials and the
+# migration URI does not already include a hostname. For
+# example if using fd: or exec: based migration, the
+# hostname must be provided so that the server's x509
+# certificate identity canbe validated. (Since 2.7)
+#
# Since: 2.4
##
{ 'enum': 'MigrationParameter',
'data': ['compress-level', 'compress-threads', 'decompress-threads',
- 'cpu-throttle-initial', 'cpu-throttle-increment'] }
+ 'cpu-throttle-initial', 'cpu-throttle-increment',
+ 'tls-creds', 'tls-hostname'] }
#
# @migrate-set-parameters
@@ -636,6 +658,22 @@
# @cpu-throttle-increment: throttle percentage increase each time
# auto-converge detects that migration is not making
# progress. The default value is 10. (Since 2.7)
+#
+# @tls-creds: ID of the 'tls-creds' object that provides credentials for
+# establishing a TLS connection over the migration data channel.
+# On the outgoing side of the migration, the credentials must
+# be for a 'client' endpoint, while for the incoming side the
+# credentials must be for a 'server' endpoint. Setting this
+# will enable TLS for all migrations. The default is unset,
+# resulting in unsecured migration at the QEMU level. (Since 2.7)
+#
+# @tls-hostname: hostname of the target host for the migration. This is
+# required when using x509 based TLS credentials and the
+# migration URI does not already include a hostname. For
+# example if using fd: or exec: based migration, the
+# hostname must be provided so that the server's x509
+# certificate identity canbe validated. (Since 2.7)
+#
# Since: 2.4
##
{ 'command': 'migrate-set-parameters',
@@ -643,7 +681,9 @@
'*compress-threads': 'int',
'*decompress-threads': 'int',
'*cpu-throttle-initial': 'int',
- '*cpu-throttle-increment': 'int'} }
+ '*cpu-throttle-increment': 'int',
+ '*tls-creds': 'str',
+ '*tls-hostname': 'str'} }
#
# @MigrationParameters
@@ -662,6 +702,21 @@
# auto-converge detects that migration is not making
# progress. The default value is 10. (Since 2.7)
#
+# @tls-creds: ID of the 'tls-creds' object that provides credentials for
+# establishing a TLS connection over the migration data channel.
+# On the outgoing side of the migration, the credentials must
+# be for a 'client' endpoint, while for the incoming side the
+# credentials must be for a 'server' endpoint. Setting this
+# will enable TLS for all migrations. The default is unset,
+# resulting in unsecured migration at the QEMU level. (Since 2.6)
+#
+# @tls-hostname: hostname of the target host for the migration. This is
+# required when using x509 based TLS credentials and the
+# migration URI does not already include a hostname. For
+# example if using fd: or exec: based migration, the
+# hostname must be provided so that the server's x509
+# certificate identity canbe validated. (Since 2.6)
+#
# Since: 2.4
##
{ 'struct': 'MigrationParameters',
@@ -669,7 +724,9 @@
'compress-threads': 'int',
'decompress-threads': 'int',
'cpu-throttle-initial': 'int',
- 'cpu-throttle-increment': 'int'} }
+ 'cpu-throttle-increment': 'int',
+ 'tls-creds': 'str',
+ 'tls-hostname': 'str'} }
##
# @query-migrate-parameters
#
diff --git a/tests/Makefile b/tests/Makefile
index 55463ed0b2..4bc041c8c7 100644
--- a/tests/Makefile
+++ b/tests/Makefile
@@ -439,9 +439,9 @@ tests/test-qdev-global-props$(EXESUF): tests/test-qdev-global-props.o \
hw/core/fw-path-provider.o \
$(test-qapi-obj-y)
tests/test-vmstate$(EXESUF): tests/test-vmstate.o \
- migration/vmstate.o migration/qemu-file.o migration/qemu-file-buf.o \
- migration/qemu-file-unix.o migration/qjson.o \
- $(test-qom-obj-y)
+ migration/vmstate.o migration/qemu-file.o \
+ migration/qemu-file-channel.o migration/qjson.o \
+ $(test-io-obj-y)
tests/test-timed-average$(EXESUF): tests/test-timed-average.o qemu-timer.o \
$(test-util-obj-y)
tests/test-base64$(EXESUF): tests/test-base64.o \
diff --git a/tests/test-vmstate.c b/tests/test-vmstate.c
index 713d4443b2..d19b16a60e 100644
--- a/tests/test-vmstate.c
+++ b/tests/test-vmstate.c
@@ -29,6 +29,7 @@
#include "migration/migration.h"
#include "migration/vmstate.h"
#include "qemu/coroutine.h"
+#include "io/channel-file.h"
static char temp_file[] = "/tmp/vmst.test.XXXXXX";
static int temp_fd;
@@ -44,35 +45,22 @@ void yield_until_fd_readable(int fd)
select(fd + 1, &fds, NULL, NULL, NULL);
}
-/*
- * Some tests use 'open_test_file' to work on a real fd, some use
- * an in memory file (QEMUSizedBuffer+qemu_bufopen); we could pick one
- * but this way we test both.
- */
/* Duplicate temp_fd and seek to the beginning of the file */
static QEMUFile *open_test_file(bool write)
{
int fd = dup(temp_fd);
+ QIOChannel *ioc;
lseek(fd, 0, SEEK_SET);
if (write) {
g_assert_cmpint(ftruncate(fd, 0), ==, 0);
}
- return qemu_fdopen(fd, write ? "wb" : "rb");
-}
-
-/*
- * Check that the contents of the memory-buffered file f match
- * the given size/data.
- */
-static void check_mem_file(QEMUFile *f, void *data, size_t size)
-{
- uint8_t *result = g_malloc(size);
- const QEMUSizedBuffer *qsb = qemu_buf_get(f);
- g_assert_cmpint(qsb_get_length(qsb), ==, size);
- g_assert_cmpint(qsb_get_buffer(qsb, 0, size, result), ==, size);
- g_assert_cmpint(memcmp(result, data, size), ==, 0);
- g_free(result);
+ ioc = QIO_CHANNEL(qio_channel_file_new_fd(fd));
+ if (write) {
+ return qemu_fopen_channel_output(ioc);
+ } else {
+ return qemu_fopen_channel_input(ioc);
+ }
}
#define SUCCESS(val) \
@@ -392,7 +380,7 @@ static const VMStateDescription vmstate_skipping = {
static void test_save_noskip(void)
{
- QEMUFile *fsave = qemu_bufopen("w", NULL);
+ QEMUFile *fsave = open_test_file(true);
TestStruct obj = { .a = 1, .b = 2, .c = 3, .d = 4, .e = 5, .f = 6,
.skip_c_e = false };
vmstate_save_state(fsave, &vmstate_skipping, &obj, NULL);
@@ -406,13 +394,14 @@ static void test_save_noskip(void)
0, 0, 0, 5, /* e */
0, 0, 0, 0, 0, 0, 0, 6, /* f */
};
- check_mem_file(fsave, expected, sizeof(expected));
+
qemu_fclose(fsave);
+ compare_vmstate(expected, sizeof(expected));
}
static void test_save_skip(void)
{
- QEMUFile *fsave = qemu_bufopen("w", NULL);
+ QEMUFile *fsave = open_test_file(true);
TestStruct obj = { .a = 1, .b = 2, .c = 3, .d = 4, .e = 5, .f = 6,
.skip_c_e = true };
vmstate_save_state(fsave, &vmstate_skipping, &obj, NULL);
@@ -424,13 +413,14 @@ static void test_save_skip(void)
0, 0, 0, 0, 0, 0, 0, 4, /* d */
0, 0, 0, 0, 0, 0, 0, 6, /* f */
};
- check_mem_file(fsave, expected, sizeof(expected));
qemu_fclose(fsave);
+ compare_vmstate(expected, sizeof(expected));
}
static void test_load_noskip(void)
{
+ QEMUFile *fsave = open_test_file(true);
uint8_t buf[] = {
0, 0, 0, 10, /* a */
0, 0, 0, 20, /* b */
@@ -440,10 +430,10 @@ static void test_load_noskip(void)
0, 0, 0, 0, 0, 0, 0, 60, /* f */
QEMU_VM_EOF, /* just to ensure we won't get EOF reported prematurely */
};
+ qemu_put_buffer(fsave, buf, sizeof(buf));
+ qemu_fclose(fsave);
- QEMUSizedBuffer *qsb = qsb_create(buf, sizeof(buf));
- g_assert(qsb);
- QEMUFile *loading = qemu_bufopen("r", qsb);
+ QEMUFile *loading = open_test_file(false);
TestStruct obj = { .skip_c_e = false };
vmstate_load_state(loading, &vmstate_skipping, &obj, 2);
g_assert(!qemu_file_get_error(loading));
@@ -454,11 +444,11 @@ static void test_load_noskip(void)
g_assert_cmpint(obj.e, ==, 50);
g_assert_cmpint(obj.f, ==, 60);
qemu_fclose(loading);
- qsb_free(qsb);
}
static void test_load_skip(void)
{
+ QEMUFile *fsave = open_test_file(true);
uint8_t buf[] = {
0, 0, 0, 10, /* a */
0, 0, 0, 20, /* b */
@@ -466,10 +456,10 @@ static void test_load_skip(void)
0, 0, 0, 0, 0, 0, 0, 60, /* f */
QEMU_VM_EOF, /* just to ensure we won't get EOF reported prematurely */
};
+ qemu_put_buffer(fsave, buf, sizeof(buf));
+ qemu_fclose(fsave);
- QEMUSizedBuffer *qsb = qsb_create(buf, sizeof(buf));
- g_assert(qsb);
- QEMUFile *loading = qemu_bufopen("r", qsb);
+ QEMUFile *loading = open_test_file(false);
TestStruct obj = { .skip_c_e = true, .c = 300, .e = 500 };
vmstate_load_state(loading, &vmstate_skipping, &obj, 2);
g_assert(!qemu_file_get_error(loading));
@@ -480,13 +470,14 @@ static void test_load_skip(void)
g_assert_cmpint(obj.e, ==, 500);
g_assert_cmpint(obj.f, ==, 60);
qemu_fclose(loading);
- qsb_free(qsb);
}
int main(int argc, char **argv)
{
temp_fd = mkstemp(temp_file);
+ module_call_init(MODULE_INIT_QOM);
+
g_test_init(&argc, &argv, NULL);
g_test_add_func("/vmstate/simple/primitive", test_simple_primitive);
g_test_add_func("/vmstate/versioned/load/v1", test_load_v1);
diff --git a/trace-events b/trace-events
index 4450d8f377..767fd04595 100644
--- a/trace-events
+++ b/trace-events
@@ -1483,7 +1483,7 @@ await_return_path_close_on_source_close(void) ""
await_return_path_close_on_source_joining(void) ""
migrate_set_state(int new_state) "new state %d"
migrate_fd_cleanup(void) ""
-migrate_fd_error(void) ""
+migrate_fd_error(const char *error_desc) "error=%s"
migrate_fd_cancel(void) ""
migrate_handle_rp_req_pages(const char *rbname, size_t start, size_t len) "in %s at %zx len %zx"
migrate_pending(uint64_t size, uint64_t max, uint64_t post, uint64_t nonpost) "pending size %" PRIu64 " max %" PRIu64 " (post=%" PRIu64 " nonpost=%" PRIu64 ")"
@@ -1513,6 +1513,8 @@ migrate_state_too_big(void) ""
migrate_transferred(uint64_t tranferred, uint64_t time_spent, double bandwidth, uint64_t size) "transferred %" PRIu64 " time_spent %" PRIu64 " bandwidth %g max_size %" PRId64
process_incoming_migration_co_end(int ret, int ps) "ret=%d postcopy-state=%d"
process_incoming_migration_co_postcopy_end_main(void) ""
+migration_set_incoming_channel(void *ioc, const char *ioctype) "ioc=%p ioctype=%s"
+migration_set_outgoing_channel(void *ioc, const char *ioctype, const char *hostname) "ioc=%p ioctype=%s hostname=%s"
# migration/rdma.c
qemu_rdma_accept_incoming_migration(void) ""
@@ -1597,6 +1599,27 @@ postcopy_ram_incoming_cleanup_entry(void) ""
postcopy_ram_incoming_cleanup_exit(void) ""
postcopy_ram_incoming_cleanup_join(void) ""
+# migration/exec.c
+migration_exec_outgoing(const char *cmd) "cmd=%s"
+migration_exec_incoming(const char *cmd) "cmd=%s"
+
+# migration/fd.c
+migration_fd_outgoing(int fd) "fd=%d"
+migration_fd_incoming(int fd) "fd=%d"
+
+# migration/socket.c
+migration_socket_incoming_accepted(void) ""
+migration_socket_outgoing_connected(const char *hostname) "hostname=%s"
+migration_socket_outgoing_error(const char *err) "error=%s"
+
+# migration/tls.c
+migration_tls_outgoing_handshake_start(const char *hostname) "hostname=%s"
+migration_tls_outgoing_handshake_error(const char *err) "err=%s"
+migration_tls_outgoing_handshake_complete(void) ""
+migration_tls_incoming_handshake_start(void) ""
+migration_tls_incoming_handshake_error(const char *err) "err=%s"
+migration_tls_incoming_handshake_complete(void) ""
+
# kvm-all.c
kvm_ioctl(int type, void *arg) "type 0x%x, arg %p"
kvm_vm_ioctl(int type, void *arg) "type 0x%x, arg %p"
diff --git a/util/error.c b/util/error.c
index cae2511732..9c40b1f458 100644
--- a/util/error.c
+++ b/util/error.c
@@ -217,7 +217,7 @@ ErrorClass error_get_class(const Error *err)
return err->err_class;
}
-const char *error_get_pretty(Error *err)
+const char *error_get_pretty(const Error *err)
{
return err->msg;
}