diff options
40 files changed, 365 insertions, 289 deletions
diff --git a/contrib/devtools/test_deterministic_coverage.sh b/contrib/devtools/test_deterministic_coverage.sh index 8501c72f04..23c260b529 100755 --- a/contrib/devtools/test_deterministic_coverage.sh +++ b/contrib/devtools/test_deterministic_coverage.sh @@ -17,21 +17,21 @@ NON_DETERMINISTIC_TESTS=( "blockfilter_index_tests/blockfilter_index_initial_sync" # src/checkqueue.h: In CCheckQueue::Loop(): while (queue.empty()) { ... } "coinselector_tests/knapsack_solver_test" # coinselector_tests.cpp: if (equal_sets(setCoinsRet, setCoinsRet2)) "fs_tests/fsbridge_fstream" # deterministic test failure? - "miner_tests/CreateNewBlock_validity" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) + "miner_tests/CreateNewBlock_validity" # validation.cpp: if (signals.CallbacksPending() > 10) "scheduler_tests/manythreads" # scheduler.cpp: CScheduler::serviceQueue() "scheduler_tests/singlethreadedscheduler_ordered" # scheduler.cpp: CScheduler::serviceQueue() - "txvalidationcache_tests/checkinputs_test" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) - "txvalidationcache_tests/tx_mempool_block_doublespend" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) - "txindex_tests/txindex_initial_sync" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) - "txvalidation_tests/tx_mempool_reject_coinbase" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) - "validation_block_tests/processnewblock_signals_ordering" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) - "wallet_tests/coin_mark_dirty_immature_credit" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) - "wallet_tests/dummy_input_size_test" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) - "wallet_tests/importmulti_rescan" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) - "wallet_tests/importwallet_rescan" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) - "wallet_tests/ListCoins" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) - "wallet_tests/scan_for_wallet_transactions" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) - "wallet_tests/wallet_disableprivkeys" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) + "txvalidationcache_tests/checkinputs_test" # validation.cpp: if (signals.CallbacksPending() > 10) + "txvalidationcache_tests/tx_mempool_block_doublespend" # validation.cpp: if (signals.CallbacksPending() > 10) + "txindex_tests/txindex_initial_sync" # validation.cpp: if (signals.CallbacksPending() > 10) + "txvalidation_tests/tx_mempool_reject_coinbase" # validation.cpp: if (signals.CallbacksPending() > 10) + "validation_block_tests/processnewblock_signals_ordering" # validation.cpp: if (signals.CallbacksPending() > 10) + "wallet_tests/coin_mark_dirty_immature_credit" # validation.cpp: if (signals.CallbacksPending() > 10) + "wallet_tests/dummy_input_size_test" # validation.cpp: if (signals.CallbacksPending() > 10) + "wallet_tests/importmulti_rescan" # validation.cpp: if (signals.CallbacksPending() > 10) + "wallet_tests/importwallet_rescan" # validation.cpp: if (signals.CallbacksPending() > 10) + "wallet_tests/ListCoins" # validation.cpp: if (signals.CallbacksPending() > 10) + "wallet_tests/scan_for_wallet_transactions" # validation.cpp: if (signals.CallbacksPending() > 10) + "wallet_tests/wallet_disableprivkeys" # validation.cpp: if (signals.CallbacksPending() > 10) ) TEST_BITCOIN_BINARY="src/test/test_bitcoin" diff --git a/src/Makefile.am b/src/Makefile.am index d797ac880b..1f55bfbafd 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -323,6 +323,7 @@ BITCOIN_CORE_H = \ util/spanparsing.h \ util/string.h \ util/syserror.h \ + util/task_runner.h \ util/thread.h \ util/threadinterrupt.h \ util/threadnames.h \ @@ -966,7 +967,6 @@ libbitcoinkernel_la_SOURCES = \ pubkey.cpp \ random.cpp \ randomenv.cpp \ - scheduler.cpp \ script/interpreter.cpp \ script/script.cpp \ script/script_error.cpp \ @@ -983,7 +983,6 @@ libbitcoinkernel_la_SOURCES = \ util/batchpriority.cpp \ util/chaintype.cpp \ util/check.cpp \ - util/exception.cpp \ util/fs.cpp \ util/fs_helpers.cpp \ util/hasher.cpp \ @@ -994,7 +993,6 @@ libbitcoinkernel_la_SOURCES = \ util/strencodings.cpp \ util/string.cpp \ util/syserror.cpp \ - util/thread.cpp \ util/threadnames.cpp \ util/time.cpp \ util/tokenpipe.cpp \ diff --git a/src/bench/wallet_balance.cpp b/src/bench/wallet_balance.cpp index bf2195293e..7a10b167a6 100644 --- a/src/bench/wallet_balance.cpp +++ b/src/bench/wallet_balance.cpp @@ -39,7 +39,8 @@ static void WalletBalance(benchmark::Bench& bench, const bool set_dirty, const b generatetoaddress(test_setup->m_node, address_mine.value_or(ADDRESS_WATCHONLY)); generatetoaddress(test_setup->m_node, ADDRESS_WATCHONLY); } - SyncWithValidationInterfaceQueue(); + // Calls SyncWithValidationInterfaceQueue + wallet.chain().waitForNotificationsIfTipChanged(uint256::ZERO); auto bal = GetBalance(wallet); // Cache diff --git a/src/bitcoin-chainstate.cpp b/src/bitcoin-chainstate.cpp index c1a71ed749..3eb64aa344 100644 --- a/src/bitcoin-chainstate.cpp +++ b/src/bitcoin-chainstate.cpp @@ -23,11 +23,10 @@ #include <node/caches.h> #include <node/chainstate.h> #include <random.h> -#include <scheduler.h> #include <script/sigcache.h> #include <util/chaintype.h> #include <util/fs.h> -#include <util/thread.h> +#include <util/task_runner.h> #include <validation.h> #include <validationinterface.h> @@ -68,16 +67,7 @@ int main(int argc, char* argv[]) Assert(InitSignatureCache(validation_cache_sizes.signature_cache_bytes)); Assert(InitScriptExecutionCache(validation_cache_sizes.script_execution_cache_bytes)); - - // SETUP: Scheduling and Background Signals - CScheduler scheduler{}; - // Start the lightweight task scheduler thread - scheduler.m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { scheduler.serviceQueue(); }); - - // Gather some entropy once per minute. - scheduler.scheduleEvery(RandAddPeriodic, std::chrono::minutes{1}); - - GetMainSignals().RegisterBackgroundSignalScheduler(scheduler); + ValidationSignals validation_signals{std::make_unique<util::ImmediateTaskRunner>()}; class KernelNotifications : public kernel::Notifications { @@ -118,6 +108,7 @@ int main(int argc, char* argv[]) .chainparams = *chainparams, .datadir = abs_datadir, .notifications = *notifications, + .signals = &validation_signals, }; const node::BlockManager::Options blockman_opts{ .chainparams = chainman_opts.chainparams, @@ -235,9 +226,9 @@ int main(int argc, char* argv[]) bool new_block; auto sc = std::make_shared<submitblock_StateCatcher>(block.GetHash()); - RegisterSharedValidationInterface(sc); + validation_signals.RegisterSharedValidationInterface(sc); bool accepted = chainman.ProcessNewBlock(blockptr, /*force_processing=*/true, /*min_pow_checked=*/true, /*new_block=*/&new_block); - UnregisterSharedValidationInterface(sc); + validation_signals.UnregisterSharedValidationInterface(sc); if (!new_block && accepted) { std::cerr << "duplicate" << std::endl; break; @@ -287,10 +278,9 @@ int main(int argc, char* argv[]) epilogue: // Without this precise shutdown sequence, there will be a lot of nullptr // dereferencing and UB. - scheduler.stop(); if (chainman.m_thread_load.joinable()) chainman.m_thread_load.join(); - GetMainSignals().FlushBackgroundCallbacks(); + validation_signals.FlushBackgroundCallbacks(); { LOCK(cs_main); for (Chainstate* chainstate : chainman.GetAll()) { @@ -300,5 +290,4 @@ epilogue: } } } - GetMainSignals().UnregisterBackgroundSignalScheduler(); } diff --git a/src/index/base.cpp b/src/index/base.cpp index bcfe7215be..2287437f8f 100644 --- a/src/index/base.cpp +++ b/src/index/base.cpp @@ -89,7 +89,7 @@ bool BaseIndex::Init() return &m_chain->context()->chainman->GetChainstateForIndexing()); // Register to validation interface before setting the 'm_synced' flag, so that // callbacks are not missed once m_synced is true. - RegisterValidationInterface(this); + m_chain->context()->validation_signals->RegisterValidationInterface(this); CBlockLocator locator; if (!GetDB().ReadBestBlock(locator)) { @@ -380,7 +380,7 @@ bool BaseIndex::BlockUntilSyncedToCurrentChain() const } LogPrintf("%s: %s is catching up on block notifications\n", __func__, GetName()); - SyncWithValidationInterfaceQueue(); + m_chain->context()->validation_signals->SyncWithValidationInterfaceQueue(); return true; } @@ -399,7 +399,9 @@ bool BaseIndex::StartBackgroundSync() void BaseIndex::Stop() { - UnregisterValidationInterface(this); + if (m_chain->context()->validation_signals) { + m_chain->context()->validation_signals->UnregisterValidationInterface(this); + } if (m_thread_sync.joinable()) { m_thread_sync.join(); diff --git a/src/init.cpp b/src/init.cpp index 988daefeec..9ea7b881cb 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -291,7 +291,7 @@ void Shutdown(NodeContext& node) // Because these depend on each-other, we make sure that neither can be // using the other before destroying them. - if (node.peerman) UnregisterValidationInterface(node.peerman.get()); + if (node.peerman && node.validation_signals) node.validation_signals->UnregisterValidationInterface(node.peerman.get()); if (node.connman) node.connman->Stop(); StopTorControl(); @@ -317,7 +317,9 @@ void Shutdown(NodeContext& node) // fee estimator from validation interface. if (node.fee_estimator) { node.fee_estimator->Flush(); - UnregisterValidationInterface(node.fee_estimator.get()); + if (node.validation_signals) { + node.validation_signals->UnregisterValidationInterface(node.fee_estimator.get()); + } } // FlushStateToDisk generates a ChainStateFlushed callback, which we should avoid missing @@ -332,7 +334,7 @@ void Shutdown(NodeContext& node) // After there are no more peers/RPC left to give us new data which may generate // CValidationInterface callbacks, flush them... - GetMainSignals().FlushBackgroundCallbacks(); + if (node.validation_signals) node.validation_signals->FlushBackgroundCallbacks(); // Stop and delete all indexes only after flushing background callbacks. if (g_txindex) { @@ -367,17 +369,19 @@ void Shutdown(NodeContext& node) #if ENABLE_ZMQ if (g_zmq_notification_interface) { - UnregisterValidationInterface(g_zmq_notification_interface.get()); + if (node.validation_signals) node.validation_signals->UnregisterValidationInterface(g_zmq_notification_interface.get()); g_zmq_notification_interface.reset(); } #endif node.chain_clients.clear(); - UnregisterAllValidationInterfaces(); - GetMainSignals().UnregisterBackgroundSignalScheduler(); + if (node.validation_signals) { + node.validation_signals->UnregisterAllValidationInterfaces(); + } node.mempool.reset(); node.fee_estimator.reset(); node.chainman.reset(); + node.validation_signals.reset(); node.scheduler.reset(); node.kernel.reset(); @@ -1138,17 +1142,18 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) assert(!node.scheduler); node.scheduler = std::make_unique<CScheduler>(); + auto& scheduler = *node.scheduler; // Start the lightweight task scheduler thread - node.scheduler->m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { node.scheduler->serviceQueue(); }); + scheduler.m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { scheduler.serviceQueue(); }); // Gather some entropy once per minute. - node.scheduler->scheduleEvery([]{ + scheduler.scheduleEvery([]{ RandAddPeriodic(); }, std::chrono::minutes{1}); // Check disk space every 5 minutes to avoid db corruption. - node.scheduler->scheduleEvery([&args, &node]{ + scheduler.scheduleEvery([&args, &node]{ constexpr uint64_t min_disk_space = 50 << 20; // 50 MB if (!CheckDiskSpace(args.GetBlocksDirPath(), min_disk_space)) { LogPrintf("Shutting down due to lack of disk space!\n"); @@ -1158,7 +1163,9 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) } }, std::chrono::minutes{5}); - GetMainSignals().RegisterBackgroundSignalScheduler(*node.scheduler); + assert(!node.validation_signals); + node.validation_signals = std::make_unique<ValidationSignals>(std::make_unique<SerialTaskRunner>(scheduler)); + auto& validation_signals = *node.validation_signals; // Create client interfaces for wallets that are supposed to be loaded // according to -wallet and -disablewallet options. This only constructs @@ -1263,8 +1270,8 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) // Flush estimates to disk periodically CBlockPolicyEstimator* fee_estimator = node.fee_estimator.get(); - node.scheduler->scheduleEvery([fee_estimator] { fee_estimator->FlushFeeEstimates(); }, FEE_FLUSH_INTERVAL); - RegisterValidationInterface(fee_estimator); + scheduler.scheduleEvery([fee_estimator] { fee_estimator->FlushFeeEstimates(); }, FEE_FLUSH_INTERVAL); + validation_signals.RegisterValidationInterface(fee_estimator); } // Check port numbers @@ -1435,7 +1442,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) }); if (g_zmq_notification_interface) { - RegisterValidationInterface(g_zmq_notification_interface.get()); + validation_signals.RegisterValidationInterface(g_zmq_notification_interface.get()); } #endif @@ -1449,6 +1456,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) .chainparams = chainparams, .datadir = args.GetDataDirNet(), .notifications = *node.notifications, + .signals = &validation_signals, }; Assert(ApplyArgsManOptions(args, chainman_opts)); // no error can happen, already checked in AppInitParameterInteraction @@ -1478,6 +1486,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) CTxMemPool::Options mempool_opts{ .check_ratio = chainparams.DefaultConsistencyChecks() ? 1 : 0, + .signals = &validation_signals, }; auto result{ApplyArgsManOptions(args, chainparams, mempool_opts)}; if (!result) { @@ -1505,7 +1514,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) // Drain the validation interface queue to ensure that the old indexes // don't have any pending work. - SyncWithValidationInterfaceQueue(); + Assert(node.validation_signals)->SyncWithValidationInterfaceQueue(); for (auto* index : node.indexes) { index->Interrupt(); @@ -1594,7 +1603,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) node.peerman = PeerManager::make(*node.connman, *node.addrman, node.banman.get(), chainman, *node.mempool, peerman_opts); - RegisterValidationInterface(node.peerman.get()); + validation_signals.RegisterValidationInterface(node.peerman.get()); // ********************************************************* Step 8: start indexers @@ -1900,7 +1909,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) connOptions.m_i2p_accept_incoming = args.GetBoolArg("-i2pacceptincoming", DEFAULT_I2P_ACCEPT_INCOMING); - if (!node.connman->Start(*node.scheduler, connOptions)) { + if (!node.connman->Start(scheduler, connOptions)) { return false; } @@ -1920,15 +1929,15 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) uiInterface.InitMessage(_("Done loading").translated); for (const auto& client : node.chain_clients) { - client->start(*node.scheduler); + client->start(scheduler); } BanMan* banman = node.banman.get(); - node.scheduler->scheduleEvery([banman]{ + scheduler.scheduleEvery([banman]{ banman->DumpBanlist(); }, DUMP_BANS_INTERVAL); - if (node.peerman) node.peerman->StartScheduledTasks(*node.scheduler); + if (node.peerman) node.peerman->StartScheduledTasks(scheduler); #if HAVE_SYSTEM StartupNotify(args); diff --git a/src/kernel/chainstatemanager_opts.h b/src/kernel/chainstatemanager_opts.h index 864aac336e..de5f78494a 100644 --- a/src/kernel/chainstatemanager_opts.h +++ b/src/kernel/chainstatemanager_opts.h @@ -18,6 +18,7 @@ #include <optional> class CChainParams; +class ValidationSignals; static constexpr bool DEFAULT_CHECKPOINTS_ENABLED{true}; static constexpr auto DEFAULT_MAX_TIP_AGE{24h}; @@ -44,6 +45,7 @@ struct ChainstateManagerOpts { DBOptions coins_db{}; CoinsViewOptions coins_view{}; Notifications& notifications; + ValidationSignals* signals{nullptr}; //! Number of script check worker threads. Zero means no parallel verification. int worker_threads_num{0}; }; diff --git a/src/kernel/mempool_options.h b/src/kernel/mempool_options.h index 753aebd455..0850b2e60e 100644 --- a/src/kernel/mempool_options.h +++ b/src/kernel/mempool_options.h @@ -13,6 +13,8 @@ #include <cstdint> #include <optional> +class ValidationSignals; + /** Default for -maxmempool, maximum megabytes of mempool memory usage */ static constexpr unsigned int DEFAULT_MAX_MEMPOOL_SIZE_MB{300}; /** Default for -maxmempool when blocksonly is set */ @@ -56,6 +58,8 @@ struct MemPoolOptions { bool full_rbf{DEFAULT_MEMPOOL_FULL_RBF}; bool persist_v1_dat{DEFAULT_PERSIST_V1_DAT}; MemPoolLimits limits{}; + + ValidationSignals* signals{nullptr}; }; } // namespace kernel diff --git a/src/node/context.h b/src/node/context.h index 4f3b640b2d..245f230aec 100644 --- a/src/node/context.h +++ b/src/node/context.h @@ -20,6 +20,7 @@ class BanMan; class BaseIndex; class CBlockPolicyEstimator; class CConnman; +class ValidationSignals; class CScheduler; class CTxMemPool; class ChainstateManager; @@ -70,7 +71,10 @@ struct NodeContext { interfaces::WalletLoader* wallet_loader{nullptr}; std::unique_ptr<CScheduler> scheduler; std::function<void()> rpc_interruption_point = [] {}; + //! Issues blocking calls about sync status, errors and warnings std::unique_ptr<KernelNotifications> notifications; + //! Issues calls about blocks and transactions + std::unique_ptr<ValidationSignals> validation_signals; std::atomic<int> exit_status{EXIT_SUCCESS}; //! Declare default constructor and destructor that are not inline, so code diff --git a/src/node/interfaces.cpp b/src/node/interfaces.cpp index 5a8b7fc105..f9a372e3de 100644 --- a/src/node/interfaces.cpp +++ b/src/node/interfaces.cpp @@ -460,19 +460,20 @@ public: class NotificationsHandlerImpl : public Handler { public: - explicit NotificationsHandlerImpl(std::shared_ptr<Chain::Notifications> notifications) - : m_proxy(std::make_shared<NotificationsProxy>(std::move(notifications))) + explicit NotificationsHandlerImpl(ValidationSignals& signals, std::shared_ptr<Chain::Notifications> notifications) + : m_signals{signals}, m_proxy{std::make_shared<NotificationsProxy>(std::move(notifications))} { - RegisterSharedValidationInterface(m_proxy); + m_signals.RegisterSharedValidationInterface(m_proxy); } ~NotificationsHandlerImpl() override { disconnect(); } void disconnect() override { if (m_proxy) { - UnregisterSharedValidationInterface(m_proxy); + m_signals.UnregisterSharedValidationInterface(m_proxy); m_proxy.reset(); } } + ValidationSignals& m_signals; std::shared_ptr<NotificationsProxy> m_proxy; }; @@ -761,12 +762,12 @@ public: } std::unique_ptr<Handler> handleNotifications(std::shared_ptr<Notifications> notifications) override { - return std::make_unique<NotificationsHandlerImpl>(std::move(notifications)); + return std::make_unique<NotificationsHandlerImpl>(validation_signals(), std::move(notifications)); } void waitForNotificationsIfTipChanged(const uint256& old_tip) override { if (!old_tip.IsNull() && old_tip == WITH_LOCK(::cs_main, return chainman().ActiveChain().Tip()->GetBlockHash())) return; - SyncWithValidationInterfaceQueue(); + validation_signals().SyncWithValidationInterfaceQueue(); } std::unique_ptr<Handler> handleRpc(const CRPCCommand& command) override { @@ -822,6 +823,7 @@ public: NodeContext* context() override { return &m_node; } ArgsManager& args() { return *Assert(m_node.args); } ChainstateManager& chainman() { return *Assert(m_node.chainman); } + ValidationSignals& validation_signals() { return *Assert(m_node.validation_signals); } NodeContext& m_node; }; } // namespace diff --git a/src/node/transaction.cpp b/src/node/transaction.cpp index ef9a30a076..b66a4f2f39 100644 --- a/src/node/transaction.cpp +++ b/src/node/transaction.cpp @@ -92,7 +92,7 @@ TransactionError BroadcastTransaction(NodeContext& node, const CTransactionRef t node.mempool->AddUnbroadcastTx(txid); } - if (wait_callback) { + if (wait_callback && node.validation_signals) { // For transactions broadcast from outside the wallet, make sure // that the wallet has been notified of the transaction before // continuing. @@ -101,7 +101,7 @@ TransactionError BroadcastTransaction(NodeContext& node, const CTransactionRef t // with a transaction to/from their wallet, immediately call some // wallet RPC, and get a stale result because callbacks have not // yet been processed. - CallFunctionInValidationInterfaceQueue([&promise] { + node.validation_signals->CallFunctionInValidationInterfaceQueue([&promise] { promise.set_value(); }); callback_set = true; diff --git a/src/rpc/blockchain.cpp b/src/rpc/blockchain.cpp index 50908e9f96..dfdddeacea 100644 --- a/src/rpc/blockchain.cpp +++ b/src/rpc/blockchain.cpp @@ -395,7 +395,8 @@ static RPCHelpMan syncwithvalidationinterfacequeue() }, [&](const RPCHelpMan& self, const JSONRPCRequest& request) -> UniValue { - SyncWithValidationInterfaceQueue(); + NodeContext& node = EnsureAnyNodeContext(request.context); + CHECK_NONFATAL(node.validation_signals)->SyncWithValidationInterfaceQueue(); return UniValue::VNULL; }, }; diff --git a/src/rpc/fees.cpp b/src/rpc/fees.cpp index 57ba486ed9..f3345b4f1c 100644 --- a/src/rpc/fees.cpp +++ b/src/rpc/fees.cpp @@ -4,6 +4,7 @@ // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include <core_io.h> +#include <node/context.h> #include <policy/feerate.h> #include <policy/fees.h> #include <rpc/protocol.h> @@ -21,10 +22,6 @@ #include <cmath> #include <string> -namespace node { -struct NodeContext; -} - using node::NodeContext; static RPCHelpMan estimatesmartfee() @@ -68,7 +65,7 @@ static RPCHelpMan estimatesmartfee() const NodeContext& node = EnsureAnyNodeContext(request.context); const CTxMemPool& mempool = EnsureMemPool(node); - SyncWithValidationInterfaceQueue(); + CHECK_NONFATAL(mempool.m_signals)->SyncWithValidationInterfaceQueue(); unsigned int max_target = fee_estimator.HighestTargetTracked(FeeEstimateHorizon::LONG_HALFLIFE); unsigned int conf_target = ParseConfirmTarget(request.params[0], max_target); bool conservative = true; @@ -156,8 +153,9 @@ static RPCHelpMan estimaterawfee() [&](const RPCHelpMan& self, const JSONRPCRequest& request) -> UniValue { CBlockPolicyEstimator& fee_estimator = EnsureAnyFeeEstimator(request.context); + const NodeContext& node = EnsureAnyNodeContext(request.context); - SyncWithValidationInterfaceQueue(); + CHECK_NONFATAL(node.validation_signals)->SyncWithValidationInterfaceQueue(); unsigned int max_target = fee_estimator.HighestTargetTracked(FeeEstimateHorizon::LONG_HALFLIFE); unsigned int conf_target = ParseConfirmTarget(request.params[0], max_target); double threshold = 0.95; diff --git a/src/rpc/mining.cpp b/src/rpc/mining.cpp index ff39a31a43..f7cdbf52dd 100644 --- a/src/rpc/mining.cpp +++ b/src/rpc/mining.cpp @@ -1042,9 +1042,9 @@ static RPCHelpMan submitblock() bool new_block; auto sc = std::make_shared<submitblock_StateCatcher>(block.GetHash()); - RegisterSharedValidationInterface(sc); + CHECK_NONFATAL(chainman.m_options.signals)->RegisterSharedValidationInterface(sc); bool accepted = chainman.ProcessNewBlock(blockptr, /*force_processing=*/true, /*min_pow_checked=*/true, /*new_block=*/&new_block); - UnregisterSharedValidationInterface(sc); + CHECK_NONFATAL(chainman.m_options.signals)->UnregisterSharedValidationInterface(sc); if (!new_block && accepted) { return "duplicate"; } diff --git a/src/rpc/node.cpp b/src/rpc/node.cpp index 45053f882d..b8c0080aef 100644 --- a/src/rpc/node.cpp +++ b/src/rpc/node.cpp @@ -94,7 +94,7 @@ static RPCHelpMan mockscheduler() const NodeContext& node_context{EnsureAnyNodeContext(request.context)}; CHECK_NONFATAL(node_context.scheduler)->MockForward(std::chrono::seconds{delta_seconds}); - SyncWithValidationInterfaceQueue(); + CHECK_NONFATAL(node_context.validation_signals)->SyncWithValidationInterfaceQueue(); for (const auto& chain_client : node_context.chain_clients) { chain_client->schedulerMockForward(std::chrono::seconds(delta_seconds)); } diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 6c6f644142..a1158e0c07 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -129,7 +129,7 @@ bool CScheduler::AreThreadsServicingQueue() const } -void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() +void SerialTaskRunner::MaybeScheduleProcessQueue() { { LOCK(m_callbacks_mutex); @@ -142,7 +142,7 @@ void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() m_scheduler.schedule([this] { this->ProcessQueue(); }, std::chrono::steady_clock::now()); } -void SingleThreadedSchedulerClient::ProcessQueue() +void SerialTaskRunner::ProcessQueue() { std::function<void()> callback; { @@ -158,8 +158,8 @@ void SingleThreadedSchedulerClient::ProcessQueue() // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue // to ensure both happen safely even if callback() throws. struct RAIICallbacksRunning { - SingleThreadedSchedulerClient* instance; - explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {} + SerialTaskRunner* instance; + explicit RAIICallbacksRunning(SerialTaskRunner* _instance) : instance(_instance) {} ~RAIICallbacksRunning() { { @@ -173,7 +173,7 @@ void SingleThreadedSchedulerClient::ProcessQueue() callback(); } -void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func) +void SerialTaskRunner::insert(std::function<void()> func) { { LOCK(m_callbacks_mutex); @@ -182,7 +182,7 @@ void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func MaybeScheduleProcessQueue(); } -void SingleThreadedSchedulerClient::EmptyQueue() +void SerialTaskRunner::flush() { assert(!m_scheduler.AreThreadsServicingQueue()); bool should_continue = true; @@ -193,7 +193,7 @@ void SingleThreadedSchedulerClient::EmptyQueue() } } -size_t SingleThreadedSchedulerClient::CallbacksPending() +size_t SerialTaskRunner::size() { LOCK(m_callbacks_mutex); return m_callbacks_pending.size(); diff --git a/src/scheduler.h b/src/scheduler.h index 9212582b97..454c083b31 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -8,6 +8,7 @@ #include <attributes.h> #include <sync.h> #include <threadsafety.h> +#include <util/task_runner.h> #include <chrono> #include <condition_variable> @@ -120,12 +121,16 @@ private: * B() will be able to observe all of the effects of callback A() which executed * before it. */ -class SingleThreadedSchedulerClient +class SerialTaskRunner : public util::TaskRunnerInterface { private: CScheduler& m_scheduler; Mutex m_callbacks_mutex; + + // We are not allowed to assume the scheduler only runs in one thread, + // but must ensure all callbacks happen in-order, so we end up creating + // our own queue here :( std::list<std::function<void()>> m_callbacks_pending GUARDED_BY(m_callbacks_mutex); bool m_are_callbacks_running GUARDED_BY(m_callbacks_mutex) = false; @@ -133,7 +138,7 @@ private: void ProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); public: - explicit SingleThreadedSchedulerClient(CScheduler& scheduler LIFETIMEBOUND) : m_scheduler{scheduler} {} + explicit SerialTaskRunner(CScheduler& scheduler LIFETIMEBOUND) : m_scheduler{scheduler} {} /** * Add a callback to be executed. Callbacks are executed serially @@ -141,15 +146,15 @@ public: * Practically, this means that callbacks can behave as if they are executed * in order by a single thread. */ - void AddToProcessQueue(std::function<void()> func) EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); + void insert(std::function<void()> func) override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); /** * Processes all remaining queue members on the calling thread, blocking until queue is empty * Must be called after the CScheduler has no remaining processing threads! */ - void EmptyQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); + void flush() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); - size_t CallbacksPending() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); + size_t size() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); }; #endif // BITCOIN_SCHEDULER_H diff --git a/src/test/coinstatsindex_tests.cpp b/src/test/coinstatsindex_tests.cpp index cc1ec49d41..08814c1499 100644 --- a/src/test/coinstatsindex_tests.cpp +++ b/src/test/coinstatsindex_tests.cpp @@ -70,7 +70,7 @@ BOOST_FIXTURE_TEST_CASE(coinstatsindex_initial_sync, TestChain100Setup) // SyncWithValidationInterfaceQueue() call below is also needed to ensure // TSAN always sees the test thread waiting for the notification thread, and // avoid potential false positive reports. - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); // Shutdown sequence (c.f. Shutdown() in init.cpp) coin_stats_index.Stop(); diff --git a/src/test/fuzz/package_eval.cpp b/src/test/fuzz/package_eval.cpp index 9e658e0ced..a48ce37bce 100644 --- a/src/test/fuzz/package_eval.cpp +++ b/src/test/fuzz/package_eval.cpp @@ -47,7 +47,7 @@ void initialize_tx_pool() g_outpoints_coinbase_init_mature.push_back(prevout); } } - SyncWithValidationInterfaceQueue(); + g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue(); } struct OutpointsUpdater final : public CValidationInterface { @@ -147,7 +147,7 @@ FUZZ_TARGET(tx_package_eval, .init = initialize_tx_pool) } auto outpoints_updater = std::make_shared<OutpointsUpdater>(mempool_outpoints); - RegisterSharedValidationInterface(outpoints_updater); + node.validation_signals->RegisterSharedValidationInterface(outpoints_updater); CTxMemPool tx_pool_{MakeMempool(fuzzed_data_provider, node)}; MockedTxPool& tx_pool = *static_cast<MockedTxPool*>(&tx_pool_); @@ -269,7 +269,7 @@ FUZZ_TARGET(tx_package_eval, .init = initialize_tx_pool) // Remember all added transactions std::set<CTransactionRef> added; auto txr = std::make_shared<TransactionsDelta>(added); - RegisterSharedValidationInterface(txr); + node.validation_signals->RegisterSharedValidationInterface(txr); // When there are multiple transactions in the package, we call ProcessNewPackage(txs, test_accept=false) // and AcceptToMemoryPool(txs.back(), test_accept=true). When there is only 1 transaction, we might flip it @@ -285,8 +285,8 @@ FUZZ_TARGET(tx_package_eval, .init = initialize_tx_pool) /*bypass_limits=*/false, /*test_accept=*/!single_submit)); const bool passed = res.m_result_type == MempoolAcceptResult::ResultType::VALID; - SyncWithValidationInterfaceQueue(); - UnregisterSharedValidationInterface(txr); + node.validation_signals->SyncWithValidationInterfaceQueue(); + node.validation_signals->UnregisterSharedValidationInterface(txr); // There is only 1 transaction in the package. We did a test-package-accept and a ATMP if (single_submit) { @@ -310,7 +310,7 @@ FUZZ_TARGET(tx_package_eval, .init = initialize_tx_pool) CheckMempoolV3Invariants(tx_pool); } - UnregisterSharedValidationInterface(outpoints_updater); + node.validation_signals->UnregisterSharedValidationInterface(outpoints_updater); WITH_LOCK(::cs_main, tx_pool.check(chainstate.CoinsTip(), chainstate.m_chain.Height() + 1)); } diff --git a/src/test/fuzz/process_message.cpp b/src/test/fuzz/process_message.cpp index 56b391ed5c..a467fd5382 100644 --- a/src/test/fuzz/process_message.cpp +++ b/src/test/fuzz/process_message.cpp @@ -47,7 +47,7 @@ void initialize_process_message() for (int i = 0; i < 2 * COINBASE_MATURITY; i++) { MineBlock(g_setup->m_node, CScript() << OP_TRUE); } - SyncWithValidationInterfaceQueue(); + g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue(); } FUZZ_TARGET(process_message, .init = initialize_process_message) @@ -89,6 +89,6 @@ FUZZ_TARGET(process_message, .init = initialize_process_message) } g_setup->m_node.peerman->SendMessages(&p2p_node); } - SyncWithValidationInterfaceQueue(); + g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue(); g_setup->m_node.connman->StopNodes(); } diff --git a/src/test/fuzz/process_messages.cpp b/src/test/fuzz/process_messages.cpp index 6b264907b5..38acd432fa 100644 --- a/src/test/fuzz/process_messages.cpp +++ b/src/test/fuzz/process_messages.cpp @@ -37,7 +37,7 @@ void initialize_process_messages() for (int i = 0; i < 2 * COINBASE_MATURITY; i++) { MineBlock(g_setup->m_node, CScript() << OP_TRUE); } - SyncWithValidationInterfaceQueue(); + g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue(); } FUZZ_TARGET(process_messages, .init = initialize_process_messages) @@ -89,6 +89,6 @@ FUZZ_TARGET(process_messages, .init = initialize_process_messages) g_setup->m_node.peerman->SendMessages(&random_node); } } - SyncWithValidationInterfaceQueue(); + g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue(); g_setup->m_node.connman->StopNodes(); } diff --git a/src/test/fuzz/tx_pool.cpp b/src/test/fuzz/tx_pool.cpp index fcf230642a..b6ba612a84 100644 --- a/src/test/fuzz/tx_pool.cpp +++ b/src/test/fuzz/tx_pool.cpp @@ -50,7 +50,7 @@ void initialize_tx_pool() g_outpoints_coinbase_init_immature; outpoints.push_back(prevout); } - SyncWithValidationInterfaceQueue(); + g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue(); } struct TransactionsDelta final : public CValidationInterface { @@ -105,7 +105,7 @@ void Finish(FuzzedDataProvider& fuzzed_data_provider, MockedTxPool& tx_pool, Cha assert(tx_pool.size() < info_all.size()); WITH_LOCK(::cs_main, tx_pool.check(chainstate.CoinsTip(), chainstate.m_chain.Height() + 1)); } - SyncWithValidationInterfaceQueue(); + g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue(); } void MockTime(FuzzedDataProvider& fuzzed_data_provider, const Chainstate& chainstate) @@ -285,7 +285,7 @@ FUZZ_TARGET(tx_pool_standard, .init = initialize_tx_pool) std::set<CTransactionRef> removed; std::set<CTransactionRef> added; auto txr = std::make_shared<TransactionsDelta>(removed, added); - RegisterSharedValidationInterface(txr); + node.validation_signals->RegisterSharedValidationInterface(txr); const bool bypass_limits = fuzzed_data_provider.ConsumeBool(); // Make sure ProcessNewPackage on one transaction works. @@ -303,8 +303,8 @@ FUZZ_TARGET(tx_pool_standard, .init = initialize_tx_pool) const auto res = WITH_LOCK(::cs_main, return AcceptToMemoryPool(chainstate, tx, GetTime(), bypass_limits, /*test_accept=*/false)); const bool accepted = res.m_result_type == MempoolAcceptResult::ResultType::VALID; - SyncWithValidationInterfaceQueue(); - UnregisterSharedValidationInterface(txr); + node.validation_signals->SyncWithValidationInterfaceQueue(); + node.validation_signals->UnregisterSharedValidationInterface(txr); bool txid_in_mempool = tx_pool.exists(GenTxid::Txid(tx->GetHash())); bool wtxid_in_mempool = tx_pool.exists(GenTxid::Wtxid(tx->GetWitnessHash())); diff --git a/src/test/peerman_tests.cpp b/src/test/peerman_tests.cpp index 2c79329385..28866695bc 100644 --- a/src/test/peerman_tests.cpp +++ b/src/test/peerman_tests.cpp @@ -25,7 +25,7 @@ static void mineBlock(const node::NodeContext& node, std::chrono::seconds block_ block.fChecked = true; // little speedup SetMockTime(curr_time); // process block at current time Assert(node.chainman->ProcessNewBlock(std::make_shared<const CBlock>(block), /*force_processing=*/true, /*min_pow_checked=*/true, nullptr)); - SyncWithValidationInterfaceQueue(); // drain events queue + node.validation_signals->SyncWithValidationInterfaceQueue(); // drain events queue } // Verifying when network-limited peer connections are desirable based on the node's proximity to the tip @@ -57,7 +57,7 @@ BOOST_AUTO_TEST_CASE(connections_desirable_service_flags) // By now, we tested that the connections desirable services flags change based on the node's time proximity to the tip. // Now, perform the same tests for when the node receives a block. - RegisterValidationInterface(peerman.get()); + m_node.validation_signals->RegisterValidationInterface(peerman.get()); // First, verify a block in the past doesn't enable limited peers connections // At this point, our time is (NODE_NETWORK_LIMITED_ALLOW_CONN_BLOCKS + 1) * 10 minutes ahead the tip's time. diff --git a/src/test/policyestimator_tests.cpp b/src/test/policyestimator_tests.cpp index ede73c6895..6cadc3290a 100644 --- a/src/test/policyestimator_tests.cpp +++ b/src/test/policyestimator_tests.cpp @@ -20,7 +20,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) { CBlockPolicyEstimator& feeEst = *Assert(m_node.fee_estimator); CTxMemPool& mpool = *Assert(m_node.mempool); - RegisterValidationInterface(&feeEst); + m_node.validation_signals->RegisterValidationInterface(&feeEst); TestMemPoolEntryHelper entry; CAmount basefee(2000); CAmount deltaFee(100); @@ -74,7 +74,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) /*submitted_in_package=*/false, /*chainstate_is_current=*/true, /*has_no_mempool_parents=*/true)}; - GetMainSignals().TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence()); + m_node.validation_signals->TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence()); } uint256 hash = tx.GetHash(); txHashes[j].push_back(hash); @@ -102,7 +102,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) // Check after just a few txs that combining buckets works as expected if (blocknum == 3) { // Wait for fee estimator to catch up - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); // At this point we should need to combine 3 buckets to get enough data points // So estimateFee(1) should fail and estimateFee(2) should return somewhere around // 9*baserate. estimateFee(2) %'s are 100,100,90 = average 97% @@ -113,7 +113,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) } // Wait for fee estimator to catch up - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); std::vector<CAmount> origFeeEst; // Highest feerate is 10*baseRate and gets in all blocks, @@ -146,7 +146,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) } // Wait for fee estimator to catch up - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); BOOST_CHECK(feeEst.estimateFee(1) == CFeeRate(0)); for (int i = 2; i < 10;i++) { @@ -175,7 +175,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) /*submitted_in_package=*/false, /*chainstate_is_current=*/true, /*has_no_mempool_parents=*/true)}; - GetMainSignals().TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence()); + m_node.validation_signals->TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence()); } uint256 hash = tx.GetHash(); txHashes[j].push_back(hash); @@ -188,7 +188,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) } // Wait for fee estimator to catch up - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); for (int i = 1; i < 10;i++) { BOOST_CHECK(feeEst.estimateFee(i) == CFeeRate(0) || feeEst.estimateFee(i).GetFeePerK() > origFeeEst[i-1] - deltaFee); @@ -212,7 +212,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) block.clear(); // Wait for fee estimator to catch up - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); BOOST_CHECK(feeEst.estimateFee(1) == CFeeRate(0)); for (int i = 2; i < 10;i++) { @@ -239,7 +239,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) /*submitted_in_package=*/false, /*chainstate_is_current=*/true, /*has_no_mempool_parents=*/true)}; - GetMainSignals().TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence()); + m_node.validation_signals->TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence()); } uint256 hash = tx.GetHash(); CTransactionRef ptx = mpool.get(hash); @@ -257,7 +257,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) block.clear(); } // Wait for fee estimator to catch up - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); BOOST_CHECK(feeEst.estimateFee(1) == CFeeRate(0)); for (int i = 2; i < 9; i++) { // At 9, the original estimate was already at the bottom (b/c scale = 2) BOOST_CHECK(feeEst.estimateFee(i).GetFeePerK() < origFeeEst[i-1] - deltaFee); diff --git a/src/test/scheduler_tests.cpp b/src/test/scheduler_tests.cpp index 3fb3378598..95e725de46 100644 --- a/src/test/scheduler_tests.cpp +++ b/src/test/scheduler_tests.cpp @@ -129,8 +129,8 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered) CScheduler scheduler; // each queue should be well ordered with respect to itself but not other queues - SingleThreadedSchedulerClient queue1(scheduler); - SingleThreadedSchedulerClient queue2(scheduler); + SerialTaskRunner queue1(scheduler); + SerialTaskRunner queue2(scheduler); // create more threads than queues // if the queues only permit execution of one task at once then @@ -142,7 +142,7 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered) threads.emplace_back([&] { scheduler.serviceQueue(); }); } - // these are not atomic, if SinglethreadedSchedulerClient prevents + // these are not atomic, if SerialTaskRunner prevents // parallel execution at the queue level no synchronization should be required here int counter1 = 0; int counter2 = 0; @@ -150,12 +150,12 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered) // just simply count up on each queue - if execution is properly ordered then // the callbacks should run in exactly the order in which they were enqueued for (int i = 0; i < 100; ++i) { - queue1.AddToProcessQueue([i, &counter1]() { + queue1.insert([i, &counter1]() { bool expectation = i == counter1++; assert(expectation); }); - queue2.AddToProcessQueue([i, &counter2]() { + queue2.insert([i, &counter2]() { bool expectation = i == counter2++; assert(expectation); }); diff --git a/src/test/txindex_tests.cpp b/src/test/txindex_tests.cpp index e2432a4718..5a32b02ad9 100644 --- a/src/test/txindex_tests.cpp +++ b/src/test/txindex_tests.cpp @@ -71,7 +71,7 @@ BOOST_FIXTURE_TEST_CASE(txindex_initial_sync, TestChain100Setup) // SyncWithValidationInterfaceQueue() call below is also needed to ensure // TSAN always sees the test thread waiting for the notification thread, and // avoid potential false positive reports. - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); // shutdown sequence (c.f. Shutdown() in init.cpp) txindex.Stop(); diff --git a/src/test/util/mining.cpp b/src/test/util/mining.cpp index 08d1b4c902..ad7a38d3fe 100644 --- a/src/test/util/mining.cpp +++ b/src/test/util/mining.cpp @@ -95,12 +95,12 @@ COutPoint MineBlock(const NodeContext& node, std::shared_ptr<CBlock>& block) const auto old_height = WITH_LOCK(chainman.GetMutex(), return chainman.ActiveHeight()); bool new_block; BlockValidationStateCatcher bvsc{block->GetHash()}; - RegisterValidationInterface(&bvsc); + node.validation_signals->RegisterValidationInterface(&bvsc); const bool processed{chainman.ProcessNewBlock(block, true, true, &new_block)}; const bool duplicate{!new_block && processed}; assert(!duplicate); - UnregisterValidationInterface(&bvsc); - SyncWithValidationInterfaceQueue(); + node.validation_signals->UnregisterValidationInterface(&bvsc); + node.validation_signals->SyncWithValidationInterfaceQueue(); const bool was_valid{bvsc.m_state && bvsc.m_state->IsValid()}; assert(old_height + was_valid == WITH_LOCK(chainman.GetMutex(), return chainman.ActiveHeight())); diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index 1f8dbac5be..d26a440074 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -175,7 +175,7 @@ ChainTestingSetup::ChainTestingSetup(const ChainType chainType, const std::vecto // from blocking due to queue overrun. m_node.scheduler = std::make_unique<CScheduler>(); m_node.scheduler->m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { m_node.scheduler->serviceQueue(); }); - GetMainSignals().RegisterBackgroundSignalScheduler(*m_node.scheduler); + m_node.validation_signals = std::make_unique<ValidationSignals>(std::make_unique<SerialTaskRunner>(*m_node.scheduler)); m_node.fee_estimator = std::make_unique<CBlockPolicyEstimator>(FeeestPath(*m_node.args), DEFAULT_ACCEPT_STALE_FEE_ESTIMATES); m_node.mempool = std::make_unique<CTxMemPool>(MemPoolOptionsForTest(m_node)); @@ -189,6 +189,7 @@ ChainTestingSetup::ChainTestingSetup(const ChainType chainType, const std::vecto .datadir = m_args.GetDataDirNet(), .check_block_index = true, .notifications = *m_node.notifications, + .signals = m_node.validation_signals.get(), .worker_threads_num = 2, }; const BlockManager::Options blockman_opts{ @@ -206,8 +207,7 @@ ChainTestingSetup::ChainTestingSetup(const ChainType chainType, const std::vecto ChainTestingSetup::~ChainTestingSetup() { if (m_node.scheduler) m_node.scheduler->stop(); - GetMainSignals().FlushBackgroundCallbacks(); - GetMainSignals().UnregisterBackgroundSignalScheduler(); + m_node.validation_signals->FlushBackgroundCallbacks(); m_node.connman.reset(); m_node.banman.reset(); m_node.addrman.reset(); @@ -216,6 +216,7 @@ ChainTestingSetup::~ChainTestingSetup() m_node.mempool.reset(); m_node.fee_estimator.reset(); m_node.chainman.reset(); + m_node.validation_signals.reset(); m_node.scheduler.reset(); } diff --git a/src/test/util/txmempool.cpp b/src/test/util/txmempool.cpp index 3b4161ddd3..71cf8aca60 100644 --- a/src/test/util/txmempool.cpp +++ b/src/test/util/txmempool.cpp @@ -22,6 +22,7 @@ CTxMemPool::Options MemPoolOptionsForTest(const NodeContext& node) // Default to always checking mempool regardless of // chainparams.DefaultConsistencyChecks for tests .check_ratio = 1, + .signals = node.validation_signals.get(), }; const auto result{ApplyArgsManOptions(*node.args, ::Params(), mempool_opts)}; Assert(result); diff --git a/src/test/validation_block_tests.cpp b/src/test/validation_block_tests.cpp index 35e5c6a037..316ab86c2b 100644 --- a/src/test/validation_block_tests.cpp +++ b/src/test/validation_block_tests.cpp @@ -158,7 +158,7 @@ BOOST_AUTO_TEST_CASE(processnewblock_signals_ordering) bool ignored; // Connect the genesis block and drain any outstanding events BOOST_CHECK(Assert(m_node.chainman)->ProcessNewBlock(std::make_shared<CBlock>(Params().GenesisBlock()), true, true, &ignored)); - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); // subscribe to events (this subscriber will validate event ordering) const CBlockIndex* initial_tip = nullptr; @@ -167,7 +167,7 @@ BOOST_AUTO_TEST_CASE(processnewblock_signals_ordering) initial_tip = m_node.chainman->ActiveChain().Tip(); } auto sub = std::make_shared<TestSubscriber>(initial_tip->GetBlockHash()); - RegisterSharedValidationInterface(sub); + m_node.validation_signals->RegisterSharedValidationInterface(sub); // create a bunch of threads that repeatedly process a block generated above at random // this will create parallelism and randomness inside validation - the ValidationInterface @@ -196,9 +196,9 @@ BOOST_AUTO_TEST_CASE(processnewblock_signals_ordering) for (auto& t : threads) { t.join(); } - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); - UnregisterSharedValidationInterface(sub); + m_node.validation_signals->UnregisterSharedValidationInterface(sub); LOCK(cs_main); BOOST_CHECK_EQUAL(sub->m_expected_tip, m_node.chainman->ActiveChain().Tip()->GetBlockHash()); diff --git a/src/test/validation_chainstatemanager_tests.cpp b/src/test/validation_chainstatemanager_tests.cpp index a33e71d50e..4bbab1cdcd 100644 --- a/src/test/validation_chainstatemanager_tests.cpp +++ b/src/test/validation_chainstatemanager_tests.cpp @@ -102,7 +102,7 @@ BOOST_FIXTURE_TEST_CASE(chainstatemanager, TestChain100Setup) BOOST_CHECK_EQUAL(active_tip2, c2.m_chain.Tip()); // Let scheduler events finish running to avoid accessing memory that is going to be unloaded - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); } //! Test rebalancing the caches associated with each chainstate. @@ -374,7 +374,7 @@ struct SnapshotTestSetup : TestChain100Setup { cs->ForceFlushStateToDisk(); } // Process all callbacks referring to the old manager before wiping it. - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); LOCK(::cs_main); chainman.ResetChainstates(); BOOST_CHECK_EQUAL(chainman.GetAll().size(), 0); @@ -383,6 +383,7 @@ struct SnapshotTestSetup : TestChain100Setup { .chainparams = ::Params(), .datadir = chainman.m_options.datadir, .notifications = *m_node.notifications, + .signals = m_node.validation_signals.get(), }; const BlockManager::Options blockman_opts{ .chainparams = chainman_opts.chainparams, diff --git a/src/test/validationinterface_tests.cpp b/src/test/validationinterface_tests.cpp index 5979441057..a46cfc3029 100644 --- a/src/test/validationinterface_tests.cpp +++ b/src/test/validationinterface_tests.cpp @@ -28,7 +28,7 @@ BOOST_AUTO_TEST_CASE(unregister_validation_interface_race) const CBlock block_dummy; BlockValidationState state_dummy; while (generate) { - GetMainSignals().BlockChecked(block_dummy, state_dummy); + m_node.validation_signals->BlockChecked(block_dummy, state_dummy); } }}; @@ -37,8 +37,8 @@ BOOST_AUTO_TEST_CASE(unregister_validation_interface_race) // keep going for about 1 sec, which is 250k iterations for (int i = 0; i < 250000; i++) { auto sub = std::make_shared<TestSubscriberNoop>(); - RegisterSharedValidationInterface(sub); - UnregisterSharedValidationInterface(sub); + m_node.validation_signals->RegisterSharedValidationInterface(sub); + m_node.validation_signals->UnregisterSharedValidationInterface(sub); } // tell the other thread we are done generate = false; @@ -52,8 +52,8 @@ BOOST_AUTO_TEST_CASE(unregister_validation_interface_race) class TestInterface : public CValidationInterface { public: - TestInterface(std::function<void()> on_call = nullptr, std::function<void()> on_destroy = nullptr) - : m_on_call(std::move(on_call)), m_on_destroy(std::move(on_destroy)) + TestInterface(ValidationSignals& signals, std::function<void()> on_call = nullptr, std::function<void()> on_destroy = nullptr) + : m_on_call(std::move(on_call)), m_on_destroy(std::move(on_destroy)), m_signals{signals} { } virtual ~TestInterface() @@ -64,14 +64,15 @@ public: { if (m_on_call) m_on_call(); } - static void Call() + void Call() { CBlock block; BlockValidationState state; - GetMainSignals().BlockChecked(block, state); + m_signals.BlockChecked(block, state); } std::function<void()> m_on_call; std::function<void()> m_on_destroy; + ValidationSignals& m_signals; }; // Regression test to ensure UnregisterAllValidationInterfaces calls don't @@ -80,17 +81,23 @@ public: BOOST_AUTO_TEST_CASE(unregister_all_during_call) { bool destroyed = false; - RegisterSharedValidationInterface(std::make_shared<TestInterface>( + auto shared{std::make_shared<TestInterface>( + *m_node.validation_signals, [&] { // First call should decrements reference count 2 -> 1 - UnregisterAllValidationInterfaces(); + m_node.validation_signals->UnregisterAllValidationInterfaces(); BOOST_CHECK(!destroyed); // Second call should not decrement reference count 1 -> 0 - UnregisterAllValidationInterfaces(); + m_node.validation_signals->UnregisterAllValidationInterfaces(); BOOST_CHECK(!destroyed); }, - [&] { destroyed = true; })); - TestInterface::Call(); + [&] { destroyed = true; })}; + m_node.validation_signals->RegisterSharedValidationInterface(shared); + BOOST_CHECK(shared.use_count() == 2); + shared->Call(); + BOOST_CHECK(shared.use_count() == 1); + BOOST_CHECK(!destroyed); + shared.reset(); BOOST_CHECK(destroyed); } diff --git a/src/txmempool.cpp b/src/txmempool.cpp index de340f6b6d..0bee27c2b2 100644 --- a/src/txmempool.cpp +++ b/src/txmempool.cpp @@ -406,7 +406,8 @@ CTxMemPool::CTxMemPool(const Options& opts) m_require_standard{opts.require_standard}, m_full_rbf{opts.full_rbf}, m_persist_v1_dat{opts.persist_v1_dat}, - m_limits{opts.limits} + m_limits{opts.limits}, + m_signals{opts.signals} { } @@ -487,12 +488,12 @@ void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason) // even if not directly reported below. uint64_t mempool_sequence = GetAndIncrementSequence(); - if (reason != MemPoolRemovalReason::BLOCK) { + if (reason != MemPoolRemovalReason::BLOCK && m_signals) { // Notify clients that a transaction has been removed from the mempool // for any reason except being included in a block. Clients interested // in transactions included in blocks can subscribe to the BlockConnected // notification. - GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason, mempool_sequence); + m_signals->TransactionRemovedFromMempool(it->GetSharedTx(), reason, mempool_sequence); } TRACE5(mempool, removed, it->GetTx().GetHash().data(), @@ -643,7 +644,9 @@ void CTxMemPool::removeForBlock(const std::vector<CTransactionRef>& vtx, unsigne removeConflicts(*tx); ClearPrioritisation(tx->GetHash()); } - GetMainSignals().MempoolTransactionsRemovedForBlock(txs_removed_for_block, nBlockHeight); + if (m_signals) { + m_signals->MempoolTransactionsRemovedForBlock(txs_removed_for_block, nBlockHeight); + } lastRollingFeeUpdate = GetTime(); blockSinceLastRollingFeeBump = true; } diff --git a/src/txmempool.h b/src/txmempool.h index b98355c65f..32f2c024f7 100644 --- a/src/txmempool.h +++ b/src/txmempool.h @@ -40,6 +40,7 @@ #include <vector> class CChain; +class ValidationSignals; /** Fake height value used in Coin to signify they are only in the memory pool (since 0.8) */ static const uint32_t MEMPOOL_HEIGHT = 0x7FFFFFFF; @@ -447,6 +448,8 @@ public: const Limits m_limits; + ValidationSignals* const m_signals; + /** Create a new CTxMemPool. * Sanity checks will be off by default for performance, because otherwise * accepting transactions becomes O(N^2) where N is the number of transactions diff --git a/src/util/task_runner.h b/src/util/task_runner.h new file mode 100644 index 0000000000..d3cd8007de --- /dev/null +++ b/src/util/task_runner.h @@ -0,0 +1,52 @@ +// Copyright (c) 2024-present The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_UTIL_TASK_RUNNER_H +#define BITCOIN_UTIL_TASK_RUNNER_H + +#include <cstddef> +#include <functional> + +namespace util { + +/** @file + * This header provides an interface and simple implementation for a task + * runner. Another threaded, serial implementation using a queue is available in + * the scheduler module's SerialTaskRunner. + */ + +class TaskRunnerInterface +{ +public: + virtual ~TaskRunnerInterface() {} + + /** + * The callback can either be queued for later/asynchronous/threaded + * processing, or be executed immediately for synchronous processing. + */ + + virtual void insert(std::function<void()> func) = 0; + + /** + * Forces the processing of all pending events. + */ + virtual void flush() = 0; + + /** + * Returns the number of currently pending events. + */ + virtual size_t size() = 0; +}; + +class ImmediateTaskRunner : public TaskRunnerInterface +{ +public: + void insert(std::function<void()> func) override { func(); } + void flush() override {} + size_t size() override { return 0; } +}; + +} // namespace util + +#endif // BITCOIN_UTIL_TASK_RUNNER_H diff --git a/src/validation.cpp b/src/validation.cpp index f8e1de55e9..3a652adb1d 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -1228,13 +1228,14 @@ bool MemPoolAccept::SubmitPackage(const ATMPArgs& args, std::vector<Workspace>& results.emplace(ws.m_ptx->GetWitnessHash(), MempoolAcceptResult::Success(std::move(ws.m_replaced_transactions), ws.m_vsize, ws.m_base_fees, effective_feerate, effective_feerate_wtxids)); + if (!m_pool.m_signals) continue; const CTransaction& tx = *ws.m_ptx; const auto tx_info = NewMempoolTransactionInfo(ws.m_ptx, ws.m_base_fees, ws.m_vsize, ws.m_entry->GetHeight(), args.m_bypass_limits, args.m_package_submission, IsCurrentForFeeEstimation(m_active_chainstate), m_pool.HasNoInputsOf(tx)); - GetMainSignals().TransactionAddedToMempool(tx_info, m_pool.GetAndIncrementSequence()); + m_pool.m_signals->TransactionAddedToMempool(tx_info, m_pool.GetAndIncrementSequence()); } return all_submitted; } @@ -1242,7 +1243,7 @@ bool MemPoolAccept::SubmitPackage(const ATMPArgs& args, std::vector<Workspace>& MempoolAcceptResult MemPoolAccept::AcceptSingleTransaction(const CTransactionRef& ptx, ATMPArgs& args) { AssertLockHeld(cs_main); - LOCK(m_pool.cs); // mempool "read lock" (held through GetMainSignals().TransactionAddedToMempool()) + LOCK(m_pool.cs); // mempool "read lock" (held through m_pool.m_signals->TransactionAddedToMempool()) Workspace ws(ptx); const std::vector<Wtxid> single_wtxid{ws.m_ptx->GetWitnessHash()}; @@ -1277,13 +1278,15 @@ MempoolAcceptResult MemPoolAccept::AcceptSingleTransaction(const CTransactionRef return MempoolAcceptResult::FeeFailure(ws.m_state, CFeeRate(ws.m_modified_fees, ws.m_vsize), {ws.m_ptx->GetWitnessHash()}); } - const CTransaction& tx = *ws.m_ptx; - const auto tx_info = NewMempoolTransactionInfo(ws.m_ptx, ws.m_base_fees, - ws.m_vsize, ws.m_entry->GetHeight(), - args.m_bypass_limits, args.m_package_submission, - IsCurrentForFeeEstimation(m_active_chainstate), - m_pool.HasNoInputsOf(tx)); - GetMainSignals().TransactionAddedToMempool(tx_info, m_pool.GetAndIncrementSequence()); + if (m_pool.m_signals) { + const CTransaction& tx = *ws.m_ptx; + const auto tx_info = NewMempoolTransactionInfo(ws.m_ptx, ws.m_base_fees, + ws.m_vsize, ws.m_entry->GetHeight(), + args.m_bypass_limits, args.m_package_submission, + IsCurrentForFeeEstimation(m_active_chainstate), + m_pool.HasNoInputsOf(tx)); + m_pool.m_signals->TransactionAddedToMempool(tx_info, m_pool.GetAndIncrementSequence()); + } return MempoolAcceptResult::Success(std::move(ws.m_replaced_transactions), ws.m_vsize, ws.m_base_fees, effective_feerate, single_wtxid); @@ -2695,9 +2698,9 @@ bool Chainstate::FlushStateToDisk( (bool)fFlushForPrune); } } - if (full_flush_completed) { + if (full_flush_completed && m_chainman.m_options.signals) { // Update best block in wallet (so we can detect restored wallets). - GetMainSignals().ChainStateFlushed(this->GetRole(), m_chain.GetLocator()); + m_chainman.m_options.signals->ChainStateFlushed(this->GetRole(), m_chain.GetLocator()); } } catch (const std::runtime_error& e) { return FatalError(m_chainman.GetNotifications(), state, std::string("System error while flushing: ") + e.what()); @@ -2864,7 +2867,9 @@ bool Chainstate::DisconnectTip(BlockValidationState& state, DisconnectedBlockTra UpdateTip(pindexDelete->pprev); // Let wallets know transactions went from 1-confirmed to // 0-confirmed or conflicted: - GetMainSignals().BlockDisconnected(pblock, pindexDelete); + if (m_chainman.m_options.signals) { + m_chainman.m_options.signals->BlockDisconnected(pblock, pindexDelete); + } return true; } @@ -2949,7 +2954,9 @@ bool Chainstate::ConnectTip(BlockValidationState& state, CBlockIndex* pindexNew, { CCoinsViewCache view(&CoinsTip()); bool rv = ConnectBlock(blockConnecting, state, pindexNew, view); - GetMainSignals().BlockChecked(blockConnecting, state); + if (m_chainman.m_options.signals) { + m_chainman.m_options.signals->BlockChecked(blockConnecting, state); + } if (!rv) { if (state.IsInvalid()) InvalidBlockFound(pindexNew, state); @@ -3210,11 +3217,11 @@ static bool NotifyHeaderTip(ChainstateManager& chainman) LOCKS_EXCLUDED(cs_main) return fNotify; } -static void LimitValidationInterfaceQueue() LOCKS_EXCLUDED(cs_main) { +static void LimitValidationInterfaceQueue(ValidationSignals& signals) LOCKS_EXCLUDED(cs_main) { AssertLockNotHeld(cs_main); - if (GetMainSignals().CallbacksPending() > 10) { - SyncWithValidationInterfaceQueue(); + if (signals.CallbacksPending() > 10) { + signals.SyncWithValidationInterfaceQueue(); } } @@ -3252,7 +3259,7 @@ bool Chainstate::ActivateBestChain(BlockValidationState& state, std::shared_ptr< // Note that if a validationinterface callback ends up calling // ActivateBestChain this may lead to a deadlock! We should // probably have a DEBUG_LOCKORDER test for this in the future. - LimitValidationInterfaceQueue(); + if (m_chainman.m_options.signals) LimitValidationInterfaceQueue(*m_chainman.m_options.signals); { LOCK(cs_main); @@ -3291,7 +3298,9 @@ bool Chainstate::ActivateBestChain(BlockValidationState& state, std::shared_ptr< for (const PerBlockConnectTrace& trace : connectTrace.GetBlocksConnected()) { assert(trace.pblock && trace.pindex); - GetMainSignals().BlockConnected(this->GetRole(), trace.pblock, trace.pindex); + if (m_chainman.m_options.signals) { + m_chainman.m_options.signals->BlockConnected(this->GetRole(), trace.pblock, trace.pindex); + } } // This will have been toggled in @@ -3317,7 +3326,9 @@ bool Chainstate::ActivateBestChain(BlockValidationState& state, std::shared_ptr< // Enqueue while holding cs_main to ensure that UpdatedBlockTip is called in the order in which blocks are connected if (this == &m_chainman.ActiveChainstate() && pindexFork != pindexNewTip) { // Notify ValidationInterface subscribers - GetMainSignals().UpdatedBlockTip(pindexNewTip, pindexFork, still_in_ibd); + if (m_chainman.m_options.signals) { + m_chainman.m_options.signals->UpdatedBlockTip(pindexNewTip, pindexFork, still_in_ibd); + } // Always notify the UI if a new block tip was connected if (kernel::IsInterrupted(m_chainman.GetNotifications().blockTip(GetSynchronizationState(still_in_ibd), *pindexNewTip))) { @@ -3451,7 +3462,7 @@ bool Chainstate::InvalidateBlock(BlockValidationState& state, CBlockIndex* pinde if (m_chainman.m_interrupt) break; // Make sure the queue of validation callbacks doesn't grow unboundedly. - LimitValidationInterfaceQueue(); + if (m_chainman.m_options.signals) LimitValidationInterfaceQueue(*m_chainman.m_options.signals); LOCK(cs_main); // Lock for as long as disconnectpool is in scope to make sure MaybeUpdateMempoolForReorg is @@ -4237,8 +4248,9 @@ bool ChainstateManager::AcceptBlock(const std::shared_ptr<const CBlock>& pblock, // Header is valid/has work, merkle tree and segwit merkle tree are good...RELAY NOW // (but if it does not build on our best tip, let the SendMessages loop relay it) - if (!IsInitialBlockDownload() && ActiveTip() == pindex->pprev) - GetMainSignals().NewPoWValidBlock(pindex, pblock); + if (!IsInitialBlockDownload() && ActiveTip() == pindex->pprev && m_options.signals) { + m_options.signals->NewPoWValidBlock(pindex, pblock); + } // Write block to history file if (fNewBlock) *fNewBlock = true; @@ -4291,7 +4303,9 @@ bool ChainstateManager::ProcessNewBlock(const std::shared_ptr<const CBlock>& blo ret = AcceptBlock(block, state, &pindex, force_processing, nullptr, new_block, min_pow_checked); } if (!ret) { - GetMainSignals().BlockChecked(*block, state); + if (m_options.signals) { + m_options.signals->BlockChecked(*block, state); + } return error("%s: AcceptBlock FAILED (%s)", __func__, state.ToString()); } } diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp index 5e944a7c47..813fde109c 100644 --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -5,7 +5,6 @@ #include <validationinterface.h> -#include <attributes.h> #include <chain.h> #include <consensus/validation.h> #include <kernel/chain.h> @@ -13,7 +12,8 @@ #include <logging.h> #include <primitives/block.h> #include <primitives/transaction.h> -#include <scheduler.h> +#include <util/check.h> +#include <util/task_runner.h> #include <future> #include <unordered_map> @@ -22,14 +22,14 @@ std::string RemovalReasonToString(const MemPoolRemovalReason& r) noexcept; /** - * MainSignalsImpl manages a list of shared_ptr<CValidationInterface> callbacks. + * ValidationSignalsImpl manages a list of shared_ptr<CValidationInterface> callbacks. * * A std::unordered_map is used to track what callbacks are currently * registered, and a std::list is used to store the callbacks that are * currently registered as well as any callbacks that are just unregistered * and about to be deleted when they are done executing. */ -class MainSignalsImpl +class ValidationSignalsImpl { private: Mutex m_mutex; @@ -42,12 +42,10 @@ private: std::unordered_map<CValidationInterface*, std::list<ListEntry>::iterator> m_map GUARDED_BY(m_mutex); public: - // We are not allowed to assume the scheduler only runs in one thread, - // but must ensure all callbacks happen in-order, so we end up creating - // our own queue here :( - SingleThreadedSchedulerClient m_schedulerClient; + std::unique_ptr<util::TaskRunnerInterface> m_task_runner; - explicit MainSignalsImpl(CScheduler& scheduler LIFETIMEBOUND) : m_schedulerClient(scheduler) {} + explicit ValidationSignalsImpl(std::unique_ptr<util::TaskRunnerInterface> task_runner) + : m_task_runner{std::move(Assert(task_runner))} {} void Register(std::shared_ptr<CValidationInterface> callbacks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) { @@ -94,77 +92,56 @@ public: } }; -static CMainSignals g_signals; +ValidationSignals::ValidationSignals(std::unique_ptr<util::TaskRunnerInterface> task_runner) + : m_internals{std::make_unique<ValidationSignalsImpl>(std::move(task_runner))} {} -void CMainSignals::RegisterBackgroundSignalScheduler(CScheduler& scheduler) -{ - assert(!m_internals); - m_internals = std::make_unique<MainSignalsImpl>(scheduler); -} - -void CMainSignals::UnregisterBackgroundSignalScheduler() -{ - m_internals.reset(nullptr); -} +ValidationSignals::~ValidationSignals() {} -void CMainSignals::FlushBackgroundCallbacks() +void ValidationSignals::FlushBackgroundCallbacks() { - if (m_internals) { - m_internals->m_schedulerClient.EmptyQueue(); - } + m_internals->m_task_runner->flush(); } -size_t CMainSignals::CallbacksPending() +size_t ValidationSignals::CallbacksPending() { - if (!m_internals) return 0; - return m_internals->m_schedulerClient.CallbacksPending(); + return m_internals->m_task_runner->size(); } -CMainSignals& GetMainSignals() -{ - return g_signals; -} - -void RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks) +void ValidationSignals::RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks) { // Each connection captures the shared_ptr to ensure that each callback is // executed before the subscriber is destroyed. For more details see #18338. - g_signals.m_internals->Register(std::move(callbacks)); + m_internals->Register(std::move(callbacks)); } -void RegisterValidationInterface(CValidationInterface* callbacks) +void ValidationSignals::RegisterValidationInterface(CValidationInterface* callbacks) { // Create a shared_ptr with a no-op deleter - CValidationInterface lifecycle // is managed by the caller. RegisterSharedValidationInterface({callbacks, [](CValidationInterface*){}}); } -void UnregisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks) +void ValidationSignals::UnregisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks) { UnregisterValidationInterface(callbacks.get()); } -void UnregisterValidationInterface(CValidationInterface* callbacks) +void ValidationSignals::UnregisterValidationInterface(CValidationInterface* callbacks) { - if (g_signals.m_internals) { - g_signals.m_internals->Unregister(callbacks); - } + m_internals->Unregister(callbacks); } -void UnregisterAllValidationInterfaces() +void ValidationSignals::UnregisterAllValidationInterfaces() { - if (!g_signals.m_internals) { - return; - } - g_signals.m_internals->Clear(); + m_internals->Clear(); } -void CallFunctionInValidationInterfaceQueue(std::function<void()> func) +void ValidationSignals::CallFunctionInValidationInterfaceQueue(std::function<void()> func) { - g_signals.m_internals->m_schedulerClient.AddToProcessQueue(std::move(func)); + m_internals->m_task_runner->insert(std::move(func)); } -void SyncWithValidationInterfaceQueue() +void ValidationSignals::SyncWithValidationInterfaceQueue() { AssertLockNotHeld(cs_main); // Block until the validation queue drains @@ -183,7 +160,7 @@ void SyncWithValidationInterfaceQueue() do { \ auto local_name = (name); \ LOG_EVENT("Enqueuing " fmt, local_name, __VA_ARGS__); \ - m_internals->m_schedulerClient.AddToProcessQueue([=] { \ + m_internals->m_task_runner->insert([=] { \ LOG_EVENT(fmt, local_name, __VA_ARGS__); \ event(); \ }); \ @@ -192,7 +169,7 @@ void SyncWithValidationInterfaceQueue() #define LOG_EVENT(fmt, ...) \ LogPrint(BCLog::VALIDATION, fmt "\n", __VA_ARGS__) -void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) { +void ValidationSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) { // Dependencies exist that require UpdatedBlockTip events to be delivered in the order in which // the chain actually updates. One way to ensure this is for the caller to invoke this signal // in the same critical section where the chain is updated @@ -206,7 +183,7 @@ void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockInd fInitialDownload); } -void CMainSignals::TransactionAddedToMempool(const NewMempoolTransactionInfo& tx, uint64_t mempool_sequence) +void ValidationSignals::TransactionAddedToMempool(const NewMempoolTransactionInfo& tx, uint64_t mempool_sequence) { auto event = [tx, mempool_sequence, this] { m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx, mempool_sequence); }); @@ -216,7 +193,7 @@ void CMainSignals::TransactionAddedToMempool(const NewMempoolTransactionInfo& tx tx.info.m_tx->GetWitnessHash().ToString()); } -void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) { +void ValidationSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) { auto event = [tx, reason, mempool_sequence, this] { m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason, mempool_sequence); }); }; @@ -226,7 +203,7 @@ void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemP RemovalReasonToString(reason)); } -void CMainSignals::BlockConnected(ChainstateRole role, const std::shared_ptr<const CBlock> &pblock, const CBlockIndex *pindex) { +void ValidationSignals::BlockConnected(ChainstateRole role, const std::shared_ptr<const CBlock> &pblock, const CBlockIndex *pindex) { auto event = [role, pblock, pindex, this] { m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.BlockConnected(role, pblock, pindex); }); }; @@ -235,7 +212,7 @@ void CMainSignals::BlockConnected(ChainstateRole role, const std::shared_ptr<con pindex->nHeight); } -void CMainSignals::MempoolTransactionsRemovedForBlock(const std::vector<RemovedMempoolTransactionInfo>& txs_removed_for_block, unsigned int nBlockHeight) +void ValidationSignals::MempoolTransactionsRemovedForBlock(const std::vector<RemovedMempoolTransactionInfo>& txs_removed_for_block, unsigned int nBlockHeight) { auto event = [txs_removed_for_block, nBlockHeight, this] { m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.MempoolTransactionsRemovedForBlock(txs_removed_for_block, nBlockHeight); }); @@ -245,7 +222,7 @@ void CMainSignals::MempoolTransactionsRemovedForBlock(const std::vector<RemovedM txs_removed_for_block.size()); } -void CMainSignals::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex) +void ValidationSignals::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex) { auto event = [pblock, pindex, this] { m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.BlockDisconnected(pblock, pindex); }); @@ -255,7 +232,7 @@ void CMainSignals::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock pindex->nHeight); } -void CMainSignals::ChainStateFlushed(ChainstateRole role, const CBlockLocator &locator) { +void ValidationSignals::ChainStateFlushed(ChainstateRole role, const CBlockLocator &locator) { auto event = [role, locator, this] { m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.ChainStateFlushed(role, locator); }); }; @@ -263,13 +240,13 @@ void CMainSignals::ChainStateFlushed(ChainstateRole role, const CBlockLocator &l locator.IsNull() ? "null" : locator.vHave.front().ToString()); } -void CMainSignals::BlockChecked(const CBlock& block, const BlockValidationState& state) { +void ValidationSignals::BlockChecked(const CBlock& block, const BlockValidationState& state) { LOG_EVENT("%s: block hash=%s state=%s", __func__, block.GetHash().ToString(), state.ToString()); m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.BlockChecked(block, state); }); } -void CMainSignals::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock> &block) { +void ValidationSignals::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock> &block) { LOG_EVENT("%s: block hash=%s", __func__, block->GetHash().ToString()); m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.NewPoWValidBlock(pindex, block); }); } diff --git a/src/validationinterface.h b/src/validationinterface.h index d9292ae2c9..6f49a73c93 100644 --- a/src/validationinterface.h +++ b/src/validationinterface.h @@ -6,61 +6,29 @@ #ifndef BITCOIN_VALIDATIONINTERFACE_H #define BITCOIN_VALIDATIONINTERFACE_H -#include <kernel/cs_main.h> #include <kernel/chain.h> +#include <kernel/cs_main.h> #include <primitives/transaction.h> // CTransaction(Ref) #include <sync.h> +#include <cstddef> +#include <cstdint> #include <functional> #include <memory> +#include <vector> + +namespace util { +class TaskRunnerInterface; +} // namespace util class BlockValidationState; class CBlock; class CBlockIndex; struct CBlockLocator; -class CValidationInterface; -class CScheduler; enum class MemPoolRemovalReason; struct RemovedMempoolTransactionInfo; struct NewMempoolTransactionInfo; -/** Register subscriber */ -void RegisterValidationInterface(CValidationInterface* callbacks); -/** Unregister subscriber. DEPRECATED. This is not safe to use when the RPC server or main message handler thread is running. */ -void UnregisterValidationInterface(CValidationInterface* callbacks); -/** Unregister all subscribers */ -void UnregisterAllValidationInterfaces(); - -// Alternate registration functions that release a shared_ptr after the last -// notification is sent. These are useful for race-free cleanup, since -// unregistration is nonblocking and can return before the last notification is -// processed. -/** Register subscriber */ -void RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks); -/** Unregister subscriber */ -void UnregisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks); - -/** - * Pushes a function to callback onto the notification queue, guaranteeing any - * callbacks generated prior to now are finished when the function is called. - * - * Be very careful blocking on func to be called if any locks are held - - * validation interface clients may not be able to make progress as they often - * wait for things like cs_main, so blocking until func is called with cs_main - * will result in a deadlock (that DEBUG_LOCKORDER will miss). - */ -void CallFunctionInValidationInterfaceQueue(std::function<void ()> func); -/** - * This is a synonym for the following, which asserts certain locks are not - * held: - * std::promise<void> promise; - * CallFunctionInValidationInterfaceQueue([&promise] { - * promise.set_value(); - * }); - * promise.get_future().wait(); - */ -void SyncWithValidationInterfaceQueue() LOCKS_EXCLUDED(cs_main); - /** * Implement this to subscribe to events generated in validation and mempool * @@ -185,30 +153,65 @@ protected: * has been received and connected to the headers tree, though not validated yet. */ virtual void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock>& block) {}; - friend class CMainSignals; + friend class ValidationSignals; friend class ValidationInterfaceTest; }; -class MainSignalsImpl; -class CMainSignals { +class ValidationSignalsImpl; +class ValidationSignals { private: - std::unique_ptr<MainSignalsImpl> m_internals; - - friend void ::RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface>); - friend void ::UnregisterValidationInterface(CValidationInterface*); - friend void ::UnregisterAllValidationInterfaces(); - friend void ::CallFunctionInValidationInterfaceQueue(std::function<void ()> func); + std::unique_ptr<ValidationSignalsImpl> m_internals; public: - /** Register a CScheduler to give callbacks which should run in the background (may only be called once) */ - void RegisterBackgroundSignalScheduler(CScheduler& scheduler); - /** Unregister a CScheduler to give callbacks which should run in the background - these callbacks will now be dropped! */ - void UnregisterBackgroundSignalScheduler(); + // The task runner will block validation if it calls its insert method's + // func argument synchronously. In this class func contains a loop that + // dispatches a single validation event to all subscribers sequentially. + explicit ValidationSignals(std::unique_ptr<util::TaskRunnerInterface> task_runner); + + ~ValidationSignals(); + /** Call any remaining callbacks on the calling thread */ void FlushBackgroundCallbacks(); size_t CallbacksPending(); + /** Register subscriber */ + void RegisterValidationInterface(CValidationInterface* callbacks); + /** Unregister subscriber. DEPRECATED. This is not safe to use when the RPC server or main message handler thread is running. */ + void UnregisterValidationInterface(CValidationInterface* callbacks); + /** Unregister all subscribers */ + void UnregisterAllValidationInterfaces(); + + // Alternate registration functions that release a shared_ptr after the last + // notification is sent. These are useful for race-free cleanup, since + // unregistration is nonblocking and can return before the last notification is + // processed. + /** Register subscriber */ + void RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks); + /** Unregister subscriber */ + void UnregisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks); + + /** + * Pushes a function to callback onto the notification queue, guaranteeing any + * callbacks generated prior to now are finished when the function is called. + * + * Be very careful blocking on func to be called if any locks are held - + * validation interface clients may not be able to make progress as they often + * wait for things like cs_main, so blocking until func is called with cs_main + * will result in a deadlock (that DEBUG_LOCKORDER will miss). + */ + void CallFunctionInValidationInterfaceQueue(std::function<void ()> func); + + /** + * This is a synonym for the following, which asserts certain locks are not + * held: + * std::promise<void> promise; + * CallFunctionInValidationInterfaceQueue([&promise] { + * promise.set_value(); + * }); + * promise.get_future().wait(); + */ + void SyncWithValidationInterfaceQueue() LOCKS_EXCLUDED(cs_main); void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload); void TransactionAddedToMempool(const NewMempoolTransactionInfo&, uint64_t mempool_sequence); @@ -221,6 +224,4 @@ public: void NewPoWValidBlock(const CBlockIndex *, const std::shared_ptr<const CBlock>&); }; -CMainSignals& GetMainSignals(); - #endif // BITCOIN_VALIDATIONINTERFACE_H diff --git a/src/wallet/test/util.cpp b/src/wallet/test/util.cpp index cbf3ccd1ec..49d206f409 100644 --- a/src/wallet/test/util.cpp +++ b/src/wallet/test/util.cpp @@ -71,7 +71,8 @@ std::shared_ptr<CWallet> TestLoadWallet(WalletContext& context) void TestUnloadWallet(std::shared_ptr<CWallet>&& wallet) { - SyncWithValidationInterfaceQueue(); + // Calls SyncWithValidationInterfaceQueue + wallet->chain().waitForNotificationsIfTipChanged({}); wallet->m_chain_notifications_handler.reset(); UnloadWallet(std::move(wallet)); } diff --git a/src/wallet/test/wallet_tests.cpp b/src/wallet/test/wallet_tests.cpp index 27a81b3669..3a67b9a433 100644 --- a/src/wallet/test/wallet_tests.cpp +++ b/src/wallet/test/wallet_tests.cpp @@ -814,7 +814,7 @@ BOOST_FIXTURE_TEST_CASE(CreateWallet, TestChain100Setup) // transactionAddedToMempool notifications, and create block and mempool // transactions paying to the wallet std::promise<void> promise; - CallFunctionInValidationInterfaceQueue([&promise] { + m_node.validation_signals->CallFunctionInValidationInterfaceQueue([&promise] { promise.get_future().wait(); }); std::string error; @@ -842,7 +842,7 @@ BOOST_FIXTURE_TEST_CASE(CreateWallet, TestChain100Setup) // Unblock notification queue and make sure stale blockConnected and // transactionAddedToMempool events are processed promise.set_value(); - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); // AddToWallet events for block_tx and mempool_tx events are counted a // second time as the notification queue is processed BOOST_CHECK_EQUAL(addtx_count, 5); @@ -865,7 +865,7 @@ BOOST_FIXTURE_TEST_CASE(CreateWallet, TestChain100Setup) m_coinbase_txns.push_back(CreateAndProcessBlock({block_tx}, GetScriptForRawPubKey(coinbaseKey.GetPubKey())).vtx[0]); mempool_tx = TestSimpleSpend(*m_coinbase_txns[3], 0, coinbaseKey, GetScriptForRawPubKey(key.GetPubKey())); BOOST_CHECK(m_node.chain->broadcastTransaction(MakeTransactionRef(mempool_tx), DEFAULT_TRANSACTION_MAXFEE, false, error)); - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); }); wallet = TestLoadWallet(context); // Since mempool transactions are requested at the end of loading, there will @@ -905,7 +905,7 @@ BOOST_FIXTURE_TEST_CASE(RemoveTxs, TestChain100Setup) auto block_tx = TestSimpleSpend(*m_coinbase_txns[0], 0, coinbaseKey, GetScriptForRawPubKey(key.GetPubKey())); CreateAndProcessBlock({block_tx}, GetScriptForRawPubKey(coinbaseKey.GetPubKey())); - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); { auto block_hash = block_tx.GetHash(); |