diff options
Diffstat (limited to 'test/functional/wallet_taproot.py')
-rwxr-xr-x | test/functional/wallet_taproot.py | 165 |
1 files changed, 160 insertions, 5 deletions
diff --git a/test/functional/wallet_taproot.py b/test/functional/wallet_taproot.py index 65ca7bdef7..9eb204bf37 100755 --- a/test/functional/wallet_taproot.py +++ b/test/functional/wallet_taproot.py @@ -6,6 +6,7 @@ import random +from decimal import Decimal from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal from test_framework.descriptors import descsum_create @@ -172,9 +173,9 @@ class WalletTaprootTest(BitcoinTestFramework): """Test generation and spending of P2TR address outputs.""" def set_test_params(self): - self.num_nodes = 2 + self.num_nodes = 3 self.setup_clean_chain = True - self.extra_args = [['-keypool=100'], ['-keypool=100']] + self.extra_args = [['-keypool=100'], ['-keypool=100'], ["-vbparams=taproot:1:1"]] self.supports_cli = False def skip_test_if_missing_module(self): @@ -225,19 +226,124 @@ class WalletTaprootTest(BitcoinTestFramework): result = self.addr_gen.importdescriptors([{"desc": desc_pub, "active": True, "timestamp": "now"}]) assert(result[0]['success']) for i in range(4): - addr_g = self.addr_gen.getnewaddress(address_type='bech32') + addr_g = self.addr_gen.getnewaddress(address_type='bech32m') if treefn is not None: addr_r = self.make_addr(treefn, keys, i) assert_equal(addr_g, addr_r) + desc_a = self.addr_gen.getaddressinfo(addr_g)['desc'] + if desc.startswith("tr("): + assert desc_a.startswith("tr(") + rederive = self.nodes[1].deriveaddresses(desc_a) + assert_equal(len(rederive), 1) + assert_equal(rederive[0], addr_g) + + # tr descriptors cannot be imported when Taproot is not active + result = self.privs_tr_enabled.importdescriptors([{"desc": desc, "timestamp": "now"}]) + assert(result[0]["success"]) + result = self.pubs_tr_enabled.importdescriptors([{"desc": desc_pub, "timestamp": "now"}]) + assert(result[0]["success"]) + if desc.startswith("tr"): + result = self.privs_tr_disabled.importdescriptors([{"desc": desc, "timestamp": "now"}]) + assert(not result[0]["success"]) + assert_equal(result[0]["error"]["code"], -4) + assert_equal(result[0]["error"]["message"], "Cannot import tr() descriptor when Taproot is not active") + result = self.pubs_tr_disabled.importdescriptors([{"desc": desc_pub, "timestamp": "now"}]) + assert(not result[0]["success"]) + assert_equal(result[0]["error"]["code"], -4) + assert_equal(result[0]["error"]["message"], "Cannot import tr() descriptor when Taproot is not active") + + def do_test_sendtoaddress(self, comment, pattern, privmap, treefn, keys_pay, keys_change): + self.log.info("Testing %s through sendtoaddress" % comment) + desc_pay = self.make_desc(pattern, privmap, keys_pay) + desc_change = self.make_desc(pattern, privmap, keys_change) + desc_pay_pub = self.make_desc(pattern, privmap, keys_pay, True) + desc_change_pub = self.make_desc(pattern, privmap, keys_change, True) + assert_equal(self.nodes[0].getdescriptorinfo(desc_pay)['descriptor'], desc_pay_pub) + assert_equal(self.nodes[0].getdescriptorinfo(desc_change)['descriptor'], desc_change_pub) + result = self.rpc_online.importdescriptors([{"desc": desc_pay, "active": True, "timestamp": "now"}]) + assert(result[0]['success']) + result = self.rpc_online.importdescriptors([{"desc": desc_change, "active": True, "timestamp": "now", "internal": True}]) + assert(result[0]['success']) + for i in range(4): + addr_g = self.rpc_online.getnewaddress(address_type='bech32m') + if treefn is not None: + addr_r = self.make_addr(treefn, keys_pay, i) + assert_equal(addr_g, addr_r) + boring_balance = int(self.boring.getbalance() * 100000000) + to_amnt = random.randrange(1000000, boring_balance) + self.boring.sendtoaddress(address=addr_g, amount=Decimal(to_amnt) / 100000000, subtractfeefromamount=True) + self.nodes[0].generatetoaddress(1, self.boring.getnewaddress()) + test_balance = int(self.rpc_online.getbalance() * 100000000) + ret_amnt = random.randrange(100000, test_balance) + res = self.rpc_online.sendtoaddress(address=self.boring.getnewaddress(), amount=Decimal(ret_amnt) / 100000000, subtractfeefromamount=True) + self.nodes[0].generatetoaddress(1, self.boring.getnewaddress()) + assert(self.rpc_online.gettransaction(res)["confirmations"] > 0) + + def do_test_psbt(self, comment, pattern, privmap, treefn, keys_pay, keys_change): + self.log.info("Testing %s through PSBT" % comment) + desc_pay = self.make_desc(pattern, privmap, keys_pay, False) + desc_change = self.make_desc(pattern, privmap, keys_change, False) + desc_pay_pub = self.make_desc(pattern, privmap, keys_pay, True) + desc_change_pub = self.make_desc(pattern, privmap, keys_change, True) + assert_equal(self.nodes[0].getdescriptorinfo(desc_pay)['descriptor'], desc_pay_pub) + assert_equal(self.nodes[0].getdescriptorinfo(desc_change)['descriptor'], desc_change_pub) + result = self.psbt_online.importdescriptors([{"desc": desc_pay_pub, "active": True, "timestamp": "now"}]) + assert(result[0]['success']) + result = self.psbt_online.importdescriptors([{"desc": desc_change_pub, "active": True, "timestamp": "now", "internal": True}]) + assert(result[0]['success']) + result = self.psbt_offline.importdescriptors([{"desc": desc_pay, "active": True, "timestamp": "now"}]) + assert(result[0]['success']) + result = self.psbt_offline.importdescriptors([{"desc": desc_change, "active": True, "timestamp": "now", "internal": True}]) + assert(result[0]['success']) + for i in range(4): + addr_g = self.psbt_online.getnewaddress(address_type='bech32m') + if treefn is not None: + addr_r = self.make_addr(treefn, keys_pay, i) + assert_equal(addr_g, addr_r) + boring_balance = int(self.boring.getbalance() * 100000000) + to_amnt = random.randrange(1000000, boring_balance) + self.boring.sendtoaddress(address=addr_g, amount=Decimal(to_amnt) / 100000000, subtractfeefromamount=True) + self.nodes[0].generatetoaddress(1, self.boring.getnewaddress()) + test_balance = int(self.psbt_online.getbalance() * 100000000) + ret_amnt = random.randrange(100000, test_balance) + psbt = self.psbt_online.walletcreatefundedpsbt([], [{self.boring.getnewaddress(): Decimal(ret_amnt) / 100000000}], None, {"subtractFeeFromOutputs":[0]})['psbt'] + res = self.psbt_offline.walletprocesspsbt(psbt) + assert(res['complete']) + rawtx = self.nodes[0].finalizepsbt(res['psbt'])['hex'] + txid = self.nodes[0].sendrawtransaction(rawtx) + self.nodes[0].generatetoaddress(1, self.boring.getnewaddress()) + assert(self.psbt_online.gettransaction(txid)['confirmations'] > 0) def do_test(self, comment, pattern, privmap, treefn, nkeys): - keys = self.rand_keys(nkeys) - self.do_test_addr(comment, pattern, privmap, treefn, keys) + keys = self.rand_keys(nkeys * 4) + self.do_test_addr(comment, pattern, privmap, treefn, keys[0:nkeys]) + self.do_test_sendtoaddress(comment, pattern, privmap, treefn, keys[0:nkeys], keys[nkeys:2*nkeys]) + self.do_test_psbt(comment, pattern, privmap, treefn, keys[2*nkeys:3*nkeys], keys[3*nkeys:4*nkeys]) def run_test(self): self.log.info("Creating wallets...") + self.nodes[0].createwallet(wallet_name="privs_tr_enabled", descriptors=True, blank=True) + self.privs_tr_enabled = self.nodes[0].get_wallet_rpc("privs_tr_enabled") + self.nodes[2].createwallet(wallet_name="privs_tr_disabled", descriptors=True, blank=True) + self.privs_tr_disabled=self.nodes[2].get_wallet_rpc("privs_tr_disabled") + self.nodes[0].createwallet(wallet_name="pubs_tr_enabled", descriptors=True, blank=True, disable_private_keys=True) + self.pubs_tr_enabled = self.nodes[0].get_wallet_rpc("pubs_tr_enabled") + self.nodes[2].createwallet(wallet_name="pubs_tr_disabled", descriptors=True, blank=True, disable_private_keys=True) + self.pubs_tr_disabled=self.nodes[2].get_wallet_rpc("pubs_tr_disabled") + self.nodes[0].createwallet(wallet_name="boring") self.nodes[0].createwallet(wallet_name="addr_gen", descriptors=True, disable_private_keys=True, blank=True) + self.nodes[0].createwallet(wallet_name="rpc_online", descriptors=True, blank=True) + self.nodes[0].createwallet(wallet_name="psbt_online", descriptors=True, disable_private_keys=True, blank=True) + self.nodes[1].createwallet(wallet_name="psbt_offline", descriptors=True, blank=True) + self.boring = self.nodes[0].get_wallet_rpc("boring") self.addr_gen = self.nodes[0].get_wallet_rpc("addr_gen") + self.rpc_online = self.nodes[0].get_wallet_rpc("rpc_online") + self.psbt_online = self.nodes[0].get_wallet_rpc("psbt_online") + self.psbt_offline = self.nodes[1].get_wallet_rpc("psbt_offline") + + self.log.info("Mining blocks...") + gen_addr = self.boring.getnewaddress() + self.nodes[0].generatetoaddress(101, gen_addr) self.do_test( "tr(XPRV)", @@ -254,6 +360,13 @@ class WalletTaprootTest(BitcoinTestFramework): 1 ) self.do_test( + "wpkh(XPRV)", + "wpkh($1/*)", + [True], + None, + 1 + ) + self.do_test( "tr(XPRV,{H,{H,XPUB}})", "tr($1/*,{pk($H),{pk($H),pk($2/*)}})", [True, False], @@ -261,12 +374,54 @@ class WalletTaprootTest(BitcoinTestFramework): 2 ) self.do_test( + "wsh(multi(1,XPRV,XPUB))", + "wsh(multi(1,$1/*,$2/*))", + [True, False], + None, + 2 + ) + self.do_test( + "tr(XPRV,{XPUB,XPUB})", + "tr($1/*,{pk($2/*),pk($2/*)})", + [True, False], + lambda k1, k2: (key(k1), [pk(k2), pk(k2)]), + 2 + ) + self.do_test( + "tr(XPRV,{{XPUB,H},{H,XPUB}})", + "tr($1/*,{{pk($2/*),pk($H)},{pk($H),pk($2/*)}})", + [True, False], + lambda k1, k2: (key(k1), [[pk(k2), pk(H_POINT)], [pk(H_POINT), pk(k2)]]), + 2 + ) + self.do_test( "tr(XPUB,{{H,{H,XPUB}},{H,{H,{H,XPRV}}}})", "tr($1/*,{{pk($H),{pk($H),pk($2/*)}},{pk($H),{pk($H),{pk($H),pk($3/*)}}}})", [False, False, True], lambda k1, k2, k3: (key(k1), [[pk(H_POINT), [pk(H_POINT), pk(k2)]], [pk(H_POINT), [pk(H_POINT), [pk(H_POINT), pk(k3)]]]]), 3 ) + self.do_test( + "tr(XPRV,{XPUB,{{XPUB,{H,H}},{{H,H},XPUB}}})", + "tr($1/*,{pk($2/*),{{pk($2/*),{pk($H),pk($H)}},{{pk($H),pk($H)},pk($2/*)}}})", + [True, False], + lambda k1, k2: (key(k1), [pk(k2), [[pk(k2), [pk(H_POINT), pk(H_POINT)]], [[pk(H_POINT), pk(H_POINT)], pk(k2)]]]), + 2 + ) + + self.log.info("Sending everything back...") + + txid = self.rpc_online.sendtoaddress(address=self.boring.getnewaddress(), amount=self.rpc_online.getbalance(), subtractfeefromamount=True) + self.nodes[0].generatetoaddress(1, self.boring.getnewaddress()) + assert(self.rpc_online.gettransaction(txid)["confirmations"] > 0) + + psbt = self.psbt_online.walletcreatefundedpsbt([], [{self.boring.getnewaddress(): self.psbt_online.getbalance()}], None, {"subtractFeeFromOutputs": [0]})['psbt'] + res = self.psbt_offline.walletprocesspsbt(psbt) + assert(res['complete']) + rawtx = self.nodes[0].finalizepsbt(res['psbt'])['hex'] + txid = self.nodes[0].sendrawtransaction(rawtx) + self.nodes[0].generatetoaddress(1, self.boring.getnewaddress()) + assert(self.psbt_online.gettransaction(txid)['confirmations'] > 0) if __name__ == '__main__': WalletTaprootTest().main() |