diff options
Diffstat (limited to 'src/zmq')
-rw-r--r-- | src/zmq/zmqabstractnotifier.h | 2 | ||||
-rw-r--r-- | src/zmq/zmqnotificationinterface.cpp | 26 | ||||
-rw-r--r-- | src/zmq/zmqnotificationinterface.h | 4 | ||||
-rw-r--r-- | src/zmq/zmqpublishnotifier.cpp | 59 | ||||
-rw-r--r-- | src/zmq/zmqpublishnotifier.h | 4 | ||||
-rw-r--r-- | src/zmq/zmqrpc.cpp | 11 | ||||
-rw-r--r-- | src/zmq/zmqutil.cpp | 2 |
7 files changed, 70 insertions, 38 deletions
diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h index fa3944e32b..97c2599366 100644 --- a/src/zmq/zmqabstractnotifier.h +++ b/src/zmq/zmqabstractnotifier.h @@ -5,7 +5,7 @@ #ifndef BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H #define BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H - +#include <cstdint> #include <memory> #include <string> diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp index a53de34db4..6ee134f392 100644 --- a/src/zmq/zmqnotificationinterface.cpp +++ b/src/zmq/zmqnotificationinterface.cpp @@ -3,13 +3,23 @@ // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include <zmq/zmqnotificationinterface.h> + +#include <logging.h> +#include <primitives/block.h> +#include <primitives/transaction.h> +#include <util/system.h> +#include <validationinterface.h> +#include <zmq/zmqabstractnotifier.h> #include <zmq/zmqpublishnotifier.h> #include <zmq/zmqutil.h> #include <zmq.h> -#include <validation.h> -#include <util/system.h> +#include <cassert> +#include <map> +#include <string> +#include <utility> +#include <vector> CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(nullptr) { @@ -70,9 +80,9 @@ bool CZMQNotificationInterface::Initialize() { int major = 0, minor = 0, patch = 0; zmq_version(&major, &minor, &patch); - LogPrint(BCLog::ZMQ, "zmq: version %d.%d.%d\n", major, minor, patch); + LogPrint(BCLog::ZMQ, "version %d.%d.%d\n", major, minor, patch); - LogPrint(BCLog::ZMQ, "zmq: Initialize notification interface\n"); + LogPrint(BCLog::ZMQ, "Initialize notification interface\n"); assert(!pcontext); pcontext = zmq_ctx_new(); @@ -85,9 +95,9 @@ bool CZMQNotificationInterface::Initialize() for (auto& notifier : notifiers) { if (notifier->Initialize(pcontext)) { - LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); + LogPrint(BCLog::ZMQ, "Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); } else { - LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); + LogPrint(BCLog::ZMQ, "Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); return false; } } @@ -98,11 +108,11 @@ bool CZMQNotificationInterface::Initialize() // Called during shutdown sequence void CZMQNotificationInterface::Shutdown() { - LogPrint(BCLog::ZMQ, "zmq: Shutdown notification interface\n"); + LogPrint(BCLog::ZMQ, "Shutdown notification interface\n"); if (pcontext) { for (auto& notifier : notifiers) { - LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress()); + LogPrint(BCLog::ZMQ, "Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress()); notifier->Shutdown(); } zmq_ctx_term(pcontext); diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h index 8f81bfd63f..585e900ca6 100644 --- a/src/zmq/zmqnotificationinterface.h +++ b/src/zmq/zmqnotificationinterface.h @@ -5,10 +5,14 @@ #ifndef BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H #define BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H +#include <primitives/transaction.h> #include <validationinterface.h> + +#include <cstdint> #include <list> #include <memory> +class CBlock; class CBlockIndex; class CZMQAbstractNotifier; diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index 2c6f24a239..c785a929d3 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -6,22 +6,37 @@ #include <chain.h> #include <chainparams.h> +#include <crypto/common.h> +#include <logging.h> +#include <netaddress.h> #include <netbase.h> #include <node/blockstorage.h> +#include <primitives/block.h> +#include <primitives/transaction.h> #include <rpc/server.h> +#include <serialize.h> #include <streams.h> -#include <util/system.h> -#include <validation.h> // For cs_main +#include <sync.h> +#include <uint256.h> +#include <version.h> #include <zmq/zmqutil.h> #include <zmq.h> +#include <cassert> #include <cstdarg> #include <cstddef> +#include <cstdint> +#include <cstring> #include <map> #include <optional> #include <string> #include <utility> +#include <vector> + +namespace Consensus { +struct Params; +} using node::ReadBlockFromDisk; @@ -106,7 +121,7 @@ bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) return false; } - LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark); + LogPrint(BCLog::ZMQ, "Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark); int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark)); if (rc != 0) @@ -147,8 +162,8 @@ bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) } else { - LogPrint(BCLog::ZMQ, "zmq: Reusing socket for address %s\n", address); - LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark); + LogPrint(BCLog::ZMQ, "Reusing socket for address %s\n", address); + LogPrint(BCLog::ZMQ, "Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark); psocket = i->second->psocket; mapPublishNotifiers.insert(std::make_pair(address, this)); @@ -179,7 +194,7 @@ void CZMQAbstractPublishNotifier::Shutdown() if (count == 1) { - LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address); + LogPrint(BCLog::ZMQ, "Close socket at address %s\n", address); int linger = 0; zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger)); zmq_close(psocket); @@ -208,7 +223,7 @@ bool CZMQAbstractPublishNotifier::SendZmqMessage(const char *command, const void bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) { uint256 hash = pindex->GetBlockHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s to %s\n", hash.GetHex(), this->address); + LogPrint(BCLog::ZMQ, "Publish hashblock %s to %s\n", hash.GetHex(), this->address); uint8_t data[32]; for (unsigned int i = 0; i < 32; i++) { data[31 - i] = hash.begin()[i]; @@ -219,7 +234,7 @@ bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction) { uint256 hash = transaction.GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s to %s\n", hash.GetHex(), this->address); + LogPrint(BCLog::ZMQ, "Publish hashtx %s to %s\n", hash.GetHex(), this->address); uint8_t data[32]; for (unsigned int i = 0; i < 32; i++) { data[31 - i] = hash.begin()[i]; @@ -229,29 +244,25 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) { - LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address); + LogPrint(BCLog::ZMQ, "Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address); const Consensus::Params& consensusParams = Params().GetConsensus(); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags()); - { - LOCK(cs_main); - CBlock block; - if(!ReadBlockFromDisk(block, pindex, consensusParams)) - { - zmqError("Can't read block from disk"); - return false; - } - - ss << block; + CBlock block; + if (!ReadBlockFromDisk(block, pindex, consensusParams)) { + zmqError("Can't read block from disk"); + return false; } + ss << block; + return SendZmqMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size()); } bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction) { uint256 hash = transaction.GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s to %s\n", hash.GetHex(), this->address); + LogPrint(BCLog::ZMQ, "Publish rawtx %s to %s\n", hash.GetHex(), this->address); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags()); ss << transaction; return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); @@ -273,27 +284,27 @@ static bool SendSequenceMsg(CZMQAbstractPublishNotifier& notifier, uint256 hash, bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex) { uint256 hash = pindex->GetBlockHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s to %s\n", hash.GetHex(), this->address); + LogPrint(BCLog::ZMQ, "Publish sequence block connect %s to %s\n", hash.GetHex(), this->address); return SendSequenceMsg(*this, hash, /* Block (C)onnect */ 'C'); } bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pindex) { uint256 hash = pindex->GetBlockHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address); + LogPrint(BCLog::ZMQ, "Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address); return SendSequenceMsg(*this, hash, /* Block (D)isconnect */ 'D'); } bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) { uint256 hash = transaction.GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address); + LogPrint(BCLog::ZMQ, "Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address); return SendSequenceMsg(*this, hash, /* Mempool (A)cceptance */ 'A', mempool_sequence); } bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) { uint256 hash = transaction.GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address); + LogPrint(BCLog::ZMQ, "Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address); return SendSequenceMsg(*this, hash, /* Mempool (R)emoval */ 'R', mempool_sequence); } diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h index c1d66bddb1..fcedd1aabe 100644 --- a/src/zmq/zmqpublishnotifier.h +++ b/src/zmq/zmqpublishnotifier.h @@ -7,7 +7,11 @@ #include <zmq/zmqabstractnotifier.h> +#include <cstddef> +#include <cstdint> + class CBlockIndex; +class CTransaction; class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier { diff --git a/src/zmq/zmqrpc.cpp b/src/zmq/zmqrpc.cpp index f9f8b5a9dc..047e6bf9b7 100644 --- a/src/zmq/zmqrpc.cpp +++ b/src/zmq/zmqrpc.cpp @@ -11,6 +11,11 @@ #include <univalue.h> +#include <list> +#include <string> + +class JSONRPCRequest; + namespace { static RPCHelpMan getzmqnotifications() @@ -51,10 +56,8 @@ static RPCHelpMan getzmqnotifications() }; } -const CRPCCommand commands[] = -{ // category actor (function) - // ----------------- ----------------------- - { "zmq", &getzmqnotifications, }, +const CRPCCommand commands[]{ + {"zmq", &getzmqnotifications}, }; } // anonymous namespace diff --git a/src/zmq/zmqutil.cpp b/src/zmq/zmqutil.cpp index f0568634d4..cf3a0b2d71 100644 --- a/src/zmq/zmqutil.cpp +++ b/src/zmq/zmqutil.cpp @@ -12,5 +12,5 @@ void zmqError(const std::string& str) { - LogPrint(BCLog::ZMQ, "zmq: Error: %s, msg: %s\n", str, zmq_strerror(errno)); + LogPrint(BCLog::ZMQ, "Error: %s, msg: %s\n", str, zmq_strerror(errno)); } |