diff options
author | Sebastian Falbesoner <sebastian.falbesoner@gmail.com> | 2021-01-26 00:30:24 +0100 |
---|---|---|
committer | Sebastian Falbesoner <sebastian.falbesoner@gmail.com> | 2021-02-09 23:55:23 +0100 |
commit | 5c6546362dce8b468268578e345c37ed515a1855 (patch) | |
tree | 2fc93bf72039faa667b52575e8e87f6505bcd272 /test | |
parent | 8666033630eeaf851ec69e018bb53eb23093f4b9 (diff) |
zmq test: fix flakiness by using more robust sync method
After connecting the subscriber sockets to the node, there is no
guarantee that the node's zmq publisher interfaces are ready yet, which
means that potentially the first expected notification messages could
get lost and the test fails. Currently this is handled by just waiting
for a short period of time (200ms), which works most of the time but is
still problematic, as in some rare cases the setup time takes much
longer, even in the range of multiple seconds.
The solution in this commit approaches the problem by using a more
robust method of syncing up, originally proposed by instagibbs:
1. Generate a block on the node
2. Try to receive a notification on all subscribers
3. If all subscribers get a message within the timeout (1 second),
we are done, otherwise repeat starting from step 1
Diffstat (limited to 'test')
-rwxr-xr-x | test/functional/interface_zmq.py | 49 |
1 files changed, 37 insertions, 12 deletions
diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index 527126fac3..3ec23e75dc 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -87,23 +87,45 @@ class ZMQTest (BitcoinTestFramework): # Restart node with the specified zmq notifications enabled, subscribe to # all of them and return the corresponding ZMQSubscriber objects. - def setup_zmq_test(self, services, recv_timeout=60, connect_nodes=False): + def setup_zmq_test(self, services, *, recv_timeout=60, sync_blocks=True): subscribers = [] for topic, address in services: socket = self.ctx.socket(zmq.SUB) - socket.set(zmq.RCVTIMEO, recv_timeout*1000) subscribers.append(ZMQSubscriber(socket, topic.encode())) self.restart_node(0, ["-zmqpub%s=%s" % (topic, address) for topic, address in services]) - if connect_nodes: - self.connect_nodes(0, 1) - for i, sub in enumerate(subscribers): sub.socket.connect(services[i][1]) - # Relax so that the subscribers are ready before publishing zmq messages - sleep(0.2) + # Ensure that all zmq publisher notification interfaces are ready by + # running the following "sync up" procedure: + # 1. Generate a block on the node + # 2. Try to receive a notification on all subscribers + # 3. If all subscribers get a message within the timeout (1 second), + # we are done, otherwise repeat starting from step 1 + for sub in subscribers: + sub.socket.set(zmq.RCVTIMEO, 1000) + while True: + self.nodes[0].generate(1) + recv_failed = False + for sub in subscribers: + try: + sub.receive() + except zmq.error.Again: + self.log.debug("Didn't receive sync-up notification, trying again.") + recv_failed = True + if not recv_failed: + self.log.debug("ZMQ sync-up completed, all subscribers are ready.") + break + + # set subscriber's desired timeout for the test + for sub in subscribers: + sub.socket.set(zmq.RCVTIMEO, recv_timeout*1000) + + self.connect_nodes(0, 1) + if sync_blocks: + self.sync_blocks() return subscribers @@ -113,9 +135,7 @@ class ZMQTest (BitcoinTestFramework): self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"]) address = 'tcp://127.0.0.1:28332' - subs = self.setup_zmq_test( - [(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]], - connect_nodes=True) + subs = self.setup_zmq_test([(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]]) hashblock = subs[0] hashtx = subs[1] @@ -192,6 +212,7 @@ class ZMQTest (BitcoinTestFramework): hashblock, hashtx = self.setup_zmq_test( [(topic, address) for topic in ["hashblock", "hashtx"]], recv_timeout=2) # 2 second timeout to check end of notifications + self.disconnect_nodes(0, 1) # Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications payment_txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1.0) @@ -240,6 +261,7 @@ class ZMQTest (BitcoinTestFramework): """ self.log.info("Testing 'sequence' publisher") [seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")]) + self.disconnect_nodes(0, 1) # Mempool sequence number starts at 1 seq_num = 1 @@ -390,7 +412,7 @@ class ZMQTest (BitcoinTestFramework): return self.log.info("Testing 'mempool sync' usage of sequence notifier") - [seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")], connect_nodes=True) + [seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")]) # In-memory counter, should always start at 1 next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] @@ -490,10 +512,13 @@ class ZMQTest (BitcoinTestFramework): def test_multiple_interfaces(self): # Set up two subscribers with different addresses + # (note that after the reorg test, syncing would fail due to different + # chain lengths on node0 and node1; for this test we only need node0, so + # we can disable syncing blocks on the setup) subscribers = self.setup_zmq_test([ ("hashblock", "tcp://127.0.0.1:28334"), ("hashblock", "tcp://127.0.0.1:28335"), - ]) + ], sync_blocks=False) # Generate 1 block in nodes[0] and receive all notifications self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) |