#!/usr/bin/env python3 # # Copyright (c) 2017 Red Hat Inc # # Author: # Eduardo Habkost <ehabkost@redhat.com> # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """ Run QEMU with all combinations of -machine and -device types, check for crashes and unexpected errors. """ import os import sys import glob import logging import traceback import re import random import argparse from itertools import chain sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'python')) from qemu.machine import QEMUMachine logger = logging.getLogger('device-crash-test') dbg = logger.debug # Purposes of the following rule list: # * Avoiding verbose log messages when we find known non-fatal # (exitcode=1) errors # * Avoiding fatal errors when we find known crashes # * Skipping machines/devices that are known not to work out of # the box, when running in --quick mode # # Keeping the rule list updated is desirable, but not required, # because unexpected cases where QEMU exits with exitcode=1 will # just trigger a INFO message. # Valid error rule keys: # * accel: regexp, full match only # * machine: regexp, full match only # * device: regexp, full match only # * log: regexp, partial match allowed # * exitcode: if not present, defaults to 1. If None, matches any exitcode # * warn: if True, matching failures will be logged as warnings # * expected: if True, QEMU is expected to always fail every time # when testing the corresponding test case # * loglevel: log level of log output when there's a match. ERROR_RULE_LIST = [ # Machines that won't work out of the box: # MACHINE | ERROR MESSAGE {'machine':'niagara', 'expected':True}, # Unable to load a firmware for -M niagara {'machine':'boston', 'expected':True}, # Please provide either a -kernel or -bios argument {'machine':'leon3_generic', 'expected':True}, # Can't read bios image (null) # devices that don't work out of the box because they require extra options to "-device DEV": # DEVICE | ERROR MESSAGE {'device':'.*-(i386|x86_64)-cpu', 'expected':True}, # CPU socket-id is not set {'device':'icp', 'expected':True}, # icp_realize: required link 'xics' not found: Property '.xics' not found {'device':'ics', 'expected':True}, # ics_base_realize: required link 'xics' not found: Property '.xics' not found # "-device ide-cd" does work on more recent QEMU versions, so it doesn't have expected=True {'device':'ide-cd'}, # No drive specified {'device':'ide-hd', 'expected':True}, # No drive specified {'device':'ipmi-bmc-extern', 'expected':True}, # IPMI external bmc requires chardev attribute {'device':'isa-debugcon', 'expected':True}, # Can't create serial device, empty char device {'device':'isa-ipmi-bt', 'expected':True}, # IPMI device requires a bmc attribute to be set {'device':'isa-ipmi-kcs', 'expected':True}, # IPMI device requires a bmc attribute to be set {'device':'isa-parallel', 'expected':True}, # Can't create serial device, empty char device {'device':'ivshmem-doorbell', 'expected':True}, # You must specify a 'chardev' {'device':'ivshmem-plain', 'expected':True}, # You must specify a 'memdev' {'device':'loader', 'expected':True}, # please include valid arguments {'device':'nand', 'expected':True}, # Unsupported NAND block size 0x1 {'device':'nvdimm', 'expected':True}, # 'memdev' property is not set {'device':'nvme', 'expected':True}, # Device initialization failed {'device':'pc-dimm', 'expected':True}, # 'memdev' property is not set {'device':'pci-bridge', 'expected':True}, # Bridge chassis not specified. Each bridge is required to be assigned a unique chassis id > 0. {'device':'pci-bridge-seat', 'expected':True}, # Bridge chassis not specified. Each bridge is required to be assigned a unique chassis id > 0. {'device':'pxb', 'expected':True}, # Bridge chassis not specified. Each bridge is required to be assigned a unique chassis id > 0. {'device':'scsi-block', 'expected':True}, # drive property not set {'device':'scsi-generic', 'expected':True}, # drive property not set {'device':'scsi-hd', 'expected':True}, # drive property not set {'device':'spapr-pci-host-bridge', 'expected':True}, # BUID not specified for PHB {'device':'spapr-rng', 'expected':True}, # spapr-rng needs an RNG backend! {'device':'spapr-vty', 'expected':True}, # chardev property not set {'device':'tpm-tis', 'expected':True}, # tpm_tis: backend driver with id (null) could not be found {'device':'unimplemented-device', 'expected':True}, # property 'size' not specified or zero {'device':'usb-braille', 'expected':True}, # Property chardev is required {'device':'usb-mtp', 'expected':True}, # rootdir property must be configured {'device':'usb-redir', 'expected':True}, # Parameter 'chardev' is missing {'device':'usb-serial', 'expected':True}, # Property chardev is required {'device':'usb-storage', 'expected':True}, # drive property not set {'device':'vfio-amd-xgbe', 'expected':True}, # -device vfio-amd-xgbe: vfio error: wrong host device name {'device':'vfio-calxeda-xgmac', 'expected':True}, # -device vfio-calxeda-xgmac: vfio error: wrong host device name {'device':'vfio-pci', 'expected':True}, # No provided host device {'device':'vfio-pci-igd-lpc-bridge', 'expected':True}, # VFIO dummy ISA/LPC bridge must have address 1f.0 {'device':'vhost-scsi.*', 'expected':True}, # vhost-scsi: missing wwpn {'device':'vhost-vsock-device', 'expected':True}, # guest-cid property must be greater than 2 {'device':'vhost-vsock-pci', 'expected':True}, # guest-cid property must be greater than 2 {'device':'virtio-9p-ccw', 'expected':True}, # 9pfs device couldn't find fsdev with the id = NULL {'device':'virtio-9p-device', 'expected':True}, # 9pfs device couldn't find fsdev with the id = NULL {'device':'virtio-9p-pci', 'expected':True}, # 9pfs device couldn't find fsdev with the id = NULL {'device':'virtio-blk-ccw', 'expected':True}, # drive property not set {'device':'virtio-blk-device', 'expected':True}, # drive property not set {'device':'virtio-blk-device', 'expected':True}, # drive property not set {'device':'virtio-blk-pci', 'expected':True}, # drive property not set {'device':'virtio-crypto-ccw', 'expected':True}, # 'cryptodev' parameter expects a valid object {'device':'virtio-crypto-device', 'expected':True}, # 'cryptodev' parameter expects a valid object {'device':'virtio-crypto-pci', 'expected':True}, # 'cryptodev' parameter expects a valid object {'device':'virtio-input-host-device', 'expected':True}, # evdev property is required {'device':'virtio-input-host-pci', 'expected':True}, # evdev property is required {'device':'xen-pvdevice', 'expected':True}, # Device ID invalid, it must always be supplied {'device':'vhost-vsock-ccw', 'expected':True}, # guest-cid property must be greater than 2 {'device':'zpci', 'expected':True}, # target must be defined {'device':'pnv-(occ|icp|lpc)', 'expected':True}, # required link 'xics' not found: Property '.xics' not found {'device':'powernv-cpu-.*', 'expected':True}, # pnv_core_realize: required link 'xics' not found: Property '.xics' not found # ioapic devices are already created by pc and will fail: {'machine':'q35|pc.*', 'device':'kvm-ioapic', 'expected':True}, # Only 1 ioapics allowed {'machine':'q35|pc.*', 'device':'ioapic', 'expected':True}, # Only 1 ioapics allowed # "spapr-cpu-core needs a pseries machine" {'machine':'(?!pseries).*', 'device':'.*-spapr-cpu-core', 'expected':True}, # KVM-specific devices shouldn't be tried without accel=kvm: {'accel':'(?!kvm).*', 'device':'kvmclock', 'expected':True}, # xen-specific machines and devices: {'accel':'(?!xen).*', 'machine':'xen.*', 'expected':True}, {'accel':'(?!xen).*', 'device':'xen-.*', 'expected':True}, # this fails on some machine-types, but not all, so they don't have expected=True: {'device':'vmgenid'}, # vmgenid requires DMA write support in fw_cfg, which this machine type does not provide # Silence INFO messages for errors that are common on multiple # devices/machines: {'log':r"No '[\w-]+' bus found for device '[\w-]+'"}, {'log':r"images* must be given with the 'pflash' parameter"}, {'log':r"(Guest|ROM|Flash|Kernel) image must be specified"}, {'log':r"[cC]ould not load [\w ]+ (BIOS|bios) '[\w-]+\.bin'"}, {'log':r"Couldn't find rom image '[\w-]+\.bin'"}, {'log':r"speed mismatch trying to attach usb device"}, {'log':r"Can't create a second ISA bus"}, {'log':r"duplicate fw_cfg file name"}, # sysbus-related error messages: most machines reject most dynamic sysbus devices: {'log':r"Option '-device [\w.,-]+' cannot be handled by this machine"}, {'log':r"Device [\w.,-]+ is not supported by this machine yet"}, {'log':r"Device [\w.,-]+ can not be dynamically instantiated"}, {'log':r"Platform Bus: Can not fit MMIO region of size "}, # other more specific errors we will ignore: {'device':'.*-spapr-cpu-core', 'log':r"CPU core type should be"}, {'log':r"MSI(-X)? is not supported by interrupt controller"}, {'log':r"pxb-pcie? devices cannot reside on a PCIe? bus"}, {'log':r"Ignoring smp_cpus value"}, {'log':r"sd_init failed: Drive 'sd0' is already in use because it has been automatically connected to another device"}, {'log':r"This CPU requires a smaller page size than the system is using"}, {'log':r"MSI-X support is mandatory in the S390 architecture"}, {'log':r"rom check and register reset failed"}, {'log':r"Unable to initialize GIC, CPUState for CPU#0 not valid"}, {'log':r"Multiple VT220 operator consoles are not supported"}, {'log':r"core 0 already populated"}, {'log':r"could not find stage1 bootloader"}, {'log':r"No '.*' bus found for device"}, # other exitcode=1 failures not listed above will just generate INFO messages: {'exitcode':1, 'loglevel':logging.INFO}, # everything else (including SIGABRT and SIGSEGV) will be a fatal error: {'exitcode':None, 'fatal':True, 'loglevel':logging.FATAL}, ] def errorRuleTestCaseMatch(rule, t): """Check if a test case specification can match a error rule This only checks if a error rule is a candidate match for a given test case, it won't check if the test case results/output match the rule. See ruleListResultMatch(). """ return (('machine' not in rule or 'machine' not in t or re.match(rule['machine'] + '$', t['machine'])) and ('accel' not in rule or 'accel' not in t or re.match(rule['accel'] + '$', t['accel'])) and ('device' not in rule or 'device' not in t or re.match(rule['device'] + '$', t['device']))) def ruleListCandidates(t): """Generate the list of candidates that can match a test case""" for i, rule in enumerate(ERROR_RULE_LIST): if errorRuleTestCaseMatch(rule, t): yield (i, rule) def findExpectedResult(t): """Check if there's an expected=True error rule for a test case Returns (i, rule) tuple, where i is the index in ERROR_RULE_LIST and rule is the error rule itself. """ for i, rule in ruleListCandidates(t): if rule.get('expected'): return (i, rule) def ruleListResultMatch(rule, r): """Check if test case results/output match a error rule It is valid to call this function only if errorRuleTestCaseMatch() is True for the rule (e.g. on rules returned by ruleListCandidates()) """ assert errorRuleTestCaseMatch(rule, r['testcase']) return ((rule.get('exitcode', 1) is None or r['exitcode'] == rule.get('exitcode', 1)) and ('log' not in rule or re.search(rule['log'], r['log'], re.MULTILINE))) def checkResultRuleList(r): """Look up error rule for a given test case result Returns (i, rule) tuple, where i is the index in ERROR_RULE_LIST and rule is the error rule itself. """ for i, rule in ruleListCandidates(r['testcase']): if ruleListResultMatch(rule, r): return i, rule raise Exception("this should never happen") def qemuOptsEscape(s): """Escape option value QemuOpts""" return s.replace(",", ",,") def formatTestCase(t): """Format test case info as "key=value key=value" for prettier logging output""" return ' '.join('%s=%s' % (k, v) for k, v in t.items()) def qomListTypeNames(vm, **kwargs): """Run qom-list-types QMP command, return type names""" types = vm.command('qom-list-types', **kwargs) return [t['name'] for t in types] def infoQDM(vm): """Parse 'info qdm' output""" args = {'command-line': 'info qdm'} devhelp = vm.command('human-monitor-command', **args) for l in devhelp.split('\n'): l = l.strip() if l == '' or l.endswith(':'): continue d = {'name': re.search(r'name "([^"]+)"', l).group(1), 'no-user': (re.search(', no-user', l) is not None)} yield d class QemuBinaryInfo(object): def __init__(self, binary, devtype): if devtype is None: devtype = 'device' self.binary = binary self._machine_info = {} dbg("devtype: %r", devtype) args = ['-S', '-machine', 'none,accel=kvm:tcg'] dbg("querying info for QEMU binary: %s", binary) vm = QEMUMachine(binary=binary, args=args) vm.launch() try: self.alldevs = set(qomListTypeNames(vm, implements=devtype, abstract=False)) # there's no way to query DeviceClass::user_creatable using QMP, # so use 'info qdm': self.no_user_devs = set([d['name'] for d in infoQDM(vm, ) if d['no-user']]) self.machines = list(m['name'] for m in vm.command('query-machines')) self.user_devs = self.alldevs.difference(self.no_user_devs) self.kvm_available = vm.command('query-kvm')['enabled'] finally: vm.shutdown() def machineInfo(self, machine): """Query for information on a specific machine-type Results are cached internally, in case the same machine- type is queried multiple times. """ if machine in self._machine_info: return self._machine_info[machine] mi = {} args = ['-S', '-machine', '%s' % (machine)] dbg("querying machine info for binary=%s machine=%s", self.binary, machine) vm = QEMUMachine(binary=self.binary, args=args) try: vm.launch() mi['runnable'] = True except KeyboardInterrupt: raise except: dbg("exception trying to run binary=%s machine=%s", self.binary, machine, exc_info=sys.exc_info()) dbg("log: %r", vm.get_log()) mi['runnable'] = False vm.shutdown() self._machine_info[machine] = mi return mi BINARY_INFO = {} def getBinaryInfo(args, binary): if binary not in BINARY_INFO: BINARY_INFO[binary] = QemuBinaryInfo(binary, args.devtype) return BINARY_INFO[binary] def checkOneCase(args, testcase): """Check one specific case Returns a dictionary containing failure information on error, or None on success """ binary = testcase['binary'] accel = testcase['accel'] machine = testcase['machine'] device = testcase['device'] dbg("will test: %r", testcase) args = ['-S', '-machine', '%s,accel=%s' % (machine, accel), '-device', qemuOptsEscape(device)] cmdline = ' '.join([binary] + args) dbg("will launch QEMU: %s", cmdline) vm = QEMUMachine(binary=binary, args=args) exc_traceback = None try: vm.launch() except KeyboardInterrupt: raise except: exc_traceback = traceback.format_exc() dbg("Exception while running test case") finally: vm.shutdown() ec = vm.exitcode() log = vm.get_log() if exc_traceback is not None or ec != 0: return {'exc_traceback':exc_traceback, 'exitcode':ec, 'log':log, 'testcase':testcase, 'cmdline':cmdline} def binariesToTest(args, testcase): if args.qemu: r = args.qemu else: r = [f.path for f in os.scandir('.') if f.name.startswith('qemu-system-') and f.is_file() and os.access(f, os.X_OK)] return r def accelsToTest(args, testcase): if getBinaryInfo(args, testcase['binary']).kvm_available: yield 'kvm' yield 'tcg' def machinesToTest(args, testcase): return getBinaryInfo(args, testcase['binary']).machines def devicesToTest(args, testcase): return getBinaryInfo(args, testcase['binary']).user_devs TESTCASE_VARIABLES = [ ('binary', binariesToTest), ('accel', accelsToTest), ('machine', machinesToTest), ('device', devicesToTest), ] def genCases1(args, testcases, var, fn): """Generate new testcases for one variable If an existing item already has a variable set, don't generate new items and just return it directly. This allows the "-t" command-line option to be used to choose a specific test case. """ for testcase in testcases: if var in testcase: yield testcase.copy() else: for i in fn(args, testcase): t = testcase.copy() t[var] = i yield t def genCases(args, testcase): """Generate test cases for all variables """ cases = [testcase.copy()] for var, fn in TESTCASE_VARIABLES: dbg("var: %r, fn: %r", var, fn) cases = genCases1(args, cases, var, fn) return cases def casesToTest(args, testcase): cases = genCases(args, testcase) if args.random: cases = list(cases) cases = random.sample(cases, min(args.random, len(cases))) if args.debug: cases = list(cases) dbg("%d test cases to test", len(cases)) if args.shuffle: cases = list(cases) random.shuffle(cases) return cases def logFailure(f, level): t = f['testcase'] logger.log(level, "failed: %s", formatTestCase(t)) logger.log(level, "cmdline: %s", f['cmdline']) for l in f['log'].strip().split('\n'): logger.log(level, "log: %s", l) logger.log(level, "exit code: %r", f['exitcode']) if f['exc_traceback']: logger.log(level, "exception:") for l in f['exc_traceback'].split('\n'): logger.log(level, " %s", l.rstrip('\n')) def main(): parser = argparse.ArgumentParser(description="QEMU -device crash test") parser.add_argument('-t', metavar='KEY=VALUE', nargs='*', help="Limit test cases to KEY=VALUE", action='append', dest='testcases', default=[]) parser.add_argument('-d', '--debug', action='store_true', help='debug output') parser.add_argument('-v', '--verbose', action='store_true', default=True, help='verbose output') parser.add_argument('-q', '--quiet', dest='verbose', action='store_false', help='non-verbose output') parser.add_argument('-r', '--random', type=int, metavar='COUNT', help='run a random sample of COUNT test cases', default=0) parser.add_argument('--shuffle', action='store_true', help='Run test cases in random order') parser.add_argument('--dry-run', action='store_true', help="Don't run any tests, just generate list") parser.add_argument('-D', '--devtype', metavar='TYPE', help="Test only device types that implement TYPE") parser.add_argument('-Q', '--quick', action='store_true', default=True, help="Quick mode: skip test cases that are expected to fail") parser.add_argument('-F', '--full', action='store_false', dest='quick', help="Full mode: test cases that are expected to fail") parser.add_argument('--strict', action='store_true', dest='strict', help="Treat all warnings as fatal") parser.add_argument('qemu', nargs='*', metavar='QEMU', help='QEMU binary to run') args = parser.parse_args() if args.debug: lvl = logging.DEBUG elif args.verbose: lvl = logging.INFO else: lvl = logging.WARN logging.basicConfig(stream=sys.stdout, level=lvl, format='%(levelname)s: %(message)s') fatal_failures = [] wl_stats = {} skipped = 0 total = 0 tc = {} dbg("testcases: %r", args.testcases) if args.testcases: for t in chain(*args.testcases): for kv in t.split(): k, v = kv.split('=', 1) tc[k] = v if len(binariesToTest(args, tc)) == 0: print("No QEMU binary found", file=sys.stderr) parser.print_usage(sys.stderr) return 1 for t in casesToTest(args, tc): logger.info("running test case: %s", formatTestCase(t)) total += 1 expected_match = findExpectedResult(t) if (args.quick and (expected_match or not getBinaryInfo(args, t['binary']).machineInfo(t['machine'])['runnable'])): dbg("skipped: %s", formatTestCase(t)) skipped += 1 continue if args.dry_run: continue try: f = checkOneCase(args, t) except KeyboardInterrupt: break if f: i, rule = checkResultRuleList(f) dbg("testcase: %r, rule list match: %r", t, rule) wl_stats.setdefault(i, []).append(f) level = rule.get('loglevel', logging.DEBUG) logFailure(f, level) if rule.get('fatal') or (args.strict and level >= logging.WARN): fatal_failures.append(f) else: dbg("success: %s", formatTestCase(t)) if expected_match: logger.warn("Didn't fail as expected: %s", formatTestCase(t)) logger.info("Total: %d test cases", total) if skipped: logger.info("Skipped %d test cases", skipped) if args.debug: stats = sorted([(len(wl_stats.get(i, [])), rule) for i, rule in enumerate(ERROR_RULE_LIST)], key=lambda x: x[0]) for count, rule in stats: dbg("error rule stats: %d: %r", count, rule) if fatal_failures: for f in fatal_failures: t = f['testcase'] logger.error("Fatal failure: %s", formatTestCase(t)) logger.error("Fatal failures on some machine/device combinations") return 1 if __name__ == '__main__': sys.exit(main())