aboutsummaryrefslogtreecommitdiff
path: root/test/functional/interface_zmq.py
blob: 3c3ff1e4a0948e3f6518ca929c16f610b7ab04a8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
#!/usr/bin/env python3
# Copyright (c) 2015-2022 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test the ZMQ notification interface."""
import struct
from time import sleep
from io import BytesIO

from test_framework.address import (
    ADDRESS_BCRT1_P2WSH_OP_TRUE,
    ADDRESS_BCRT1_UNSPENDABLE,
)
from test_framework.blocktools import (
    add_witness_commitment,
    create_block,
    create_coinbase,
)
from test_framework.test_framework import BitcoinTestFramework
from test_framework.messages import (
    CBlock,
    hash256,
    tx_from_hex,
)
from test_framework.util import (
    assert_equal,
    assert_raises_rpc_error,
    p2p_port,
)
from test_framework.wallet import (
    MiniWallet,
)
from test_framework.netutil import test_ipv6_local


# Test may be skipped and not have zmq installed
try:
    import zmq
except ImportError:
    pass

def hash256_reversed(byte_str):
    return hash256(byte_str)[::-1]

class ZMQSubscriber:
    def __init__(self, socket, topic):
        self.sequence = None  # no sequence number received yet
        self.socket = socket
        self.topic = topic

        self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)

    # Receive message from publisher and verify that topic and sequence match
    def _receive_from_publisher_and_check(self):
        topic, body, seq = self.socket.recv_multipart()
        # Topic should match the subscriber topic.
        assert_equal(topic, self.topic)
        # Sequence should be incremental.
        received_seq = struct.unpack('<I', seq)[-1]
        if self.sequence is None:
            self.sequence = received_seq
        else:
            assert_equal(received_seq, self.sequence)
        self.sequence += 1
        return body

    def receive(self):
        return self._receive_from_publisher_and_check()

    def receive_sequence(self):
        body = self._receive_from_publisher_and_check()
        hash = body[:32].hex()
        label = chr(body[32])
        mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
        if mempool_sequence is not None:
            assert label == "A" or label == "R"
        else:
            assert label == "D" or label == "C"
        return (hash, label, mempool_sequence)


class ZMQTestSetupBlock:
    """Helper class for setting up a ZMQ test via the "sync up" procedure.
    Generates a block on the specified node on instantiation and provides a
    method to check whether a ZMQ notification matches, i.e. the event was
    caused by this generated block.  Assumes that a notification either contains
    the generated block's hash, it's (coinbase) transaction id, the raw block or
    raw transaction data.
    """
    def __init__(self, test_framework, node):
        self.block_hash = test_framework.generate(node, 1, sync_fun=test_framework.no_op)[0]
        coinbase = node.getblock(self.block_hash, 2)['tx'][0]
        self.tx_hash = coinbase['txid']
        self.raw_tx = coinbase['hex']
        self.raw_block = node.getblock(self.block_hash, 0)

    def caused_notification(self, notification):
        return (
            self.block_hash in notification
            or self.tx_hash in notification
            or self.raw_block in notification
            or self.raw_tx in notification
        )


class ZMQTest (BitcoinTestFramework):
    def set_test_params(self):
        self.num_nodes = 2
        # whitelist peers to speed up tx relay / mempool sync
        self.noban_tx_relay = True
        self.zmq_port_base = p2p_port(self.num_nodes + 1)

    def skip_test_if_missing_module(self):
        self.skip_if_no_py3_zmq()
        self.skip_if_no_bitcoind_zmq()

    def run_test(self):
        self.wallet = MiniWallet(self.nodes[0])
        self.ctx = zmq.Context()
        try:
            self.test_basic()
            self.test_sequence()
            self.test_mempool_sync()
            self.test_reorg()
            self.test_multiple_interfaces()
            self.test_ipv6()
        finally:
            # Destroy the ZMQ context.
            self.log.debug("Destroying ZMQ context")
            self.ctx.destroy(linger=None)

    # Restart node with the specified zmq notifications enabled, subscribe to
    # all of them and return the corresponding ZMQSubscriber objects.
    def setup_zmq_test(self, services, *, recv_timeout=60, sync_blocks=True, ipv6=False):
        subscribers = []
        for topic, address in services:
            socket = self.ctx.socket(zmq.SUB)
            if ipv6:
                socket.setsockopt(zmq.IPV6, 1)
            subscribers.append(ZMQSubscriber(socket, topic.encode()))

        self.restart_node(0, [f"-zmqpub{topic}={address}" for topic, address in services])

        for i, sub in enumerate(subscribers):
            sub.socket.connect(services[i][1])

        # Ensure that all zmq publisher notification interfaces are ready by
        # running the following "sync up" procedure:
        #   1. Generate a block on the node
        #   2. Try to receive the corresponding notification on all subscribers
        #   3. If all subscribers get the message within the timeout (1 second),
        #      we are done, otherwise repeat starting from step 1
        for sub in subscribers:
            sub.socket.set(zmq.RCVTIMEO, 1000)
        while True:
            test_block = ZMQTestSetupBlock(self, self.nodes[0])
            recv_failed = False
            for sub in subscribers:
                try:
                    while not test_block.caused_notification(sub.receive().hex()):
                        self.log.debug("Ignoring sync-up notification for previously generated block.")
                except zmq.error.Again:
                    self.log.debug("Didn't receive sync-up notification, trying again.")
                    recv_failed = True
            if not recv_failed:
                self.log.debug("ZMQ sync-up completed, all subscribers are ready.")
                break

        # set subscriber's desired timeout for the test
        for sub in subscribers:
            sub.socket.set(zmq.RCVTIMEO, recv_timeout*1000)

        self.connect_nodes(0, 1)
        if sync_blocks:
            self.sync_blocks()

        return subscribers

    def test_basic(self):

        # Invalid zmq arguments don't take down the node, see #17185.
        self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])

        address = f"tcp://127.0.0.1:{self.zmq_port_base}"
        subs = self.setup_zmq_test([(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]])

        hashblock = subs[0]
        hashtx = subs[1]
        rawblock = subs[2]
        rawtx = subs[3]

        num_blocks = 5
        self.log.info(f"Generate {num_blocks} blocks (and {num_blocks} coinbase txes)")
        genhashes = self.generatetoaddress(self.nodes[0], num_blocks, ADDRESS_BCRT1_UNSPENDABLE)

        for x in range(num_blocks):
            # Should receive the coinbase txid.
            txid = hashtx.receive()

            # Should receive the coinbase raw transaction.
            tx = tx_from_hex(rawtx.receive().hex())
            tx.calc_sha256()
            assert_equal(tx.hash, txid.hex())

            # Should receive the generated raw block.
            hex = rawblock.receive()
            block = CBlock()
            block.deserialize(BytesIO(hex))
            assert block.is_valid()
            assert_equal(block.vtx[0].hash, tx.hash)
            assert_equal(len(block.vtx), 1)
            assert_equal(genhashes[x], hash256_reversed(hex[:80]).hex())

            # Should receive the generated block hash.
            hash = hashblock.receive().hex()
            assert_equal(genhashes[x], hash)
            # The block should only have the coinbase txid.
            assert_equal([txid.hex()], self.nodes[1].getblock(hash)["tx"])


        self.log.info("Wait for tx from second node")
        payment_tx = self.wallet.send_self_transfer(from_node=self.nodes[1])
        payment_txid = payment_tx['txid']
        self.sync_all()
        # Should receive the broadcasted txid.
        txid = hashtx.receive()
        assert_equal(payment_txid, txid.hex())

        # Should receive the broadcasted raw transaction.
        hex = rawtx.receive()
        assert_equal(payment_tx['wtxid'], hash256_reversed(hex).hex())

        # Mining the block with this tx should result in second notification
        # after coinbase tx notification
        self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
        hashtx.receive()
        txid = hashtx.receive()
        assert_equal(payment_txid, txid.hex())


        self.log.info("Test the getzmqnotifications RPC")
        assert_equal(self.nodes[0].getzmqnotifications(), [
            {"type": "pubhashblock", "address": address, "hwm": 1000},
            {"type": "pubhashtx", "address": address, "hwm": 1000},
            {"type": "pubrawblock", "address": address, "hwm": 1000},
            {"type": "pubrawtx", "address": address, "hwm": 1000},
        ])

        assert_equal(self.nodes[1].getzmqnotifications(), [])

    def test_reorg(self):

        address = f"tcp://127.0.0.1:{self.zmq_port_base}"

        # Should only notify the tip if a reorg occurs
        hashblock, hashtx = self.setup_zmq_test(
            [(topic, address) for topic in ["hashblock", "hashtx"]],
            recv_timeout=2)  # 2 second timeout to check end of notifications
        self.disconnect_nodes(0, 1)

        # Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications
        payment_txid = self.wallet.send_self_transfer(from_node=self.nodes[0])['txid']
        disconnect_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE, sync_fun=self.no_op)[0]
        disconnect_cb = self.nodes[0].getblock(disconnect_block)["tx"][0]
        assert_equal(self.nodes[0].getbestblockhash(), hashblock.receive().hex())
        assert_equal(hashtx.receive().hex(), payment_txid)
        assert_equal(hashtx.receive().hex(), disconnect_cb)

        # Generate 2 blocks in nodes[1] to a different address to ensure split
        connect_blocks = self.generatetoaddress(self.nodes[1], 2, ADDRESS_BCRT1_P2WSH_OP_TRUE, sync_fun=self.no_op)

        # nodes[0] will reorg chain after connecting back nodes[1]
        self.connect_nodes(0, 1)
        self.sync_blocks() # tx in mempool valid but not advertised

        # Should receive nodes[1] tip
        assert_equal(self.nodes[1].getbestblockhash(), hashblock.receive().hex())

        # During reorg:
        # Get old payment transaction notification from disconnect and disconnected cb
        assert_equal(hashtx.receive().hex(), payment_txid)
        assert_equal(hashtx.receive().hex(), disconnect_cb)
        # And the payment transaction again due to mempool entry
        assert_equal(hashtx.receive().hex(), payment_txid)
        assert_equal(hashtx.receive().hex(), payment_txid)
        # And the new connected coinbases
        for i in [0, 1]:
            assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[i])["tx"][0])

        # If we do a simple invalidate we announce the disconnected coinbase
        self.nodes[0].invalidateblock(connect_blocks[1])
        assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[1])["tx"][0])
        # And the current tip
        assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[0])["tx"][0])

    def test_sequence(self):
        """
        Sequence zmq notifications give every blockhash and txhash in order
        of processing, regardless of IBD, re-orgs, etc.
        Format of messages:
        <32-byte hash>C :                 Blockhash connected
        <32-byte hash>D :                 Blockhash disconnected
        <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason
        <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool
        """
        self.log.info("Testing 'sequence' publisher")
        [seq] = self.setup_zmq_test([("sequence", f"tcp://127.0.0.1:{self.zmq_port_base}")])
        self.disconnect_nodes(0, 1)

        # Mempool sequence number starts at 1
        seq_num = 1

        # Generate 1 block in nodes[0] and receive all notifications
        dc_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE, sync_fun=self.no_op)[0]

        # Note: We are not notified of any block transactions, coinbase or mined
        assert_equal((self.nodes[0].getbestblockhash(), "C", None), seq.receive_sequence())

        # Generate 2 blocks in nodes[1] to a different address to ensure a chain split
        self.generatetoaddress(self.nodes[1], 2, ADDRESS_BCRT1_P2WSH_OP_TRUE, sync_fun=self.no_op)

        # nodes[0] will reorg chain after connecting back nodes[1]
        self.connect_nodes(0, 1)

        # Then we receive all block (dis)connect notifications for the 2 block reorg
        assert_equal((dc_block, "D", None), seq.receive_sequence())
        block_count = self.nodes[1].getblockcount()
        assert_equal((self.nodes[1].getblockhash(block_count-1), "C", None), seq.receive_sequence())
        assert_equal((self.nodes[1].getblockhash(block_count), "C", None), seq.receive_sequence())

        self.log.info("Wait for tx from second node")
        payment_tx = self.wallet.send_self_transfer(from_node=self.nodes[1])
        payment_txid = payment_tx['txid']
        self.sync_all()
        self.log.info("Testing sequence notifications with mempool sequence values")

        # Should receive the broadcasted txid.
        assert_equal((payment_txid, "A", seq_num), seq.receive_sequence())
        seq_num += 1

        self.log.info("Testing RBF notification")
        # Replace it to test eviction/addition notification
        payment_tx['tx'].vout[0].nValue -= 1000
        rbf_txid = self.nodes[1].sendrawtransaction(payment_tx['tx'].serialize().hex())
        self.sync_all()
        assert_equal((payment_txid, "R", seq_num), seq.receive_sequence())
        seq_num += 1
        assert_equal((rbf_txid, "A", seq_num), seq.receive_sequence())
        seq_num += 1

        # Doesn't get published when mined, make a block and tx to "flush" the possibility
        # though the mempool sequence number does go up by the number of transactions
        # removed from the mempool by the block mining it.
        mempool_size = len(self.nodes[0].getrawmempool())
        c_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)[0]
        # Make sure the number of mined transactions matches the number of txs out of mempool
        mempool_size_delta = mempool_size - len(self.nodes[0].getrawmempool())
        assert_equal(len(self.nodes[0].getblock(c_block)["tx"])-1, mempool_size_delta)
        seq_num += mempool_size_delta
        payment_txid_2 = self.wallet.send_self_transfer(from_node=self.nodes[1])['txid']
        self.sync_all()
        assert_equal((c_block, "C", None), seq.receive_sequence())
        assert_equal((payment_txid_2, "A", seq_num), seq.receive_sequence())
        seq_num += 1

        # Spot check getrawmempool results that they only show up when asked for
        assert type(self.nodes[0].getrawmempool()) is list
        assert type(self.nodes[0].getrawmempool(mempool_sequence=False)) is list
        assert "mempool_sequence" not in self.nodes[0].getrawmempool(verbose=True)
        assert_raises_rpc_error(-8, "Verbose results cannot contain mempool sequence values.", self.nodes[0].getrawmempool, True, True)
        assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], seq_num)

        self.log.info("Testing reorg notifications")
        # Manually invalidate the last block to test mempool re-entry
        # N.B. This part could be made more lenient in exact ordering
        # since it greatly depends on inner-workings of blocks/mempool
        # during "deep" re-orgs. Probably should "re-construct"
        # blockchain/mempool state from notifications instead.
        block_count = self.nodes[0].getblockcount()
        best_hash = self.nodes[0].getbestblockhash()
        self.nodes[0].invalidateblock(best_hash)
        sleep(2)  # Bit of room to make sure transaction things happened

        # Make sure getrawmempool mempool_sequence results aren't "queued" but immediately reflective
        # of the time they were gathered.
        assert self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] > seq_num

        assert_equal((best_hash, "D", None), seq.receive_sequence())
        assert_equal((rbf_txid, "A", seq_num), seq.receive_sequence())
        seq_num += 1

        # Other things may happen but aren't wallet-deterministic so we don't test for them currently
        self.nodes[0].reconsiderblock(best_hash)
        self.generatetoaddress(self.nodes[1], 1, ADDRESS_BCRT1_UNSPENDABLE)

        self.log.info("Evict mempool transaction by block conflict")
        orig_tx = self.wallet.send_self_transfer(from_node=self.nodes[0])
        orig_txid = orig_tx['txid']

        # More to be simply mined
        more_tx = []
        for _ in range(5):
            more_tx.append(self.wallet.send_self_transfer(from_node=self.nodes[0]))

        orig_tx['tx'].vout[0].nValue -= 1000
        bump_txid = self.nodes[0].sendrawtransaction(orig_tx['tx'].serialize().hex())
        # Mine the pre-bump tx
        txs_to_add = [orig_tx['hex']] + [tx['hex'] for tx in more_tx]
        block = create_block(int(self.nodes[0].getbestblockhash(), 16), create_coinbase(self.nodes[0].getblockcount()+1), txlist=txs_to_add)
        add_witness_commitment(block)
        block.solve()
        assert_equal(self.nodes[0].submitblock(block.serialize().hex()), None)
        tip = self.nodes[0].getbestblockhash()
        assert_equal(int(tip, 16), block.sha256)
        orig_txid_2 = self.wallet.send_self_transfer(from_node=self.nodes[0])['txid']

        # Flush old notifications until evicted tx original entry
        (hash_str, label, mempool_seq) = seq.receive_sequence()
        while hash_str != orig_txid:
            (hash_str, label, mempool_seq) = seq.receive_sequence()
        mempool_seq += 1

        # Added original tx
        assert_equal(label, "A")
        # More transactions to be simply mined
        for i in range(len(more_tx)):
            assert_equal((more_tx[i]['txid'], "A", mempool_seq), seq.receive_sequence())
            mempool_seq += 1
        # Bumped by rbf
        assert_equal((orig_txid, "R", mempool_seq), seq.receive_sequence())
        mempool_seq += 1
        assert_equal((bump_txid, "A", mempool_seq), seq.receive_sequence())
        mempool_seq += 1
        # Conflict announced first, then block
        assert_equal((bump_txid, "R", mempool_seq), seq.receive_sequence())
        mempool_seq += 1
        assert_equal((tip, "C", None), seq.receive_sequence())
        mempool_seq += len(more_tx)
        # Last tx
        assert_equal((orig_txid_2, "A", mempool_seq), seq.receive_sequence())
        mempool_seq += 1
        self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
        self.sync_all()  # want to make sure we didn't break "consensus" for other tests

    def test_mempool_sync(self):
        """
        Use sequence notification plus getrawmempool sequence results to "sync mempool"
        """

        self.log.info("Testing 'mempool sync' usage of sequence notifier")
        [seq] = self.setup_zmq_test([("sequence", f"tcp://127.0.0.1:{self.zmq_port_base}")])

        # In-memory counter, should always start at 1
        next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
        assert_equal(next_mempool_seq, 1)

        # Some transactions have been happening but we aren't consuming zmq notifications yet
        # or we lost a ZMQ message somehow and want to start over
        txs = []
        num_txs = 5
        for _ in range(num_txs):
            txs.append(self.wallet.send_self_transfer(from_node=self.nodes[1]))
        self.sync_all()

        # 1) Consume backlog until we get a mempool sequence number
        (hash_str, label, zmq_mem_seq) = seq.receive_sequence()
        while zmq_mem_seq is None:
            (hash_str, label, zmq_mem_seq) = seq.receive_sequence()

        assert label == "A" or label == "R"
        assert hash_str is not None

        # 2) We need to "seed" our view of the mempool
        mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
        mempool_view = set(mempool_snapshot["txids"])
        get_raw_seq = mempool_snapshot["mempool_sequence"]
        assert_equal(get_raw_seq, 6)
        # Snapshot may be too old compared to zmq message we read off latest
        while zmq_mem_seq >= get_raw_seq:
            sleep(2)
            mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
            mempool_view = set(mempool_snapshot["txids"])
            get_raw_seq = mempool_snapshot["mempool_sequence"]

        # Things continue to happen in the "interim" while waiting for snapshot results
        # We have node 0 do all these to avoid p2p races with RBF announcements
        for _ in range(num_txs):
            txs.append(self.wallet.send_self_transfer(from_node=self.nodes[0]))
        txs[-1]['tx'].vout[0].nValue -= 1000
        self.nodes[0].sendrawtransaction(txs[-1]['tx'].serialize().hex())
        self.sync_all()
        self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
        final_txid = self.wallet.send_self_transfer(from_node=self.nodes[0])['txid']

        # 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot
        while True:
            if zmq_mem_seq == get_raw_seq - 1:
                break
            (hash_str, label, mempool_sequence) = seq.receive_sequence()
            if mempool_sequence is not None:
                zmq_mem_seq = mempool_sequence
                if zmq_mem_seq > get_raw_seq:
                    raise Exception(f"We somehow jumped mempool sequence numbers! zmq_mem_seq: {zmq_mem_seq} > get_raw_seq: {get_raw_seq}")

        # 4) Moving forward, we apply the delta to our local view
        #    remaining txs(5) + 1 rbf(A+R) + 1 block connect + 1 final tx
        expected_sequence = get_raw_seq
        r_gap = 0
        for _ in range(num_txs + 2 + 1 + 1):
            (hash_str, label, mempool_sequence) = seq.receive_sequence()
            if mempool_sequence is not None:
                if mempool_sequence != expected_sequence:
                    # Detected "R" gap, means this a conflict eviction, and mempool tx are being evicted before its
                    # position in the incoming block message "C"
                    if label == "R":
                        assert mempool_sequence > expected_sequence
                        r_gap += mempool_sequence - expected_sequence
                    else:
                        raise Exception(f"WARNING: txhash has unexpected mempool sequence value: {mempool_sequence} vs expected {expected_sequence}")
            if label == "A":
                assert hash_str not in mempool_view
                mempool_view.add(hash_str)
                expected_sequence = mempool_sequence + 1
            elif label == "R":
                assert hash_str in mempool_view
                mempool_view.remove(hash_str)
                expected_sequence = mempool_sequence + 1
            elif label == "C":
                # (Attempt to) remove all txids from known block connects
                block_txids = self.nodes[0].getblock(hash_str)["tx"][1:]
                for txid in block_txids:
                    if txid in mempool_view:
                        expected_sequence += 1
                        mempool_view.remove(txid)
                expected_sequence -= r_gap
                r_gap = 0
            elif label == "D":
                # Not useful for mempool tracking per se
                continue
            else:
                raise Exception("Unexpected ZMQ sequence label!")

        assert_equal(self.nodes[0].getrawmempool(), [final_txid])
        assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], expected_sequence)

        # 5) If you miss a zmq/mempool sequence number, go back to step (2)

        self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)

    def test_multiple_interfaces(self):
        # Set up two subscribers with different addresses
        # (note that after the reorg test, syncing would fail due to different
        # chain lengths on node0 and node1; for this test we only need node0, so
        # we can disable syncing blocks on the setup)
        subscribers = self.setup_zmq_test([
            ("hashblock", f"tcp://127.0.0.1:{self.zmq_port_base + 1}"),
            ("hashblock", f"tcp://127.0.0.1:{self.zmq_port_base + 2}"),
        ], sync_blocks=False)

        # Generate 1 block in nodes[0] and receive all notifications
        self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE, sync_fun=self.no_op)

        # Should receive the same block hash on both subscribers
        assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex())
        assert_equal(self.nodes[0].getbestblockhash(), subscribers[1].receive().hex())

    def test_ipv6(self):
        if not test_ipv6_local():
            self.log.info("Skipping IPv6 test, because IPv6 is not supported.")
            return
        self.log.info("Testing IPv6")
        # Set up subscriber using IPv6 loopback address
        subscribers = self.setup_zmq_test([
            ("hashblock", f"tcp://[::1]:{self.zmq_port_base}")
        ], ipv6=True)

        # Generate 1 block in nodes[0]
        self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)

        # Should receive the same block hash
        assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex())


if __name__ == '__main__':
    ZMQTest().main()