diff options
author | Matt Corallo <git@bluematt.me> | 2017-04-10 14:55:49 -0400 |
---|---|---|
committer | Matt Corallo <git@bluematt.me> | 2017-07-07 11:33:18 -0400 |
commit | 08096bbbc6d6fef86943ca8ce5e6de18744d58ea (patch) | |
tree | b1199ad23ceb259db07d8f3fb3b6dbe4c996fefa /src/scheduler.cpp | |
parent | 2fbf2dbe151e135586cc1bb05b891f2c8ab6c817 (diff) |
Support more than one CScheduler thread for serial clients
This will be used by CValidationInterface soon.
This requires a bit of work as we need to ensure that most of our
callbacks happen in-order (to avoid synchronization issues in
wallet) - we keep our own internal queue and push things onto it,
scheduling a queue-draining function immediately upon new
callbacks.
Diffstat (limited to 'src/scheduler.cpp')
-rw-r--r-- | src/scheduler.cpp | 52 |
1 files changed, 52 insertions, 0 deletions
diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 923ba2c231..a76a87e10a 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -139,3 +139,55 @@ size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first, } return result; } + + +void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() { + { + LOCK(m_cs_callbacks_pending); + // Try to avoid scheduling too many copies here, but if we + // accidentally have two ProcessQueue's scheduled at once its + // not a big deal. + if (m_are_callbacks_running) return; + if (m_callbacks_pending.empty()) return; + } + m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this)); +} + +void SingleThreadedSchedulerClient::ProcessQueue() { + std::function<void (void)> callback; + { + LOCK(m_cs_callbacks_pending); + if (m_are_callbacks_running) return; + if (m_callbacks_pending.empty()) return; + m_are_callbacks_running = true; + + callback = std::move(m_callbacks_pending.front()); + m_callbacks_pending.pop_front(); + } + + // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue + // to ensure both happen safely even if callback() throws. + struct RAIICallbacksRunning { + SingleThreadedSchedulerClient* instance; + RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {} + ~RAIICallbacksRunning() { + { + LOCK(instance->m_cs_callbacks_pending); + instance->m_are_callbacks_running = false; + } + instance->MaybeScheduleProcessQueue(); + } + } raiicallbacksrunning(this); + + callback(); +} + +void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void (void)> func) { + assert(m_pscheduler); + + { + LOCK(m_cs_callbacks_pending); + m_callbacks_pending.emplace_back(std::move(func)); + } + MaybeScheduleProcessQueue(); +} |