diff options
Diffstat (limited to 'src/zmq/zmqnotificationinterface.cpp')
-rw-r--r-- | src/zmq/zmqnotificationinterface.cpp | 114 |
1 files changed, 44 insertions, 70 deletions
diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp index d55b106e04..a22772baed 100644 --- a/src/zmq/zmqnotificationinterface.cpp +++ b/src/zmq/zmqnotificationinterface.cpp @@ -4,15 +4,13 @@ #include <zmq/zmqnotificationinterface.h> #include <zmq/zmqpublishnotifier.h> +#include <zmq/zmqutil.h> + +#include <zmq.h> #include <validation.h> #include <util/system.h> -void zmqError(const char *str) -{ - LogPrint(BCLog::ZMQ, "zmq: Error: %s, errno=%s\n", str, zmq_strerror(errno)); -} - CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(nullptr) { } @@ -20,61 +18,52 @@ CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(nullptr) CZMQNotificationInterface::~CZMQNotificationInterface() { Shutdown(); - - for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i) - { - delete *i; - } } std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const { std::list<const CZMQAbstractNotifier*> result; - for (const auto* n : notifiers) { - result.push_back(n); + for (const auto& n : notifiers) { + result.push_back(n.get()); } return result; } CZMQNotificationInterface* CZMQNotificationInterface::Create() { - CZMQNotificationInterface* notificationInterface = nullptr; std::map<std::string, CZMQNotifierFactory> factories; - std::list<CZMQAbstractNotifier*> notifiers; - factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>; factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>; factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>; factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>; + std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers; for (const auto& entry : factories) { std::string arg("-zmq" + entry.first); if (gArgs.IsArgSet(arg)) { - CZMQNotifierFactory factory = entry.second; - std::string address = gArgs.GetArg(arg, ""); - CZMQAbstractNotifier *notifier = factory(); + const auto& factory = entry.second; + const std::string address = gArgs.GetArg(arg, ""); + std::unique_ptr<CZMQAbstractNotifier> notifier = factory(); notifier->SetType(entry.first); notifier->SetAddress(address); notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM))); - notifiers.push_back(notifier); + notifiers.push_back(std::move(notifier)); } } if (!notifiers.empty()) { - notificationInterface = new CZMQNotificationInterface(); - notificationInterface->notifiers = notifiers; + std::unique_ptr<CZMQNotificationInterface> notificationInterface(new CZMQNotificationInterface()); + notificationInterface->notifiers = std::move(notifiers); - if (!notificationInterface->Initialize()) - { - delete notificationInterface; - notificationInterface = nullptr; + if (notificationInterface->Initialize()) { + return notificationInterface.release(); } } - return notificationInterface; + return nullptr; } // Called at startup to conditionally set up ZMQ socket(s) @@ -95,26 +84,15 @@ bool CZMQNotificationInterface::Initialize() return false; } - std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); - for (; i!=notifiers.end(); ++i) - { - CZMQAbstractNotifier *notifier = *i; - if (notifier->Initialize(pcontext)) - { + for (auto& notifier : notifiers) { + if (notifier->Initialize(pcontext)) { LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); - } - else - { + } else { LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); - break; + return false; } } - if (i!=notifiers.end()) - { - return false; - } - return true; } @@ -124,9 +102,7 @@ void CZMQNotificationInterface::Shutdown() LogPrint(BCLog::ZMQ, "zmq: Shutdown notification interface\n"); if (pcontext) { - for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i) - { - CZMQAbstractNotifier *notifier = *i; + for (auto& notifier : notifiers) { LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress()); notifier->Shutdown(); } @@ -136,45 +112,43 @@ void CZMQNotificationInterface::Shutdown() } } -void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) -{ - if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones - return; +namespace { - for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); ) - { - CZMQAbstractNotifier *notifier = *i; - if (notifier->NotifyBlock(pindexNew)) - { - i++; - } - else - { +template <typename Function> +void TryForEachAndRemoveFailed(std::list<std::unique_ptr<CZMQAbstractNotifier>>& notifiers, const Function& func) +{ + for (auto i = notifiers.begin(); i != notifiers.end(); ) { + CZMQAbstractNotifier* notifier = i->get(); + if (func(notifier)) { + ++i; + } else { notifier->Shutdown(); i = notifiers.erase(i); } } } +} // anonymous namespace + +void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) +{ + if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones + return; + + TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier* notifier) { + return notifier->NotifyBlock(pindexNew); + }); +} + void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx) { // Used by BlockConnected and BlockDisconnected as well, because they're // all the same external callback. const CTransaction& tx = *ptx; - for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); ) - { - CZMQAbstractNotifier *notifier = *i; - if (notifier->NotifyTransaction(tx)) - { - i++; - } - else - { - notifier->Shutdown(); - i = notifiers.erase(i); - } - } + TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) { + return notifier->NotifyTransaction(tx); + }); } void CZMQNotificationInterface::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected) |