diff options
author | Cory Fields <cory-nospam-@coryfields.com> | 2016-12-31 02:05:28 -0500 |
---|---|---|
committer | Cory Fields <cory-nospam-@coryfields.com> | 2017-01-12 23:05:25 -0500 |
commit | 4d712e366ca7fffaf96394ef01c9246482c0d92e (patch) | |
tree | aa61383f6bf692c4fda2f697ad6fba3e4c199913 | |
parent | c5a8b1b946b1ab0bb82bd4270b2a40f5731abcff (diff) |
net: add a new message queue for the message processor
This separates the storage of messages from the net and queued messages for
processing, allowing the locks to be split.
-rw-r--r-- | src/net.cpp | 12 | ||||
-rw-r--r-- | src/net.h | 3 | ||||
-rw-r--r-- | src/net_processing.cpp | 25 |
3 files changed, 24 insertions, 16 deletions
diff --git a/src/net.cpp b/src/net.cpp index 947f016798..df2109e3f7 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1239,8 +1239,18 @@ void CConnman::ThreadSocketHandler() if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) pnode->CloseSocketDisconnect(); RecordBytesRecv(nBytes); - if (notify) + if (notify) { + auto it(pnode->vRecvMsg.begin()); + for (; it != pnode->vRecvMsg.end(); ++it) { + if (!it->complete()) + break; + } + { + LOCK(pnode->cs_vProcessMsg); + pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); + } WakeMessageHandler(); + } } else if (nBytes == 0) { @@ -608,6 +608,9 @@ public: std::deque<std::vector<unsigned char>> vSendMsg; CCriticalSection cs_vSend; + CCriticalSection cs_vProcessMsg; + std::list<CNetMessage> vProcessMsg; + std::deque<CInv> vRecvGetData; std::list<CNetMessage> vRecvMsg; CCriticalSection cs_vRecvMsg; diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 605e142e8d..9963a872e8 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -2468,21 +2468,16 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru if (pfrom->nSendSize >= nMaxSendBufferSize) return false; - auto it = pfrom->vRecvMsg.begin(); - if (it == pfrom->vRecvMsg.end()) - return false; - - // end, if an incomplete message is found - if (!it->complete()) - return false; - - // get next message - CNetMessage msg = std::move(*it); - - // at this point, any failure means we can delete the current message - pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin()); - - fMoreWork = !pfrom->vRecvMsg.empty() && pfrom->vRecvMsg.front().complete(); + std::list<CNetMessage> msgs; + { + LOCK(pfrom->cs_vProcessMsg); + if (pfrom->vProcessMsg.empty()) + return false; + // Just take one message + msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin()); + fMoreWork = !pfrom->vProcessMsg.empty(); + } + CNetMessage& msg(msgs.front()); msg.SetVersion(pfrom->GetRecvVersion()); // Scan for message start |