diff options
author | Jeff Garzik <jgarzik@exmulti.com> | 2012-11-15 19:41:12 -0500 |
---|---|---|
committer | Pieter Wuille <sipa@ulyssis.org> | 2013-03-29 23:56:25 +0100 |
commit | 607dbfdeaf7ec053d959c47c125d60c0b7e7216a (patch) | |
tree | 4a77135cba3d16f4bddea919f2621dfcc7428796 /src/net.cpp | |
parent | ea83336f4eceecbc046e465509b792dd203327bc (diff) |
P2P: parse network datastream into header/data components in socket thread
Replaces CNode::vRecv buffer with a vector of CNetMessage's. This simplifies
ProcessMessages() and eliminates several redundant data copies.
Overview:
* socket thread now parses incoming message datastream into
header/data components, as encapsulated by CNetMessage
* socket thread adds each CNetMessage to a vector inside CNode
* message thread (ProcessMessages) iterates through CNode's CNetMessage vector
Message parsing is made more strict:
* Socket is disconnected, if message larger than MAX_SIZE
or if CMessageHeader deserialization fails (latter is impossible?).
Previously, code would simply eat garbage data all day long.
* Socket is disconnected, if we fail to find pchMessageStart.
We do not search through garbage, to find pchMessageStart. Each
message must begin precisely after the last message ends.
ProcessMessages() always processes a complete message, and is more efficient:
* buffer is always precisely sized, using CDataStream::resize(),
rather than progressively sized in 64k chunks. More efficient
for large messages like "block".
* whole-buffer memory copy eliminated (vRecv -> vMsg)
* other buffer-shifting memory copies eliminated (vRecv.insert, vRecv.erase)
Diffstat (limited to 'src/net.cpp')
-rw-r--r-- | src/net.cpp | 96 |
1 files changed, 83 insertions, 13 deletions
diff --git a/src/net.cpp b/src/net.cpp index 6c8fe3ffc9..0e558228d7 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -536,7 +536,7 @@ void CNode::CloseSocketDisconnect() printf("disconnecting node %s\n", addrName.c_str()); closesocket(hSocket); hSocket = INVALID_SOCKET; - vRecv.clear(); + vRecvMsg.clear(); } } @@ -628,6 +628,78 @@ void CNode::copyStats(CNodeStats &stats) } #undef X +// requires LOCK(cs_vRecvMsg) +bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes) +{ + while (nBytes > 0) { + + // get current incomplete message, or create a new one + if (vRecvMsg.size() == 0 || + vRecvMsg.back().complete()) + vRecvMsg.push_back(CNetMessage(SER_NETWORK, nRecvVersion)); + + CNetMessage& msg = vRecvMsg.back(); + + // absorb network data + int handled; + if (!msg.in_data) + handled = msg.readHeader(pch, nBytes); + else + handled = msg.readData(pch, nBytes); + + if (handled < 0) + return false; + + pch += handled; + nBytes -= handled; + } + + return true; +} + +int CNetMessage::readHeader(const char *pch, unsigned int nBytes) +{ + // copy data to temporary parsing buffer + unsigned int nRemaining = 24 - nHdrPos; + unsigned int nCopy = std::min(nRemaining, nBytes); + + memcpy(&hdrbuf[nHdrPos], pch, nCopy); + nHdrPos += nCopy; + + // if header incomplete, exit + if (nHdrPos < 24) + return nCopy; + + // deserialize to CMessageHeader + try { + hdrbuf >> hdr; + } + catch (std::exception &e) { + return -1; + } + + // reject messages larger than MAX_SIZE + if (hdr.nMessageSize > MAX_SIZE) + return -1; + + // switch state to reading message data + in_data = true; + vRecv.resize(hdr.nMessageSize); + + return nCopy; +} + +int CNetMessage::readData(const char *pch, unsigned int nBytes) +{ + unsigned int nRemaining = hdr.nMessageSize - nDataPos; + unsigned int nCopy = std::min(nRemaining, nBytes); + + memcpy(&vRecv[nDataPos], pch, nCopy); + nDataPos += nCopy; + + return nCopy; +} + @@ -676,7 +748,7 @@ void ThreadSocketHandler2(void* parg) BOOST_FOREACH(CNode* pnode, vNodesCopy) { if (pnode->fDisconnect || - (pnode->GetRefCount() <= 0 && pnode->vRecv.empty() && pnode->vSend.empty())) + (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->vSend.empty())) { // remove from vNodes vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); @@ -708,7 +780,7 @@ void ThreadSocketHandler2(void* parg) TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { - TRY_LOCK(pnode->cs_vRecv, lockRecv); + TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) { TRY_LOCK(pnode->cs_inventory, lockInv); @@ -873,15 +945,12 @@ void ThreadSocketHandler2(void* parg) continue; if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError)) { - TRY_LOCK(pnode->cs_vRecv, lockRecv); + TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) { - CDataStream& vRecv = pnode->vRecv; - unsigned int nPos = vRecv.size(); - - if (nPos > ReceiveBufferSize()) { + if (pnode->GetTotalRecvSize() > ReceiveFloodSize()) { if (!pnode->fDisconnect) - printf("socket recv flood control disconnect (%"PRIszu" bytes)\n", vRecv.size()); + printf("socket recv flood control disconnect (%u bytes)\n", pnode->GetTotalRecvSize()); pnode->CloseSocketDisconnect(); } else { @@ -890,8 +959,8 @@ void ThreadSocketHandler2(void* parg) int nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); if (nBytes > 0) { - vRecv.resize(nPos + nBytes); - memcpy(&vRecv[nPos], pchBuf, nBytes); + if (!pnode->ReceiveMsgBytes(pchBuf, nBytes)) + pnode->CloseSocketDisconnect(); pnode->nLastRecv = GetTime(); } else if (nBytes == 0) @@ -1693,9 +1762,10 @@ void ThreadMessageHandler2(void* parg) { // Receive messages { - TRY_LOCK(pnode->cs_vRecv, lockRecv); + TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) - ProcessMessages(pnode); + if (!ProcessMessages(pnode)) + pnode->CloseSocketDisconnect(); } if (fShutdown) return; |