| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
 | #!/usr/bin/env python
# Allow direct execution
import os
import sys
import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from test.helper import get_params, get_testcases, global_setup, try_rm, md5
global_setup()
import hashlib
import io
import json
import socket
import youtube_dl.YoutubeDL
from youtube_dl.utils import (
    compat_str,
    compat_urllib_error,
    DownloadError,
    ExtractorError,
    UnavailableVideoError,
)
# Maximum number of download attempts per test before the error is re-raised
RETRIES = 3
class YoutubeDL(youtube_dl.YoutubeDL):
    """YoutubeDL variant used by the download tests.

    It sends to_stderr through to_screen, turns every reported warning into
    an ExtractorError and records each info dict passed to process_info.
    """
    def __init__(self, *args, **kwargs):
        # Both attributes are assigned before the base initializer is called
        self.to_stderr = self.to_screen
        # Info dicts handed to process_info, in call order
        self.processed_info_dicts = []
        super(YoutubeDL, self).__init__(*args, **kwargs)
    def report_warning(self, message):
        """Raise ExtractorError(message) instead of reporting the warning."""
        # Don't accept warnings during tests
        raise ExtractorError(message)
    def process_info(self, info_dict):
        """Record info_dict, then delegate to the base implementation."""
        self.processed_info_dicts.append(info_dict)
        return super(YoutubeDL, self).process_info(info_dict)
def _file_md5(fn, blocksize=1 << 20):
    """Return the hexadecimal MD5 digest of the file at path fn.

    The file is read in chunks of blocksize bytes, so a large download is
    never held in memory as a whole.
    """
    digest = hashlib.md5()
    with open(fn, 'rb') as f:
        # iter() with a sentinel stops at the first empty read, i.e. at EOF
        for chunk in iter(lambda: f.read(blocksize), b''):
            digest.update(chunk)
    return digest.hexdigest()
# Test definitions from test.helper.get_testcases(); a test method is generated for each of them
defs = get_testcases()
class TestDownload(unittest.TestCase):
    """Holder for the download tests, which are attached as test_* methods at import time."""
    # No length limit on the diffs unittest prints for failed comparisons
    maxDiff = None
    def setUp(self):
        # Expose the module-level test definitions on the instance
        self.defs = defs
### Dynamically generate tests
def generator(test_case):
    """Build the unittest method for a single download test definition.

    test_case is a dict. The keys read here are 'name' and 'url', plus the
    optional 'file', 'playlist', 'skip', 'params', 'md5' and 'info_dict'.
    If 'playlist' is present, its entries (each with its own 'file' and
    optional 'md5' / 'info_dict') are verified instead of the top-level
    definition.

    Returns a function that takes the TestCase instance as its only
    argument. The test is skipped (with a printed reason) when the
    extractor is not marked as working, when no output file is specified,
    or when the definition has a 'skip' entry.
    """

    # No docstring on test_template: unittest would show it in verbose
    # output in place of the generated test name.
    def test_template(self):
        ie = youtube_dl.extractor.get_info_extractor(test_case['name'])

        def print_skipping(reason):
            print('Skipping %s: %s' % (test_case['name'], reason))

        if not ie._WORKING:
            print_skipping('IE marked as not _WORKING')
            return
        # .get() so that a definition without any 'file' key is skipped like
        # one with an empty value instead of failing with a KeyError.
        if 'playlist' not in test_case and not test_case.get('file'):
            print_skipping('No output file specified')
            return
        if 'skip' in test_case:
            print_skipping(test_case['skip'])
            return

        params = get_params(test_case.get('params', {}))
        ydl = YoutubeDL(params)
        ydl.add_default_info_extractors()

        # Filenames for which the downloader reported the 'finished' status
        finished_hook_called = set()

        def _hook(status):
            if status['status'] == 'finished':
                finished_hook_called.add(status['filename'])
        ydl.fd.add_progress_hook(_hook)

        test_cases = test_case.get('playlist', [test_case])

        def remove_output_files():
            # Delete the download, its partial file and the info JSON
            for tc in test_cases:
                for suffix in ('', '.part', '.info.json'):
                    try_rm(tc['file'] + suffix)

        # Wrapped exception types that justify another download attempt
        network_errors = (compat_urllib_error.URLError, socket.timeout, UnavailableVideoError)
        # Fields suggested for 'info_dict' when the definition lacks them
        checkable_fields = ('title', 'description', 'uploader', 'upload_date', 'uploader_id', 'location')

        # Start from a clean slate in case an earlier run left files behind
        remove_output_files()
        try:
            for retry in range(1, RETRIES + 1):
                try:
                    ydl.download([test_case['url']])
                except (DownloadError, ExtractorError) as err:
                    if retry == RETRIES:
                        raise
                    # Only network related failures are retried. An error
                    # that carries no exc_info cannot be classified, so it is
                    # re-raised as is instead of crashing on the lookup.
                    exc_info = getattr(err, 'exc_info', None)
                    if not exc_info or exc_info[0] not in network_errors:
                        raise
                    print('Retrying: {0} failed tries\n\n##########\n\n'.format(retry))
                else:
                    break

            skip_download = test_case.get('params', {}).get('skip_download', False)
            for tc in test_cases:
                info_json_fn = tc['file'] + '.info.json'
                if not skip_download:
                    self.assertTrue(os.path.exists(tc['file']), msg='Missing file ' + tc['file'])
                    self.assertTrue(tc['file'] in finished_hook_called,
                        msg='Finished hook was not called for ' + tc['file'])
                self.assertTrue(os.path.exists(info_json_fn), msg='Missing file ' + info_json_fn)
                if 'md5' in tc:
                    md5_for_file = _file_md5(tc['file'])
                    self.assertEqual(md5_for_file, tc['md5'])
                with io.open(info_json_fn, encoding='utf-8') as infof:
                    info_dict = json.load(infof)

                # Expected values starting with 'md5:' are compared against
                # the md5 of the extracted value rather than the value itself
                expected_info = tc.get('info_dict', {})
                for (info_field, expected) in expected_info.items():
                    if isinstance(expected, compat_str) and expected.startswith('md5:'):
                        got = 'md5:' + md5(info_dict.get(info_field))
                    else:
                        got = info_dict.get(info_field)
                    self.assertEqual(expected, got,
                        u'invalid value for field %s, expected %r, got %r' % (info_field, expected, got))

                # If checkable fields are missing from the test case, print the info_dict
                # (strings of 250 characters or more are abbreviated to their md5)
                test_info_dict = dict((key, value if not isinstance(value, compat_str) or len(value) < 250 else 'md5:' + md5(value))
                    for key, value in info_dict.items()
                    if value and key in checkable_fields)
                if not all(key in expected_info for key in test_info_dict):
                    sys.stderr.write(u'\n"info_dict": ' + json.dumps(test_info_dict, ensure_ascii=False, indent=2) + u'\n')

                # Check for the presence of mandatory fields
                for key in ('id', 'url', 'title', 'ext'):
                    self.assertTrue(info_dict.get(key), msg='Missing field: %s' % key)
        finally:
            remove_output_files()

    return test_template
### And add them to TestDownload
# Attach one generated method per definition. When several definitions share
# a name, later ones get a numeric suffix (test_Foo, test_Foo_1, ...).
for test_case in defs:
    test_method = generator(test_case)
    base_name = 'test_' + str(test_case['name'])
    tname = base_name
    i = 1
    while hasattr(TestDownload, tname):
        tname = base_name + '_' + str(i)
        i += 1
    test_method.__name__ = tname
    setattr(TestDownload, tname, test_method)
    # Drop the module-level reference; presumably so that test collectors
    # do not pick up "test_method" itself - TODO confirm
    del test_method
# Run the generated tests when this file is executed as a script
if __name__ == '__main__':
    unittest.main()
 |