diff options
author | David Joel Schwartz <davidjoelschwartz@gmail.com> | 2012-04-14 20:35:58 -0400 |
---|---|---|
committer | Jeff Garzik <jgarzik@redhat.com> | 2012-05-08 20:11:17 -0400 |
commit | e9205293bd8d6352ce51372111336ba3c4e14e70 (patch) | |
tree | 28963d21d87c814049ee10cd5bb22e0eca6e1619 /src | |
parent | 203f9e6c0010893df20fb64c77dc0ac42e396947 (diff) |
Support multi-threaded JSON-RPC
Change internal HTTP JSON-RPC server from single-threaded to
thread-per-connection model. The IP filter list is applied prior to starting
the thread, which then processes the RPC.
A mutex covers the entire RPC operation, because not all RPC operations are
thread-safe.
[minor modifications by jgarzik, to make change upstream-ready]
Diffstat (limited to 'src')
-rw-r--r-- | src/bitcoinrpc.cpp | 98 | ||||
-rw-r--r-- | src/bitcoinrpc.h | 1 | ||||
-rw-r--r-- | src/net.cpp | 5 | ||||
-rw-r--r-- | src/net.h | 3 |
4 files changed, 72 insertions, 35 deletions
diff --git a/src/bitcoinrpc.cpp b/src/bitcoinrpc.cpp index a189b2b2b0..b3a27101a6 100644 --- a/src/bitcoinrpc.cpp +++ b/src/bitcoinrpc.cpp @@ -46,6 +46,8 @@ extern Value importprivkey(const Array& params, bool fHelp); const Object emptyobj; +void ThreadRPCServer3(void* parg); + Object JSONRPCError(int code, const string& message) { Object error; @@ -2021,7 +2023,7 @@ Value getwork(const Array& params, bool fHelp) throw JSONRPCError(-10, "Bitcoin is downloading blocks..."); typedef map<uint256, pair<CBlock*, CScript> > mapNewBlock_t; - static mapNewBlock_t mapNewBlock; + static mapNewBlock_t mapNewBlock; // FIXME: thread safety static vector<CBlock*> vNewBlock; static CReserveKey reservekey(pwalletMain); @@ -2573,20 +2575,34 @@ private: SSLStream& stream; }; +class AcceptedConnection +{ + public: + SSLStream sslStream; + SSLIOStreamDevice d; + iostreams::stream<SSLIOStreamDevice> stream; + + ip::tcp::endpoint peer; + + AcceptedConnection(asio::io_service &io_service, ssl::context &context, + bool fUseSSL) : sslStream(io_service, context), d(sslStream, fUseSSL), + stream(d) { ; } +}; + void ThreadRPCServer(void* parg) { IMPLEMENT_RANDOMIZE_STACK(ThreadRPCServer(parg)); try { - vnThreadsRunning[THREAD_RPCSERVER]++; + vnThreadsRunning[THREAD_RPCLISTENER]++; ThreadRPCServer2(parg); - vnThreadsRunning[THREAD_RPCSERVER]--; + vnThreadsRunning[THREAD_RPCLISTENER]--; } catch (std::exception& e) { - vnThreadsRunning[THREAD_RPCSERVER]--; + vnThreadsRunning[THREAD_RPCLISTENER]--; PrintException(&e, "ThreadRPCServer()"); } catch (...) { - vnThreadsRunning[THREAD_RPCSERVER]--; + vnThreadsRunning[THREAD_RPCLISTENER]--; PrintException(NULL, "ThreadRPCServer()"); } printf("ThreadRPCServer exiting\n"); @@ -2664,54 +2680,67 @@ void ThreadRPCServer2(void* parg) loop { // Accept connection - SSLStream sslStream(io_service, context); - SSLIOStreamDevice d(sslStream, fUseSSL); - iostreams::stream<SSLIOStreamDevice> stream(d); - - ip::tcp::endpoint peer; - vnThreadsRunning[THREAD_RPCSERVER]--; - acceptor.accept(sslStream.lowest_layer(), peer); - vnThreadsRunning[4]++; + AcceptedConnection *conn = + new AcceptedConnection(io_service, context, fUseSSL); + + vnThreadsRunning[THREAD_RPCLISTENER]--; + acceptor.accept(conn->sslStream.lowest_layer(), conn->peer); + vnThreadsRunning[THREAD_RPCLISTENER]++; + if (fShutdown) + { + delete conn; return; + } - // Restrict callers by IP - if (!ClientAllowed(peer.address().to_string())) + // Restrict callers by IP. It is important to + // do this before starting client thread, to filter out + // certain DoS and misbehaving clients. + if (!ClientAllowed(conn->peer.address().to_string())) { // Only send a 403 if we're not using SSL to prevent a DoS during the SSL handshake. if (!fUseSSL) - stream << HTTPReply(403, "") << std::flush; - continue; + conn->stream << HTTPReply(403, "") << std::flush; + delete conn; } + // start HTTP client thread + else if (!CreateThread(ThreadRPCServer3, conn)) { + printf("Failed to create RPC server client thread\n"); + delete conn; + } + } +} + +void ThreadRPCServer3(void* parg) +{ + IMPLEMENT_RANDOMIZE_STACK(ThreadRPCServer3(parg)); + vnThreadsRunning[THREAD_RPCHANDLER]++; + AcceptedConnection *conn = (AcceptedConnection *) parg; + + do { map<string, string> mapHeaders; string strRequest; - boost::thread api_caller(ReadHTTP, boost::ref(stream), boost::ref(mapHeaders), boost::ref(strRequest)); - if (!api_caller.timed_join(boost::posix_time::seconds(GetArg("-rpctimeout", 30)))) - { // Timed out: - acceptor.cancel(); - printf("ThreadRPCServer ReadHTTP timeout\n"); - continue; - } + ReadHTTP(conn->stream, mapHeaders, strRequest); // Check authorization if (mapHeaders.count("authorization") == 0) { - stream << HTTPReply(401, "") << std::flush; - continue; + conn->stream << HTTPReply(401, "") << std::flush; + break; } if (!HTTPAuthorized(mapHeaders)) { - printf("ThreadRPCServer incorrect password attempt from %s\n",peer.address().to_string().c_str()); + printf("ThreadRPCServer incorrect password attempt from %s\n", conn->peer.address().to_string().c_str()); /* Deter brute-forcing short passwords. If this results in a DOS the user really shouldn't have their RPC port exposed.*/ if (mapArgs["-rpcpassword"].size() < 20) Sleep(250); - stream << HTTPReply(401, "") << std::flush; - continue; + conn->stream << HTTPReply(401, "") << std::flush; + break; } Value id = Value::null; @@ -2750,17 +2779,22 @@ void ThreadRPCServer2(void* parg) // Send reply string strReply = JSONRPCReply(result, Value::null, id); - stream << HTTPReply(200, strReply) << std::flush; + conn->stream << HTTPReply(200, strReply) << std::flush; } catch (Object& objError) { - ErrorReply(stream, objError, id); + ErrorReply(conn->stream, objError, id); + break; } catch (std::exception& e) { - ErrorReply(stream, JSONRPCError(-32700, e.what()), id); + ErrorReply(conn->stream, JSONRPCError(-32700, e.what()), id); + break; } } + while (0); + delete conn; + vnThreadsRunning[THREAD_RPCHANDLER]--; } json_spirit::Value CRPCTable::execute(const std::string &strMethod, const json_spirit::Array ¶ms) const diff --git a/src/bitcoinrpc.h b/src/bitcoinrpc.h index dd18a504f3..ed5974578c 100644 --- a/src/bitcoinrpc.h +++ b/src/bitcoinrpc.h @@ -9,6 +9,7 @@ #include <string> #include <map> +#define BOOST_SPIRIT_THREADSAFE #include "json/json_spirit_reader_template.h" #include "json/json_spirit_writer_template.h" #include "json/json_spirit_utils.h" diff --git a/src/net.cpp b/src/net.cpp index 7efe304fb6..6da256a8cb 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1839,12 +1839,13 @@ bool StopNode() if (vnThreadsRunning[THREAD_OPENCONNECTIONS] > 0) printf("ThreadOpenConnections still running\n"); if (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0) printf("ThreadMessageHandler still running\n"); if (vnThreadsRunning[THREAD_MINER] > 0) printf("ThreadBitcoinMiner still running\n"); - if (vnThreadsRunning[THREAD_RPCSERVER] > 0) printf("ThreadRPCServer still running\n"); + if (vnThreadsRunning[THREAD_RPCLISTENER] > 0) printf("ThreadRPCListener still running\n"); + if (vnThreadsRunning[THREAD_RPCHANDLER] > 0) printf("ThreadsRPCServer still running\n"); if (fHaveUPnP && vnThreadsRunning[THREAD_UPNP] > 0) printf("ThreadMapPort still running\n"); if (vnThreadsRunning[THREAD_DNSSEED] > 0) printf("ThreadDNSAddressSeed still running\n"); if (vnThreadsRunning[THREAD_ADDEDCONNECTIONS] > 0) printf("ThreadOpenAddedConnections still running\n"); if (vnThreadsRunning[THREAD_DUMPADDRESS] > 0) printf("ThreadDumpAddresses still running\n"); - while (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0 || vnThreadsRunning[THREAD_RPCSERVER] > 0) + while (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0 || vnThreadsRunning[THREAD_RPCHANDLER] > 0) Sleep(20); Sleep(50); DumpAddresses(); @@ -92,11 +92,12 @@ enum threadId THREAD_OPENCONNECTIONS, THREAD_MESSAGEHANDLER, THREAD_MINER, - THREAD_RPCSERVER, + THREAD_RPCLISTENER, THREAD_UPNP, THREAD_DNSSEED, THREAD_ADDEDCONNECTIONS, THREAD_DUMPADDRESS, + THREAD_RPCHANDLER, THREAD_MAX }; |