From e6a14b64d665eb1fafd03a6bbc8d14597ce1c83c Mon Sep 17 00:00:00 2001 From: Jeff Garzik Date: Tue, 18 Nov 2014 12:06:32 -0500 Subject: Add ZeroMQ support. Notify blocks and transactions via ZeroMQ Continues Johnathan Corgan's work. Publishing multipart messages Bugfix: Add missing zmq header includes Bugfix: Adjust build system to link ZeroMQ code for Qt binaries --- src/zmq/zmqabstractnotifier.cpp | 22 +++++ src/zmq/zmqabstractnotifier.h | 42 +++++++++ src/zmq/zmqconfig.h | 24 +++++ src/zmq/zmqnotificationinterface.cpp | 155 +++++++++++++++++++++++++++++++ src/zmq/zmqnotificationinterface.h | 35 +++++++ src/zmq/zmqpublishnotifier.cpp | 172 +++++++++++++++++++++++++++++++++++ src/zmq/zmqpublishnotifier.h | 41 +++++++++ 7 files changed, 491 insertions(+) create mode 100644 src/zmq/zmqabstractnotifier.cpp create mode 100644 src/zmq/zmqabstractnotifier.h create mode 100644 src/zmq/zmqconfig.h create mode 100644 src/zmq/zmqnotificationinterface.cpp create mode 100644 src/zmq/zmqnotificationinterface.h create mode 100644 src/zmq/zmqpublishnotifier.cpp create mode 100644 src/zmq/zmqpublishnotifier.h (limited to 'src/zmq') diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp new file mode 100644 index 0000000000..744ec59234 --- /dev/null +++ b/src/zmq/zmqabstractnotifier.cpp @@ -0,0 +1,22 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "zmqabstractnotifier.h" +#include "util.h" + + +CZMQAbstractNotifier::~CZMQAbstractNotifier() +{ + assert(!psocket); +} + +bool CZMQAbstractNotifier::NotifyBlock(const uint256 &/*hash*/) +{ + return true; +} + +bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/) +{ + return true; +} diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h new file mode 100644 index 0000000000..626d1ddf92 --- /dev/null +++ b/src/zmq/zmqabstractnotifier.h @@ -0,0 +1,42 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H +#define BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H + +#include "zmqconfig.h" + +class CZMQAbstractNotifier; +typedef CZMQAbstractNotifier* (*CZMQNotifierFactory)(); + +class CZMQAbstractNotifier +{ +public: + CZMQAbstractNotifier() : psocket(0) { } + virtual ~CZMQAbstractNotifier(); + + template + static CZMQAbstractNotifier* Create() + { + return new T(); + } + + std::string GetType() const { return type; } + void SetType(const std::string &t) { type = t; } + std::string GetAddress() const { return address; } + void SetAddress(const std::string &a) { address = a; } + + virtual bool Initialize(void *pcontext) = 0; + virtual void Shutdown() = 0; + + virtual bool NotifyBlock(const uint256 &hash); + virtual bool NotifyTransaction(const CTransaction &transaction); + +protected: + void *psocket; + std::string type; + std::string address; +}; + +#endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H diff --git a/src/zmq/zmqconfig.h b/src/zmq/zmqconfig.h new file mode 100644 index 0000000000..6057f5d1a0 --- /dev/null +++ b/src/zmq/zmqconfig.h @@ -0,0 +1,24 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_ZMQ_ZMQCONFIG_H +#define BITCOIN_ZMQ_ZMQCONFIG_H + +#if defined(HAVE_CONFIG_H) +#include "config/bitcoin-config.h" +#endif + +#include +#include + +#if ENABLE_ZMQ +#include +#endif + +#include "primitives/block.h" +#include "primitives/transaction.h" + +void zmqError(const char *str); + +#endif // BITCOIN_ZMQ_ZMQCONFIG_H diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp new file mode 100644 index 0000000000..71ccb59a4a --- /dev/null +++ b/src/zmq/zmqnotificationinterface.cpp @@ -0,0 +1,155 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "zmqnotificationinterface.h" +#include "zmqpublishnotifier.h" + +#include "version.h" +#include "main.h" +#include "streams.h" +#include "util.h" + +void zmqError(const char *str) +{ + LogPrint("zmq", "Error: %s, errno=%s\n", str, zmq_strerror(errno)); +} + +CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(NULL) +{ +} + +CZMQNotificationInterface::~CZMQNotificationInterface() +{ + // ensure Shutdown if Initialize is called + assert(!pcontext); + + for (std::list::iterator i=notifiers.begin(); i!=notifiers.end(); ++i) + { + delete *i; + } +} + +CZMQNotificationInterface* CZMQNotificationInterface::CreateWithArguments(const std::map &args) +{ + CZMQNotificationInterface* notificationInterface = NULL; + std::map factories; + std::list notifiers; + + factories["pubhashblock"] = CZMQAbstractNotifier::Create; + factories["pubhashtx"] = CZMQAbstractNotifier::Create; + factories["pubrawblock"] = CZMQAbstractNotifier::Create; + factories["pubrawtx"] = CZMQAbstractNotifier::Create; + + for (std::map::const_iterator i=factories.begin(); i!=factories.end(); ++i) + { + std::map::const_iterator j = args.find("-zmq" + i->first); + if (j!=args.end()) + { + CZMQNotifierFactory factory = i->second; + std::string address = j->second; + CZMQAbstractNotifier *notifier = factory(); + notifier->SetType(i->first); + notifier->SetAddress(address); + notifiers.push_back(notifier); + } + } + + if (!notifiers.empty()) + { + notificationInterface = new CZMQNotificationInterface(); + notificationInterface->notifiers = notifiers; + } + + return notificationInterface; +} + +// Called at startup to conditionally set up ZMQ socket(s) +bool CZMQNotificationInterface::Initialize() +{ + LogPrint("zmq", "Initialize notification interface\n"); + assert(!pcontext); + + pcontext = zmq_init(1); + + if (!pcontext) + { + zmqError("Unable to initialize context"); + return false; + } + + std::list::iterator i=notifiers.begin(); + for (; i!=notifiers.end(); ++i) + { + CZMQAbstractNotifier *notifier = *i; + if (notifier->Initialize(pcontext)) + { + LogPrint("zmq", " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); + } + else + { + LogPrint("zmq", " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); + break; + } + } + + if (i!=notifiers.end()) + { + Shutdown(); + return false; + } + + return false; +} + +// Called during shutdown sequence +void CZMQNotificationInterface::Shutdown() +{ + LogPrint("zmq", "Shutdown notification interface\n"); + if (pcontext) + { + for (std::list::iterator i=notifiers.begin(); i!=notifiers.end(); ++i) + { + CZMQAbstractNotifier *notifier = *i; + LogPrint("zmq", " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress()); + notifier->Shutdown(); + } + zmq_ctx_destroy(pcontext); + + pcontext = 0; + } +} + +void CZMQNotificationInterface::UpdatedBlockTip(const uint256 &hash) +{ + for (std::list::iterator i = notifiers.begin(); i!=notifiers.end(); ) + { + CZMQAbstractNotifier *notifier = *i; + if (notifier->NotifyBlock(hash)) + { + i++; + } + else + { + notifier->Shutdown(); + i = notifiers.erase(i); + } + } +} + +void CZMQNotificationInterface::SyncTransaction(const CTransaction &tx, const CBlock *pblock) +{ + for (std::list::iterator i = notifiers.begin(); i!=notifiers.end(); ) + { + CZMQAbstractNotifier *notifier = *i; + if (notifier->NotifyTransaction(tx)) + { + i++; + } + else + { + notifier->Shutdown(); + i = notifiers.erase(i); + } + } +} diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h new file mode 100644 index 0000000000..afc0b8d24e --- /dev/null +++ b/src/zmq/zmqnotificationinterface.h @@ -0,0 +1,35 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H +#define BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H + +#include "validationinterface.h" +#include +#include + +class CZMQAbstractNotifier; + +class CZMQNotificationInterface : public CValidationInterface +{ +public: + virtual ~CZMQNotificationInterface(); + + static CZMQNotificationInterface* CreateWithArguments(const std::map &args); + + bool Initialize(); + void Shutdown(); + +protected: // CValidationInterface + void SyncTransaction(const CTransaction &tx, const CBlock *pblock); + void UpdatedBlockTip(const uint256 &newHashTip); + +private: + CZMQNotificationInterface(); + + void *pcontext; + std::list notifiers; +}; + +#endif // BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp new file mode 100644 index 0000000000..0a6d7d0dbc --- /dev/null +++ b/src/zmq/zmqpublishnotifier.cpp @@ -0,0 +1,172 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "zmqpublishnotifier.h" +#include "main.h" +#include "util.h" + +static std::multimap mapPublishNotifiers; + +// Internal function to send multipart message +static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) +{ + va_list args; + va_start(args, size); + + while (1) + { + zmq_msg_t msg; + + int rc = zmq_msg_init_size(&msg, size); + if (rc != 0) + { + zmqError("Unable to initialize ZMQ msg"); + return -1; + } + + void *buf = zmq_msg_data(&msg); + memcpy(buf, data, size); + + data = va_arg(args, const void*); + + rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0); + if (rc == -1) + { + zmqError("Unable to send ZMQ msg"); + zmq_msg_close(&msg); + return -1; + } + + zmq_msg_close(&msg); + + if (!data) + break; + + size = va_arg(args, size_t); + } + return 0; +} + +bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) +{ + assert(!psocket); + + // check if address is being used by other publish notifier + std::multimap::iterator i = mapPublishNotifiers.find(address); + + if (i==mapPublishNotifiers.end()) + { + psocket = zmq_socket(pcontext, ZMQ_PUB); + if (!psocket) + { + zmqError("Failed to create socket"); + return false; + } + + int rc = zmq_bind(psocket, address.c_str()); + if (rc!=0) + { + zmqError("Failed to bind address"); + return false; + } + + // register this notifier for the address, so it can be reused for other publish notifier + mapPublishNotifiers.insert(std::make_pair(address, this)); + return true; + } + else + { + LogPrint("zmq", " Reuse socket for address %s\n", address); + + psocket = i->second->psocket; + mapPublishNotifiers.insert(std::make_pair(address, this)); + + return true; + } +} + +void CZMQAbstractPublishNotifier::Shutdown() +{ + assert(psocket); + + int count = mapPublishNotifiers.count(address); + + // remove this notifier from the list of publishers using this address + typedef std::multimap::iterator iterator; + std::pair iterpair = mapPublishNotifiers.equal_range(address); + + for (iterator it = iterpair.first; it != iterpair.second; ++it) + { + if (it->second==this) + { + mapPublishNotifiers.erase(it); + break; + } + } + + if (count == 1) + { + LogPrint("zmq", "Close socket at address %s\n", address); + int linger = 0; + zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger)); + zmq_close(psocket); + } + + psocket = 0; +} + +bool CZMQPublishHashBlockNotifier::NotifyBlock(const uint256 &hash) +{ + LogPrint("zmq", "Publish hash block %s\n", hash.GetHex()); + char data[32]; + for (unsigned int i = 0; i < 32; i++) + data[31 - i] = hash.begin()[i]; + int rc = zmq_send_multipart(psocket, "hashblock", 9, data, 32, 0); + return rc == 0; +} + +bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction) +{ + uint256 hash = transaction.GetHash(); + LogPrint("zmq", "Publish hash transaction %s\n", hash.GetHex()); + char data[32]; + for (unsigned int i = 0; i < 32; i++) + data[31 - i] = hash.begin()[i]; + int rc = zmq_send_multipart(psocket, "hashtx", 6, data, 32, 0); + return rc == 0; +} + +bool CZMQPublishRawBlockNotifier::NotifyBlock(const uint256 &hash) +{ + LogPrint("zmq", "Publish raw block %s\n", hash.GetHex()); + + CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); + { + LOCK(cs_main); + + CBlock block; + CBlockIndex* pblockindex = mapBlockIndex[hash]; + + if(!ReadBlockFromDisk(block, pblockindex)) + { + zmqError("Can't read block from disk"); + return false; + } + + ss << block; + } + + int rc = zmq_send_multipart(psocket, "rawblock", 8, &(*ss.begin()), ss.size(), 0); + return rc == 0; +} + +bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction) +{ + uint256 hash = transaction.GetHash(); + LogPrint("zmq", "Publish raw transaction %s\n", hash.GetHex()); + CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); + ss << transaction; + int rc = zmq_send_multipart(psocket, "rawtx", 5, &(*ss.begin()), ss.size(), 0); + return rc == 0; +} diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h new file mode 100644 index 0000000000..a0eb26f5e2 --- /dev/null +++ b/src/zmq/zmqpublishnotifier.h @@ -0,0 +1,41 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H +#define BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H + +#include "zmqabstractnotifier.h" + +class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier +{ +public: + bool Initialize(void *pcontext); + void Shutdown(); +}; + +class CZMQPublishHashBlockNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyBlock(const uint256 &hash); +}; + +class CZMQPublishHashTransactionNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyTransaction(const CTransaction &transaction); +}; + +class CZMQPublishRawBlockNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyBlock(const uint256 &hash); +}; + +class CZMQPublishRawTransactionNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyTransaction(const CTransaction &transaction); +}; + +#endif // BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H -- cgit v1.2.3