#!/usr/bin/env python3
#
# Low-level QEMU shell on top of QMP.
#
# Copyright (C) 2009, 2010 Red Hat Inc.
#
# Authors:
#  Luiz Capitulino
#
# This work is licensed under the terms of the GNU GPL, version 2. See
# the COPYING file in the top-level directory.
#
# Usage:
#
# Start QEMU with:
#
# # qemu [...] -qmp unix:./qmp-sock,server
#
# Run the shell:
#
# $ qmp-shell ./qmp-sock
#
# Commands have the following format:
#
#    < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
#
# For example:
#
# (QEMU) device_add driver=e1000 id=net1
# {"return": {}}
# (QEMU)
#
# key=value pairs also support Python or JSON object literal subset notations,
# without spaces. Dictionaries/objects {} are supported as are arrays [].
#
#    example-command arg-name1={'key':'value','obj':{'prop':"value"}}
#
# Both JSON and Python formatting should work, including both styles of
# string literal quotes. Both paradigms of literal values should work,
# including null/true/false for JSON and None/True/False for Python.
#
#
# Transactions have the following multi-line format:
#
#    transaction(
#    action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
#    ...
#    action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
#    )
#
# One line transactions are also supported:
#
#    transaction( action-name1 ... )
#
# For example:
#
#     (QEMU) transaction(
#     TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
#     TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
#     TRANS> )
#     {"return": {}}
#     (QEMU)
#
# Use the -v and -p options to activate the verbose and pretty-print options,
# which will echo back the properly formatted JSON-compliant QMP that is being
# sent to QEMU, which is useful for debugging and documentation generation.
import argparse
import ast
import atexit
import json
import os
import re
import readline
import sys
from typing import List, Optional

sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
from qemu import qmp


class QMPCompleter:
    """
    Prefix-matching completion source for readline.

    Holds the list of known command strings and exposes complete(),
    which has the (text, state) signature readline expects from a
    completer function.
    """
    # NB: Python 3.9+ will probably allow us to subclass list[str] directly,
    # but pylint as of today does not know that List[str] is simply 'list'.

    def __init__(self) -> None:
        self._matches: List[str] = []

    def append(self, value: str) -> None:
        """Register @value as a completion candidate."""
        self._matches.append(value)

    def complete(self, text: str, state: int) -> Optional[str]:
        """
        Return the @state'th (0-based) candidate that starts with @text.

        Candidates are considered in insertion order.
        @return the matching candidate, or None once @state runs past the
        last match.
        """
        for cmd in self._matches:
            if cmd.startswith(text):
                if state == 0:
                    return cmd
                state -= 1
        return None


class QMPShellError(Exception):
    """Raised when a command line typed into the shell cannot be parsed."""


class FuzzyJSON(ast.NodeTransformer):
    """
    This extension of ast.NodeTransformer filters literal "true/false/null"
    values in a Python AST and replaces them by proper "True/False/None" values
    that Python can properly evaluate.
    """

    @classmethod
    def visit_Name(cls,  # pylint: disable=invalid-name
                   node: ast.Name) -> ast.AST:
        """
        Map the bare names true/false/null onto Python constants.

        Any other name node is returned untouched.
        """
        if node.id == 'true':
            return ast.Constant(value=True)
        if node.id == 'false':
            return ast.Constant(value=False)
        if node.id == 'null':
            return ast.Constant(value=None)
        return node


# TODO: QMPShell's interface is a bit ugly (eg. _fill_completion() and
#       _execute_cmd()). Let's design a better one.
class QMPShell(qmp.QEMUMonitorProtocol):
    """
    Interactive read-eval-print shell that speaks QMP.

    Each input line is turned into a QMP command object by __build_cmd(),
    sent with the inherited cmd_obj(), and the reply is printed as JSON.
    """

    def __init__(self, address, pretty=False, verbose=False):
        """
        @param address: server address string; it is converted with the
                        inherited parse_address() before being handed to
                        the base class.
        @param pretty:  indent and key-sort the JSON that gets printed.
        @param verbose: echo each outgoing command before sending it.
        """
        super().__init__(self.parse_address(address))
        self._greeting = None
        self._completer = QMPCompleter()
        self._pretty = pretty
        self._transmode = False   # True while inside "transaction( ... )"
        self._actions = []        # actions cached for the open transaction
        self._histfile = os.path.join(os.path.expanduser('~'),
                                      '.qmp-shell_history')
        self.verbose = verbose

    def _fill_completion(self):
        """Feed the completer with the names returned by query-commands."""
        cmds = self.cmd('query-commands')
        if 'error' in cmds:
            return
        for cmd in cmds['return']:
            self._completer.append(cmd['name'])

    def __completer_setup(self):
        """
        Build a fresh completer, hook it into readline, load the history
        file and arrange for the history to be saved at interpreter exit.
        """
        self._completer = QMPCompleter()
        self._fill_completion()
        readline.set_history_length(1024)
        readline.set_completer(self._completer.complete)
        readline.parse_and_bind("tab: complete")
        # NB: default delimiters conflict with some command names
        # (eg. query-), clearing everything as it doesn't seem to matter
        readline.set_completer_delims('')
        try:
            readline.read_history_file(self._histfile)
        except FileNotFoundError:
            # No history yet; nothing to load.
            pass
        except IOError as err:
            print(f"Failed to read history '{self._histfile}': {err!s}")
        atexit.register(self.__save_history)

    def __save_history(self):
        """Write the readline history to the history file (best effort)."""
        try:
            readline.write_history_file(self._histfile)
        except IOError as err:
            print(f"Failed to save history file '{self._histfile}': {err!s}")

    @classmethod
    def __parse_value(cls, val):
        """
        Convert the textual right-hand side of a key=value pair.

        Tried in order: integer, true/false (case-insensitive), and, for
        values starting with '{' or '[', strict JSON followed by a Python
        literal in which true/false/null are also accepted.

        @return the converted value, or @val unchanged when nothing matched.
        """
        try:
            return int(val)
        except ValueError:
            pass

        if val.lower() == 'true':
            return True
        if val.lower() == 'false':
            return False
        if val.startswith(('{', '[')):
            # Try first as pure JSON:
            try:
                return json.loads(val)
            except ValueError:
                pass
            # Try once again as FuzzyJSON:
            try:
                tree = ast.parse(val, mode='eval')
                transformed = FuzzyJSON().visit(tree)
                return ast.literal_eval(transformed)
            except (SyntaxError, ValueError):
                pass
        return val

    def __cli_expr(self, tokens, parent):
        """
        Parse key=value tokens and store them into the dict @parent.

        A dotted key such as "a.b.c=1" creates nested dicts as needed.

        @param tokens: iterable of "key=value" strings.
        @param parent: dict that receives the parsed arguments (modified
                       in place).
        @raise QMPShellError: a token has no '=', a key is set twice, or a
                              key is used both as a leaf and as a container.
        """
        for arg in tokens:
            (key, sep, val) = arg.partition('=')
            if sep != '=':
                raise QMPShellError(
                    f"Expected a key=value pair, got '{arg!s}'"
                )

            value = self.__parse_value(val)
            optpath = key.split('.')
            curpath = []
            # Every argument is resolved starting at the root dict; the
            # cursor must not carry over from the previous argument, or a
            # key following "a.b=..." would end up nested inside "a".
            node = parent
            for path in optpath[:-1]:
                curpath.append(path)
                obj = node.get(path, {})
                if not isinstance(obj, dict):
                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
                    raise QMPShellError(msg.format('.'.join(curpath)))
                node[path] = obj
                node = obj

            leaf = optpath[-1]
            if leaf in node:
                if isinstance(node[leaf], dict):
                    # Report the full key; curpath lacks the last component.
                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
                    raise QMPShellError(msg.format(key))
                raise QMPShellError(f'Cannot set "{key}" multiple times')
            node[leaf] = value

    def __build_cmd(self, cmdline):
        """
        Build a QMP input object from a user provided command-line in the
        following format:

            < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]

        @return the command dict to send, or None when there is nothing to
        send yet (blank input, or an action cached for an open transaction).
        @raise QMPShellError: the command line is malformed.
        """
        argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
        cmdargs = re.findall(argument_regex, cmdline)

        # Transactional CLI entry:
        if cmdargs and cmdargs[0] == 'transaction(':
            self._transmode = True
            # Drop anything left over from a transaction that was aborted
            # by a parse error.
            self._actions = []
            cmdargs.pop(0)

        # Transactional CLI exit. This is a separate test (not an elif) so
        # that a one-line "transaction( )" is closed properly instead of
        # treating ')' as an action name.
        if cmdargs and cmdargs[0] == ')' and self._transmode:
            self._transmode = False
            if len(cmdargs) > 1:
                msg = 'Unexpected input after close of Transaction sub-shell'
                raise QMPShellError(msg)
            qmpcmd = {
                'execute': 'transaction',
                'arguments': {'actions': self._actions}
            }
            self._actions = []
            return qmpcmd

        # Nothing to process?
        if not cmdargs:
            return None

        # Parse and then cache this Transactional Action
        if self._transmode:
            finalize = False
            action = {'type': cmdargs[0], 'data': {}}
            if cmdargs[-1] == ')':
                cmdargs.pop(-1)
                finalize = True
            self.__cli_expr(cmdargs[1:], action['data'])
            self._actions.append(action)
            return self.__build_cmd(')') if finalize else None

        # Standard command: parse and return it to be executed.
        qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
        self.__cli_expr(cmdargs[1:], qmpcmd['arguments'])
        return qmpcmd

    def _print(self, qmp_message):
        """Print @qmp_message as JSON, indented and sorted if pretty."""
        indent = None
        if self._pretty:
            indent = 4
        jsobj = json.dumps(qmp_message,
                           indent=indent,
                           sort_keys=self._pretty)
        print(str(jsobj))

    def _execute_cmd(self, cmdline):
        """
        Parse @cmdline, send the resulting command and print the reply.

        @return False if the reply was None (reported as 'Disconnected'),
        True otherwise, including when the line could not be parsed.
        """
        try:
            qmpcmd = self.__build_cmd(cmdline)
        except Exception as err:
            # Deliberately broad: a typo must never take the shell down.
            print(f'Error while parsing command line: {err}')
            print('command format: <command-name> '
                  '[arg-name1=arg1] ... [arg-nameN=argN]')
            return True
        # For transaction mode, we may have just cached the action:
        if qmpcmd is None:
            return True
        if self.verbose:
            self._print(qmpcmd)
        resp = self.cmd_obj(qmpcmd)
        if resp is None:
            print('Disconnected')
            return False
        self._print(resp)
        return True

    def connect(self, negotiate: bool = True):
        """Connect via the base class, keep its result as the greeting and
        set up completion."""
        self._greeting = super().connect(negotiate)
        self.__completer_setup()

    def show_banner(self, msg='Welcome to the QMP low-level shell!'):
        """Print @msg followed by the QEMU version taken from the greeting,
        or just 'Connected' when no greeting was stored."""
        print(msg)
        if not self._greeting:
            print('Connected')
            return
        version = self._greeting['QMP']['version']['qemu']
        print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))

    @property
    def prompt(self):
        """Prompt string; differs while a transaction is being entered."""
        if self._transmode:
            return 'TRANS> '
        return '(QEMU) '

    def read_exec_command(self):
        """
        Read and execute a command.

        An empty line prints and clears the pending events instead.

        @return True if execution was ok, return False if disconnected.
        """
        try:
            cmdline = input(self.prompt)
        except EOFError:
            print()
            return False

        if cmdline == '':
            for event in self.get_events():
                print(event)
            self.clear_events()
            return True

        return self._execute_cmd(cmdline)

    def repl(self):
        """
        Generator driving the shell: shows the banner, then yields once
        after every successfully handled input line. The connection is
        closed when the loop ends, however it ends.
        """
        self.show_banner()
        try:
            while self.read_exec_command():
                yield
        finally:
            self.close()


class HMPShell(QMPShell):
    """
    Shell variant that forwards every input line to the human monitor
    through the 'human-monitor-command' QMP command.
    """

    def __init__(self, address, pretty=False, verbose=False):
        super().__init__(address, pretty, verbose)
        self.__cpu_index = 0   # sent as 'cpu-index' with every command

    def __cmd_completion(self):
        """Add the commands listed by HMP 'help' (and 'help <cmd>')."""
        for cmd in self.__cmd_passthrough('help')['return'].split('\r\n'):
            words = cmd.split()
            if not words or cmd[0] == '[' or cmd[0] == '\t':
                continue
            name = words[0]  # drop help text
            if name == 'info':
                continue
            if name.find('|') != -1:
                # Command in the form 'foobar|f' or 'f|foobar', take the
                # full name
                opt = name.split('|')
                if len(opt[0]) == 1:
                    name = opt[1]
                else:
                    name = opt[0]
            self._completer.append(name)
            self._completer.append('help ' + name)  # help completion

    def __info_completion(self):
        """Add 'info <topic>' for every line printed by HMP 'info'."""
        for cmd in self.__cmd_passthrough('info')['return'].split('\r\n'):
            words = cmd.split()
            # The topic is the second word; skip lines that have none.
            if len(words) > 1:
                self._completer.append('info ' + words[1])

    def __other_completion(self):
        # special cases
        self._completer.append('help info')

    def _fill_completion(self):
        """Populate the completer from the HMP help output."""
        self.__cmd_completion()
        self.__info_completion()
        self.__other_completion()

    def __cmd_passthrough(self, cmdline, cpu_index=0):
        """Send @cmdline via human-monitor-command and return the reply."""
        return self.cmd_obj({
            'execute': 'human-monitor-command',
            'arguments': {
                'command-line': cmdline,
                'cpu-index': cpu_index
            }
        })

    def _execute_cmd(self, cmdline):
        """
        Forward @cmdline to the human monitor and print the outcome.

        The 'cpu' command is intercepted so the chosen index can be
        remembered and sent along with subsequent commands.

        @return False if a reply was None (reported as 'Disconnected'),
        True otherwise.
        """
        words = cmdline.split()
        if not words:
            # Whitespace only: nothing to send.
            return True

        if words[0] == "cpu":
            # trap the cpu command, it requires special setting
            try:
                idx = int(words[1])
            except (IndexError, ValueError):
                print('cpu command takes an integer argument')
                return True
            probe = self.__cmd_passthrough('info version', idx)
            if probe is None:
                print('Disconnected')
                return False
            if 'return' not in probe:
                print('bad CPU index')
                return True
            self.__cpu_index = idx

        resp = self.__cmd_passthrough(cmdline, self.__cpu_index)
        if resp is None:
            print('Disconnected')
            return False
        assert 'return' in resp or 'error' in resp
        if 'return' in resp:
            # Success
            if len(resp['return']) > 0:
                print(resp['return'], end=' ')
        else:
            # Error
            error = resp['error']
            print(f"{error['class']}: {error['desc']}")
        return True

    def show_banner(self, msg='Welcome to the HMP shell!'):
        super().show_banner(msg)


def die(msg):
    """Write 'ERROR: <msg>' to stderr and exit with status 1."""
    sys.stderr.write('ERROR: %s\n' % msg)
    sys.exit(1)


def main():
    """
    Command-line entry point: parse the options, connect and run the shell.

    The server address comes from the positional argument or, when that is
    omitted, from the QMP_SOCKET environment variable.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-H', '--hmp', action='store_true',
                        help='Use HMP interface')
    parser.add_argument('-N', '--skip-negotiation', action='store_true',
                        help='Skip negotiate (for qemu-ga)')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Verbose (echo commands sent and received)')
    parser.add_argument('-p', '--pretty', action='store_true',
                        help='Pretty-print JSON')

    default_server = os.environ.get('QMP_SOCKET')
    # nargs='?' makes the positional optional; without it argparse insists
    # on the argument and the QMP_SOCKET default is never consulted.
    parser.add_argument('qmp_server', action='store', nargs='?',
                        default=default_server,
                        help='< UNIX socket path | TCP address:port >')

    args = parser.parse_args()
    if args.qmp_server is None:
        parser.error("QMP socket or TCP address must be specified")

    shell_class = HMPShell if args.hmp else QMPShell
    try:
        qemu = shell_class(args.qmp_server, args.pretty, args.verbose)
    except qmp.QMPBadPortError:
        parser.error(f"Bad port number: {args.qmp_server}")
        return  # pycharm doesn't know error() is noreturn

    try:
        qemu.connect(negotiate=not args.skip_negotiation)
    except qmp.QMPConnectError:
        die("Didn't get QMP greeting message")
    except qmp.QMPCapabilitiesError:
        die("Couldn't negotiate capabilities")
    except OSError as err:
        die(f"Couldn't connect to {args.qmp_server}: {err!s}")

    for _ in qemu.repl():
        pass


if __name__ == '__main__':
    main()