diff options
author | mruddy <6440430+mruddy@users.noreply.github.com> | 2018-08-24 20:42:03 -0400 |
---|---|---|
committer | mruddy <6440430+mruddy@users.noreply.github.com> | 2018-10-19 07:36:13 -0400 |
commit | a4edb168b635b6f5c36324e44961cd42cf9bbbaa (patch) | |
tree | b3d617db74c7b1f86ee79df156139627fb7cabfe | |
parent | d387507aeca652a5569825af65243536f2ce26ea (diff) |
ZMQ: add options to configure outbound message high water mark, aka SNDHWM
-rw-r--r-- | contrib/zmq/zmq_sub.py | 1 | ||||
-rw-r--r-- | contrib/zmq/zmq_sub3.4.py | 1 | ||||
-rw-r--r-- | doc/zmq.md | 13 | ||||
-rw-r--r-- | src/init.cpp | 9 | ||||
-rw-r--r-- | src/zmq/zmqabstractnotifier.cpp | 1 | ||||
-rw-r--r-- | src/zmq/zmqabstractnotifier.h | 11 | ||||
-rw-r--r-- | src/zmq/zmqnotificationinterface.cpp | 7 | ||||
-rw-r--r-- | src/zmq/zmqpublishnotifier.cpp | 16 | ||||
-rw-r--r-- | src/zmq/zmqpublishnotifier.h | 2 | ||||
-rw-r--r-- | src/zmq/zmqrpc.cpp | 4 | ||||
-rwxr-xr-x | test/functional/interface_zmq.py | 8 |
11 files changed, 59 insertions, 14 deletions
diff --git a/contrib/zmq/zmq_sub.py b/contrib/zmq/zmq_sub.py index 20763e935d..06893407f5 100644 --- a/contrib/zmq/zmq_sub.py +++ b/contrib/zmq/zmq_sub.py @@ -42,6 +42,7 @@ class ZMQHandler(): self.zmqContext = zmq.asyncio.Context() self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) + self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0) self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock") diff --git a/contrib/zmq/zmq_sub3.4.py b/contrib/zmq/zmq_sub3.4.py index 7e608c1a50..66fdf7887f 100644 --- a/contrib/zmq/zmq_sub3.4.py +++ b/contrib/zmq/zmq_sub3.4.py @@ -46,6 +46,7 @@ class ZMQHandler(): self.zmqContext = zmq.asyncio.Context() self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) + self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0) self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock") diff --git a/doc/zmq.md b/doc/zmq.md index a1a506f2e7..7ffc5623b6 100644 --- a/doc/zmq.md +++ b/doc/zmq.md @@ -66,10 +66,21 @@ Currently, the following notifications are supported: The socket type is PUB and the address must be a valid ZeroMQ socket address. The same address can be used in more than one notification. +The option to set the PUB socket's outbound message high water mark +(SNDHWM) may be set individually for each notification: + + -zmqpubhashtxhwm=n + -zmqpubhashblockhwm=n + -zmqpubrawblockhwm=n + -zmqpubrawtxhwm=n + +The high water mark value must be an integer greater than or equal to 0. + For instance: $ bitcoind -zmqpubhashtx=tcp://127.0.0.1:28332 \ - -zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw + -zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw \ + -zmqpubhashtxhwm=10000 Each PUB notification has a topic and body, where the header corresponds to the notification type. For instance, for the diff --git a/src/init.cpp b/src/init.cpp index f9efaf7dc1..06f9ace7ec 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -62,6 +62,7 @@ #include <openssl/crypto.h> #if ENABLE_ZMQ +#include <zmq/zmqabstractnotifier.h> #include <zmq/zmqnotificationinterface.h> #include <zmq/zmqrpc.h> #endif @@ -418,11 +419,19 @@ void SetupServerArgs() gArgs.AddArg("-zmqpubhashtx=<address>", "Enable publish hash transaction in <address>", false, OptionsCategory::ZMQ); gArgs.AddArg("-zmqpubrawblock=<address>", "Enable publish raw block in <address>", false, OptionsCategory::ZMQ); gArgs.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubhashblockhwm=<n>", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubhashtxhwm=<n>", strprintf("Set publish hash transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubrawblockhwm=<n>", strprintf("Set publish raw block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubrawtxhwm=<n>", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); #else hidden_args.emplace_back("-zmqpubhashblock=<address>"); hidden_args.emplace_back("-zmqpubhashtx=<address>"); hidden_args.emplace_back("-zmqpubrawblock=<address>"); hidden_args.emplace_back("-zmqpubrawtx=<address>"); + hidden_args.emplace_back("-zmqpubhashblockhwm=<n>"); + hidden_args.emplace_back("-zmqpubhashtxhwm=<n>"); + hidden_args.emplace_back("-zmqpubrawblockhwm=<n>"); + hidden_args.emplace_back("-zmqpubrawtxhwm=<n>"); #endif gArgs.AddArg("-checkblocks=<n>", strprintf("How many blocks to check at startup (default: %u, 0 = all)", DEFAULT_CHECKBLOCKS), true, OptionsCategory::DEBUG_TEST); diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp index 39cc8968d2..0eeef8cf1d 100644 --- a/src/zmq/zmqabstractnotifier.cpp +++ b/src/zmq/zmqabstractnotifier.cpp @@ -5,6 +5,7 @@ #include <zmq/zmqabstractnotifier.h> #include <util.h> +const int CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM; CZMQAbstractNotifier::~CZMQAbstractNotifier() { diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h index 5081c6cd02..887dde7b27 100644 --- a/src/zmq/zmqabstractnotifier.h +++ b/src/zmq/zmqabstractnotifier.h @@ -15,7 +15,9 @@ typedef CZMQAbstractNotifier* (*CZMQNotifierFactory)(); class CZMQAbstractNotifier { public: - CZMQAbstractNotifier() : psocket(nullptr) { } + static const int DEFAULT_ZMQ_SNDHWM {1000}; + + CZMQAbstractNotifier() : psocket(nullptr), outbound_message_high_water_mark(DEFAULT_ZMQ_SNDHWM) { } virtual ~CZMQAbstractNotifier(); template <typename T> @@ -28,6 +30,12 @@ public: void SetType(const std::string &t) { type = t; } std::string GetAddress() const { return address; } void SetAddress(const std::string &a) { address = a; } + int GetOutboundMessageHighWaterMark() const { return outbound_message_high_water_mark; } + void SetOutboundMessageHighWaterMark(const int sndhwm) { + if (sndhwm >= 0) { + outbound_message_high_water_mark = sndhwm; + } + } virtual bool Initialize(void *pcontext) = 0; virtual void Shutdown() = 0; @@ -39,6 +47,7 @@ protected: void *psocket; std::string type; std::string address; + int outbound_message_high_water_mark; // aka SNDHWM }; #endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp index 1d9f86d450..680e38bd77 100644 --- a/src/zmq/zmqnotificationinterface.cpp +++ b/src/zmq/zmqnotificationinterface.cpp @@ -59,6 +59,7 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create() CZMQAbstractNotifier *notifier = factory(); notifier->SetType(entry.first); notifier->SetAddress(address); + notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM))); notifiers.push_back(notifier); } } @@ -102,11 +103,11 @@ bool CZMQNotificationInterface::Initialize() CZMQAbstractNotifier *notifier = *i; if (notifier->Initialize(pcontext)) { - LogPrint(BCLog::ZMQ, " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); + LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); } else { - LogPrint(BCLog::ZMQ, " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); + LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); break; } } @@ -128,7 +129,7 @@ void CZMQNotificationInterface::Shutdown() for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i) { CZMQAbstractNotifier *notifier = *i; - LogPrint(BCLog::ZMQ, " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress()); + LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress()); notifier->Shutdown(); } zmq_ctx_term(pcontext); diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index 36a6458f67..280cd1642e 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -76,8 +76,18 @@ bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) return false; } - int rc = zmq_bind(psocket, address.c_str()); - if (rc!=0) + LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark); + + int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark)); + if (rc != 0) + { + zmqError("Failed to set outbound message high water mark"); + zmq_close(psocket); + return false; + } + + rc = zmq_bind(psocket, address.c_str()); + if (rc != 0) { zmqError("Failed to bind address"); zmq_close(psocket); @@ -120,7 +130,7 @@ void CZMQAbstractPublishNotifier::Shutdown() if (count == 1) { - LogPrint(BCLog::ZMQ, "Close socket at address %s\n", address); + LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address); int linger = 0; zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger)); zmq_close(psocket); diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h index 0f5b43d726..278fdb94d2 100644 --- a/src/zmq/zmqpublishnotifier.h +++ b/src/zmq/zmqpublishnotifier.h @@ -12,7 +12,7 @@ class CBlockIndex; class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier { private: - uint32_t nSequence; //!< upcounting per message sequence number + uint32_t nSequence {0U}; //!< upcounting per message sequence number public: diff --git a/src/zmq/zmqrpc.cpp b/src/zmq/zmqrpc.cpp index 4f88bf4eb9..0e0b745375 100644 --- a/src/zmq/zmqrpc.cpp +++ b/src/zmq/zmqrpc.cpp @@ -22,7 +22,8 @@ UniValue getzmqnotifications(const JSONRPCRequest& request) "[\n" " { (json object)\n" " \"type\": \"pubhashtx\", (string) Type of notification\n" - " \"address\": \"...\" (string) Address of the publisher\n" + " \"address\": \"...\", (string) Address of the publisher\n" + " \"hwm\": n (numeric) Outbound message high water mark\n" " },\n" " ...\n" "]\n" @@ -38,6 +39,7 @@ UniValue getzmqnotifications(const JSONRPCRequest& request) UniValue obj(UniValue::VOBJ); obj.pushKV("type", n->GetType()); obj.pushKV("address", n->GetAddress()); + obj.pushKV("hwm", n->GetOutboundMessageHighWaterMark()); result.push_back(obj); } } diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index 48136a0108..037cdaf38d 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -121,10 +121,10 @@ class ZMQTest (BitcoinTestFramework): self.log.info("Test the getzmqnotifications RPC") assert_equal(self.nodes[0].getzmqnotifications(), [ - {"type": "pubhashblock", "address": ADDRESS}, - {"type": "pubhashtx", "address": ADDRESS}, - {"type": "pubrawblock", "address": ADDRESS}, - {"type": "pubrawtx", "address": ADDRESS}, + {"type": "pubhashblock", "address": ADDRESS, "hwm": 1000}, + {"type": "pubhashtx", "address": ADDRESS, "hwm": 1000}, + {"type": "pubrawblock", "address": ADDRESS, "hwm": 1000}, + {"type": "pubrawtx", "address": ADDRESS, "hwm": 1000}, ]) assert_equal(self.nodes[1].getzmqnotifications(), []) |