diff options
Diffstat (limited to 'src/zmq/zmqpublishnotifier.cpp')
-rw-r--r-- | src/zmq/zmqpublishnotifier.cpp | 91 |
1 files changed, 79 insertions, 12 deletions
diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index 04806903c2..c0207f9dd6 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -2,13 +2,23 @@ // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. +#include <zmq/zmqpublishnotifier.h> + #include <chain.h> #include <chainparams.h> +#include <rpc/server.h> #include <streams.h> -#include <zmq/zmqpublishnotifier.h> -#include <validation.h> #include <util/system.h> -#include <rpc/server.h> +#include <validation.h> +#include <zmq/zmqutil.h> + +#include <zmq.h> + +#include <cstdarg> +#include <cstddef> +#include <map> +#include <string> +#include <utility> static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers; @@ -16,6 +26,7 @@ static const char *MSG_HASHBLOCK = "hashblock"; static const char *MSG_HASHTX = "hashtx"; static const char *MSG_RAWBLOCK = "rawblock"; static const char *MSG_RAWTX = "rawtx"; +static const char *MSG_SEQUENCE = "sequence"; // Internal function to send multipart message static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) @@ -86,6 +97,14 @@ bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) return false; } + const int so_keepalive_option {1}; + rc = zmq_setsockopt(psocket, ZMQ_TCP_KEEPALIVE, &so_keepalive_option, sizeof(so_keepalive_option)); + if (rc != 0) { + zmqError("Failed to set SO_KEEPALIVE"); + zmq_close(psocket); + return false; + } + rc = zmq_bind(psocket, address.c_str()); if (rc != 0) { @@ -141,7 +160,7 @@ void CZMQAbstractPublishNotifier::Shutdown() psocket = nullptr; } -bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size) +bool CZMQAbstractPublishNotifier::SendZmqMessage(const char *command, const void* data, size_t size) { assert(psocket); @@ -161,26 +180,26 @@ bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* d bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) { uint256 hash = pindex->GetBlockHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s to %s\n", hash.GetHex(), this->address); char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; - return SendMessage(MSG_HASHBLOCK, data, 32); + return SendZmqMessage(MSG_HASHBLOCK, data, 32); } bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction) { uint256 hash = transaction.GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s to %s\n", hash.GetHex(), this->address); char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; - return SendMessage(MSG_HASHTX, data, 32); + return SendZmqMessage(MSG_HASHTX, data, 32); } bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) { - LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address); const Consensus::Params& consensusParams = Params().GetConsensus(); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags()); @@ -196,14 +215,62 @@ bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) ss << block; } - return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size()); + return SendZmqMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size()); } bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction) { uint256 hash = transaction.GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s to %s\n", hash.GetHex(), this->address); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags()); ss << transaction; - return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); + return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); +} + + +// TODO: Dedup this code to take label char, log string +bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex) +{ + uint256 hash = pindex->GetBlockHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s to %s\n", hash.GetHex(), this->address); + char data[sizeof(uint256)+1]; + for (unsigned int i = 0; i < sizeof(uint256); i++) + data[sizeof(uint256) - 1 - i] = hash.begin()[i]; + data[sizeof(data) - 1] = 'C'; // Block (C)onnect + return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); +} + +bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pindex) +{ + uint256 hash = pindex->GetBlockHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address); + char data[sizeof(uint256)+1]; + for (unsigned int i = 0; i < sizeof(uint256); i++) + data[sizeof(uint256) - 1 - i] = hash.begin()[i]; + data[sizeof(data) - 1] = 'D'; // Block (D)isconnect + return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); +} + +bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) +{ + uint256 hash = transaction.GetHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address); + unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1]; + for (unsigned int i = 0; i < sizeof(uint256); i++) + data[sizeof(uint256) - 1 - i] = hash.begin()[i]; + data[sizeof(uint256)] = 'A'; // Mempool (A)cceptance + WriteLE64(data+sizeof(uint256)+1, mempool_sequence); + return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); +} + +bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) +{ + uint256 hash = transaction.GetHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address); + unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1]; + for (unsigned int i = 0; i < sizeof(uint256); i++) + data[sizeof(uint256) - 1 - i] = hash.begin()[i]; + data[sizeof(uint256)] = 'R'; // Mempool (R)emoval + WriteLE64(data+sizeof(uint256)+1, mempool_sequence); + return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); } |