aboutsummaryrefslogtreecommitdiff
path: root/src/scheduler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/scheduler.cpp')
-rw-r--r--src/scheduler.cpp67
1 files changed, 67 insertions, 0 deletions
diff --git a/src/scheduler.cpp b/src/scheduler.cpp
index 923ba2c231..4edb2c6d9b 100644
--- a/src/scheduler.cpp
+++ b/src/scheduler.cpp
@@ -139,3 +139,70 @@ size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
}
return result;
}
+
+bool CScheduler::AreThreadsServicingQueue() const {
+ boost::unique_lock<boost::mutex> lock(newTaskMutex);
+ return nThreadsServicingQueue;
+}
+
+
+void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() {
+ {
+ LOCK(m_cs_callbacks_pending);
+ // Try to avoid scheduling too many copies here, but if we
+ // accidentally have two ProcessQueue's scheduled at once its
+ // not a big deal.
+ if (m_are_callbacks_running) return;
+ if (m_callbacks_pending.empty()) return;
+ }
+ m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this));
+}
+
+void SingleThreadedSchedulerClient::ProcessQueue() {
+ std::function<void (void)> callback;
+ {
+ LOCK(m_cs_callbacks_pending);
+ if (m_are_callbacks_running) return;
+ if (m_callbacks_pending.empty()) return;
+ m_are_callbacks_running = true;
+
+ callback = std::move(m_callbacks_pending.front());
+ m_callbacks_pending.pop_front();
+ }
+
+ // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
+ // to ensure both happen safely even if callback() throws.
+ struct RAIICallbacksRunning {
+ SingleThreadedSchedulerClient* instance;
+ explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
+ ~RAIICallbacksRunning() {
+ {
+ LOCK(instance->m_cs_callbacks_pending);
+ instance->m_are_callbacks_running = false;
+ }
+ instance->MaybeScheduleProcessQueue();
+ }
+ } raiicallbacksrunning(this);
+
+ callback();
+}
+
+void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void (void)> func) {
+ assert(m_pscheduler);
+
+ {
+ LOCK(m_cs_callbacks_pending);
+ m_callbacks_pending.emplace_back(std::move(func));
+ }
+ MaybeScheduleProcessQueue();
+}
+
+void SingleThreadedSchedulerClient::EmptyQueue() {
+ assert(!m_pscheduler->AreThreadsServicingQueue());
+ bool should_continue = true;
+ while (should_continue) {
+ ProcessQueue();
+ LOCK(m_cs_callbacks_pending);
+ should_continue = !m_callbacks_pending.empty();
+ }
+}