diff options
author | John Snow <jsnow@redhat.com> | 2021-06-07 16:06:47 -0400 |
---|---|---|
committer | John Snow <jsnow@redhat.com> | 2021-06-18 16:10:07 -0400 |
commit | 6be7206efc394b0232912e7055c7298ec2b0352d (patch) | |
tree | dd6f2764765f3e0d6dff44162bb4bc3a21235457 /scripts | |
parent | e359c5a8b8e6184c15806d1408de085aab9c268b (diff) |
scripts/qmp-shell: move to python/qemu/qmp/qmp_shell.py
The script will be unavailable for a commit or two, which will help
preserve development history attached to the new file. A forwarder will
be added shortly afterwards.
With qmp_shell in the python qemu.qmp package, now it is fully type
checked, linted, etc. via the Python CI. It will be quite a bit harder
to accidentally break it again in the future.
Signed-off-by: John Snow <jsnow@redhat.com>
Message-id: 20210607200649.1840382-41-jsnow@redhat.com
Signed-off-by: John Snow <jsnow@redhat.com>
Diffstat (limited to 'scripts')
-rwxr-xr-x | scripts/qmp/qmp-shell | 538 |
1 files changed, 0 insertions, 538 deletions
diff --git a/scripts/qmp/qmp-shell b/scripts/qmp/qmp-shell deleted file mode 100755 index 15aedb80c2..0000000000 --- a/scripts/qmp/qmp-shell +++ /dev/null @@ -1,538 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2009, 2010 Red Hat Inc. -# -# Authors: -# Luiz Capitulino <lcapitulino@redhat.com> -# -# This work is licensed under the terms of the GNU GPL, version 2. See -# the COPYING file in the top-level directory. -# - -""" -Low-level QEMU shell on top of QMP. - -usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server - -positional arguments: - qmp_server < UNIX socket path | TCP address:port > - -optional arguments: - -h, --help show this help message and exit - -H, --hmp Use HMP interface - -N, --skip-negotiation - Skip negotiate (for qemu-ga) - -v, --verbose Verbose (echo commands sent and received) - -p, --pretty Pretty-print JSON - - -Start QEMU with: - -# qemu [...] -qmp unix:./qmp-sock,server - -Run the shell: - -$ qmp-shell ./qmp-sock - -Commands have the following format: - - < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] - -For example: - -(QEMU) device_add driver=e1000 id=net1 -{'return': {}} -(QEMU) - -key=value pairs also support Python or JSON object literal subset notations, -without spaces. Dictionaries/objects {} are supported as are arrays []. - - example-command arg-name1={'key':'value','obj'={'prop':"value"}} - -Both JSON and Python formatting should work, including both styles of -string literal quotes. Both paradigms of literal values should work, -including null/true/false for JSON and None/True/False for Python. - - -Transactions have the following multi-line format: - - transaction( - action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ] - ... - action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ] - ) - -One line transactions are also supported: - - transaction( action-name1 ... ) - -For example: - - (QEMU) transaction( - TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1 - TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0 - TRANS> ) - {"return": {}} - (QEMU) - -Use the -v and -p options to activate the verbose and pretty-print options, -which will echo back the properly formatted JSON-compliant QMP that is being -sent to QEMU, which is useful for debugging and documentation generation. -""" - -import argparse -import ast -import json -import logging -import os -import re -import readline -import sys -from typing import ( - Iterator, - List, - NoReturn, - Optional, - Sequence, -) - - -sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) -from qemu import qmp -from qemu.qmp import QMPMessage - - -LOG = logging.getLogger(__name__) - - -class QMPCompleter: - """ - QMPCompleter provides a readline library tab-complete behavior. - """ - # NB: Python 3.9+ will probably allow us to subclass list[str] directly, - # but pylint as of today does not know that List[str] is simply 'list'. - def __init__(self) -> None: - self._matches: List[str] = [] - - def append(self, value: str) -> None: - """Append a new valid completion to the list of possibilities.""" - return self._matches.append(value) - - def complete(self, text: str, state: int) -> Optional[str]: - """readline.set_completer() callback implementation.""" - for cmd in self._matches: - if cmd.startswith(text): - if state == 0: - return cmd - state -= 1 - return None - - -class QMPShellError(qmp.QMPError): - """ - QMP Shell Base error class. - """ - - -class FuzzyJSON(ast.NodeTransformer): - """ - This extension of ast.NodeTransformer filters literal "true/false/null" - values in a Python AST and replaces them by proper "True/False/None" values - that Python can properly evaluate. - """ - - @classmethod - def visit_Name(cls, # pylint: disable=invalid-name - node: ast.Name) -> ast.AST: - """ - Transform Name nodes with certain values into Constant (keyword) nodes. - """ - if node.id == 'true': - return ast.Constant(value=True) - if node.id == 'false': - return ast.Constant(value=False) - if node.id == 'null': - return ast.Constant(value=None) - return node - - -class QMPShell(qmp.QEMUMonitorProtocol): - """ - QMPShell provides a basic readline-based QMP shell. - - :param address: Address of the QMP server. - :param pretty: Pretty-print QMP messages. - :param verbose: Echo outgoing QMP messages to console. - """ - def __init__(self, address: qmp.SocketAddrT, - pretty: bool = False, verbose: bool = False): - super().__init__(address) - self._greeting: Optional[QMPMessage] = None - self._completer = QMPCompleter() - self._transmode = False - self._actions: List[QMPMessage] = [] - self._histfile = os.path.join(os.path.expanduser('~'), - '.qmp-shell_history') - self.pretty = pretty - self.verbose = verbose - - def close(self) -> None: - # Hook into context manager of parent to save shell history. - self._save_history() - super().close() - - def _fill_completion(self) -> None: - cmds = self.cmd('query-commands') - if 'error' in cmds: - return - for cmd in cmds['return']: - self._completer.append(cmd['name']) - - def _completer_setup(self) -> None: - self._completer = QMPCompleter() - self._fill_completion() - readline.set_history_length(1024) - readline.set_completer(self._completer.complete) - readline.parse_and_bind("tab: complete") - # NB: default delimiters conflict with some command names - # (eg. query-), clearing everything as it doesn't seem to matter - readline.set_completer_delims('') - try: - readline.read_history_file(self._histfile) - except FileNotFoundError: - pass - except IOError as err: - msg = f"Failed to read history '{self._histfile}': {err!s}" - LOG.warning(msg) - - def _save_history(self) -> None: - try: - readline.write_history_file(self._histfile) - except IOError as err: - msg = f"Failed to save history file '{self._histfile}': {err!s}" - LOG.warning(msg) - - @classmethod - def _parse_value(cls, val: str) -> object: - try: - return int(val) - except ValueError: - pass - - if val.lower() == 'true': - return True - if val.lower() == 'false': - return False - if val.startswith(('{', '[')): - # Try first as pure JSON: - try: - return json.loads(val) - except ValueError: - pass - # Try once again as FuzzyJSON: - try: - tree = ast.parse(val, mode='eval') - transformed = FuzzyJSON().visit(tree) - return ast.literal_eval(transformed) - except (SyntaxError, ValueError): - pass - return val - - def _cli_expr(self, - tokens: Sequence[str], - parent: qmp.QMPObject) -> None: - for arg in tokens: - (key, sep, val) = arg.partition('=') - if sep != '=': - raise QMPShellError( - f"Expected a key=value pair, got '{arg!s}'" - ) - - value = self._parse_value(val) - optpath = key.split('.') - curpath = [] - for path in optpath[:-1]: - curpath.append(path) - obj = parent.get(path, {}) - if not isinstance(obj, dict): - msg = 'Cannot use "{:s}" as both leaf and non-leaf key' - raise QMPShellError(msg.format('.'.join(curpath))) - parent[path] = obj - parent = obj - if optpath[-1] in parent: - if isinstance(parent[optpath[-1]], dict): - msg = 'Cannot use "{:s}" as both leaf and non-leaf key' - raise QMPShellError(msg.format('.'.join(curpath))) - raise QMPShellError(f'Cannot set "{key}" multiple times') - parent[optpath[-1]] = value - - def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]: - """ - Build a QMP input object from a user provided command-line in the - following format: - - < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] - """ - argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+''' - cmdargs = re.findall(argument_regex, cmdline) - qmpcmd: QMPMessage - - # Transactional CLI entry: - if cmdargs and cmdargs[0] == 'transaction(': - self._transmode = True - self._actions = [] - cmdargs.pop(0) - - # Transactional CLI exit: - if cmdargs and cmdargs[0] == ')' and self._transmode: - self._transmode = False - if len(cmdargs) > 1: - msg = 'Unexpected input after close of Transaction sub-shell' - raise QMPShellError(msg) - qmpcmd = { - 'execute': 'transaction', - 'arguments': {'actions': self._actions} - } - return qmpcmd - - # No args, or no args remaining - if not cmdargs: - return None - - if self._transmode: - # Parse and cache this Transactional Action - finalize = False - action = {'type': cmdargs[0], 'data': {}} - if cmdargs[-1] == ')': - cmdargs.pop(-1) - finalize = True - self._cli_expr(cmdargs[1:], action['data']) - self._actions.append(action) - return self._build_cmd(')') if finalize else None - - # Standard command: parse and return it to be executed. - qmpcmd = {'execute': cmdargs[0], 'arguments': {}} - self._cli_expr(cmdargs[1:], qmpcmd['arguments']) - return qmpcmd - - def _print(self, qmp_message: object) -> None: - jsobj = json.dumps(qmp_message, - indent=4 if self.pretty else None, - sort_keys=self.pretty) - print(str(jsobj)) - - def _execute_cmd(self, cmdline: str) -> bool: - try: - qmpcmd = self._build_cmd(cmdline) - except QMPShellError as err: - print( - f"Error while parsing command line: {err!s}\n" - "command format: <command-name> " - "[arg-name1=arg1] ... [arg-nameN=argN", - file=sys.stderr - ) - return True - # For transaction mode, we may have just cached the action: - if qmpcmd is None: - return True - if self.verbose: - self._print(qmpcmd) - resp = self.cmd_obj(qmpcmd) - if resp is None: - print('Disconnected') - return False - self._print(resp) - return True - - def connect(self, negotiate: bool = True) -> None: - self._greeting = super().connect(negotiate) - self._completer_setup() - - def show_banner(self, - msg: str = 'Welcome to the QMP low-level shell!') -> None: - """ - Print to stdio a greeting, and the QEMU version if available. - """ - print(msg) - if not self._greeting: - print('Connected') - return - version = self._greeting['QMP']['version']['qemu'] - print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version)) - - @property - def prompt(self) -> str: - """ - Return the current shell prompt, including a trailing space. - """ - if self._transmode: - return 'TRANS> ' - return '(QEMU) ' - - def read_exec_command(self) -> bool: - """ - Read and execute a command. - - @return True if execution was ok, return False if disconnected. - """ - try: - cmdline = input(self.prompt) - except EOFError: - print() - return False - - if cmdline == '': - for event in self.get_events(): - print(event) - self.clear_events() - return True - - return self._execute_cmd(cmdline) - - def repl(self) -> Iterator[None]: - """ - Return an iterator that implements the REPL. - """ - self.show_banner() - while self.read_exec_command(): - yield - self.close() - - -class HMPShell(QMPShell): - """ - HMPShell provides a basic readline-based HMP shell, tunnelled via QMP. - - :param address: Address of the QMP server. - :param pretty: Pretty-print QMP messages. - :param verbose: Echo outgoing QMP messages to console. - """ - def __init__(self, address: qmp.SocketAddrT, - pretty: bool = False, verbose: bool = False): - super().__init__(address, pretty, verbose) - self._cpu_index = 0 - - def _cmd_completion(self) -> None: - for cmd in self._cmd_passthrough('help')['return'].split('\r\n'): - if cmd and cmd[0] != '[' and cmd[0] != '\t': - name = cmd.split()[0] # drop help text - if name == 'info': - continue - if name.find('|') != -1: - # Command in the form 'foobar|f' or 'f|foobar', take the - # full name - opt = name.split('|') - if len(opt[0]) == 1: - name = opt[1] - else: - name = opt[0] - self._completer.append(name) - self._completer.append('help ' + name) # help completion - - def _info_completion(self) -> None: - for cmd in self._cmd_passthrough('info')['return'].split('\r\n'): - if cmd: - self._completer.append('info ' + cmd.split()[1]) - - def _other_completion(self) -> None: - # special cases - self._completer.append('help info') - - def _fill_completion(self) -> None: - self._cmd_completion() - self._info_completion() - self._other_completion() - - def _cmd_passthrough(self, cmdline: str, - cpu_index: int = 0) -> QMPMessage: - return self.cmd_obj({ - 'execute': 'human-monitor-command', - 'arguments': { - 'command-line': cmdline, - 'cpu-index': cpu_index - } - }) - - def _execute_cmd(self, cmdline: str) -> bool: - if cmdline.split()[0] == "cpu": - # trap the cpu command, it requires special setting - try: - idx = int(cmdline.split()[1]) - if 'return' not in self._cmd_passthrough('info version', idx): - print('bad CPU index') - return True - self._cpu_index = idx - except ValueError: - print('cpu command takes an integer argument') - return True - resp = self._cmd_passthrough(cmdline, self._cpu_index) - if resp is None: - print('Disconnected') - return False - assert 'return' in resp or 'error' in resp - if 'return' in resp: - # Success - if len(resp['return']) > 0: - print(resp['return'], end=' ') - else: - # Error - print('%s: %s' % (resp['error']['class'], resp['error']['desc'])) - return True - - def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None: - QMPShell.show_banner(self, msg) - - -def die(msg: str) -> NoReturn: - """Write an error to stderr, then exit with a return code of 1.""" - sys.stderr.write('ERROR: %s\n' % msg) - sys.exit(1) - - -def main() -> None: - """ - qmp-shell entry point: parse command line arguments and start the REPL. - """ - parser = argparse.ArgumentParser() - parser.add_argument('-H', '--hmp', action='store_true', - help='Use HMP interface') - parser.add_argument('-N', '--skip-negotiation', action='store_true', - help='Skip negotiate (for qemu-ga)') - parser.add_argument('-v', '--verbose', action='store_true', - help='Verbose (echo commands sent and received)') - parser.add_argument('-p', '--pretty', action='store_true', - help='Pretty-print JSON') - - default_server = os.environ.get('QMP_SOCKET') - parser.add_argument('qmp_server', action='store', - default=default_server, - help='< UNIX socket path | TCP address:port >') - - args = parser.parse_args() - if args.qmp_server is None: - parser.error("QMP socket or TCP address must be specified") - - shell_class = HMPShell if args.hmp else QMPShell - - try: - address = shell_class.parse_address(args.qmp_server) - except qmp.QMPBadPortError: - parser.error(f"Bad port number: {args.qmp_server}") - return # pycharm doesn't know error() is noreturn - - with shell_class(address, args.pretty, args.verbose) as qemu: - try: - qemu.connect(negotiate=not args.skip_negotiation) - except qmp.QMPConnectError: - die("Didn't get QMP greeting message") - except qmp.QMPCapabilitiesError: - die("Couldn't negotiate capabilities") - except OSError as err: - die(f"Couldn't connect to {args.qmp_server}: {err!s}") - - for _ in qemu.repl(): - pass - - -if __name__ == '__main__': - main() |