From e5bcd9c84fd3107321ff6dbdef067ba03f2b43cb Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:07 -0500 Subject: net: make vRecvMsg a list so that we can use splice() --- src/net_processing.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/net_processing.cpp') diff --git a/src/net_processing.cpp b/src/net_processing.cpp index ccfbb77fcd..f53a4b2636 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -2471,7 +2471,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru // this maintains the order of responses if (!pfrom->vRecvGetData.empty()) return fOk; - std::deque::iterator it = pfrom->vRecvMsg.begin(); + auto it = pfrom->vRecvMsg.begin(); while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) { // Don't bother if send buffer is too full to respond anyway if (pfrom->nSendSize >= nMaxSendBufferSize) -- cgit v1.2.3 From 0e973d970a2114c11f4a95f09721d977da7f0a94 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Wed, 4 Jan 2017 09:32:29 -0500 Subject: net: remove redundant max sendbuffer size check This is left-over from before there was proper accounting. Hitting 2x the sendbuffer size should not be possible. --- src/net_processing.cpp | 7 ------- 1 file changed, 7 deletions(-) (limited to 'src/net_processing.cpp') diff --git a/src/net_processing.cpp b/src/net_processing.cpp index f53a4b2636..3d38995c50 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -1059,8 +1059,6 @@ uint32_t GetFetchFlags(CNode* pfrom, CBlockIndex* pprev, const Consensus::Params bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman, std::atomic& interruptMsgProc) { - unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); - LogPrint("net", "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->id); if (IsArgSet("-dropmessagestest") && GetRand(GetArg("-dropmessagestest", 0)) == 0) { @@ -1413,11 +1411,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, // Track requests for our stuff GetMainSignals().Inventory(inv.hash); - - if (pfrom->nSendSize > (nMaxSendBufferSize * 2)) { - Misbehaving(pfrom->GetId(), 50); - return error("send buffer size() = %u", pfrom->nSendSize); - } } if (!vToFetch.empty()) -- cgit v1.2.3 From 56212e20acf1534d443cb910c9bf3a30f84d0f02 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:15 -0500 Subject: net: set message deserialization version when it's actually time to deserialize We'll soon no longer have access to vRecvMsg, and this is more intuitive anyway. --- src/net_processing.cpp | 1 + 1 file changed, 1 insertion(+) (limited to 'src/net_processing.cpp') diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 3d38995c50..556975d142 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -2485,6 +2485,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru // at this point, any failure means we can delete the current message it++; + msg.SetVersion(pfrom->GetRecvVersion()); // Scan for message start if (memcmp(msg.hdr.pchMessageStart, chainparams.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) { LogPrintf("PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", SanitizeString(msg.hdr.GetCommand()), pfrom->id); -- cgit v1.2.3 From c72cc88ed39652e44b5be48e9455f6f395bd7e83 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:23 -0500 Subject: net: remove useless comments --- src/net_processing.cpp | 8 -------- 1 file changed, 8 deletions(-) (limited to 'src/net_processing.cpp') diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 556975d142..43a97ff718 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -2445,9 +2445,6 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru { const CChainParams& chainparams = Params(); unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); - //if (fDebug) - // LogPrintf("%s(%u messages)\n", __func__, pfrom->vRecvMsg.size()); - // // Message format // (4) message start @@ -2473,11 +2470,6 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru // get next message CNetMessage& msg = *it; - //if (fDebug) - // LogPrintf("%s(message %u msgsz, %u bytes, complete:%s)\n", __func__, - // msg.hdr.nMessageSize, msg.vRecv.size(), - // msg.complete() ? "Y" : "N"); - // end, if an incomplete message is found if (!msg.complete()) break; -- cgit v1.2.3 From c5a8b1b946b1ab0bb82bd4270b2a40f5731abcff Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:26 -0500 Subject: net: rework the way that the messagehandler sleeps In order to sleep accurately, the message handler needs to know if _any_ node has more processing that it should do before the entire thread sleeps. Rather than returning a value that represents whether ProcessMessages encountered a message that should trigger a disconnnect, interpret the return value as whether or not that node has more work to do. Also, use a global fProcessWake value that can be set by other threads, which takes precedence (for one cycle) over the messagehandler's decision. Note that the previous behavior was to only process one message per loop (except in the case of a bad checksum or invalid header). That was changed in PR #3180. The only change here in that regard is that the current node now falls to the back of the processing queue for the bad checksum/invalid header cases. --- src/net_processing.cpp | 48 +++++++++++++++++++++++++----------------------- 1 file changed, 25 insertions(+), 23 deletions(-) (limited to 'src/net_processing.cpp') diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 43a97ff718..605e142e8d 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -2453,36 +2453,43 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru // (4) checksum // (x) data // - bool fOk = true; + bool fMoreWork = false; if (!pfrom->vRecvGetData.empty()) ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); + if (pfrom->fDisconnect) + return false; + // this maintains the order of responses - if (!pfrom->vRecvGetData.empty()) return fOk; + if (!pfrom->vRecvGetData.empty()) return true; - auto it = pfrom->vRecvMsg.begin(); - while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) { // Don't bother if send buffer is too full to respond anyway if (pfrom->nSendSize >= nMaxSendBufferSize) - break; + return false; - // get next message - CNetMessage& msg = *it; + auto it = pfrom->vRecvMsg.begin(); + if (it == pfrom->vRecvMsg.end()) + return false; // end, if an incomplete message is found - if (!msg.complete()) - break; + if (!it->complete()) + return false; + + // get next message + CNetMessage msg = std::move(*it); // at this point, any failure means we can delete the current message - it++; + pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin()); + + fMoreWork = !pfrom->vRecvMsg.empty() && pfrom->vRecvMsg.front().complete(); msg.SetVersion(pfrom->GetRecvVersion()); // Scan for message start if (memcmp(msg.hdr.pchMessageStart, chainparams.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) { LogPrintf("PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", SanitizeString(msg.hdr.GetCommand()), pfrom->id); - fOk = false; - break; + pfrom->fDisconnect = true; + return false; } // Read header @@ -2490,7 +2497,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru if (!hdr.IsValid(chainparams.MessageStart())) { LogPrintf("PROCESSMESSAGE: ERRORS IN HEADER %s peer=%d\n", SanitizeString(hdr.GetCommand()), pfrom->id); - continue; + return fMoreWork; } string strCommand = hdr.GetCommand(); @@ -2506,7 +2513,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru SanitizeString(strCommand), nMessageSize, HexStr(hash.begin(), hash.begin()+CMessageHeader::CHECKSUM_SIZE), HexStr(hdr.pchChecksum, hdr.pchChecksum+CMessageHeader::CHECKSUM_SIZE)); - continue; + return fMoreWork; } // Process message @@ -2515,7 +2522,9 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru { fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc); if (interruptMsgProc) - return true; + return false; + if (!pfrom->vRecvGetData.empty()) + fMoreWork = true; } catch (const std::ios_base::failure& e) { @@ -2549,14 +2558,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru if (!fRet) LogPrintf("%s(%s, %u bytes) FAILED peer=%d\n", __func__, SanitizeString(strCommand), nMessageSize, pfrom->id); - break; - } - - // In case the connection got shut down, its receive buffer was wiped - if (!pfrom->fDisconnect) - pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it); - - return fOk; + return fMoreWork; } class CompareInvMempoolOrder -- cgit v1.2.3 From 4d712e366ca7fffaf96394ef01c9246482c0d92e Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:28 -0500 Subject: net: add a new message queue for the message processor This separates the storage of messages from the net and queued messages for processing, allowing the locks to be split. --- src/net_processing.cpp | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) (limited to 'src/net_processing.cpp') diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 605e142e8d..9963a872e8 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -2468,21 +2468,16 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru if (pfrom->nSendSize >= nMaxSendBufferSize) return false; - auto it = pfrom->vRecvMsg.begin(); - if (it == pfrom->vRecvMsg.end()) - return false; - - // end, if an incomplete message is found - if (!it->complete()) - return false; - - // get next message - CNetMessage msg = std::move(*it); - - // at this point, any failure means we can delete the current message - pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin()); - - fMoreWork = !pfrom->vRecvMsg.empty() && pfrom->vRecvMsg.front().complete(); + std::list msgs; + { + LOCK(pfrom->cs_vProcessMsg); + if (pfrom->vProcessMsg.empty()) + return false; + // Just take one message + msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin()); + fMoreWork = !pfrom->vProcessMsg.empty(); + } + CNetMessage& msg(msgs.front()); msg.SetVersion(pfrom->GetRecvVersion()); // Scan for message start -- cgit v1.2.3 From c6e8a9bcffe4c0f236e27c663f08785d1a0a783b Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:30 -0500 Subject: net: add a flag to indicate when a node's process queue is full Messages are dumped very quickly from the socket handler to the processor, so it's the depth of the processing queue that's interesting. The socket handler checks the process queue's size during the brief message hand-off and pauses if necessary, and the processor possibly unpauses each time a message is popped off of its queue. --- src/net_processing.cpp | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src/net_processing.cpp') diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 9963a872e8..93b6e2ec01 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -2475,6 +2475,8 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru return false; // Just take one message msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin()); + pfrom->nProcessQueueSize -= msgs.front().vRecv.size() + CMessageHeader::HEADER_SIZE; + pfrom->fPauseRecv = pfrom->nProcessQueueSize > connman.GetReceiveFloodSize(); fMoreWork = !pfrom->vProcessMsg.empty(); } CNetMessage& msg(msgs.front()); -- cgit v1.2.3 From 991955ee81034dc3fbc1c2a8e60c04fc9e0b538c Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:32 -0500 Subject: net: add a flag to indicate when a node's send buffer is full Similar to the recv flag, but this one indicates whether or not the net's send buffer is full. The socket handler checks the send queue when a new message is added and pauses if necessary, and possibly unpauses after each message is drained from its buffer. --- src/net_processing.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) (limited to 'src/net_processing.cpp') diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 93b6e2ec01..185ab980fe 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -889,14 +889,13 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connma void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic& interruptMsgProc) { std::deque::iterator it = pfrom->vRecvGetData.begin(); - unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); vector vNotFound; CNetMsgMaker msgMaker(pfrom->GetSendVersion()); LOCK(cs_main); while (it != pfrom->vRecvGetData.end()) { // Don't bother if send buffer is too full to respond anyway - if (pfrom->nSendSize >= nMaxSendBufferSize) + if (pfrom->fPauseSend) break; const CInv &inv = *it; @@ -2444,7 +2443,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interruptMsgProc) { const CChainParams& chainparams = Params(); - unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); // // Message format // (4) message start @@ -2465,7 +2463,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru if (!pfrom->vRecvGetData.empty()) return true; // Don't bother if send buffer is too full to respond anyway - if (pfrom->nSendSize >= nMaxSendBufferSize) + if (pfrom->fPauseSend) return false; std::list msgs; -- cgit v1.2.3 From e60360e139852c655930e99d4bb4db554cd8385e Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:36 -0500 Subject: net: remove cs_vRecvMsg vRecvMsg is now only touched by the socket handler thread. The accounting vars (nRecvBytes/nLastRecv/mapRecvBytesPerMsgCmd) are also only used by the socket handler thread, with the exception of queries from rpc/gui. These accesses are not threadsafe, but they never were. This needs to be addressed separately. Also, update comment describing data flow --- src/net_processing.cpp | 1 - 1 file changed, 1 deletion(-) (limited to 'src/net_processing.cpp') diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 185ab980fe..32a5862f26 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -2439,7 +2439,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, return true; } -// requires LOCK(cs_vRecvMsg) bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interruptMsgProc) { const CChainParams& chainparams = Params(); -- cgit v1.2.3