diff options
Diffstat (limited to 'src/net.h')
-rw-r--r-- | src/net.h | 187 |
1 files changed, 72 insertions, 115 deletions
@@ -162,75 +162,33 @@ public: void PushMessage(CNode* pnode, CSerializedNetMsg&& msg); template<typename Callable> - bool ForEachNodeContinueIf(Callable&& func) - { - LOCK(cs_vNodes); - for (auto&& node : vNodes) - if(!func(node)) - return false; - return true; - }; - - template<typename Callable> - bool ForEachNodeContinueIf(Callable&& func) const - { - LOCK(cs_vNodes); - for (const auto& node : vNodes) - if(!func(node)) - return false; - return true; - }; - - template<typename Callable, typename CallableAfter> - bool ForEachNodeContinueIfThen(Callable&& pre, CallableAfter&& post) - { - bool ret = true; - LOCK(cs_vNodes); - for (auto&& node : vNodes) - if(!pre(node)) { - ret = false; - break; - } - post(); - return ret; - }; - - template<typename Callable, typename CallableAfter> - bool ForEachNodeContinueIfThen(Callable&& pre, CallableAfter&& post) const - { - bool ret = true; - LOCK(cs_vNodes); - for (const auto& node : vNodes) - if(!pre(node)) { - ret = false; - break; - } - post(); - return ret; - }; - - template<typename Callable> void ForEachNode(Callable&& func) { LOCK(cs_vNodes); - for (auto&& node : vNodes) - func(node); + for (auto&& node : vNodes) { + if (NodeFullyConnected(node)) + func(node); + } }; template<typename Callable> void ForEachNode(Callable&& func) const { LOCK(cs_vNodes); - for (const auto& node : vNodes) - func(node); + for (auto&& node : vNodes) { + if (NodeFullyConnected(node)) + func(node); + } }; template<typename Callable, typename CallableAfter> void ForEachNodeThen(Callable&& pre, CallableAfter&& post) { LOCK(cs_vNodes); - for (auto&& node : vNodes) - pre(node); + for (auto&& node : vNodes) { + if (NodeFullyConnected(node)) + pre(node); + } post(); }; @@ -238,21 +196,19 @@ public: void ForEachNodeThen(Callable&& pre, CallableAfter&& post) const { LOCK(cs_vNodes); - for (const auto& node : vNodes) - pre(node); + for (auto&& node : vNodes) { + if (NodeFullyConnected(node)) + pre(node); + } post(); }; - void RelayTransaction(const CTransaction& tx); - // Addrman functions size_t GetAddressCount() const; void SetServices(const CService &addr, ServiceFlags nServices); void MarkAddressGood(const CAddress& addr); - void AddNewAddress(const CAddress& addr, const CAddress& addrFrom, int64_t nTimePenalty = 0); void AddNewAddresses(const std::vector<CAddress>& vAddr, const CAddress& addrFrom, int64_t nTimePenalty = 0); std::vector<CAddress> GetAddresses(); - void AddressCurrentlyConnected(const CService& addr); // Denial-of-service detection/prevention // The idea is to detect peers that are behaving @@ -286,10 +242,8 @@ public: size_t GetNodeCount(NumConnections num); void GetNodeStats(std::vector<CNodeStats>& vstats); - bool DisconnectAddress(const CNetAddr& addr); bool DisconnectNode(const std::string& node); bool DisconnectNode(NodeId id); - bool DisconnectSubnet(const CSubNet& subnet); unsigned int GetSendBufferSize() const; @@ -325,8 +279,11 @@ public: int GetBestHeight() const; /** Get a unique deterministic randomizer. */ - CSipHasher GetDeterministicRandomizer(uint64_t id); + CSipHasher GetDeterministicRandomizer(uint64_t id) const; + + unsigned int GetReceiveFloodSize() const; + void WakeMessageHandler(); private: struct ListenSocket { SOCKET socket; @@ -343,7 +300,7 @@ private: void ThreadSocketHandler(); void ThreadDNSAddressSeed(); - uint64_t CalculateKeyedNetGroup(const CAddress& ad); + uint64_t CalculateKeyedNetGroup(const CAddress& ad) const; CNode* FindNode(const CNetAddr& ip); CNode* FindNode(const CSubNet& subNet); @@ -358,6 +315,7 @@ private: NodeId GetNewNodeId(); + size_t SocketSendData(CNode *pnode) const; //!check is the banlist has unwritten changes bool BannedSetIsDirty(); //!set the "dirty" flag for the banlist @@ -368,12 +326,13 @@ private: void DumpData(); void DumpBanlist(); - unsigned int GetReceiveFloodSize() const; - // Network stats void RecordBytesRecv(uint64_t bytes); void RecordBytesSent(uint64_t bytes); + // Whether the node should be passed out in ForEach* callbacks + static bool NodeFullyConnected(const CNode* pnode); + // Network usage totals CCriticalSection cs_totalBytesRecv; CCriticalSection cs_totalBytesSent; @@ -428,6 +387,9 @@ private: /** SipHasher seeds for deterministic randomness */ const uint64_t nSeed0, nSeed1; + /** flag for waking the message processor. */ + bool fMsgProcWake; + std::condition_variable condMsgProc; std::mutex mutexMsgProc; std::atomic<bool> flagInterruptMsgProc; @@ -445,7 +407,6 @@ void Discover(boost::thread_group& threadGroup); void MapPort(bool fUseUPnP); unsigned short GetListenPort(); bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false); -size_t SocketSendData(CNode *pnode); struct CombinerAll { @@ -601,7 +562,7 @@ class CNode friend class CConnman; public: // socket - ServiceFlags nServices; + std::atomic<ServiceFlags> nServices; ServiceFlags nServicesExpected; SOCKET hSocket; size_t nSendSize; // total size of all vSendMsg entries @@ -609,33 +570,38 @@ public: uint64_t nSendBytes; std::deque<std::vector<unsigned char>> vSendMsg; CCriticalSection cs_vSend; + CCriticalSection cs_hSocket; + CCriticalSection cs_vRecv; + + CCriticalSection cs_vProcessMsg; + std::list<CNetMessage> vProcessMsg; + size_t nProcessQueueSize; + + CCriticalSection cs_sendProcessing; std::deque<CInv> vRecvGetData; - std::deque<CNetMessage> vRecvMsg; - CCriticalSection cs_vRecvMsg; uint64_t nRecvBytes; - int nRecvVersion; + std::atomic<int> nRecvVersion; - int64_t nLastSend; - int64_t nLastRecv; - int64_t nTimeConnected; - int64_t nTimeOffset; + std::atomic<int64_t> nLastSend; + std::atomic<int64_t> nLastRecv; + const int64_t nTimeConnected; + std::atomic<int64_t> nTimeOffset; const CAddress addr; - std::string addrName; - CService addrLocal; - int nVersion; + std::atomic<int> nVersion; // strSubVer is whatever byte array we read from the wire. However, this field is intended // to be printed out, displayed to humans in various forms and so on. So we sanitize it and // store the sanitized version in cleanSubVer. The original should be used when dealing with // the network or wire types and the cleaned string used when displayed or logged. std::string strSubVer, cleanSubVer; + CCriticalSection cs_SubVer; // used for both cleanSubVer and strSubVer bool fWhitelisted; // This peer can bypass DoS banning. bool fFeeler; // If true this node is being used as a short lived feeler. bool fOneShot; bool fAddnode; bool fClient; const bool fInbound; - bool fSuccessfullyConnected; + std::atomic_bool fSuccessfullyConnected; std::atomic_bool fDisconnect; // We use fRelayTxes for two purposes - // a) it allows us to not relay tx invs before receiving the peer's version message @@ -646,10 +612,12 @@ public: CSemaphoreGrant grantOutbound; CCriticalSection cs_filter; CBloomFilter* pfilter; - int nRefCount; + std::atomic<int> nRefCount; const NodeId id; const uint64_t nKeyedNetGroup; + std::atomic_bool fPauseRecv; + std::atomic_bool fPauseSend; protected: mapMsgCmdSize mapSendBytesPerMsgCmd; @@ -657,7 +625,7 @@ protected: public: uint256 hashContinue; - int nStartingHeight; + std::atomic<int> nStartingHeight; // flood relay std::vector<CAddress> vAddrToSend; @@ -695,15 +663,15 @@ public: // Ping time measurement: // The pong reply we're expecting, or 0 if no pong expected. - uint64_t nPingNonceSent; + std::atomic<uint64_t> nPingNonceSent; // Time (in usec) the last ping was sent, or 0 if no ping was ever sent. - int64_t nPingUsecStart; + std::atomic<int64_t> nPingUsecStart; // Last measured round-trip time. - int64_t nPingUsecTime; + std::atomic<int64_t> nPingUsecTime; // Best measured round-trip time. - int64_t nMinPingUsecTime; + std::atomic<int64_t> nMinPingUsecTime; // Whether a ping is requested. - bool fPingQueued; + std::atomic<bool> fPingQueued; // Minimum fee rate with which to filter inv's to this node CAmount minFeeFilter; CCriticalSection cs_feeFilter; @@ -723,6 +691,13 @@ private: const ServiceFlags nLocalServices; const int nMyStartingHeight; int nSendVersion; + std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread + + mutable CCriticalSection cs_addrName; + std::string addrName; + + CService addrLocal; + mutable CCriticalSection cs_addrLocal; public: NodeId GetId() const { @@ -743,44 +718,22 @@ public: return nRefCount; } - // requires LOCK(cs_vRecvMsg) - unsigned int GetTotalRecvSize() - { - unsigned int total = 0; - BOOST_FOREACH(const CNetMessage &msg, vRecvMsg) - total += msg.vRecv.size() + 24; - return total; - } - - // requires LOCK(cs_vRecvMsg) bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete); - // requires LOCK(cs_vRecvMsg) void SetRecvVersion(int nVersionIn) { nRecvVersion = nVersionIn; - BOOST_FOREACH(CNetMessage &msg, vRecvMsg) - msg.SetVersion(nVersionIn); } - void SetSendVersion(int nVersionIn) + int GetRecvVersion() { - // Send version may only be changed in the version message, and - // only one version message is allowed per session. We can therefore - // treat this value as const and even atomic as long as it's only used - // once the handshake is complete. Any attempt to set this twice is an - // error. - assert(nSendVersion == 0); - nSendVersion = nVersionIn; + return nRecvVersion; } + void SetSendVersion(int nVersionIn); + int GetSendVersion() const; - int GetSendVersion() const - { - // The send version should always be explicitly set to - // INIT_PROTO_VERSION rather than using this value until the handshake - // is complete. - assert(nSendVersion != 0); - return nSendVersion; - } + CService GetAddrLocal() const; + //! May not be called more than once + void SetAddrLocal(const CService& addrLocalIn); CNode* AddRef() { @@ -851,6 +804,10 @@ public: { return nLocalServices; } + + std::string GetAddrName() const; + //! Sets the addrName only if it was not previously set + void MaybeSetAddrName(const std::string& addrNameIn); }; |