aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorNeha Narula <narula@gmail.com>2020-09-13 20:11:24 -0400
committerNeha Narula <narula@gmail.com>2020-10-14 10:08:44 -0400
commit2d9f2fca43aadcdda4d644cddab36dca88b40b97 (patch)
tree0dd29941947137eee36c3017e838b1d8f6d3c61e /src
parent673247b58cd1252ab7e99f7d63ead05cc100cef2 (diff)
Move vRecvGetData to net processing
Diffstat (limited to 'src')
-rw-r--r--src/net.h1
-rw-r--r--src/net_processing.cpp25
2 files changed, 16 insertions, 10 deletions
diff --git a/src/net.h b/src/net.h
index da82fe9a2e..cd7a7c7c73 100644
--- a/src/net.h
+++ b/src/net.h
@@ -848,7 +848,6 @@ public:
RecursiveMutex cs_sendProcessing;
- std::deque<CInv> vRecvGetData;
uint64_t nRecvBytes GUARDED_BY(cs_vRecv){0};
std::atomic<int64_t> nLastSend{0};
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
index 1a50c68a26..3b632cf20b 100644
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -515,6 +515,9 @@ struct Peer {
/** Set of txids to reconsider once their parent transactions have been accepted **/
std::set<uint256> m_orphan_work_set GUARDED_BY(g_cs_orphans);
+ /** Work queue of items requested by this peer **/
+ std::deque<CInv> vRecvGetData;
+
Peer(NodeId id) : m_id(id) {}
};
@@ -1754,7 +1757,10 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
{
AssertLockNotHeld(cs_main);
- std::deque<CInv>::iterator it = pfrom.vRecvGetData.begin();
+ PeerRef peer = GetPeerRef(pfrom.GetId());
+ if (peer == nullptr) return;
+
+ std::deque<CInv>::iterator it = peer->vRecvGetData.begin();
std::vector<CInv> vNotFound;
const CNetMsgMaker msgMaker(pfrom.GetCommonVersion());
@@ -1766,7 +1772,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
// Process as many TX items from the front of the getdata queue as
// possible, since they're common and it's efficient to batch process
// them.
- while (it != pfrom.vRecvGetData.end() && it->IsGenTxMsg()) {
+ while (it != peer->vRecvGetData.end() && it->IsGenTxMsg()) {
if (interruptMsgProc) return;
// The send buffer provides backpressure. If there's no space in
// the buffer, pause processing until the next call.
@@ -1814,7 +1820,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
// Only process one BLOCK item per call, since they're uncommon and can be
// expensive to process.
- if (it != pfrom.vRecvGetData.end() && !pfrom.fPauseSend) {
+ if (it != peer->vRecvGetData.end() && !pfrom.fPauseSend) {
const CInv &inv = *it++;
if (inv.IsGenBlkMsg()) {
ProcessGetBlockData(pfrom, chainparams, inv, connman);
@@ -1823,7 +1829,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
// and continue processing the queue on the next call.
}
- pfrom.vRecvGetData.erase(pfrom.vRecvGetData.begin(), it);
+ peer->vRecvGetData.erase(peer->vRecvGetData.begin(), it);
if (!vNotFound.empty()) {
// Let the peer know that we didn't find what it asked for, so it doesn't
@@ -2805,7 +2811,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
LogPrint(BCLog::NET, "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom.GetId());
}
- pfrom.vRecvGetData.insert(pfrom.vRecvGetData.end(), vInv.begin(), vInv.end());
+ peer->vRecvGetData.insert(peer->vRecvGetData.end(), vInv.begin(), vInv.end());
ProcessGetData(pfrom, m_chainparams, m_connman, m_mempool, interruptMsgProc);
return;
}
@@ -2914,7 +2920,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
CInv inv;
inv.type = State(pfrom.GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK;
inv.hash = req.blockhash;
- pfrom.vRecvGetData.push_back(inv);
+ peer->vRecvGetData.push_back(inv);
// The message processing loop will go around again (without pausing) and we'll respond then (without cs_main)
return;
}
@@ -3873,8 +3879,9 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP
PeerRef peer = GetPeerRef(pfrom->GetId());
if (peer == nullptr) return false;
- if (!pfrom->vRecvGetData.empty())
+ if (!peer->vRecvGetData.empty()) {
ProcessGetData(*pfrom, m_chainparams, m_connman, m_mempool, interruptMsgProc);
+ }
{
LOCK2(cs_main, g_cs_orphans);
@@ -3888,7 +3895,7 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP
// this maintains the order of responses
// and prevents vRecvGetData to grow unbounded
- if (!pfrom->vRecvGetData.empty()) return true;
+ if (!peer->vRecvGetData.empty()) return true;
{
LOCK(g_cs_orphans);
if (!peer->m_orphan_work_set.empty()) return true;
@@ -3921,7 +3928,7 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP
ProcessMessage(*pfrom, msg_type, msg.m_recv, msg.m_time, interruptMsgProc);
if (interruptMsgProc)
return false;
- if (!pfrom->vRecvGetData.empty())
+ if (!peer->vRecvGetData.empty())
fMoreWork = true;
} catch (const std::exception& e) {
LogPrint(BCLog::NET, "%s(%s, %u bytes): Exception '%s' (%s) caught\n", __func__, SanitizeString(msg_type), nMessageSize, e.what(), typeid(e).name());