aboutsummaryrefslogtreecommitdiff
path: root/test/functional/test_framework
diff options
context:
space:
mode:
Diffstat (limited to 'test/functional/test_framework')
-rw-r--r--test/functional/test_framework/address.py17
-rw-r--r--test/functional/test_framework/authproxy.py44
-rw-r--r--test/functional/test_framework/blocktools.py9
-rw-r--r--test/functional/test_framework/crypto/secp256k1.py8
-rw-r--r--test/functional/test_framework/key.py6
-rw-r--r--test/functional/test_framework/mempool_util.py81
-rwxr-xr-xtest/functional/test_framework/messages.py24
-rw-r--r--test/functional/test_framework/netutil.py9
-rwxr-xr-xtest/functional/test_framework/p2p.py48
-rw-r--r--test/functional/test_framework/script.py61
-rwxr-xr-xtest/functional/test_framework/script_util.py28
-rwxr-xr-xtest/functional/test_framework/test_framework.py74
-rwxr-xr-xtest/functional/test_framework/test_node.py54
-rw-r--r--test/functional/test_framework/test_shell.py10
-rw-r--r--test/functional/test_framework/util.py41
-rw-r--r--test/functional/test_framework/v2_p2p.py12
-rw-r--r--test/functional/test_framework/wallet.py45
-rwxr-xr-xtest/functional/test_framework/wallet_util.py58
18 files changed, 485 insertions, 144 deletions
diff --git a/test/functional/test_framework/address.py b/test/functional/test_framework/address.py
index 5b2e3289a9..2c754e35aa 100644
--- a/test/functional/test_framework/address.py
+++ b/test/functional/test_framework/address.py
@@ -47,18 +47,20 @@ class AddressType(enum.Enum):
b58chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
-def create_deterministic_address_bcrt1_p2tr_op_true():
+def create_deterministic_address_bcrt1_p2tr_op_true(explicit_internal_key=None):
"""
Generates a deterministic bech32m address (segwit v1 output) that
can be spent with a witness stack of OP_TRUE and the control block
with internal public key (script-path spending).
- Returns a tuple with the generated address and the internal key.
+ Returns a tuple with the generated address and the TaprootInfo object.
"""
- internal_key = (1).to_bytes(32, 'big')
- address = output_key_to_p2tr(taproot_construct(internal_key, [(None, CScript([OP_TRUE]))]).output_pubkey)
- assert_equal(address, 'bcrt1p9yfmy5h72durp7zrhlw9lf7jpwjgvwdg0jr0lqmmjtgg83266lqsekaqka')
- return (address, internal_key)
+ internal_key = explicit_internal_key or (1).to_bytes(32, 'big')
+ taproot_info = taproot_construct(internal_key, [("only-path", CScript([OP_TRUE]))])
+ address = output_key_to_p2tr(taproot_info.output_pubkey)
+ if explicit_internal_key is None:
+ assert_equal(address, 'bcrt1p9yfmy5h72durp7zrhlw9lf7jpwjgvwdg0jr0lqmmjtgg83266lqsekaqka')
+ return (address, taproot_info)
def byte_to_base58(b, version):
@@ -153,6 +155,9 @@ def output_key_to_p2tr(key, main=False):
assert len(key) == 32
return program_to_witness(1, key, main)
+def p2a(main=False):
+ return program_to_witness(1, "4e73", main)
+
def check_key(key):
if (type(key) is str):
key = bytes.fromhex(key) # Assuming this is hex string
diff --git a/test/functional/test_framework/authproxy.py b/test/functional/test_framework/authproxy.py
index 03042877b2..a357ae4d34 100644
--- a/test/functional/test_framework/authproxy.py
+++ b/test/functional/test_framework/authproxy.py
@@ -26,7 +26,7 @@ ServiceProxy class:
- HTTP connections persist for the life of the AuthServiceProxy object
(if server supports HTTP/1.1)
-- sends protocol 'version', per JSON-RPC 1.1
+- sends "jsonrpc":"2.0", per JSON-RPC 2.0
- sends proper, incrementing 'id'
- sends Basic HTTP authentication headers
- parses all JSON numbers that look like floats as Decimal
@@ -117,7 +117,7 @@ class AuthServiceProxy():
params = dict(args=args, **argsn)
else:
params = args or argsn
- return {'version': '1.1',
+ return {'jsonrpc': '2.0',
'method': self._service_name,
'params': params,
'id': AuthServiceProxy.__id_count}
@@ -125,15 +125,28 @@ class AuthServiceProxy():
def __call__(self, *args, **argsn):
postdata = json.dumps(self.get_request(*args, **argsn), default=serialization_fallback, ensure_ascii=self.ensure_ascii)
response, status = self._request('POST', self.__url.path, postdata.encode('utf-8'))
- if response['error'] is not None:
- raise JSONRPCException(response['error'], status)
- elif 'result' not in response:
- raise JSONRPCException({
- 'code': -343, 'message': 'missing JSON-RPC result'}, status)
- elif status != HTTPStatus.OK:
- raise JSONRPCException({
- 'code': -342, 'message': 'non-200 HTTP status code but no JSON-RPC error'}, status)
+ # For backwards compatibility tests, accept JSON RPC 1.1 responses
+ if 'jsonrpc' not in response:
+ if response['error'] is not None:
+ raise JSONRPCException(response['error'], status)
+ elif 'result' not in response:
+ raise JSONRPCException({
+ 'code': -343, 'message': 'missing JSON-RPC result'}, status)
+ elif status != HTTPStatus.OK:
+ raise JSONRPCException({
+ 'code': -342, 'message': 'non-200 HTTP status code but no JSON-RPC error'}, status)
+ else:
+ return response['result']
else:
+ assert response['jsonrpc'] == '2.0'
+ if status != HTTPStatus.OK:
+ raise JSONRPCException({
+ 'code': -342, 'message': 'non-200 HTTP status code'}, status)
+ if 'error' in response:
+ raise JSONRPCException(response['error'], status)
+ elif 'result' not in response:
+ raise JSONRPCException({
+ 'code': -343, 'message': 'missing JSON-RPC 2.0 result and error'}, status)
return response['result']
def batch(self, rpc_call_list):
@@ -142,7 +155,7 @@ class AuthServiceProxy():
response, status = self._request('POST', self.__url.path, postdata.encode('utf-8'))
if status != HTTPStatus.OK:
raise JSONRPCException({
- 'code': -342, 'message': 'non-200 HTTP status code but no JSON-RPC error'}, status)
+ 'code': -342, 'message': 'non-200 HTTP status code'}, status)
return response
def _get_response(self):
@@ -160,6 +173,15 @@ class AuthServiceProxy():
raise JSONRPCException({
'code': -342, 'message': 'missing HTTP response from server'})
+ # Check for no-content HTTP status code, which can be returned when an
+ # RPC client requests a JSON-RPC 2.0 "notification" with no response.
+ # Currently this is only possible if clients call the _request() method
+ # directly to send a raw request.
+ if http_response.status == HTTPStatus.NO_CONTENT:
+ if len(http_response.read()) != 0:
+ raise JSONRPCException({'code': -342, 'message': 'Content received with NO CONTENT status code'})
+ return None, http_response.status
+
content_type = http_response.getheader('Content-Type')
if content_type != 'application/json':
raise JSONRPCException(
diff --git a/test/functional/test_framework/blocktools.py b/test/functional/test_framework/blocktools.py
index cfd923bab3..5c2fa28a31 100644
--- a/test/functional/test_framework/blocktools.py
+++ b/test/functional/test_framework/blocktools.py
@@ -28,6 +28,7 @@ from .messages import (
ser_uint256,
tx_from_hex,
uint256_from_str,
+ WITNESS_SCALE_FACTOR,
)
from .script import (
CScript,
@@ -45,9 +46,9 @@ from .script_util import (
)
from .util import assert_equal
-WITNESS_SCALE_FACTOR = 4
MAX_BLOCK_SIGOPS = 20000
MAX_BLOCK_SIGOPS_WEIGHT = MAX_BLOCK_SIGOPS * WITNESS_SCALE_FACTOR
+MAX_STANDARD_TX_WEIGHT = 400000
# Genesis block time (regtest)
TIME_GENESIS_BLOCK = 1296688602
@@ -153,16 +154,18 @@ def create_coinbase(height, pubkey=None, *, script_pubkey=None, extra_output_scr
coinbase.calc_sha256()
return coinbase
-def create_tx_with_script(prevtx, n, script_sig=b"", *, amount, script_pub_key=CScript()):
+def create_tx_with_script(prevtx, n, script_sig=b"", *, amount, output_script=None):
"""Return one-input, one-output transaction object
spending the prevtx's n-th output with the given amount.
Can optionally pass scriptPubKey and scriptSig, default is anyone-can-spend output.
"""
+ if output_script is None:
+ output_script = CScript()
tx = CTransaction()
assert n < len(prevtx.vout)
tx.vin.append(CTxIn(COutPoint(prevtx.sha256, n), script_sig, SEQUENCE_FINAL))
- tx.vout.append(CTxOut(amount, script_pub_key))
+ tx.vout.append(CTxOut(amount, output_script))
tx.calc_sha256()
return tx
diff --git a/test/functional/test_framework/crypto/secp256k1.py b/test/functional/test_framework/crypto/secp256k1.py
index 2e9e419da5..50a46dce37 100644
--- a/test/functional/test_framework/crypto/secp256k1.py
+++ b/test/functional/test_framework/crypto/secp256k1.py
@@ -15,6 +15,8 @@ Exports:
* G: the secp256k1 generator point
"""
+import unittest
+from hashlib import sha256
class FE:
"""Objects of this class represent elements of the field GF(2**256 - 2**32 - 977).
@@ -344,3 +346,9 @@ class FastGEMul:
# Precomputed table with multiples of G for fast multiplication
FAST_G = FastGEMul(G)
+
+class TestFrameworkSecp256k1(unittest.TestCase):
+ def test_H(self):
+ H = sha256(G.to_bytes_uncompressed()).digest()
+ assert GE.lift_x(FE.from_bytes(H)) is not None
+ self.assertEqual(H.hex(), "50929b74c1a04954b78b4b6035e97a5e078a5a0f28ec96d547bfee9ace803ac0")
diff --git a/test/functional/test_framework/key.py b/test/functional/test_framework/key.py
index 06252f8996..939c7cbef6 100644
--- a/test/functional/test_framework/key.py
+++ b/test/functional/test_framework/key.py
@@ -14,6 +14,7 @@ import random
import unittest
from test_framework.crypto import secp256k1
+from test_framework.util import random_bitflip
# Point with no known discrete log.
H_POINT = "50929b74c1a04954b78b4b6035e97a5e078a5a0f28ec96d547bfee9ace803ac0"
@@ -292,11 +293,6 @@ def sign_schnorr(key, msg, aux=None, flip_p=False, flip_r=False):
class TestFrameworkKey(unittest.TestCase):
def test_ecdsa_and_schnorr(self):
"""Test the Python ECDSA and Schnorr implementations."""
- def random_bitflip(sig):
- sig = list(sig)
- sig[random.randrange(len(sig))] ^= (1 << (random.randrange(8)))
- return bytes(sig)
-
byte_arrays = [generate_privkey() for _ in range(3)] + [v.to_bytes(32, 'big') for v in [0, ORDER - 1, ORDER, 2**256 - 1]]
keys = {}
for privkey_bytes in byte_arrays: # build array of key/pubkey pairs
diff --git a/test/functional/test_framework/mempool_util.py b/test/functional/test_framework/mempool_util.py
new file mode 100644
index 0000000000..148cc935ed
--- /dev/null
+++ b/test/functional/test_framework/mempool_util.py
@@ -0,0 +1,81 @@
+#!/usr/bin/env python3
+# Copyright (c) 2024 The Bitcoin Core developers
+# Distributed under the MIT software license, see the accompanying
+# file COPYING or http://www.opensource.org/licenses/mit-license.php.
+"""Helpful routines for mempool testing."""
+from decimal import Decimal
+
+from .blocktools import (
+ COINBASE_MATURITY,
+)
+from .util import (
+ assert_equal,
+ assert_greater_than,
+ create_lots_of_big_transactions,
+ gen_return_txouts,
+)
+from .wallet import (
+ MiniWallet,
+)
+
+
+def fill_mempool(test_framework, node):
+ """Fill mempool until eviction.
+
+ Allows for simpler testing of scenarios with floating mempoolminfee > minrelay
+ Requires -datacarriersize=100000 and
+ -maxmempool=5.
+ It will not ensure mempools become synced as it
+ is based on a single node and assumes -minrelaytxfee
+ is 1 sat/vbyte.
+ To avoid unintentional tx dependencies, the mempool filling txs are created with a
+ tagged ephemeral miniwallet instance.
+ """
+ test_framework.log.info("Fill the mempool until eviction is triggered and the mempoolminfee rises")
+ txouts = gen_return_txouts()
+ relayfee = node.getnetworkinfo()['relayfee']
+
+ assert_equal(relayfee, Decimal('0.00001000'))
+
+ tx_batch_size = 1
+ num_of_batches = 75
+ # Generate UTXOs to flood the mempool
+ # 1 to create a tx initially that will be evicted from the mempool later
+ # 75 transactions each with a fee rate higher than the previous one
+ ephemeral_miniwallet = MiniWallet(node, tag_name="fill_mempool_ephemeral_wallet")
+ test_framework.generate(ephemeral_miniwallet, 1 + num_of_batches * tx_batch_size)
+
+ # Mine enough blocks so that the UTXOs are allowed to be spent
+ test_framework.generate(node, COINBASE_MATURITY - 1)
+
+ # Get all UTXOs up front to ensure none of the transactions spend from each other, as that may
+ # change their effective feerate and thus the order in which they are selected for eviction.
+ confirmed_utxos = [ephemeral_miniwallet.get_utxo(confirmed_only=True) for _ in range(num_of_batches * tx_batch_size + 1)]
+ assert_equal(len(confirmed_utxos), num_of_batches * tx_batch_size + 1)
+
+ test_framework.log.debug("Create a mempool tx that will be evicted")
+ tx_to_be_evicted_id = ephemeral_miniwallet.send_self_transfer(
+ from_node=node, utxo_to_spend=confirmed_utxos.pop(0), fee_rate=relayfee)["txid"]
+
+ # Increase the tx fee rate to give the subsequent transactions a higher priority in the mempool
+ # The tx has an approx. vsize of 65k, i.e. multiplying the previous fee rate (in sats/kvB)
+ # by 130 should result in a fee that corresponds to 2x of that fee rate
+ base_fee = relayfee * 130
+
+ test_framework.log.debug("Fill up the mempool with txs with higher fee rate")
+ with node.assert_debug_log(["rolling minimum fee bumped"]):
+ for batch_of_txid in range(num_of_batches):
+ fee = (batch_of_txid + 1) * base_fee
+ utxos = confirmed_utxos[:tx_batch_size]
+ create_lots_of_big_transactions(ephemeral_miniwallet, node, fee, tx_batch_size, txouts, utxos)
+ del confirmed_utxos[:tx_batch_size]
+
+ test_framework.log.debug("The tx should be evicted by now")
+ # The number of transactions created should be greater than the ones present in the mempool
+ assert_greater_than(tx_batch_size * num_of_batches, len(node.getrawmempool()))
+ # Initial tx created should not be present in the mempool anymore as it had a lower fee rate
+ assert tx_to_be_evicted_id not in node.getrawmempool()
+
+ test_framework.log.debug("Check that mempoolminfee is larger than minrelaytxfee")
+ assert_equal(node.getmempoolinfo()['minrelaytxfee'], Decimal('0.00001000'))
+ assert_greater_than(node.getmempoolinfo()['mempoolminfee'], Decimal('0.00001000'))
diff --git a/test/functional/test_framework/messages.py b/test/functional/test_framework/messages.py
index 1780678de1..1f566a1348 100755
--- a/test/functional/test_framework/messages.py
+++ b/test/functional/test_framework/messages.py
@@ -46,6 +46,7 @@ MAX_PROTOCOL_MESSAGE_LENGTH = 4000000 # Maximum length of incoming protocol mes
MAX_HEADERS_RESULTS = 2000 # Number of headers sent in one getheaders result
MAX_INV_SIZE = 50000 # Maximum number of entries in an 'inv' protocol message
+NODE_NONE = 0
NODE_NETWORK = (1 << 0)
NODE_BLOOM = (1 << 2)
NODE_WITNESS = (1 << 3)
@@ -559,12 +560,12 @@ class CTxWitness:
class CTransaction:
- __slots__ = ("hash", "nLockTime", "nVersion", "sha256", "vin", "vout",
+ __slots__ = ("hash", "nLockTime", "version", "sha256", "vin", "vout",
"wit")
def __init__(self, tx=None):
if tx is None:
- self.nVersion = 2
+ self.version = 2
self.vin = []
self.vout = []
self.wit = CTxWitness()
@@ -572,7 +573,7 @@ class CTransaction:
self.sha256 = None
self.hash = None
else:
- self.nVersion = tx.nVersion
+ self.version = tx.version
self.vin = copy.deepcopy(tx.vin)
self.vout = copy.deepcopy(tx.vout)
self.nLockTime = tx.nLockTime
@@ -581,7 +582,7 @@ class CTransaction:
self.wit = copy.deepcopy(tx.wit)
def deserialize(self, f):
- self.nVersion = int.from_bytes(f.read(4), "little", signed=True)
+ self.version = int.from_bytes(f.read(4), "little")
self.vin = deser_vector(f, CTxIn)
flags = 0
if len(self.vin) == 0:
@@ -604,7 +605,7 @@ class CTransaction:
def serialize_without_witness(self):
r = b""
- r += self.nVersion.to_bytes(4, "little", signed=True)
+ r += self.version.to_bytes(4, "little")
r += ser_vector(self.vin)
r += ser_vector(self.vout)
r += self.nLockTime.to_bytes(4, "little")
@@ -616,7 +617,7 @@ class CTransaction:
if not self.wit.is_null():
flags |= 1
r = b""
- r += self.nVersion.to_bytes(4, "little", signed=True)
+ r += self.version.to_bytes(4, "little")
if flags:
dummy = []
r += ser_vector(dummy)
@@ -676,8 +677,8 @@ class CTransaction:
return math.ceil(self.get_weight() / WITNESS_SCALE_FACTOR)
def __repr__(self):
- return "CTransaction(nVersion=%i vin=%s vout=%s wit=%s nLockTime=%i)" \
- % (self.nVersion, repr(self.vin), repr(self.vout), repr(self.wit), self.nLockTime)
+ return "CTransaction(version=%i vin=%s vout=%s wit=%s nLockTime=%i)" \
+ % (self.version, repr(self.vin), repr(self.vout), repr(self.wit), self.nLockTime)
class CBlockHeader:
@@ -1293,8 +1294,11 @@ class msg_tx:
__slots__ = ("tx",)
msgtype = b"tx"
- def __init__(self, tx=CTransaction()):
- self.tx = tx
+ def __init__(self, tx=None):
+ if tx is None:
+ self.tx = CTransaction()
+ else:
+ self.tx = tx
def deserialize(self, f):
self.tx.deserialize(f)
diff --git a/test/functional/test_framework/netutil.py b/test/functional/test_framework/netutil.py
index 30a4a58d6f..08d41fe97f 100644
--- a/test/functional/test_framework/netutil.py
+++ b/test/functional/test_framework/netutil.py
@@ -158,3 +158,12 @@ def test_ipv6_local():
except socket.error:
have_ipv6 = False
return have_ipv6
+
+def test_unix_socket():
+ '''Return True if UNIX sockets are available on this platform.'''
+ try:
+ socket.AF_UNIX
+ except AttributeError:
+ return False
+ else:
+ return True
diff --git a/test/functional/test_framework/p2p.py b/test/functional/test_framework/p2p.py
index dc04696114..4f1265eb54 100755
--- a/test/functional/test_framework/p2p.py
+++ b/test/functional/test_framework/p2p.py
@@ -223,6 +223,7 @@ class P2PConnection(asyncio.Protocol):
# send the initial handshake immediately
if self.supports_v2_p2p and self.v2_state.initiating and not self.v2_state.tried_v2_handshake:
send_handshake_bytes = self.v2_state.initiate_v2_handshake()
+ logger.debug(f"sending {len(self.v2_state.sent_garbage)} bytes of garbage data")
self.send_raw_message(send_handshake_bytes)
# for v1 outbound connections, send version message immediately after opening
# (for v2 outbound connections, send it after the initial v2 handshake)
@@ -262,6 +263,7 @@ class P2PConnection(asyncio.Protocol):
self.v2_state = None
return
elif send_handshake_bytes:
+ logger.debug(f"sending {len(self.v2_state.sent_garbage)} bytes of garbage data")
self.send_raw_message(send_handshake_bytes)
elif send_handshake_bytes == b"":
return # only after send_handshake_bytes are sent can `complete_handshake()` be done
@@ -411,7 +413,7 @@ class P2PConnection(asyncio.Protocol):
tmsg = self.magic_bytes
tmsg += msgtype
tmsg += b"\x00" * (12 - len(msgtype))
- tmsg += struct.pack("<I", len(data))
+ tmsg += len(data).to_bytes(4, "little")
th = sha256(data)
h = sha256(th)
tmsg += h[:4]
@@ -585,22 +587,22 @@ class P2PInterface(P2PConnection):
wait_until_helper_internal(test_function, timeout=timeout, lock=p2p_lock, timeout_factor=self.timeout_factor)
- def wait_for_connect(self, timeout=60):
+ def wait_for_connect(self, *, timeout=60):
test_function = lambda: self.is_connected
self.wait_until(test_function, timeout=timeout, check_connected=False)
- def wait_for_disconnect(self, timeout=60):
+ def wait_for_disconnect(self, *, timeout=60):
test_function = lambda: not self.is_connected
self.wait_until(test_function, timeout=timeout, check_connected=False)
- def wait_for_reconnect(self, timeout=60):
+ def wait_for_reconnect(self, *, timeout=60):
def test_function():
return self.is_connected and self.last_message.get('version') and not self.supports_v2_p2p
self.wait_until(test_function, timeout=timeout, check_connected=False)
# Message receiving helper methods
- def wait_for_tx(self, txid, timeout=60):
+ def wait_for_tx(self, txid, *, timeout=60):
def test_function():
if not self.last_message.get('tx'):
return False
@@ -608,13 +610,13 @@ class P2PInterface(P2PConnection):
self.wait_until(test_function, timeout=timeout)
- def wait_for_block(self, blockhash, timeout=60):
+ def wait_for_block(self, blockhash, *, timeout=60):
def test_function():
return self.last_message.get("block") and self.last_message["block"].block.rehash() == blockhash
self.wait_until(test_function, timeout=timeout)
- def wait_for_header(self, blockhash, timeout=60):
+ def wait_for_header(self, blockhash, *, timeout=60):
def test_function():
last_headers = self.last_message.get('headers')
if not last_headers:
@@ -623,7 +625,7 @@ class P2PInterface(P2PConnection):
self.wait_until(test_function, timeout=timeout)
- def wait_for_merkleblock(self, blockhash, timeout=60):
+ def wait_for_merkleblock(self, blockhash, *, timeout=60):
def test_function():
last_filtered_block = self.last_message.get('merkleblock')
if not last_filtered_block:
@@ -632,7 +634,7 @@ class P2PInterface(P2PConnection):
self.wait_until(test_function, timeout=timeout)
- def wait_for_getdata(self, hash_list, timeout=60):
+ def wait_for_getdata(self, hash_list, *, timeout=60):
"""Waits for a getdata message.
The object hashes in the inventory vector must match the provided hash_list."""
@@ -644,19 +646,21 @@ class P2PInterface(P2PConnection):
self.wait_until(test_function, timeout=timeout)
- def wait_for_getheaders(self, timeout=60):
- """Waits for a getheaders message.
+ def wait_for_getheaders(self, block_hash=None, *, timeout=60):
+ """Waits for a getheaders message containing a specific block hash.
- Receiving any getheaders message will satisfy the predicate. the last_message["getheaders"]
- value must be explicitly cleared before calling this method, or this will return
- immediately with success. TODO: change this method to take a hash value and only
- return true if the correct block header has been requested."""
+ If no block hash is provided, checks whether any getheaders message has been received by the node."""
def test_function():
- return self.last_message.get("getheaders")
+ last_getheaders = self.last_message.pop("getheaders", None)
+ if block_hash is None:
+ return last_getheaders
+ if last_getheaders is None:
+ return False
+ return block_hash == last_getheaders.locator.vHave[0]
self.wait_until(test_function, timeout=timeout)
- def wait_for_inv(self, expected_inv, timeout=60):
+ def wait_for_inv(self, expected_inv, *, timeout=60):
"""Waits for an INV message and checks that the first inv object in the message was as expected."""
if len(expected_inv) > 1:
raise NotImplementedError("wait_for_inv() will only verify the first inv object")
@@ -668,7 +672,7 @@ class P2PInterface(P2PConnection):
self.wait_until(test_function, timeout=timeout)
- def wait_for_verack(self, timeout=60):
+ def wait_for_verack(self, *, timeout=60):
def test_function():
return "verack" in self.last_message
@@ -681,11 +685,11 @@ class P2PInterface(P2PConnection):
self.send_message(self.on_connection_send_msg)
self.on_connection_send_msg = None # Never used again
- def send_and_ping(self, message, timeout=60):
+ def send_and_ping(self, message, *, timeout=60):
self.send_message(message)
self.sync_with_ping(timeout=timeout)
- def sync_with_ping(self, timeout=60):
+ def sync_with_ping(self, *, timeout=60):
"""Ensure ProcessMessages and SendMessages is called on this connection"""
# Sending two pings back-to-back, requires that the node calls
# `ProcessMessage` twice, and thus ensures `SendMessages` must have
@@ -726,7 +730,7 @@ class NetworkThread(threading.Thread):
"""Start the network thread."""
self.network_event_loop.run_forever()
- def close(self, timeout=10):
+ def close(self, *, timeout=10):
"""Close the connections and network event loop."""
self.network_event_loop.call_soon_threadsafe(self.network_event_loop.stop)
wait_until_helper_internal(lambda: not self.network_event_loop.is_running(), timeout=timeout)
@@ -933,7 +937,7 @@ class P2PTxInvStore(P2PInterface):
with p2p_lock:
return list(self.tx_invs_received.keys())
- def wait_for_broadcast(self, txns, timeout=60):
+ def wait_for_broadcast(self, txns, *, timeout=60):
"""Waits for the txns (list of txids) to complete initial broadcast.
The mempool should mark unbroadcast=False for these transactions.
"""
diff --git a/test/functional/test_framework/script.py b/test/functional/test_framework/script.py
index 3275517888..d510cf9b1c 100644
--- a/test/functional/test_framework/script.py
+++ b/test/functional/test_framework/script.py
@@ -8,7 +8,6 @@ This file is modified from python-bitcoinlib.
"""
from collections import namedtuple
-import struct
import unittest
from .key import TaggedHash, tweak_add_pubkey, compute_xonly_pubkey
@@ -58,9 +57,9 @@ class CScriptOp(int):
elif len(d) <= 0xff:
return b'\x4c' + bytes([len(d)]) + d # OP_PUSHDATA1
elif len(d) <= 0xffff:
- return b'\x4d' + struct.pack(b'<H', len(d)) + d # OP_PUSHDATA2
+ return b'\x4d' + len(d).to_bytes(2, "little") + d # OP_PUSHDATA2
elif len(d) <= 0xffffffff:
- return b'\x4e' + struct.pack(b'<I', len(d)) + d # OP_PUSHDATA4
+ return b'\x4e' + len(d).to_bytes(4, "little") + d # OP_PUSHDATA4
else:
raise ValueError("Data too long to encode in a PUSHDATA op")
@@ -483,7 +482,7 @@ class CScript(bytes):
i = 0
while i < len(self):
sop_idx = i
- opcode = self[i]
+ opcode = CScriptOp(self[i])
i += 1
if opcode > OP_PUSHDATA4:
@@ -590,7 +589,7 @@ class CScript(bytes):
n += 1
elif opcode in (OP_CHECKMULTISIG, OP_CHECKMULTISIGVERIFY):
if fAccurate and (OP_1 <= lastOpcode <= OP_16):
- n += opcode.decode_op_n()
+ n += lastOpcode.decode_op_n()
else:
n += 20
lastOpcode = opcode
@@ -670,7 +669,7 @@ def LegacySignatureMsg(script, txTo, inIdx, hashtype):
txtmp.vin.append(tmp)
s = txtmp.serialize_without_witness()
- s += struct.pack(b"<I", hashtype)
+ s += hashtype.to_bytes(4, "little")
return (s, None)
@@ -726,7 +725,7 @@ def SegwitV0SignatureMsg(script, txTo, inIdx, hashtype, amount):
if (not (hashtype & SIGHASH_ANYONECANPAY) and (hashtype & 0x1f) != SIGHASH_SINGLE and (hashtype & 0x1f) != SIGHASH_NONE):
serialize_sequence = bytes()
for i in txTo.vin:
- serialize_sequence += struct.pack("<I", i.nSequence)
+ serialize_sequence += i.nSequence.to_bytes(4, "little")
hashSequence = uint256_from_str(hash256(serialize_sequence))
if ((hashtype & 0x1f) != SIGHASH_SINGLE and (hashtype & 0x1f) != SIGHASH_NONE):
@@ -739,16 +738,16 @@ def SegwitV0SignatureMsg(script, txTo, inIdx, hashtype, amount):
hashOutputs = uint256_from_str(hash256(serialize_outputs))
ss = bytes()
- ss += struct.pack("<i", txTo.nVersion)
+ ss += txTo.version.to_bytes(4, "little")
ss += ser_uint256(hashPrevouts)
ss += ser_uint256(hashSequence)
ss += txTo.vin[inIdx].prevout.serialize()
ss += ser_string(script)
- ss += struct.pack("<q", amount)
- ss += struct.pack("<I", txTo.vin[inIdx].nSequence)
+ ss += amount.to_bytes(8, "little", signed=True)
+ ss += txTo.vin[inIdx].nSequence.to_bytes(4, "little")
ss += ser_uint256(hashOutputs)
ss += txTo.nLockTime.to_bytes(4, "little")
- ss += struct.pack("<I", hashtype)
+ ss += hashtype.to_bytes(4, "little")
return ss
def SegwitV0SignatureHash(*args, **kwargs):
@@ -782,30 +781,44 @@ class TestFrameworkScript(unittest.TestCase):
for value in values:
self.assertEqual(CScriptNum.decode(CScriptNum.encode(CScriptNum(value))), value)
+ def test_legacy_sigopcount(self):
+ # test repeated single sig ops
+ for n_ops in range(1, 100, 10):
+ for singlesig_op in (OP_CHECKSIG, OP_CHECKSIGVERIFY):
+ singlesigs_script = CScript([singlesig_op]*n_ops)
+ self.assertEqual(singlesigs_script.GetSigOpCount(fAccurate=False), n_ops)
+ self.assertEqual(singlesigs_script.GetSigOpCount(fAccurate=True), n_ops)
+ # test multisig op (including accurate counting, i.e. BIP16)
+ for n in range(1, 16+1):
+ for multisig_op in (OP_CHECKMULTISIG, OP_CHECKMULTISIGVERIFY):
+ multisig_script = CScript([CScriptOp.encode_op_n(n), multisig_op])
+ self.assertEqual(multisig_script.GetSigOpCount(fAccurate=False), 20)
+ self.assertEqual(multisig_script.GetSigOpCount(fAccurate=True), n)
+
def BIP341_sha_prevouts(txTo):
return sha256(b"".join(i.prevout.serialize() for i in txTo.vin))
def BIP341_sha_amounts(spent_utxos):
- return sha256(b"".join(struct.pack("<q", u.nValue) for u in spent_utxos))
+ return sha256(b"".join(u.nValue.to_bytes(8, "little", signed=True) for u in spent_utxos))
def BIP341_sha_scriptpubkeys(spent_utxos):
return sha256(b"".join(ser_string(u.scriptPubKey) for u in spent_utxos))
def BIP341_sha_sequences(txTo):
- return sha256(b"".join(struct.pack("<I", i.nSequence) for i in txTo.vin))
+ return sha256(b"".join(i.nSequence.to_bytes(4, "little") for i in txTo.vin))
def BIP341_sha_outputs(txTo):
return sha256(b"".join(o.serialize() for o in txTo.vout))
-def TaprootSignatureMsg(txTo, spent_utxos, hash_type, input_index = 0, scriptpath = False, script = CScript(), codeseparator_pos = -1, annex = None, leaf_ver = LEAF_VERSION_TAPSCRIPT):
+def TaprootSignatureMsg(txTo, spent_utxos, hash_type, input_index=0, *, scriptpath=False, leaf_script=None, codeseparator_pos=-1, annex=None, leaf_ver=LEAF_VERSION_TAPSCRIPT):
assert (len(txTo.vin) == len(spent_utxos))
assert (input_index < len(txTo.vin))
out_type = SIGHASH_ALL if hash_type == 0 else hash_type & 3
in_type = hash_type & SIGHASH_ANYONECANPAY
spk = spent_utxos[input_index].scriptPubKey
ss = bytes([0, hash_type]) # epoch, hash_type
- ss += struct.pack("<i", txTo.nVersion)
- ss += struct.pack("<I", txTo.nLockTime)
+ ss += txTo.version.to_bytes(4, "little")
+ ss += txTo.nLockTime.to_bytes(4, "little")
if in_type != SIGHASH_ANYONECANPAY:
ss += BIP341_sha_prevouts(txTo)
ss += BIP341_sha_amounts(spent_utxos)
@@ -816,16 +829,16 @@ def TaprootSignatureMsg(txTo, spent_utxos, hash_type, input_index = 0, scriptpat
spend_type = 0
if annex is not None:
spend_type |= 1
- if (scriptpath):
+ if scriptpath:
spend_type |= 2
ss += bytes([spend_type])
if in_type == SIGHASH_ANYONECANPAY:
ss += txTo.vin[input_index].prevout.serialize()
- ss += struct.pack("<q", spent_utxos[input_index].nValue)
+ ss += spent_utxos[input_index].nValue.to_bytes(8, "little", signed=True)
ss += ser_string(spk)
- ss += struct.pack("<I", txTo.vin[input_index].nSequence)
+ ss += txTo.vin[input_index].nSequence.to_bytes(4, "little")
else:
- ss += struct.pack("<I", input_index)
+ ss += input_index.to_bytes(4, "little")
if (spend_type & 1):
ss += sha256(ser_string(annex))
if out_type == SIGHASH_SINGLE:
@@ -833,11 +846,11 @@ def TaprootSignatureMsg(txTo, spent_utxos, hash_type, input_index = 0, scriptpat
ss += sha256(txTo.vout[input_index].serialize())
else:
ss += bytes(0 for _ in range(32))
- if (scriptpath):
- ss += TaggedHash("TapLeaf", bytes([leaf_ver]) + ser_string(script))
+ if scriptpath:
+ ss += TaggedHash("TapLeaf", bytes([leaf_ver]) + ser_string(leaf_script))
ss += bytes([0])
- ss += struct.pack("<i", codeseparator_pos)
- assert len(ss) == 175 - (in_type == SIGHASH_ANYONECANPAY) * 49 - (out_type != SIGHASH_ALL and out_type != SIGHASH_SINGLE) * 32 + (annex is not None) * 32 + scriptpath * 37
+ ss += codeseparator_pos.to_bytes(4, "little", signed=True)
+ assert len(ss) == 175 - (in_type == SIGHASH_ANYONECANPAY) * 49 - (out_type != SIGHASH_ALL and out_type != SIGHASH_SINGLE) * 32 + (annex is not None) * 32 + scriptpath * 37
return ss
def TaprootSignatureHash(*args, **kwargs):
diff --git a/test/functional/test_framework/script_util.py b/test/functional/test_framework/script_util.py
index 62894cc0f4..938183ece4 100755
--- a/test/functional/test_framework/script_util.py
+++ b/test/functional/test_framework/script_util.py
@@ -3,10 +3,14 @@
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Useful Script constants and utils."""
+import unittest
+
from test_framework.script import (
CScript,
- CScriptOp,
OP_0,
+ OP_1,
+ OP_15,
+ OP_16,
OP_CHECKMULTISIG,
OP_CHECKSIG,
OP_DUP,
@@ -39,6 +43,8 @@ assert MIN_PADDING == 5
DUMMY_MIN_OP_RETURN_SCRIPT = CScript([OP_RETURN] + ([OP_0] * (MIN_PADDING - 1)))
assert len(DUMMY_MIN_OP_RETURN_SCRIPT) == MIN_PADDING
+PAY_TO_ANCHOR = CScript([OP_1, bytes.fromhex("4e73")])
+
def key_to_p2pk_script(key):
key = check_key(key)
return CScript([key, OP_CHECKSIG])
@@ -49,10 +55,8 @@ def keys_to_multisig_script(keys, *, k=None):
if k is None: # n-of-n multisig by default
k = n
assert k <= n
- op_k = CScriptOp.encode_op_n(k)
- op_n = CScriptOp.encode_op_n(n)
checked_keys = [check_key(key) for key in keys]
- return CScript([op_k] + checked_keys + [op_n, OP_CHECKMULTISIG])
+ return CScript([k] + checked_keys + [n, OP_CHECKMULTISIG])
def keyhash_to_p2pkh_script(hash):
@@ -125,3 +129,19 @@ def check_script(script):
if isinstance(script, bytes) or isinstance(script, CScript):
return script
assert False
+
+
+class TestFrameworkScriptUtil(unittest.TestCase):
+ def test_multisig(self):
+ fake_pubkey = bytes([0]*33)
+ # check correct encoding of P2MS script with n,k <= 16
+ normal_ms_script = keys_to_multisig_script([fake_pubkey]*16, k=15)
+ self.assertEqual(len(normal_ms_script), 1 + 16*34 + 1 + 1)
+ self.assertTrue(normal_ms_script.startswith(bytes([OP_15])))
+ self.assertTrue(normal_ms_script.endswith(bytes([OP_16, OP_CHECKMULTISIG])))
+
+ # check correct encoding of P2MS script with n,k > 16
+ max_ms_script = keys_to_multisig_script([fake_pubkey]*20, k=19)
+ self.assertEqual(len(max_ms_script), 2 + 20*34 + 2 + 1)
+ self.assertTrue(max_ms_script.startswith(bytes([1, 19]))) # using OP_PUSH1
+ self.assertTrue(max_ms_script.endswith(bytes([1, 20, OP_CHECKMULTISIG])))
diff --git a/test/functional/test_framework/test_framework.py b/test/functional/test_framework/test_framework.py
index d8ae20981d..49212eb019 100755
--- a/test/functional/test_framework/test_framework.py
+++ b/test/functional/test_framework/test_framework.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python3
-# Copyright (c) 2014-2022 The Bitcoin Core developers
+# Copyright (c) 2014-present The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Base class for RPC testing."""
@@ -92,17 +92,18 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
This class also contains various public and private helper methods."""
- def __init__(self) -> None:
+ def __init__(self, test_file) -> None:
"""Sets test framework defaults. Do not override this method. Instead, override the set_test_params() method"""
self.chain: str = 'regtest'
self.setup_clean_chain: bool = False
+ self.noban_tx_relay: bool = False
self.nodes: list[TestNode] = []
self.extra_args = None
self.network_thread = None
self.rpc_timeout = 60 # Wait for up to 60 seconds for the RPC server to respond
self.supports_cli = True
self.bind_to_localhost_only = True
- self.parse_args()
+ self.parse_args(test_file)
self.default_wallet_name = "default_wallet" if self.options.descriptors else ""
self.wallet_data_filename = "wallet.dat"
# Optional list of wallet names that can be set in set_test_params to
@@ -154,16 +155,16 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
exit_code = self.shutdown()
sys.exit(exit_code)
- def parse_args(self):
+ def parse_args(self, test_file):
previous_releases_path = os.getenv("PREVIOUS_RELEASES_DIR") or os.getcwd() + "/releases"
parser = argparse.ArgumentParser(usage="%(prog)s [options]")
parser.add_argument("--nocleanup", dest="nocleanup", default=False, action="store_true",
help="Leave bitcoinds and test.* datadir on exit or error")
parser.add_argument("--noshutdown", dest="noshutdown", default=False, action="store_true",
help="Don't stop bitcoinds after the test execution")
- parser.add_argument("--cachedir", dest="cachedir", default=os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/../../cache"),
+ parser.add_argument("--cachedir", dest="cachedir", default=os.path.abspath(os.path.dirname(test_file) + "/../cache"),
help="Directory for caching pregenerated datadirs (default: %(default)s)")
- parser.add_argument("--tmpdir", dest="tmpdir", help="Root directory for datadirs")
+ parser.add_argument("--tmpdir", dest="tmpdir", help="Root directory for datadirs (must not exist)")
parser.add_argument("-l", "--loglevel", dest="loglevel", default="INFO",
help="log events at this level and higher to the console. Can be set to DEBUG, INFO, WARNING, ERROR or CRITICAL. Passing --loglevel DEBUG will output all logs to console. Note that logs at all levels are always written to the test_framework.log file in the temporary test directory.")
parser.add_argument("--tracerpc", dest="trace_rpc", default=False, action="store_true",
@@ -176,7 +177,7 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
parser.add_argument("--coveragedir", dest="coveragedir",
help="Write tested RPC commands into this directory")
parser.add_argument("--configfile", dest="configfile",
- default=os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/../../config.ini"),
+ default=os.path.abspath(os.path.dirname(test_file) + "/../config.ini"),
help="Location of the test framework config file (default: %(default)s)")
parser.add_argument("--pdbonfailure", dest="pdbonfailure", default=False, action="store_true",
help="Attach a python debugger if test fails")
@@ -191,6 +192,8 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
parser.add_argument("--timeout-factor", dest="timeout_factor", type=float, help="adjust test timeouts by a factor. Setting it to 0 disables all timeouts")
parser.add_argument("--v2transport", dest="v2transport", default=False, action="store_true",
help="use BIP324 v2 connections between all nodes by default")
+ parser.add_argument("--v1transport", dest="v1transport", default=False, action="store_true",
+ help="Explicitly use v1 transport (can be used to overwrite global --v2transport option)")
self.add_options(parser)
# Running TestShell in a Jupyter notebook causes an additional -f argument
@@ -206,6 +209,8 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
config = configparser.ConfigParser()
config.read_file(open(self.options.configfile))
self.config = config
+ if self.options.v1transport:
+ self.options.v2transport=False
if "descriptors" not in self.options:
# Wallet is not required by the test at all and the value of self.options.descriptors won't matter.
@@ -439,6 +444,10 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
n.createwallet(wallet_name=wallet_name, descriptors=self.options.descriptors, load_on_startup=True)
n.importprivkey(privkey=n.get_deterministic_priv_key().key, label='coinbase', rescan=True)
+ # Only enables wallet support when the module is available
+ def enable_wallet_if_possible(self):
+ self._requires_wallet = self.is_wallet_compiled()
+
def run_test(self):
"""Tests must override this method to define test logic"""
raise NotImplementedError
@@ -494,6 +503,10 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
extra_confs = [[]] * num_nodes
if extra_args is None:
extra_args = [[]] * num_nodes
+ # Whitelist peers to speed up tx relay / mempool sync. Don't use it if testing tx relay or timing.
+ if self.noban_tx_relay:
+ for i in range(len(extra_args)):
+ extra_args[i] = extra_args[i] + ["-whitelist=noban,in,out@127.0.0.1"]
if versions is None:
versions = [None] * num_nodes
if binary is None:
@@ -577,10 +590,16 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
# Wait for nodes to stop
node.wait_until_stopped()
- def restart_node(self, i, extra_args=None):
+ def restart_node(self, i, extra_args=None, clear_addrman=False):
"""Stop and start a test node"""
self.stop_node(i)
- self.start_node(i, extra_args)
+ if clear_addrman:
+ peers_dat = self.nodes[i].chain_path / "peers.dat"
+ os.remove(peers_dat)
+ with self.nodes[i].assert_debug_log(expected_msgs=[f'Creating peers.dat because the file was not found ("{peers_dat}")']):
+ self.start_node(i, extra_args)
+ else:
+ self.start_node(i, extra_args)
def wait_for_node_exit(self, i, timeout):
self.nodes[i].process.wait(timeout)
@@ -595,8 +614,6 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
"""
from_connection = self.nodes[a]
to_connection = self.nodes[b]
- from_num_peers = 1 + len(from_connection.getpeerinfo())
- to_num_peers = 1 + len(to_connection.getpeerinfo())
ip_port = "127.0.0.1:" + str(p2p_port(b))
if peer_advertises_v2 is None:
@@ -612,19 +629,28 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
if not wait_for_connect:
return
- # poll until version handshake complete to avoid race conditions
- # with transaction relaying
- # See comments in net_processing:
- # * Must have a version message before anything else
- # * Must have a verack message before anything else
- self.wait_until(lambda: sum(peer['version'] != 0 for peer in from_connection.getpeerinfo()) == from_num_peers)
- self.wait_until(lambda: sum(peer['version'] != 0 for peer in to_connection.getpeerinfo()) == to_num_peers)
- self.wait_until(lambda: sum(peer['bytesrecv_per_msg'].pop('verack', 0) >= 21 for peer in from_connection.getpeerinfo()) == from_num_peers)
- self.wait_until(lambda: sum(peer['bytesrecv_per_msg'].pop('verack', 0) >= 21 for peer in to_connection.getpeerinfo()) == to_num_peers)
- # The message bytes are counted before processing the message, so make
- # sure it was fully processed by waiting for a ping.
- self.wait_until(lambda: sum(peer["bytesrecv_per_msg"].pop("pong", 0) >= 29 for peer in from_connection.getpeerinfo()) == from_num_peers)
- self.wait_until(lambda: sum(peer["bytesrecv_per_msg"].pop("pong", 0) >= 29 for peer in to_connection.getpeerinfo()) == to_num_peers)
+ # Use subversion as peer id. Test nodes have their node number appended to the user agent string
+ from_connection_subver = from_connection.getnetworkinfo()['subversion']
+ to_connection_subver = to_connection.getnetworkinfo()['subversion']
+
+ def find_conn(node, peer_subversion, inbound):
+ return next(filter(lambda peer: peer['subver'] == peer_subversion and peer['inbound'] == inbound, node.getpeerinfo()), None)
+
+ self.wait_until(lambda: find_conn(from_connection, to_connection_subver, inbound=False) is not None)
+ self.wait_until(lambda: find_conn(to_connection, from_connection_subver, inbound=True) is not None)
+
+ def check_bytesrecv(peer, msg_type, min_bytes_recv):
+ assert peer is not None, "Error: peer disconnected"
+ return peer['bytesrecv_per_msg'].pop(msg_type, 0) >= min_bytes_recv
+
+ # Poll until version handshake (fSuccessfullyConnected) is complete to
+ # avoid race conditions, because some message types are blocked from
+ # being sent or received before fSuccessfullyConnected.
+ #
+ # As the flag fSuccessfullyConnected is not exposed, check it by
+ # waiting for a pong, which can only happen after the flag was set.
+ self.wait_until(lambda: check_bytesrecv(find_conn(from_connection, to_connection_subver, inbound=False), 'pong', 29))
+ self.wait_until(lambda: check_bytesrecv(find_conn(to_connection, from_connection_subver, inbound=True), 'pong', 29))
def disconnect_nodes(self, a, b):
def disconnect_nodes_helper(node_a, node_b):
diff --git a/test/functional/test_framework/test_node.py b/test/functional/test_framework/test_node.py
index 3baa78fd79..60ca9269a5 100755
--- a/test/functional/test_framework/test_node.py
+++ b/test/functional/test_framework/test_node.py
@@ -39,9 +39,15 @@ from .util import (
rpc_url,
wait_until_helper_internal,
p2p_port,
+ tor_port,
)
BITCOIND_PROC_WAIT_TIMEOUT = 60
+# The size of the blocks xor key
+# from InitBlocksdirXorKey::xor_key.size()
+NUM_XOR_BYTES = 8
+# The null blocks key (all 0s)
+NULL_BLK_XOR_KEY = bytes([0] * NUM_XOR_BYTES)
class FailedToStartError(Exception):
@@ -88,8 +94,11 @@ class TestNode():
self.coverage_dir = coverage_dir
self.cwd = cwd
self.descriptors = descriptors
+ self.has_explicit_bind = False
if extra_conf is not None:
append_config(self.datadir_path, extra_conf)
+ # Remember if there is bind=... in the config file.
+ self.has_explicit_bind = any(e.startswith("bind=") for e in extra_conf)
# Most callers will just need to add extra args to the standard list below.
# For those callers that need more flexibility, they can just set the args property directly.
# Note that common args are set in the config file (see initialize_datadir)
@@ -106,7 +115,7 @@ class TestNode():
"-debugexclude=libevent",
"-debugexclude=leveldb",
"-debugexclude=rand",
- "-uacomment=testnode%d" % i,
+ "-uacomment=testnode%d" % i, # required for subversion uniqueness across peers
]
if self.descriptors is None:
self.args.append("-disablewallet")
@@ -136,9 +145,7 @@ class TestNode():
self.args.append("-v2transport=1")
else:
self.args.append("-v2transport=0")
- else:
- # v2transport requested but not supported for node
- assert not v2transport
+ # if v2transport is requested via global flag but not supported for node version, ignore it
self.cli = TestNodeCLI(bitcoin_cli, self.datadir_path)
self.use_cli = use_cli
@@ -212,6 +219,17 @@ class TestNode():
if extra_args is None:
extra_args = self.extra_args
+ # If listening and no -bind is given, then bitcoind would bind P2P ports on
+ # 0.0.0.0:P and 127.0.0.1:18445 (for incoming Tor connections), where P is
+ # a unique port chosen by the test framework and configured as port=P in
+ # bitcoin.conf. To avoid collisions on 127.0.0.1:18445, change it to
+ # 127.0.0.1:tor_port().
+ will_listen = all(e != "-nolisten" and e != "-listen=0" for e in extra_args)
+ has_explicit_bind = self.has_explicit_bind or any(e.startswith("-bind=") for e in extra_args)
+ if will_listen and not has_explicit_bind:
+ extra_args.append(f"-bind=0.0.0.0:{p2p_port(self.index)}")
+ extra_args.append(f"-bind=127.0.0.1:{tor_port(self.index)}=onion")
+
self.use_v2transport = "-v2transport=1" in extra_args or (self.default_to_v2 and "-v2transport=0" not in extra_args)
# Add a new stdout and stderr file each time bitcoind is started
@@ -243,7 +261,7 @@ class TestNode():
if self.start_perf:
self._start_perf()
- def wait_for_rpc_connection(self):
+ def wait_for_rpc_connection(self, *, wait_for_import=True):
"""Sets up an RPC connection to the bitcoind process. Returns False if unable to connect."""
# Poll at a rate of four times per second
poll_per_s = 4
@@ -265,7 +283,7 @@ class TestNode():
)
rpc.getblockcount()
# If the call to getblockcount() succeeds then the RPC connection is up
- if self.version_is_at_least(190000):
+ if self.version_is_at_least(190000) and wait_for_import:
# getmempoolinfo.loaded is available since commit
# bb8ae2c (version 0.19.0)
self.wait_until(lambda: rpc.getmempoolinfo()['loaded'])
@@ -421,8 +439,9 @@ class TestNode():
return True
def wait_until_stopped(self, *, timeout=BITCOIND_PROC_WAIT_TIMEOUT, expect_error=False, **kwargs):
- expected_ret_code = 1 if expect_error else 0 # Whether node shutdown return EXIT_FAILURE or EXIT_SUCCESS
- self.wait_until(lambda: self.is_node_stopped(expected_ret_code=expected_ret_code, **kwargs), timeout=timeout)
+ if "expected_ret_code" not in kwargs:
+ kwargs["expected_ret_code"] = 1 if expect_error else 0 # Whether node shutdown return EXIT_FAILURE or EXIT_SUCCESS
+ self.wait_until(lambda: self.is_node_stopped(**kwargs), timeout=timeout)
def replace_in_config(self, replacements):
"""
@@ -452,6 +471,14 @@ class TestNode():
return self.chain_path / "blocks"
@property
+ def blocks_key_path(self) -> Path:
+ return self.blocks_path / "xor.dat"
+
+ def read_xor_key(self) -> bytes:
+ with open(self.blocks_key_path, "rb") as xor_f:
+ return xor_f.read(NUM_XOR_BYTES)
+
+ @property
def wallets_path(self) -> Path:
return self.chain_path / "wallets"
@@ -492,7 +519,7 @@ class TestNode():
self._raise_assertion_error('Expected messages "{}" does not partially match log:\n\n{}\n\n'.format(str(expected_msgs), print_log))
@contextlib.contextmanager
- def wait_for_debug_log(self, expected_msgs, timeout=60):
+ def busy_wait_for_debug_log(self, expected_msgs, timeout=60):
"""
Block until we see a particular debug log message fragment or until we exceed the timeout.
Return:
@@ -667,7 +694,7 @@ class TestNode():
assert_msg += "with expected error " + expected_msg
self._raise_assertion_error(assert_msg)
- def add_p2p_connection(self, p2p_conn, *, wait_for_verack=True, send_version=True, supports_v2_p2p=None, wait_for_v2_handshake=True, **kwargs):
+ def add_p2p_connection(self, p2p_conn, *, wait_for_verack=True, send_version=True, supports_v2_p2p=None, wait_for_v2_handshake=True, expect_success=True, **kwargs):
"""Add an inbound p2p connection to the node.
This method adds the p2p connection to the self.p2ps list and also
@@ -687,7 +714,6 @@ class TestNode():
if supports_v2_p2p is None:
supports_v2_p2p = self.use_v2transport
-
p2p_conn.p2p_connected_to_node = True
if self.use_v2transport:
kwargs['services'] = kwargs.get('services', P2P_SERVICES) | NODE_P2P_V2
@@ -695,6 +721,8 @@ class TestNode():
p2p_conn.peer_connect(**kwargs, send_version=send_version, net=self.chain, timeout_factor=self.timeout_factor, supports_v2_p2p=supports_v2_p2p)()
self.p2ps.append(p2p_conn)
+ if not expect_success:
+ return p2p_conn
p2p_conn.wait_until(lambda: p2p_conn.is_connected, check_connected=False)
if supports_v2_p2p and wait_for_v2_handshake:
p2p_conn.wait_until(lambda: p2p_conn.v2_state.tried_v2_handshake)
@@ -726,7 +754,7 @@ class TestNode():
return p2p_conn
- def add_outbound_p2p_connection(self, p2p_conn, *, wait_for_verack=True, p2p_idx, connection_type="outbound-full-relay", supports_v2_p2p=None, advertise_v2_p2p=None, **kwargs):
+ def add_outbound_p2p_connection(self, p2p_conn, *, wait_for_verack=True, wait_for_disconnect=False, p2p_idx, connection_type="outbound-full-relay", supports_v2_p2p=None, advertise_v2_p2p=None, **kwargs):
"""Add an outbound p2p connection from node. Must be an
"outbound-full-relay", "block-relay-only", "addr-fetch" or "feeler" connection.
@@ -773,7 +801,7 @@ class TestNode():
if reconnect:
p2p_conn.wait_for_reconnect()
- if connection_type == "feeler":
+ if connection_type == "feeler" or wait_for_disconnect:
# feeler connections are closed as soon as the node receives a `version` message
p2p_conn.wait_until(lambda: p2p_conn.message_count["version"] == 1, check_connected=False)
p2p_conn.wait_until(lambda: not p2p_conn.is_connected, check_connected=False)
diff --git a/test/functional/test_framework/test_shell.py b/test/functional/test_framework/test_shell.py
index 09ccec28a1..e232430507 100644
--- a/test/functional/test_framework/test_shell.py
+++ b/test/functional/test_framework/test_shell.py
@@ -2,9 +2,11 @@
# Copyright (c) 2019-2022 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
+import pathlib
from test_framework.test_framework import BitcoinTestFramework
+
class TestShell:
"""Wrapper Class for BitcoinTestFramework.
@@ -67,7 +69,13 @@ class TestShell:
# This implementation enforces singleton pattern, and will return the
# previously initialized instance if available
if not TestShell.instance:
- TestShell.instance = TestShell.__TestShell()
+ # BitcoinTestFramework instances are supposed to be constructed with the path
+ # of the calling test in order to find shared data like configuration and the
+ # cache. Since TestShell is meant for interactive use, there is no concrete
+ # test; passing a dummy name is fine though, as only the containing directory
+ # is relevant for successful initialization.
+ tests_directory = pathlib.Path(__file__).resolve().parent.parent
+ TestShell.instance = TestShell.__TestShell(tests_directory / "testshell_dummy.py")
TestShell.instance.running = False
return TestShell.instance
diff --git a/test/functional/test_framework/util.py b/test/functional/test_framework/util.py
index 623e97ec89..ce68de7eaa 100644
--- a/test/functional/test_framework/util.py
+++ b/test/functional/test_framework/util.py
@@ -14,6 +14,7 @@ import logging
import os
import pathlib
import platform
+import random
import re
import time
@@ -54,7 +55,24 @@ def assert_fee_amount(fee, tx_size, feerate_BTC_kvB):
raise AssertionError("Fee of %s BTC too high! (Should be %s BTC)" % (str(fee), str(target_fee)))
+def summarise_dict_differences(thing1, thing2):
+ if not isinstance(thing1, dict) or not isinstance(thing2, dict):
+ return thing1, thing2
+ d1, d2 = {}, {}
+ for k in sorted(thing1.keys()):
+ if k not in thing2:
+ d1[k] = thing1[k]
+ elif thing1[k] != thing2[k]:
+ d1[k], d2[k] = summarise_dict_differences(thing1[k], thing2[k])
+ for k in sorted(thing2.keys()):
+ if k not in thing1:
+ d2[k] = thing2[k]
+ return d1, d2
+
def assert_equal(thing1, thing2, *args):
+ if thing1 != thing2 and not args and isinstance(thing1, dict) and isinstance(thing2, dict):
+ d1,d2 = summarise_dict_differences(thing1, thing2)
+ raise AssertionError("not(%s == %s)\n in particular not(%s == %s)" % (thing1, thing2, d1, d2))
if thing1 != thing2 or any(thing1 != arg for arg in args):
raise AssertionError("not(%s)" % " == ".join(str(arg) for arg in (thing1, thing2) + args))
@@ -232,6 +250,12 @@ def ceildiv(a, b):
return -(-a // b)
+def random_bitflip(data):
+ data = list(data)
+ data[random.randrange(len(data))] ^= (1 << (random.randrange(8)))
+ return bytes(data)
+
+
def get_fee(tx_size, feerate_btc_kvb):
"""Calculate the fee in BTC given a feerate is BTC/kvB. Reflects CFeeRate::GetFee"""
feerate_sat_kvb = int(feerate_btc_kvb * Decimal(1e8)) # Fee in sat/kvb as an int to avoid float precision errors
@@ -290,14 +314,21 @@ def sha256sum_file(filename):
return h.digest()
+def util_xor(data, key, *, offset):
+ data = bytearray(data)
+ for i in range(len(data)):
+ data[i] ^= key[(i + offset) % len(key)]
+ return bytes(data)
+
+
# RPC/P2P connection constants and functions
############################################
# The maximum number of nodes a single test can spawn
MAX_NODES = 12
-# Don't assign rpc or p2p ports lower than this
+# Don't assign p2p, rpc or tor ports lower than this
PORT_MIN = int(os.getenv('TEST_RUNNER_PORT_MIN', default=11000))
-# The number of ports to "reserve" for p2p and rpc, each
+# The number of ports to "reserve" for p2p, rpc and tor, each
PORT_RANGE = 5000
@@ -337,7 +368,11 @@ def p2p_port(n):
def rpc_port(n):
- return PORT_MIN + PORT_RANGE + n + (MAX_NODES * PortSeed.n) % (PORT_RANGE - 1 - MAX_NODES)
+ return p2p_port(n) + PORT_RANGE
+
+
+def tor_port(n):
+ return p2p_port(n) + PORT_RANGE * 2
def rpc_url(datadir, i, chain, rpchost):
diff --git a/test/functional/test_framework/v2_p2p.py b/test/functional/test_framework/v2_p2p.py
index 8f79623bd8..87600c36de 100644
--- a/test/functional/test_framework/v2_p2p.py
+++ b/test/functional/test_framework/v2_p2p.py
@@ -4,7 +4,6 @@
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Class for v2 P2P protocol (see BIP 324)"""
-import logging
import random
from .crypto.bip324_cipher import FSChaCha20Poly1305
@@ -14,14 +13,12 @@ from .crypto.hkdf import hkdf_sha256
from .key import TaggedHash
from .messages import MAGIC_BYTES
-logger = logging.getLogger("TestFramework.v2_p2p")
CHACHA20POLY1305_EXPANSION = 16
HEADER_LEN = 1
IGNORE_BIT_POS = 7
LENGTH_FIELD_LEN = 3
MAX_GARBAGE_LEN = 4095
-TRANSPORT_VERSION = b''
SHORTID = {
1: b"addr",
@@ -95,6 +92,7 @@ class EncryptedP2PState:
# has been decrypted. set to -1 if decryption hasn't been done yet.
self.contents_len = -1
self.found_garbage_terminator = False
+ self.transport_version = b''
@staticmethod
def v2_ecdh(priv, ellswift_theirs, ellswift_ours, initiating):
@@ -111,12 +109,12 @@ class EncryptedP2PState:
# Responding, place their public key encoding first.
return TaggedHash("bip324_ellswift_xonly_ecdh", ellswift_theirs + ellswift_ours + ecdh_point_x32)
- def generate_keypair_and_garbage(self):
+ def generate_keypair_and_garbage(self, garbage_len=None):
"""Generates ellswift keypair and 4095 bytes garbage at max"""
self.privkey_ours, self.ellswift_ours = ellswift_create()
- garbage_len = random.randrange(MAX_GARBAGE_LEN + 1)
+ if garbage_len is None:
+ garbage_len = random.randrange(MAX_GARBAGE_LEN + 1)
self.sent_garbage = random.randbytes(garbage_len)
- logger.debug(f"sending {garbage_len} bytes of garbage data")
return self.ellswift_ours + self.sent_garbage
def initiate_v2_handshake(self):
@@ -172,7 +170,7 @@ class EncryptedP2PState:
msg_to_send += self.v2_enc_packet(decoy_content_len * b'\x00', aad=aad, ignore=True)
aad = b''
# Send version packet.
- msg_to_send += self.v2_enc_packet(TRANSPORT_VERSION, aad=aad)
+ msg_to_send += self.v2_enc_packet(self.transport_version, aad=aad)
return 64 - len(self.received_prefix), msg_to_send
def authenticate_handshake(self, response):
diff --git a/test/functional/test_framework/wallet.py b/test/functional/test_framework/wallet.py
index 470ed08ed4..f3713f297e 100644
--- a/test/functional/test_framework/wallet.py
+++ b/test/functional/test_framework/wallet.py
@@ -7,6 +7,7 @@
from copy import deepcopy
from decimal import Decimal
from enum import Enum
+import math
from typing import (
Any,
Optional,
@@ -32,10 +33,13 @@ from test_framework.messages import (
CTxIn,
CTxInWitness,
CTxOut,
+ hash256,
+ ser_compact_size,
+ WITNESS_SCALE_FACTOR,
)
from test_framework.script import (
CScript,
- LEAF_VERSION_TAPSCRIPT,
+ OP_1,
OP_NOP,
OP_RETURN,
OP_TRUE,
@@ -51,6 +55,7 @@ from test_framework.script_util import (
from test_framework.util import (
assert_equal,
assert_greater_than_or_equal,
+ get_fee,
)
from test_framework.wallet_util import generate_keypair
@@ -65,7 +70,10 @@ class MiniWalletMode(Enum):
However, if the transactions need to be modified by the user (e.g. prepending
scriptSig for testing opcodes that are activated by a soft-fork), or the txs
should contain an actual signature, the raw modes RAW_OP_TRUE and RAW_P2PK
- can be useful. Summary of modes:
+ can be useful. In order to avoid mixing of UTXOs between different MiniWallet
+ instances, a tag name can be passed to the default mode, to create different
+ output scripts. Note that the UTXOs from the pre-generated test chain can
+ only be spent if no tag is passed. Summary of modes:
| output | | tx is | can modify | needs
mode | description | address | standard | scriptSig | signing
@@ -80,22 +88,25 @@ class MiniWalletMode(Enum):
class MiniWallet:
- def __init__(self, test_node, *, mode=MiniWalletMode.ADDRESS_OP_TRUE):
+ def __init__(self, test_node, *, mode=MiniWalletMode.ADDRESS_OP_TRUE, tag_name=None):
self._test_node = test_node
self._utxos = []
self._mode = mode
assert isinstance(mode, MiniWalletMode)
if mode == MiniWalletMode.RAW_OP_TRUE:
+ assert tag_name is None
self._scriptPubKey = bytes(CScript([OP_TRUE]))
elif mode == MiniWalletMode.RAW_P2PK:
# use simple deterministic private key (k=1)
+ assert tag_name is None
self._priv_key = ECKey()
self._priv_key.set((1).to_bytes(32, 'big'), True)
pub_key = self._priv_key.get_pubkey()
self._scriptPubKey = key_to_p2pk_script(pub_key.get_bytes())
elif mode == MiniWalletMode.ADDRESS_OP_TRUE:
- self._address, self._internal_key = create_deterministic_address_bcrt1_p2tr_op_true()
+ internal_key = None if tag_name is None else compute_xonly_pubkey(hash256(tag_name.encode()))[0]
+ self._address, self._taproot_info = create_deterministic_address_bcrt1_p2tr_op_true(internal_key)
self._scriptPubKey = address_to_scriptpubkey(self._address)
# When the pre-mined test framework chain is used, it contains coinbase
@@ -112,13 +123,16 @@ class MiniWallet:
"""Pad a transaction with extra outputs until it reaches a target weight (or higher).
returns the tx
"""
- tx.vout.append(CTxOut(nValue=0, scriptPubKey=CScript([OP_RETURN, b'a'])))
+ tx.vout.append(CTxOut(nValue=0, scriptPubKey=CScript([OP_RETURN])))
+ # determine number of needed padding bytes by converting weight difference to vbytes
dummy_vbytes = (target_weight - tx.get_weight() + 3) // 4
- tx.vout[-1].scriptPubKey = CScript([OP_RETURN, b'a' * dummy_vbytes])
- # Lower bound should always be off by at most 3
+ # compensate for the increase of the compact-size encoded script length
+ # (note that the length encoding of the unpadded output script needs one byte)
+ dummy_vbytes -= len(ser_compact_size(dummy_vbytes)) - 1
+ tx.vout[-1].scriptPubKey = CScript([OP_RETURN] + [OP_1] * dummy_vbytes)
+ # Actual weight should be at most 3 higher than target weight
assert_greater_than_or_equal(tx.get_weight(), target_weight)
- # Higher bound should always be off by at most 3 + 12 weight (for encoding the length)
- assert_greater_than_or_equal(target_weight + 15, tx.get_weight())
+ assert_greater_than_or_equal(target_weight + 3, tx.get_weight())
def get_balance(self):
return sum(u['value'] for u in self._utxos)
@@ -180,7 +194,12 @@ class MiniWallet:
elif self._mode == MiniWalletMode.ADDRESS_OP_TRUE:
tx.wit.vtxinwit = [CTxInWitness()] * len(tx.vin)
for i in tx.wit.vtxinwit:
- i.scriptWitness.stack = [CScript([OP_TRUE]), bytes([LEAF_VERSION_TAPSCRIPT]) + self._internal_key]
+ assert_equal(len(self._taproot_info.leaves), 1)
+ leaf_info = list(self._taproot_info.leaves.values())[0]
+ i.scriptWitness.stack = [
+ leaf_info.script,
+ bytes([leaf_info.version | self._taproot_info.negflag]) + self._taproot_info.internal_pubkey,
+ ]
else:
assert False
@@ -314,7 +333,7 @@ class MiniWallet:
tx = CTransaction()
tx.vin = [CTxIn(COutPoint(int(utxo_to_spend['txid'], 16), utxo_to_spend['vout']), nSequence=seq) for utxo_to_spend, seq in zip(utxos_to_spend, sequence)]
tx.vout = [CTxOut(amount_per_output, bytearray(self._scriptPubKey)) for _ in range(num_outputs)]
- tx.nVersion = version
+ tx.version = version
tx.nLockTime = locktime
self.sign_tx(tx)
@@ -360,6 +379,10 @@ class MiniWallet:
vsize = Decimal(168) # P2PK (73 bytes scriptSig + 35 bytes scriptPubKey + 60 bytes other)
else:
assert False
+ if target_weight and not fee: # respect fee_rate if target weight is passed
+ # the actual weight might be off by 3 WUs, so calculate based on that (see self._bulk_tx)
+ max_actual_weight = target_weight + 3
+ fee = get_fee(math.ceil(max_actual_weight / WITNESS_SCALE_FACTOR), fee_rate)
send_value = utxo_to_spend["value"] - (fee or (fee_rate * vsize / 1000))
# create tx
diff --git a/test/functional/test_framework/wallet_util.py b/test/functional/test_framework/wallet_util.py
index 44811918bf..2168e607b2 100755
--- a/test/functional/test_framework/wallet_util.py
+++ b/test/functional/test_framework/wallet_util.py
@@ -4,6 +4,7 @@
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Useful util functions for testing the wallet"""
from collections import namedtuple
+import unittest
from test_framework.address import (
byte_to_base58,
@@ -15,6 +16,11 @@ from test_framework.address import (
script_to_p2wsh,
)
from test_framework.key import ECKey
+from test_framework.messages import (
+ CTxIn,
+ CTxInWitness,
+ WITNESS_SCALE_FACTOR,
+)
from test_framework.script_util import (
key_to_p2pkh_script,
key_to_p2wpkh_script,
@@ -123,6 +129,19 @@ def generate_keypair(compressed=True, wif=False):
privkey = bytes_to_wif(privkey.get_bytes(), compressed)
return privkey, pubkey
+def calculate_input_weight(scriptsig_hex, witness_stack_hex=None):
+ """Given a scriptSig and a list of witness stack items for an input in hex format,
+ calculate the total input weight. If the input has no witness data,
+ `witness_stack_hex` can be set to None."""
+ tx_in = CTxIn(scriptSig=bytes.fromhex(scriptsig_hex))
+ witness_size = 0
+ if witness_stack_hex is not None:
+ tx_inwit = CTxInWitness()
+ for witness_item_hex in witness_stack_hex:
+ tx_inwit.scriptWitness.stack.append(bytes.fromhex(witness_item_hex))
+ witness_size = len(tx_inwit.serialize())
+ return len(tx_in.serialize()) * WITNESS_SCALE_FACTOR + witness_size
+
class WalletUnlock():
"""
A context manager for unlocking a wallet with a passphrase and automatically locking it afterward.
@@ -141,3 +160,42 @@ class WalletUnlock():
def __exit__(self, *args):
_ = args
self.wallet.walletlock()
+
+
+class TestFrameworkWalletUtil(unittest.TestCase):
+ def test_calculate_input_weight(self):
+ SKELETON_BYTES = 32 + 4 + 4 # prevout-txid, prevout-index, sequence
+ SMALL_LEN_BYTES = 1 # bytes needed for encoding scriptSig / witness item lengths < 253
+ LARGE_LEN_BYTES = 3 # bytes needed for encoding scriptSig / witness item lengths >= 253
+
+ # empty scriptSig, no witness
+ self.assertEqual(calculate_input_weight(""),
+ (SKELETON_BYTES + SMALL_LEN_BYTES) * WITNESS_SCALE_FACTOR)
+ self.assertEqual(calculate_input_weight("", None),
+ (SKELETON_BYTES + SMALL_LEN_BYTES) * WITNESS_SCALE_FACTOR)
+ # small scriptSig, no witness
+ scriptSig_small = "00"*252
+ self.assertEqual(calculate_input_weight(scriptSig_small, None),
+ (SKELETON_BYTES + SMALL_LEN_BYTES + 252) * WITNESS_SCALE_FACTOR)
+ # small scriptSig, empty witness stack
+ self.assertEqual(calculate_input_weight(scriptSig_small, []),
+ (SKELETON_BYTES + SMALL_LEN_BYTES + 252) * WITNESS_SCALE_FACTOR + SMALL_LEN_BYTES)
+ # large scriptSig, no witness
+ scriptSig_large = "00"*253
+ self.assertEqual(calculate_input_weight(scriptSig_large, None),
+ (SKELETON_BYTES + LARGE_LEN_BYTES + 253) * WITNESS_SCALE_FACTOR)
+ # large scriptSig, empty witness stack
+ self.assertEqual(calculate_input_weight(scriptSig_large, []),
+ (SKELETON_BYTES + LARGE_LEN_BYTES + 253) * WITNESS_SCALE_FACTOR + SMALL_LEN_BYTES)
+ # empty scriptSig, 5 small witness stack items
+ self.assertEqual(calculate_input_weight("", ["00", "11", "22", "33", "44"]),
+ ((SKELETON_BYTES + SMALL_LEN_BYTES) * WITNESS_SCALE_FACTOR) + SMALL_LEN_BYTES + 5 * SMALL_LEN_BYTES + 5)
+ # empty scriptSig, 253 small witness stack items
+ self.assertEqual(calculate_input_weight("", ["00"]*253),
+ ((SKELETON_BYTES + SMALL_LEN_BYTES) * WITNESS_SCALE_FACTOR) + LARGE_LEN_BYTES + 253 * SMALL_LEN_BYTES + 253)
+ # small scriptSig, 3 large witness stack items
+ self.assertEqual(calculate_input_weight(scriptSig_small, ["00"*253]*3),
+ ((SKELETON_BYTES + SMALL_LEN_BYTES + 252) * WITNESS_SCALE_FACTOR) + SMALL_LEN_BYTES + 3 * LARGE_LEN_BYTES + 3*253)
+ # large scriptSig, 3 large witness stack items
+ self.assertEqual(calculate_input_weight(scriptSig_large, ["00"*253]*3),
+ ((SKELETON_BYTES + LARGE_LEN_BYTES + 253) * WITNESS_SCALE_FACTOR) + SMALL_LEN_BYTES + 3 * LARGE_LEN_BYTES + 3*253)