diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/init.cpp | 4 | ||||
-rw-r--r-- | src/interfaces/chain.cpp | 10 | ||||
-rw-r--r-- | src/interfaces/chain.h | 4 | ||||
-rw-r--r-- | src/rpc/blockchain.cpp | 41 | ||||
-rw-r--r-- | src/rpc/blockchain.h | 2 | ||||
-rw-r--r-- | src/rpc/client.cpp | 1 | ||||
-rw-r--r-- | src/txmempool.cpp | 6 | ||||
-rw-r--r-- | src/txmempool.h | 14 | ||||
-rw-r--r-- | src/validation.cpp | 2 | ||||
-rw-r--r-- | src/validationinterface.cpp | 12 | ||||
-rw-r--r-- | src/validationinterface.h | 9 | ||||
-rw-r--r-- | src/wallet/wallet.cpp | 6 | ||||
-rw-r--r-- | src/wallet/wallet.h | 4 | ||||
-rw-r--r-- | src/zmq/zmqabstractnotifier.cpp | 20 | ||||
-rw-r--r-- | src/zmq/zmqabstractnotifier.h | 10 | ||||
-rw-r--r-- | src/zmq/zmqnotificationinterface.cpp | 41 | ||||
-rw-r--r-- | src/zmq/zmqnotificationinterface.h | 3 | ||||
-rw-r--r-- | src/zmq/zmqpublishnotifier.cpp | 49 | ||||
-rw-r--r-- | src/zmq/zmqpublishnotifier.h | 9 |
19 files changed, 206 insertions, 41 deletions
diff --git a/src/init.cpp b/src/init.cpp index 023aa9aba5..132acbfc81 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -488,19 +488,23 @@ void SetupServerArgs(NodeContext& node) argsman.AddArg("-zmqpubhashtx=<address>", "Enable publish hash transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawblock=<address>", "Enable publish raw block in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); + argsman.AddArg("-zmqpubsequence=<address>", "Enable publish hash block and tx sequence in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubhashblockhwm=<n>", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubhashtxhwm=<n>", strprintf("Set publish hash transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawblockhwm=<n>", strprintf("Set publish raw block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtxhwm=<n>", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); + argsman.AddArg("-zmqpubsequencehwm=<n>", strprintf("Set publish hash sequence message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); #else hidden_args.emplace_back("-zmqpubhashblock=<address>"); hidden_args.emplace_back("-zmqpubhashtx=<address>"); hidden_args.emplace_back("-zmqpubrawblock=<address>"); hidden_args.emplace_back("-zmqpubrawtx=<address>"); + hidden_args.emplace_back("-zmqpubsequence=<n>"); hidden_args.emplace_back("-zmqpubhashblockhwm=<n>"); hidden_args.emplace_back("-zmqpubhashtxhwm=<n>"); hidden_args.emplace_back("-zmqpubrawblockhwm=<n>"); hidden_args.emplace_back("-zmqpubrawtxhwm=<n>"); + hidden_args.emplace_back("-zmqpubsequencehwm=<n>"); #endif argsman.AddArg("-checkblocks=<n>", strprintf("How many blocks to check at startup (default: %u, 0 = all)", DEFAULT_CHECKBLOCKS), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST); diff --git a/src/interfaces/chain.cpp b/src/interfaces/chain.cpp index 13d885a20d..4c5ebe66fc 100644 --- a/src/interfaces/chain.cpp +++ b/src/interfaces/chain.cpp @@ -59,13 +59,13 @@ public: explicit NotificationsProxy(std::shared_ptr<Chain::Notifications> notifications) : m_notifications(std::move(notifications)) {} virtual ~NotificationsProxy() = default; - void TransactionAddedToMempool(const CTransactionRef& tx) override + void TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) override { - m_notifications->transactionAddedToMempool(tx); + m_notifications->transactionAddedToMempool(tx, mempool_sequence); } - void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override + void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override { - m_notifications->transactionRemovedFromMempool(tx, reason); + m_notifications->transactionRemovedFromMempool(tx, reason, mempool_sequence); } void BlockConnected(const std::shared_ptr<const CBlock>& block, const CBlockIndex* index) override { @@ -405,7 +405,7 @@ public: if (!m_node.mempool) return; LOCK2(::cs_main, m_node.mempool->cs); for (const CTxMemPoolEntry& entry : m_node.mempool->mapTx) { - notifications.transactionAddedToMempool(entry.GetSharedTx()); + notifications.transactionAddedToMempool(entry.GetSharedTx(), 0 /* mempool_sequence */); } } NodeContext& m_node; diff --git a/src/interfaces/chain.h b/src/interfaces/chain.h index 6e50ccb27a..85d09be0f3 100644 --- a/src/interfaces/chain.h +++ b/src/interfaces/chain.h @@ -242,8 +242,8 @@ public: { public: virtual ~Notifications() {} - virtual void transactionAddedToMempool(const CTransactionRef& tx) {} - virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {} + virtual void transactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) {} + virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {} virtual void blockConnected(const CBlock& block, int height) {} virtual void blockDisconnected(const CBlock& block, int height) {} virtual void updatedBlockTip() {} diff --git a/src/rpc/blockchain.cpp b/src/rpc/blockchain.cpp index 9b464bee72..0bb7342db0 100644 --- a/src/rpc/blockchain.cpp +++ b/src/rpc/blockchain.cpp @@ -500,9 +500,12 @@ static void entryToJSON(const CTxMemPool& pool, UniValue& info, const CTxMemPool info.pushKV("unbroadcast", pool.IsUnbroadcastTx(tx.GetHash())); } -UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose) +UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose, bool include_mempool_sequence) { if (verbose) { + if (include_mempool_sequence) { + throw JSONRPCError(RPC_INVALID_PARAMETER, "Verbose results cannot contain mempool sequence values."); + } LOCK(pool.cs); UniValue o(UniValue::VOBJ); for (const CTxMemPoolEntry& e : pool.mapTx) { @@ -516,14 +519,25 @@ UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose) } return o; } else { + uint64_t mempool_sequence; std::vector<uint256> vtxid; - pool.queryHashes(vtxid); - + { + LOCK(pool.cs); + pool.queryHashes(vtxid); + mempool_sequence = pool.GetSequence(); + } UniValue a(UniValue::VARR); for (const uint256& hash : vtxid) a.push_back(hash.ToString()); - return a; + if (!include_mempool_sequence) { + return a; + } else { + UniValue o(UniValue::VOBJ); + o.pushKV("txids", a); + o.pushKV("mempool_sequence", mempool_sequence); + return o; + } } } @@ -534,6 +548,7 @@ static RPCHelpMan getrawmempool() "\nHint: use getmempoolentry to fetch a specific transaction from the mempool.\n", { {"verbose", RPCArg::Type::BOOL, /* default */ "false", "True for a json object, false for array of transaction ids"}, + {"mempool_sequence", RPCArg::Type::BOOL, /* default */ "false", "If verbose=false, returns a json object with transaction list and mempool sequence number attached."}, }, { RPCResult{"for verbose = false", @@ -546,6 +561,15 @@ static RPCHelpMan getrawmempool() { {RPCResult::Type::OBJ, "transactionid", "", MempoolEntryDescription()}, }}, + RPCResult{"for verbose = false and mempool_sequence = true", + RPCResult::Type::OBJ, "", "", + { + {RPCResult::Type::ARR, "txids", "", + { + {RPCResult::Type::STR_HEX, "", "The transaction id"}, + }}, + {RPCResult::Type::NUM, "mempool_sequence", "The mempool sequence value."}, + }}, }, RPCExamples{ HelpExampleCli("getrawmempool", "true") @@ -557,7 +581,12 @@ static RPCHelpMan getrawmempool() if (!request.params[0].isNull()) fVerbose = request.params[0].get_bool(); - return MempoolToJSON(EnsureMemPool(request.context), fVerbose); + bool include_mempool_sequence = false; + if (!request.params[1].isNull()) { + include_mempool_sequence = request.params[1].get_bool(); + } + + return MempoolToJSON(EnsureMemPool(request.context), fVerbose, include_mempool_sequence); }, }; } @@ -2451,7 +2480,7 @@ static const CRPCCommand commands[] = { "blockchain", "getmempooldescendants", &getmempooldescendants, {"txid","verbose"} }, { "blockchain", "getmempoolentry", &getmempoolentry, {"txid"} }, { "blockchain", "getmempoolinfo", &getmempoolinfo, {} }, - { "blockchain", "getrawmempool", &getrawmempool, {"verbose"} }, + { "blockchain", "getrawmempool", &getrawmempool, {"verbose", "mempool_sequence"} }, { "blockchain", "gettxout", &gettxout, {"txid","n","include_mempool"} }, { "blockchain", "gettxoutsetinfo", &gettxoutsetinfo, {"hash_type"} }, { "blockchain", "pruneblockchain", &pruneblockchain, {"height"} }, diff --git a/src/rpc/blockchain.h b/src/rpc/blockchain.h index 5c9a43b13e..5b362bf211 100644 --- a/src/rpc/blockchain.h +++ b/src/rpc/blockchain.h @@ -43,7 +43,7 @@ UniValue blockToJSON(const CBlock& block, const CBlockIndex* tip, const CBlockIn UniValue MempoolInfoToJSON(const CTxMemPool& pool); /** Mempool to JSON */ -UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose = false); +UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose = false, bool include_mempool_sequence = false); /** Block header to JSON */ UniValue blockheaderToJSON(const CBlockIndex* tip, const CBlockIndex* blockindex) LOCKS_EXCLUDED(cs_main); diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp index 6ef3294132..3c432464f2 100644 --- a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -142,6 +142,7 @@ static const CRPCConvertParam vRPCConvertParams[] = { "pruneblockchain", 0, "height" }, { "keypoolrefill", 0, "newsize" }, { "getrawmempool", 0, "verbose" }, + { "getrawmempool", 1, "mempool_sequence" }, { "estimatesmartfee", 0, "conf_target" }, { "estimaterawfee", 0, "conf_target" }, { "estimaterawfee", 1, "threshold" }, diff --git a/src/txmempool.cpp b/src/txmempool.cpp index 6a525c97db..0c2b731967 100644 --- a/src/txmempool.cpp +++ b/src/txmempool.cpp @@ -409,12 +409,16 @@ void CTxMemPool::addUnchecked(const CTxMemPoolEntry &entry, setEntries &setAnces void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason) { + // We increment mempool sequence value no matter removal reason + // even if not directly reported below. + uint64_t mempool_sequence = GetAndIncrementSequence(); + if (reason != MemPoolRemovalReason::BLOCK) { // Notify clients that a transaction has been removed from the mempool // for any reason except being included in a block. Clients interested // in transactions included in blocks can subscribe to the BlockConnected // notification. - GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason); + GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason, mempool_sequence); } const uint256 hash = it->GetTx().GetHash(); diff --git a/src/txmempool.h b/src/txmempool.h index 664fb5986a..f513f14af6 100644 --- a/src/txmempool.h +++ b/src/txmempool.h @@ -501,6 +501,11 @@ private: mutable uint64_t m_epoch; mutable bool m_has_epoch_guard; + // In-memory counter for external mempool tracking purposes. + // This number is incremented once every time a transaction + // is added or removed from the mempool for any reason. + mutable uint64_t m_sequence_number{1}; + void trackPackageRemoved(const CFeeRate& rate) EXCLUSIVE_LOCKS_REQUIRED(cs); bool m_is_loaded GUARDED_BY(cs){false}; @@ -776,6 +781,15 @@ public: return m_unbroadcast_txids.count(txid) != 0; } + /** Guards this internal counter for external reporting */ + uint64_t GetAndIncrementSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) { + return m_sequence_number++; + } + + uint64_t GetSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) { + return m_sequence_number; + } + private: /** UpdateForDescendants is used by UpdateTransactionsFromBlock to update * the descendants for a single transaction that has been added to the diff --git a/src/validation.cpp b/src/validation.cpp index a96913e3a0..7020b59cb8 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -1058,7 +1058,7 @@ bool MemPoolAccept::AcceptSingleTransaction(const CTransactionRef& ptx, ATMPArgs if (!Finalize(args, workspace)) return false; - GetMainSignals().TransactionAddedToMempool(ptx); + GetMainSignals().TransactionAddedToMempool(ptx, m_pool.GetAndIncrementSequence()); return true; } diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp index 3dfbcc581c..1e07ff23ae 100644 --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -199,18 +199,18 @@ void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockInd fInitialDownload); } -void CMainSignals::TransactionAddedToMempool(const CTransactionRef& tx) { - auto event = [tx, this] { - m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx); }); +void CMainSignals::TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) { + auto event = [tx, mempool_sequence, this] { + m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx, mempool_sequence); }); }; ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s wtxid=%s", __func__, tx->GetHash().ToString(), tx->GetWitnessHash().ToString()); } -void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) { - auto event = [tx, reason, this] { - m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason); }); +void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) { + auto event = [tx, reason, mempool_sequence, this] { + m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason, mempool_sequence); }); }; ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s wtxid=%s", __func__, tx->GetHash().ToString(), diff --git a/src/validationinterface.h b/src/validationinterface.h index e96f2883fc..7c3ce00fbc 100644 --- a/src/validationinterface.h +++ b/src/validationinterface.h @@ -97,7 +97,8 @@ protected: * * Called on a background thread. */ - virtual void TransactionAddedToMempool(const CTransactionRef& tx) {} + virtual void TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) {} + /** * Notifies listeners of a transaction leaving mempool. * @@ -130,7 +131,7 @@ protected: * * Called on a background thread. */ - virtual void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {} + virtual void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {} /** * Notifies listeners of a block being connected. * Provides a vector of transactions evicted from the mempool as a result. @@ -197,8 +198,8 @@ public: void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload); - void TransactionAddedToMempool(const CTransactionRef&); - void TransactionRemovedFromMempool(const CTransactionRef&, MemPoolRemovalReason); + void TransactionAddedToMempool(const CTransactionRef&, uint64_t mempool_sequence); + void TransactionRemovedFromMempool(const CTransactionRef&, MemPoolRemovalReason, uint64_t mempool_sequence); void BlockConnected(const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex); void BlockDisconnected(const std::shared_ptr<const CBlock> &, const CBlockIndex* pindex); void ChainStateFlushed(const CBlockLocator &); diff --git a/src/wallet/wallet.cpp b/src/wallet/wallet.cpp index 73e11a5b52..66857dbb39 100644 --- a/src/wallet/wallet.cpp +++ b/src/wallet/wallet.cpp @@ -1177,7 +1177,7 @@ void CWallet::SyncTransaction(const CTransactionRef& ptx, CWalletTx::Confirmatio MarkInputsDirty(ptx); } -void CWallet::transactionAddedToMempool(const CTransactionRef& tx) { +void CWallet::transactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) { LOCK(cs_wallet); SyncTransaction(tx, {CWalletTx::Status::UNCONFIRMED, /* block height */ 0, /* block hash */ {}, /* index */ 0}); @@ -1187,7 +1187,7 @@ void CWallet::transactionAddedToMempool(const CTransactionRef& tx) { } } -void CWallet::transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) { +void CWallet::transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) { LOCK(cs_wallet); auto it = mapWallet.find(tx->GetHash()); if (it != mapWallet.end()) { @@ -1234,7 +1234,7 @@ void CWallet::blockConnected(const CBlock& block, int height) m_last_block_processed = block_hash; for (size_t index = 0; index < block.vtx.size(); index++) { SyncTransaction(block.vtx[index], {CWalletTx::Status::CONFIRMED, height, block_hash, (int)index}); - transactionRemovedFromMempool(block.vtx[index], MemPoolRemovalReason::BLOCK); + transactionRemovedFromMempool(block.vtx[index], MemPoolRemovalReason::BLOCK, 0 /* mempool_sequence */); } } diff --git a/src/wallet/wallet.h b/src/wallet/wallet.h index c54480612a..f15712dd0e 100644 --- a/src/wallet/wallet.h +++ b/src/wallet/wallet.h @@ -900,7 +900,7 @@ public: CWalletTx* AddToWallet(CTransactionRef tx, const CWalletTx::Confirmation& confirm, const UpdateWalletTxFn& update_wtx=nullptr, bool fFlushOnClose=true); bool LoadToWallet(const uint256& hash, const UpdateWalletTxFn& fill_wtx) EXCLUSIVE_LOCKS_REQUIRED(cs_wallet); - void transactionAddedToMempool(const CTransactionRef& tx) override; + void transactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) override; void blockConnected(const CBlock& block, int height) override; void blockDisconnected(const CBlock& block, int height) override; void updatedBlockTip() override; @@ -922,7 +922,7 @@ public: uint256 last_failed_block; }; ScanResult ScanForWalletTransactions(const uint256& start_block, int start_height, Optional<int> max_height, const WalletRescanReserver& reserver, bool fUpdate); - void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override; + void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override; void ReacceptWalletTransactions() EXCLUSIVE_LOCKS_REQUIRED(cs_wallet); void ResendWalletTransactions(); struct Balance { diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp index 0d0428f3c0..3938f6fd2c 100644 --- a/src/zmq/zmqabstractnotifier.cpp +++ b/src/zmq/zmqabstractnotifier.cpp @@ -22,3 +22,23 @@ bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/ { return true; } + +bool CZMQAbstractNotifier::NotifyBlockConnect(const CBlockIndex * /*CBlockIndex*/) +{ + return true; +} + +bool CZMQAbstractNotifier::NotifyBlockDisconnect(const CBlockIndex * /*CBlockIndex*/) +{ + return true; +} + +bool CZMQAbstractNotifier::NotifyTransactionAcceptance(const CTransaction &/*transaction*/, uint64_t mempool_sequence) +{ + return true; +} + +bool CZMQAbstractNotifier::NotifyTransactionRemoval(const CTransaction &/*transaction*/, uint64_t mempool_sequence) +{ + return true; +} diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h index 34d7e5ef03..dddba8d6b6 100644 --- a/src/zmq/zmqabstractnotifier.h +++ b/src/zmq/zmqabstractnotifier.h @@ -44,7 +44,17 @@ public: virtual bool Initialize(void *pcontext) = 0; virtual void Shutdown() = 0; + // Notifies of ConnectTip result, i.e., new active tip only virtual bool NotifyBlock(const CBlockIndex *pindex); + // Notifies of every block connection + virtual bool NotifyBlockConnect(const CBlockIndex *pindex); + // Notifies of every block disconnection + virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex); + // Notifies of every mempool acceptance + virtual bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence); + // Notifies of every mempool removal, except inclusion in blocks + virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence); + // Notifies of transactions added to mempool or appearing in blocks virtual bool NotifyTransaction(const CTransaction &transaction); protected: diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp index a22772baed..a7e9a34269 100644 --- a/src/zmq/zmqnotificationinterface.cpp +++ b/src/zmq/zmqnotificationinterface.cpp @@ -36,6 +36,7 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create() factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>; factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>; factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>; + factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>; std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers; for (const auto& entry : factories) @@ -140,31 +141,53 @@ void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, co }); } -void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx) +void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, uint64_t mempool_sequence) { - // Used by BlockConnected and BlockDisconnected as well, because they're - // all the same external callback. const CTransaction& tx = *ptx; - TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) { - return notifier->NotifyTransaction(tx); + TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) { + return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence); + }); +} + +void CZMQNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, uint64_t mempool_sequence) +{ + // Called for all non-block inclusion reasons + const CTransaction& tx = *ptx; + + TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) { + return notifier->NotifyTransactionRemoval(tx, mempool_sequence); }); } void CZMQNotificationInterface::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected) { for (const CTransactionRef& ptx : pblock->vtx) { - // Do a normal notify for each transaction added in the block - TransactionAddedToMempool(ptx); + const CTransaction& tx = *ptx; + TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) { + return notifier->NotifyTransaction(tx); + }); } + + // Next we notify BlockConnect listeners for *all* blocks + TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) { + return notifier->NotifyBlockConnect(pindexConnected); + }); } void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected) { for (const CTransactionRef& ptx : pblock->vtx) { - // Do a normal notify for each transaction removed in block disconnection - TransactionAddedToMempool(ptx); + const CTransaction& tx = *ptx; + TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) { + return notifier->NotifyTransaction(tx); + }); } + + // Next we notify BlockDisconnect listeners for *all* blocks + TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) { + return notifier->NotifyBlockDisconnect(pindexDisconnected); + }); } CZMQNotificationInterface* g_zmq_notification_interface = nullptr; diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h index 0686960ed4..788a383517 100644 --- a/src/zmq/zmqnotificationinterface.h +++ b/src/zmq/zmqnotificationinterface.h @@ -26,7 +26,8 @@ protected: void Shutdown(); // CValidationInterface - void TransactionAddedToMempool(const CTransactionRef& tx) override; + void TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) override; + void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override; void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected) override; void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected) override; void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override; diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index d4d21b05ba..a0e7a0a600 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -26,6 +26,7 @@ static const char *MSG_HASHBLOCK = "hashblock"; static const char *MSG_HASHTX = "hashtx"; static const char *MSG_RAWBLOCK = "rawblock"; static const char *MSG_RAWTX = "rawtx"; +static const char *MSG_SEQUENCE = "sequence"; // Internal function to send multipart message static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) @@ -225,3 +226,51 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr ss << transaction; return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); } + + +// TODO: Dedup this code to take label char, log string +bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex) +{ + uint256 hash = pindex->GetBlockHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s\n", hash.GetHex()); + char data[sizeof(uint256)+1]; + for (unsigned int i = 0; i < sizeof(uint256); i++) + data[sizeof(uint256) - 1 - i] = hash.begin()[i]; + data[sizeof(data) - 1] = 'C'; // Block (C)onnect + return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); +} + +bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pindex) +{ + uint256 hash = pindex->GetBlockHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s\n", hash.GetHex()); + char data[sizeof(uint256)+1]; + for (unsigned int i = 0; i < sizeof(uint256); i++) + data[sizeof(uint256) - 1 - i] = hash.begin()[i]; + data[sizeof(data) - 1] = 'D'; // Block (D)isconnect + return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); +} + +bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) +{ + uint256 hash = transaction.GetHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s\n", hash.GetHex()); + unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1]; + for (unsigned int i = 0; i < sizeof(uint256); i++) + data[sizeof(uint256) - 1 - i] = hash.begin()[i]; + data[sizeof(uint256)] = 'A'; // Mempool (A)cceptance + WriteLE64(data+sizeof(uint256)+1, mempool_sequence); + return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); +} + +bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) +{ + uint256 hash = transaction.GetHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s\n", hash.GetHex()); + unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1]; + for (unsigned int i = 0; i < sizeof(uint256); i++) + data[sizeof(uint256) - 1 - i] = hash.begin()[i]; + data[sizeof(uint256)] = 'R'; // Mempool (R)emoval + WriteLE64(data+sizeof(uint256)+1, mempool_sequence); + return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); +} diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h index eb9ae881be..f13ed6f537 100644 --- a/src/zmq/zmqpublishnotifier.h +++ b/src/zmq/zmqpublishnotifier.h @@ -52,4 +52,13 @@ public: bool NotifyTransaction(const CTransaction &transaction) override; }; +class CZMQPublishSequenceNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyBlockConnect(const CBlockIndex *pindex) override; + bool NotifyBlockDisconnect(const CBlockIndex *pindex) override; + bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) override; + bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) override; +}; + #endif // BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H |