aboutsummaryrefslogtreecommitdiff
path: root/src/checkqueue.h
diff options
context:
space:
mode:
authorHennadii Stepanov <32963518+hebasto@users.noreply.github.com>2020-08-21 09:24:05 +0300
committerHennadii Stepanov <32963518+hebasto@users.noreply.github.com>2020-09-24 06:55:33 +0300
commit01511776acb0c7ec216dc9c8112531067763f1cb (patch)
tree4affd73ec2317bc425f8731600b98bb57dd232e8 /src/checkqueue.h
parent0ef938685b5c079a6f5a98daf0e3865d718d817b (diff)
downloadbitcoin-01511776acb0c7ec216dc9c8112531067763f1cb.tar.xz
Add local thread pool to CCheckQueue
Diffstat (limited to 'src/checkqueue.h')
-rw-r--r--src/checkqueue.h52
1 files changed, 48 insertions, 4 deletions
diff --git a/src/checkqueue.h b/src/checkqueue.h
index 3e22cd8c60..fcd6e87af3 100644
--- a/src/checkqueue.h
+++ b/src/checkqueue.h
@@ -6,6 +6,8 @@
#define BITCOIN_CHECKQUEUE_H
#include <sync.h>
+#include <tinyformat.h>
+#include <util/threadnames.h>
#include <algorithm>
#include <vector>
@@ -62,8 +64,11 @@ private:
//! The maximum number of elements to be processed in one batch
const unsigned int nBatchSize;
+ std::vector<std::thread> m_worker_threads;
+ bool m_request_stop{false};
+
/** Internal function that does bulk of the verification work. */
- bool Loop(bool fMaster = false)
+ bool Loop(bool fMaster)
{
boost::condition_variable& cond = fMaster ? condMaster : condWorker;
std::vector<T> vChecks;
@@ -85,7 +90,7 @@ private:
nTotal++;
}
// logically, the do loop starts here
- while (queue.empty()) {
+ while (queue.empty() && !m_request_stop) {
if (fMaster && nTodo == 0) {
nTotal--;
bool fRet = fAllOk;
@@ -98,6 +103,10 @@ private:
cond.wait(lock); // wait
nIdle--;
}
+ if (m_request_stop) {
+ return false;
+ }
+
// Decide how many work units to process now.
// * Do not try to do everything at once, but aim for increasingly smaller batches so
// all workers finish approximately simultaneously.
@@ -132,16 +141,34 @@ public:
{
}
+ //! Create a pool of new worker threads.
+ void StartWorkerThreads(const int threads_num)
+ {
+ {
+ boost::unique_lock<boost::mutex> lock(mutex);
+ nIdle = 0;
+ nTotal = 0;
+ fAllOk = true;
+ }
+ assert(m_worker_threads.empty());
+ for (int n = 0; n < threads_num; ++n) {
+ m_worker_threads.emplace_back([this, n]() {
+ util::ThreadRename(strprintf("scriptch.%i", n));
+ Loop(false /* worker thread */);
+ });
+ }
+ }
+
//! Worker thread
void Thread()
{
- Loop();
+ Loop(false /* worker thread */);
}
//! Wait until execution finishes, and return whether all evaluations were successful.
bool Wait()
{
- return Loop(true);
+ return Loop(true /* master thread */);
}
//! Add a batch of checks to the queue
@@ -159,8 +186,25 @@ public:
condWorker.notify_all();
}
+ //! Stop all of the worker threads.
+ void StopWorkerThreads()
+ {
+ {
+ boost::unique_lock<boost::mutex> lock(mutex);
+ m_request_stop = true;
+ }
+ condWorker.notify_all();
+ for (std::thread& t : m_worker_threads) {
+ t.join();
+ }
+ m_worker_threads.clear();
+ boost::unique_lock<boost::mutex> lock(mutex);
+ m_request_stop = false;
+ }
+
~CCheckQueue()
{
+ assert(m_worker_threads.empty());
}
};