aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormruddy <6440430+mruddy@users.noreply.github.com>2018-08-24 20:42:03 -0400
committermruddy <6440430+mruddy@users.noreply.github.com>2018-10-19 07:36:13 -0400
commita4edb168b635b6f5c36324e44961cd42cf9bbbaa (patch)
treeb3d617db74c7b1f86ee79df156139627fb7cabfe
parentd387507aeca652a5569825af65243536f2ce26ea (diff)
ZMQ: add options to configure outbound message high water mark, aka SNDHWM
-rw-r--r--contrib/zmq/zmq_sub.py1
-rw-r--r--contrib/zmq/zmq_sub3.4.py1
-rw-r--r--doc/zmq.md13
-rw-r--r--src/init.cpp9
-rw-r--r--src/zmq/zmqabstractnotifier.cpp1
-rw-r--r--src/zmq/zmqabstractnotifier.h11
-rw-r--r--src/zmq/zmqnotificationinterface.cpp7
-rw-r--r--src/zmq/zmqpublishnotifier.cpp16
-rw-r--r--src/zmq/zmqpublishnotifier.h2
-rw-r--r--src/zmq/zmqrpc.cpp4
-rwxr-xr-xtest/functional/interface_zmq.py8
11 files changed, 59 insertions, 14 deletions
diff --git a/contrib/zmq/zmq_sub.py b/contrib/zmq/zmq_sub.py
index 20763e935d..06893407f5 100644
--- a/contrib/zmq/zmq_sub.py
+++ b/contrib/zmq/zmq_sub.py
@@ -42,6 +42,7 @@ class ZMQHandler():
self.zmqContext = zmq.asyncio.Context()
self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
+ self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0)
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")
diff --git a/contrib/zmq/zmq_sub3.4.py b/contrib/zmq/zmq_sub3.4.py
index 7e608c1a50..66fdf7887f 100644
--- a/contrib/zmq/zmq_sub3.4.py
+++ b/contrib/zmq/zmq_sub3.4.py
@@ -46,6 +46,7 @@ class ZMQHandler():
self.zmqContext = zmq.asyncio.Context()
self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
+ self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0)
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")
diff --git a/doc/zmq.md b/doc/zmq.md
index a1a506f2e7..7ffc5623b6 100644
--- a/doc/zmq.md
+++ b/doc/zmq.md
@@ -66,10 +66,21 @@ Currently, the following notifications are supported:
The socket type is PUB and the address must be a valid ZeroMQ socket
address. The same address can be used in more than one notification.
+The option to set the PUB socket's outbound message high water mark
+(SNDHWM) may be set individually for each notification:
+
+ -zmqpubhashtxhwm=n
+ -zmqpubhashblockhwm=n
+ -zmqpubrawblockhwm=n
+ -zmqpubrawtxhwm=n
+
+The high water mark value must be an integer greater than or equal to 0.
+
For instance:
$ bitcoind -zmqpubhashtx=tcp://127.0.0.1:28332 \
- -zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw
+ -zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw \
+ -zmqpubhashtxhwm=10000
Each PUB notification has a topic and body, where the header
corresponds to the notification type. For instance, for the
diff --git a/src/init.cpp b/src/init.cpp
index f9efaf7dc1..06f9ace7ec 100644
--- a/src/init.cpp
+++ b/src/init.cpp
@@ -62,6 +62,7 @@
#include <openssl/crypto.h>
#if ENABLE_ZMQ
+#include <zmq/zmqabstractnotifier.h>
#include <zmq/zmqnotificationinterface.h>
#include <zmq/zmqrpc.h>
#endif
@@ -418,11 +419,19 @@ void SetupServerArgs()
gArgs.AddArg("-zmqpubhashtx=<address>", "Enable publish hash transaction in <address>", false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubrawblock=<address>", "Enable publish raw block in <address>", false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", false, OptionsCategory::ZMQ);
+ gArgs.AddArg("-zmqpubhashblockhwm=<n>", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
+ gArgs.AddArg("-zmqpubhashtxhwm=<n>", strprintf("Set publish hash transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
+ gArgs.AddArg("-zmqpubrawblockhwm=<n>", strprintf("Set publish raw block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
+ gArgs.AddArg("-zmqpubrawtxhwm=<n>", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
#else
hidden_args.emplace_back("-zmqpubhashblock=<address>");
hidden_args.emplace_back("-zmqpubhashtx=<address>");
hidden_args.emplace_back("-zmqpubrawblock=<address>");
hidden_args.emplace_back("-zmqpubrawtx=<address>");
+ hidden_args.emplace_back("-zmqpubhashblockhwm=<n>");
+ hidden_args.emplace_back("-zmqpubhashtxhwm=<n>");
+ hidden_args.emplace_back("-zmqpubrawblockhwm=<n>");
+ hidden_args.emplace_back("-zmqpubrawtxhwm=<n>");
#endif
gArgs.AddArg("-checkblocks=<n>", strprintf("How many blocks to check at startup (default: %u, 0 = all)", DEFAULT_CHECKBLOCKS), true, OptionsCategory::DEBUG_TEST);
diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp
index 39cc8968d2..0eeef8cf1d 100644
--- a/src/zmq/zmqabstractnotifier.cpp
+++ b/src/zmq/zmqabstractnotifier.cpp
@@ -5,6 +5,7 @@
#include <zmq/zmqabstractnotifier.h>
#include <util.h>
+const int CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM;
CZMQAbstractNotifier::~CZMQAbstractNotifier()
{
diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h
index 5081c6cd02..887dde7b27 100644
--- a/src/zmq/zmqabstractnotifier.h
+++ b/src/zmq/zmqabstractnotifier.h
@@ -15,7 +15,9 @@ typedef CZMQAbstractNotifier* (*CZMQNotifierFactory)();
class CZMQAbstractNotifier
{
public:
- CZMQAbstractNotifier() : psocket(nullptr) { }
+ static const int DEFAULT_ZMQ_SNDHWM {1000};
+
+ CZMQAbstractNotifier() : psocket(nullptr), outbound_message_high_water_mark(DEFAULT_ZMQ_SNDHWM) { }
virtual ~CZMQAbstractNotifier();
template <typename T>
@@ -28,6 +30,12 @@ public:
void SetType(const std::string &t) { type = t; }
std::string GetAddress() const { return address; }
void SetAddress(const std::string &a) { address = a; }
+ int GetOutboundMessageHighWaterMark() const { return outbound_message_high_water_mark; }
+ void SetOutboundMessageHighWaterMark(const int sndhwm) {
+ if (sndhwm >= 0) {
+ outbound_message_high_water_mark = sndhwm;
+ }
+ }
virtual bool Initialize(void *pcontext) = 0;
virtual void Shutdown() = 0;
@@ -39,6 +47,7 @@ protected:
void *psocket;
std::string type;
std::string address;
+ int outbound_message_high_water_mark; // aka SNDHWM
};
#endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp
index 1d9f86d450..680e38bd77 100644
--- a/src/zmq/zmqnotificationinterface.cpp
+++ b/src/zmq/zmqnotificationinterface.cpp
@@ -59,6 +59,7 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create()
CZMQAbstractNotifier *notifier = factory();
notifier->SetType(entry.first);
notifier->SetAddress(address);
+ notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
notifiers.push_back(notifier);
}
}
@@ -102,11 +103,11 @@ bool CZMQNotificationInterface::Initialize()
CZMQAbstractNotifier *notifier = *i;
if (notifier->Initialize(pcontext))
{
- LogPrint(BCLog::ZMQ, " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
+ LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
}
else
{
- LogPrint(BCLog::ZMQ, " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
+ LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
break;
}
}
@@ -128,7 +129,7 @@ void CZMQNotificationInterface::Shutdown()
for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
{
CZMQAbstractNotifier *notifier = *i;
- LogPrint(BCLog::ZMQ, " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
+ LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
notifier->Shutdown();
}
zmq_ctx_term(pcontext);
diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp
index 36a6458f67..280cd1642e 100644
--- a/src/zmq/zmqpublishnotifier.cpp
+++ b/src/zmq/zmqpublishnotifier.cpp
@@ -76,8 +76,18 @@ bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
return false;
}
- int rc = zmq_bind(psocket, address.c_str());
- if (rc!=0)
+ LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
+
+ int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark));
+ if (rc != 0)
+ {
+ zmqError("Failed to set outbound message high water mark");
+ zmq_close(psocket);
+ return false;
+ }
+
+ rc = zmq_bind(psocket, address.c_str());
+ if (rc != 0)
{
zmqError("Failed to bind address");
zmq_close(psocket);
@@ -120,7 +130,7 @@ void CZMQAbstractPublishNotifier::Shutdown()
if (count == 1)
{
- LogPrint(BCLog::ZMQ, "Close socket at address %s\n", address);
+ LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address);
int linger = 0;
zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
zmq_close(psocket);
diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h
index 0f5b43d726..278fdb94d2 100644
--- a/src/zmq/zmqpublishnotifier.h
+++ b/src/zmq/zmqpublishnotifier.h
@@ -12,7 +12,7 @@ class CBlockIndex;
class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier
{
private:
- uint32_t nSequence; //!< upcounting per message sequence number
+ uint32_t nSequence {0U}; //!< upcounting per message sequence number
public:
diff --git a/src/zmq/zmqrpc.cpp b/src/zmq/zmqrpc.cpp
index 4f88bf4eb9..0e0b745375 100644
--- a/src/zmq/zmqrpc.cpp
+++ b/src/zmq/zmqrpc.cpp
@@ -22,7 +22,8 @@ UniValue getzmqnotifications(const JSONRPCRequest& request)
"[\n"
" { (json object)\n"
" \"type\": \"pubhashtx\", (string) Type of notification\n"
- " \"address\": \"...\" (string) Address of the publisher\n"
+ " \"address\": \"...\", (string) Address of the publisher\n"
+ " \"hwm\": n (numeric) Outbound message high water mark\n"
" },\n"
" ...\n"
"]\n"
@@ -38,6 +39,7 @@ UniValue getzmqnotifications(const JSONRPCRequest& request)
UniValue obj(UniValue::VOBJ);
obj.pushKV("type", n->GetType());
obj.pushKV("address", n->GetAddress());
+ obj.pushKV("hwm", n->GetOutboundMessageHighWaterMark());
result.push_back(obj);
}
}
diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py
index 48136a0108..037cdaf38d 100755
--- a/test/functional/interface_zmq.py
+++ b/test/functional/interface_zmq.py
@@ -121,10 +121,10 @@ class ZMQTest (BitcoinTestFramework):
self.log.info("Test the getzmqnotifications RPC")
assert_equal(self.nodes[0].getzmqnotifications(), [
- {"type": "pubhashblock", "address": ADDRESS},
- {"type": "pubhashtx", "address": ADDRESS},
- {"type": "pubrawblock", "address": ADDRESS},
- {"type": "pubrawtx", "address": ADDRESS},
+ {"type": "pubhashblock", "address": ADDRESS, "hwm": 1000},
+ {"type": "pubhashtx", "address": ADDRESS, "hwm": 1000},
+ {"type": "pubrawblock", "address": ADDRESS, "hwm": 1000},
+ {"type": "pubrawtx", "address": ADDRESS, "hwm": 1000},
])
assert_equal(self.nodes[1].getzmqnotifications(), [])