diff options
author | Cory Fields <cory-nospam-@coryfields.com> | 2016-12-31 02:05:32 -0500 |
---|---|---|
committer | Cory Fields <cory-nospam-@coryfields.com> | 2017-01-12 23:05:59 -0500 |
commit | 991955ee81034dc3fbc1c2a8e60c04fc9e0b538c (patch) | |
tree | a624868494e3c50786df85c75b1225b6e8e8eee2 | |
parent | c6e8a9bcffe4c0f236e27c663f08785d1a0a783b (diff) |
net: add a flag to indicate when a node's send buffer is full
Similar to the recv flag, but this one indicates whether or not the net's send
buffer is full.
The socket handler checks the send queue when a new message is added and pauses
if necessary, and possibly unpauses after each message is drained from its buffer.
-rw-r--r-- | src/net.cpp | 11 | ||||
-rw-r--r-- | src/net.h | 3 | ||||
-rw-r--r-- | src/net_processing.cpp | 6 |
3 files changed, 12 insertions, 8 deletions
diff --git a/src/net.cpp b/src/net.cpp index 70c04d7a0e..8ae2bebd32 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -761,7 +761,7 @@ const uint256& CNetMessage::GetMessageHash() const // requires LOCK(cs_vSend) -size_t SocketSendData(CNode *pnode) +size_t CConnman::SocketSendData(CNode *pnode) { auto it = pnode->vSendMsg.begin(); size_t nSentSize = 0; @@ -778,6 +778,7 @@ size_t SocketSendData(CNode *pnode) if (pnode->nSendOffset == data.size()) { pnode->nSendOffset = 0; pnode->nSendSize -= data.size(); + pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize; it++; } else { // could not send full message; stop sending more @@ -1286,8 +1287,9 @@ void CConnman::ThreadSocketHandler() TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { size_t nBytes = SocketSendData(pnode); - if (nBytes) + if (nBytes) { RecordBytesSent(nBytes); + } } } @@ -1868,7 +1870,7 @@ void CConnman::ThreadMessageHandler() if (lockRecv) { bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc); - fMoreWork |= (fMoreNodeWork && pnode->nSendSize < GetSendBufferSize()); + fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); } } if (flagInterruptMsgProc) @@ -2595,6 +2597,7 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn lastSentFeeFilter = 0; nextSendTimeFeeFilter = 0; fPauseRecv = false; + fPauseSend = false; nProcessQueueSize = 0; BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes()) @@ -2675,6 +2678,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize; pnode->nSendSize += nTotalSize; + if (pnode->nSendSize > nSendBufferMaxSize) + pnode->fPauseSend = true; pnode->vSendMsg.push_back(std::move(serializedHeader)); if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data)); @@ -358,6 +358,7 @@ private: NodeId GetNewNodeId(); + size_t SocketSendData(CNode *pnode); //!check is the banlist has unwritten changes bool BannedSetIsDirty(); //!set the "dirty" flag for the banlist @@ -444,7 +445,6 @@ void Discover(boost::thread_group& threadGroup); void MapPort(bool fUseUPnP); unsigned short GetListenPort(); bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false); -size_t SocketSendData(CNode *pnode); struct CombinerAll { @@ -652,6 +652,7 @@ public: const uint64_t nKeyedNetGroup; std::atomic_bool fPauseRecv; + std::atomic_bool fPauseSend; protected: mapMsgCmdSize mapSendBytesPerMsgCmd; diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 93b6e2ec01..185ab980fe 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -889,14 +889,13 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connma void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic<bool>& interruptMsgProc) { std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin(); - unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); vector<CInv> vNotFound; CNetMsgMaker msgMaker(pfrom->GetSendVersion()); LOCK(cs_main); while (it != pfrom->vRecvGetData.end()) { // Don't bother if send buffer is too full to respond anyway - if (pfrom->nSendSize >= nMaxSendBufferSize) + if (pfrom->fPauseSend) break; const CInv &inv = *it; @@ -2444,7 +2443,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc) { const CChainParams& chainparams = Params(); - unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); // // Message format // (4) message start @@ -2465,7 +2463,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru if (!pfrom->vRecvGetData.empty()) return true; // Don't bother if send buffer is too full to respond anyway - if (pfrom->nSendSize >= nMaxSendBufferSize) + if (pfrom->fPauseSend) return false; std::list<CNetMessage> msgs; |