diff options
author | Wladimir J. van der Laan <laanwj@gmail.com> | 2017-01-04 12:20:43 +0100 |
---|---|---|
committer | Wladimir J. van der Laan <laanwj@gmail.com> | 2017-01-04 12:21:53 +0100 |
commit | d9ae1cefa081c7ef978fab0b288475692678af72 (patch) | |
tree | aaafc90d4a0ef9a6ac14f6787a4d8852705e095f /src | |
parent | c0ddd32bf629bb48426b0651def497ca1a78e6b1 (diff) | |
parent | 67ee4ec9015592c8447955356adfcbb1bf473e32 (diff) |
Merge #9289: net: drop boost::thread_group
67ee4ec net: misc header cleanups (Cory Fields)
8b3159e net: make proxy receives interruptible (Cory Fields)
5cb0fce net: remove thread_interrupted catch (Cory Fields)
d3d7056 net: make net processing interruptible (Cory Fields)
0985052 net: make net interruptible (Cory Fields)
799df91 net: add CThreadInterrupt and InterruptibleSleep (Cory Fields)
7325b15 net: a few small cleanups before replacing boost threads (Cory Fields)
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rw-r--r-- | src/init.cpp | 4 | ||||
-rw-r--r-- | src/net.cpp | 118 | ||||
-rw-r--r-- | src/net.h | 23 | ||||
-rw-r--r-- | src/net_processing.cpp | 36 | ||||
-rw-r--r-- | src/net_processing.h | 5 | ||||
-rw-r--r-- | src/netbase.cpp | 19 | ||||
-rw-r--r-- | src/netbase.h | 1 | ||||
-rw-r--r-- | src/test/DoS_tests.cpp | 20 | ||||
-rw-r--r-- | src/threadinterrupt.cpp | 41 | ||||
-rw-r--r-- | src/threadinterrupt.h | 34 |
11 files changed, 222 insertions, 81 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 89b90e6dff..3428d4613d 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -138,6 +138,7 @@ BITCOIN_CORE_H = \ support/lockedpool.h \ sync.h \ threadsafety.h \ + threadinterrupt.h \ timedata.h \ torcontrol.h \ txdb.h \ @@ -327,6 +328,7 @@ libbitcoin_util_a_SOURCES = \ rpc/protocol.cpp \ support/cleanse.cpp \ sync.cpp \ + threadinterrupt.cpp \ util.cpp \ utilmoneystr.cpp \ utilstrencodings.cpp \ diff --git a/src/init.cpp b/src/init.cpp index 52a115b8c8..8110b63119 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -176,6 +176,8 @@ void Interrupt(boost::thread_group& threadGroup) InterruptRPC(); InterruptREST(); InterruptTorControl(); + if (g_connman) + g_connman->Interrupt(); threadGroup.interrupt_all(); } @@ -1572,7 +1574,7 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler) connOptions.nMaxOutboundTimeframe = nMaxOutboundTimeframe; connOptions.nMaxOutboundLimit = nMaxOutboundLimit; - if(!connman.Start(threadGroup, scheduler, strNodeError, connOptions)) + if (!connman.Start(scheduler, strNodeError, connOptions)) return InitError(strNodeError); // ********************************************************* Step 12: finished diff --git a/src/net.cpp b/src/net.cpp index 52fd725392..bf2beb7740 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -35,8 +35,6 @@ #include <miniupnpc/upnperrors.h> #endif -#include <boost/filesystem.hpp> -#include <boost/thread.hpp> #include <math.h> @@ -1042,7 +1040,7 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) { void CConnman::ThreadSocketHandler() { unsigned int nPrevNodeCount = 0; - while (true) + while (!interruptNet) { // // Disconnect nodes @@ -1180,7 +1178,8 @@ void CConnman::ThreadSocketHandler() int nSelect = select(have_fds ? hSocketMax + 1 : 0, &fdsetRecv, &fdsetSend, &fdsetError, &timeout); - boost::this_thread::interruption_point(); + if (interruptNet) + return; if (nSelect == SOCKET_ERROR) { @@ -1193,7 +1192,8 @@ void CConnman::ThreadSocketHandler() } FD_ZERO(&fdsetSend); FD_ZERO(&fdsetError); - MilliSleep(timeout.tv_usec/1000); + if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000))) + return; } // @@ -1219,7 +1219,8 @@ void CConnman::ThreadSocketHandler() } BOOST_FOREACH(CNode* pnode, vNodesCopy) { - boost::this_thread::interruption_point(); + if (interruptNet) + return; // // Receive @@ -1241,7 +1242,7 @@ void CConnman::ThreadSocketHandler() if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) pnode->CloseSocketDisconnect(); if(notify) - messageHandlerCondition.notify_one(); + condMsgProc.notify_one(); pnode->nLastRecv = GetTime(); pnode->nRecvBytes += nBytes; RecordBytesRecv(nBytes); @@ -1469,7 +1470,8 @@ void CConnman::ThreadDNSAddressSeed() // less influence on the network topology, and reduces traffic to the seeds. if ((addrman.size() > 0) && (!GetBoolArg("-forcednsseed", DEFAULT_FORCEDNSSEED))) { - MilliSleep(11 * 1000); + if (!interruptNet.sleep_for(std::chrono::seconds(11))) + return; LOCK(cs_vNodes); int nRelevant = 0; @@ -1580,10 +1582,12 @@ void CConnman::ThreadOpenConnections() OpenNetworkConnection(addr, false, NULL, strAddr.c_str()); for (int i = 0; i < 10 && i < nLoop; i++) { - MilliSleep(500); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + return; } } - MilliSleep(500); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + return; } } @@ -1592,14 +1596,16 @@ void CConnman::ThreadOpenConnections() // Minimum time before next feeler connection (in microseconds). int64_t nNextFeeler = PoissonNextSend(nStart*1000*1000, FEELER_INTERVAL); - while (true) + while (!interruptNet) { ProcessOneShot(); - MilliSleep(500); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + return; CSemaphoreGrant grant(*semOutbound); - boost::this_thread::interruption_point(); + if (interruptNet) + return; // Add seed nodes if DNS seeds are all down (an infrastructure attack?). if (addrman.size() == 0 && (GetTime() - nStart > 60)) { @@ -1657,7 +1663,7 @@ void CConnman::ThreadOpenConnections() int64_t nANow = GetAdjustedTime(); int nTries = 0; - while (true) + while (!interruptNet) { CAddrInfo addr = addrman.Select(fFeeler); @@ -1700,7 +1706,8 @@ void CConnman::ThreadOpenConnections() if (fFeeler) { // Add small amount of random noise before connection to avoid synchronization. int randsleep = GetRandInt(FEELER_SLEEP_WINDOW * 1000); - MilliSleep(randsleep); + if (!interruptNet.sleep_for(std::chrono::milliseconds(randsleep))) + return; LogPrint("net", "Making feeler connection to %s\n", addrConnect.ToString()); } @@ -1779,11 +1786,12 @@ void CConnman::ThreadOpenAddedConnections() // OpenNetworkConnection can detect existing connections to that IP/port. CService service(LookupNumeric(info.strAddedNode.c_str(), Params().GetDefaultPort())); OpenNetworkConnection(CAddress(service, NODE_NONE), false, &grant, info.strAddedNode.c_str(), false); - MilliSleep(500); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + return; } } - - MilliSleep(120000); // Retry every 2 minutes + if (!interruptNet.sleep_for(std::chrono::minutes(2))) + return; } } @@ -1793,7 +1801,9 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai // // Initiate outbound network connection // - boost::this_thread::interruption_point(); + if (interruptNet) { + return false; + } if (!fNetworkActive) { return false; } @@ -1806,7 +1816,6 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai return false; CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure); - boost::this_thread::interruption_point(); if (!pnode) return false; @@ -1820,13 +1829,9 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai return true; } - void CConnman::ThreadMessageHandler() { - boost::mutex condition_mutex; - boost::unique_lock<boost::mutex> lock(condition_mutex); - - while (true) + while (!flagInterruptMsgProc) { std::vector<CNode*> vNodesCopy; { @@ -1849,7 +1854,7 @@ void CConnman::ThreadMessageHandler() TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) { - if (!GetNodeSignals().ProcessMessages(pnode, *this)) + if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc)) pnode->CloseSocketDisconnect(); if (pnode->nSendSize < GetSendBufferSize()) @@ -1861,15 +1866,17 @@ void CConnman::ThreadMessageHandler() } } } - boost::this_thread::interruption_point(); + if (flagInterruptMsgProc) + return; // Send messages { TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) - GetNodeSignals().SendMessages(pnode, *this); + GetNodeSignals().SendMessages(pnode, *this, flagInterruptMsgProc); } - boost::this_thread::interruption_point(); + if (flagInterruptMsgProc) + return; } { @@ -1878,8 +1885,10 @@ void CConnman::ThreadMessageHandler() pnode->Release(); } - if (fSleep) - messageHandlerCondition.timed_wait(lock, boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(100)); + if (fSleep) { + std::unique_lock<std::mutex> lock(mutexMsgProc); + condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100)); + } } } @@ -2071,6 +2080,7 @@ CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In) : nSeed0(nSeed0In), nSe nMaxOutbound = 0; nBestHeight = 0; clientInterface = NULL; + flagInterruptMsgProc = false; } NodeId CConnman::GetNewNodeId() @@ -2078,7 +2088,7 @@ NodeId CConnman::GetNewNodeId() return nLastNodeId.fetch_add(1, std::memory_order_relaxed); } -bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError, Options connOptions) +bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options connOptions) { nTotalBytesRecv = 0; nTotalBytesSent = 0; @@ -2145,24 +2155,27 @@ bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, st // // Start threads // + InterruptSocks5(false); + interruptNet.reset(); + flagInterruptMsgProc = false; + + // Send and receive from sockets, accept connections + threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this))); if (!GetBoolArg("-dnsseed", true)) LogPrintf("DNS seeding disabled\n"); else - threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "dnsseed", boost::function<void()>(boost::bind(&CConnman::ThreadDNSAddressSeed, this)))); - - // Send and receive from sockets, accept connections - threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "net", boost::function<void()>(boost::bind(&CConnman::ThreadSocketHandler, this)))); + threadDNSAddressSeed = std::thread(&TraceThread<std::function<void()> >, "dnsseed", std::function<void()>(std::bind(&CConnman::ThreadDNSAddressSeed, this))); // Initiate outbound connections from -addnode - threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "addcon", boost::function<void()>(boost::bind(&CConnman::ThreadOpenAddedConnections, this)))); + threadOpenAddedConnections = std::thread(&TraceThread<std::function<void()> >, "addcon", std::function<void()>(std::bind(&CConnman::ThreadOpenAddedConnections, this))); // Initiate outbound connections unless connect=0 if (!mapMultiArgs.count("-connect") || mapMultiArgs.at("-connect").size() != 1 || mapMultiArgs.at("-connect")[0] != "0") - threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "opencon", boost::function<void()>(boost::bind(&CConnman::ThreadOpenConnections, this)))); + threadOpenConnections = std::thread(&TraceThread<std::function<void()> >, "opencon", std::function<void()>(std::bind(&CConnman::ThreadOpenConnections, this))); // Process messages - threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "msghand", boost::function<void()>(boost::bind(&CConnman::ThreadMessageHandler, this)))); + threadMessageHandler = std::thread(&TraceThread<std::function<void()> >, "msghand", std::function<void()>(std::bind(&CConnman::ThreadMessageHandler, this))); // Dump network addresses scheduler.scheduleEvery(boost::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL); @@ -2185,12 +2198,34 @@ public: } instance_of_cnetcleanup; -void CConnman::Stop() +void CConnman::Interrupt() { - LogPrintf("%s\n",__func__); + { + std::lock_guard<std::mutex> lock(mutexMsgProc); + flagInterruptMsgProc = true; + } + condMsgProc.notify_all(); + + interruptNet(); + InterruptSocks5(true); + if (semOutbound) for (int i=0; i<(nMaxOutbound + nMaxFeeler); i++) semOutbound->post(); +} + +void CConnman::Stop() +{ + if (threadMessageHandler.joinable()) + threadMessageHandler.join(); + if (threadOpenConnections.joinable()) + threadOpenConnections.join(); + if (threadOpenAddedConnections.joinable()) + threadOpenAddedConnections.join(); + if (threadDNSAddressSeed.joinable()) + threadDNSAddressSeed.join(); + if (threadSocketHandler.joinable()) + threadSocketHandler.join(); if (fAddressesInitialized) { @@ -2233,6 +2268,7 @@ void CConnman::DeleteNode(CNode* pnode) CConnman::~CConnman() { + Interrupt(); Stop(); } @@ -19,11 +19,14 @@ #include "streams.h" #include "sync.h" #include "uint256.h" +#include "threadinterrupt.h" #include <atomic> #include <deque> #include <stdint.h> +#include <thread> #include <memory> +#include <condition_variable> #ifndef WIN32 #include <arpa/inet.h> @@ -142,8 +145,9 @@ public: }; CConnman(uint64_t seed0, uint64_t seed1); ~CConnman(); - bool Start(boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError, Options options); + bool Start(CScheduler& scheduler, std::string& strNodeError, Options options); void Stop(); + void Interrupt(); bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false); bool GetNetworkActive() const { return fNetworkActive; }; void SetNetworkActive(bool active); @@ -402,7 +406,6 @@ private: std::list<CNode*> vNodesDisconnected; mutable CCriticalSection cs_vNodes; std::atomic<NodeId> nLastNodeId; - boost::condition_variable messageHandlerCondition; /** Services this instance offers */ ServiceFlags nLocalServices; @@ -419,6 +422,18 @@ private: /** SipHasher seeds for deterministic randomness */ const uint64_t nSeed0, nSeed1; + + std::condition_variable condMsgProc; + std::mutex mutexMsgProc; + std::atomic<bool> flagInterruptMsgProc; + + CThreadInterrupt interruptNet; + + std::thread threadDNSAddressSeed; + std::thread threadSocketHandler; + std::thread threadOpenAddedConnections; + std::thread threadOpenConnections; + std::thread threadMessageHandler; }; extern std::unique_ptr<CConnman> g_connman; void Discover(boost::thread_group& threadGroup); @@ -445,8 +460,8 @@ struct CombinerAll // Signals for message handling struct CNodeSignals { - boost::signals2::signal<bool (CNode*, CConnman&), CombinerAll> ProcessMessages; - boost::signals2::signal<bool (CNode*, CConnman&), CombinerAll> SendMessages; + boost::signals2::signal<bool (CNode*, CConnman&, std::atomic<bool>&), CombinerAll> ProcessMessages; + boost::signals2::signal<bool (CNode*, CConnman&, std::atomic<bool>&), CombinerAll> SendMessages; boost::signals2::signal<void (CNode*, CConnman&)> InitializeNode; boost::signals2::signal<void (NodeId, bool&)> FinalizeNode; }; diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 380decbcb7..e0c12d8530 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -886,7 +886,7 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connma connman.ForEachNodeThen(std::move(sortfunc), std::move(pushfunc)); } -void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman) +void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic<bool>& interruptMsgProc) { std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin(); unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); @@ -901,7 +901,9 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam const CInv &inv = *it; { - boost::this_thread::interruption_point(); + if (interruptMsgProc) + return; + it++; if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK || inv.type == MSG_WITNESS_BLOCK) @@ -1055,7 +1057,7 @@ uint32_t GetFetchFlags(CNode* pfrom, CBlockIndex* pprev, const Consensus::Params return nFetchFlags; } -bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman) +bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman, std::atomic<bool>& interruptMsgProc) { unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); @@ -1295,7 +1297,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nSince = nNow - 10 * 60; BOOST_FOREACH(CAddress& addr, vAddr) { - boost::this_thread::interruption_point(); + if (interruptMsgProc) + return true; if ((addr.nServices & REQUIRED_SERVICES) != REQUIRED_SERVICES) continue; @@ -1377,7 +1380,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, { CInv &inv = vInv[nInv]; - boost::this_thread::interruption_point(); + if (interruptMsgProc) + return true; bool fAlreadyHave = AlreadyHave(inv); LogPrint("net", "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom->id); @@ -1439,7 +1443,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, LogPrint("net", "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom->id); pfrom->vRecvGetData.insert(pfrom->vRecvGetData.end(), vInv.begin(), vInv.end()); - ProcessGetData(pfrom, chainparams.GetConsensus(), connman); + ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); } @@ -1513,7 +1517,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, inv.type = State(pfrom->GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK; inv.hash = req.blockhash; pfrom->vRecvGetData.push_back(inv); - ProcessGetData(pfrom, chainparams.GetConsensus(), connman); + ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); return true; } @@ -1925,10 +1929,10 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, } // cs_main if (fProcessBLOCKTXN) - return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, connman); + return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, connman, interruptMsgProc); if (fRevertToHeaderProcessing) - return ProcessMessage(pfrom, NetMsgType::HEADERS, vHeadersMsg, nTimeReceived, chainparams, connman); + return ProcessMessage(pfrom, NetMsgType::HEADERS, vHeadersMsg, nTimeReceived, chainparams, connman, interruptMsgProc); if (fBlockReconstructed) { // If we got here, we were able to optimistically reconstruct a @@ -2441,7 +2445,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, } // requires LOCK(cs_vRecvMsg) -bool ProcessMessages(CNode* pfrom, CConnman& connman) +bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc) { const CChainParams& chainparams = Params(); unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); @@ -2459,7 +2463,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman) bool fOk = true; if (!pfrom->vRecvGetData.empty()) - ProcessGetData(pfrom, chainparams.GetConsensus(), connman); + ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); // this maintains the order of responses if (!pfrom->vRecvGetData.empty()) return fOk; @@ -2520,8 +2524,9 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman) bool fRet = false; try { - fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman); - boost::this_thread::interruption_point(); + fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc); + if (interruptMsgProc) + return true; } catch (const std::ios_base::failure& e) { @@ -2546,9 +2551,6 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman) PrintExceptionContinue(&e, "ProcessMessages()"); } } - catch (const boost::thread_interrupted&) { - throw; - } catch (const std::exception& e) { PrintExceptionContinue(&e, "ProcessMessages()"); } catch (...) { @@ -2585,7 +2587,7 @@ public: } }; -bool SendMessages(CNode* pto, CConnman& connman) +bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interruptMsgProc) { const Consensus::Params& consensusParams = Params().GetConsensus(); { diff --git a/src/net_processing.h b/src/net_processing.h index dc250ebab6..230d805bd4 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -39,13 +39,14 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats); void Misbehaving(NodeId nodeid, int howmuch); /** Process protocol messages received from a given node */ -bool ProcessMessages(CNode* pfrom, CConnman& connman); +bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interrupt); /** * Send queued protocol messages to be sent to a give node. * * @param[in] pto The node which we are sending messages to. * @param[in] connman The connection manager for that node. + * @param[in] interrupt Interrupt condition for processing threads */ -bool SendMessages(CNode* pto, CConnman& connman); +bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interrupt); #endif // BITCOIN_NET_PROCESSING_H diff --git a/src/netbase.cpp b/src/netbase.cpp index 21aa645de9..8fd2a8efd2 100644 --- a/src/netbase.cpp +++ b/src/netbase.cpp @@ -16,20 +16,14 @@ #include "util.h" #include "utilstrencodings.h" -#ifdef HAVE_GETADDRINFO_A -#include <netdb.h> -#endif +#include <atomic> #ifndef WIN32 -#if HAVE_INET_PTON -#include <arpa/inet.h> -#endif #include <fcntl.h> #endif #include <boost/algorithm/string/case_conv.hpp> // for to_lower() #include <boost/algorithm/string/predicate.hpp> // for startswith() and endswith() -#include <boost/thread.hpp> #if !defined(HAVE_MSG_NOSIGNAL) && !defined(MSG_NOSIGNAL) #define MSG_NOSIGNAL 0 @@ -44,6 +38,7 @@ bool fNameLookup = DEFAULT_NAME_LOOKUP; // Need ample time for negotiation for very slow proxies such as Tor (milliseconds) static const int SOCKS5_RECV_TIMEOUT = 20 * 1000; +static std::atomic<bool> interruptSocks5Recv(false); enum Network ParseNetwork(std::string net) { boost::to_lower(net); @@ -206,7 +201,7 @@ struct timeval MillisToTimeval(int64_t nTimeout) /** * Read bytes from socket. This will either read the full number of bytes requested * or return False on error or timeout. - * This function can be interrupted by boost thread interrupt. + * This function can be interrupted by calling InterruptSocks5() * * @param data Buffer to receive into * @param len Length of data to receive @@ -246,7 +241,8 @@ bool static InterruptibleRecv(char* data, size_t len, int timeout, SOCKET& hSock return false; } } - boost::this_thread::interruption_point(); + if (interruptSocks5Recv) + return false; curTime = GetTimeMillis(); } return len == 0; @@ -715,3 +711,8 @@ bool SetSocketNonBlocking(SOCKET& hSocket, bool fNonBlocking) return true; } + +void InterruptSocks5(bool interrupt) +{ + interruptSocks5Recv = interrupt; +} diff --git a/src/netbase.h b/src/netbase.h index 0337d0d54b..dd33b6e47e 100644 --- a/src/netbase.h +++ b/src/netbase.h @@ -63,5 +63,6 @@ bool SetSocketNonBlocking(SOCKET& hSocket, bool fNonBlocking); * Convert milliseconds to a struct timeval for e.g. select. */ struct timeval MillisToTimeval(int64_t nTimeout); +void InterruptSocks5(bool interrupt); #endif // BITCOIN_NETBASE_H diff --git a/src/test/DoS_tests.cpp b/src/test/DoS_tests.cpp index c7c3646b03..f02579878a 100644 --- a/src/test/DoS_tests.cpp +++ b/src/test/DoS_tests.cpp @@ -47,6 +47,8 @@ BOOST_FIXTURE_TEST_SUITE(DoS_tests, TestingSetup) BOOST_AUTO_TEST_CASE(DoS_banning) { + std::atomic<bool> interruptDummy(false); + connman->ClearBanned(); CAddress addr1(ip(0xa0b0c001), NODE_NONE); CNode dummyNode1(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr1, 0, 0, "", true); @@ -54,7 +56,7 @@ BOOST_AUTO_TEST_CASE(DoS_banning) GetNodeSignals().InitializeNode(&dummyNode1, *connman); dummyNode1.nVersion = 1; Misbehaving(dummyNode1.GetId(), 100); // Should get banned - SendMessages(&dummyNode1, *connman); + SendMessages(&dummyNode1, *connman, interruptDummy); BOOST_CHECK(connman->IsBanned(addr1)); BOOST_CHECK(!connman->IsBanned(ip(0xa0b0c001|0x0000ff00))); // Different IP, not banned @@ -64,16 +66,18 @@ BOOST_AUTO_TEST_CASE(DoS_banning) GetNodeSignals().InitializeNode(&dummyNode2, *connman); dummyNode2.nVersion = 1; Misbehaving(dummyNode2.GetId(), 50); - SendMessages(&dummyNode2, *connman); + SendMessages(&dummyNode2, *connman, interruptDummy); BOOST_CHECK(!connman->IsBanned(addr2)); // 2 not banned yet... BOOST_CHECK(connman->IsBanned(addr1)); // ... but 1 still should be Misbehaving(dummyNode2.GetId(), 50); - SendMessages(&dummyNode2, *connman); + SendMessages(&dummyNode2, *connman, interruptDummy); BOOST_CHECK(connman->IsBanned(addr2)); } BOOST_AUTO_TEST_CASE(DoS_banscore) { + std::atomic<bool> interruptDummy(false); + connman->ClearBanned(); ForceSetArg("-banscore", "111"); // because 11 is my favorite number CAddress addr1(ip(0xa0b0c001), NODE_NONE); @@ -82,19 +86,21 @@ BOOST_AUTO_TEST_CASE(DoS_banscore) GetNodeSignals().InitializeNode(&dummyNode1, *connman); dummyNode1.nVersion = 1; Misbehaving(dummyNode1.GetId(), 100); - SendMessages(&dummyNode1, *connman); + SendMessages(&dummyNode1, *connman, interruptDummy); BOOST_CHECK(!connman->IsBanned(addr1)); Misbehaving(dummyNode1.GetId(), 10); - SendMessages(&dummyNode1, *connman); + SendMessages(&dummyNode1, *connman, interruptDummy); BOOST_CHECK(!connman->IsBanned(addr1)); Misbehaving(dummyNode1.GetId(), 1); - SendMessages(&dummyNode1, *connman); + SendMessages(&dummyNode1, *connman, interruptDummy); BOOST_CHECK(connman->IsBanned(addr1)); ForceSetArg("-banscore", std::to_string(DEFAULT_BANSCORE_THRESHOLD)); } BOOST_AUTO_TEST_CASE(DoS_bantime) { + std::atomic<bool> interruptDummy(false); + connman->ClearBanned(); int64_t nStartTime = GetTime(); SetMockTime(nStartTime); // Overrides future calls to GetTime() @@ -106,7 +112,7 @@ BOOST_AUTO_TEST_CASE(DoS_bantime) dummyNode.nVersion = 1; Misbehaving(dummyNode.GetId(), 100); - SendMessages(&dummyNode, *connman); + SendMessages(&dummyNode, *connman, interruptDummy); BOOST_CHECK(connman->IsBanned(addr)); SetMockTime(nStartTime+60*60); diff --git a/src/threadinterrupt.cpp b/src/threadinterrupt.cpp new file mode 100644 index 0000000000..9d691079ed --- /dev/null +++ b/src/threadinterrupt.cpp @@ -0,0 +1,41 @@ +// Copyright (c) 2009-2010 Satoshi Nakamoto +// Copyright (c) 2009-2016 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "threadinterrupt.h" + +CThreadInterrupt::operator bool() const +{ + return flag.load(std::memory_order_acquire); +} + +void CThreadInterrupt::reset() +{ + flag.store(false, std::memory_order_release); +} + +void CThreadInterrupt::operator()() +{ + { + std::unique_lock<std::mutex> lock(mut); + flag.store(true, std::memory_order_release); + } + cond.notify_all(); +} + +bool CThreadInterrupt::sleep_for(std::chrono::milliseconds rel_time) +{ + std::unique_lock<std::mutex> lock(mut); + return !cond.wait_for(lock, rel_time, [this]() { return flag.load(std::memory_order_acquire); }); +} + +bool CThreadInterrupt::sleep_for(std::chrono::seconds rel_time) +{ + return sleep_for(std::chrono::duration_cast<std::chrono::milliseconds>(rel_time)); +} + +bool CThreadInterrupt::sleep_for(std::chrono::minutes rel_time) +{ + return sleep_for(std::chrono::duration_cast<std::chrono::milliseconds>(rel_time)); +} diff --git a/src/threadinterrupt.h b/src/threadinterrupt.h new file mode 100644 index 0000000000..54e3102808 --- /dev/null +++ b/src/threadinterrupt.h @@ -0,0 +1,34 @@ +// Copyright (c) 2016 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_THREADINTERRUPT_H +#define BITCOIN_THREADINTERRUPT_H + +#include <atomic> +#include <chrono> +#include <condition_variable> +#include <mutex> + +/* + A helper class for interruptible sleeps. Calling operator() will interrupt + any current sleep, and after that point operator bool() will return true + until reset. +*/ +class CThreadInterrupt +{ +public: + explicit operator bool() const; + void operator()(); + void reset(); + bool sleep_for(std::chrono::milliseconds rel_time); + bool sleep_for(std::chrono::seconds rel_time); + bool sleep_for(std::chrono::minutes rel_time); + +private: + std::condition_variable cond; + std::mutex mut; + std::atomic<bool> flag; +}; + +#endif //BITCOIN_THREADINTERRUPT_H |