diff options
Diffstat (limited to 'src/net.h')
-rw-r--r-- | src/net.h | 206 |
1 files changed, 132 insertions, 74 deletions
@@ -15,6 +15,7 @@ #endif #include "mruset.h" +#include "limitedmap.h" #include "netbase.h" #include "protocol.h" #include "addrman.h" @@ -27,7 +28,7 @@ extern int nBestHeight; -inline unsigned int ReceiveBufferSize() { return 1000*GetArg("-maxreceivebuffer", 5*1000); } +inline unsigned int ReceiveFloodSize() { return 1000*GetArg("-maxreceivebuffer", 5*1000); } inline unsigned int SendBufferSize() { return 1000*GetArg("-maxsendbuffer", 1*1000); } void AddOneShot(std::string strDest); @@ -36,12 +37,13 @@ bool GetMyExternalIP(CNetAddr& ipRet); void AddressCurrentlyConnected(const CService& addr); CNode* FindNode(const CNetAddr& ip); CNode* FindNode(const CService& ip); -CNode* ConnectNode(CAddress addrConnect, const char *strDest = NULL, int64 nTimeout=0); -void MapPort(); +CNode* ConnectNode(CAddress addrConnect, const char *strDest = NULL); +void MapPort(bool fUseUPnP); unsigned short GetListenPort(); bool BindListenPort(const CService &bindAddr, std::string& strError=REF(std::string())); -void StartNode(void* parg); +void StartNode(boost::thread_group& threadGroup); bool StopNode(); +void SocketSendData(CNode *pnode); enum { @@ -49,7 +51,6 @@ enum LOCAL_IF, // address a local interface listens on LOCAL_BIND, // address explicit bound to LOCAL_UPNP, // address reported by UPnP - LOCAL_IRC, // address reported by IRC (deprecated) LOCAL_HTTP, // address reported by whatismyip.com and similar LOCAL_MANUAL, // address explicitly specified (-externalip=) @@ -69,38 +70,18 @@ void SetReachable(enum Network net, bool fFlag = true); CAddress GetLocalAddress(const CNetAddr *paddrPeer = NULL); -/** Thread types */ -enum threadId -{ - THREAD_SOCKETHANDLER, - THREAD_OPENCONNECTIONS, - THREAD_MESSAGEHANDLER, - THREAD_MINER, - THREAD_RPCLISTENER, - THREAD_UPNP, - THREAD_DNSSEED, - THREAD_ADDEDCONNECTIONS, - THREAD_DUMPADDRESS, - THREAD_RPCHANDLER, - THREAD_IMPORT, - THREAD_SCRIPTCHECK, - - THREAD_MAX -}; - extern bool fDiscover; -extern bool fUseUPnP; extern uint64 nLocalServices; extern uint64 nLocalHostNonce; -extern boost::array<int, THREAD_MAX> vnThreadsRunning; extern CAddrMan addrman; +extern int nMaxConnections; extern std::vector<CNode*> vNodes; extern CCriticalSection cs_vNodes; extern std::map<CInv, CDataStream> mapRelay; extern std::deque<std::pair<int64, CInv> > vRelayExpiration; extern CCriticalSection cs_mapRelay; -extern std::map<CInv, int64> mapAlreadyAskedFor; +extern limitedmap<CInv, int64> mapAlreadyAskedFor; extern std::vector<std::string> vAddedNodes; extern CCriticalSection cs_vAddedNodes; @@ -119,9 +100,49 @@ public: int nVersion; std::string strSubVer; bool fInbound; - int64 nReleaseTime; int nStartingHeight; int nMisbehavior; + uint64 nSendBytes; + uint64 nRecvBytes; + bool fSyncNode; +}; + + + + +class CNetMessage { +public: + bool in_data; // parsing header (false) or data (true) + + CDataStream hdrbuf; // partially received header + CMessageHeader hdr; // complete header + unsigned int nHdrPos; + + CDataStream vRecv; // received message data + unsigned int nDataPos; + + CNetMessage(int nTypeIn, int nVersionIn) : hdrbuf(nTypeIn, nVersionIn), vRecv(nTypeIn, nVersionIn) { + hdrbuf.resize(24); + in_data = false; + nHdrPos = 0; + nDataPos = 0; + } + + bool complete() const + { + if (!in_data) + return false; + return (hdr.nMessageSize == nDataPos); + } + + void SetVersion(int nVersionIn) + { + hdrbuf.SetVersion(nVersionIn); + vRecv.SetVersion(nVersionIn); + } + + int readHeader(const char *pch, unsigned int nBytes); + int readData(const char *pch, unsigned int nBytes); }; @@ -135,16 +156,23 @@ public: // socket uint64 nServices; SOCKET hSocket; - CDataStream vSend; - CDataStream vRecv; + CDataStream ssSend; + size_t nSendSize; // total size of all vSendMsg entries + size_t nSendOffset; // offset inside the first vSendMsg already sent + uint64 nSendBytes; + std::deque<CSerializeData> vSendMsg; CCriticalSection cs_vSend; - CCriticalSection cs_vRecv; + + std::deque<CInv> vRecvGetData; + std::deque<CNetMessage> vRecvMsg; + CCriticalSection cs_vRecvMsg; + uint64 nRecvBytes; + int nRecvVersion; + int64 nLastSend; int64 nLastRecv; int64 nLastSendEmpty; int64 nTimeConnected; - int nHeaderStart; - unsigned int nMessageStart; CAddress addr; std::string addrName; CService addrLocal; @@ -164,8 +192,8 @@ public: CSemaphoreGrant grantOutbound; CCriticalSection cs_filter; CBloomFilter* pfilter; -protected: int nRefCount; +protected: // Denial-of-service detection/prevention // Key is IP address, value is banned-until-time @@ -174,11 +202,11 @@ protected: int nMisbehavior; public: - int64 nReleaseTime; uint256 hashContinue; CBlockIndex* pindexLastGetBlocksBegin; uint256 hashLastGetBlocksEnd; int nStartingHeight; + bool fStartSync; // flood relay std::vector<CAddress> vAddrToSend; @@ -192,16 +220,17 @@ public: CCriticalSection cs_inventory; std::multimap<int64, CInv> mapAskFor; - CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn = "", bool fInboundIn=false) : vSend(SER_NETWORK, MIN_PROTO_VERSION), vRecv(SER_NETWORK, MIN_PROTO_VERSION) + CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn = "", bool fInboundIn=false) : ssSend(SER_NETWORK, MIN_PROTO_VERSION) { nServices = 0; hSocket = hSocketIn; + nRecvVersion = MIN_PROTO_VERSION; nLastSend = 0; nLastRecv = 0; + nSendBytes = 0; + nRecvBytes = 0; nLastSendEmpty = GetTime(); nTimeConnected = GetTime(); - nHeaderStart = -1; - nMessageStart = -1; addr = addrIn; addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn; nVersion = 0; @@ -213,11 +242,13 @@ public: fSuccessfullyConnected = false; fDisconnect = false; nRefCount = 0; - nReleaseTime = 0; + nSendSize = 0; + nSendOffset = 0; hashContinue = 0; pindexLastGetBlocksBegin = 0; hashLastGetBlocksEnd = 0; nStartingHeight = -1; + fStartSync = false; fGetAddr = false; nMisbehavior = 0; fRelayTxes = false; @@ -225,7 +256,7 @@ public: pfilter = NULL; // Be shy and don't send version until we hear - if (!fInbound) + if (hSocket != INVALID_SOCKET && !fInbound) PushVersion(); } @@ -248,15 +279,33 @@ public: int GetRefCount() { - return std::max(nRefCount, 0) + (GetTime() < nReleaseTime ? 1 : 0); + assert(nRefCount >= 0); + return nRefCount; } - CNode* AddRef(int64 nTimeout=0) + // requires LOCK(cs_vRecvMsg) + unsigned int GetTotalRecvSize() { - if (nTimeout != 0) - nReleaseTime = std::max(nReleaseTime, GetTime() + nTimeout); - else - nRefCount++; + unsigned int total = 0; + BOOST_FOREACH(const CNetMessage &msg, vRecvMsg) + total += msg.vRecv.size() + 24; + return total; + } + + // requires LOCK(cs_vRecvMsg) + bool ReceiveMsgBytes(const char *pch, unsigned int nBytes); + + // requires LOCK(cs_vRecvMsg) + void SetRecvVersion(int nVersionIn) + { + nRecvVersion = nVersionIn; + BOOST_FOREACH(CNetMessage &msg, vRecvMsg) + msg.SetVersion(nVersionIn); + } + + CNode* AddRef() + { + nRefCount++; return this; } @@ -303,7 +352,12 @@ public: { // We're using mapAskFor as a priority queue, // the key is the earliest time the request can be sent - int64& nRequestTime = mapAlreadyAskedFor[inv]; + int64 nRequestTime; + limitedmap<CInv, int64>::const_iterator it = mapAlreadyAskedFor.find(inv); + if (it != mapAlreadyAskedFor.end()) + nRequestTime = it->second; + else + nRequestTime = 0; if (fDebugNet) printf("askfor %s %"PRI64d" (%s)\n", inv.ToString().c_str(), nRequestTime, DateTimeStrFormat("%H:%M:%S", nRequestTime/1000000).c_str()); @@ -316,6 +370,10 @@ public: // Each retry is 2 minutes after the last nRequestTime = std::max(nRequestTime + 2 * 60 * 1000000, nNow); + if (it != mapAlreadyAskedFor.end()) + mapAlreadyAskedFor.update(it, nRequestTime); + else + mapAlreadyAskedFor.insert(std::make_pair(inv, nRequestTime)); mapAskFor.insert(std::make_pair(nRequestTime, inv)); } @@ -325,11 +383,8 @@ public: void BeginMessage(const char* pszCommand) EXCLUSIVE_LOCK_FUNCTION(cs_vSend) { ENTER_CRITICAL_SECTION(cs_vSend); - if (nHeaderStart != -1) - AbortMessage(); - nHeaderStart = vSend.size(); - vSend << CMessageHeader(pszCommand, 0); - nMessageStart = vSend.size(); + assert(ssSend.size() == 0); + ssSend << CMessageHeader(pszCommand, 0); if (fDebug) printf("sending: %s ", pszCommand); } @@ -337,11 +392,8 @@ public: // TODO: Document the precondition of this function. Is cs_vSend locked? void AbortMessage() UNLOCK_FUNCTION(cs_vSend) { - if (nHeaderStart < 0) - return; - vSend.resize(nHeaderStart); - nHeaderStart = -1; - nMessageStart = -1; + ssSend.clear(); + LEAVE_CRITICAL_SECTION(cs_vSend); if (fDebug) @@ -358,26 +410,32 @@ public: return; } - if (nHeaderStart < 0) + if (ssSend.size() == 0) return; // Set the size - unsigned int nSize = vSend.size() - nMessageStart; - memcpy((char*)&vSend[nHeaderStart] + CMessageHeader::MESSAGE_SIZE_OFFSET, &nSize, sizeof(nSize)); + unsigned int nSize = ssSend.size() - CMessageHeader::HEADER_SIZE; + memcpy((char*)&ssSend[CMessageHeader::MESSAGE_SIZE_OFFSET], &nSize, sizeof(nSize)); // Set the checksum - uint256 hash = Hash(vSend.begin() + nMessageStart, vSend.end()); + uint256 hash = Hash(ssSend.begin() + CMessageHeader::HEADER_SIZE, ssSend.end()); unsigned int nChecksum = 0; memcpy(&nChecksum, &hash, sizeof(nChecksum)); - assert(nMessageStart - nHeaderStart >= CMessageHeader::CHECKSUM_OFFSET + sizeof(nChecksum)); - memcpy((char*)&vSend[nHeaderStart] + CMessageHeader::CHECKSUM_OFFSET, &nChecksum, sizeof(nChecksum)); + assert(ssSend.size () >= CMessageHeader::CHECKSUM_OFFSET + sizeof(nChecksum)); + memcpy((char*)&ssSend[CMessageHeader::CHECKSUM_OFFSET], &nChecksum, sizeof(nChecksum)); if (fDebug) { printf("(%d bytes)\n", nSize); } - nHeaderStart = -1; - nMessageStart = -1; + std::deque<CSerializeData>::iterator it = vSendMsg.insert(vSendMsg.end(), CSerializeData()); + ssSend.GetAndClear(*it); + nSendSize += (*it).size(); + + // If write queue empty, attempt "optimistic write" + if (it == vSendMsg.begin()) + SocketSendData(this); + LEAVE_CRITICAL_SECTION(cs_vSend); } @@ -404,7 +462,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1; + ssSend << a1; EndMessage(); } catch (...) @@ -420,7 +478,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2; + ssSend << a1 << a2; EndMessage(); } catch (...) @@ -436,7 +494,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3; + ssSend << a1 << a2 << a3; EndMessage(); } catch (...) @@ -452,7 +510,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4; + ssSend << a1 << a2 << a3 << a4; EndMessage(); } catch (...) @@ -468,7 +526,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5; + ssSend << a1 << a2 << a3 << a4 << a5; EndMessage(); } catch (...) @@ -484,7 +542,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5 << a6; + ssSend << a1 << a2 << a3 << a4 << a5 << a6; EndMessage(); } catch (...) @@ -500,7 +558,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7; + ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7; EndMessage(); } catch (...) @@ -516,7 +574,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8; + ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8; EndMessage(); } catch (...) @@ -532,7 +590,7 @@ public: try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8 << a9; + ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8 << a9; EndMessage(); } catch (...) |