aboutsummaryrefslogtreecommitdiff
path: root/test/functional/test_framework/wallet.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/functional/test_framework/wallet.py')
-rw-r--r--test/functional/test_framework/wallet.py128
1 files changed, 75 insertions, 53 deletions
diff --git a/test/functional/test_framework/wallet.py b/test/functional/test_framework/wallet.py
index a72b5e5891..f3253630c4 100644
--- a/test/functional/test_framework/wallet.py
+++ b/test/functional/test_framework/wallet.py
@@ -55,6 +55,7 @@ from test_framework.util import (
assert_equal,
assert_greater_than_or_equal,
)
+from test_framework.blocktools import COINBASE_MATURITY
DEFAULT_FEE = Decimal("0.0001")
@@ -100,8 +101,15 @@ class MiniWallet:
self._address, self._internal_key = create_deterministic_address_bcrt1_p2tr_op_true()
self._scriptPubKey = bytes.fromhex(self._test_node.validateaddress(self._address)['scriptPubKey'])
- def _create_utxo(self, *, txid, vout, value, height):
- return {"txid": txid, "vout": vout, "value": value, "height": height}
+ # When the pre-mined test framework chain is used, it contains coinbase
+ # outputs to the MiniWallet's default address in blocks 76-100
+ # (see method BitcoinTestFramework._initialize_chain())
+ # The MiniWallet needs to rescan_utxos() in order to account
+ # for those mature UTXOs, so that all txs spend confirmed coins
+ self.rescan_utxos()
+
+ def _create_utxo(self, *, txid, vout, value, height, coinbase, confirmations):
+ return {"txid": txid, "vout": vout, "value": value, "height": height, "coinbase": coinbase, "confirmations": confirmations}
def _bulk_tx(self, tx, target_weight):
"""Pad a transaction with extra outputs until it reaches a target weight (or higher).
@@ -118,13 +126,25 @@ class MiniWallet:
def get_balance(self):
return sum(u['value'] for u in self._utxos)
- def rescan_utxos(self):
+ def rescan_utxos(self, *, include_mempool=True):
"""Drop all utxos and rescan the utxo set"""
self._utxos = []
res = self._test_node.scantxoutset(action="start", scanobjects=[self.get_descriptor()])
assert_equal(True, res['success'])
for utxo in res['unspents']:
- self._utxos.append(self._create_utxo(txid=utxo["txid"], vout=utxo["vout"], value=utxo["amount"], height=utxo["height"]))
+ self._utxos.append(
+ self._create_utxo(txid=utxo["txid"],
+ vout=utxo["vout"],
+ value=utxo["amount"],
+ height=utxo["height"],
+ coinbase=utxo["coinbase"],
+ confirmations=res["height"] - utxo["height"] + 1))
+ if include_mempool:
+ mempool = self._test_node.getrawmempool(verbose=True)
+ # Sort tx by ancestor count. See BlockAssembler::SortForBlock in src/node/miner.cpp
+ sorted_mempool = sorted(mempool.items(), key=lambda item: (item[1]["ancestorcount"], int(item[0], 16)))
+ for txid, _ in sorted_mempool:
+ self.scan_tx(self._test_node.getrawtransaction(txid=txid, verbose=True))
def scan_tx(self, tx):
"""Scan the tx and adjust the internal list of owned utxos"""
@@ -139,27 +159,35 @@ class MiniWallet:
pass
for out in tx['vout']:
if out['scriptPubKey']['hex'] == self._scriptPubKey.hex():
- self._utxos.append(self._create_utxo(txid=tx["txid"], vout=out["n"], value=out["value"], height=0))
+ self._utxos.append(self._create_utxo(txid=tx["txid"], vout=out["n"], value=out["value"], height=0, coinbase=False, confirmations=0))
def scan_txs(self, txs):
for tx in txs:
self.scan_tx(tx)
def sign_tx(self, tx, fixed_length=True):
- """Sign tx that has been created by MiniWallet in P2PK mode"""
- assert_equal(self._mode, MiniWalletMode.RAW_P2PK)
- (sighash, err) = LegacySignatureHash(CScript(self._scriptPubKey), tx, 0, SIGHASH_ALL)
- assert err is None
- # for exact fee calculation, create only signatures with fixed size by default (>49.89% probability):
- # 65 bytes: high-R val (33 bytes) + low-S val (32 bytes)
- # with the DER header/skeleton data of 6 bytes added, this leads to a target size of 71 bytes
- der_sig = b''
- while not len(der_sig) == 71:
- der_sig = self._priv_key.sign_ecdsa(sighash)
- if not fixed_length:
- break
- tx.vin[0].scriptSig = CScript([der_sig + bytes(bytearray([SIGHASH_ALL]))])
- tx.rehash()
+ if self._mode == MiniWalletMode.RAW_P2PK:
+ (sighash, err) = LegacySignatureHash(CScript(self._scriptPubKey), tx, 0, SIGHASH_ALL)
+ assert err is None
+ # for exact fee calculation, create only signatures with fixed size by default (>49.89% probability):
+ # 65 bytes: high-R val (33 bytes) + low-S val (32 bytes)
+ # with the DER header/skeleton data of 6 bytes added, this leads to a target size of 71 bytes
+ der_sig = b''
+ while not len(der_sig) == 71:
+ der_sig = self._priv_key.sign_ecdsa(sighash)
+ if not fixed_length:
+ break
+ tx.vin[0].scriptSig = CScript([der_sig + bytes(bytearray([SIGHASH_ALL]))])
+ tx.rehash()
+ elif self._mode == MiniWalletMode.RAW_OP_TRUE:
+ for i in tx.vin:
+ i.scriptSig = CScript([OP_NOP] * 43) # pad to identical size
+ elif self._mode == MiniWalletMode.ADDRESS_OP_TRUE:
+ tx.wit.vtxinwit = [CTxInWitness()] * len(tx.vin)
+ for i in tx.wit.vtxinwit:
+ i.scriptWitness.stack = [CScript([OP_TRUE]), bytes([LEAF_VERSION_TAPSCRIPT]) + self._internal_key]
+ else:
+ assert False
def generate(self, num_blocks, **kwargs):
"""Generate blocks with coinbase outputs to the internal address, and call rescan_utxos"""
@@ -204,9 +232,13 @@ class MiniWallet:
else:
return self._utxos[index]
- def get_utxos(self, *, mark_as_spent=True):
+ def get_utxos(self, *, include_immature_coinbase=False, mark_as_spent=True):
"""Returns the list of all utxos and optionally mark them as spent"""
- utxos = deepcopy(self._utxos)
+ if not include_immature_coinbase:
+ utxo_filter = filter(lambda utxo: not utxo['coinbase'] or COINBASE_MATURITY <= utxo['confirmations'], self._utxos)
+ else:
+ utxo_filter = self._utxos
+ utxos = deepcopy(list(utxo_filter))
if mark_as_spent:
self._utxos = []
return utxos
@@ -266,24 +298,17 @@ class MiniWallet:
inputs_value_total = sum([int(COIN * utxo['value']) for utxo in utxos_to_spend])
outputs_value_total = inputs_value_total - fee_per_output * num_outputs
amount_per_output = amount_per_output or (outputs_value_total // num_outputs)
+ assert amount_per_output > 0
+ outputs_value_total = amount_per_output * num_outputs
+ fee = Decimal(inputs_value_total - outputs_value_total) / COIN
# create tx
tx = CTransaction()
- tx.vin = [CTxIn(COutPoint(int(utxo_to_spend['txid'], 16), utxo_to_spend['vout']), nSequence=seq) for utxo_to_spend,seq in zip(utxos_to_spend, sequence)]
+ tx.vin = [CTxIn(COutPoint(int(utxo_to_spend['txid'], 16), utxo_to_spend['vout']), nSequence=seq) for utxo_to_spend, seq in zip(utxos_to_spend, sequence)]
tx.vout = [CTxOut(amount_per_output, bytearray(self._scriptPubKey)) for _ in range(num_outputs)]
tx.nLockTime = locktime
- if self._mode == MiniWalletMode.RAW_P2PK:
- self.sign_tx(tx)
- elif self._mode == MiniWalletMode.RAW_OP_TRUE:
- for i in range(len(utxos_to_spend)):
- tx.vin[i].scriptSig = CScript([OP_NOP] * 43) # pad to identical size
- elif self._mode == MiniWalletMode.ADDRESS_OP_TRUE:
- tx.wit.vtxinwit = [CTxInWitness()] * len(utxos_to_spend)
- for i in range(len(utxos_to_spend)):
- tx.wit.vtxinwit[i].scriptWitness.stack = [CScript([OP_TRUE]), bytes([LEAF_VERSION_TAPSCRIPT]) + self._internal_key]
- else:
- assert False
+ self.sign_tx(tx)
if target_weight:
self._bulk_tx(tx, target_weight)
@@ -295,8 +320,12 @@ class MiniWallet:
vout=i,
value=Decimal(tx.vout[i].nValue) / COIN,
height=0,
+ coinbase=False,
+ confirmations=0,
) for i in range(len(tx.vout))],
+ "fee": fee,
"txid": txid,
+ "wtxid": tx.getwtxid(),
"hex": tx.serialize().hex(),
"tx": tx,
}
@@ -314,52 +343,45 @@ class MiniWallet:
else:
assert False
send_value = utxo_to_spend["value"] - (fee or (fee_rate * vsize / 1000))
- assert send_value > 0
# create tx
tx = self.create_self_transfer_multi(utxos_to_spend=[utxo_to_spend], locktime=locktime, sequence=sequence, amount_per_output=int(COIN * send_value), target_weight=target_weight)
if not target_weight:
assert_equal(tx["tx"].get_vsize(), vsize)
+ tx["new_utxo"] = tx.pop("new_utxos")[0]
- return {"txid": tx["txid"], "wtxid": tx["tx"].getwtxid(), "hex": tx["hex"], "tx": tx["tx"], "new_utxo": tx["new_utxos"][0]}
+ return tx
def sendrawtransaction(self, *, from_node, tx_hex, maxfeerate=0, **kwargs):
txid = from_node.sendrawtransaction(hexstring=tx_hex, maxfeerate=maxfeerate, **kwargs)
self.scan_tx(from_node.decoderawtransaction(tx_hex))
return txid
- def create_self_transfer_chain(self, *, chain_length):
+ def create_self_transfer_chain(self, *, chain_length, utxo_to_spend=None):
"""
Create a "chain" of chain_length transactions. The nth transaction in
the chain is a child of the n-1th transaction and parent of the n+1th transaction.
-
- Returns a dic {"chain_hex": chain_hex, "chain_txns" : chain_txns}
-
- "chain_hex" is a list representing the chain's transactions in hexadecimal.
- "chain_txns" is a list representing the chain's transactions in the CTransaction object.
"""
- chaintip_utxo = self.get_utxo()
- chain_hex = []
- chain_txns = []
+ chaintip_utxo = utxo_to_spend or self.get_utxo()
+ chain = []
for _ in range(chain_length):
tx = self.create_self_transfer(utxo_to_spend=chaintip_utxo)
chaintip_utxo = tx["new_utxo"]
- chain_hex.append(tx["hex"])
- chain_txns.append(tx["tx"])
+ chain.append(tx)
- return {"chain_hex": chain_hex, "chain_txns" : chain_txns}
+ return chain
- def send_self_transfer_chain(self, *, from_node, chain_length, utxo_to_spend=None):
+ def send_self_transfer_chain(self, *, from_node, **kwargs):
"""Create and send a "chain" of chain_length transactions. The nth transaction in
the chain is a child of the n-1th transaction and parent of the n+1th transaction.
- Returns the chaintip (nth) utxo
+ Returns a list of objects for each tx (see create_self_transfer_multi).
"""
- chaintip_utxo = utxo_to_spend or self.get_utxo()
- for _ in range(chain_length):
- chaintip_utxo = self.send_self_transfer(utxo_to_spend=chaintip_utxo, from_node=from_node)["new_utxo"]
- return chaintip_utxo
+ chain = self.create_self_transfer_chain(**kwargs)
+ for t in chain:
+ self.sendrawtransaction(from_node=from_node, tx_hex=t["hex"])
+ return chain
def getnewdestination(address_type='bech32m'):