aboutsummaryrefslogtreecommitdiff
path: root/test/functional/test_framework
diff options
context:
space:
mode:
Diffstat (limited to 'test/functional/test_framework')
-rw-r--r--test/functional/test_framework/address.py7
-rw-r--r--test/functional/test_framework/key.py4
-rwxr-xr-xtest/functional/test_framework/script_util.py5
-rwxr-xr-xtest/functional/test_framework/test_framework.py12
-rw-r--r--test/functional/test_framework/util.py92
-rw-r--r--test/functional/test_framework/wallet.py151
6 files changed, 129 insertions, 142 deletions
diff --git a/test/functional/test_framework/address.py b/test/functional/test_framework/address.py
index fcea24655b..92244b5ed8 100644
--- a/test/functional/test_framework/address.py
+++ b/test/functional/test_framework/address.py
@@ -47,8 +47,7 @@ def create_deterministic_address_bcrt1_p2tr_op_true():
Returns a tuple with the generated address and the internal key.
"""
internal_key = (1).to_bytes(32, 'big')
- scriptPubKey = taproot_construct(internal_key, [(None, CScript([OP_TRUE]))]).scriptPubKey
- address = encode_segwit_address("bcrt", 1, scriptPubKey[2:])
+ address = output_key_to_p2tr(taproot_construct(internal_key, [(None, CScript([OP_TRUE]))]).output_pubkey)
assert_equal(address, 'bcrt1p9yfmy5h72durp7zrhlw9lf7jpwjgvwdg0jr0lqmmjtgg83266lqsekaqka')
return (address, internal_key)
@@ -141,6 +140,10 @@ def script_to_p2sh_p2wsh(script, main=False):
p2shscript = CScript([OP_0, sha256(script)])
return script_to_p2sh(p2shscript, main)
+def output_key_to_p2tr(key, main=False):
+ assert len(key) == 32
+ return program_to_witness(1, key, main)
+
def check_key(key):
if (type(key) is str):
key = bytes.fromhex(key) # Assuming this is hex string
diff --git a/test/functional/test_framework/key.py b/test/functional/test_framework/key.py
index e5dea66963..68afc1383d 100644
--- a/test/functional/test_framework/key.py
+++ b/test/functional/test_framework/key.py
@@ -15,6 +15,10 @@ import unittest
from .util import modinv
+# Point with no known discrete log.
+H_POINT = "50929b74c1a04954b78b4b6035e97a5e078a5a0f28ec96d547bfee9ace803ac0"
+
+
def TaggedHash(tag, data):
ss = hashlib.sha256(tag.encode('utf-8')).digest()
ss += ss
diff --git a/test/functional/test_framework/script_util.py b/test/functional/test_framework/script_util.py
index f7d8422eee..b114002145 100755
--- a/test/functional/test_framework/script_util.py
+++ b/test/functional/test_framework/script_util.py
@@ -105,6 +105,11 @@ def script_to_p2sh_p2wsh_script(script):
return script_to_p2sh_script(p2shscript)
+def output_key_to_p2tr_script(key):
+ assert len(key) == 32
+ return program_to_witness_script(1, key)
+
+
def check_key(key):
if isinstance(key, str):
key = bytes.fromhex(key) # Assuming this is hex string
diff --git a/test/functional/test_framework/test_framework.py b/test/functional/test_framework/test_framework.py
index 3f02d21d42..c880aabd21 100755
--- a/test/functional/test_framework/test_framework.py
+++ b/test/functional/test_framework/test_framework.py
@@ -581,6 +581,8 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
def connect_nodes(self, a, b):
from_connection = self.nodes[a]
to_connection = self.nodes[b]
+ from_num_peers = 1 + len(from_connection.getpeerinfo())
+ to_num_peers = 1 + len(to_connection.getpeerinfo())
ip_port = "127.0.0.1:" + str(p2p_port(b))
from_connection.addnode(ip_port, "onetry")
# poll until version handshake complete to avoid race conditions
@@ -588,10 +590,10 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
# See comments in net_processing:
# * Must have a version message before anything else
# * Must have a verack message before anything else
- wait_until_helper(lambda: all(peer['version'] != 0 for peer in from_connection.getpeerinfo()))
- wait_until_helper(lambda: all(peer['version'] != 0 for peer in to_connection.getpeerinfo()))
- wait_until_helper(lambda: all(peer['bytesrecv_per_msg'].pop('verack', 0) == 24 for peer in from_connection.getpeerinfo()))
- wait_until_helper(lambda: all(peer['bytesrecv_per_msg'].pop('verack', 0) == 24 for peer in to_connection.getpeerinfo()))
+ self.wait_until(lambda: sum(peer['version'] != 0 for peer in from_connection.getpeerinfo()) == from_num_peers)
+ self.wait_until(lambda: sum(peer['version'] != 0 for peer in to_connection.getpeerinfo()) == to_num_peers)
+ self.wait_until(lambda: sum(peer['bytesrecv_per_msg'].pop('verack', 0) == 24 for peer in from_connection.getpeerinfo()) == from_num_peers)
+ self.wait_until(lambda: sum(peer['bytesrecv_per_msg'].pop('verack', 0) == 24 for peer in to_connection.getpeerinfo()) == to_num_peers)
def disconnect_nodes(self, a, b):
def disconnect_nodes_helper(from_connection, node_num):
@@ -620,7 +622,7 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
raise
# wait to disconnect
- wait_until_helper(lambda: not get_peer_ids(), timeout=5)
+ self.wait_until(lambda: not get_peer_ids(), timeout=5)
disconnect_nodes_helper(self.nodes[a], b)
diff --git a/test/functional/test_framework/util.py b/test/functional/test_framework/util.py
index b043d1a70d..1ee23f7574 100644
--- a/test/functional/test_framework/util.py
+++ b/test/functional/test_framework/util.py
@@ -476,39 +476,6 @@ def find_output(node, txid, amount, *, blockhash=None):
raise RuntimeError("find_output txid %s : %s not found" % (txid, str(amount)))
-# Helper to create at least "count" utxos
-# Pass in a fee that is sufficient for relay and mining new transactions.
-def create_confirmed_utxos(test_framework, fee, node, count, **kwargs):
- to_generate = int(0.5 * count) + 101
- while to_generate > 0:
- test_framework.generate(node, min(25, to_generate), **kwargs)
- to_generate -= 25
- utxos = node.listunspent()
- iterations = count - len(utxos)
- addr1 = node.getnewaddress()
- addr2 = node.getnewaddress()
- if iterations <= 0:
- return utxos
- for _ in range(iterations):
- t = utxos.pop()
- inputs = []
- inputs.append({"txid": t["txid"], "vout": t["vout"]})
- outputs = {}
- send_value = t['amount'] - fee
- outputs[addr1] = satoshi_round(send_value / 2)
- outputs[addr2] = satoshi_round(send_value / 2)
- raw_tx = node.createrawtransaction(inputs, outputs)
- signed_tx = node.signrawtransactionwithwallet(raw_tx)["hex"]
- node.sendrawtransaction(signed_tx)
-
- while (node.getmempoolinfo()['size'] > 0):
- test_framework.generate(node, 1, **kwargs)
-
- utxos = node.listunspent()
- assert len(utxos) >= count
- return utxos
-
-
def chain_transaction(node, parent_txids, vouts, value, fee, num_outputs):
"""Build and send a transaction that spends the given inputs (specified
by lists of parent_txid:vout each), with the desired total value and fee,
@@ -532,45 +499,33 @@ def chain_transaction(node, parent_txids, vouts, value, fee, num_outputs):
# Create large OP_RETURN txouts that can be appended to a transaction
-# to make it large (helper for constructing large transactions).
+# to make it large (helper for constructing large transactions). The
+# total serialized size of the txouts is about 66k vbytes.
def gen_return_txouts():
- # Some pre-processing to create a bunch of OP_RETURN txouts to insert into transactions we create
- # So we have big transactions (and therefore can't fit very many into each block)
- # create one script_pubkey
- script_pubkey = "6a4d0200" # OP_RETURN OP_PUSH2 512 bytes
- for _ in range(512):
- script_pubkey = script_pubkey + "01"
- # concatenate 128 txouts of above script_pubkey which we'll insert before the txout for change
- txouts = []
from .messages import CTxOut
- txout = CTxOut()
- txout.nValue = 0
- txout.scriptPubKey = bytes.fromhex(script_pubkey)
- for _ in range(128):
- txouts.append(txout)
+ from .script import CScript, OP_RETURN
+ txouts = [CTxOut(nValue=0, scriptPubKey=CScript([OP_RETURN, b'\x01'*67437]))]
+ assert_equal(sum([len(txout.serialize()) for txout in txouts]), 67456)
return txouts
# Create a spend of each passed-in utxo, splicing in "txouts" to each raw
# transaction to make it large. See gen_return_txouts() above.
-def create_lots_of_big_transactions(node, txouts, utxos, num, fee):
- addr = node.getnewaddress()
+def create_lots_of_big_transactions(mini_wallet, node, fee, tx_batch_size, txouts, utxos=None):
+ from .messages import COIN
+ fee_sats = int(fee * COIN)
txids = []
- from .messages import tx_from_hex
- for _ in range(num):
- t = utxos.pop()
- inputs = [{"txid": t["txid"], "vout": t["vout"]}]
- outputs = {}
- change = t['amount'] - fee
- outputs[addr] = satoshi_round(change)
- rawtx = node.createrawtransaction(inputs, outputs)
- tx = tx_from_hex(rawtx)
- for txout in txouts:
- tx.vout.append(txout)
- newtx = tx.serialize().hex()
- signresult = node.signrawtransactionwithwallet(newtx, None, "NONE")
- txid = node.sendrawtransaction(signresult["hex"], 0)
- txids.append(txid)
+ use_internal_utxos = utxos is None
+ for _ in range(tx_batch_size):
+ tx = mini_wallet.create_self_transfer(
+ utxo_to_spend=None if use_internal_utxos else utxos.pop(),
+ fee_rate=0,
+ )["tx"]
+ tx.vout[0].nValue -= fee_sats
+ tx.vout.extend(txouts)
+ res = node.testmempoolaccept([tx.serialize().hex()])[0]
+ assert_equal(res['fees']['base'], fee)
+ txids.append(node.sendrawtransaction(tx.serialize().hex()))
return txids
@@ -578,13 +533,8 @@ def mine_large_block(test_framework, mini_wallet, node):
# generate a 66k transaction,
# and 14 of them is close to the 1MB block limit
txouts = gen_return_txouts()
- from .messages import COIN
- fee = 100 * int(node.getnetworkinfo()["relayfee"] * COIN)
- for _ in range(14):
- tx = mini_wallet.create_self_transfer(from_node=node, fee_rate=0, mempool_valid=False)['tx']
- tx.vout[0].nValue -= fee
- tx.vout.extend(txouts)
- mini_wallet.sendrawtransaction(from_node=node, tx_hex=tx.serialize().hex())
+ fee = 100 * node.getnetworkinfo()["relayfee"]
+ create_lots_of_big_transactions(mini_wallet, node, fee, 14, txouts)
test_framework.generate(node, 1)
diff --git a/test/functional/test_framework/wallet.py b/test/functional/test_framework/wallet.py
index e43dd9f61a..68d5dfa880 100644
--- a/test/functional/test_framework/wallet.py
+++ b/test/functional/test_framework/wallet.py
@@ -19,9 +19,13 @@ from test_framework.address import (
key_to_p2pkh,
key_to_p2sh_p2wpkh,
key_to_p2wpkh,
+ output_key_to_p2tr,
)
from test_framework.descriptors import descsum_create
-from test_framework.key import ECKey
+from test_framework.key import (
+ ECKey,
+ compute_xonly_pubkey,
+)
from test_framework.messages import (
COIN,
COutPoint,
@@ -38,6 +42,7 @@ from test_framework.script import (
OP_NOP,
OP_TRUE,
SIGHASH_ALL,
+ taproot_construct,
)
from test_framework.script_util import (
key_to_p2pk_script,
@@ -81,8 +86,7 @@ class MiniWallet:
def __init__(self, test_node, *, mode=MiniWalletMode.ADDRESS_OP_TRUE):
self._test_node = test_node
self._utxos = []
- self._priv_key = None
- self._address = None
+ self._mode = mode
assert isinstance(mode, MiniWalletMode)
if mode == MiniWalletMode.RAW_OP_TRUE:
@@ -97,6 +101,9 @@ class MiniWallet:
self._address, self._internal_key = create_deterministic_address_bcrt1_p2tr_op_true()
self._scriptPubKey = bytes.fromhex(self._test_node.validateaddress(self._address)['scriptPubKey'])
+ def _create_utxo(self, *, txid, vout, value, height):
+ return {"txid": txid, "vout": vout, "value": value, "height": height}
+
def get_balance(self):
return sum(u['value'] for u in self._utxos)
@@ -106,17 +113,26 @@ class MiniWallet:
res = self._test_node.scantxoutset(action="start", scanobjects=[self.get_descriptor()])
assert_equal(True, res['success'])
for utxo in res['unspents']:
- self._utxos.append({'txid': utxo['txid'], 'vout': utxo['vout'], 'value': utxo['amount'], 'height': utxo['height']})
+ self._utxos.append(self._create_utxo(txid=utxo["txid"], vout=utxo["vout"], value=utxo["amount"], height=utxo["height"]))
def scan_tx(self, tx):
- """Scan the tx for self._scriptPubKey outputs and add them to self._utxos"""
+ """Scan the tx and adjust the internal list of owned utxos"""
+ for spent in tx["vin"]:
+ # Mark spent. This may happen when the caller has ownership of a
+ # utxo that remained in this wallet. For example, by passing
+ # mark_as_spent=False to get_utxo or by using an utxo returned by a
+ # create_self_transfer* call.
+ try:
+ self.get_utxo(txid=spent["txid"], vout=spent["vout"])
+ except StopIteration:
+ pass
for out in tx['vout']:
if out['scriptPubKey']['hex'] == self._scriptPubKey.hex():
- self._utxos.append({'txid': tx['txid'], 'vout': out['n'], 'value': out['value'], 'height': 0})
+ self._utxos.append(self._create_utxo(txid=tx["txid"], vout=out["n"], value=out["value"], height=0))
def sign_tx(self, tx, fixed_length=True):
"""Sign tx that has been created by MiniWallet in P2PK mode"""
- assert self._priv_key is not None
+ assert_equal(self._mode, MiniWalletMode.RAW_P2PK)
(sighash, err) = LegacySignatureHash(CScript(self._scriptPubKey), tx, 0, SIGHASH_ALL)
assert err is None
# for exact fee calculation, create only signatures with fixed size by default (>49.89% probability):
@@ -131,12 +147,16 @@ class MiniWallet:
tx.rehash()
def generate(self, num_blocks, **kwargs):
- """Generate blocks with coinbase outputs to the internal address, and append the outputs to the internal list"""
+ """Generate blocks with coinbase outputs to the internal address, and call rescan_utxos"""
blocks = self._test_node.generatetodescriptor(num_blocks, self.get_descriptor(), **kwargs)
- for b in blocks:
- block_info = self._test_node.getblock(blockhash=b, verbosity=2)
- cb_tx = block_info['tx'][0]
- self._utxos.append({'txid': cb_tx['txid'], 'vout': 0, 'value': cb_tx['vout'][0]['value'], 'height': block_info['height']})
+ # Calling rescan_utxos here makes sure that after a generate the utxo
+ # set is in a clean state. For example, the wallet will update
+ # - if the caller consumed utxos, but never used them
+ # - if the caller sent a transaction that is not mined or got rbf'd
+ # - after block re-orgs
+ # - the utxo height for mined mempool txs
+ # - However, the wallet will not consider remaining mempool txs
+ self.rescan_utxos()
return blocks
def get_scriptPubKey(self):
@@ -146,6 +166,7 @@ class MiniWallet:
return descsum_create(f'raw({self._scriptPubKey.hex()})')
def get_address(self):
+ assert_equal(self._mode, MiniWalletMode.ADDRESS_OP_TRUE)
return self._address
def get_utxo(self, *, txid: str = '', vout: Optional[int] = None, mark_as_spent=True) -> dict:
@@ -175,10 +196,10 @@ class MiniWallet:
self._utxos = []
return utxos
- def send_self_transfer(self, **kwargs):
+ def send_self_transfer(self, *, from_node, **kwargs):
"""Create and send a tx with the specified fee_rate. Fee may be exact or at most one satoshi higher than needed."""
tx = self.create_self_transfer(**kwargs)
- self.sendrawtransaction(from_node=kwargs['from_node'], tx_hex=tx['hex'])
+ self.sendrawtransaction(from_node=from_node, tx_hex=tx['hex'])
return tx
def send_to(self, *, from_node, scriptPubKey, amount, fee=1000):
@@ -193,35 +214,27 @@ class MiniWallet:
Returns a tuple (txid, n) referring to the created external utxo outpoint.
"""
- tx = self.create_self_transfer(from_node=from_node, fee_rate=0, mempool_valid=False)['tx']
+ tx = self.create_self_transfer(fee_rate=0)["tx"]
assert_greater_than_or_equal(tx.vout[0].nValue, amount + fee)
tx.vout[0].nValue -= (amount + fee) # change output -> MiniWallet
tx.vout.append(CTxOut(amount, scriptPubKey)) # arbitrary output -> to be returned
txid = self.sendrawtransaction(from_node=from_node, tx_hex=tx.serialize().hex())
return txid, 1
- def send_self_transfer_multi(self, **kwargs):
- """
- Create and send a transaction that spends the given UTXOs and creates a
- certain number of outputs with equal amounts.
-
- Returns a dictionary with
- - txid
- - serialized transaction in hex format
- - transaction as CTransaction instance
- - list of newly created UTXOs, ordered by vout index
- """
+ def send_self_transfer_multi(self, *, from_node, **kwargs):
+ """Call create_self_transfer_multi and send the transaction."""
tx = self.create_self_transfer_multi(**kwargs)
- txid = self.sendrawtransaction(from_node=kwargs['from_node'], tx_hex=tx.serialize().hex())
- return {'new_utxos': [self.get_utxo(txid=txid, vout=vout) for vout in range(len(tx.vout))],
- 'txid': txid, 'hex': tx.serialize().hex(), 'tx': tx}
+ self.sendrawtransaction(from_node=from_node, tx_hex=tx["hex"])
+ return tx
def create_self_transfer_multi(
- self, *, from_node,
- utxos_to_spend: Optional[List[dict]] = None,
- num_outputs=1,
- sequence=0,
- fee_per_output=1000):
+ self,
+ *,
+ utxos_to_spend: Optional[List[dict]] = None,
+ num_outputs=1,
+ sequence=0,
+ fee_per_output=1000,
+ ):
"""
Create and return a transaction that spends the given UTXOs and creates a
certain number of outputs with equal amounts.
@@ -229,8 +242,8 @@ class MiniWallet:
utxos_to_spend = utxos_to_spend or [self.get_utxo()]
# create simple tx template (1 input, 1 output)
tx = self.create_self_transfer(
- fee_rate=0, from_node=from_node,
- utxo_to_spend=utxos_to_spend[0], sequence=sequence, mempool_valid=False)['tx']
+ fee_rate=0,
+ utxo_to_spend=utxos_to_spend[0], sequence=sequence)["tx"]
# duplicate inputs, witnesses and outputs
tx.vin = [deepcopy(tx.vin[0]) for _ in range(len(utxos_to_spend))]
@@ -246,44 +259,50 @@ class MiniWallet:
outputs_value_total = inputs_value_total - fee_per_output * num_outputs
for o in tx.vout:
o.nValue = outputs_value_total // num_outputs
- return tx
-
- def create_self_transfer(self, *, fee_rate=Decimal("0.003"), from_node=None, utxo_to_spend=None, mempool_valid=True, locktime=0, sequence=0):
- """Create and return a tx with the specified fee_rate. Fee may be exact or at most one satoshi higher than needed.
- Checking mempool validity via the testmempoolaccept RPC can be skipped by setting mempool_valid to False."""
- from_node = from_node or self._test_node
+ txid = tx.rehash()
+ return {
+ "new_utxos": [self._create_utxo(
+ txid=txid,
+ vout=i,
+ value=Decimal(tx.vout[i].nValue) / COIN,
+ height=0,
+ ) for i in range(len(tx.vout))],
+ "txid": txid,
+ "hex": tx.serialize().hex(),
+ "tx": tx,
+ }
+
+ def create_self_transfer(self, *, fee_rate=Decimal("0.003"), utxo_to_spend=None, locktime=0, sequence=0):
+ """Create and return a tx with the specified fee_rate. Fee may be exact or at most one satoshi higher than needed."""
utxo_to_spend = utxo_to_spend or self.get_utxo()
- if self._priv_key is None:
+ if self._mode in (MiniWalletMode.RAW_OP_TRUE, MiniWalletMode.ADDRESS_OP_TRUE):
vsize = Decimal(104) # anyone-can-spend
- else:
+ elif self._mode == MiniWalletMode.RAW_P2PK:
vsize = Decimal(168) # P2PK (73 bytes scriptSig + 35 bytes scriptPubKey + 60 bytes other)
- send_value = int(COIN * (utxo_to_spend['value'] - fee_rate * (vsize / 1000)))
+ else:
+ assert False
+ send_value = utxo_to_spend["value"] - (fee_rate * vsize / 1000)
assert send_value > 0
tx = CTransaction()
tx.vin = [CTxIn(COutPoint(int(utxo_to_spend['txid'], 16), utxo_to_spend['vout']), nSequence=sequence)]
- tx.vout = [CTxOut(send_value, self._scriptPubKey)]
+ tx.vout = [CTxOut(int(COIN * send_value), self._scriptPubKey)]
tx.nLockTime = locktime
- if not self._address:
- # raw script
- if self._priv_key is not None:
- # P2PK, need to sign
- self.sign_tx(tx)
- else:
- # anyone-can-spend
- tx.vin[0].scriptSig = CScript([OP_NOP] * 43) # pad to identical size
- else:
+ if self._mode == MiniWalletMode.RAW_P2PK:
+ self.sign_tx(tx)
+ elif self._mode == MiniWalletMode.RAW_OP_TRUE:
+ tx.vin[0].scriptSig = CScript([OP_NOP] * 43) # pad to identical size
+ elif self._mode == MiniWalletMode.ADDRESS_OP_TRUE:
tx.wit.vtxinwit = [CTxInWitness()]
tx.wit.vtxinwit[0].scriptWitness.stack = [CScript([OP_TRUE]), bytes([LEAF_VERSION_TAPSCRIPT]) + self._internal_key]
+ else:
+ assert False
tx_hex = tx.serialize().hex()
- if mempool_valid:
- tx_info = from_node.testmempoolaccept([tx_hex])[0]
- assert_equal(tx_info['allowed'], True)
- assert_equal(tx_info['vsize'], vsize)
- assert_equal(tx_info['fees']['base'], utxo_to_spend['value'] - Decimal(send_value) / COIN)
+ assert_equal(tx.get_vsize(), vsize)
+ new_utxo = self._create_utxo(txid=tx.rehash(), vout=0, value=send_value, height=0)
- return {'txid': tx.rehash(), 'wtxid': tx.getwtxid(), 'hex': tx_hex, 'tx': tx}
+ return {"txid": new_utxo["txid"], "wtxid": tx.getwtxid(), "hex": tx_hex, "tx": tx, "new_utxo": new_utxo}
def sendrawtransaction(self, *, from_node, tx_hex, maxfeerate=0, **kwargs):
txid = from_node.sendrawtransaction(hexstring=tx_hex, maxfeerate=maxfeerate, **kwargs)
@@ -291,10 +310,10 @@ class MiniWallet:
return txid
-def getnewdestination(address_type='bech32'):
+def getnewdestination(address_type='bech32m'):
"""Generate a random destination of the specified type and return the
corresponding public key, scriptPubKey and address. Supported types are
- 'legacy', 'p2sh-segwit' and 'bech32'. Can be used when a random
+ 'legacy', 'p2sh-segwit', 'bech32' and 'bech32m'. Can be used when a random
destination is needed, but no compiled wallet is available (e.g. as
replacement to the getnewaddress/getaddressinfo RPCs)."""
key = ECKey()
@@ -309,7 +328,11 @@ def getnewdestination(address_type='bech32'):
elif address_type == 'bech32':
scriptpubkey = key_to_p2wpkh_script(pubkey)
address = key_to_p2wpkh(pubkey)
- # TODO: also support bech32m (need to generate x-only-pubkey)
+ elif address_type == 'bech32m':
+ tap = taproot_construct(compute_xonly_pubkey(key.get_bytes())[0])
+ pubkey = tap.output_pubkey
+ scriptpubkey = tap.scriptPubKey
+ address = output_key_to_p2tr(pubkey)
else:
assert False
return pubkey, scriptpubkey, address