aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xtest/functional/feature_bip68_sequence.py104
-rw-r--r--test/functional/test_framework/wallet.py81
2 files changed, 93 insertions, 92 deletions
diff --git a/test/functional/feature_bip68_sequence.py b/test/functional/feature_bip68_sequence.py
index d63821fbbc..43cde0a546 100755
--- a/test/functional/feature_bip68_sequence.py
+++ b/test/functional/feature_bip68_sequence.py
@@ -32,6 +32,7 @@ from test_framework.util import (
assert_raises_rpc_error,
softfork_active,
)
+from test_framework.wallet import MiniWallet
SCRIPT_W0_SH_OP_TRUE = script_to_p2wsh_script(CScript([OP_TRUE]))
@@ -58,14 +59,12 @@ class BIP68Test(BitcoinTestFramework):
],
]
- def skip_test_if_missing_module(self):
- self.skip_if_no_wallet()
-
def run_test(self):
self.relayfee = self.nodes[0].getnetworkinfo()["relayfee"]
+ self.wallet = MiniWallet(self.nodes[0])
# Generate some coins
- self.generate(self.nodes[0], 110)
+ self.generate(self.wallet, 110)
self.log.info("Running test disable flag")
self.test_disable_flag()
@@ -92,16 +91,10 @@ class BIP68Test(BitcoinTestFramework):
# the first sequence bit is set.
def test_disable_flag(self):
# Create some unconfirmed inputs
- new_addr = self.nodes[0].getnewaddress()
- self.nodes[0].sendtoaddress(new_addr, 2) # send 2 BTC
-
- utxos = self.nodes[0].listunspent(0, 0)
- assert len(utxos) > 0
-
- utxo = utxos[0]
+ utxo = self.wallet.send_self_transfer(from_node=self.nodes[0])["new_utxo"]
tx1 = CTransaction()
- value = int((utxo["amount"] - self.relayfee) * COIN)
+ value = int((utxo["value"] - self.relayfee) * COIN)
# Check that the disable flag disables relative locktime.
# If sequence locks were used, this would require 1 block for the
@@ -110,8 +103,8 @@ class BIP68Test(BitcoinTestFramework):
tx1.vin = [CTxIn(COutPoint(int(utxo["txid"], 16), utxo["vout"]), nSequence=sequence_value)]
tx1.vout = [CTxOut(value, SCRIPT_W0_SH_OP_TRUE)]
- tx1_signed = self.nodes[0].signrawtransactionwithwallet(tx1.serialize().hex())["hex"]
- tx1_id = self.nodes[0].sendrawtransaction(tx1_signed)
+ self.wallet.sign_tx(tx=tx1)
+ tx1_id = self.wallet.sendrawtransaction(from_node=self.nodes[0], tx_hex=tx1.serialize().hex())
tx1_id = int(tx1_id, 16)
# This transaction will enable sequence-locks, so this transaction should
@@ -125,13 +118,13 @@ class BIP68Test(BitcoinTestFramework):
tx2.vout = [CTxOut(int(value - self.relayfee * COIN), SCRIPT_W0_SH_OP_TRUE)]
tx2.rehash()
- assert_raises_rpc_error(-26, NOT_FINAL_ERROR, self.nodes[0].sendrawtransaction, tx2.serialize().hex())
+ assert_raises_rpc_error(-26, NOT_FINAL_ERROR, self.wallet.sendrawtransaction, from_node=self.nodes[0], tx_hex=tx2.serialize().hex())
# Setting the version back down to 1 should disable the sequence lock,
# so this should be accepted.
tx2.nVersion = 1
- self.nodes[0].sendrawtransaction(tx2.serialize().hex())
+ self.wallet.sendrawtransaction(from_node=self.nodes[0], tx_hex=tx2.serialize().hex())
# Calculate the median time past of a prior block ("confirmations" before
# the current tip).
@@ -144,20 +137,13 @@ class BIP68Test(BitcoinTestFramework):
# Create lots of confirmed utxos, and use them to generate lots of random
# transactions.
max_outputs = 50
- addresses = []
- while len(addresses) < max_outputs:
- addresses.append(self.nodes[0].getnewaddress())
- while len(self.nodes[0].listunspent()) < 200:
+ while len(self.wallet.get_utxos(include_immature_coinbase=False, mark_as_spent=False)) < 200:
import random
- random.shuffle(addresses)
num_outputs = random.randint(1, max_outputs)
- outputs = {}
- for i in range(num_outputs):
- outputs[addresses[i]] = random.randint(1, 20)*0.01
- self.nodes[0].sendmany("", outputs)
- self.generate(self.nodes[0], 1)
+ self.wallet.send_self_transfer_multi(from_node=self.nodes[0], num_outputs=num_outputs)
+ self.generate(self.wallet, 1)
- utxos = self.nodes[0].listunspent()
+ utxos = self.wallet.get_utxos(include_immature_coinbase=False)
# Try creating a lot of random transactions.
# Each time, choose a random number of inputs, and randomly set
@@ -214,19 +200,20 @@ class BIP68Test(BitcoinTestFramework):
sequence_value = ((cur_time - orig_time) >> SEQUENCE_LOCKTIME_GRANULARITY)+1
sequence_value |= SEQUENCE_LOCKTIME_TYPE_FLAG
tx.vin.append(CTxIn(COutPoint(int(utxos[j]["txid"], 16), utxos[j]["vout"]), nSequence=sequence_value))
- value += utxos[j]["amount"]*COIN
+ value += utxos[j]["value"]*COIN
# Overestimate the size of the tx - signatures should be less than 120 bytes, and leave 50 for the output
tx_size = len(tx.serialize().hex())//2 + 120*num_inputs + 50
tx.vout.append(CTxOut(int(value - self.relayfee * tx_size * COIN / 1000), SCRIPT_W0_SH_OP_TRUE))
- rawtx = self.nodes[0].signrawtransactionwithwallet(tx.serialize().hex())["hex"]
+ self.wallet.sign_tx(tx=tx)
if (using_sequence_locks and not should_pass):
# This transaction should be rejected
- assert_raises_rpc_error(-26, NOT_FINAL_ERROR, self.nodes[0].sendrawtransaction, rawtx)
+ assert_raises_rpc_error(-26, NOT_FINAL_ERROR, self.wallet.sendrawtransaction, from_node=self.nodes[0], tx_hex=tx.serialize().hex())
else:
# This raw transaction should be accepted
- self.nodes[0].sendrawtransaction(rawtx)
- utxos = self.nodes[0].listunspent()
+ self.wallet.sendrawtransaction(from_node=self.nodes[0], tx_hex=tx.serialize().hex())
+ self.wallet.rescan_utxos()
+ utxos = self.wallet.get_utxos(include_immature_coinbase=False)
# Test that sequence locks on unconfirmed inputs must have nSequence
# height or time of 0 to be accepted.
@@ -237,8 +224,8 @@ class BIP68Test(BitcoinTestFramework):
cur_height = self.nodes[0].getblockcount()
# Create a mempool tx.
- txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 2)
- tx1 = tx_from_hex(self.nodes[0].getrawtransaction(txid))
+ self.wallet.rescan_utxos()
+ tx1 = self.wallet.send_self_transfer(from_node=self.nodes[0])["tx"]
tx1.rehash()
# Anyone-can-spend mempool tx.
@@ -247,11 +234,11 @@ class BIP68Test(BitcoinTestFramework):
tx2.nVersion = 2
tx2.vin = [CTxIn(COutPoint(tx1.sha256, 0), nSequence=0)]
tx2.vout = [CTxOut(int(tx1.vout[0].nValue - self.relayfee * COIN), SCRIPT_W0_SH_OP_TRUE)]
- tx2_raw = self.nodes[0].signrawtransactionwithwallet(tx2.serialize().hex())["hex"]
- tx2 = tx_from_hex(tx2_raw)
+ self.wallet.sign_tx(tx=tx2)
+ tx2_raw = tx2.serialize().hex()
tx2.rehash()
- self.nodes[0].sendrawtransaction(tx2_raw)
+ self.wallet.sendrawtransaction(from_node=self.nodes[0], tx_hex=tx2_raw)
# Create a spend of the 0th output of orig_tx with a sequence lock
# of 1, and test what happens when submitting.
@@ -271,10 +258,10 @@ class BIP68Test(BitcoinTestFramework):
if (orig_tx.hash in node.getrawmempool()):
# sendrawtransaction should fail if the tx is in the mempool
- assert_raises_rpc_error(-26, NOT_FINAL_ERROR, node.sendrawtransaction, tx.serialize().hex())
+ assert_raises_rpc_error(-26, NOT_FINAL_ERROR, self.wallet.sendrawtransaction, from_node=node, tx_hex=tx.serialize().hex())
else:
# sendrawtransaction should succeed if the tx is not in the mempool
- node.sendrawtransaction(tx.serialize().hex())
+ self.wallet.sendrawtransaction(from_node=node, tx_hex=tx.serialize().hex())
return tx
@@ -287,7 +274,7 @@ class BIP68Test(BitcoinTestFramework):
cur_time = int(time.time())
for _ in range(10):
self.nodes[0].setmocktime(cur_time + 600)
- self.generate(self.nodes[0], 1, sync_fun=self.no_op)
+ self.generate(self.wallet, 1, sync_fun=self.no_op)
cur_time += 600
assert tx2.hash in self.nodes[0].getrawmempool()
@@ -321,12 +308,12 @@ class BIP68Test(BitcoinTestFramework):
tx5 = test_nonzero_locks(tx4, self.nodes[0], self.relayfee, use_height_lock=True)
assert tx5.hash not in self.nodes[0].getrawmempool()
- utxos = self.nodes[0].listunspent()
- tx5.vin.append(CTxIn(COutPoint(int(utxos[0]["txid"], 16), utxos[0]["vout"]), nSequence=1))
- tx5.vout[0].nValue += int(utxos[0]["amount"]*COIN)
- raw_tx5 = self.nodes[0].signrawtransactionwithwallet(tx5.serialize().hex())["hex"]
+ utxo = self.wallet.get_utxo()
+ tx5.vin.append(CTxIn(COutPoint(int(utxo["txid"], 16), utxo["vout"]), nSequence=1))
+ tx5.vout[0].nValue += int(utxo["value"]*COIN)
+ self.wallet.sign_tx(tx=tx5)
- assert_raises_rpc_error(-26, NOT_FINAL_ERROR, self.nodes[0].sendrawtransaction, raw_tx5)
+ assert_raises_rpc_error(-26, NOT_FINAL_ERROR, self.wallet.sendrawtransaction, from_node=self.nodes[0], tx_hex=tx5.serialize().hex())
# Test mempool-BIP68 consistency after reorg
#
@@ -362,7 +349,7 @@ class BIP68Test(BitcoinTestFramework):
# Reset the chain and get rid of the mocktimed-blocks
self.nodes[0].setmocktime(0)
self.nodes[0].invalidateblock(self.nodes[0].getblockhash(cur_height+1))
- self.generate(self.nodes[0], 10, sync_fun=self.no_op)
+ self.generate(self.wallet, 10, sync_fun=self.no_op)
# Make sure that BIP68 isn't being used to validate blocks prior to
# activation height. If more blocks are mined prior to this test
@@ -370,9 +357,8 @@ class BIP68Test(BitcoinTestFramework):
# this test should be moved to run earlier, or deleted.
def test_bip68_not_consensus(self):
assert not softfork_active(self.nodes[0], 'csv')
- txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 2)
- tx1 = tx_from_hex(self.nodes[0].getrawtransaction(txid))
+ tx1 = self.wallet.send_self_transfer(from_node=self.nodes[0])["tx"]
tx1.rehash()
# Make an anyone-can-spend transaction
@@ -382,11 +368,12 @@ class BIP68Test(BitcoinTestFramework):
tx2.vout = [CTxOut(int(tx1.vout[0].nValue - self.relayfee * COIN), SCRIPT_W0_SH_OP_TRUE)]
# sign tx2
- tx2_raw = self.nodes[0].signrawtransactionwithwallet(tx2.serialize().hex())["hex"]
+ self.wallet.sign_tx(tx=tx2)
+ tx2_raw = tx2.serialize().hex()
tx2 = tx_from_hex(tx2_raw)
tx2.rehash()
- self.nodes[0].sendrawtransaction(tx2.serialize().hex())
+ self.wallet.sendrawtransaction(from_node=self.nodes[0], tx_hex=tx2_raw)
# Now make an invalid spend of tx2 according to BIP68
sequence_value = 100 # 100 block relative locktime
@@ -399,7 +386,7 @@ class BIP68Test(BitcoinTestFramework):
tx3.vout = [CTxOut(int(tx2.vout[0].nValue - self.relayfee * COIN), SCRIPT_W0_SH_OP_TRUE)]
tx3.rehash()
- assert_raises_rpc_error(-26, NOT_FINAL_ERROR, self.nodes[0].sendrawtransaction, tx3.serialize().hex())
+ assert_raises_rpc_error(-26, NOT_FINAL_ERROR, self.wallet.sendrawtransaction, from_node=self.nodes[0], tx_hex=tx3.serialize().hex())
# make a block that violates bip68; ensure that the tip updates
block = create_block(tmpl=self.nodes[0].getblocktemplate(NORMAL_GBT_REQUEST_PARAMS), txlist=[tx1, tx2, tx3])
@@ -415,22 +402,19 @@ class BIP68Test(BitcoinTestFramework):
min_activation_height = 432
height = self.nodes[0].getblockcount()
assert_greater_than(min_activation_height - height, 2)
- self.generate(self.nodes[0], min_activation_height - height - 2, sync_fun=self.no_op)
+ self.generate(self.wallet, min_activation_height - height - 2, sync_fun=self.no_op)
assert not softfork_active(self.nodes[0], 'csv')
- self.generate(self.nodes[0], 1, sync_fun=self.no_op)
+ self.generate(self.wallet, 1, sync_fun=self.no_op)
assert softfork_active(self.nodes[0], 'csv')
self.sync_blocks()
# Use self.nodes[1] to test that version 2 transactions are standard.
def test_version2_relay(self):
- inputs = [ ]
- outputs = { self.nodes[1].getnewaddress() : 1.0 }
- rawtx = self.nodes[1].createrawtransaction(inputs, outputs)
- rawtxfund = self.nodes[1].fundrawtransaction(rawtx)['hex']
- tx = tx_from_hex(rawtxfund)
+ mini_wallet = MiniWallet(self.nodes[1])
+ mini_wallet.rescan_utxos()
+ tx = mini_wallet.create_self_transfer()["tx"]
tx.nVersion = 2
- tx_signed = self.nodes[1].signrawtransactionwithwallet(tx.serialize().hex())["hex"]
- self.nodes[1].sendrawtransaction(tx_signed)
+ mini_wallet.sendrawtransaction(from_node=self.nodes[1], tx_hex=tx.serialize().hex())
if __name__ == '__main__':
BIP68Test().main()
diff --git a/test/functional/test_framework/wallet.py b/test/functional/test_framework/wallet.py
index a72b5e5891..5dacf9d8d2 100644
--- a/test/functional/test_framework/wallet.py
+++ b/test/functional/test_framework/wallet.py
@@ -55,6 +55,7 @@ from test_framework.util import (
assert_equal,
assert_greater_than_or_equal,
)
+from test_framework.blocktools import COINBASE_MATURITY
DEFAULT_FEE = Decimal("0.0001")
@@ -100,8 +101,8 @@ class MiniWallet:
self._address, self._internal_key = create_deterministic_address_bcrt1_p2tr_op_true()
self._scriptPubKey = bytes.fromhex(self._test_node.validateaddress(self._address)['scriptPubKey'])
- def _create_utxo(self, *, txid, vout, value, height):
- return {"txid": txid, "vout": vout, "value": value, "height": height}
+ def _create_utxo(self, *, txid, vout, value, height, coinbase, confirmations):
+ return {"txid": txid, "vout": vout, "value": value, "height": height, "coinbase": coinbase, "confirmations": confirmations}
def _bulk_tx(self, tx, target_weight):
"""Pad a transaction with extra outputs until it reaches a target weight (or higher).
@@ -118,13 +119,25 @@ class MiniWallet:
def get_balance(self):
return sum(u['value'] for u in self._utxos)
- def rescan_utxos(self):
+ def rescan_utxos(self, *, include_mempool=True):
"""Drop all utxos and rescan the utxo set"""
self._utxos = []
res = self._test_node.scantxoutset(action="start", scanobjects=[self.get_descriptor()])
assert_equal(True, res['success'])
for utxo in res['unspents']:
- self._utxos.append(self._create_utxo(txid=utxo["txid"], vout=utxo["vout"], value=utxo["amount"], height=utxo["height"]))
+ self._utxos.append(
+ self._create_utxo(txid=utxo["txid"],
+ vout=utxo["vout"],
+ value=utxo["amount"],
+ height=utxo["height"],
+ coinbase=utxo["coinbase"],
+ confirmations=res["height"] - utxo["height"] + 1))
+ if include_mempool:
+ mempool = self._test_node.getrawmempool(verbose=True)
+ # Sort tx by ancestor count. See BlockAssembler::SortForBlock in src/node/miner.cpp
+ sorted_mempool = sorted(mempool.items(), key=lambda item: (item[1]["ancestorcount"], int(item[0], 16)))
+ for txid, _ in sorted_mempool:
+ self.scan_tx(self._test_node.getrawtransaction(txid=txid, verbose=True))
def scan_tx(self, tx):
"""Scan the tx and adjust the internal list of owned utxos"""
@@ -139,27 +152,35 @@ class MiniWallet:
pass
for out in tx['vout']:
if out['scriptPubKey']['hex'] == self._scriptPubKey.hex():
- self._utxos.append(self._create_utxo(txid=tx["txid"], vout=out["n"], value=out["value"], height=0))
+ self._utxos.append(self._create_utxo(txid=tx["txid"], vout=out["n"], value=out["value"], height=0, coinbase=False, confirmations=0))
def scan_txs(self, txs):
for tx in txs:
self.scan_tx(tx)
def sign_tx(self, tx, fixed_length=True):
- """Sign tx that has been created by MiniWallet in P2PK mode"""
- assert_equal(self._mode, MiniWalletMode.RAW_P2PK)
- (sighash, err) = LegacySignatureHash(CScript(self._scriptPubKey), tx, 0, SIGHASH_ALL)
- assert err is None
- # for exact fee calculation, create only signatures with fixed size by default (>49.89% probability):
- # 65 bytes: high-R val (33 bytes) + low-S val (32 bytes)
- # with the DER header/skeleton data of 6 bytes added, this leads to a target size of 71 bytes
- der_sig = b''
- while not len(der_sig) == 71:
- der_sig = self._priv_key.sign_ecdsa(sighash)
- if not fixed_length:
- break
- tx.vin[0].scriptSig = CScript([der_sig + bytes(bytearray([SIGHASH_ALL]))])
- tx.rehash()
+ if self._mode == MiniWalletMode.RAW_P2PK:
+ (sighash, err) = LegacySignatureHash(CScript(self._scriptPubKey), tx, 0, SIGHASH_ALL)
+ assert err is None
+ # for exact fee calculation, create only signatures with fixed size by default (>49.89% probability):
+ # 65 bytes: high-R val (33 bytes) + low-S val (32 bytes)
+ # with the DER header/skeleton data of 6 bytes added, this leads to a target size of 71 bytes
+ der_sig = b''
+ while not len(der_sig) == 71:
+ der_sig = self._priv_key.sign_ecdsa(sighash)
+ if not fixed_length:
+ break
+ tx.vin[0].scriptSig = CScript([der_sig + bytes(bytearray([SIGHASH_ALL]))])
+ tx.rehash()
+ elif self._mode == MiniWalletMode.RAW_OP_TRUE:
+ for i in range(len(tx.vin)):
+ tx.vin[i].scriptSig = CScript([OP_NOP] * 43) # pad to identical size
+ elif self._mode == MiniWalletMode.ADDRESS_OP_TRUE:
+ tx.wit.vtxinwit = [CTxInWitness()] * len(tx.vin)
+ for i in range(len(tx.vin)):
+ tx.wit.vtxinwit[i].scriptWitness.stack = [CScript([OP_TRUE]), bytes([LEAF_VERSION_TAPSCRIPT]) + self._internal_key]
+ else:
+ assert False
def generate(self, num_blocks, **kwargs):
"""Generate blocks with coinbase outputs to the internal address, and call rescan_utxos"""
@@ -204,9 +225,13 @@ class MiniWallet:
else:
return self._utxos[index]
- def get_utxos(self, *, mark_as_spent=True):
+ def get_utxos(self, *, include_immature_coinbase=False, mark_as_spent=True):
"""Returns the list of all utxos and optionally mark them as spent"""
- utxos = deepcopy(self._utxos)
+ if not include_immature_coinbase:
+ utxo_filter = filter(lambda utxo: not utxo['coinbase'] or COINBASE_MATURITY <= utxo['confirmations'], self._utxos)
+ else:
+ utxo_filter = self._utxos
+ utxos = deepcopy(list(utxo_filter))
if mark_as_spent:
self._utxos = []
return utxos
@@ -273,17 +298,7 @@ class MiniWallet:
tx.vout = [CTxOut(amount_per_output, bytearray(self._scriptPubKey)) for _ in range(num_outputs)]
tx.nLockTime = locktime
- if self._mode == MiniWalletMode.RAW_P2PK:
- self.sign_tx(tx)
- elif self._mode == MiniWalletMode.RAW_OP_TRUE:
- for i in range(len(utxos_to_spend)):
- tx.vin[i].scriptSig = CScript([OP_NOP] * 43) # pad to identical size
- elif self._mode == MiniWalletMode.ADDRESS_OP_TRUE:
- tx.wit.vtxinwit = [CTxInWitness()] * len(utxos_to_spend)
- for i in range(len(utxos_to_spend)):
- tx.wit.vtxinwit[i].scriptWitness.stack = [CScript([OP_TRUE]), bytes([LEAF_VERSION_TAPSCRIPT]) + self._internal_key]
- else:
- assert False
+ self.sign_tx(tx)
if target_weight:
self._bulk_tx(tx, target_weight)
@@ -295,6 +310,8 @@ class MiniWallet:
vout=i,
value=Decimal(tx.vout[i].nValue) / COIN,
height=0,
+ coinbase=False,
+ confirmations=0,
) for i in range(len(tx.vout))],
"txid": txid,
"hex": tx.serialize().hex(),