aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCory Fields <cory-nospam-@coryfields.com>2016-12-31 02:05:28 -0500
committerCory Fields <cory-nospam-@coryfields.com>2017-01-12 23:05:25 -0500
commit4d712e366ca7fffaf96394ef01c9246482c0d92e (patch)
treeaa61383f6bf692c4fda2f697ad6fba3e4c199913
parentc5a8b1b946b1ab0bb82bd4270b2a40f5731abcff (diff)
downloadbitcoin-4d712e366ca7fffaf96394ef01c9246482c0d92e.tar.xz
net: add a new message queue for the message processor
This separates the storage of messages from the net and queued messages for processing, allowing the locks to be split.
-rw-r--r--src/net.cpp12
-rw-r--r--src/net.h3
-rw-r--r--src/net_processing.cpp25
3 files changed, 24 insertions, 16 deletions
diff --git a/src/net.cpp b/src/net.cpp
index 947f016798..df2109e3f7 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -1239,8 +1239,18 @@ void CConnman::ThreadSocketHandler()
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
pnode->CloseSocketDisconnect();
RecordBytesRecv(nBytes);
- if (notify)
+ if (notify) {
+ auto it(pnode->vRecvMsg.begin());
+ for (; it != pnode->vRecvMsg.end(); ++it) {
+ if (!it->complete())
+ break;
+ }
+ {
+ LOCK(pnode->cs_vProcessMsg);
+ pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
+ }
WakeMessageHandler();
+ }
}
else if (nBytes == 0)
{
diff --git a/src/net.h b/src/net.h
index 4fc41bddac..21864e73d1 100644
--- a/src/net.h
+++ b/src/net.h
@@ -608,6 +608,9 @@ public:
std::deque<std::vector<unsigned char>> vSendMsg;
CCriticalSection cs_vSend;
+ CCriticalSection cs_vProcessMsg;
+ std::list<CNetMessage> vProcessMsg;
+
std::deque<CInv> vRecvGetData;
std::list<CNetMessage> vRecvMsg;
CCriticalSection cs_vRecvMsg;
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
index 605e142e8d..9963a872e8 100644
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -2468,21 +2468,16 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
if (pfrom->nSendSize >= nMaxSendBufferSize)
return false;
- auto it = pfrom->vRecvMsg.begin();
- if (it == pfrom->vRecvMsg.end())
- return false;
-
- // end, if an incomplete message is found
- if (!it->complete())
- return false;
-
- // get next message
- CNetMessage msg = std::move(*it);
-
- // at this point, any failure means we can delete the current message
- pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin());
-
- fMoreWork = !pfrom->vRecvMsg.empty() && pfrom->vRecvMsg.front().complete();
+ std::list<CNetMessage> msgs;
+ {
+ LOCK(pfrom->cs_vProcessMsg);
+ if (pfrom->vProcessMsg.empty())
+ return false;
+ // Just take one message
+ msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
+ fMoreWork = !pfrom->vProcessMsg.empty();
+ }
+ CNetMessage& msg(msgs.front());
msg.SetVersion(pfrom->GetRecvVersion());
// Scan for message start