diff options
-rw-r--r-- | src/scheduler.cpp | 46 | ||||
-rw-r--r-- | src/scheduler.h | 15 | ||||
-rw-r--r-- | src/test/scheduler_tests.cpp | 27 |
3 files changed, 70 insertions, 18 deletions
diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 8b55888ae8..4f1d8be7c0 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -8,7 +8,7 @@ #include <boost/bind.hpp> #include <utility> -CScheduler::CScheduler() : nThreadsServicingQueue(0) +CScheduler::CScheduler() : nThreadsServicingQueue(0), stopRequested(false), stopWhenEmpty(false) { } @@ -29,32 +29,37 @@ void CScheduler::serviceQueue() { boost::unique_lock<boost::mutex> lock(newTaskMutex); ++nThreadsServicingQueue; + stopRequested = false; + stopWhenEmpty = false; // newTaskMutex is locked throughout this loop EXCEPT // when the thread is waiting or when the user's function // is called. - while (1) { + while (!shouldStop()) { try { - while (taskQueue.empty()) { + while (!shouldStop() && taskQueue.empty()) { // Wait until there is something to do. newTaskScheduled.wait(lock); } -// Wait until either there is a new task, or until -// the time of the first item on the queue: + + // Wait until either there is a new task, or until + // the time of the first item on the queue: // wait_until needs boost 1.50 or later; older versions have timed_wait: #if BOOST_VERSION < 105000 - while (!taskQueue.empty() && newTaskScheduled.timed_wait(lock, toPosixTime(taskQueue.begin()->first))) { + while (!shouldStop() && !taskQueue.empty() && + newTaskScheduled.timed_wait(lock, toPosixTime(taskQueue.begin()->first))) { // Keep waiting until timeout } #else - while (!taskQueue.empty() && newTaskScheduled.wait_until(lock, taskQueue.begin()->first) != boost::cv_status::timeout) { + while (!shouldStop() && !taskQueue.empty() && + newTaskScheduled.wait_until(lock, taskQueue.begin()->first) != boost::cv_status::timeout) { // Keep waiting until timeout } #endif // If there are multiple threads, the queue can empty while we're waiting (another // thread may service the task we were waiting on). - if (taskQueue.empty()) + if (shouldStop() || taskQueue.empty()) continue; Function f = taskQueue.begin()->second; @@ -70,6 +75,19 @@ void CScheduler::serviceQueue() throw; } } + --nThreadsServicingQueue; +} + +void CScheduler::stop(bool drain) +{ + { + boost::unique_lock<boost::mutex> lock(newTaskMutex); + if (drain) + stopWhenEmpty = true; + else + stopRequested = true; + } + newTaskScheduled.notify_all(); } void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::time_point t) @@ -96,3 +114,15 @@ void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaSeconds) { scheduleFromNow(boost::bind(&Repeat, this, f, deltaSeconds), deltaSeconds); } + +size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first, + boost::chrono::system_clock::time_point &last) const +{ + boost::unique_lock<boost::mutex> lock(newTaskMutex); + size_t result = taskQueue.size(); + if (!taskQueue.empty()) { + first = taskQueue.begin()->first; + last = taskQueue.rbegin()->first; + } + return result; +} diff --git a/src/scheduler.h b/src/scheduler.h index bb383ab9f9..436659e58b 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -60,11 +60,24 @@ public: // and interrupted using boost::interrupt_thread void serviceQueue(); + // Tell any threads running serviceQueue to stop as soon as they're + // done servicing whatever task they're currently servicing (drain=false) + // or when there is no work left to be done (drain=true) + void stop(bool drain=false); + + // Returns number of tasks waiting to be serviced, + // and first and last task times + size_t getQueueInfo(boost::chrono::system_clock::time_point &first, + boost::chrono::system_clock::time_point &last) const; + private: std::multimap<boost::chrono::system_clock::time_point, Function> taskQueue; boost::condition_variable newTaskScheduled; - boost::mutex newTaskMutex; + mutable boost::mutex newTaskMutex; int nThreadsServicingQueue; + bool stopRequested; + bool stopWhenEmpty; + bool shouldStop() { return stopRequested || (stopWhenEmpty && taskQueue.empty()); } }; #endif diff --git a/src/test/scheduler_tests.cpp b/src/test/scheduler_tests.cpp index a26d0afaed..cb1a427db0 100644 --- a/src/test/scheduler_tests.cpp +++ b/src/test/scheduler_tests.cpp @@ -42,6 +42,8 @@ static void MicroSleep(uint64_t n) BOOST_AUTO_TEST_CASE(manythreads) { + seed_insecure_rand(false); + // Stress test: hundreds of microsecond-scheduled tasks, // serviced by 10 threads. // @@ -54,10 +56,6 @@ BOOST_AUTO_TEST_CASE(manythreads) // counters should sum to the number of initial tasks performed. CScheduler microTasks; - boost::thread_group microThreads; - for (int i = 0; i < 5; i++) - microThreads.create_thread(boost::bind(&CScheduler::serviceQueue, µTasks)); - boost::mutex counterMutex[10]; int counter[10] = { 0 }; boost::random::mt19937 rng(insecure_rand()); @@ -67,6 +65,9 @@ BOOST_AUTO_TEST_CASE(manythreads) boost::chrono::system_clock::time_point start = boost::chrono::system_clock::now(); boost::chrono::system_clock::time_point now = start; + boost::chrono::system_clock::time_point first, last; + size_t nTasks = microTasks.getQueueInfo(first, last); + BOOST_CHECK(nTasks == 0); for (int i = 0; i < 100; i++) { boost::chrono::system_clock::time_point t = now + boost::chrono::microseconds(randomMsec(rng)); @@ -77,9 +78,19 @@ BOOST_AUTO_TEST_CASE(manythreads) randomDelta(rng), tReschedule); microTasks.schedule(f, t); } + nTasks = microTasks.getQueueInfo(first, last); + BOOST_CHECK(nTasks == 100); + BOOST_CHECK(first < last); + BOOST_CHECK(last > now); + + // As soon as these are created they will start running and servicing the queue + boost::thread_group microThreads; + for (int i = 0; i < 5; i++) + microThreads.create_thread(boost::bind(&CScheduler::serviceQueue, µTasks)); MicroSleep(600); now = boost::chrono::system_clock::now(); + // More threads and more tasks: for (int i = 0; i < 5; i++) microThreads.create_thread(boost::bind(&CScheduler::serviceQueue, µTasks)); @@ -93,11 +104,9 @@ BOOST_AUTO_TEST_CASE(manythreads) microTasks.schedule(f, t); } - // All 2,000 tasks should be finished within 2 milliseconds. Sleep a bit longer. - MicroSleep(2100); - - microThreads.interrupt_all(); - microThreads.join_all(); + // Drain the task queue then exit threads + microTasks.stop(true); + microThreads.join_all(); // ... wait until all the threads are done int counterSum = 0; for (int i = 0; i < 10; i++) { |