aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xtest/functional/interface_zmq.py92
1 files changed, 70 insertions, 22 deletions
diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py
index 89c55f31f3..ef4780cacb 100755
--- a/test/functional/interface_zmq.py
+++ b/test/functional/interface_zmq.py
@@ -54,28 +54,31 @@ class ZMQTest (BitcoinTestFramework):
self.ctx.destroy(linger=None)
def test_basic(self):
- # All messages are received in the same socket which means
- # that this test fails if the publishing order changes.
- # Note that the publishing order is not defined in the documentation and
- # is subject to change.
import zmq
# Invalid zmq arguments don't take down the node, see #17185.
self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])
address = 'tcp://127.0.0.1:28332'
- socket = self.ctx.socket(zmq.SUB)
- socket.set(zmq.RCVTIMEO, 60000)
+ sockets = []
+ subs = []
+ services = [b"hashblock", b"hashtx", b"rawblock", b"rawtx"]
+ for service in services:
+ sockets.append(self.ctx.socket(zmq.SUB))
+ sockets[-1].set(zmq.RCVTIMEO, 60000)
+ subs.append(ZMQSubscriber(sockets[-1], service))
# Subscribe to all available topics.
- hashblock = ZMQSubscriber(socket, b"hashblock")
- hashtx = ZMQSubscriber(socket, b"hashtx")
- rawblock = ZMQSubscriber(socket, b"rawblock")
- rawtx = ZMQSubscriber(socket, b"rawtx")
+ hashblock = subs[0]
+ hashtx = subs[1]
+ rawblock = subs[2]
+ rawtx = subs[3]
self.restart_node(0, ["-zmqpub%s=%s" % (sub.topic.decode(), address) for sub in [hashblock, hashtx, rawblock, rawtx]])
connect_nodes(self.nodes[0], 1)
- socket.connect(address)
+ for socket in sockets:
+ socket.connect(address)
+
# Relax so that the subscriber is ready before publishing zmq messages
sleep(0.2)
@@ -96,15 +99,16 @@ class ZMQTest (BitcoinTestFramework):
tx.calc_sha256()
assert_equal(tx.hash, txid.hex())
+ # Should receive the generated raw block.
+ block = rawblock.receive()
+ assert_equal(genhashes[x], hash256_reversed(block[:80]).hex())
+
# Should receive the generated block hash.
hash = hashblock.receive().hex()
assert_equal(genhashes[x], hash)
# The block should only have the coinbase txid.
assert_equal([txid.hex()], self.nodes[1].getblock(hash)["tx"])
- # Should receive the generated raw block.
- block = rawblock.receive()
- assert_equal(genhashes[x], hash256_reversed(block[:80]).hex())
if self.is_wallet_compiled():
self.log.info("Wait for tx from second node")
@@ -119,6 +123,13 @@ class ZMQTest (BitcoinTestFramework):
hex = rawtx.receive()
assert_equal(payment_txid, hash256_reversed(hex).hex())
+ # Mining the block with this tx should result in second notification
+ # after coinbase tx notification
+ self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
+ hashtx.receive()
+ txid = hashtx.receive()
+ assert_equal(payment_txid, txid.hex())
+
self.log.info("Test the getzmqnotifications RPC")
assert_equal(self.nodes[0].getzmqnotifications(), [
@@ -131,30 +142,67 @@ class ZMQTest (BitcoinTestFramework):
assert_equal(self.nodes[1].getzmqnotifications(), [])
def test_reorg(self):
+ if not self.is_wallet_compiled():
+ self.log.info("Skipping reorg test because wallet is disabled")
+ return
+
import zmq
address = 'tcp://127.0.0.1:28333'
- socket = self.ctx.socket(zmq.SUB)
- socket.set(zmq.RCVTIMEO, 60000)
- hashblock = ZMQSubscriber(socket, b'hashblock')
+
+ services = [b"hashblock", b"hashtx"]
+ sockets = []
+ subs = []
+ for service in services:
+ sockets.append(self.ctx.socket(zmq.SUB))
+ # 2 second timeout to check end of notifications
+ sockets[-1].set(zmq.RCVTIMEO, 2000)
+ subs.append(ZMQSubscriber(sockets[-1], service))
+
+ # Subscribe to all available topics.
+ hashblock = subs[0]
+ hashtx = subs[1]
# Should only notify the tip if a reorg occurs
- self.restart_node(0, ['-zmqpub%s=%s' % (hashblock.topic.decode(), address)])
- socket.connect(address)
+ self.restart_node(0, ["-zmqpub%s=%s" % (sub.topic.decode(), address) for sub in [hashblock, hashtx]])
+ for socket in sockets:
+ socket.connect(address)
# Relax so that the subscriber is ready before publishing zmq messages
sleep(0.2)
- # Generate 1 block in nodes[0] and receive all notifications
- self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
+ # Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications
+ payment_txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
+ disconnect_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0]
+ disconnect_cb = self.nodes[0].getblock(disconnect_block)["tx"][0]
assert_equal(self.nodes[0].getbestblockhash(), hashblock.receive().hex())
+ assert_equal(hashtx.receive().hex(), payment_txid)
+ assert_equal(hashtx.receive().hex(), disconnect_cb)
# Generate 2 blocks in nodes[1]
- self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_UNSPENDABLE)
+ connect_blocks = self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_UNSPENDABLE)
# nodes[0] will reorg chain after connecting back nodes[1]
connect_nodes(self.nodes[0], 1)
+ self.sync_blocks() # tx in mempool valid but not advertised
# Should receive nodes[1] tip
assert_equal(self.nodes[1].getbestblockhash(), hashblock.receive().hex())
+ # During reorg:
+ # Get old payment transaction notification from disconnect and disconnected cb
+ assert_equal(hashtx.receive().hex(), payment_txid)
+ assert_equal(hashtx.receive().hex(), disconnect_cb)
+ # And the payment transaction again due to mempool entry
+ assert_equal(hashtx.receive().hex(), payment_txid)
+ assert_equal(hashtx.receive().hex(), payment_txid)
+ # And the new connected coinbases
+ for i in [0, 1]:
+ assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[i])["tx"][0])
+
+ # If we do a simple invalidate we announce the disconnected coinbase
+ self.nodes[0].invalidateblock(connect_blocks[1])
+ assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[1])["tx"][0])
+ # And the current tip
+ assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[0])["tx"][0])
+
if __name__ == '__main__':
ZMQTest().main()