aboutsummaryrefslogtreecommitdiff
path: root/src/net.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/net.h')
-rw-r--r--src/net.h75
1 files changed, 62 insertions, 13 deletions
diff --git a/src/net.h b/src/net.h
index 31d17ea76c..7bb164003e 100644
--- a/src/net.h
+++ b/src/net.h
@@ -38,7 +38,9 @@
#include <map>
#include <memory>
#include <optional>
+#include <queue>
#include <thread>
+#include <unordered_set>
#include <vector>
class AddrMan;
@@ -345,9 +347,6 @@ struct CNodeOptions
/** Information about a peer */
class CNode
{
- friend class CConnman;
- friend struct ConnmanTestMsg;
-
public:
const std::unique_ptr<TransportDeserializer> m_deserializer; // Used only by SocketHandler thread
const std::unique_ptr<const TransportSerializer> m_serializer;
@@ -374,10 +373,6 @@ public:
Mutex m_sock_mutex;
Mutex cs_vRecv;
- RecursiveMutex cs_vProcessMsg;
- std::list<CNetMessage> vProcessMsg GUARDED_BY(cs_vProcessMsg);
- size_t nProcessQueueSize GUARDED_BY(cs_vProcessMsg){0};
-
uint64_t nRecvBytes GUARDED_BY(cs_vRecv){0};
std::atomic<std::chrono::seconds> m_last_send{0s};
@@ -415,6 +410,30 @@ public:
std::atomic_bool fPauseRecv{false};
std::atomic_bool fPauseSend{false};
+ const ConnectionType& GetConnectionType() const
+ {
+ return m_conn_type;
+ }
+
+ /** Move all messages from the received queue to the processing queue. */
+ void MarkReceivedMsgsForProcessing(unsigned int recv_flood_size)
+ EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex);
+
+ /** Poll the next message from the processing queue of this connection.
+ *
+ * Returns std::nullopt if the processing queue is empty, or a pair
+ * consisting of the message and a bool that indicates if the processing
+ * queue has more entries. */
+ std::optional<std::pair<CNetMessage, bool>> PollMessage(size_t recv_flood_size)
+ EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex);
+
+ /** Account for the total size of a sent message in the per msg type connection stats. */
+ void AccountForSentBytes(const std::string& msg_type, size_t sent_bytes)
+ EXCLUSIVE_LOCKS_REQUIRED(cs_vSend)
+ {
+ mapSendBytesPerMsgType[msg_type] += sent_bytes;
+ }
+
bool IsOutboundOrBlockRelayConn() const {
switch (m_conn_type) {
case ConnectionType::OUTBOUND_FULL_RELAY:
@@ -600,6 +619,10 @@ private:
std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread
+ Mutex m_msg_process_queue_mutex;
+ std::list<CNetMessage> m_msg_process_queue GUARDED_BY(m_msg_process_queue_mutex);
+ size_t m_msg_process_queue_size GUARDED_BY(m_msg_process_queue_mutex){0};
+
// Our address, as reported by the peer
CService addrLocal GUARDED_BY(m_addr_local_mutex);
mutable Mutex m_addr_local_mutex;
@@ -743,7 +766,7 @@ public:
bool GetNetworkActive() const { return fNetworkActive; };
bool GetUseAddrmanOutgoing() const { return m_use_addrman_outgoing; };
void SetNetworkActive(bool active);
- void OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant* grantOutbound, const char* strDest, ConnectionType conn_type);
+ void OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant* grantOutbound, const char* strDest, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
bool CheckIncomingNonce(uint64_t nonce);
bool ForNode(NodeId id, std::function<bool(CNode* pnode)> func);
@@ -819,7 +842,7 @@ public:
* - Max total outbound connection capacity filled
* - Max connection capacity for type is filled
*/
- bool AddConnection(const std::string& address, ConnectionType conn_type);
+ bool AddConnection(const std::string& address, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
size_t GetNodeCount(ConnectionDirection) const;
void GetNodeStats(std::vector<CNodeStats>& vstats) const;
@@ -885,10 +908,10 @@ private:
bool Bind(const CService& addr, unsigned int flags, NetPermissionFlags permissions);
bool InitBinds(const Options& options);
- void ThreadOpenAddedConnections() EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex);
+ void ThreadOpenAddedConnections() EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex, !m_unused_i2p_sessions_mutex);
void AddAddrFetch(const std::string& strDest) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex);
- void ProcessAddrFetch() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex);
- void ThreadOpenConnections(std::vector<std::string> connect) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex);
+ void ProcessAddrFetch() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_unused_i2p_sessions_mutex);
+ void ThreadOpenConnections(std::vector<std::string> connect) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex, !m_unused_i2p_sessions_mutex);
void ThreadMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
void ThreadI2PAcceptIncoming();
void AcceptConnection(const ListenSocket& hListenSocket);
@@ -955,7 +978,7 @@ private:
bool AlreadyConnectedToAddress(const CAddress& addr);
bool AttemptToEvictConnection();
- CNode* ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type);
+ CNode* ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
void AddWhitelistPermissionFlags(NetPermissionFlags& flags, const CNetAddr &addr) const;
void DeleteNode(CNode* pnode);
@@ -970,6 +993,12 @@ private:
void RecordBytesSent(uint64_t bytes) EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex);
/**
+ Return reachable networks for which we have no addresses in addrman and therefore
+ may require loading fixed seeds.
+ */
+ std::unordered_set<Network> GetReachableEmptyNetworks() const;
+
+ /**
* Return vector of current BLOCK_RELAY peers.
*/
std::vector<CAddress> GetCurrentBlockRelayOnlyConns() const;
@@ -1127,6 +1156,26 @@ private:
std::vector<CService> m_onion_binds;
/**
+ * Mutex protecting m_i2p_sam_sessions.
+ */
+ Mutex m_unused_i2p_sessions_mutex;
+
+ /**
+ * A pool of created I2P SAM transient sessions that should be used instead
+ * of creating new ones in order to reduce the load on the I2P network.
+ * Creating a session in I2P is not cheap, thus if this is not empty, then
+ * pick an entry from it instead of creating a new session. If connecting to
+ * a host fails, then the created session is put to this pool for reuse.
+ */
+ std::queue<std::unique_ptr<i2p::sam::Session>> m_unused_i2p_sessions GUARDED_BY(m_unused_i2p_sessions_mutex);
+
+ /**
+ * Cap on the size of `m_unused_i2p_sessions`, to ensure it does not
+ * unexpectedly use too much memory.
+ */
+ static constexpr size_t MAX_UNUSED_I2P_SESSIONS_SIZE{10};
+
+ /**
* RAII helper to atomically create a copy of `m_nodes` and add a reference
* to each of the nodes. The nodes are released when this object is destroyed.
*/