#!/usr/bin/env python3
# Copyright (c) 2015-2022 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test the ZMQ notification interface."""
import struct
from time import sleep

from test_framework.address import (
    ADDRESS_BCRT1_P2WSH_OP_TRUE,
    ADDRESS_BCRT1_UNSPENDABLE,
)
from test_framework.blocktools import (
    add_witness_commitment,
    create_block,
    create_coinbase,
)
from test_framework.test_framework import BitcoinTestFramework
from test_framework.messages import (
    hash256,
    tx_from_hex,
)
from test_framework.util import (
    assert_equal,
    assert_raises_rpc_error,
    p2p_port,
)
from test_framework.wallet import (
    MiniWallet,
)
from test_framework.netutil import test_ipv6_local

# Test may be skipped and not have zmq installed
try:
    import zmq
except ImportError:
    pass


def hash256_reversed(byte_str):
    """Return the result of hash256(byte_str) with its element order reversed."""
    return hash256(byte_str)[::-1]


class ZMQSubscriber:
    def __init__(self, socket, topic):
        self.sequence = None  # no sequence number received yet
        self.socket = socket
        self.topic = topic

        # Subscribe the socket to this subscriber's topic only.
        self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)

    # Receive message from publisher and verify that topic and sequence match
    def _receive_from_publisher_and_check(self):
        topic, body, seq = self.socket.recv_multipart()
        # Topic should match the subscriber topic.
        assert_equal(topic, self.topic)
        # Sequence should be incremental.
received_seq = struct.unpack('C : Blockhash connected <32-byte hash>D : Blockhash disconnected <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool """ self.log.info("Testing 'sequence' publisher") [seq] = self.setup_zmq_test([("sequence", f"tcp://127.0.0.1:{self.zmq_port_base}")]) self.disconnect_nodes(0, 1) # Mempool sequence number starts at 1 seq_num = 1 # Generate 1 block in nodes[0] and receive all notifications dc_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE, sync_fun=self.no_op)[0] # Note: We are not notified of any block transactions, coinbase or mined assert_equal((self.nodes[0].getbestblockhash(), "C", None), seq.receive_sequence()) # Generate 2 blocks in nodes[1] to a different address to ensure a chain split self.generatetoaddress(self.nodes[1], 2, ADDRESS_BCRT1_P2WSH_OP_TRUE, sync_fun=self.no_op) # nodes[0] will reorg chain after connecting back nodes[1] self.connect_nodes(0, 1) # Then we receive all block (dis)connect notifications for the 2 block reorg assert_equal((dc_block, "D", None), seq.receive_sequence()) block_count = self.nodes[1].getblockcount() assert_equal((self.nodes[1].getblockhash(block_count-1), "C", None), seq.receive_sequence()) assert_equal((self.nodes[1].getblockhash(block_count), "C", None), seq.receive_sequence()) self.log.info("Wait for tx from second node") payment_tx = self.wallet.send_self_transfer(from_node=self.nodes[1]) payment_txid = payment_tx['txid'] self.sync_all() self.log.info("Testing sequence notifications with mempool sequence values") # Should receive the broadcasted txid. 
assert_equal((payment_txid, "A", seq_num), seq.receive_sequence()) seq_num += 1 self.log.info("Testing RBF notification") # Replace it to test eviction/addition notification payment_tx['tx'].vout[0].nValue -= 1000 rbf_txid = self.nodes[1].sendrawtransaction(payment_tx['tx'].serialize().hex()) self.sync_all() assert_equal((payment_txid, "R", seq_num), seq.receive_sequence()) seq_num += 1 assert_equal((rbf_txid, "A", seq_num), seq.receive_sequence()) seq_num += 1 # Doesn't get published when mined, make a block and tx to "flush" the possibility # though the mempool sequence number does go up by the number of transactions # removed from the mempool by the block mining it. mempool_size = len(self.nodes[0].getrawmempool()) c_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)[0] # Make sure the number of mined transactions matches the number of txs out of mempool mempool_size_delta = mempool_size - len(self.nodes[0].getrawmempool()) assert_equal(len(self.nodes[0].getblock(c_block)["tx"])-1, mempool_size_delta) seq_num += mempool_size_delta payment_txid_2 = self.wallet.send_self_transfer(from_node=self.nodes[1])['txid'] self.sync_all() assert_equal((c_block, "C", None), seq.receive_sequence()) assert_equal((payment_txid_2, "A", seq_num), seq.receive_sequence()) seq_num += 1 # Spot check getrawmempool results that they only show up when asked for assert type(self.nodes[0].getrawmempool()) is list assert type(self.nodes[0].getrawmempool(mempool_sequence=False)) is list assert "mempool_sequence" not in self.nodes[0].getrawmempool(verbose=True) assert_raises_rpc_error(-8, "Verbose results cannot contain mempool sequence values.", self.nodes[0].getrawmempool, True, True) assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], seq_num) self.log.info("Testing reorg notifications") # Manually invalidate the last block to test mempool re-entry # N.B. 
This part could be made more lenient in exact ordering # since it greatly depends on inner-workings of blocks/mempool # during "deep" re-orgs. Probably should "re-construct" # blockchain/mempool state from notifications instead. block_count = self.nodes[0].getblockcount() best_hash = self.nodes[0].getbestblockhash() self.nodes[0].invalidateblock(best_hash) sleep(2) # Bit of room to make sure transaction things happened # Make sure getrawmempool mempool_sequence results aren't "queued" but immediately reflective # of the time they were gathered. assert self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] > seq_num assert_equal((best_hash, "D", None), seq.receive_sequence()) assert_equal((rbf_txid, "A", seq_num), seq.receive_sequence()) seq_num += 1 # Other things may happen but aren't wallet-deterministic so we don't test for them currently self.nodes[0].reconsiderblock(best_hash) self.generatetoaddress(self.nodes[1], 1, ADDRESS_BCRT1_UNSPENDABLE) self.log.info("Evict mempool transaction by block conflict") orig_tx = self.wallet.send_self_transfer(from_node=self.nodes[0]) orig_txid = orig_tx['txid'] # More to be simply mined more_tx = [] for _ in range(5): more_tx.append(self.wallet.send_self_transfer(from_node=self.nodes[0])) orig_tx['tx'].vout[0].nValue -= 1000 bump_txid = self.nodes[0].sendrawtransaction(orig_tx['tx'].serialize().hex()) # Mine the pre-bump tx txs_to_add = [orig_tx['hex']] + [tx['hex'] for tx in more_tx] block = create_block(int(self.nodes[0].getbestblockhash(), 16), create_coinbase(self.nodes[0].getblockcount()+1), txlist=txs_to_add) add_witness_commitment(block) block.solve() assert_equal(self.nodes[0].submitblock(block.serialize().hex()), None) tip = self.nodes[0].getbestblockhash() assert_equal(int(tip, 16), block.sha256) orig_txid_2 = self.wallet.send_self_transfer(from_node=self.nodes[0])['txid'] # Flush old notifications until evicted tx original entry (hash_str, label, mempool_seq) = seq.receive_sequence() while hash_str 
!= orig_txid: (hash_str, label, mempool_seq) = seq.receive_sequence() mempool_seq += 1 # Added original tx assert_equal(label, "A") # More transactions to be simply mined for i in range(len(more_tx)): assert_equal((more_tx[i]['txid'], "A", mempool_seq), seq.receive_sequence()) mempool_seq += 1 # Bumped by rbf assert_equal((orig_txid, "R", mempool_seq), seq.receive_sequence()) mempool_seq += 1 assert_equal((bump_txid, "A", mempool_seq), seq.receive_sequence()) mempool_seq += 1 # Conflict announced first, then block assert_equal((bump_txid, "R", mempool_seq), seq.receive_sequence()) mempool_seq += 1 assert_equal((tip, "C", None), seq.receive_sequence()) mempool_seq += len(more_tx) # Last tx assert_equal((orig_txid_2, "A", mempool_seq), seq.receive_sequence()) mempool_seq += 1 self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE) self.sync_all() # want to make sure we didn't break "consensus" for other tests def test_mempool_sync(self): """ Use sequence notification plus getrawmempool sequence results to "sync mempool" """ self.log.info("Testing 'mempool sync' usage of sequence notifier") [seq] = self.setup_zmq_test([("sequence", f"tcp://127.0.0.1:{self.zmq_port_base}")]) # In-memory counter, should always start at 1 next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] assert_equal(next_mempool_seq, 1) # Some transactions have been happening but we aren't consuming zmq notifications yet # or we lost a ZMQ message somehow and want to start over txs = [] num_txs = 5 for _ in range(num_txs): txs.append(self.wallet.send_self_transfer(from_node=self.nodes[1])) self.sync_all() # 1) Consume backlog until we get a mempool sequence number (hash_str, label, zmq_mem_seq) = seq.receive_sequence() while zmq_mem_seq is None: (hash_str, label, zmq_mem_seq) = seq.receive_sequence() assert label == "A" or label == "R" assert hash_str is not None # 2) We need to "seed" our view of the mempool mempool_snapshot = 
self.nodes[0].getrawmempool(mempool_sequence=True) mempool_view = set(mempool_snapshot["txids"]) get_raw_seq = mempool_snapshot["mempool_sequence"] assert_equal(get_raw_seq, 6) # Snapshot may be too old compared to zmq message we read off latest while zmq_mem_seq >= get_raw_seq: sleep(2) mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True) mempool_view = set(mempool_snapshot["txids"]) get_raw_seq = mempool_snapshot["mempool_sequence"] # Things continue to happen in the "interim" while waiting for snapshot results # We have node 0 do all these to avoid p2p races with RBF announcements for _ in range(num_txs): txs.append(self.wallet.send_self_transfer(from_node=self.nodes[0])) txs[-1]['tx'].vout[0].nValue -= 1000 self.nodes[0].sendrawtransaction(txs[-1]['tx'].serialize().hex()) self.sync_all() self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE) final_txid = self.wallet.send_self_transfer(from_node=self.nodes[0])['txid'] # 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot while True: if zmq_mem_seq == get_raw_seq - 1: break (hash_str, label, mempool_sequence) = seq.receive_sequence() if mempool_sequence is not None: zmq_mem_seq = mempool_sequence if zmq_mem_seq > get_raw_seq: raise Exception(f"We somehow jumped mempool sequence numbers! 
zmq_mem_seq: {zmq_mem_seq} > get_raw_seq: {get_raw_seq}") # 4) Moving forward, we apply the delta to our local view # remaining txs(5) + 1 rbf(A+R) + 1 block connect + 1 final tx expected_sequence = get_raw_seq r_gap = 0 for _ in range(num_txs + 2 + 1 + 1): (hash_str, label, mempool_sequence) = seq.receive_sequence() if mempool_sequence is not None: if mempool_sequence != expected_sequence: # Detected "R" gap, means this a conflict eviction, and mempool tx are being evicted before its # position in the incoming block message "C" if label == "R": assert mempool_sequence > expected_sequence r_gap += mempool_sequence - expected_sequence else: raise Exception(f"WARNING: txhash has unexpected mempool sequence value: {mempool_sequence} vs expected {expected_sequence}") if label == "A": assert hash_str not in mempool_view mempool_view.add(hash_str) expected_sequence = mempool_sequence + 1 elif label == "R": assert hash_str in mempool_view mempool_view.remove(hash_str) expected_sequence = mempool_sequence + 1 elif label == "C": # (Attempt to) remove all txids from known block connects block_txids = self.nodes[0].getblock(hash_str)["tx"][1:] for txid in block_txids: if txid in mempool_view: expected_sequence += 1 mempool_view.remove(txid) expected_sequence -= r_gap r_gap = 0 elif label == "D": # Not useful for mempool tracking per se continue else: raise Exception("Unexpected ZMQ sequence label!") assert_equal(self.nodes[0].getrawmempool(), [final_txid]) assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], expected_sequence) # 5) If you miss a zmq/mempool sequence number, go back to step (2) self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE) def test_multiple_interfaces(self): # Set up two subscribers with different addresses # (note that after the reorg test, syncing would fail due to different # chain lengths on node0 and node1; for this test we only need node0, so # we can disable syncing blocks on the setup) subscribers = 
self.setup_zmq_test([ ("hashblock", f"tcp://127.0.0.1:{self.zmq_port_base + 1}"), ("hashblock", f"tcp://127.0.0.1:{self.zmq_port_base + 2}"), ], sync_blocks=False) # Generate 1 block in nodes[0] and receive all notifications self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE, sync_fun=self.no_op) # Should receive the same block hash on both subscribers assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex()) assert_equal(self.nodes[0].getbestblockhash(), subscribers[1].receive().hex()) def test_ipv6(self): if not test_ipv6_local(): self.log.info("Skipping IPv6 test, because IPv6 is not supported.") return self.log.info("Testing IPv6") # Set up subscriber using IPv6 loopback address subscribers = self.setup_zmq_test([ ("hashblock", f"tcp://[::1]:{self.zmq_port_base}") ], ipv6=True) # Generate 1 block in nodes[0] self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE) # Should receive the same block hash assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex()) if __name__ == '__main__': ZMQTest().main()