diff options
Diffstat (limited to 'src/net.h')
-rw-r--r-- | src/net.h | 86 |
1 files changed, 70 insertions, 16 deletions
@@ -38,7 +38,9 @@ #include <map> #include <memory> #include <optional> +#include <queue> #include <thread> +#include <unordered_set> #include <vector> class AddrMan; @@ -234,6 +236,14 @@ public: std::string m_type; CNetMessage(CDataStream&& recv_in) : m_recv(std::move(recv_in)) {} + // Only one CNetMessage object will exist for the same message on either + // the receive or processing queue. For performance reasons we therefore + // delete the copy constructor and assignment operator to avoid the + // possibility of copying CNetMessage objects. + CNetMessage(CNetMessage&&) = default; + CNetMessage(const CNetMessage&) = delete; + CNetMessage& operator=(CNetMessage&&) = default; + CNetMessage& operator=(const CNetMessage&) = delete; void SetVersion(int nVersionIn) { @@ -340,14 +350,12 @@ struct CNodeOptions NetPermissionFlags permission_flags = NetPermissionFlags::None; std::unique_ptr<i2p::sam::Session> i2p_sam_session = nullptr; bool prefer_evict = false; + size_t recv_flood_size{DEFAULT_MAXRECEIVEBUFFER * 1000}; }; /** Information about a peer */ class CNode { - friend class CConnman; - friend struct ConnmanTestMsg; - public: const std::unique_ptr<TransportDeserializer> m_deserializer; // Used only by SocketHandler thread const std::unique_ptr<const TransportSerializer> m_serializer; @@ -374,10 +382,6 @@ public: Mutex m_sock_mutex; Mutex cs_vRecv; - RecursiveMutex cs_vProcessMsg; - std::list<CNetMessage> vProcessMsg GUARDED_BY(cs_vProcessMsg); - size_t nProcessQueueSize GUARDED_BY(cs_vProcessMsg){0}; - uint64_t nRecvBytes GUARDED_BY(cs_vRecv){0}; std::atomic<std::chrono::seconds> m_last_send{0s}; @@ -415,6 +419,27 @@ public: std::atomic_bool fPauseRecv{false}; std::atomic_bool fPauseSend{false}; + const ConnectionType m_conn_type; + + /** Move all messages from the received queue to the processing queue. */ + void MarkReceivedMsgsForProcessing() + EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex); + + /** Poll the next message from the processing queue of this connection. + * + * Returns std::nullopt if the processing queue is empty, or a pair + * consisting of the message and a bool that indicates if the processing + * queue has more entries. */ + std::optional<std::pair<CNetMessage, bool>> PollMessage() + EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex); + + /** Account for the total size of a sent message in the per msg type connection stats. */ + void AccountForSentBytes(const std::string& msg_type, size_t sent_bytes) + EXCLUSIVE_LOCKS_REQUIRED(cs_vSend) + { + mapSendBytesPerMsgType[msg_type] += sent_bytes; + } + bool IsOutboundOrBlockRelayConn() const { switch (m_conn_type) { case ConnectionType::OUTBOUND_FULL_RELAY: @@ -595,11 +620,15 @@ public: private: const NodeId id; const uint64_t nLocalHostNonce; - const ConnectionType m_conn_type; std::atomic<int> m_greatest_common_version{INIT_PROTO_VERSION}; + const size_t m_recv_flood_size; std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread + Mutex m_msg_process_queue_mutex; + std::list<CNetMessage> m_msg_process_queue GUARDED_BY(m_msg_process_queue_mutex); + size_t m_msg_process_queue_size GUARDED_BY(m_msg_process_queue_mutex){0}; + // Our address, as reported by the peer CService addrLocal GUARDED_BY(m_addr_local_mutex); mutable Mutex m_addr_local_mutex; @@ -743,7 +772,7 @@ public: bool GetNetworkActive() const { return fNetworkActive; }; bool GetUseAddrmanOutgoing() const { return m_use_addrman_outgoing; }; void SetNetworkActive(bool active); - void OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant* grantOutbound, const char* strDest, ConnectionType conn_type); + void OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant* grantOutbound, const char* strDest, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex); bool CheckIncomingNonce(uint64_t nonce); bool ForNode(NodeId id, std::function<bool(CNode* pnode)> func); @@ -819,9 +848,10 @@ public: * - Max total outbound connection capacity filled * - Max connection capacity for type is filled */ - bool AddConnection(const std::string& address, ConnectionType conn_type); + bool AddConnection(const std::string& address, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex); size_t GetNodeCount(ConnectionDirection) const; + uint32_t GetMappedAS(const CNetAddr& addr) const; void GetNodeStats(std::vector<CNodeStats>& vstats) const; bool DisconnectNode(const std::string& node); bool DisconnectNode(const CSubNet& subnet); @@ -856,8 +886,6 @@ public: /** Get a unique deterministic randomizer. */ CSipHasher GetDeterministicRandomizer(uint64_t id) const; - unsigned int GetReceiveFloodSize() const; - void WakeMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); /** Return true if we should disconnect the peer for failing an inactivity check. */ @@ -885,10 +913,10 @@ private: bool Bind(const CService& addr, unsigned int flags, NetPermissionFlags permissions); bool InitBinds(const Options& options); - void ThreadOpenAddedConnections() EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex); + void ThreadOpenAddedConnections() EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex, !m_unused_i2p_sessions_mutex); void AddAddrFetch(const std::string& strDest) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex); - void ProcessAddrFetch() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex); - void ThreadOpenConnections(std::vector<std::string> connect) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex); + void ProcessAddrFetch() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_unused_i2p_sessions_mutex); + void ThreadOpenConnections(std::vector<std::string> connect) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex, !m_unused_i2p_sessions_mutex); void ThreadMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); void ThreadI2PAcceptIncoming(); void AcceptConnection(const ListenSocket& hListenSocket); @@ -955,7 +983,7 @@ private: bool AlreadyConnectedToAddress(const CAddress& addr); bool AttemptToEvictConnection(); - CNode* ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type); + CNode* ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex); void AddWhitelistPermissionFlags(NetPermissionFlags& flags, const CNetAddr &addr) const; void DeleteNode(CNode* pnode); @@ -970,6 +998,12 @@ private: void RecordBytesSent(uint64_t bytes) EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex); /** + Return reachable networks for which we have no addresses in addrman and therefore + may require loading fixed seeds. + */ + std::unordered_set<Network> GetReachableEmptyNetworks() const; + + /** * Return vector of current BLOCK_RELAY peers. */ std::vector<CAddress> GetCurrentBlockRelayOnlyConns() const; @@ -1127,6 +1161,26 @@ private: std::vector<CService> m_onion_binds; /** + * Mutex protecting m_i2p_sam_sessions. + */ + Mutex m_unused_i2p_sessions_mutex; + + /** + * A pool of created I2P SAM transient sessions that should be used instead + * of creating new ones in order to reduce the load on the I2P network. + * Creating a session in I2P is not cheap, thus if this is not empty, then + * pick an entry from it instead of creating a new session. If connecting to + * a host fails, then the created session is put to this pool for reuse. + */ + std::queue<std::unique_ptr<i2p::sam::Session>> m_unused_i2p_sessions GUARDED_BY(m_unused_i2p_sessions_mutex); + + /** + * Cap on the size of `m_unused_i2p_sessions`, to ensure it does not + * unexpectedly use too much memory. + */ + static constexpr size_t MAX_UNUSED_I2P_SESSIONS_SIZE{10}; + + /** * RAII helper to atomically create a copy of `m_nodes` and add a reference * to each of the nodes. The nodes are released when this object is destroyed. */ |