diff options
Diffstat (limited to 'src/zmq')
-rw-r--r-- | src/zmq/zmqabstractnotifier.cpp | 20 | ||||
-rw-r--r-- | src/zmq/zmqabstractnotifier.h | 10 | ||||
-rw-r--r-- | src/zmq/zmqnotificationinterface.cpp | 41 | ||||
-rw-r--r-- | src/zmq/zmqnotificationinterface.h | 3 | ||||
-rw-r--r-- | src/zmq/zmqpublishnotifier.cpp | 49 | ||||
-rw-r--r-- | src/zmq/zmqpublishnotifier.h | 9 |
6 files changed, 122 insertions, 10 deletions
diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp index 0d0428f3c0..3938f6fd2c 100644 --- a/src/zmq/zmqabstractnotifier.cpp +++ b/src/zmq/zmqabstractnotifier.cpp @@ -22,3 +22,23 @@ bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/ { return true; } + +bool CZMQAbstractNotifier::NotifyBlockConnect(const CBlockIndex * /*CBlockIndex*/) +{ + return true; +} + +bool CZMQAbstractNotifier::NotifyBlockDisconnect(const CBlockIndex * /*CBlockIndex*/) +{ + return true; +} + +bool CZMQAbstractNotifier::NotifyTransactionAcceptance(const CTransaction &/*transaction*/, uint64_t mempool_sequence) +{ + return true; +} + +bool CZMQAbstractNotifier::NotifyTransactionRemoval(const CTransaction &/*transaction*/, uint64_t mempool_sequence) +{ + return true; +} diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h index 34d7e5ef03..dddba8d6b6 100644 --- a/src/zmq/zmqabstractnotifier.h +++ b/src/zmq/zmqabstractnotifier.h @@ -44,7 +44,17 @@ public: virtual bool Initialize(void *pcontext) = 0; virtual void Shutdown() = 0; + // Notifies of ConnectTip result, i.e., new active tip only virtual bool NotifyBlock(const CBlockIndex *pindex); + // Notifies of every block connection + virtual bool NotifyBlockConnect(const CBlockIndex *pindex); + // Notifies of every block disconnection + virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex); + // Notifies of every mempool acceptance + virtual bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence); + // Notifies of every mempool removal, except inclusion in blocks + virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence); + // Notifies of transactions added to mempool or appearing in blocks virtual bool NotifyTransaction(const CTransaction &transaction); protected: diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp index a22772baed..a7e9a34269 100644 --- a/src/zmq/zmqnotificationinterface.cpp +++ b/src/zmq/zmqnotificationinterface.cpp @@ -36,6 +36,7 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create() factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>; factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>; factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>; + factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>; std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers; for (const auto& entry : factories) @@ -140,31 +141,53 @@ void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, co }); } -void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx) +void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, uint64_t mempool_sequence) { - // Used by BlockConnected and BlockDisconnected as well, because they're - // all the same external callback. const CTransaction& tx = *ptx; - TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) { - return notifier->NotifyTransaction(tx); + TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) { + return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence); + }); +} + +void CZMQNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, uint64_t mempool_sequence) +{ + // Called for all non-block inclusion reasons + const CTransaction& tx = *ptx; + + TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) { + return notifier->NotifyTransactionRemoval(tx, mempool_sequence); }); } void CZMQNotificationInterface::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected) { for (const CTransactionRef& ptx : pblock->vtx) { - // Do a normal notify for each transaction added in the block - TransactionAddedToMempool(ptx); + const CTransaction& tx = *ptx; + TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) { + return notifier->NotifyTransaction(tx); + }); } + + // Next we notify BlockConnect listeners for *all* blocks + TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) { + return notifier->NotifyBlockConnect(pindexConnected); + }); } void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected) { for (const CTransactionRef& ptx : pblock->vtx) { - // Do a normal notify for each transaction removed in block disconnection - TransactionAddedToMempool(ptx); + const CTransaction& tx = *ptx; + TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) { + return notifier->NotifyTransaction(tx); + }); } + + // Next we notify BlockDisconnect listeners for *all* blocks + TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) { + return notifier->NotifyBlockDisconnect(pindexDisconnected); + }); } CZMQNotificationInterface* g_zmq_notification_interface = nullptr; diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h index 0686960ed4..788a383517 100644 --- a/src/zmq/zmqnotificationinterface.h +++ b/src/zmq/zmqnotificationinterface.h @@ -26,7 +26,8 @@ protected: void Shutdown(); // CValidationInterface - void TransactionAddedToMempool(const CTransactionRef& tx) override; + void TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) override; + void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override; void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected) override; void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected) override; void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override; diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index d4d21b05ba..a0e7a0a600 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -26,6 +26,7 @@ static const char *MSG_HASHBLOCK = "hashblock"; static const char *MSG_HASHTX = "hashtx"; static const char *MSG_RAWBLOCK = "rawblock"; static const char *MSG_RAWTX = "rawtx"; +static const char *MSG_SEQUENCE = "sequence"; // Internal function to send multipart message static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) @@ -225,3 +226,51 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr ss << transaction; return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); } + + +// TODO: Dedup this code to take label char, log string +bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex) +{ + uint256 hash = pindex->GetBlockHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s\n", hash.GetHex()); + char data[sizeof(uint256)+1]; + for (unsigned int i = 0; i < sizeof(uint256); i++) + data[sizeof(uint256) - 1 - i] = hash.begin()[i]; + data[sizeof(data) - 1] = 'C'; // Block (C)onnect + return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); +} + +bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pindex) +{ + uint256 hash = pindex->GetBlockHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s\n", hash.GetHex()); + char data[sizeof(uint256)+1]; + for (unsigned int i = 0; i < sizeof(uint256); i++) + data[sizeof(uint256) - 1 - i] = hash.begin()[i]; + data[sizeof(data) - 1] = 'D'; // Block (D)isconnect + return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); +} + +bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) +{ + uint256 hash = transaction.GetHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s\n", hash.GetHex()); + unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1]; + for (unsigned int i = 0; i < sizeof(uint256); i++) + data[sizeof(uint256) - 1 - i] = hash.begin()[i]; + data[sizeof(uint256)] = 'A'; // Mempool (A)cceptance + WriteLE64(data+sizeof(uint256)+1, mempool_sequence); + return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); +} + +bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) +{ + uint256 hash = transaction.GetHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s\n", hash.GetHex()); + unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1]; + for (unsigned int i = 0; i < sizeof(uint256); i++) + data[sizeof(uint256) - 1 - i] = hash.begin()[i]; + data[sizeof(uint256)] = 'R'; // Mempool (R)emoval + WriteLE64(data+sizeof(uint256)+1, mempool_sequence); + return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); +} diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h index eb9ae881be..f13ed6f537 100644 --- a/src/zmq/zmqpublishnotifier.h +++ b/src/zmq/zmqpublishnotifier.h @@ -52,4 +52,13 @@ public: bool NotifyTransaction(const CTransaction &transaction) override; }; +class CZMQPublishSequenceNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyBlockConnect(const CBlockIndex *pindex) override; + bool NotifyBlockDisconnect(const CBlockIndex *pindex) override; + bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) override; + bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) override; +}; + #endif // BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H |