diff options
Diffstat (limited to 'test')
| -rw-r--r-- | test/helper.py | 5 | ||||
| -rw-r--r-- | test/test_compat.py | 30 | ||||
| -rw-r--r-- | test/test_http.py | 55 | ||||
| -rw-r--r-- | test/test_socks.py | 118 | ||||
| -rw-r--r-- | test/test_utils.py | 54 | 
5 files changed, 240 insertions, 22 deletions
| diff --git a/test/helper.py b/test/helper.py index b8e22c5cb..dfee217a9 100644 --- a/test/helper.py +++ b/test/helper.py @@ -24,8 +24,13 @@ from youtube_dl.utils import (  def get_params(override=None):      PARAMETERS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)),                                     "parameters.json") +    LOCAL_PARAMETERS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), +                                         "local_parameters.json")      with io.open(PARAMETERS_FILE, encoding='utf-8') as pf:          parameters = json.load(pf) +    if os.path.exists(LOCAL_PARAMETERS_FILE): +        with io.open(LOCAL_PARAMETERS_FILE, encoding='utf-8') as pf: +            parameters.update(json.load(pf))      if override:          parameters.update(override)      return parameters diff --git a/test/test_compat.py b/test/test_compat.py index 618668210..f5317ac3e 100644 --- a/test/test_compat.py +++ b/test/test_compat.py @@ -10,13 +10,14 @@ import unittest  sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -from youtube_dl.utils import get_filesystem_encoding  from youtube_dl.compat import (      compat_getenv, +    compat_setenv,      compat_etree_fromstring,      compat_expanduser,      compat_shlex_split,      compat_str, +    compat_struct_unpack,      compat_urllib_parse_unquote,      compat_urllib_parse_unquote_plus,      compat_urllib_parse_urlencode, @@ -26,19 +27,22 @@ from youtube_dl.compat import (  class TestCompat(unittest.TestCase):      def test_compat_getenv(self):          test_str = 'тест' -        os.environ['YOUTUBE-DL-TEST'] = ( -            test_str if sys.version_info >= (3, 0) -            else test_str.encode(get_filesystem_encoding())) +        compat_setenv('YOUTUBE-DL-TEST', test_str)          self.assertEqual(compat_getenv('YOUTUBE-DL-TEST'), test_str) +    def test_compat_setenv(self): +        test_var = 'YOUTUBE-DL-TEST' +        test_str = 'тест' +        compat_setenv(test_var, test_str) +        compat_getenv(test_var) +        self.assertEqual(compat_getenv(test_var), test_str) +      def test_compat_expanduser(self):          old_home = os.environ.get('HOME')          test_str = 'C:\Documents and Settings\тест\Application Data' -        os.environ['HOME'] = ( -            test_str if sys.version_info >= (3, 0) -            else test_str.encode(get_filesystem_encoding())) +        compat_setenv('HOME', test_str)          self.assertEqual(compat_expanduser('~'), test_str) -        os.environ['HOME'] = old_home +        compat_setenv('HOME', old_home or '')      def test_all_present(self):          import youtube_dl.compat @@ -99,5 +103,15 @@ class TestCompat(unittest.TestCase):          self.assertTrue(isinstance(doc.find('chinese').text, compat_str))          self.assertTrue(isinstance(doc.find('foo/bar').text, compat_str)) +    def test_compat_etree_fromstring_doctype(self): +        xml = '''<?xml version="1.0"?> +<!DOCTYPE smil PUBLIC "-//W3C//DTD SMIL 2.0//EN" "http://www.w3.org/2001/SMIL20/SMIL20.dtd"> +<smil xmlns="http://www.w3.org/2001/SMIL20/Language"></smil>''' +        compat_etree_fromstring(xml) + +    def test_struct_unpack(self): +        self.assertEqual(compat_struct_unpack('!B', b'\x00'), (0,)) + +  if __name__ == '__main__':      unittest.main() diff --git a/test/test_http.py b/test/test_http.py index 15e0ad369..5076ced51 100644 --- a/test/test_http.py +++ b/test/test_http.py @@ -16,6 +16,15 @@ import threading  TEST_DIR = os.path.dirname(os.path.abspath(__file__)) +def http_server_port(httpd): +    if os.name == 'java' and isinstance(httpd.socket, ssl.SSLSocket): +        # In Jython SSLSocket is not a subclass of socket.socket +        sock = httpd.socket.sock +    else: +        sock = httpd.socket +    return sock.getsockname()[1] + +  class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):      def log_message(self, format, *args):          pass @@ -31,6 +40,22 @@ class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):              self.send_header('Content-Type', 'video/mp4')              self.end_headers()              self.wfile.write(b'\x00\x00\x00\x00\x20\x66\x74[video]') +        elif self.path == '/302': +            if sys.version_info[0] == 3: +                # XXX: Python 3 http server does not allow non-ASCII header values +                self.send_response(404) +                self.end_headers() +                return + +            new_url = 'http://localhost:%d/中文.html' % http_server_port(self.server) +            self.send_response(302) +            self.send_header(b'Location', new_url.encode('utf-8')) +            self.end_headers() +        elif self.path == '/%E4%B8%AD%E6%96%87.html': +            self.send_response(200) +            self.send_header('Content-Type', 'text/html; charset=utf-8') +            self.end_headers() +            self.wfile.write(b'<html><video src="/vid.mp4" /></html>')          else:              assert False @@ -48,17 +73,31 @@ class FakeLogger(object):  class TestHTTP(unittest.TestCase):      def setUp(self): +        self.httpd = compat_http_server.HTTPServer( +            ('localhost', 0), HTTPTestRequestHandler) +        self.port = http_server_port(self.httpd) +        self.server_thread = threading.Thread(target=self.httpd.serve_forever) +        self.server_thread.daemon = True +        self.server_thread.start() + +    def test_unicode_path_redirection(self): +        # XXX: Python 3 http server does not allow non-ASCII header values +        if sys.version_info[0] == 3: +            return + +        ydl = YoutubeDL({'logger': FakeLogger()}) +        r = ydl.extract_info('http://localhost:%d/302' % self.port) +        self.assertEqual(r['url'], 'http://localhost:%d/vid.mp4' % self.port) + + +class TestHTTPS(unittest.TestCase): +    def setUp(self):          certfn = os.path.join(TEST_DIR, 'testcert.pem')          self.httpd = compat_http_server.HTTPServer(              ('localhost', 0), HTTPTestRequestHandler)          self.httpd.socket = ssl.wrap_socket(              self.httpd.socket, certfile=certfn, server_side=True) -        if os.name == 'java': -            # In Jython SSLSocket is not a subclass of socket.socket -            sock = self.httpd.socket.sock -        else: -            sock = self.httpd.socket -        self.port = sock.getsockname()[1] +        self.port = http_server_port(self.httpd)          self.server_thread = threading.Thread(target=self.httpd.serve_forever)          self.server_thread.daemon = True          self.server_thread.start() @@ -94,14 +133,14 @@ class TestProxy(unittest.TestCase):      def setUp(self):          self.proxy = compat_http_server.HTTPServer(              ('localhost', 0), _build_proxy_handler('normal')) -        self.port = self.proxy.socket.getsockname()[1] +        self.port = http_server_port(self.proxy)          self.proxy_thread = threading.Thread(target=self.proxy.serve_forever)          self.proxy_thread.daemon = True          self.proxy_thread.start()          self.cn_proxy = compat_http_server.HTTPServer(              ('localhost', 0), _build_proxy_handler('cn')) -        self.cn_port = self.cn_proxy.socket.getsockname()[1] +        self.cn_port = http_server_port(self.cn_proxy)          self.cn_proxy_thread = threading.Thread(target=self.cn_proxy.serve_forever)          self.cn_proxy_thread.daemon = True          self.cn_proxy_thread.start() diff --git a/test/test_socks.py b/test/test_socks.py new file mode 100644 index 000000000..1e68eb0da --- /dev/null +++ b/test/test_socks.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python +# coding: utf-8 +from __future__ import unicode_literals + +# Allow direct execution +import os +import sys +import unittest +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import random +import subprocess + +from test.helper import ( +    FakeYDL, +    get_params, +) +from youtube_dl.compat import ( +    compat_str, +    compat_urllib_request, +) + + +class TestMultipleSocks(unittest.TestCase): +    @staticmethod +    def _check_params(attrs): +        params = get_params() +        for attr in attrs: +            if attr not in params: +                print('Missing %s. Skipping.' % attr) +                return +        return params + +    def test_proxy_http(self): +        params = self._check_params(['primary_proxy', 'primary_server_ip']) +        if params is None: +            return +        ydl = FakeYDL({ +            'proxy': params['primary_proxy'] +        }) +        self.assertEqual( +            ydl.urlopen('http://yt-dl.org/ip').read().decode('utf-8'), +            params['primary_server_ip']) + +    def test_proxy_https(self): +        params = self._check_params(['primary_proxy', 'primary_server_ip']) +        if params is None: +            return +        ydl = FakeYDL({ +            'proxy': params['primary_proxy'] +        }) +        self.assertEqual( +            ydl.urlopen('https://yt-dl.org/ip').read().decode('utf-8'), +            params['primary_server_ip']) + +    def test_secondary_proxy_http(self): +        params = self._check_params(['secondary_proxy', 'secondary_server_ip']) +        if params is None: +            return +        ydl = FakeYDL() +        req = compat_urllib_request.Request('http://yt-dl.org/ip') +        req.add_header('Ytdl-request-proxy', params['secondary_proxy']) +        self.assertEqual( +            ydl.urlopen(req).read().decode('utf-8'), +            params['secondary_server_ip']) + +    def test_secondary_proxy_https(self): +        params = self._check_params(['secondary_proxy', 'secondary_server_ip']) +        if params is None: +            return +        ydl = FakeYDL() +        req = compat_urllib_request.Request('https://yt-dl.org/ip') +        req.add_header('Ytdl-request-proxy', params['secondary_proxy']) +        self.assertEqual( +            ydl.urlopen(req).read().decode('utf-8'), +            params['secondary_server_ip']) + + +class TestSocks(unittest.TestCase): +    _SKIP_SOCKS_TEST = True + +    def setUp(self): +        if self._SKIP_SOCKS_TEST: +            return + +        self.port = random.randint(20000, 30000) +        self.server_process = subprocess.Popen([ +            'srelay', '-f', '-i', '127.0.0.1:%d' % self.port], +            stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + +    def tearDown(self): +        if self._SKIP_SOCKS_TEST: +            return + +        self.server_process.terminate() +        self.server_process.communicate() + +    def _get_ip(self, protocol): +        if self._SKIP_SOCKS_TEST: +            return '127.0.0.1' + +        ydl = FakeYDL({ +            'proxy': '%s://127.0.0.1:%d' % (protocol, self.port), +        }) +        return ydl.urlopen('http://yt-dl.org/ip').read().decode('utf-8') + +    def test_socks4(self): +        self.assertTrue(isinstance(self._get_ip('socks4'), compat_str)) + +    def test_socks4a(self): +        self.assertTrue(isinstance(self._get_ip('socks4a'), compat_str)) + +    def test_socks5(self): +        self.assertTrue(isinstance(self._get_ip('socks5'), compat_str)) + + +if __name__ == '__main__': +    unittest.main() diff --git a/test/test_utils.py b/test/test_utils.py index e16a6761b..feef80465 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -50,12 +50,13 @@ from youtube_dl.utils import (      sanitize_path,      prepend_extension,      replace_extension, +    remove_start, +    remove_end,      remove_quotes,      shell_quote,      smuggle_url,      str_to_int,      strip_jsonp, -    struct_unpack,      timeconvert,      unescapeHTML,      unified_strdate, @@ -139,8 +140,8 @@ class TestUtil(unittest.TestCase):          self.assertEqual('yes_no', sanitize_filename('yes? no', restricted=True))          self.assertEqual('this_-_that', sanitize_filename('this: that', restricted=True)) -        tests = 'a\xe4b\u4e2d\u56fd\u7684c' -        self.assertEqual(sanitize_filename(tests, restricted=True), 'a_b_c') +        tests = 'aäb\u4e2d\u56fd\u7684c' +        self.assertEqual(sanitize_filename(tests, restricted=True), 'aab_c')          self.assertTrue(sanitize_filename('\xf6', restricted=True) != '')  # No empty filename          forbidden = '"\0\\/&!: \'\t\n()[]{}$;`^,#' @@ -155,6 +156,10 @@ class TestUtil(unittest.TestCase):          self.assertTrue(sanitize_filename('-', restricted=True) != '')          self.assertTrue(sanitize_filename(':', restricted=True) != '') +        self.assertEqual(sanitize_filename( +            'ÂÃÄÀÁÅÆÇÈÉÊËÌÍÎÏÐÑÒÓÔÕÖŐØŒÙÚÛÜŰÝÞßàáâãäåæçèéêëìíîïðñòóôõöőøœùúûüűýþÿ', restricted=True), +            'AAAAAAAECEEEEIIIIDNOOOOOOOOEUUUUUYPssaaaaaaaeceeeeiiiionooooooooeuuuuuypy') +      def test_sanitize_ids(self):          self.assertEqual(sanitize_filename('_n_cd26wFpw', is_id=True), '_n_cd26wFpw')          self.assertEqual(sanitize_filename('_BD_eEpuzXw', is_id=True), '_BD_eEpuzXw') @@ -212,6 +217,16 @@ class TestUtil(unittest.TestCase):          self.assertEqual(replace_extension('.abc', 'temp'), '.abc.temp')          self.assertEqual(replace_extension('.abc.ext', 'temp'), '.abc.temp') +    def test_remove_start(self): +        self.assertEqual(remove_start(None, 'A - '), None) +        self.assertEqual(remove_start('A - B', 'A - '), 'B') +        self.assertEqual(remove_start('B - A', 'A - '), 'B - A') + +    def test_remove_end(self): +        self.assertEqual(remove_end(None, ' - B'), None) +        self.assertEqual(remove_end('A - B', ' - B'), 'A') +        self.assertEqual(remove_end('B - A', ' - B'), 'B - A') +      def test_remove_quotes(self):          self.assertEqual(remove_quotes(None), None)          self.assertEqual(remove_quotes('"'), '"') @@ -453,9 +468,6 @@ class TestUtil(unittest.TestCase):          testPL(5, 2, (2, 99), [2, 3, 4])          testPL(5, 2, (20, 99), []) -    def test_struct_unpack(self): -        self.assertEqual(struct_unpack('!B', b'\x00'), (0,)) -      def test_read_batch_urls(self):          f = io.StringIO('''\xef\xbb\xbf foo              bar\r @@ -617,6 +629,15 @@ class TestUtil(unittest.TestCase):          json_code = js_to_json(inp)          self.assertEqual(json.loads(json_code), json.loads(inp)) +        inp = '''{ +            0:{src:'skipped', type: 'application/dash+xml'}, +            1:{src:'skipped', type: 'application/vnd.apple.mpegURL'}, +        }''' +        self.assertEqual(js_to_json(inp), '''{ +            "0":{"src":"skipped", "type": "application/dash+xml"}, +            "1":{"src":"skipped", "type": "application/vnd.apple.mpegURL"} +        }''') +      def test_js_to_json_edgecases(self):          on = js_to_json("{abc_def:'1\\'\\\\2\\\\\\'3\"4'}")          self.assertEqual(json.loads(on), {"abc_def": "1'\\2\\'3\"4"}) @@ -640,6 +661,27 @@ class TestUtil(unittest.TestCase):          on = js_to_json('{"abc": "def",}')          self.assertEqual(json.loads(on), {'abc': 'def'}) +        on = js_to_json('{ 0: /* " \n */ ",]" , }') +        self.assertEqual(json.loads(on), {'0': ',]'}) + +        on = js_to_json(r'["<p>x<\/p>"]') +        self.assertEqual(json.loads(on), ['<p>x</p>']) + +        on = js_to_json(r'["\xaa"]') +        self.assertEqual(json.loads(on), ['\u00aa']) + +        on = js_to_json("['a\\\nb']") +        self.assertEqual(json.loads(on), ['ab']) + +        on = js_to_json('{0xff:0xff}') +        self.assertEqual(json.loads(on), {'255': 255}) + +        on = js_to_json('{077:077}') +        self.assertEqual(json.loads(on), {'63': 63}) + +        on = js_to_json('{42:42}') +        self.assertEqual(json.loads(on), {'42': 42}) +      def test_extract_attributes(self):          self.assertEqual(extract_attributes('<e x="y">'), {'x': 'y'})          self.assertEqual(extract_attributes("<e x='y'>"), {'x': 'y'}) | 
