aboutsummaryrefslogtreecommitdiff
path: root/src/zmq
diff options
context:
space:
mode:
Diffstat (limited to 'src/zmq')
-rw-r--r--src/zmq/zmqabstractnotifier.h2
-rw-r--r--src/zmq/zmqnotificationinterface.cpp26
-rw-r--r--src/zmq/zmqnotificationinterface.h4
-rw-r--r--src/zmq/zmqpublishnotifier.cpp59
-rw-r--r--src/zmq/zmqpublishnotifier.h4
-rw-r--r--src/zmq/zmqrpc.cpp11
-rw-r--r--src/zmq/zmqutil.cpp2
7 files changed, 70 insertions, 38 deletions
diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h
index fa3944e32b..97c2599366 100644
--- a/src/zmq/zmqabstractnotifier.h
+++ b/src/zmq/zmqabstractnotifier.h
@@ -5,7 +5,7 @@
#ifndef BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
#define BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
-
+#include <cstdint>
#include <memory>
#include <string>
diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp
index a53de34db4..6ee134f392 100644
--- a/src/zmq/zmqnotificationinterface.cpp
+++ b/src/zmq/zmqnotificationinterface.cpp
@@ -3,13 +3,23 @@
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <zmq/zmqnotificationinterface.h>
+
+#include <logging.h>
+#include <primitives/block.h>
+#include <primitives/transaction.h>
+#include <util/system.h>
+#include <validationinterface.h>
+#include <zmq/zmqabstractnotifier.h>
#include <zmq/zmqpublishnotifier.h>
#include <zmq/zmqutil.h>
#include <zmq.h>
-#include <validation.h>
-#include <util/system.h>
+#include <cassert>
+#include <map>
+#include <string>
+#include <utility>
+#include <vector>
CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(nullptr)
{
@@ -70,9 +80,9 @@ bool CZMQNotificationInterface::Initialize()
{
int major = 0, minor = 0, patch = 0;
zmq_version(&major, &minor, &patch);
- LogPrint(BCLog::ZMQ, "zmq: version %d.%d.%d\n", major, minor, patch);
+ LogPrint(BCLog::ZMQ, "version %d.%d.%d\n", major, minor, patch);
- LogPrint(BCLog::ZMQ, "zmq: Initialize notification interface\n");
+ LogPrint(BCLog::ZMQ, "Initialize notification interface\n");
assert(!pcontext);
pcontext = zmq_ctx_new();
@@ -85,9 +95,9 @@ bool CZMQNotificationInterface::Initialize()
for (auto& notifier : notifiers) {
if (notifier->Initialize(pcontext)) {
- LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
+ LogPrint(BCLog::ZMQ, "Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
} else {
- LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
+ LogPrint(BCLog::ZMQ, "Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
return false;
}
}
@@ -98,11 +108,11 @@ bool CZMQNotificationInterface::Initialize()
// Called during shutdown sequence
void CZMQNotificationInterface::Shutdown()
{
- LogPrint(BCLog::ZMQ, "zmq: Shutdown notification interface\n");
+ LogPrint(BCLog::ZMQ, "Shutdown notification interface\n");
if (pcontext)
{
for (auto& notifier : notifiers) {
- LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
+ LogPrint(BCLog::ZMQ, "Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
notifier->Shutdown();
}
zmq_ctx_term(pcontext);
diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h
index 8f81bfd63f..585e900ca6 100644
--- a/src/zmq/zmqnotificationinterface.h
+++ b/src/zmq/zmqnotificationinterface.h
@@ -5,10 +5,14 @@
#ifndef BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H
#define BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H
+#include <primitives/transaction.h>
#include <validationinterface.h>
+
+#include <cstdint>
#include <list>
#include <memory>
+class CBlock;
class CBlockIndex;
class CZMQAbstractNotifier;
diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp
index 2c6f24a239..c785a929d3 100644
--- a/src/zmq/zmqpublishnotifier.cpp
+++ b/src/zmq/zmqpublishnotifier.cpp
@@ -6,22 +6,37 @@
#include <chain.h>
#include <chainparams.h>
+#include <crypto/common.h>
+#include <logging.h>
+#include <netaddress.h>
#include <netbase.h>
#include <node/blockstorage.h>
+#include <primitives/block.h>
+#include <primitives/transaction.h>
#include <rpc/server.h>
+#include <serialize.h>
#include <streams.h>
-#include <util/system.h>
-#include <validation.h> // For cs_main
+#include <sync.h>
+#include <uint256.h>
+#include <version.h>
#include <zmq/zmqutil.h>
#include <zmq.h>
+#include <cassert>
#include <cstdarg>
#include <cstddef>
+#include <cstdint>
+#include <cstring>
#include <map>
#include <optional>
#include <string>
#include <utility>
+#include <vector>
+
+namespace Consensus {
+struct Params;
+}
using node::ReadBlockFromDisk;
@@ -106,7 +121,7 @@ bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
return false;
}
- LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
+ LogPrint(BCLog::ZMQ, "Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark));
if (rc != 0)
@@ -147,8 +162,8 @@ bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
}
else
{
- LogPrint(BCLog::ZMQ, "zmq: Reusing socket for address %s\n", address);
- LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
+ LogPrint(BCLog::ZMQ, "Reusing socket for address %s\n", address);
+ LogPrint(BCLog::ZMQ, "Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
psocket = i->second->psocket;
mapPublishNotifiers.insert(std::make_pair(address, this));
@@ -179,7 +194,7 @@ void CZMQAbstractPublishNotifier::Shutdown()
if (count == 1)
{
- LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address);
+ LogPrint(BCLog::ZMQ, "Close socket at address %s\n", address);
int linger = 0;
zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
zmq_close(psocket);
@@ -208,7 +223,7 @@ bool CZMQAbstractPublishNotifier::SendZmqMessage(const char *command, const void
bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
- LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s to %s\n", hash.GetHex(), this->address);
+ LogPrint(BCLog::ZMQ, "Publish hashblock %s to %s\n", hash.GetHex(), this->address);
uint8_t data[32];
for (unsigned int i = 0; i < 32; i++) {
data[31 - i] = hash.begin()[i];
@@ -219,7 +234,7 @@ bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
- LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s to %s\n", hash.GetHex(), this->address);
+ LogPrint(BCLog::ZMQ, "Publish hashtx %s to %s\n", hash.GetHex(), this->address);
uint8_t data[32];
for (unsigned int i = 0; i < 32; i++) {
data[31 - i] = hash.begin()[i];
@@ -229,29 +244,25 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t
bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{
- LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address);
+ LogPrint(BCLog::ZMQ, "Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address);
const Consensus::Params& consensusParams = Params().GetConsensus();
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
- {
- LOCK(cs_main);
- CBlock block;
- if(!ReadBlockFromDisk(block, pindex, consensusParams))
- {
- zmqError("Can't read block from disk");
- return false;
- }
-
- ss << block;
+ CBlock block;
+ if (!ReadBlockFromDisk(block, pindex, consensusParams)) {
+ zmqError("Can't read block from disk");
+ return false;
}
+ ss << block;
+
return SendZmqMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
}
bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
- LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s to %s\n", hash.GetHex(), this->address);
+ LogPrint(BCLog::ZMQ, "Publish rawtx %s to %s\n", hash.GetHex(), this->address);
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
ss << transaction;
return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
@@ -273,27 +284,27 @@ static bool SendSequenceMsg(CZMQAbstractPublishNotifier& notifier, uint256 hash,
bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
- LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s to %s\n", hash.GetHex(), this->address);
+ LogPrint(BCLog::ZMQ, "Publish sequence block connect %s to %s\n", hash.GetHex(), this->address);
return SendSequenceMsg(*this, hash, /* Block (C)onnect */ 'C');
}
bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
- LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address);
+ LogPrint(BCLog::ZMQ, "Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address);
return SendSequenceMsg(*this, hash, /* Block (D)isconnect */ 'D');
}
bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
{
uint256 hash = transaction.GetHash();
- LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address);
+ LogPrint(BCLog::ZMQ, "Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address);
return SendSequenceMsg(*this, hash, /* Mempool (A)cceptance */ 'A', mempool_sequence);
}
bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
{
uint256 hash = transaction.GetHash();
- LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address);
+ LogPrint(BCLog::ZMQ, "Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address);
return SendSequenceMsg(*this, hash, /* Mempool (R)emoval */ 'R', mempool_sequence);
}
diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h
index c1d66bddb1..fcedd1aabe 100644
--- a/src/zmq/zmqpublishnotifier.h
+++ b/src/zmq/zmqpublishnotifier.h
@@ -7,7 +7,11 @@
#include <zmq/zmqabstractnotifier.h>
+#include <cstddef>
+#include <cstdint>
+
class CBlockIndex;
+class CTransaction;
class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier
{
diff --git a/src/zmq/zmqrpc.cpp b/src/zmq/zmqrpc.cpp
index f9f8b5a9dc..047e6bf9b7 100644
--- a/src/zmq/zmqrpc.cpp
+++ b/src/zmq/zmqrpc.cpp
@@ -11,6 +11,11 @@
#include <univalue.h>
+#include <list>
+#include <string>
+
+class JSONRPCRequest;
+
namespace {
static RPCHelpMan getzmqnotifications()
@@ -51,10 +56,8 @@ static RPCHelpMan getzmqnotifications()
};
}
-const CRPCCommand commands[] =
-{ // category actor (function)
- // ----------------- -----------------------
- { "zmq", &getzmqnotifications, },
+const CRPCCommand commands[]{
+ {"zmq", &getzmqnotifications},
};
} // anonymous namespace
diff --git a/src/zmq/zmqutil.cpp b/src/zmq/zmqutil.cpp
index f0568634d4..cf3a0b2d71 100644
--- a/src/zmq/zmqutil.cpp
+++ b/src/zmq/zmqutil.cpp
@@ -12,5 +12,5 @@
void zmqError(const std::string& str)
{
- LogPrint(BCLog::ZMQ, "zmq: Error: %s, msg: %s\n", str, zmq_strerror(errno));
+ LogPrint(BCLog::ZMQ, "Error: %s, msg: %s\n", str, zmq_strerror(errno));
}