diff options
| author | Sergey M․ <dstftw@gmail.com> | 2018-01-28 13:21:45 +0700 | 
|---|---|---|
| committer | Sergey M․ <dstftw@gmail.com> | 2018-02-03 23:08:58 +0700 | 
| commit | ba515388b8dc608a07b4390b601e227c1fce6fa9 (patch) | |
| tree | 86675008d6df5287205d732b22c559582b9a44c5 /test | |
| parent | e2e18694dbcd5963f2044837af0e566667d14347 (diff) | |
Introduce --http-chunk-size
Diffstat (limited to 'test')
| -rw-r--r-- | test/test_downloader_http.py | 123 | 
1 files changed, 123 insertions, 0 deletions
diff --git a/test/test_downloader_http.py b/test/test_downloader_http.py new file mode 100644 index 000000000..95d152072 --- /dev/null +++ b/test/test_downloader_http.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python +# coding: utf-8 +from __future__ import unicode_literals + +# Allow direct execution +import os +import re +import sys +import unittest +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from youtube_dl import YoutubeDL +from youtube_dl.compat import compat_http_server +from youtube_dl.downloader.http import HttpFD +from youtube_dl.utils import encodeFilename +import ssl +import threading + +TEST_DIR = os.path.dirname(os.path.abspath(__file__)) + + +def http_server_port(httpd): +    if os.name == 'java' and isinstance(httpd.socket, ssl.SSLSocket): +        # In Jython SSLSocket is not a subclass of socket.socket +        sock = httpd.socket.sock +    else: +        sock = httpd.socket +    return sock.getsockname()[1] + + +TEST_SIZE = 10 * 1024 + + +class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler): +    def log_message(self, format, *args): +        pass + +    def send_content_range(self, total=None): +        range_header = self.headers.get('Range') +        start = end = None +        if range_header: +            mobj = re.search(r'^bytes=(\d+)-(\d+)', range_header) +            if mobj: +                start = int(mobj.group(1)) +                end = int(mobj.group(2)) +        valid_range = start is not None and end is not None +        if valid_range: +            content_range = 'bytes %d-%d' % (start, end) +            if total: +                content_range += '/%d' % total +            self.send_header('Content-Range', content_range) +        return (end - start + 1) if valid_range else total + +    def serve(self, range=True, content_length=True): +        self.send_response(200) +        self.send_header('Content-Type', 'video/mp4') +        size = TEST_SIZE +        if range: +            size = self.send_content_range(TEST_SIZE) +        if content_length: +            self.send_header('Content-Length', size) +        self.end_headers() +        self.wfile.write(b'#' * size) + +    def do_GET(self): +        if self.path == '/regular': +            self.serve() +        elif self.path == '/no-content-length': +            self.serve(content_length=False) +        elif self.path == '/no-range': +            self.serve(range=False) +        elif self.path == '/no-range-no-content-length': +            self.serve(range=False, content_length=False) +        else: +            assert False + + +class FakeLogger(object): +    def debug(self, msg): +        pass + +    def warning(self, msg): +        pass + +    def error(self, msg): +        pass + + +class TestHttpFD(unittest.TestCase): +    def setUp(self): +        self.httpd = compat_http_server.HTTPServer( +            ('127.0.0.1', 0), HTTPTestRequestHandler) +        self.port = http_server_port(self.httpd) +        self.server_thread = threading.Thread(target=self.httpd.serve_forever) +        self.server_thread.daemon = True +        self.server_thread.start() + +    def download(self, params, ep): +        params['logger'] = FakeLogger() +        ydl = YoutubeDL(params) +        downloader = HttpFD(ydl, params) +        filename = 'testfile.mp4' +        self.assertTrue(downloader.real_download(filename, { +            'url': 'http://127.0.0.1:%d/%s' % (self.port, ep), +        })) +        self.assertEqual(os.path.getsize(encodeFilename(filename)), TEST_SIZE) +        os.remove(encodeFilename(filename)) + +    def download_all(self, params): +        for ep in ('regular', 'no-content-length', 'no-range', 'no-range-no-content-length'): +            self.download(params, ep) + +    def test_regular(self): +        self.download_all({}) + +    def test_chunked(self): +        self.download_all({ +            'http_chunk_size': 1000, +        }) + + +if __name__ == '__main__': +    unittest.main()  | 
