aboutsummaryrefslogtreecommitdiff
path: root/test/functional/interface_zmq.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/functional/interface_zmq.py')
-rwxr-xr-xtest/functional/interface_zmq.py126
1 files changed, 126 insertions, 0 deletions
diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py
new file mode 100755
index 0000000000..86ccea4394
--- /dev/null
+++ b/test/functional/interface_zmq.py
@@ -0,0 +1,126 @@
+#!/usr/bin/env python3
+# Copyright (c) 2015-2017 The Bitcoin Core developers
+# Distributed under the MIT software license, see the accompanying
+# file COPYING or http://www.opensource.org/licenses/mit-license.php.
+"""Test the ZMQ notification interface."""
+import configparser
+import os
+import struct
+
+from test_framework.test_framework import BitcoinTestFramework, SkipTest
+from test_framework.mininode import CTransaction
+from test_framework.util import (assert_equal,
+ bytes_to_hex_str,
+ hash256,
+ )
+from io import BytesIO
+
+class ZMQSubscriber:
+ def __init__(self, socket, topic):
+ self.sequence = 0
+ self.socket = socket
+ self.topic = topic
+
+ import zmq
+ self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)
+
+ def receive(self):
+ topic, body, seq = self.socket.recv_multipart()
+ # Topic should match the subscriber topic.
+ assert_equal(topic, self.topic)
+ # Sequence should be incremental.
+ assert_equal(struct.unpack('<I', seq)[-1], self.sequence)
+ self.sequence += 1
+ return body
+
+
+class ZMQTest (BitcoinTestFramework):
+ def set_test_params(self):
+ self.num_nodes = 2
+
+ def setup_nodes(self):
+ # Try to import python3-zmq. Skip this test if the import fails.
+ try:
+ import zmq
+ except ImportError:
+ raise SkipTest("python3-zmq module not available.")
+
+ # Check that bitcoin has been built with ZMQ enabled.
+ config = configparser.ConfigParser()
+ if not self.options.configfile:
+ self.options.configfile = os.path.abspath(os.path.join(os.path.dirname(__file__), "../config.ini"))
+ config.read_file(open(self.options.configfile))
+
+ if not config["components"].getboolean("ENABLE_ZMQ"):
+ raise SkipTest("bitcoind has not been built with zmq enabled.")
+
+ # Initialize ZMQ context and socket.
+ # All messages are received in the same socket which means
+ # that this test fails if the publishing order changes.
+ # Note that the publishing order is not defined in the documentation and
+ # is subject to change.
+ address = "tcp://127.0.0.1:28332"
+ self.zmq_context = zmq.Context()
+ socket = self.zmq_context.socket(zmq.SUB)
+ socket.set(zmq.RCVTIMEO, 60000)
+ socket.connect(address)
+
+ # Subscribe to all available topics.
+ self.hashblock = ZMQSubscriber(socket, b"hashblock")
+ self.hashtx = ZMQSubscriber(socket, b"hashtx")
+ self.rawblock = ZMQSubscriber(socket, b"rawblock")
+ self.rawtx = ZMQSubscriber(socket, b"rawtx")
+
+ self.extra_args = [["-zmqpub%s=%s" % (sub.topic.decode(), address) for sub in [self.hashblock, self.hashtx, self.rawblock, self.rawtx]], []]
+ self.add_nodes(self.num_nodes, self.extra_args)
+ self.start_nodes()
+
+ def run_test(self):
+ try:
+ self._zmq_test()
+ finally:
+ # Destroy the ZMQ context.
+ self.log.debug("Destroying ZMQ context")
+ self.zmq_context.destroy(linger=None)
+
+ def _zmq_test(self):
+ num_blocks = 5
+ self.log.info("Generate %(n)d blocks (and %(n)d coinbase txes)" % {"n": num_blocks})
+ genhashes = self.nodes[0].generate(num_blocks)
+ self.sync_all()
+
+ for x in range(num_blocks):
+ # Should receive the coinbase txid.
+ txid = self.hashtx.receive()
+
+ # Should receive the coinbase raw transaction.
+ hex = self.rawtx.receive()
+ tx = CTransaction()
+ tx.deserialize(BytesIO(hex))
+ tx.calc_sha256()
+ assert_equal(tx.hash, bytes_to_hex_str(txid))
+
+ # Should receive the generated block hash.
+ hash = bytes_to_hex_str(self.hashblock.receive())
+ assert_equal(genhashes[x], hash)
+ # The block should only have the coinbase txid.
+ assert_equal([bytes_to_hex_str(txid)], self.nodes[1].getblock(hash)["tx"])
+
+ # Should receive the generated raw block.
+ block = self.rawblock.receive()
+ assert_equal(genhashes[x], bytes_to_hex_str(hash256(block[:80])))
+
+ self.log.info("Wait for tx from second node")
+ payment_txid = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
+ self.sync_all()
+
+ # Should receive the broadcasted txid.
+ txid = self.hashtx.receive()
+ assert_equal(payment_txid, bytes_to_hex_str(txid))
+
+ # Should receive the broadcasted raw transaction.
+ hex = self.rawtx.receive()
+ assert_equal(payment_txid, bytes_to_hex_str(hash256(hex)))
+
+if __name__ == '__main__':
+ ZMQTest().main()