diff options
author | Matt Corallo <git@bluematt.me> | 2017-04-10 14:55:49 -0400 |
---|---|---|
committer | Matt Corallo <git@bluematt.me> | 2017-07-07 11:33:18 -0400 |
commit | 08096bbbc6d6fef86943ca8ce5e6de18744d58ea (patch) | |
tree | b1199ad23ceb259db07d8f3fb3b6dbe4c996fefa | |
parent | 2fbf2dbe151e135586cc1bb05b891f2c8ab6c817 (diff) |
Support more than one CScheduler thread for serial clients
This will be used by CValidationInterface soon.
This requires a bit of work as we need to ensure that most of our
callbacks happen in-order (to avoid synchronization issues in
wallet) - we keep our own internal queue and push things onto it,
scheduling a queue-draining function immediately upon new
callbacks.
-rw-r--r-- | src/scheduler.cpp | 52 | ||||
-rw-r--r-- | src/scheduler.h | 24 | ||||
-rw-r--r-- | src/validationinterface.cpp | 22 | ||||
-rw-r--r-- | src/validationinterface.h | 2 |
4 files changed, 90 insertions, 10 deletions
diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 923ba2c231..a76a87e10a 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -139,3 +139,55 @@ size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first, } return result; } + + +void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() { + { + LOCK(m_cs_callbacks_pending); + // Try to avoid scheduling too many copies here, but if we + // accidentally have two ProcessQueue's scheduled at once its + // not a big deal. + if (m_are_callbacks_running) return; + if (m_callbacks_pending.empty()) return; + } + m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this)); +} + +void SingleThreadedSchedulerClient::ProcessQueue() { + std::function<void (void)> callback; + { + LOCK(m_cs_callbacks_pending); + if (m_are_callbacks_running) return; + if (m_callbacks_pending.empty()) return; + m_are_callbacks_running = true; + + callback = std::move(m_callbacks_pending.front()); + m_callbacks_pending.pop_front(); + } + + // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue + // to ensure both happen safely even if callback() throws. + struct RAIICallbacksRunning { + SingleThreadedSchedulerClient* instance; + RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {} + ~RAIICallbacksRunning() { + { + LOCK(instance->m_cs_callbacks_pending); + instance->m_are_callbacks_running = false; + } + instance->MaybeScheduleProcessQueue(); + } + } raiicallbacksrunning(this); + + callback(); +} + +void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void (void)> func) { + assert(m_pscheduler); + + { + LOCK(m_cs_callbacks_pending); + m_callbacks_pending.emplace_back(std::move(func)); + } + MaybeScheduleProcessQueue(); +} diff --git a/src/scheduler.h b/src/scheduler.h index 5da6e6f69f..82036afdf0 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -14,6 +14,8 @@ #include <boost/thread.hpp> #include <map> +#include "sync.h" + // // Simple class for background tasks that should be run // periodically or once "after a while" @@ -79,4 +81,26 @@ private: bool shouldStop() { return stopRequested || (stopWhenEmpty && taskQueue.empty()); } }; +/** + * Class used by CScheduler clients which may schedule multiple jobs + * which are required to be run serially. Does not require such jobs + * to be executed on the same thread, but no two jobs will be executed + * at the same time. + */ +class SingleThreadedSchedulerClient { +private: + CScheduler *m_pscheduler; + + CCriticalSection m_cs_callbacks_pending; + std::list<std::function<void (void)>> m_callbacks_pending; + bool m_are_callbacks_running = false; + + void MaybeScheduleProcessQueue(); + void ProcessQueue(); + +public: + SingleThreadedSchedulerClient(CScheduler *pschedulerIn) : m_pscheduler(pschedulerIn) {} + void AddToProcessQueue(std::function<void (void)> func); +}; + #endif diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp index a17a08eee2..8edc7c398d 100644 --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -6,6 +6,11 @@ #include "validationinterface.h" #include "init.h" #include "scheduler.h" +#include "sync.h" +#include "util.h" + +#include <list> +#include <atomic> #include <boost/signals2/signal.hpp> @@ -20,22 +25,23 @@ struct MainSignalsInstance { boost::signals2::signal<void (const CBlock&, const CValidationState&)> BlockChecked; boost::signals2::signal<void (const CBlockIndex *, const std::shared_ptr<const CBlock>&)> NewPoWValidBlock; - CScheduler *m_scheduler = NULL; + // We are not allowed to assume the scheduler only runs in one thread, + // but must ensure all callbacks happen in-order, so we end up creating + // our own queue here :( + SingleThreadedSchedulerClient m_schedulerClient; + + MainSignalsInstance(CScheduler *pscheduler) : m_schedulerClient(pscheduler) {} }; static CMainSignals g_signals; -CMainSignals::CMainSignals() { - m_internals.reset(new MainSignalsInstance()); -} - void CMainSignals::RegisterBackgroundSignalScheduler(CScheduler& scheduler) { - assert(!m_internals->m_scheduler); - m_internals->m_scheduler = &scheduler; + assert(!m_internals); + m_internals.reset(new MainSignalsInstance(&scheduler)); } void CMainSignals::UnregisterBackgroundSignalScheduler() { - m_internals->m_scheduler = NULL; + m_internals.reset(nullptr); } CMainSignals& GetMainSignals() diff --git a/src/validationinterface.h b/src/validationinterface.h index 8cae3c6db4..fbfe273b10 100644 --- a/src/validationinterface.h +++ b/src/validationinterface.h @@ -75,8 +75,6 @@ private: friend void ::UnregisterAllValidationInterfaces(); public: - CMainSignals(); - /** Register a CScheduler to give callbacks which should run in the background (may only be called once) */ void RegisterBackgroundSignalScheduler(CScheduler& scheduler); /** Unregister a CScheduler to give callbacks which should run in the background - these callbacks will now be dropped! */ |