diff options
author | Wladimir J. van der Laan <laanwj@gmail.com> | 2017-01-04 12:20:43 +0100 |
---|---|---|
committer | Wladimir J. van der Laan <laanwj@gmail.com> | 2017-01-04 12:21:53 +0100 |
commit | d9ae1cefa081c7ef978fab0b288475692678af72 (patch) | |
tree | aaafc90d4a0ef9a6ac14f6787a4d8852705e095f /src/net.cpp | |
parent | c0ddd32bf629bb48426b0651def497ca1a78e6b1 (diff) | |
parent | 67ee4ec9015592c8447955356adfcbb1bf473e32 (diff) |
Merge #9289: net: drop boost::thread_group
67ee4ec net: misc header cleanups (Cory Fields)
8b3159e net: make proxy receives interruptible (Cory Fields)
5cb0fce net: remove thread_interrupted catch (Cory Fields)
d3d7056 net: make net processing interruptible (Cory Fields)
0985052 net: make net interruptible (Cory Fields)
799df91 net: add CThreadInterrupt and InterruptibleSleep (Cory Fields)
7325b15 net: a few small cleanups before replacing boost threads (Cory Fields)
Diffstat (limited to 'src/net.cpp')
-rw-r--r-- | src/net.cpp | 118 |
1 files changed, 77 insertions, 41 deletions
diff --git a/src/net.cpp b/src/net.cpp index 52fd725392..bf2beb7740 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -35,8 +35,6 @@ #include <miniupnpc/upnperrors.h> #endif -#include <boost/filesystem.hpp> -#include <boost/thread.hpp> #include <math.h> @@ -1042,7 +1040,7 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) { void CConnman::ThreadSocketHandler() { unsigned int nPrevNodeCount = 0; - while (true) + while (!interruptNet) { // // Disconnect nodes @@ -1180,7 +1178,8 @@ void CConnman::ThreadSocketHandler() int nSelect = select(have_fds ? hSocketMax + 1 : 0, &fdsetRecv, &fdsetSend, &fdsetError, &timeout); - boost::this_thread::interruption_point(); + if (interruptNet) + return; if (nSelect == SOCKET_ERROR) { @@ -1193,7 +1192,8 @@ void CConnman::ThreadSocketHandler() } FD_ZERO(&fdsetSend); FD_ZERO(&fdsetError); - MilliSleep(timeout.tv_usec/1000); + if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000))) + return; } // @@ -1219,7 +1219,8 @@ void CConnman::ThreadSocketHandler() } BOOST_FOREACH(CNode* pnode, vNodesCopy) { - boost::this_thread::interruption_point(); + if (interruptNet) + return; // // Receive @@ -1241,7 +1242,7 @@ void CConnman::ThreadSocketHandler() if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) pnode->CloseSocketDisconnect(); if(notify) - messageHandlerCondition.notify_one(); + condMsgProc.notify_one(); pnode->nLastRecv = GetTime(); pnode->nRecvBytes += nBytes; RecordBytesRecv(nBytes); @@ -1469,7 +1470,8 @@ void CConnman::ThreadDNSAddressSeed() // less influence on the network topology, and reduces traffic to the seeds. if ((addrman.size() > 0) && (!GetBoolArg("-forcednsseed", DEFAULT_FORCEDNSSEED))) { - MilliSleep(11 * 1000); + if (!interruptNet.sleep_for(std::chrono::seconds(11))) + return; LOCK(cs_vNodes); int nRelevant = 0; @@ -1580,10 +1582,12 @@ void CConnman::ThreadOpenConnections() OpenNetworkConnection(addr, false, NULL, strAddr.c_str()); for (int i = 0; i < 10 && i < nLoop; i++) { - MilliSleep(500); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + return; } } - MilliSleep(500); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + return; } } @@ -1592,14 +1596,16 @@ void CConnman::ThreadOpenConnections() // Minimum time before next feeler connection (in microseconds). int64_t nNextFeeler = PoissonNextSend(nStart*1000*1000, FEELER_INTERVAL); - while (true) + while (!interruptNet) { ProcessOneShot(); - MilliSleep(500); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + return; CSemaphoreGrant grant(*semOutbound); - boost::this_thread::interruption_point(); + if (interruptNet) + return; // Add seed nodes if DNS seeds are all down (an infrastructure attack?). if (addrman.size() == 0 && (GetTime() - nStart > 60)) { @@ -1657,7 +1663,7 @@ void CConnman::ThreadOpenConnections() int64_t nANow = GetAdjustedTime(); int nTries = 0; - while (true) + while (!interruptNet) { CAddrInfo addr = addrman.Select(fFeeler); @@ -1700,7 +1706,8 @@ void CConnman::ThreadOpenConnections() if (fFeeler) { // Add small amount of random noise before connection to avoid synchronization. int randsleep = GetRandInt(FEELER_SLEEP_WINDOW * 1000); - MilliSleep(randsleep); + if (!interruptNet.sleep_for(std::chrono::milliseconds(randsleep))) + return; LogPrint("net", "Making feeler connection to %s\n", addrConnect.ToString()); } @@ -1779,11 +1786,12 @@ void CConnman::ThreadOpenAddedConnections() // OpenNetworkConnection can detect existing connections to that IP/port. CService service(LookupNumeric(info.strAddedNode.c_str(), Params().GetDefaultPort())); OpenNetworkConnection(CAddress(service, NODE_NONE), false, &grant, info.strAddedNode.c_str(), false); - MilliSleep(500); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + return; } } - - MilliSleep(120000); // Retry every 2 minutes + if (!interruptNet.sleep_for(std::chrono::minutes(2))) + return; } } @@ -1793,7 +1801,9 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai // // Initiate outbound network connection // - boost::this_thread::interruption_point(); + if (interruptNet) { + return false; + } if (!fNetworkActive) { return false; } @@ -1806,7 +1816,6 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai return false; CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure); - boost::this_thread::interruption_point(); if (!pnode) return false; @@ -1820,13 +1829,9 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai return true; } - void CConnman::ThreadMessageHandler() { - boost::mutex condition_mutex; - boost::unique_lock<boost::mutex> lock(condition_mutex); - - while (true) + while (!flagInterruptMsgProc) { std::vector<CNode*> vNodesCopy; { @@ -1849,7 +1854,7 @@ void CConnman::ThreadMessageHandler() TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) { - if (!GetNodeSignals().ProcessMessages(pnode, *this)) + if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc)) pnode->CloseSocketDisconnect(); if (pnode->nSendSize < GetSendBufferSize()) @@ -1861,15 +1866,17 @@ void CConnman::ThreadMessageHandler() } } } - boost::this_thread::interruption_point(); + if (flagInterruptMsgProc) + return; // Send messages { TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) - GetNodeSignals().SendMessages(pnode, *this); + GetNodeSignals().SendMessages(pnode, *this, flagInterruptMsgProc); } - boost::this_thread::interruption_point(); + if (flagInterruptMsgProc) + return; } { @@ -1878,8 +1885,10 @@ void CConnman::ThreadMessageHandler() pnode->Release(); } - if (fSleep) - messageHandlerCondition.timed_wait(lock, boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(100)); + if (fSleep) { + std::unique_lock<std::mutex> lock(mutexMsgProc); + condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100)); + } } } @@ -2071,6 +2080,7 @@ CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In) : nSeed0(nSeed0In), nSe nMaxOutbound = 0; nBestHeight = 0; clientInterface = NULL; + flagInterruptMsgProc = false; } NodeId CConnman::GetNewNodeId() @@ -2078,7 +2088,7 @@ NodeId CConnman::GetNewNodeId() return nLastNodeId.fetch_add(1, std::memory_order_relaxed); } -bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError, Options connOptions) +bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options connOptions) { nTotalBytesRecv = 0; nTotalBytesSent = 0; @@ -2145,24 +2155,27 @@ bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, st // // Start threads // + InterruptSocks5(false); + interruptNet.reset(); + flagInterruptMsgProc = false; + + // Send and receive from sockets, accept connections + threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this))); if (!GetBoolArg("-dnsseed", true)) LogPrintf("DNS seeding disabled\n"); else - threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "dnsseed", boost::function<void()>(boost::bind(&CConnman::ThreadDNSAddressSeed, this)))); - - // Send and receive from sockets, accept connections - threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "net", boost::function<void()>(boost::bind(&CConnman::ThreadSocketHandler, this)))); + threadDNSAddressSeed = std::thread(&TraceThread<std::function<void()> >, "dnsseed", std::function<void()>(std::bind(&CConnman::ThreadDNSAddressSeed, this))); // Initiate outbound connections from -addnode - threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "addcon", boost::function<void()>(boost::bind(&CConnman::ThreadOpenAddedConnections, this)))); + threadOpenAddedConnections = std::thread(&TraceThread<std::function<void()> >, "addcon", std::function<void()>(std::bind(&CConnman::ThreadOpenAddedConnections, this))); // Initiate outbound connections unless connect=0 if (!mapMultiArgs.count("-connect") || mapMultiArgs.at("-connect").size() != 1 || mapMultiArgs.at("-connect")[0] != "0") - threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "opencon", boost::function<void()>(boost::bind(&CConnman::ThreadOpenConnections, this)))); + threadOpenConnections = std::thread(&TraceThread<std::function<void()> >, "opencon", std::function<void()>(std::bind(&CConnman::ThreadOpenConnections, this))); // Process messages - threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "msghand", boost::function<void()>(boost::bind(&CConnman::ThreadMessageHandler, this)))); + threadMessageHandler = std::thread(&TraceThread<std::function<void()> >, "msghand", std::function<void()>(std::bind(&CConnman::ThreadMessageHandler, this))); // Dump network addresses scheduler.scheduleEvery(boost::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL); @@ -2185,12 +2198,34 @@ public: } instance_of_cnetcleanup; -void CConnman::Stop() +void CConnman::Interrupt() { - LogPrintf("%s\n",__func__); + { + std::lock_guard<std::mutex> lock(mutexMsgProc); + flagInterruptMsgProc = true; + } + condMsgProc.notify_all(); + + interruptNet(); + InterruptSocks5(true); + if (semOutbound) for (int i=0; i<(nMaxOutbound + nMaxFeeler); i++) semOutbound->post(); +} + +void CConnman::Stop() +{ + if (threadMessageHandler.joinable()) + threadMessageHandler.join(); + if (threadOpenConnections.joinable()) + threadOpenConnections.join(); + if (threadOpenAddedConnections.joinable()) + threadOpenAddedConnections.join(); + if (threadDNSAddressSeed.joinable()) + threadDNSAddressSeed.join(); + if (threadSocketHandler.joinable()) + threadSocketHandler.join(); if (fAddressesInitialized) { @@ -2233,6 +2268,7 @@ void CConnman::DeleteNode(CNode* pnode) CConnman::~CConnman() { + Interrupt(); Stop(); } |