aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAva Chow <github@achow101.com>2024-03-08 20:47:42 -0500
committerAva Chow <github@achow101.com>2024-03-08 20:58:04 -0500
commitc07935bcf56298caa3aceb9834db26f96e8421e0 (patch)
tree29c9f55dd6bb8858717669d3f69ae290754f5fdc
parent1cd2e29870e4ad3b7c57b1d567df0e6df56572b0 (diff)
parentd5228efb5391b31a9a0673019e43e7fa2cd4ac07 (diff)
downloadbitcoin-c07935bcf56298caa3aceb9834db26f96e8421e0.tar.xz
Merge bitcoin/bitcoin#28960: kernel: Remove dependency on CScheduler
d5228efb5391b31a9a0673019e43e7fa2cd4ac07 kernel: Remove dependency on CScheduler (TheCharlatan) 06069b3913dda048f5d640a662b0852f86346ace scripted-diff: Rename MainSignals to ValidationSignals (TheCharlatan) 0d6d2b650d1017691f48c9109a6cd020ab46aa73 scripted-diff: Rename SingleThreadedSchedulerClient to SerialTaskRunner (TheCharlatan) 4abde2c4e3fd9b66394b79874583bdc2a9132c36 [refactor] Make MainSignals RAII styled (TheCharlatan) 84f5c135b8118cbe15b8bfb4db80d61237987f64 refactor: De-globalize g_signals (TheCharlatan) 473dd4b97ae40e43e1a1a97fdbeb40be4855e9bc [refactor] Prepare for g_signals de-globalization (TheCharlatan) 3fba3d5deec6d7bae33823b8da7682f9b03d9deb [refactor] Make signals optional in mempool and chainman (TheCharlatan) Pull request description: By defining a virtual interface class for the scheduler client, users of the kernel can now define their own event consuming infrastructure, without having to spawn threads or rely on the scheduler design. Removing `CScheduler` also allows removing the thread and exception modules from the kernel library. To make the `CMainSignals` class easier to use from a kernel library perspective, remove its global instantiation and adopt RAII practices. Renames `CMainSignals` to `ValidationSignals`, which more accurately describes its purpose and scope. Also make the `ValidationSignals` in the `ChainstateManager` and CTxMemPool` optional. This could be useful in the future for using or testing these classes without having to instantiate any form of signal handling. --- This PR is part of the [libbitcoinkernel project](https://github.com/bitcoin/bitcoin/issues/27587). It improves the kernel API and removes two modules from the kernel library. ACKs for top commit: maflcko: re-ACK d5228efb5391b31a9a0673019e43e7fa2cd4ac07 🌄 ryanofsky: Code review ACK d5228efb5391b31a9a0673019e43e7fa2cd4ac07. Just comment change since last review. vasild: ACK d5228efb5391b31a9a0673019e43e7fa2cd4ac07 furszy: diff ACK d5228ef Tree-SHA512: e93a5f10eb6182effb84bb981859a7ce750e466efd8171045d8d9e7fe46e4065631d9f6f533c5967c4d34c9bb7d7a67e9f4593bd4c5b30cd7b3bbad7be7b331b
-rwxr-xr-xcontrib/devtools/test_deterministic_coverage.sh26
-rw-r--r--src/Makefile.am4
-rw-r--r--src/bench/wallet_balance.cpp3
-rw-r--r--src/bitcoin-chainstate.cpp23
-rw-r--r--src/index/base.cpp8
-rw-r--r--src/init.cpp47
-rw-r--r--src/kernel/chainstatemanager_opts.h2
-rw-r--r--src/kernel/mempool_options.h4
-rw-r--r--src/node/context.h4
-rw-r--r--src/node/interfaces.cpp14
-rw-r--r--src/node/transaction.cpp4
-rw-r--r--src/rpc/blockchain.cpp3
-rw-r--r--src/rpc/fees.cpp10
-rw-r--r--src/rpc/mining.cpp4
-rw-r--r--src/rpc/node.cpp2
-rw-r--r--src/scheduler.cpp14
-rw-r--r--src/scheduler.h15
-rw-r--r--src/test/coinstatsindex_tests.cpp2
-rw-r--r--src/test/fuzz/package_eval.cpp12
-rw-r--r--src/test/fuzz/process_message.cpp4
-rw-r--r--src/test/fuzz/process_messages.cpp4
-rw-r--r--src/test/fuzz/tx_pool.cpp10
-rw-r--r--src/test/peerman_tests.cpp4
-rw-r--r--src/test/policyestimator_tests.cpp20
-rw-r--r--src/test/scheduler_tests.cpp10
-rw-r--r--src/test/txindex_tests.cpp2
-rw-r--r--src/test/util/mining.cpp6
-rw-r--r--src/test/util/setup_common.cpp7
-rw-r--r--src/test/util/txmempool.cpp1
-rw-r--r--src/test/validation_block_tests.cpp8
-rw-r--r--src/test/validation_chainstatemanager_tests.cpp5
-rw-r--r--src/test/validationinterface_tests.cpp31
-rw-r--r--src/txmempool.cpp11
-rw-r--r--src/txmempool.h3
-rw-r--r--src/util/task_runner.h52
-rw-r--r--src/validation.cpp60
-rw-r--r--src/validationinterface.cpp93
-rw-r--r--src/validationinterface.h111
-rw-r--r--src/wallet/test/util.cpp3
-rw-r--r--src/wallet/test/wallet_tests.cpp8
40 files changed, 365 insertions, 289 deletions
diff --git a/contrib/devtools/test_deterministic_coverage.sh b/contrib/devtools/test_deterministic_coverage.sh
index 8501c72f04..23c260b529 100755
--- a/contrib/devtools/test_deterministic_coverage.sh
+++ b/contrib/devtools/test_deterministic_coverage.sh
@@ -17,21 +17,21 @@ NON_DETERMINISTIC_TESTS=(
"blockfilter_index_tests/blockfilter_index_initial_sync" # src/checkqueue.h: In CCheckQueue::Loop(): while (queue.empty()) { ... }
"coinselector_tests/knapsack_solver_test" # coinselector_tests.cpp: if (equal_sets(setCoinsRet, setCoinsRet2))
"fs_tests/fsbridge_fstream" # deterministic test failure?
- "miner_tests/CreateNewBlock_validity" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
+ "miner_tests/CreateNewBlock_validity" # validation.cpp: if (signals.CallbacksPending() > 10)
"scheduler_tests/manythreads" # scheduler.cpp: CScheduler::serviceQueue()
"scheduler_tests/singlethreadedscheduler_ordered" # scheduler.cpp: CScheduler::serviceQueue()
- "txvalidationcache_tests/checkinputs_test" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
- "txvalidationcache_tests/tx_mempool_block_doublespend" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
- "txindex_tests/txindex_initial_sync" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
- "txvalidation_tests/tx_mempool_reject_coinbase" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
- "validation_block_tests/processnewblock_signals_ordering" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
- "wallet_tests/coin_mark_dirty_immature_credit" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
- "wallet_tests/dummy_input_size_test" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
- "wallet_tests/importmulti_rescan" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
- "wallet_tests/importwallet_rescan" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
- "wallet_tests/ListCoins" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
- "wallet_tests/scan_for_wallet_transactions" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
- "wallet_tests/wallet_disableprivkeys" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10)
+ "txvalidationcache_tests/checkinputs_test" # validation.cpp: if (signals.CallbacksPending() > 10)
+ "txvalidationcache_tests/tx_mempool_block_doublespend" # validation.cpp: if (signals.CallbacksPending() > 10)
+ "txindex_tests/txindex_initial_sync" # validation.cpp: if (signals.CallbacksPending() > 10)
+ "txvalidation_tests/tx_mempool_reject_coinbase" # validation.cpp: if (signals.CallbacksPending() > 10)
+ "validation_block_tests/processnewblock_signals_ordering" # validation.cpp: if (signals.CallbacksPending() > 10)
+ "wallet_tests/coin_mark_dirty_immature_credit" # validation.cpp: if (signals.CallbacksPending() > 10)
+ "wallet_tests/dummy_input_size_test" # validation.cpp: if (signals.CallbacksPending() > 10)
+ "wallet_tests/importmulti_rescan" # validation.cpp: if (signals.CallbacksPending() > 10)
+ "wallet_tests/importwallet_rescan" # validation.cpp: if (signals.CallbacksPending() > 10)
+ "wallet_tests/ListCoins" # validation.cpp: if (signals.CallbacksPending() > 10)
+ "wallet_tests/scan_for_wallet_transactions" # validation.cpp: if (signals.CallbacksPending() > 10)
+ "wallet_tests/wallet_disableprivkeys" # validation.cpp: if (signals.CallbacksPending() > 10)
)
TEST_BITCOIN_BINARY="src/test/test_bitcoin"
diff --git a/src/Makefile.am b/src/Makefile.am
index d797ac880b..1f55bfbafd 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -323,6 +323,7 @@ BITCOIN_CORE_H = \
util/spanparsing.h \
util/string.h \
util/syserror.h \
+ util/task_runner.h \
util/thread.h \
util/threadinterrupt.h \
util/threadnames.h \
@@ -966,7 +967,6 @@ libbitcoinkernel_la_SOURCES = \
pubkey.cpp \
random.cpp \
randomenv.cpp \
- scheduler.cpp \
script/interpreter.cpp \
script/script.cpp \
script/script_error.cpp \
@@ -983,7 +983,6 @@ libbitcoinkernel_la_SOURCES = \
util/batchpriority.cpp \
util/chaintype.cpp \
util/check.cpp \
- util/exception.cpp \
util/fs.cpp \
util/fs_helpers.cpp \
util/hasher.cpp \
@@ -994,7 +993,6 @@ libbitcoinkernel_la_SOURCES = \
util/strencodings.cpp \
util/string.cpp \
util/syserror.cpp \
- util/thread.cpp \
util/threadnames.cpp \
util/time.cpp \
util/tokenpipe.cpp \
diff --git a/src/bench/wallet_balance.cpp b/src/bench/wallet_balance.cpp
index bf2195293e..7a10b167a6 100644
--- a/src/bench/wallet_balance.cpp
+++ b/src/bench/wallet_balance.cpp
@@ -39,7 +39,8 @@ static void WalletBalance(benchmark::Bench& bench, const bool set_dirty, const b
generatetoaddress(test_setup->m_node, address_mine.value_or(ADDRESS_WATCHONLY));
generatetoaddress(test_setup->m_node, ADDRESS_WATCHONLY);
}
- SyncWithValidationInterfaceQueue();
+ // Calls SyncWithValidationInterfaceQueue
+ wallet.chain().waitForNotificationsIfTipChanged(uint256::ZERO);
auto bal = GetBalance(wallet); // Cache
diff --git a/src/bitcoin-chainstate.cpp b/src/bitcoin-chainstate.cpp
index c1a71ed749..3eb64aa344 100644
--- a/src/bitcoin-chainstate.cpp
+++ b/src/bitcoin-chainstate.cpp
@@ -23,11 +23,10 @@
#include <node/caches.h>
#include <node/chainstate.h>
#include <random.h>
-#include <scheduler.h>
#include <script/sigcache.h>
#include <util/chaintype.h>
#include <util/fs.h>
-#include <util/thread.h>
+#include <util/task_runner.h>
#include <validation.h>
#include <validationinterface.h>
@@ -68,16 +67,7 @@ int main(int argc, char* argv[])
Assert(InitSignatureCache(validation_cache_sizes.signature_cache_bytes));
Assert(InitScriptExecutionCache(validation_cache_sizes.script_execution_cache_bytes));
-
- // SETUP: Scheduling and Background Signals
- CScheduler scheduler{};
- // Start the lightweight task scheduler thread
- scheduler.m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { scheduler.serviceQueue(); });
-
- // Gather some entropy once per minute.
- scheduler.scheduleEvery(RandAddPeriodic, std::chrono::minutes{1});
-
- GetMainSignals().RegisterBackgroundSignalScheduler(scheduler);
+ ValidationSignals validation_signals{std::make_unique<util::ImmediateTaskRunner>()};
class KernelNotifications : public kernel::Notifications
{
@@ -118,6 +108,7 @@ int main(int argc, char* argv[])
.chainparams = *chainparams,
.datadir = abs_datadir,
.notifications = *notifications,
+ .signals = &validation_signals,
};
const node::BlockManager::Options blockman_opts{
.chainparams = chainman_opts.chainparams,
@@ -235,9 +226,9 @@ int main(int argc, char* argv[])
bool new_block;
auto sc = std::make_shared<submitblock_StateCatcher>(block.GetHash());
- RegisterSharedValidationInterface(sc);
+ validation_signals.RegisterSharedValidationInterface(sc);
bool accepted = chainman.ProcessNewBlock(blockptr, /*force_processing=*/true, /*min_pow_checked=*/true, /*new_block=*/&new_block);
- UnregisterSharedValidationInterface(sc);
+ validation_signals.UnregisterSharedValidationInterface(sc);
if (!new_block && accepted) {
std::cerr << "duplicate" << std::endl;
break;
@@ -287,10 +278,9 @@ int main(int argc, char* argv[])
epilogue:
// Without this precise shutdown sequence, there will be a lot of nullptr
// dereferencing and UB.
- scheduler.stop();
if (chainman.m_thread_load.joinable()) chainman.m_thread_load.join();
- GetMainSignals().FlushBackgroundCallbacks();
+ validation_signals.FlushBackgroundCallbacks();
{
LOCK(cs_main);
for (Chainstate* chainstate : chainman.GetAll()) {
@@ -300,5 +290,4 @@ epilogue:
}
}
}
- GetMainSignals().UnregisterBackgroundSignalScheduler();
}
diff --git a/src/index/base.cpp b/src/index/base.cpp
index bcfe7215be..2287437f8f 100644
--- a/src/index/base.cpp
+++ b/src/index/base.cpp
@@ -89,7 +89,7 @@ bool BaseIndex::Init()
return &m_chain->context()->chainman->GetChainstateForIndexing());
// Register to validation interface before setting the 'm_synced' flag, so that
// callbacks are not missed once m_synced is true.
- RegisterValidationInterface(this);
+ m_chain->context()->validation_signals->RegisterValidationInterface(this);
CBlockLocator locator;
if (!GetDB().ReadBestBlock(locator)) {
@@ -380,7 +380,7 @@ bool BaseIndex::BlockUntilSyncedToCurrentChain() const
}
LogPrintf("%s: %s is catching up on block notifications\n", __func__, GetName());
- SyncWithValidationInterfaceQueue();
+ m_chain->context()->validation_signals->SyncWithValidationInterfaceQueue();
return true;
}
@@ -399,7 +399,9 @@ bool BaseIndex::StartBackgroundSync()
void BaseIndex::Stop()
{
- UnregisterValidationInterface(this);
+ if (m_chain->context()->validation_signals) {
+ m_chain->context()->validation_signals->UnregisterValidationInterface(this);
+ }
if (m_thread_sync.joinable()) {
m_thread_sync.join();
diff --git a/src/init.cpp b/src/init.cpp
index 988daefeec..9ea7b881cb 100644
--- a/src/init.cpp
+++ b/src/init.cpp
@@ -291,7 +291,7 @@ void Shutdown(NodeContext& node)
// Because these depend on each-other, we make sure that neither can be
// using the other before destroying them.
- if (node.peerman) UnregisterValidationInterface(node.peerman.get());
+ if (node.peerman && node.validation_signals) node.validation_signals->UnregisterValidationInterface(node.peerman.get());
if (node.connman) node.connman->Stop();
StopTorControl();
@@ -317,7 +317,9 @@ void Shutdown(NodeContext& node)
// fee estimator from validation interface.
if (node.fee_estimator) {
node.fee_estimator->Flush();
- UnregisterValidationInterface(node.fee_estimator.get());
+ if (node.validation_signals) {
+ node.validation_signals->UnregisterValidationInterface(node.fee_estimator.get());
+ }
}
// FlushStateToDisk generates a ChainStateFlushed callback, which we should avoid missing
@@ -332,7 +334,7 @@ void Shutdown(NodeContext& node)
// After there are no more peers/RPC left to give us new data which may generate
// CValidationInterface callbacks, flush them...
- GetMainSignals().FlushBackgroundCallbacks();
+ if (node.validation_signals) node.validation_signals->FlushBackgroundCallbacks();
// Stop and delete all indexes only after flushing background callbacks.
if (g_txindex) {
@@ -367,17 +369,19 @@ void Shutdown(NodeContext& node)
#if ENABLE_ZMQ
if (g_zmq_notification_interface) {
- UnregisterValidationInterface(g_zmq_notification_interface.get());
+ if (node.validation_signals) node.validation_signals->UnregisterValidationInterface(g_zmq_notification_interface.get());
g_zmq_notification_interface.reset();
}
#endif
node.chain_clients.clear();
- UnregisterAllValidationInterfaces();
- GetMainSignals().UnregisterBackgroundSignalScheduler();
+ if (node.validation_signals) {
+ node.validation_signals->UnregisterAllValidationInterfaces();
+ }
node.mempool.reset();
node.fee_estimator.reset();
node.chainman.reset();
+ node.validation_signals.reset();
node.scheduler.reset();
node.kernel.reset();
@@ -1138,17 +1142,18 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
assert(!node.scheduler);
node.scheduler = std::make_unique<CScheduler>();
+ auto& scheduler = *node.scheduler;
// Start the lightweight task scheduler thread
- node.scheduler->m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { node.scheduler->serviceQueue(); });
+ scheduler.m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { scheduler.serviceQueue(); });
// Gather some entropy once per minute.
- node.scheduler->scheduleEvery([]{
+ scheduler.scheduleEvery([]{
RandAddPeriodic();
}, std::chrono::minutes{1});
// Check disk space every 5 minutes to avoid db corruption.
- node.scheduler->scheduleEvery([&args, &node]{
+ scheduler.scheduleEvery([&args, &node]{
constexpr uint64_t min_disk_space = 50 << 20; // 50 MB
if (!CheckDiskSpace(args.GetBlocksDirPath(), min_disk_space)) {
LogPrintf("Shutting down due to lack of disk space!\n");
@@ -1158,7 +1163,9 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
}
}, std::chrono::minutes{5});
- GetMainSignals().RegisterBackgroundSignalScheduler(*node.scheduler);
+ assert(!node.validation_signals);
+ node.validation_signals = std::make_unique<ValidationSignals>(std::make_unique<SerialTaskRunner>(scheduler));
+ auto& validation_signals = *node.validation_signals;
// Create client interfaces for wallets that are supposed to be loaded
// according to -wallet and -disablewallet options. This only constructs
@@ -1263,8 +1270,8 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
// Flush estimates to disk periodically
CBlockPolicyEstimator* fee_estimator = node.fee_estimator.get();
- node.scheduler->scheduleEvery([fee_estimator] { fee_estimator->FlushFeeEstimates(); }, FEE_FLUSH_INTERVAL);
- RegisterValidationInterface(fee_estimator);
+ scheduler.scheduleEvery([fee_estimator] { fee_estimator->FlushFeeEstimates(); }, FEE_FLUSH_INTERVAL);
+ validation_signals.RegisterValidationInterface(fee_estimator);
}
// Check port numbers
@@ -1435,7 +1442,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
});
if (g_zmq_notification_interface) {
- RegisterValidationInterface(g_zmq_notification_interface.get());
+ validation_signals.RegisterValidationInterface(g_zmq_notification_interface.get());
}
#endif
@@ -1449,6 +1456,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
.chainparams = chainparams,
.datadir = args.GetDataDirNet(),
.notifications = *node.notifications,
+ .signals = &validation_signals,
};
Assert(ApplyArgsManOptions(args, chainman_opts)); // no error can happen, already checked in AppInitParameterInteraction
@@ -1478,6 +1486,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
CTxMemPool::Options mempool_opts{
.check_ratio = chainparams.DefaultConsistencyChecks() ? 1 : 0,
+ .signals = &validation_signals,
};
auto result{ApplyArgsManOptions(args, chainparams, mempool_opts)};
if (!result) {
@@ -1505,7 +1514,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
// Drain the validation interface queue to ensure that the old indexes
// don't have any pending work.
- SyncWithValidationInterfaceQueue();
+ Assert(node.validation_signals)->SyncWithValidationInterfaceQueue();
for (auto* index : node.indexes) {
index->Interrupt();
@@ -1594,7 +1603,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
node.peerman = PeerManager::make(*node.connman, *node.addrman,
node.banman.get(), chainman,
*node.mempool, peerman_opts);
- RegisterValidationInterface(node.peerman.get());
+ validation_signals.RegisterValidationInterface(node.peerman.get());
// ********************************************************* Step 8: start indexers
@@ -1900,7 +1909,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
connOptions.m_i2p_accept_incoming = args.GetBoolArg("-i2pacceptincoming", DEFAULT_I2P_ACCEPT_INCOMING);
- if (!node.connman->Start(*node.scheduler, connOptions)) {
+ if (!node.connman->Start(scheduler, connOptions)) {
return false;
}
@@ -1920,15 +1929,15 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
uiInterface.InitMessage(_("Done loading").translated);
for (const auto& client : node.chain_clients) {
- client->start(*node.scheduler);
+ client->start(scheduler);
}
BanMan* banman = node.banman.get();
- node.scheduler->scheduleEvery([banman]{
+ scheduler.scheduleEvery([banman]{
banman->DumpBanlist();
}, DUMP_BANS_INTERVAL);
- if (node.peerman) node.peerman->StartScheduledTasks(*node.scheduler);
+ if (node.peerman) node.peerman->StartScheduledTasks(scheduler);
#if HAVE_SYSTEM
StartupNotify(args);
diff --git a/src/kernel/chainstatemanager_opts.h b/src/kernel/chainstatemanager_opts.h
index 864aac336e..de5f78494a 100644
--- a/src/kernel/chainstatemanager_opts.h
+++ b/src/kernel/chainstatemanager_opts.h
@@ -18,6 +18,7 @@
#include <optional>
class CChainParams;
+class ValidationSignals;
static constexpr bool DEFAULT_CHECKPOINTS_ENABLED{true};
static constexpr auto DEFAULT_MAX_TIP_AGE{24h};
@@ -44,6 +45,7 @@ struct ChainstateManagerOpts {
DBOptions coins_db{};
CoinsViewOptions coins_view{};
Notifications& notifications;
+ ValidationSignals* signals{nullptr};
//! Number of script check worker threads. Zero means no parallel verification.
int worker_threads_num{0};
};
diff --git a/src/kernel/mempool_options.h b/src/kernel/mempool_options.h
index 753aebd455..0850b2e60e 100644
--- a/src/kernel/mempool_options.h
+++ b/src/kernel/mempool_options.h
@@ -13,6 +13,8 @@
#include <cstdint>
#include <optional>
+class ValidationSignals;
+
/** Default for -maxmempool, maximum megabytes of mempool memory usage */
static constexpr unsigned int DEFAULT_MAX_MEMPOOL_SIZE_MB{300};
/** Default for -maxmempool when blocksonly is set */
@@ -56,6 +58,8 @@ struct MemPoolOptions {
bool full_rbf{DEFAULT_MEMPOOL_FULL_RBF};
bool persist_v1_dat{DEFAULT_PERSIST_V1_DAT};
MemPoolLimits limits{};
+
+ ValidationSignals* signals{nullptr};
};
} // namespace kernel
diff --git a/src/node/context.h b/src/node/context.h
index 4f3b640b2d..245f230aec 100644
--- a/src/node/context.h
+++ b/src/node/context.h
@@ -20,6 +20,7 @@ class BanMan;
class BaseIndex;
class CBlockPolicyEstimator;
class CConnman;
+class ValidationSignals;
class CScheduler;
class CTxMemPool;
class ChainstateManager;
@@ -70,7 +71,10 @@ struct NodeContext {
interfaces::WalletLoader* wallet_loader{nullptr};
std::unique_ptr<CScheduler> scheduler;
std::function<void()> rpc_interruption_point = [] {};
+ //! Issues blocking calls about sync status, errors and warnings
std::unique_ptr<KernelNotifications> notifications;
+ //! Issues calls about blocks and transactions
+ std::unique_ptr<ValidationSignals> validation_signals;
std::atomic<int> exit_status{EXIT_SUCCESS};
//! Declare default constructor and destructor that are not inline, so code
diff --git a/src/node/interfaces.cpp b/src/node/interfaces.cpp
index 5a8b7fc105..f9a372e3de 100644
--- a/src/node/interfaces.cpp
+++ b/src/node/interfaces.cpp
@@ -460,19 +460,20 @@ public:
class NotificationsHandlerImpl : public Handler
{
public:
- explicit NotificationsHandlerImpl(std::shared_ptr<Chain::Notifications> notifications)
- : m_proxy(std::make_shared<NotificationsProxy>(std::move(notifications)))
+ explicit NotificationsHandlerImpl(ValidationSignals& signals, std::shared_ptr<Chain::Notifications> notifications)
+ : m_signals{signals}, m_proxy{std::make_shared<NotificationsProxy>(std::move(notifications))}
{
- RegisterSharedValidationInterface(m_proxy);
+ m_signals.RegisterSharedValidationInterface(m_proxy);
}
~NotificationsHandlerImpl() override { disconnect(); }
void disconnect() override
{
if (m_proxy) {
- UnregisterSharedValidationInterface(m_proxy);
+ m_signals.UnregisterSharedValidationInterface(m_proxy);
m_proxy.reset();
}
}
+ ValidationSignals& m_signals;
std::shared_ptr<NotificationsProxy> m_proxy;
};
@@ -761,12 +762,12 @@ public:
}
std::unique_ptr<Handler> handleNotifications(std::shared_ptr<Notifications> notifications) override
{
- return std::make_unique<NotificationsHandlerImpl>(std::move(notifications));
+ return std::make_unique<NotificationsHandlerImpl>(validation_signals(), std::move(notifications));
}
void waitForNotificationsIfTipChanged(const uint256& old_tip) override
{
if (!old_tip.IsNull() && old_tip == WITH_LOCK(::cs_main, return chainman().ActiveChain().Tip()->GetBlockHash())) return;
- SyncWithValidationInterfaceQueue();
+ validation_signals().SyncWithValidationInterfaceQueue();
}
std::unique_ptr<Handler> handleRpc(const CRPCCommand& command) override
{
@@ -822,6 +823,7 @@ public:
NodeContext* context() override { return &m_node; }
ArgsManager& args() { return *Assert(m_node.args); }
ChainstateManager& chainman() { return *Assert(m_node.chainman); }
+ ValidationSignals& validation_signals() { return *Assert(m_node.validation_signals); }
NodeContext& m_node;
};
} // namespace
diff --git a/src/node/transaction.cpp b/src/node/transaction.cpp
index ef9a30a076..b66a4f2f39 100644
--- a/src/node/transaction.cpp
+++ b/src/node/transaction.cpp
@@ -92,7 +92,7 @@ TransactionError BroadcastTransaction(NodeContext& node, const CTransactionRef t
node.mempool->AddUnbroadcastTx(txid);
}
- if (wait_callback) {
+ if (wait_callback && node.validation_signals) {
// For transactions broadcast from outside the wallet, make sure
// that the wallet has been notified of the transaction before
// continuing.
@@ -101,7 +101,7 @@ TransactionError BroadcastTransaction(NodeContext& node, const CTransactionRef t
// with a transaction to/from their wallet, immediately call some
// wallet RPC, and get a stale result because callbacks have not
// yet been processed.
- CallFunctionInValidationInterfaceQueue([&promise] {
+ node.validation_signals->CallFunctionInValidationInterfaceQueue([&promise] {
promise.set_value();
});
callback_set = true;
diff --git a/src/rpc/blockchain.cpp b/src/rpc/blockchain.cpp
index 50908e9f96..dfdddeacea 100644
--- a/src/rpc/blockchain.cpp
+++ b/src/rpc/blockchain.cpp
@@ -395,7 +395,8 @@ static RPCHelpMan syncwithvalidationinterfacequeue()
},
[&](const RPCHelpMan& self, const JSONRPCRequest& request) -> UniValue
{
- SyncWithValidationInterfaceQueue();
+ NodeContext& node = EnsureAnyNodeContext(request.context);
+ CHECK_NONFATAL(node.validation_signals)->SyncWithValidationInterfaceQueue();
return UniValue::VNULL;
},
};
diff --git a/src/rpc/fees.cpp b/src/rpc/fees.cpp
index 57ba486ed9..f3345b4f1c 100644
--- a/src/rpc/fees.cpp
+++ b/src/rpc/fees.cpp
@@ -4,6 +4,7 @@
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <core_io.h>
+#include <node/context.h>
#include <policy/feerate.h>
#include <policy/fees.h>
#include <rpc/protocol.h>
@@ -21,10 +22,6 @@
#include <cmath>
#include <string>
-namespace node {
-struct NodeContext;
-}
-
using node::NodeContext;
static RPCHelpMan estimatesmartfee()
@@ -68,7 +65,7 @@ static RPCHelpMan estimatesmartfee()
const NodeContext& node = EnsureAnyNodeContext(request.context);
const CTxMemPool& mempool = EnsureMemPool(node);
- SyncWithValidationInterfaceQueue();
+ CHECK_NONFATAL(mempool.m_signals)->SyncWithValidationInterfaceQueue();
unsigned int max_target = fee_estimator.HighestTargetTracked(FeeEstimateHorizon::LONG_HALFLIFE);
unsigned int conf_target = ParseConfirmTarget(request.params[0], max_target);
bool conservative = true;
@@ -156,8 +153,9 @@ static RPCHelpMan estimaterawfee()
[&](const RPCHelpMan& self, const JSONRPCRequest& request) -> UniValue
{
CBlockPolicyEstimator& fee_estimator = EnsureAnyFeeEstimator(request.context);
+ const NodeContext& node = EnsureAnyNodeContext(request.context);
- SyncWithValidationInterfaceQueue();
+ CHECK_NONFATAL(node.validation_signals)->SyncWithValidationInterfaceQueue();
unsigned int max_target = fee_estimator.HighestTargetTracked(FeeEstimateHorizon::LONG_HALFLIFE);
unsigned int conf_target = ParseConfirmTarget(request.params[0], max_target);
double threshold = 0.95;
diff --git a/src/rpc/mining.cpp b/src/rpc/mining.cpp
index ff39a31a43..f7cdbf52dd 100644
--- a/src/rpc/mining.cpp
+++ b/src/rpc/mining.cpp
@@ -1042,9 +1042,9 @@ static RPCHelpMan submitblock()
bool new_block;
auto sc = std::make_shared<submitblock_StateCatcher>(block.GetHash());
- RegisterSharedValidationInterface(sc);
+ CHECK_NONFATAL(chainman.m_options.signals)->RegisterSharedValidationInterface(sc);
bool accepted = chainman.ProcessNewBlock(blockptr, /*force_processing=*/true, /*min_pow_checked=*/true, /*new_block=*/&new_block);
- UnregisterSharedValidationInterface(sc);
+ CHECK_NONFATAL(chainman.m_options.signals)->UnregisterSharedValidationInterface(sc);
if (!new_block && accepted) {
return "duplicate";
}
diff --git a/src/rpc/node.cpp b/src/rpc/node.cpp
index 45053f882d..b8c0080aef 100644
--- a/src/rpc/node.cpp
+++ b/src/rpc/node.cpp
@@ -94,7 +94,7 @@ static RPCHelpMan mockscheduler()
const NodeContext& node_context{EnsureAnyNodeContext(request.context)};
CHECK_NONFATAL(node_context.scheduler)->MockForward(std::chrono::seconds{delta_seconds});
- SyncWithValidationInterfaceQueue();
+ CHECK_NONFATAL(node_context.validation_signals)->SyncWithValidationInterfaceQueue();
for (const auto& chain_client : node_context.chain_clients) {
chain_client->schedulerMockForward(std::chrono::seconds(delta_seconds));
}
diff --git a/src/scheduler.cpp b/src/scheduler.cpp
index 6c6f644142..a1158e0c07 100644
--- a/src/scheduler.cpp
+++ b/src/scheduler.cpp
@@ -129,7 +129,7 @@ bool CScheduler::AreThreadsServicingQueue() const
}
-void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue()
+void SerialTaskRunner::MaybeScheduleProcessQueue()
{
{
LOCK(m_callbacks_mutex);
@@ -142,7 +142,7 @@ void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue()
m_scheduler.schedule([this] { this->ProcessQueue(); }, std::chrono::steady_clock::now());
}
-void SingleThreadedSchedulerClient::ProcessQueue()
+void SerialTaskRunner::ProcessQueue()
{
std::function<void()> callback;
{
@@ -158,8 +158,8 @@ void SingleThreadedSchedulerClient::ProcessQueue()
// RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
// to ensure both happen safely even if callback() throws.
struct RAIICallbacksRunning {
- SingleThreadedSchedulerClient* instance;
- explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
+ SerialTaskRunner* instance;
+ explicit RAIICallbacksRunning(SerialTaskRunner* _instance) : instance(_instance) {}
~RAIICallbacksRunning()
{
{
@@ -173,7 +173,7 @@ void SingleThreadedSchedulerClient::ProcessQueue()
callback();
}
-void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func)
+void SerialTaskRunner::insert(std::function<void()> func)
{
{
LOCK(m_callbacks_mutex);
@@ -182,7 +182,7 @@ void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func
MaybeScheduleProcessQueue();
}
-void SingleThreadedSchedulerClient::EmptyQueue()
+void SerialTaskRunner::flush()
{
assert(!m_scheduler.AreThreadsServicingQueue());
bool should_continue = true;
@@ -193,7 +193,7 @@ void SingleThreadedSchedulerClient::EmptyQueue()
}
}
-size_t SingleThreadedSchedulerClient::CallbacksPending()
+size_t SerialTaskRunner::size()
{
LOCK(m_callbacks_mutex);
return m_callbacks_pending.size();
diff --git a/src/scheduler.h b/src/scheduler.h
index 9212582b97..454c083b31 100644
--- a/src/scheduler.h
+++ b/src/scheduler.h
@@ -8,6 +8,7 @@
#include <attributes.h>
#include <sync.h>
#include <threadsafety.h>
+#include <util/task_runner.h>
#include <chrono>
#include <condition_variable>
@@ -120,12 +121,16 @@ private:
* B() will be able to observe all of the effects of callback A() which executed
* before it.
*/
-class SingleThreadedSchedulerClient
+class SerialTaskRunner : public util::TaskRunnerInterface
{
private:
CScheduler& m_scheduler;
Mutex m_callbacks_mutex;
+
+ // We are not allowed to assume the scheduler only runs in one thread,
+ // but must ensure all callbacks happen in-order, so we end up creating
+ // our own queue here :(
std::list<std::function<void()>> m_callbacks_pending GUARDED_BY(m_callbacks_mutex);
bool m_are_callbacks_running GUARDED_BY(m_callbacks_mutex) = false;
@@ -133,7 +138,7 @@ private:
void ProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
public:
- explicit SingleThreadedSchedulerClient(CScheduler& scheduler LIFETIMEBOUND) : m_scheduler{scheduler} {}
+ explicit SerialTaskRunner(CScheduler& scheduler LIFETIMEBOUND) : m_scheduler{scheduler} {}
/**
* Add a callback to be executed. Callbacks are executed serially
@@ -141,15 +146,15 @@ public:
* Practically, this means that callbacks can behave as if they are executed
* in order by a single thread.
*/
- void AddToProcessQueue(std::function<void()> func) EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
+ void insert(std::function<void()> func) override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
/**
* Processes all remaining queue members on the calling thread, blocking until queue is empty
* Must be called after the CScheduler has no remaining processing threads!
*/
- void EmptyQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
+ void flush() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
- size_t CallbacksPending() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
+ size_t size() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
};
#endif // BITCOIN_SCHEDULER_H
diff --git a/src/test/coinstatsindex_tests.cpp b/src/test/coinstatsindex_tests.cpp
index cc1ec49d41..08814c1499 100644
--- a/src/test/coinstatsindex_tests.cpp
+++ b/src/test/coinstatsindex_tests.cpp
@@ -70,7 +70,7 @@ BOOST_FIXTURE_TEST_CASE(coinstatsindex_initial_sync, TestChain100Setup)
// SyncWithValidationInterfaceQueue() call below is also needed to ensure
// TSAN always sees the test thread waiting for the notification thread, and
// avoid potential false positive reports.
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
// Shutdown sequence (c.f. Shutdown() in init.cpp)
coin_stats_index.Stop();
diff --git a/src/test/fuzz/package_eval.cpp b/src/test/fuzz/package_eval.cpp
index 9e658e0ced..a48ce37bce 100644
--- a/src/test/fuzz/package_eval.cpp
+++ b/src/test/fuzz/package_eval.cpp
@@ -47,7 +47,7 @@ void initialize_tx_pool()
g_outpoints_coinbase_init_mature.push_back(prevout);
}
}
- SyncWithValidationInterfaceQueue();
+ g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue();
}
struct OutpointsUpdater final : public CValidationInterface {
@@ -147,7 +147,7 @@ FUZZ_TARGET(tx_package_eval, .init = initialize_tx_pool)
}
auto outpoints_updater = std::make_shared<OutpointsUpdater>(mempool_outpoints);
- RegisterSharedValidationInterface(outpoints_updater);
+ node.validation_signals->RegisterSharedValidationInterface(outpoints_updater);
CTxMemPool tx_pool_{MakeMempool(fuzzed_data_provider, node)};
MockedTxPool& tx_pool = *static_cast<MockedTxPool*>(&tx_pool_);
@@ -269,7 +269,7 @@ FUZZ_TARGET(tx_package_eval, .init = initialize_tx_pool)
// Remember all added transactions
std::set<CTransactionRef> added;
auto txr = std::make_shared<TransactionsDelta>(added);
- RegisterSharedValidationInterface(txr);
+ node.validation_signals->RegisterSharedValidationInterface(txr);
// When there are multiple transactions in the package, we call ProcessNewPackage(txs, test_accept=false)
// and AcceptToMemoryPool(txs.back(), test_accept=true). When there is only 1 transaction, we might flip it
@@ -285,8 +285,8 @@ FUZZ_TARGET(tx_package_eval, .init = initialize_tx_pool)
/*bypass_limits=*/false, /*test_accept=*/!single_submit));
const bool passed = res.m_result_type == MempoolAcceptResult::ResultType::VALID;
- SyncWithValidationInterfaceQueue();
- UnregisterSharedValidationInterface(txr);
+ node.validation_signals->SyncWithValidationInterfaceQueue();
+ node.validation_signals->UnregisterSharedValidationInterface(txr);
// There is only 1 transaction in the package. We did a test-package-accept and a ATMP
if (single_submit) {
@@ -310,7 +310,7 @@ FUZZ_TARGET(tx_package_eval, .init = initialize_tx_pool)
CheckMempoolV3Invariants(tx_pool);
}
- UnregisterSharedValidationInterface(outpoints_updater);
+ node.validation_signals->UnregisterSharedValidationInterface(outpoints_updater);
WITH_LOCK(::cs_main, tx_pool.check(chainstate.CoinsTip(), chainstate.m_chain.Height() + 1));
}
diff --git a/src/test/fuzz/process_message.cpp b/src/test/fuzz/process_message.cpp
index 56b391ed5c..a467fd5382 100644
--- a/src/test/fuzz/process_message.cpp
+++ b/src/test/fuzz/process_message.cpp
@@ -47,7 +47,7 @@ void initialize_process_message()
for (int i = 0; i < 2 * COINBASE_MATURITY; i++) {
MineBlock(g_setup->m_node, CScript() << OP_TRUE);
}
- SyncWithValidationInterfaceQueue();
+ g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue();
}
FUZZ_TARGET(process_message, .init = initialize_process_message)
@@ -89,6 +89,6 @@ FUZZ_TARGET(process_message, .init = initialize_process_message)
}
g_setup->m_node.peerman->SendMessages(&p2p_node);
}
- SyncWithValidationInterfaceQueue();
+ g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue();
g_setup->m_node.connman->StopNodes();
}
diff --git a/src/test/fuzz/process_messages.cpp b/src/test/fuzz/process_messages.cpp
index 6b264907b5..38acd432fa 100644
--- a/src/test/fuzz/process_messages.cpp
+++ b/src/test/fuzz/process_messages.cpp
@@ -37,7 +37,7 @@ void initialize_process_messages()
for (int i = 0; i < 2 * COINBASE_MATURITY; i++) {
MineBlock(g_setup->m_node, CScript() << OP_TRUE);
}
- SyncWithValidationInterfaceQueue();
+ g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue();
}
FUZZ_TARGET(process_messages, .init = initialize_process_messages)
@@ -89,6 +89,6 @@ FUZZ_TARGET(process_messages, .init = initialize_process_messages)
g_setup->m_node.peerman->SendMessages(&random_node);
}
}
- SyncWithValidationInterfaceQueue();
+ g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue();
g_setup->m_node.connman->StopNodes();
}
diff --git a/src/test/fuzz/tx_pool.cpp b/src/test/fuzz/tx_pool.cpp
index fcf230642a..b6ba612a84 100644
--- a/src/test/fuzz/tx_pool.cpp
+++ b/src/test/fuzz/tx_pool.cpp
@@ -50,7 +50,7 @@ void initialize_tx_pool()
g_outpoints_coinbase_init_immature;
outpoints.push_back(prevout);
}
- SyncWithValidationInterfaceQueue();
+ g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue();
}
struct TransactionsDelta final : public CValidationInterface {
@@ -105,7 +105,7 @@ void Finish(FuzzedDataProvider& fuzzed_data_provider, MockedTxPool& tx_pool, Cha
assert(tx_pool.size() < info_all.size());
WITH_LOCK(::cs_main, tx_pool.check(chainstate.CoinsTip(), chainstate.m_chain.Height() + 1));
}
- SyncWithValidationInterfaceQueue();
+ g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue();
}
void MockTime(FuzzedDataProvider& fuzzed_data_provider, const Chainstate& chainstate)
@@ -285,7 +285,7 @@ FUZZ_TARGET(tx_pool_standard, .init = initialize_tx_pool)
std::set<CTransactionRef> removed;
std::set<CTransactionRef> added;
auto txr = std::make_shared<TransactionsDelta>(removed, added);
- RegisterSharedValidationInterface(txr);
+ node.validation_signals->RegisterSharedValidationInterface(txr);
const bool bypass_limits = fuzzed_data_provider.ConsumeBool();
// Make sure ProcessNewPackage on one transaction works.
@@ -303,8 +303,8 @@ FUZZ_TARGET(tx_pool_standard, .init = initialize_tx_pool)
const auto res = WITH_LOCK(::cs_main, return AcceptToMemoryPool(chainstate, tx, GetTime(), bypass_limits, /*test_accept=*/false));
const bool accepted = res.m_result_type == MempoolAcceptResult::ResultType::VALID;
- SyncWithValidationInterfaceQueue();
- UnregisterSharedValidationInterface(txr);
+ node.validation_signals->SyncWithValidationInterfaceQueue();
+ node.validation_signals->UnregisterSharedValidationInterface(txr);
bool txid_in_mempool = tx_pool.exists(GenTxid::Txid(tx->GetHash()));
bool wtxid_in_mempool = tx_pool.exists(GenTxid::Wtxid(tx->GetWitnessHash()));
diff --git a/src/test/peerman_tests.cpp b/src/test/peerman_tests.cpp
index 2c79329385..28866695bc 100644
--- a/src/test/peerman_tests.cpp
+++ b/src/test/peerman_tests.cpp
@@ -25,7 +25,7 @@ static void mineBlock(const node::NodeContext& node, std::chrono::seconds block_
block.fChecked = true; // little speedup
SetMockTime(curr_time); // process block at current time
Assert(node.chainman->ProcessNewBlock(std::make_shared<const CBlock>(block), /*force_processing=*/true, /*min_pow_checked=*/true, nullptr));
- SyncWithValidationInterfaceQueue(); // drain events queue
+ node.validation_signals->SyncWithValidationInterfaceQueue(); // drain events queue
}
// Verifying when network-limited peer connections are desirable based on the node's proximity to the tip
@@ -57,7 +57,7 @@ BOOST_AUTO_TEST_CASE(connections_desirable_service_flags)
// By now, we tested that the connections desirable services flags change based on the node's time proximity to the tip.
// Now, perform the same tests for when the node receives a block.
- RegisterValidationInterface(peerman.get());
+ m_node.validation_signals->RegisterValidationInterface(peerman.get());
// First, verify a block in the past doesn't enable limited peers connections
// At this point, our time is (NODE_NETWORK_LIMITED_ALLOW_CONN_BLOCKS + 1) * 10 minutes ahead the tip's time.
diff --git a/src/test/policyestimator_tests.cpp b/src/test/policyestimator_tests.cpp
index ede73c6895..6cadc3290a 100644
--- a/src/test/policyestimator_tests.cpp
+++ b/src/test/policyestimator_tests.cpp
@@ -20,7 +20,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
{
CBlockPolicyEstimator& feeEst = *Assert(m_node.fee_estimator);
CTxMemPool& mpool = *Assert(m_node.mempool);
- RegisterValidationInterface(&feeEst);
+ m_node.validation_signals->RegisterValidationInterface(&feeEst);
TestMemPoolEntryHelper entry;
CAmount basefee(2000);
CAmount deltaFee(100);
@@ -74,7 +74,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
/*submitted_in_package=*/false,
/*chainstate_is_current=*/true,
/*has_no_mempool_parents=*/true)};
- GetMainSignals().TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence());
+ m_node.validation_signals->TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence());
}
uint256 hash = tx.GetHash();
txHashes[j].push_back(hash);
@@ -102,7 +102,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
// Check after just a few txs that combining buckets works as expected
if (blocknum == 3) {
// Wait for fee estimator to catch up
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
// At this point we should need to combine 3 buckets to get enough data points
// So estimateFee(1) should fail and estimateFee(2) should return somewhere around
// 9*baserate. estimateFee(2) %'s are 100,100,90 = average 97%
@@ -113,7 +113,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
}
// Wait for fee estimator to catch up
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
std::vector<CAmount> origFeeEst;
// Highest feerate is 10*baseRate and gets in all blocks,
@@ -146,7 +146,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
}
// Wait for fee estimator to catch up
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
BOOST_CHECK(feeEst.estimateFee(1) == CFeeRate(0));
for (int i = 2; i < 10;i++) {
@@ -175,7 +175,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
/*submitted_in_package=*/false,
/*chainstate_is_current=*/true,
/*has_no_mempool_parents=*/true)};
- GetMainSignals().TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence());
+ m_node.validation_signals->TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence());
}
uint256 hash = tx.GetHash();
txHashes[j].push_back(hash);
@@ -188,7 +188,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
}
// Wait for fee estimator to catch up
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
for (int i = 1; i < 10;i++) {
BOOST_CHECK(feeEst.estimateFee(i) == CFeeRate(0) || feeEst.estimateFee(i).GetFeePerK() > origFeeEst[i-1] - deltaFee);
@@ -212,7 +212,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
block.clear();
// Wait for fee estimator to catch up
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
BOOST_CHECK(feeEst.estimateFee(1) == CFeeRate(0));
for (int i = 2; i < 10;i++) {
@@ -239,7 +239,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
/*submitted_in_package=*/false,
/*chainstate_is_current=*/true,
/*has_no_mempool_parents=*/true)};
- GetMainSignals().TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence());
+ m_node.validation_signals->TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence());
}
uint256 hash = tx.GetHash();
CTransactionRef ptx = mpool.get(hash);
@@ -257,7 +257,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
block.clear();
}
// Wait for fee estimator to catch up
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
BOOST_CHECK(feeEst.estimateFee(1) == CFeeRate(0));
for (int i = 2; i < 9; i++) { // At 9, the original estimate was already at the bottom (b/c scale = 2)
BOOST_CHECK(feeEst.estimateFee(i).GetFeePerK() < origFeeEst[i-1] - deltaFee);
diff --git a/src/test/scheduler_tests.cpp b/src/test/scheduler_tests.cpp
index 3fb3378598..95e725de46 100644
--- a/src/test/scheduler_tests.cpp
+++ b/src/test/scheduler_tests.cpp
@@ -129,8 +129,8 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered)
CScheduler scheduler;
// each queue should be well ordered with respect to itself but not other queues
- SingleThreadedSchedulerClient queue1(scheduler);
- SingleThreadedSchedulerClient queue2(scheduler);
+ SerialTaskRunner queue1(scheduler);
+ SerialTaskRunner queue2(scheduler);
// create more threads than queues
// if the queues only permit execution of one task at once then
@@ -142,7 +142,7 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered)
threads.emplace_back([&] { scheduler.serviceQueue(); });
}
- // these are not atomic, if SinglethreadedSchedulerClient prevents
+ // these are not atomic, if SerialTaskRunner prevents
// parallel execution at the queue level no synchronization should be required here
int counter1 = 0;
int counter2 = 0;
@@ -150,12 +150,12 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered)
// just simply count up on each queue - if execution is properly ordered then
// the callbacks should run in exactly the order in which they were enqueued
for (int i = 0; i < 100; ++i) {
- queue1.AddToProcessQueue([i, &counter1]() {
+ queue1.insert([i, &counter1]() {
bool expectation = i == counter1++;
assert(expectation);
});
- queue2.AddToProcessQueue([i, &counter2]() {
+ queue2.insert([i, &counter2]() {
bool expectation = i == counter2++;
assert(expectation);
});
diff --git a/src/test/txindex_tests.cpp b/src/test/txindex_tests.cpp
index e2432a4718..5a32b02ad9 100644
--- a/src/test/txindex_tests.cpp
+++ b/src/test/txindex_tests.cpp
@@ -71,7 +71,7 @@ BOOST_FIXTURE_TEST_CASE(txindex_initial_sync, TestChain100Setup)
// SyncWithValidationInterfaceQueue() call below is also needed to ensure
// TSAN always sees the test thread waiting for the notification thread, and
// avoid potential false positive reports.
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
// shutdown sequence (c.f. Shutdown() in init.cpp)
txindex.Stop();
diff --git a/src/test/util/mining.cpp b/src/test/util/mining.cpp
index 08d1b4c902..ad7a38d3fe 100644
--- a/src/test/util/mining.cpp
+++ b/src/test/util/mining.cpp
@@ -95,12 +95,12 @@ COutPoint MineBlock(const NodeContext& node, std::shared_ptr<CBlock>& block)
const auto old_height = WITH_LOCK(chainman.GetMutex(), return chainman.ActiveHeight());
bool new_block;
BlockValidationStateCatcher bvsc{block->GetHash()};
- RegisterValidationInterface(&bvsc);
+ node.validation_signals->RegisterValidationInterface(&bvsc);
const bool processed{chainman.ProcessNewBlock(block, true, true, &new_block)};
const bool duplicate{!new_block && processed};
assert(!duplicate);
- UnregisterValidationInterface(&bvsc);
- SyncWithValidationInterfaceQueue();
+ node.validation_signals->UnregisterValidationInterface(&bvsc);
+ node.validation_signals->SyncWithValidationInterfaceQueue();
const bool was_valid{bvsc.m_state && bvsc.m_state->IsValid()};
assert(old_height + was_valid == WITH_LOCK(chainman.GetMutex(), return chainman.ActiveHeight()));
diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp
index 1f8dbac5be..d26a440074 100644
--- a/src/test/util/setup_common.cpp
+++ b/src/test/util/setup_common.cpp
@@ -175,7 +175,7 @@ ChainTestingSetup::ChainTestingSetup(const ChainType chainType, const std::vecto
// from blocking due to queue overrun.
m_node.scheduler = std::make_unique<CScheduler>();
m_node.scheduler->m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { m_node.scheduler->serviceQueue(); });
- GetMainSignals().RegisterBackgroundSignalScheduler(*m_node.scheduler);
+ m_node.validation_signals = std::make_unique<ValidationSignals>(std::make_unique<SerialTaskRunner>(*m_node.scheduler));
m_node.fee_estimator = std::make_unique<CBlockPolicyEstimator>(FeeestPath(*m_node.args), DEFAULT_ACCEPT_STALE_FEE_ESTIMATES);
m_node.mempool = std::make_unique<CTxMemPool>(MemPoolOptionsForTest(m_node));
@@ -189,6 +189,7 @@ ChainTestingSetup::ChainTestingSetup(const ChainType chainType, const std::vecto
.datadir = m_args.GetDataDirNet(),
.check_block_index = true,
.notifications = *m_node.notifications,
+ .signals = m_node.validation_signals.get(),
.worker_threads_num = 2,
};
const BlockManager::Options blockman_opts{
@@ -206,8 +207,7 @@ ChainTestingSetup::ChainTestingSetup(const ChainType chainType, const std::vecto
ChainTestingSetup::~ChainTestingSetup()
{
if (m_node.scheduler) m_node.scheduler->stop();
- GetMainSignals().FlushBackgroundCallbacks();
- GetMainSignals().UnregisterBackgroundSignalScheduler();
+ m_node.validation_signals->FlushBackgroundCallbacks();
m_node.connman.reset();
m_node.banman.reset();
m_node.addrman.reset();
@@ -216,6 +216,7 @@ ChainTestingSetup::~ChainTestingSetup()
m_node.mempool.reset();
m_node.fee_estimator.reset();
m_node.chainman.reset();
+ m_node.validation_signals.reset();
m_node.scheduler.reset();
}
diff --git a/src/test/util/txmempool.cpp b/src/test/util/txmempool.cpp
index 3b4161ddd3..71cf8aca60 100644
--- a/src/test/util/txmempool.cpp
+++ b/src/test/util/txmempool.cpp
@@ -22,6 +22,7 @@ CTxMemPool::Options MemPoolOptionsForTest(const NodeContext& node)
// Default to always checking mempool regardless of
// chainparams.DefaultConsistencyChecks for tests
.check_ratio = 1,
+ .signals = node.validation_signals.get(),
};
const auto result{ApplyArgsManOptions(*node.args, ::Params(), mempool_opts)};
Assert(result);
diff --git a/src/test/validation_block_tests.cpp b/src/test/validation_block_tests.cpp
index 35e5c6a037..316ab86c2b 100644
--- a/src/test/validation_block_tests.cpp
+++ b/src/test/validation_block_tests.cpp
@@ -158,7 +158,7 @@ BOOST_AUTO_TEST_CASE(processnewblock_signals_ordering)
bool ignored;
// Connect the genesis block and drain any outstanding events
BOOST_CHECK(Assert(m_node.chainman)->ProcessNewBlock(std::make_shared<CBlock>(Params().GenesisBlock()), true, true, &ignored));
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
// subscribe to events (this subscriber will validate event ordering)
const CBlockIndex* initial_tip = nullptr;
@@ -167,7 +167,7 @@ BOOST_AUTO_TEST_CASE(processnewblock_signals_ordering)
initial_tip = m_node.chainman->ActiveChain().Tip();
}
auto sub = std::make_shared<TestSubscriber>(initial_tip->GetBlockHash());
- RegisterSharedValidationInterface(sub);
+ m_node.validation_signals->RegisterSharedValidationInterface(sub);
// create a bunch of threads that repeatedly process a block generated above at random
// this will create parallelism and randomness inside validation - the ValidationInterface
@@ -196,9 +196,9 @@ BOOST_AUTO_TEST_CASE(processnewblock_signals_ordering)
for (auto& t : threads) {
t.join();
}
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
- UnregisterSharedValidationInterface(sub);
+ m_node.validation_signals->UnregisterSharedValidationInterface(sub);
LOCK(cs_main);
BOOST_CHECK_EQUAL(sub->m_expected_tip, m_node.chainman->ActiveChain().Tip()->GetBlockHash());
diff --git a/src/test/validation_chainstatemanager_tests.cpp b/src/test/validation_chainstatemanager_tests.cpp
index a33e71d50e..4bbab1cdcd 100644
--- a/src/test/validation_chainstatemanager_tests.cpp
+++ b/src/test/validation_chainstatemanager_tests.cpp
@@ -102,7 +102,7 @@ BOOST_FIXTURE_TEST_CASE(chainstatemanager, TestChain100Setup)
BOOST_CHECK_EQUAL(active_tip2, c2.m_chain.Tip());
// Let scheduler events finish running to avoid accessing memory that is going to be unloaded
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
}
//! Test rebalancing the caches associated with each chainstate.
@@ -374,7 +374,7 @@ struct SnapshotTestSetup : TestChain100Setup {
cs->ForceFlushStateToDisk();
}
// Process all callbacks referring to the old manager before wiping it.
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
LOCK(::cs_main);
chainman.ResetChainstates();
BOOST_CHECK_EQUAL(chainman.GetAll().size(), 0);
@@ -383,6 +383,7 @@ struct SnapshotTestSetup : TestChain100Setup {
.chainparams = ::Params(),
.datadir = chainman.m_options.datadir,
.notifications = *m_node.notifications,
+ .signals = m_node.validation_signals.get(),
};
const BlockManager::Options blockman_opts{
.chainparams = chainman_opts.chainparams,
diff --git a/src/test/validationinterface_tests.cpp b/src/test/validationinterface_tests.cpp
index 5979441057..a46cfc3029 100644
--- a/src/test/validationinterface_tests.cpp
+++ b/src/test/validationinterface_tests.cpp
@@ -28,7 +28,7 @@ BOOST_AUTO_TEST_CASE(unregister_validation_interface_race)
const CBlock block_dummy;
BlockValidationState state_dummy;
while (generate) {
- GetMainSignals().BlockChecked(block_dummy, state_dummy);
+ m_node.validation_signals->BlockChecked(block_dummy, state_dummy);
}
}};
@@ -37,8 +37,8 @@ BOOST_AUTO_TEST_CASE(unregister_validation_interface_race)
// keep going for about 1 sec, which is 250k iterations
for (int i = 0; i < 250000; i++) {
auto sub = std::make_shared<TestSubscriberNoop>();
- RegisterSharedValidationInterface(sub);
- UnregisterSharedValidationInterface(sub);
+ m_node.validation_signals->RegisterSharedValidationInterface(sub);
+ m_node.validation_signals->UnregisterSharedValidationInterface(sub);
}
// tell the other thread we are done
generate = false;
@@ -52,8 +52,8 @@ BOOST_AUTO_TEST_CASE(unregister_validation_interface_race)
class TestInterface : public CValidationInterface
{
public:
- TestInterface(std::function<void()> on_call = nullptr, std::function<void()> on_destroy = nullptr)
- : m_on_call(std::move(on_call)), m_on_destroy(std::move(on_destroy))
+ TestInterface(ValidationSignals& signals, std::function<void()> on_call = nullptr, std::function<void()> on_destroy = nullptr)
+ : m_on_call(std::move(on_call)), m_on_destroy(std::move(on_destroy)), m_signals{signals}
{
}
virtual ~TestInterface()
@@ -64,14 +64,15 @@ public:
{
if (m_on_call) m_on_call();
}
- static void Call()
+ void Call()
{
CBlock block;
BlockValidationState state;
- GetMainSignals().BlockChecked(block, state);
+ m_signals.BlockChecked(block, state);
}
std::function<void()> m_on_call;
std::function<void()> m_on_destroy;
+ ValidationSignals& m_signals;
};
// Regression test to ensure UnregisterAllValidationInterfaces calls don't
@@ -80,17 +81,23 @@ public:
BOOST_AUTO_TEST_CASE(unregister_all_during_call)
{
bool destroyed = false;
- RegisterSharedValidationInterface(std::make_shared<TestInterface>(
+ auto shared{std::make_shared<TestInterface>(
+ *m_node.validation_signals,
[&] {
// First call should decrements reference count 2 -> 1
- UnregisterAllValidationInterfaces();
+ m_node.validation_signals->UnregisterAllValidationInterfaces();
BOOST_CHECK(!destroyed);
// Second call should not decrement reference count 1 -> 0
- UnregisterAllValidationInterfaces();
+ m_node.validation_signals->UnregisterAllValidationInterfaces();
BOOST_CHECK(!destroyed);
},
- [&] { destroyed = true; }));
- TestInterface::Call();
+ [&] { destroyed = true; })};
+ m_node.validation_signals->RegisterSharedValidationInterface(shared);
+ BOOST_CHECK(shared.use_count() == 2);
+ shared->Call();
+ BOOST_CHECK(shared.use_count() == 1);
+ BOOST_CHECK(!destroyed);
+ shared.reset();
BOOST_CHECK(destroyed);
}
diff --git a/src/txmempool.cpp b/src/txmempool.cpp
index de340f6b6d..0bee27c2b2 100644
--- a/src/txmempool.cpp
+++ b/src/txmempool.cpp
@@ -406,7 +406,8 @@ CTxMemPool::CTxMemPool(const Options& opts)
m_require_standard{opts.require_standard},
m_full_rbf{opts.full_rbf},
m_persist_v1_dat{opts.persist_v1_dat},
- m_limits{opts.limits}
+ m_limits{opts.limits},
+ m_signals{opts.signals}
{
}
@@ -487,12 +488,12 @@ void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason)
// even if not directly reported below.
uint64_t mempool_sequence = GetAndIncrementSequence();
- if (reason != MemPoolRemovalReason::BLOCK) {
+ if (reason != MemPoolRemovalReason::BLOCK && m_signals) {
// Notify clients that a transaction has been removed from the mempool
// for any reason except being included in a block. Clients interested
// in transactions included in blocks can subscribe to the BlockConnected
// notification.
- GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason, mempool_sequence);
+ m_signals->TransactionRemovedFromMempool(it->GetSharedTx(), reason, mempool_sequence);
}
TRACE5(mempool, removed,
it->GetTx().GetHash().data(),
@@ -643,7 +644,9 @@ void CTxMemPool::removeForBlock(const std::vector<CTransactionRef>& vtx, unsigne
removeConflicts(*tx);
ClearPrioritisation(tx->GetHash());
}
- GetMainSignals().MempoolTransactionsRemovedForBlock(txs_removed_for_block, nBlockHeight);
+ if (m_signals) {
+ m_signals->MempoolTransactionsRemovedForBlock(txs_removed_for_block, nBlockHeight);
+ }
lastRollingFeeUpdate = GetTime();
blockSinceLastRollingFeeBump = true;
}
diff --git a/src/txmempool.h b/src/txmempool.h
index b98355c65f..32f2c024f7 100644
--- a/src/txmempool.h
+++ b/src/txmempool.h
@@ -40,6 +40,7 @@
#include <vector>
class CChain;
+class ValidationSignals;
/** Fake height value used in Coin to signify they are only in the memory pool (since 0.8) */
static const uint32_t MEMPOOL_HEIGHT = 0x7FFFFFFF;
@@ -447,6 +448,8 @@ public:
const Limits m_limits;
+ ValidationSignals* const m_signals;
+
/** Create a new CTxMemPool.
* Sanity checks will be off by default for performance, because otherwise
* accepting transactions becomes O(N^2) where N is the number of transactions
diff --git a/src/util/task_runner.h b/src/util/task_runner.h
new file mode 100644
index 0000000000..d3cd8007de
--- /dev/null
+++ b/src/util/task_runner.h
@@ -0,0 +1,52 @@
+// Copyright (c) 2024-present The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#ifndef BITCOIN_UTIL_TASK_RUNNER_H
+#define BITCOIN_UTIL_TASK_RUNNER_H
+
+#include <cstddef>
+#include <functional>
+
+namespace util {
+
+/** @file
+ * This header provides an interface and simple implementation for a task
+ * runner. Another threaded, serial implementation using a queue is available in
+ * the scheduler module's SerialTaskRunner.
+ */
+
+class TaskRunnerInterface
+{
+public:
+ virtual ~TaskRunnerInterface() {}
+
+ /**
+ * The callback can either be queued for later/asynchronous/threaded
+ * processing, or be executed immediately for synchronous processing.
+ */
+
+ virtual void insert(std::function<void()> func) = 0;
+
+ /**
+ * Forces the processing of all pending events.
+ */
+ virtual void flush() = 0;
+
+ /**
+ * Returns the number of currently pending events.
+ */
+ virtual size_t size() = 0;
+};
+
+class ImmediateTaskRunner : public TaskRunnerInterface
+{
+public:
+ void insert(std::function<void()> func) override { func(); }
+ void flush() override {}
+ size_t size() override { return 0; }
+};
+
+} // namespace util
+
+#endif // BITCOIN_UTIL_TASK_RUNNER_H
diff --git a/src/validation.cpp b/src/validation.cpp
index f8e1de55e9..3a652adb1d 100644
--- a/src/validation.cpp
+++ b/src/validation.cpp
@@ -1228,13 +1228,14 @@ bool MemPoolAccept::SubmitPackage(const ATMPArgs& args, std::vector<Workspace>&
results.emplace(ws.m_ptx->GetWitnessHash(),
MempoolAcceptResult::Success(std::move(ws.m_replaced_transactions), ws.m_vsize,
ws.m_base_fees, effective_feerate, effective_feerate_wtxids));
+ if (!m_pool.m_signals) continue;
const CTransaction& tx = *ws.m_ptx;
const auto tx_info = NewMempoolTransactionInfo(ws.m_ptx, ws.m_base_fees,
ws.m_vsize, ws.m_entry->GetHeight(),
args.m_bypass_limits, args.m_package_submission,
IsCurrentForFeeEstimation(m_active_chainstate),
m_pool.HasNoInputsOf(tx));
- GetMainSignals().TransactionAddedToMempool(tx_info, m_pool.GetAndIncrementSequence());
+ m_pool.m_signals->TransactionAddedToMempool(tx_info, m_pool.GetAndIncrementSequence());
}
return all_submitted;
}
@@ -1242,7 +1243,7 @@ bool MemPoolAccept::SubmitPackage(const ATMPArgs& args, std::vector<Workspace>&
MempoolAcceptResult MemPoolAccept::AcceptSingleTransaction(const CTransactionRef& ptx, ATMPArgs& args)
{
AssertLockHeld(cs_main);
- LOCK(m_pool.cs); // mempool "read lock" (held through GetMainSignals().TransactionAddedToMempool())
+ LOCK(m_pool.cs); // mempool "read lock" (held through m_pool.m_signals->TransactionAddedToMempool())
Workspace ws(ptx);
const std::vector<Wtxid> single_wtxid{ws.m_ptx->GetWitnessHash()};
@@ -1277,13 +1278,15 @@ MempoolAcceptResult MemPoolAccept::AcceptSingleTransaction(const CTransactionRef
return MempoolAcceptResult::FeeFailure(ws.m_state, CFeeRate(ws.m_modified_fees, ws.m_vsize), {ws.m_ptx->GetWitnessHash()});
}
- const CTransaction& tx = *ws.m_ptx;
- const auto tx_info = NewMempoolTransactionInfo(ws.m_ptx, ws.m_base_fees,
- ws.m_vsize, ws.m_entry->GetHeight(),
- args.m_bypass_limits, args.m_package_submission,
- IsCurrentForFeeEstimation(m_active_chainstate),
- m_pool.HasNoInputsOf(tx));
- GetMainSignals().TransactionAddedToMempool(tx_info, m_pool.GetAndIncrementSequence());
+ if (m_pool.m_signals) {
+ const CTransaction& tx = *ws.m_ptx;
+ const auto tx_info = NewMempoolTransactionInfo(ws.m_ptx, ws.m_base_fees,
+ ws.m_vsize, ws.m_entry->GetHeight(),
+ args.m_bypass_limits, args.m_package_submission,
+ IsCurrentForFeeEstimation(m_active_chainstate),
+ m_pool.HasNoInputsOf(tx));
+ m_pool.m_signals->TransactionAddedToMempool(tx_info, m_pool.GetAndIncrementSequence());
+ }
return MempoolAcceptResult::Success(std::move(ws.m_replaced_transactions), ws.m_vsize, ws.m_base_fees,
effective_feerate, single_wtxid);
@@ -2695,9 +2698,9 @@ bool Chainstate::FlushStateToDisk(
(bool)fFlushForPrune);
}
}
- if (full_flush_completed) {
+ if (full_flush_completed && m_chainman.m_options.signals) {
// Update best block in wallet (so we can detect restored wallets).
- GetMainSignals().ChainStateFlushed(this->GetRole(), m_chain.GetLocator());
+ m_chainman.m_options.signals->ChainStateFlushed(this->GetRole(), m_chain.GetLocator());
}
} catch (const std::runtime_error& e) {
return FatalError(m_chainman.GetNotifications(), state, std::string("System error while flushing: ") + e.what());
@@ -2864,7 +2867,9 @@ bool Chainstate::DisconnectTip(BlockValidationState& state, DisconnectedBlockTra
UpdateTip(pindexDelete->pprev);
// Let wallets know transactions went from 1-confirmed to
// 0-confirmed or conflicted:
- GetMainSignals().BlockDisconnected(pblock, pindexDelete);
+ if (m_chainman.m_options.signals) {
+ m_chainman.m_options.signals->BlockDisconnected(pblock, pindexDelete);
+ }
return true;
}
@@ -2949,7 +2954,9 @@ bool Chainstate::ConnectTip(BlockValidationState& state, CBlockIndex* pindexNew,
{
CCoinsViewCache view(&CoinsTip());
bool rv = ConnectBlock(blockConnecting, state, pindexNew, view);
- GetMainSignals().BlockChecked(blockConnecting, state);
+ if (m_chainman.m_options.signals) {
+ m_chainman.m_options.signals->BlockChecked(blockConnecting, state);
+ }
if (!rv) {
if (state.IsInvalid())
InvalidBlockFound(pindexNew, state);
@@ -3210,11 +3217,11 @@ static bool NotifyHeaderTip(ChainstateManager& chainman) LOCKS_EXCLUDED(cs_main)
return fNotify;
}
-static void LimitValidationInterfaceQueue() LOCKS_EXCLUDED(cs_main) {
+static void LimitValidationInterfaceQueue(ValidationSignals& signals) LOCKS_EXCLUDED(cs_main) {
AssertLockNotHeld(cs_main);
- if (GetMainSignals().CallbacksPending() > 10) {
- SyncWithValidationInterfaceQueue();
+ if (signals.CallbacksPending() > 10) {
+ signals.SyncWithValidationInterfaceQueue();
}
}
@@ -3252,7 +3259,7 @@ bool Chainstate::ActivateBestChain(BlockValidationState& state, std::shared_ptr<
// Note that if a validationinterface callback ends up calling
// ActivateBestChain this may lead to a deadlock! We should
// probably have a DEBUG_LOCKORDER test for this in the future.
- LimitValidationInterfaceQueue();
+ if (m_chainman.m_options.signals) LimitValidationInterfaceQueue(*m_chainman.m_options.signals);
{
LOCK(cs_main);
@@ -3291,7 +3298,9 @@ bool Chainstate::ActivateBestChain(BlockValidationState& state, std::shared_ptr<
for (const PerBlockConnectTrace& trace : connectTrace.GetBlocksConnected()) {
assert(trace.pblock && trace.pindex);
- GetMainSignals().BlockConnected(this->GetRole(), trace.pblock, trace.pindex);
+ if (m_chainman.m_options.signals) {
+ m_chainman.m_options.signals->BlockConnected(this->GetRole(), trace.pblock, trace.pindex);
+ }
}
// This will have been toggled in
@@ -3317,7 +3326,9 @@ bool Chainstate::ActivateBestChain(BlockValidationState& state, std::shared_ptr<
// Enqueue while holding cs_main to ensure that UpdatedBlockTip is called in the order in which blocks are connected
if (this == &m_chainman.ActiveChainstate() && pindexFork != pindexNewTip) {
// Notify ValidationInterface subscribers
- GetMainSignals().UpdatedBlockTip(pindexNewTip, pindexFork, still_in_ibd);
+ if (m_chainman.m_options.signals) {
+ m_chainman.m_options.signals->UpdatedBlockTip(pindexNewTip, pindexFork, still_in_ibd);
+ }
// Always notify the UI if a new block tip was connected
if (kernel::IsInterrupted(m_chainman.GetNotifications().blockTip(GetSynchronizationState(still_in_ibd), *pindexNewTip))) {
@@ -3451,7 +3462,7 @@ bool Chainstate::InvalidateBlock(BlockValidationState& state, CBlockIndex* pinde
if (m_chainman.m_interrupt) break;
// Make sure the queue of validation callbacks doesn't grow unboundedly.
- LimitValidationInterfaceQueue();
+ if (m_chainman.m_options.signals) LimitValidationInterfaceQueue(*m_chainman.m_options.signals);
LOCK(cs_main);
// Lock for as long as disconnectpool is in scope to make sure MaybeUpdateMempoolForReorg is
@@ -4237,8 +4248,9 @@ bool ChainstateManager::AcceptBlock(const std::shared_ptr<const CBlock>& pblock,
// Header is valid/has work, merkle tree and segwit merkle tree are good...RELAY NOW
// (but if it does not build on our best tip, let the SendMessages loop relay it)
- if (!IsInitialBlockDownload() && ActiveTip() == pindex->pprev)
- GetMainSignals().NewPoWValidBlock(pindex, pblock);
+ if (!IsInitialBlockDownload() && ActiveTip() == pindex->pprev && m_options.signals) {
+ m_options.signals->NewPoWValidBlock(pindex, pblock);
+ }
// Write block to history file
if (fNewBlock) *fNewBlock = true;
@@ -4291,7 +4303,9 @@ bool ChainstateManager::ProcessNewBlock(const std::shared_ptr<const CBlock>& blo
ret = AcceptBlock(block, state, &pindex, force_processing, nullptr, new_block, min_pow_checked);
}
if (!ret) {
- GetMainSignals().BlockChecked(*block, state);
+ if (m_options.signals) {
+ m_options.signals->BlockChecked(*block, state);
+ }
return error("%s: AcceptBlock FAILED (%s)", __func__, state.ToString());
}
}
diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp
index 5e944a7c47..813fde109c 100644
--- a/src/validationinterface.cpp
+++ b/src/validationinterface.cpp
@@ -5,7 +5,6 @@
#include <validationinterface.h>
-#include <attributes.h>
#include <chain.h>
#include <consensus/validation.h>
#include <kernel/chain.h>
@@ -13,7 +12,8 @@
#include <logging.h>
#include <primitives/block.h>
#include <primitives/transaction.h>
-#include <scheduler.h>
+#include <util/check.h>
+#include <util/task_runner.h>
#include <future>
#include <unordered_map>
@@ -22,14 +22,14 @@
std::string RemovalReasonToString(const MemPoolRemovalReason& r) noexcept;
/**
- * MainSignalsImpl manages a list of shared_ptr<CValidationInterface> callbacks.
+ * ValidationSignalsImpl manages a list of shared_ptr<CValidationInterface> callbacks.
*
* A std::unordered_map is used to track what callbacks are currently
* registered, and a std::list is used to store the callbacks that are
* currently registered as well as any callbacks that are just unregistered
* and about to be deleted when they are done executing.
*/
-class MainSignalsImpl
+class ValidationSignalsImpl
{
private:
Mutex m_mutex;
@@ -42,12 +42,10 @@ private:
std::unordered_map<CValidationInterface*, std::list<ListEntry>::iterator> m_map GUARDED_BY(m_mutex);
public:
- // We are not allowed to assume the scheduler only runs in one thread,
- // but must ensure all callbacks happen in-order, so we end up creating
- // our own queue here :(
- SingleThreadedSchedulerClient m_schedulerClient;
+ std::unique_ptr<util::TaskRunnerInterface> m_task_runner;
- explicit MainSignalsImpl(CScheduler& scheduler LIFETIMEBOUND) : m_schedulerClient(scheduler) {}
+ explicit ValidationSignalsImpl(std::unique_ptr<util::TaskRunnerInterface> task_runner)
+ : m_task_runner{std::move(Assert(task_runner))} {}
void Register(std::shared_ptr<CValidationInterface> callbacks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
@@ -94,77 +92,56 @@ public:
}
};
-static CMainSignals g_signals;
+ValidationSignals::ValidationSignals(std::unique_ptr<util::TaskRunnerInterface> task_runner)
+ : m_internals{std::make_unique<ValidationSignalsImpl>(std::move(task_runner))} {}
-void CMainSignals::RegisterBackgroundSignalScheduler(CScheduler& scheduler)
-{
- assert(!m_internals);
- m_internals = std::make_unique<MainSignalsImpl>(scheduler);
-}
-
-void CMainSignals::UnregisterBackgroundSignalScheduler()
-{
- m_internals.reset(nullptr);
-}
+ValidationSignals::~ValidationSignals() {}
-void CMainSignals::FlushBackgroundCallbacks()
+void ValidationSignals::FlushBackgroundCallbacks()
{
- if (m_internals) {
- m_internals->m_schedulerClient.EmptyQueue();
- }
+ m_internals->m_task_runner->flush();
}
-size_t CMainSignals::CallbacksPending()
+size_t ValidationSignals::CallbacksPending()
{
- if (!m_internals) return 0;
- return m_internals->m_schedulerClient.CallbacksPending();
+ return m_internals->m_task_runner->size();
}
-CMainSignals& GetMainSignals()
-{
- return g_signals;
-}
-
-void RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks)
+void ValidationSignals::RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks)
{
// Each connection captures the shared_ptr to ensure that each callback is
// executed before the subscriber is destroyed. For more details see #18338.
- g_signals.m_internals->Register(std::move(callbacks));
+ m_internals->Register(std::move(callbacks));
}
-void RegisterValidationInterface(CValidationInterface* callbacks)
+void ValidationSignals::RegisterValidationInterface(CValidationInterface* callbacks)
{
// Create a shared_ptr with a no-op deleter - CValidationInterface lifecycle
// is managed by the caller.
RegisterSharedValidationInterface({callbacks, [](CValidationInterface*){}});
}
-void UnregisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks)
+void ValidationSignals::UnregisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks)
{
UnregisterValidationInterface(callbacks.get());
}
-void UnregisterValidationInterface(CValidationInterface* callbacks)
+void ValidationSignals::UnregisterValidationInterface(CValidationInterface* callbacks)
{
- if (g_signals.m_internals) {
- g_signals.m_internals->Unregister(callbacks);
- }
+ m_internals->Unregister(callbacks);
}
-void UnregisterAllValidationInterfaces()
+void ValidationSignals::UnregisterAllValidationInterfaces()
{
- if (!g_signals.m_internals) {
- return;
- }
- g_signals.m_internals->Clear();
+ m_internals->Clear();
}
-void CallFunctionInValidationInterfaceQueue(std::function<void()> func)
+void ValidationSignals::CallFunctionInValidationInterfaceQueue(std::function<void()> func)
{
- g_signals.m_internals->m_schedulerClient.AddToProcessQueue(std::move(func));
+ m_internals->m_task_runner->insert(std::move(func));
}
-void SyncWithValidationInterfaceQueue()
+void ValidationSignals::SyncWithValidationInterfaceQueue()
{
AssertLockNotHeld(cs_main);
// Block until the validation queue drains
@@ -183,7 +160,7 @@ void SyncWithValidationInterfaceQueue()
do { \
auto local_name = (name); \
LOG_EVENT("Enqueuing " fmt, local_name, __VA_ARGS__); \
- m_internals->m_schedulerClient.AddToProcessQueue([=] { \
+ m_internals->m_task_runner->insert([=] { \
LOG_EVENT(fmt, local_name, __VA_ARGS__); \
event(); \
}); \
@@ -192,7 +169,7 @@ void SyncWithValidationInterfaceQueue()
#define LOG_EVENT(fmt, ...) \
LogPrint(BCLog::VALIDATION, fmt "\n", __VA_ARGS__)
-void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {
+void ValidationSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {
// Dependencies exist that require UpdatedBlockTip events to be delivered in the order in which
// the chain actually updates. One way to ensure this is for the caller to invoke this signal
// in the same critical section where the chain is updated
@@ -206,7 +183,7 @@ void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockInd
fInitialDownload);
}
-void CMainSignals::TransactionAddedToMempool(const NewMempoolTransactionInfo& tx, uint64_t mempool_sequence)
+void ValidationSignals::TransactionAddedToMempool(const NewMempoolTransactionInfo& tx, uint64_t mempool_sequence)
{
auto event = [tx, mempool_sequence, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx, mempool_sequence); });
@@ -216,7 +193,7 @@ void CMainSignals::TransactionAddedToMempool(const NewMempoolTransactionInfo& tx
tx.info.m_tx->GetWitnessHash().ToString());
}
-void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {
+void ValidationSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {
auto event = [tx, reason, mempool_sequence, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason, mempool_sequence); });
};
@@ -226,7 +203,7 @@ void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemP
RemovalReasonToString(reason));
}
-void CMainSignals::BlockConnected(ChainstateRole role, const std::shared_ptr<const CBlock> &pblock, const CBlockIndex *pindex) {
+void ValidationSignals::BlockConnected(ChainstateRole role, const std::shared_ptr<const CBlock> &pblock, const CBlockIndex *pindex) {
auto event = [role, pblock, pindex, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.BlockConnected(role, pblock, pindex); });
};
@@ -235,7 +212,7 @@ void CMainSignals::BlockConnected(ChainstateRole role, const std::shared_ptr<con
pindex->nHeight);
}
-void CMainSignals::MempoolTransactionsRemovedForBlock(const std::vector<RemovedMempoolTransactionInfo>& txs_removed_for_block, unsigned int nBlockHeight)
+void ValidationSignals::MempoolTransactionsRemovedForBlock(const std::vector<RemovedMempoolTransactionInfo>& txs_removed_for_block, unsigned int nBlockHeight)
{
auto event = [txs_removed_for_block, nBlockHeight, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.MempoolTransactionsRemovedForBlock(txs_removed_for_block, nBlockHeight); });
@@ -245,7 +222,7 @@ void CMainSignals::MempoolTransactionsRemovedForBlock(const std::vector<RemovedM
txs_removed_for_block.size());
}
-void CMainSignals::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex)
+void ValidationSignals::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex)
{
auto event = [pblock, pindex, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.BlockDisconnected(pblock, pindex); });
@@ -255,7 +232,7 @@ void CMainSignals::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock
pindex->nHeight);
}
-void CMainSignals::ChainStateFlushed(ChainstateRole role, const CBlockLocator &locator) {
+void ValidationSignals::ChainStateFlushed(ChainstateRole role, const CBlockLocator &locator) {
auto event = [role, locator, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.ChainStateFlushed(role, locator); });
};
@@ -263,13 +240,13 @@ void CMainSignals::ChainStateFlushed(ChainstateRole role, const CBlockLocator &l
locator.IsNull() ? "null" : locator.vHave.front().ToString());
}
-void CMainSignals::BlockChecked(const CBlock& block, const BlockValidationState& state) {
+void ValidationSignals::BlockChecked(const CBlock& block, const BlockValidationState& state) {
LOG_EVENT("%s: block hash=%s state=%s", __func__,
block.GetHash().ToString(), state.ToString());
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.BlockChecked(block, state); });
}
-void CMainSignals::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock> &block) {
+void ValidationSignals::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock> &block) {
LOG_EVENT("%s: block hash=%s", __func__, block->GetHash().ToString());
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.NewPoWValidBlock(pindex, block); });
}
diff --git a/src/validationinterface.h b/src/validationinterface.h
index d9292ae2c9..6f49a73c93 100644
--- a/src/validationinterface.h
+++ b/src/validationinterface.h
@@ -6,61 +6,29 @@
#ifndef BITCOIN_VALIDATIONINTERFACE_H
#define BITCOIN_VALIDATIONINTERFACE_H
-#include <kernel/cs_main.h>
#include <kernel/chain.h>
+#include <kernel/cs_main.h>
#include <primitives/transaction.h> // CTransaction(Ref)
#include <sync.h>
+#include <cstddef>
+#include <cstdint>
#include <functional>
#include <memory>
+#include <vector>
+
+namespace util {
+class TaskRunnerInterface;
+} // namespace util
class BlockValidationState;
class CBlock;
class CBlockIndex;
struct CBlockLocator;
-class CValidationInterface;
-class CScheduler;
enum class MemPoolRemovalReason;
struct RemovedMempoolTransactionInfo;
struct NewMempoolTransactionInfo;
-/** Register subscriber */
-void RegisterValidationInterface(CValidationInterface* callbacks);
-/** Unregister subscriber. DEPRECATED. This is not safe to use when the RPC server or main message handler thread is running. */
-void UnregisterValidationInterface(CValidationInterface* callbacks);
-/** Unregister all subscribers */
-void UnregisterAllValidationInterfaces();
-
-// Alternate registration functions that release a shared_ptr after the last
-// notification is sent. These are useful for race-free cleanup, since
-// unregistration is nonblocking and can return before the last notification is
-// processed.
-/** Register subscriber */
-void RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks);
-/** Unregister subscriber */
-void UnregisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks);
-
-/**
- * Pushes a function to callback onto the notification queue, guaranteeing any
- * callbacks generated prior to now are finished when the function is called.
- *
- * Be very careful blocking on func to be called if any locks are held -
- * validation interface clients may not be able to make progress as they often
- * wait for things like cs_main, so blocking until func is called with cs_main
- * will result in a deadlock (that DEBUG_LOCKORDER will miss).
- */
-void CallFunctionInValidationInterfaceQueue(std::function<void ()> func);
-/**
- * This is a synonym for the following, which asserts certain locks are not
- * held:
- * std::promise<void> promise;
- * CallFunctionInValidationInterfaceQueue([&promise] {
- * promise.set_value();
- * });
- * promise.get_future().wait();
- */
-void SyncWithValidationInterfaceQueue() LOCKS_EXCLUDED(cs_main);
-
/**
* Implement this to subscribe to events generated in validation and mempool
*
@@ -185,30 +153,65 @@ protected:
* has been received and connected to the headers tree, though not validated yet.
*/
virtual void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock>& block) {};
- friend class CMainSignals;
+ friend class ValidationSignals;
friend class ValidationInterfaceTest;
};
-class MainSignalsImpl;
-class CMainSignals {
+class ValidationSignalsImpl;
+class ValidationSignals {
private:
- std::unique_ptr<MainSignalsImpl> m_internals;
-
- friend void ::RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface>);
- friend void ::UnregisterValidationInterface(CValidationInterface*);
- friend void ::UnregisterAllValidationInterfaces();
- friend void ::CallFunctionInValidationInterfaceQueue(std::function<void ()> func);
+ std::unique_ptr<ValidationSignalsImpl> m_internals;
public:
- /** Register a CScheduler to give callbacks which should run in the background (may only be called once) */
- void RegisterBackgroundSignalScheduler(CScheduler& scheduler);
- /** Unregister a CScheduler to give callbacks which should run in the background - these callbacks will now be dropped! */
- void UnregisterBackgroundSignalScheduler();
+ // The task runner will block validation if it calls its insert method's
+ // func argument synchronously. In this class func contains a loop that
+ // dispatches a single validation event to all subscribers sequentially.
+ explicit ValidationSignals(std::unique_ptr<util::TaskRunnerInterface> task_runner);
+
+ ~ValidationSignals();
+
/** Call any remaining callbacks on the calling thread */
void FlushBackgroundCallbacks();
size_t CallbacksPending();
+ /** Register subscriber */
+ void RegisterValidationInterface(CValidationInterface* callbacks);
+ /** Unregister subscriber. DEPRECATED. This is not safe to use when the RPC server or main message handler thread is running. */
+ void UnregisterValidationInterface(CValidationInterface* callbacks);
+ /** Unregister all subscribers */
+ void UnregisterAllValidationInterfaces();
+
+ // Alternate registration functions that release a shared_ptr after the last
+ // notification is sent. These are useful for race-free cleanup, since
+ // unregistration is nonblocking and can return before the last notification is
+ // processed.
+ /** Register subscriber */
+ void RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks);
+ /** Unregister subscriber */
+ void UnregisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks);
+
+ /**
+ * Pushes a function to callback onto the notification queue, guaranteeing any
+ * callbacks generated prior to now are finished when the function is called.
+ *
+ * Be very careful blocking on func to be called if any locks are held -
+ * validation interface clients may not be able to make progress as they often
+ * wait for things like cs_main, so blocking until func is called with cs_main
+ * will result in a deadlock (that DEBUG_LOCKORDER will miss).
+ */
+ void CallFunctionInValidationInterfaceQueue(std::function<void ()> func);
+
+ /**
+ * This is a synonym for the following, which asserts certain locks are not
+ * held:
+ * std::promise<void> promise;
+ * CallFunctionInValidationInterfaceQueue([&promise] {
+ * promise.set_value();
+ * });
+ * promise.get_future().wait();
+ */
+ void SyncWithValidationInterfaceQueue() LOCKS_EXCLUDED(cs_main);
void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload);
void TransactionAddedToMempool(const NewMempoolTransactionInfo&, uint64_t mempool_sequence);
@@ -221,6 +224,4 @@ public:
void NewPoWValidBlock(const CBlockIndex *, const std::shared_ptr<const CBlock>&);
};
-CMainSignals& GetMainSignals();
-
#endif // BITCOIN_VALIDATIONINTERFACE_H
diff --git a/src/wallet/test/util.cpp b/src/wallet/test/util.cpp
index cbf3ccd1ec..49d206f409 100644
--- a/src/wallet/test/util.cpp
+++ b/src/wallet/test/util.cpp
@@ -71,7 +71,8 @@ std::shared_ptr<CWallet> TestLoadWallet(WalletContext& context)
void TestUnloadWallet(std::shared_ptr<CWallet>&& wallet)
{
- SyncWithValidationInterfaceQueue();
+ // Calls SyncWithValidationInterfaceQueue
+ wallet->chain().waitForNotificationsIfTipChanged({});
wallet->m_chain_notifications_handler.reset();
UnloadWallet(std::move(wallet));
}
diff --git a/src/wallet/test/wallet_tests.cpp b/src/wallet/test/wallet_tests.cpp
index 27a81b3669..3a67b9a433 100644
--- a/src/wallet/test/wallet_tests.cpp
+++ b/src/wallet/test/wallet_tests.cpp
@@ -814,7 +814,7 @@ BOOST_FIXTURE_TEST_CASE(CreateWallet, TestChain100Setup)
// transactionAddedToMempool notifications, and create block and mempool
// transactions paying to the wallet
std::promise<void> promise;
- CallFunctionInValidationInterfaceQueue([&promise] {
+ m_node.validation_signals->CallFunctionInValidationInterfaceQueue([&promise] {
promise.get_future().wait();
});
std::string error;
@@ -842,7 +842,7 @@ BOOST_FIXTURE_TEST_CASE(CreateWallet, TestChain100Setup)
// Unblock notification queue and make sure stale blockConnected and
// transactionAddedToMempool events are processed
promise.set_value();
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
// AddToWallet events for block_tx and mempool_tx events are counted a
// second time as the notification queue is processed
BOOST_CHECK_EQUAL(addtx_count, 5);
@@ -865,7 +865,7 @@ BOOST_FIXTURE_TEST_CASE(CreateWallet, TestChain100Setup)
m_coinbase_txns.push_back(CreateAndProcessBlock({block_tx}, GetScriptForRawPubKey(coinbaseKey.GetPubKey())).vtx[0]);
mempool_tx = TestSimpleSpend(*m_coinbase_txns[3], 0, coinbaseKey, GetScriptForRawPubKey(key.GetPubKey()));
BOOST_CHECK(m_node.chain->broadcastTransaction(MakeTransactionRef(mempool_tx), DEFAULT_TRANSACTION_MAXFEE, false, error));
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
});
wallet = TestLoadWallet(context);
// Since mempool transactions are requested at the end of loading, there will
@@ -905,7 +905,7 @@ BOOST_FIXTURE_TEST_CASE(RemoveTxs, TestChain100Setup)
auto block_tx = TestSimpleSpend(*m_coinbase_txns[0], 0, coinbaseKey, GetScriptForRawPubKey(key.GetPubKey()));
CreateAndProcessBlock({block_tx}, GetScriptForRawPubKey(coinbaseKey.GetPubKey()));
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
{
auto block_hash = block_tx.GetHash();