aboutsummaryrefslogtreecommitdiff
path: root/src/zmq
diff options
context:
space:
mode:
Diffstat (limited to 'src/zmq')
-rw-r--r--src/zmq/zmqabstractnotifier.cpp5
-rw-r--r--src/zmq/zmqabstractnotifier.h13
-rw-r--r--src/zmq/zmqconfig.h2
-rw-r--r--src/zmq/zmqnotificationinterface.cpp17
-rw-r--r--src/zmq/zmqpublishnotifier.cpp21
-rw-r--r--src/zmq/zmqpublishnotifier.h4
-rw-r--r--src/zmq/zmqrpc.cpp20
7 files changed, 57 insertions, 25 deletions
diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp
index fc1ff6d031..6a9661e3e8 100644
--- a/src/zmq/zmqabstractnotifier.cpp
+++ b/src/zmq/zmqabstractnotifier.cpp
@@ -1,10 +1,11 @@
-// Copyright (c) 2015-2017 The Bitcoin Core developers
+// Copyright (c) 2015-2018 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <zmq/zmqabstractnotifier.h>
-#include <util.h>
+#include <util/system.h>
+const int CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM;
CZMQAbstractNotifier::~CZMQAbstractNotifier()
{
diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h
index 7270ae203b..887dde7b27 100644
--- a/src/zmq/zmqabstractnotifier.h
+++ b/src/zmq/zmqabstractnotifier.h
@@ -1,4 +1,4 @@
-// Copyright (c) 2015-2017 The Bitcoin Core developers
+// Copyright (c) 2015-2018 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
@@ -15,7 +15,9 @@ typedef CZMQAbstractNotifier* (*CZMQNotifierFactory)();
class CZMQAbstractNotifier
{
public:
- CZMQAbstractNotifier() : psocket(nullptr) { }
+ static const int DEFAULT_ZMQ_SNDHWM {1000};
+
+ CZMQAbstractNotifier() : psocket(nullptr), outbound_message_high_water_mark(DEFAULT_ZMQ_SNDHWM) { }
virtual ~CZMQAbstractNotifier();
template <typename T>
@@ -28,6 +30,12 @@ public:
void SetType(const std::string &t) { type = t; }
std::string GetAddress() const { return address; }
void SetAddress(const std::string &a) { address = a; }
+ int GetOutboundMessageHighWaterMark() const { return outbound_message_high_water_mark; }
+ void SetOutboundMessageHighWaterMark(const int sndhwm) {
+ if (sndhwm >= 0) {
+ outbound_message_high_water_mark = sndhwm;
+ }
+ }
virtual bool Initialize(void *pcontext) = 0;
virtual void Shutdown() = 0;
@@ -39,6 +47,7 @@ protected:
void *psocket;
std::string type;
std::string address;
+ int outbound_message_high_water_mark; // aka SNDHWM
};
#endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
diff --git a/src/zmq/zmqconfig.h b/src/zmq/zmqconfig.h
index 1ba1262a83..5219ffad40 100644
--- a/src/zmq/zmqconfig.h
+++ b/src/zmq/zmqconfig.h
@@ -1,4 +1,4 @@
-// Copyright (c) 2014-2017 The Bitcoin Core developers
+// Copyright (c) 2014-2018 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp
index 8cbc969972..6826cf62d6 100644
--- a/src/zmq/zmqnotificationinterface.cpp
+++ b/src/zmq/zmqnotificationinterface.cpp
@@ -8,7 +8,7 @@
#include <version.h>
#include <validation.h>
#include <streams.h>
-#include <util.h>
+#include <util/system.h>
void zmqError(const char *str)
{
@@ -59,6 +59,7 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create()
CZMQAbstractNotifier *notifier = factory();
notifier->SetType(entry.first);
notifier->SetAddress(address);
+ notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
notifiers.push_back(notifier);
}
}
@@ -81,10 +82,14 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create()
// Called at startup to conditionally set up ZMQ socket(s)
bool CZMQNotificationInterface::Initialize()
{
+ int major = 0, minor = 0, patch = 0;
+ zmq_version(&major, &minor, &patch);
+ LogPrint(BCLog::ZMQ, "zmq: version %d.%d.%d\n", major, minor, patch);
+
LogPrint(BCLog::ZMQ, "zmq: Initialize notification interface\n");
assert(!pcontext);
- pcontext = zmq_init(1);
+ pcontext = zmq_ctx_new();
if (!pcontext)
{
@@ -98,11 +103,11 @@ bool CZMQNotificationInterface::Initialize()
CZMQAbstractNotifier *notifier = *i;
if (notifier->Initialize(pcontext))
{
- LogPrint(BCLog::ZMQ, " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
+ LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
}
else
{
- LogPrint(BCLog::ZMQ, " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
+ LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
break;
}
}
@@ -124,10 +129,10 @@ void CZMQNotificationInterface::Shutdown()
for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
{
CZMQAbstractNotifier *notifier = *i;
- LogPrint(BCLog::ZMQ, " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
+ LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
notifier->Shutdown();
}
- zmq_ctx_destroy(pcontext);
+ zmq_ctx_term(pcontext);
pcontext = nullptr;
}
diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp
index 8c9acef257..ba89d1401d 100644
--- a/src/zmq/zmqpublishnotifier.cpp
+++ b/src/zmq/zmqpublishnotifier.cpp
@@ -1,4 +1,4 @@
-// Copyright (c) 2015-2017 The Bitcoin Core developers
+// Copyright (c) 2015-2018 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
@@ -7,7 +7,7 @@
#include <streams.h>
#include <zmq/zmqpublishnotifier.h>
#include <validation.h>
-#include <util.h>
+#include <util/system.h>
#include <rpc/server.h>
static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
@@ -76,8 +76,18 @@ bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
return false;
}
- int rc = zmq_bind(psocket, address.c_str());
- if (rc!=0)
+ LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
+
+ int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark));
+ if (rc != 0)
+ {
+ zmqError("Failed to set outbound message high water mark");
+ zmq_close(psocket);
+ return false;
+ }
+
+ rc = zmq_bind(psocket, address.c_str());
+ if (rc != 0)
{
zmqError("Failed to bind address");
zmq_close(psocket);
@@ -91,6 +101,7 @@ bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
else
{
LogPrint(BCLog::ZMQ, "zmq: Reusing socket for address %s\n", address);
+ LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
psocket = i->second->psocket;
mapPublishNotifiers.insert(std::make_pair(address, this));
@@ -120,7 +131,7 @@ void CZMQAbstractPublishNotifier::Shutdown()
if (count == 1)
{
- LogPrint(BCLog::ZMQ, "Close socket at address %s\n", address);
+ LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address);
int linger = 0;
zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
zmq_close(psocket);
diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h
index d53bba9971..278fdb94d2 100644
--- a/src/zmq/zmqpublishnotifier.h
+++ b/src/zmq/zmqpublishnotifier.h
@@ -1,4 +1,4 @@
-// Copyright (c) 2015-2017 The Bitcoin Core developers
+// Copyright (c) 2015-2018 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
@@ -12,7 +12,7 @@ class CBlockIndex;
class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier
{
private:
- uint32_t nSequence; //!< upcounting per message sequence number
+ uint32_t nSequence {0U}; //!< upcounting per message sequence number
public:
diff --git a/src/zmq/zmqrpc.cpp b/src/zmq/zmqrpc.cpp
index 4f88bf4eb9..a34968ef7d 100644
--- a/src/zmq/zmqrpc.cpp
+++ b/src/zmq/zmqrpc.cpp
@@ -5,6 +5,7 @@
#include <zmq/zmqrpc.h>
#include <rpc/server.h>
+#include <rpc/util.h>
#include <zmq/zmqabstractnotifier.h>
#include <zmq/zmqnotificationinterface.h>
@@ -16,20 +17,24 @@ UniValue getzmqnotifications(const JSONRPCRequest& request)
{
if (request.fHelp || request.params.size() != 0) {
throw std::runtime_error(
- "getzmqnotifications\n"
- "\nReturns information about the active ZeroMQ notifications.\n"
- "\nResult:\n"
+ RPCHelpMan{"getzmqnotifications",
+ "\nReturns information about the active ZeroMQ notifications.\n",
+ {},
+ RPCResult{
"[\n"
" { (json object)\n"
" \"type\": \"pubhashtx\", (string) Type of notification\n"
- " \"address\": \"...\" (string) Address of the publisher\n"
+ " \"address\": \"...\", (string) Address of the publisher\n"
+ " \"hwm\": n (numeric) Outbound message high water mark\n"
" },\n"
" ...\n"
"]\n"
- "\nExamples:\n"
- + HelpExampleCli("getzmqnotifications", "")
+ },
+ RPCExamples{
+ HelpExampleCli("getzmqnotifications", "")
+ HelpExampleRpc("getzmqnotifications", "")
- );
+ },
+ }.ToString());
}
UniValue result(UniValue::VARR);
@@ -38,6 +43,7 @@ UniValue getzmqnotifications(const JSONRPCRequest& request)
UniValue obj(UniValue::VOBJ);
obj.pushKV("type", n->GetType());
obj.pushKV("address", n->GetAddress());
+ obj.pushKV("hwm", n->GetOutboundMessageHighWaterMark());
result.push_back(obj);
}
}