diff options
| -rw-r--r-- | test/test_YoutubeDL.py | 185 | ||||
| -rw-r--r-- | test/test_YoutubeDLCookieJar.py | 14 | ||||
| -rwxr-xr-x | youtube_dl/YoutubeDL.py | 182 | ||||
| -rw-r--r-- | youtube_dl/downloader/common.py | 9 | 
4 files changed, 357 insertions, 33 deletions
| diff --git a/test/test_YoutubeDL.py b/test/test_YoutubeDL.py index 60780b8a7..6cf555827 100644 --- a/test/test_YoutubeDL.py +++ b/test/test_YoutubeDL.py @@ -10,14 +10,30 @@ import unittest  sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))  import copy +import json -from test.helper import FakeYDL, assertRegexpMatches +from test.helper import ( +    FakeYDL, +    assertRegexpMatches, +    try_rm, +)  from youtube_dl import YoutubeDL -from youtube_dl.compat import compat_str, compat_urllib_error +from youtube_dl.compat import ( +    compat_http_cookiejar_Cookie, +    compat_http_cookies_SimpleCookie, +    compat_kwargs, +    compat_str, +    compat_urllib_error, +) +  from youtube_dl.extractor import YoutubeIE  from youtube_dl.extractor.common import InfoExtractor  from youtube_dl.postprocessor.common import PostProcessor -from youtube_dl.utils import ExtractorError, match_filter_func +from youtube_dl.utils import ( +    ExtractorError, +    match_filter_func, +    traverse_obj, +)  TEST_URL = 'http://localhost/sample.mp4' @@ -29,11 +45,14 @@ class YDL(FakeYDL):          self.msgs = []      def process_info(self, info_dict): -        self.downloaded_info_dicts.append(info_dict) +        self.downloaded_info_dicts.append(info_dict.copy())      def to_screen(self, msg):          self.msgs.append(msg) +    def dl(self, *args, **kwargs): +        assert False, 'Downloader must not be invoked for test_YoutubeDL' +  def _make_result(formats, **kwargs):      res = { @@ -42,8 +61,9 @@ def _make_result(formats, **kwargs):          'title': 'testttitle',          'extractor': 'testex',          'extractor_key': 'TestEx', +        'webpage_url': 'http://example.com/watch?v=shenanigans',      } -    res.update(**kwargs) +    res.update(**compat_kwargs(kwargs))      return res @@ -1011,5 +1031,160 @@ class TestYoutubeDL(unittest.TestCase):          self.assertEqual(out_info['release_date'], '20210930') +class TestYoutubeDLCookies(unittest.TestCase): + +    @staticmethod +    def encode_cookie(cookie): +        if not isinstance(cookie, dict): +            cookie = vars(cookie) +        for name, value in cookie.items(): +            yield name, compat_str(value) + +    @classmethod +    def comparable_cookies(cls, cookies): +        # Work around cookiejar cookies not being unicode strings +        return sorted(map(tuple, map(sorted, map(cls.encode_cookie, cookies)))) + +    def assertSameCookies(self, c1, c2, msg=None): +        return self.assertEqual( +            *map(self.comparable_cookies, (c1, c2)), +            msg=msg) + +    def assertSameCookieStrings(self, c1, c2, msg=None): +        return self.assertSameCookies( +            *map(lambda c: compat_http_cookies_SimpleCookie(c).values(), (c1, c2)), +            msg=msg) + +    def test_header_cookies(self): + +        ydl = FakeYDL() +        ydl.report_warning = lambda *_, **__: None + +        def cookie(name, value, version=None, domain='', path='', secure=False, expires=None): +            return compat_http_cookiejar_Cookie( +                version or 0, name, value, None, False, +                domain, bool(domain), bool(domain), path, bool(path), +                secure, expires, False, None, None, rest={}) + +        test_url, test_domain = (t % ('yt.dl',) for t in ('https://%s/test', '.%s')) + +        def test(encoded_cookies, cookies, headers=False, round_trip=None, error_re=None): +            def _test(): +                ydl.cookiejar.clear() +                ydl._load_cookies(encoded_cookies, autoscope=headers) +                if headers: +                    ydl._apply_header_cookies(test_url) +                data = {'url': test_url} +                ydl._calc_headers(data) +                self.assertSameCookies( +                    cookies, ydl.cookiejar, +                    'Extracted cookiejar.Cookie is not the same') +                if not headers: +                    self.assertSameCookieStrings( +                        data.get('cookies'), round_trip or encoded_cookies, +                        msg='Cookie is not the same as round trip') +                ydl.__dict__['_YoutubeDL__header_cookies'] = [] + +            try: +                _test() +            except AssertionError: +                raise +            except Exception as e: +                if not error_re: +                    raise +                assertRegexpMatches(self, e.args[0], error_re.join(('.*',) * 2)) + +        test('test=value; Domain=' + test_domain, [cookie('test', 'value', domain=test_domain)]) +        test('test=value', [cookie('test', 'value')], error_re='Unscoped cookies are not allowed') +        test('cookie1=value1; Domain={0}; Path=/test; cookie2=value2; Domain={0}; Path=/'.format(test_domain), [ +            cookie('cookie1', 'value1', domain=test_domain, path='/test'), +            cookie('cookie2', 'value2', domain=test_domain, path='/')]) +        cookie_kw = compat_kwargs( +            {'domain': test_domain, 'path': '/test', 'secure': True, 'expires': '9999999999', }) +        test('test=value; Domain={domain}; Path={path}; Secure; Expires={expires}'.format(**cookie_kw), [ +            cookie('test', 'value', **cookie_kw)]) +        test('test="value; "; path=/test; domain=' + test_domain, [ +            cookie('test', 'value; ', domain=test_domain, path='/test')], +            round_trip='test="value\\073 "; Domain={0}; Path=/test'.format(test_domain)) +        test('name=; Domain=' + test_domain, [cookie('name', '', domain=test_domain)], +             round_trip='name=""; Domain=' + test_domain) +        test('test=value', [cookie('test', 'value', domain=test_domain)], headers=True) +        test('cookie1=value; Domain={0}; cookie2=value'.format(test_domain), [], +             headers=True, error_re='Invalid syntax') +        ydl.report_warning = ydl.report_error +        test('test=value', [], headers=True, error_re='Passing cookies as a header is a potential security risk') + +    def test_infojson_cookies(self): +        TEST_FILE = 'test_infojson_cookies.info.json' +        TEST_URL = 'https://example.com/example.mp4' +        COOKIES = 'a=b; Domain=.example.com; c=d; Domain=.example.com' +        COOKIE_HEADER = {'Cookie': 'a=b; c=d'} + +        ydl = FakeYDL() +        ydl.process_info = lambda x: ydl._write_info_json('test', x, TEST_FILE) + +        def make_info(info_header_cookies=False, fmts_header_cookies=False, cookies_field=False): +            fmt = {'url': TEST_URL} +            if fmts_header_cookies: +                fmt['http_headers'] = COOKIE_HEADER +            if cookies_field: +                fmt['cookies'] = COOKIES +            return _make_result([fmt], http_headers=COOKIE_HEADER if info_header_cookies else None) + +        def test(initial_info, note): + +            def failure_msg(why): +                return ' when '.join((why, note)) + +            result = {} +            result['processed'] = ydl.process_ie_result(initial_info) +            self.assertTrue(ydl.cookiejar.get_cookies_for_url(TEST_URL), +                            msg=failure_msg('No cookies set in cookiejar after initial process')) +            ydl.cookiejar.clear() +            with open(TEST_FILE) as infojson: +                result['loaded'] = ydl.sanitize_info(json.load(infojson), True) +            result['final'] = ydl.process_ie_result(result['loaded'].copy(), download=False) +            self.assertTrue(ydl.cookiejar.get_cookies_for_url(TEST_URL), +                            msg=failure_msg('No cookies set in cookiejar after final process')) +            ydl.cookiejar.clear() +            for key in ('processed', 'loaded', 'final'): +                info = result[key] +                self.assertIsNone( +                    traverse_obj(info, ((None, ('formats', 0)), 'http_headers', 'Cookie'), casesense=False, get_all=False), +                    msg=failure_msg('Cookie header not removed in {0} result'.format(key))) +                self.assertSameCookieStrings( +                    traverse_obj(info, ((None, ('formats', 0)), 'cookies'), get_all=False), COOKIES, +                    msg=failure_msg('No cookies field found in {0} result'.format(key))) + +        test({'url': TEST_URL, 'http_headers': COOKIE_HEADER, 'id': '1', 'title': 'x'}, 'no formats field') +        test(make_info(info_header_cookies=True), 'info_dict header cokies') +        test(make_info(fmts_header_cookies=True), 'format header cookies') +        test(make_info(info_header_cookies=True, fmts_header_cookies=True), 'info_dict and format header cookies') +        test(make_info(info_header_cookies=True, fmts_header_cookies=True, cookies_field=True), 'all cookies fields') +        test(make_info(cookies_field=True), 'cookies format field') +        test({'url': TEST_URL, 'cookies': COOKIES, 'id': '1', 'title': 'x'}, 'info_dict cookies field only') + +        try_rm(TEST_FILE) + +    def test_add_headers_cookie(self): +        def check_for_cookie_header(result): +            return traverse_obj(result, ((None, ('formats', 0)), 'http_headers', 'Cookie'), casesense=False, get_all=False) + +        ydl = FakeYDL({'http_headers': {'Cookie': 'a=b'}}) +        ydl._apply_header_cookies(_make_result([])['webpage_url'])  # Scope to input webpage URL: .example.com + +        fmt = {'url': 'https://example.com/video.mp4'} +        result = ydl.process_ie_result(_make_result([fmt]), download=False) +        self.assertIsNone(check_for_cookie_header(result), msg='http_headers cookies in result info_dict') +        self.assertEqual(result.get('cookies'), 'a=b; Domain=.example.com', msg='No cookies were set in cookies field') +        self.assertIn('a=b', ydl.cookiejar.get_cookie_header(fmt['url']), msg='No cookies were set in cookiejar') + +        fmt = {'url': 'https://wrong.com/video.mp4'} +        result = ydl.process_ie_result(_make_result([fmt]), download=False) +        self.assertIsNone(check_for_cookie_header(result), msg='http_headers cookies for wrong domain') +        self.assertFalse(result.get('cookies'), msg='Cookies set in cookies field for wrong domain') +        self.assertFalse(ydl.cookiejar.get_cookie_header(fmt['url']), msg='Cookies set in cookiejar for wrong domain') + +  if __name__ == '__main__':      unittest.main() diff --git a/test/test_YoutubeDLCookieJar.py b/test/test_YoutubeDLCookieJar.py index 05f48bd74..4f9dd71ae 100644 --- a/test/test_YoutubeDLCookieJar.py +++ b/test/test_YoutubeDLCookieJar.py @@ -46,6 +46,20 @@ class TestYoutubeDLCookieJar(unittest.TestCase):          # will be ignored          self.assertFalse(cookiejar._cookies) +    def test_get_cookie_header(self): +        cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/httponly_cookies.txt') +        cookiejar.load(ignore_discard=True, ignore_expires=True) +        header = cookiejar.get_cookie_header('https://www.foobar.foobar') +        self.assertIn('HTTPONLY_COOKIE', header) + +    def test_get_cookies_for_url(self): +        cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/session_cookies.txt') +        cookiejar.load(ignore_discard=True, ignore_expires=True) +        cookies = cookiejar.get_cookies_for_url('https://www.foobar.foobar/') +        self.assertEqual(len(cookies), 2) +        cookies = cookiejar.get_cookies_for_url('https://foobar.foobar/') +        self.assertFalse(cookies) +  if __name__ == '__main__':      unittest.main() diff --git a/youtube_dl/YoutubeDL.py b/youtube_dl/YoutubeDL.py index 1435754c2..98d080f43 100755 --- a/youtube_dl/YoutubeDL.py +++ b/youtube_dl/YoutubeDL.py @@ -5,6 +5,7 @@ from __future__ import absolute_import, unicode_literals  import collections  import contextlib +import copy  import datetime  import errno  import fileinput @@ -34,10 +35,12 @@ from string import ascii_letters  from .compat import (      compat_basestring, -    compat_cookiejar, +    compat_collections_chain_map as ChainMap,      compat_filter as filter,      compat_get_terminal_size,      compat_http_client, +    compat_http_cookiejar_Cookie, +    compat_http_cookies_SimpleCookie,      compat_integer_types,      compat_kwargs,      compat_map as map, @@ -53,6 +56,7 @@ from .compat import (  from .utils import (      age_restricted,      args_to_str, +    bug_reports_message,      ContentTooShortError,      date_from_str,      DateRange, @@ -97,6 +101,7 @@ from .utils import (      std_headers,      str_or_none,      subtitles_filename, +    traverse_obj,      UnavailableVideoError,      url_basename,      version_tuple, @@ -376,6 +381,9 @@ class YoutubeDL(object):          self.params.update(params)          self.cache = Cache(self) +        self._header_cookies = [] +        self._load_cookies_from_headers(self.params.get('http_headers')) +          def check_deprecated(param, option, suggestion):              if self.params.get(param) is not None:                  self.report_warning( @@ -870,8 +878,83 @@ class YoutubeDL(object):                      raise          return wrapper +    def _remove_cookie_header(self, http_headers): +        """Filters out `Cookie` header from an `http_headers` dict +        The `Cookie` header is removed to prevent leaks as a result of unscoped cookies. +        See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj + +        @param http_headers     An `http_headers` dict from which any `Cookie` header +                                should be removed, or None +        """ +        return dict(filter(lambda pair: pair[0].lower() != 'cookie', (http_headers or {}).items())) + +    def _load_cookies(self, data, **kwargs): +        """Loads cookies from a `Cookie` header + +        This tries to work around the security vulnerability of passing cookies to every domain. + +        @param data         The Cookie header as a string to load the cookies from +        @param autoscope    If `False`, scope cookies using Set-Cookie syntax and error for cookie without domains +                            If `True`, save cookies for later to be stored in the jar with a limited scope +                            If a URL, save cookies in the jar with the domain of the URL +        """ +        # autoscope=True (kw-only) +        autoscope = kwargs.get('autoscope', True) + +        for cookie in compat_http_cookies_SimpleCookie(data).values() if data else []: +            if autoscope and any(cookie.values()): +                raise ValueError('Invalid syntax in Cookie Header') + +            domain = cookie.get('domain') or '' +            expiry = cookie.get('expires') +            if expiry == '':  # 0 is valid so we check for `''` explicitly +                expiry = None +            prepared_cookie = compat_http_cookiejar_Cookie( +                cookie.get('version') or 0, cookie.key, cookie.value, None, False, +                domain, True, True, cookie.get('path') or '', bool(cookie.get('path')), +                bool(cookie.get('secure')), expiry, False, None, None, {}) + +            if domain: +                self.cookiejar.set_cookie(prepared_cookie) +            elif autoscope is True: +                self.report_warning( +                    'Passing cookies as a header is a potential security risk; ' +                    'they will be scoped to the domain of the downloaded urls. ' +                    'Please consider loading cookies from a file or browser instead.', +                    only_once=True) +                self._header_cookies.append(prepared_cookie) +            elif autoscope: +                self.report_warning( +                    'The extractor result contains an unscoped cookie as an HTTP header. ' +                    'If you are specifying an input URL, ' + bug_reports_message(), +                    only_once=True) +                self._apply_header_cookies(autoscope, [prepared_cookie]) +            else: +                self.report_unscoped_cookies() + +    def _load_cookies_from_headers(self, headers): +        self._load_cookies(traverse_obj(headers, 'cookie', casesense=False)) + +    def _apply_header_cookies(self, url, cookies=None): +        """This method applies stray header cookies to the provided url + +        This loads header cookies and scopes them to the domain provided in `url`. +        While this is not ideal, it helps reduce the risk of them being sent to +        an unintended destination. +        """ +        parsed = compat_urllib_parse.urlparse(url) +        if not parsed.hostname: +            return + +        for cookie in map(copy.copy, cookies or self._header_cookies): +            cookie.domain = '.' + parsed.hostname +            self.cookiejar.set_cookie(cookie) +      @__handle_extraction_exceptions      def __extract_info(self, url, ie, download, extra_info, process): +        # Compat with passing cookies in http headers +        self._apply_header_cookies(url) +          ie_result = ie.extract(url)          if ie_result is None:  # Finished already (backwards compatibility; listformats and friends should be moved here)              return @@ -897,7 +980,7 @@ class YoutubeDL(object):      def process_ie_result(self, ie_result, download=True, extra_info={}):          """ -        Take the result of the ie(may be modified) and resolve all unresolved +        Take the result of the ie (may be modified) and resolve all unresolved          references (URLs, playlist items).          It will also download the videos if 'download'. @@ -1468,23 +1551,45 @@ class YoutubeDL(object):          parsed_selector = _parse_format_selection(iter(TokenIterator(tokens)))          return _build_selector_function(parsed_selector) -    def _calc_headers(self, info_dict): -        res = std_headers.copy() - -        add_headers = info_dict.get('http_headers') -        if add_headers: -            res.update(add_headers) +    def _calc_headers(self, info_dict, load_cookies=False): +        if load_cookies:  # For --load-info-json +            # load cookies from http_headers in legacy info.json +            self._load_cookies(traverse_obj(info_dict, ('http_headers', 'Cookie'), casesense=False), +                               autoscope=info_dict['url']) +            # load scoped cookies from info.json +            self._load_cookies(info_dict.get('cookies'), autoscope=False) -        cookies = self._calc_cookies(info_dict) +        cookies = self.cookiejar.get_cookies_for_url(info_dict['url'])          if cookies: -            res['Cookie'] = cookies +            # Make a string like name1=val1; attr1=a_val1; ...name2=val2; ... +            # By convention a cookie name can't be a well-known attribute name +            # so this syntax is unambiguous and can be parsed by (eg) SimpleCookie +            encoder = compat_http_cookies_SimpleCookie() +            values = [] +            attributes = (('Domain', '='), ('Path', '='), ('Secure',), ('Expires', '='), ('Version', '=')) +            attributes = tuple([x[0].lower()] + list(x) for x in attributes) +            for cookie in cookies: +                _, value = encoder.value_encode(cookie.value) +                # Py 2 '' --> '', Py 3 '' --> '""' +                if value == '': +                    value = '""' +                values.append('='.join((cookie.name, value))) +                for attr in attributes: +                    value = getattr(cookie, attr[0], None) +                    if value: +                        values.append('%s%s' % (''.join(attr[1:]), value if len(attr) == 3 else '')) +            info_dict['cookies'] = '; '.join(values) + +        res = std_headers.copy() +        res.update(info_dict.get('http_headers') or {}) +        res = self._remove_cookie_header(res)          if 'X-Forwarded-For' not in res:              x_forwarded_for_ip = info_dict.get('__x_forwarded_for_ip')              if x_forwarded_for_ip:                  res['X-Forwarded-For'] = x_forwarded_for_ip -        return res +        return res or None      def _calc_cookies(self, info_dict):          pr = sanitized_Request(info_dict['url']) @@ -1663,10 +1768,13 @@ class YoutubeDL(object):                  format['protocol'] = determine_protocol(format)              # Add HTTP headers, so that external programs can use them from the              # json output -            full_format_info = info_dict.copy() -            full_format_info.update(format) -            format['http_headers'] = self._calc_headers(full_format_info) -        # Remove private housekeeping stuff +            format['http_headers'] = self._calc_headers(ChainMap(format, info_dict), load_cookies=True) + +        # Safeguard against old/insecure infojson when using --load-info-json +        info_dict['http_headers'] = self._remove_cookie_header( +            info_dict.get('http_headers') or {}) or None + +        # Remove private housekeeping stuff (copied to http_headers in _calc_headers())          if '__x_forwarded_for_ip' in info_dict:              del info_dict['__x_forwarded_for_ip'] @@ -1927,17 +2035,9 @@ class YoutubeDL(object):                                                  (sub_lang, error_to_compat_str(err)))                              continue -        if self.params.get('writeinfojson', False): -            infofn = replace_extension(filename, 'info.json', info_dict.get('ext')) -            if self.params.get('nooverwrites', False) and os.path.exists(encodeFilename(infofn)): -                self.to_screen('[info] Video description metadata is already present') -            else: -                self.to_screen('[info] Writing video description metadata as JSON to: ' + infofn) -                try: -                    write_json_file(self.filter_requested_info(info_dict), infofn) -                except (OSError, IOError): -                    self.report_error('Cannot write metadata to JSON file ' + infofn) -                    return +        self._write_info_json( +            'video description', info_dict, +            replace_extension(filename, 'info.json', info_dict.get('ext')))          self._write_thumbnails(info_dict, filename) @@ -1958,7 +2058,11 @@ class YoutubeDL(object):                          fd.add_progress_hook(ph)                      if self.params.get('verbose'):                          self.to_screen('[debug] Invoking downloader on %r' % info.get('url')) -                    return fd.download(name, info) + +                    new_info = dict((k, v) for k, v in info.items() if not k.startswith('__p')) +                    new_info['http_headers'] = self._calc_headers(new_info) + +                    return fd.download(name, new_info)                  if info_dict.get('requested_formats') is not None:                      downloaded = [] @@ -2484,7 +2588,7 @@ class YoutubeDL(object):          opts_proxy = self.params.get('proxy')          if opts_cookiefile is None: -            self.cookiejar = compat_cookiejar.CookieJar() +            self.cookiejar = YoutubeDLCookieJar()          else:              opts_cookiefile = expand_path(opts_cookiefile)              self.cookiejar = YoutubeDLCookieJar(opts_cookiefile) @@ -2545,6 +2649,28 @@ class YoutubeDL(object):              encoding = preferredencoding()          return encoding +    def _write_info_json(self, label, info_dict, infofn, overwrite=None): +        if not self.params.get('writeinfojson', False): +            return False + +        def msg(fmt, lbl): +            return fmt % (lbl + ' metadata',) + +        if overwrite is None: +            overwrite = not self.params.get('nooverwrites', False) + +        if not overwrite and os.path.exists(encodeFilename(infofn)): +            self.to_screen(msg('[info] %s is already present', label.title())) +            return 'exists' +        else: +            self.to_screen(msg('[info] Writing %s as JSON to: ' + infofn, label)) +            try: +                write_json_file(self.filter_requested_info(info_dict), infofn) +                return True +            except (OSError, IOError): +                self.report_error(msg('Cannot write %s to JSON file ' + infofn, label)) +                return +      def _write_thumbnails(self, info_dict, filename):          if self.params.get('writethumbnail', False):              thumbnails = info_dict.get('thumbnails') diff --git a/youtube_dl/downloader/common.py b/youtube_dl/downloader/common.py index c86ce2aa5..08c98b336 100644 --- a/youtube_dl/downloader/common.py +++ b/youtube_dl/downloader/common.py @@ -13,7 +13,9 @@ from ..utils import (      error_to_compat_str,      format_bytes,      shell_quote, +    T,      timeconvert, +    traverse_obj,  ) @@ -339,6 +341,10 @@ class FileDownloader(object):      def download(self, filename, info_dict):          """Download to a filename using the info from info_dict          Return True on success and False otherwise + +        This method filters the `Cookie` header from the info_dict to prevent leaks. +        Downloaders have their own way of handling cookies. +        See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj          """          nooverwrites_and_exists = ( @@ -373,6 +379,9 @@ class FileDownloader(object):                      else '%.2f' % sleep_interval))              time.sleep(sleep_interval) +        info_dict['http_headers'] = dict(traverse_obj(info_dict, ( +            'http_headers', T(dict.items), lambda _, pair: pair[0].lower() != 'cookie'))) or None +          return self.real_download(filename, info_dict)      def real_download(self, filename, info_dict): | 
