diff options
Diffstat (limited to 'yt_dlp/utils/traversal.py')
-rw-r--r-- | yt_dlp/utils/traversal.py | 158 |
1 files changed, 155 insertions, 3 deletions
diff --git a/yt_dlp/utils/traversal.py b/yt_dlp/utils/traversal.py index 96eb2eddf..b918487f9 100644 --- a/yt_dlp/utils/traversal.py +++ b/yt_dlp/utils/traversal.py @@ -1,18 +1,35 @@ +from __future__ import annotations + +import collections import collections.abc import contextlib +import functools import http.cookies import inspect import itertools import re +import typing import xml.etree.ElementTree from ._utils import ( IDENTITY, NO_DEFAULT, + ExtractorError, LazyList, deprecation_warning, + get_elements_html_by_class, + get_elements_html_by_attribute, + get_elements_by_attribute, + get_element_html_by_attribute, + get_element_by_attribute, + get_element_html_by_id, + get_element_by_id, + get_element_html_by_class, + get_elements_by_class, + get_element_text_and_html_by_tag, is_iterable_like, try_call, + url_or_none, variadic, ) @@ -54,6 +71,7 @@ def traverse_obj( Read as: `{key: traverse_obj(obj, path) for key, path in dct.items()}`. - `any`-builtin: Take the first matching object and return it, resetting branching. - `all`-builtin: Take all matching objects and return them as a list, resetting branching. + - `filter`-builtin: Return the value if it is truthy, `None` otherwise. `tuple`, `list`, and `dict` all support nested paths and branches. @@ -247,6 +265,10 @@ def traverse_obj( objs = (list(filtered_objs),) continue + if key is filter: + objs = filter(None, objs) + continue + if __debug__ and callable(key): # Verify function signature inspect.signature(key).bind(None, None) @@ -277,13 +299,143 @@ def traverse_obj( return results[0] if results else {} if allow_empty and is_dict else None for index, path in enumerate(paths, 1): - result = _traverse_obj(obj, path, index == len(paths), True) - if result is not None: - return result + is_last = index == len(paths) + try: + result = _traverse_obj(obj, path, is_last, True) + if result is not None: + return result + except _RequiredError as e: + if is_last: + # Reraise to get cleaner stack trace + raise ExtractorError(e.orig_msg, expected=e.expected) from None return None if default is NO_DEFAULT else default +def value(value, /): + return lambda _: value + + +def require(name, /, *, expected=False): + def func(value): + if value is None: + raise _RequiredError(f'Unable to extract {name}', expected=expected) + + return value + + return func + + +class _RequiredError(ExtractorError): + pass + + +@typing.overload +def subs_list_to_dict(*, ext: str | None = None) -> collections.abc.Callable[[list[dict]], dict[str, list[dict]]]: ... + + +@typing.overload +def subs_list_to_dict(subs: list[dict] | None, /, *, ext: str | None = None) -> dict[str, list[dict]]: ... + + +def subs_list_to_dict(subs: list[dict] | None = None, /, *, ext=None): + """ + Convert subtitles from a traversal into a subtitle dict. + The path should have an `all` immediately before this function. + + Arguments: + `ext` The default value for `ext` in the subtitle dict + + In the dict you can set the following additional items: + `id` The subtitle id to sort the dict into + `quality` The sort order for each subtitle + """ + if subs is None: + return functools.partial(subs_list_to_dict, ext=ext) + + result = collections.defaultdict(list) + + for sub in subs: + if not url_or_none(sub.get('url')) and not sub.get('data'): + continue + sub_id = sub.pop('id', None) + if sub_id is None: + continue + if ext is not None and not sub.get('ext'): + sub['ext'] = ext + result[sub_id].append(sub) + result = dict(result) + + for subs in result.values(): + subs.sort(key=lambda x: x.pop('quality', 0) or 0) + + return result + + +@typing.overload +def find_element(*, attr: str, value: str, tag: str | None = None, html=False): ... + + +@typing.overload +def find_element(*, cls: str, html=False): ... + + +@typing.overload +def find_element(*, id: str, tag: str | None = None, html=False): ... + + +@typing.overload +def find_element(*, tag: str, html=False): ... + + +def find_element(*, tag=None, id=None, cls=None, attr=None, value=None, html=False): + # deliberately using `id=` and `cls=` for ease of readability + assert tag or id or cls or (attr and value), 'One of tag, id, cls or (attr AND value) is required' + if not tag: + tag = r'[\w:.-]+' + + if attr and value: + assert not cls, 'Cannot match both attr and cls' + assert not id, 'Cannot match both attr and id' + func = get_element_html_by_attribute if html else get_element_by_attribute + return functools.partial(func, attr, value, tag=tag) + + elif cls: + assert not id, 'Cannot match both cls and id' + assert tag is None, 'Cannot match both cls and tag' + func = get_element_html_by_class if html else get_elements_by_class + return functools.partial(func, cls) + + elif id: + func = get_element_html_by_id if html else get_element_by_id + return functools.partial(func, id, tag=tag) + + index = int(bool(html)) + return lambda html: get_element_text_and_html_by_tag(tag, html)[index] + + +@typing.overload +def find_elements(*, cls: str, html=False): ... + + +@typing.overload +def find_elements(*, attr: str, value: str, tag: str | None = None, html=False): ... + + +def find_elements(*, tag=None, cls=None, attr=None, value=None, html=False): + # deliberately using `cls=` for ease of readability + assert cls or (attr and value), 'One of cls or (attr AND value) is required' + + if attr and value: + assert not cls, 'Cannot match both attr and cls' + func = get_elements_html_by_attribute if html else get_elements_by_attribute + return functools.partial(func, attr, value, tag=tag or r'[\w:.-]+') + + assert not tag, 'Cannot match both cls and tag' + func = get_elements_html_by_class if html else get_elements_by_class + return functools.partial(func, cls) + + def get_first(obj, *paths, **kwargs): return traverse_obj(obj, *((..., *variadic(keys)) for keys in paths), **kwargs, get_all=False) |