diff options
Diffstat (limited to 'test/functional')
-rwxr-xr-x | test/functional/feature_taproot.py | 301 | ||||
-rw-r--r-- | test/functional/test_framework/key.py | 20 | ||||
-rw-r--r-- | test/functional/test_framework/script.py | 84 |
3 files changed, 358 insertions, 47 deletions
diff --git a/test/functional/feature_taproot.py b/test/functional/feature_taproot.py index 5f90e49de1..c78e289f74 100755 --- a/test/functional/feature_taproot.py +++ b/test/functional/feature_taproot.py @@ -22,11 +22,17 @@ from test_framework.messages import ( ) from test_framework.script import ( ANNEX_TAG, + BIP341_sha_amounts, + BIP341_sha_outputs, + BIP341_sha_prevouts, + BIP341_sha_scriptpubkeys, + BIP341_sha_sequences, CScript, CScriptNum, CScriptOp, + hash256, LEAF_VERSION_TAPSCRIPT, - LegacySignatureHash, + LegacySignatureMsg, LOCKTIME_THRESHOLD, MAX_SCRIPT_ELEMENT_SIZE, OP_0, @@ -70,13 +76,15 @@ from test_framework.script import ( SIGHASH_NONE, SIGHASH_SINGLE, SIGHASH_ANYONECANPAY, - SegwitV0SignatureHash, - TaprootSignatureHash, + SegwitV0SignatureMsg, + TaggedHash, + TaprootSignatureMsg, is_op_success, taproot_construct, ) from test_framework.script_util import ( key_to_p2pk_script, + key_to_p2pkh_script, key_to_p2wpkh_script, keyhash_to_p2pkh_script, script_to_p2sh_script, @@ -87,6 +95,7 @@ from test_framework.util import assert_raises_rpc_error, assert_equal from test_framework.key import generate_privkey, compute_xonly_pubkey, sign_schnorr, tweak_add_privkey, ECKey from test_framework.address import ( hash160, + program_to_witness ) from collections import OrderedDict, namedtuple from io import BytesIO @@ -95,6 +104,9 @@ import hashlib import os import random +# Whether or not to output generated test vectors, in JSON format. +GEN_TEST_VECTORS = False + # === Framework for building spending transactions. === # # The computation is represented as a "context" dict, whose entries store potentially-unevaluated expressions that @@ -194,8 +206,8 @@ def default_controlblock(ctx): """Default expression for "controlblock": combine leafversion, negflag, pubkey_internal, merklebranch.""" return bytes([get(ctx, "leafversion") + get(ctx, "negflag")]) + get(ctx, "pubkey_internal") + get(ctx, "merklebranch") -def default_sighash(ctx): - """Default expression for "sighash": depending on mode, compute BIP341, BIP143, or legacy sighash.""" +def default_sigmsg(ctx): + """Default expression for "sigmsg": depending on mode, compute BIP341, BIP143, or legacy sigmsg.""" tx = get(ctx, "tx") idx = get(ctx, "idx") hashtype = get(ctx, "hashtype_actual") @@ -208,18 +220,30 @@ def default_sighash(ctx): codeseppos = get(ctx, "codeseppos") leaf_ver = get(ctx, "leafversion") script = get(ctx, "script_taproot") - return TaprootSignatureHash(tx, utxos, hashtype, idx, scriptpath=True, script=script, leaf_ver=leaf_ver, codeseparator_pos=codeseppos, annex=annex) + return TaprootSignatureMsg(tx, utxos, hashtype, idx, scriptpath=True, script=script, leaf_ver=leaf_ver, codeseparator_pos=codeseppos, annex=annex) else: - return TaprootSignatureHash(tx, utxos, hashtype, idx, scriptpath=False, annex=annex) + return TaprootSignatureMsg(tx, utxos, hashtype, idx, scriptpath=False, annex=annex) elif mode == "witv0": # BIP143 signature hash scriptcode = get(ctx, "scriptcode") utxos = get(ctx, "utxos") - return SegwitV0SignatureHash(scriptcode, tx, idx, hashtype, utxos[idx].nValue) + return SegwitV0SignatureMsg(scriptcode, tx, idx, hashtype, utxos[idx].nValue) else: # Pre-segwit signature hash scriptcode = get(ctx, "scriptcode") - return LegacySignatureHash(scriptcode, tx, idx, hashtype)[0] + return LegacySignatureMsg(scriptcode, tx, idx, hashtype)[0] + +def default_sighash(ctx): + """Default expression for "sighash": depending on mode, compute tagged hash or dsha256 of sigmsg.""" + msg = get(ctx, "sigmsg") + mode = get(ctx, "mode") + if mode == "taproot": + return TaggedHash("TapSighash", msg) + else: + if msg is None: + return (1).to_bytes(32, 'little') + else: + return hash256(msg) def default_tweak(ctx): """Default expression for "tweak": None if a leaf is specified, tap[0] otherwise.""" @@ -239,14 +263,18 @@ def default_key_tweaked(ctx): def default_signature(ctx): """Default expression for "signature": BIP340 signature or ECDSA signature depending on mode.""" sighash = get(ctx, "sighash") + deterministic = get(ctx, "deterministic") if get(ctx, "mode") == "taproot": key = get(ctx, "key_tweaked") flip_r = get(ctx, "flag_flip_r") flip_p = get(ctx, "flag_flip_p") - return sign_schnorr(key, sighash, flip_r=flip_r, flip_p=flip_p) + aux = bytes([0] * 32) + if not deterministic: + aux = random.getrandbits(256).to_bytes(32, 'big') + return sign_schnorr(key, sighash, flip_r=flip_r, flip_p=flip_p, aux=aux) else: key = get(ctx, "key") - return key.sign_ecdsa(sighash) + return key.sign_ecdsa(sighash, rfc6979=deterministic) def default_hashtype_actual(ctx): """Default expression for "hashtype_actual": hashtype, unless mismatching SIGHASH_SINGLE in taproot.""" @@ -340,6 +368,8 @@ DEFAULT_CONTEXT = { "key_tweaked": default_key_tweaked, # The tweak to use (None for script path spends, the actual tweak for key path spends). "tweak": default_tweak, + # The sigmsg value (preimage of sighash) + "sigmsg": default_sigmsg, # The sighash value (32 bytes) "sighash": default_sighash, # The information about the chosen script path spend (TaprootLeafInfo object). @@ -376,6 +406,8 @@ DEFAULT_CONTEXT = { "leaf": None, # The input arguments to provide to the executed script "inputs": [], + # Use deterministic signing nonces + "deterministic": False, # == Parameters to be set before evaluation: == # - mode: what spending style to use ("taproot", "witv0", or "legacy"). @@ -396,6 +428,7 @@ def flatten(lst): ret.append(elem) return ret + def spend(tx, idx, utxos, **kwargs): """Sign transaction input idx of tx, provided utxos is the list of outputs being spent. @@ -1253,6 +1286,14 @@ class TaprootTest(BitcoinTestFramework): else: assert node.getbestblockhash() == self.lastblockhash, "Failed to reject: " + msg + def init_blockinfo(self, node): + # Initialize variables used by block_submit(). + self.lastblockhash = node.getbestblockhash() + self.tip = int(self.lastblockhash, 16) + block = node.getblock(self.lastblockhash) + self.lastblockheight = block['height'] + self.lastblocktime = block['time'] + def test_spenders(self, node, spenders, input_counts): """Run randomized tests with a number of "spenders". @@ -1279,12 +1320,7 @@ class TaprootTest(BitcoinTestFramework): host_spks.append(spk) host_pubkeys.append(bytes.fromhex(info['pubkey'])) - # Initialize variables used by block_submit(). - self.lastblockhash = node.getbestblockhash() - self.tip = int(self.lastblockhash, 16) - block = node.getblock(self.lastblockhash) - self.lastblockheight = block['height'] - self.lastblocktime = block['time'] + self.init_blockinfo(node) # Create transactions spending up to 50 of the wallet's inputs, with one output for each spender, and # one change output at the end. The transaction is constructed on the Python side to enable @@ -1458,10 +1494,239 @@ class TaprootTest(BitcoinTestFramework): assert len(mismatching_utxos) == 0 self.log.info(" - Done") + def gen_test_vectors(self): + """Run a scenario that corresponds (and optionally produces) to BIP341 test vectors.""" + + self.log.info("Unit test scenario...") + + # Deterministically mine coins to OP_TRUE in block 1 + assert self.nodes[1].getblockcount() == 0 + coinbase = CTransaction() + coinbase.nVersion = 1 + coinbase.vin = [CTxIn(COutPoint(0, 0xffffffff), CScript([OP_1, OP_1]), 0xffffffff)] + coinbase.vout = [CTxOut(5000000000, CScript([OP_1]))] + coinbase.nLockTime = 0 + coinbase.rehash() + assert coinbase.hash == "f60c73405d499a956d3162e3483c395526ef78286458a4cb17b125aa92e49b20" + # Mine it + block = create_block(hashprev=int(self.nodes[1].getbestblockhash(), 16), coinbase=coinbase) + block.rehash() + block.solve() + self.nodes[1].submitblock(block.serialize().hex()) + assert self.nodes[1].getblockcount() == 1 + self.generate(self.nodes[1], COINBASE_MATURITY) + + SEED = 317 + VALID_LEAF_VERS = list(range(0xc0, 0x100, 2)) + [0x66, 0x7e, 0x80, 0x84, 0x96, 0x98, 0xba, 0xbc, 0xbe] + # Generate private keys + prvs = [hashlib.sha256(SEED.to_bytes(2, 'big') + bytes([i])).digest() for i in range(100)] + # Generate corresponding public x-only pubkeys + pubs = [compute_xonly_pubkey(prv)[0] for prv in prvs] + # Generate taproot objects + inner_keys = [pubs[i] for i in range(7)] + + script_lists = [ + None, + [("0", CScript([pubs[50], OP_CHECKSIG]), 0xc0)], + [("0", CScript([pubs[51], OP_CHECKSIG]), 0xc0)], + [("0", CScript([pubs[52], OP_CHECKSIG]), 0xc0), ("1", CScript([b"BIP341"]), VALID_LEAF_VERS[pubs[99][0] % 41])], + [("0", CScript([pubs[53], OP_CHECKSIG]), 0xc0), ("1", CScript([b"Taproot"]), VALID_LEAF_VERS[pubs[99][1] % 41])], + [("0", CScript([pubs[54], OP_CHECKSIG]), 0xc0), [("1", CScript([pubs[55], OP_CHECKSIG]), 0xc0), ("2", CScript([pubs[56], OP_CHECKSIG]), 0xc0)]], + [("0", CScript([pubs[57], OP_CHECKSIG]), 0xc0), [("1", CScript([pubs[58], OP_CHECKSIG]), 0xc0), ("2", CScript([pubs[59], OP_CHECKSIG]), 0xc0)]], + ] + taps = [taproot_construct(inner_keys[i], script_lists[i]) for i in range(len(inner_keys))] + + # Require negated taps[0] + assert taps[0].negflag + # Require one negated and one non-negated in taps 1 and 2. + assert taps[1].negflag != taps[2].negflag + # Require one negated and one non-negated in taps 3 and 4. + assert taps[3].negflag != taps[4].negflag + # Require one negated and one non-negated in taps 5 and 6. + assert taps[5].negflag != taps[6].negflag + + cblks = [{leaf: get({**DEFAULT_CONTEXT, 'tap': taps[i], 'leaf': leaf}, 'controlblock') for leaf in taps[i].leaves} for i in range(7)] + # Require one swapped and one unswapped in taps 3 and 4. + assert (cblks[3]['0'][33:65] < cblks[3]['1'][33:65]) != (cblks[4]['0'][33:65] < cblks[4]['1'][33:65]) + # Require one swapped and one unswapped in taps 5 and 6, both at the top and child level. + assert (cblks[5]['0'][33:65] < cblks[5]['1'][65:]) != (cblks[6]['0'][33:65] < cblks[6]['1'][65:]) + assert (cblks[5]['1'][33:65] < cblks[5]['2'][33:65]) != (cblks[6]['1'][33:65] < cblks[6]['2'][33:65]) + # Require within taps 5 (and thus also 6) that one level is swapped and the other is not. + assert (cblks[5]['0'][33:65] < cblks[5]['1'][65:]) != (cblks[5]['1'][33:65] < cblks[5]['2'][33:65]) + + # Compute a deterministic set of scriptPubKeys + tap_spks = [] + old_spks = [] + spend_info = {} + # First, taproot scriptPubKeys, for the tap objects constructed above + for i, tap in enumerate(taps): + tap_spks.append(tap.scriptPubKey) + d = {'key': prvs[i], 'tap': tap, 'mode': 'taproot'} + spend_info[tap.scriptPubKey] = d + # Then, a number of deterministically generated (keys 0x1,0x2,0x3) with 2x P2PKH, 1x P2WPKH spks. + for i in range(1, 4): + prv = ECKey() + prv.set(i.to_bytes(32, 'big'), True) + pub = prv.get_pubkey().get_bytes() + d = {"key": prv} + d["scriptcode"] = key_to_p2pkh_script(pub) + d["inputs"] = [getter("sign"), pub] + if i < 3: + # P2PKH + d['spk'] = key_to_p2pkh_script(pub) + d['mode'] = 'legacy' + else: + # P2WPKH + d['spk'] = key_to_p2wpkh_script(pub) + d['mode'] = 'witv0' + old_spks.append(d['spk']) + spend_info[d['spk']] = d + + # Construct a deterministic chain of transactions creating UTXOs to the test's spk's (so that they + # come from distinct txids). + txn = [] + lasttxid = coinbase.sha256 + amount = 5000000000 + for i, spk in enumerate(old_spks + tap_spks): + val = 42000000 * (i + 7) + tx = CTransaction() + tx.nVersion = 1 + tx.vin = [CTxIn(COutPoint(lasttxid, i & 1), CScript([]), 0xffffffff)] + tx.vout = [CTxOut(val, spk), CTxOut(amount - val, CScript([OP_1]))] + if i & 1: + tx.vout = list(reversed(tx.vout)) + tx.nLockTime = 0 + tx.rehash() + amount -= val + lasttxid = tx.sha256 + txn.append(tx) + spend_info[spk]['prevout'] = COutPoint(tx.sha256, i & 1) + spend_info[spk]['utxo'] = CTxOut(val, spk) + # Mine those transactions + self.init_blockinfo(self.nodes[1]) + self.block_submit(self.nodes[1], txn, "Crediting txn", None, sigops_weight=10, accept=True) + + # scriptPubKey computation + tests = {"version": 1} + spk_tests = tests.setdefault("scriptPubKey", []) + for i, tap in enumerate(taps): + test_case = {} + given = test_case.setdefault("given", {}) + given['internalPubkey'] = tap.internal_pubkey.hex() + + def pr(node): + if node is None: + return None + elif isinstance(node, tuple): + return {"id": int(node[0]), "script": node[1].hex(), "leafVersion": node[2]} + elif len(node) == 1: + return pr(node[0]) + elif len(node) == 2: + return [pr(node[0]), pr(node[1])] + else: + assert False + + given['scriptTree'] = pr(script_lists[i]) + intermediary = test_case.setdefault("intermediary", {}) + if len(tap.leaves): + leafhashes = intermediary.setdefault('leafHashes', [None] * len(tap.leaves)) + for leaf in tap.leaves: + leafhashes[int(leaf)] = tap.leaves[leaf].leaf_hash.hex() + intermediary['merkleRoot'] = tap.merkle_root.hex() if tap.merkle_root else None + intermediary['tweak'] = tap.tweak.hex() + intermediary['tweakedPubkey'] = tap.output_pubkey.hex() + expected = test_case.setdefault("expected", {}) + expected['scriptPubKey'] = tap.scriptPubKey.hex() + expected['bip350Address'] = program_to_witness(1, bytes(tap.output_pubkey), True) + if len(tap.leaves): + control_blocks = expected.setdefault("scriptPathControlBlocks", [None] * len(tap.leaves)) + for leaf in tap.leaves: + ctx = {**DEFAULT_CONTEXT, 'tap': tap, 'leaf': leaf} + control_blocks[int(leaf)] = get(ctx, "controlblock").hex() + spk_tests.append(test_case) + + # Construct a deterministic transaction spending all outputs created above. + tx = CTransaction() + tx.nVersion = 2 + tx.vin = [] + inputs = [] + input_spks = [tap_spks[0], tap_spks[1], old_spks[0], tap_spks[2], tap_spks[5], old_spks[2], tap_spks[6], tap_spks[3], tap_spks[4]] + sequences = [0, 0xffffffff, 0xffffffff, 0xfffffffe, 0xfffffffe, 0, 0, 0xffffffff, 0xffffffff] + hashtypes = [SIGHASH_SINGLE, SIGHASH_SINGLE|SIGHASH_ANYONECANPAY, SIGHASH_ALL, SIGHASH_ALL, SIGHASH_DEFAULT, SIGHASH_ALL, SIGHASH_NONE, SIGHASH_NONE|SIGHASH_ANYONECANPAY, SIGHASH_ALL|SIGHASH_ANYONECANPAY] + for i, spk in enumerate(input_spks): + tx.vin.append(CTxIn(spend_info[spk]['prevout'], CScript(), sequences[i])) + inputs.append(spend_info[spk]['utxo']) + tx.vout.append(CTxOut(1000000000, old_spks[1])) + tx.vout.append(CTxOut(3410000000, pubs[98])) + tx.nLockTime = 500000000 + precomputed = { + "hashAmounts": BIP341_sha_amounts(inputs), + "hashPrevouts": BIP341_sha_prevouts(tx), + "hashScriptPubkeys": BIP341_sha_scriptpubkeys(inputs), + "hashSequences": BIP341_sha_sequences(tx), + "hashOutputs": BIP341_sha_outputs(tx) + } + keypath_tests = tests.setdefault("keyPathSpending", []) + tx_test = {} + global_given = tx_test.setdefault("given", {}) + global_given['rawUnsignedTx'] = tx.serialize().hex() + utxos_spent = global_given.setdefault("utxosSpent", []) + for i in range(len(input_spks)): + utxos_spent.append({"scriptPubKey": inputs[i].scriptPubKey.hex(), "amountSats": inputs[i].nValue}) + global_intermediary = tx_test.setdefault("intermediary", {}) + for key in sorted(precomputed.keys()): + global_intermediary[key] = precomputed[key].hex() + test_list = tx_test.setdefault('inputSpending', []) + for i in range(len(input_spks)): + ctx = { + **DEFAULT_CONTEXT, + **spend_info[input_spks[i]], + 'tx': tx, + 'utxos': inputs, + 'idx': i, + 'hashtype': hashtypes[i], + 'deterministic': True + } + if ctx['mode'] == 'taproot': + test_case = {} + given = test_case.setdefault("given", {}) + given['txinIndex'] = i + given['internalPrivkey'] = get(ctx, 'key').hex() + if get(ctx, "tap").merkle_root != bytes(): + given['merkleRoot'] = get(ctx, "tap").merkle_root.hex() + else: + given['merkleRoot'] = None + given['hashType'] = get(ctx, "hashtype") + intermediary = test_case.setdefault("intermediary", {}) + intermediary['internalPubkey'] = get(ctx, "tap").internal_pubkey.hex() + intermediary['tweak'] = get(ctx, "tap").tweak.hex() + intermediary['tweakedPrivkey'] = get(ctx, "key_tweaked").hex() + sigmsg = get(ctx, "sigmsg") + intermediary['sigMsg'] = sigmsg.hex() + intermediary['precomputedUsed'] = [key for key in sorted(precomputed.keys()) if sigmsg.count(precomputed[key])] + intermediary['sigHash'] = get(ctx, "sighash").hex() + expected = test_case.setdefault("expected", {}) + expected['witness'] = [get(ctx, "sign").hex()] + test_list.append(test_case) + tx.wit.vtxinwit.append(CTxInWitness()) + tx.vin[i].scriptSig = CScript(flatten(get(ctx, "scriptsig"))) + tx.wit.vtxinwit[i].scriptWitness.stack = flatten(get(ctx, "witness")) + aux = tx_test.setdefault("auxiliary", {}) + aux['fullySignedTx'] = tx.serialize().hex() + keypath_tests.append(tx_test) + assert_equal(hashlib.sha256(tx.serialize()).hexdigest(), "24bab662cb55a7f3bae29b559f651674c62bcc1cd442d44715c0133939107b38") + # Mine the spending transaction + self.block_submit(self.nodes[1], [tx], "Spending txn", None, sigops_weight=10000, accept=True, witness=True) + + if GEN_TEST_VECTORS: + print(json.dumps(tests, indent=4, sort_keys=False)) + + def run_test(self): + self.gen_test_vectors() + # Post-taproot activation tests go first (pre-taproot tests' blocks are invalid post-taproot). self.log.info("Post-activation tests...") - self.generate(self.nodes[1], COINBASE_MATURITY + 1) self.test_spenders(self.nodes[1], spenders_taproot_active(), input_counts=[1, 2, 2, 2, 2, 3]) # Re-connect nodes in case they have been disconnected diff --git a/test/functional/test_framework/key.py b/test/functional/test_framework/key.py index 26526e35fa..e5dea66963 100644 --- a/test/functional/test_framework/key.py +++ b/test/functional/test_framework/key.py @@ -8,6 +8,7 @@ keys, and is trivially vulnerable to side channel attacks. Do not use for anything but tests.""" import csv import hashlib +import hmac import os import random import unittest @@ -326,6 +327,16 @@ def generate_privkey(): """Generate a valid random 32-byte private key.""" return random.randrange(1, SECP256K1_ORDER).to_bytes(32, 'big') +def rfc6979_nonce(key): + """Compute signing nonce using RFC6979.""" + v = bytes([1] * 32) + k = bytes([0] * 32) + k = hmac.new(k, v + b"\x00" + key, 'sha256').digest() + v = hmac.new(k, v, 'sha256').digest() + k = hmac.new(k, v + b"\x01" + key, 'sha256').digest() + v = hmac.new(k, v, 'sha256').digest() + return hmac.new(k, v, 'sha256').digest() + class ECKey(): """A secp256k1 private key""" @@ -368,15 +379,18 @@ class ECKey(): ret.compressed = self.compressed return ret - def sign_ecdsa(self, msg, low_s=True): + def sign_ecdsa(self, msg, low_s=True, rfc6979=False): """Construct a DER-encoded ECDSA signature with this key. See https://en.wikipedia.org/wiki/Elliptic_Curve_Digital_Signature_Algorithm for the ECDSA signer algorithm.""" assert(self.valid) z = int.from_bytes(msg, 'big') - # Note: no RFC6979, but a simple random nonce (some tests rely on distinct transactions for the same operation) - k = random.randrange(1, SECP256K1_ORDER) + # Note: no RFC6979 by default, but a simple random nonce (some tests rely on distinct transactions for the same operation) + if rfc6979: + k = int.from_bytes(rfc6979_nonce(self.secret.to_bytes(32, 'big') + msg), 'big') + else: + k = random.randrange(1, SECP256K1_ORDER) R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, k)])) r = R[0] % SECP256K1_ORDER s = (modinv(k, SECP256K1_ORDER) * (z + self.secret * r)) % SECP256K1_ORDER diff --git a/test/functional/test_framework/script.py b/test/functional/test_framework/script.py index 3c9b8a6e69..de71e19251 100644 --- a/test/functional/test_framework/script.py +++ b/test/functional/test_framework/script.py @@ -619,16 +619,15 @@ def FindAndDelete(script, sig): r += script[last_sop_idx:] return CScript(r) -def LegacySignatureHash(script, txTo, inIdx, hashtype): - """Consensus-correct SignatureHash +def LegacySignatureMsg(script, txTo, inIdx, hashtype): + """Preimage of the signature hash, if it exists. - Returns (hash, err) to precisely match the consensus-critical behavior of - the SIGHASH_SINGLE bug. (inIdx is *not* checked for validity) + Returns either (None, err) to indicate error (which translates to sighash 1), + or (msg, None). """ - HASH_ONE = b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' if inIdx >= len(txTo.vin): - return (HASH_ONE, "inIdx %d out of range (%d)" % (inIdx, len(txTo.vin))) + return (None, "inIdx %d out of range (%d)" % (inIdx, len(txTo.vin))) txtmp = CTransaction(txTo) for txin in txtmp.vin: @@ -645,7 +644,7 @@ def LegacySignatureHash(script, txTo, inIdx, hashtype): elif (hashtype & 0x1f) == SIGHASH_SINGLE: outIdx = inIdx if outIdx >= len(txtmp.vout): - return (HASH_ONE, "outIdx %d out of range (%d)" % (outIdx, len(txtmp.vout))) + return (None, "outIdx %d out of range (%d)" % (outIdx, len(txtmp.vout))) tmp = txtmp.vout[outIdx] txtmp.vout = [] @@ -665,15 +664,27 @@ def LegacySignatureHash(script, txTo, inIdx, hashtype): s = txtmp.serialize_without_witness() s += struct.pack(b"<I", hashtype) - hash = hash256(s) + return (s, None) + +def LegacySignatureHash(*args, **kwargs): + """Consensus-correct SignatureHash + + Returns (hash, err) to precisely match the consensus-critical behavior of + the SIGHASH_SINGLE bug. (inIdx is *not* checked for validity) + """ - return (hash, None) + HASH_ONE = b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg, err = LegacySignatureMsg(*args, **kwargs) + if msg is None: + return (HASH_ONE, err) + else: + return (hash256(msg), err) # TODO: Allow cached hashPrevouts/hashSequence/hashOutputs to be provided. # Performance optimization probably not necessary for python tests, however. # Note that this corresponds to sigversion == 1 in EvalScript, which is used # for version 0 witnesses. -def SegwitV0SignatureHash(script, txTo, inIdx, hashtype, amount): +def SegwitV0SignatureMsg(script, txTo, inIdx, hashtype, amount): hashPrevouts = 0 hashSequence = 0 @@ -711,8 +722,10 @@ def SegwitV0SignatureHash(script, txTo, inIdx, hashtype, amount): ss += ser_uint256(hashOutputs) ss += struct.pack("<i", txTo.nLockTime) ss += struct.pack("<I", hashtype) + return ss - return hash256(ss) +def SegwitV0SignatureHash(*args, **kwargs): + return hash256(SegwitV0SignatureMsg(*args, **kwargs)) class TestFrameworkScript(unittest.TestCase): def test_bn2vch(self): @@ -742,7 +755,22 @@ class TestFrameworkScript(unittest.TestCase): for value in values: self.assertEqual(CScriptNum.decode(CScriptNum.encode(CScriptNum(value))), value) -def TaprootSignatureHash(txTo, spent_utxos, hash_type, input_index = 0, scriptpath = False, script = CScript(), codeseparator_pos = -1, annex = None, leaf_ver = LEAF_VERSION_TAPSCRIPT): +def BIP341_sha_prevouts(txTo): + return sha256(b"".join(i.prevout.serialize() for i in txTo.vin)) + +def BIP341_sha_amounts(spent_utxos): + return sha256(b"".join(struct.pack("<q", u.nValue) for u in spent_utxos)) + +def BIP341_sha_scriptpubkeys(spent_utxos): + return sha256(b"".join(ser_string(u.scriptPubKey) for u in spent_utxos)) + +def BIP341_sha_sequences(txTo): + return sha256(b"".join(struct.pack("<I", i.nSequence) for i in txTo.vin)) + +def BIP341_sha_outputs(txTo): + return sha256(b"".join(o.serialize() for o in txTo.vout)) + +def TaprootSignatureMsg(txTo, spent_utxos, hash_type, input_index = 0, scriptpath = False, script = CScript(), codeseparator_pos = -1, annex = None, leaf_ver = LEAF_VERSION_TAPSCRIPT): assert (len(txTo.vin) == len(spent_utxos)) assert (input_index < len(txTo.vin)) out_type = SIGHASH_ALL if hash_type == 0 else hash_type & 3 @@ -752,12 +780,12 @@ def TaprootSignatureHash(txTo, spent_utxos, hash_type, input_index = 0, scriptpa ss += struct.pack("<i", txTo.nVersion) ss += struct.pack("<I", txTo.nLockTime) if in_type != SIGHASH_ANYONECANPAY: - ss += sha256(b"".join(i.prevout.serialize() for i in txTo.vin)) - ss += sha256(b"".join(struct.pack("<q", u.nValue) for u in spent_utxos)) - ss += sha256(b"".join(ser_string(u.scriptPubKey) for u in spent_utxos)) - ss += sha256(b"".join(struct.pack("<I", i.nSequence) for i in txTo.vin)) + ss += BIP341_sha_prevouts(txTo) + ss += BIP341_sha_amounts(spent_utxos) + ss += BIP341_sha_scriptpubkeys(spent_utxos) + ss += BIP341_sha_sequences(txTo) if out_type == SIGHASH_ALL: - ss += sha256(b"".join(o.serialize() for o in txTo.vout)) + ss += BIP341_sha_outputs(txTo) spend_type = 0 if annex is not None: spend_type |= 1 @@ -783,7 +811,10 @@ def TaprootSignatureHash(txTo, spent_utxos, hash_type, input_index = 0, scriptpa ss += bytes([0]) ss += struct.pack("<i", codeseparator_pos) assert len(ss) == 175 - (in_type == SIGHASH_ANYONECANPAY) * 49 - (out_type != SIGHASH_ALL and out_type != SIGHASH_SINGLE) * 32 + (annex is not None) * 32 + scriptpath * 37 - return TaggedHash("TapSighash", ss) + return ss + +def TaprootSignatureHash(*args, **kwargs): + return TaggedHash("TapSighash", TaprootSignatureMsg(*args, **kwargs)) def taproot_tree_helper(scripts): if len(scripts) == 0: @@ -805,20 +836,20 @@ def taproot_tree_helper(scripts): h = TaggedHash("TapLeaf", bytes([version]) + ser_string(code)) if name is None: return ([], h) - return ([(name, version, code, bytes())], h) + return ([(name, version, code, bytes(), h)], h) elif len(scripts) == 2 and callable(scripts[1]): # Two entries, and the right one is a function left, left_h = taproot_tree_helper(scripts[0:1]) right_h = scripts[1](left_h) - left = [(name, version, script, control + right_h) for name, version, script, control in left] + left = [(name, version, script, control + right_h, leaf) for name, version, script, control, leaf in left] right = [] else: # Two or more entries: descend into each side split_pos = len(scripts) // 2 left, left_h = taproot_tree_helper(scripts[0:split_pos]) right, right_h = taproot_tree_helper(scripts[split_pos:]) - left = [(name, version, script, control + right_h) for name, version, script, control in left] - right = [(name, version, script, control + left_h) for name, version, script, control in right] + left = [(name, version, script, control + right_h, leaf) for name, version, script, control, leaf in left] + right = [(name, version, script, control + left_h, leaf) for name, version, script, control, leaf in right] if right_h < left_h: right_h, left_h = left_h, right_h h = TaggedHash("TapBranch", left_h + right_h) @@ -830,13 +861,14 @@ def taproot_tree_helper(scripts): # - negflag: whether the pubkey in the scriptPubKey was negated from internal_pubkey+tweak*G (bool). # - tweak: the tweak (32 bytes) # - leaves: a dict of name -> TaprootLeafInfo objects for all known leaves -TaprootInfo = namedtuple("TaprootInfo", "scriptPubKey,internal_pubkey,negflag,tweak,leaves") +# - merkle_root: the script tree's Merkle root, or bytes() if no leaves are present +TaprootInfo = namedtuple("TaprootInfo", "scriptPubKey,internal_pubkey,negflag,tweak,leaves,merkle_root,output_pubkey") # A TaprootLeafInfo object has the following fields: # - script: the leaf script (CScript or bytes) # - version: the leaf version (0xc0 for BIP342 tapscript) # - merklebranch: the merkle branch to use for this leaf (32*N bytes) -TaprootLeafInfo = namedtuple("TaprootLeafInfo", "script,version,merklebranch") +TaprootLeafInfo = namedtuple("TaprootLeafInfo", "script,version,merklebranch,leaf_hash") def taproot_construct(pubkey, scripts=None): """Construct a tree of Taproot spending conditions @@ -858,8 +890,8 @@ def taproot_construct(pubkey, scripts=None): ret, h = taproot_tree_helper(scripts) tweak = TaggedHash("TapTweak", pubkey + h) tweaked, negated = tweak_add_pubkey(pubkey, tweak) - leaves = dict((name, TaprootLeafInfo(script, version, merklebranch)) for name, version, script, merklebranch in ret) - return TaprootInfo(CScript([OP_1, tweaked]), pubkey, negated + 0, tweak, leaves) + leaves = dict((name, TaprootLeafInfo(script, version, merklebranch, leaf)) for name, version, script, merklebranch, leaf in ret) + return TaprootInfo(CScript([OP_1, tweaked]), pubkey, negated + 0, tweak, leaves, h, tweaked) def is_op_success(o): return o == 0x50 or o == 0x62 or o == 0x89 or o == 0x8a or o == 0x8d or o == 0x8e or (o >= 0x7e and o <= 0x81) or (o >= 0x83 and o <= 0x86) or (o >= 0x95 and o <= 0x99) or (o >= 0xbb and o <= 0xfe) |