aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorCory Fields <cory-nospam-@coryfields.com>2016-12-31 02:05:28 -0500
committerCory Fields <cory-nospam-@coryfields.com>2017-01-12 23:05:25 -0500
commit4d712e366ca7fffaf96394ef01c9246482c0d92e (patch)
treeaa61383f6bf692c4fda2f697ad6fba3e4c199913 /src
parentc5a8b1b946b1ab0bb82bd4270b2a40f5731abcff (diff)
net: add a new message queue for the message processor
This separates the storage of messages from the net and queued messages for processing, allowing the locks to be split.
Diffstat (limited to 'src')
-rw-r--r--src/net.cpp12
-rw-r--r--src/net.h3
-rw-r--r--src/net_processing.cpp25
3 files changed, 24 insertions, 16 deletions
diff --git a/src/net.cpp b/src/net.cpp
index 947f016798..df2109e3f7 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -1239,8 +1239,18 @@ void CConnman::ThreadSocketHandler()
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
pnode->CloseSocketDisconnect();
RecordBytesRecv(nBytes);
- if (notify)
+ if (notify) {
+ auto it(pnode->vRecvMsg.begin());
+ for (; it != pnode->vRecvMsg.end(); ++it) {
+ if (!it->complete())
+ break;
+ }
+ {
+ LOCK(pnode->cs_vProcessMsg);
+ pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
+ }
WakeMessageHandler();
+ }
}
else if (nBytes == 0)
{
diff --git a/src/net.h b/src/net.h
index 4fc41bddac..21864e73d1 100644
--- a/src/net.h
+++ b/src/net.h
@@ -608,6 +608,9 @@ public:
std::deque<std::vector<unsigned char>> vSendMsg;
CCriticalSection cs_vSend;
+ CCriticalSection cs_vProcessMsg;
+ std::list<CNetMessage> vProcessMsg;
+
std::deque<CInv> vRecvGetData;
std::list<CNetMessage> vRecvMsg;
CCriticalSection cs_vRecvMsg;
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
index 605e142e8d..9963a872e8 100644
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -2468,21 +2468,16 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
if (pfrom->nSendSize >= nMaxSendBufferSize)
return false;
- auto it = pfrom->vRecvMsg.begin();
- if (it == pfrom->vRecvMsg.end())
- return false;
-
- // end, if an incomplete message is found
- if (!it->complete())
- return false;
-
- // get next message
- CNetMessage msg = std::move(*it);
-
- // at this point, any failure means we can delete the current message
- pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin());
-
- fMoreWork = !pfrom->vRecvMsg.empty() && pfrom->vRecvMsg.front().complete();
+ std::list<CNetMessage> msgs;
+ {
+ LOCK(pfrom->cs_vProcessMsg);
+ if (pfrom->vProcessMsg.empty())
+ return false;
+ // Just take one message
+ msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
+ fMoreWork = !pfrom->vProcessMsg.empty();
+ }
+ CNetMessage& msg(msgs.front());
msg.SetVersion(pfrom->GetRecvVersion());
// Scan for message start