aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am4
-rw-r--r--src/bench/wallet_balance.cpp3
-rw-r--r--src/bitcoin-chainstate.cpp23
-rw-r--r--src/index/base.cpp8
-rw-r--r--src/init.cpp47
-rw-r--r--src/kernel/chainstatemanager_opts.h2
-rw-r--r--src/kernel/mempool_options.h4
-rw-r--r--src/node/context.h4
-rw-r--r--src/node/interfaces.cpp14
-rw-r--r--src/node/transaction.cpp4
-rw-r--r--src/rpc/blockchain.cpp3
-rw-r--r--src/rpc/fees.cpp10
-rw-r--r--src/rpc/mining.cpp4
-rw-r--r--src/rpc/node.cpp2
-rw-r--r--src/scheduler.cpp14
-rw-r--r--src/scheduler.h15
-rw-r--r--src/test/coinstatsindex_tests.cpp2
-rw-r--r--src/test/fuzz/package_eval.cpp12
-rw-r--r--src/test/fuzz/process_message.cpp4
-rw-r--r--src/test/fuzz/process_messages.cpp4
-rw-r--r--src/test/fuzz/tx_pool.cpp10
-rw-r--r--src/test/peerman_tests.cpp4
-rw-r--r--src/test/policyestimator_tests.cpp20
-rw-r--r--src/test/scheduler_tests.cpp10
-rw-r--r--src/test/txindex_tests.cpp2
-rw-r--r--src/test/util/mining.cpp6
-rw-r--r--src/test/util/setup_common.cpp7
-rw-r--r--src/test/util/txmempool.cpp1
-rw-r--r--src/test/validation_block_tests.cpp8
-rw-r--r--src/test/validation_chainstatemanager_tests.cpp5
-rw-r--r--src/test/validationinterface_tests.cpp31
-rw-r--r--src/txmempool.cpp11
-rw-r--r--src/txmempool.h3
-rw-r--r--src/util/task_runner.h52
-rw-r--r--src/validation.cpp60
-rw-r--r--src/validationinterface.cpp93
-rw-r--r--src/validationinterface.h111
-rw-r--r--src/wallet/test/util.cpp3
-rw-r--r--src/wallet/test/wallet_tests.cpp8
39 files changed, 352 insertions, 276 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index d797ac880b..1f55bfbafd 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -323,6 +323,7 @@ BITCOIN_CORE_H = \
util/spanparsing.h \
util/string.h \
util/syserror.h \
+ util/task_runner.h \
util/thread.h \
util/threadinterrupt.h \
util/threadnames.h \
@@ -966,7 +967,6 @@ libbitcoinkernel_la_SOURCES = \
pubkey.cpp \
random.cpp \
randomenv.cpp \
- scheduler.cpp \
script/interpreter.cpp \
script/script.cpp \
script/script_error.cpp \
@@ -983,7 +983,6 @@ libbitcoinkernel_la_SOURCES = \
util/batchpriority.cpp \
util/chaintype.cpp \
util/check.cpp \
- util/exception.cpp \
util/fs.cpp \
util/fs_helpers.cpp \
util/hasher.cpp \
@@ -994,7 +993,6 @@ libbitcoinkernel_la_SOURCES = \
util/strencodings.cpp \
util/string.cpp \
util/syserror.cpp \
- util/thread.cpp \
util/threadnames.cpp \
util/time.cpp \
util/tokenpipe.cpp \
diff --git a/src/bench/wallet_balance.cpp b/src/bench/wallet_balance.cpp
index bf2195293e..7a10b167a6 100644
--- a/src/bench/wallet_balance.cpp
+++ b/src/bench/wallet_balance.cpp
@@ -39,7 +39,8 @@ static void WalletBalance(benchmark::Bench& bench, const bool set_dirty, const b
generatetoaddress(test_setup->m_node, address_mine.value_or(ADDRESS_WATCHONLY));
generatetoaddress(test_setup->m_node, ADDRESS_WATCHONLY);
}
- SyncWithValidationInterfaceQueue();
+ // Calls SyncWithValidationInterfaceQueue
+ wallet.chain().waitForNotificationsIfTipChanged(uint256::ZERO);
auto bal = GetBalance(wallet); // Cache
diff --git a/src/bitcoin-chainstate.cpp b/src/bitcoin-chainstate.cpp
index c1a71ed749..3eb64aa344 100644
--- a/src/bitcoin-chainstate.cpp
+++ b/src/bitcoin-chainstate.cpp
@@ -23,11 +23,10 @@
#include <node/caches.h>
#include <node/chainstate.h>
#include <random.h>
-#include <scheduler.h>
#include <script/sigcache.h>
#include <util/chaintype.h>
#include <util/fs.h>
-#include <util/thread.h>
+#include <util/task_runner.h>
#include <validation.h>
#include <validationinterface.h>
@@ -68,16 +67,7 @@ int main(int argc, char* argv[])
Assert(InitSignatureCache(validation_cache_sizes.signature_cache_bytes));
Assert(InitScriptExecutionCache(validation_cache_sizes.script_execution_cache_bytes));
-
- // SETUP: Scheduling and Background Signals
- CScheduler scheduler{};
- // Start the lightweight task scheduler thread
- scheduler.m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { scheduler.serviceQueue(); });
-
- // Gather some entropy once per minute.
- scheduler.scheduleEvery(RandAddPeriodic, std::chrono::minutes{1});
-
- GetMainSignals().RegisterBackgroundSignalScheduler(scheduler);
+ ValidationSignals validation_signals{std::make_unique<util::ImmediateTaskRunner>()};
class KernelNotifications : public kernel::Notifications
{
@@ -118,6 +108,7 @@ int main(int argc, char* argv[])
.chainparams = *chainparams,
.datadir = abs_datadir,
.notifications = *notifications,
+ .signals = &validation_signals,
};
const node::BlockManager::Options blockman_opts{
.chainparams = chainman_opts.chainparams,
@@ -235,9 +226,9 @@ int main(int argc, char* argv[])
bool new_block;
auto sc = std::make_shared<submitblock_StateCatcher>(block.GetHash());
- RegisterSharedValidationInterface(sc);
+ validation_signals.RegisterSharedValidationInterface(sc);
bool accepted = chainman.ProcessNewBlock(blockptr, /*force_processing=*/true, /*min_pow_checked=*/true, /*new_block=*/&new_block);
- UnregisterSharedValidationInterface(sc);
+ validation_signals.UnregisterSharedValidationInterface(sc);
if (!new_block && accepted) {
std::cerr << "duplicate" << std::endl;
break;
@@ -287,10 +278,9 @@ int main(int argc, char* argv[])
epilogue:
// Without this precise shutdown sequence, there will be a lot of nullptr
// dereferencing and UB.
- scheduler.stop();
if (chainman.m_thread_load.joinable()) chainman.m_thread_load.join();
- GetMainSignals().FlushBackgroundCallbacks();
+ validation_signals.FlushBackgroundCallbacks();
{
LOCK(cs_main);
for (Chainstate* chainstate : chainman.GetAll()) {
@@ -300,5 +290,4 @@ epilogue:
}
}
}
- GetMainSignals().UnregisterBackgroundSignalScheduler();
}
diff --git a/src/index/base.cpp b/src/index/base.cpp
index bcfe7215be..2287437f8f 100644
--- a/src/index/base.cpp
+++ b/src/index/base.cpp
@@ -89,7 +89,7 @@ bool BaseIndex::Init()
return &m_chain->context()->chainman->GetChainstateForIndexing());
// Register to validation interface before setting the 'm_synced' flag, so that
// callbacks are not missed once m_synced is true.
- RegisterValidationInterface(this);
+ m_chain->context()->validation_signals->RegisterValidationInterface(this);
CBlockLocator locator;
if (!GetDB().ReadBestBlock(locator)) {
@@ -380,7 +380,7 @@ bool BaseIndex::BlockUntilSyncedToCurrentChain() const
}
LogPrintf("%s: %s is catching up on block notifications\n", __func__, GetName());
- SyncWithValidationInterfaceQueue();
+ m_chain->context()->validation_signals->SyncWithValidationInterfaceQueue();
return true;
}
@@ -399,7 +399,9 @@ bool BaseIndex::StartBackgroundSync()
void BaseIndex::Stop()
{
- UnregisterValidationInterface(this);
+ if (m_chain->context()->validation_signals) {
+ m_chain->context()->validation_signals->UnregisterValidationInterface(this);
+ }
if (m_thread_sync.joinable()) {
m_thread_sync.join();
diff --git a/src/init.cpp b/src/init.cpp
index 988daefeec..9ea7b881cb 100644
--- a/src/init.cpp
+++ b/src/init.cpp
@@ -291,7 +291,7 @@ void Shutdown(NodeContext& node)
// Because these depend on each-other, we make sure that neither can be
// using the other before destroying them.
- if (node.peerman) UnregisterValidationInterface(node.peerman.get());
+ if (node.peerman && node.validation_signals) node.validation_signals->UnregisterValidationInterface(node.peerman.get());
if (node.connman) node.connman->Stop();
StopTorControl();
@@ -317,7 +317,9 @@ void Shutdown(NodeContext& node)
// fee estimator from validation interface.
if (node.fee_estimator) {
node.fee_estimator->Flush();
- UnregisterValidationInterface(node.fee_estimator.get());
+ if (node.validation_signals) {
+ node.validation_signals->UnregisterValidationInterface(node.fee_estimator.get());
+ }
}
// FlushStateToDisk generates a ChainStateFlushed callback, which we should avoid missing
@@ -332,7 +334,7 @@ void Shutdown(NodeContext& node)
// After there are no more peers/RPC left to give us new data which may generate
// CValidationInterface callbacks, flush them...
- GetMainSignals().FlushBackgroundCallbacks();
+ if (node.validation_signals) node.validation_signals->FlushBackgroundCallbacks();
// Stop and delete all indexes only after flushing background callbacks.
if (g_txindex) {
@@ -367,17 +369,19 @@ void Shutdown(NodeContext& node)
#if ENABLE_ZMQ
if (g_zmq_notification_interface) {
- UnregisterValidationInterface(g_zmq_notification_interface.get());
+ if (node.validation_signals) node.validation_signals->UnregisterValidationInterface(g_zmq_notification_interface.get());
g_zmq_notification_interface.reset();
}
#endif
node.chain_clients.clear();
- UnregisterAllValidationInterfaces();
- GetMainSignals().UnregisterBackgroundSignalScheduler();
+ if (node.validation_signals) {
+ node.validation_signals->UnregisterAllValidationInterfaces();
+ }
node.mempool.reset();
node.fee_estimator.reset();
node.chainman.reset();
+ node.validation_signals.reset();
node.scheduler.reset();
node.kernel.reset();
@@ -1138,17 +1142,18 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
assert(!node.scheduler);
node.scheduler = std::make_unique<CScheduler>();
+ auto& scheduler = *node.scheduler;
// Start the lightweight task scheduler thread
- node.scheduler->m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { node.scheduler->serviceQueue(); });
+ scheduler.m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { scheduler.serviceQueue(); });
// Gather some entropy once per minute.
- node.scheduler->scheduleEvery([]{
+ scheduler.scheduleEvery([]{
RandAddPeriodic();
}, std::chrono::minutes{1});
// Check disk space every 5 minutes to avoid db corruption.
- node.scheduler->scheduleEvery([&args, &node]{
+ scheduler.scheduleEvery([&args, &node]{
constexpr uint64_t min_disk_space = 50 << 20; // 50 MB
if (!CheckDiskSpace(args.GetBlocksDirPath(), min_disk_space)) {
LogPrintf("Shutting down due to lack of disk space!\n");
@@ -1158,7 +1163,9 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
}
}, std::chrono::minutes{5});
- GetMainSignals().RegisterBackgroundSignalScheduler(*node.scheduler);
+ assert(!node.validation_signals);
+ node.validation_signals = std::make_unique<ValidationSignals>(std::make_unique<SerialTaskRunner>(scheduler));
+ auto& validation_signals = *node.validation_signals;
// Create client interfaces for wallets that are supposed to be loaded
// according to -wallet and -disablewallet options. This only constructs
@@ -1263,8 +1270,8 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
// Flush estimates to disk periodically
CBlockPolicyEstimator* fee_estimator = node.fee_estimator.get();
- node.scheduler->scheduleEvery([fee_estimator] { fee_estimator->FlushFeeEstimates(); }, FEE_FLUSH_INTERVAL);
- RegisterValidationInterface(fee_estimator);
+ scheduler.scheduleEvery([fee_estimator] { fee_estimator->FlushFeeEstimates(); }, FEE_FLUSH_INTERVAL);
+ validation_signals.RegisterValidationInterface(fee_estimator);
}
// Check port numbers
@@ -1435,7 +1442,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
});
if (g_zmq_notification_interface) {
- RegisterValidationInterface(g_zmq_notification_interface.get());
+ validation_signals.RegisterValidationInterface(g_zmq_notification_interface.get());
}
#endif
@@ -1449,6 +1456,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
.chainparams = chainparams,
.datadir = args.GetDataDirNet(),
.notifications = *node.notifications,
+ .signals = &validation_signals,
};
Assert(ApplyArgsManOptions(args, chainman_opts)); // no error can happen, already checked in AppInitParameterInteraction
@@ -1478,6 +1486,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
CTxMemPool::Options mempool_opts{
.check_ratio = chainparams.DefaultConsistencyChecks() ? 1 : 0,
+ .signals = &validation_signals,
};
auto result{ApplyArgsManOptions(args, chainparams, mempool_opts)};
if (!result) {
@@ -1505,7 +1514,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
// Drain the validation interface queue to ensure that the old indexes
// don't have any pending work.
- SyncWithValidationInterfaceQueue();
+ Assert(node.validation_signals)->SyncWithValidationInterfaceQueue();
for (auto* index : node.indexes) {
index->Interrupt();
@@ -1594,7 +1603,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
node.peerman = PeerManager::make(*node.connman, *node.addrman,
node.banman.get(), chainman,
*node.mempool, peerman_opts);
- RegisterValidationInterface(node.peerman.get());
+ validation_signals.RegisterValidationInterface(node.peerman.get());
// ********************************************************* Step 8: start indexers
@@ -1900,7 +1909,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
connOptions.m_i2p_accept_incoming = args.GetBoolArg("-i2pacceptincoming", DEFAULT_I2P_ACCEPT_INCOMING);
- if (!node.connman->Start(*node.scheduler, connOptions)) {
+ if (!node.connman->Start(scheduler, connOptions)) {
return false;
}
@@ -1920,15 +1929,15 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
uiInterface.InitMessage(_("Done loading").translated);
for (const auto& client : node.chain_clients) {
- client->start(*node.scheduler);
+ client->start(scheduler);
}
BanMan* banman = node.banman.get();
- node.scheduler->scheduleEvery([banman]{
+ scheduler.scheduleEvery([banman]{
banman->DumpBanlist();
}, DUMP_BANS_INTERVAL);
- if (node.peerman) node.peerman->StartScheduledTasks(*node.scheduler);
+ if (node.peerman) node.peerman->StartScheduledTasks(scheduler);
#if HAVE_SYSTEM
StartupNotify(args);
diff --git a/src/kernel/chainstatemanager_opts.h b/src/kernel/chainstatemanager_opts.h
index 864aac336e..de5f78494a 100644
--- a/src/kernel/chainstatemanager_opts.h
+++ b/src/kernel/chainstatemanager_opts.h
@@ -18,6 +18,7 @@
#include <optional>
class CChainParams;
+class ValidationSignals;
static constexpr bool DEFAULT_CHECKPOINTS_ENABLED{true};
static constexpr auto DEFAULT_MAX_TIP_AGE{24h};
@@ -44,6 +45,7 @@ struct ChainstateManagerOpts {
DBOptions coins_db{};
CoinsViewOptions coins_view{};
Notifications& notifications;
+ ValidationSignals* signals{nullptr};
//! Number of script check worker threads. Zero means no parallel verification.
int worker_threads_num{0};
};
diff --git a/src/kernel/mempool_options.h b/src/kernel/mempool_options.h
index 753aebd455..0850b2e60e 100644
--- a/src/kernel/mempool_options.h
+++ b/src/kernel/mempool_options.h
@@ -13,6 +13,8 @@
#include <cstdint>
#include <optional>
+class ValidationSignals;
+
/** Default for -maxmempool, maximum megabytes of mempool memory usage */
static constexpr unsigned int DEFAULT_MAX_MEMPOOL_SIZE_MB{300};
/** Default for -maxmempool when blocksonly is set */
@@ -56,6 +58,8 @@ struct MemPoolOptions {
bool full_rbf{DEFAULT_MEMPOOL_FULL_RBF};
bool persist_v1_dat{DEFAULT_PERSIST_V1_DAT};
MemPoolLimits limits{};
+
+ ValidationSignals* signals{nullptr};
};
} // namespace kernel
diff --git a/src/node/context.h b/src/node/context.h
index 4f3b640b2d..245f230aec 100644
--- a/src/node/context.h
+++ b/src/node/context.h
@@ -20,6 +20,7 @@ class BanMan;
class BaseIndex;
class CBlockPolicyEstimator;
class CConnman;
+class ValidationSignals;
class CScheduler;
class CTxMemPool;
class ChainstateManager;
@@ -70,7 +71,10 @@ struct NodeContext {
interfaces::WalletLoader* wallet_loader{nullptr};
std::unique_ptr<CScheduler> scheduler;
std::function<void()> rpc_interruption_point = [] {};
+ //! Issues blocking calls about sync status, errors and warnings
std::unique_ptr<KernelNotifications> notifications;
+ //! Issues calls about blocks and transactions
+ std::unique_ptr<ValidationSignals> validation_signals;
std::atomic<int> exit_status{EXIT_SUCCESS};
//! Declare default constructor and destructor that are not inline, so code
diff --git a/src/node/interfaces.cpp b/src/node/interfaces.cpp
index 5a8b7fc105..f9a372e3de 100644
--- a/src/node/interfaces.cpp
+++ b/src/node/interfaces.cpp
@@ -460,19 +460,20 @@ public:
class NotificationsHandlerImpl : public Handler
{
public:
- explicit NotificationsHandlerImpl(std::shared_ptr<Chain::Notifications> notifications)
- : m_proxy(std::make_shared<NotificationsProxy>(std::move(notifications)))
+ explicit NotificationsHandlerImpl(ValidationSignals& signals, std::shared_ptr<Chain::Notifications> notifications)
+ : m_signals{signals}, m_proxy{std::make_shared<NotificationsProxy>(std::move(notifications))}
{
- RegisterSharedValidationInterface(m_proxy);
+ m_signals.RegisterSharedValidationInterface(m_proxy);
}
~NotificationsHandlerImpl() override { disconnect(); }
void disconnect() override
{
if (m_proxy) {
- UnregisterSharedValidationInterface(m_proxy);
+ m_signals.UnregisterSharedValidationInterface(m_proxy);
m_proxy.reset();
}
}
+ ValidationSignals& m_signals;
std::shared_ptr<NotificationsProxy> m_proxy;
};
@@ -761,12 +762,12 @@ public:
}
std::unique_ptr<Handler> handleNotifications(std::shared_ptr<Notifications> notifications) override
{
- return std::make_unique<NotificationsHandlerImpl>(std::move(notifications));
+ return std::make_unique<NotificationsHandlerImpl>(validation_signals(), std::move(notifications));
}
void waitForNotificationsIfTipChanged(const uint256& old_tip) override
{
if (!old_tip.IsNull() && old_tip == WITH_LOCK(::cs_main, return chainman().ActiveChain().Tip()->GetBlockHash())) return;
- SyncWithValidationInterfaceQueue();
+ validation_signals().SyncWithValidationInterfaceQueue();
}
std::unique_ptr<Handler> handleRpc(const CRPCCommand& command) override
{
@@ -822,6 +823,7 @@ public:
NodeContext* context() override { return &m_node; }
ArgsManager& args() { return *Assert(m_node.args); }
ChainstateManager& chainman() { return *Assert(m_node.chainman); }
+ ValidationSignals& validation_signals() { return *Assert(m_node.validation_signals); }
NodeContext& m_node;
};
} // namespace
diff --git a/src/node/transaction.cpp b/src/node/transaction.cpp
index ef9a30a076..b66a4f2f39 100644
--- a/src/node/transaction.cpp
+++ b/src/node/transaction.cpp
@@ -92,7 +92,7 @@ TransactionError BroadcastTransaction(NodeContext& node, const CTransactionRef t
node.mempool->AddUnbroadcastTx(txid);
}
- if (wait_callback) {
+ if (wait_callback && node.validation_signals) {
// For transactions broadcast from outside the wallet, make sure
// that the wallet has been notified of the transaction before
// continuing.
@@ -101,7 +101,7 @@ TransactionError BroadcastTransaction(NodeContext& node, const CTransactionRef t
// with a transaction to/from their wallet, immediately call some
// wallet RPC, and get a stale result because callbacks have not
// yet been processed.
- CallFunctionInValidationInterfaceQueue([&promise] {
+ node.validation_signals->CallFunctionInValidationInterfaceQueue([&promise] {
promise.set_value();
});
callback_set = true;
diff --git a/src/rpc/blockchain.cpp b/src/rpc/blockchain.cpp
index 50908e9f96..dfdddeacea 100644
--- a/src/rpc/blockchain.cpp
+++ b/src/rpc/blockchain.cpp
@@ -395,7 +395,8 @@ static RPCHelpMan syncwithvalidationinterfacequeue()
},
[&](const RPCHelpMan& self, const JSONRPCRequest& request) -> UniValue
{
- SyncWithValidationInterfaceQueue();
+ NodeContext& node = EnsureAnyNodeContext(request.context);
+ CHECK_NONFATAL(node.validation_signals)->SyncWithValidationInterfaceQueue();
return UniValue::VNULL;
},
};
diff --git a/src/rpc/fees.cpp b/src/rpc/fees.cpp
index 57ba486ed9..f3345b4f1c 100644
--- a/src/rpc/fees.cpp
+++ b/src/rpc/fees.cpp
@@ -4,6 +4,7 @@
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <core_io.h>
+#include <node/context.h>
#include <policy/feerate.h>
#include <policy/fees.h>
#include <rpc/protocol.h>
@@ -21,10 +22,6 @@
#include <cmath>
#include <string>
-namespace node {
-struct NodeContext;
-}
-
using node::NodeContext;
static RPCHelpMan estimatesmartfee()
@@ -68,7 +65,7 @@ static RPCHelpMan estimatesmartfee()
const NodeContext& node = EnsureAnyNodeContext(request.context);
const CTxMemPool& mempool = EnsureMemPool(node);
- SyncWithValidationInterfaceQueue();
+ CHECK_NONFATAL(mempool.m_signals)->SyncWithValidationInterfaceQueue();
unsigned int max_target = fee_estimator.HighestTargetTracked(FeeEstimateHorizon::LONG_HALFLIFE);
unsigned int conf_target = ParseConfirmTarget(request.params[0], max_target);
bool conservative = true;
@@ -156,8 +153,9 @@ static RPCHelpMan estimaterawfee()
[&](const RPCHelpMan& self, const JSONRPCRequest& request) -> UniValue
{
CBlockPolicyEstimator& fee_estimator = EnsureAnyFeeEstimator(request.context);
+ const NodeContext& node = EnsureAnyNodeContext(request.context);
- SyncWithValidationInterfaceQueue();
+ CHECK_NONFATAL(node.validation_signals)->SyncWithValidationInterfaceQueue();
unsigned int max_target = fee_estimator.HighestTargetTracked(FeeEstimateHorizon::LONG_HALFLIFE);
unsigned int conf_target = ParseConfirmTarget(request.params[0], max_target);
double threshold = 0.95;
diff --git a/src/rpc/mining.cpp b/src/rpc/mining.cpp
index ff39a31a43..f7cdbf52dd 100644
--- a/src/rpc/mining.cpp
+++ b/src/rpc/mining.cpp
@@ -1042,9 +1042,9 @@ static RPCHelpMan submitblock()
bool new_block;
auto sc = std::make_shared<submitblock_StateCatcher>(block.GetHash());
- RegisterSharedValidationInterface(sc);
+ CHECK_NONFATAL(chainman.m_options.signals)->RegisterSharedValidationInterface(sc);
bool accepted = chainman.ProcessNewBlock(blockptr, /*force_processing=*/true, /*min_pow_checked=*/true, /*new_block=*/&new_block);
- UnregisterSharedValidationInterface(sc);
+ CHECK_NONFATAL(chainman.m_options.signals)->UnregisterSharedValidationInterface(sc);
if (!new_block && accepted) {
return "duplicate";
}
diff --git a/src/rpc/node.cpp b/src/rpc/node.cpp
index 45053f882d..b8c0080aef 100644
--- a/src/rpc/node.cpp
+++ b/src/rpc/node.cpp
@@ -94,7 +94,7 @@ static RPCHelpMan mockscheduler()
const NodeContext& node_context{EnsureAnyNodeContext(request.context)};
CHECK_NONFATAL(node_context.scheduler)->MockForward(std::chrono::seconds{delta_seconds});
- SyncWithValidationInterfaceQueue();
+ CHECK_NONFATAL(node_context.validation_signals)->SyncWithValidationInterfaceQueue();
for (const auto& chain_client : node_context.chain_clients) {
chain_client->schedulerMockForward(std::chrono::seconds(delta_seconds));
}
diff --git a/src/scheduler.cpp b/src/scheduler.cpp
index 6c6f644142..a1158e0c07 100644
--- a/src/scheduler.cpp
+++ b/src/scheduler.cpp
@@ -129,7 +129,7 @@ bool CScheduler::AreThreadsServicingQueue() const
}
-void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue()
+void SerialTaskRunner::MaybeScheduleProcessQueue()
{
{
LOCK(m_callbacks_mutex);
@@ -142,7 +142,7 @@ void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue()
m_scheduler.schedule([this] { this->ProcessQueue(); }, std::chrono::steady_clock::now());
}
-void SingleThreadedSchedulerClient::ProcessQueue()
+void SerialTaskRunner::ProcessQueue()
{
std::function<void()> callback;
{
@@ -158,8 +158,8 @@ void SingleThreadedSchedulerClient::ProcessQueue()
// RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
// to ensure both happen safely even if callback() throws.
struct RAIICallbacksRunning {
- SingleThreadedSchedulerClient* instance;
- explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
+ SerialTaskRunner* instance;
+ explicit RAIICallbacksRunning(SerialTaskRunner* _instance) : instance(_instance) {}
~RAIICallbacksRunning()
{
{
@@ -173,7 +173,7 @@ void SingleThreadedSchedulerClient::ProcessQueue()
callback();
}
-void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func)
+void SerialTaskRunner::insert(std::function<void()> func)
{
{
LOCK(m_callbacks_mutex);
@@ -182,7 +182,7 @@ void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func
MaybeScheduleProcessQueue();
}
-void SingleThreadedSchedulerClient::EmptyQueue()
+void SerialTaskRunner::flush()
{
assert(!m_scheduler.AreThreadsServicingQueue());
bool should_continue = true;
@@ -193,7 +193,7 @@ void SingleThreadedSchedulerClient::EmptyQueue()
}
}
-size_t SingleThreadedSchedulerClient::CallbacksPending()
+size_t SerialTaskRunner::size()
{
LOCK(m_callbacks_mutex);
return m_callbacks_pending.size();
diff --git a/src/scheduler.h b/src/scheduler.h
index 9212582b97..454c083b31 100644
--- a/src/scheduler.h
+++ b/src/scheduler.h
@@ -8,6 +8,7 @@
#include <attributes.h>
#include <sync.h>
#include <threadsafety.h>
+#include <util/task_runner.h>
#include <chrono>
#include <condition_variable>
@@ -120,12 +121,16 @@ private:
* B() will be able to observe all of the effects of callback A() which executed
* before it.
*/
-class SingleThreadedSchedulerClient
+class SerialTaskRunner : public util::TaskRunnerInterface
{
private:
CScheduler& m_scheduler;
Mutex m_callbacks_mutex;
+
+ // We are not allowed to assume the scheduler only runs in one thread,
+ // but must ensure all callbacks happen in-order, so we end up creating
+ // our own queue here :(
std::list<std::function<void()>> m_callbacks_pending GUARDED_BY(m_callbacks_mutex);
bool m_are_callbacks_running GUARDED_BY(m_callbacks_mutex) = false;
@@ -133,7 +138,7 @@ private:
void ProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
public:
- explicit SingleThreadedSchedulerClient(CScheduler& scheduler LIFETIMEBOUND) : m_scheduler{scheduler} {}
+ explicit SerialTaskRunner(CScheduler& scheduler LIFETIMEBOUND) : m_scheduler{scheduler} {}
/**
* Add a callback to be executed. Callbacks are executed serially
@@ -141,15 +146,15 @@ public:
* Practically, this means that callbacks can behave as if they are executed
* in order by a single thread.
*/
- void AddToProcessQueue(std::function<void()> func) EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
+ void insert(std::function<void()> func) override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
/**
* Processes all remaining queue members on the calling thread, blocking until queue is empty
* Must be called after the CScheduler has no remaining processing threads!
*/
- void EmptyQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
+ void flush() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
- size_t CallbacksPending() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
+ size_t size() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
};
#endif // BITCOIN_SCHEDULER_H
diff --git a/src/test/coinstatsindex_tests.cpp b/src/test/coinstatsindex_tests.cpp
index cc1ec49d41..08814c1499 100644
--- a/src/test/coinstatsindex_tests.cpp
+++ b/src/test/coinstatsindex_tests.cpp
@@ -70,7 +70,7 @@ BOOST_FIXTURE_TEST_CASE(coinstatsindex_initial_sync, TestChain100Setup)
// SyncWithValidationInterfaceQueue() call below is also needed to ensure
// TSAN always sees the test thread waiting for the notification thread, and
// avoid potential false positive reports.
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
// Shutdown sequence (c.f. Shutdown() in init.cpp)
coin_stats_index.Stop();
diff --git a/src/test/fuzz/package_eval.cpp b/src/test/fuzz/package_eval.cpp
index 9e658e0ced..a48ce37bce 100644
--- a/src/test/fuzz/package_eval.cpp
+++ b/src/test/fuzz/package_eval.cpp
@@ -47,7 +47,7 @@ void initialize_tx_pool()
g_outpoints_coinbase_init_mature.push_back(prevout);
}
}
- SyncWithValidationInterfaceQueue();
+ g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue();
}
struct OutpointsUpdater final : public CValidationInterface {
@@ -147,7 +147,7 @@ FUZZ_TARGET(tx_package_eval, .init = initialize_tx_pool)
}
auto outpoints_updater = std::make_shared<OutpointsUpdater>(mempool_outpoints);
- RegisterSharedValidationInterface(outpoints_updater);
+ node.validation_signals->RegisterSharedValidationInterface(outpoints_updater);
CTxMemPool tx_pool_{MakeMempool(fuzzed_data_provider, node)};
MockedTxPool& tx_pool = *static_cast<MockedTxPool*>(&tx_pool_);
@@ -269,7 +269,7 @@ FUZZ_TARGET(tx_package_eval, .init = initialize_tx_pool)
// Remember all added transactions
std::set<CTransactionRef> added;
auto txr = std::make_shared<TransactionsDelta>(added);
- RegisterSharedValidationInterface(txr);
+ node.validation_signals->RegisterSharedValidationInterface(txr);
// When there are multiple transactions in the package, we call ProcessNewPackage(txs, test_accept=false)
// and AcceptToMemoryPool(txs.back(), test_accept=true). When there is only 1 transaction, we might flip it
@@ -285,8 +285,8 @@ FUZZ_TARGET(tx_package_eval, .init = initialize_tx_pool)
/*bypass_limits=*/false, /*test_accept=*/!single_submit));
const bool passed = res.m_result_type == MempoolAcceptResult::ResultType::VALID;
- SyncWithValidationInterfaceQueue();
- UnregisterSharedValidationInterface(txr);
+ node.validation_signals->SyncWithValidationInterfaceQueue();
+ node.validation_signals->UnregisterSharedValidationInterface(txr);
// There is only 1 transaction in the package. We did a test-package-accept and a ATMP
if (single_submit) {
@@ -310,7 +310,7 @@ FUZZ_TARGET(tx_package_eval, .init = initialize_tx_pool)
CheckMempoolV3Invariants(tx_pool);
}
- UnregisterSharedValidationInterface(outpoints_updater);
+ node.validation_signals->UnregisterSharedValidationInterface(outpoints_updater);
WITH_LOCK(::cs_main, tx_pool.check(chainstate.CoinsTip(), chainstate.m_chain.Height() + 1));
}
diff --git a/src/test/fuzz/process_message.cpp b/src/test/fuzz/process_message.cpp
index 56b391ed5c..a467fd5382 100644
--- a/src/test/fuzz/process_message.cpp
+++ b/src/test/fuzz/process_message.cpp
@@ -47,7 +47,7 @@ void initialize_process_message()
for (int i = 0; i < 2 * COINBASE_MATURITY; i++) {
MineBlock(g_setup->m_node, CScript() << OP_TRUE);
}
- SyncWithValidationInterfaceQueue();
+ g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue();
}
FUZZ_TARGET(process_message, .init = initialize_process_message)
@@ -89,6 +89,6 @@ FUZZ_TARGET(process_message, .init = initialize_process_message)
}
g_setup->m_node.peerman->SendMessages(&p2p_node);
}
- SyncWithValidationInterfaceQueue();
+ g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue();
g_setup->m_node.connman->StopNodes();
}
diff --git a/src/test/fuzz/process_messages.cpp b/src/test/fuzz/process_messages.cpp
index 6b264907b5..38acd432fa 100644
--- a/src/test/fuzz/process_messages.cpp
+++ b/src/test/fuzz/process_messages.cpp
@@ -37,7 +37,7 @@ void initialize_process_messages()
for (int i = 0; i < 2 * COINBASE_MATURITY; i++) {
MineBlock(g_setup->m_node, CScript() << OP_TRUE);
}
- SyncWithValidationInterfaceQueue();
+ g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue();
}
FUZZ_TARGET(process_messages, .init = initialize_process_messages)
@@ -89,6 +89,6 @@ FUZZ_TARGET(process_messages, .init = initialize_process_messages)
g_setup->m_node.peerman->SendMessages(&random_node);
}
}
- SyncWithValidationInterfaceQueue();
+ g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue();
g_setup->m_node.connman->StopNodes();
}
diff --git a/src/test/fuzz/tx_pool.cpp b/src/test/fuzz/tx_pool.cpp
index fcf230642a..b6ba612a84 100644
--- a/src/test/fuzz/tx_pool.cpp
+++ b/src/test/fuzz/tx_pool.cpp
@@ -50,7 +50,7 @@ void initialize_tx_pool()
g_outpoints_coinbase_init_immature;
outpoints.push_back(prevout);
}
- SyncWithValidationInterfaceQueue();
+ g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue();
}
struct TransactionsDelta final : public CValidationInterface {
@@ -105,7 +105,7 @@ void Finish(FuzzedDataProvider& fuzzed_data_provider, MockedTxPool& tx_pool, Cha
assert(tx_pool.size() < info_all.size());
WITH_LOCK(::cs_main, tx_pool.check(chainstate.CoinsTip(), chainstate.m_chain.Height() + 1));
}
- SyncWithValidationInterfaceQueue();
+ g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue();
}
void MockTime(FuzzedDataProvider& fuzzed_data_provider, const Chainstate& chainstate)
@@ -285,7 +285,7 @@ FUZZ_TARGET(tx_pool_standard, .init = initialize_tx_pool)
std::set<CTransactionRef> removed;
std::set<CTransactionRef> added;
auto txr = std::make_shared<TransactionsDelta>(removed, added);
- RegisterSharedValidationInterface(txr);
+ node.validation_signals->RegisterSharedValidationInterface(txr);
const bool bypass_limits = fuzzed_data_provider.ConsumeBool();
// Make sure ProcessNewPackage on one transaction works.
@@ -303,8 +303,8 @@ FUZZ_TARGET(tx_pool_standard, .init = initialize_tx_pool)
const auto res = WITH_LOCK(::cs_main, return AcceptToMemoryPool(chainstate, tx, GetTime(), bypass_limits, /*test_accept=*/false));
const bool accepted = res.m_result_type == MempoolAcceptResult::ResultType::VALID;
- SyncWithValidationInterfaceQueue();
- UnregisterSharedValidationInterface(txr);
+ node.validation_signals->SyncWithValidationInterfaceQueue();
+ node.validation_signals->UnregisterSharedValidationInterface(txr);
bool txid_in_mempool = tx_pool.exists(GenTxid::Txid(tx->GetHash()));
bool wtxid_in_mempool = tx_pool.exists(GenTxid::Wtxid(tx->GetWitnessHash()));
diff --git a/src/test/peerman_tests.cpp b/src/test/peerman_tests.cpp
index 2c79329385..28866695bc 100644
--- a/src/test/peerman_tests.cpp
+++ b/src/test/peerman_tests.cpp
@@ -25,7 +25,7 @@ static void mineBlock(const node::NodeContext& node, std::chrono::seconds block_
block.fChecked = true; // little speedup
SetMockTime(curr_time); // process block at current time
Assert(node.chainman->ProcessNewBlock(std::make_shared<const CBlock>(block), /*force_processing=*/true, /*min_pow_checked=*/true, nullptr));
- SyncWithValidationInterfaceQueue(); // drain events queue
+ node.validation_signals->SyncWithValidationInterfaceQueue(); // drain events queue
}
// Verifying when network-limited peer connections are desirable based on the node's proximity to the tip
@@ -57,7 +57,7 @@ BOOST_AUTO_TEST_CASE(connections_desirable_service_flags)
// By now, we tested that the connections desirable services flags change based on the node's time proximity to the tip.
// Now, perform the same tests for when the node receives a block.
- RegisterValidationInterface(peerman.get());
+ m_node.validation_signals->RegisterValidationInterface(peerman.get());
// First, verify a block in the past doesn't enable limited peers connections
// At this point, our time is (NODE_NETWORK_LIMITED_ALLOW_CONN_BLOCKS + 1) * 10 minutes ahead the tip's time.
diff --git a/src/test/policyestimator_tests.cpp b/src/test/policyestimator_tests.cpp
index ede73c6895..6cadc3290a 100644
--- a/src/test/policyestimator_tests.cpp
+++ b/src/test/policyestimator_tests.cpp
@@ -20,7 +20,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
{
CBlockPolicyEstimator& feeEst = *Assert(m_node.fee_estimator);
CTxMemPool& mpool = *Assert(m_node.mempool);
- RegisterValidationInterface(&feeEst);
+ m_node.validation_signals->RegisterValidationInterface(&feeEst);
TestMemPoolEntryHelper entry;
CAmount basefee(2000);
CAmount deltaFee(100);
@@ -74,7 +74,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
/*submitted_in_package=*/false,
/*chainstate_is_current=*/true,
/*has_no_mempool_parents=*/true)};
- GetMainSignals().TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence());
+ m_node.validation_signals->TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence());
}
uint256 hash = tx.GetHash();
txHashes[j].push_back(hash);
@@ -102,7 +102,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
// Check after just a few txs that combining buckets works as expected
if (blocknum == 3) {
// Wait for fee estimator to catch up
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
// At this point we should need to combine 3 buckets to get enough data points
// So estimateFee(1) should fail and estimateFee(2) should return somewhere around
// 9*baserate. estimateFee(2) %'s are 100,100,90 = average 97%
@@ -113,7 +113,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
}
// Wait for fee estimator to catch up
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
std::vector<CAmount> origFeeEst;
// Highest feerate is 10*baseRate and gets in all blocks,
@@ -146,7 +146,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
}
// Wait for fee estimator to catch up
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
BOOST_CHECK(feeEst.estimateFee(1) == CFeeRate(0));
for (int i = 2; i < 10;i++) {
@@ -175,7 +175,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
/*submitted_in_package=*/false,
/*chainstate_is_current=*/true,
/*has_no_mempool_parents=*/true)};
- GetMainSignals().TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence());
+ m_node.validation_signals->TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence());
}
uint256 hash = tx.GetHash();
txHashes[j].push_back(hash);
@@ -188,7 +188,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
}
// Wait for fee estimator to catch up
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
for (int i = 1; i < 10;i++) {
BOOST_CHECK(feeEst.estimateFee(i) == CFeeRate(0) || feeEst.estimateFee(i).GetFeePerK() > origFeeEst[i-1] - deltaFee);
@@ -212,7 +212,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
block.clear();
// Wait for fee estimator to catch up
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
BOOST_CHECK(feeEst.estimateFee(1) == CFeeRate(0));
for (int i = 2; i < 10;i++) {
@@ -239,7 +239,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
/*submitted_in_package=*/false,
/*chainstate_is_current=*/true,
/*has_no_mempool_parents=*/true)};
- GetMainSignals().TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence());
+ m_node.validation_signals->TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence());
}
uint256 hash = tx.GetHash();
CTransactionRef ptx = mpool.get(hash);
@@ -257,7 +257,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates)
block.clear();
}
// Wait for fee estimator to catch up
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
BOOST_CHECK(feeEst.estimateFee(1) == CFeeRate(0));
for (int i = 2; i < 9; i++) { // At 9, the original estimate was already at the bottom (b/c scale = 2)
BOOST_CHECK(feeEst.estimateFee(i).GetFeePerK() < origFeeEst[i-1] - deltaFee);
diff --git a/src/test/scheduler_tests.cpp b/src/test/scheduler_tests.cpp
index 3fb3378598..95e725de46 100644
--- a/src/test/scheduler_tests.cpp
+++ b/src/test/scheduler_tests.cpp
@@ -129,8 +129,8 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered)
CScheduler scheduler;
// each queue should be well ordered with respect to itself but not other queues
- SingleThreadedSchedulerClient queue1(scheduler);
- SingleThreadedSchedulerClient queue2(scheduler);
+ SerialTaskRunner queue1(scheduler);
+ SerialTaskRunner queue2(scheduler);
// create more threads than queues
// if the queues only permit execution of one task at once then
@@ -142,7 +142,7 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered)
threads.emplace_back([&] { scheduler.serviceQueue(); });
}
- // these are not atomic, if SinglethreadedSchedulerClient prevents
+ // these are not atomic, if SerialTaskRunner prevents
// parallel execution at the queue level no synchronization should be required here
int counter1 = 0;
int counter2 = 0;
@@ -150,12 +150,12 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered)
// just simply count up on each queue - if execution is properly ordered then
// the callbacks should run in exactly the order in which they were enqueued
for (int i = 0; i < 100; ++i) {
- queue1.AddToProcessQueue([i, &counter1]() {
+ queue1.insert([i, &counter1]() {
bool expectation = i == counter1++;
assert(expectation);
});
- queue2.AddToProcessQueue([i, &counter2]() {
+ queue2.insert([i, &counter2]() {
bool expectation = i == counter2++;
assert(expectation);
});
diff --git a/src/test/txindex_tests.cpp b/src/test/txindex_tests.cpp
index e2432a4718..5a32b02ad9 100644
--- a/src/test/txindex_tests.cpp
+++ b/src/test/txindex_tests.cpp
@@ -71,7 +71,7 @@ BOOST_FIXTURE_TEST_CASE(txindex_initial_sync, TestChain100Setup)
// SyncWithValidationInterfaceQueue() call below is also needed to ensure
// TSAN always sees the test thread waiting for the notification thread, and
// avoid potential false positive reports.
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
// shutdown sequence (c.f. Shutdown() in init.cpp)
txindex.Stop();
diff --git a/src/test/util/mining.cpp b/src/test/util/mining.cpp
index 08d1b4c902..ad7a38d3fe 100644
--- a/src/test/util/mining.cpp
+++ b/src/test/util/mining.cpp
@@ -95,12 +95,12 @@ COutPoint MineBlock(const NodeContext& node, std::shared_ptr<CBlock>& block)
const auto old_height = WITH_LOCK(chainman.GetMutex(), return chainman.ActiveHeight());
bool new_block;
BlockValidationStateCatcher bvsc{block->GetHash()};
- RegisterValidationInterface(&bvsc);
+ node.validation_signals->RegisterValidationInterface(&bvsc);
const bool processed{chainman.ProcessNewBlock(block, true, true, &new_block)};
const bool duplicate{!new_block && processed};
assert(!duplicate);
- UnregisterValidationInterface(&bvsc);
- SyncWithValidationInterfaceQueue();
+ node.validation_signals->UnregisterValidationInterface(&bvsc);
+ node.validation_signals->SyncWithValidationInterfaceQueue();
const bool was_valid{bvsc.m_state && bvsc.m_state->IsValid()};
assert(old_height + was_valid == WITH_LOCK(chainman.GetMutex(), return chainman.ActiveHeight()));
diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp
index 1f8dbac5be..d26a440074 100644
--- a/src/test/util/setup_common.cpp
+++ b/src/test/util/setup_common.cpp
@@ -175,7 +175,7 @@ ChainTestingSetup::ChainTestingSetup(const ChainType chainType, const std::vecto
// from blocking due to queue overrun.
m_node.scheduler = std::make_unique<CScheduler>();
m_node.scheduler->m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { m_node.scheduler->serviceQueue(); });
- GetMainSignals().RegisterBackgroundSignalScheduler(*m_node.scheduler);
+ m_node.validation_signals = std::make_unique<ValidationSignals>(std::make_unique<SerialTaskRunner>(*m_node.scheduler));
m_node.fee_estimator = std::make_unique<CBlockPolicyEstimator>(FeeestPath(*m_node.args), DEFAULT_ACCEPT_STALE_FEE_ESTIMATES);
m_node.mempool = std::make_unique<CTxMemPool>(MemPoolOptionsForTest(m_node));
@@ -189,6 +189,7 @@ ChainTestingSetup::ChainTestingSetup(const ChainType chainType, const std::vecto
.datadir = m_args.GetDataDirNet(),
.check_block_index = true,
.notifications = *m_node.notifications,
+ .signals = m_node.validation_signals.get(),
.worker_threads_num = 2,
};
const BlockManager::Options blockman_opts{
@@ -206,8 +207,7 @@ ChainTestingSetup::ChainTestingSetup(const ChainType chainType, const std::vecto
ChainTestingSetup::~ChainTestingSetup()
{
if (m_node.scheduler) m_node.scheduler->stop();
- GetMainSignals().FlushBackgroundCallbacks();
- GetMainSignals().UnregisterBackgroundSignalScheduler();
+ m_node.validation_signals->FlushBackgroundCallbacks();
m_node.connman.reset();
m_node.banman.reset();
m_node.addrman.reset();
@@ -216,6 +216,7 @@ ChainTestingSetup::~ChainTestingSetup()
m_node.mempool.reset();
m_node.fee_estimator.reset();
m_node.chainman.reset();
+ m_node.validation_signals.reset();
m_node.scheduler.reset();
}
diff --git a/src/test/util/txmempool.cpp b/src/test/util/txmempool.cpp
index 3b4161ddd3..71cf8aca60 100644
--- a/src/test/util/txmempool.cpp
+++ b/src/test/util/txmempool.cpp
@@ -22,6 +22,7 @@ CTxMemPool::Options MemPoolOptionsForTest(const NodeContext& node)
// Default to always checking mempool regardless of
// chainparams.DefaultConsistencyChecks for tests
.check_ratio = 1,
+ .signals = node.validation_signals.get(),
};
const auto result{ApplyArgsManOptions(*node.args, ::Params(), mempool_opts)};
Assert(result);
diff --git a/src/test/validation_block_tests.cpp b/src/test/validation_block_tests.cpp
index 35e5c6a037..316ab86c2b 100644
--- a/src/test/validation_block_tests.cpp
+++ b/src/test/validation_block_tests.cpp
@@ -158,7 +158,7 @@ BOOST_AUTO_TEST_CASE(processnewblock_signals_ordering)
bool ignored;
// Connect the genesis block and drain any outstanding events
BOOST_CHECK(Assert(m_node.chainman)->ProcessNewBlock(std::make_shared<CBlock>(Params().GenesisBlock()), true, true, &ignored));
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
// subscribe to events (this subscriber will validate event ordering)
const CBlockIndex* initial_tip = nullptr;
@@ -167,7 +167,7 @@ BOOST_AUTO_TEST_CASE(processnewblock_signals_ordering)
initial_tip = m_node.chainman->ActiveChain().Tip();
}
auto sub = std::make_shared<TestSubscriber>(initial_tip->GetBlockHash());
- RegisterSharedValidationInterface(sub);
+ m_node.validation_signals->RegisterSharedValidationInterface(sub);
// create a bunch of threads that repeatedly process a block generated above at random
// this will create parallelism and randomness inside validation - the ValidationInterface
@@ -196,9 +196,9 @@ BOOST_AUTO_TEST_CASE(processnewblock_signals_ordering)
for (auto& t : threads) {
t.join();
}
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
- UnregisterSharedValidationInterface(sub);
+ m_node.validation_signals->UnregisterSharedValidationInterface(sub);
LOCK(cs_main);
BOOST_CHECK_EQUAL(sub->m_expected_tip, m_node.chainman->ActiveChain().Tip()->GetBlockHash());
diff --git a/src/test/validation_chainstatemanager_tests.cpp b/src/test/validation_chainstatemanager_tests.cpp
index a33e71d50e..4bbab1cdcd 100644
--- a/src/test/validation_chainstatemanager_tests.cpp
+++ b/src/test/validation_chainstatemanager_tests.cpp
@@ -102,7 +102,7 @@ BOOST_FIXTURE_TEST_CASE(chainstatemanager, TestChain100Setup)
BOOST_CHECK_EQUAL(active_tip2, c2.m_chain.Tip());
// Let scheduler events finish running to avoid accessing memory that is going to be unloaded
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
}
//! Test rebalancing the caches associated with each chainstate.
@@ -374,7 +374,7 @@ struct SnapshotTestSetup : TestChain100Setup {
cs->ForceFlushStateToDisk();
}
// Process all callbacks referring to the old manager before wiping it.
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
LOCK(::cs_main);
chainman.ResetChainstates();
BOOST_CHECK_EQUAL(chainman.GetAll().size(), 0);
@@ -383,6 +383,7 @@ struct SnapshotTestSetup : TestChain100Setup {
.chainparams = ::Params(),
.datadir = chainman.m_options.datadir,
.notifications = *m_node.notifications,
+ .signals = m_node.validation_signals.get(),
};
const BlockManager::Options blockman_opts{
.chainparams = chainman_opts.chainparams,
diff --git a/src/test/validationinterface_tests.cpp b/src/test/validationinterface_tests.cpp
index 5979441057..a46cfc3029 100644
--- a/src/test/validationinterface_tests.cpp
+++ b/src/test/validationinterface_tests.cpp
@@ -28,7 +28,7 @@ BOOST_AUTO_TEST_CASE(unregister_validation_interface_race)
const CBlock block_dummy;
BlockValidationState state_dummy;
while (generate) {
- GetMainSignals().BlockChecked(block_dummy, state_dummy);
+ m_node.validation_signals->BlockChecked(block_dummy, state_dummy);
}
}};
@@ -37,8 +37,8 @@ BOOST_AUTO_TEST_CASE(unregister_validation_interface_race)
// keep going for about 1 sec, which is 250k iterations
for (int i = 0; i < 250000; i++) {
auto sub = std::make_shared<TestSubscriberNoop>();
- RegisterSharedValidationInterface(sub);
- UnregisterSharedValidationInterface(sub);
+ m_node.validation_signals->RegisterSharedValidationInterface(sub);
+ m_node.validation_signals->UnregisterSharedValidationInterface(sub);
}
// tell the other thread we are done
generate = false;
@@ -52,8 +52,8 @@ BOOST_AUTO_TEST_CASE(unregister_validation_interface_race)
class TestInterface : public CValidationInterface
{
public:
- TestInterface(std::function<void()> on_call = nullptr, std::function<void()> on_destroy = nullptr)
- : m_on_call(std::move(on_call)), m_on_destroy(std::move(on_destroy))
+ TestInterface(ValidationSignals& signals, std::function<void()> on_call = nullptr, std::function<void()> on_destroy = nullptr)
+ : m_on_call(std::move(on_call)), m_on_destroy(std::move(on_destroy)), m_signals{signals}
{
}
virtual ~TestInterface()
@@ -64,14 +64,15 @@ public:
{
if (m_on_call) m_on_call();
}
- static void Call()
+ void Call()
{
CBlock block;
BlockValidationState state;
- GetMainSignals().BlockChecked(block, state);
+ m_signals.BlockChecked(block, state);
}
std::function<void()> m_on_call;
std::function<void()> m_on_destroy;
+ ValidationSignals& m_signals;
};
// Regression test to ensure UnregisterAllValidationInterfaces calls don't
@@ -80,17 +81,23 @@ public:
BOOST_AUTO_TEST_CASE(unregister_all_during_call)
{
bool destroyed = false;
- RegisterSharedValidationInterface(std::make_shared<TestInterface>(
+ auto shared{std::make_shared<TestInterface>(
+ *m_node.validation_signals,
[&] {
// First call should decrements reference count 2 -> 1
- UnregisterAllValidationInterfaces();
+ m_node.validation_signals->UnregisterAllValidationInterfaces();
BOOST_CHECK(!destroyed);
// Second call should not decrement reference count 1 -> 0
- UnregisterAllValidationInterfaces();
+ m_node.validation_signals->UnregisterAllValidationInterfaces();
BOOST_CHECK(!destroyed);
},
- [&] { destroyed = true; }));
- TestInterface::Call();
+ [&] { destroyed = true; })};
+ m_node.validation_signals->RegisterSharedValidationInterface(shared);
+ BOOST_CHECK(shared.use_count() == 2);
+ shared->Call();
+ BOOST_CHECK(shared.use_count() == 1);
+ BOOST_CHECK(!destroyed);
+ shared.reset();
BOOST_CHECK(destroyed);
}
diff --git a/src/txmempool.cpp b/src/txmempool.cpp
index de340f6b6d..0bee27c2b2 100644
--- a/src/txmempool.cpp
+++ b/src/txmempool.cpp
@@ -406,7 +406,8 @@ CTxMemPool::CTxMemPool(const Options& opts)
m_require_standard{opts.require_standard},
m_full_rbf{opts.full_rbf},
m_persist_v1_dat{opts.persist_v1_dat},
- m_limits{opts.limits}
+ m_limits{opts.limits},
+ m_signals{opts.signals}
{
}
@@ -487,12 +488,12 @@ void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason)
// even if not directly reported below.
uint64_t mempool_sequence = GetAndIncrementSequence();
- if (reason != MemPoolRemovalReason::BLOCK) {
+ if (reason != MemPoolRemovalReason::BLOCK && m_signals) {
// Notify clients that a transaction has been removed from the mempool
// for any reason except being included in a block. Clients interested
// in transactions included in blocks can subscribe to the BlockConnected
// notification.
- GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason, mempool_sequence);
+ m_signals->TransactionRemovedFromMempool(it->GetSharedTx(), reason, mempool_sequence);
}
TRACE5(mempool, removed,
it->GetTx().GetHash().data(),
@@ -643,7 +644,9 @@ void CTxMemPool::removeForBlock(const std::vector<CTransactionRef>& vtx, unsigne
removeConflicts(*tx);
ClearPrioritisation(tx->GetHash());
}
- GetMainSignals().MempoolTransactionsRemovedForBlock(txs_removed_for_block, nBlockHeight);
+ if (m_signals) {
+ m_signals->MempoolTransactionsRemovedForBlock(txs_removed_for_block, nBlockHeight);
+ }
lastRollingFeeUpdate = GetTime();
blockSinceLastRollingFeeBump = true;
}
diff --git a/src/txmempool.h b/src/txmempool.h
index b98355c65f..32f2c024f7 100644
--- a/src/txmempool.h
+++ b/src/txmempool.h
@@ -40,6 +40,7 @@
#include <vector>
class CChain;
+class ValidationSignals;
/** Fake height value used in Coin to signify they are only in the memory pool (since 0.8) */
static const uint32_t MEMPOOL_HEIGHT = 0x7FFFFFFF;
@@ -447,6 +448,8 @@ public:
const Limits m_limits;
+ ValidationSignals* const m_signals;
+
/** Create a new CTxMemPool.
* Sanity checks will be off by default for performance, because otherwise
* accepting transactions becomes O(N^2) where N is the number of transactions
diff --git a/src/util/task_runner.h b/src/util/task_runner.h
new file mode 100644
index 0000000000..d3cd8007de
--- /dev/null
+++ b/src/util/task_runner.h
@@ -0,0 +1,52 @@
+// Copyright (c) 2024-present The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#ifndef BITCOIN_UTIL_TASK_RUNNER_H
+#define BITCOIN_UTIL_TASK_RUNNER_H
+
+#include <cstddef>
+#include <functional>
+
+namespace util {
+
+/** @file
+ * This header provides an interface and simple implementation for a task
+ * runner. Another threaded, serial implementation using a queue is available in
+ * the scheduler module's SerialTaskRunner.
+ */
+
+class TaskRunnerInterface
+{
+public:
+ virtual ~TaskRunnerInterface() {}
+
+ /**
+ * The callback can either be queued for later/asynchronous/threaded
+ * processing, or be executed immediately for synchronous processing.
+ */
+
+ virtual void insert(std::function<void()> func) = 0;
+
+ /**
+ * Forces the processing of all pending events.
+ */
+ virtual void flush() = 0;
+
+ /**
+ * Returns the number of currently pending events.
+ */
+ virtual size_t size() = 0;
+};
+
+class ImmediateTaskRunner : public TaskRunnerInterface
+{
+public:
+ void insert(std::function<void()> func) override { func(); }
+ void flush() override {}
+ size_t size() override { return 0; }
+};
+
+} // namespace util
+
+#endif // BITCOIN_UTIL_TASK_RUNNER_H
diff --git a/src/validation.cpp b/src/validation.cpp
index f8e1de55e9..3a652adb1d 100644
--- a/src/validation.cpp
+++ b/src/validation.cpp
@@ -1228,13 +1228,14 @@ bool MemPoolAccept::SubmitPackage(const ATMPArgs& args, std::vector<Workspace>&
results.emplace(ws.m_ptx->GetWitnessHash(),
MempoolAcceptResult::Success(std::move(ws.m_replaced_transactions), ws.m_vsize,
ws.m_base_fees, effective_feerate, effective_feerate_wtxids));
+ if (!m_pool.m_signals) continue;
const CTransaction& tx = *ws.m_ptx;
const auto tx_info = NewMempoolTransactionInfo(ws.m_ptx, ws.m_base_fees,
ws.m_vsize, ws.m_entry->GetHeight(),
args.m_bypass_limits, args.m_package_submission,
IsCurrentForFeeEstimation(m_active_chainstate),
m_pool.HasNoInputsOf(tx));
- GetMainSignals().TransactionAddedToMempool(tx_info, m_pool.GetAndIncrementSequence());
+ m_pool.m_signals->TransactionAddedToMempool(tx_info, m_pool.GetAndIncrementSequence());
}
return all_submitted;
}
@@ -1242,7 +1243,7 @@ bool MemPoolAccept::SubmitPackage(const ATMPArgs& args, std::vector<Workspace>&
MempoolAcceptResult MemPoolAccept::AcceptSingleTransaction(const CTransactionRef& ptx, ATMPArgs& args)
{
AssertLockHeld(cs_main);
- LOCK(m_pool.cs); // mempool "read lock" (held through GetMainSignals().TransactionAddedToMempool())
+ LOCK(m_pool.cs); // mempool "read lock" (held through m_pool.m_signals->TransactionAddedToMempool())
Workspace ws(ptx);
const std::vector<Wtxid> single_wtxid{ws.m_ptx->GetWitnessHash()};
@@ -1277,13 +1278,15 @@ MempoolAcceptResult MemPoolAccept::AcceptSingleTransaction(const CTransactionRef
return MempoolAcceptResult::FeeFailure(ws.m_state, CFeeRate(ws.m_modified_fees, ws.m_vsize), {ws.m_ptx->GetWitnessHash()});
}
- const CTransaction& tx = *ws.m_ptx;
- const auto tx_info = NewMempoolTransactionInfo(ws.m_ptx, ws.m_base_fees,
- ws.m_vsize, ws.m_entry->GetHeight(),
- args.m_bypass_limits, args.m_package_submission,
- IsCurrentForFeeEstimation(m_active_chainstate),
- m_pool.HasNoInputsOf(tx));
- GetMainSignals().TransactionAddedToMempool(tx_info, m_pool.GetAndIncrementSequence());
+ if (m_pool.m_signals) {
+ const CTransaction& tx = *ws.m_ptx;
+ const auto tx_info = NewMempoolTransactionInfo(ws.m_ptx, ws.m_base_fees,
+ ws.m_vsize, ws.m_entry->GetHeight(),
+ args.m_bypass_limits, args.m_package_submission,
+ IsCurrentForFeeEstimation(m_active_chainstate),
+ m_pool.HasNoInputsOf(tx));
+ m_pool.m_signals->TransactionAddedToMempool(tx_info, m_pool.GetAndIncrementSequence());
+ }
return MempoolAcceptResult::Success(std::move(ws.m_replaced_transactions), ws.m_vsize, ws.m_base_fees,
effective_feerate, single_wtxid);
@@ -2695,9 +2698,9 @@ bool Chainstate::FlushStateToDisk(
(bool)fFlushForPrune);
}
}
- if (full_flush_completed) {
+ if (full_flush_completed && m_chainman.m_options.signals) {
// Update best block in wallet (so we can detect restored wallets).
- GetMainSignals().ChainStateFlushed(this->GetRole(), m_chain.GetLocator());
+ m_chainman.m_options.signals->ChainStateFlushed(this->GetRole(), m_chain.GetLocator());
}
} catch (const std::runtime_error& e) {
return FatalError(m_chainman.GetNotifications(), state, std::string("System error while flushing: ") + e.what());
@@ -2864,7 +2867,9 @@ bool Chainstate::DisconnectTip(BlockValidationState& state, DisconnectedBlockTra
UpdateTip(pindexDelete->pprev);
// Let wallets know transactions went from 1-confirmed to
// 0-confirmed or conflicted:
- GetMainSignals().BlockDisconnected(pblock, pindexDelete);
+ if (m_chainman.m_options.signals) {
+ m_chainman.m_options.signals->BlockDisconnected(pblock, pindexDelete);
+ }
return true;
}
@@ -2949,7 +2954,9 @@ bool Chainstate::ConnectTip(BlockValidationState& state, CBlockIndex* pindexNew,
{
CCoinsViewCache view(&CoinsTip());
bool rv = ConnectBlock(blockConnecting, state, pindexNew, view);
- GetMainSignals().BlockChecked(blockConnecting, state);
+ if (m_chainman.m_options.signals) {
+ m_chainman.m_options.signals->BlockChecked(blockConnecting, state);
+ }
if (!rv) {
if (state.IsInvalid())
InvalidBlockFound(pindexNew, state);
@@ -3210,11 +3217,11 @@ static bool NotifyHeaderTip(ChainstateManager& chainman) LOCKS_EXCLUDED(cs_main)
return fNotify;
}
-static void LimitValidationInterfaceQueue() LOCKS_EXCLUDED(cs_main) {
+static void LimitValidationInterfaceQueue(ValidationSignals& signals) LOCKS_EXCLUDED(cs_main) {
AssertLockNotHeld(cs_main);
- if (GetMainSignals().CallbacksPending() > 10) {
- SyncWithValidationInterfaceQueue();
+ if (signals.CallbacksPending() > 10) {
+ signals.SyncWithValidationInterfaceQueue();
}
}
@@ -3252,7 +3259,7 @@ bool Chainstate::ActivateBestChain(BlockValidationState& state, std::shared_ptr<
// Note that if a validationinterface callback ends up calling
// ActivateBestChain this may lead to a deadlock! We should
// probably have a DEBUG_LOCKORDER test for this in the future.
- LimitValidationInterfaceQueue();
+ if (m_chainman.m_options.signals) LimitValidationInterfaceQueue(*m_chainman.m_options.signals);
{
LOCK(cs_main);
@@ -3291,7 +3298,9 @@ bool Chainstate::ActivateBestChain(BlockValidationState& state, std::shared_ptr<
for (const PerBlockConnectTrace& trace : connectTrace.GetBlocksConnected()) {
assert(trace.pblock && trace.pindex);
- GetMainSignals().BlockConnected(this->GetRole(), trace.pblock, trace.pindex);
+ if (m_chainman.m_options.signals) {
+ m_chainman.m_options.signals->BlockConnected(this->GetRole(), trace.pblock, trace.pindex);
+ }
}
// This will have been toggled in
@@ -3317,7 +3326,9 @@ bool Chainstate::ActivateBestChain(BlockValidationState& state, std::shared_ptr<
// Enqueue while holding cs_main to ensure that UpdatedBlockTip is called in the order in which blocks are connected
if (this == &m_chainman.ActiveChainstate() && pindexFork != pindexNewTip) {
// Notify ValidationInterface subscribers
- GetMainSignals().UpdatedBlockTip(pindexNewTip, pindexFork, still_in_ibd);
+ if (m_chainman.m_options.signals) {
+ m_chainman.m_options.signals->UpdatedBlockTip(pindexNewTip, pindexFork, still_in_ibd);
+ }
// Always notify the UI if a new block tip was connected
if (kernel::IsInterrupted(m_chainman.GetNotifications().blockTip(GetSynchronizationState(still_in_ibd), *pindexNewTip))) {
@@ -3451,7 +3462,7 @@ bool Chainstate::InvalidateBlock(BlockValidationState& state, CBlockIndex* pinde
if (m_chainman.m_interrupt) break;
// Make sure the queue of validation callbacks doesn't grow unboundedly.
- LimitValidationInterfaceQueue();
+ if (m_chainman.m_options.signals) LimitValidationInterfaceQueue(*m_chainman.m_options.signals);
LOCK(cs_main);
// Lock for as long as disconnectpool is in scope to make sure MaybeUpdateMempoolForReorg is
@@ -4237,8 +4248,9 @@ bool ChainstateManager::AcceptBlock(const std::shared_ptr<const CBlock>& pblock,
// Header is valid/has work, merkle tree and segwit merkle tree are good...RELAY NOW
// (but if it does not build on our best tip, let the SendMessages loop relay it)
- if (!IsInitialBlockDownload() && ActiveTip() == pindex->pprev)
- GetMainSignals().NewPoWValidBlock(pindex, pblock);
+ if (!IsInitialBlockDownload() && ActiveTip() == pindex->pprev && m_options.signals) {
+ m_options.signals->NewPoWValidBlock(pindex, pblock);
+ }
// Write block to history file
if (fNewBlock) *fNewBlock = true;
@@ -4291,7 +4303,9 @@ bool ChainstateManager::ProcessNewBlock(const std::shared_ptr<const CBlock>& blo
ret = AcceptBlock(block, state, &pindex, force_processing, nullptr, new_block, min_pow_checked);
}
if (!ret) {
- GetMainSignals().BlockChecked(*block, state);
+ if (m_options.signals) {
+ m_options.signals->BlockChecked(*block, state);
+ }
return error("%s: AcceptBlock FAILED (%s)", __func__, state.ToString());
}
}
diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp
index 5e944a7c47..813fde109c 100644
--- a/src/validationinterface.cpp
+++ b/src/validationinterface.cpp
@@ -5,7 +5,6 @@
#include <validationinterface.h>
-#include <attributes.h>
#include <chain.h>
#include <consensus/validation.h>
#include <kernel/chain.h>
@@ -13,7 +12,8 @@
#include <logging.h>
#include <primitives/block.h>
#include <primitives/transaction.h>
-#include <scheduler.h>
+#include <util/check.h>
+#include <util/task_runner.h>
#include <future>
#include <unordered_map>
@@ -22,14 +22,14 @@
std::string RemovalReasonToString(const MemPoolRemovalReason& r) noexcept;
/**
- * MainSignalsImpl manages a list of shared_ptr<CValidationInterface> callbacks.
+ * ValidationSignalsImpl manages a list of shared_ptr<CValidationInterface> callbacks.
*
* A std::unordered_map is used to track what callbacks are currently
* registered, and a std::list is used to store the callbacks that are
* currently registered as well as any callbacks that are just unregistered
* and about to be deleted when they are done executing.
*/
-class MainSignalsImpl
+class ValidationSignalsImpl
{
private:
Mutex m_mutex;
@@ -42,12 +42,10 @@ private:
std::unordered_map<CValidationInterface*, std::list<ListEntry>::iterator> m_map GUARDED_BY(m_mutex);
public:
- // We are not allowed to assume the scheduler only runs in one thread,
- // but must ensure all callbacks happen in-order, so we end up creating
- // our own queue here :(
- SingleThreadedSchedulerClient m_schedulerClient;
+ std::unique_ptr<util::TaskRunnerInterface> m_task_runner;
- explicit MainSignalsImpl(CScheduler& scheduler LIFETIMEBOUND) : m_schedulerClient(scheduler) {}
+ explicit ValidationSignalsImpl(std::unique_ptr<util::TaskRunnerInterface> task_runner)
+ : m_task_runner{std::move(Assert(task_runner))} {}
void Register(std::shared_ptr<CValidationInterface> callbacks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
@@ -94,77 +92,56 @@ public:
}
};
-static CMainSignals g_signals;
+ValidationSignals::ValidationSignals(std::unique_ptr<util::TaskRunnerInterface> task_runner)
+ : m_internals{std::make_unique<ValidationSignalsImpl>(std::move(task_runner))} {}
-void CMainSignals::RegisterBackgroundSignalScheduler(CScheduler& scheduler)
-{
- assert(!m_internals);
- m_internals = std::make_unique<MainSignalsImpl>(scheduler);
-}
-
-void CMainSignals::UnregisterBackgroundSignalScheduler()
-{
- m_internals.reset(nullptr);
-}
+ValidationSignals::~ValidationSignals() {}
-void CMainSignals::FlushBackgroundCallbacks()
+void ValidationSignals::FlushBackgroundCallbacks()
{
- if (m_internals) {
- m_internals->m_schedulerClient.EmptyQueue();
- }
+ m_internals->m_task_runner->flush();
}
-size_t CMainSignals::CallbacksPending()
+size_t ValidationSignals::CallbacksPending()
{
- if (!m_internals) return 0;
- return m_internals->m_schedulerClient.CallbacksPending();
+ return m_internals->m_task_runner->size();
}
-CMainSignals& GetMainSignals()
-{
- return g_signals;
-}
-
-void RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks)
+void ValidationSignals::RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks)
{
// Each connection captures the shared_ptr to ensure that each callback is
// executed before the subscriber is destroyed. For more details see #18338.
- g_signals.m_internals->Register(std::move(callbacks));
+ m_internals->Register(std::move(callbacks));
}
-void RegisterValidationInterface(CValidationInterface* callbacks)
+void ValidationSignals::RegisterValidationInterface(CValidationInterface* callbacks)
{
// Create a shared_ptr with a no-op deleter - CValidationInterface lifecycle
// is managed by the caller.
RegisterSharedValidationInterface({callbacks, [](CValidationInterface*){}});
}
-void UnregisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks)
+void ValidationSignals::UnregisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks)
{
UnregisterValidationInterface(callbacks.get());
}
-void UnregisterValidationInterface(CValidationInterface* callbacks)
+void ValidationSignals::UnregisterValidationInterface(CValidationInterface* callbacks)
{
- if (g_signals.m_internals) {
- g_signals.m_internals->Unregister(callbacks);
- }
+ m_internals->Unregister(callbacks);
}
-void UnregisterAllValidationInterfaces()
+void ValidationSignals::UnregisterAllValidationInterfaces()
{
- if (!g_signals.m_internals) {
- return;
- }
- g_signals.m_internals->Clear();
+ m_internals->Clear();
}
-void CallFunctionInValidationInterfaceQueue(std::function<void()> func)
+void ValidationSignals::CallFunctionInValidationInterfaceQueue(std::function<void()> func)
{
- g_signals.m_internals->m_schedulerClient.AddToProcessQueue(std::move(func));
+ m_internals->m_task_runner->insert(std::move(func));
}
-void SyncWithValidationInterfaceQueue()
+void ValidationSignals::SyncWithValidationInterfaceQueue()
{
AssertLockNotHeld(cs_main);
// Block until the validation queue drains
@@ -183,7 +160,7 @@ void SyncWithValidationInterfaceQueue()
do { \
auto local_name = (name); \
LOG_EVENT("Enqueuing " fmt, local_name, __VA_ARGS__); \
- m_internals->m_schedulerClient.AddToProcessQueue([=] { \
+ m_internals->m_task_runner->insert([=] { \
LOG_EVENT(fmt, local_name, __VA_ARGS__); \
event(); \
}); \
@@ -192,7 +169,7 @@ void SyncWithValidationInterfaceQueue()
#define LOG_EVENT(fmt, ...) \
LogPrint(BCLog::VALIDATION, fmt "\n", __VA_ARGS__)
-void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {
+void ValidationSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {
// Dependencies exist that require UpdatedBlockTip events to be delivered in the order in which
// the chain actually updates. One way to ensure this is for the caller to invoke this signal
// in the same critical section where the chain is updated
@@ -206,7 +183,7 @@ void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockInd
fInitialDownload);
}
-void CMainSignals::TransactionAddedToMempool(const NewMempoolTransactionInfo& tx, uint64_t mempool_sequence)
+void ValidationSignals::TransactionAddedToMempool(const NewMempoolTransactionInfo& tx, uint64_t mempool_sequence)
{
auto event = [tx, mempool_sequence, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx, mempool_sequence); });
@@ -216,7 +193,7 @@ void CMainSignals::TransactionAddedToMempool(const NewMempoolTransactionInfo& tx
tx.info.m_tx->GetWitnessHash().ToString());
}
-void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {
+void ValidationSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {
auto event = [tx, reason, mempool_sequence, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason, mempool_sequence); });
};
@@ -226,7 +203,7 @@ void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemP
RemovalReasonToString(reason));
}
-void CMainSignals::BlockConnected(ChainstateRole role, const std::shared_ptr<const CBlock> &pblock, const CBlockIndex *pindex) {
+void ValidationSignals::BlockConnected(ChainstateRole role, const std::shared_ptr<const CBlock> &pblock, const CBlockIndex *pindex) {
auto event = [role, pblock, pindex, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.BlockConnected(role, pblock, pindex); });
};
@@ -235,7 +212,7 @@ void CMainSignals::BlockConnected(ChainstateRole role, const std::shared_ptr<con
pindex->nHeight);
}
-void CMainSignals::MempoolTransactionsRemovedForBlock(const std::vector<RemovedMempoolTransactionInfo>& txs_removed_for_block, unsigned int nBlockHeight)
+void ValidationSignals::MempoolTransactionsRemovedForBlock(const std::vector<RemovedMempoolTransactionInfo>& txs_removed_for_block, unsigned int nBlockHeight)
{
auto event = [txs_removed_for_block, nBlockHeight, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.MempoolTransactionsRemovedForBlock(txs_removed_for_block, nBlockHeight); });
@@ -245,7 +222,7 @@ void CMainSignals::MempoolTransactionsRemovedForBlock(const std::vector<RemovedM
txs_removed_for_block.size());
}
-void CMainSignals::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex)
+void ValidationSignals::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex)
{
auto event = [pblock, pindex, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.BlockDisconnected(pblock, pindex); });
@@ -255,7 +232,7 @@ void CMainSignals::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock
pindex->nHeight);
}
-void CMainSignals::ChainStateFlushed(ChainstateRole role, const CBlockLocator &locator) {
+void ValidationSignals::ChainStateFlushed(ChainstateRole role, const CBlockLocator &locator) {
auto event = [role, locator, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.ChainStateFlushed(role, locator); });
};
@@ -263,13 +240,13 @@ void CMainSignals::ChainStateFlushed(ChainstateRole role, const CBlockLocator &l
locator.IsNull() ? "null" : locator.vHave.front().ToString());
}
-void CMainSignals::BlockChecked(const CBlock& block, const BlockValidationState& state) {
+void ValidationSignals::BlockChecked(const CBlock& block, const BlockValidationState& state) {
LOG_EVENT("%s: block hash=%s state=%s", __func__,
block.GetHash().ToString(), state.ToString());
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.BlockChecked(block, state); });
}
-void CMainSignals::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock> &block) {
+void ValidationSignals::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock> &block) {
LOG_EVENT("%s: block hash=%s", __func__, block->GetHash().ToString());
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.NewPoWValidBlock(pindex, block); });
}
diff --git a/src/validationinterface.h b/src/validationinterface.h
index d9292ae2c9..6f49a73c93 100644
--- a/src/validationinterface.h
+++ b/src/validationinterface.h
@@ -6,61 +6,29 @@
#ifndef BITCOIN_VALIDATIONINTERFACE_H
#define BITCOIN_VALIDATIONINTERFACE_H
-#include <kernel/cs_main.h>
#include <kernel/chain.h>
+#include <kernel/cs_main.h>
#include <primitives/transaction.h> // CTransaction(Ref)
#include <sync.h>
+#include <cstddef>
+#include <cstdint>
#include <functional>
#include <memory>
+#include <vector>
+
+namespace util {
+class TaskRunnerInterface;
+} // namespace util
class BlockValidationState;
class CBlock;
class CBlockIndex;
struct CBlockLocator;
-class CValidationInterface;
-class CScheduler;
enum class MemPoolRemovalReason;
struct RemovedMempoolTransactionInfo;
struct NewMempoolTransactionInfo;
-/** Register subscriber */
-void RegisterValidationInterface(CValidationInterface* callbacks);
-/** Unregister subscriber. DEPRECATED. This is not safe to use when the RPC server or main message handler thread is running. */
-void UnregisterValidationInterface(CValidationInterface* callbacks);
-/** Unregister all subscribers */
-void UnregisterAllValidationInterfaces();
-
-// Alternate registration functions that release a shared_ptr after the last
-// notification is sent. These are useful for race-free cleanup, since
-// unregistration is nonblocking and can return before the last notification is
-// processed.
-/** Register subscriber */
-void RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks);
-/** Unregister subscriber */
-void UnregisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks);
-
-/**
- * Pushes a function to callback onto the notification queue, guaranteeing any
- * callbacks generated prior to now are finished when the function is called.
- *
- * Be very careful blocking on func to be called if any locks are held -
- * validation interface clients may not be able to make progress as they often
- * wait for things like cs_main, so blocking until func is called with cs_main
- * will result in a deadlock (that DEBUG_LOCKORDER will miss).
- */
-void CallFunctionInValidationInterfaceQueue(std::function<void ()> func);
-/**
- * This is a synonym for the following, which asserts certain locks are not
- * held:
- * std::promise<void> promise;
- * CallFunctionInValidationInterfaceQueue([&promise] {
- * promise.set_value();
- * });
- * promise.get_future().wait();
- */
-void SyncWithValidationInterfaceQueue() LOCKS_EXCLUDED(cs_main);
-
/**
* Implement this to subscribe to events generated in validation and mempool
*
@@ -185,30 +153,65 @@ protected:
* has been received and connected to the headers tree, though not validated yet.
*/
virtual void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock>& block) {};
- friend class CMainSignals;
+ friend class ValidationSignals;
friend class ValidationInterfaceTest;
};
-class MainSignalsImpl;
-class CMainSignals {
+class ValidationSignalsImpl;
+class ValidationSignals {
private:
- std::unique_ptr<MainSignalsImpl> m_internals;
-
- friend void ::RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface>);
- friend void ::UnregisterValidationInterface(CValidationInterface*);
- friend void ::UnregisterAllValidationInterfaces();
- friend void ::CallFunctionInValidationInterfaceQueue(std::function<void ()> func);
+ std::unique_ptr<ValidationSignalsImpl> m_internals;
public:
- /** Register a CScheduler to give callbacks which should run in the background (may only be called once) */
- void RegisterBackgroundSignalScheduler(CScheduler& scheduler);
- /** Unregister a CScheduler to give callbacks which should run in the background - these callbacks will now be dropped! */
- void UnregisterBackgroundSignalScheduler();
+ // The task runner will block validation if it calls its insert method's
+ // func argument synchronously. In this class func contains a loop that
+ // dispatches a single validation event to all subscribers sequentially.
+ explicit ValidationSignals(std::unique_ptr<util::TaskRunnerInterface> task_runner);
+
+ ~ValidationSignals();
+
/** Call any remaining callbacks on the calling thread */
void FlushBackgroundCallbacks();
size_t CallbacksPending();
+ /** Register subscriber */
+ void RegisterValidationInterface(CValidationInterface* callbacks);
+ /** Unregister subscriber. DEPRECATED. This is not safe to use when the RPC server or main message handler thread is running. */
+ void UnregisterValidationInterface(CValidationInterface* callbacks);
+ /** Unregister all subscribers */
+ void UnregisterAllValidationInterfaces();
+
+ // Alternate registration functions that release a shared_ptr after the last
+ // notification is sent. These are useful for race-free cleanup, since
+ // unregistration is nonblocking and can return before the last notification is
+ // processed.
+ /** Register subscriber */
+ void RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks);
+ /** Unregister subscriber */
+ void UnregisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks);
+
+ /**
+ * Pushes a function to callback onto the notification queue, guaranteeing any
+ * callbacks generated prior to now are finished when the function is called.
+ *
+ * Be very careful blocking on func to be called if any locks are held -
+ * validation interface clients may not be able to make progress as they often
+ * wait for things like cs_main, so blocking until func is called with cs_main
+ * will result in a deadlock (that DEBUG_LOCKORDER will miss).
+ */
+ void CallFunctionInValidationInterfaceQueue(std::function<void ()> func);
+
+ /**
+ * This is a synonym for the following, which asserts certain locks are not
+ * held:
+ * std::promise<void> promise;
+ * CallFunctionInValidationInterfaceQueue([&promise] {
+ * promise.set_value();
+ * });
+ * promise.get_future().wait();
+ */
+ void SyncWithValidationInterfaceQueue() LOCKS_EXCLUDED(cs_main);
void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload);
void TransactionAddedToMempool(const NewMempoolTransactionInfo&, uint64_t mempool_sequence);
@@ -221,6 +224,4 @@ public:
void NewPoWValidBlock(const CBlockIndex *, const std::shared_ptr<const CBlock>&);
};
-CMainSignals& GetMainSignals();
-
#endif // BITCOIN_VALIDATIONINTERFACE_H
diff --git a/src/wallet/test/util.cpp b/src/wallet/test/util.cpp
index cbf3ccd1ec..49d206f409 100644
--- a/src/wallet/test/util.cpp
+++ b/src/wallet/test/util.cpp
@@ -71,7 +71,8 @@ std::shared_ptr<CWallet> TestLoadWallet(WalletContext& context)
void TestUnloadWallet(std::shared_ptr<CWallet>&& wallet)
{
- SyncWithValidationInterfaceQueue();
+ // Calls SyncWithValidationInterfaceQueue
+ wallet->chain().waitForNotificationsIfTipChanged({});
wallet->m_chain_notifications_handler.reset();
UnloadWallet(std::move(wallet));
}
diff --git a/src/wallet/test/wallet_tests.cpp b/src/wallet/test/wallet_tests.cpp
index 27a81b3669..3a67b9a433 100644
--- a/src/wallet/test/wallet_tests.cpp
+++ b/src/wallet/test/wallet_tests.cpp
@@ -814,7 +814,7 @@ BOOST_FIXTURE_TEST_CASE(CreateWallet, TestChain100Setup)
// transactionAddedToMempool notifications, and create block and mempool
// transactions paying to the wallet
std::promise<void> promise;
- CallFunctionInValidationInterfaceQueue([&promise] {
+ m_node.validation_signals->CallFunctionInValidationInterfaceQueue([&promise] {
promise.get_future().wait();
});
std::string error;
@@ -842,7 +842,7 @@ BOOST_FIXTURE_TEST_CASE(CreateWallet, TestChain100Setup)
// Unblock notification queue and make sure stale blockConnected and
// transactionAddedToMempool events are processed
promise.set_value();
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
// AddToWallet events for block_tx and mempool_tx events are counted a
// second time as the notification queue is processed
BOOST_CHECK_EQUAL(addtx_count, 5);
@@ -865,7 +865,7 @@ BOOST_FIXTURE_TEST_CASE(CreateWallet, TestChain100Setup)
m_coinbase_txns.push_back(CreateAndProcessBlock({block_tx}, GetScriptForRawPubKey(coinbaseKey.GetPubKey())).vtx[0]);
mempool_tx = TestSimpleSpend(*m_coinbase_txns[3], 0, coinbaseKey, GetScriptForRawPubKey(key.GetPubKey()));
BOOST_CHECK(m_node.chain->broadcastTransaction(MakeTransactionRef(mempool_tx), DEFAULT_TRANSACTION_MAXFEE, false, error));
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
});
wallet = TestLoadWallet(context);
// Since mempool transactions are requested at the end of loading, there will
@@ -905,7 +905,7 @@ BOOST_FIXTURE_TEST_CASE(RemoveTxs, TestChain100Setup)
auto block_tx = TestSimpleSpend(*m_coinbase_txns[0], 0, coinbaseKey, GetScriptForRawPubKey(key.GetPubKey()));
CreateAndProcessBlock({block_tx}, GetScriptForRawPubKey(coinbaseKey.GetPubKey()));
- SyncWithValidationInterfaceQueue();
+ m_node.validation_signals->SyncWithValidationInterfaceQueue();
{
auto block_hash = block_tx.GetHash();