diff options
-rw-r--r-- | src/scheduler.cpp | 42 | ||||
-rw-r--r-- | src/scheduler.h | 95 | ||||
-rw-r--r-- | src/test/scheduler_tests.cpp | 6 | ||||
-rw-r--r-- | src/test/util/setup_common.cpp | 1 | ||||
-rw-r--r-- | src/test/util/setup_common.h | 1 |
5 files changed, 72 insertions, 73 deletions
diff --git a/src/scheduler.cpp b/src/scheduler.cpp index c4bd47310b..7c361bf26f 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -30,9 +30,6 @@ void CScheduler::serviceQueue() // is called. while (!shouldStop()) { try { - if (!shouldStop() && taskQueue.empty()) { - REVERSE_LOCK(lock); - } while (!shouldStop() && taskQueue.empty()) { // Wait until there is something to do. newTaskScheduled.wait(lock); @@ -71,18 +68,6 @@ void CScheduler::serviceQueue() newTaskScheduled.notify_one(); } -void CScheduler::stop(bool drain) -{ - { - LOCK(newTaskMutex); - if (drain) - stopWhenEmpty = true; - else - stopRequested = true; - } - newTaskScheduled.notify_all(); -} - void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t) { { @@ -125,8 +110,8 @@ void CScheduler::scheduleEvery(CScheduler::Function f, std::chrono::milliseconds scheduleFromNow([=] { Repeat(*this, f, delta); }, delta); } -size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point &first, - std::chrono::system_clock::time_point &last) const +size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point& first, + std::chrono::system_clock::time_point& last) const { LOCK(newTaskMutex); size_t result = taskQueue.size(); @@ -137,13 +122,15 @@ size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point &first, return result; } -bool CScheduler::AreThreadsServicingQueue() const { +bool CScheduler::AreThreadsServicingQueue() const +{ LOCK(newTaskMutex); return nThreadsServicingQueue; } -void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() { +void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() +{ { LOCK(m_cs_callbacks_pending); // Try to avoid scheduling too many copies here, but if we @@ -155,8 +142,9 @@ void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() { m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now()); } -void SingleThreadedSchedulerClient::ProcessQueue() { - std::function<void ()> callback; +void SingleThreadedSchedulerClient::ProcessQueue() +{ + std::function<void()> callback; { LOCK(m_cs_callbacks_pending); if (m_are_callbacks_running) return; @@ -172,7 +160,8 @@ void SingleThreadedSchedulerClient::ProcessQueue() { struct RAIICallbacksRunning { SingleThreadedSchedulerClient* instance; explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {} - ~RAIICallbacksRunning() { + ~RAIICallbacksRunning() + { { LOCK(instance->m_cs_callbacks_pending); instance->m_are_callbacks_running = false; @@ -184,7 +173,8 @@ void SingleThreadedSchedulerClient::ProcessQueue() { callback(); } -void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void ()> func) { +void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func) +{ assert(m_pscheduler); { @@ -194,7 +184,8 @@ void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void ()> fun MaybeScheduleProcessQueue(); } -void SingleThreadedSchedulerClient::EmptyQueue() { +void SingleThreadedSchedulerClient::EmptyQueue() +{ assert(!m_pscheduler->AreThreadsServicingQueue()); bool should_continue = true; while (should_continue) { @@ -204,7 +195,8 @@ void SingleThreadedSchedulerClient::EmptyQueue() { } } -size_t SingleThreadedSchedulerClient::CallbacksPending() { +size_t SingleThreadedSchedulerClient::CallbacksPending() +{ LOCK(m_cs_callbacks_pending); return m_callbacks_pending.size(); } diff --git a/src/scheduler.h b/src/scheduler.h index 1e64195484..d7fe00d1b4 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -5,11 +5,6 @@ #ifndef BITCOIN_SCHEDULER_H #define BITCOIN_SCHEDULER_H -// -// NOTE: -// boost::thread should be ported to std::thread -// when we support C++11. -// #include <condition_variable> #include <functional> #include <list> @@ -17,24 +12,23 @@ #include <sync.h> -// -// Simple class for background tasks that should be run -// periodically or once "after a while" -// -// Usage: -// -// CScheduler* s = new CScheduler(); -// s->scheduleFromNow(doSomething, std::chrono::milliseconds{11}); // Assuming a: void doSomething() { } -// s->scheduleFromNow([=] { this->func(argument); }, std::chrono::milliseconds{3}); -// boost::thread* t = new boost::thread(std::bind(CScheduler::serviceQueue, s)); -// -// ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue: -// s->stop(); -// t->join(); -// delete t; -// delete s; // Must be done after thread is interrupted/joined. -// - +/** + * Simple class for background tasks that should be run + * periodically or once "after a while" + * + * Usage: + * + * CScheduler* s = new CScheduler(); + * s->scheduleFromNow(doSomething, std::chrono::milliseconds{11}); // Assuming a: void doSomething() { } + * s->scheduleFromNow([=] { this->func(argument); }, std::chrono::milliseconds{3}); + * std::thread* t = new std::thread([&] { s->serviceQueue(); }); + * + * ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue: + * s->stop(); + * t->join(); + * delete t; + * delete s; // Must be done after thread is interrupted/joined. + */ class CScheduler { public: @@ -43,7 +37,7 @@ public: typedef std::function<void()> Function; - // Call func at/after time t + /** Call func at/after time t */ void schedule(Function f, std::chrono::system_clock::time_point t); /** Call f once after the delta has passed */ @@ -67,23 +61,33 @@ public: */ void MockForward(std::chrono::seconds delta_seconds); - // To keep things as simple as possible, there is no unschedule. - - // Services the queue 'forever'. Should be run in a thread, - // and interrupted using boost::interrupt_thread + /** + * Services the queue 'forever'. Should be run in a thread, + * and interrupted using boost::interrupt_thread + */ void serviceQueue(); - // Tell any threads running serviceQueue to stop as soon as they're - // done servicing whatever task they're currently servicing (drain=false) - // or when there is no work left to be done (drain=true) - void stop(bool drain=false); + /** Tell any threads running serviceQueue to stop as soon as the current task is done */ + void stop() + { + WITH_LOCK(newTaskMutex, stopRequested = true); + newTaskScheduled.notify_all(); + } + /** Tell any threads running serviceQueue to stop when there is no work left to be done */ + void StopWhenDrained() + { + WITH_LOCK(newTaskMutex, stopWhenEmpty = true); + newTaskScheduled.notify_all(); + } - // Returns number of tasks waiting to be serviced, - // and first and last task times - size_t getQueueInfo(std::chrono::system_clock::time_point &first, - std::chrono::system_clock::time_point &last) const; + /** + * Returns number of tasks waiting to be serviced, + * and first and last task times + */ + size_t getQueueInfo(std::chrono::system_clock::time_point& first, + std::chrono::system_clock::time_point& last) const; - // Returns true if there are threads actively running in serviceQueue() + /** Returns true if there are threads actively running in serviceQueue() */ bool AreThreadsServicingQueue() const; private: @@ -106,19 +110,20 @@ private: * B() will be able to observe all of the effects of callback A() which executed * before it. */ -class SingleThreadedSchedulerClient { +class SingleThreadedSchedulerClient +{ private: - CScheduler *m_pscheduler; + CScheduler* m_pscheduler; RecursiveMutex m_cs_callbacks_pending; - std::list<std::function<void ()>> m_callbacks_pending GUARDED_BY(m_cs_callbacks_pending); + std::list<std::function<void()>> m_callbacks_pending GUARDED_BY(m_cs_callbacks_pending); bool m_are_callbacks_running GUARDED_BY(m_cs_callbacks_pending) = false; void MaybeScheduleProcessQueue(); void ProcessQueue(); public: - explicit SingleThreadedSchedulerClient(CScheduler *pschedulerIn) : m_pscheduler(pschedulerIn) {} + explicit SingleThreadedSchedulerClient(CScheduler* pschedulerIn) : m_pscheduler(pschedulerIn) {} /** * Add a callback to be executed. Callbacks are executed serially @@ -126,10 +131,12 @@ public: * Practically, this means that callbacks can behave as if they are executed * in order by a single thread. */ - void AddToProcessQueue(std::function<void ()> func); + void AddToProcessQueue(std::function<void()> func); - // Processes all remaining queue members on the calling thread, blocking until queue is empty - // Must be called after the CScheduler has no remaining processing threads! + /** + * Processes all remaining queue members on the calling thread, blocking until queue is empty + * Must be called after the CScheduler has no remaining processing threads! + */ void EmptyQueue(); size_t CallbacksPending(); diff --git a/src/test/scheduler_tests.cpp b/src/test/scheduler_tests.cpp index fcee6a9b9d..2e5a7549b7 100644 --- a/src/test/scheduler_tests.cpp +++ b/src/test/scheduler_tests.cpp @@ -89,7 +89,7 @@ BOOST_AUTO_TEST_CASE(manythreads) } // Drain the task queue then exit threads - microTasks.stop(true); + microTasks.StopWhenDrained(); microThreads.join_all(); // ... wait until all the threads are done int counterSum = 0; @@ -155,7 +155,7 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered) } // finish up - scheduler.stop(true); + scheduler.StopWhenDrained(); threads.join_all(); BOOST_CHECK_EQUAL(counter1, 100); @@ -186,7 +186,7 @@ BOOST_AUTO_TEST_CASE(mockforward) scheduler.MockForward(std::chrono::minutes{5}); // ensure scheduler has chance to process all tasks queued for before 1 ms from now. - scheduler.scheduleFromNow([&scheduler] { scheduler.stop(false); }, std::chrono::milliseconds{1}); + scheduler.scheduleFromNow([&scheduler] { scheduler.stop(); }, std::chrono::milliseconds{1}); scheduler_thread.join(); // check that the queue only has one job remaining diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index 3b7a7c8d12..709d357b8a 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -19,6 +19,7 @@ #include <rpc/blockchain.h> #include <rpc/register.h> #include <rpc/server.h> +#include <scheduler.h> #include <script/sigcache.h> #include <streams.h> #include <txdb.h> diff --git a/src/test/util/setup_common.h b/src/test/util/setup_common.h index d5cda8a95b..e480782c12 100644 --- a/src/test/util/setup_common.h +++ b/src/test/util/setup_common.h @@ -11,7 +11,6 @@ #include <node/context.h> #include <pubkey.h> #include <random.h> -#include <scheduler.h> #include <txmempool.h> #include <util/string.h> |