aboutsummaryrefslogtreecommitdiff
path: root/test/test_download.py
blob: 0428a79fe7450e0fdbd97cc698c243fa76d9ea6e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#!/usr/bin/env python

import errno
import hashlib
import io
import os
import json
import unittest
import sys
import socket
import binascii

# Allow direct execution
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import youtube_dl.YoutubeDL
import youtube_dl.extractor
from youtube_dl.utils import *

DEF_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'tests.json')
PARAMETERS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "parameters.json")

RETRIES = 3

# General configuration (from __init__, not very elegant...)
jar = compat_cookiejar.CookieJar()
cookie_processor = compat_urllib_request.HTTPCookieProcessor(jar)
proxy_handler = compat_urllib_request.ProxyHandler()
opener = compat_urllib_request.build_opener(proxy_handler, cookie_processor, YoutubeDLHandler())
compat_urllib_request.install_opener(opener)
socket.setdefaulttimeout(10)

def _try_rm(filename):
    """ Remove a file if it exists """
    try:
        os.remove(filename)
    except OSError as ose:
        if ose.errno != errno.ENOENT:
            raise

md5 = lambda s: hashlib.md5(s.encode('utf-8')).hexdigest()

class YoutubeDL(youtube_dl.YoutubeDL):
    def __init__(self, *args, **kwargs):
        self.to_stderr = self.to_screen
        self.processed_info_dicts = []
        super(YoutubeDL, self).__init__(*args, **kwargs)
    def report_warning(self, message):
        # Don't accept warnings during tests
        raise ExtractorError(message)
    def process_info(self, info_dict):
        self.processed_info_dicts.append(info_dict)
        return super(YoutubeDL, self).process_info(info_dict)

def _file_md5(fn):
    with open(fn, 'rb') as f:
        return hashlib.md5(f.read()).hexdigest()

with io.open(DEF_FILE, encoding='utf-8') as deff:
    defs = json.load(deff)
for ie in youtube_dl.extractor.gen_extractors():
    t = getattr(ie, '_TEST', None)
    if t:
        t['name'] = type(ie).__name__[:-len('IE')]
        defs.append(t)


with io.open(PARAMETERS_FILE, encoding='utf-8') as pf:
    parameters = json.load(pf)


class TestDownload(unittest.TestCase):
    maxDiff = None
    def setUp(self):
        self.parameters = parameters
        self.defs = defs

### Dynamically generate tests
def generator(test_case):

    def test_template(self):
        ie = youtube_dl.extractor.get_info_extractor(test_case['name'])
        if not ie._WORKING:
            print('Skipping: IE marked as not _WORKING')
            return
        if 'playlist' not in test_case and not test_case['file']:
            print('Skipping: No output file specified')
            return
        if 'skip' in test_case:
            print('Skipping: {0}'.format(test_case['skip']))
            return

        params = self.parameters.copy()
        params.update(test_case.get('params', {}))

        ydl = YoutubeDL(params)
        for ie in youtube_dl.extractor.gen_extractors():
            ydl.add_info_extractor(ie)
        finished_hook_called = set()
        def _hook(status):
            if status['status'] == 'finished':
                finished_hook_called.add(status['filename'])
        ydl.fd.add_progress_hook(_hook)

        test_cases = test_case.get('playlist', [test_case])
        for tc in test_cases:
            _try_rm(tc['file'])
            _try_rm(tc['file'] + '.part')
            _try_rm(tc['file'] + '.info.json')
        try:
            for retry in range(1, RETRIES + 1):
                try:
                    ydl.download([test_case['url']])
                except (DownloadError, ExtractorError) as err:
                    if retry == RETRIES: raise

                    # Check if the exception is not a network related one
                    if not err.exc_info[0] in (compat_urllib_error.URLError, socket.timeout, UnavailableVideoError):
                        raise

                    print('Retrying: {0} failed tries\n\n##########\n\n'.format(retry))
                else:
                    break

            for tc in test_cases:
                if not test_case.get('params', {}).get('skip_download', False):
                    self.assertTrue(os.path.exists(tc['file']), msg='Missing file ' + tc['file'])
                    self.assertTrue(tc['file'] in finished_hook_called)
                self.assertTrue(os.path.exists(tc['file'] + '.info.json'))
                if 'md5' in tc:
                    md5_for_file = _file_md5(tc['file'])
                    self.assertEqual(md5_for_file, tc['md5'])
                with io.open(tc['file'] + '.info.json', encoding='utf-8') as infof:
                    info_dict = json.load(infof)
                for (info_field, expected) in tc.get('info_dict', {}).items():
                    if isinstance(expected, compat_str) and expected.startswith('md5:'):
                        self.assertEqual(expected, 'md5:' + md5(info_dict.get(info_field)))
                    else:
                        got = info_dict.get(info_field)
                        self.assertEqual(
                            expected, got,
                            u'invalid value for field %s, expected %r, got %r' % (info_field, expected, got))

                # If checkable fields are missing from the test case, print the info_dict
                test_info_dict = dict((key, value if not isinstance(value, compat_str) or len(value) < 250 else 'md5:' + md5(value))
                    for key, value in info_dict.items()
                    if value and key in ('title', 'description', 'uploader', 'upload_date', 'uploader_id', 'location'))
                if not all(key in tc.get('info_dict', {}).keys() for key in test_info_dict.keys()):
                    sys.stderr.write(u'\n"info_dict": ' + json.dumps(test_info_dict, ensure_ascii=False, indent=2) + u'\n')

                # Check for the presence of mandatory fields
                for key in ('id', 'url', 'title', 'ext'):
                    self.assertTrue(key in info_dict.keys() and info_dict[key])
        finally:
            for tc in test_cases:
                _try_rm(tc['file'])
                _try_rm(tc['file'] + '.part')
                _try_rm(tc['file'] + '.info.json')

    return test_template

### And add them to TestDownload
for n, test_case in enumerate(defs):
    test_method = generator(test_case)
    test_method.__name__ = "test_{0}".format(test_case["name"])
    if getattr(TestDownload, test_method.__name__, False):
        test_method.__name__ = "test_{0}_{1}".format(test_case["name"], n)
    setattr(TestDownload, test_method.__name__, test_method)
    del test_method


if __name__ == '__main__':
    unittest.main()