diff options
Diffstat (limited to 'src/net.h')
-rw-r--r-- | src/net.h | 144 |
1 files changed, 44 insertions, 100 deletions
@@ -162,75 +162,33 @@ public: void PushMessage(CNode* pnode, CSerializedNetMsg&& msg); template<typename Callable> - bool ForEachNodeContinueIf(Callable&& func) - { - LOCK(cs_vNodes); - for (auto&& node : vNodes) - if(!func(node)) - return false; - return true; - }; - - template<typename Callable> - bool ForEachNodeContinueIf(Callable&& func) const - { - LOCK(cs_vNodes); - for (const auto& node : vNodes) - if(!func(node)) - return false; - return true; - }; - - template<typename Callable, typename CallableAfter> - bool ForEachNodeContinueIfThen(Callable&& pre, CallableAfter&& post) - { - bool ret = true; - LOCK(cs_vNodes); - for (auto&& node : vNodes) - if(!pre(node)) { - ret = false; - break; - } - post(); - return ret; - }; - - template<typename Callable, typename CallableAfter> - bool ForEachNodeContinueIfThen(Callable&& pre, CallableAfter&& post) const - { - bool ret = true; - LOCK(cs_vNodes); - for (const auto& node : vNodes) - if(!pre(node)) { - ret = false; - break; - } - post(); - return ret; - }; - - template<typename Callable> void ForEachNode(Callable&& func) { LOCK(cs_vNodes); - for (auto&& node : vNodes) - func(node); + for (auto&& node : vNodes) { + if (NodeFullyConnected(node)) + func(node); + } }; template<typename Callable> void ForEachNode(Callable&& func) const { LOCK(cs_vNodes); - for (const auto& node : vNodes) - func(node); + for (auto&& node : vNodes) { + if (NodeFullyConnected(node)) + func(node); + } }; template<typename Callable, typename CallableAfter> void ForEachNodeThen(Callable&& pre, CallableAfter&& post) { LOCK(cs_vNodes); - for (auto&& node : vNodes) - pre(node); + for (auto&& node : vNodes) { + if (NodeFullyConnected(node)) + pre(node); + } post(); }; @@ -238,13 +196,13 @@ public: void ForEachNodeThen(Callable&& pre, CallableAfter&& post) const { LOCK(cs_vNodes); - for (const auto& node : vNodes) - pre(node); + for (auto&& node : vNodes) { + if (NodeFullyConnected(node)) + pre(node); + } post(); }; - void RelayTransaction(const CTransaction& tx); - // Addrman functions size_t GetAddressCount() const; void SetServices(const CService &addr, ServiceFlags nServices); @@ -286,10 +244,8 @@ public: size_t GetNodeCount(NumConnections num); void GetNodeStats(std::vector<CNodeStats>& vstats); - bool DisconnectAddress(const CNetAddr& addr); bool DisconnectNode(const std::string& node); bool DisconnectNode(NodeId id); - bool DisconnectSubnet(const CSubNet& subnet); unsigned int GetSendBufferSize() const; @@ -325,8 +281,11 @@ public: int GetBestHeight() const; /** Get a unique deterministic randomizer. */ - CSipHasher GetDeterministicRandomizer(uint64_t id); + CSipHasher GetDeterministicRandomizer(uint64_t id) const; + + unsigned int GetReceiveFloodSize() const; + void WakeMessageHandler(); private: struct ListenSocket { SOCKET socket; @@ -343,7 +302,7 @@ private: void ThreadSocketHandler(); void ThreadDNSAddressSeed(); - uint64_t CalculateKeyedNetGroup(const CAddress& ad); + uint64_t CalculateKeyedNetGroup(const CAddress& ad) const; CNode* FindNode(const CNetAddr& ip); CNode* FindNode(const CSubNet& subNet); @@ -358,6 +317,7 @@ private: NodeId GetNewNodeId(); + size_t SocketSendData(CNode *pnode) const; //!check is the banlist has unwritten changes bool BannedSetIsDirty(); //!set the "dirty" flag for the banlist @@ -368,12 +328,13 @@ private: void DumpData(); void DumpBanlist(); - unsigned int GetReceiveFloodSize() const; - // Network stats void RecordBytesRecv(uint64_t bytes); void RecordBytesSent(uint64_t bytes); + // Whether the node should be passed out in ForEach* callbacks + static bool NodeFullyConnected(const CNode* pnode); + // Network usage totals CCriticalSection cs_totalBytesRecv; CCriticalSection cs_totalBytesSent; @@ -428,6 +389,9 @@ private: /** SipHasher seeds for deterministic randomness */ const uint64_t nSeed0, nSeed1; + /** flag for waking the message processor. */ + bool fMsgProcWake; + std::condition_variable condMsgProc; std::mutex mutexMsgProc; std::atomic<bool> flagInterruptMsgProc; @@ -445,7 +409,6 @@ void Discover(boost::thread_group& threadGroup); void MapPort(bool fUseUPnP); unsigned short GetListenPort(); bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false); -size_t SocketSendData(CNode *pnode); struct CombinerAll { @@ -610,11 +573,15 @@ public: std::deque<std::vector<unsigned char>> vSendMsg; CCriticalSection cs_vSend; + CCriticalSection cs_vProcessMsg; + std::list<CNetMessage> vProcessMsg; + size_t nProcessQueueSize; + + CCriticalSection cs_sendProcessing; + std::deque<CInv> vRecvGetData; - std::deque<CNetMessage> vRecvMsg; - CCriticalSection cs_vRecvMsg; uint64_t nRecvBytes; - int nRecvVersion; + std::atomic<int> nRecvVersion; int64_t nLastSend; int64_t nLastRecv; @@ -623,7 +590,7 @@ public: const CAddress addr; std::string addrName; CService addrLocal; - int nVersion; + std::atomic<int> nVersion; // strSubVer is whatever byte array we read from the wire. However, this field is intended // to be printed out, displayed to humans in various forms and so on. So we sanitize it and // store the sanitized version in cleanSubVer. The original should be used when dealing with @@ -635,7 +602,7 @@ public: bool fAddnode; bool fClient; const bool fInbound; - bool fSuccessfullyConnected; + std::atomic_bool fSuccessfullyConnected; std::atomic_bool fDisconnect; // We use fRelayTxes for two purposes - // a) it allows us to not relay tx invs before receiving the peer's version message @@ -650,6 +617,8 @@ public: const NodeId id; const uint64_t nKeyedNetGroup; + std::atomic_bool fPauseRecv; + std::atomic_bool fPauseSend; protected: mapMsgCmdSize mapSendBytesPerMsgCmd; @@ -723,6 +692,7 @@ private: const ServiceFlags nLocalServices; const int nMyStartingHeight; int nSendVersion; + std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread public: NodeId GetId() const { @@ -743,44 +713,18 @@ public: return nRefCount; } - // requires LOCK(cs_vRecvMsg) - unsigned int GetTotalRecvSize() - { - unsigned int total = 0; - BOOST_FOREACH(const CNetMessage &msg, vRecvMsg) - total += msg.vRecv.size() + 24; - return total; - } - - // requires LOCK(cs_vRecvMsg) bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete); - // requires LOCK(cs_vRecvMsg) void SetRecvVersion(int nVersionIn) { nRecvVersion = nVersionIn; - BOOST_FOREACH(CNetMessage &msg, vRecvMsg) - msg.SetVersion(nVersionIn); } - void SetSendVersion(int nVersionIn) - { - // Send version may only be changed in the version message, and - // only one version message is allowed per session. We can therefore - // treat this value as const and even atomic as long as it's only used - // once the handshake is complete. Any attempt to set this twice is an - // error. - assert(nSendVersion == 0); - nSendVersion = nVersionIn; - } - - int GetSendVersion() const + int GetRecvVersion() { - // The send version should always be explicitly set to - // INIT_PROTO_VERSION rather than using this value until the handshake - // is complete. - assert(nSendVersion != 0); - return nSendVersion; + return nRecvVersion; } + void SetSendVersion(int nVersionIn); + int GetSendVersion() const; CNode* AddRef() { |