aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorWladimir J. van der Laan <laanwj@gmail.com>2015-09-28 16:03:41 +0200
committerWladimir J. van der Laan <laanwj@gmail.com>2015-09-28 16:03:51 +0200
commit1a9f19a78daa392baf2e062bddff597ce0ce30b6 (patch)
treeb80a79a74bdd6a0160a2fd76c0bff2d382b85cd7 /src
parentad57b310bac44a7e470cf66276421f2bbc61b1f0 (diff)
parentec908d5f7aa9ad7e3487018e06a24cb6449cc58b (diff)
downloadbitcoin-1a9f19a78daa392baf2e062bddff597ce0ce30b6.tar.xz
Merge pull request #6719
ec908d5 http: Force-exit event loop after predefined time (Wladimir J. van der Laan) de9de2d http: Wait for worker threads to exit (Wladimir J. van der Laan) 5e0c221 Make HTTP server shutdown more graceful (Wladimir J. van der Laan)
Diffstat (limited to 'src')
-rw-r--r--src/httpserver.cpp74
-rw-r--r--src/rpcserver.cpp3
2 files changed, 67 insertions, 10 deletions
diff --git a/src/httpserver.cpp b/src/httpserver.cpp
index 600e57b7cc..0a7f903e9f 100644
--- a/src/httpserver.cpp
+++ b/src/httpserver.cpp
@@ -72,13 +72,35 @@ private:
std::deque<WorkItem*> queue;
bool running;
size_t maxDepth;
+ int numThreads;
+
+ /** RAII object to keep track of number of running worker threads */
+ class ThreadCounter
+ {
+ public:
+ WorkQueue &wq;
+ ThreadCounter(WorkQueue &w): wq(w)
+ {
+ boost::lock_guard<boost::mutex> lock(wq.cs);
+ wq.numThreads += 1;
+ }
+ ~ThreadCounter()
+ {
+ boost::lock_guard<boost::mutex> lock(wq.cs);
+ wq.numThreads -= 1;
+ wq.cond.notify_all();
+ }
+ };
public:
WorkQueue(size_t maxDepth) : running(true),
- maxDepth(maxDepth)
+ maxDepth(maxDepth),
+ numThreads(0)
{
}
- /* Precondition: worker threads have all stopped */
+ /*( Precondition: worker threads have all stopped
+ * (call WaitExit)
+ */
~WorkQueue()
{
while (!queue.empty()) {
@@ -100,6 +122,7 @@ public:
/** Thread function */
void Run()
{
+ ThreadCounter count(*this);
while (running) {
WorkItem* i = 0;
{
@@ -122,6 +145,13 @@ public:
running = false;
cond.notify_all();
}
+ /** Wait for worker threads to exit */
+ void WaitExit()
+ {
+ boost::unique_lock<boost::mutex> lock(cs);
+ while (numThreads > 0)
+ cond.wait(lock);
+ }
/** Return current depth of queue */
size_t Depth()
@@ -155,6 +185,8 @@ static std::vector<CSubNet> rpc_allow_subnets;
static WorkQueue<HTTPClosure>* workQueue = 0;
//! Handlers for (sub)paths
std::vector<HTTPPathHandler> pathHandlers;
+//! Bound listening sockets
+std::vector<evhttp_bound_socket *> boundSockets;
/** Check if a network address is allowed to access the HTTP server */
static bool ClientAllowed(const CNetAddr& netaddr)
@@ -264,6 +296,13 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
}
}
+/** Callback to reject HTTP requests after shutdown. */
+static void http_reject_request_cb(struct evhttp_request* req, void*)
+{
+ LogPrint("http", "Rejecting request while shutting down\n");
+ evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
+}
+
/** Event dispatcher thread */
static void ThreadHTTP(struct event_base* base, struct evhttp* http)
{
@@ -278,7 +317,6 @@ static void ThreadHTTP(struct event_base* base, struct evhttp* http)
static bool HTTPBindAddresses(struct evhttp* http)
{
int defaultPort = GetArg("-rpcport", BaseParams().RPCPort());
- int nBound = 0;
std::vector<std::pair<std::string, uint16_t> > endpoints;
// Determine what addresses to bind to
@@ -304,13 +342,14 @@ static bool HTTPBindAddresses(struct evhttp* http)
// Bind addresses
for (std::vector<std::pair<std::string, uint16_t> >::iterator i = endpoints.begin(); i != endpoints.end(); ++i) {
LogPrint("http", "Binding RPC on address %s port %i\n", i->first, i->second);
- if (evhttp_bind_socket(http, i->first.empty() ? NULL : i->first.c_str(), i->second) == 0) {
- nBound += 1;
+ evhttp_bound_socket *bind_handle = evhttp_bind_socket_with_handle(http, i->first.empty() ? NULL : i->first.c_str(), i->second);
+ if (bind_handle) {
+ boundSockets.push_back(bind_handle);
} else {
LogPrintf("Binding RPC on address %s port %i failed.\n", i->first, i->second);
}
}
- return nBound > 0;
+ return !boundSockets.empty();
}
/** Simple wrapper to set thread name and run work queue */
@@ -410,8 +449,21 @@ bool StartHTTPServer(boost::thread_group& threadGroup)
void InterruptHTTPServer()
{
LogPrint("http", "Interrupting HTTP server\n");
- if (eventBase)
- event_base_loopbreak(eventBase);
+ if (eventHTTP) {
+ // Unlisten sockets
+ BOOST_FOREACH (evhttp_bound_socket *socket, boundSockets) {
+ evhttp_del_accept_socket(eventHTTP, socket);
+ }
+ // Reject requests on current connections
+ evhttp_set_gencb(eventHTTP, http_reject_request_cb, NULL);
+ }
+ if (eventBase) {
+ // Force-exit event loop after predefined time
+ struct timeval tv;
+ tv.tv_sec = 10;
+ tv.tv_usec = 0;
+ event_base_loopexit(eventBase, &tv);
+ }
if (workQueue)
workQueue->Interrupt();
}
@@ -419,7 +471,11 @@ void InterruptHTTPServer()
void StopHTTPServer()
{
LogPrint("http", "Stopping HTTP server\n");
- delete workQueue;
+ if (workQueue) {
+ LogPrint("http", "Waiting for HTTP worker threads to exit\n");
+ workQueue->WaitExit();
+ delete workQueue;
+ }
if (eventHTTP) {
evhttp_free(eventHTTP);
eventHTTP = 0;
diff --git a/src/rpcserver.cpp b/src/rpcserver.cpp
index b831d3d3b2..dbee61efc8 100644
--- a/src/rpcserver.cpp
+++ b/src/rpcserver.cpp
@@ -243,7 +243,8 @@ UniValue stop(const UniValue& params, bool fHelp)
throw runtime_error(
"stop\n"
"\nStop Bitcoin server.");
- // Shutdown will take long enough that the response should get back
+ // Event loop will exit after current HTTP requests have been handled, so
+ // this reply will get back to the client.
StartShutdown();
return "Bitcoin server stopping";
}