diff options
author | Neha Narula <narula@gmail.com> | 2020-09-13 20:34:41 -0400 |
---|---|---|
committer | Neha Narula <narula@gmail.com> | 2020-10-14 10:08:44 -0400 |
commit | ba951812ec0cc8ebee5911a582f188525b76ff0a (patch) | |
tree | 9089072724b2405be3120af33ab49abd131cfd2c | |
parent | 2d9f2fca43aadcdda4d644cddab36dca88b40b97 (diff) |
Guard vRecvGetData (now in net processing) with its own mutex
This requires slightly reorganizing the logic in GETBLOCKTXN to
maintain locking order.
-rw-r--r-- | src/net_processing.cpp | 101 |
1 files changed, 57 insertions, 44 deletions
diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 3b632cf20b..162d484566 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -515,8 +515,10 @@ struct Peer { /** Set of txids to reconsider once their parent transactions have been accepted **/ std::set<uint256> m_orphan_work_set GUARDED_BY(g_cs_orphans); + /** Protects vRecvGetData **/ + Mutex m_getdata_requests_mutex; /** Work queue of items requested by this peer **/ - std::deque<CInv> vRecvGetData; + std::deque<CInv> vRecvGetData GUARDED_BY(m_getdata_requests_mutex); Peer(NodeId id) : m_id(id) {} }; @@ -1753,14 +1755,11 @@ static CTransactionRef FindTxForGetData(const CTxMemPool& mempool, const CNode& return {}; } -void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnman& connman, CTxMemPool& mempool, const std::atomic<bool>& interruptMsgProc) LOCKS_EXCLUDED(cs_main) +void static ProcessGetData(CNode& pfrom, Peer& peer, const CChainParams& chainparams, CConnman& connman, CTxMemPool& mempool, const std::atomic<bool>& interruptMsgProc) EXCLUSIVE_LOCKS_REQUIRED(!cs_main, peer.m_getdata_requests_mutex) { AssertLockNotHeld(cs_main); - PeerRef peer = GetPeerRef(pfrom.GetId()); - if (peer == nullptr) return; - - std::deque<CInv>::iterator it = peer->vRecvGetData.begin(); + std::deque<CInv>::iterator it = peer.vRecvGetData.begin(); std::vector<CInv> vNotFound; const CNetMsgMaker msgMaker(pfrom.GetCommonVersion()); @@ -1772,7 +1771,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm // Process as many TX items from the front of the getdata queue as // possible, since they're common and it's efficient to batch process // them. - while (it != peer->vRecvGetData.end() && it->IsGenTxMsg()) { + while (it != peer.vRecvGetData.end() && it->IsGenTxMsg()) { if (interruptMsgProc) return; // The send buffer provides backpressure. If there's no space in // the buffer, pause processing until the next call. @@ -1820,7 +1819,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm // Only process one BLOCK item per call, since they're uncommon and can be // expensive to process. - if (it != peer->vRecvGetData.end() && !pfrom.fPauseSend) { + if (it != peer.vRecvGetData.end() && !pfrom.fPauseSend) { const CInv &inv = *it++; if (inv.IsGenBlkMsg()) { ProcessGetBlockData(pfrom, chainparams, inv, connman); @@ -1829,7 +1828,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm // and continue processing the queue on the next call. } - peer->vRecvGetData.erase(peer->vRecvGetData.begin(), it); + peer.vRecvGetData.erase(peer.vRecvGetData.begin(), it); if (!vNotFound.empty()) { // Let the peer know that we didn't find what it asked for, so it doesn't @@ -2811,8 +2810,12 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat LogPrint(BCLog::NET, "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom.GetId()); } - peer->vRecvGetData.insert(peer->vRecvGetData.end(), vInv.begin(), vInv.end()); - ProcessGetData(pfrom, m_chainparams, m_connman, m_mempool, interruptMsgProc); + { + LOCK(peer->m_getdata_requests_mutex); + peer->vRecvGetData.insert(peer->vRecvGetData.end(), vInv.begin(), vInv.end()); + ProcessGetData(pfrom, *peer, m_chainparams, m_connman, m_mempool, interruptMsgProc); + } + return; } @@ -2900,36 +2903,38 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat return; } - LOCK(cs_main); + { + LOCK(cs_main); - const CBlockIndex* pindex = LookupBlockIndex(req.blockhash); - if (!pindex || !(pindex->nStatus & BLOCK_HAVE_DATA)) { - LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block we don't have\n", pfrom.GetId()); - return; - } + const CBlockIndex* pindex = LookupBlockIndex(req.blockhash); + if (!pindex || !(pindex->nStatus & BLOCK_HAVE_DATA)) { + LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block we don't have\n", pfrom.GetId()); + return; + } - if (pindex->nHeight < ::ChainActive().Height() - MAX_BLOCKTXN_DEPTH) { - // If an older block is requested (should never happen in practice, - // but can happen in tests) send a block response instead of a - // blocktxn response. Sending a full block response instead of a - // small blocktxn response is preferable in the case where a peer - // might maliciously send lots of getblocktxn requests to trigger - // expensive disk reads, because it will require the peer to - // actually receive all the data read from disk over the network. - LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block > %i deep\n", pfrom.GetId(), MAX_BLOCKTXN_DEPTH); - CInv inv; - inv.type = State(pfrom.GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK; - inv.hash = req.blockhash; - peer->vRecvGetData.push_back(inv); - // The message processing loop will go around again (without pausing) and we'll respond then (without cs_main) - return; - } + if (pindex->nHeight >= ::ChainActive().Height() - MAX_BLOCKTXN_DEPTH) { + CBlock block; + bool ret = ReadBlockFromDisk(block, pindex, m_chainparams.GetConsensus()); + assert(ret); - CBlock block; - bool ret = ReadBlockFromDisk(block, pindex, m_chainparams.GetConsensus()); - assert(ret); + SendBlockTransactions(pfrom, block, req); + return; + } + } - SendBlockTransactions(pfrom, block, req); + // If an older block is requested (should never happen in practice, + // but can happen in tests) send a block response instead of a + // blocktxn response. Sending a full block response instead of a + // small blocktxn response is preferable in the case where a peer + // might maliciously send lots of getblocktxn requests to trigger + // expensive disk reads, because it will require the peer to + // actually receive all the data read from disk over the network. + LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block > %i deep\n", pfrom.GetId(), MAX_BLOCKTXN_DEPTH); + CInv inv; + WITH_LOCK(cs_main, inv.type = State(pfrom.GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK); + inv.hash = req.blockhash; + WITH_LOCK(peer->m_getdata_requests_mutex, peer->vRecvGetData.push_back(inv)); + // The message processing loop will go around again (without pausing) and we'll respond then return; } @@ -3879,8 +3884,11 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP PeerRef peer = GetPeerRef(pfrom->GetId()); if (peer == nullptr) return false; - if (!peer->vRecvGetData.empty()) { - ProcessGetData(*pfrom, m_chainparams, m_connman, m_mempool, interruptMsgProc); + { + LOCK(peer->m_getdata_requests_mutex); + if (!peer->vRecvGetData.empty()) { + ProcessGetData(*pfrom, *peer, m_chainparams, m_connman, m_mempool, interruptMsgProc); + } } { @@ -3895,7 +3903,11 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP // this maintains the order of responses // and prevents vRecvGetData to grow unbounded - if (!peer->vRecvGetData.empty()) return true; + { + LOCK(peer->m_getdata_requests_mutex); + if (!peer->vRecvGetData.empty()) return true; + } + { LOCK(g_cs_orphans); if (!peer->m_orphan_work_set.empty()) return true; @@ -3926,10 +3938,11 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP try { ProcessMessage(*pfrom, msg_type, msg.m_recv, msg.m_time, interruptMsgProc); - if (interruptMsgProc) - return false; - if (!peer->vRecvGetData.empty()) - fMoreWork = true; + if (interruptMsgProc) return false; + { + LOCK(peer->m_getdata_requests_mutex); + if (!peer->vRecvGetData.empty()) fMoreWork = true; + } } catch (const std::exception& e) { LogPrint(BCLog::NET, "%s(%s, %u bytes): Exception '%s' (%s) caught\n", __func__, SanitizeString(msg_type), nMessageSize, e.what(), typeid(e).name()); } catch (...) { |