aboutsummaryrefslogtreecommitdiff
path: root/test/functional/test_framework
diff options
context:
space:
mode:
Diffstat (limited to 'test/functional/test_framework')
-rw-r--r--test/functional/test_framework/address.py28
-rw-r--r--test/functional/test_framework/blocktools.py6
-rwxr-xr-xtest/functional/test_framework/script_util.py13
-rwxr-xr-xtest/functional/test_framework/test_framework.py21
-rw-r--r--test/functional/test_framework/util.py24
-rw-r--r--test/functional/test_framework/wallet.py28
-rwxr-xr-xtest/functional/test_framework/wallet_util.py9
7 files changed, 91 insertions, 38 deletions
diff --git a/test/functional/test_framework/address.py b/test/functional/test_framework/address.py
index fe733e9368..89839c9bab 100644
--- a/test/functional/test_framework/address.py
+++ b/test/functional/test_framework/address.py
@@ -5,12 +5,21 @@
"""Encode and decode Bitcoin addresses.
- base58 P2PKH and P2SH addresses.
-- bech32 segwit v0 P2WPKH and P2WSH addresses."""
+- bech32 segwit v0 P2WPKH and P2WSH addresses.
+- bech32m segwit v1 P2TR addresses."""
import enum
import unittest
-from .script import hash256, hash160, sha256, CScript, OP_0
+from .script import (
+ CScript,
+ OP_0,
+ OP_TRUE,
+ hash160,
+ hash256,
+ sha256,
+ taproot_construct,
+)
from .segwit_addr import encode_segwit_address
from .util import assert_equal
@@ -29,6 +38,21 @@ class AddressType(enum.Enum):
chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
+def create_deterministic_address_bcrt1_p2tr_op_true():
+ """
+ Generates a deterministic bech32m address (segwit v1 output) that
+ can be spent with a witness stack of OP_TRUE and the control block
+ with internal public key (script-path spending).
+
+ Returns a tuple with the generated address and the internal key.
+ """
+ internal_key = (1).to_bytes(32, 'big')
+ scriptPubKey = taproot_construct(internal_key, [(None, CScript([OP_TRUE]))]).scriptPubKey
+ address = encode_segwit_address("bcrt", 1, scriptPubKey[2:])
+ assert_equal(address, 'bcrt1p9yfmy5h72durp7zrhlw9lf7jpwjgvwdg0jr0lqmmjtgg83266lqsekaqka')
+ return (address, internal_key)
+
+
def byte_to_base58(b, version):
result = ''
str = b.hex()
diff --git a/test/functional/test_framework/blocktools.py b/test/functional/test_framework/blocktools.py
index 85e3c2a383..5d0113465f 100644
--- a/test/functional/test_framework/blocktools.py
+++ b/test/functional/test_framework/blocktools.py
@@ -32,13 +32,13 @@ from .script import (
CScriptNum,
CScriptOp,
OP_1,
- OP_CHECKMULTISIG,
OP_RETURN,
OP_TRUE,
)
from .script_util import (
key_to_p2pk_script,
key_to_p2wpkh_script,
+ keys_to_multisig_script,
script_to_p2wsh_script,
)
from .util import assert_equal
@@ -209,7 +209,7 @@ def witness_script(use_p2wsh, pubkey):
pkscript = key_to_p2wpkh_script(pubkey)
else:
# 1-of-1 multisig
- witness_script = CScript([OP_1, bytes.fromhex(pubkey), OP_1, OP_CHECKMULTISIG])
+ witness_script = keys_to_multisig_script([pubkey])
pkscript = script_to_p2wsh_script(witness_script)
return pkscript.hex()
@@ -218,7 +218,7 @@ def create_witness_tx(node, use_p2wsh, utxo, pubkey, encode_p2sh, amount):
Optionally wrap the segwit output using P2SH."""
if use_p2wsh:
- program = CScript([OP_1, bytes.fromhex(pubkey), OP_1, OP_CHECKMULTISIG])
+ program = keys_to_multisig_script([pubkey])
addr = script_to_p2sh_p2wsh(program) if encode_p2sh else script_to_p2wsh(program)
else:
addr = key_to_p2sh_p2wpkh(pubkey) if encode_p2sh else key_to_p2wpkh(pubkey)
diff --git a/test/functional/test_framework/script_util.py b/test/functional/test_framework/script_util.py
index 82a9067dd2..cbc4a560db 100755
--- a/test/functional/test_framework/script_util.py
+++ b/test/functional/test_framework/script_util.py
@@ -5,7 +5,9 @@
"""Useful Script constants and utils."""
from test_framework.script import (
CScript,
+ CScriptOp,
OP_0,
+ OP_CHECKMULTISIG,
OP_CHECKSIG,
OP_DUP,
OP_EQUAL,
@@ -41,6 +43,17 @@ def key_to_p2pk_script(key):
return CScript([key, OP_CHECKSIG])
+def keys_to_multisig_script(keys, *, k=None):
+ n = len(keys)
+ if k is None: # n-of-n multisig by default
+ k = n
+ assert k <= n
+ op_k = CScriptOp.encode_op_n(k)
+ op_n = CScriptOp.encode_op_n(n)
+ checked_keys = [check_key(key) for key in keys]
+ return CScript([op_k] + checked_keys + [op_n, OP_CHECKMULTISIG])
+
+
def keyhash_to_p2pkh_script(hash):
assert len(hash) == 20
return CScript([OP_DUP, OP_HASH160, hash, OP_EQUALVERIFY, OP_CHECKSIG])
diff --git a/test/functional/test_framework/test_framework.py b/test/functional/test_framework/test_framework.py
index ec3561b1f2..b18c050e0a 100755
--- a/test/functional/test_framework/test_framework.py
+++ b/test/functional/test_framework/test_framework.py
@@ -19,7 +19,7 @@ import tempfile
import time
from typing import List
-from .address import ADDRESS_BCRT1_P2WSH_OP_TRUE
+from .address import create_deterministic_address_bcrt1_p2tr_op_true
from .authproxy import JSONRPCException
from . import coverage
from .p2p import NetworkThread
@@ -413,7 +413,7 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
# To ensure that all nodes are out of IBD, the most recent block
# must have a timestamp not too old (see IsInitialBlockDownload()).
self.log.debug('Generate a block with current time')
- block_hash = self.generate(self.nodes[0], 1)[0]
+ block_hash = self.generate(self.nodes[0], 1, sync_fun=self.no_op)[0]
block = self.nodes[0].getblock(blockhash=block_hash, verbosity=0)
for n in self.nodes:
n.submitblock(block)
@@ -627,20 +627,27 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
self.connect_nodes(1, 2)
self.sync_all()
- def generate(self, generator, *args, **kwargs):
+ def no_op(self):
+ pass
+
+ def generate(self, generator, *args, sync_fun=None, **kwargs):
blocks = generator.generate(*args, invalid_call=False, **kwargs)
+ sync_fun() if sync_fun else self.sync_all()
return blocks
- def generateblock(self, generator, *args, **kwargs):
+ def generateblock(self, generator, *args, sync_fun=None, **kwargs):
blocks = generator.generateblock(*args, invalid_call=False, **kwargs)
+ sync_fun() if sync_fun else self.sync_all()
return blocks
- def generatetoaddress(self, generator, *args, **kwargs):
+ def generatetoaddress(self, generator, *args, sync_fun=None, **kwargs):
blocks = generator.generatetoaddress(*args, invalid_call=False, **kwargs)
+ sync_fun() if sync_fun else self.sync_all()
return blocks
- def generatetodescriptor(self, generator, *args, **kwargs):
+ def generatetodescriptor(self, generator, *args, sync_fun=None, **kwargs):
blocks = generator.generatetodescriptor(*args, invalid_call=False, **kwargs)
+ sync_fun() if sync_fun else self.sync_all()
return blocks
def sync_blocks(self, nodes=None, wait=1, timeout=60):
@@ -770,7 +777,7 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
# block in the cache does not age too much (have an old tip age).
# This is needed so that we are out of IBD when the test starts,
# see the tip age check in IsInitialBlockDownload().
- gen_addresses = [k.address for k in TestNode.PRIV_KEYS][:3] + [ADDRESS_BCRT1_P2WSH_OP_TRUE]
+ gen_addresses = [k.address for k in TestNode.PRIV_KEYS][:3] + [create_deterministic_address_bcrt1_p2tr_op_true()[0]]
assert_equal(len(gen_addresses), 4)
for i in range(8):
self.generatetoaddress(
diff --git a/test/functional/test_framework/util.py b/test/functional/test_framework/util.py
index 9f5bca6884..ca1ffd48de 100644
--- a/test/functional/test_framework/util.py
+++ b/test/functional/test_framework/util.py
@@ -36,12 +36,12 @@ def assert_approx(v, vexp, vspan=0.00001):
def assert_fee_amount(fee, tx_size, feerate_BTC_kvB):
"""Assert the fee is in range."""
- feerate_BTC_vB = feerate_BTC_kvB / 1000
- target_fee = satoshi_round(tx_size * feerate_BTC_vB)
+ target_fee = get_fee(tx_size, feerate_BTC_kvB)
if fee < target_fee:
raise AssertionError("Fee of %s BTC too low! (Should be %s BTC)" % (str(fee), str(target_fee)))
# allow the wallet's estimation to be at most 2 bytes off
- if fee > (tx_size + 2) * feerate_BTC_vB:
+ high_fee = get_fee(tx_size + 2, feerate_BTC_kvB)
+ if fee > high_fee:
raise AssertionError("Fee of %s BTC too high! (Should be %s BTC)" % (str(fee), str(target_fee)))
@@ -218,6 +218,18 @@ def str_to_b64str(string):
return b64encode(string.encode('utf-8')).decode('ascii')
+def ceildiv(a, b):
+ """Divide 2 ints and round up to next int rather than round down"""
+ return -(-a // b)
+
+
+def get_fee(tx_size, feerate_btc_kvb):
+ """Calculate the fee in BTC given a feerate is BTC/kvB. Reflects CFeeRate::GetFee"""
+ feerate_sat_kvb = int(feerate_btc_kvb * Decimal(1e8)) # Fee in sat/kvb as an int to avoid float precision errors
+ target_fee_sat = ceildiv(feerate_sat_kvb * tx_size, 1000) # Round calculated fee up to nearest sat
+ return satoshi_round(target_fee_sat / Decimal(1e8)) # Truncate BTC result to nearest sat
+
+
def satoshi_round(amount):
return Decimal(amount).quantize(Decimal('0.00000001'), rounding=ROUND_DOWN)
@@ -451,10 +463,10 @@ def find_output(node, txid, amount, *, blockhash=None):
# Helper to create at least "count" utxos
# Pass in a fee that is sufficient for relay and mining new transactions.
-def create_confirmed_utxos(test_framework, fee, node, count):
+def create_confirmed_utxos(test_framework, fee, node, count, **kwargs):
to_generate = int(0.5 * count) + 101
while to_generate > 0:
- test_framework.generate(node, min(25, to_generate))
+ test_framework.generate(node, min(25, to_generate), **kwargs)
to_generate -= 25
utxos = node.listunspent()
iterations = count - len(utxos)
@@ -475,7 +487,7 @@ def create_confirmed_utxos(test_framework, fee, node, count):
node.sendrawtransaction(signed_tx)
while (node.getmempoolinfo()['size'] > 0):
- test_framework.generate(node, 1)
+ test_framework.generate(node, 1, **kwargs)
utxos = node.listunspent()
assert len(utxos) >= count
diff --git a/test/functional/test_framework/wallet.py b/test/functional/test_framework/wallet.py
index 81aad20079..7de6994735 100644
--- a/test/functional/test_framework/wallet.py
+++ b/test/functional/test_framework/wallet.py
@@ -9,7 +9,7 @@ from decimal import Decimal
from enum import Enum
from random import choice
from typing import Optional
-from test_framework.address import ADDRESS_BCRT1_P2WSH_OP_TRUE
+from test_framework.address import create_deterministic_address_bcrt1_p2tr_op_true
from test_framework.descriptors import descsum_create
from test_framework.key import ECKey
from test_framework.messages import (
@@ -24,8 +24,9 @@ from test_framework.messages import (
from test_framework.script import (
CScript,
LegacySignatureHash,
- OP_TRUE,
+ LEAF_VERSION_TAPSCRIPT,
OP_NOP,
+ OP_TRUE,
SIGHASH_ALL,
)
from test_framework.script_util import (
@@ -43,7 +44,7 @@ class MiniWalletMode(Enum):
"""Determines the transaction type the MiniWallet is creating and spending.
For most purposes, the default mode ADDRESS_OP_TRUE should be sufficient;
- it simply uses a fixed bech32 P2WSH address whose coins are spent with a
+ it simply uses a fixed bech32m P2TR address whose coins are spent with a
witness stack of OP_TRUE, i.e. following an anyone-can-spend policy.
However, if the transactions need to be modified by the user (e.g. prepending
scriptSig for testing opcodes that are activated by a soft-fork), or the txs
@@ -53,7 +54,7 @@ class MiniWalletMode(Enum):
| output | | tx is | can modify | needs
mode | description | address | standard | scriptSig | signing
----------------+-------------------+-----------+----------+------------+----------
- ADDRESS_OP_TRUE | anyone-can-spend | bech32 | yes | no | no
+ ADDRESS_OP_TRUE | anyone-can-spend | bech32m | yes | no | no
RAW_OP_TRUE | anyone-can-spend | - (raw) | no | yes | no
RAW_P2PK | pay-to-public-key | - (raw) | yes | yes | yes
"""
@@ -79,7 +80,7 @@ class MiniWallet:
pub_key = self._priv_key.get_pubkey()
self._scriptPubKey = key_to_p2pk_script(pub_key.get_bytes())
elif mode == MiniWalletMode.ADDRESS_OP_TRUE:
- self._address = ADDRESS_BCRT1_P2WSH_OP_TRUE
+ self._address, self._internal_key = create_deterministic_address_bcrt1_p2tr_op_true()
self._scriptPubKey = bytes.fromhex(self._test_node.validateaddress(self._address)['scriptPubKey'])
def rescan_utxos(self):
@@ -88,13 +89,13 @@ class MiniWallet:
res = self._test_node.scantxoutset(action="start", scanobjects=[self.get_descriptor()])
assert_equal(True, res['success'])
for utxo in res['unspents']:
- self._utxos.append({'txid': utxo['txid'], 'vout': utxo['vout'], 'value': utxo['amount']})
+ self._utxos.append({'txid': utxo['txid'], 'vout': utxo['vout'], 'value': utxo['amount'], 'height': utxo['height']})
def scan_tx(self, tx):
"""Scan the tx for self._scriptPubKey outputs and add them to self._utxos"""
for out in tx['vout']:
if out['scriptPubKey']['hex'] == self._scriptPubKey.hex():
- self._utxos.append({'txid': tx['txid'], 'vout': out['n'], 'value': out['value']})
+ self._utxos.append({'txid': tx['txid'], 'vout': out['n'], 'value': out['value'], 'height': 0})
def sign_tx(self, tx, fixed_length=True):
"""Sign tx that has been created by MiniWallet in P2PK mode"""
@@ -115,8 +116,9 @@ class MiniWallet:
"""Generate blocks with coinbase outputs to the internal address, and append the outputs to the internal list"""
blocks = self._test_node.generatetodescriptor(num_blocks, self.get_descriptor(), **kwargs)
for b in blocks:
- cb_tx = self._test_node.getblock(blockhash=b, verbosity=2)['tx'][0]
- self._utxos.append({'txid': cb_tx['txid'], 'vout': 0, 'value': cb_tx['vout'][0]['value']})
+ block_info = self._test_node.getblock(blockhash=b, verbosity=2)
+ cb_tx = block_info['tx'][0]
+ self._utxos.append({'txid': cb_tx['txid'], 'vout': 0, 'value': cb_tx['vout'][0]['value'], 'height': block_info['height']})
return blocks
def get_descriptor(self):
@@ -170,10 +172,10 @@ class MiniWallet:
def create_self_transfer(self, *, fee_rate=Decimal("0.003"), from_node, utxo_to_spend=None, mempool_valid=True, locktime=0, sequence=0):
"""Create and return a tx with the specified fee_rate. Fee may be exact or at most one satoshi higher than needed."""
- self._utxos = sorted(self._utxos, key=lambda k: k['value'])
+ self._utxos = sorted(self._utxos, key=lambda k: (k['value'], -k['height']))
utxo_to_spend = utxo_to_spend or self._utxos.pop() # Pick the largest utxo (if none provided) and hope it covers the fee
if self._priv_key is None:
- vsize = Decimal(96) # anyone-can-spend
+ vsize = Decimal(104) # anyone-can-spend
else:
vsize = Decimal(168) # P2PK (73 bytes scriptSig + 35 bytes scriptPubKey + 60 bytes other)
send_value = int(COIN * (utxo_to_spend['value'] - fee_rate * (vsize / 1000)))
@@ -190,10 +192,10 @@ class MiniWallet:
self.sign_tx(tx)
else:
# anyone-can-spend
- tx.vin[0].scriptSig = CScript([OP_NOP] * 35) # pad to identical size
+ tx.vin[0].scriptSig = CScript([OP_NOP] * 43) # pad to identical size
else:
tx.wit.vtxinwit = [CTxInWitness()]
- tx.wit.vtxinwit[0].scriptWitness.stack = [CScript([OP_TRUE])]
+ tx.wit.vtxinwit[0].scriptWitness.stack = [CScript([OP_TRUE]), bytes([LEAF_VERSION_TAPSCRIPT]) + self._internal_key]
tx_hex = tx.serialize().hex()
tx_info = from_node.testmempoolaccept([tx_hex])[0]
diff --git a/test/functional/test_framework/wallet_util.py b/test/functional/test_framework/wallet_util.py
index 1ee55aa3b7..c307ded542 100755
--- a/test/functional/test_framework/wallet_util.py
+++ b/test/functional/test_framework/wallet_util.py
@@ -15,15 +15,10 @@ from test_framework.address import (
script_to_p2wsh,
)
from test_framework.key import ECKey
-from test_framework.script import (
- CScript,
- OP_2,
- OP_3,
- OP_CHECKMULTISIG,
-)
from test_framework.script_util import (
key_to_p2pkh_script,
key_to_p2wpkh_script,
+ keys_to_multisig_script,
script_to_p2sh_script,
script_to_p2wsh_script,
)
@@ -92,7 +87,7 @@ def get_multisig(node):
addr = node.getaddressinfo(node.getnewaddress())
addrs.append(addr['address'])
pubkeys.append(addr['pubkey'])
- script_code = CScript([OP_2] + [bytes.fromhex(pubkey) for pubkey in pubkeys] + [OP_3, OP_CHECKMULTISIG])
+ script_code = keys_to_multisig_script(pubkeys, k=2)
witness_script = script_to_p2wsh_script(script_code)
return Multisig(privkeys=[node.dumpprivkey(addr) for addr in addrs],
pubkeys=pubkeys,