diff options
author | Jeff Garzik <jgarzik@exmulti.com> | 2012-11-15 19:41:12 -0500 |
---|---|---|
committer | Pieter Wuille <sipa@ulyssis.org> | 2013-03-29 23:56:25 +0100 |
commit | 607dbfdeaf7ec053d959c47c125d60c0b7e7216a (patch) | |
tree | 4a77135cba3d16f4bddea919f2621dfcc7428796 /src/net.h | |
parent | ea83336f4eceecbc046e465509b792dd203327bc (diff) |
P2P: parse network datastream into header/data components in socket thread
Replaces CNode::vRecv buffer with a vector of CNetMessage's. This simplifies
ProcessMessages() and eliminates several redundant data copies.
Overview:
* socket thread now parses incoming message datastream into
header/data components, as encapsulated by CNetMessage
* socket thread adds each CNetMessage to a vector inside CNode
* message thread (ProcessMessages) iterates through CNode's CNetMessage vector
Message parsing is made more strict:
* Socket is disconnected, if message larger than MAX_SIZE
or if CMessageHeader deserialization fails (latter is impossible?).
Previously, code would simply eat garbage data all day long.
* Socket is disconnected, if we fail to find pchMessageStart.
We do not search through garbage, to find pchMessageStart. Each
message must begin precisely after the last message ends.
ProcessMessages() always processes a complete message, and is more efficient:
* buffer is always precisely sized, using CDataStream::resize(),
rather than progressively sized in 64k chunks. More efficient
for large messages like "block".
* whole-buffer memory copy eliminated (vRecv -> vMsg)
* other buffer-shifting memory copies eliminated (vRecv.insert, vRecv.erase)
Diffstat (limited to 'src/net.h')
-rw-r--r-- | src/net.h | 70 |
1 files changed, 66 insertions, 4 deletions
@@ -27,7 +27,7 @@ extern int nBestHeight; -inline unsigned int ReceiveBufferSize() { return 1000*GetArg("-maxreceivebuffer", 5*1000); } +inline unsigned int ReceiveFloodSize() { return 1000*GetArg("-maxreceivebuffer", 5*1000); } inline unsigned int SendBufferSize() { return 1000*GetArg("-maxsendbuffer", 1*1000); } void AddOneShot(std::string strDest); @@ -126,6 +126,44 @@ public: +class CNetMessage { +public: + bool in_data; // parsing header (false) or data (true) + + CDataStream hdrbuf; // partially received header + CMessageHeader hdr; // complete header + unsigned int nHdrPos; + + CDataStream vRecv; // received message data + unsigned int nDataPos; + + CNetMessage(int nTypeIn, int nVersionIn) : hdrbuf(nTypeIn, nVersionIn), vRecv(nTypeIn, nVersionIn) { + hdrbuf.resize(24); + in_data = false; + nHdrPos = 0; + nDataPos = 0; + } + + bool complete() const + { + if (!in_data) + return false; + return (hdr.nMessageSize == nDataPos); + } + + void SetVersion(int nVersionIn) + { + hdrbuf.SetVersion(nVersionIn); + vRecv.SetVersion(nVersionIn); + } + + int readHeader(const char *pch, unsigned int nBytes); + int readData(const char *pch, unsigned int nBytes); +}; + + + + /** Information about a peer */ class CNode @@ -135,9 +173,12 @@ public: uint64 nServices; SOCKET hSocket; CDataStream vSend; - CDataStream vRecv; CCriticalSection cs_vSend; - CCriticalSection cs_vRecv; + + std::vector<CNetMessage> vRecvMsg; + CCriticalSection cs_vRecvMsg; + int nRecvVersion; + int64 nLastSend; int64 nLastRecv; int64 nLastSendEmpty; @@ -191,10 +232,11 @@ public: CCriticalSection cs_inventory; std::multimap<int64, CInv> mapAskFor; - CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn = "", bool fInboundIn=false) : vSend(SER_NETWORK, MIN_PROTO_VERSION), vRecv(SER_NETWORK, MIN_PROTO_VERSION) + CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn = "", bool fInboundIn=false) : vSend(SER_NETWORK, MIN_PROTO_VERSION) { nServices = 0; hSocket = hSocketIn; + nRecvVersion = MIN_PROTO_VERSION; nLastSend = 0; nLastRecv = 0; nLastSendEmpty = GetTime(); @@ -250,6 +292,26 @@ public: return std::max(nRefCount, 0) + (GetTime() < nReleaseTime ? 1 : 0); } + // requires LOCK(cs_vRecvMsg) + unsigned int GetTotalRecvSize() + { + unsigned int total = 0; + for (unsigned int i = 0; i < vRecvMsg.size(); i++) + total += vRecvMsg[i].vRecv.size(); + return total; + } + + // requires LOCK(cs_vRecvMsg) + bool ReceiveMsgBytes(const char *pch, unsigned int nBytes); + + // requires LOCK(cs_vRecvMsg) + void SetRecvVersion(int nVersionIn) + { + nRecvVersion = nVersionIn; + for (unsigned int i = 0; i < vRecvMsg.size(); i++) + vRecvMsg[i].SetVersion(nVersionIn); + } + CNode* AddRef(int64 nTimeout=0) { if (nTimeout != 0) |