aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJeff Garzik <jgarzik@exmulti.com>2012-05-11 09:57:08 -0700
committerJeff Garzik <jgarzik@exmulti.com>2012-05-11 09:57:08 -0700
commitb34c5f3c0f4b37335e27bd67f554cf4df6976116 (patch)
treeeb452ded3cff6ec54ce9789c633b54e566f031af /src
parenteb793429f12579c9d39129b4e3572b208aa03954 (diff)
parent96c5269511b0cecbea67c0981aaea1a8a3345ba3 (diff)
Merge pull request #1101 from jgarzik/http11
Multithreaded JSON-RPC with HTTP 1.1 Keep-Alive support
Diffstat (limited to 'src')
-rw-r--r--src/bitcoinrpc.cpp134
-rw-r--r--src/bitcoinrpc.h1
-rw-r--r--src/net.cpp5
-rw-r--r--src/net.h3
4 files changed, 103 insertions, 40 deletions
diff --git a/src/bitcoinrpc.cpp b/src/bitcoinrpc.cpp
index 5c78656fcd..a766c4469a 100644
--- a/src/bitcoinrpc.cpp
+++ b/src/bitcoinrpc.cpp
@@ -46,6 +46,8 @@ extern Value importprivkey(const Array& params, bool fHelp);
const Object emptyobj;
+void ThreadRPCServer3(void* parg);
+
Object JSONRPCError(int code, const string& message)
{
Object error;
@@ -2021,7 +2023,7 @@ Value getwork(const Array& params, bool fHelp)
throw JSONRPCError(-10, "Bitcoin is downloading blocks...");
typedef map<uint256, pair<CBlock*, CScript> > mapNewBlock_t;
- static mapNewBlock_t mapNewBlock;
+ static mapNewBlock_t mapNewBlock; // FIXME: thread safety
static vector<CBlock*> vNewBlock;
static CReserveKey reservekey(pwalletMain);
@@ -2355,7 +2357,7 @@ string rfc1123Time()
return string(buffer);
}
-static string HTTPReply(int nStatus, const string& strMsg)
+static string HTTPReply(int nStatus, const string& strMsg, bool keepalive)
{
if (nStatus == 401)
return strprintf("HTTP/1.0 401 Authorization Required\r\n"
@@ -2384,7 +2386,7 @@ static string HTTPReply(int nStatus, const string& strMsg)
return strprintf(
"HTTP/1.1 %d %s\r\n"
"Date: %s\r\n"
- "Connection: close\r\n"
+ "Connection: %s\r\n"
"Content-Length: %d\r\n"
"Content-Type: application/json\r\n"
"Server: bitcoin-json-rpc/%s\r\n"
@@ -2393,12 +2395,13 @@ static string HTTPReply(int nStatus, const string& strMsg)
nStatus,
cStatus,
rfc1123Time().c_str(),
+ keepalive ? "keep-alive" : "close",
strMsg.size(),
FormatFullVersion().c_str(),
strMsg.c_str());
}
-int ReadHTTPStatus(std::basic_istream<char>& stream)
+int ReadHTTPStatus(std::basic_istream<char>& stream, int &proto)
{
string str;
getline(stream, str);
@@ -2406,6 +2409,10 @@ int ReadHTTPStatus(std::basic_istream<char>& stream)
boost::split(vWords, str, boost::is_any_of(" "));
if (vWords.size() < 2)
return 500;
+ proto = 0;
+ const char *ver = strstr(str.c_str(), "HTTP/1.");
+ if (ver != NULL)
+ proto = atoi(ver+7);
return atoi(vWords[1].c_str());
}
@@ -2440,7 +2447,8 @@ int ReadHTTP(std::basic_istream<char>& stream, map<string, string>& mapHeadersRe
strMessageRet = "";
// Read status
- int nStatus = ReadHTTPStatus(stream);
+ int nProto;
+ int nStatus = ReadHTTPStatus(stream, nProto);
// Read header
int nLen = ReadHTTPHeader(stream, mapHeadersRet);
@@ -2455,6 +2463,16 @@ int ReadHTTP(std::basic_istream<char>& stream, map<string, string>& mapHeadersRe
strMessageRet = string(vch.begin(), vch.end());
}
+ string sConHdr = mapHeadersRet["connection"];
+
+ if ((sConHdr != "close") && (sConHdr != "keep-alive"))
+ {
+ if (nProto >= 1)
+ mapHeadersRet["connection"] = "keep-alive";
+ else
+ mapHeadersRet["connection"] = "close";
+ }
+
return nStatus;
}
@@ -2507,7 +2525,7 @@ void ErrorReply(std::ostream& stream, const Object& objError, const Value& id)
if (code == -32600) nStatus = 400;
else if (code == -32601) nStatus = 404;
string strReply = JSONRPCReply(Value::null, objError, id);
- stream << HTTPReply(nStatus, strReply) << std::flush;
+ stream << HTTPReply(nStatus, strReply, false) << std::flush;
}
bool ClientAllowed(const string& strAddress)
@@ -2573,20 +2591,34 @@ private:
SSLStream& stream;
};
+class AcceptedConnection
+{
+ public:
+ SSLStream sslStream;
+ SSLIOStreamDevice d;
+ iostreams::stream<SSLIOStreamDevice> stream;
+
+ ip::tcp::endpoint peer;
+
+ AcceptedConnection(asio::io_service &io_service, ssl::context &context,
+ bool fUseSSL) : sslStream(io_service, context), d(sslStream, fUseSSL),
+ stream(d) { ; }
+};
+
void ThreadRPCServer(void* parg)
{
IMPLEMENT_RANDOMIZE_STACK(ThreadRPCServer(parg));
try
{
- vnThreadsRunning[THREAD_RPCSERVER]++;
+ vnThreadsRunning[THREAD_RPCLISTENER]++;
ThreadRPCServer2(parg);
- vnThreadsRunning[THREAD_RPCSERVER]--;
+ vnThreadsRunning[THREAD_RPCLISTENER]--;
}
catch (std::exception& e) {
- vnThreadsRunning[THREAD_RPCSERVER]--;
+ vnThreadsRunning[THREAD_RPCLISTENER]--;
PrintException(&e, "ThreadRPCServer()");
} catch (...) {
- vnThreadsRunning[THREAD_RPCSERVER]--;
+ vnThreadsRunning[THREAD_RPCLISTENER]--;
PrintException(NULL, "ThreadRPCServer()");
}
printf("ThreadRPCServer exiting\n");
@@ -2664,55 +2696,78 @@ void ThreadRPCServer2(void* parg)
loop
{
// Accept connection
- SSLStream sslStream(io_service, context);
- SSLIOStreamDevice d(sslStream, fUseSSL);
- iostreams::stream<SSLIOStreamDevice> stream(d);
-
- ip::tcp::endpoint peer;
- vnThreadsRunning[THREAD_RPCSERVER]--;
- acceptor.accept(sslStream.lowest_layer(), peer);
- vnThreadsRunning[4]++;
+ AcceptedConnection *conn =
+ new AcceptedConnection(io_service, context, fUseSSL);
+
+ vnThreadsRunning[THREAD_RPCLISTENER]--;
+ acceptor.accept(conn->sslStream.lowest_layer(), conn->peer);
+ vnThreadsRunning[THREAD_RPCLISTENER]++;
+
if (fShutdown)
+ {
+ delete conn;
return;
+ }
- // Restrict callers by IP
- if (!ClientAllowed(peer.address().to_string()))
+ // Restrict callers by IP. It is important to
+ // do this before starting client thread, to filter out
+ // certain DoS and misbehaving clients.
+ if (!ClientAllowed(conn->peer.address().to_string()))
{
// Only send a 403 if we're not using SSL to prevent a DoS during the SSL handshake.
if (!fUseSSL)
- stream << HTTPReply(403, "") << std::flush;
- continue;
+ conn->stream << HTTPReply(403, "", false) << std::flush;
+ delete conn;
+ }
+
+ // start HTTP client thread
+ else if (!CreateThread(ThreadRPCServer3, conn)) {
+ printf("Failed to create RPC server client thread\n");
+ delete conn;
}
+ }
+}
+
+void ThreadRPCServer3(void* parg)
+{
+ IMPLEMENT_RANDOMIZE_STACK(ThreadRPCServer3(parg));
+ vnThreadsRunning[THREAD_RPCHANDLER]++;
+ AcceptedConnection *conn = (AcceptedConnection *) parg;
+ bool fRun = true;
+ loop {
+ if (fShutdown || !fRun)
+ {
+ conn->stream.close();
+ delete conn;
+ --vnThreadsRunning[THREAD_RPCHANDLER];
+ return;
+ }
map<string, string> mapHeaders;
string strRequest;
- boost::thread api_caller(ReadHTTP, boost::ref(stream), boost::ref(mapHeaders), boost::ref(strRequest));
- if (!api_caller.timed_join(boost::posix_time::seconds(GetArg("-rpctimeout", 30))))
- { // Timed out:
- acceptor.cancel();
- printf("ThreadRPCServer ReadHTTP timeout\n");
- continue;
- }
+ ReadHTTP(conn->stream, mapHeaders, strRequest);
// Check authorization
if (mapHeaders.count("authorization") == 0)
{
- stream << HTTPReply(401, "") << std::flush;
- continue;
+ conn->stream << HTTPReply(401, "", false) << std::flush;
+ break;
}
if (!HTTPAuthorized(mapHeaders))
{
- printf("ThreadRPCServer incorrect password attempt from %s\n",peer.address().to_string().c_str());
+ printf("ThreadRPCServer incorrect password attempt from %s\n", conn->peer.address().to_string().c_str());
/* Deter brute-forcing short passwords.
If this results in a DOS the user really
shouldn't have their RPC port exposed.*/
if (mapArgs["-rpcpassword"].size() < 20)
Sleep(250);
- stream << HTTPReply(401, "") << std::flush;
- continue;
+ conn->stream << HTTPReply(401, "", false) << std::flush;
+ break;
}
+ if (mapHeaders["connection"] == "close")
+ fRun = false;
Value id = Value::null;
try
@@ -2750,17 +2805,22 @@ void ThreadRPCServer2(void* parg)
// Send reply
string strReply = JSONRPCReply(result, Value::null, id);
- stream << HTTPReply(200, strReply) << std::flush;
+ conn->stream << HTTPReply(200, strReply, fRun) << std::flush;
}
catch (Object& objError)
{
- ErrorReply(stream, objError, id);
+ ErrorReply(conn->stream, objError, id);
+ break;
}
catch (std::exception& e)
{
- ErrorReply(stream, JSONRPCError(-32700, e.what()), id);
+ ErrorReply(conn->stream, JSONRPCError(-32700, e.what()), id);
+ break;
}
}
+
+ delete conn;
+ vnThreadsRunning[THREAD_RPCHANDLER]--;
}
json_spirit::Value CRPCTable::execute(const std::string &strMethod, const json_spirit::Array &params) const
diff --git a/src/bitcoinrpc.h b/src/bitcoinrpc.h
index dd18a504f3..ed5974578c 100644
--- a/src/bitcoinrpc.h
+++ b/src/bitcoinrpc.h
@@ -9,6 +9,7 @@
#include <string>
#include <map>
+#define BOOST_SPIRIT_THREADSAFE
#include "json/json_spirit_reader_template.h"
#include "json/json_spirit_writer_template.h"
#include "json/json_spirit_utils.h"
diff --git a/src/net.cpp b/src/net.cpp
index 8603514f91..a9ac44a08e 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -1839,12 +1839,13 @@ bool StopNode()
if (vnThreadsRunning[THREAD_OPENCONNECTIONS] > 0) printf("ThreadOpenConnections still running\n");
if (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0) printf("ThreadMessageHandler still running\n");
if (vnThreadsRunning[THREAD_MINER] > 0) printf("ThreadBitcoinMiner still running\n");
- if (vnThreadsRunning[THREAD_RPCSERVER] > 0) printf("ThreadRPCServer still running\n");
+ if (vnThreadsRunning[THREAD_RPCLISTENER] > 0) printf("ThreadRPCListener still running\n");
+ if (vnThreadsRunning[THREAD_RPCHANDLER] > 0) printf("ThreadsRPCServer still running\n");
if (fHaveUPnP && vnThreadsRunning[THREAD_UPNP] > 0) printf("ThreadMapPort still running\n");
if (vnThreadsRunning[THREAD_DNSSEED] > 0) printf("ThreadDNSAddressSeed still running\n");
if (vnThreadsRunning[THREAD_ADDEDCONNECTIONS] > 0) printf("ThreadOpenAddedConnections still running\n");
if (vnThreadsRunning[THREAD_DUMPADDRESS] > 0) printf("ThreadDumpAddresses still running\n");
- while (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0 || vnThreadsRunning[THREAD_RPCSERVER] > 0)
+ while (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0 || vnThreadsRunning[THREAD_RPCHANDLER] > 0)
Sleep(20);
Sleep(50);
DumpAddresses();
diff --git a/src/net.h b/src/net.h
index a00dd1b8cc..c649242a07 100644
--- a/src/net.h
+++ b/src/net.h
@@ -92,11 +92,12 @@ enum threadId
THREAD_OPENCONNECTIONS,
THREAD_MESSAGEHANDLER,
THREAD_MINER,
- THREAD_RPCSERVER,
+ THREAD_RPCLISTENER,
THREAD_UPNP,
THREAD_DNSSEED,
THREAD_ADDEDCONNECTIONS,
THREAD_DUMPADDRESS,
+ THREAD_RPCHANDLER,
THREAD_MAX
};