diff options
author | Andrew Chow <achow101-github@achow101.com> | 2020-07-27 16:00:11 -0400 |
---|---|---|
committer | Andrew Chow <github@achow101.com> | 2022-08-29 17:30:38 -0400 |
commit | 9c44bfe244f35f08ba576d8b979a90dcd68d2c77 (patch) | |
tree | b1f24ccaeb69ff106829847d742b92386b32d010 /test/functional/wallet_migration.py | |
parent | 0b26e7cdf2659fd8b54d21fd2bd749f9f3e87af8 (diff) |
Test migratewallet
Co-Authored-By: furszy <matiasfurszyfer@protonmail.com>
Diffstat (limited to 'test/functional/wallet_migration.py')
-rwxr-xr-x | test/functional/wallet_migration.py | 407 |
1 files changed, 407 insertions, 0 deletions
diff --git a/test/functional/wallet_migration.py b/test/functional/wallet_migration.py new file mode 100755 index 0000000000..3c1cb6ac32 --- /dev/null +++ b/test/functional/wallet_migration.py @@ -0,0 +1,407 @@ +#!/usr/bin/env python3 +# Copyright (c) 2020 The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. +"""Test Migrating a wallet from legacy to descriptor.""" + +import os +import random +from test_framework.descriptors import descsum_create +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import ( + assert_equal, + assert_raises_rpc_error, + find_vout_for_address, +) +from test_framework.wallet_util import ( + get_generate_key, +) + + +class WalletMigrationTest(BitcoinTestFramework): + def set_test_params(self): + self.setup_clean_chain = True + self.num_nodes = 1 + self.extra_args = [[]] + self.supports_cli = False + + def skip_test_if_missing_module(self): + self.skip_if_no_wallet() + self.skip_if_no_sqlite() + self.skip_if_no_bdb() + + def assert_is_sqlite(self, wallet_name): + wallet_file_path = os.path.join(self.nodes[0].datadir, "regtest/wallets", wallet_name, self.wallet_data_filename) + with open(wallet_file_path, 'rb') as f: + file_magic = f.read(16) + assert_equal(file_magic, b'SQLite format 3\x00') + assert_equal(self.nodes[0].get_wallet_rpc(wallet_name).getwalletinfo()["format"], "sqlite") + + def create_legacy_wallet(self, wallet_name): + self.nodes[0].createwallet(wallet_name=wallet_name) + wallet = self.nodes[0].get_wallet_rpc(wallet_name) + assert_equal(wallet.getwalletinfo()["descriptors"], False) + assert_equal(wallet.getwalletinfo()["format"], "bdb") + return wallet + + def assert_addr_info_equal(self, addr_info, addr_info_old): + assert_equal(addr_info["address"], addr_info_old["address"]) + assert_equal(addr_info["scriptPubKey"], addr_info_old["scriptPubKey"]) + assert_equal(addr_info["ismine"], addr_info_old["ismine"]) + assert_equal(addr_info["hdkeypath"], addr_info_old["hdkeypath"]) + assert_equal(addr_info["solvable"], addr_info_old["solvable"]) + assert_equal(addr_info["ischange"], addr_info_old["ischange"]) + assert_equal(addr_info["hdmasterfingerprint"], addr_info_old["hdmasterfingerprint"]) + + def assert_list_txs_equal(self, received_list_txs, expected_list_txs): + for d in received_list_txs: + if "parent_descs" in d: + del d["parent_descs"] + for d in expected_list_txs: + if "parent_descs" in d: + del d["parent_descs"] + assert_equal(received_list_txs, expected_list_txs) + + def test_basic(self): + default = self.nodes[0].get_wallet_rpc(self.default_wallet_name) + + self.log.info("Test migration of a basic keys only wallet without balance") + basic0 = self.create_legacy_wallet("basic0") + + addr = basic0.getnewaddress() + change = basic0.getrawchangeaddress() + + old_addr_info = basic0.getaddressinfo(addr) + old_change_addr_info = basic0.getaddressinfo(change) + assert_equal(old_addr_info["ismine"], True) + assert_equal(old_addr_info["hdkeypath"], "m/0'/0'/0'") + assert_equal(old_change_addr_info["ismine"], True) + assert_equal(old_change_addr_info["hdkeypath"], "m/0'/1'/0'") + + # Note: migration could take a while. + basic0.migratewallet() + + # Verify created descriptors + assert_equal(basic0.getwalletinfo()["descriptors"], True) + self.assert_is_sqlite("basic0") + + # The wallet should create the following descriptors: + # * BIP32 descriptors in the form of "0'/0'/*" and "0'/1'/*" (2 descriptors) + # * BIP44 descriptors in the form of "44'/1'/0'/0/*" and "44'/1'/0'/1/*" (2 descriptors) + # * BIP49 descriptors, P2SH(P2WPKH), in the form of "86'/1'/0'/0/*" and "86'/1'/0'/1/*" (2 descriptors) + # * BIP84 descriptors, P2WPKH, in the form of "84'/1'/0'/1/*" and "84'/1'/0'/1/*" (2 descriptors) + # * BIP86 descriptors, P2TR, in the form of "86'/1'/0'/0/*" and "86'/1'/0'/1/*" (2 descriptors) + # * A combo(PK) descriptor for the wallet master key. + # So, should have a total of 11 descriptors on it. + assert_equal(len(basic0.listdescriptors()["descriptors"]), 11) + + # Compare addresses info + addr_info = basic0.getaddressinfo(addr) + change_addr_info = basic0.getaddressinfo(change) + self.assert_addr_info_equal(addr_info, old_addr_info) + self.assert_addr_info_equal(change_addr_info, old_change_addr_info) + + addr_info = basic0.getaddressinfo(basic0.getnewaddress("", "bech32")) + assert_equal(addr_info["hdkeypath"], "m/84'/1'/0'/0/0") + + self.log.info("Test migration of a basic keys only wallet with a balance") + basic1 = self.create_legacy_wallet("basic1") + + for _ in range(0, 10): + default.sendtoaddress(basic1.getnewaddress(), 1) + + self.generate(self.nodes[0], 1) + + for _ in range(0, 5): + basic1.sendtoaddress(default.getnewaddress(), 0.5) + + self.generate(self.nodes[0], 1) + bal = basic1.getbalance() + txs = basic1.listtransactions() + + basic1.migratewallet() + assert_equal(basic1.getwalletinfo()["descriptors"], True) + self.assert_is_sqlite("basic1") + assert_equal(basic1.getbalance(), bal) + self.assert_list_txs_equal(basic1.listtransactions(), txs) + + # restart node and verify that everything is still there + self.restart_node(0) + default = self.nodes[0].get_wallet_rpc(self.default_wallet_name) + self.nodes[0].loadwallet("basic1") + basic1 = self.nodes[0].get_wallet_rpc("basic1") + assert_equal(basic1.getwalletinfo()["descriptors"], True) + self.assert_is_sqlite("basic1") + assert_equal(basic1.getbalance(), bal) + self.assert_list_txs_equal(basic1.listtransactions(), txs) + + self.log.info("Test migration of a wallet with balance received on the seed") + basic2 = self.create_legacy_wallet("basic2") + basic2_seed = get_generate_key() + basic2.sethdseed(True, basic2_seed.privkey) + assert_equal(basic2.getbalance(), 0) + + # Receive coins on different output types for the same seed + basic2_balance = 0 + for addr in [basic2_seed.p2pkh_addr, basic2_seed.p2wpkh_addr, basic2_seed.p2sh_p2wpkh_addr]: + send_value = random.randint(1, 4) + default.sendtoaddress(addr, send_value) + basic2_balance += send_value + self.generate(self.nodes[0], 1) + assert_equal(basic2.getbalance(), basic2_balance) + basic2_txs = basic2.listtransactions() + + # Now migrate and test that we still see have the same balance/transactions + basic2.migratewallet() + assert_equal(basic2.getwalletinfo()["descriptors"], True) + self.assert_is_sqlite("basic2") + assert_equal(basic2.getbalance(), basic2_balance) + self.assert_list_txs_equal(basic2.listtransactions(), basic2_txs) + + def test_multisig(self): + default = self.nodes[0].get_wallet_rpc(self.default_wallet_name) + + # Contrived case where all the multisig keys are in a single wallet + self.log.info("Test migration of a wallet with all keys for a multisig") + multisig0 = self.create_legacy_wallet("multisig0") + addr1 = multisig0.getnewaddress() + addr2 = multisig0.getnewaddress() + addr3 = multisig0.getnewaddress() + + ms_info = multisig0.addmultisigaddress(2, [addr1, addr2, addr3]) + + multisig0.migratewallet() + assert_equal(multisig0.getwalletinfo()["descriptors"], True) + self.assert_is_sqlite("multisig0") + ms_addr_info = multisig0.getaddressinfo(ms_info["address"]) + assert_equal(ms_addr_info["ismine"], True) + assert_equal(ms_addr_info["desc"], ms_info["descriptor"]) + assert_equal("multisig0_watchonly" in self.nodes[0].listwallets(), False) + assert_equal("multisig0_solvables" in self.nodes[0].listwallets(), False) + + pub1 = multisig0.getaddressinfo(addr1)["pubkey"] + pub2 = multisig0.getaddressinfo(addr2)["pubkey"] + + # Some keys in multisig do not belong to this wallet + self.log.info("Test migration of a wallet that has some keys in a multisig") + self.nodes[0].createwallet(wallet_name="multisig1") + multisig1 = self.nodes[0].get_wallet_rpc("multisig1") + ms_info = multisig1.addmultisigaddress(2, [multisig1.getnewaddress(), pub1, pub2]) + ms_info2 = multisig1.addmultisigaddress(2, [multisig1.getnewaddress(), pub1, pub2]) + assert_equal(multisig1.getwalletinfo()["descriptors"], False) + + addr1 = ms_info["address"] + addr2 = ms_info2["address"] + txid = default.sendtoaddress(addr1, 10) + multisig1.importaddress(addr1) + assert_equal(multisig1.getaddressinfo(addr1)["ismine"], False) + assert_equal(multisig1.getaddressinfo(addr1)["iswatchonly"], True) + assert_equal(multisig1.getaddressinfo(addr1)["solvable"], True) + self.generate(self.nodes[0], 1) + multisig1.gettransaction(txid) + assert_equal(multisig1.getbalances()["watchonly"]["trusted"], 10) + assert_equal(multisig1.getaddressinfo(addr2)["ismine"], False) + assert_equal(multisig1.getaddressinfo(addr2)["iswatchonly"], False) + assert_equal(multisig1.getaddressinfo(addr2)["solvable"], True) + + # Migrating multisig1 should see the multisig is no longer part of multisig1 + # A new wallet multisig1_watchonly is created which has the multisig address + # Transaction to multisig is in multisig1_watchonly and not multisig1 + multisig1.migratewallet() + assert_equal(multisig1.getwalletinfo()["descriptors"], True) + self.assert_is_sqlite("multisig1") + assert_equal(multisig1.getaddressinfo(addr1)["ismine"], False) + assert_equal(multisig1.getaddressinfo(addr1)["iswatchonly"], False) + assert_equal(multisig1.getaddressinfo(addr1)["solvable"], False) + assert_raises_rpc_error(-5, "Invalid or non-wallet transaction id", multisig1.gettransaction, txid) + assert_equal(multisig1.getbalance(), 0) + assert_equal(multisig1.listtransactions(), []) + + assert_equal("multisig1_watchonly" in self.nodes[0].listwallets(), True) + ms1_watchonly = self.nodes[0].get_wallet_rpc("multisig1_watchonly") + ms1_wallet_info = ms1_watchonly.getwalletinfo() + assert_equal(ms1_wallet_info['descriptors'], True) + assert_equal(ms1_wallet_info['private_keys_enabled'], False) + self.assert_is_sqlite("multisig1_watchonly") + assert_equal(ms1_watchonly.getaddressinfo(addr1)["ismine"], True) + assert_equal(ms1_watchonly.getaddressinfo(addr1)["solvable"], True) + # Because addr2 was not being watched, it isn't in multisig1_watchonly but rather multisig1_solvables + assert_equal(ms1_watchonly.getaddressinfo(addr2)["ismine"], False) + assert_equal(ms1_watchonly.getaddressinfo(addr2)["solvable"], False) + ms1_watchonly.gettransaction(txid) + assert_equal(ms1_watchonly.getbalance(), 10) + + # Migrating multisig1 should see the second multisig is no longer part of multisig1 + # A new wallet multisig1_solvables is created which has the second address + # This should have no transactions + assert_equal("multisig1_solvables" in self.nodes[0].listwallets(), True) + ms1_solvable = self.nodes[0].get_wallet_rpc("multisig1_solvables") + ms1_wallet_info = ms1_solvable.getwalletinfo() + assert_equal(ms1_wallet_info['descriptors'], True) + assert_equal(ms1_wallet_info['private_keys_enabled'], False) + self.assert_is_sqlite("multisig1_solvables") + assert_equal(ms1_solvable.getaddressinfo(addr1)["ismine"], False) + assert_equal(ms1_solvable.getaddressinfo(addr1)["solvable"], False) + assert_equal(ms1_solvable.getaddressinfo(addr2)["ismine"], True) + assert_equal(ms1_solvable.getaddressinfo(addr2)["solvable"], True) + assert_equal(ms1_solvable.getbalance(), 0) + assert_equal(ms1_solvable.listtransactions(), []) + + + def test_other_watchonly(self): + default = self.nodes[0].get_wallet_rpc(self.default_wallet_name) + + # Wallet with an imported address. Should be the same thing as the multisig test + self.log.info("Test migration of a wallet with watchonly imports") + self.nodes[0].createwallet(wallet_name="imports0") + imports0 = self.nodes[0].get_wallet_rpc("imports0") + assert_equal(imports0.getwalletinfo()["descriptors"], False) + + # Exteranl address label + imports0.setlabel(default.getnewaddress(), "external") + + # Normal non-watchonly tx + received_addr = imports0.getnewaddress() + imports0.setlabel(received_addr, "Receiving") + received_txid = default.sendtoaddress(received_addr, 10) + + # Watchonly tx + import_addr = default.getnewaddress() + imports0.importaddress(import_addr) + imports0.setlabel(import_addr, "imported") + received_watchonly_txid = default.sendtoaddress(import_addr, 10) + + # Received watchonly tx that is then spent + import_sent_addr = default.getnewaddress() + imports0.importaddress(import_sent_addr) + received_sent_watchonly_txid = default.sendtoaddress(import_sent_addr, 10) + received_sent_watchonly_vout = find_vout_for_address(self.nodes[0], received_sent_watchonly_txid, import_sent_addr) + send = default.sendall(recipients=[default.getnewaddress()], options={"inputs": [{"txid": received_sent_watchonly_txid, "vout": received_sent_watchonly_vout}]}) + sent_watchonly_txid = send["txid"] + + self.generate(self.nodes[0], 1) + + balances = imports0.getbalances() + spendable_bal = balances["mine"]["trusted"] + watchonly_bal = balances["watchonly"]["trusted"] + assert_equal(len(imports0.listtransactions(include_watchonly=True)), 4) + + # Migrate + imports0.migratewallet() + assert_equal(imports0.getwalletinfo()["descriptors"], True) + self.assert_is_sqlite("imports0") + assert_raises_rpc_error(-5, "Invalid or non-wallet transaction id", imports0.gettransaction, received_watchonly_txid) + assert_raises_rpc_error(-5, "Invalid or non-wallet transaction id", imports0.gettransaction, received_sent_watchonly_txid) + assert_raises_rpc_error(-5, "Invalid or non-wallet transaction id", imports0.gettransaction, sent_watchonly_txid) + assert_equal(len(imports0.listtransactions(include_watchonly=True)), 1) + imports0.gettransaction(received_txid) + assert_equal(imports0.getbalance(), spendable_bal) + + assert_equal("imports0_watchonly" in self.nodes[0].listwallets(), True) + watchonly = self.nodes[0].get_wallet_rpc("imports0_watchonly") + watchonly_info = watchonly.getwalletinfo() + assert_equal(watchonly_info["descriptors"], True) + self.assert_is_sqlite("imports0_watchonly") + assert_equal(watchonly_info["private_keys_enabled"], False) + watchonly.gettransaction(received_watchonly_txid) + watchonly.gettransaction(received_sent_watchonly_txid) + watchonly.gettransaction(sent_watchonly_txid) + assert_equal(watchonly.getbalance(), watchonly_bal) + assert_raises_rpc_error(-5, "Invalid or non-wallet transaction id", watchonly.gettransaction, received_txid) + assert_equal(len(watchonly.listtransactions(include_watchonly=True)), 3) + + def test_no_privkeys(self): + default = self.nodes[0].get_wallet_rpc(self.default_wallet_name) + + # Migrating an actual watchonly wallet should not create a new watchonly wallet + self.log.info("Test migration of a pure watchonly wallet") + self.nodes[0].createwallet(wallet_name="watchonly0", disable_private_keys=True) + watchonly0 = self.nodes[0].get_wallet_rpc("watchonly0") + info = watchonly0.getwalletinfo() + assert_equal(info["descriptors"], False) + assert_equal(info["private_keys_enabled"], False) + + addr = default.getnewaddress() + desc = default.getaddressinfo(addr)["desc"] + res = watchonly0.importmulti([ + { + "desc": desc, + "watchonly": True, + "timestamp": "now", + }]) + assert_equal(res[0]['success'], True) + default.sendtoaddress(addr, 10) + self.generate(self.nodes[0], 1) + + watchonly0.migratewallet() + assert_equal("watchonly0_watchonly" in self.nodes[0].listwallets(), False) + info = watchonly0.getwalletinfo() + assert_equal(info["descriptors"], True) + assert_equal(info["private_keys_enabled"], False) + self.assert_is_sqlite("watchonly0") + + # Migrating a wallet with pubkeys added to the keypool + self.log.info("Test migration of a pure watchonly wallet with pubkeys in keypool") + self.nodes[0].createwallet(wallet_name="watchonly1", disable_private_keys=True) + watchonly1 = self.nodes[0].get_wallet_rpc("watchonly1") + info = watchonly1.getwalletinfo() + assert_equal(info["descriptors"], False) + assert_equal(info["private_keys_enabled"], False) + + addr1 = default.getnewaddress(address_type="bech32") + addr2 = default.getnewaddress(address_type="bech32") + desc1 = default.getaddressinfo(addr1)["desc"] + desc2 = default.getaddressinfo(addr2)["desc"] + res = watchonly1.importmulti([ + { + "desc": desc1, + "keypool": True, + "timestamp": "now", + }, + { + "desc": desc2, + "keypool": True, + "timestamp": "now", + } + ]) + assert_equal(res[0]["success"], True) + assert_equal(res[1]["success"], True) + # Before migrating, we can fetch addr1 from the keypool + assert_equal(watchonly1.getnewaddress(address_type="bech32"), addr1) + + watchonly1.migratewallet() + info = watchonly1.getwalletinfo() + assert_equal(info["descriptors"], True) + assert_equal(info["private_keys_enabled"], False) + self.assert_is_sqlite("watchonly1") + # After migrating, the "keypool" is empty + assert_raises_rpc_error(-4, "Error: This wallet has no available keys", watchonly1.getnewaddress) + + def test_pk_coinbases(self): + self.log.info("Test migration of a wallet using old pk() coinbases") + wallet = self.create_legacy_wallet("pkcb") + + addr = wallet.getnewaddress() + addr_info = wallet.getaddressinfo(addr) + desc = descsum_create("pk(" + addr_info["pubkey"] + ")") + + self.nodes[0].generatetodescriptor(1, desc, invalid_call=False) + + bals = wallet.getbalances() + + wallet.migratewallet() + + assert_equal(bals, wallet.getbalances()) + + def run_test(self): + self.generate(self.nodes[0], 101) + + # TODO: Test the actual records in the wallet for these tests too. The behavior may be correct, but the data written may not be what we actually want + self.test_basic() + self.test_multisig() + self.test_other_watchonly() + self.test_no_privkeys() + self.test_pk_coinbases() + +if __name__ == '__main__': + WalletMigrationTest().main() |