diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/httpserver.cpp | 74 | ||||
-rw-r--r-- | src/rpcserver.cpp | 3 |
2 files changed, 67 insertions, 10 deletions
diff --git a/src/httpserver.cpp b/src/httpserver.cpp index 600e57b7cc..0a7f903e9f 100644 --- a/src/httpserver.cpp +++ b/src/httpserver.cpp @@ -72,13 +72,35 @@ private: std::deque<WorkItem*> queue; bool running; size_t maxDepth; + int numThreads; + + /** RAII object to keep track of number of running worker threads */ + class ThreadCounter + { + public: + WorkQueue &wq; + ThreadCounter(WorkQueue &w): wq(w) + { + boost::lock_guard<boost::mutex> lock(wq.cs); + wq.numThreads += 1; + } + ~ThreadCounter() + { + boost::lock_guard<boost::mutex> lock(wq.cs); + wq.numThreads -= 1; + wq.cond.notify_all(); + } + }; public: WorkQueue(size_t maxDepth) : running(true), - maxDepth(maxDepth) + maxDepth(maxDepth), + numThreads(0) { } - /* Precondition: worker threads have all stopped */ + /*( Precondition: worker threads have all stopped + * (call WaitExit) + */ ~WorkQueue() { while (!queue.empty()) { @@ -100,6 +122,7 @@ public: /** Thread function */ void Run() { + ThreadCounter count(*this); while (running) { WorkItem* i = 0; { @@ -122,6 +145,13 @@ public: running = false; cond.notify_all(); } + /** Wait for worker threads to exit */ + void WaitExit() + { + boost::unique_lock<boost::mutex> lock(cs); + while (numThreads > 0) + cond.wait(lock); + } /** Return current depth of queue */ size_t Depth() @@ -155,6 +185,8 @@ static std::vector<CSubNet> rpc_allow_subnets; static WorkQueue<HTTPClosure>* workQueue = 0; //! Handlers for (sub)paths std::vector<HTTPPathHandler> pathHandlers; +//! Bound listening sockets +std::vector<evhttp_bound_socket *> boundSockets; /** Check if a network address is allowed to access the HTTP server */ static bool ClientAllowed(const CNetAddr& netaddr) @@ -264,6 +296,13 @@ static void http_request_cb(struct evhttp_request* req, void* arg) } } +/** Callback to reject HTTP requests after shutdown. */ +static void http_reject_request_cb(struct evhttp_request* req, void*) +{ + LogPrint("http", "Rejecting request while shutting down\n"); + evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); +} + /** Event dispatcher thread */ static void ThreadHTTP(struct event_base* base, struct evhttp* http) { @@ -278,7 +317,6 @@ static void ThreadHTTP(struct event_base* base, struct evhttp* http) static bool HTTPBindAddresses(struct evhttp* http) { int defaultPort = GetArg("-rpcport", BaseParams().RPCPort()); - int nBound = 0; std::vector<std::pair<std::string, uint16_t> > endpoints; // Determine what addresses to bind to @@ -304,13 +342,14 @@ static bool HTTPBindAddresses(struct evhttp* http) // Bind addresses for (std::vector<std::pair<std::string, uint16_t> >::iterator i = endpoints.begin(); i != endpoints.end(); ++i) { LogPrint("http", "Binding RPC on address %s port %i\n", i->first, i->second); - if (evhttp_bind_socket(http, i->first.empty() ? NULL : i->first.c_str(), i->second) == 0) { - nBound += 1; + evhttp_bound_socket *bind_handle = evhttp_bind_socket_with_handle(http, i->first.empty() ? NULL : i->first.c_str(), i->second); + if (bind_handle) { + boundSockets.push_back(bind_handle); } else { LogPrintf("Binding RPC on address %s port %i failed.\n", i->first, i->second); } } - return nBound > 0; + return !boundSockets.empty(); } /** Simple wrapper to set thread name and run work queue */ @@ -410,8 +449,21 @@ bool StartHTTPServer(boost::thread_group& threadGroup) void InterruptHTTPServer() { LogPrint("http", "Interrupting HTTP server\n"); - if (eventBase) - event_base_loopbreak(eventBase); + if (eventHTTP) { + // Unlisten sockets + BOOST_FOREACH (evhttp_bound_socket *socket, boundSockets) { + evhttp_del_accept_socket(eventHTTP, socket); + } + // Reject requests on current connections + evhttp_set_gencb(eventHTTP, http_reject_request_cb, NULL); + } + if (eventBase) { + // Force-exit event loop after predefined time + struct timeval tv; + tv.tv_sec = 10; + tv.tv_usec = 0; + event_base_loopexit(eventBase, &tv); + } if (workQueue) workQueue->Interrupt(); } @@ -419,7 +471,11 @@ void InterruptHTTPServer() void StopHTTPServer() { LogPrint("http", "Stopping HTTP server\n"); - delete workQueue; + if (workQueue) { + LogPrint("http", "Waiting for HTTP worker threads to exit\n"); + workQueue->WaitExit(); + delete workQueue; + } if (eventHTTP) { evhttp_free(eventHTTP); eventHTTP = 0; diff --git a/src/rpcserver.cpp b/src/rpcserver.cpp index b831d3d3b2..dbee61efc8 100644 --- a/src/rpcserver.cpp +++ b/src/rpcserver.cpp @@ -243,7 +243,8 @@ UniValue stop(const UniValue& params, bool fHelp) throw runtime_error( "stop\n" "\nStop Bitcoin server."); - // Shutdown will take long enough that the response should get back + // Event loop will exit after current HTTP requests have been handled, so + // this reply will get back to the client. StartShutdown(); return "Bitcoin server stopping"; } |