diff options
-rw-r--r-- | python/qemu/aqmp/legacy.py | 138 |
1 files changed, 138 insertions, 0 deletions
diff --git a/python/qemu/aqmp/legacy.py b/python/qemu/aqmp/legacy.py new file mode 100644 index 0000000000..9e7b9fb80b --- /dev/null +++ b/python/qemu/aqmp/legacy.py @@ -0,0 +1,138 @@ +""" +Sync QMP Wrapper + +This class pretends to be qemu.qmp.QEMUMonitorProtocol. +""" + +import asyncio +from typing import ( + Awaitable, + List, + Optional, + TypeVar, + Union, +) + +import qemu.qmp +from qemu.qmp import QMPMessage, QMPReturnValue, SocketAddrT + +from .qmp_client import QMPClient + + +# pylint: disable=missing-docstring + + +class QEMUMonitorProtocol(qemu.qmp.QEMUMonitorProtocol): + def __init__(self, address: SocketAddrT, + server: bool = False, + nickname: Optional[str] = None): + + # pylint: disable=super-init-not-called + self._aqmp = QMPClient(nickname) + self._aloop = asyncio.get_event_loop() + self._address = address + self._timeout: Optional[float] = None + + _T = TypeVar('_T') + + def _sync( + self, future: Awaitable[_T], timeout: Optional[float] = None + ) -> _T: + return self._aloop.run_until_complete( + asyncio.wait_for(future, timeout=timeout) + ) + + def _get_greeting(self) -> Optional[QMPMessage]: + if self._aqmp.greeting is not None: + # pylint: disable=protected-access + return self._aqmp.greeting._asdict() + return None + + # __enter__ and __exit__ need no changes + # parse_address needs no changes + + def connect(self, negotiate: bool = True) -> Optional[QMPMessage]: + self._aqmp.await_greeting = negotiate + self._aqmp.negotiate = negotiate + + self._sync( + self._aqmp.connect(self._address) + ) + return self._get_greeting() + + def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage: + self._aqmp.await_greeting = True + self._aqmp.negotiate = True + + self._sync( + self._aqmp.accept(self._address), + timeout + ) + + ret = self._get_greeting() + assert ret is not None + return ret + + def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage: + return dict( + self._sync( + # pylint: disable=protected-access + + # _raw() isn't a public API, because turning off + # automatic ID assignment is discouraged. For + # compatibility with iotests *only*, do it anyway. + self._aqmp._raw(qmp_cmd, assign_id=False), + self._timeout + ) + ) + + # Default impl of cmd() delegates to cmd_obj + + def command(self, cmd: str, **kwds: object) -> QMPReturnValue: + return self._sync( + self._aqmp.execute(cmd, kwds), + self._timeout + ) + + def pull_event(self, + wait: Union[bool, float] = False) -> Optional[QMPMessage]: + if not wait: + # wait is False/0: "do not wait, do not except." + if self._aqmp.events.empty(): + return None + + # If wait is 'True', wait forever. If wait is False/0, the events + # queue must not be empty; but it still needs some real amount + # of time to complete. + timeout = None + if wait and isinstance(wait, float): + timeout = wait + + return dict( + self._sync( + self._aqmp.events.get(), + timeout + ) + ) + + def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]: + events = [dict(x) for x in self._aqmp.events.clear()] + if events: + return events + + event = self.pull_event(wait) + return [event] if event is not None else [] + + def clear_events(self) -> None: + self._aqmp.events.clear() + + def close(self) -> None: + self._sync( + self._aqmp.disconnect() + ) + + def settimeout(self, timeout: Optional[float]) -> None: + self._timeout = timeout + + def send_fd_scm(self, fd: int) -> None: + self._aqmp.send_fd_scm(fd) |