aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatt Corallo <git@bluematt.me>2017-04-10 14:55:49 -0400
committerMatt Corallo <git@bluematt.me>2017-07-07 11:33:18 -0400
commit08096bbbc6d6fef86943ca8ce5e6de18744d58ea (patch)
treeb1199ad23ceb259db07d8f3fb3b6dbe4c996fefa
parent2fbf2dbe151e135586cc1bb05b891f2c8ab6c817 (diff)
downloadbitcoin-08096bbbc6d6fef86943ca8ce5e6de18744d58ea.tar.xz
Support more than one CScheduler thread for serial clients
This will be used by CValidationInterface soon. This requires a bit of work as we need to ensure that most of our callbacks happen in-order (to avoid synchronization issues in wallet) - we keep our own internal queue and push things onto it, scheduling a queue-draining function immediately upon new callbacks.
-rw-r--r--src/scheduler.cpp52
-rw-r--r--src/scheduler.h24
-rw-r--r--src/validationinterface.cpp22
-rw-r--r--src/validationinterface.h2
4 files changed, 90 insertions, 10 deletions
diff --git a/src/scheduler.cpp b/src/scheduler.cpp
index 923ba2c231..a76a87e10a 100644
--- a/src/scheduler.cpp
+++ b/src/scheduler.cpp
@@ -139,3 +139,55 @@ size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
}
return result;
}
+
+
+void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() {
+ {
+ LOCK(m_cs_callbacks_pending);
+ // Try to avoid scheduling too many copies here, but if we
+ // accidentally have two ProcessQueue's scheduled at once its
+ // not a big deal.
+ if (m_are_callbacks_running) return;
+ if (m_callbacks_pending.empty()) return;
+ }
+ m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this));
+}
+
+void SingleThreadedSchedulerClient::ProcessQueue() {
+ std::function<void (void)> callback;
+ {
+ LOCK(m_cs_callbacks_pending);
+ if (m_are_callbacks_running) return;
+ if (m_callbacks_pending.empty()) return;
+ m_are_callbacks_running = true;
+
+ callback = std::move(m_callbacks_pending.front());
+ m_callbacks_pending.pop_front();
+ }
+
+ // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
+ // to ensure both happen safely even if callback() throws.
+ struct RAIICallbacksRunning {
+ SingleThreadedSchedulerClient* instance;
+ RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
+ ~RAIICallbacksRunning() {
+ {
+ LOCK(instance->m_cs_callbacks_pending);
+ instance->m_are_callbacks_running = false;
+ }
+ instance->MaybeScheduleProcessQueue();
+ }
+ } raiicallbacksrunning(this);
+
+ callback();
+}
+
+void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void (void)> func) {
+ assert(m_pscheduler);
+
+ {
+ LOCK(m_cs_callbacks_pending);
+ m_callbacks_pending.emplace_back(std::move(func));
+ }
+ MaybeScheduleProcessQueue();
+}
diff --git a/src/scheduler.h b/src/scheduler.h
index 5da6e6f69f..82036afdf0 100644
--- a/src/scheduler.h
+++ b/src/scheduler.h
@@ -14,6 +14,8 @@
#include <boost/thread.hpp>
#include <map>
+#include "sync.h"
+
//
// Simple class for background tasks that should be run
// periodically or once "after a while"
@@ -79,4 +81,26 @@ private:
bool shouldStop() { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
};
+/**
+ * Class used by CScheduler clients which may schedule multiple jobs
+ * which are required to be run serially. Does not require such jobs
+ * to be executed on the same thread, but no two jobs will be executed
+ * at the same time.
+ */
+class SingleThreadedSchedulerClient {
+private:
+ CScheduler *m_pscheduler;
+
+ CCriticalSection m_cs_callbacks_pending;
+ std::list<std::function<void (void)>> m_callbacks_pending;
+ bool m_are_callbacks_running = false;
+
+ void MaybeScheduleProcessQueue();
+ void ProcessQueue();
+
+public:
+ SingleThreadedSchedulerClient(CScheduler *pschedulerIn) : m_pscheduler(pschedulerIn) {}
+ void AddToProcessQueue(std::function<void (void)> func);
+};
+
#endif
diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp
index a17a08eee2..8edc7c398d 100644
--- a/src/validationinterface.cpp
+++ b/src/validationinterface.cpp
@@ -6,6 +6,11 @@
#include "validationinterface.h"
#include "init.h"
#include "scheduler.h"
+#include "sync.h"
+#include "util.h"
+
+#include <list>
+#include <atomic>
#include <boost/signals2/signal.hpp>
@@ -20,22 +25,23 @@ struct MainSignalsInstance {
boost::signals2::signal<void (const CBlock&, const CValidationState&)> BlockChecked;
boost::signals2::signal<void (const CBlockIndex *, const std::shared_ptr<const CBlock>&)> NewPoWValidBlock;
- CScheduler *m_scheduler = NULL;
+ // We are not allowed to assume the scheduler only runs in one thread,
+ // but must ensure all callbacks happen in-order, so we end up creating
+ // our own queue here :(
+ SingleThreadedSchedulerClient m_schedulerClient;
+
+ MainSignalsInstance(CScheduler *pscheduler) : m_schedulerClient(pscheduler) {}
};
static CMainSignals g_signals;
-CMainSignals::CMainSignals() {
- m_internals.reset(new MainSignalsInstance());
-}
-
void CMainSignals::RegisterBackgroundSignalScheduler(CScheduler& scheduler) {
- assert(!m_internals->m_scheduler);
- m_internals->m_scheduler = &scheduler;
+ assert(!m_internals);
+ m_internals.reset(new MainSignalsInstance(&scheduler));
}
void CMainSignals::UnregisterBackgroundSignalScheduler() {
- m_internals->m_scheduler = NULL;
+ m_internals.reset(nullptr);
}
CMainSignals& GetMainSignals()
diff --git a/src/validationinterface.h b/src/validationinterface.h
index 8cae3c6db4..fbfe273b10 100644
--- a/src/validationinterface.h
+++ b/src/validationinterface.h
@@ -75,8 +75,6 @@ private:
friend void ::UnregisterAllValidationInterfaces();
public:
- CMainSignals();
-
/** Register a CScheduler to give callbacks which should run in the background (may only be called once) */
void RegisterBackgroundSignalScheduler(CScheduler& scheduler);
/** Unregister a CScheduler to give callbacks which should run in the background - these callbacks will now be dropped! */