#!/usr/bin/env python3 # Copyright (c) 2015-2020 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Functionality to build scripts, as well as signature hash functions. This file is modified from python-bitcoinlib. """ from collections import namedtuple import hashlib import struct import unittest from typing import List, Dict from .key import TaggedHash, tweak_add_pubkey from .messages import ( CTransaction, CTxOut, hash256, ser_string, ser_uint256, sha256, uint256_from_str, ) MAX_SCRIPT_ELEMENT_SIZE = 520 LOCKTIME_THRESHOLD = 500000000 ANNEX_TAG = 0x50 OPCODE_NAMES = {} # type: Dict[CScriptOp, str] LEAF_VERSION_TAPSCRIPT = 0xc0 def hash160(s): return hashlib.new('ripemd160', sha256(s)).digest() def bn2vch(v): """Convert number to bitcoin-specific little endian format.""" # We need v.bit_length() bits, plus a sign bit for every nonzero number. n_bits = v.bit_length() + (v != 0) # The number of bytes for that is: n_bytes = (n_bits + 7) // 8 # Convert number to absolute value + sign in top bit. encoded_v = 0 if v == 0 else abs(v) | ((v < 0) << (n_bytes * 8 - 1)) # Serialize to bytes return encoded_v.to_bytes(n_bytes, 'little') _opcode_instances = [] # type: List[CScriptOp] class CScriptOp(int): """A single script opcode""" __slots__ = () @staticmethod def encode_op_pushdata(d): """Encode a PUSHDATA op, returning bytes""" if len(d) < 0x4c: return b'' + bytes([len(d)]) + d # OP_PUSHDATA elif len(d) <= 0xff: return b'\x4c' + bytes([len(d)]) + d # OP_PUSHDATA1 elif len(d) <= 0xffff: return b'\x4d' + struct.pack(b'>= 8 if r[-1] & 0x80: r.append(0x80 if neg else 0) elif neg: r[-1] |= 0x80 return bytes([len(r)]) + r @staticmethod def decode(vch): result = 0 # We assume valid push_size and minimal encoding value = vch[1:] if len(value) == 0: return result for i, byte in enumerate(value): result |= int(byte) << 8 * i if value[-1] >= 0x80: # Mask for all but the highest result bit num_mask = (2**(len(value) * 8) - 1) >> 1 result &= num_mask result *= -1 return result class CScript(bytes): """Serialized script A bytes subclass, so you can use this directly whenever bytes are accepted. Note that this means that indexing does *not* work - you'll get an index by byte rather than opcode. This format was chosen for efficiency so that the general case would not require creating a lot of little CScriptOP objects. iter(script) however does iterate by opcode. """ __slots__ = () @classmethod def __coerce_instance(cls, other): # Coerce other into bytes if isinstance(other, CScriptOp): other = bytes([other]) elif isinstance(other, CScriptNum): if (other.value == 0): other = bytes([CScriptOp(OP_0)]) else: other = CScriptNum.encode(other) elif isinstance(other, int): if 0 <= other <= 16: other = bytes([CScriptOp.encode_op_n(other)]) elif other == -1: other = bytes([OP_1NEGATE]) else: other = CScriptOp.encode_op_pushdata(bn2vch(other)) elif isinstance(other, (bytes, bytearray)): other = CScriptOp.encode_op_pushdata(other) return other def __add__(self, other): # add makes no sense for a CScript() raise NotImplementedError def join(self, iterable): # join makes no sense for a CScript() raise NotImplementedError def __new__(cls, value=b''): if isinstance(value, bytes) or isinstance(value, bytearray): return super().__new__(cls, value) else: def coerce_iterable(iterable): for instance in iterable: yield cls.__coerce_instance(instance) # Annoyingly on both python2 and python3 bytes.join() always # returns a bytes instance even when subclassed. return super().__new__(cls, b''.join(coerce_iterable(value))) def raw_iter(self): """Raw iteration Yields tuples of (opcode, data, sop_idx) so that the different possible PUSHDATA encodings can be accurately distinguished, as well as determining the exact opcode byte indexes. (sop_idx) """ i = 0 while i < len(self): sop_idx = i opcode = self[i] i += 1 if opcode > OP_PUSHDATA4: yield (opcode, None, sop_idx) else: datasize = None pushdata_type = None if opcode < OP_PUSHDATA1: pushdata_type = 'PUSHDATA(%d)' % opcode datasize = opcode elif opcode == OP_PUSHDATA1: pushdata_type = 'PUSHDATA1' if i >= len(self): raise CScriptInvalidError('PUSHDATA1: missing data length') datasize = self[i] i += 1 elif opcode == OP_PUSHDATA2: pushdata_type = 'PUSHDATA2' if i + 1 >= len(self): raise CScriptInvalidError('PUSHDATA2: missing data length') datasize = self[i] + (self[i + 1] << 8) i += 2 elif opcode == OP_PUSHDATA4: pushdata_type = 'PUSHDATA4' if i + 3 >= len(self): raise CScriptInvalidError('PUSHDATA4: missing data length') datasize = self[i] + (self[i + 1] << 8) + (self[i + 2] << 16) + (self[i + 3] << 24) i += 4 else: assert False # shouldn't happen data = bytes(self[i:i + datasize]) # Check for truncation if len(data) < datasize: raise CScriptTruncatedPushDataError('%s: truncated data' % pushdata_type, data) i += datasize yield (opcode, data, sop_idx) def __iter__(self): """'Cooked' iteration Returns either a CScriptOP instance, an integer, or bytes, as appropriate. See raw_iter() if you need to distinguish the different possible PUSHDATA encodings. """ for (opcode, data, sop_idx) in self.raw_iter(): if data is not None: yield data else: opcode = CScriptOp(opcode) if opcode.is_small_int(): yield opcode.decode_op_n() else: yield CScriptOp(opcode) def __repr__(self): def _repr(o): if isinstance(o, bytes): return "x('%s')" % o.hex() else: return repr(o) ops = [] i = iter(self) while True: op = None try: op = _repr(next(i)) except CScriptTruncatedPushDataError as err: op = '%s...' % (_repr(err.data), err) break except CScriptInvalidError as err: op = '' % err break except StopIteration: break finally: if op is not None: ops.append(op) return "CScript([%s])" % ', '.join(ops) def GetSigOpCount(self, fAccurate): """Get the SigOp count. fAccurate - Accurately count CHECKMULTISIG, see BIP16 for details. Note that this is consensus-critical. """ n = 0 lastOpcode = OP_INVALIDOPCODE for (opcode, data, sop_idx) in self.raw_iter(): if opcode in (OP_CHECKSIG, OP_CHECKSIGVERIFY): n += 1 elif opcode in (OP_CHECKMULTISIG, OP_CHECKMULTISIGVERIFY): if fAccurate and (OP_1 <= lastOpcode <= OP_16): n += opcode.decode_op_n() else: n += 20 lastOpcode = opcode return n SIGHASH_DEFAULT = 0 # Taproot-only default, semantics same as SIGHASH_ALL SIGHASH_ALL = 1 SIGHASH_NONE = 2 SIGHASH_SINGLE = 3 SIGHASH_ANYONECANPAY = 0x80 def FindAndDelete(script, sig): """Consensus critical, see FindAndDelete() in Satoshi codebase""" r = b'' last_sop_idx = sop_idx = 0 skip = True for (opcode, data, sop_idx) in script.raw_iter(): if not skip: r += script[last_sop_idx:sop_idx] last_sop_idx = sop_idx if script[sop_idx:sop_idx + len(sig)] == sig: skip = True else: skip = False if not skip: r += script[last_sop_idx:] return CScript(r) def LegacySignatureHash(script, txTo, inIdx, hashtype): """Consensus-correct SignatureHash Returns (hash, err) to precisely match the consensus-critical behavior of the SIGHASH_SINGLE bug. (inIdx is *not* checked for validity) """ HASH_ONE = b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' if inIdx >= len(txTo.vin): return (HASH_ONE, "inIdx %d out of range (%d)" % (inIdx, len(txTo.vin))) txtmp = CTransaction(txTo) for txin in txtmp.vin: txin.scriptSig = b'' txtmp.vin[inIdx].scriptSig = FindAndDelete(script, CScript([OP_CODESEPARATOR])) if (hashtype & 0x1f) == SIGHASH_NONE: txtmp.vout = [] for i in range(len(txtmp.vin)): if i != inIdx: txtmp.vin[i].nSequence = 0 elif (hashtype & 0x1f) == SIGHASH_SINGLE: outIdx = inIdx if outIdx >= len(txtmp.vout): return (HASH_ONE, "outIdx %d out of range (%d)" % (outIdx, len(txtmp.vout))) tmp = txtmp.vout[outIdx] txtmp.vout = [] for _ in range(outIdx): txtmp.vout.append(CTxOut(-1)) txtmp.vout.append(tmp) for i in range(len(txtmp.vin)): if i != inIdx: txtmp.vin[i].nSequence = 0 if hashtype & SIGHASH_ANYONECANPAY: tmp = txtmp.vin[inIdx] txtmp.vin = [] txtmp.vin.append(tmp) s = txtmp.serialize_without_witness() s += struct.pack(b" TaprootLeafInfo objects for all known leaves TaprootInfo = namedtuple("TaprootInfo", "scriptPubKey,inner_pubkey,negflag,tweak,leaves") # A TaprootLeafInfo object has the following fields: # - script: the leaf script (CScript or bytes) # - version: the leaf version (0xc0 for BIP342 tapscript) # - merklebranch: the merkle branch to use for this leaf (32*N bytes) TaprootLeafInfo = namedtuple("TaprootLeafInfo", "script,version,merklebranch") def taproot_construct(pubkey, scripts=None): """Construct a tree of Taproot spending conditions pubkey: a 32-byte xonly pubkey for the internal pubkey (bytes) scripts: a list of items; each item is either: - a (name, CScript or bytes, leaf version) tuple - a (name, CScript or bytes) tuple (defaulting to leaf version 0xc0) - another list of items (with the same structure) - a list of two items; the first of which is an item itself, and the second is a function. The function takes as input the Merkle root of the first item, and produces a (fictitious) partner to hash with. Returns: a TaprootInfo object """ if scripts is None: scripts = [] ret, h = taproot_tree_helper(scripts) tweak = TaggedHash("TapTweak", pubkey + h) tweaked, negated = tweak_add_pubkey(pubkey, tweak) leaves = dict((name, TaprootLeafInfo(script, version, merklebranch)) for name, version, script, merklebranch in ret) return TaprootInfo(CScript([OP_1, tweaked]), pubkey, negated + 0, tweak, leaves) def is_op_success(o): return o == 0x50 or o == 0x62 or o == 0x89 or o == 0x8a or o == 0x8d or o == 0x8e or (o >= 0x7e and o <= 0x81) or (o >= 0x83 and o <= 0x86) or (o >= 0x95 and o <= 0x99) or (o >= 0xbb and o <= 0xfe)