diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/helper.py | 5 | ||||
-rw-r--r-- | test/test_compat.py | 30 | ||||
-rw-r--r-- | test/test_http.py | 55 | ||||
-rw-r--r-- | test/test_socks.py | 118 | ||||
-rw-r--r-- | test/test_utils.py | 54 |
5 files changed, 240 insertions, 22 deletions
diff --git a/test/helper.py b/test/helper.py index b8e22c5cb..dfee217a9 100644 --- a/test/helper.py +++ b/test/helper.py @@ -24,8 +24,13 @@ from youtube_dl.utils import ( def get_params(override=None): PARAMETERS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "parameters.json") + LOCAL_PARAMETERS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), + "local_parameters.json") with io.open(PARAMETERS_FILE, encoding='utf-8') as pf: parameters = json.load(pf) + if os.path.exists(LOCAL_PARAMETERS_FILE): + with io.open(LOCAL_PARAMETERS_FILE, encoding='utf-8') as pf: + parameters.update(json.load(pf)) if override: parameters.update(override) return parameters diff --git a/test/test_compat.py b/test/test_compat.py index 618668210..f5317ac3e 100644 --- a/test/test_compat.py +++ b/test/test_compat.py @@ -10,13 +10,14 @@ import unittest sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -from youtube_dl.utils import get_filesystem_encoding from youtube_dl.compat import ( compat_getenv, + compat_setenv, compat_etree_fromstring, compat_expanduser, compat_shlex_split, compat_str, + compat_struct_unpack, compat_urllib_parse_unquote, compat_urllib_parse_unquote_plus, compat_urllib_parse_urlencode, @@ -26,19 +27,22 @@ from youtube_dl.compat import ( class TestCompat(unittest.TestCase): def test_compat_getenv(self): test_str = 'тест' - os.environ['YOUTUBE-DL-TEST'] = ( - test_str if sys.version_info >= (3, 0) - else test_str.encode(get_filesystem_encoding())) + compat_setenv('YOUTUBE-DL-TEST', test_str) self.assertEqual(compat_getenv('YOUTUBE-DL-TEST'), test_str) + def test_compat_setenv(self): + test_var = 'YOUTUBE-DL-TEST' + test_str = 'тест' + compat_setenv(test_var, test_str) + compat_getenv(test_var) + self.assertEqual(compat_getenv(test_var), test_str) + def test_compat_expanduser(self): old_home = os.environ.get('HOME') test_str = 'C:\Documents and Settings\тест\Application Data' - os.environ['HOME'] = ( - test_str if sys.version_info >= (3, 0) - else test_str.encode(get_filesystem_encoding())) + compat_setenv('HOME', test_str) self.assertEqual(compat_expanduser('~'), test_str) - os.environ['HOME'] = old_home + compat_setenv('HOME', old_home or '') def test_all_present(self): import youtube_dl.compat @@ -99,5 +103,15 @@ class TestCompat(unittest.TestCase): self.assertTrue(isinstance(doc.find('chinese').text, compat_str)) self.assertTrue(isinstance(doc.find('foo/bar').text, compat_str)) + def test_compat_etree_fromstring_doctype(self): + xml = '''<?xml version="1.0"?> +<!DOCTYPE smil PUBLIC "-//W3C//DTD SMIL 2.0//EN" "http://www.w3.org/2001/SMIL20/SMIL20.dtd"> +<smil xmlns="http://www.w3.org/2001/SMIL20/Language"></smil>''' + compat_etree_fromstring(xml) + + def test_struct_unpack(self): + self.assertEqual(compat_struct_unpack('!B', b'\x00'), (0,)) + + if __name__ == '__main__': unittest.main() diff --git a/test/test_http.py b/test/test_http.py index 15e0ad369..5076ced51 100644 --- a/test/test_http.py +++ b/test/test_http.py @@ -16,6 +16,15 @@ import threading TEST_DIR = os.path.dirname(os.path.abspath(__file__)) +def http_server_port(httpd): + if os.name == 'java' and isinstance(httpd.socket, ssl.SSLSocket): + # In Jython SSLSocket is not a subclass of socket.socket + sock = httpd.socket.sock + else: + sock = httpd.socket + return sock.getsockname()[1] + + class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler): def log_message(self, format, *args): pass @@ -31,6 +40,22 @@ class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler): self.send_header('Content-Type', 'video/mp4') self.end_headers() self.wfile.write(b'\x00\x00\x00\x00\x20\x66\x74[video]') + elif self.path == '/302': + if sys.version_info[0] == 3: + # XXX: Python 3 http server does not allow non-ASCII header values + self.send_response(404) + self.end_headers() + return + + new_url = 'http://localhost:%d/中文.html' % http_server_port(self.server) + self.send_response(302) + self.send_header(b'Location', new_url.encode('utf-8')) + self.end_headers() + elif self.path == '/%E4%B8%AD%E6%96%87.html': + self.send_response(200) + self.send_header('Content-Type', 'text/html; charset=utf-8') + self.end_headers() + self.wfile.write(b'<html><video src="/vid.mp4" /></html>') else: assert False @@ -48,17 +73,31 @@ class FakeLogger(object): class TestHTTP(unittest.TestCase): def setUp(self): + self.httpd = compat_http_server.HTTPServer( + ('localhost', 0), HTTPTestRequestHandler) + self.port = http_server_port(self.httpd) + self.server_thread = threading.Thread(target=self.httpd.serve_forever) + self.server_thread.daemon = True + self.server_thread.start() + + def test_unicode_path_redirection(self): + # XXX: Python 3 http server does not allow non-ASCII header values + if sys.version_info[0] == 3: + return + + ydl = YoutubeDL({'logger': FakeLogger()}) + r = ydl.extract_info('http://localhost:%d/302' % self.port) + self.assertEqual(r['url'], 'http://localhost:%d/vid.mp4' % self.port) + + +class TestHTTPS(unittest.TestCase): + def setUp(self): certfn = os.path.join(TEST_DIR, 'testcert.pem') self.httpd = compat_http_server.HTTPServer( ('localhost', 0), HTTPTestRequestHandler) self.httpd.socket = ssl.wrap_socket( self.httpd.socket, certfile=certfn, server_side=True) - if os.name == 'java': - # In Jython SSLSocket is not a subclass of socket.socket - sock = self.httpd.socket.sock - else: - sock = self.httpd.socket - self.port = sock.getsockname()[1] + self.port = http_server_port(self.httpd) self.server_thread = threading.Thread(target=self.httpd.serve_forever) self.server_thread.daemon = True self.server_thread.start() @@ -94,14 +133,14 @@ class TestProxy(unittest.TestCase): def setUp(self): self.proxy = compat_http_server.HTTPServer( ('localhost', 0), _build_proxy_handler('normal')) - self.port = self.proxy.socket.getsockname()[1] + self.port = http_server_port(self.proxy) self.proxy_thread = threading.Thread(target=self.proxy.serve_forever) self.proxy_thread.daemon = True self.proxy_thread.start() self.cn_proxy = compat_http_server.HTTPServer( ('localhost', 0), _build_proxy_handler('cn')) - self.cn_port = self.cn_proxy.socket.getsockname()[1] + self.cn_port = http_server_port(self.cn_proxy) self.cn_proxy_thread = threading.Thread(target=self.cn_proxy.serve_forever) self.cn_proxy_thread.daemon = True self.cn_proxy_thread.start() diff --git a/test/test_socks.py b/test/test_socks.py new file mode 100644 index 000000000..1e68eb0da --- /dev/null +++ b/test/test_socks.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python +# coding: utf-8 +from __future__ import unicode_literals + +# Allow direct execution +import os +import sys +import unittest +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import random +import subprocess + +from test.helper import ( + FakeYDL, + get_params, +) +from youtube_dl.compat import ( + compat_str, + compat_urllib_request, +) + + +class TestMultipleSocks(unittest.TestCase): + @staticmethod + def _check_params(attrs): + params = get_params() + for attr in attrs: + if attr not in params: + print('Missing %s. Skipping.' % attr) + return + return params + + def test_proxy_http(self): + params = self._check_params(['primary_proxy', 'primary_server_ip']) + if params is None: + return + ydl = FakeYDL({ + 'proxy': params['primary_proxy'] + }) + self.assertEqual( + ydl.urlopen('http://yt-dl.org/ip').read().decode('utf-8'), + params['primary_server_ip']) + + def test_proxy_https(self): + params = self._check_params(['primary_proxy', 'primary_server_ip']) + if params is None: + return + ydl = FakeYDL({ + 'proxy': params['primary_proxy'] + }) + self.assertEqual( + ydl.urlopen('https://yt-dl.org/ip').read().decode('utf-8'), + params['primary_server_ip']) + + def test_secondary_proxy_http(self): + params = self._check_params(['secondary_proxy', 'secondary_server_ip']) + if params is None: + return + ydl = FakeYDL() + req = compat_urllib_request.Request('http://yt-dl.org/ip') + req.add_header('Ytdl-request-proxy', params['secondary_proxy']) + self.assertEqual( + ydl.urlopen(req).read().decode('utf-8'), + params['secondary_server_ip']) + + def test_secondary_proxy_https(self): + params = self._check_params(['secondary_proxy', 'secondary_server_ip']) + if params is None: + return + ydl = FakeYDL() + req = compat_urllib_request.Request('https://yt-dl.org/ip') + req.add_header('Ytdl-request-proxy', params['secondary_proxy']) + self.assertEqual( + ydl.urlopen(req).read().decode('utf-8'), + params['secondary_server_ip']) + + +class TestSocks(unittest.TestCase): + _SKIP_SOCKS_TEST = True + + def setUp(self): + if self._SKIP_SOCKS_TEST: + return + + self.port = random.randint(20000, 30000) + self.server_process = subprocess.Popen([ + 'srelay', '-f', '-i', '127.0.0.1:%d' % self.port], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + def tearDown(self): + if self._SKIP_SOCKS_TEST: + return + + self.server_process.terminate() + self.server_process.communicate() + + def _get_ip(self, protocol): + if self._SKIP_SOCKS_TEST: + return '127.0.0.1' + + ydl = FakeYDL({ + 'proxy': '%s://127.0.0.1:%d' % (protocol, self.port), + }) + return ydl.urlopen('http://yt-dl.org/ip').read().decode('utf-8') + + def test_socks4(self): + self.assertTrue(isinstance(self._get_ip('socks4'), compat_str)) + + def test_socks4a(self): + self.assertTrue(isinstance(self._get_ip('socks4a'), compat_str)) + + def test_socks5(self): + self.assertTrue(isinstance(self._get_ip('socks5'), compat_str)) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/test_utils.py b/test/test_utils.py index e16a6761b..feef80465 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -50,12 +50,13 @@ from youtube_dl.utils import ( sanitize_path, prepend_extension, replace_extension, + remove_start, + remove_end, remove_quotes, shell_quote, smuggle_url, str_to_int, strip_jsonp, - struct_unpack, timeconvert, unescapeHTML, unified_strdate, @@ -139,8 +140,8 @@ class TestUtil(unittest.TestCase): self.assertEqual('yes_no', sanitize_filename('yes? no', restricted=True)) self.assertEqual('this_-_that', sanitize_filename('this: that', restricted=True)) - tests = 'a\xe4b\u4e2d\u56fd\u7684c' - self.assertEqual(sanitize_filename(tests, restricted=True), 'a_b_c') + tests = 'aäb\u4e2d\u56fd\u7684c' + self.assertEqual(sanitize_filename(tests, restricted=True), 'aab_c') self.assertTrue(sanitize_filename('\xf6', restricted=True) != '') # No empty filename forbidden = '"\0\\/&!: \'\t\n()[]{}$;`^,#' @@ -155,6 +156,10 @@ class TestUtil(unittest.TestCase): self.assertTrue(sanitize_filename('-', restricted=True) != '') self.assertTrue(sanitize_filename(':', restricted=True) != '') + self.assertEqual(sanitize_filename( + 'ÂÃÄÀÁÅÆÇÈÉÊËÌÍÎÏÐÑÒÓÔÕÖŐØŒÙÚÛÜŰÝÞßàáâãäåæçèéêëìíîïðñòóôõöőøœùúûüűýþÿ', restricted=True), + 'AAAAAAAECEEEEIIIIDNOOOOOOOOEUUUUUYPssaaaaaaaeceeeeiiiionooooooooeuuuuuypy') + def test_sanitize_ids(self): self.assertEqual(sanitize_filename('_n_cd26wFpw', is_id=True), '_n_cd26wFpw') self.assertEqual(sanitize_filename('_BD_eEpuzXw', is_id=True), '_BD_eEpuzXw') @@ -212,6 +217,16 @@ class TestUtil(unittest.TestCase): self.assertEqual(replace_extension('.abc', 'temp'), '.abc.temp') self.assertEqual(replace_extension('.abc.ext', 'temp'), '.abc.temp') + def test_remove_start(self): + self.assertEqual(remove_start(None, 'A - '), None) + self.assertEqual(remove_start('A - B', 'A - '), 'B') + self.assertEqual(remove_start('B - A', 'A - '), 'B - A') + + def test_remove_end(self): + self.assertEqual(remove_end(None, ' - B'), None) + self.assertEqual(remove_end('A - B', ' - B'), 'A') + self.assertEqual(remove_end('B - A', ' - B'), 'B - A') + def test_remove_quotes(self): self.assertEqual(remove_quotes(None), None) self.assertEqual(remove_quotes('"'), '"') @@ -453,9 +468,6 @@ class TestUtil(unittest.TestCase): testPL(5, 2, (2, 99), [2, 3, 4]) testPL(5, 2, (20, 99), []) - def test_struct_unpack(self): - self.assertEqual(struct_unpack('!B', b'\x00'), (0,)) - def test_read_batch_urls(self): f = io.StringIO('''\xef\xbb\xbf foo bar\r @@ -617,6 +629,15 @@ class TestUtil(unittest.TestCase): json_code = js_to_json(inp) self.assertEqual(json.loads(json_code), json.loads(inp)) + inp = '''{ + 0:{src:'skipped', type: 'application/dash+xml'}, + 1:{src:'skipped', type: 'application/vnd.apple.mpegURL'}, + }''' + self.assertEqual(js_to_json(inp), '''{ + "0":{"src":"skipped", "type": "application/dash+xml"}, + "1":{"src":"skipped", "type": "application/vnd.apple.mpegURL"} + }''') + def test_js_to_json_edgecases(self): on = js_to_json("{abc_def:'1\\'\\\\2\\\\\\'3\"4'}") self.assertEqual(json.loads(on), {"abc_def": "1'\\2\\'3\"4"}) @@ -640,6 +661,27 @@ class TestUtil(unittest.TestCase): on = js_to_json('{"abc": "def",}') self.assertEqual(json.loads(on), {'abc': 'def'}) + on = js_to_json('{ 0: /* " \n */ ",]" , }') + self.assertEqual(json.loads(on), {'0': ',]'}) + + on = js_to_json(r'["<p>x<\/p>"]') + self.assertEqual(json.loads(on), ['<p>x</p>']) + + on = js_to_json(r'["\xaa"]') + self.assertEqual(json.loads(on), ['\u00aa']) + + on = js_to_json("['a\\\nb']") + self.assertEqual(json.loads(on), ['ab']) + + on = js_to_json('{0xff:0xff}') + self.assertEqual(json.loads(on), {'255': 255}) + + on = js_to_json('{077:077}') + self.assertEqual(json.loads(on), {'63': 63}) + + on = js_to_json('{42:42}') + self.assertEqual(json.loads(on), {'42': 42}) + def test_extract_attributes(self): self.assertEqual(extract_attributes('<e x="y">'), {'x': 'y'}) self.assertEqual(extract_attributes("<e x='y'>"), {'x': 'y'}) |