diff options
author | Cory Fields <cory-nospam-@coryfields.com> | 2016-12-27 17:13:04 -0500 |
---|---|---|
committer | Cory Fields <cory-nospam-@coryfields.com> | 2017-01-03 17:56:20 -0500 |
commit | d3d7056d2a562301b3770c4ede1dfc8ffb00cf4b (patch) | |
tree | bf1afce0daac6bf2e8b71a181e2bcdf8b27e655c /src/net_processing.cpp | |
parent | 0985052319263bd7ca9744af3504682b3ea8e21a (diff) |
net: make net processing interruptible
Diffstat (limited to 'src/net_processing.cpp')
-rw-r--r-- | src/net_processing.cpp | 33 |
1 files changed, 19 insertions, 14 deletions
diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 380decbcb7..f3a04080d1 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -886,7 +886,7 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connma connman.ForEachNodeThen(std::move(sortfunc), std::move(pushfunc)); } -void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman) +void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic<bool>& interruptMsgProc) { std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin(); unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); @@ -901,7 +901,9 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam const CInv &inv = *it; { - boost::this_thread::interruption_point(); + if (interruptMsgProc) + return; + it++; if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK || inv.type == MSG_WITNESS_BLOCK) @@ -1055,7 +1057,7 @@ uint32_t GetFetchFlags(CNode* pfrom, CBlockIndex* pprev, const Consensus::Params return nFetchFlags; } -bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman) +bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman, std::atomic<bool>& interruptMsgProc) { unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); @@ -1295,7 +1297,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nSince = nNow - 10 * 60; BOOST_FOREACH(CAddress& addr, vAddr) { - boost::this_thread::interruption_point(); + if (interruptMsgProc) + return true; if ((addr.nServices & REQUIRED_SERVICES) != REQUIRED_SERVICES) continue; @@ -1377,7 +1380,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, { CInv &inv = vInv[nInv]; - boost::this_thread::interruption_point(); + if (interruptMsgProc) + return true; bool fAlreadyHave = AlreadyHave(inv); LogPrint("net", "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom->id); @@ -1439,7 +1443,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, LogPrint("net", "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom->id); pfrom->vRecvGetData.insert(pfrom->vRecvGetData.end(), vInv.begin(), vInv.end()); - ProcessGetData(pfrom, chainparams.GetConsensus(), connman); + ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); } @@ -1513,7 +1517,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, inv.type = State(pfrom->GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK; inv.hash = req.blockhash; pfrom->vRecvGetData.push_back(inv); - ProcessGetData(pfrom, chainparams.GetConsensus(), connman); + ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); return true; } @@ -1925,10 +1929,10 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, } // cs_main if (fProcessBLOCKTXN) - return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, connman); + return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, connman, interruptMsgProc); if (fRevertToHeaderProcessing) - return ProcessMessage(pfrom, NetMsgType::HEADERS, vHeadersMsg, nTimeReceived, chainparams, connman); + return ProcessMessage(pfrom, NetMsgType::HEADERS, vHeadersMsg, nTimeReceived, chainparams, connman, interruptMsgProc); if (fBlockReconstructed) { // If we got here, we were able to optimistically reconstruct a @@ -2441,7 +2445,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, } // requires LOCK(cs_vRecvMsg) -bool ProcessMessages(CNode* pfrom, CConnman& connman) +bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc) { const CChainParams& chainparams = Params(); unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); @@ -2459,7 +2463,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman) bool fOk = true; if (!pfrom->vRecvGetData.empty()) - ProcessGetData(pfrom, chainparams.GetConsensus(), connman); + ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); // this maintains the order of responses if (!pfrom->vRecvGetData.empty()) return fOk; @@ -2520,8 +2524,9 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman) bool fRet = false; try { - fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman); - boost::this_thread::interruption_point(); + fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc); + if (interruptMsgProc) + return true; } catch (const std::ios_base::failure& e) { @@ -2585,7 +2590,7 @@ public: } }; -bool SendMessages(CNode* pto, CConnman& connman) +bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interruptMsgProc) { const Consensus::Params& consensusParams = Params().GetConsensus(); { |