diff options
Diffstat (limited to 'src/net.h')
-rw-r--r-- | src/net.h | 75 |
1 files changed, 62 insertions, 13 deletions
@@ -38,7 +38,9 @@ #include <map> #include <memory> #include <optional> +#include <queue> #include <thread> +#include <unordered_set> #include <vector> class AddrMan; @@ -345,9 +347,6 @@ struct CNodeOptions /** Information about a peer */ class CNode { - friend class CConnman; - friend struct ConnmanTestMsg; - public: const std::unique_ptr<TransportDeserializer> m_deserializer; // Used only by SocketHandler thread const std::unique_ptr<const TransportSerializer> m_serializer; @@ -374,10 +373,6 @@ public: Mutex m_sock_mutex; Mutex cs_vRecv; - RecursiveMutex cs_vProcessMsg; - std::list<CNetMessage> vProcessMsg GUARDED_BY(cs_vProcessMsg); - size_t nProcessQueueSize GUARDED_BY(cs_vProcessMsg){0}; - uint64_t nRecvBytes GUARDED_BY(cs_vRecv){0}; std::atomic<std::chrono::seconds> m_last_send{0s}; @@ -415,6 +410,30 @@ public: std::atomic_bool fPauseRecv{false}; std::atomic_bool fPauseSend{false}; + const ConnectionType& GetConnectionType() const + { + return m_conn_type; + } + + /** Move all messages from the received queue to the processing queue. */ + void MarkReceivedMsgsForProcessing(unsigned int recv_flood_size) + EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex); + + /** Poll the next message from the processing queue of this connection. + * + * Returns std::nullopt if the processing queue is empty, or a pair + * consisting of the message and a bool that indicates if the processing + * queue has more entries. */ + std::optional<std::pair<CNetMessage, bool>> PollMessage(size_t recv_flood_size) + EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex); + + /** Account for the total size of a sent message in the per msg type connection stats. */ + void AccountForSentBytes(const std::string& msg_type, size_t sent_bytes) + EXCLUSIVE_LOCKS_REQUIRED(cs_vSend) + { + mapSendBytesPerMsgType[msg_type] += sent_bytes; + } + bool IsOutboundOrBlockRelayConn() const { switch (m_conn_type) { case ConnectionType::OUTBOUND_FULL_RELAY: @@ -600,6 +619,10 @@ private: std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread + Mutex m_msg_process_queue_mutex; + std::list<CNetMessage> m_msg_process_queue GUARDED_BY(m_msg_process_queue_mutex); + size_t m_msg_process_queue_size GUARDED_BY(m_msg_process_queue_mutex){0}; + // Our address, as reported by the peer CService addrLocal GUARDED_BY(m_addr_local_mutex); mutable Mutex m_addr_local_mutex; @@ -743,7 +766,7 @@ public: bool GetNetworkActive() const { return fNetworkActive; }; bool GetUseAddrmanOutgoing() const { return m_use_addrman_outgoing; }; void SetNetworkActive(bool active); - void OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant* grantOutbound, const char* strDest, ConnectionType conn_type); + void OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant* grantOutbound, const char* strDest, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex); bool CheckIncomingNonce(uint64_t nonce); bool ForNode(NodeId id, std::function<bool(CNode* pnode)> func); @@ -819,7 +842,7 @@ public: * - Max total outbound connection capacity filled * - Max connection capacity for type is filled */ - bool AddConnection(const std::string& address, ConnectionType conn_type); + bool AddConnection(const std::string& address, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex); size_t GetNodeCount(ConnectionDirection) const; void GetNodeStats(std::vector<CNodeStats>& vstats) const; @@ -885,10 +908,10 @@ private: bool Bind(const CService& addr, unsigned int flags, NetPermissionFlags permissions); bool InitBinds(const Options& options); - void ThreadOpenAddedConnections() EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex); + void ThreadOpenAddedConnections() EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex, !m_unused_i2p_sessions_mutex); void AddAddrFetch(const std::string& strDest) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex); - void ProcessAddrFetch() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex); - void ThreadOpenConnections(std::vector<std::string> connect) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex); + void ProcessAddrFetch() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_unused_i2p_sessions_mutex); + void ThreadOpenConnections(std::vector<std::string> connect) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex, !m_unused_i2p_sessions_mutex); void ThreadMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); void ThreadI2PAcceptIncoming(); void AcceptConnection(const ListenSocket& hListenSocket); @@ -955,7 +978,7 @@ private: bool AlreadyConnectedToAddress(const CAddress& addr); bool AttemptToEvictConnection(); - CNode* ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type); + CNode* ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex); void AddWhitelistPermissionFlags(NetPermissionFlags& flags, const CNetAddr &addr) const; void DeleteNode(CNode* pnode); @@ -970,6 +993,12 @@ private: void RecordBytesSent(uint64_t bytes) EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex); /** + Return reachable networks for which we have no addresses in addrman and therefore + may require loading fixed seeds. + */ + std::unordered_set<Network> GetReachableEmptyNetworks() const; + + /** * Return vector of current BLOCK_RELAY peers. */ std::vector<CAddress> GetCurrentBlockRelayOnlyConns() const; @@ -1127,6 +1156,26 @@ private: std::vector<CService> m_onion_binds; /** + * Mutex protecting m_i2p_sam_sessions. + */ + Mutex m_unused_i2p_sessions_mutex; + + /** + * A pool of created I2P SAM transient sessions that should be used instead + * of creating new ones in order to reduce the load on the I2P network. + * Creating a session in I2P is not cheap, thus if this is not empty, then + * pick an entry from it instead of creating a new session. If connecting to + * a host fails, then the created session is put to this pool for reuse. + */ + std::queue<std::unique_ptr<i2p::sam::Session>> m_unused_i2p_sessions GUARDED_BY(m_unused_i2p_sessions_mutex); + + /** + * Cap on the size of `m_unused_i2p_sessions`, to ensure it does not + * unexpectedly use too much memory. + */ + static constexpr size_t MAX_UNUSED_I2P_SESSIONS_SIZE{10}; + + /** * RAII helper to atomically create a copy of `m_nodes` and add a reference * to each of the nodes. The nodes are released when this object is destroyed. */ |