aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--python/qemu/aqmp/__init__.py2
-rw-r--r--python/qemu/aqmp/qmp_client.py264
2 files changed, 266 insertions, 0 deletions
diff --git a/python/qemu/aqmp/__init__.py b/python/qemu/aqmp/__init__.py
index 829166a2e2..d975c752ea 100644
--- a/python/qemu/aqmp/__init__.py
+++ b/python/qemu/aqmp/__init__.py
@@ -25,11 +25,13 @@ from .error import AQMPError
from .events import EventListener
from .message import Message
from .protocol import ConnectError, Runstate, StateError
+from .qmp_client import QMPClient
# The order of these fields impact the Sphinx documentation order.
__all__ = (
# Classes, most to least important
+ 'QMPClient',
'Message',
'EventListener',
'Runstate',
diff --git a/python/qemu/aqmp/qmp_client.py b/python/qemu/aqmp/qmp_client.py
new file mode 100644
index 0000000000..000ff59c7a
--- /dev/null
+++ b/python/qemu/aqmp/qmp_client.py
@@ -0,0 +1,264 @@
+"""
+QMP Protocol Implementation
+
+This module provides the `QMPClient` class, which can be used to connect
+and send commands to a QMP server such as QEMU. The QMP class can be
+used to either connect to a listening server, or used to listen and
+accept an incoming connection from that server.
+"""
+
+import logging
+from typing import (
+ Dict,
+ List,
+ Mapping,
+ Optional,
+)
+
+from .error import ProtocolError
+from .events import Events
+from .message import Message
+from .models import Greeting
+from .protocol import AsyncProtocol
+from .util import (
+ bottom_half,
+ exception_summary,
+ pretty_traceback,
+ upper_half,
+)
+
+
+class _WrappedProtocolError(ProtocolError):
+ """
+ Abstract exception class for Protocol errors that wrap an Exception.
+
+ :param error_message: Human-readable string describing the error.
+ :param exc: The root-cause exception.
+ """
+ def __init__(self, error_message: str, exc: Exception):
+ super().__init__(error_message)
+ self.exc = exc
+
+ def __str__(self) -> str:
+ return f"{self.error_message}: {self.exc!s}"
+
+
+class GreetingError(_WrappedProtocolError):
+ """
+ An exception occurred during the Greeting phase.
+
+ :param error_message: Human-readable string describing the error.
+ :param exc: The root-cause exception.
+ """
+
+
+class NegotiationError(_WrappedProtocolError):
+ """
+ An exception occurred during the Negotiation phase.
+
+ :param error_message: Human-readable string describing the error.
+ :param exc: The root-cause exception.
+ """
+
+
+class QMPClient(AsyncProtocol[Message], Events):
+ """
+ Implements a QMP client connection.
+
+ QMP can be used to establish a connection as either the transport
+ client or server, though this class always acts as the QMP client.
+
+ :param name: Optional nickname for the connection, used for logging.
+
+ Basic script-style usage looks like this::
+
+ qmp = QMPClient('my_virtual_machine_name')
+ await qmp.connect(('127.0.0.1', 1234))
+ ...
+ res = await qmp.execute('block-query')
+ ...
+ await qmp.disconnect()
+
+ Basic async client-style usage looks like this::
+
+ class Client:
+ def __init__(self, name: str):
+ self.qmp = QMPClient(name)
+
+ async def watch_events(self):
+ try:
+ async for event in self.qmp.events:
+ print(f"Event: {event['event']}")
+ except asyncio.CancelledError:
+ return
+
+ async def run(self, address='/tmp/qemu.socket'):
+ await self.qmp.connect(address)
+ asyncio.create_task(self.watch_events())
+ await self.qmp.runstate_changed.wait()
+ await self.disconnect()
+
+ See `aqmp.events` for more detail on event handling patterns.
+ """
+ #: Logger object used for debugging messages.
+ logger = logging.getLogger(__name__)
+
+ # Read buffer limit; large enough to accept query-qmp-schema
+ _limit = (256 * 1024)
+
+ def __init__(self, name: Optional[str] = None) -> None:
+ super().__init__(name)
+ Events.__init__(self)
+
+ #: Whether or not to await a greeting after establishing a connection.
+ self.await_greeting: bool = True
+
+ #: Whether or not to perform capabilities negotiation upon connection.
+ #: Implies `await_greeting`.
+ self.negotiate: bool = True
+
+ # Cached Greeting, if one was awaited.
+ self._greeting: Optional[Greeting] = None
+
+ @upper_half
+ async def _establish_session(self) -> None:
+ """
+ Initiate the QMP session.
+
+ Wait for the QMP greeting and perform capabilities negotiation.
+
+ :raise GreetingError: When the greeting is not understood.
+ :raise NegotiationError: If the negotiation fails.
+ :raise EOFError: When the server unexpectedly hangs up.
+ :raise OSError: For underlying stream errors.
+ """
+ if self.await_greeting or self.negotiate:
+ self._greeting = await self._get_greeting()
+
+ if self.negotiate:
+ await self._negotiate()
+
+ # This will start the reader/writers:
+ await super()._establish_session()
+
+ @upper_half
+ async def _get_greeting(self) -> Greeting:
+ """
+ :raise GreetingError: When the greeting is not understood.
+ :raise EOFError: When the server unexpectedly hangs up.
+ :raise OSError: For underlying stream errors.
+
+ :return: the Greeting object given by the server.
+ """
+ self.logger.debug("Awaiting greeting ...")
+
+ try:
+ msg = await self._recv()
+ return Greeting(msg)
+ except (ProtocolError, KeyError, TypeError) as err:
+ emsg = "Did not understand Greeting"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ raise GreetingError(emsg, err) from err
+ except BaseException as err:
+ # EOFError, OSError, or something unexpected.
+ emsg = "Failed to receive Greeting"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ raise
+
+ @upper_half
+ async def _negotiate(self) -> None:
+ """
+ Perform QMP capabilities negotiation.
+
+ :raise NegotiationError: When negotiation fails.
+ :raise EOFError: When the server unexpectedly hangs up.
+ :raise OSError: For underlying stream errors.
+ """
+ self.logger.debug("Negotiating capabilities ...")
+
+ arguments: Dict[str, List[str]] = {'enable': []}
+ if self._greeting and 'oob' in self._greeting.QMP.capabilities:
+ arguments['enable'].append('oob')
+ msg = self.make_execute_msg('qmp_capabilities', arguments=arguments)
+
+ # It's not safe to use execute() here, because the reader/writers
+ # aren't running. AsyncProtocol *requires* that a new session
+ # does not fail after the reader/writers are running!
+ try:
+ await self._send(msg)
+ reply = await self._recv()
+ assert 'return' in reply
+ assert 'error' not in reply
+ except (ProtocolError, AssertionError) as err:
+ emsg = "Negotiation failed"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ raise NegotiationError(emsg, err) from err
+ except BaseException as err:
+ # EOFError, OSError, or something unexpected.
+ emsg = "Negotiation failed"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ raise
+
+ @bottom_half
+ async def _on_message(self, msg: Message) -> None:
+ """
+ Add an incoming message to the appropriate queue/handler.
+ """
+ # Incoming messages are not fully parsed/validated here;
+ # do only light peeking to know how to route the messages.
+
+ if 'event' in msg:
+ await self._event_dispatch(msg)
+ return
+
+ # Below, we assume everything left is an execute/exec-oob response.
+ # ... Which we'll implement in the next commit!
+
+ @upper_half
+ @bottom_half
+ async def _do_recv(self) -> Message:
+ """
+ :raise OSError: When a stream error is encountered.
+ :raise EOFError: When the stream is at EOF.
+ :raise ProtocolError:
+ When the Message is not understood.
+ See also `Message._deserialize`.
+
+ :return: A single QMP `Message`.
+ """
+ msg_bytes = await self._readline()
+ msg = Message(msg_bytes, eager=True)
+ return msg
+
+ @upper_half
+ @bottom_half
+ def _do_send(self, msg: Message) -> None:
+ """
+ :raise ValueError: JSON serialization failure
+ :raise TypeError: JSON serialization failure
+ :raise OSError: When a stream error is encountered.
+ """
+ assert self._writer is not None
+ self._writer.write(bytes(msg))
+
+ @classmethod
+ def make_execute_msg(cls, cmd: str,
+ arguments: Optional[Mapping[str, object]] = None,
+ oob: bool = False) -> Message:
+ """
+ Create an executable message to be sent later.
+
+ :param cmd: QMP command name.
+ :param arguments: Arguments (if any). Must be JSON-serializable.
+ :param oob: If `True`, execute "out of band".
+
+ :return: An executable QMP `Message`.
+ """
+ msg = Message({'exec-oob' if oob else 'execute': cmd})
+ if arguments is not None:
+ msg['arguments'] = arguments
+ return msg