From 3b09d0eb7f2c1d6ebdab73d18db28e5bf7d74f18 Mon Sep 17 00:00:00 2001 From: Ava Chow Date: Wed, 20 Dec 2023 18:11:51 -0500 Subject: tests: Test for gethdkeys --- test/functional/test_runner.py | 1 + test/functional/wallet_gethdkeys.py | 185 ++++++++++++++++++++++++++++++++++++ 2 files changed, 186 insertions(+) create mode 100755 test/functional/wallet_gethdkeys.py (limited to 'test/functional') diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py index e438a60edc..1f6bb32e0a 100755 --- a/test/functional/test_runner.py +++ b/test/functional/test_runner.py @@ -181,6 +181,7 @@ BASE_SCRIPTS = [ 'wallet_keypool_topup.py --legacy-wallet', 'wallet_keypool_topup.py --descriptors', 'wallet_fast_rescan.py --descriptors', + 'wallet_gethdkeys.py --descriptors', 'interface_zmq.py', 'rpc_invalid_address_message.py', 'rpc_validateaddress.py', diff --git a/test/functional/wallet_gethdkeys.py b/test/functional/wallet_gethdkeys.py new file mode 100755 index 0000000000..f09b8c875a --- /dev/null +++ b/test/functional/wallet_gethdkeys.py @@ -0,0 +1,185 @@ +#!/usr/bin/env python3 +# Copyright (c) 2023 The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. +"""Test wallet gethdkeys RPC.""" + +from test_framework.descriptors import descsum_create +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import ( + assert_equal, + assert_raises_rpc_error, +) +from test_framework.wallet_util import WalletUnlock + + +class WalletGetHDKeyTest(BitcoinTestFramework): + def add_options(self, parser): + self.add_wallet_options(parser, descriptors=True, legacy=False) + + def set_test_params(self): + self.setup_clean_chain = True + self.num_nodes = 1 + + def skip_test_if_missing_module(self): + self.skip_if_no_wallet() + + def run_test(self): + self.test_basic_gethdkeys() + self.test_ranged_imports() + self.test_lone_key_imports() + self.test_ranged_multisig() + self.test_mixed_multisig() + + def test_basic_gethdkeys(self): + self.log.info("Test gethdkeys basics") + self.nodes[0].createwallet("basic") + wallet = self.nodes[0].get_wallet_rpc("basic") + xpub_info = wallet.gethdkeys() + assert_equal(len(xpub_info), 1) + assert_equal(xpub_info[0]["has_private"], True) + + assert "xprv" not in xpub_info[0] + xpub = xpub_info[0]["xpub"] + + xpub_info = wallet.gethdkeys(private=True) + xprv = xpub_info[0]["xprv"] + assert_equal(xpub_info[0]["xpub"], xpub) + assert_equal(xpub_info[0]["has_private"], True) + + descs = wallet.listdescriptors(True) + for desc in descs["descriptors"]: + assert xprv in desc["desc"] + + self.log.info("HD pubkey can be retrieved from encrypted wallets") + prev_xprv = xprv + wallet.encryptwallet("pass") + # HD key is rotated on encryption, there should now be 2 HD keys + assert_equal(len(wallet.gethdkeys()), 2) + # New key is active, should be able to get only that one and its descriptors + xpub_info = wallet.gethdkeys(active_only=True) + assert_equal(len(xpub_info), 1) + assert xpub_info[0]["xpub"] != xpub + assert "xprv" not in xpub_info[0] + assert_equal(xpub_info[0]["has_private"], True) + + self.log.info("HD privkey can be retrieved from encrypted wallets") + assert_raises_rpc_error(-13, "Error: Please enter the wallet passphrase with walletpassphrase first", wallet.gethdkeys, private=True) + with WalletUnlock(wallet, "pass"): + xpub_info = wallet.gethdkeys(active_only=True, private=True)[0] + assert xpub_info["xprv"] != xprv + for desc in wallet.listdescriptors(True)["descriptors"]: + if desc["active"]: + # After encrypting, HD key was rotated and should appear in all active descriptors + assert xpub_info["xprv"] in desc["desc"] + else: + # Inactive descriptors should have the previous HD key + assert prev_xprv in desc["desc"] + + def test_ranged_imports(self): + self.log.info("Keys of imported ranged descriptors appear in gethdkeys") + def_wallet = self.nodes[0].get_wallet_rpc(self.default_wallet_name) + self.nodes[0].createwallet("imports") + wallet = self.nodes[0].get_wallet_rpc("imports") + + xpub_info = wallet.gethdkeys() + assert_equal(len(xpub_info), 1) + active_xpub = xpub_info[0]["xpub"] + + import_xpub = def_wallet.gethdkeys(active_only=True)[0]["xpub"] + desc_import = def_wallet.listdescriptors(True)["descriptors"] + for desc in desc_import: + desc["active"] = False + wallet.importdescriptors(desc_import) + assert_equal(wallet.gethdkeys(active_only=True), xpub_info) + + xpub_info = wallet.gethdkeys() + assert_equal(len(xpub_info), 2) + for x in xpub_info: + if x["xpub"] == active_xpub: + for desc in x["descriptors"]: + assert_equal(desc["active"], True) + elif x["xpub"] == import_xpub: + for desc in x["descriptors"]: + assert_equal(desc["active"], False) + else: + assert False + + + def test_lone_key_imports(self): + self.log.info("Non-HD keys do not appear in gethdkeys") + self.nodes[0].createwallet("lonekey", blank=True) + wallet = self.nodes[0].get_wallet_rpc("lonekey") + + assert_equal(wallet.gethdkeys(), []) + wallet.importdescriptors([{"desc": descsum_create("wpkh(cTe1f5rdT8A8DFgVWTjyPwACsDPJM9ff4QngFxUixCSvvbg1x6sh)"), "timestamp": "now"}]) + assert_equal(wallet.gethdkeys(), []) + + self.log.info("HD keys of non-ranged descriptors should appear in gethdkeys") + def_wallet = self.nodes[0].get_wallet_rpc(self.default_wallet_name) + xpub_info = def_wallet.gethdkeys(private=True) + xpub = xpub_info[0]["xpub"] + xprv = xpub_info[0]["xprv"] + prv_desc = descsum_create(f"wpkh({xprv})") + pub_desc = descsum_create(f"wpkh({xpub})") + assert_equal(wallet.importdescriptors([{"desc": prv_desc, "timestamp": "now"}])[0]["success"], True) + xpub_info = wallet.gethdkeys() + assert_equal(len(xpub_info), 1) + assert_equal(xpub_info[0]["xpub"], xpub) + assert_equal(len(xpub_info[0]["descriptors"]), 1) + assert_equal(xpub_info[0]["descriptors"][0]["desc"], pub_desc) + assert_equal(xpub_info[0]["descriptors"][0]["active"], False) + + def test_ranged_multisig(self): + self.log.info("HD keys of a multisig appear in gethdkeys") + def_wallet = self.nodes[0].get_wallet_rpc(self.default_wallet_name) + self.nodes[0].createwallet("ranged_multisig") + wallet = self.nodes[0].get_wallet_rpc("ranged_multisig") + + xpub1 = wallet.gethdkeys()[0]["xpub"] + xprv1 = wallet.gethdkeys(private=True)[0]["xprv"] + xpub2 = def_wallet.gethdkeys()[0]["xpub"] + + prv_multi_desc = descsum_create(f"wsh(multi(2,{xprv1}/*,{xpub2}/*))") + pub_multi_desc = descsum_create(f"wsh(multi(2,{xpub1}/*,{xpub2}/*))") + assert_equal(wallet.importdescriptors([{"desc": prv_multi_desc, "timestamp": "now"}])[0]["success"], True) + + xpub_info = wallet.gethdkeys() + assert_equal(len(xpub_info), 2) + for x in xpub_info: + if x["xpub"] == xpub1: + found_desc = next((d for d in xpub_info[0]["descriptors"] if d["desc"] == pub_multi_desc), None) + assert found_desc is not None + assert_equal(found_desc["active"], False) + elif x["xpub"] == xpub2: + assert_equal(len(x["descriptors"]), 1) + assert_equal(x["descriptors"][0]["desc"], pub_multi_desc) + assert_equal(x["descriptors"][0]["active"], False) + else: + assert False + + def test_mixed_multisig(self): + self.log.info("Non-HD keys of a multisig do not appear in gethdkeys") + def_wallet = self.nodes[0].get_wallet_rpc(self.default_wallet_name) + self.nodes[0].createwallet("single_multisig") + wallet = self.nodes[0].get_wallet_rpc("single_multisig") + + xpub = wallet.gethdkeys()[0]["xpub"] + xprv = wallet.gethdkeys(private=True)[0]["xprv"] + pub = def_wallet.getaddressinfo(def_wallet.getnewaddress())["pubkey"] + + prv_multi_desc = descsum_create(f"wsh(multi(2,{xprv},{pub}))") + pub_multi_desc = descsum_create(f"wsh(multi(2,{xpub},{pub}))") + import_res = wallet.importdescriptors([{"desc": prv_multi_desc, "timestamp": "now"}]) + assert_equal(import_res[0]["success"], True) + + xpub_info = wallet.gethdkeys() + assert_equal(len(xpub_info), 1) + assert_equal(xpub_info[0]["xpub"], xpub) + found_desc = next((d for d in xpub_info[0]["descriptors"] if d["desc"] == pub_multi_desc), None) + assert found_desc is not None + assert_equal(found_desc["active"], False) + + +if __name__ == '__main__': + WalletGetHDKeyTest().main() -- cgit v1.2.3 From 2402b6306215a9ee8d5f4068ea81f4e7f324adeb Mon Sep 17 00:00:00 2001 From: Ava Chow Date: Thu, 21 Dec 2023 18:44:03 -0500 Subject: wallet: Test upgrade of pre-taproot wallet to have tr() descriptors --- test/functional/wallet_backwards_compatibility.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) (limited to 'test/functional') diff --git a/test/functional/wallet_backwards_compatibility.py b/test/functional/wallet_backwards_compatibility.py index 4d6e6024c5..ab008a40cd 100755 --- a/test/functional/wallet_backwards_compatibility.py +++ b/test/functional/wallet_backwards_compatibility.py @@ -355,6 +355,25 @@ class BackwardsCompatibilityTest(BitcoinTestFramework): down_wallet_name = f"re_down_{node.version}" down_backup_path = os.path.join(self.options.tmpdir, f"{down_wallet_name}.dat") wallet.backupwallet(down_backup_path) + + # Check that taproot descriptors can be added to 0.21 wallets + # This must be done after the backup is created so that 0.21 can still load + # the backup + if self.options.descriptors and self.major_version_equals(node, 21): + assert_raises_rpc_error(-12, "No bech32m addresses available", wallet.getnewaddress, address_type="bech32m") + xpubs = wallet.gethdkeys(active_only=True) + assert_equal(len(xpubs), 1) + assert_equal(len(xpubs[0]["descriptors"]), 6) + wallet.createwalletdescriptor("bech32m") + xpubs = wallet.gethdkeys(active_only=True) + assert_equal(len(xpubs), 1) + assert_equal(len(xpubs[0]["descriptors"]), 8) + tr_descs = [desc["desc"] for desc in xpubs[0]["descriptors"] if desc["desc"].startswith("tr(")] + assert_equal(len(tr_descs), 2) + for desc in tr_descs: + assert info["hdmasterfingerprint"] in desc + wallet.getnewaddress(address_type="bech32m") + wallet.unloadwallet() # Check that no automatic upgrade broke the downgrading the wallet -- cgit v1.2.3 From 746b6d88395607abbd3c13bbdcdd4ca83e9bc9e4 Mon Sep 17 00:00:00 2001 From: Ava Chow Date: Fri, 22 Dec 2023 12:55:59 -0500 Subject: test: Add test for createwalletdescriptor --- test/functional/test_runner.py | 1 + test/functional/wallet_createwalletdescriptor.py | 123 +++++++++++++++++++++++ 2 files changed, 124 insertions(+) create mode 100755 test/functional/wallet_createwalletdescriptor.py (limited to 'test/functional') diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py index 1f6bb32e0a..cf0c5d701c 100755 --- a/test/functional/test_runner.py +++ b/test/functional/test_runner.py @@ -182,6 +182,7 @@ BASE_SCRIPTS = [ 'wallet_keypool_topup.py --descriptors', 'wallet_fast_rescan.py --descriptors', 'wallet_gethdkeys.py --descriptors', + 'wallet_createwalletdescriptor.py --descriptors', 'interface_zmq.py', 'rpc_invalid_address_message.py', 'rpc_validateaddress.py', diff --git a/test/functional/wallet_createwalletdescriptor.py b/test/functional/wallet_createwalletdescriptor.py new file mode 100755 index 0000000000..18e1703da3 --- /dev/null +++ b/test/functional/wallet_createwalletdescriptor.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python3 +# Copyright (c) 2023 The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. +"""Test wallet createwalletdescriptor RPC.""" + +from test_framework.descriptors import descsum_create +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import ( + assert_equal, + assert_raises_rpc_error, +) +from test_framework.wallet_util import WalletUnlock + + +class WalletCreateDescriptorTest(BitcoinTestFramework): + def add_options(self, parser): + self.add_wallet_options(parser, descriptors=True, legacy=False) + + def set_test_params(self): + self.setup_clean_chain = True + self.num_nodes = 1 + + def skip_test_if_missing_module(self): + self.skip_if_no_wallet() + + def run_test(self): + self.test_basic() + self.test_imported_other_keys() + self.test_encrypted() + + def test_basic(self): + def_wallet = self.nodes[0].get_wallet_rpc(self.default_wallet_name) + self.nodes[0].createwallet("blank", blank=True) + wallet = self.nodes[0].get_wallet_rpc("blank") + + xpub_info = def_wallet.gethdkeys(private=True) + xpub = xpub_info[0]["xpub"] + xprv = xpub_info[0]["xprv"] + expected_descs = [] + for desc in def_wallet.listdescriptors()["descriptors"]: + if desc["desc"].startswith("wpkh("): + expected_descs.append(desc["desc"]) + + assert_raises_rpc_error(-5, "Unable to determine which HD key to use from active descriptors. Please specify with 'hdkey'", wallet.createwalletdescriptor, "bech32") + assert_raises_rpc_error(-5, f"Private key for {xpub} is not known", wallet.createwalletdescriptor, type="bech32", hdkey=xpub) + + self.log.info("Test createwalletdescriptor after importing active descriptor to blank wallet") + # Import one active descriptor + assert_equal(wallet.importdescriptors([{"desc": descsum_create(f"pkh({xprv}/44h/2h/0h/0/0/*)"), "timestamp": "now", "active": True}])[0]["success"], True) + assert_equal(len(wallet.listdescriptors()["descriptors"]), 1) + assert_equal(len(wallet.gethdkeys()), 1) + + new_descs = wallet.createwalletdescriptor("bech32")["descs"] + assert_equal(len(new_descs), 2) + assert_equal(len(wallet.gethdkeys()), 1) + assert_equal(new_descs, expected_descs) + + self.log.info("Test descriptor creation options") + old_descs = set([(d["desc"], d["active"], d["internal"]) for d in wallet.listdescriptors(private=True)["descriptors"]]) + wallet.createwalletdescriptor(type="bech32m", internal=False) + curr_descs = set([(d["desc"], d["active"], d["internal"]) for d in wallet.listdescriptors(private=True)["descriptors"]]) + new_descs = list(curr_descs - old_descs) + assert_equal(len(new_descs), 1) + assert_equal(len(wallet.gethdkeys()), 1) + assert_equal(new_descs[0][0], descsum_create(f"tr({xprv}/86h/1h/0h/0/*)")) + assert_equal(new_descs[0][1], True) + assert_equal(new_descs[0][2], False) + + old_descs = curr_descs + wallet.createwalletdescriptor(type="bech32m", internal=True) + curr_descs = set([(d["desc"], d["active"], d["internal"]) for d in wallet.listdescriptors(private=True)["descriptors"]]) + new_descs = list(curr_descs - old_descs) + assert_equal(len(new_descs), 1) + assert_equal(len(wallet.gethdkeys()), 1) + assert_equal(new_descs[0][0], descsum_create(f"tr({xprv}/86h/1h/0h/1/*)")) + assert_equal(new_descs[0][1], True) + assert_equal(new_descs[0][2], True) + + def test_imported_other_keys(self): + self.log.info("Test createwalletdescriptor with multiple keys in active descriptors") + def_wallet = self.nodes[0].get_wallet_rpc(self.default_wallet_name) + self.nodes[0].createwallet("multiple_keys") + wallet = self.nodes[0].get_wallet_rpc("multiple_keys") + + wallet_xpub = wallet.gethdkeys()[0]["xpub"] + + xpub_info = def_wallet.gethdkeys(private=True) + xpub = xpub_info[0]["xpub"] + xprv = xpub_info[0]["xprv"] + + assert_equal(wallet.importdescriptors([{"desc": descsum_create(f"wpkh({xprv}/0/0/*)"), "timestamp": "now", "active": True}])[0]["success"], True) + assert_equal(len(wallet.gethdkeys()), 2) + + assert_raises_rpc_error(-5, "Unable to determine which HD key to use from active descriptors. Please specify with 'hdkey'", wallet.createwalletdescriptor, "bech32") + assert_raises_rpc_error(-4, "Descriptor already exists", wallet.createwalletdescriptor, type="bech32m", hdkey=wallet_xpub) + assert_raises_rpc_error(-5, "Unable to parse HD key. Please provide a valid xpub", wallet.createwalletdescriptor, type="bech32m", hdkey=xprv) + + # Able to replace tr() descriptor with other hd key + wallet.createwalletdescriptor(type="bech32m", hdkey=xpub) + + def test_encrypted(self): + self.log.info("Test createwalletdescriptor with encrypted wallets") + def_wallet = self.nodes[0].get_wallet_rpc(self.default_wallet_name) + self.nodes[0].createwallet("encrypted", blank=True, passphrase="pass") + wallet = self.nodes[0].get_wallet_rpc("encrypted") + + xpub_info = def_wallet.gethdkeys(private=True) + xprv = xpub_info[0]["xprv"] + + with WalletUnlock(wallet, "pass"): + assert_equal(wallet.importdescriptors([{"desc": descsum_create(f"wpkh({xprv}/0/0/*)"), "timestamp": "now", "active": True}])[0]["success"], True) + assert_equal(len(wallet.gethdkeys()), 1) + + assert_raises_rpc_error(-13, "Error: Please enter the wallet passphrase with walletpassphrase first.", wallet.createwalletdescriptor, type="bech32m") + + with WalletUnlock(wallet, "pass"): + wallet.createwalletdescriptor(type="bech32m") + + + +if __name__ == '__main__': + WalletCreateDescriptorTest().main() -- cgit v1.2.3