diff options
Diffstat (limited to 'src/zmq/zmqpublishnotifier.cpp')
-rw-r--r-- | src/zmq/zmqpublishnotifier.cpp | 172 |
1 files changed, 172 insertions, 0 deletions
diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp new file mode 100644 index 0000000000..0a6d7d0dbc --- /dev/null +++ b/src/zmq/zmqpublishnotifier.cpp @@ -0,0 +1,172 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "zmqpublishnotifier.h" +#include "main.h" +#include "util.h" + +static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers; + +// Internal function to send multipart message +static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) +{ + va_list args; + va_start(args, size); + + while (1) + { + zmq_msg_t msg; + + int rc = zmq_msg_init_size(&msg, size); + if (rc != 0) + { + zmqError("Unable to initialize ZMQ msg"); + return -1; + } + + void *buf = zmq_msg_data(&msg); + memcpy(buf, data, size); + + data = va_arg(args, const void*); + + rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0); + if (rc == -1) + { + zmqError("Unable to send ZMQ msg"); + zmq_msg_close(&msg); + return -1; + } + + zmq_msg_close(&msg); + + if (!data) + break; + + size = va_arg(args, size_t); + } + return 0; +} + +bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) +{ + assert(!psocket); + + // check if address is being used by other publish notifier + std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address); + + if (i==mapPublishNotifiers.end()) + { + psocket = zmq_socket(pcontext, ZMQ_PUB); + if (!psocket) + { + zmqError("Failed to create socket"); + return false; + } + + int rc = zmq_bind(psocket, address.c_str()); + if (rc!=0) + { + zmqError("Failed to bind address"); + return false; + } + + // register this notifier for the address, so it can be reused for other publish notifier + mapPublishNotifiers.insert(std::make_pair(address, this)); + return true; + } + else + { + LogPrint("zmq", " Reuse socket for address %s\n", address); + + psocket = i->second->psocket; + mapPublishNotifiers.insert(std::make_pair(address, this)); + + return true; + } +} + +void CZMQAbstractPublishNotifier::Shutdown() +{ + assert(psocket); + + int count = mapPublishNotifiers.count(address); + + // remove this notifier from the list of publishers using this address + typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator; + std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address); + + for (iterator it = iterpair.first; it != iterpair.second; ++it) + { + if (it->second==this) + { + mapPublishNotifiers.erase(it); + break; + } + } + + if (count == 1) + { + LogPrint("zmq", "Close socket at address %s\n", address); + int linger = 0; + zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger)); + zmq_close(psocket); + } + + psocket = 0; +} + +bool CZMQPublishHashBlockNotifier::NotifyBlock(const uint256 &hash) +{ + LogPrint("zmq", "Publish hash block %s\n", hash.GetHex()); + char data[32]; + for (unsigned int i = 0; i < 32; i++) + data[31 - i] = hash.begin()[i]; + int rc = zmq_send_multipart(psocket, "hashblock", 9, data, 32, 0); + return rc == 0; +} + +bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction) +{ + uint256 hash = transaction.GetHash(); + LogPrint("zmq", "Publish hash transaction %s\n", hash.GetHex()); + char data[32]; + for (unsigned int i = 0; i < 32; i++) + data[31 - i] = hash.begin()[i]; + int rc = zmq_send_multipart(psocket, "hashtx", 6, data, 32, 0); + return rc == 0; +} + +bool CZMQPublishRawBlockNotifier::NotifyBlock(const uint256 &hash) +{ + LogPrint("zmq", "Publish raw block %s\n", hash.GetHex()); + + CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); + { + LOCK(cs_main); + + CBlock block; + CBlockIndex* pblockindex = mapBlockIndex[hash]; + + if(!ReadBlockFromDisk(block, pblockindex)) + { + zmqError("Can't read block from disk"); + return false; + } + + ss << block; + } + + int rc = zmq_send_multipart(psocket, "rawblock", 8, &(*ss.begin()), ss.size(), 0); + return rc == 0; +} + +bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction) +{ + uint256 hash = transaction.GetHash(); + LogPrint("zmq", "Publish raw transaction %s\n", hash.GetHex()); + CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); + ss << transaction; + int rc = zmq_send_multipart(psocket, "rawtx", 5, &(*ss.begin()), ss.size(), 0); + return rc == 0; +} |