From 347c94f551c3f144c44e00373e4dd61ff6d908b7 Mon Sep 17 00:00:00 2001 From: Nicolas Thumann Date: Tue, 10 Mar 2020 16:29:45 +0100 Subject: zmq: Add support to listen on multiple interfaces --- src/zmq/zmqnotificationinterface.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp index a7e9a34269..a2f994d7df 100644 --- a/src/zmq/zmqnotificationinterface.cpp +++ b/src/zmq/zmqnotificationinterface.cpp @@ -42,10 +42,8 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create() for (const auto& entry : factories) { std::string arg("-zmq" + entry.first); - if (gArgs.IsArgSet(arg)) - { - const auto& factory = entry.second; - const std::string address = gArgs.GetArg(arg, ""); + const auto& factory = entry.second; + for (const std::string& address : gArgs.GetArgs(arg)) { std::unique_ptr notifier = factory(); notifier->SetType(entry.first); notifier->SetAddress(address); -- cgit v1.2.3 From b1c3f180ecb63f3960506d202feebaa4271058ae Mon Sep 17 00:00:00 2001 From: nthumann Date: Wed, 11 Mar 2020 00:13:02 +0100 Subject: doc: Adjust ZMQ usage to support multiple interfaces --- doc/zmq.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/doc/zmq.md b/doc/zmq.md index f003c90d3a..85f3370130 100644 --- a/doc/zmq.md +++ b/doc/zmq.md @@ -67,6 +67,7 @@ Currently, the following notifications are supported: The socket type is PUB and the address must be a valid ZeroMQ socket address. The same address can be used in more than one notification. +The same notification can be specified more than once. The option to set the PUB socket's outbound message high water mark (SNDHWM) may be set individually for each notification: @@ -82,6 +83,7 @@ The high water mark value must be an integer greater than or equal to 0. For instance: $ bitcoind -zmqpubhashtx=tcp://127.0.0.1:28332 \ + -zmqpubhashtx=tcp://192.168.1.2:28332 \ -zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw \ -zmqpubhashtxhwm=10000 -- cgit v1.2.3 From a0b2e5cb6aa8db0563fac7d67a949b9baefe3a25 Mon Sep 17 00:00:00 2001 From: nthumann Date: Wed, 11 Mar 2020 00:29:34 +0100 Subject: doc: Add release notes to support multiple interfaces --- doc/release-notes-18309.md | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 doc/release-notes-18309.md diff --git a/doc/release-notes-18309.md b/doc/release-notes-18309.md new file mode 100644 index 0000000000..b31f85eb6e --- /dev/null +++ b/doc/release-notes-18309.md @@ -0,0 +1,4 @@ +Command-line options +----------------------------- + +The same ZeroMQ notification (e.g. `-zmqpubhashtx=address`) can now be specified multiple times to publish the same notification to different ZeroMQ sockets. \ No newline at end of file -- cgit v1.2.3 From 241803da211265444e65f254f24dd184f2457fa9 Mon Sep 17 00:00:00 2001 From: nthumann Date: Wed, 11 Mar 2020 19:09:05 +0100 Subject: test: Add zmq test to support multiple interfaces --- test/functional/interface_zmq.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index 17032a3b83..a0bc937f75 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -75,6 +75,7 @@ class ZMQTest (BitcoinTestFramework): self.test_sequence() self.test_mempool_sync() self.test_reorg() + self.test_multiple_interfaces() finally: # Destroy the ZMQ context. self.log.debug("Destroying ZMQ context") @@ -506,5 +507,28 @@ class ZMQTest (BitcoinTestFramework): self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) + def test_multiple_interfaces(self): + # Set up two subscribers with different addresses + subscribers = [] + for i in range(2): + address = 'tcp://127.0.0.1:%d' % (28334 + i) + socket = self.ctx.socket(zmq.SUB) + socket.set(zmq.RCVTIMEO, 60000) + hashblock = ZMQSubscriber(socket, b"hashblock") + socket.connect(address) + subscribers.append({'address': address, 'hashblock': hashblock}) + + self.restart_node(0, ['-zmqpub%s=%s' % (subscriber['hashblock'].topic.decode(), subscriber['address']) for subscriber in subscribers]) + + # Relax so that the subscriber is ready before publishing zmq messages + sleep(0.2) + + # Generate 1 block in nodes[0] and receive all notifications + self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) + + # Should receive the same block hash on both subscribers + assert_equal(self.nodes[0].getbestblockhash(), subscribers[0]['hashblock'].receive().hex()) + assert_equal(self.nodes[0].getbestblockhash(), subscribers[1]['hashblock'].receive().hex()) + if __name__ == '__main__': ZMQTest().main() -- cgit v1.2.3 From e66870c5a4c2adbd30dca67d409fd5cd98697587 Mon Sep 17 00:00:00 2001 From: nthumann Date: Wed, 11 Mar 2020 19:48:01 +0100 Subject: zmq: Append address to notify log output --- src/zmq/zmqpublishnotifier.cpp | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index a0e7a0a600..c0207f9dd6 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -180,7 +180,7 @@ bool CZMQAbstractPublishNotifier::SendZmqMessage(const char *command, const void bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) { uint256 hash = pindex->GetBlockHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s to %s\n", hash.GetHex(), this->address); char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; @@ -190,7 +190,7 @@ bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction) { uint256 hash = transaction.GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s to %s\n", hash.GetHex(), this->address); char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; @@ -199,7 +199,7 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) { - LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address); const Consensus::Params& consensusParams = Params().GetConsensus(); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags()); @@ -221,7 +221,7 @@ bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction) { uint256 hash = transaction.GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s to %s\n", hash.GetHex(), this->address); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags()); ss << transaction; return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); @@ -232,7 +232,7 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex) { uint256 hash = pindex->GetBlockHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s to %s\n", hash.GetHex(), this->address); char data[sizeof(uint256)+1]; for (unsigned int i = 0; i < sizeof(uint256); i++) data[sizeof(uint256) - 1 - i] = hash.begin()[i]; @@ -243,7 +243,7 @@ bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex) bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pindex) { uint256 hash = pindex->GetBlockHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address); char data[sizeof(uint256)+1]; for (unsigned int i = 0; i < sizeof(uint256); i++) data[sizeof(uint256) - 1 - i] = hash.begin()[i]; @@ -254,7 +254,7 @@ bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pinde bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) { uint256 hash = transaction.GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address); unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1]; for (unsigned int i = 0; i < sizeof(uint256); i++) data[sizeof(uint256) - 1 - i] = hash.begin()[i]; @@ -266,7 +266,7 @@ bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) { uint256 hash = transaction.GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address); unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1]; for (unsigned int i = 0; i < sizeof(uint256); i++) data[sizeof(uint256) - 1 - i] = hash.begin()[i]; -- cgit v1.2.3