aboutsummaryrefslogtreecommitdiff
path: root/test/functional/rpc_psbt.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/functional/rpc_psbt.py')
-rwxr-xr-xtest/functional/rpc_psbt.py120
1 files changed, 116 insertions, 4 deletions
diff --git a/test/functional/rpc_psbt.py b/test/functional/rpc_psbt.py
index 444e56610e..4583ca25cf 100755
--- a/test/functional/rpc_psbt.py
+++ b/test/functional/rpc_psbt.py
@@ -9,10 +9,24 @@ from decimal import Decimal
from itertools import product
from test_framework.descriptors import descsum_create
-from test_framework.key import ECKey
+from test_framework.key import ECKey, H_POINT
from test_framework.messages import (
- ser_compact_size,
+ COutPoint,
+ CTransaction,
+ CTxIn,
+ CTxOut,
+ MAX_BIP125_RBF_SEQUENCE,
WITNESS_SCALE_FACTOR,
+ ser_compact_size,
+)
+from test_framework.psbt import (
+ PSBT,
+ PSBTMap,
+ PSBT_GLOBAL_UNSIGNED_TX,
+ PSBT_IN_RIPEMD160,
+ PSBT_IN_SHA256,
+ PSBT_IN_HASH160,
+ PSBT_IN_HASH256,
)
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
@@ -21,13 +35,14 @@ from test_framework.util import (
assert_greater_than,
assert_raises_rpc_error,
find_output,
+ find_vout_for_address,
+ random_bytes,
)
from test_framework.wallet_util import bytes_to_wif
import json
import os
-MAX_BIP125_RBF_SEQUENCE = 0xfffffffd
# Create one-input, one-output, no-fee transaction:
class PSBTTest(BitcoinTestFramework):
@@ -435,6 +450,7 @@ class PSBTTest(BitcoinTestFramework):
with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data/rpc_psbt.json'), encoding='utf-8') as f:
d = json.load(f)
invalids = d['invalid']
+ invalid_with_msgs = d["invalid_with_msg"]
valids = d['valid']
creators = d['creator']
signers = d['signer']
@@ -445,6 +461,9 @@ class PSBTTest(BitcoinTestFramework):
# Invalid PSBTs
for invalid in invalids:
assert_raises_rpc_error(-22, "TX decode failed", self.nodes[0].decodepsbt, invalid)
+ for invalid in invalid_with_msgs:
+ psbt, msg = invalid
+ assert_raises_rpc_error(-22, f"TX decode failed {msg}", self.nodes[0].decodepsbt, psbt)
# Valid PSBTs
for valid in valids:
@@ -452,7 +471,7 @@ class PSBTTest(BitcoinTestFramework):
# Creator Tests
for creator in creators:
- created_tx = self.nodes[0].createpsbt(creator['inputs'], creator['outputs'])
+ created_tx = self.nodes[0].createpsbt(inputs=creator['inputs'], outputs=creator['outputs'], replaceable=False)
assert_equal(created_tx, creator['result'])
# Signer tests
@@ -723,5 +742,98 @@ class PSBTTest(BitcoinTestFramework):
)
assert_equal(psbt2["fee"], psbt3["fee"])
+ self.log.info("Test signing inputs that the wallet has keys for but is not watching the scripts")
+ self.nodes[1].createwallet(wallet_name="scriptwatchonly", disable_private_keys=True)
+ watchonly = self.nodes[1].get_wallet_rpc("scriptwatchonly")
+
+ eckey = ECKey()
+ eckey.generate()
+ privkey = bytes_to_wif(eckey.get_bytes())
+
+ desc = descsum_create("wsh(pkh({}))".format(eckey.get_pubkey().get_bytes().hex()))
+ if self.options.descriptors:
+ res = watchonly.importdescriptors([{"desc": desc, "timestamp": "now"}])
+ else:
+ res = watchonly.importmulti([{"desc": desc, "timestamp": "now"}])
+ assert res[0]["success"]
+ addr = self.nodes[0].deriveaddresses(desc)[0]
+ self.nodes[0].sendtoaddress(addr, 10)
+ self.generate(self.nodes[0], 1)
+ self.nodes[0].importprivkey(privkey)
+
+ psbt = watchonly.sendall([wallet.getnewaddress()])["psbt"]
+ psbt = self.nodes[0].walletprocesspsbt(psbt)["psbt"]
+ self.nodes[0].sendrawtransaction(self.nodes[0].finalizepsbt(psbt)["hex"])
+
+ # Same test but for taproot
+ if self.options.descriptors:
+ eckey = ECKey()
+ eckey.generate()
+ privkey = bytes_to_wif(eckey.get_bytes())
+
+ desc = descsum_create("tr({},pk({}))".format(H_POINT, eckey.get_pubkey().get_bytes().hex()))
+ res = watchonly.importdescriptors([{"desc": desc, "timestamp": "now"}])
+ assert res[0]["success"]
+ addr = self.nodes[0].deriveaddresses(desc)[0]
+ self.nodes[0].sendtoaddress(addr, 10)
+ self.generate(self.nodes[0], 1)
+ self.nodes[0].importdescriptors([{"desc": descsum_create("tr({})".format(privkey)), "timestamp":"now"}])
+
+ psbt = watchonly.sendall([wallet.getnewaddress()])["psbt"]
+ psbt = self.nodes[0].walletprocesspsbt(psbt)["psbt"]
+ self.nodes[0].sendrawtransaction(self.nodes[0].finalizepsbt(psbt)["hex"])
+
+ self.log.info("Test that walletprocesspsbt both updates and signs a non-updated psbt containing Taproot inputs")
+ addr = self.nodes[0].getnewaddress("", "bech32m")
+ txid = self.nodes[0].sendtoaddress(addr, 1)
+ vout = find_vout_for_address(self.nodes[0], txid, addr)
+ psbt = self.nodes[0].createpsbt([{"txid": txid, "vout": vout}], [{self.nodes[0].getnewaddress(): 0.9999}])
+ signed = self.nodes[0].walletprocesspsbt(psbt)
+ rawtx = self.nodes[0].finalizepsbt(signed["psbt"])["hex"]
+ self.nodes[0].sendrawtransaction(rawtx)
+ self.generate(self.nodes[0], 1)
+
+ self.log.info("Test decoding PSBT with per-input preimage types")
+ # note that the decodepsbt RPC doesn't check whether preimages and hashes match
+ hash_ripemd160, preimage_ripemd160 = random_bytes(20), random_bytes(50)
+ hash_sha256, preimage_sha256 = random_bytes(32), random_bytes(50)
+ hash_hash160, preimage_hash160 = random_bytes(20), random_bytes(50)
+ hash_hash256, preimage_hash256 = random_bytes(32), random_bytes(50)
+
+ tx = CTransaction()
+ tx.vin = [CTxIn(outpoint=COutPoint(hash=int('aa' * 32, 16), n=0), scriptSig=b""),
+ CTxIn(outpoint=COutPoint(hash=int('bb' * 32, 16), n=0), scriptSig=b""),
+ CTxIn(outpoint=COutPoint(hash=int('cc' * 32, 16), n=0), scriptSig=b""),
+ CTxIn(outpoint=COutPoint(hash=int('dd' * 32, 16), n=0), scriptSig=b"")]
+ tx.vout = [CTxOut(nValue=0, scriptPubKey=b"")]
+ psbt = PSBT()
+ psbt.g = PSBTMap({PSBT_GLOBAL_UNSIGNED_TX: tx.serialize()})
+ psbt.i = [PSBTMap({bytes([PSBT_IN_RIPEMD160]) + hash_ripemd160: preimage_ripemd160}),
+ PSBTMap({bytes([PSBT_IN_SHA256]) + hash_sha256: preimage_sha256}),
+ PSBTMap({bytes([PSBT_IN_HASH160]) + hash_hash160: preimage_hash160}),
+ PSBTMap({bytes([PSBT_IN_HASH256]) + hash_hash256: preimage_hash256})]
+ psbt.o = [PSBTMap()]
+ res_inputs = self.nodes[0].decodepsbt(psbt.to_base64())["inputs"]
+ assert_equal(len(res_inputs), 4)
+ preimage_keys = ["ripemd160_preimages", "sha256_preimages", "hash160_preimages", "hash256_preimages"]
+ expected_hashes = [hash_ripemd160, hash_sha256, hash_hash160, hash_hash256]
+ expected_preimages = [preimage_ripemd160, preimage_sha256, preimage_hash160, preimage_hash256]
+ for res_input, preimage_key, hash, preimage in zip(res_inputs, preimage_keys, expected_hashes, expected_preimages):
+ assert preimage_key in res_input
+ assert_equal(len(res_input[preimage_key]), 1)
+ assert hash.hex() in res_input[preimage_key]
+ assert_equal(res_input[preimage_key][hash.hex()], preimage.hex())
+
+ self.log.info("Test that combining PSBTs with different transactions fails")
+ tx = CTransaction()
+ tx.vin = [CTxIn(outpoint=COutPoint(hash=int('aa' * 32, 16), n=0), scriptSig=b"")]
+ tx.vout = [CTxOut(nValue=0, scriptPubKey=b"")]
+ psbt1 = PSBT(g=PSBTMap({PSBT_GLOBAL_UNSIGNED_TX: tx.serialize()}), i=[PSBTMap()], o=[PSBTMap()]).to_base64()
+ tx.vout[0].nValue += 1 # slightly modify tx
+ psbt2 = PSBT(g=PSBTMap({PSBT_GLOBAL_UNSIGNED_TX: tx.serialize()}), i=[PSBTMap()], o=[PSBTMap()]).to_base64()
+ assert_raises_rpc_error(-8, "PSBTs not compatible (different transactions)", self.nodes[0].combinepsbt, [psbt1, psbt2])
+ assert_equal(self.nodes[0].combinepsbt([psbt1, psbt1]), psbt1)
+
+
if __name__ == '__main__':
PSBTTest().main()