diff options
Diffstat (limited to 'src/httpserver.cpp')
-rw-r--r-- | src/httpserver.cpp | 95 |
1 files changed, 85 insertions, 10 deletions
diff --git a/src/httpserver.cpp b/src/httpserver.cpp index baca007571..0a7f903e9f 100644 --- a/src/httpserver.cpp +++ b/src/httpserver.cpp @@ -72,13 +72,35 @@ private: std::deque<WorkItem*> queue; bool running; size_t maxDepth; + int numThreads; + + /** RAII object to keep track of number of running worker threads */ + class ThreadCounter + { + public: + WorkQueue &wq; + ThreadCounter(WorkQueue &w): wq(w) + { + boost::lock_guard<boost::mutex> lock(wq.cs); + wq.numThreads += 1; + } + ~ThreadCounter() + { + boost::lock_guard<boost::mutex> lock(wq.cs); + wq.numThreads -= 1; + wq.cond.notify_all(); + } + }; public: WorkQueue(size_t maxDepth) : running(true), - maxDepth(maxDepth) + maxDepth(maxDepth), + numThreads(0) { } - /* Precondition: worker threads have all stopped */ + /*( Precondition: worker threads have all stopped + * (call WaitExit) + */ ~WorkQueue() { while (!queue.empty()) { @@ -100,6 +122,7 @@ public: /** Thread function */ void Run() { + ThreadCounter count(*this); while (running) { WorkItem* i = 0; { @@ -122,6 +145,13 @@ public: running = false; cond.notify_all(); } + /** Wait for worker threads to exit */ + void WaitExit() + { + boost::unique_lock<boost::mutex> lock(cs); + while (numThreads > 0) + cond.wait(lock); + } /** Return current depth of queue */ size_t Depth() @@ -155,6 +185,8 @@ static std::vector<CSubNet> rpc_allow_subnets; static WorkQueue<HTTPClosure>* workQueue = 0; //! Handlers for (sub)paths std::vector<HTTPPathHandler> pathHandlers; +//! Bound listening sockets +std::vector<evhttp_bound_socket *> boundSockets; /** Check if a network address is allowed to access the HTTP server */ static bool ClientAllowed(const CNetAddr& netaddr) @@ -264,6 +296,13 @@ static void http_request_cb(struct evhttp_request* req, void* arg) } } +/** Callback to reject HTTP requests after shutdown. */ +static void http_reject_request_cb(struct evhttp_request* req, void*) +{ + LogPrint("http", "Rejecting request while shutting down\n"); + evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); +} + /** Event dispatcher thread */ static void ThreadHTTP(struct event_base* base, struct evhttp* http) { @@ -278,7 +317,6 @@ static void ThreadHTTP(struct event_base* base, struct evhttp* http) static bool HTTPBindAddresses(struct evhttp* http) { int defaultPort = GetArg("-rpcport", BaseParams().RPCPort()); - int nBound = 0; std::vector<std::pair<std::string, uint16_t> > endpoints; // Determine what addresses to bind to @@ -304,13 +342,14 @@ static bool HTTPBindAddresses(struct evhttp* http) // Bind addresses for (std::vector<std::pair<std::string, uint16_t> >::iterator i = endpoints.begin(); i != endpoints.end(); ++i) { LogPrint("http", "Binding RPC on address %s port %i\n", i->first, i->second); - if (evhttp_bind_socket(http, i->first.empty() ? NULL : i->first.c_str(), i->second) == 0) { - nBound += 1; + evhttp_bound_socket *bind_handle = evhttp_bind_socket_with_handle(http, i->first.empty() ? NULL : i->first.c_str(), i->second); + if (bind_handle) { + boundSockets.push_back(bind_handle); } else { LogPrintf("Binding RPC on address %s port %i failed.\n", i->first, i->second); } } - return nBound > 0; + return !boundSockets.empty(); } /** Simple wrapper to set thread name and run work queue */ @@ -320,6 +359,15 @@ static void HTTPWorkQueueRun(WorkQueue<HTTPClosure>* queue) queue->Run(); } +/** libevent event log callback */ +static void libevent_log_cb(int severity, const char *msg) +{ + if (severity >= EVENT_LOG_WARN) // Log warn messages and higher without debug category + LogPrintf("libevent: %s\n", msg); + else + LogPrint("libevent", "libevent: %s\n", msg); +} + bool InitHTTPServer() { struct evhttp* http = 0; @@ -335,6 +383,16 @@ bool InitHTTPServer() return false; } + // Redirect libevent's logging to our own log + event_set_log_callback(&libevent_log_cb); +#if LIBEVENT_VERSION_NUMBER >= 0x02010100 + // If -debug=libevent, set full libevent debugging. + // Otherwise, disable all libevent debugging. + if (LogAcceptCategory("libevent")) + event_enable_debug_logging(EVENT_DBG_ALL); + else + event_enable_debug_logging(EVENT_DBG_NONE); +#endif #ifdef WIN32 evthread_use_windows_threads(); #else @@ -355,7 +413,7 @@ bool InitHTTPServer() return false; } - evhttp_set_timeout(http, GetArg("-rpctimeout", DEFAULT_HTTP_TIMEOUT)); + evhttp_set_timeout(http, GetArg("-rpcservertimeout", DEFAULT_HTTP_SERVER_TIMEOUT)); evhttp_set_max_body_size(http, MAX_SIZE); evhttp_set_gencb(http, http_request_cb, NULL); @@ -391,8 +449,21 @@ bool StartHTTPServer(boost::thread_group& threadGroup) void InterruptHTTPServer() { LogPrint("http", "Interrupting HTTP server\n"); - if (eventBase) - event_base_loopbreak(eventBase); + if (eventHTTP) { + // Unlisten sockets + BOOST_FOREACH (evhttp_bound_socket *socket, boundSockets) { + evhttp_del_accept_socket(eventHTTP, socket); + } + // Reject requests on current connections + evhttp_set_gencb(eventHTTP, http_reject_request_cb, NULL); + } + if (eventBase) { + // Force-exit event loop after predefined time + struct timeval tv; + tv.tv_sec = 10; + tv.tv_usec = 0; + event_base_loopexit(eventBase, &tv); + } if (workQueue) workQueue->Interrupt(); } @@ -400,7 +471,11 @@ void InterruptHTTPServer() void StopHTTPServer() { LogPrint("http", "Stopping HTTP server\n"); - delete workQueue; + if (workQueue) { + LogPrint("http", "Waiting for HTTP worker threads to exit\n"); + workQueue->WaitExit(); + delete workQueue; + } if (eventHTTP) { evhttp_free(eventHTTP); eventHTTP = 0; |