diff options
Diffstat (limited to 'yt_dlp/utils/_legacy.py')
-rw-r--r-- | yt_dlp/utils/_legacy.py | 73 |
1 files changed, 73 insertions, 0 deletions
diff --git a/yt_dlp/utils/_legacy.py b/yt_dlp/utils/_legacy.py index dde02092c..aa9f46d20 100644 --- a/yt_dlp/utils/_legacy.py +++ b/yt_dlp/utils/_legacy.py @@ -1,4 +1,6 @@ """No longer used and new code should not use. Exists only for API compat.""" +import asyncio +import atexit import platform import struct import sys @@ -32,6 +34,77 @@ has_certifi = bool(certifi) has_websockets = bool(websockets) +class WebSocketsWrapper: + """Wraps websockets module to use in non-async scopes""" + pool = None + + def __init__(self, url, headers=None, connect=True, **ws_kwargs): + self.loop = asyncio.new_event_loop() + # XXX: "loop" is deprecated + self.conn = websockets.connect( + url, extra_headers=headers, ping_interval=None, + close_timeout=float('inf'), loop=self.loop, ping_timeout=float('inf'), **ws_kwargs) + if connect: + self.__enter__() + atexit.register(self.__exit__, None, None, None) + + def __enter__(self): + if not self.pool: + self.pool = self.run_with_loop(self.conn.__aenter__(), self.loop) + return self + + def send(self, *args): + self.run_with_loop(self.pool.send(*args), self.loop) + + def recv(self, *args): + return self.run_with_loop(self.pool.recv(*args), self.loop) + + def __exit__(self, type, value, traceback): + try: + return self.run_with_loop(self.conn.__aexit__(type, value, traceback), self.loop) + finally: + self.loop.close() + self._cancel_all_tasks(self.loop) + + # taken from https://github.com/python/cpython/blob/3.9/Lib/asyncio/runners.py with modifications + # for contributors: If there's any new library using asyncio needs to be run in non-async, move these function out of this class + @staticmethod + def run_with_loop(main, loop): + if not asyncio.iscoroutine(main): + raise ValueError(f'a coroutine was expected, got {main!r}') + + try: + return loop.run_until_complete(main) + finally: + loop.run_until_complete(loop.shutdown_asyncgens()) + if hasattr(loop, 'shutdown_default_executor'): + loop.run_until_complete(loop.shutdown_default_executor()) + + @staticmethod + def _cancel_all_tasks(loop): + to_cancel = asyncio.all_tasks(loop) + + if not to_cancel: + return + + for task in to_cancel: + task.cancel() + + # XXX: "loop" is removed in python 3.10+ + loop.run_until_complete( + asyncio.gather(*to_cancel, loop=loop, return_exceptions=True)) + + for task in to_cancel: + if task.cancelled(): + continue + if task.exception() is not None: + loop.call_exception_handler({ + 'message': 'unhandled exception during asyncio.run() shutdown', + 'exception': task.exception(), + 'task': task, + }) + + def load_plugins(name, suffix, namespace): from ..plugins import load_plugins ret = load_plugins(name, suffix) |