diff options
Diffstat (limited to 'test/test_http.py')
| -rw-r--r-- | test/test_http.py | 489 | 
1 files changed, 440 insertions, 49 deletions
| diff --git a/test/test_http.py b/test/test_http.py index 487a9bc77..1a65df9e0 100644 --- a/test/test_http.py +++ b/test/test_http.py @@ -8,33 +8,160 @@ import sys  import unittest  sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +import gzip +import io +import ssl +import tempfile +import threading +import zlib + +# avoid deprecated alias assertRaisesRegexp +if hasattr(unittest.TestCase, 'assertRaisesRegex'): +    unittest.TestCase.assertRaisesRegexp = unittest.TestCase.assertRaisesRegex + +try: +    import brotli +except ImportError: +    brotli = None +try: +    from urllib.request import pathname2url +except ImportError: +    from urllib import pathname2url + +from youtube_dl.compat import ( +    compat_http_cookiejar_Cookie, +    compat_http_server, +    compat_str as str, +    compat_urllib_error, +    compat_urllib_HTTPError, +    compat_urllib_parse, +    compat_urllib_request, +) + +from youtube_dl.utils import ( +    sanitized_Request, +    urlencode_postdata, +) +  from test.helper import ( +    FakeYDL,      FakeLogger,      http_server_port,  )  from youtube_dl import YoutubeDL -from youtube_dl.compat import compat_http_server, compat_urllib_request -import ssl -import threading  TEST_DIR = os.path.dirname(os.path.abspath(__file__))  class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler): +    protocol_version = 'HTTP/1.1' + +    # work-around old/new -style class inheritance +    def super(self, meth_name, *args, **kwargs): +        from types import MethodType +        try: +            super() +            fn = lambda s, m, *a, **k: getattr(super(), m)(*a, **k) +        except TypeError: +            fn = lambda s, m, *a, **k: getattr(compat_http_server.BaseHTTPRequestHandler, m)(s, *a, **k) +        self.super = MethodType(fn, self) +        return self.super(meth_name, *args, **kwargs) +      def log_message(self, format, *args):          pass +    def _headers(self): +        payload = str(self.headers).encode('utf-8') +        self.send_response(200) +        self.send_header('Content-Type', 'application/json') +        self.send_header('Content-Length', str(len(payload))) +        self.end_headers() +        self.wfile.write(payload) + +    def _redirect(self): +        self.send_response(int(self.path[len('/redirect_'):])) +        self.send_header('Location', '/method') +        self.send_header('Content-Length', '0') +        self.end_headers() + +    def _method(self, method, payload=None): +        self.send_response(200) +        self.send_header('Content-Length', str(len(payload or ''))) +        self.send_header('Method', method) +        self.end_headers() +        if payload: +            self.wfile.write(payload) + +    def _status(self, status): +        payload = '<html>{0} NOT FOUND</html>'.format(status).encode('utf-8') +        self.send_response(int(status)) +        self.send_header('Content-Type', 'text/html; charset=utf-8') +        self.send_header('Content-Length', str(len(payload))) +        self.end_headers() +        self.wfile.write(payload) + +    def _read_data(self): +        if 'Content-Length' in self.headers: +            return self.rfile.read(int(self.headers['Content-Length'])) + +    def _test_url(self, path, host='127.0.0.1', scheme='http', port=None): +        return '{0}://{1}:{2}/{3}'.format( +            scheme, host, +            port if port is not None +            else http_server_port(self.server), path) + +    def do_POST(self): +        data = self._read_data() +        if self.path.startswith('/redirect_'): +            self._redirect() +        elif self.path.startswith('/method'): +            self._method('POST', data) +        elif self.path.startswith('/headers'): +            self._headers() +        else: +            self._status(404) + +    def do_HEAD(self): +        if self.path.startswith('/redirect_'): +            self._redirect() +        elif self.path.startswith('/method'): +            self._method('HEAD') +        else: +            self._status(404) + +    def do_PUT(self): +        data = self._read_data() +        if self.path.startswith('/redirect_'): +            self._redirect() +        elif self.path.startswith('/method'): +            self._method('PUT', data) +        else: +            self._status(404) +      def do_GET(self): -        if self.path == '/video.html': -            self.send_response(200) -            self.send_header('Content-Type', 'text/html; charset=utf-8') + +        def respond(payload=b'<html><video src="/vid.mp4" /></html>', +                    payload_type='text/html; charset=utf-8', +                    payload_encoding=None, +                    resp_code=200): +            self.send_response(resp_code) +            self.send_header('Content-Type', payload_type) +            if payload_encoding: +                self.send_header('Content-Encoding', payload_encoding) +            self.send_header('Content-Length', str(len(payload)))  # required for persistent connections              self.end_headers() -            self.wfile.write(b'<html><video src="/vid.mp4" /></html>') +            self.wfile.write(payload) + +        def gzip_compress(p): +            buf = io.BytesIO() +            with gzip.GzipFile(fileobj=buf, mode='wb') as f: +                f.write(p) +            return buf.getvalue() + +        if self.path == '/video.html': +            respond()          elif self.path == '/vid.mp4': -            self.send_response(200) -            self.send_header('Content-Type', 'video/mp4') -            self.end_headers() -            self.wfile.write(b'\x00\x00\x00\x00\x20\x66\x74[video]') +            respond(b'\x00\x00\x00\x00\x20\x66\x74[video]', 'video/mp4')          elif self.path == '/302':              if sys.version_info[0] == 3:                  # XXX: Python 3 http server does not allow non-ASCII header values @@ -42,60 +169,284 @@ class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):                  self.end_headers()                  return -            new_url = 'http://127.0.0.1:%d/中文.html' % http_server_port(self.server) +            new_url = self._test_url('中文.html')              self.send_response(302)              self.send_header(b'Location', new_url.encode('utf-8'))              self.end_headers()          elif self.path == '/%E4%B8%AD%E6%96%87.html': -            self.send_response(200) -            self.send_header('Content-Type', 'text/html; charset=utf-8') +            respond() +        elif self.path == '/%c7%9f': +            respond() +        elif self.path.startswith('/redirect_'): +            self._redirect() +        elif self.path.startswith('/method'): +            self._method('GET') +        elif self.path.startswith('/headers'): +            self._headers() +        elif self.path == '/trailing_garbage': +            payload = b'<html><video src="/vid.mp4" /></html>' +            compressed = gzip_compress(payload) + b'trailing garbage' +            respond(compressed, payload_encoding='gzip') +        elif self.path == '/302-non-ascii-redirect': +            new_url = self._test_url('中文.html') +            # actually respond with permanent redirect +            self.send_response(301) +            self.send_header('Location', new_url) +            self.send_header('Content-Length', '0')              self.end_headers() -            self.wfile.write(b'<html><video src="/vid.mp4" /></html>') +        elif self.path == '/content-encoding': +            encodings = self.headers.get('ytdl-encoding', '') +            payload = b'<html><video src="/vid.mp4" /></html>' +            for encoding in filter(None, (e.strip() for e in encodings.split(','))): +                if encoding == 'br' and brotli: +                    payload = brotli.compress(payload) +                elif encoding == 'gzip': +                    payload = gzip_compress(payload) +                elif encoding == 'deflate': +                    payload = zlib.compress(payload) +                elif encoding == 'unsupported': +                    payload = b'raw' +                    break +                else: +                    self._status(415) +                    return +            respond(payload, payload_encoding=encodings) +          else: -            assert False +            self._status(404) + +    def send_header(self, keyword, value): +        """ +        Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers. +        This is against what is defined in RFC 3986: but we need to test that we support this +        since some sites incorrectly do this. +        """ +        if keyword.lower() == 'connection': +            return self.super('send_header', keyword, value) + +        if not hasattr(self, '_headers_buffer'): +            self._headers_buffer = [] + +        self._headers_buffer.append('{0}: {1}\r\n'.format(keyword, value).encode('utf-8')) + +    def end_headers(self): +        if hasattr(self, '_headers_buffer'): +            self.wfile.write(b''.join(self._headers_buffer)) +            self._headers_buffer = [] +        self.super('end_headers')  class TestHTTP(unittest.TestCase):      def setUp(self): -        self.httpd = compat_http_server.HTTPServer( +        # HTTP server +        self.http_httpd = compat_http_server.HTTPServer(              ('127.0.0.1', 0), HTTPTestRequestHandler) -        self.port = http_server_port(self.httpd) -        self.server_thread = threading.Thread(target=self.httpd.serve_forever) -        self.server_thread.daemon = True -        self.server_thread.start() +        self.http_port = http_server_port(self.http_httpd) -    def test_unicode_path_redirection(self): -        # XXX: Python 3 http server does not allow non-ASCII header values -        if sys.version_info[0] == 3: -            return +        self.http_server_thread = threading.Thread(target=self.http_httpd.serve_forever) +        self.http_server_thread.daemon = True +        self.http_server_thread.start() -        ydl = YoutubeDL({'logger': FakeLogger()}) -        r = ydl.extract_info('http://127.0.0.1:%d/302' % self.port) -        self.assertEqual(r['entries'][0]['url'], 'http://127.0.0.1:%d/vid.mp4' % self.port) +        try: +            from http.server import ThreadingHTTPServer +        except ImportError: +            try: +                from socketserver import ThreadingMixIn +            except ImportError: +                from SocketServer import ThreadingMixIn +            class ThreadingHTTPServer(ThreadingMixIn, compat_http_server.HTTPServer): +                pass -class TestHTTPS(unittest.TestCase): -    def setUp(self): +        # HTTPS server          certfn = os.path.join(TEST_DIR, 'testcert.pem') -        self.httpd = compat_http_server.HTTPServer( +        self.https_httpd = ThreadingHTTPServer(              ('127.0.0.1', 0), HTTPTestRequestHandler) -        self.httpd.socket = ssl.wrap_socket( -            self.httpd.socket, certfile=certfn, server_side=True) -        self.port = http_server_port(self.httpd) -        self.server_thread = threading.Thread(target=self.httpd.serve_forever) -        self.server_thread.daemon = True -        self.server_thread.start() +        try: +            sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) +            sslctx.verify_mode = ssl.CERT_NONE +            sslctx.check_hostname = False +            sslctx.load_cert_chain(certfn, None) +            self.https_httpd.socket = sslctx.wrap_socket( +                self.https_httpd.socket, server_side=True) +        except AttributeError: +            self.https_httpd.socket = ssl.wrap_socket( +                self.https_httpd.socket, certfile=certfn, server_side=True) + +        self.https_port = http_server_port(self.https_httpd) +        self.https_server_thread = threading.Thread(target=self.https_httpd.serve_forever) +        self.https_server_thread.daemon = True +        self.https_server_thread.start() + +    def tearDown(self): + +        def closer(svr): +            def _closer(): +                svr.shutdown() +                svr.server_close() +            return _closer + +        shutdown_thread = threading.Thread(target=closer(self.http_httpd)) +        shutdown_thread.start() +        self.http_server_thread.join(2.0) + +        shutdown_thread = threading.Thread(target=closer(self.https_httpd)) +        shutdown_thread.start() +        self.https_server_thread.join(2.0) + +    def _test_url(self, path, host='127.0.0.1', scheme='http', port=None): +        return '{0}://{1}:{2}/{3}'.format( +            scheme, host, +            port if port is not None +            else self.https_port if scheme == 'https' +            else self.http_port, path)      def test_nocheckcertificate(self): -        if sys.version_info >= (2, 7, 9):  # No certificate checking anyways -            ydl = YoutubeDL({'logger': FakeLogger()}) -            self.assertRaises( -                Exception, -                ydl.extract_info, 'https://127.0.0.1:%d/video.html' % self.port) +        with FakeYDL({'logger': FakeLogger()}) as ydl: +            with self.assertRaises(compat_urllib_error.URLError): +                ydl.urlopen(sanitized_Request(self._test_url('headers', scheme='https'))) + +        with FakeYDL({'logger': FakeLogger(), 'nocheckcertificate': True}) as ydl: +            r = ydl.urlopen(sanitized_Request(self._test_url('headers', scheme='https'))) +            self.assertEqual(r.getcode(), 200) +            r.close() + +    def test_percent_encode(self): +        with FakeYDL() as ydl: +            # Unicode characters should be encoded with uppercase percent-encoding +            res = ydl.urlopen(sanitized_Request(self._test_url('中文.html'))) +            self.assertEqual(res.getcode(), 200) +            res.close() +            # don't normalize existing percent encodings +            res = ydl.urlopen(sanitized_Request(self._test_url('%c7%9f'))) +            self.assertEqual(res.getcode(), 200) +            res.close() + +    def test_unicode_path_redirection(self): +        with FakeYDL() as ydl: +            r = ydl.urlopen(sanitized_Request(self._test_url('302-non-ascii-redirect'))) +            self.assertEqual(r.url, self._test_url('%E4%B8%AD%E6%96%87.html')) +            r.close() + +    def test_redirect(self): +        with FakeYDL() as ydl: +            def do_req(redirect_status, method, check_no_content=False): +                data = b'testdata' if method in ('POST', 'PUT') else None +                res = ydl.urlopen(sanitized_Request( +                    self._test_url('redirect_{0}'.format(redirect_status)), +                    method=method, data=data)) +                if check_no_content: +                    self.assertNotIn('Content-Type', res.headers) +                return res.read().decode('utf-8'), res.headers.get('method', '') +            # A 303 must either use GET or HEAD for subsequent request +            self.assertEqual(do_req(303, 'POST'), ('', 'GET')) +            self.assertEqual(do_req(303, 'HEAD'), ('', 'HEAD')) + +            self.assertEqual(do_req(303, 'PUT'), ('', 'GET')) + +            # 301 and 302 turn POST only into a GET, with no Content-Type +            self.assertEqual(do_req(301, 'POST', True), ('', 'GET')) +            self.assertEqual(do_req(301, 'HEAD'), ('', 'HEAD')) +            self.assertEqual(do_req(302, 'POST', True), ('', 'GET')) +            self.assertEqual(do_req(302, 'HEAD'), ('', 'HEAD')) + +            self.assertEqual(do_req(301, 'PUT'), ('testdata', 'PUT')) +            self.assertEqual(do_req(302, 'PUT'), ('testdata', 'PUT')) + +            # 307 and 308 should not change method +            for m in ('POST', 'PUT'): +                self.assertEqual(do_req(307, m), ('testdata', m)) +                self.assertEqual(do_req(308, m), ('testdata', m)) + +            self.assertEqual(do_req(307, 'HEAD'), ('', 'HEAD')) +            self.assertEqual(do_req(308, 'HEAD'), ('', 'HEAD')) -        ydl = YoutubeDL({'logger': FakeLogger(), 'nocheckcertificate': True}) -        r = ydl.extract_info('https://127.0.0.1:%d/video.html' % self.port) -        self.assertEqual(r['entries'][0]['url'], 'https://127.0.0.1:%d/vid.mp4' % self.port) +            # These should not redirect and instead raise an HTTPError +            for code in (300, 304, 305, 306): +                with self.assertRaises(compat_urllib_HTTPError): +                    do_req(code, 'GET') + +    def test_content_type(self): +        # https://github.com/yt-dlp/yt-dlp/commit/379a4f161d4ad3e40932dcf5aca6e6fb9715ab28 +        with FakeYDL({'nocheckcertificate': True}) as ydl: +            # method should be auto-detected as POST +            r = sanitized_Request(self._test_url('headers', scheme='https'), data=urlencode_postdata({'test': 'test'})) + +            headers = ydl.urlopen(r).read().decode('utf-8') +            self.assertIn('Content-Type: application/x-www-form-urlencoded', headers) + +            # test http +            r = sanitized_Request(self._test_url('headers'), data=urlencode_postdata({'test': 'test'})) +            headers = ydl.urlopen(r).read().decode('utf-8') +            self.assertIn('Content-Type: application/x-www-form-urlencoded', headers) + +    def test_cookiejar(self): +        with FakeYDL() as ydl: +            ydl.cookiejar.set_cookie(compat_http_cookiejar_Cookie( +                0, 'test', 'ytdl', None, False, '127.0.0.1', True, +                False, '/headers', True, False, None, False, None, None, {})) +            data = ydl.urlopen(sanitized_Request(self._test_url('headers'))).read() +            self.assertIn(b'Cookie: test=ytdl', data) + +    def test_no_compression_compat_header(self): +        with FakeYDL() as ydl: +            data = ydl.urlopen( +                sanitized_Request( +                    self._test_url('headers'), +                    headers={'Youtubedl-no-compression': True})).read() +            self.assertIn(b'Accept-Encoding: identity', data) +            self.assertNotIn(b'youtubedl-no-compression', data.lower()) + +    def test_gzip_trailing_garbage(self): +        # https://github.com/ytdl-org/youtube-dl/commit/aa3e950764337ef9800c936f4de89b31c00dfcf5 +        # https://github.com/ytdl-org/youtube-dl/commit/6f2ec15cee79d35dba065677cad9da7491ec6e6f +        with FakeYDL() as ydl: +            data = ydl.urlopen(sanitized_Request(self._test_url('trailing_garbage'))).read().decode('utf-8') +            self.assertEqual(data, '<html><video src="/vid.mp4" /></html>') + +    def __test_compression(self, encoding): +        with FakeYDL() as ydl: +            res = ydl.urlopen( +                sanitized_Request( +                    self._test_url('content-encoding'), +                    headers={'ytdl-encoding': encoding})) +            self.assertEqual(res.headers.get('Content-Encoding'), encoding) +            self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>') + +    @unittest.skipUnless(brotli, 'brotli support is not installed') +    @unittest.expectedFailure +    def test_brotli(self): +        self.__test_compression('br') + +    @unittest.expectedFailure +    def test_deflate(self): +        self.__test_compression('deflate') + +    @unittest.expectedFailure +    def test_gzip(self): +        self.__test_compression('gzip') + +    @unittest.expectedFailure  # not yet implemented +    def test_multiple_encodings(self): +        # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.4 +        with FakeYDL() as ydl: +            for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'): +                res = ydl.urlopen( +                    sanitized_Request( +                        self._test_url('content-encoding'), +                        headers={'ytdl-encoding': pair})) +                self.assertEqual(res.headers.get('Content-Encoding'), pair) +                self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>') + +    def test_unsupported_encoding(self): +        # it should return the raw content +        with FakeYDL() as ydl: +            res = ydl.urlopen( +                sanitized_Request( +                    self._test_url('content-encoding'), +                    headers={'ytdl-encoding': 'unsupported'})) +            self.assertEqual(res.headers.get('Content-Encoding'), 'unsupported') +            self.assertEqual(res.read(), b'raw')  def _build_proxy_handler(name): @@ -109,7 +460,7 @@ def _build_proxy_handler(name):              self.send_response(200)              self.send_header('Content-Type', 'text/plain; charset=utf-8')              self.end_headers() -            self.wfile.write('{self.proxy_name}: {self.path}'.format(self=self).encode('utf-8')) +            self.wfile.write('{0}: {1}'.format(self.proxy_name, self.path).encode('utf-8'))      return HTTPTestRequestHandler @@ -129,10 +480,30 @@ class TestProxy(unittest.TestCase):          self.geo_proxy_thread.daemon = True          self.geo_proxy_thread.start() +    def tearDown(self): + +        def closer(svr): +            def _closer(): +                svr.shutdown() +                svr.server_close() +            return _closer + +        shutdown_thread = threading.Thread(target=closer(self.proxy)) +        shutdown_thread.start() +        self.proxy_thread.join(2.0) + +        shutdown_thread = threading.Thread(target=closer(self.geo_proxy)) +        shutdown_thread.start() +        self.geo_proxy_thread.join(2.0) + +    def _test_proxy(self, host='127.0.0.1', port=None): +        return '{0}:{1}'.format( +            host, port if port is not None else self.port) +      def test_proxy(self): -        geo_proxy = '127.0.0.1:{0}'.format(self.geo_port) +        geo_proxy = self._test_proxy(port=self.geo_port)          ydl = YoutubeDL({ -            'proxy': '127.0.0.1:{0}'.format(self.port), +            'proxy': self._test_proxy(),              'geo_verification_proxy': geo_proxy,          })          url = 'http://foo.com/bar' @@ -146,7 +517,7 @@ class TestProxy(unittest.TestCase):      def test_proxy_with_idn(self):          ydl = YoutubeDL({ -            'proxy': '127.0.0.1:{0}'.format(self.port), +            'proxy': self._test_proxy(),          })          url = 'http://中文.tw/'          response = ydl.urlopen(url).read().decode('utf-8') @@ -154,5 +525,25 @@ class TestProxy(unittest.TestCase):          self.assertEqual(response, 'normal: http://xn--fiq228c.tw/') +class TestFileURL(unittest.TestCase): +    # See https://github.com/ytdl-org/youtube-dl/issues/8227 +    def test_file_urls(self): +        tf = tempfile.NamedTemporaryFile(delete=False) +        tf.write(b'foobar') +        tf.close() +        url = compat_urllib_parse.urljoin('file://', pathname2url(tf.name)) +        with FakeYDL() as ydl: +            self.assertRaisesRegexp( +                compat_urllib_error.URLError, 'file:// scheme is explicitly disabled in youtube-dl for security reasons', ydl.urlopen, url) +        # not yet implemented +        """ +        with FakeYDL({'enable_file_urls': True}) as ydl: +            res = ydl.urlopen(url) +            self.assertEqual(res.read(), b'foobar') +            res.close() +        """ +        os.unlink(tf.name) + +  if __name__ == '__main__':      unittest.main() | 
