aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--python/qemu/aqmp/__init__.py2
-rw-r--r--python/qemu/aqmp/events.py706
2 files changed, 708 insertions, 0 deletions
diff --git a/python/qemu/aqmp/__init__.py b/python/qemu/aqmp/__init__.py
index 96fff1e5f3..829166a2e2 100644
--- a/python/qemu/aqmp/__init__.py
+++ b/python/qemu/aqmp/__init__.py
@@ -22,6 +22,7 @@ managing QMP events.
# the COPYING file in the top-level directory.
from .error import AQMPError
+from .events import EventListener
from .message import Message
from .protocol import ConnectError, Runstate, StateError
@@ -30,6 +31,7 @@ from .protocol import ConnectError, Runstate, StateError
__all__ = (
# Classes, most to least important
'Message',
+ 'EventListener',
'Runstate',
# Exceptions, most generic to most explicit
diff --git a/python/qemu/aqmp/events.py b/python/qemu/aqmp/events.py
new file mode 100644
index 0000000000..fb81d21610
--- /dev/null
+++ b/python/qemu/aqmp/events.py
@@ -0,0 +1,706 @@
+"""
+AQMP Events and EventListeners
+
+Asynchronous QMP uses `EventListener` objects to listen for events. An
+`EventListener` is a FIFO event queue that can be pre-filtered to listen
+for only specific events. Each `EventListener` instance receives its own
+copy of events that it hears, so events may be consumed without fear or
+worry for depriving other listeners of events they need to hear.
+
+
+EventListener Tutorial
+----------------------
+
+In all of the following examples, we assume that we have a `QMPClient`
+instantiated named ``qmp`` that is already connected.
+
+
+`listener()` context blocks with one name
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The most basic usage is by using the `listener()` context manager to
+construct them:
+
+.. code:: python
+
+ with qmp.listener('STOP') as listener:
+ await qmp.execute('stop')
+ await listener.get()
+
+The listener is active only for the duration of the ‘with’ block. This
+instance listens only for ‘STOP’ events.
+
+
+`listener()` context blocks with two or more names
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Multiple events can be selected for by providing any ``Iterable[str]``:
+
+.. code:: python
+
+ with qmp.listener(('STOP', 'RESUME')) as listener:
+ await qmp.execute('stop')
+ event = await listener.get()
+ assert event['event'] == 'STOP'
+
+ await qmp.execute('cont')
+ event = await listener.get()
+ assert event['event'] == 'RESUME'
+
+
+`listener()` context blocks with no names
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+By omitting names entirely, you can listen to ALL events.
+
+.. code:: python
+
+ with qmp.listener() as listener:
+ await qmp.execute('stop')
+ event = await listener.get()
+ assert event['event'] == 'STOP'
+
+This isn’t a very good use case for this feature: In a non-trivial
+running system, we may not know what event will arrive next. Grabbing
+the top of a FIFO queue returning multiple kinds of events may be prone
+to error.
+
+
+Using async iterators to retrieve events
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+If you’d like to simply watch what events happen to arrive, you can use
+the listener as an async iterator:
+
+.. code:: python
+
+ with qmp.listener() as listener:
+ async for event in listener:
+ print(f"Event arrived: {event['event']}")
+
+This is analogous to the following code:
+
+.. code:: python
+
+ with qmp.listener() as listener:
+ while True:
+ event = listener.get()
+ print(f"Event arrived: {event['event']}")
+
+This event stream will never end, so these blocks will never terminate.
+
+
+Using asyncio.Task to concurrently retrieve events
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Since a listener’s event stream will never terminate, it is not likely
+useful to use that form in a script. For longer-running clients, we can
+create event handlers by using `asyncio.Task` to create concurrent
+coroutines:
+
+.. code:: python
+
+ async def print_events(listener):
+ try:
+ async for event in listener:
+ print(f"Event arrived: {event['event']}")
+ except asyncio.CancelledError:
+ return
+
+ with qmp.listener() as listener:
+ task = asyncio.Task(print_events(listener))
+ await qmp.execute('stop')
+ await qmp.execute('cont')
+ task.cancel()
+ await task
+
+However, there is no guarantee that these events will be received by the
+time we leave this context block. Once the context block is exited, the
+listener will cease to hear any new events, and becomes inert.
+
+Be mindful of the timing: the above example will *probably*– but does
+not *guarantee*– that both STOP/RESUMED events will be printed. The
+example below outlines how to use listeners outside of a context block.
+
+
+Using `register_listener()` and `remove_listener()`
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+To create a listener with a longer lifetime, beyond the scope of a
+single block, create a listener and then call `register_listener()`:
+
+.. code:: python
+
+ class MyClient:
+ def __init__(self, qmp):
+ self.qmp = qmp
+ self.listener = EventListener()
+
+ async def print_events(self):
+ try:
+ async for event in self.listener:
+ print(f"Event arrived: {event['event']}")
+ except asyncio.CancelledError:
+ return
+
+ async def run(self):
+ self.task = asyncio.Task(self.print_events)
+ self.qmp.register_listener(self.listener)
+ await qmp.execute('stop')
+ await qmp.execute('cont')
+
+ async def stop(self):
+ self.task.cancel()
+ await self.task
+ self.qmp.remove_listener(self.listener)
+
+The listener can be deactivated by using `remove_listener()`. When it is
+removed, any possible pending events are cleared and it can be
+re-registered at a later time.
+
+
+Using the built-in all events listener
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The `QMPClient` object creates its own default listener named
+:py:obj:`~Events.events` that can be used for the same purpose without
+having to create your own:
+
+.. code:: python
+
+ async def print_events(listener):
+ try:
+ async for event in listener:
+ print(f"Event arrived: {event['event']}")
+ except asyncio.CancelledError:
+ return
+
+ task = asyncio.Task(print_events(qmp.events))
+
+ await qmp.execute('stop')
+ await qmp.execute('cont')
+
+ task.cancel()
+ await task
+
+
+Using both .get() and async iterators
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The async iterator and `get()` methods pull events from the same FIFO
+queue. If you mix the usage of both, be aware: Events are emitted
+precisely once per listener.
+
+If multiple contexts try to pull events from the same listener instance,
+events are still emitted only precisely once.
+
+This restriction can be lifted by creating additional listeners.
+
+
+Creating multiple listeners
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Additional `EventListener` objects can be created at-will. Each one
+receives its own copy of events, with separate FIFO event queues.
+
+.. code:: python
+
+ my_listener = EventListener()
+ qmp.register_listener(my_listener)
+
+ await qmp.execute('stop')
+ copy1 = await my_listener.get()
+ copy2 = await qmp.events.get()
+
+ assert copy1 == copy2
+
+In this example, we await an event from both a user-created
+`EventListener` and the built-in events listener. Both receive the same
+event.
+
+
+Clearing listeners
+~~~~~~~~~~~~~~~~~~
+
+`EventListener` objects can be cleared, clearing all events seen thus far:
+
+.. code:: python
+
+ await qmp.execute('stop')
+ qmp.events.clear()
+ await qmp.execute('cont')
+ event = await qmp.events.get()
+ assert event['event'] == 'RESUME'
+
+`EventListener` objects are FIFO queues. If events are not consumed,
+they will remain in the queue until they are witnessed or discarded via
+`clear()`. FIFO queues will be drained automatically upon leaving a
+context block, or when calling `remove_listener()`.
+
+
+Accessing listener history
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+`EventListener` objects record their history. Even after being cleared,
+you can obtain a record of all events seen so far:
+
+.. code:: python
+
+ await qmp.execute('stop')
+ await qmp.execute('cont')
+ qmp.events.clear()
+
+ assert len(qmp.events.history) == 2
+ assert qmp.events.history[0]['event'] == 'STOP'
+ assert qmp.events.history[1]['event'] == 'RESUME'
+
+The history is updated immediately and does not require the event to be
+witnessed first.
+
+
+Using event filters
+~~~~~~~~~~~~~~~~~~~
+
+`EventListener` objects can be given complex filtering criteria if names
+are not sufficient:
+
+.. code:: python
+
+ def job1_filter(event) -> bool:
+ event_data = event.get('data', {})
+ event_job_id = event_data.get('id')
+ return event_job_id == "job1"
+
+ with qmp.listener('JOB_STATUS_CHANGE', job1_filter) as listener:
+ await qmp.execute('blockdev-backup', arguments={'job-id': 'job1', ...})
+ async for event in listener:
+ if event['data']['status'] == 'concluded':
+ break
+
+These filters might be most useful when parameterized. `EventListener`
+objects expect a function that takes only a single argument (the raw
+event, as a `Message`) and returns a bool; True if the event should be
+accepted into the stream. You can create a function that adapts this
+signature to accept configuration parameters:
+
+.. code:: python
+
+ def job_filter(job_id: str) -> EventFilter:
+ def filter(event: Message) -> bool:
+ return event['data']['id'] == job_id
+ return filter
+
+ with qmp.listener('JOB_STATUS_CHANGE', job_filter('job2')) as listener:
+ await qmp.execute('blockdev-backup', arguments={'job-id': 'job2', ...})
+ async for event in listener:
+ if event['data']['status'] == 'concluded':
+ break
+
+
+Activating an existing listener with `listen()`
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Listeners with complex, long configurations can also be created manually
+and activated temporarily by using `listen()` instead of `listener()`:
+
+.. code:: python
+
+ listener = EventListener(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
+ 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
+ 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
+
+ with qmp.listen(listener):
+ await qmp.execute('blockdev-backup', arguments={'job-id': 'job3', ...})
+ async for event in listener:
+ print(event)
+ if event['event'] == 'BLOCK_JOB_COMPLETED':
+ break
+
+Any events that are not witnessed by the time the block is left will be
+cleared from the queue; entering the block is an implicit
+`register_listener()` and leaving the block is an implicit
+`remove_listener()`.
+
+
+Activating multiple existing listeners with `listen()`
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+While `listener()` is only capable of creating a single listener,
+`listen()` is capable of activating multiple listeners simultaneously:
+
+.. code:: python
+
+ def job_filter(job_id: str) -> EventFilter:
+ def filter(event: Message) -> bool:
+ return event['data']['id'] == job_id
+ return filter
+
+ jobA = EventListener('JOB_STATUS_CHANGE', job_filter('jobA'))
+ jobB = EventListener('JOB_STATUS_CHANGE', job_filter('jobB'))
+
+ with qmp.listen(jobA, jobB):
+ qmp.execute('blockdev-create', arguments={'job-id': 'jobA', ...})
+ qmp.execute('blockdev-create', arguments={'job-id': 'jobB', ...})
+
+ async for event in jobA.get():
+ if event['data']['status'] == 'concluded':
+ break
+ async for event in jobB.get():
+ if event['data']['status'] == 'concluded':
+ break
+
+
+Extending the `EventListener` class
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+In the case that a more specialized `EventListener` is desired to
+provide either more functionality or more compact syntax for specialized
+cases, it can be extended.
+
+One of the key methods to extend or override is
+:py:meth:`~EventListener.accept()`. The default implementation checks an
+incoming message for:
+
+1. A qualifying name, if any :py:obj:`~EventListener.names` were
+ specified at initialization time
+2. That :py:obj:`~EventListener.event_filter()` returns True.
+
+This can be modified however you see fit to change the criteria for
+inclusion in the stream.
+
+For convenience, a ``JobListener`` class could be created that simply
+bakes in configuration so it does not need to be repeated:
+
+.. code:: python
+
+ class JobListener(EventListener):
+ def __init__(self, job_id: str):
+ super().__init__(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
+ 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
+ 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
+ self.job_id = job_id
+
+ def accept(self, event) -> bool:
+ if not super().accept(event):
+ return False
+ if event['event'] in ('BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'):
+ return event['data']['id'] == job_id
+ return event['data']['device'] == job_id
+
+From here on out, you can conjure up a custom-purpose listener that
+listens only for job-related events for a specific job-id easily:
+
+.. code:: python
+
+ listener = JobListener('job4')
+ with qmp.listener(listener):
+ await qmp.execute('blockdev-backup', arguments={'job-id': 'job4', ...})
+ async for event in listener:
+ print(event)
+ if event['event'] == 'BLOCK_JOB_COMPLETED':
+ break
+
+
+Experimental Interfaces & Design Issues
+---------------------------------------
+
+These interfaces are not ones I am sure I will keep or otherwise modify
+heavily.
+
+qmp.listener()’s type signature
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+`listener()` does not return anything, because it was assumed the caller
+already had a handle to the listener. However, for
+``qmp.listener(EventListener())`` forms, the caller will not have saved
+a handle to the listener.
+
+Because this function can accept *many* listeners, I found it hard to
+accurately type in a way where it could be used in both “one” or “many”
+forms conveniently and in a statically type-safe manner.
+
+Ultimately, I removed the return altogether, but perhaps with more time
+I can work out a way to re-add it.
+
+
+API Reference
+-------------
+
+"""
+
+import asyncio
+from contextlib import contextmanager
+import logging
+from typing import (
+ AsyncIterator,
+ Callable,
+ Iterable,
+ Iterator,
+ List,
+ Optional,
+ Set,
+ Tuple,
+ Union,
+)
+
+from .error import AQMPError
+from .message import Message
+
+
+EventNames = Union[str, Iterable[str], None]
+EventFilter = Callable[[Message], bool]
+
+
+class ListenerError(AQMPError):
+ """
+ Generic error class for `EventListener`-related problems.
+ """
+
+
+class EventListener:
+ """
+ Selectively listens for events with runtime configurable filtering.
+
+ This class is designed to be directly usable for the most common cases,
+ but it can be extended to provide more rigorous control.
+
+ :param names:
+ One or more names of events to listen for.
+ When not provided, listen for ALL events.
+ :param event_filter:
+ An optional event filtering function.
+ When names are also provided, this acts as a secondary filter.
+
+ When ``names`` and ``event_filter`` are both provided, the names
+ will be filtered first, and then the filter function will be called
+ second. The event filter function can assume that the format of the
+ event is a known format.
+ """
+ def __init__(
+ self,
+ names: EventNames = None,
+ event_filter: Optional[EventFilter] = None,
+ ):
+ # Queue of 'heard' events yet to be witnessed by a caller.
+ self._queue: 'asyncio.Queue[Message]' = asyncio.Queue()
+
+ # Intended as a historical record, NOT a processing queue or backlog.
+ self._history: List[Message] = []
+
+ #: Primary event filter, based on one or more event names.
+ self.names: Set[str] = set()
+ if isinstance(names, str):
+ self.names.add(names)
+ elif names is not None:
+ self.names.update(names)
+
+ #: Optional, secondary event filter.
+ self.event_filter: Optional[EventFilter] = event_filter
+
+ @property
+ def history(self) -> Tuple[Message, ...]:
+ """
+ A read-only history of all events seen so far.
+
+ This represents *every* event, including those not yet witnessed
+ via `get()` or ``async for``. It persists between `clear()`
+ calls and is immutable.
+ """
+ return tuple(self._history)
+
+ def accept(self, event: Message) -> bool:
+ """
+ Determine if this listener accepts this event.
+
+ This method determines which events will appear in the stream.
+ The default implementation simply checks the event against the
+ list of names and the event_filter to decide if this
+ `EventListener` accepts a given event. It can be
+ overridden/extended to provide custom listener behavior.
+
+ User code is not expected to need to invoke this method.
+
+ :param event: The event under consideration.
+ :return: `True`, if this listener accepts this event.
+ """
+ name_ok = (not self.names) or (event['event'] in self.names)
+ return name_ok and (
+ (not self.event_filter) or self.event_filter(event)
+ )
+
+ async def put(self, event: Message) -> None:
+ """
+ Conditionally put a new event into the FIFO queue.
+
+ This method is not designed to be invoked from user code, and it
+ should not need to be overridden. It is a public interface so
+ that `QMPClient` has an interface by which it can inform
+ registered listeners of new events.
+
+ The event will be put into the queue if
+ :py:meth:`~EventListener.accept()` returns `True`.
+
+ :param event: The new event to put into the FIFO queue.
+ """
+ if not self.accept(event):
+ return
+
+ self._history.append(event)
+ await self._queue.put(event)
+
+ async def get(self) -> Message:
+ """
+ Wait for the very next event in this stream.
+
+ If one is already available, return that one.
+ """
+ return await self._queue.get()
+
+ def clear(self) -> None:
+ """
+ Clear this listener of all pending events.
+
+ Called when an `EventListener` is being unregistered, this clears the
+ pending FIFO queue synchronously. It can be also be used to
+ manually clear any pending events, if desired.
+
+ .. warning::
+ Take care when discarding events. Cleared events will be
+ silently tossed on the floor. All events that were ever
+ accepted by this listener are visible in `history()`.
+ """
+ while True:
+ try:
+ self._queue.get_nowait()
+ except asyncio.QueueEmpty:
+ break
+
+ def __aiter__(self) -> AsyncIterator[Message]:
+ return self
+
+ async def __anext__(self) -> Message:
+ """
+ Enables the `EventListener` to function as an async iterator.
+
+ It may be used like this:
+
+ .. code:: python
+
+ async for event in listener:
+ print(event)
+
+ These iterators will never terminate of their own accord; you
+ must provide break conditions or otherwise prepare to run them
+ in an `asyncio.Task` that can be cancelled.
+ """
+ return await self.get()
+
+
+class Events:
+ """
+ Events is a mix-in class that adds event functionality to the QMP class.
+
+ It's designed specifically as a mix-in for `QMPClient`, and it
+ relies upon the class it is being mixed into having a 'logger'
+ property.
+ """
+ def __init__(self) -> None:
+ self._listeners: List[EventListener] = []
+
+ #: Default, all-events `EventListener`.
+ self.events: EventListener = EventListener()
+ self.register_listener(self.events)
+
+ # Parent class needs to have a logger
+ self.logger: logging.Logger
+
+ async def _event_dispatch(self, msg: Message) -> None:
+ """
+ Given a new event, propagate it to all of the active listeners.
+
+ :param msg: The event to propagate.
+ """
+ for listener in self._listeners:
+ await listener.put(msg)
+
+ def register_listener(self, listener: EventListener) -> None:
+ """
+ Register and activate an `EventListener`.
+
+ :param listener: The listener to activate.
+ :raise ListenerError: If the given listener is already registered.
+ """
+ if listener in self._listeners:
+ raise ListenerError("Attempted to re-register existing listener")
+ self.logger.debug("Registering %s.", str(listener))
+ self._listeners.append(listener)
+
+ def remove_listener(self, listener: EventListener) -> None:
+ """
+ Unregister and deactivate an `EventListener`.
+
+ The removed listener will have its pending events cleared via
+ `clear()`. The listener can be re-registered later when
+ desired.
+
+ :param listener: The listener to deactivate.
+ :raise ListenerError: If the given listener is not registered.
+ """
+ if listener == self.events:
+ raise ListenerError("Cannot remove the default listener.")
+ self.logger.debug("Removing %s.", str(listener))
+ listener.clear()
+ self._listeners.remove(listener)
+
+ @contextmanager
+ def listen(self, *listeners: EventListener) -> Iterator[None]:
+ r"""
+ Context manager: Temporarily listen with an `EventListener`.
+
+ Accepts one or more `EventListener` objects and registers them,
+ activating them for the duration of the context block.
+
+ `EventListener` objects will have any pending events in their
+ FIFO queue cleared upon exiting the context block, when they are
+ deactivated.
+
+ :param \*listeners: One or more EventListeners to activate.
+ :raise ListenerError: If the given listener(s) are already active.
+ """
+ _added = []
+
+ try:
+ for listener in listeners:
+ self.register_listener(listener)
+ _added.append(listener)
+
+ yield
+
+ finally:
+ for listener in _added:
+ self.remove_listener(listener)
+
+ @contextmanager
+ def listener(
+ self,
+ names: EventNames = (),
+ event_filter: Optional[EventFilter] = None
+ ) -> Iterator[EventListener]:
+ """
+ Context manager: Temporarily listen with a new `EventListener`.
+
+ Creates an `EventListener` object and registers it, activating
+ it for the duration of the context block.
+
+ :param names:
+ One or more names of events to listen for.
+ When not provided, listen for ALL events.
+ :param event_filter:
+ An optional event filtering function.
+ When names are also provided, this acts as a secondary filter.
+
+ :return: The newly created and active `EventListener`.
+ """
+ listener = EventListener(names, event_filter)
+ with self.listen(listener):
+ yield listener