1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
#!/usr/bin/env python3
# Copyright (c) 2021-2022 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test the scanblocks RPC call."""
from test_framework.blockfilter import (
bip158_basic_element_hash,
bip158_relevant_scriptpubkeys,
)
from test_framework.messages import COIN
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_equal,
assert_raises_rpc_error,
)
from test_framework.wallet import (
MiniWallet,
getnewdestination,
)
class ScanblocksTest(BitcoinTestFramework):
    """Functional test exercising the `scanblocks` RPC."""

    def set_test_params(self):
        """Use two nodes; only the first is started with -blockfilterindex=1."""
        self.num_nodes = 2
        self.extra_args = [["-blockfilterindex=1"], []]

    def run_test(self):
        """Exercise scanblocks: height ranges, descriptors, false positives, errors."""
        node = self.nodes[0]
        miniwallet = MiniWallet(node)
        miniwallet.rescan_utxos()

        def relevant_blocks(*scan_args):
            # Start a scan on the indexed node and return only the matching block hashes.
            return node.scanblocks("start", *scan_args)['relevant_blocks']

        # First payment: 1.0 to a freshly generated destination (stays in the
        # mempool until a block is mined below).
        _, spk_1, addr_1 = getnewdestination()
        addr_desc = f"addr({addr_1})"
        miniwallet.send_to(from_node=node, scriptPubKey=spk_1, amount=1 * COIN)

        # Second payment: 1.0 to child key 5 of `parent_key`, again mempool only.
        parent_key = "tpubD6NzVbkrYhZ4WaWSyoBvQwbpLkojyoTZPRsgXELWz3Popb3qkjcJyJUGLnL4qHHoQvao8ESaAstxYSnhyswJ76uZPStJRJCTKvosUCJZL5B"
        child_spk_hex = node.validateaddress("mkS4HXoTYWRTescLGaUTGbtTTYX5EjJyEE")['scriptPubKey']
        miniwallet.send_to(
            from_node=node,
            scriptPubKey=bytes.fromhex(child_spk_hex),
            amount=1 * COIN,
        )

        # Confirm both payments and wait for every index on the node to catch up.
        first_hash = self.generate(node, 1)[0]
        first_height = node.getblockheader(first_hash)['height']
        self.wait_until(lambda: all(i["synced"] for i in node.getindexinfo().values()))

        # An unbounded scan must report the freshly mined block and span
        # everything from genesis up to that block.
        scan_result = node.scanblocks("start", [addr_desc])
        assert first_hash in scan_result['relevant_blocks']
        assert_equal(first_height, scan_result['to_height'])
        assert_equal(0, scan_result['from_height'])

        # Mine one more block so that start/stop heights can exclude the first one.
        second_hash = self.generate(node, 1)[0]
        second_height = node.getblockheader(second_hash)['height']

        # Starting at the second block must skip the first block
        # (unlikely to hit a false positive).
        assert first_hash not in relevant_blocks([addr_desc], second_height)

        # Starting at the first block must include it.
        assert first_hash in relevant_blocks([addr_desc], first_height)

        # Start and stop both pinned to the first block: still included.
        assert first_hash in relevant_blocks([addr_desc], first_height, first_height)

        # Stopping one block earlier excludes it.
        assert first_hash not in relevant_blocks([addr_desc], 0, first_height - 1)

        # A ranged descriptor over `parent_key` must find the payment to its
        # child key when scanning from the first mined block.
        ranged_desc = {"desc": f"pkh({parent_key}/*)", "range": [0, 100]}
        assert first_hash in relevant_blocks([ranged_desc], first_height)

        # False positives are currently part of the result. Searching for one at
        # runtime would take too long, so a pre-calculated script is used that
        # collides with the regtest genesis block's coinbase output; first verify
        # that both scripts map to the same BIP158 ranged hash.
        genesis_blockhash = node.getblockhash(0)
        genesis_spks = bip158_relevant_scriptpubkeys(node, genesis_blockhash)
        assert_equal(len(genesis_spks), 1)
        genesis_coinbase_spk = next(iter(genesis_spks))
        false_positive_spk = bytes.fromhex("001400000000000000000000000000000000000cadcb")

        genesis_coinbase_hash = bip158_basic_element_hash(genesis_coinbase_spk, 1, genesis_blockhash)
        false_positive_hash = bip158_basic_element_hash(false_positive_spk, 1, genesis_blockhash)
        assert_equal(genesis_coinbase_hash, false_positive_hash)

        # Both the real script and the colliding one must match the genesis block.
        genuine_desc = {"desc": f"raw({genesis_coinbase_spk.hex()})"}
        assert genesis_blockhash in relevant_blocks([genuine_desc], 0, 0)
        colliding_desc = {"desc": f"raw({false_positive_spk.hex()})"}
        assert genesis_blockhash in relevant_blocks([colliding_desc], 0, 0)

        # TODO: after an "accurate" mode for scanblocks is implemented (e.g. PR #26325)
        # check here that it filters out the false-positive

        # The second node runs without a block filter index, so scanning must fail.
        assert_raises_rpc_error(
            -1, "Index is not enabled for filtertype basic",
            self.nodes[1].scanblocks, "start", [addr_desc])

        # Unknown filter type.
        assert_raises_rpc_error(
            -5, "Unknown filtertype",
            node.scanblocks, "start", [addr_desc], 0, 10, "extended")

        # Invalid start_height.
        assert_raises_rpc_error(
            -1, "Invalid start_height",
            node.scanblocks, "start", [addr_desc], 100000000)

        # Invalid stop_height: first below start_height, then a very large value.
        assert_raises_rpc_error(
            -1, "Invalid stop_height",
            node.scanblocks, "start", [addr_desc], 10, 0)
        assert_raises_rpc_error(
            -1, "Invalid stop_height",
            node.scanblocks, "start", [addr_desc], 10, 100000000)

        # With no scan in progress the status must be empty.
        assert_equal(node.scanblocks("status"), None)

        # With no scan in progress there is nothing to abort, so this returns false.
        assert_equal(node.scanblocks("abort"), False)

        # An unrecognised action is rejected.
        assert_raises_rpc_error(-8, "Invalid action 'foobar'", node.scanblocks, "foobar")
if __name__ == '__main__':
    # Run the functional test when this file is executed as a script.
    ScanblocksTest().main()
|