diff options
-rwxr-xr-x | test/functional/interface_zmq.py | 92 |
1 files changed, 70 insertions, 22 deletions
diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index 89c55f31f3..ef4780cacb 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -54,28 +54,31 @@ class ZMQTest (BitcoinTestFramework): self.ctx.destroy(linger=None) def test_basic(self): - # All messages are received in the same socket which means - # that this test fails if the publishing order changes. - # Note that the publishing order is not defined in the documentation and - # is subject to change. import zmq # Invalid zmq arguments don't take down the node, see #17185. self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"]) address = 'tcp://127.0.0.1:28332' - socket = self.ctx.socket(zmq.SUB) - socket.set(zmq.RCVTIMEO, 60000) + sockets = [] + subs = [] + services = [b"hashblock", b"hashtx", b"rawblock", b"rawtx"] + for service in services: + sockets.append(self.ctx.socket(zmq.SUB)) + sockets[-1].set(zmq.RCVTIMEO, 60000) + subs.append(ZMQSubscriber(sockets[-1], service)) # Subscribe to all available topics. - hashblock = ZMQSubscriber(socket, b"hashblock") - hashtx = ZMQSubscriber(socket, b"hashtx") - rawblock = ZMQSubscriber(socket, b"rawblock") - rawtx = ZMQSubscriber(socket, b"rawtx") + hashblock = subs[0] + hashtx = subs[1] + rawblock = subs[2] + rawtx = subs[3] self.restart_node(0, ["-zmqpub%s=%s" % (sub.topic.decode(), address) for sub in [hashblock, hashtx, rawblock, rawtx]]) connect_nodes(self.nodes[0], 1) - socket.connect(address) + for socket in sockets: + socket.connect(address) + # Relax so that the subscriber is ready before publishing zmq messages sleep(0.2) @@ -96,15 +99,16 @@ class ZMQTest (BitcoinTestFramework): tx.calc_sha256() assert_equal(tx.hash, txid.hex()) + # Should receive the generated raw block. + block = rawblock.receive() + assert_equal(genhashes[x], hash256_reversed(block[:80]).hex()) + # Should receive the generated block hash. hash = hashblock.receive().hex() assert_equal(genhashes[x], hash) # The block should only have the coinbase txid. assert_equal([txid.hex()], self.nodes[1].getblock(hash)["tx"]) - # Should receive the generated raw block. - block = rawblock.receive() - assert_equal(genhashes[x], hash256_reversed(block[:80]).hex()) if self.is_wallet_compiled(): self.log.info("Wait for tx from second node") @@ -119,6 +123,13 @@ class ZMQTest (BitcoinTestFramework): hex = rawtx.receive() assert_equal(payment_txid, hash256_reversed(hex).hex()) + # Mining the block with this tx should result in second notification + # after coinbase tx notification + self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) + hashtx.receive() + txid = hashtx.receive() + assert_equal(payment_txid, txid.hex()) + self.log.info("Test the getzmqnotifications RPC") assert_equal(self.nodes[0].getzmqnotifications(), [ @@ -131,30 +142,67 @@ class ZMQTest (BitcoinTestFramework): assert_equal(self.nodes[1].getzmqnotifications(), []) def test_reorg(self): + if not self.is_wallet_compiled(): + self.log.info("Skipping reorg test because wallet is disabled") + return + import zmq address = 'tcp://127.0.0.1:28333' - socket = self.ctx.socket(zmq.SUB) - socket.set(zmq.RCVTIMEO, 60000) - hashblock = ZMQSubscriber(socket, b'hashblock') + + services = [b"hashblock", b"hashtx"] + sockets = [] + subs = [] + for service in services: + sockets.append(self.ctx.socket(zmq.SUB)) + # 2 second timeout to check end of notifications + sockets[-1].set(zmq.RCVTIMEO, 2000) + subs.append(ZMQSubscriber(sockets[-1], service)) + + # Subscribe to all available topics. + hashblock = subs[0] + hashtx = subs[1] # Should only notify the tip if a reorg occurs - self.restart_node(0, ['-zmqpub%s=%s' % (hashblock.topic.decode(), address)]) - socket.connect(address) + self.restart_node(0, ["-zmqpub%s=%s" % (sub.topic.decode(), address) for sub in [hashblock, hashtx]]) + for socket in sockets: + socket.connect(address) # Relax so that the subscriber is ready before publishing zmq messages sleep(0.2) - # Generate 1 block in nodes[0] and receive all notifications - self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) + # Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications + payment_txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1.0) + disconnect_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0] + disconnect_cb = self.nodes[0].getblock(disconnect_block)["tx"][0] assert_equal(self.nodes[0].getbestblockhash(), hashblock.receive().hex()) + assert_equal(hashtx.receive().hex(), payment_txid) + assert_equal(hashtx.receive().hex(), disconnect_cb) # Generate 2 blocks in nodes[1] - self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_UNSPENDABLE) + connect_blocks = self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_UNSPENDABLE) # nodes[0] will reorg chain after connecting back nodes[1] connect_nodes(self.nodes[0], 1) + self.sync_blocks() # tx in mempool valid but not advertised # Should receive nodes[1] tip assert_equal(self.nodes[1].getbestblockhash(), hashblock.receive().hex()) + # During reorg: + # Get old payment transaction notification from disconnect and disconnected cb + assert_equal(hashtx.receive().hex(), payment_txid) + assert_equal(hashtx.receive().hex(), disconnect_cb) + # And the payment transaction again due to mempool entry + assert_equal(hashtx.receive().hex(), payment_txid) + assert_equal(hashtx.receive().hex(), payment_txid) + # And the new connected coinbases + for i in [0, 1]: + assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[i])["tx"][0]) + + # If we do a simple invalidate we announce the disconnected coinbase + self.nodes[0].invalidateblock(connect_blocks[1]) + assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[1])["tx"][0]) + # And the current tip + assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[0])["tx"][0]) + if __name__ == '__main__': ZMQTest().main() |