aboutsummaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorSebastian Falbesoner <sebastian.falbesoner@gmail.com>2021-01-26 00:30:24 +0100
committerSebastian Falbesoner <sebastian.falbesoner@gmail.com>2021-02-09 23:55:23 +0100
commit5c6546362dce8b468268578e345c37ed515a1855 (patch)
tree2fc93bf72039faa667b52575e8e87f6505bcd272 /test
parent8666033630eeaf851ec69e018bb53eb23093f4b9 (diff)
zmq test: fix flakiness by using more robust sync method
After connecting the subscriber sockets to the node, there is no guarantee that the node's zmq publisher interfaces are ready yet, which means that potentially the first expected notification messages could get lost and the test fails. Currently this is handled by just waiting for a short period of time (200ms), which works most of the time but is still problematic, as in some rare cases the setup time takes much longer, even in the range of multiple seconds. The solution in this commit approaches the problem by using a more robust method of syncing up, originally proposed by instagibbs: 1. Generate a block on the node 2. Try to receive a notification on all subscribers 3. If all subscribers get a message within the timeout (1 second), we are done, otherwise repeat starting from step 1
Diffstat (limited to 'test')
-rwxr-xr-xtest/functional/interface_zmq.py49
1 files changed, 37 insertions, 12 deletions
diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py
index 527126fac3..3ec23e75dc 100755
--- a/test/functional/interface_zmq.py
+++ b/test/functional/interface_zmq.py
@@ -87,23 +87,45 @@ class ZMQTest (BitcoinTestFramework):
# Restart node with the specified zmq notifications enabled, subscribe to
# all of them and return the corresponding ZMQSubscriber objects.
- def setup_zmq_test(self, services, recv_timeout=60, connect_nodes=False):
+ def setup_zmq_test(self, services, *, recv_timeout=60, sync_blocks=True):
subscribers = []
for topic, address in services:
socket = self.ctx.socket(zmq.SUB)
- socket.set(zmq.RCVTIMEO, recv_timeout*1000)
subscribers.append(ZMQSubscriber(socket, topic.encode()))
self.restart_node(0, ["-zmqpub%s=%s" % (topic, address) for topic, address in services])
- if connect_nodes:
- self.connect_nodes(0, 1)
-
for i, sub in enumerate(subscribers):
sub.socket.connect(services[i][1])
- # Relax so that the subscribers are ready before publishing zmq messages
- sleep(0.2)
+ # Ensure that all zmq publisher notification interfaces are ready by
+ # running the following "sync up" procedure:
+ # 1. Generate a block on the node
+ # 2. Try to receive a notification on all subscribers
+ # 3. If all subscribers get a message within the timeout (1 second),
+ # we are done, otherwise repeat starting from step 1
+ for sub in subscribers:
+ sub.socket.set(zmq.RCVTIMEO, 1000)
+ while True:
+ self.nodes[0].generate(1)
+ recv_failed = False
+ for sub in subscribers:
+ try:
+ sub.receive()
+ except zmq.error.Again:
+ self.log.debug("Didn't receive sync-up notification, trying again.")
+ recv_failed = True
+ if not recv_failed:
+ self.log.debug("ZMQ sync-up completed, all subscribers are ready.")
+ break
+
+ # set subscriber's desired timeout for the test
+ for sub in subscribers:
+ sub.socket.set(zmq.RCVTIMEO, recv_timeout*1000)
+
+ self.connect_nodes(0, 1)
+ if sync_blocks:
+ self.sync_blocks()
return subscribers
@@ -113,9 +135,7 @@ class ZMQTest (BitcoinTestFramework):
self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])
address = 'tcp://127.0.0.1:28332'
- subs = self.setup_zmq_test(
- [(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]],
- connect_nodes=True)
+ subs = self.setup_zmq_test([(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]])
hashblock = subs[0]
hashtx = subs[1]
@@ -192,6 +212,7 @@ class ZMQTest (BitcoinTestFramework):
hashblock, hashtx = self.setup_zmq_test(
[(topic, address) for topic in ["hashblock", "hashtx"]],
recv_timeout=2) # 2 second timeout to check end of notifications
+ self.disconnect_nodes(0, 1)
# Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications
payment_txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
@@ -240,6 +261,7 @@ class ZMQTest (BitcoinTestFramework):
"""
self.log.info("Testing 'sequence' publisher")
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")])
+ self.disconnect_nodes(0, 1)
# Mempool sequence number starts at 1
seq_num = 1
@@ -390,7 +412,7 @@ class ZMQTest (BitcoinTestFramework):
return
self.log.info("Testing 'mempool sync' usage of sequence notifier")
- [seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")], connect_nodes=True)
+ [seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")])
# In-memory counter, should always start at 1
next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
@@ -490,10 +512,13 @@ class ZMQTest (BitcoinTestFramework):
def test_multiple_interfaces(self):
# Set up two subscribers with different addresses
+ # (note that after the reorg test, syncing would fail due to different
+ # chain lengths on node0 and node1; for this test we only need node0, so
+ # we can disable syncing blocks on the setup)
subscribers = self.setup_zmq_test([
("hashblock", "tcp://127.0.0.1:28334"),
("hashblock", "tcp://127.0.0.1:28335"),
- ])
+ ], sync_blocks=False)
# Generate 1 block in nodes[0] and receive all notifications
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)