diff options
author | Hennadii Stepanov <32963518+hebasto@users.noreply.github.com> | 2020-08-21 09:24:05 +0300 |
---|---|---|
committer | Hennadii Stepanov <32963518+hebasto@users.noreply.github.com> | 2020-09-24 06:55:33 +0300 |
commit | 01511776acb0c7ec216dc9c8112531067763f1cb (patch) | |
tree | 4affd73ec2317bc425f8731600b98bb57dd232e8 /src/checkqueue.h | |
parent | 0ef938685b5c079a6f5a98daf0e3865d718d817b (diff) |
Add local thread pool to CCheckQueue
Diffstat (limited to 'src/checkqueue.h')
-rw-r--r-- | src/checkqueue.h | 52 |
1 files changed, 48 insertions, 4 deletions
diff --git a/src/checkqueue.h b/src/checkqueue.h index 3e22cd8c60..fcd6e87af3 100644 --- a/src/checkqueue.h +++ b/src/checkqueue.h @@ -6,6 +6,8 @@ #define BITCOIN_CHECKQUEUE_H #include <sync.h> +#include <tinyformat.h> +#include <util/threadnames.h> #include <algorithm> #include <vector> @@ -62,8 +64,11 @@ private: //! The maximum number of elements to be processed in one batch const unsigned int nBatchSize; + std::vector<std::thread> m_worker_threads; + bool m_request_stop{false}; + /** Internal function that does bulk of the verification work. */ - bool Loop(bool fMaster = false) + bool Loop(bool fMaster) { boost::condition_variable& cond = fMaster ? condMaster : condWorker; std::vector<T> vChecks; @@ -85,7 +90,7 @@ private: nTotal++; } // logically, the do loop starts here - while (queue.empty()) { + while (queue.empty() && !m_request_stop) { if (fMaster && nTodo == 0) { nTotal--; bool fRet = fAllOk; @@ -98,6 +103,10 @@ private: cond.wait(lock); // wait nIdle--; } + if (m_request_stop) { + return false; + } + // Decide how many work units to process now. // * Do not try to do everything at once, but aim for increasingly smaller batches so // all workers finish approximately simultaneously. @@ -132,16 +141,34 @@ public: { } + //! Create a pool of new worker threads. + void StartWorkerThreads(const int threads_num) + { + { + boost::unique_lock<boost::mutex> lock(mutex); + nIdle = 0; + nTotal = 0; + fAllOk = true; + } + assert(m_worker_threads.empty()); + for (int n = 0; n < threads_num; ++n) { + m_worker_threads.emplace_back([this, n]() { + util::ThreadRename(strprintf("scriptch.%i", n)); + Loop(false /* worker thread */); + }); + } + } + //! Worker thread void Thread() { - Loop(); + Loop(false /* worker thread */); } //! Wait until execution finishes, and return whether all evaluations were successful. bool Wait() { - return Loop(true); + return Loop(true /* master thread */); } //! Add a batch of checks to the queue @@ -159,8 +186,25 @@ public: condWorker.notify_all(); } + //! Stop all of the worker threads. + void StopWorkerThreads() + { + { + boost::unique_lock<boost::mutex> lock(mutex); + m_request_stop = true; + } + condWorker.notify_all(); + for (std::thread& t : m_worker_threads) { + t.join(); + } + m_worker_threads.clear(); + boost::unique_lock<boost::mutex> lock(mutex); + m_request_stop = false; + } + ~CCheckQueue() { + assert(m_worker_threads.empty()); } }; |