diff options
author | Hennadii Stepanov <32963518+hebasto@users.noreply.github.com> | 2020-08-21 09:24:05 +0300 |
---|---|---|
committer | Hennadii Stepanov <32963518+hebasto@users.noreply.github.com> | 2020-09-24 06:55:33 +0300 |
commit | 01511776acb0c7ec216dc9c8112531067763f1cb (patch) | |
tree | 4affd73ec2317bc425f8731600b98bb57dd232e8 | |
parent | 0ef938685b5c079a6f5a98daf0e3865d718d817b (diff) |
Add local thread pool to CCheckQueue
-rw-r--r-- | src/checkqueue.h | 52 | ||||
-rw-r--r-- | src/init.cpp | 5 | ||||
-rw-r--r-- | src/test/util/setup_common.cpp | 5 | ||||
-rw-r--r-- | src/validation.cpp | 11 | ||||
-rw-r--r-- | src/validation.h | 6 |
5 files changed, 64 insertions, 15 deletions
diff --git a/src/checkqueue.h b/src/checkqueue.h index 3e22cd8c60..fcd6e87af3 100644 --- a/src/checkqueue.h +++ b/src/checkqueue.h @@ -6,6 +6,8 @@ #define BITCOIN_CHECKQUEUE_H #include <sync.h> +#include <tinyformat.h> +#include <util/threadnames.h> #include <algorithm> #include <vector> @@ -62,8 +64,11 @@ private: //! The maximum number of elements to be processed in one batch const unsigned int nBatchSize; + std::vector<std::thread> m_worker_threads; + bool m_request_stop{false}; + /** Internal function that does bulk of the verification work. */ - bool Loop(bool fMaster = false) + bool Loop(bool fMaster) { boost::condition_variable& cond = fMaster ? condMaster : condWorker; std::vector<T> vChecks; @@ -85,7 +90,7 @@ private: nTotal++; } // logically, the do loop starts here - while (queue.empty()) { + while (queue.empty() && !m_request_stop) { if (fMaster && nTodo == 0) { nTotal--; bool fRet = fAllOk; @@ -98,6 +103,10 @@ private: cond.wait(lock); // wait nIdle--; } + if (m_request_stop) { + return false; + } + // Decide how many work units to process now. // * Do not try to do everything at once, but aim for increasingly smaller batches so // all workers finish approximately simultaneously. @@ -132,16 +141,34 @@ public: { } + //! Create a pool of new worker threads. + void StartWorkerThreads(const int threads_num) + { + { + boost::unique_lock<boost::mutex> lock(mutex); + nIdle = 0; + nTotal = 0; + fAllOk = true; + } + assert(m_worker_threads.empty()); + for (int n = 0; n < threads_num; ++n) { + m_worker_threads.emplace_back([this, n]() { + util::ThreadRename(strprintf("scriptch.%i", n)); + Loop(false /* worker thread */); + }); + } + } + //! Worker thread void Thread() { - Loop(); + Loop(false /* worker thread */); } //! Wait until execution finishes, and return whether all evaluations were successful. bool Wait() { - return Loop(true); + return Loop(true /* master thread */); } //! Add a batch of checks to the queue @@ -159,8 +186,25 @@ public: condWorker.notify_all(); } + //! Stop all of the worker threads. + void StopWorkerThreads() + { + { + boost::unique_lock<boost::mutex> lock(mutex); + m_request_stop = true; + } + condWorker.notify_all(); + for (std::thread& t : m_worker_threads) { + t.join(); + } + m_worker_threads.clear(); + boost::unique_lock<boost::mutex> lock(mutex); + m_request_stop = false; + } + ~CCheckQueue() { + assert(m_worker_threads.empty()); } }; diff --git a/src/init.cpp b/src/init.cpp index 5ad807cbac..bc99994a4d 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -224,6 +224,7 @@ void Shutdown(NodeContext& node) if (g_load_block.joinable()) g_load_block.join(); threadGroup.interrupt_all(); threadGroup.join_all(); + StopScriptCheckWorkerThreads(); // After the threads that potentially access these pointers have been stopped, // destruct and reset all to nullptr. @@ -1307,9 +1308,7 @@ bool AppInitMain(const util::Ref& context, NodeContext& node, interfaces::BlockA LogPrintf("Script verification uses %d additional threads\n", script_threads); if (script_threads >= 1) { g_parallel_script_checks = true; - for (int i = 0; i < script_threads; ++i) { - threadGroup.create_thread([i]() { return ThreadScriptCheck(i); }); - } + StartScriptCheckWorkerThreads(script_threads); } assert(!node.scheduler); diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index 2d3137e1e2..74498c6c79 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -162,9 +162,7 @@ TestingSetup::TestingSetup(const std::string& chainName, const std::vector<const // Start script-checking threads. Set g_parallel_script_checks to true so they are used. constexpr int script_check_threads = 2; - for (int i = 0; i < script_check_threads; ++i) { - threadGroup.create_thread([i]() { return ThreadScriptCheck(i); }); - } + StartScriptCheckWorkerThreads(script_check_threads); g_parallel_script_checks = true; m_node.banman = MakeUnique<BanMan>(GetDataDir() / "banlist.dat", nullptr, DEFAULT_MISBEHAVING_BANTIME); @@ -182,6 +180,7 @@ TestingSetup::~TestingSetup() if (m_node.scheduler) m_node.scheduler->stop(); threadGroup.interrupt_all(); threadGroup.join_all(); + StopScriptCheckWorkerThreads(); GetMainSignals().FlushBackgroundCallbacks(); GetMainSignals().UnregisterBackgroundSignalScheduler(); m_node.connman.reset(); diff --git a/src/validation.cpp b/src/validation.cpp index d4463bf17b..046583fa5e 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -1817,9 +1817,14 @@ static bool WriteUndoDataForBlock(const CBlockUndo& blockundo, BlockValidationSt static CCheckQueue<CScriptCheck> scriptcheckqueue(128); -void ThreadScriptCheck(int worker_num) { - util::ThreadRename(strprintf("scriptch.%i", worker_num)); - scriptcheckqueue.Thread(); +void StartScriptCheckWorkerThreads(int threads_num) +{ + scriptcheckqueue.StartWorkerThreads(threads_num); +} + +void StopScriptCheckWorkerThreads() +{ + scriptcheckqueue.StopWorkerThreads(); } VersionBitsCache versionbitscache GUARDED_BY(cs_main); diff --git a/src/validation.h b/src/validation.h index d88bd07765..df3b16dc15 100644 --- a/src/validation.h +++ b/src/validation.h @@ -158,8 +158,10 @@ void LoadExternalBlockFile(const CChainParams& chainparams, FILE* fileIn, FlatFi bool LoadGenesisBlock(const CChainParams& chainparams); /** Unload database information */ void UnloadBlockIndex(CTxMemPool* mempool, ChainstateManager& chainman); -/** Run an instance of the script checking thread */ -void ThreadScriptCheck(int worker_num); +/** Run instances of script checking worker threads */ +void StartScriptCheckWorkerThreads(int threads_num); +/** Stop all of the script checking worker threads */ +void StopScriptCheckWorkerThreads(); /** * Return transaction from the block at block_index. * If block_index is not provided, fall back to mempool. |