diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/init.cpp | 9 | ||||
-rw-r--r-- | src/zmq/zmqabstractnotifier.cpp | 1 | ||||
-rw-r--r-- | src/zmq/zmqabstractnotifier.h | 11 | ||||
-rw-r--r-- | src/zmq/zmqnotificationinterface.cpp | 7 | ||||
-rw-r--r-- | src/zmq/zmqpublishnotifier.cpp | 16 | ||||
-rw-r--r-- | src/zmq/zmqpublishnotifier.h | 2 | ||||
-rw-r--r-- | src/zmq/zmqrpc.cpp | 4 |
7 files changed, 41 insertions, 9 deletions
diff --git a/src/init.cpp b/src/init.cpp index 90dfeb3604..d54d4a8782 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -62,6 +62,7 @@ #include <openssl/crypto.h> #if ENABLE_ZMQ +#include <zmq/zmqabstractnotifier.h> #include <zmq/zmqnotificationinterface.h> #include <zmq/zmqrpc.h> #endif @@ -418,11 +419,19 @@ void SetupServerArgs() gArgs.AddArg("-zmqpubhashtx=<address>", "Enable publish hash transaction in <address>", false, OptionsCategory::ZMQ); gArgs.AddArg("-zmqpubrawblock=<address>", "Enable publish raw block in <address>", false, OptionsCategory::ZMQ); gArgs.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubhashblockhwm=<n>", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubhashtxhwm=<n>", strprintf("Set publish hash transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubrawblockhwm=<n>", strprintf("Set publish raw block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubrawtxhwm=<n>", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); #else hidden_args.emplace_back("-zmqpubhashblock=<address>"); hidden_args.emplace_back("-zmqpubhashtx=<address>"); hidden_args.emplace_back("-zmqpubrawblock=<address>"); hidden_args.emplace_back("-zmqpubrawtx=<address>"); + hidden_args.emplace_back("-zmqpubhashblockhwm=<n>"); + hidden_args.emplace_back("-zmqpubhashtxhwm=<n>"); + hidden_args.emplace_back("-zmqpubrawblockhwm=<n>"); + hidden_args.emplace_back("-zmqpubrawtxhwm=<n>"); #endif gArgs.AddArg("-checkblocks=<n>", strprintf("How many blocks to check at startup (default: %u, 0 = all)", DEFAULT_CHECKBLOCKS), true, OptionsCategory::DEBUG_TEST); diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp index 82e5ee5cc9..6a9661e3e8 100644 --- a/src/zmq/zmqabstractnotifier.cpp +++ b/src/zmq/zmqabstractnotifier.cpp @@ -5,6 +5,7 @@ #include <zmq/zmqabstractnotifier.h> #include <util/system.h> +const int CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM; CZMQAbstractNotifier::~CZMQAbstractNotifier() { diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h index 5081c6cd02..887dde7b27 100644 --- a/src/zmq/zmqabstractnotifier.h +++ b/src/zmq/zmqabstractnotifier.h @@ -15,7 +15,9 @@ typedef CZMQAbstractNotifier* (*CZMQNotifierFactory)(); class CZMQAbstractNotifier { public: - CZMQAbstractNotifier() : psocket(nullptr) { } + static const int DEFAULT_ZMQ_SNDHWM {1000}; + + CZMQAbstractNotifier() : psocket(nullptr), outbound_message_high_water_mark(DEFAULT_ZMQ_SNDHWM) { } virtual ~CZMQAbstractNotifier(); template <typename T> @@ -28,6 +30,12 @@ public: void SetType(const std::string &t) { type = t; } std::string GetAddress() const { return address; } void SetAddress(const std::string &a) { address = a; } + int GetOutboundMessageHighWaterMark() const { return outbound_message_high_water_mark; } + void SetOutboundMessageHighWaterMark(const int sndhwm) { + if (sndhwm >= 0) { + outbound_message_high_water_mark = sndhwm; + } + } virtual bool Initialize(void *pcontext) = 0; virtual void Shutdown() = 0; @@ -39,6 +47,7 @@ protected: void *psocket; std::string type; std::string address; + int outbound_message_high_water_mark; // aka SNDHWM }; #endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp index df4e51ca20..6826cf62d6 100644 --- a/src/zmq/zmqnotificationinterface.cpp +++ b/src/zmq/zmqnotificationinterface.cpp @@ -59,6 +59,7 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create() CZMQAbstractNotifier *notifier = factory(); notifier->SetType(entry.first); notifier->SetAddress(address); + notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM))); notifiers.push_back(notifier); } } @@ -102,11 +103,11 @@ bool CZMQNotificationInterface::Initialize() CZMQAbstractNotifier *notifier = *i; if (notifier->Initialize(pcontext)) { - LogPrint(BCLog::ZMQ, " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); + LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); } else { - LogPrint(BCLog::ZMQ, " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); + LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); break; } } @@ -128,7 +129,7 @@ void CZMQNotificationInterface::Shutdown() for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i) { CZMQAbstractNotifier *notifier = *i; - LogPrint(BCLog::ZMQ, " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress()); + LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress()); notifier->Shutdown(); } zmq_ctx_term(pcontext); diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index 718597e57c..15d4ac1b89 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -76,8 +76,18 @@ bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) return false; } - int rc = zmq_bind(psocket, address.c_str()); - if (rc!=0) + LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark); + + int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark)); + if (rc != 0) + { + zmqError("Failed to set outbound message high water mark"); + zmq_close(psocket); + return false; + } + + rc = zmq_bind(psocket, address.c_str()); + if (rc != 0) { zmqError("Failed to bind address"); zmq_close(psocket); @@ -120,7 +130,7 @@ void CZMQAbstractPublishNotifier::Shutdown() if (count == 1) { - LogPrint(BCLog::ZMQ, "Close socket at address %s\n", address); + LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address); int linger = 0; zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger)); zmq_close(psocket); diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h index 0f5b43d726..278fdb94d2 100644 --- a/src/zmq/zmqpublishnotifier.h +++ b/src/zmq/zmqpublishnotifier.h @@ -12,7 +12,7 @@ class CBlockIndex; class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier { private: - uint32_t nSequence; //!< upcounting per message sequence number + uint32_t nSequence {0U}; //!< upcounting per message sequence number public: diff --git a/src/zmq/zmqrpc.cpp b/src/zmq/zmqrpc.cpp index 4f88bf4eb9..0e0b745375 100644 --- a/src/zmq/zmqrpc.cpp +++ b/src/zmq/zmqrpc.cpp @@ -22,7 +22,8 @@ UniValue getzmqnotifications(const JSONRPCRequest& request) "[\n" " { (json object)\n" " \"type\": \"pubhashtx\", (string) Type of notification\n" - " \"address\": \"...\" (string) Address of the publisher\n" + " \"address\": \"...\", (string) Address of the publisher\n" + " \"hwm\": n (numeric) Outbound message high water mark\n" " },\n" " ...\n" "]\n" @@ -38,6 +39,7 @@ UniValue getzmqnotifications(const JSONRPCRequest& request) UniValue obj(UniValue::VOBJ); obj.pushKV("type", n->GetType()); obj.pushKV("address", n->GetAddress()); + obj.pushKV("hwm", n->GetOutboundMessageHighWaterMark()); result.push_back(obj); } } |