diff options
author | Jeff Garzik <jgarzik@bitpay.com> | 2014-11-18 12:06:32 -0500 |
---|---|---|
committer | João Barbosa <joao@bitreserve.org> | 2015-09-16 11:01:35 +0100 |
commit | e6a14b64d665eb1fafd03a6bbc8d14597ce1c83c (patch) | |
tree | bfefc581188fb56413c6d29cde625b0395faebc5 /src/zmq/zmqnotificationinterface.cpp | |
parent | 1136879df8af2358efb706c5af886778fbd94989 (diff) |
Add ZeroMQ support. Notify blocks and transactions via ZeroMQ
Continues Johnathan Corgan's work.
Publishing multipart messages
Bugfix: Add missing zmq header includes
Bugfix: Adjust build system to link ZeroMQ code for Qt binaries
Diffstat (limited to 'src/zmq/zmqnotificationinterface.cpp')
-rw-r--r-- | src/zmq/zmqnotificationinterface.cpp | 155 |
1 files changed, 155 insertions, 0 deletions
diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp new file mode 100644 index 0000000000..71ccb59a4a --- /dev/null +++ b/src/zmq/zmqnotificationinterface.cpp @@ -0,0 +1,155 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "zmqnotificationinterface.h" +#include "zmqpublishnotifier.h" + +#include "version.h" +#include "main.h" +#include "streams.h" +#include "util.h" + +void zmqError(const char *str) +{ + LogPrint("zmq", "Error: %s, errno=%s\n", str, zmq_strerror(errno)); +} + +CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(NULL) +{ +} + +CZMQNotificationInterface::~CZMQNotificationInterface() +{ + // ensure Shutdown if Initialize is called + assert(!pcontext); + + for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i) + { + delete *i; + } +} + +CZMQNotificationInterface* CZMQNotificationInterface::CreateWithArguments(const std::map<std::string, std::string> &args) +{ + CZMQNotificationInterface* notificationInterface = NULL; + std::map<std::string, CZMQNotifierFactory> factories; + std::list<CZMQAbstractNotifier*> notifiers; + + factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>; + factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>; + factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>; + factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>; + + for (std::map<std::string, CZMQNotifierFactory>::const_iterator i=factories.begin(); i!=factories.end(); ++i) + { + std::map<std::string, std::string>::const_iterator j = args.find("-zmq" + i->first); + if (j!=args.end()) + { + CZMQNotifierFactory factory = i->second; + std::string address = j->second; + CZMQAbstractNotifier *notifier = factory(); + notifier->SetType(i->first); + notifier->SetAddress(address); + notifiers.push_back(notifier); + } + } + + if (!notifiers.empty()) + { + notificationInterface = new CZMQNotificationInterface(); + notificationInterface->notifiers = notifiers; + } + + return notificationInterface; +} + +// Called at startup to conditionally set up ZMQ socket(s) +bool CZMQNotificationInterface::Initialize() +{ + LogPrint("zmq", "Initialize notification interface\n"); + assert(!pcontext); + + pcontext = zmq_init(1); + + if (!pcontext) + { + zmqError("Unable to initialize context"); + return false; + } + + std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); + for (; i!=notifiers.end(); ++i) + { + CZMQAbstractNotifier *notifier = *i; + if (notifier->Initialize(pcontext)) + { + LogPrint("zmq", " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); + } + else + { + LogPrint("zmq", " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); + break; + } + } + + if (i!=notifiers.end()) + { + Shutdown(); + return false; + } + + return false; +} + +// Called during shutdown sequence +void CZMQNotificationInterface::Shutdown() +{ + LogPrint("zmq", "Shutdown notification interface\n"); + if (pcontext) + { + for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i) + { + CZMQAbstractNotifier *notifier = *i; + LogPrint("zmq", " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress()); + notifier->Shutdown(); + } + zmq_ctx_destroy(pcontext); + + pcontext = 0; + } +} + +void CZMQNotificationInterface::UpdatedBlockTip(const uint256 &hash) +{ + for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); ) + { + CZMQAbstractNotifier *notifier = *i; + if (notifier->NotifyBlock(hash)) + { + i++; + } + else + { + notifier->Shutdown(); + i = notifiers.erase(i); + } + } +} + +void CZMQNotificationInterface::SyncTransaction(const CTransaction &tx, const CBlock *pblock) +{ + for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); ) + { + CZMQAbstractNotifier *notifier = *i; + if (notifier->NotifyTransaction(tx)) + { + i++; + } + else + { + notifier->Shutdown(); + i = notifiers.erase(i); + } + } +} |