aboutsummaryrefslogtreecommitdiff
path: root/src/zmq
diff options
context:
space:
mode:
authorWladimir J. van der Laan <laanwj@gmail.com>2018-11-05 13:44:53 +0100
committerWladimir J. van der Laan <laanwj@gmail.com>2018-11-05 13:45:41 +0100
commitdac2caa371a1c65faf34966c85dbafc30a0c640a (patch)
tree86bc2303329410e2aa5869d8d7deb3a8c6d464eb /src/zmq
parent45f50063a90d6f3a0de3480cd9f2513b0c76047c (diff)
parenta4edb168b635b6f5c36324e44961cd42cf9bbbaa (diff)
Merge #14060: ZMQ: add options to configure outbound message high water mark, aka SNDHWM
a4edb168b635b6f5c36324e44961cd42cf9bbbaa ZMQ: add options to configure outbound message high water mark, aka SNDHWM (mruddy) Pull request description: ZMQ: add options to configure outbound message high water mark, aka SNDHWM This is my attempt at https://github.com/bitcoin/bitcoin/pull/13315 Tree-SHA512: a4cc3bcf179776899261a97c8c4f31f35d1d8950fd71a09a79c5c064879b38e600b26824c89c4091d941502ed5b0255390882f7d44baf9e6dc49d685a86e8edb
Diffstat (limited to 'src/zmq')
-rw-r--r--src/zmq/zmqabstractnotifier.cpp1
-rw-r--r--src/zmq/zmqabstractnotifier.h11
-rw-r--r--src/zmq/zmqnotificationinterface.cpp7
-rw-r--r--src/zmq/zmqpublishnotifier.cpp16
-rw-r--r--src/zmq/zmqpublishnotifier.h2
-rw-r--r--src/zmq/zmqrpc.cpp4
6 files changed, 32 insertions, 9 deletions
diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp
index 82e5ee5cc9..6a9661e3e8 100644
--- a/src/zmq/zmqabstractnotifier.cpp
+++ b/src/zmq/zmqabstractnotifier.cpp
@@ -5,6 +5,7 @@
#include <zmq/zmqabstractnotifier.h>
#include <util/system.h>
+const int CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM;
CZMQAbstractNotifier::~CZMQAbstractNotifier()
{
diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h
index 5081c6cd02..887dde7b27 100644
--- a/src/zmq/zmqabstractnotifier.h
+++ b/src/zmq/zmqabstractnotifier.h
@@ -15,7 +15,9 @@ typedef CZMQAbstractNotifier* (*CZMQNotifierFactory)();
class CZMQAbstractNotifier
{
public:
- CZMQAbstractNotifier() : psocket(nullptr) { }
+ static const int DEFAULT_ZMQ_SNDHWM {1000};
+
+ CZMQAbstractNotifier() : psocket(nullptr), outbound_message_high_water_mark(DEFAULT_ZMQ_SNDHWM) { }
virtual ~CZMQAbstractNotifier();
template <typename T>
@@ -28,6 +30,12 @@ public:
void SetType(const std::string &t) { type = t; }
std::string GetAddress() const { return address; }
void SetAddress(const std::string &a) { address = a; }
+ int GetOutboundMessageHighWaterMark() const { return outbound_message_high_water_mark; }
+ void SetOutboundMessageHighWaterMark(const int sndhwm) {
+ if (sndhwm >= 0) {
+ outbound_message_high_water_mark = sndhwm;
+ }
+ }
virtual bool Initialize(void *pcontext) = 0;
virtual void Shutdown() = 0;
@@ -39,6 +47,7 @@ protected:
void *psocket;
std::string type;
std::string address;
+ int outbound_message_high_water_mark; // aka SNDHWM
};
#endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp
index df4e51ca20..6826cf62d6 100644
--- a/src/zmq/zmqnotificationinterface.cpp
+++ b/src/zmq/zmqnotificationinterface.cpp
@@ -59,6 +59,7 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create()
CZMQAbstractNotifier *notifier = factory();
notifier->SetType(entry.first);
notifier->SetAddress(address);
+ notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
notifiers.push_back(notifier);
}
}
@@ -102,11 +103,11 @@ bool CZMQNotificationInterface::Initialize()
CZMQAbstractNotifier *notifier = *i;
if (notifier->Initialize(pcontext))
{
- LogPrint(BCLog::ZMQ, " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
+ LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
}
else
{
- LogPrint(BCLog::ZMQ, " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
+ LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
break;
}
}
@@ -128,7 +129,7 @@ void CZMQNotificationInterface::Shutdown()
for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
{
CZMQAbstractNotifier *notifier = *i;
- LogPrint(BCLog::ZMQ, " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
+ LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
notifier->Shutdown();
}
zmq_ctx_term(pcontext);
diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp
index 718597e57c..15d4ac1b89 100644
--- a/src/zmq/zmqpublishnotifier.cpp
+++ b/src/zmq/zmqpublishnotifier.cpp
@@ -76,8 +76,18 @@ bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
return false;
}
- int rc = zmq_bind(psocket, address.c_str());
- if (rc!=0)
+ LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
+
+ int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark));
+ if (rc != 0)
+ {
+ zmqError("Failed to set outbound message high water mark");
+ zmq_close(psocket);
+ return false;
+ }
+
+ rc = zmq_bind(psocket, address.c_str());
+ if (rc != 0)
{
zmqError("Failed to bind address");
zmq_close(psocket);
@@ -120,7 +130,7 @@ void CZMQAbstractPublishNotifier::Shutdown()
if (count == 1)
{
- LogPrint(BCLog::ZMQ, "Close socket at address %s\n", address);
+ LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address);
int linger = 0;
zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
zmq_close(psocket);
diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h
index 0f5b43d726..278fdb94d2 100644
--- a/src/zmq/zmqpublishnotifier.h
+++ b/src/zmq/zmqpublishnotifier.h
@@ -12,7 +12,7 @@ class CBlockIndex;
class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier
{
private:
- uint32_t nSequence; //!< upcounting per message sequence number
+ uint32_t nSequence {0U}; //!< upcounting per message sequence number
public:
diff --git a/src/zmq/zmqrpc.cpp b/src/zmq/zmqrpc.cpp
index 4f88bf4eb9..0e0b745375 100644
--- a/src/zmq/zmqrpc.cpp
+++ b/src/zmq/zmqrpc.cpp
@@ -22,7 +22,8 @@ UniValue getzmqnotifications(const JSONRPCRequest& request)
"[\n"
" { (json object)\n"
" \"type\": \"pubhashtx\", (string) Type of notification\n"
- " \"address\": \"...\" (string) Address of the publisher\n"
+ " \"address\": \"...\", (string) Address of the publisher\n"
+ " \"hwm\": n (numeric) Outbound message high water mark\n"
" },\n"
" ...\n"
"]\n"
@@ -38,6 +39,7 @@ UniValue getzmqnotifications(const JSONRPCRequest& request)
UniValue obj(UniValue::VOBJ);
obj.pushKV("type", n->GetType());
obj.pushKV("address", n->GetAddress());
+ obj.pushKV("hwm", n->GetOutboundMessageHighWaterMark());
result.push_back(obj);
}
}