From c365dba8430ee33abda85d31f95128605bf240eb Mon Sep 17 00:00:00 2001 From: pukkandan Date: Sat, 15 Jul 2023 14:30:08 +0530 Subject: [networking] Add module (#2861) No actual changes - code is only moved around --- test/test_http.py | 531 ------------------------------------------------ test/test_networking.py | 531 ++++++++++++++++++++++++++++++++++++++++++++++++ test/test_utils.py | 18 +- 3 files changed, 540 insertions(+), 540 deletions(-) delete mode 100644 test/test_http.py create mode 100644 test/test_networking.py (limited to 'test') diff --git a/test/test_http.py b/test/test_http.py deleted file mode 100644 index e4e66dce1..000000000 --- a/test/test_http.py +++ /dev/null @@ -1,531 +0,0 @@ -#!/usr/bin/env python3 - -# Allow direct execution -import os -import sys -import unittest - -sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - -import gzip -import http.cookiejar -import http.server -import io -import pathlib -import ssl -import tempfile -import threading -import urllib.error -import urllib.request -import zlib - -from test.helper import http_server_port -from yt_dlp import YoutubeDL -from yt_dlp.dependencies import brotli -from yt_dlp.utils import sanitized_Request, urlencode_postdata - -from .helper import FakeYDL - -TEST_DIR = os.path.dirname(os.path.abspath(__file__)) - - -class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler): - protocol_version = 'HTTP/1.1' - - def log_message(self, format, *args): - pass - - def _headers(self): - payload = str(self.headers).encode('utf-8') - self.send_response(200) - self.send_header('Content-Type', 'application/json') - self.send_header('Content-Length', str(len(payload))) - self.end_headers() - self.wfile.write(payload) - - def _redirect(self): - self.send_response(int(self.path[len('/redirect_'):])) - self.send_header('Location', '/method') - self.send_header('Content-Length', '0') - self.end_headers() - - def _method(self, method, payload=None): - self.send_response(200) - self.send_header('Content-Length', str(len(payload or ''))) - self.send_header('Method', method) - self.end_headers() - if payload: - self.wfile.write(payload) - - def _status(self, status): - payload = f'{status} NOT FOUND'.encode() - self.send_response(int(status)) - self.send_header('Content-Type', 'text/html; charset=utf-8') - self.send_header('Content-Length', str(len(payload))) - self.end_headers() - self.wfile.write(payload) - - def _read_data(self): - if 'Content-Length' in self.headers: - return self.rfile.read(int(self.headers['Content-Length'])) - - def do_POST(self): - data = self._read_data() - if self.path.startswith('/redirect_'): - self._redirect() - elif self.path.startswith('/method'): - self._method('POST', data) - elif self.path.startswith('/headers'): - self._headers() - else: - self._status(404) - - def do_HEAD(self): - if self.path.startswith('/redirect_'): - self._redirect() - elif self.path.startswith('/method'): - self._method('HEAD') - else: - self._status(404) - - def do_PUT(self): - data = self._read_data() - if self.path.startswith('/redirect_'): - self._redirect() - elif self.path.startswith('/method'): - self._method('PUT', data) - else: - self._status(404) - - def do_GET(self): - if self.path == '/video.html': - payload = b'