aboutsummaryrefslogtreecommitdiff
path: root/tests/migration/guestperf/engine.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/migration/guestperf/engine.py')
-rw-r--r--tests/migration/guestperf/engine.py439
1 files changed, 439 insertions, 0 deletions
diff --git a/tests/migration/guestperf/engine.py b/tests/migration/guestperf/engine.py
new file mode 100644
index 0000000000..0a13050bc6
--- /dev/null
+++ b/tests/migration/guestperf/engine.py
@@ -0,0 +1,439 @@
+#
+# Migration test main engine
+#
+# Copyright (c) 2016 Red Hat, Inc.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, see <http://www.gnu.org/licenses/>.
+#
+
+
+import os
+import re
+import sys
+import time
+
+sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', '..', 'scripts'))
+import qemu
+import qmp.qmp
+from guestperf.progress import Progress, ProgressStats
+from guestperf.report import Report
+from guestperf.timings import TimingRecord, Timings
+
+
+class Engine(object):
+
+ def __init__(self, binary, dst_host, kernel, initrd, transport="tcp",
+ sleep=15, verbose=False, debug=False):
+
+ self._binary = binary # Path to QEMU binary
+ self._dst_host = dst_host # Hostname of target host
+ self._kernel = kernel # Path to kernel image
+ self._initrd = initrd # Path to stress initrd
+ self._transport = transport # 'unix' or 'tcp' or 'rdma'
+ self._sleep = sleep
+ self._verbose = verbose
+ self._debug = debug
+
+ if debug:
+ self._verbose = debug
+
+ def _vcpu_timing(self, pid, tid_list):
+ records = []
+ now = time.time()
+
+ jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
+ for tid in tid_list:
+ statfile = "/proc/%d/task/%d/stat" % (pid, tid)
+ with open(statfile, "r") as fh:
+ stat = fh.readline()
+ fields = stat.split(" ")
+ stime = int(fields[13])
+ utime = int(fields[14])
+ records.append(TimingRecord(tid, now, 1000 * (stime + utime) / jiffies_per_sec))
+ return records
+
+ def _cpu_timing(self, pid):
+ records = []
+ now = time.time()
+
+ jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
+ statfile = "/proc/%d/stat" % pid
+ with open(statfile, "r") as fh:
+ stat = fh.readline()
+ fields = stat.split(" ")
+ stime = int(fields[13])
+ utime = int(fields[14])
+ return TimingRecord(pid, now, 1000 * (stime + utime) / jiffies_per_sec)
+
+ def _migrate_progress(self, vm):
+ info = vm.command("query-migrate")
+
+ if "ram" not in info:
+ info["ram"] = {}
+
+ return Progress(
+ info.get("status", "active"),
+ ProgressStats(
+ info["ram"].get("transferred", 0),
+ info["ram"].get("remaining", 0),
+ info["ram"].get("total", 0),
+ info["ram"].get("duplicate", 0),
+ info["ram"].get("skipped", 0),
+ info["ram"].get("normal", 0),
+ info["ram"].get("normal-bytes", 0),
+ info["ram"].get("dirty-pages-rate", 0),
+ info["ram"].get("mbps", 0),
+ info["ram"].get("dirty-sync-count", 0)
+ ),
+ time.time(),
+ info.get("total-time", 0),
+ info.get("downtime", 0),
+ info.get("expected-downtime", 0),
+ info.get("setup-time", 0),
+ info.get("x-cpu-throttle-percentage", 0),
+ )
+
+ def _migrate(self, hardware, scenario, src, dst, connect_uri):
+ src_qemu_time = []
+ src_vcpu_time = []
+ src_pid = src.get_pid()
+
+ vcpus = src.command("query-cpus")
+ src_threads = []
+ for vcpu in vcpus:
+ src_threads.append(vcpu["thread_id"])
+
+ # XXX how to get dst timings on remote host ?
+
+ if self._verbose:
+ print "Sleeping %d seconds for initial guest workload run" % self._sleep
+ sleep_secs = self._sleep
+ while sleep_secs > 1:
+ src_qemu_time.append(self._cpu_timing(src_pid))
+ src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
+ time.sleep(1)
+ sleep_secs -= 1
+
+ if self._verbose:
+ print "Starting migration"
+ if scenario._auto_converge:
+ resp = src.command("migrate-set-capabilities",
+ capabilities = [
+ { "capability": "auto-converge",
+ "state": True }
+ ])
+ resp = src.command("migrate-set-parameters",
+ x_cpu_throttle_increment=scenario._auto_converge_step)
+
+ if scenario._post_copy:
+ resp = src.command("migrate-set-capabilities",
+ capabilities = [
+ { "capability": "postcopy-ram",
+ "state": True }
+ ])
+ resp = dst.command("migrate-set-capabilities",
+ capabilities = [
+ { "capability": "postcopy-ram",
+ "state": True }
+ ])
+
+ resp = src.command("migrate_set_speed",
+ value=scenario._bandwidth * 1024 * 1024)
+
+ resp = src.command("migrate_set_downtime",
+ value=scenario._downtime / 1024.0)
+
+ if scenario._compression_mt:
+ resp = src.command("migrate-set-capabilities",
+ capabilities = [
+ { "capability": "compress",
+ "state": True }
+ ])
+ resp = src.command("migrate-set-parameters",
+ compress_threads=scenario._compression_mt_threads)
+ resp = dst.command("migrate-set-capabilities",
+ capabilities = [
+ { "capability": "compress",
+ "state": True }
+ ])
+ resp = dst.command("migrate-set-parameters",
+ decompress_threads=scenario._compression_mt_threads)
+
+ if scenario._compression_xbzrle:
+ resp = src.command("migrate-set-capabilities",
+ capabilities = [
+ { "capability": "xbzrle",
+ "state": True }
+ ])
+ resp = dst.command("migrate-set-capabilities",
+ capabilities = [
+ { "capability": "xbzrle",
+ "state": True }
+ ])
+ resp = src.command("migrate-set-cache-size",
+ value=(hardware._mem * 1024 * 1024 * 1024 / 100 *
+ scenario._compression_xbzrle_cache))
+
+ resp = src.command("migrate", uri=connect_uri)
+
+ post_copy = False
+ paused = False
+
+ progress_history = []
+
+ start = time.time()
+ loop = 0
+ while True:
+ loop = loop + 1
+ time.sleep(0.05)
+
+ progress = self._migrate_progress(src)
+ if (loop % 20) == 0:
+ src_qemu_time.append(self._cpu_timing(src_pid))
+ src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
+
+ if (len(progress_history) == 0 or
+ (progress_history[-1]._ram._iterations <
+ progress._ram._iterations)):
+ progress_history.append(progress)
+
+ if progress._status in ("completed", "failed", "cancelled"):
+ if progress._status == "completed" and paused:
+ dst.command("cont")
+ if progress_history[-1] != progress:
+ progress_history.append(progress)
+
+ if progress._status == "completed":
+ if self._verbose:
+ print "Sleeping %d seconds for final guest workload run" % self._sleep
+ sleep_secs = self._sleep
+ while sleep_secs > 1:
+ time.sleep(1)
+ src_qemu_time.append(self._cpu_timing(src_pid))
+ src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
+ sleep_secs -= 1
+
+ return [progress_history, src_qemu_time, src_vcpu_time]
+
+ if self._verbose and (loop % 20) == 0:
+ print "Iter %d: remain %5dMB of %5dMB (total %5dMB @ %5dMb/sec)" % (
+ progress._ram._iterations,
+ progress._ram._remaining_bytes / (1024 * 1024),
+ progress._ram._total_bytes / (1024 * 1024),
+ progress._ram._transferred_bytes / (1024 * 1024),
+ progress._ram._transfer_rate_mbs,
+ )
+
+ if progress._ram._iterations > scenario._max_iters:
+ if self._verbose:
+ print "No completion after %d iterations over RAM" % scenario._max_iters
+ src.command("migrate_cancel")
+ continue
+
+ if time.time() > (start + scenario._max_time):
+ if self._verbose:
+ print "No completion after %d seconds" % scenario._max_time
+ src.command("migrate_cancel")
+ continue
+
+ if (scenario._post_copy and
+ progress._ram._iterations >= scenario._post_copy_iters and
+ not post_copy):
+ if self._verbose:
+ print "Switching to post-copy after %d iterations" % scenario._post_copy_iters
+ resp = src.command("migrate-start-postcopy")
+ post_copy = True
+
+ if (scenario._pause and
+ progress._ram._iterations >= scenario._pause_iters and
+ not paused):
+ if self._verbose:
+ print "Pausing VM after %d iterations" % scenario._pause_iters
+ resp = src.command("stop")
+ paused = True
+
+ def _get_common_args(self, hardware, tunnelled=False):
+ args = [
+ "noapic",
+ "edd=off",
+ "printk.time=1",
+ "noreplace-smp",
+ "cgroup_disable=memory",
+ "pci=noearly",
+ "console=ttyS0",
+ ]
+ if self._debug:
+ args.append("debug")
+ else:
+ args.append("quiet")
+
+ args.append("ramsize=%s" % hardware._mem)
+
+ cmdline = " ".join(args)
+ if tunnelled:
+ cmdline = "'" + cmdline + "'"
+
+ argv = [
+ "-machine", "accel=kvm",
+ "-cpu", "host",
+ "-kernel", self._kernel,
+ "-initrd", self._initrd,
+ "-append", cmdline,
+ "-chardev", "stdio,id=cdev0",
+ "-device", "isa-serial,chardev=cdev0",
+ "-m", str((hardware._mem * 1024) + 512),
+ "-smp", str(hardware._cpus),
+ ]
+
+ if self._debug:
+ argv.extend(["-device", "sga"])
+
+ if hardware._prealloc_pages:
+ argv_source += ["-mem-path", "/dev/shm",
+ "-mem-prealloc"]
+ if hardware._locked_pages:
+ argv_source += ["-realtime", "mlock=on"]
+ if hardware._huge_pages:
+ pass
+
+ return argv
+
+ def _get_src_args(self, hardware):
+ return self._get_common_args(hardware)
+
+ def _get_dst_args(self, hardware, uri):
+ tunnelled = False
+ if self._dst_host != "localhost":
+ tunnelled = True
+ argv = self._get_common_args(hardware, tunnelled)
+ return argv + ["-incoming", uri]
+
+ @staticmethod
+ def _get_common_wrapper(cpu_bind, mem_bind):
+ wrapper = []
+ if len(cpu_bind) > 0 or len(mem_bind) > 0:
+ wrapper.append("numactl")
+ if cpu_bind:
+ wrapper.append("--physcpubind=%s" % ",".join(cpu_bind))
+ if mem_bind:
+ wrapper.append("--membind=%s" % ",".join(mem_bind))
+
+ return wrapper
+
+ def _get_src_wrapper(self, hardware):
+ return self._get_common_wrapper(hardware._src_cpu_bind, hardware._src_mem_bind)
+
+ def _get_dst_wrapper(self, hardware):
+ wrapper = self._get_common_wrapper(hardware._dst_cpu_bind, hardware._dst_mem_bind)
+ if self._dst_host != "localhost":
+ return ["ssh",
+ "-R", "9001:localhost:9001",
+ self._dst_host] + wrapper
+ else:
+ return wrapper
+
+ def _get_timings(self, vm):
+ log = vm.get_log()
+ if not log:
+ return []
+ if self._debug:
+ print log
+
+ regex = r"[^\s]+\s\((\d+)\):\sINFO:\s(\d+)ms\scopied\s\d+\sGB\sin\s(\d+)ms"
+ matcher = re.compile(regex)
+ records = []
+ for line in log.split("\n"):
+ match = matcher.match(line)
+ if match:
+ records.append(TimingRecord(int(match.group(1)),
+ int(match.group(2)) / 1000.0,
+ int(match.group(3))))
+ return records
+
+ def run(self, hardware, scenario, result_dir=os.getcwd()):
+ abs_result_dir = os.path.join(result_dir, scenario._name)
+
+ if self._transport == "tcp":
+ uri = "tcp:%s:9000" % self._dst_host
+ elif self._transport == "rdma":
+ uri = "rdma:%s:9000" % self._dst_host
+ elif self._transport == "unix":
+ if self._dst_host != "localhost":
+ raise Exception("Running use unix migration transport for non-local host")
+ uri = "unix:/var/tmp/qemu-migrate-%d.migrate" % os.getpid()
+ try:
+ os.remove(uri[5:])
+ os.remove(monaddr)
+ except:
+ pass
+
+ if self._dst_host != "localhost":
+ dstmonaddr = ("localhost", 9001)
+ else:
+ dstmonaddr = "/var/tmp/qemu-dst-%d-monitor.sock" % os.getpid()
+ srcmonaddr = "/var/tmp/qemu-src-%d-monitor.sock" % os.getpid()
+
+ src = qemu.QEMUMachine(self._binary,
+ args=self._get_src_args(hardware),
+ wrapper=self._get_src_wrapper(hardware),
+ name="qemu-src-%d" % os.getpid(),
+ monitor_address=srcmonaddr,
+ debug=self._debug)
+
+ dst = qemu.QEMUMachine(self._binary,
+ args=self._get_dst_args(hardware, uri),
+ wrapper=self._get_dst_wrapper(hardware),
+ name="qemu-dst-%d" % os.getpid(),
+ monitor_address=dstmonaddr,
+ debug=self._debug)
+
+ try:
+ src.launch()
+ dst.launch()
+
+ ret = self._migrate(hardware, scenario, src, dst, uri)
+ progress_history = ret[0]
+ qemu_timings = ret[1]
+ vcpu_timings = ret[2]
+ if uri[0:5] == "unix:":
+ os.remove(uri[5:])
+ if self._verbose:
+ print "Finished migration"
+
+ src.shutdown()
+ dst.shutdown()
+
+ return Report(hardware, scenario, progress_history,
+ Timings(self._get_timings(src) + self._get_timings(dst)),
+ Timings(qemu_timings),
+ Timings(vcpu_timings),
+ self._binary, self._dst_host, self._kernel,
+ self._initrd, self._transport, self._sleep)
+ except Exception as e:
+ if self._debug:
+ print "Failed: %s" % str(e)
+ try:
+ src.shutdown()
+ except:
+ pass
+ try:
+ dst.shutdown()
+ except:
+ pass
+
+ if self._debug:
+ print src.get_log()
+ print dst.get_log()
+ raise
+