aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/net.cpp30
-rw-r--r--src/net.h3
-rw-r--r--src/net_processing.cpp48
-rw-r--r--src/net_processing.h1
4 files changed, 45 insertions, 37 deletions
diff --git a/src/net.cpp b/src/net.cpp
index 36db77abb6..947f016798 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -1317,6 +1317,10 @@ void CConnman::ThreadSocketHandler()
void CConnman::WakeMessageHandler()
{
+ {
+ std::lock_guard<std::mutex> lock(mutexMsgProc);
+ fMsgProcWake = true;
+ }
condMsgProc.notify_one();
}
@@ -1839,7 +1843,7 @@ void CConnman::ThreadMessageHandler()
}
}
- bool fSleep = true;
+ bool fMoreWork = false;
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
@@ -1851,16 +1855,8 @@ void CConnman::ThreadMessageHandler()
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv)
{
- if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc))
- pnode->CloseSocketDisconnect();
-
- if (pnode->nSendSize < GetSendBufferSize())
- {
- if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg.front().complete()))
- {
- fSleep = false;
- }
- }
+ bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc);
+ fMoreWork |= (fMoreNodeWork && pnode->nSendSize < GetSendBufferSize());
}
}
if (flagInterruptMsgProc)
@@ -1882,10 +1878,11 @@ void CConnman::ThreadMessageHandler()
pnode->Release();
}
- if (fSleep) {
- std::unique_lock<std::mutex> lock(mutexMsgProc);
- condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100));
+ std::unique_lock<std::mutex> lock(mutexMsgProc);
+ if (!fMoreWork) {
+ condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this] { return fMsgProcWake; });
}
+ fMsgProcWake = false;
}
}
@@ -2156,6 +2153,11 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
interruptNet.reset();
flagInterruptMsgProc = false;
+ {
+ std::unique_lock<std::mutex> lock(mutexMsgProc);
+ fMsgProcWake = false;
+ }
+
// Send and receive from sockets, accept connections
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));
diff --git a/src/net.h b/src/net.h
index cc79cce852..4fc41bddac 100644
--- a/src/net.h
+++ b/src/net.h
@@ -424,6 +424,9 @@ private:
/** SipHasher seeds for deterministic randomness */
const uint64_t nSeed0, nSeed1;
+ /** flag for waking the message processor. */
+ bool fMsgProcWake;
+
std::condition_variable condMsgProc;
std::mutex mutexMsgProc;
std::atomic<bool> flagInterruptMsgProc;
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
index 43a97ff718..605e142e8d 100644
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -2453,36 +2453,43 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
// (4) checksum
// (x) data
//
- bool fOk = true;
+ bool fMoreWork = false;
if (!pfrom->vRecvGetData.empty())
ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc);
+ if (pfrom->fDisconnect)
+ return false;
+
// this maintains the order of responses
- if (!pfrom->vRecvGetData.empty()) return fOk;
+ if (!pfrom->vRecvGetData.empty()) return true;
- auto it = pfrom->vRecvMsg.begin();
- while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) {
// Don't bother if send buffer is too full to respond anyway
if (pfrom->nSendSize >= nMaxSendBufferSize)
- break;
+ return false;
- // get next message
- CNetMessage& msg = *it;
+ auto it = pfrom->vRecvMsg.begin();
+ if (it == pfrom->vRecvMsg.end())
+ return false;
// end, if an incomplete message is found
- if (!msg.complete())
- break;
+ if (!it->complete())
+ return false;
+
+ // get next message
+ CNetMessage msg = std::move(*it);
// at this point, any failure means we can delete the current message
- it++;
+ pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin());
+
+ fMoreWork = !pfrom->vRecvMsg.empty() && pfrom->vRecvMsg.front().complete();
msg.SetVersion(pfrom->GetRecvVersion());
// Scan for message start
if (memcmp(msg.hdr.pchMessageStart, chainparams.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) {
LogPrintf("PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", SanitizeString(msg.hdr.GetCommand()), pfrom->id);
- fOk = false;
- break;
+ pfrom->fDisconnect = true;
+ return false;
}
// Read header
@@ -2490,7 +2497,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
if (!hdr.IsValid(chainparams.MessageStart()))
{
LogPrintf("PROCESSMESSAGE: ERRORS IN HEADER %s peer=%d\n", SanitizeString(hdr.GetCommand()), pfrom->id);
- continue;
+ return fMoreWork;
}
string strCommand = hdr.GetCommand();
@@ -2506,7 +2513,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
SanitizeString(strCommand), nMessageSize,
HexStr(hash.begin(), hash.begin()+CMessageHeader::CHECKSUM_SIZE),
HexStr(hdr.pchChecksum, hdr.pchChecksum+CMessageHeader::CHECKSUM_SIZE));
- continue;
+ return fMoreWork;
}
// Process message
@@ -2515,7 +2522,9 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
{
fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc);
if (interruptMsgProc)
- return true;
+ return false;
+ if (!pfrom->vRecvGetData.empty())
+ fMoreWork = true;
}
catch (const std::ios_base::failure& e)
{
@@ -2549,14 +2558,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
if (!fRet)
LogPrintf("%s(%s, %u bytes) FAILED peer=%d\n", __func__, SanitizeString(strCommand), nMessageSize, pfrom->id);
- break;
- }
-
- // In case the connection got shut down, its receive buffer was wiped
- if (!pfrom->fDisconnect)
- pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it);
-
- return fOk;
+ return fMoreWork;
}
class CompareInvMempoolOrder
diff --git a/src/net_processing.h b/src/net_processing.h
index 230d805bd4..1f33def1f7 100644
--- a/src/net_processing.h
+++ b/src/net_processing.h
@@ -46,6 +46,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
* @param[in] pto The node which we are sending messages to.
* @param[in] connman The connection manager for that node.
* @param[in] interrupt Interrupt condition for processing threads
+ * @return True if there is more work to be done
*/
bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interrupt);