From c6e8a9bcffe4c0f236e27c663f08785d1a0a783b Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:30 -0500 Subject: net: add a flag to indicate when a node's process queue is full Messages are dumped very quickly from the socket handler to the processor, so it's the depth of the processing queue that's interesting. The socket handler checks the process queue's size during the brief message hand-off and pauses if necessary, and the processor possibly unpauses each time a message is popped off of its queue. --- src/net.cpp | 10 +++++++--- src/net.h | 11 ++--------- src/net_processing.cpp | 2 ++ 3 files changed, 11 insertions(+), 12 deletions(-) (limited to 'src') diff --git a/src/net.cpp b/src/net.cpp index df2109e3f7..70c04d7a0e 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1165,9 +1165,7 @@ void CConnman::ThreadSocketHandler() } { TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv && ( - pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() || - pnode->GetTotalRecvSize() <= GetReceiveFloodSize())) + if (lockRecv && !pnode->fPauseRecv) FD_SET(pnode->hSocket, &fdsetRecv); } } @@ -1240,14 +1238,18 @@ void CConnman::ThreadSocketHandler() pnode->CloseSocketDisconnect(); RecordBytesRecv(nBytes); if (notify) { + size_t nSizeAdded = 0; auto it(pnode->vRecvMsg.begin()); for (; it != pnode->vRecvMsg.end(); ++it) { if (!it->complete()) break; + nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE; } { LOCK(pnode->cs_vProcessMsg); pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); + pnode->nProcessQueueSize += nSizeAdded; + pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize; } WakeMessageHandler(); } @@ -2592,6 +2594,8 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn minFeeFilter = 0; lastSentFeeFilter = 0; nextSendTimeFeeFilter = 0; + fPauseRecv = false; + nProcessQueueSize = 0; BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes()) mapRecvBytesPerMsgCmd[msg] = 0; diff --git a/src/net.h b/src/net.h index 21864e73d1..0eb430a8bf 100644 --- a/src/net.h +++ b/src/net.h @@ -610,6 +610,7 @@ public: CCriticalSection cs_vProcessMsg; std::list vProcessMsg; + size_t nProcessQueueSize; std::deque vRecvGetData; std::list vRecvMsg; @@ -650,6 +651,7 @@ public: const NodeId id; const uint64_t nKeyedNetGroup; + std::atomic_bool fPauseRecv; protected: mapMsgCmdSize mapSendBytesPerMsgCmd; @@ -743,15 +745,6 @@ public: return nRefCount; } - // requires LOCK(cs_vRecvMsg) - unsigned int GetTotalRecvSize() - { - unsigned int total = 0; - BOOST_FOREACH(const CNetMessage &msg, vRecvMsg) - total += msg.vRecv.size() + 24; - return total; - } - // requires LOCK(cs_vRecvMsg) bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete); diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 9963a872e8..93b6e2ec01 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -2475,6 +2475,8 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru return false; // Just take one message msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin()); + pfrom->nProcessQueueSize -= msgs.front().vRecv.size() + CMessageHeader::HEADER_SIZE; + pfrom->fPauseRecv = pfrom->nProcessQueueSize > connman.GetReceiveFloodSize(); fMoreWork = !pfrom->vProcessMsg.empty(); } CNetMessage& msg(msgs.front()); -- cgit v1.2.3