diff options
-rwxr-xr-x | test/functional/interface_zmq.py | 76 |
1 files changed, 54 insertions, 22 deletions
diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index e9f61be4d4..d0967a9340 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -27,28 +27,31 @@ def hash256_reversed(byte_str): class ZMQSubscriber: def __init__(self, socket, topic): - self.sequence = 0 + self.sequence = None # no sequence number received yet self.socket = socket self.topic = topic self.socket.setsockopt(zmq.SUBSCRIBE, self.topic) - def receive(self): + # Receive message from publisher and verify that topic and sequence match + def _receive_from_publisher_and_check(self): topic, body, seq = self.socket.recv_multipart() # Topic should match the subscriber topic. assert_equal(topic, self.topic) # Sequence should be incremental. - assert_equal(struct.unpack('<I', seq)[-1], self.sequence) + received_seq = struct.unpack('<I', seq)[-1] + if self.sequence is None: + self.sequence = received_seq + else: + assert_equal(received_seq, self.sequence) self.sequence += 1 return body + def receive(self): + return self._receive_from_publisher_and_check() + def receive_sequence(self): - topic, body, seq = self.socket.recv_multipart() - # Topic should match the subscriber topic. - assert_equal(topic, self.topic) - # Sequence should be incremental. - assert_equal(struct.unpack('<I', seq)[-1], self.sequence) - self.sequence += 1 + body = self._receive_from_publisher_and_check() hash = body[:32].hex() label = chr(body[32]) mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0] @@ -64,6 +67,9 @@ class ZMQTest (BitcoinTestFramework): self.num_nodes = 2 if self.is_wallet_compiled(): self.requires_wallet = True + # This test isn't testing txn relay/timing, so set whitelist on the + # peers for instant txn relay. This speeds up the test run time 2-3x. + self.extra_args = [["-whitelist=noban@127.0.0.1"]] * self.num_nodes def skip_test_if_missing_module(self): self.skip_if_no_py3_zmq() @@ -84,23 +90,46 @@ class ZMQTest (BitcoinTestFramework): # Restart node with the specified zmq notifications enabled, subscribe to # all of them and return the corresponding ZMQSubscriber objects. - def setup_zmq_test(self, services, recv_timeout=60, connect_nodes=False): + def setup_zmq_test(self, services, *, recv_timeout=60, sync_blocks=True): subscribers = [] for topic, address in services: socket = self.ctx.socket(zmq.SUB) - socket.set(zmq.RCVTIMEO, recv_timeout*1000) subscribers.append(ZMQSubscriber(socket, topic.encode())) - self.restart_node(0, ["-zmqpub%s=%s" % (topic, address) for topic, address in services]) - - if connect_nodes: - self.connect_nodes(0, 1) + self.restart_node(0, ["-zmqpub%s=%s" % (topic, address) for topic, address in services] + + self.extra_args[0]) for i, sub in enumerate(subscribers): sub.socket.connect(services[i][1]) - # Relax so that the subscribers are ready before publishing zmq messages - sleep(0.2) + # Ensure that all zmq publisher notification interfaces are ready by + # running the following "sync up" procedure: + # 1. Generate a block on the node + # 2. Try to receive a notification on all subscribers + # 3. If all subscribers get a message within the timeout (1 second), + # we are done, otherwise repeat starting from step 1 + for sub in subscribers: + sub.socket.set(zmq.RCVTIMEO, 1000) + while True: + self.nodes[0].generate(1) + recv_failed = False + for sub in subscribers: + try: + sub.receive() + except zmq.error.Again: + self.log.debug("Didn't receive sync-up notification, trying again.") + recv_failed = True + if not recv_failed: + self.log.debug("ZMQ sync-up completed, all subscribers are ready.") + break + + # set subscriber's desired timeout for the test + for sub in subscribers: + sub.socket.set(zmq.RCVTIMEO, recv_timeout*1000) + + self.connect_nodes(0, 1) + if sync_blocks: + self.sync_blocks() return subscribers @@ -110,9 +139,7 @@ class ZMQTest (BitcoinTestFramework): self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"]) address = 'tcp://127.0.0.1:28332' - subs = self.setup_zmq_test( - [(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]], - connect_nodes=True) + subs = self.setup_zmq_test([(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]]) hashblock = subs[0] hashtx = subs[1] @@ -189,6 +216,7 @@ class ZMQTest (BitcoinTestFramework): hashblock, hashtx = self.setup_zmq_test( [(topic, address) for topic in ["hashblock", "hashtx"]], recv_timeout=2) # 2 second timeout to check end of notifications + self.disconnect_nodes(0, 1) # Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications payment_txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1.0) @@ -237,6 +265,7 @@ class ZMQTest (BitcoinTestFramework): """ self.log.info("Testing 'sequence' publisher") [seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")]) + self.disconnect_nodes(0, 1) # Mempool sequence number starts at 1 seq_num = 1 @@ -387,7 +416,7 @@ class ZMQTest (BitcoinTestFramework): return self.log.info("Testing 'mempool sync' usage of sequence notifier") - [seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")], connect_nodes=True) + [seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")]) # In-memory counter, should always start at 1 next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] @@ -487,10 +516,13 @@ class ZMQTest (BitcoinTestFramework): def test_multiple_interfaces(self): # Set up two subscribers with different addresses + # (note that after the reorg test, syncing would fail due to different + # chain lengths on node0 and node1; for this test we only need node0, so + # we can disable syncing blocks on the setup) subscribers = self.setup_zmq_test([ ("hashblock", "tcp://127.0.0.1:28334"), ("hashblock", "tcp://127.0.0.1:28335"), - ]) + ], sync_blocks=False) # Generate 1 block in nodes[0] and receive all notifications self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) |