From 6014d6e1b5a0dda6e20c2721f0bdb7e6a63ece81 Mon Sep 17 00:00:00 2001 From: Sebastian Falbesoner Date: Sat, 23 Jan 2021 22:19:15 +0100 Subject: zmq test: dedup message reception handling in ZMQSubscriber --- test/functional/interface_zmq.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) (limited to 'test') diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index e9f61be4d4..4c23c4d30c 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -33,7 +33,8 @@ class ZMQSubscriber: self.socket.setsockopt(zmq.SUBSCRIBE, self.topic) - def receive(self): + # Receive message from publisher and verify that topic and sequence match + def _receive_from_publisher_and_check(self): topic, body, seq = self.socket.recv_multipart() # Topic should match the subscriber topic. assert_equal(topic, self.topic) @@ -42,13 +43,11 @@ class ZMQSubscriber: self.sequence += 1 return body + def receive(self): + return self._receive_from_publisher_and_check() + def receive_sequence(self): - topic, body, seq = self.socket.recv_multipart() - # Topic should match the subscriber topic. - assert_equal(topic, self.topic) - # Sequence should be incremental. - assert_equal(struct.unpack(' Date: Sat, 23 Jan 2021 22:33:48 +0100 Subject: zmq test: accept arbitrary sequence start number in ZMQSubscriber The ZMQSubscriber reception methods currently assert that the first received publisher message has a sequence number of zero. In order to fix the current test flakiness via "syncing up" to nodes in the setup phase, we have to cope with the situation that messages get lost and the first actual received message has a sequence number larger than zero. --- test/functional/interface_zmq.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'test') diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index 4c23c4d30c..527126fac3 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -27,7 +27,7 @@ def hash256_reversed(byte_str): class ZMQSubscriber: def __init__(self, socket, topic): - self.sequence = 0 + self.sequence = None # no sequence number received yet self.socket = socket self.topic = topic @@ -39,7 +39,11 @@ class ZMQSubscriber: # Topic should match the subscriber topic. assert_equal(topic, self.topic) # Sequence should be incremental. - assert_equal(struct.unpack(' Date: Tue, 26 Jan 2021 00:30:24 +0100 Subject: zmq test: fix flakiness by using more robust sync method After connecting the subscriber sockets to the node, there is no guarantee that the node's zmq publisher interfaces are ready yet, which means that potentially the first expected notification messages could get lost and the test fails. Currently this is handled by just waiting for a short period of time (200ms), which works most of the time but is still problematic, as in some rare cases the setup time takes much longer, even in the range of multiple seconds. The solution in this commit approaches the problem by using a more robust method of syncing up, originally proposed by instagibbs: 1. Generate a block on the node 2. Try to receive a notification on all subscribers 3. If all subscribers get a message within the timeout (1 second), we are done, otherwise repeat starting from step 1 --- test/functional/interface_zmq.py | 49 ++++++++++++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 12 deletions(-) (limited to 'test') diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index 527126fac3..3ec23e75dc 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -87,23 +87,45 @@ class ZMQTest (BitcoinTestFramework): # Restart node with the specified zmq notifications enabled, subscribe to # all of them and return the corresponding ZMQSubscriber objects. - def setup_zmq_test(self, services, recv_timeout=60, connect_nodes=False): + def setup_zmq_test(self, services, *, recv_timeout=60, sync_blocks=True): subscribers = [] for topic, address in services: socket = self.ctx.socket(zmq.SUB) - socket.set(zmq.RCVTIMEO, recv_timeout*1000) subscribers.append(ZMQSubscriber(socket, topic.encode())) self.restart_node(0, ["-zmqpub%s=%s" % (topic, address) for topic, address in services]) - if connect_nodes: - self.connect_nodes(0, 1) - for i, sub in enumerate(subscribers): sub.socket.connect(services[i][1]) - # Relax so that the subscribers are ready before publishing zmq messages - sleep(0.2) + # Ensure that all zmq publisher notification interfaces are ready by + # running the following "sync up" procedure: + # 1. Generate a block on the node + # 2. Try to receive a notification on all subscribers + # 3. If all subscribers get a message within the timeout (1 second), + # we are done, otherwise repeat starting from step 1 + for sub in subscribers: + sub.socket.set(zmq.RCVTIMEO, 1000) + while True: + self.nodes[0].generate(1) + recv_failed = False + for sub in subscribers: + try: + sub.receive() + except zmq.error.Again: + self.log.debug("Didn't receive sync-up notification, trying again.") + recv_failed = True + if not recv_failed: + self.log.debug("ZMQ sync-up completed, all subscribers are ready.") + break + + # set subscriber's desired timeout for the test + for sub in subscribers: + sub.socket.set(zmq.RCVTIMEO, recv_timeout*1000) + + self.connect_nodes(0, 1) + if sync_blocks: + self.sync_blocks() return subscribers @@ -113,9 +135,7 @@ class ZMQTest (BitcoinTestFramework): self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"]) address = 'tcp://127.0.0.1:28332' - subs = self.setup_zmq_test( - [(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]], - connect_nodes=True) + subs = self.setup_zmq_test([(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]]) hashblock = subs[0] hashtx = subs[1] @@ -192,6 +212,7 @@ class ZMQTest (BitcoinTestFramework): hashblock, hashtx = self.setup_zmq_test( [(topic, address) for topic in ["hashblock", "hashtx"]], recv_timeout=2) # 2 second timeout to check end of notifications + self.disconnect_nodes(0, 1) # Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications payment_txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1.0) @@ -240,6 +261,7 @@ class ZMQTest (BitcoinTestFramework): """ self.log.info("Testing 'sequence' publisher") [seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")]) + self.disconnect_nodes(0, 1) # Mempool sequence number starts at 1 seq_num = 1 @@ -390,7 +412,7 @@ class ZMQTest (BitcoinTestFramework): return self.log.info("Testing 'mempool sync' usage of sequence notifier") - [seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")], connect_nodes=True) + [seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")]) # In-memory counter, should always start at 1 next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] @@ -490,10 +512,13 @@ class ZMQTest (BitcoinTestFramework): def test_multiple_interfaces(self): # Set up two subscribers with different addresses + # (note that after the reorg test, syncing would fail due to different + # chain lengths on node0 and node1; for this test we only need node0, so + # we can disable syncing blocks on the setup) subscribers = self.setup_zmq_test([ ("hashblock", "tcp://127.0.0.1:28334"), ("hashblock", "tcp://127.0.0.1:28335"), - ]) + ], sync_blocks=False) # Generate 1 block in nodes[0] and receive all notifications self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) -- cgit v1.2.3 From ef21fb7313005a8a2d4f03fb4056f1f66c1b04f0 Mon Sep 17 00:00:00 2001 From: Sebastian Falbesoner Date: Tue, 26 Jan 2021 00:52:17 +0100 Subject: zmq test: speedup test by whitelisting peers (immediate tx relay) Speeds up the zmq test roughly by a factor of 2x (~20 sec. instead of ~40 sec.) and also avoids timeouts on the synchronization methods (sync_mempool() / sync_blocks()) that happened with a slight chance. This is due to the fact that there is no upper bound on the trickle relay time, so even the default of 60s is sometimes too low. Fixed by enabling immediate tx relay on node1. --- test/functional/interface_zmq.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'test') diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index 3ec23e75dc..d0967a9340 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -67,6 +67,9 @@ class ZMQTest (BitcoinTestFramework): self.num_nodes = 2 if self.is_wallet_compiled(): self.requires_wallet = True + # This test isn't testing txn relay/timing, so set whitelist on the + # peers for instant txn relay. This speeds up the test run time 2-3x. + self.extra_args = [["-whitelist=noban@127.0.0.1"]] * self.num_nodes def skip_test_if_missing_module(self): self.skip_if_no_py3_zmq() @@ -93,7 +96,8 @@ class ZMQTest (BitcoinTestFramework): socket = self.ctx.socket(zmq.SUB) subscribers.append(ZMQSubscriber(socket, topic.encode())) - self.restart_node(0, ["-zmqpub%s=%s" % (topic, address) for topic, address in services]) + self.restart_node(0, ["-zmqpub%s=%s" % (topic, address) for topic, address in services] + + self.extra_args[0]) for i, sub in enumerate(subscribers): sub.socket.connect(services[i][1]) -- cgit v1.2.3