aboutsummaryrefslogtreecommitdiff
path: root/test/test_downloader_http.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_downloader_http.py')
-rw-r--r--test/test_downloader_http.py123
1 files changed, 123 insertions, 0 deletions
diff --git a/test/test_downloader_http.py b/test/test_downloader_http.py
new file mode 100644
index 000000000..95d152072
--- /dev/null
+++ b/test/test_downloader_http.py
@@ -0,0 +1,123 @@
+#!/usr/bin/env python
+# coding: utf-8
+from __future__ import unicode_literals
+
+# Allow direct execution
+import os
+import re
+import sys
+import unittest
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from youtube_dl import YoutubeDL
+from youtube_dl.compat import compat_http_server
+from youtube_dl.downloader.http import HttpFD
+from youtube_dl.utils import encodeFilename
+import ssl
+import threading
+
+TEST_DIR = os.path.dirname(os.path.abspath(__file__))
+
+
+def http_server_port(httpd):
+ if os.name == 'java' and isinstance(httpd.socket, ssl.SSLSocket):
+ # In Jython SSLSocket is not a subclass of socket.socket
+ sock = httpd.socket.sock
+ else:
+ sock = httpd.socket
+ return sock.getsockname()[1]
+
+
+TEST_SIZE = 10 * 1024
+
+
+class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):
+ def log_message(self, format, *args):
+ pass
+
+ def send_content_range(self, total=None):
+ range_header = self.headers.get('Range')
+ start = end = None
+ if range_header:
+ mobj = re.search(r'^bytes=(\d+)-(\d+)', range_header)
+ if mobj:
+ start = int(mobj.group(1))
+ end = int(mobj.group(2))
+ valid_range = start is not None and end is not None
+ if valid_range:
+ content_range = 'bytes %d-%d' % (start, end)
+ if total:
+ content_range += '/%d' % total
+ self.send_header('Content-Range', content_range)
+ return (end - start + 1) if valid_range else total
+
+ def serve(self, range=True, content_length=True):
+ self.send_response(200)
+ self.send_header('Content-Type', 'video/mp4')
+ size = TEST_SIZE
+ if range:
+ size = self.send_content_range(TEST_SIZE)
+ if content_length:
+ self.send_header('Content-Length', size)
+ self.end_headers()
+ self.wfile.write(b'#' * size)
+
+ def do_GET(self):
+ if self.path == '/regular':
+ self.serve()
+ elif self.path == '/no-content-length':
+ self.serve(content_length=False)
+ elif self.path == '/no-range':
+ self.serve(range=False)
+ elif self.path == '/no-range-no-content-length':
+ self.serve(range=False, content_length=False)
+ else:
+ assert False
+
+
+class FakeLogger(object):
+ def debug(self, msg):
+ pass
+
+ def warning(self, msg):
+ pass
+
+ def error(self, msg):
+ pass
+
+
+class TestHttpFD(unittest.TestCase):
+ def setUp(self):
+ self.httpd = compat_http_server.HTTPServer(
+ ('127.0.0.1', 0), HTTPTestRequestHandler)
+ self.port = http_server_port(self.httpd)
+ self.server_thread = threading.Thread(target=self.httpd.serve_forever)
+ self.server_thread.daemon = True
+ self.server_thread.start()
+
+ def download(self, params, ep):
+ params['logger'] = FakeLogger()
+ ydl = YoutubeDL(params)
+ downloader = HttpFD(ydl, params)
+ filename = 'testfile.mp4'
+ self.assertTrue(downloader.real_download(filename, {
+ 'url': 'http://127.0.0.1:%d/%s' % (self.port, ep),
+ }))
+ self.assertEqual(os.path.getsize(encodeFilename(filename)), TEST_SIZE)
+ os.remove(encodeFilename(filename))
+
+ def download_all(self, params):
+ for ep in ('regular', 'no-content-length', 'no-range', 'no-range-no-content-length'):
+ self.download(params, ep)
+
+ def test_regular(self):
+ self.download_all({})
+
+ def test_chunked(self):
+ self.download_all({
+ 'http_chunk_size': 1000,
+ })
+
+
+if __name__ == '__main__':
+ unittest.main()