diff options
author | Pieter Wuille <pieter.wuille@gmail.com> | 2013-03-01 01:41:28 +0100 |
---|---|---|
committer | Pieter Wuille <sipa@ulyssis.org> | 2013-03-29 23:56:26 +0100 |
commit | 967f24590b43f0f84148f669d886b40fe45aa978 (patch) | |
tree | 5e0116b462a5433907d5400456d0006e14d332ee /src | |
parent | b9ff2970b9fbb24e2fffc449b4ef478d019633d8 (diff) |
Some fixes to CNetMessage processing
* Change CNode::vRecvMsg to be a deque instead of a vector (less copying)
* Make sure to acquire cs_vRecvMsg in CNode::CloseSocketDisconnect (as it
may be called without that lock).
Diffstat (limited to 'src')
-rw-r--r-- | src/main.cpp | 28 | ||||
-rw-r--r-- | src/net.cpp | 11 | ||||
-rw-r--r-- | src/net.h | 10 |
3 files changed, 28 insertions, 21 deletions
diff --git a/src/main.cpp b/src/main.cpp index 3406144595..0c80f23df9 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -3708,8 +3708,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) // requires LOCK(cs_vRecvMsg) bool ProcessMessages(CNode* pfrom) { - if (pfrom->vRecvMsg.empty()) - return true; //if (fDebug) // printf("ProcessMessages(%zu messages)\n", pfrom->vRecvMsg.size()); @@ -3721,29 +3719,34 @@ bool ProcessMessages(CNode* pfrom) // (4) checksum // (x) data // + bool fOk = true; - unsigned int nMsgPos = 0; - for (; nMsgPos < pfrom->vRecvMsg.size(); nMsgPos++) - { + std::deque<CNetMessage>::iterator it = pfrom->vRecvMsg.begin(); + while (it != pfrom->vRecvMsg.end()) { // Don't bother if send buffer is too full to respond anyway if (pfrom->vSend.size() >= SendBufferSize()) break; - // get next message; end, if an incomplete message is found - CNetMessage& msg = pfrom->vRecvMsg[nMsgPos]; + // get next message + CNetMessage& msg = *it; //if (fDebug) // printf("ProcessMessages(message %u msgsz, %zu bytes, complete:%s)\n", // msg.hdr.nMessageSize, msg.vRecv.size(), // msg.complete() ? "Y" : "N"); + // end, if an incomplete message is found if (!msg.complete()) break; + // at this point, any failure means we can delete the current message + it++; + // Scan for message start if (memcmp(msg.hdr.pchMessageStart, pchMessageStart, sizeof(pchMessageStart)) != 0) { printf("\n\nPROCESSMESSAGE: INVALID MESSAGESTART\n\n"); - return false; + fOk = false; + break; } // Read header @@ -3779,7 +3782,7 @@ bool ProcessMessages(CNode* pfrom) fRet = ProcessMessage(pfrom, strCommand, vRecv); } if (fShutdown) - return true; + break; } catch (std::ios_base::failure& e) { @@ -3808,11 +3811,8 @@ bool ProcessMessages(CNode* pfrom) printf("ProcessMessage(%s, %u bytes) FAILED\n", strCommand.c_str(), nMessageSize); } - // remove processed messages; one incomplete message may remain - if (nMsgPos > 0) - pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), - pfrom->vRecvMsg.begin() + nMsgPos); - return true; + pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it); + return fOk; } diff --git a/src/net.cpp b/src/net.cpp index eafb335642..1016d5d9f0 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -536,7 +536,11 @@ void CNode::CloseSocketDisconnect() printf("disconnecting node %s\n", addrName.c_str()); closesocket(hSocket); hSocket = INVALID_SOCKET; - vRecvMsg.clear(); + + // in case this fails, we'll empty the recv buffer when the CNode is deleted + TRY_LOCK(cs_vRecvMsg, lockRecv); + if (lockRecv) + vRecvMsg.clear(); } } @@ -634,7 +638,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes) while (nBytes > 0) { // get current incomplete message, or create a new one - if (vRecvMsg.size() == 0 || + if (vRecvMsg.empty() || vRecvMsg.back().complete()) vRecvMsg.push_back(CNetMessage(SER_NETWORK, nRecvVersion)); @@ -1767,6 +1771,9 @@ void ThreadMessageHandler2(void* parg) pnodeTrickle = vNodesCopy[GetRand(vNodesCopy.size())]; BOOST_FOREACH(CNode* pnode, vNodesCopy) { + if (pnode->fDisconnect) + continue; + // Receive messages { TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); @@ -176,7 +176,7 @@ public: CDataStream vSend; CCriticalSection cs_vSend; - std::vector<CNetMessage> vRecvMsg; + std::deque<CNetMessage> vRecvMsg; CCriticalSection cs_vRecvMsg; int nRecvVersion; @@ -297,8 +297,8 @@ public: unsigned int GetTotalRecvSize() { unsigned int total = 0; - for (unsigned int i = 0; i < vRecvMsg.size(); i++) - total += vRecvMsg[i].vRecv.size(); + BOOST_FOREACH(const CNetMessage &msg, vRecvMsg) + total += msg.vRecv.size() + 24; return total; } @@ -309,8 +309,8 @@ public: void SetRecvVersion(int nVersionIn) { nRecvVersion = nVersionIn; - for (unsigned int i = 0; i < vRecvMsg.size(); i++) - vRecvMsg[i].SetVersion(nVersionIn); + BOOST_FOREACH(CNetMessage &msg, vRecvMsg) + msg.SetVersion(nVersionIn); } CNode* AddRef(int64 nTimeout=0) |