aboutsummaryrefslogtreecommitdiff
path: root/test/functional/interface_zmq.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/functional/interface_zmq.py')
-rwxr-xr-xtest/functional/interface_zmq.py344
1 files changed, 335 insertions, 9 deletions
diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py
index ef4780cacb..a0bc937f75 100755
--- a/test/functional/interface_zmq.py
+++ b/test/functional/interface_zmq.py
@@ -5,13 +5,24 @@
"""Test the ZMQ notification interface."""
import struct
-from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE
+from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE, ADDRESS_BCRT1_P2WSH_OP_TRUE
+from test_framework.blocktools import create_block, create_coinbase, add_witness_commitment
from test_framework.test_framework import BitcoinTestFramework
-from test_framework.messages import CTransaction, hash256
-from test_framework.util import assert_equal, connect_nodes
+from test_framework.messages import CTransaction, hash256, FromHex
+from test_framework.util import (
+ assert_equal,
+ connect_nodes,
+ assert_raises_rpc_error,
+)
from io import BytesIO
from time import sleep
+# Test may be skipped and not have zmq installed
+try:
+ import zmq
+except ImportError:
+ pass
+
def hash256_reversed(byte_str):
return hash256(byte_str)[::-1]
@@ -21,7 +32,6 @@ class ZMQSubscriber:
self.socket = socket
self.topic = topic
- import zmq
self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)
def receive(self):
@@ -33,6 +43,22 @@ class ZMQSubscriber:
self.sequence += 1
return body
+ def receive_sequence(self):
+ topic, body, seq = self.socket.recv_multipart()
+ # Topic should match the subscriber topic.
+ assert_equal(topic, self.topic)
+ # Sequence should be incremental.
+ assert_equal(struct.unpack('<I', seq)[-1], self.sequence)
+ self.sequence += 1
+ hash = body[:32].hex()
+ label = chr(body[32])
+ mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
+ if mempool_sequence is not None:
+ assert label == "A" or label == "R"
+ else:
+ assert label == "D" or label == "C"
+ return (hash, label, mempool_sequence)
+
class ZMQTest (BitcoinTestFramework):
def set_test_params(self):
@@ -43,18 +69,19 @@ class ZMQTest (BitcoinTestFramework):
self.skip_if_no_bitcoind_zmq()
def run_test(self):
- import zmq
self.ctx = zmq.Context()
try:
self.test_basic()
+ self.test_sequence()
+ self.test_mempool_sync()
self.test_reorg()
+ self.test_multiple_interfaces()
finally:
# Destroy the ZMQ context.
self.log.debug("Destroying ZMQ context")
self.ctx.destroy(linger=None)
def test_basic(self):
- import zmq
# Invalid zmq arguments don't take down the node, see #17185.
self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])
@@ -146,7 +173,6 @@ class ZMQTest (BitcoinTestFramework):
self.log.info("Skipping reorg test because wallet is disabled")
return
- import zmq
address = 'tcp://127.0.0.1:28333'
services = [b"hashblock", b"hashtx"]
@@ -177,8 +203,8 @@ class ZMQTest (BitcoinTestFramework):
assert_equal(hashtx.receive().hex(), payment_txid)
assert_equal(hashtx.receive().hex(), disconnect_cb)
- # Generate 2 blocks in nodes[1]
- connect_blocks = self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_UNSPENDABLE)
+ # Generate 2 blocks in nodes[1] to a different address to ensure split
+ connect_blocks = self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_P2WSH_OP_TRUE)
# nodes[0] will reorg chain after connecting back nodes[1]
connect_nodes(self.nodes[0], 1)
@@ -204,5 +230,305 @@ class ZMQTest (BitcoinTestFramework):
# And the current tip
assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[0])["tx"][0])
+ def test_sequence(self):
+ """
+ Sequence zmq notifications give every blockhash and txhash in order
+ of processing, regardless of IBD, re-orgs, etc.
+ Format of messages:
+ <32-byte hash>C : Blockhash connected
+ <32-byte hash>D : Blockhash disconnected
+ <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason
+ <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool
+ """
+ self.log.info("Testing 'sequence' publisher")
+ address = 'tcp://127.0.0.1:28333'
+ socket = self.ctx.socket(zmq.SUB)
+ socket.set(zmq.RCVTIMEO, 60000)
+ seq = ZMQSubscriber(socket, b'sequence')
+
+ self.restart_node(0, ['-zmqpub%s=%s' % (seq.topic.decode(), address)])
+ socket.connect(address)
+ # Relax so that the subscriber is ready before publishing zmq messages
+ sleep(0.2)
+
+ # Mempool sequence number starts at 1
+ seq_num = 1
+
+ # Generate 1 block in nodes[0] and receive all notifications
+ dc_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0]
+
+ # Note: We are not notified of any block transactions, coinbase or mined
+ assert_equal((self.nodes[0].getbestblockhash(), "C", None), seq.receive_sequence())
+
+ # Generate 2 blocks in nodes[1] to a different address to ensure a chain split
+ self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_P2WSH_OP_TRUE)
+
+ # nodes[0] will reorg chain after connecting back nodes[1]
+ connect_nodes(self.nodes[0], 1)
+
+ # Then we receive all block (dis)connect notifications for the 2 block reorg
+ assert_equal((dc_block, "D", None), seq.receive_sequence())
+ block_count = self.nodes[1].getblockcount()
+ assert_equal((self.nodes[1].getblockhash(block_count-1), "C", None), seq.receive_sequence())
+ assert_equal((self.nodes[1].getblockhash(block_count), "C", None), seq.receive_sequence())
+
+ # Rest of test requires wallet functionality
+ if self.is_wallet_compiled():
+ self.log.info("Wait for tx from second node")
+ payment_txid = self.nodes[1].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=5.0, replaceable=True)
+ self.sync_all()
+ self.log.info("Testing sequence notifications with mempool sequence values")
+
+ # Should receive the broadcasted txid.
+ assert_equal((payment_txid, "A", seq_num), seq.receive_sequence())
+ seq_num += 1
+
+ self.log.info("Testing RBF notification")
+ # Replace it to test eviction/addition notification
+ rbf_info = self.nodes[1].bumpfee(payment_txid)
+ self.sync_all()
+ assert_equal((payment_txid, "R", seq_num), seq.receive_sequence())
+ seq_num += 1
+ assert_equal((rbf_info["txid"], "A", seq_num), seq.receive_sequence())
+ seq_num += 1
+
+ # Doesn't get published when mined, make a block and tx to "flush" the possibility
+ # though the mempool sequence number does go up by the number of transactions
+ # removed from the mempool by the block mining it.
+ mempool_size = len(self.nodes[0].getrawmempool())
+ c_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0]
+ self.sync_all()
+ # Make sure the number of mined transactions matches the number of txs out of mempool
+ mempool_size_delta = mempool_size - len(self.nodes[0].getrawmempool())
+ assert_equal(len(self.nodes[0].getblock(c_block)["tx"])-1, mempool_size_delta)
+ seq_num += mempool_size_delta
+ payment_txid_2 = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
+ self.sync_all()
+ assert_equal((c_block, "C", None), seq.receive_sequence())
+ assert_equal((payment_txid_2, "A", seq_num), seq.receive_sequence())
+ seq_num += 1
+
+ # Spot check getrawmempool results that they only show up when asked for
+ assert type(self.nodes[0].getrawmempool()) is list
+ assert type(self.nodes[0].getrawmempool(mempool_sequence=False)) is list
+ assert "mempool_sequence" not in self.nodes[0].getrawmempool(verbose=True)
+ assert_raises_rpc_error(-8, "Verbose results cannot contain mempool sequence values.", self.nodes[0].getrawmempool, True, True)
+ assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], seq_num)
+
+ self.log.info("Testing reorg notifications")
+ # Manually invalidate the last block to test mempool re-entry
+ # N.B. This part could be made more lenient in exact ordering
+ # since it greatly depends on inner-workings of blocks/mempool
+ # during "deep" re-orgs. Probably should "re-construct"
+ # blockchain/mempool state from notifications instead.
+ block_count = self.nodes[0].getblockcount()
+ best_hash = self.nodes[0].getbestblockhash()
+ self.nodes[0].invalidateblock(best_hash)
+ sleep(2) # Bit of room to make sure transaction things happened
+
+ # Make sure getrawmempool mempool_sequence results aren't "queued" but immediately reflective
+ # of the time they were gathered.
+ assert self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] > seq_num
+
+ assert_equal((best_hash, "D", None), seq.receive_sequence())
+ assert_equal((rbf_info["txid"], "A", seq_num), seq.receive_sequence())
+ seq_num += 1
+
+ # Other things may happen but aren't wallet-deterministic so we don't test for them currently
+ self.nodes[0].reconsiderblock(best_hash)
+ self.nodes[1].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
+ self.sync_all()
+
+ self.log.info("Evict mempool transaction by block conflict")
+ orig_txid = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0, replaceable=True)
+
+ # More to be simply mined
+ more_tx = []
+ for _ in range(5):
+ more_tx.append(self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 0.1))
+
+ raw_tx = self.nodes[0].getrawtransaction(orig_txid)
+ bump_info = self.nodes[0].bumpfee(orig_txid)
+ # Mine the pre-bump tx
+ block = create_block(int(self.nodes[0].getbestblockhash(), 16), create_coinbase(self.nodes[0].getblockcount()+1))
+ tx = FromHex(CTransaction(), raw_tx)
+ block.vtx.append(tx)
+ for txid in more_tx:
+ tx = FromHex(CTransaction(), self.nodes[0].getrawtransaction(txid))
+ block.vtx.append(tx)
+ add_witness_commitment(block)
+ block.solve()
+ assert_equal(self.nodes[0].submitblock(block.serialize().hex()), None)
+ tip = self.nodes[0].getbestblockhash()
+ assert_equal(int(tip, 16), block.sha256)
+ orig_txid_2 = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0, replaceable=True)
+
+ # Flush old notifications until evicted tx original entry
+ (hash_str, label, mempool_seq) = seq.receive_sequence()
+ while hash_str != orig_txid:
+ (hash_str, label, mempool_seq) = seq.receive_sequence()
+ mempool_seq += 1
+
+ # Added original tx
+ assert_equal(label, "A")
+ # More transactions to be simply mined
+ for i in range(len(more_tx)):
+ assert_equal((more_tx[i], "A", mempool_seq), seq.receive_sequence())
+ mempool_seq += 1
+ # Bumped by rbf
+ assert_equal((orig_txid, "R", mempool_seq), seq.receive_sequence())
+ mempool_seq += 1
+ assert_equal((bump_info["txid"], "A", mempool_seq), seq.receive_sequence())
+ mempool_seq += 1
+ # Conflict announced first, then block
+ assert_equal((bump_info["txid"], "R", mempool_seq), seq.receive_sequence())
+ mempool_seq += 1
+ assert_equal((tip, "C", None), seq.receive_sequence())
+ mempool_seq += len(more_tx)
+ # Last tx
+ assert_equal((orig_txid_2, "A", mempool_seq), seq.receive_sequence())
+ mempool_seq += 1
+ self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
+ self.sync_all() # want to make sure we didn't break "consensus" for other tests
+
+ def test_mempool_sync(self):
+ """
+ Use sequence notification plus getrawmempool sequence results to "sync mempool"
+ """
+ if not self.is_wallet_compiled():
+ self.log.info("Skipping mempool sync test")
+ return
+
+ self.log.info("Testing 'mempool sync' usage of sequence notifier")
+ address = 'tcp://127.0.0.1:28333'
+ socket = self.ctx.socket(zmq.SUB)
+ socket.set(zmq.RCVTIMEO, 60000)
+ seq = ZMQSubscriber(socket, b'sequence')
+
+ self.restart_node(0, ['-zmqpub%s=%s' % (seq.topic.decode(), address)])
+ connect_nodes(self.nodes[0], 1)
+ socket.connect(address)
+ # Relax so that the subscriber is ready before publishing zmq messages
+ sleep(0.2)
+
+ # In-memory counter, should always start at 1
+ next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
+ assert_equal(next_mempool_seq, 1)
+
+ # Some transactions have been happening but we aren't consuming zmq notifications yet
+ # or we lost a ZMQ message somehow and want to start over
+ txids = []
+ num_txs = 5
+ for _ in range(num_txs):
+ txids.append(self.nodes[1].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0, replaceable=True))
+ self.sync_all()
+
+ # 1) Consume backlog until we get a mempool sequence number
+ (hash_str, label, zmq_mem_seq) = seq.receive_sequence()
+ while zmq_mem_seq is None:
+ (hash_str, label, zmq_mem_seq) = seq.receive_sequence()
+
+ assert label == "A" or label == "R"
+ assert hash_str is not None
+
+ # 2) We need to "seed" our view of the mempool
+ mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
+ mempool_view = set(mempool_snapshot["txids"])
+ get_raw_seq = mempool_snapshot["mempool_sequence"]
+ assert_equal(get_raw_seq, 6)
+ # Snapshot may be too old compared to zmq message we read off latest
+ while zmq_mem_seq >= get_raw_seq:
+ sleep(2)
+ mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
+ mempool_view = set(mempool_snapshot["txids"])
+ get_raw_seq = mempool_snapshot["mempool_sequence"]
+
+ # Things continue to happen in the "interim" while waiting for snapshot results
+ # We have node 0 do all these to avoid p2p races with RBF announcements
+ for _ in range(num_txs):
+ txids.append(self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=0.1, replaceable=True))
+ self.nodes[0].bumpfee(txids[-1])
+ self.sync_all()
+ self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
+ final_txid = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=0.1, replaceable=True)
+
+ # 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot
+ while True:
+ if zmq_mem_seq == get_raw_seq - 1:
+ break
+ (hash_str, label, mempool_sequence) = seq.receive_sequence()
+ if mempool_sequence is not None:
+ zmq_mem_seq = mempool_sequence
+ if zmq_mem_seq > get_raw_seq:
+ raise Exception("We somehow jumped mempool sequence numbers! zmq_mem_seq: {} > get_raw_seq: {}".format(zmq_mem_seq, get_raw_seq))
+
+ # 4) Moving forward, we apply the delta to our local view
+ # remaining txs(5) + 1 rbf(A+R) + 1 block connect + 1 final tx
+ expected_sequence = get_raw_seq
+ r_gap = 0
+ for _ in range(num_txs + 2 + 1 + 1):
+ (hash_str, label, mempool_sequence) = seq.receive_sequence()
+ if mempool_sequence is not None:
+ if mempool_sequence != expected_sequence:
+ # Detected "R" gap, means this a conflict eviction, and mempool tx are being evicted before its
+ # position in the incoming block message "C"
+ if label == "R":
+ assert mempool_sequence > expected_sequence
+ r_gap += mempool_sequence - expected_sequence
+ else:
+ raise Exception("WARNING: txhash has unexpected mempool sequence value: {} vs expected {}".format(mempool_sequence, expected_sequence))
+ if label == "A":
+ assert hash_str not in mempool_view
+ mempool_view.add(hash_str)
+ expected_sequence = mempool_sequence + 1
+ elif label == "R":
+ assert hash_str in mempool_view
+ mempool_view.remove(hash_str)
+ expected_sequence = mempool_sequence + 1
+ elif label == "C":
+ # (Attempt to) remove all txids from known block connects
+ block_txids = self.nodes[0].getblock(hash_str)["tx"][1:]
+ for txid in block_txids:
+ if txid in mempool_view:
+ expected_sequence += 1
+ mempool_view.remove(txid)
+ expected_sequence -= r_gap
+ r_gap = 0
+ elif label == "D":
+ # Not useful for mempool tracking per se
+ continue
+ else:
+ raise Exception("Unexpected ZMQ sequence label!")
+
+ assert_equal(self.nodes[0].getrawmempool(), [final_txid])
+ assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], expected_sequence)
+
+ # 5) If you miss a zmq/mempool sequence number, go back to step (2)
+
+ self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
+
+ def test_multiple_interfaces(self):
+ # Set up two subscribers with different addresses
+ subscribers = []
+ for i in range(2):
+ address = 'tcp://127.0.0.1:%d' % (28334 + i)
+ socket = self.ctx.socket(zmq.SUB)
+ socket.set(zmq.RCVTIMEO, 60000)
+ hashblock = ZMQSubscriber(socket, b"hashblock")
+ socket.connect(address)
+ subscribers.append({'address': address, 'hashblock': hashblock})
+
+ self.restart_node(0, ['-zmqpub%s=%s' % (subscriber['hashblock'].topic.decode(), subscriber['address']) for subscriber in subscribers])
+
+ # Relax so that the subscriber is ready before publishing zmq messages
+ sleep(0.2)
+
+ # Generate 1 block in nodes[0] and receive all notifications
+ self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
+
+ # Should receive the same block hash on both subscribers
+ assert_equal(self.nodes[0].getbestblockhash(), subscribers[0]['hashblock'].receive().hex())
+ assert_equal(self.nodes[0].getbestblockhash(), subscribers[1]['hashblock'].receive().hex())
+
if __name__ == '__main__':
ZMQTest().main()