diff options
| -rw-r--r-- | test/test_http.py | 489 | ||||
| -rw-r--r-- | youtube_dl/utils.py | 74 | 
2 files changed, 484 insertions, 79 deletions
| diff --git a/test/test_http.py b/test/test_http.py index 487a9bc77..1a65df9e0 100644 --- a/test/test_http.py +++ b/test/test_http.py @@ -8,33 +8,160 @@ import sys  import unittest  sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +import gzip +import io +import ssl +import tempfile +import threading +import zlib + +# avoid deprecated alias assertRaisesRegexp +if hasattr(unittest.TestCase, 'assertRaisesRegex'): +    unittest.TestCase.assertRaisesRegexp = unittest.TestCase.assertRaisesRegex + +try: +    import brotli +except ImportError: +    brotli = None +try: +    from urllib.request import pathname2url +except ImportError: +    from urllib import pathname2url + +from youtube_dl.compat import ( +    compat_http_cookiejar_Cookie, +    compat_http_server, +    compat_str as str, +    compat_urllib_error, +    compat_urllib_HTTPError, +    compat_urllib_parse, +    compat_urllib_request, +) + +from youtube_dl.utils import ( +    sanitized_Request, +    urlencode_postdata, +) +  from test.helper import ( +    FakeYDL,      FakeLogger,      http_server_port,  )  from youtube_dl import YoutubeDL -from youtube_dl.compat import compat_http_server, compat_urllib_request -import ssl -import threading  TEST_DIR = os.path.dirname(os.path.abspath(__file__))  class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler): +    protocol_version = 'HTTP/1.1' + +    # work-around old/new -style class inheritance +    def super(self, meth_name, *args, **kwargs): +        from types import MethodType +        try: +            super() +            fn = lambda s, m, *a, **k: getattr(super(), m)(*a, **k) +        except TypeError: +            fn = lambda s, m, *a, **k: getattr(compat_http_server.BaseHTTPRequestHandler, m)(s, *a, **k) +        self.super = MethodType(fn, self) +        return self.super(meth_name, *args, **kwargs) +      def log_message(self, format, *args):          pass +    def _headers(self): +        payload = 
str(self.headers).encode('utf-8') +        self.send_response(200) +        self.send_header('Content-Type', 'application/json') +        self.send_header('Content-Length', str(len(payload))) +        self.end_headers() +        self.wfile.write(payload) + +    def _redirect(self): +        self.send_response(int(self.path[len('/redirect_'):])) +        self.send_header('Location', '/method') +        self.send_header('Content-Length', '0') +        self.end_headers() + +    def _method(self, method, payload=None): +        self.send_response(200) +        self.send_header('Content-Length', str(len(payload or ''))) +        self.send_header('Method', method) +        self.end_headers() +        if payload: +            self.wfile.write(payload) + +    def _status(self, status): +        payload = '<html>{0} NOT FOUND</html>'.format(status).encode('utf-8') +        self.send_response(int(status)) +        self.send_header('Content-Type', 'text/html; charset=utf-8') +        self.send_header('Content-Length', str(len(payload))) +        self.end_headers() +        self.wfile.write(payload) + +    def _read_data(self): +        if 'Content-Length' in self.headers: +            return self.rfile.read(int(self.headers['Content-Length'])) + +    def _test_url(self, path, host='127.0.0.1', scheme='http', port=None): +        return '{0}://{1}:{2}/{3}'.format( +            scheme, host, +            port if port is not None +            else http_server_port(self.server), path) + +    def do_POST(self): +        data = self._read_data() +        if self.path.startswith('/redirect_'): +            self._redirect() +        elif self.path.startswith('/method'): +            self._method('POST', data) +        elif self.path.startswith('/headers'): +            self._headers() +        else: +            self._status(404) + +    def do_HEAD(self): +        if self.path.startswith('/redirect_'): +            self._redirect() +        elif self.path.startswith('/method'): +     
       self._method('HEAD') +        else: +            self._status(404) + +    def do_PUT(self): +        data = self._read_data() +        if self.path.startswith('/redirect_'): +            self._redirect() +        elif self.path.startswith('/method'): +            self._method('PUT', data) +        else: +            self._status(404) +      def do_GET(self): -        if self.path == '/video.html': -            self.send_response(200) -            self.send_header('Content-Type', 'text/html; charset=utf-8') + +        def respond(payload=b'<html><video src="/vid.mp4" /></html>', +                    payload_type='text/html; charset=utf-8', +                    payload_encoding=None, +                    resp_code=200): +            self.send_response(resp_code) +            self.send_header('Content-Type', payload_type) +            if payload_encoding: +                self.send_header('Content-Encoding', payload_encoding) +            self.send_header('Content-Length', str(len(payload)))  # required for persistent connections              self.end_headers() -            self.wfile.write(b'<html><video src="/vid.mp4" /></html>') +            self.wfile.write(payload) + +        def gzip_compress(p): +            buf = io.BytesIO() +            with gzip.GzipFile(fileobj=buf, mode='wb') as f: +                f.write(p) +            return buf.getvalue() + +        if self.path == '/video.html': +            respond()          elif self.path == '/vid.mp4': -            self.send_response(200) -            self.send_header('Content-Type', 'video/mp4') -            self.end_headers() -            self.wfile.write(b'\x00\x00\x00\x00\x20\x66\x74[video]') +            respond(b'\x00\x00\x00\x00\x20\x66\x74[video]', 'video/mp4')          elif self.path == '/302':              if sys.version_info[0] == 3:                  # XXX: Python 3 http server does not allow non-ASCII header values @@ -42,60 +169,284 @@ class 
HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):                  self.end_headers()                  return -            new_url = 'http://127.0.0.1:%d/中文.html' % http_server_port(self.server) +            new_url = self._test_url('中文.html')              self.send_response(302)              self.send_header(b'Location', new_url.encode('utf-8'))              self.end_headers()          elif self.path == '/%E4%B8%AD%E6%96%87.html': -            self.send_response(200) -            self.send_header('Content-Type', 'text/html; charset=utf-8') +            respond() +        elif self.path == '/%c7%9f': +            respond() +        elif self.path.startswith('/redirect_'): +            self._redirect() +        elif self.path.startswith('/method'): +            self._method('GET') +        elif self.path.startswith('/headers'): +            self._headers() +        elif self.path == '/trailing_garbage': +            payload = b'<html><video src="/vid.mp4" /></html>' +            compressed = gzip_compress(payload) + b'trailing garbage' +            respond(compressed, payload_encoding='gzip') +        elif self.path == '/302-non-ascii-redirect': +            new_url = self._test_url('中文.html') +            # actually respond with permanent redirect +            self.send_response(301) +            self.send_header('Location', new_url) +            self.send_header('Content-Length', '0')              self.end_headers() -            self.wfile.write(b'<html><video src="/vid.mp4" /></html>') +        elif self.path == '/content-encoding': +            encodings = self.headers.get('ytdl-encoding', '') +            payload = b'<html><video src="/vid.mp4" /></html>' +            for encoding in filter(None, (e.strip() for e in encodings.split(','))): +                if encoding == 'br' and brotli: +                    payload = brotli.compress(payload) +                elif encoding == 'gzip': +                    payload = gzip_compress(payload) +       
         elif encoding == 'deflate': +                    payload = zlib.compress(payload) +                elif encoding == 'unsupported': +                    payload = b'raw' +                    break +                else: +                    self._status(415) +                    return +            respond(payload, payload_encoding=encodings) +          else: -            assert False +            self._status(404) + +    def send_header(self, keyword, value): +        """ +        Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers. +        This is against what is defined in RFC 3986: but we need to test that we support this +        since some sites incorrectly do this. +        """ +        if keyword.lower() == 'connection': +            return self.super('send_header', keyword, value) + +        if not hasattr(self, '_headers_buffer'): +            self._headers_buffer = [] + +        self._headers_buffer.append('{0}: {1}\r\n'.format(keyword, value).encode('utf-8')) + +    def end_headers(self): +        if hasattr(self, '_headers_buffer'): +            self.wfile.write(b''.join(self._headers_buffer)) +            self._headers_buffer = [] +        self.super('end_headers')  class TestHTTP(unittest.TestCase):      def setUp(self): -        self.httpd = compat_http_server.HTTPServer( +        # HTTP server +        self.http_httpd = compat_http_server.HTTPServer(              ('127.0.0.1', 0), HTTPTestRequestHandler) -        self.port = http_server_port(self.httpd) -        self.server_thread = threading.Thread(target=self.httpd.serve_forever) -        self.server_thread.daemon = True -        self.server_thread.start() +        self.http_port = http_server_port(self.http_httpd) -    def test_unicode_path_redirection(self): -        # XXX: Python 3 http server does not allow non-ASCII header values -        if sys.version_info[0] == 3: -            return +        self.http_server_thread = 
threading.Thread(target=self.http_httpd.serve_forever) +        self.http_server_thread.daemon = True +        self.http_server_thread.start() -        ydl = YoutubeDL({'logger': FakeLogger()}) -        r = ydl.extract_info('http://127.0.0.1:%d/302' % self.port) -        self.assertEqual(r['entries'][0]['url'], 'http://127.0.0.1:%d/vid.mp4' % self.port) +        try: +            from http.server import ThreadingHTTPServer +        except ImportError: +            try: +                from socketserver import ThreadingMixIn +            except ImportError: +                from SocketServer import ThreadingMixIn +            class ThreadingHTTPServer(ThreadingMixIn, compat_http_server.HTTPServer): +                pass -class TestHTTPS(unittest.TestCase): -    def setUp(self): +        # HTTPS server          certfn = os.path.join(TEST_DIR, 'testcert.pem') -        self.httpd = compat_http_server.HTTPServer( +        self.https_httpd = ThreadingHTTPServer(              ('127.0.0.1', 0), HTTPTestRequestHandler) -        self.httpd.socket = ssl.wrap_socket( -            self.httpd.socket, certfile=certfn, server_side=True) -        self.port = http_server_port(self.httpd) -        self.server_thread = threading.Thread(target=self.httpd.serve_forever) -        self.server_thread.daemon = True -        self.server_thread.start() +        try: +            sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) +            sslctx.verify_mode = ssl.CERT_NONE +            sslctx.check_hostname = False +            sslctx.load_cert_chain(certfn, None) +            self.https_httpd.socket = sslctx.wrap_socket( +                self.https_httpd.socket, server_side=True) +        except AttributeError: +            self.https_httpd.socket = ssl.wrap_socket( +                self.https_httpd.socket, certfile=certfn, server_side=True) + +        self.https_port = http_server_port(self.https_httpd) +        self.https_server_thread = 
threading.Thread(target=self.https_httpd.serve_forever) +        self.https_server_thread.daemon = True +        self.https_server_thread.start() + +    def tearDown(self): + +        def closer(svr): +            def _closer(): +                svr.shutdown() +                svr.server_close() +            return _closer + +        shutdown_thread = threading.Thread(target=closer(self.http_httpd)) +        shutdown_thread.start() +        self.http_server_thread.join(2.0) + +        shutdown_thread = threading.Thread(target=closer(self.https_httpd)) +        shutdown_thread.start() +        self.https_server_thread.join(2.0) + +    def _test_url(self, path, host='127.0.0.1', scheme='http', port=None): +        return '{0}://{1}:{2}/{3}'.format( +            scheme, host, +            port if port is not None +            else self.https_port if scheme == 'https' +            else self.http_port, path)      def test_nocheckcertificate(self): -        if sys.version_info >= (2, 7, 9):  # No certificate checking anyways -            ydl = YoutubeDL({'logger': FakeLogger()}) -            self.assertRaises( -                Exception, -                ydl.extract_info, 'https://127.0.0.1:%d/video.html' % self.port) +        with FakeYDL({'logger': FakeLogger()}) as ydl: +            with self.assertRaises(compat_urllib_error.URLError): +                ydl.urlopen(sanitized_Request(self._test_url('headers', scheme='https'))) + +        with FakeYDL({'logger': FakeLogger(), 'nocheckcertificate': True}) as ydl: +            r = ydl.urlopen(sanitized_Request(self._test_url('headers', scheme='https'))) +            self.assertEqual(r.getcode(), 200) +            r.close() + +    def test_percent_encode(self): +        with FakeYDL() as ydl: +            # Unicode characters should be encoded with uppercase percent-encoding +            res = ydl.urlopen(sanitized_Request(self._test_url('中文.html'))) +            self.assertEqual(res.getcode(), 200) +            res.close() 
+            # don't normalize existing percent encodings +            res = ydl.urlopen(sanitized_Request(self._test_url('%c7%9f'))) +            self.assertEqual(res.getcode(), 200) +            res.close() + +    def test_unicode_path_redirection(self): +        with FakeYDL() as ydl: +            r = ydl.urlopen(sanitized_Request(self._test_url('302-non-ascii-redirect'))) +            self.assertEqual(r.url, self._test_url('%E4%B8%AD%E6%96%87.html')) +            r.close() + +    def test_redirect(self): +        with FakeYDL() as ydl: +            def do_req(redirect_status, method, check_no_content=False): +                data = b'testdata' if method in ('POST', 'PUT') else None +                res = ydl.urlopen(sanitized_Request( +                    self._test_url('redirect_{0}'.format(redirect_status)), +                    method=method, data=data)) +                if check_no_content: +                    self.assertNotIn('Content-Type', res.headers) +                return res.read().decode('utf-8'), res.headers.get('method', '') +            # A 303 must either use GET or HEAD for subsequent request +            self.assertEqual(do_req(303, 'POST'), ('', 'GET')) +            self.assertEqual(do_req(303, 'HEAD'), ('', 'HEAD')) + +            self.assertEqual(do_req(303, 'PUT'), ('', 'GET')) + +            # 301 and 302 turn POST only into a GET, with no Content-Type +            self.assertEqual(do_req(301, 'POST', True), ('', 'GET')) +            self.assertEqual(do_req(301, 'HEAD'), ('', 'HEAD')) +            self.assertEqual(do_req(302, 'POST', True), ('', 'GET')) +            self.assertEqual(do_req(302, 'HEAD'), ('', 'HEAD')) + +            self.assertEqual(do_req(301, 'PUT'), ('testdata', 'PUT')) +            self.assertEqual(do_req(302, 'PUT'), ('testdata', 'PUT')) + +            # 307 and 308 should not change method +            for m in ('POST', 'PUT'): +                self.assertEqual(do_req(307, m), ('testdata', m)) +                
self.assertEqual(do_req(308, m), ('testdata', m)) + +            self.assertEqual(do_req(307, 'HEAD'), ('', 'HEAD')) +            self.assertEqual(do_req(308, 'HEAD'), ('', 'HEAD')) -        ydl = YoutubeDL({'logger': FakeLogger(), 'nocheckcertificate': True}) -        r = ydl.extract_info('https://127.0.0.1:%d/video.html' % self.port) -        self.assertEqual(r['entries'][0]['url'], 'https://127.0.0.1:%d/vid.mp4' % self.port) +            # These should not redirect and instead raise an HTTPError +            for code in (300, 304, 305, 306): +                with self.assertRaises(compat_urllib_HTTPError): +                    do_req(code, 'GET') + +    def test_content_type(self): +        # https://github.com/yt-dlp/yt-dlp/commit/379a4f161d4ad3e40932dcf5aca6e6fb9715ab28 +        with FakeYDL({'nocheckcertificate': True}) as ydl: +            # method should be auto-detected as POST +            r = sanitized_Request(self._test_url('headers', scheme='https'), data=urlencode_postdata({'test': 'test'})) + +            headers = ydl.urlopen(r).read().decode('utf-8') +            self.assertIn('Content-Type: application/x-www-form-urlencoded', headers) + +            # test http +            r = sanitized_Request(self._test_url('headers'), data=urlencode_postdata({'test': 'test'})) +            headers = ydl.urlopen(r).read().decode('utf-8') +            self.assertIn('Content-Type: application/x-www-form-urlencoded', headers) + +    def test_cookiejar(self): +        with FakeYDL() as ydl: +            ydl.cookiejar.set_cookie(compat_http_cookiejar_Cookie( +                0, 'test', 'ytdl', None, False, '127.0.0.1', True, +                False, '/headers', True, False, None, False, None, None, {})) +            data = ydl.urlopen(sanitized_Request(self._test_url('headers'))).read() +            self.assertIn(b'Cookie: test=ytdl', data) + +    def test_no_compression_compat_header(self): +        with FakeYDL() as ydl: +            data = ydl.urlopen( +           
     sanitized_Request( +                    self._test_url('headers'), +                    headers={'Youtubedl-no-compression': True})).read() +            self.assertIn(b'Accept-Encoding: identity', data) +            self.assertNotIn(b'youtubedl-no-compression', data.lower()) + +    def test_gzip_trailing_garbage(self): +        # https://github.com/ytdl-org/youtube-dl/commit/aa3e950764337ef9800c936f4de89b31c00dfcf5 +        # https://github.com/ytdl-org/youtube-dl/commit/6f2ec15cee79d35dba065677cad9da7491ec6e6f +        with FakeYDL() as ydl: +            data = ydl.urlopen(sanitized_Request(self._test_url('trailing_garbage'))).read().decode('utf-8') +            self.assertEqual(data, '<html><video src="/vid.mp4" /></html>') + +    def __test_compression(self, encoding): +        with FakeYDL() as ydl: +            res = ydl.urlopen( +                sanitized_Request( +                    self._test_url('content-encoding'), +                    headers={'ytdl-encoding': encoding})) +            self.assertEqual(res.headers.get('Content-Encoding'), encoding) +            self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>') + +    @unittest.skipUnless(brotli, 'brotli support is not installed') +    @unittest.expectedFailure +    def test_brotli(self): +        self.__test_compression('br') + +    @unittest.expectedFailure +    def test_deflate(self): +        self.__test_compression('deflate') + +    @unittest.expectedFailure +    def test_gzip(self): +        self.__test_compression('gzip') + +    @unittest.expectedFailure  # not yet implemented +    def test_multiple_encodings(self): +        # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.4 +        with FakeYDL() as ydl: +            for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'): +                res = ydl.urlopen( +                    sanitized_Request( +                        self._test_url('content-encoding'), +                        
headers={'ytdl-encoding': pair})) +                self.assertEqual(res.headers.get('Content-Encoding'), pair) +                self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>') + +    def test_unsupported_encoding(self): +        # it should return the raw content +        with FakeYDL() as ydl: +            res = ydl.urlopen( +                sanitized_Request( +                    self._test_url('content-encoding'), +                    headers={'ytdl-encoding': 'unsupported'})) +            self.assertEqual(res.headers.get('Content-Encoding'), 'unsupported') +            self.assertEqual(res.read(), b'raw')  def _build_proxy_handler(name): @@ -109,7 +460,7 @@ def _build_proxy_handler(name):              self.send_response(200)              self.send_header('Content-Type', 'text/plain; charset=utf-8')              self.end_headers() -            self.wfile.write('{self.proxy_name}: {self.path}'.format(self=self).encode('utf-8')) +            self.wfile.write('{0}: {1}'.format(self.proxy_name, self.path).encode('utf-8'))      return HTTPTestRequestHandler @@ -129,10 +480,30 @@ class TestProxy(unittest.TestCase):          self.geo_proxy_thread.daemon = True          self.geo_proxy_thread.start() +    def tearDown(self): + +        def closer(svr): +            def _closer(): +                svr.shutdown() +                svr.server_close() +            return _closer + +        shutdown_thread = threading.Thread(target=closer(self.proxy)) +        shutdown_thread.start() +        self.proxy_thread.join(2.0) + +        shutdown_thread = threading.Thread(target=closer(self.geo_proxy)) +        shutdown_thread.start() +        self.geo_proxy_thread.join(2.0) + +    def _test_proxy(self, host='127.0.0.1', port=None): +        return '{0}:{1}'.format( +            host, port if port is not None else self.port) +      def test_proxy(self): -        geo_proxy = '127.0.0.1:{0}'.format(self.geo_port) +        geo_proxy = 
self._test_proxy(port=self.geo_port)          ydl = YoutubeDL({ -            'proxy': '127.0.0.1:{0}'.format(self.port), +            'proxy': self._test_proxy(),              'geo_verification_proxy': geo_proxy,          })          url = 'http://foo.com/bar' @@ -146,7 +517,7 @@ class TestProxy(unittest.TestCase):      def test_proxy_with_idn(self):          ydl = YoutubeDL({ -            'proxy': '127.0.0.1:{0}'.format(self.port), +            'proxy': self._test_proxy(),          })          url = 'http://中文.tw/'          response = ydl.urlopen(url).read().decode('utf-8') @@ -154,5 +525,25 @@ class TestProxy(unittest.TestCase):          self.assertEqual(response, 'normal: http://xn--fiq228c.tw/') +class TestFileURL(unittest.TestCase): +    # See https://github.com/ytdl-org/youtube-dl/issues/8227 +    def test_file_urls(self): +        tf = tempfile.NamedTemporaryFile(delete=False) +        tf.write(b'foobar') +        tf.close() +        url = compat_urllib_parse.urljoin('file://', pathname2url(tf.name)) +        with FakeYDL() as ydl: +            self.assertRaisesRegexp( +                compat_urllib_error.URLError, 'file:// scheme is explicitly disabled in youtube-dl for security reasons', ydl.urlopen, url) +        # not yet implemented +        """ +        with FakeYDL({'enable_file_urls': True}) as ydl: +            res = ydl.urlopen(url) +            self.assertEqual(res.read(), b'foobar') +            res.close() +        """ +        os.unlink(tf.name) + +  if __name__ == '__main__':      unittest.main() diff --git a/youtube_dl/utils.py b/youtube_dl/utils.py index dbdbe5f59..58c710b08 100644 --- a/youtube_dl/utils.py +++ b/youtube_dl/utils.py @@ -41,7 +41,6 @@ import zlib  from .compat import (      compat_HTMLParseError,      compat_HTMLParser, -    compat_HTTPError,      compat_basestring,      compat_casefold,      compat_chr, @@ -64,6 +63,7 @@ from .compat import (      compat_struct_pack,      compat_struct_unpack,      compat_urllib_error, +    
compat_urllib_HTTPError,      compat_urllib_parse,      compat_urllib_parse_parse_qs as compat_parse_qs,      compat_urllib_parse_urlencode, @@ -2614,7 +2614,8 @@ class YoutubeDLHandler(compat_urllib_request.HTTPHandler):      Part of this code was copied from: -    http://techknack.net/python-urllib2-handlers/ +    http://techknack.net/python-urllib2-handlers/, archived at +    https://web.archive.org/web/20130527205558/http://techknack.net/python-urllib2-handlers/      Andrew Rowls, the author of that code, agreed to release it to the      public domain. @@ -2672,7 +2673,9 @@ class YoutubeDLHandler(compat_urllib_request.HTTPHandler):              req._Request__original = req._Request__original.partition('#')[0]              req._Request__r_type = req._Request__r_type.partition('#')[0] -        return req +        # Use the totally undocumented AbstractHTTPHandler per +        # https://github.com/yt-dlp/yt-dlp/pull/4158 +        return compat_urllib_request.AbstractHTTPHandler.do_request_(self, req)      def http_response(self, req, resp):          old_resp = resp @@ -2683,7 +2686,7 @@ class YoutubeDLHandler(compat_urllib_request.HTTPHandler):              try:                  uncompressed = io.BytesIO(gz.read())              except IOError as original_ioerror: -                # There may be junk add the end of the file +                # There may be junk at the end of the file                  # See http://stackoverflow.com/q/4928560/35070 for details                  for i in range(1, 1024):                      try: @@ -2710,9 +2713,8 @@ class YoutubeDLHandler(compat_urllib_request.HTTPHandler):              if location:                  # As of RFC 2616 default charset is iso-8859-1 that is respected by python 3                  if sys.version_info >= (3, 0): -                    location = location.encode('iso-8859-1').decode('utf-8') -                else: -                    location = location.decode('utf-8') +                    location = 
location.encode('iso-8859-1') +                location = location.decode('utf-8')                  location_escaped = escape_url(location)                  if location != location_escaped:                      del resp.headers['Location'] @@ -2940,17 +2942,16 @@ class YoutubeDLRedirectHandler(compat_urllib_request.HTTPRedirectHandler):      The code is based on HTTPRedirectHandler implementation from CPython [1]. -    This redirect handler solves two issues: -     - ensures redirect URL is always unicode under python 2 -     - introduces support for experimental HTTP response status code -       308 Permanent Redirect [2] used by some sites [3] +    This redirect handler fixes and improves the logic to better align with RFC7231 +    and what browsers tend to do [2][3]      1. https://github.com/python/cpython/blob/master/Lib/urllib/request.py -    2. https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/308 -    3. https://github.com/ytdl-org/youtube-dl/issues/28768 +    2. https://datatracker.ietf.org/doc/html/rfc7231 +    3. https://github.com/python/cpython/issues/91306      """ -    http_error_301 = http_error_303 = http_error_307 = http_error_308 = compat_urllib_request.HTTPRedirectHandler.http_error_302 +    # Supply possibly missing alias +    http_error_308 = compat_urllib_request.HTTPRedirectHandler.http_error_302      def redirect_request(self, req, fp, code, msg, headers, newurl):          """Return a Request or None in response to a redirect. @@ -2962,19 +2963,16 @@ class YoutubeDLRedirectHandler(compat_urllib_request.HTTPRedirectHandler):          else should try to handle this url.  Return None if you can't          but another Handler might.          
""" -        m = req.get_method() -        if (not (code in (301, 302, 303, 307, 308) and m in ("GET", "HEAD") -                 or code in (301, 302, 303) and m == "POST")): -            raise compat_HTTPError(req.full_url, code, msg, headers, fp) -        # Strictly (according to RFC 2616), 301 or 302 in response to -        # a POST MUST NOT cause a redirection without confirmation -        # from the user (of urllib.request, in this case).  In practice, -        # essentially all clients do redirect in this case, so we do -        # the same. +        if code not in (301, 302, 303, 307, 308): +            raise compat_urllib_HTTPError(req.full_url, code, msg, headers, fp) + +        new_method = req.get_method() +        new_data = req.data +        remove_headers = []          # On python 2 urlh.geturl() may sometimes return redirect URL -        # as byte string instead of unicode. This workaround allows -        # to force it always return unicode. +        # as a byte string instead of unicode. This workaround forces +        # it to return unicode.          if sys.version_info[0] < 3:              newurl = compat_str(newurl) @@ -2983,13 +2981,29 @@ class YoutubeDLRedirectHandler(compat_urllib_request.HTTPRedirectHandler):          # but it is kept for compatibility with other callers.          newurl = newurl.replace(' ', '%20') -        CONTENT_HEADERS = ("content-length", "content-type") +        # A 303 must either use GET or HEAD for subsequent request +        # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.4 +        if code == 303 and req.get_method() != 'HEAD': +            new_method = 'GET' +        # 301 and 302 redirects are commonly turned into a GET from a POST +        # for subsequent requests by browsers, so we'll do the same. 
+        # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.2 +        # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.3 +        elif code in (301, 302) and req.get_method() == 'POST': +            new_method = 'GET' + +        # only remove payload if method changed (e.g. POST to GET) +        if new_method != req.get_method(): +            new_data = None +            remove_headers.extend(['Content-Length', 'Content-Type']) +          # NB: don't use dict comprehension for python 2.6 compatibility -        newheaders = dict((k, v) for k, v in req.headers.items() -                          if k.lower() not in CONTENT_HEADERS) +        new_headers = dict((k, v) for k, v in req.header_items() +                           if k.lower() not in remove_headers) +          return compat_urllib_request.Request( -            newurl, headers=newheaders, origin_req_host=req.origin_req_host, -            unverifiable=True) +            newurl, headers=new_headers, origin_req_host=req.origin_req_host, +            unverifiable=True, method=new_method, data=new_data)  def extract_timezone(date_str): | 
