about summary refs log tree commit diff
path: root/youtube_dl/jsinterp.py
diff options
context:
space:
mode:
author: df <fieldhouse@gmx.net> 2021-11-02 11:18:39 +0000
committer: dirkf <fieldhouse@gmx.net> 2022-01-30 00:05:54 +0000
commit: 96f87aaa3b34d80bc72097a7475d8093849091fc (patch)
tree: d15de84dff0d793a460b68bdcdbdf9ad539f3f41 /youtube_dl/jsinterp.py
parent: 5f5de51a499f732a6e687f32037e130cbdc50c8f (diff)
download: youtube-dl-96f87aaa3b34d80bc72097a7475d8093849091fc.tar.xz
Back-port JS interpreter upgrade from yt-dlp PR #1437
Diffstat (limited to 'youtube_dl/jsinterp.py')
-rw-r--r-- youtube_dl/jsinterp.py 504
1 files changed, 397 insertions, 107 deletions
diff --git a/youtube_dl/jsinterp.py b/youtube_dl/jsinterp.py
index 7bda59610..061e92c2a 100644
--- a/youtube_dl/jsinterp.py
+++ b/youtube_dl/jsinterp.py
@@ -8,6 +8,15 @@ from .utils import (
ExtractorError,
remove_quotes,
)
+from .compat import (
+ compat_collections_abc
+)
+MutableMapping = compat_collections_abc.MutableMapping
+
+
+class Nonlocal:
+ # Bare attribute holder: the interpreter stores values on an instance
+ # (e.g. `nl.member`) so that nested functions can read and rebind them
+ # without using the `nonlocal` statement.
+ pass
+
_OPERATORS = [
('|', operator.or_),
@@ -22,11 +31,55 @@ _OPERATORS = [
('*', operator.mul),
]
_ASSIGN_OPERATORS = [(op + '=', opfunc) for op, opfunc in _OPERATORS]
-_ASSIGN_OPERATORS.append(('=', lambda cur, right: right))
+_ASSIGN_OPERATORS.append(('=', (lambda cur, right: right)))
_NAME_RE = r'[a-zA-Z_$][a-zA-Z_$0-9]*'
+class JS_Break(ExtractorError):
+ """Raised when a JS `break` is interpreted; the for-loop and switch handlers catch it."""
+ def __init__(self):
+ # This message only surfaces if no enclosing handler catches the exception
+ ExtractorError.__init__(self, 'Invalid break')
+
+
+class JS_Continue(ExtractorError):
+ """Raised when a JS `continue` is interpreted; the for-loop handler catches it."""
+ def __init__(self):
+ # This message only surfaces if no enclosing loop catches the exception
+ ExtractorError.__init__(self, 'Invalid continue')
+
+
+class LocalNameSpace(MutableMapping):
+    """Layered variable namespace backed by a stack of mappings.
+
+    Scopes are searched in the order they were passed to the constructor.
+    Reads return the first match. Writes update the first scope that already
+    holds the key, or create the key in the first scope when none does.
+    Deleting keys is not supported.
+    """
+
+    def __init__(self, *stack):
+        self.stack = tuple(stack)
+
+    def __getitem__(self, key):
+        for scope in self.stack:
+            if key in scope:
+                return scope[key]
+        raise KeyError(key)
+
+    def __setitem__(self, key, value):
+        for scope in self.stack:
+            if key in scope:
+                scope[key] = value
+                break
+        else:
+            # Not defined in any scope yet: create it in the first scope
+            self.stack[0][key] = value
+        return value
+
+    def __delitem__(self, key):
+        raise NotImplementedError('Deleting is not supported')
+
+    def __iter__(self):
+        for scope in self.stack:
+            for scope_item in iter(scope):
+                yield scope_item
+
+    def __len__(self):
+        # len() calls __len__ without arguments and a generator has no
+        # length, so count the keys that __iter__ yields instead
+        return sum(1 for _ in self)
+
+    def __repr__(self):
+        return 'LocalNameSpace%s' % (self.stack, )
+
+
class JSInterpreter(object):
def __init__(self, code, objects=None):
if objects is None:
@@ -34,11 +87,58 @@ class JSInterpreter(object):
self.code = code
self._functions = {}
self._objects = objects
+ self.__named_object_counter = 0
+
+ def _named_object(self, namespace, obj):
+ """Store obj in namespace under a newly generated name and return that name."""
+ # Incrementing the per-interpreter counter keeps generated names distinct
+ self.__named_object_counter += 1
+ name = '__youtube_dl_jsinterp_obj%s' % (self.__named_object_counter, )
+ namespace[name] = obj
+ return name
+
+ @staticmethod
+ def _separate(expr, delim=',', max_split=None):
+ """Generate the pieces of expr split at delim, skipping delimiters nested in (), [] or {}.
+
+ Yields nothing when expr is empty. When max_split is set, splitting stops
+ after that many splits and the rest of expr is yielded as the final piece.
+ """
+ if not expr:
+ return
+ # Running count of each bracket character seen so far
+ parens = {'(': 0, '{': 0, '[': 0, ']': 0, '}': 0, ')': 0}
+ # pos: index of the next delim character to match; max_pos: index of its last character
+ start, splits, pos, max_pos = 0, 0, 0, len(delim) - 1
+ for idx, char in enumerate(expr):
+ if char in parens:
+ parens[char] += 1
+ # Non-zero (truthy) while any bracket type has unequal open/close counts
+ is_in_parens = (parens['['] - parens[']']
+ or parens['('] - parens[')']
+ or parens['{'] - parens['}'])
+ if char == delim[pos] and not is_in_parens:
+ if pos == max_pos:
+ # The whole delimiter has been matched: emit the piece before it
+ pos = 0
+ yield expr[start: idx - max_pos]
+ start = idx + 1
+ splits += 1
+ if max_split and splits >= max_split:
+ break
+ else:
+ pos += 1
+ else:
+ pos = 0
+ # Whatever follows the last split (or all of expr if there was none)
+ yield expr[start:]
+
+ @staticmethod
+ def _separate_at_paren(expr, delim):
+ """Split expr at the first un-nested closing bracket delim.
+
+ Returns (inner, outer), both stripped: inner is the text before delim with
+ its first character (the opening bracket) dropped, outer is the text after.
+ Raises ExtractorError when expr contains no un-nested delim.
+ """
+ separated = list(JSInterpreter._separate(expr, delim, 1))
+ if len(separated) < 2:
+ raise ExtractorError('No terminating paren {0} in {1}'.format(delim, expr))
+ return separated[0][1:].strip(), separated[1].strip()
def interpret_statement(self, stmt, local_vars, allow_recursion=100):
if allow_recursion < 0:
raise ExtractorError('Recursion limit reached')
+ sub_statements = list(self._separate(stmt, ';'))
+ stmt = (sub_statements or ['']).pop()
+ for sub_stmt in sub_statements:
+ ret, should_abort = self.interpret_statement(sub_stmt, local_vars, allow_recursion - 1)
+ if should_abort:
+ return ret
+
should_abort = False
stmt = stmt.lstrip()
stmt_m = re.match(r'var\s', stmt)
@@ -61,25 +161,119 @@ class JSInterpreter(object):
if expr == '': # Empty expression
return None
+ if expr.startswith('{'):
+ inner, outer = self._separate_at_paren(expr, '}')
+ inner, should_abort = self.interpret_statement(inner, local_vars, allow_recursion - 1)
+ if not outer or should_abort:
+ return inner
+ else:
+ expr = json.dumps(inner) + outer
+
if expr.startswith('('):
- parens_count = 0
- for m in re.finditer(r'[()]', expr):
- if m.group(0) == '(':
- parens_count += 1
+ inner, outer = self._separate_at_paren(expr, ')')
+ inner = self.interpret_expression(inner, local_vars, allow_recursion)
+ if not outer:
+ return inner
+ else:
+ expr = json.dumps(inner) + outer
+
+ if expr.startswith('['):
+ inner, outer = self._separate_at_paren(expr, ']')
+ name = self._named_object(local_vars, [
+ self.interpret_expression(item, local_vars, allow_recursion)
+ for item in self._separate(inner)])
+ expr = name + outer
+
+ m = re.match(r'try\s*', expr)
+ if m:
+ if expr[m.end()] == '{':
+ try_expr, expr = self._separate_at_paren(expr[m.end():], '}')
+ else:
+ try_expr, expr = expr[m.end() - 1:], ''
+ ret, should_abort = self.interpret_statement(try_expr, local_vars, allow_recursion - 1)
+ if should_abort:
+ return ret
+ return self.interpret_statement(expr, local_vars, allow_recursion - 1)[0]
+
+ m = re.match(r'(?:(?P<catch>catch)|(?P<for>for)|(?P<switch>switch))\s*\(', expr)
+ md = m.groupdict() if m else {}
+ if md.get('catch'):
+ # We ignore the catch block
+ _, expr = self._separate_at_paren(expr, '}')
+ return self.interpret_statement(expr, local_vars, allow_recursion - 1)[0]
+
+ elif md.get('for'):
+ def raise_constructor_error(c):
+ raise ExtractorError(
+ 'Premature return in the initialization of a for loop in {0!r}'.format(c))
+
+ constructor, remaining = self._separate_at_paren(expr[m.end() - 1:], ')')
+ if remaining.startswith('{'):
+ body, expr = self._separate_at_paren(remaining, '}')
+ else:
+ m = re.match(r'switch\s*\(', remaining) # FIXME
+ if m:
+ switch_val, remaining = self._separate_at_paren(remaining[m.end() - 1:], ')')
+ body, expr = self._separate_at_paren(remaining, '}')
+ body = 'switch(%s){%s}' % (switch_val, body)
else:
- parens_count -= 1
- if parens_count == 0:
- sub_expr = expr[1:m.start()]
- sub_result = self.interpret_expression(
- sub_expr, local_vars, allow_recursion)
- remaining_expr = expr[m.end():].strip()
- if not remaining_expr:
- return sub_result
- else:
- expr = json.dumps(sub_result) + remaining_expr
+ body, expr = remaining, ''
+ start, cndn, increment = self._separate(constructor, ';')
+ if self.interpret_statement(start, local_vars, allow_recursion - 1)[1]:
+ raise_constructor_error(constructor)
+ while True:
+ if not self.interpret_expression(cndn, local_vars, allow_recursion):
+ break
+ try:
+ ret, should_abort = self.interpret_statement(body, local_vars, allow_recursion - 1)
+ if should_abort:
+ return ret
+ except JS_Break:
+ break
+ except JS_Continue:
+ pass
+ if self.interpret_statement(increment, local_vars, allow_recursion - 1)[1]:
+ raise_constructor_error(constructor)
+ return self.interpret_statement(expr, local_vars, allow_recursion - 1)[0]
+
+ elif md.get('switch'):
+ switch_val, remaining = self._separate_at_paren(expr[m.end() - 1:], ')')
+ switch_val = self.interpret_expression(switch_val, local_vars, allow_recursion)
+ body, expr = self._separate_at_paren(remaining, '}')
+ body, default = body.split('default:') if 'default:' in body else (body, None)
+ items = body.split('case ')[1:]
+ if default:
+ items.append('default:%s' % (default, ))
+ matched = False
+ for item in items:
+ case, stmt = [i.strip() for i in self._separate(item, ':', 1)]
+ matched = matched or case == 'default' or switch_val == self.interpret_expression(case, local_vars, allow_recursion)
+ if matched:
+ try:
+ ret, should_abort = self.interpret_statement(stmt, local_vars, allow_recursion - 1)
+ if should_abort:
+ return ret
+ except JS_Break:
break
- else:
- raise ExtractorError('Premature end of parens in %r' % expr)
+ return self.interpret_statement(expr, local_vars, allow_recursion - 1)[0]
+
+ # Comma separated statements
+ sub_expressions = list(self._separate(expr))
+ expr = sub_expressions.pop().strip() if sub_expressions else ''
+ for sub_expr in sub_expressions:
+ self.interpret_expression(sub_expr, local_vars, allow_recursion)
+
+ for m in re.finditer(r'''(?x)
+ (?P<pre_sign>\+\+|--)(?P<var1>%(_NAME_RE)s)|
+ (?P<var2>%(_NAME_RE)s)(?P<post_sign>\+\+|--)''' % globals(), expr):
+ var = m.group('var1') or m.group('var2')
+ start, end = m.span()
+ sign = m.group('pre_sign') or m.group('post_sign')
+ ret = local_vars[var]
+ local_vars[var] += 1 if sign[0] == '+' else -1
+ if m.group('pre_sign'):
+ ret = local_vars[var]
+ expr = expr[:start] + json.dumps(ret) + expr[end:]
for op, opfunc in _ASSIGN_OPERATORS:
m = re.match(r'''(?x)
@@ -88,14 +282,13 @@ class JSInterpreter(object):
(?P<expr>.*)$''' % (_NAME_RE, re.escape(op)), expr)
if not m:
continue
- right_val = self.interpret_expression(
- m.group('expr'), local_vars, allow_recursion - 1)
+ right_val = self.interpret_expression(m.group('expr'), local_vars, allow_recursion)
if m.groupdict().get('index'):
lvar = local_vars[m.group('out')]
- idx = self.interpret_expression(
- m.group('index'), local_vars, allow_recursion)
- assert isinstance(idx, int)
+ idx = self.interpret_expression(m.group('index'), local_vars, allow_recursion)
+ if not isinstance(idx, int):
+ raise ExtractorError('List indices must be integers: %s' % (idx, ))
cur = lvar[idx]
val = opfunc(cur, right_val)
lvar[idx] = val
@@ -109,8 +302,13 @@ class JSInterpreter(object):
if expr.isdigit():
return int(expr)
+ if expr == 'break':
+ raise JS_Break()
+ elif expr == 'continue':
+ raise JS_Continue()
+
var_m = re.match(
- r'(?!if|return|true|false)(?P<name>%s)$' % _NAME_RE,
+ r'(?!if|return|true|false|null)(?P<name>%s)$' % _NAME_RE,
expr)
if var_m:
return local_vars[var_m.group('name')]
@@ -124,91 +322,161 @@ class JSInterpreter(object):
r'(?P<in>%s)\[(?P<idx>.+)\]$' % _NAME_RE, expr)
if m:
val = local_vars[m.group('in')]
- idx = self.interpret_expression(
- m.group('idx'), local_vars, allow_recursion - 1)
+ idx = self.interpret_expression(m.group('idx'), local_vars, allow_recursion)
return val[idx]
+ def raise_expr_error(where, op, exp):
+ raise ExtractorError('Premature {0} return of {1} in {2!r}'.format(where, op, exp))
+
+ for op, opfunc in _OPERATORS:
+ separated = list(self._separate(expr, op))
+ if len(separated) < 2:
+ continue
+ right_val = separated.pop()
+ left_val = op.join(separated)
+ left_val, should_abort = self.interpret_statement(
+ left_val, local_vars, allow_recursion - 1)
+ if should_abort:
+ raise_expr_error('left-side', op, expr)
+ right_val, should_abort = self.interpret_statement(
+ right_val, local_vars, allow_recursion - 1)
+ if should_abort:
+ raise_expr_error('right-side', op, expr)
+ return opfunc(left_val or 0, right_val)
+
m = re.match(
- r'(?P<var>%s)(?:\.(?P<member>[^(]+)|\[(?P<member2>[^]]+)\])\s*(?:\(+(?P<args>[^()]*)\))?$' % _NAME_RE,
+ r'(?P<var>%s)(?:\.(?P<member>[^(]+)|\[(?P<member2>[^]]+)\])\s*' % _NAME_RE,
expr)
if m:
variable = m.group('var')
- member = remove_quotes(m.group('member') or m.group('member2'))
- arg_str = m.group('args')
+ nl = Nonlocal()
- if variable in local_vars:
- obj = local_vars[variable]
- else:
- if variable not in self._objects:
- self._objects[variable] = self.extract_object(variable)
- obj = self._objects[variable]
-
- if arg_str is None:
- # Member access
- if member == 'length':
- return len(obj)
- return obj[member]
-
- assert expr.endswith(')')
- # Function call
- if arg_str == '':
- argvals = tuple()
+ nl.member = remove_quotes(m.group('member') or m.group('member2'))
+ arg_str = expr[m.end():]
+ if arg_str.startswith('('):
+ arg_str, remaining = self._separate_at_paren(arg_str, ')')
else:
- argvals = tuple([
+ arg_str, remaining = None, arg_str
+
+ def assertion(cndn, msg):
+ """ assert, but without risk of getting optimized out """
+ if not cndn:
+ raise ExtractorError('{0} {1}: {2}'.format(nl.member, msg, expr))
+
+ def eval_method():
+ # nonlocal member
+ member = nl.member
+ if variable == 'String':
+ obj = str
+ elif variable in local_vars:
+ obj = local_vars[variable]
+ else:
+ if variable not in self._objects:
+ self._objects[variable] = self.extract_object(variable)
+ obj = self._objects[variable]
+
+ if arg_str is None:
+ # Member access
+ if member == 'length':
+ return len(obj)
+ return obj[member]
+
+ # Function call
+ argvals = [
self.interpret_expression(v, local_vars, allow_recursion)
- for v in arg_str.split(',')])
-
- if member == 'split':
- assert argvals == ('',)
- return list(obj)
- if member == 'join':
- assert len(argvals) == 1
- return argvals[0].join(obj)
- if member == 'reverse':
- assert len(argvals) == 0
- obj.reverse()
- return obj
- if member == 'slice':
- assert len(argvals) == 1
- return obj[argvals[0]:]
- if member == 'splice':
- assert isinstance(obj, list)
- index, howMany = argvals
- res = []
- for i in range(index, min(index + howMany, len(obj))):
- res.append(obj.pop(index))
- return res
-
- return obj[member](argvals)
-
- for op, opfunc in _OPERATORS:
- m = re.match(r'(?P<x>.+?)%s(?P<y>.+)' % re.escape(op), expr)
- if not m:
- continue
- x, abort = self.interpret_statement(
- m.group('x'), local_vars, allow_recursion - 1)
- if abort:
- raise ExtractorError(
- 'Premature left-side return of %s in %r' % (op, expr))
- y, abort = self.interpret_statement(
- m.group('y'), local_vars, allow_recursion - 1)
- if abort:
- raise ExtractorError(
- 'Premature right-side return of %s in %r' % (op, expr))
- return opfunc(x, y)
+ for v in self._separate(arg_str)]
+
+ if obj == str:
+ if member == 'fromCharCode':
+ assertion(argvals, 'takes one or more arguments')
+ return ''.join(map(chr, argvals))
+ raise ExtractorError('Unsupported string method %s' % (member, ))
+
+ if member == 'split':
+ assertion(argvals, 'takes one or more arguments')
+ assertion(argvals == [''], 'with arguments is not implemented')
+ return list(obj)
+ elif member == 'join':
+ assertion(isinstance(obj, list), 'must be applied on a list')
+ assertion(len(argvals) == 1, 'takes exactly one argument')
+ return argvals[0].join(obj)
+ elif member == 'reverse':
+ assertion(not argvals, 'does not take any arguments')
+ obj.reverse()
+ return obj
+ elif member == 'slice':
+ assertion(isinstance(obj, list), 'must be applied on a list')
+ assertion(len(argvals) == 1, 'takes exactly one argument')
+ return obj[argvals[0]:]
+ elif member == 'splice':
+ assertion(isinstance(obj, list), 'must be applied on a list')
+ assertion(argvals, 'takes one or more arguments')
+ index, howMany = (argvals + [len(obj)])[:2]
+ if index < 0:
+ index += len(obj)
+ add_items = argvals[2:]
+ res = []
+ for i in range(index, min(index + howMany, len(obj))):
+ res.append(obj.pop(index))
+ for i, item in enumerate(add_items):
+ obj.insert(index + i, item)
+ return res
+ elif member == 'unshift':
+ assertion(isinstance(obj, list), 'must be applied on a list')
+ assertion(argvals, 'takes one or more arguments')
+ for item in reversed(argvals):
+ obj.insert(0, item)
+ return obj
+ elif member == 'pop':
+ assertion(isinstance(obj, list), 'must be applied on a list')
+ assertion(not argvals, 'does not take any arguments')
+ if not obj:
+ return
+ return obj.pop()
+ elif member == 'push':
+ assertion(argvals, 'takes one or more arguments')
+ obj.extend(argvals)
+ return obj
+ elif member == 'forEach':
+ assertion(argvals, 'takes one or more arguments')
+ assertion(len(argvals) <= 2, 'takes at-most 2 arguments')
+ f, this = (argvals + [''])[:2]
+ return [f((item, idx, obj), this=this) for idx, item in enumerate(obj)]
+ elif member == 'indexOf':
+ assertion(argvals, 'takes one or more arguments')
+ assertion(len(argvals) <= 2, 'takes at-most 2 arguments')
+ idx, start = (argvals + [0])[:2]
+ try:
+ return obj.index(idx, start)
+ except ValueError:
+ return -1
+
+ if isinstance(obj, list):
+ member = int(member)
+ nl.member = member
+ return obj[member](argvals)
+
+ if remaining:
+ return self.interpret_expression(
+ self._named_object(local_vars, eval_method()) + remaining,
+ local_vars, allow_recursion)
+ else:
+ return eval_method()
- m = re.match(
- r'^(?P<func>%s)\((?P<args>[a-zA-Z0-9_$,]*)\)$' % _NAME_RE, expr)
+ m = re.match(r'^(?P<func>%s)\((?P<args>[a-zA-Z0-9_$,]*)\)$' % _NAME_RE, expr)
if m:
fname = m.group('func')
argvals = tuple([
int(v) if v.isdigit() else local_vars[v]
- for v in m.group('args').split(',')]) if len(m.group('args')) > 0 else tuple()
- if fname not in self._functions:
+ for v in self._separate(m.group('args'))])
+ if fname in local_vars:
+ return local_vars[fname](argvals)
+ elif fname not in self._functions:
self._functions[fname] = self.extract_function(fname)
return self._functions[fname](argvals)
- raise ExtractorError('Unsupported JS expression %r' % expr)
+ if expr:
+ raise ExtractorError('Unsupported JS expression %r' % expr)
def extract_object(self, objname):
_FUNC_NAME_RE = r'''(?:[a-zA-Z$0-9]+|"[a-zA-Z$0-9]+"|'[a-zA-Z$0-9]+')'''
@@ -233,30 +501,52 @@ class JSInterpreter(object):
return obj
-    def extract_function(self, funcname):
+    def extract_function_code(self, funcname):
+        """Locate the definition of the JS function funcname in self.code.
+
+        @returns argnames, code
+        @raises ExtractorError if no definition of funcname is found
+        """
         func_m = re.search(
             r'''(?x)
-                (?:function\s+%s|[{;,]\s*%s\s*=\s*function|var\s+%s\s*=\s*function)\s*
+                (?:function\s+%(f_n)s|[{;,]\s*%(f_n)s\s*=\s*function|var\s+%(f_n)s\s*=\s*function)\s*
                 \((?P<args>[^)]*)\)\s*
-                \{(?P<code>[^}]+)\}''' % (
-                    re.escape(funcname), re.escape(funcname), re.escape(funcname)),
+                (?P<code>\{(?:(?!};)[^"]|"([^"]|\\")*")+\})''' % {'f_n': re.escape(funcname), },
             self.code)
         if func_m is None:
             raise ExtractorError('Could not find JS function %r' % funcname)
+        # Dereference the match only after the None check above, so a missing
+        # function raises ExtractorError rather than AttributeError
+        code, _ = self._separate_at_paren(func_m.group('code'), '}')  # refine the match
-        argnames = func_m.group('args').split(',')
+        return func_m.group('args').split(','), code
-        return self.build_function(argnames, func_m.group('code'))
+ def extract_function(self, funcname):
+ """Return a callable built from the JS function funcname found by extract_function_code."""
+ return self.extract_function_from_code(*self.extract_function_code(funcname))
+
+ def extract_function_from_code(self, argnames, code, *global_stack):
+ """Build a callable for JS code taking argnames, handling nested anonymous functions first.
+
+ Each `function(...) {...}` found in code is built recursively, stored in
+ local_vars under a generated name, and replaced in code by that name.
+ """
+ local_vars = {}
+ while True:
+ mobj = re.search(r'function\((?P<args>[^)]*)\)\s*{', code)
+ if mobj is None:
+ break
+ # The match ends just past the opening '{', so body_start - 1 is the '{' itself
+ start, body_start = mobj.span()
+ body, remaining = self._separate_at_paren(code[body_start - 1:], '}')
+ name = self._named_object(
+ local_vars,
+ self.extract_function_from_code(
+ [str.strip(x) for x in mobj.group('args').split(',')],
+ body, local_vars, *global_stack))
+ # Substitute the generated name for the function literal and rescan
+ code = code[:start] + name + remaining
+ return self.build_function(argnames, code, local_vars, *global_stack)
def call_function(self, funcname, *args):
- f = self.extract_function(funcname)
- return f(args)
-
- def build_function(self, argnames, code):
- def resf(args):
- local_vars = dict(zip(argnames, args))
- for stmt in code.split(';'):
- res, abort = self.interpret_statement(stmt, local_vars)
- if abort:
+ return self.extract_function(funcname)(args)
+
+ def build_function(self, argnames, code, *global_stack):
+ global_stack = list(global_stack) or [{}]
+ local_vars = global_stack.pop(0)
+
+ def resf(args, **kwargs):
+ local_vars.update(dict(zip(argnames, args)))
+ local_vars.update(kwargs)
+ var_stack = LocalNameSpace(local_vars, *global_stack)
+ for stmt in self._separate(code.replace('\n', ''), ';'):
+ ret, should_abort = self.interpret_statement(stmt, var_stack)
+ if should_abort:
break
- return res
+ return ret
return resf