aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/io/channel-null.h55
-rw-r--r--io/channel-null.c237
-rw-r--r--io/channel-socket.c19
-rw-r--r--io/meson.build1
-rw-r--r--io/trace-events3
-rw-r--r--migration/block.c10
-rw-r--r--migration/channel-block.c195
-rw-r--r--migration/channel-block.h59
-rw-r--r--migration/channel.c4
-rw-r--r--migration/colo.c5
-rw-r--r--migration/meson.build2
-rw-r--r--migration/migration.c68
-rw-r--r--migration/multifd.c4
-rw-r--r--migration/qemu-file-channel.c194
-rw-r--r--migration/qemu-file-channel.h32
-rw-r--r--migration/qemu-file.c193
-rw-r--r--migration/qemu-file.h125
-rw-r--r--migration/ram.c8
-rw-r--r--migration/rdma.c185
-rw-r--r--migration/savevm.c55
-rw-r--r--migration/vmstate.c5
-rw-r--r--monitor/hmp-cmds.c6
-rw-r--r--qapi/migration.json33
-rw-r--r--tests/unit/meson.build1
-rw-r--r--tests/unit/test-io-channel-null.c95
-rw-r--r--tests/unit/test-vmstate.c5
26 files changed, 928 insertions, 671 deletions
diff --git a/include/io/channel-null.h b/include/io/channel-null.h
new file mode 100644
index 0000000000..f6d54e63cf
--- /dev/null
+++ b/include/io/channel-null.h
@@ -0,0 +1,55 @@
+/*
+ * QEMU I/O channels null driver
+ *
+ * Copyright (c) 2022 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#ifndef QIO_CHANNEL_FILE_H
+#define QIO_CHANNEL_FILE_H
+
+#include "io/channel.h"
+#include "qom/object.h"
+
+#define TYPE_QIO_CHANNEL_NULL "qio-channel-null"
+OBJECT_DECLARE_SIMPLE_TYPE(QIOChannelNull, QIO_CHANNEL_NULL)
+
+
+/**
+ * QIOChannelNull:
+ *
+ * The QIOChannelNull object provides a channel implementation
+ * that discards all writes and returns EOF for all reads.
+ */
+
+struct QIOChannelNull {
+ QIOChannel parent;
+ bool closed;
+};
+
+
+/**
+ * qio_channel_null_new:
+ *
+ * Create a new IO channel object that discards all writes
+ * and returns EOF for all reads.
+ *
+ * Returns: the new channel object
+ */
+QIOChannelNull *
+qio_channel_null_new(void);
+
+#endif /* QIO_CHANNEL_NULL_H */
diff --git a/io/channel-null.c b/io/channel-null.c
new file mode 100644
index 0000000000..75e3781507
--- /dev/null
+++ b/io/channel-null.c
@@ -0,0 +1,237 @@
+/*
+ * QEMU I/O channels null driver
+ *
+ * Copyright (c) 2022 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include "qemu/osdep.h"
+#include "io/channel-null.h"
+#include "io/channel-watch.h"
+#include "qapi/error.h"
+#include "trace.h"
+#include "qemu/iov.h"
+
+typedef struct QIOChannelNullSource QIOChannelNullSource;
+struct QIOChannelNullSource {
+ GSource parent;
+ QIOChannel *ioc;
+ GIOCondition condition;
+};
+
+
+QIOChannelNull *
+qio_channel_null_new(void)
+{
+ QIOChannelNull *ioc;
+
+ ioc = QIO_CHANNEL_NULL(object_new(TYPE_QIO_CHANNEL_NULL));
+
+ trace_qio_channel_null_new(ioc);
+
+ return ioc;
+}
+
+
+static void
+qio_channel_null_init(Object *obj)
+{
+ QIOChannelNull *ioc = QIO_CHANNEL_NULL(obj);
+ ioc->closed = false;
+}
+
+
+static ssize_t
+qio_channel_null_readv(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int **fds G_GNUC_UNUSED,
+ size_t *nfds G_GNUC_UNUSED,
+ Error **errp)
+{
+ QIOChannelNull *nioc = QIO_CHANNEL_NULL(ioc);
+
+ if (nioc->closed) {
+ error_setg_errno(errp, EINVAL,
+ "Channel is closed");
+ return -1;
+ }
+
+ return 0;
+}
+
+
+static ssize_t
+qio_channel_null_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds G_GNUC_UNUSED,
+ size_t nfds G_GNUC_UNUSED,
+ int flags G_GNUC_UNUSED,
+ Error **errp)
+{
+ QIOChannelNull *nioc = QIO_CHANNEL_NULL(ioc);
+
+ if (nioc->closed) {
+ error_setg_errno(errp, EINVAL,
+ "Channel is closed");
+ return -1;
+ }
+
+ return iov_size(iov, niov);
+}
+
+
+static int
+qio_channel_null_set_blocking(QIOChannel *ioc G_GNUC_UNUSED,
+ bool enabled G_GNUC_UNUSED,
+ Error **errp G_GNUC_UNUSED)
+{
+ return 0;
+}
+
+
+static off_t
+qio_channel_null_seek(QIOChannel *ioc G_GNUC_UNUSED,
+ off_t offset G_GNUC_UNUSED,
+ int whence G_GNUC_UNUSED,
+ Error **errp G_GNUC_UNUSED)
+{
+ return 0;
+}
+
+
+static int
+qio_channel_null_close(QIOChannel *ioc,
+ Error **errp G_GNUC_UNUSED)
+{
+ QIOChannelNull *nioc = QIO_CHANNEL_NULL(ioc);
+
+ nioc->closed = true;
+ return 0;
+}
+
+
+static void
+qio_channel_null_set_aio_fd_handler(QIOChannel *ioc G_GNUC_UNUSED,
+ AioContext *ctx G_GNUC_UNUSED,
+ IOHandler *io_read G_GNUC_UNUSED,
+ IOHandler *io_write G_GNUC_UNUSED,
+ void *opaque G_GNUC_UNUSED)
+{
+}
+
+
+static gboolean
+qio_channel_null_source_prepare(GSource *source G_GNUC_UNUSED,
+ gint *timeout)
+{
+ *timeout = -1;
+
+ return TRUE;
+}
+
+
+static gboolean
+qio_channel_null_source_check(GSource *source G_GNUC_UNUSED)
+{
+ return TRUE;
+}
+
+
+static gboolean
+qio_channel_null_source_dispatch(GSource *source,
+ GSourceFunc callback,
+ gpointer user_data)
+{
+ QIOChannelFunc func = (QIOChannelFunc)callback;
+ QIOChannelNullSource *ssource = (QIOChannelNullSource *)source;
+
+ return (*func)(ssource->ioc,
+ ssource->condition,
+ user_data);
+}
+
+
+static void
+qio_channel_null_source_finalize(GSource *source)
+{
+ QIOChannelNullSource *ssource = (QIOChannelNullSource *)source;
+
+ object_unref(OBJECT(ssource->ioc));
+}
+
+
+GSourceFuncs qio_channel_null_source_funcs = {
+ qio_channel_null_source_prepare,
+ qio_channel_null_source_check,
+ qio_channel_null_source_dispatch,
+ qio_channel_null_source_finalize
+};
+
+
+static GSource *
+qio_channel_null_create_watch(QIOChannel *ioc,
+ GIOCondition condition)
+{
+ GSource *source;
+ QIOChannelNullSource *ssource;
+
+ source = g_source_new(&qio_channel_null_source_funcs,
+ sizeof(QIOChannelNullSource));
+ ssource = (QIOChannelNullSource *)source;
+
+ ssource->ioc = ioc;
+ object_ref(OBJECT(ioc));
+
+ ssource->condition = condition;
+
+ return source;
+}
+
+
+static void
+qio_channel_null_class_init(ObjectClass *klass,
+ void *class_data G_GNUC_UNUSED)
+{
+ QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
+
+ ioc_klass->io_writev = qio_channel_null_writev;
+ ioc_klass->io_readv = qio_channel_null_readv;
+ ioc_klass->io_set_blocking = qio_channel_null_set_blocking;
+ ioc_klass->io_seek = qio_channel_null_seek;
+ ioc_klass->io_close = qio_channel_null_close;
+ ioc_klass->io_create_watch = qio_channel_null_create_watch;
+ ioc_klass->io_set_aio_fd_handler = qio_channel_null_set_aio_fd_handler;
+}
+
+
+static const TypeInfo qio_channel_null_info = {
+ .parent = TYPE_QIO_CHANNEL,
+ .name = TYPE_QIO_CHANNEL_NULL,
+ .instance_size = sizeof(QIOChannelNull),
+ .instance_init = qio_channel_null_init,
+ .class_init = qio_channel_null_class_init,
+};
+
+
+static void
+qio_channel_null_register_types(void)
+{
+ type_register_static(&qio_channel_null_info);
+}
+
+type_init(qio_channel_null_register_types);
diff --git a/io/channel-socket.c b/io/channel-socket.c
index dc9c165de1..4466bb1cd4 100644
--- a/io/channel-socket.c
+++ b/io/channel-socket.c
@@ -578,11 +578,17 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
memcpy(CMSG_DATA(cmsg), fds, fdsize);
}
-#ifdef QEMU_MSG_ZEROCOPY
if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
+#ifdef QEMU_MSG_ZEROCOPY
sflags = MSG_ZEROCOPY;
- }
+#else
+ /*
+ * We expect QIOChannel class entry point to have
+ * blocked this code path already
+ */
+ g_assert_not_reached();
#endif
+ }
retry:
ret = sendmsg(sioc->fd, &msg, sflags);
@@ -592,21 +598,24 @@ static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
return QIO_CHANNEL_ERR_BLOCK;
case EINTR:
goto retry;
-#ifdef QEMU_MSG_ZEROCOPY
case ENOBUFS:
- if (sflags & MSG_ZEROCOPY) {
+ if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
error_setg_errno(errp, errno,
"Process can't lock enough memory for using MSG_ZEROCOPY");
return -1;
}
break;
-#endif
}
error_setg_errno(errp, errno,
"Unable to write to socket");
return -1;
}
+
+ if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
+ sioc->zero_copy_queued++;
+ }
+
return ret;
}
#else /* WIN32 */
diff --git a/io/meson.build b/io/meson.build
index bbcd3c53a4..283b9b2bdb 100644
--- a/io/meson.build
+++ b/io/meson.build
@@ -3,6 +3,7 @@ io_ss.add(files(
'channel-buffer.c',
'channel-command.c',
'channel-file.c',
+ 'channel-null.c',
'channel-socket.c',
'channel-tls.c',
'channel-util.c',
diff --git a/io/trace-events b/io/trace-events
index c5e814eb44..3cc5cf1efd 100644
--- a/io/trace-events
+++ b/io/trace-events
@@ -10,6 +10,9 @@ qio_task_thread_result(void *task) "Task thread result task=%p"
qio_task_thread_source_attach(void *task, void *source) "Task thread source attach task=%p source=%p"
qio_task_thread_source_cancel(void *task, void *source) "Task thread source cancel task=%p source=%p"
+# channel-null.c
+qio_channel_null_new(void *ioc) "Null new ioc=%p"
+
# channel-socket.c
qio_channel_socket_new(void *ioc) "Socket new ioc=%p"
qio_channel_socket_new_fd(void *ioc, int fd) "Socket new ioc=%p fd=%d"
diff --git a/migration/block.c b/migration/block.c
index 077a413325..823453c977 100644
--- a/migration/block.c
+++ b/migration/block.c
@@ -756,8 +756,8 @@ static int block_save_setup(QEMUFile *f, void *opaque)
static int block_save_iterate(QEMUFile *f, void *opaque)
{
int ret;
- int64_t last_ftell = qemu_ftell(f);
- int64_t delta_ftell;
+ int64_t last_bytes = qemu_file_total_transferred(f);
+ int64_t delta_bytes;
trace_migration_block_save("iterate", block_mig_state.submitted,
block_mig_state.transferred);
@@ -809,10 +809,10 @@ static int block_save_iterate(QEMUFile *f, void *opaque)
}
qemu_put_be64(f, BLK_MIG_FLAG_EOS);
- delta_ftell = qemu_ftell(f) - last_ftell;
- if (delta_ftell > 0) {
+ delta_bytes = qemu_file_total_transferred(f) - last_bytes;
+ if (delta_bytes > 0) {
return 1;
- } else if (delta_ftell < 0) {
+ } else if (delta_bytes < 0) {
return -1;
} else {
return 0;
diff --git a/migration/channel-block.c b/migration/channel-block.c
new file mode 100644
index 0000000000..c55c8c93ce
--- /dev/null
+++ b/migration/channel-block.c
@@ -0,0 +1,195 @@
+/*
+ * QEMU I/O channels block driver
+ *
+ * Copyright (c) 2022 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include "qemu/osdep.h"
+#include "migration/channel-block.h"
+#include "qapi/error.h"
+#include "block/block.h"
+#include "trace.h"
+
+QIOChannelBlock *
+qio_channel_block_new(BlockDriverState *bs)
+{
+ QIOChannelBlock *ioc;
+
+ ioc = QIO_CHANNEL_BLOCK(object_new(TYPE_QIO_CHANNEL_BLOCK));
+
+ bdrv_ref(bs);
+ ioc->bs = bs;
+
+ return ioc;
+}
+
+
+static void
+qio_channel_block_finalize(Object *obj)
+{
+ QIOChannelBlock *ioc = QIO_CHANNEL_BLOCK(obj);
+
+ g_clear_pointer(&ioc->bs, bdrv_unref);
+}
+
+
+static ssize_t
+qio_channel_block_readv(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int **fds,
+ size_t *nfds,
+ Error **errp)
+{
+ QIOChannelBlock *bioc = QIO_CHANNEL_BLOCK(ioc);
+ QEMUIOVector qiov;
+ int ret;
+
+ qemu_iovec_init_external(&qiov, (struct iovec *)iov, niov);
+ ret = bdrv_readv_vmstate(bioc->bs, &qiov, bioc->offset);
+ if (ret < 0) {
+ return ret;
+ }
+
+ bioc->offset += qiov.size;
+ return qiov.size;
+}
+
+
+static ssize_t
+qio_channel_block_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ int flags,
+ Error **errp)
+{
+ QIOChannelBlock *bioc = QIO_CHANNEL_BLOCK(ioc);
+ QEMUIOVector qiov;
+ int ret;
+
+ qemu_iovec_init_external(&qiov, (struct iovec *)iov, niov);
+ ret = bdrv_writev_vmstate(bioc->bs, &qiov, bioc->offset);
+ if (ret < 0) {
+ return ret;
+ }
+
+ bioc->offset += qiov.size;
+ return qiov.size;
+}
+
+
+static int
+qio_channel_block_set_blocking(QIOChannel *ioc,
+ bool enabled,
+ Error **errp)
+{
+ if (!enabled) {
+ error_setg(errp, "Non-blocking mode not supported for block devices");
+ return -1;
+ }
+ return 0;
+}
+
+
+static off_t
+qio_channel_block_seek(QIOChannel *ioc,
+ off_t offset,
+ int whence,
+ Error **errp)
+{
+ QIOChannelBlock *bioc = QIO_CHANNEL_BLOCK(ioc);
+
+ switch (whence) {
+ case SEEK_SET:
+ bioc->offset = offset;
+ break;
+ case SEEK_CUR:
+ bioc->offset += whence;
+ break;
+ case SEEK_END:
+ error_setg(errp, "Size of VMstate region is unknown");
+ return (off_t)-1;
+ default:
+ g_assert_not_reached();
+ }
+
+ return bioc->offset;
+}
+
+
+static int
+qio_channel_block_close(QIOChannel *ioc,
+ Error **errp)
+{
+ QIOChannelBlock *bioc = QIO_CHANNEL_BLOCK(ioc);
+ int rv = bdrv_flush(bioc->bs);
+
+ if (rv < 0) {
+ error_setg_errno(errp, -rv,
+ "Unable to flush VMState");
+ return -1;
+ }
+
+ g_clear_pointer(&bioc->bs, bdrv_unref);
+ bioc->offset = 0;
+
+ return 0;
+}
+
+
+static void
+qio_channel_block_set_aio_fd_handler(QIOChannel *ioc,
+ AioContext *ctx,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ void *opaque)
+{
+ /* XXX anything we can do here ? */
+}
+
+
+static void
+qio_channel_block_class_init(ObjectClass *klass,
+ void *class_data G_GNUC_UNUSED)
+{
+ QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
+
+ ioc_klass->io_writev = qio_channel_block_writev;
+ ioc_klass->io_readv = qio_channel_block_readv;
+ ioc_klass->io_set_blocking = qio_channel_block_set_blocking;
+ ioc_klass->io_seek = qio_channel_block_seek;
+ ioc_klass->io_close = qio_channel_block_close;
+ ioc_klass->io_set_aio_fd_handler = qio_channel_block_set_aio_fd_handler;
+}
+
+static const TypeInfo qio_channel_block_info = {
+ .parent = TYPE_QIO_CHANNEL,
+ .name = TYPE_QIO_CHANNEL_BLOCK,
+ .instance_size = sizeof(QIOChannelBlock),
+ .instance_finalize = qio_channel_block_finalize,
+ .class_init = qio_channel_block_class_init,
+};
+
+static void
+qio_channel_block_register_types(void)
+{
+ type_register_static(&qio_channel_block_info);
+}
+
+type_init(qio_channel_block_register_types);
diff --git a/migration/channel-block.h b/migration/channel-block.h
new file mode 100644
index 0000000000..31673824e6
--- /dev/null
+++ b/migration/channel-block.h
@@ -0,0 +1,59 @@
+/*
+ * QEMU I/O channels block driver
+ *
+ * Copyright (c) 2022 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#ifndef QIO_CHANNEL_BLOCK_H
+#define QIO_CHANNEL_BLOCK_H
+
+#include "io/channel.h"
+#include "qom/object.h"
+
+#define TYPE_QIO_CHANNEL_BLOCK "qio-channel-block"
+OBJECT_DECLARE_SIMPLE_TYPE(QIOChannelBlock, QIO_CHANNEL_BLOCK)
+
+
+/**
+ * QIOChannelBlock:
+ *
+ * The QIOChannelBlock object provides a channel implementation
+ * that is able to perform I/O on the BlockDriverState objects
+ * to the VMState region.
+ */
+
+struct QIOChannelBlock {
+ QIOChannel parent;
+ BlockDriverState *bs;
+ off_t offset;
+};
+
+
+/**
+ * qio_channel_block_new:
+ * @bs: the block driver state
+ *
+ * Create a new IO channel object that can perform
+ * I/O on a BlockDriverState object to the VMState
+ * region
+ *
+ * Returns: the new channel object
+ */
+QIOChannelBlock *
+qio_channel_block_new(BlockDriverState *bs);
+
+#endif /* QIO_CHANNEL_BLOCK_H */
diff --git a/migration/channel.c b/migration/channel.c
index a162d00fea..90087d8986 100644
--- a/migration/channel.c
+++ b/migration/channel.c
@@ -14,7 +14,7 @@
#include "channel.h"
#include "tls.h"
#include "migration.h"
-#include "qemu-file-channel.h"
+#include "qemu-file.h"
#include "trace.h"
#include "qapi/error.h"
#include "io/channel-tls.h"
@@ -85,7 +85,7 @@ void migration_channel_connect(MigrationState *s,
return;
}
} else {
- QEMUFile *f = qemu_fopen_channel_output(ioc);
+ QEMUFile *f = qemu_file_new_output(ioc);
migration_ioc_register_yank(ioc);
diff --git a/migration/colo.c b/migration/colo.c
index 5f7071b3cd..2b71722fd6 100644
--- a/migration/colo.c
+++ b/migration/colo.c
@@ -14,7 +14,6 @@
#include "sysemu/sysemu.h"
#include "qapi/error.h"
#include "qapi/qapi-commands-migration.h"
-#include "qemu-file-channel.h"
#include "migration.h"
#include "qemu-file.h"
#include "savevm.h"
@@ -559,7 +558,7 @@ static void colo_process_checkpoint(MigrationState *s)
goto out;
}
bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
- fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
+ fb = qemu_file_new_output(QIO_CHANNEL(bioc));
object_unref(OBJECT(bioc));
qemu_mutex_lock_iothread();
@@ -873,7 +872,7 @@ void *colo_process_incoming_thread(void *opaque)
colo_incoming_start_dirty_log();
bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
- fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
+ fb = qemu_file_new_input(QIO_CHANNEL(bioc));
object_unref(OBJECT(bioc));
qemu_mutex_lock_iothread();
diff --git a/migration/meson.build b/migration/meson.build
index 6880b61b10..690487cf1a 100644
--- a/migration/meson.build
+++ b/migration/meson.build
@@ -4,7 +4,6 @@ migration_files = files(
'xbzrle.c',
'vmstate-types.c',
'vmstate.c',
- 'qemu-file-channel.c',
'qemu-file.c',
'yank_functions.c',
)
@@ -13,6 +12,7 @@ softmmu_ss.add(migration_files)
softmmu_ss.add(files(
'block-dirty-bitmap.c',
'channel.c',
+ 'channel-block.c',
'colo-failover.c',
'colo.c',
'exec.c',
diff --git a/migration/migration.c b/migration/migration.c
index 31739b2af9..78f5057373 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -30,7 +30,6 @@
#include "migration/misc.h"
#include "migration.h"
#include "savevm.h"
-#include "qemu-file-channel.h"
#include "qemu-file.h"
#include "migration/vmstate.h"
#include "block/block.h"
@@ -163,7 +162,8 @@ INITIALIZE_MIGRATE_CAPS_SET(check_caps_background_snapshot,
MIGRATION_CAPABILITY_COMPRESS,
MIGRATION_CAPABILITY_XBZRLE,
MIGRATION_CAPABILITY_X_COLO,
- MIGRATION_CAPABILITY_VALIDATE_UUID);
+ MIGRATION_CAPABILITY_VALIDATE_UUID,
+ MIGRATION_CAPABILITY_ZERO_COPY_SEND);
/* When we add fault tolerance, we could have several
migrations at once. For now we don't need to add
@@ -722,7 +722,7 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
if (!mis->from_src_file) {
/* The first connection (multifd may have multiple) */
- QEMUFile *f = qemu_fopen_channel_input(ioc);
+ QEMUFile *f = qemu_file_new_input(ioc);
if (!migration_incoming_setup(f, errp)) {
return;
@@ -910,10 +910,6 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp)
params->multifd_zlib_level = s->parameters.multifd_zlib_level;
params->has_multifd_zstd_level = true;
params->multifd_zstd_level = s->parameters.multifd_zstd_level;
-#ifdef CONFIG_LINUX
- params->has_zero_copy_send = true;
- params->zero_copy_send = s->parameters.zero_copy_send;
-#endif
params->has_xbzrle_cache_size = true;
params->xbzrle_cache_size = s->parameters.xbzrle_cache_size;
params->has_max_postcopy_bandwidth = true;
@@ -1275,6 +1271,24 @@ static bool migrate_caps_check(bool *cap_list,
}
}
+#ifdef CONFIG_LINUX
+ if (cap_list[MIGRATION_CAPABILITY_ZERO_COPY_SEND] &&
+ (!cap_list[MIGRATION_CAPABILITY_MULTIFD] ||
+ migrate_use_compression() ||
+ migrate_use_tls())) {
+ error_setg(errp,
+ "Zero copy only available for non-compressed non-TLS multifd migration");
+ return false;
+ }
+#else
+ if (cap_list[MIGRATION_CAPABILITY_ZERO_COPY_SEND]) {
+ error_setg(errp,
+ "Zero copy currently only available on Linux");
+ return false;
+ }
+#endif
+
+
/* incoming side only */
if (runstate_check(RUN_STATE_INMIGRATE) &&
!migrate_multi_channels_is_allowed() &&
@@ -1497,16 +1511,6 @@ static bool migrate_params_check(MigrationParameters *params, Error **errp)
error_prepend(errp, "Invalid mapping given for block-bitmap-mapping: ");
return false;
}
-#ifdef CONFIG_LINUX
- if (params->zero_copy_send &&
- (!migrate_use_multifd() ||
- params->multifd_compression != MULTIFD_COMPRESSION_NONE ||
- (params->tls_creds && *params->tls_creds))) {
- error_setg(errp,
- "Zero copy only available for non-compressed non-TLS multifd migration");
- return false;
- }
-#endif
return true;
}
@@ -1580,11 +1584,6 @@ static void migrate_params_test_apply(MigrateSetParameters *params,
if (params->has_multifd_compression) {
dest->multifd_compression = params->multifd_compression;
}
-#ifdef CONFIG_LINUX
- if (params->has_zero_copy_send) {
- dest->zero_copy_send = params->zero_copy_send;
- }
-#endif
if (params->has_xbzrle_cache_size) {
dest->xbzrle_cache_size = params->xbzrle_cache_size;
}
@@ -1697,11 +1696,6 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
if (params->has_multifd_compression) {
s->parameters.multifd_compression = params->multifd_compression;
}
-#ifdef CONFIG_LINUX
- if (params->has_zero_copy_send) {
- s->parameters.zero_copy_send = params->zero_copy_send;
- }
-#endif
if (params->has_xbzrle_cache_size) {
s->parameters.xbzrle_cache_size = params->xbzrle_cache_size;
xbzrle_cache_resize(params->xbzrle_cache_size, errp);
@@ -2593,7 +2587,7 @@ bool migrate_use_zero_copy_send(void)
s = migrate_get_current();
- return s->parameters.zero_copy_send;
+ return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_COPY_SEND];
}
#endif
@@ -3081,7 +3075,7 @@ static int postcopy_start(MigrationState *ms)
*/
bioc = qio_channel_buffer_new(4096);
qio_channel_set_name(QIO_CHANNEL(bioc), "migration-postcopy-buffer");
- fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
+ fb = qemu_file_new_output(QIO_CHANNEL(bioc));
object_unref(OBJECT(bioc));
/*
@@ -3544,7 +3538,8 @@ static MigThrError migration_detect_error(MigrationState *s)
/* How many bytes have we transferred since the beginning of the migration */
static uint64_t migration_total_bytes(MigrationState *s)
{
- return qemu_ftell(s->to_dst_file) + ram_counters.multifd_bytes;
+ return qemu_file_total_transferred(s->to_dst_file) +
+ ram_counters.multifd_bytes;
}
static void migration_calculate_complete(MigrationState *s)
@@ -3970,7 +3965,7 @@ static void *bg_migration_thread(void *opaque)
*/
s->bioc = qio_channel_buffer_new(512 * 1024);
qio_channel_set_name(QIO_CHANNEL(s->bioc), "vmstate-buffer");
- fb = qemu_fopen_channel_output(QIO_CHANNEL(s->bioc));
+ fb = qemu_file_new_output(QIO_CHANNEL(s->bioc));
object_unref(OBJECT(s->bioc));
update_iteration_initial_status(s);
@@ -4249,10 +4244,6 @@ static Property migration_properties[] = {
DEFINE_PROP_UINT8("multifd-zstd-level", MigrationState,
parameters.multifd_zstd_level,
DEFAULT_MIGRATE_MULTIFD_ZSTD_LEVEL),
-#ifdef CONFIG_LINUX
- DEFINE_PROP_BOOL("zero_copy_send", MigrationState,
- parameters.zero_copy_send, false),
-#endif
DEFINE_PROP_SIZE("xbzrle-cache-size", MigrationState,
parameters.xbzrle_cache_size,
DEFAULT_MIGRATE_XBZRLE_CACHE_SIZE),
@@ -4290,6 +4281,10 @@ static Property migration_properties[] = {
DEFINE_PROP_MIG_CAP("x-multifd", MIGRATION_CAPABILITY_MULTIFD),
DEFINE_PROP_MIG_CAP("x-background-snapshot",
MIGRATION_CAPABILITY_BACKGROUND_SNAPSHOT),
+#ifdef CONFIG_LINUX
+ DEFINE_PROP_MIG_CAP("x-zero-copy-send",
+ MIGRATION_CAPABILITY_ZERO_COPY_SEND),
+#endif
DEFINE_PROP_END_OF_LIST(),
};
@@ -4350,9 +4345,6 @@ static void migration_instance_init(Object *obj)
params->has_multifd_compression = true;
params->has_multifd_zlib_level = true;
params->has_multifd_zstd_level = true;
-#ifdef CONFIG_LINUX
- params->has_zero_copy_send = true;
-#endif
params->has_xbzrle_cache_size = true;
params->has_max_postcopy_bandwidth = true;
params->has_max_cpu_throttle = true;
diff --git a/migration/multifd.c b/migration/multifd.c
index 9282ab6aa4..684c014c86 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -435,7 +435,7 @@ static int multifd_send_pages(QEMUFile *f)
p->pages = pages;
transferred = ((uint64_t) pages->num) * qemu_target_page_size()
+ p->packet_len;
- qemu_file_update_transfer(f, transferred);
+ qemu_file_acct_rate_limit(f, transferred);
ram_counters.multifd_bytes += transferred;
ram_counters.transferred += transferred;
qemu_mutex_unlock(&p->mutex);
@@ -610,7 +610,7 @@ int multifd_send_sync_main(QEMUFile *f)
p->packet_num = multifd_send_state->packet_num++;
p->flags |= MULTIFD_FLAG_SYNC;
p->pending_job++;
- qemu_file_update_transfer(f, p->packet_len);
+ qemu_file_acct_rate_limit(f, p->packet_len);
ram_counters.multifd_bytes += p->packet_len;
ram_counters.transferred += p->packet_len;
qemu_mutex_unlock(&p->mutex);
diff --git a/migration/qemu-file-channel.c b/migration/qemu-file-channel.c
deleted file mode 100644
index bb5a5752df..0000000000
--- a/migration/qemu-file-channel.c
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * QEMUFile backend for QIOChannel objects
- *
- * Copyright (c) 2015-2016 Red Hat, Inc
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
- * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- * THE SOFTWARE.
- */
-
-#include "qemu/osdep.h"
-#include "qemu-file-channel.h"
-#include "qemu-file.h"
-#include "io/channel-socket.h"
-#include "io/channel-tls.h"
-#include "qemu/iov.h"
-#include "qemu/yank.h"
-#include "yank_functions.h"
-
-
-static ssize_t channel_writev_buffer(void *opaque,
- struct iovec *iov,
- int iovcnt,
- int64_t pos,
- Error **errp)
-{
- QIOChannel *ioc = QIO_CHANNEL(opaque);
- ssize_t done = 0;
- struct iovec *local_iov = g_new(struct iovec, iovcnt);
- struct iovec *local_iov_head = local_iov;
- unsigned int nlocal_iov = iovcnt;
-
- nlocal_iov = iov_copy(local_iov, nlocal_iov,
- iov, iovcnt,
- 0, iov_size(iov, iovcnt));
-
- while (nlocal_iov > 0) {
- ssize_t len;
- len = qio_channel_writev(ioc, local_iov, nlocal_iov, errp);
- if (len == QIO_CHANNEL_ERR_BLOCK) {
- if (qemu_in_coroutine()) {
- qio_channel_yield(ioc, G_IO_OUT);
- } else {
- qio_channel_wait(ioc, G_IO_OUT);
- }
- continue;
- }
- if (len < 0) {
- done = -EIO;
- goto cleanup;
- }
-
- iov_discard_front(&local_iov, &nlocal_iov, len);
- done += len;
- }
-
- cleanup:
- g_free(local_iov_head);
- return done;
-}
-
-
-static ssize_t channel_get_buffer(void *opaque,
- uint8_t *buf,
- int64_t pos,
- size_t size,
- Error **errp)
-{
- QIOChannel *ioc = QIO_CHANNEL(opaque);
- ssize_t ret;
-
- do {
- ret = qio_channel_read(ioc, (char *)buf, size, errp);
- if (ret < 0) {
- if (ret == QIO_CHANNEL_ERR_BLOCK) {
- if (qemu_in_coroutine()) {
- qio_channel_yield(ioc, G_IO_IN);
- } else {
- qio_channel_wait(ioc, G_IO_IN);
- }
- } else {
- return -EIO;
- }
- }
- } while (ret == QIO_CHANNEL_ERR_BLOCK);
-
- return ret;
-}
-
-
-static int channel_close(void *opaque, Error **errp)
-{
- int ret;
- QIOChannel *ioc = QIO_CHANNEL(opaque);
- ret = qio_channel_close(ioc, errp);
- object_unref(OBJECT(ioc));
- return ret;
-}
-
-
-static int channel_shutdown(void *opaque,
- bool rd,
- bool wr,
- Error **errp)
-{
- QIOChannel *ioc = QIO_CHANNEL(opaque);
-
- if (qio_channel_has_feature(ioc,
- QIO_CHANNEL_FEATURE_SHUTDOWN)) {
- QIOChannelShutdown mode;
- if (rd && wr) {
- mode = QIO_CHANNEL_SHUTDOWN_BOTH;
- } else if (rd) {
- mode = QIO_CHANNEL_SHUTDOWN_READ;
- } else {
- mode = QIO_CHANNEL_SHUTDOWN_WRITE;
- }
- if (qio_channel_shutdown(ioc, mode, errp) < 0) {
- return -EIO;
- }
- }
- return 0;
-}
-
-
-static int channel_set_blocking(void *opaque,
- bool enabled,
- Error **errp)
-{
- QIOChannel *ioc = QIO_CHANNEL(opaque);
-
- if (qio_channel_set_blocking(ioc, enabled, errp) < 0) {
- return -1;
- }
- return 0;
-}
-
-static QEMUFile *channel_get_input_return_path(void *opaque)
-{
- QIOChannel *ioc = QIO_CHANNEL(opaque);
-
- return qemu_fopen_channel_output(ioc);
-}
-
-static QEMUFile *channel_get_output_return_path(void *opaque)
-{
- QIOChannel *ioc = QIO_CHANNEL(opaque);
-
- return qemu_fopen_channel_input(ioc);
-}
-
-static const QEMUFileOps channel_input_ops = {
- .get_buffer = channel_get_buffer,
- .close = channel_close,
- .shut_down = channel_shutdown,
- .set_blocking = channel_set_blocking,
- .get_return_path = channel_get_input_return_path,
-};
-
-
-static const QEMUFileOps channel_output_ops = {
- .writev_buffer = channel_writev_buffer,
- .close = channel_close,
- .shut_down = channel_shutdown,
- .set_blocking = channel_set_blocking,
- .get_return_path = channel_get_output_return_path,
-};
-
-
-QEMUFile *qemu_fopen_channel_input(QIOChannel *ioc)
-{
- object_ref(OBJECT(ioc));
- return qemu_fopen_ops(ioc, &channel_input_ops, true);
-}
-
-QEMUFile *qemu_fopen_channel_output(QIOChannel *ioc)
-{
- object_ref(OBJECT(ioc));
- return qemu_fopen_ops(ioc, &channel_output_ops, true);
-}
diff --git a/migration/qemu-file-channel.h b/migration/qemu-file-channel.h
deleted file mode 100644
index 0028a09eb6..0000000000
--- a/migration/qemu-file-channel.h
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * QEMUFile backend for QIOChannel objects
- *
- * Copyright (c) 2015-2016 Red Hat, Inc
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
- * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- * THE SOFTWARE.
- */
-
-#ifndef QEMU_FILE_CHANNEL_H
-#define QEMU_FILE_CHANNEL_H
-
-#include "io/channel.h"
-
-QEMUFile *qemu_fopen_channel_input(QIOChannel *ioc);
-QEMUFile *qemu_fopen_channel_output(QIOChannel *ioc);
-#endif
diff --git a/migration/qemu-file.c b/migration/qemu-file.c
index 1479cddad9..1e80d496b7 100644
--- a/migration/qemu-file.c
+++ b/migration/qemu-file.c
@@ -35,15 +35,24 @@
#define MAX_IOV_SIZE MIN_CONST(IOV_MAX, 64)
struct QEMUFile {
- const QEMUFileOps *ops;
const QEMUFileHooks *hooks;
- void *opaque;
+ QIOChannel *ioc;
+ bool is_writable;
- int64_t bytes_xfer;
- int64_t xfer_limit;
+ /*
+ * Maximum amount of data in bytes to transfer during one
+ * rate limiting time window
+ */
+ int64_t rate_limit_max;
+ /*
+ * Total amount of data in bytes queued for transfer
+ * during this rate limiting time window
+ */
+ int64_t rate_limit_used;
+
+ /* The sum of bytes transferred on the wire */
+ int64_t total_transferred;
- int64_t pos; /* start of buffer when writing, end of buffer
- when reading */
int buf_index;
int buf_size; /* 0 when writing */
uint8_t buf[IO_BUF_SIZE];
@@ -56,23 +65,28 @@ struct QEMUFile {
Error *last_error_obj;
/* has the file has been shutdown */
bool shutdown;
- /* Whether opaque points to a QIOChannel */
- bool has_ioc;
};
/*
* Stop a file from being read/written - not all backing files can do this
* typically only sockets can.
+ *
+ * TODO: convert to propagate Error objects instead of squashing
+ * to a fixed errno value
*/
int qemu_file_shutdown(QEMUFile *f)
{
- int ret;
+ int ret = 0;
f->shutdown = true;
- if (!f->ops->shut_down) {
+ if (!qio_channel_has_feature(f->ioc,
+ QIO_CHANNEL_FEATURE_SHUTDOWN)) {
return -ENOSYS;
}
- ret = f->ops->shut_down(f->opaque, true, true, NULL);
+
+ if (qio_channel_shutdown(f->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL) < 0) {
+ ret = -EIO;
+ }
if (!f->last_error) {
qemu_file_set_error(f, -EIO);
@@ -80,18 +94,6 @@ int qemu_file_shutdown(QEMUFile *f)
return ret;
}
-/*
- * Result: QEMUFile* for a 'return path' for comms in the opposite direction
- * NULL if not available
- */
-QEMUFile *qemu_file_get_return_path(QEMUFile *f)
-{
- if (!f->ops->get_return_path) {
- return NULL;
- }
- return f->ops->get_return_path(f->opaque);
-}
-
bool qemu_file_mode_is_not_valid(const char *mode)
{
if (mode == NULL ||
@@ -104,18 +106,37 @@ bool qemu_file_mode_is_not_valid(const char *mode)
return false;
}
-QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops, bool has_ioc)
+static QEMUFile *qemu_file_new_impl(QIOChannel *ioc, bool is_writable)
{
QEMUFile *f;
f = g_new0(QEMUFile, 1);
- f->opaque = opaque;
- f->ops = ops;
- f->has_ioc = has_ioc;
+ object_ref(ioc);
+ f->ioc = ioc;
+ f->is_writable = is_writable;
+
return f;
}
+/*
+ * Result: QEMUFile* for a 'return path' for comms in the opposite direction
+ * NULL if not available
+ */
+QEMUFile *qemu_file_get_return_path(QEMUFile *f)
+{
+ return qemu_file_new_impl(f->ioc, !f->is_writable);
+}
+
+QEMUFile *qemu_file_new_output(QIOChannel *ioc)
+{
+ return qemu_file_new_impl(ioc, true);
+}
+
+QEMUFile *qemu_file_new_input(QIOChannel *ioc)
+{
+ return qemu_file_new_impl(ioc, false);
+}
void qemu_file_set_hooks(QEMUFile *f, const QEMUFileHooks *hooks)
{
@@ -174,7 +195,7 @@ void qemu_file_set_error(QEMUFile *f, int ret)
bool qemu_file_is_writable(QEMUFile *f)
{
- return f->ops->writev_buffer;
+ return f->is_writable;
}
static void qemu_iovec_release_ram(QEMUFile *f)
@@ -212,6 +233,7 @@ static void qemu_iovec_release_ram(QEMUFile *f)
memset(f->may_free, 0, sizeof(f->may_free));
}
+
/**
* Flushes QEMUFile buffer
*
@@ -220,10 +242,6 @@ static void qemu_iovec_release_ram(QEMUFile *f)
*/
void qemu_fflush(QEMUFile *f)
{
- ssize_t ret = 0;
- ssize_t expect = 0;
- Error *local_error = NULL;
-
if (!qemu_file_is_writable(f)) {
return;
}
@@ -232,22 +250,18 @@ void qemu_fflush(QEMUFile *f)
return;
}
if (f->iovcnt > 0) {
- expect = iov_size(f->iov, f->iovcnt);
- ret = f->ops->writev_buffer(f->opaque, f->iov, f->iovcnt, f->pos,
- &local_error);
+ Error *local_error = NULL;
+ if (qio_channel_writev_all(f->ioc,
+ f->iov, f->iovcnt,
+ &local_error) < 0) {
+ qemu_file_set_error_obj(f, -EIO, local_error);
+ } else {
+ f->total_transferred += iov_size(f->iov, f->iovcnt);
+ }
qemu_iovec_release_ram(f);
}
- if (ret >= 0) {
- f->pos += ret;
- }
- /* We expect the QEMUFile write impl to send the full
- * data set we requested, so sanity check that.
- */
- if (ret != expect) {
- qemu_file_set_error_obj(f, ret < 0 ? ret : -EIO, local_error);
- }
f->buf_index = 0;
f->iovcnt = 0;
}
@@ -257,7 +271,7 @@ void ram_control_before_iterate(QEMUFile *f, uint64_t flags)
int ret = 0;
if (f->hooks && f->hooks->before_ram_iterate) {
- ret = f->hooks->before_ram_iterate(f, f->opaque, flags, NULL);
+ ret = f->hooks->before_ram_iterate(f, flags, NULL);
if (ret < 0) {
qemu_file_set_error(f, ret);
}
@@ -269,7 +283,7 @@ void ram_control_after_iterate(QEMUFile *f, uint64_t flags)
int ret = 0;
if (f->hooks && f->hooks->after_ram_iterate) {
- ret = f->hooks->after_ram_iterate(f, f->opaque, flags, NULL);
+ ret = f->hooks->after_ram_iterate(f, flags, NULL);
if (ret < 0) {
qemu_file_set_error(f, ret);
}
@@ -281,7 +295,7 @@ void ram_control_load_hook(QEMUFile *f, uint64_t flags, void *data)
int ret = -EINVAL;
if (f->hooks && f->hooks->hook_ram_load) {
- ret = f->hooks->hook_ram_load(f, f->opaque, flags, data);
+ ret = f->hooks->hook_ram_load(f, flags, data);
if (ret < 0) {
qemu_file_set_error(f, ret);
}
@@ -301,16 +315,16 @@ size_t ram_control_save_page(QEMUFile *f, ram_addr_t block_offset,
uint64_t *bytes_sent)
{
if (f->hooks && f->hooks->save_page) {
- int ret = f->hooks->save_page(f, f->opaque, block_offset,
+ int ret = f->hooks->save_page(f, block_offset,
offset, size, bytes_sent);
if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
- f->bytes_xfer += size;
+ f->rate_limit_used += size;
}
if (ret != RAM_SAVE_CONTROL_DELAYED &&
ret != RAM_SAVE_CONTROL_NOT_SUPP) {
if (bytes_sent && *bytes_sent > 0) {
- qemu_update_position(f, *bytes_sent);
+ qemu_file_credit_transfer(f, *bytes_sent);
} else if (ret < 0) {
qemu_file_set_error(f, ret);
}
@@ -349,11 +363,25 @@ static ssize_t qemu_fill_buffer(QEMUFile *f)
return 0;
}
- len = f->ops->get_buffer(f->opaque, f->buf + pending, f->pos,
- IO_BUF_SIZE - pending, &local_error);
+ do {
+ len = qio_channel_read(f->ioc,
+ (char *)f->buf + pending,
+ IO_BUF_SIZE - pending,
+ &local_error);
+ if (len == QIO_CHANNEL_ERR_BLOCK) {
+ if (qemu_in_coroutine()) {
+ qio_channel_yield(f->ioc, G_IO_IN);
+ } else {
+ qio_channel_wait(f->ioc, G_IO_IN);
+ }
+ } else if (len < 0) {
+ len = -EIO;
+ }
+ } while (len == QIO_CHANNEL_ERR_BLOCK);
+
if (len > 0) {
f->buf_size += len;
- f->pos += len;
+ f->total_transferred += len;
} else if (len == 0) {
qemu_file_set_error_obj(f, -EIO, local_error);
} else if (len != -EAGAIN) {
@@ -365,9 +393,9 @@ static ssize_t qemu_fill_buffer(QEMUFile *f)
return len;
}
-void qemu_update_position(QEMUFile *f, size_t size)
+void qemu_file_credit_transfer(QEMUFile *f, size_t size)
{
- f->pos += size;
+ f->total_transferred += size;
}
/** Closes the file
@@ -380,16 +408,16 @@ void qemu_update_position(QEMUFile *f, size_t size)
*/
int qemu_fclose(QEMUFile *f)
{
- int ret;
+ int ret, ret2;
qemu_fflush(f);
ret = qemu_file_get_error(f);
- if (f->ops->close) {
- int ret2 = f->ops->close(f->opaque, NULL);
- if (ret >= 0) {
- ret = ret2;
- }
+ ret2 = qio_channel_close(f->ioc, NULL);
+ if (ret >= 0) {
+ ret = ret2;
}
+ g_clear_pointer(&f->ioc, object_unref);
+
/* If any error was spotted before closing, we should report it
* instead of the close() return value.
*/
@@ -457,7 +485,7 @@ void qemu_put_buffer_async(QEMUFile *f, const uint8_t *buf, size_t size,
return;
}
- f->bytes_xfer += size;
+ f->rate_limit_used += size;
add_to_iovec(f, buf, size, may_free);
}
@@ -475,7 +503,7 @@ void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size)
l = size;
}
memcpy(f->buf + f->buf_index, buf, l);
- f->bytes_xfer += l;
+ f->rate_limit_used += l;
add_buf_to_iovec(f, l);
if (qemu_file_get_error(f)) {
break;
@@ -492,7 +520,7 @@ void qemu_put_byte(QEMUFile *f, int v)
}
f->buf[f->buf_index] = v;
- f->bytes_xfer++;
+ f->rate_limit_used++;
add_buf_to_iovec(f, 1);
}
@@ -648,9 +676,9 @@ int qemu_get_byte(QEMUFile *f)
return result;
}
-int64_t qemu_ftell_fast(QEMUFile *f)
+int64_t qemu_file_total_transferred_fast(QEMUFile *f)
{
- int64_t ret = f->pos;
+ int64_t ret = f->total_transferred;
int i;
for (i = 0; i < f->iovcnt; i++) {
@@ -660,10 +688,10 @@ int64_t qemu_ftell_fast(QEMUFile *f)
return ret;
}
-int64_t qemu_ftell(QEMUFile *f)
+int64_t qemu_file_total_transferred(QEMUFile *f)
{
qemu_fflush(f);
- return f->pos;
+ return f->total_transferred;
}
int qemu_file_rate_limit(QEMUFile *f)
@@ -674,7 +702,7 @@ int qemu_file_rate_limit(QEMUFile *f)
if (qemu_file_get_error(f)) {
return 1;
}
- if (f->xfer_limit > 0 && f->bytes_xfer > f->xfer_limit) {
+ if (f->rate_limit_max > 0 && f->rate_limit_used > f->rate_limit_max) {
return 1;
}
return 0;
@@ -682,22 +710,22 @@ int qemu_file_rate_limit(QEMUFile *f)
int64_t qemu_file_get_rate_limit(QEMUFile *f)
{
- return f->xfer_limit;
+ return f->rate_limit_max;
}
void qemu_file_set_rate_limit(QEMUFile *f, int64_t limit)
{
- f->xfer_limit = limit;
+ f->rate_limit_max = limit;
}
void qemu_file_reset_rate_limit(QEMUFile *f)
{
- f->bytes_xfer = 0;
+ f->rate_limit_used = 0;
}
-void qemu_file_update_transfer(QEMUFile *f, int64_t len)
+void qemu_file_acct_rate_limit(QEMUFile *f, int64_t len)
{
- f->bytes_xfer += len;
+ f->rate_limit_used += len;
}
void qemu_put_be16(QEMUFile *f, unsigned int v)
@@ -851,19 +879,18 @@ void qemu_put_counted_string(QEMUFile *f, const char *str)
*/
void qemu_file_set_blocking(QEMUFile *f, bool block)
{
- if (f->ops->set_blocking) {
- f->ops->set_blocking(f->opaque, block, NULL);
- }
+ qio_channel_set_blocking(f->ioc, block, NULL);
}
/*
- * Return the ioc object if it's a migration channel. Note: it can return NULL
- * for callers passing in a non-migration qemufile. E.g. see qemu_fopen_bdrv()
- * and its usage in e.g. load_snapshot(). So we need to check against NULL
- * before using it. If without the check, migration_incoming_state_destroy()
- * could fail for load_snapshot().
+ * qemu_file_get_ioc:
+ *
+ * Get the ioc object for the file, without incrementing
+ * the reference count.
+ *
+ * Returns: the ioc object
*/
QIOChannel *qemu_file_get_ioc(QEMUFile *file)
{
- return file->has_ioc ? QIO_CHANNEL(file->opaque) : NULL;
+ return file->ioc;
}
diff --git a/migration/qemu-file.h b/migration/qemu-file.h
index 3f36d4dc8c..96e72d8bd8 100644
--- a/migration/qemu-file.h
+++ b/migration/qemu-file.h
@@ -29,47 +29,12 @@
#include "exec/cpu-common.h"
#include "io/channel.h"
-/* Read a chunk of data from a file at the given position. The pos argument
- * can be ignored if the file is only be used for streaming. The number of
- * bytes actually read should be returned.
- */
-typedef ssize_t (QEMUFileGetBufferFunc)(void *opaque, uint8_t *buf,
- int64_t pos, size_t size,
- Error **errp);
-
-/* Close a file
- *
- * Return negative error number on error, 0 or positive value on success.
- *
- * The meaning of return value on success depends on the specific back-end being
- * used.
- */
-typedef int (QEMUFileCloseFunc)(void *opaque, Error **errp);
-
-/* Called to return the OS file descriptor associated to the QEMUFile.
- */
-typedef int (QEMUFileGetFD)(void *opaque);
-
-/* Called to change the blocking mode of the file
- */
-typedef int (QEMUFileSetBlocking)(void *opaque, bool enabled, Error **errp);
-
-/*
- * This function writes an iovec to file. The handler must write all
- * of the data or return a negative errno value.
- */
-typedef ssize_t (QEMUFileWritevBufferFunc)(void *opaque, struct iovec *iov,
- int iovcnt, int64_t pos,
- Error **errp);
-
/*
* This function provides hooks around different
* stages of RAM migration.
- * 'opaque' is the backend specific data in QEMUFile
* 'data' is call specific data associated with the 'flags' value
*/
-typedef int (QEMURamHookFunc)(QEMUFile *f, void *opaque, uint64_t flags,
- void *data);
+typedef int (QEMURamHookFunc)(QEMUFile *f, uint64_t flags, void *data);
/*
* Constants used by ram_control_* hooks
@@ -84,34 +49,11 @@ typedef int (QEMURamHookFunc)(QEMUFile *f, void *opaque, uint64_t flags,
* This function allows override of where the RAM page
* is saved (such as RDMA, for example.)
*/
-typedef size_t (QEMURamSaveFunc)(QEMUFile *f, void *opaque,
- ram_addr_t block_offset,
- ram_addr_t offset,
- size_t size,
- uint64_t *bytes_sent);
-
-/*
- * Return a QEMUFile for comms in the opposite direction
- */
-typedef QEMUFile *(QEMURetPathFunc)(void *opaque);
-
-/*
- * Stop any read or write (depending on flags) on the underlying
- * transport on the QEMUFile.
- * Existing blocking reads/writes must be woken
- * Returns 0 on success, -err on error
- */
-typedef int (QEMUFileShutdownFunc)(void *opaque, bool rd, bool wr,
- Error **errp);
-
-typedef struct QEMUFileOps {
- QEMUFileGetBufferFunc *get_buffer;
- QEMUFileCloseFunc *close;
- QEMUFileSetBlocking *set_blocking;
- QEMUFileWritevBufferFunc *writev_buffer;
- QEMURetPathFunc *get_return_path;
- QEMUFileShutdownFunc *shut_down;
-} QEMUFileOps;
+typedef size_t (QEMURamSaveFunc)(QEMUFile *f,
+ ram_addr_t block_offset,
+ ram_addr_t offset,
+ size_t size,
+ uint64_t *bytes_sent);
typedef struct QEMUFileHooks {
QEMURamHookFunc *before_ram_iterate;
@@ -120,12 +62,41 @@ typedef struct QEMUFileHooks {
QEMURamSaveFunc *save_page;
} QEMUFileHooks;
-QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops, bool has_ioc);
+QEMUFile *qemu_file_new_input(QIOChannel *ioc);
+QEMUFile *qemu_file_new_output(QIOChannel *ioc);
void qemu_file_set_hooks(QEMUFile *f, const QEMUFileHooks *hooks);
-int qemu_get_fd(QEMUFile *f);
int qemu_fclose(QEMUFile *f);
-int64_t qemu_ftell(QEMUFile *f);
-int64_t qemu_ftell_fast(QEMUFile *f);
+
+/*
+ * qemu_file_total_transferred:
+ *
+ * Report the total number of bytes transferred with
+ * this file.
+ *
+ * For writable files, any pending buffers will be
+ * flushed, so the reported value will be equal to
+ * the number of bytes transferred on the wire.
+ *
+ * For readable files, the reported value will be
+ * equal to the number of bytes transferred on the
+ * wire.
+ *
+ * Returns: the total bytes transferred
+ */
+int64_t qemu_file_total_transferred(QEMUFile *f);
+
+/*
+ * qemu_file_total_transferred_fast:
+ *
+ * As qemu_file_total_transferred except for writable
+ * files, where no flush is performed and the reported
+ * amount will include the size of any queued buffers,
+ * on top of the amount actually transferred.
+ *
+ * Returns: the total bytes transferred and queued
+ */
+int64_t qemu_file_total_transferred_fast(QEMUFile *f);
+
/*
* put_buffer without copying the buffer.
* The buffer should be available till it is sent asynchronously.
@@ -150,9 +121,23 @@ int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src);
*/
int qemu_peek_byte(QEMUFile *f, int offset);
void qemu_file_skip(QEMUFile *f, int size);
-void qemu_update_position(QEMUFile *f, size_t size);
+/*
+ * qemu_file_credit_transfer:
+ *
+ * Report on a number of bytes that have been transferred
+ * out of band from the main file object I/O methods. This
+ * accounting information tracks the total migration traffic.
+ */
+void qemu_file_credit_transfer(QEMUFile *f, size_t size);
void qemu_file_reset_rate_limit(QEMUFile *f);
-void qemu_file_update_transfer(QEMUFile *f, int64_t len);
+/*
+ * qemu_file_acct_rate_limit:
+ *
+ * Report on a number of bytes the have been transferred
+ * out of band from the main file object I/O methods, and
+ * need to be applied to the rate limiting calcuations
+ */
+void qemu_file_acct_rate_limit(QEMUFile *f, int64_t len);
void qemu_file_set_rate_limit(QEMUFile *f, int64_t new_rate);
int64_t qemu_file_get_rate_limit(QEMUFile *f);
int qemu_file_get_error_obj(QEMUFile *f, Error **errp);
diff --git a/migration/ram.c b/migration/ram.c
index 5f5e37f64d..01f9cc1d72 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -32,6 +32,7 @@
#include "qemu/bitmap.h"
#include "qemu/madvise.h"
#include "qemu/main-loop.h"
+#include "io/channel-null.h"
#include "xbzrle.h"
#include "ram.h"
#include "migration.h"
@@ -457,8 +458,6 @@ static QemuThread *compress_threads;
*/
static QemuMutex comp_done_lock;
static QemuCond comp_done_cond;
-/* The empty QEMUFileOps will be used by file in CompressParam */
-static const QEMUFileOps empty_ops = { };
static QEMUFile *decomp_file;
static DecompressParam *decomp_param;
@@ -569,7 +568,8 @@ static int compress_threads_save_setup(void)
/* comp_param[i].file is just used as a dummy buffer to save data,
* set its ops to empty.
*/
- comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops, false);
+ comp_param[i].file = qemu_file_new_output(
+ QIO_CHANNEL(qio_channel_null_new()));
comp_param[i].done = true;
comp_param[i].quit = false;
qemu_mutex_init(&comp_param[i].mutex);
@@ -2300,7 +2300,7 @@ void acct_update_position(QEMUFile *f, size_t size, bool zero)
} else {
ram_counters.normal += pages;
ram_transferred_add(size);
- qemu_update_position(f, size);
+ qemu_file_credit_transfer(f, size);
}
}
diff --git a/migration/rdma.c b/migration/rdma.c
index 672d1958a9..94a55dd95b 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -21,7 +21,6 @@
#include "migration.h"
#include "qemu-file.h"
#include "ram.h"
-#include "qemu-file-channel.h"
#include "qemu/error-report.h"
#include "qemu/main-loop.h"
#include "qemu/module.h"
@@ -1371,30 +1370,6 @@ const char *print_wrid(int wrid)
}
/*
- * RDMA requires memory registration (mlock/pinning), but this is not good for
- * overcommitment.
- *
- * In preparation for the future where LRU information or workload-specific
- * writable writable working set memory access behavior is available to QEMU
- * it would be nice to have in place the ability to UN-register/UN-pin
- * particular memory regions from the RDMA hardware when it is determine that
- * those regions of memory will likely not be accessed again in the near future.
- *
- * While we do not yet have such information right now, the following
- * compile-time option allows us to perform a non-optimized version of this
- * behavior.
- *
- * By uncommenting this option, you will cause *all* RDMA transfers to be
- * unregistered immediately after the transfer completes on both sides of the
- * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
- *
- * This will have a terrible impact on migration performance, so until future
- * workload information or LRU information is available, do not attempt to use
- * this feature except for basic testing.
- */
-/* #define RDMA_UNREGISTRATION_EXAMPLE */
-
-/*
* Perform a non-optimized memory unregistration after every transfer
* for demonstration purposes, only if pin-all is not requested.
*
@@ -1487,34 +1462,6 @@ static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
}
/*
- * Set bit for unregistration in the next iteration.
- * We cannot transmit right here, but will unpin later.
- */
-static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
- uint64_t chunk, uint64_t wr_id)
-{
- if (rdma->unregistrations[rdma->unregister_next] != 0) {
- error_report("rdma migration: queue is full");
- } else {
- RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
-
- if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
- trace_qemu_rdma_signal_unregister_append(chunk,
- rdma->unregister_next);
-
- rdma->unregistrations[rdma->unregister_next++] =
- qemu_rdma_make_wrid(wr_id, index, chunk);
-
- if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
- rdma->unregister_next = 0;
- }
- } else {
- trace_qemu_rdma_signal_unregister_already(chunk);
- }
- }
-}
-
-/*
* Consult the connection manager to see a work request
* (of any kind) has completed.
* Return the work request ID that completed.
@@ -1571,18 +1518,6 @@ static uint64_t qemu_rdma_poll(RDMAContext *rdma, struct ibv_cq *cq,
if (rdma->nb_sent > 0) {
rdma->nb_sent--;
}
-
- if (!rdma->pin_all) {
- /*
- * FYI: If one wanted to signal a specific chunk to be unregistered
- * using LRU or workload-specific information, this is the function
- * you would call to do so. That chunk would then get asynchronously
- * unregistered later.
- */
-#ifdef RDMA_UNREGISTRATION_EXAMPLE
- qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
-#endif
- }
} else {
trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent);
}
@@ -2137,11 +2072,6 @@ retry:
chunk_end = ram_chunk_end(block, chunk + chunks);
- if (!rdma->pin_all) {
-#ifdef RDMA_UNREGISTRATION_EXAMPLE
- qemu_rdma_unregister_waiting(rdma);
-#endif
- }
while (test_bit(chunk, block->transit_bitmap)) {
(void)count;
@@ -3278,33 +3208,17 @@ qio_channel_rdma_shutdown(QIOChannel *ioc,
* Offset is an offset to be added to block_offset and used
* to also lookup the corresponding RAMBlock.
*
- * @size > 0 :
- * Initiate an transfer this size.
- *
- * @size == 0 :
- * A 'hint' or 'advice' that means that we wish to speculatively
- * and asynchronously unregister this memory. In this case, there is no
- * guarantee that the unregister will actually happen, for example,
- * if the memory is being actively transmitted. Additionally, the memory
- * may be re-registered at any future time if a write within the same
- * chunk was requested again, even if you attempted to unregister it
- * here.
- *
- * @size < 0 : TODO, not yet supported
- * Unregister the memory NOW. This means that the caller does not
- * expect there to be any future RDMA transfers and we just want to clean
- * things up. This is used in case the upper layer owns the memory and
- * cannot wait for qemu_fclose() to occur.
+ * @size : Number of bytes to transfer
*
* @bytes_sent : User-specificed pointer to indicate how many bytes were
* sent. Usually, this will not be more than a few bytes of
* the protocol because most transfers are sent asynchronously.
*/
-static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
+static size_t qemu_rdma_save_page(QEMUFile *f,
ram_addr_t block_offset, ram_addr_t offset,
size_t size, uint64_t *bytes_sent)
{
- QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
RDMAContext *rdma;
int ret;
@@ -3323,61 +3237,27 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
qemu_fflush(f);
- if (size > 0) {
- /*
- * Add this page to the current 'chunk'. If the chunk
- * is full, or the page doesn't belong to the current chunk,
- * an actual RDMA write will occur and a new chunk will be formed.
- */
- ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
- if (ret < 0) {
- error_report("rdma migration: write error! %d", ret);
- goto err;
- }
-
- /*
- * We always return 1 bytes because the RDMA
- * protocol is completely asynchronous. We do not yet know
- * whether an identified chunk is zero or not because we're
- * waiting for other pages to potentially be merged with
- * the current chunk. So, we have to call qemu_update_position()
- * later on when the actual write occurs.
- */
- if (bytes_sent) {
- *bytes_sent = 1;
- }
- } else {
- uint64_t index, chunk;
-
- /* TODO: Change QEMUFileOps prototype to be signed: size_t => long
- if (size < 0) {
- ret = qemu_rdma_drain_cq(f, rdma);
- if (ret < 0) {
- fprintf(stderr, "rdma: failed to synchronously drain"
- " completion queue before unregistration.\n");
- goto err;
- }
- }
- */
-
- ret = qemu_rdma_search_ram_block(rdma, block_offset,
- offset, size, &index, &chunk);
-
- if (ret) {
- error_report("ram block search failed");
- goto err;
- }
-
- qemu_rdma_signal_unregister(rdma, index, chunk, 0);
+ /*
+ * Add this page to the current 'chunk'. If the chunk
+ * is full, or the page doesn't belong to the current chunk,
+ * an actual RDMA write will occur and a new chunk will be formed.
+ */
+ ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
+ if (ret < 0) {
+ error_report("rdma migration: write error! %d", ret);
+ goto err;
+ }
- /*
- * TODO: Synchronous, guaranteed unregistration (should not occur during
- * fast-path). Otherwise, unregisters will process on the next call to
- * qemu_rdma_drain_cq()
- if (size < 0) {
- qemu_rdma_unregister_waiting(rdma);
- }
- */
+ /*
+ * We always return 1 bytes because the RDMA
+ * protocol is completely asynchronous. We do not yet know
+ * whether an identified chunk is zero or not because we're
+ * waiting for other pages to potentially be merged with
+ * the current chunk. So, we have to call qemu_update_position()
+ * later on when the actual write occurs.
+ */
+ if (bytes_sent) {
+ *bytes_sent = 1;
}
/*
@@ -3950,14 +3830,15 @@ rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
return 0;
}
-static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
+static int rdma_load_hook(QEMUFile *f, uint64_t flags, void *data)
{
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
switch (flags) {
case RAM_CONTROL_BLOCK_REG:
- return rdma_block_notification_handle(opaque, data);
+ return rdma_block_notification_handle(rioc, data);
case RAM_CONTROL_HOOK:
- return qemu_rdma_registration_handle(f, opaque);
+ return qemu_rdma_registration_handle(f, rioc);
default:
/* Shouldn't be called with any other values */
@@ -3965,10 +3846,10 @@ static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
}
}
-static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
+static int qemu_rdma_registration_start(QEMUFile *f,
uint64_t flags, void *data)
{
- QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
RDMAContext *rdma;
RCU_READ_LOCK_GUARD();
@@ -3994,10 +3875,10 @@ static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
* Inform dest that dynamic registrations are done for now.
* First, flush writes, if any.
*/
-static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
+static int qemu_rdma_registration_stop(QEMUFile *f,
uint64_t flags, void *data)
{
- QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
RDMAContext *rdma;
RDMAControlHeader head = { .len = 0, .repeat = 1 };
int ret = 0;
@@ -4170,12 +4051,12 @@ static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
if (mode[0] == 'w') {
- rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
+ rioc->file = qemu_file_new_output(QIO_CHANNEL(rioc));
rioc->rdmaout = rdma;
rioc->rdmain = rdma->return_path;
qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
} else {
- rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
+ rioc->file = qemu_file_new_input(QIO_CHANNEL(rioc));
rioc->rdmain = rdma;
rioc->rdmaout = rdma->return_path;
qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
diff --git a/migration/savevm.c b/migration/savevm.c
index d9076897b8..e8a1b96fcd 100644
--- a/migration/savevm.c
+++ b/migration/savevm.c
@@ -35,8 +35,8 @@
#include "migration/misc.h"
#include "migration/register.h"
#include "migration/global_state.h"
+#include "migration/channel-block.h"
#include "ram.h"
-#include "qemu-file-channel.h"
#include "qemu-file.h"
#include "savevm.h"
#include "postcopy-ram.h"
@@ -130,48 +130,13 @@ static struct mig_cmd_args {
/***********************************************************/
/* savevm/loadvm support */
-static ssize_t block_writev_buffer(void *opaque, struct iovec *iov, int iovcnt,
- int64_t pos, Error **errp)
-{
- int ret;
- QEMUIOVector qiov;
-
- qemu_iovec_init_external(&qiov, iov, iovcnt);
- ret = bdrv_writev_vmstate(opaque, &qiov, pos);
- if (ret < 0) {
- return ret;
- }
-
- return qiov.size;
-}
-
-static ssize_t block_get_buffer(void *opaque, uint8_t *buf, int64_t pos,
- size_t size, Error **errp)
-{
- return bdrv_load_vmstate(opaque, buf, pos, size);
-}
-
-static int bdrv_fclose(void *opaque, Error **errp)
-{
- return bdrv_flush(opaque);
-}
-
-static const QEMUFileOps bdrv_read_ops = {
- .get_buffer = block_get_buffer,
- .close = bdrv_fclose
-};
-
-static const QEMUFileOps bdrv_write_ops = {
- .writev_buffer = block_writev_buffer,
- .close = bdrv_fclose
-};
-
static QEMUFile *qemu_fopen_bdrv(BlockDriverState *bs, int is_writable)
{
if (is_writable) {
- return qemu_fopen_ops(bs, &bdrv_write_ops, false);
+ return qemu_file_new_output(QIO_CHANNEL(qio_channel_block_new(bs)));
+ } else {
+ return qemu_file_new_input(QIO_CHANNEL(qio_channel_block_new(bs)));
}
- return qemu_fopen_ops(bs, &bdrv_read_ops, false);
}
@@ -916,9 +881,9 @@ static void vmstate_save_old_style(QEMUFile *f, SaveStateEntry *se,
{
int64_t old_offset, size;
- old_offset = qemu_ftell_fast(f);
+ old_offset = qemu_file_total_transferred_fast(f);
se->ops->save_state(f, se->opaque);
- size = qemu_ftell_fast(f) - old_offset;
+ size = qemu_file_total_transferred_fast(f) - old_offset;
if (vmdesc) {
json_writer_int64(vmdesc, "size", size);
@@ -2193,7 +2158,7 @@ static int loadvm_handle_cmd_packaged(MigrationIncomingState *mis)
bioc->usage += length;
trace_loadvm_handle_cmd_packaged_received(ret);
- QEMUFile *packf = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
+ QEMUFile *packf = qemu_file_new_input(QIO_CHANNEL(bioc));
ret = qemu_loadvm_state_main(packf, mis);
trace_loadvm_handle_cmd_packaged_main(ret);
@@ -2887,7 +2852,7 @@ bool save_snapshot(const char *name, bool overwrite, const char *vmstate,
goto the_end;
}
ret = qemu_savevm_state(f, errp);
- vm_state_size = qemu_ftell(f);
+ vm_state_size = qemu_file_total_transferred(f);
ret2 = qemu_fclose(f);
if (ret < 0) {
goto the_end;
@@ -2951,7 +2916,7 @@ void qmp_xen_save_devices_state(const char *filename, bool has_live, bool live,
goto the_end;
}
qio_channel_set_name(QIO_CHANNEL(ioc), "migration-xen-save-state");
- f = qemu_fopen_channel_output(QIO_CHANNEL(ioc));
+ f = qemu_file_new_output(QIO_CHANNEL(ioc));
object_unref(OBJECT(ioc));
ret = qemu_save_device_state(f);
if (ret < 0 || qemu_fclose(f) < 0) {
@@ -2998,7 +2963,7 @@ void qmp_xen_load_devices_state(const char *filename, Error **errp)
return;
}
qio_channel_set_name(QIO_CHANNEL(ioc), "migration-xen-load-state");
- f = qemu_fopen_channel_input(QIO_CHANNEL(ioc));
+ f = qemu_file_new_input(QIO_CHANNEL(ioc));
object_unref(OBJECT(ioc));
ret = qemu_loadvm_state(f);
diff --git a/migration/vmstate.c b/migration/vmstate.c
index 36ae8b9e19..924494bda3 100644
--- a/migration/vmstate.c
+++ b/migration/vmstate.c
@@ -360,7 +360,7 @@ int vmstate_save_state_v(QEMUFile *f, const VMStateDescription *vmsd,
void *curr_elem = first_elem + size * i;
vmsd_desc_field_start(vmsd, vmdesc_loop, field, i, n_elems);
- old_offset = qemu_ftell_fast(f);
+ old_offset = qemu_file_total_transferred_fast(f);
if (field->flags & VMS_ARRAY_OF_POINTER) {
assert(curr_elem);
curr_elem = *(void **)curr_elem;
@@ -390,7 +390,8 @@ int vmstate_save_state_v(QEMUFile *f, const VMStateDescription *vmsd,
return ret;
}
- written_bytes = qemu_ftell_fast(f) - old_offset;
+ written_bytes = qemu_file_total_transferred_fast(f) -
+ old_offset;
vmsd_desc_field_end(vmsd, vmdesc_loop, field, written_bytes, i);
/* Compressed arrays only care about the first element */
diff --git a/monitor/hmp-cmds.c b/monitor/hmp-cmds.c
index 47a27326ee..ca98df0495 100644
--- a/monitor/hmp-cmds.c
+++ b/monitor/hmp-cmds.c
@@ -1311,12 +1311,6 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
p->has_multifd_zstd_level = true;
visit_type_uint8(v, param, &p->multifd_zstd_level, &err);
break;
-#ifdef CONFIG_LINUX
- case MIGRATION_PARAMETER_ZERO_COPY_SEND:
- p->has_zero_copy_send = true;
- visit_type_bool(v, param, &p->zero_copy_send, &err);
- break;
-#endif
case MIGRATION_PARAMETER_XBZRLE_CACHE_SIZE:
p->has_xbzrle_cache_size = true;
if (!visit_type_size(v, param, &cache_size, &err)) {
diff --git a/qapi/migration.json b/qapi/migration.json
index 6130cd9fae..7102e474a6 100644
--- a/qapi/migration.json
+++ b/qapi/migration.json
@@ -461,6 +461,13 @@
# procedure starts. The VM RAM is saved with running VM.
# (since 6.0)
#
+# @zero-copy-send: Controls behavior on sending memory pages on migration.
+# When true, enables a zero-copy mechanism for sending
+# memory pages, if host supports it.
+# Requires that QEMU be permitted to use locked memory
+# for guest RAM pages.
+# (since 7.1)
+#
# Features:
# @unstable: Members @x-colo and @x-ignore-shared are experimental.
#
@@ -474,7 +481,8 @@
'block', 'return-path', 'pause-before-switchover', 'multifd',
'dirty-bitmaps', 'postcopy-blocktime', 'late-block-activate',
{ 'name': 'x-ignore-shared', 'features': [ 'unstable' ] },
- 'validate-uuid', 'background-snapshot'] }
+ 'validate-uuid', 'background-snapshot',
+ 'zero-copy-send'] }
##
# @MigrationCapabilityStatus:
@@ -738,12 +746,6 @@
# will consume more CPU.
# Defaults to 1. (Since 5.0)
#
-# @zero-copy-send: Controls behavior on sending memory pages on migration.
-# When true, enables a zero-copy mechanism for sending
-# memory pages, if host supports it.
-# Requires that QEMU be permitted to use locked memory
-# for guest RAM pages.
-# Defaults to false. (Since 7.1)
#
# @block-bitmap-mapping: Maps block nodes and bitmaps on them to
# aliases for the purpose of dirty bitmap migration. Such
@@ -784,7 +786,6 @@
'xbzrle-cache-size', 'max-postcopy-bandwidth',
'max-cpu-throttle', 'multifd-compression',
'multifd-zlib-level' ,'multifd-zstd-level',
- { 'name': 'zero-copy-send', 'if' : 'CONFIG_LINUX'},
'block-bitmap-mapping' ] }
##
@@ -911,13 +912,6 @@
# will consume more CPU.
# Defaults to 1. (Since 5.0)
#
-# @zero-copy-send: Controls behavior on sending memory pages on migration.
-# When true, enables a zero-copy mechanism for sending
-# memory pages, if host supports it.
-# Requires that QEMU be permitted to use locked memory
-# for guest RAM pages.
-# Defaults to false. (Since 7.1)
-#
# @block-bitmap-mapping: Maps block nodes and bitmaps on them to
# aliases for the purpose of dirty bitmap migration. Such
# aliases may for example be the corresponding names on the
@@ -972,7 +966,6 @@
'*multifd-compression': 'MultiFDCompression',
'*multifd-zlib-level': 'uint8',
'*multifd-zstd-level': 'uint8',
- '*zero-copy-send': { 'type': 'bool', 'if': 'CONFIG_LINUX' },
'*block-bitmap-mapping': [ 'BitmapMigrationNodeAlias' ] } }
##
@@ -1119,13 +1112,6 @@
# will consume more CPU.
# Defaults to 1. (Since 5.0)
#
-# @zero-copy-send: Controls behavior on sending memory pages on migration.
-# When true, enables a zero-copy mechanism for sending
-# memory pages, if host supports it.
-# Requires that QEMU be permitted to use locked memory
-# for guest RAM pages.
-# Defaults to false. (Since 7.1)
-#
# @block-bitmap-mapping: Maps block nodes and bitmaps on them to
# aliases for the purpose of dirty bitmap migration. Such
# aliases may for example be the corresponding names on the
@@ -1178,7 +1164,6 @@
'*multifd-compression': 'MultiFDCompression',
'*multifd-zlib-level': 'uint8',
'*multifd-zstd-level': 'uint8',
- '*zero-copy-send': { 'type': 'bool', 'if': 'CONFIG_LINUX' },
'*block-bitmap-mapping': [ 'BitmapMigrationNodeAlias' ] } }
##
diff --git a/tests/unit/meson.build b/tests/unit/meson.build
index 287b367ec3..b497a41378 100644
--- a/tests/unit/meson.build
+++ b/tests/unit/meson.build
@@ -88,6 +88,7 @@ if have_block
'test-io-channel-file': ['io-channel-helpers.c', io],
'test-io-channel-command': ['io-channel-helpers.c', io],
'test-io-channel-buffer': ['io-channel-helpers.c', io],
+ 'test-io-channel-null': [io],
'test-crypto-ivgen': [io],
'test-crypto-afsplit': [io],
'test-crypto-block': [io],
diff --git a/tests/unit/test-io-channel-null.c b/tests/unit/test-io-channel-null.c
new file mode 100644
index 0000000000..b3aab17ccc
--- /dev/null
+++ b/tests/unit/test-io-channel-null.c
@@ -0,0 +1,95 @@
+/*
+ * QEMU I/O channel null test
+ *
+ * Copyright (c) 2022 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include "qemu/osdep.h"
+#include "io/channel-null.h"
+#include "qapi/error.h"
+
+static gboolean test_io_channel_watch(QIOChannel *ioc,
+ GIOCondition condition,
+ gpointer opaque)
+{
+ GIOCondition *gotcond = opaque;
+ *gotcond = condition;
+ return G_SOURCE_REMOVE;
+}
+
+static void test_io_channel_null_io(void)
+{
+ g_autoptr(QIOChannelNull) null = qio_channel_null_new();
+ char buf[1024];
+ GIOCondition gotcond = 0;
+ Error *local_err = NULL;
+
+ g_assert(qio_channel_write(QIO_CHANNEL(null),
+ "Hello World", 11,
+ &error_abort) == 11);
+
+ g_assert(qio_channel_read(QIO_CHANNEL(null),
+ buf, sizeof(buf),
+ &error_abort) == 0);
+
+ qio_channel_add_watch(QIO_CHANNEL(null),
+ G_IO_IN,
+ test_io_channel_watch,
+ &gotcond,
+ NULL);
+
+ g_main_context_iteration(NULL, false);
+
+ g_assert(gotcond == G_IO_IN);
+
+ qio_channel_add_watch(QIO_CHANNEL(null),
+ G_IO_IN | G_IO_OUT,
+ test_io_channel_watch,
+ &gotcond,
+ NULL);
+
+ g_main_context_iteration(NULL, false);
+
+ g_assert(gotcond == (G_IO_IN | G_IO_OUT));
+
+ qio_channel_close(QIO_CHANNEL(null), &error_abort);
+
+ g_assert(qio_channel_write(QIO_CHANNEL(null),
+ "Hello World", 11,
+ &local_err) == -1);
+ g_assert_nonnull(local_err);
+
+ g_clear_pointer(&local_err, error_free);
+
+ g_assert(qio_channel_read(QIO_CHANNEL(null),
+ buf, sizeof(buf),
+ &local_err) == -1);
+ g_assert_nonnull(local_err);
+
+ g_clear_pointer(&local_err, error_free);
+}
+
+int main(int argc, char **argv)
+{
+ module_call_init(MODULE_INIT_QOM);
+
+ g_test_init(&argc, &argv, NULL);
+
+ g_test_add_func("/io/channel/null/io", test_io_channel_null_io);
+
+ return g_test_run();
+}
diff --git a/tests/unit/test-vmstate.c b/tests/unit/test-vmstate.c
index 6a417bb102..72077b5780 100644
--- a/tests/unit/test-vmstate.c
+++ b/tests/unit/test-vmstate.c
@@ -28,7 +28,6 @@
#include "migration/vmstate.h"
#include "migration/qemu-file-types.h"
#include "../migration/qemu-file.h"
-#include "../migration/qemu-file-channel.h"
#include "../migration/savevm.h"
#include "qemu/coroutine.h"
#include "qemu/module.h"
@@ -52,9 +51,9 @@ static QEMUFile *open_test_file(bool write)
}
ioc = QIO_CHANNEL(qio_channel_file_new_fd(fd));
if (write) {
- f = qemu_fopen_channel_output(ioc);
+ f = qemu_file_new_output(ioc);
} else {
- f = qemu_fopen_channel_input(ioc);
+ f = qemu_file_new_input(ioc);
}
object_unref(OBJECT(ioc));
return f;