aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHennadii Stepanov <32963518+hebasto@users.noreply.github.com>2020-08-21 09:24:05 +0300
committerHennadii Stepanov <32963518+hebasto@users.noreply.github.com>2020-09-24 06:55:33 +0300
commit01511776acb0c7ec216dc9c8112531067763f1cb (patch)
tree4affd73ec2317bc425f8731600b98bb57dd232e8
parent0ef938685b5c079a6f5a98daf0e3865d718d817b (diff)
Add local thread pool to CCheckQueue
-rw-r--r--src/checkqueue.h52
-rw-r--r--src/init.cpp5
-rw-r--r--src/test/util/setup_common.cpp5
-rw-r--r--src/validation.cpp11
-rw-r--r--src/validation.h6
5 files changed, 64 insertions, 15 deletions
diff --git a/src/checkqueue.h b/src/checkqueue.h
index 3e22cd8c60..fcd6e87af3 100644
--- a/src/checkqueue.h
+++ b/src/checkqueue.h
@@ -6,6 +6,8 @@
#define BITCOIN_CHECKQUEUE_H
#include <sync.h>
+#include <tinyformat.h>
+#include <util/threadnames.h>
#include <algorithm>
#include <vector>
@@ -62,8 +64,11 @@ private:
//! The maximum number of elements to be processed in one batch
const unsigned int nBatchSize;
+ std::vector<std::thread> m_worker_threads;
+ bool m_request_stop{false};
+
/** Internal function that does bulk of the verification work. */
- bool Loop(bool fMaster = false)
+ bool Loop(bool fMaster)
{
boost::condition_variable& cond = fMaster ? condMaster : condWorker;
std::vector<T> vChecks;
@@ -85,7 +90,7 @@ private:
nTotal++;
}
// logically, the do loop starts here
- while (queue.empty()) {
+ while (queue.empty() && !m_request_stop) {
if (fMaster && nTodo == 0) {
nTotal--;
bool fRet = fAllOk;
@@ -98,6 +103,10 @@ private:
cond.wait(lock); // wait
nIdle--;
}
+ if (m_request_stop) {
+ return false;
+ }
+
// Decide how many work units to process now.
// * Do not try to do everything at once, but aim for increasingly smaller batches so
// all workers finish approximately simultaneously.
@@ -132,16 +141,34 @@ public:
{
}
+ //! Create a pool of new worker threads.
+ void StartWorkerThreads(const int threads_num)
+ {
+ {
+ boost::unique_lock<boost::mutex> lock(mutex);
+ nIdle = 0;
+ nTotal = 0;
+ fAllOk = true;
+ }
+ assert(m_worker_threads.empty());
+ for (int n = 0; n < threads_num; ++n) {
+ m_worker_threads.emplace_back([this, n]() {
+ util::ThreadRename(strprintf("scriptch.%i", n));
+ Loop(false /* worker thread */);
+ });
+ }
+ }
+
//! Worker thread
void Thread()
{
- Loop();
+ Loop(false /* worker thread */);
}
//! Wait until execution finishes, and return whether all evaluations were successful.
bool Wait()
{
- return Loop(true);
+ return Loop(true /* master thread */);
}
//! Add a batch of checks to the queue
@@ -159,8 +186,25 @@ public:
condWorker.notify_all();
}
+ //! Stop all of the worker threads.
+ void StopWorkerThreads()
+ {
+ {
+ boost::unique_lock<boost::mutex> lock(mutex);
+ m_request_stop = true;
+ }
+ condWorker.notify_all();
+ for (std::thread& t : m_worker_threads) {
+ t.join();
+ }
+ m_worker_threads.clear();
+ boost::unique_lock<boost::mutex> lock(mutex);
+ m_request_stop = false;
+ }
+
~CCheckQueue()
{
+ assert(m_worker_threads.empty());
}
};
diff --git a/src/init.cpp b/src/init.cpp
index 5ad807cbac..bc99994a4d 100644
--- a/src/init.cpp
+++ b/src/init.cpp
@@ -224,6 +224,7 @@ void Shutdown(NodeContext& node)
if (g_load_block.joinable()) g_load_block.join();
threadGroup.interrupt_all();
threadGroup.join_all();
+ StopScriptCheckWorkerThreads();
// After the threads that potentially access these pointers have been stopped,
// destruct and reset all to nullptr.
@@ -1307,9 +1308,7 @@ bool AppInitMain(const util::Ref& context, NodeContext& node, interfaces::BlockA
LogPrintf("Script verification uses %d additional threads\n", script_threads);
if (script_threads >= 1) {
g_parallel_script_checks = true;
- for (int i = 0; i < script_threads; ++i) {
- threadGroup.create_thread([i]() { return ThreadScriptCheck(i); });
- }
+ StartScriptCheckWorkerThreads(script_threads);
}
assert(!node.scheduler);
diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp
index 2d3137e1e2..74498c6c79 100644
--- a/src/test/util/setup_common.cpp
+++ b/src/test/util/setup_common.cpp
@@ -162,9 +162,7 @@ TestingSetup::TestingSetup(const std::string& chainName, const std::vector<const
// Start script-checking threads. Set g_parallel_script_checks to true so they are used.
constexpr int script_check_threads = 2;
- for (int i = 0; i < script_check_threads; ++i) {
- threadGroup.create_thread([i]() { return ThreadScriptCheck(i); });
- }
+ StartScriptCheckWorkerThreads(script_check_threads);
g_parallel_script_checks = true;
m_node.banman = MakeUnique<BanMan>(GetDataDir() / "banlist.dat", nullptr, DEFAULT_MISBEHAVING_BANTIME);
@@ -182,6 +180,7 @@ TestingSetup::~TestingSetup()
if (m_node.scheduler) m_node.scheduler->stop();
threadGroup.interrupt_all();
threadGroup.join_all();
+ StopScriptCheckWorkerThreads();
GetMainSignals().FlushBackgroundCallbacks();
GetMainSignals().UnregisterBackgroundSignalScheduler();
m_node.connman.reset();
diff --git a/src/validation.cpp b/src/validation.cpp
index d4463bf17b..046583fa5e 100644
--- a/src/validation.cpp
+++ b/src/validation.cpp
@@ -1817,9 +1817,14 @@ static bool WriteUndoDataForBlock(const CBlockUndo& blockundo, BlockValidationSt
static CCheckQueue<CScriptCheck> scriptcheckqueue(128);
-void ThreadScriptCheck(int worker_num) {
- util::ThreadRename(strprintf("scriptch.%i", worker_num));
- scriptcheckqueue.Thread();
+void StartScriptCheckWorkerThreads(int threads_num)
+{
+ scriptcheckqueue.StartWorkerThreads(threads_num);
+}
+
+void StopScriptCheckWorkerThreads()
+{
+ scriptcheckqueue.StopWorkerThreads();
}
VersionBitsCache versionbitscache GUARDED_BY(cs_main);
diff --git a/src/validation.h b/src/validation.h
index d88bd07765..df3b16dc15 100644
--- a/src/validation.h
+++ b/src/validation.h
@@ -158,8 +158,10 @@ void LoadExternalBlockFile(const CChainParams& chainparams, FILE* fileIn, FlatFi
bool LoadGenesisBlock(const CChainParams& chainparams);
/** Unload database information */
void UnloadBlockIndex(CTxMemPool* mempool, ChainstateManager& chainman);
-/** Run an instance of the script checking thread */
-void ThreadScriptCheck(int worker_num);
+/** Run instances of script checking worker threads */
+void StartScriptCheckWorkerThreads(int threads_num);
+/** Stop all of the script checking worker threads */
+void StopScriptCheckWorkerThreads();
/**
* Return transaction from the block at block_index.
* If block_index is not provided, fall back to mempool.