aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--python/qemu/aqmp/protocol.py18
1 files changed, 13 insertions, 5 deletions
diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py
index ae1df24026..860b79512d 100644
--- a/python/qemu/aqmp/protocol.py
+++ b/python/qemu/aqmp/protocol.py
@@ -623,13 +623,21 @@ class AsyncProtocol(Generic[T]):
def _done(task: Optional['asyncio.Future[Any]']) -> bool:
return task is not None and task.done()
- # NB: We can't rely on _bh_tasks being done() here, it may not
- # yet have had a chance to run and gather itself.
+ # Are we already in an error pathway? If either of the tasks are
+ # already done, or if we have no tasks but a reader/writer; we
+ # must be.
+ #
+ # NB: We can't use _bh_tasks to check for premature task
+ # completion, because it may not yet have had a chance to run
+ # and gather itself.
tasks = tuple(filter(None, (self._writer_task, self._reader_task)))
error_pathway = _done(self._reader_task) or _done(self._writer_task)
+ if not tasks:
+ error_pathway |= bool(self._reader) or bool(self._writer)
try:
- # Try to flush the writer, if possible:
+ # Try to flush the writer, if possible.
+ # This *may* cause an error and force us over into the error path.
if not error_pathway:
await self._bh_flush_writer()
except BaseException as err:
@@ -639,7 +647,7 @@ class AsyncProtocol(Generic[T]):
self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
raise
finally:
- # Cancel any still-running tasks:
+ # Cancel any still-running tasks (Won't raise):
if self._writer_task is not None and not self._writer_task.done():
self.logger.debug("Cancelling writer task.")
self._writer_task.cancel()
@@ -652,7 +660,7 @@ class AsyncProtocol(Generic[T]):
self.logger.debug("Waiting for tasks to complete ...")
await asyncio.wait(tasks)
- # Lastly, close the stream itself. (May raise):
+ # Lastly, close the stream itself. (*May raise*!):
await self._bh_close_stream(error_pathway)
self.logger.debug("Disconnected.")