aboutsummaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/test_YoutubeDL.py70
-rw-r--r--test/test_aes.py47
-rw-r--r--test/test_http.py49
-rw-r--r--test/test_utils.py34
4 files changed, 199 insertions, 1 deletions
diff --git a/test/test_YoutubeDL.py b/test/test_YoutubeDL.py
index db8a47d2d..652519831 100644
--- a/test/test_YoutubeDL.py
+++ b/test/test_YoutubeDL.py
@@ -14,6 +14,7 @@ from test.helper import FakeYDL, assertRegexpMatches
from youtube_dl import YoutubeDL
from youtube_dl.extractor import YoutubeIE
from youtube_dl.postprocessor.common import PostProcessor
+from youtube_dl.utils import match_filter_func
TEST_URL = 'http://localhost/sample.mp4'
@@ -339,6 +340,8 @@ class TestFormatSelection(unittest.TestCase):
downloaded = ydl.downloaded_info_dicts[0]
self.assertEqual(downloaded['format_id'], 'G')
+
+class TestYoutubeDL(unittest.TestCase):
def test_subtitles(self):
def s_formats(lang, autocaption=False):
return [{
@@ -461,6 +464,73 @@ class TestFormatSelection(unittest.TestCase):
self.assertTrue(os.path.exists(audiofile), '%s doesn\'t exist' % audiofile)
os.unlink(audiofile)
+ def test_match_filter(self):
+ class FilterYDL(YDL):
+ def __init__(self, *args, **kwargs):
+ super(FilterYDL, self).__init__(*args, **kwargs)
+ self.params['simulate'] = True
+
+ def process_info(self, info_dict):
+ super(YDL, self).process_info(info_dict)
+
+ def _match_entry(self, info_dict, incomplete):
+ res = super(FilterYDL, self)._match_entry(info_dict, incomplete)
+ if res is None:
+ self.downloaded_info_dicts.append(info_dict)
+ return res
+
+ first = {
+ 'id': '1',
+ 'url': TEST_URL,
+ 'title': 'one',
+ 'extractor': 'TEST',
+ 'duration': 30,
+ 'filesize': 10 * 1024,
+ }
+ second = {
+ 'id': '2',
+ 'url': TEST_URL,
+ 'title': 'two',
+ 'extractor': 'TEST',
+ 'duration': 10,
+ 'description': 'foo',
+ 'filesize': 5 * 1024,
+ }
+ videos = [first, second]
+
+ def get_videos(filter_=None):
+ ydl = FilterYDL({'match_filter': filter_})
+ for v in videos:
+ ydl.process_ie_result(v, download=True)
+ return [v['id'] for v in ydl.downloaded_info_dicts]
+
+ res = get_videos()
+ self.assertEqual(res, ['1', '2'])
+
+ def f(v):
+ if v['id'] == '1':
+ return None
+ else:
+ return 'Video id is not 1'
+ res = get_videos(f)
+ self.assertEqual(res, ['1'])
+
+ f = match_filter_func('duration < 30')
+ res = get_videos(f)
+ self.assertEqual(res, ['2'])
+
+ f = match_filter_func('description = foo')
+ res = get_videos(f)
+ self.assertEqual(res, ['2'])
+
+ f = match_filter_func('description =? foo')
+ res = get_videos(f)
+ self.assertEqual(res, ['1', '2'])
+
+ f = match_filter_func('filesize > 5KiB')
+ res = get_videos(f)
+ self.assertEqual(res, ['1'])
+
if __name__ == '__main__':
unittest.main()
diff --git a/test/test_aes.py b/test/test_aes.py
new file mode 100644
index 000000000..111b902e1
--- /dev/null
+++ b/test/test_aes.py
@@ -0,0 +1,47 @@
+#!/usr/bin/env python
+
+from __future__ import unicode_literals
+
+# Allow direct execution
+import os
+import sys
+import unittest
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from youtube_dl.aes import aes_decrypt, aes_encrypt, aes_cbc_decrypt, aes_decrypt_text
+from youtube_dl.utils import bytes_to_intlist, intlist_to_bytes
+import base64
+
+# the encrypted data can be generate with 'devscripts/generate_aes_testdata.py'
+
+
+class TestAES(unittest.TestCase):
+ def setUp(self):
+ self.key = self.iv = [0x20, 0x15] + 14 * [0]
+ self.secret_msg = b'Secret message goes here'
+
+ def test_encrypt(self):
+ msg = b'message'
+ key = list(range(16))
+ encrypted = aes_encrypt(bytes_to_intlist(msg), key)
+ decrypted = intlist_to_bytes(aes_decrypt(encrypted, key))
+ self.assertEqual(decrypted, msg)
+
+ def test_cbc_decrypt(self):
+ data = bytes_to_intlist(
+ b"\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6'\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd"
+ )
+ decrypted = intlist_to_bytes(aes_cbc_decrypt(data, self.key, self.iv))
+ self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
+
+ def test_decrypt_text(self):
+ password = intlist_to_bytes(self.key).decode('utf-8')
+ encrypted = base64.b64encode(
+ intlist_to_bytes(self.iv[:8]) +
+ b'\x17\x15\x93\xab\x8d\x80V\xcdV\xe0\t\xcdo\xc2\xa5\xd8ksM\r\xe27N\xae'
+ )
+ decrypted = (aes_decrypt_text(encrypted, password, 16))
+ self.assertEqual(decrypted, self.secret_msg)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/test_http.py b/test/test_http.py
index bd4d46fef..f2e305b6f 100644
--- a/test/test_http.py
+++ b/test/test_http.py
@@ -8,7 +8,7 @@ import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from youtube_dl import YoutubeDL
-from youtube_dl.compat import compat_http_server
+from youtube_dl.compat import compat_http_server, compat_urllib_request
import ssl
import threading
@@ -68,5 +68,52 @@ class TestHTTP(unittest.TestCase):
r = ydl.extract_info('https://localhost:%d/video.html' % self.port)
self.assertEqual(r['url'], 'https://localhost:%d/vid.mp4' % self.port)
+
+def _build_proxy_handler(name):
+ class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):
+ proxy_name = name
+
+ def log_message(self, format, *args):
+ pass
+
+ def do_GET(self):
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/plain; charset=utf-8')
+ self.end_headers()
+ self.wfile.write('{self.proxy_name}: {self.path}'.format(self=self).encode('utf-8'))
+ return HTTPTestRequestHandler
+
+
+class TestProxy(unittest.TestCase):
+ def setUp(self):
+ self.proxy = compat_http_server.HTTPServer(
+ ('localhost', 0), _build_proxy_handler('normal'))
+ self.port = self.proxy.socket.getsockname()[1]
+ self.proxy_thread = threading.Thread(target=self.proxy.serve_forever)
+ self.proxy_thread.daemon = True
+ self.proxy_thread.start()
+
+ self.cn_proxy = compat_http_server.HTTPServer(
+ ('localhost', 0), _build_proxy_handler('cn'))
+ self.cn_port = self.cn_proxy.socket.getsockname()[1]
+ self.cn_proxy_thread = threading.Thread(target=self.cn_proxy.serve_forever)
+ self.cn_proxy_thread.daemon = True
+ self.cn_proxy_thread.start()
+
+ def test_proxy(self):
+ cn_proxy = 'localhost:{0}'.format(self.cn_port)
+ ydl = YoutubeDL({
+ 'proxy': 'localhost:{0}'.format(self.port),
+ 'cn_verification_proxy': cn_proxy,
+ })
+ url = 'http://foo.com/bar'
+ response = ydl.urlopen(url).read().decode('utf-8')
+ self.assertEqual(response, 'normal: {0}'.format(url))
+
+ req = compat_urllib_request.Request(url)
+ req.add_header('Ytdl-request-proxy', cn_proxy)
+ response = ydl.urlopen(req).read().decode('utf-8')
+ self.assertEqual(response, 'cn: {0}'.format(url))
+
if __name__ == '__main__':
unittest.main()
diff --git a/test/test_utils.py b/test/test_utils.py
index 8f790bf0a..a8ab87685 100644
--- a/test/test_utils.py
+++ b/test/test_utils.py
@@ -24,6 +24,7 @@ from youtube_dl.utils import (
encodeFilename,
escape_rfc3986,
escape_url,
+ ExtractorError,
find_xpath_attr,
fix_xml_ampersands,
InAdvancePagedList,
@@ -39,6 +40,7 @@ from youtube_dl.utils import (
read_batch_urls,
sanitize_filename,
sanitize_path,
+ sanitize_url_path_consecutive_slashes,
shell_quote,
smuggle_url,
str_to_int,
@@ -53,6 +55,7 @@ from youtube_dl.utils import (
urlencode_postdata,
version_tuple,
xpath_with_ns,
+ xpath_text,
render_table,
match_str,
)
@@ -168,6 +171,26 @@ class TestUtil(unittest.TestCase):
self.assertEqual(sanitize_path('./abc'), 'abc')
self.assertEqual(sanitize_path('./../abc'), '..\\abc')
+ def test_sanitize_url_path_consecutive_slashes(self):
+ self.assertEqual(
+ sanitize_url_path_consecutive_slashes('http://hostname/foo//bar/filename.html'),
+ 'http://hostname/foo/bar/filename.html')
+ self.assertEqual(
+ sanitize_url_path_consecutive_slashes('http://hostname//foo/bar/filename.html'),
+ 'http://hostname/foo/bar/filename.html')
+ self.assertEqual(
+ sanitize_url_path_consecutive_slashes('http://hostname//'),
+ 'http://hostname/')
+ self.assertEqual(
+ sanitize_url_path_consecutive_slashes('http://hostname/foo/bar/filename.html'),
+ 'http://hostname/foo/bar/filename.html')
+ self.assertEqual(
+ sanitize_url_path_consecutive_slashes('http://hostname/'),
+ 'http://hostname/')
+ self.assertEqual(
+ sanitize_url_path_consecutive_slashes('http://hostname/abc//'),
+ 'http://hostname/abc/')
+
def test_ordered_set(self):
self.assertEqual(orderedSet([1, 1, 2, 3, 4, 4, 5, 6, 7, 3, 5]), [1, 2, 3, 4, 5, 6, 7])
self.assertEqual(orderedSet([]), [])
@@ -229,6 +252,17 @@ class TestUtil(unittest.TestCase):
self.assertEqual(find('media:song/media:author').text, 'The Author')
self.assertEqual(find('media:song/url').text, 'http://server.com/download.mp3')
+ def test_xpath_text(self):
+ testxml = '''<root>
+ <div>
+ <p>Foo</p>
+ </div>
+ </root>'''
+ doc = xml.etree.ElementTree.fromstring(testxml)
+ self.assertEqual(xpath_text(doc, 'div/p'), 'Foo')
+ self.assertTrue(xpath_text(doc, 'div/bar') is None)
+ self.assertRaises(ExtractorError, xpath_text, doc, 'div/bar', fatal=True)
+
def test_smuggle_url(self):
data = {"ö": "ö", "abc": [3]}
url = 'https://foo.bar/baz?x=y#a'