aboutsummaryrefslogtreecommitdiff
path: root/src/net.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/net.h')
-rw-r--r--src/net.h86
1 files changed, 70 insertions, 16 deletions
diff --git a/src/net.h b/src/net.h
index 31d17ea76c..908b16f35e 100644
--- a/src/net.h
+++ b/src/net.h
@@ -38,7 +38,9 @@
#include <map>
#include <memory>
#include <optional>
+#include <queue>
#include <thread>
+#include <unordered_set>
#include <vector>
class AddrMan;
@@ -234,6 +236,14 @@ public:
std::string m_type;
CNetMessage(CDataStream&& recv_in) : m_recv(std::move(recv_in)) {}
+ // Only one CNetMessage object will exist for the same message on either
+ // the receive or processing queue. For performance reasons we therefore
+ // delete the copy constructor and assignment operator to avoid the
+ // possibility of copying CNetMessage objects.
+ CNetMessage(CNetMessage&&) = default;
+ CNetMessage(const CNetMessage&) = delete;
+ CNetMessage& operator=(CNetMessage&&) = default;
+ CNetMessage& operator=(const CNetMessage&) = delete;
void SetVersion(int nVersionIn)
{
@@ -340,14 +350,12 @@ struct CNodeOptions
NetPermissionFlags permission_flags = NetPermissionFlags::None;
std::unique_ptr<i2p::sam::Session> i2p_sam_session = nullptr;
bool prefer_evict = false;
+ size_t recv_flood_size{DEFAULT_MAXRECEIVEBUFFER * 1000};
};
/** Information about a peer */
class CNode
{
- friend class CConnman;
- friend struct ConnmanTestMsg;
-
public:
const std::unique_ptr<TransportDeserializer> m_deserializer; // Used only by SocketHandler thread
const std::unique_ptr<const TransportSerializer> m_serializer;
@@ -374,10 +382,6 @@ public:
Mutex m_sock_mutex;
Mutex cs_vRecv;
- RecursiveMutex cs_vProcessMsg;
- std::list<CNetMessage> vProcessMsg GUARDED_BY(cs_vProcessMsg);
- size_t nProcessQueueSize GUARDED_BY(cs_vProcessMsg){0};
-
uint64_t nRecvBytes GUARDED_BY(cs_vRecv){0};
std::atomic<std::chrono::seconds> m_last_send{0s};
@@ -415,6 +419,27 @@ public:
std::atomic_bool fPauseRecv{false};
std::atomic_bool fPauseSend{false};
+ const ConnectionType m_conn_type;
+
+ /** Move all messages from the received queue to the processing queue. */
+ void MarkReceivedMsgsForProcessing()
+ EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex);
+
+ /** Poll the next message from the processing queue of this connection.
+ *
+ * Returns std::nullopt if the processing queue is empty, or a pair
+ * consisting of the message and a bool that indicates if the processing
+ * queue has more entries. */
+ std::optional<std::pair<CNetMessage, bool>> PollMessage()
+ EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex);
+
+ /** Account for the total size of a sent message in the per msg type connection stats. */
+ void AccountForSentBytes(const std::string& msg_type, size_t sent_bytes)
+ EXCLUSIVE_LOCKS_REQUIRED(cs_vSend)
+ {
+ mapSendBytesPerMsgType[msg_type] += sent_bytes;
+ }
+
bool IsOutboundOrBlockRelayConn() const {
switch (m_conn_type) {
case ConnectionType::OUTBOUND_FULL_RELAY:
@@ -595,11 +620,15 @@ public:
private:
const NodeId id;
const uint64_t nLocalHostNonce;
- const ConnectionType m_conn_type;
std::atomic<int> m_greatest_common_version{INIT_PROTO_VERSION};
+ const size_t m_recv_flood_size;
std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread
+ Mutex m_msg_process_queue_mutex;
+ std::list<CNetMessage> m_msg_process_queue GUARDED_BY(m_msg_process_queue_mutex);
+ size_t m_msg_process_queue_size GUARDED_BY(m_msg_process_queue_mutex){0};
+
// Our address, as reported by the peer
CService addrLocal GUARDED_BY(m_addr_local_mutex);
mutable Mutex m_addr_local_mutex;
@@ -743,7 +772,7 @@ public:
bool GetNetworkActive() const { return fNetworkActive; };
bool GetUseAddrmanOutgoing() const { return m_use_addrman_outgoing; };
void SetNetworkActive(bool active);
- void OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant* grantOutbound, const char* strDest, ConnectionType conn_type);
+ void OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant* grantOutbound, const char* strDest, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
bool CheckIncomingNonce(uint64_t nonce);
bool ForNode(NodeId id, std::function<bool(CNode* pnode)> func);
@@ -819,9 +848,10 @@ public:
* - Max total outbound connection capacity filled
* - Max connection capacity for type is filled
*/
- bool AddConnection(const std::string& address, ConnectionType conn_type);
+ bool AddConnection(const std::string& address, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
size_t GetNodeCount(ConnectionDirection) const;
+ uint32_t GetMappedAS(const CNetAddr& addr) const;
void GetNodeStats(std::vector<CNodeStats>& vstats) const;
bool DisconnectNode(const std::string& node);
bool DisconnectNode(const CSubNet& subnet);
@@ -856,8 +886,6 @@ public:
/** Get a unique deterministic randomizer. */
CSipHasher GetDeterministicRandomizer(uint64_t id) const;
- unsigned int GetReceiveFloodSize() const;
-
void WakeMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
/** Return true if we should disconnect the peer for failing an inactivity check. */
@@ -885,10 +913,10 @@ private:
bool Bind(const CService& addr, unsigned int flags, NetPermissionFlags permissions);
bool InitBinds(const Options& options);
- void ThreadOpenAddedConnections() EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex);
+ void ThreadOpenAddedConnections() EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex, !m_unused_i2p_sessions_mutex);
void AddAddrFetch(const std::string& strDest) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex);
- void ProcessAddrFetch() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex);
- void ThreadOpenConnections(std::vector<std::string> connect) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex);
+ void ProcessAddrFetch() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_unused_i2p_sessions_mutex);
+ void ThreadOpenConnections(std::vector<std::string> connect) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex, !m_unused_i2p_sessions_mutex);
void ThreadMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
void ThreadI2PAcceptIncoming();
void AcceptConnection(const ListenSocket& hListenSocket);
@@ -955,7 +983,7 @@ private:
bool AlreadyConnectedToAddress(const CAddress& addr);
bool AttemptToEvictConnection();
- CNode* ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type);
+ CNode* ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
void AddWhitelistPermissionFlags(NetPermissionFlags& flags, const CNetAddr &addr) const;
void DeleteNode(CNode* pnode);
@@ -970,6 +998,12 @@ private:
void RecordBytesSent(uint64_t bytes) EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex);
/**
+ Return reachable networks for which we have no addresses in addrman and therefore
+ may require loading fixed seeds.
+ */
+ std::unordered_set<Network> GetReachableEmptyNetworks() const;
+
+ /**
* Return vector of current BLOCK_RELAY peers.
*/
std::vector<CAddress> GetCurrentBlockRelayOnlyConns() const;
@@ -1127,6 +1161,26 @@ private:
std::vector<CService> m_onion_binds;
/**
+ * Mutex protecting m_i2p_sam_sessions.
+ */
+ Mutex m_unused_i2p_sessions_mutex;
+
+ /**
+ * A pool of created I2P SAM transient sessions that should be used instead
+ * of creating new ones in order to reduce the load on the I2P network.
+ * Creating a session in I2P is not cheap, thus if this is not empty, then
+ * pick an entry from it instead of creating a new session. If connecting to
+ * a host fails, then the created session is put to this pool for reuse.
+ */
+ std::queue<std::unique_ptr<i2p::sam::Session>> m_unused_i2p_sessions GUARDED_BY(m_unused_i2p_sessions_mutex);
+
+ /**
+ * Cap on the size of `m_unused_i2p_sessions`, to ensure it does not
+ * unexpectedly use too much memory.
+ */
+ static constexpr size_t MAX_UNUSED_I2P_SESSIONS_SIZE{10};
+
+ /**
* RAII helper to atomically create a copy of `m_nodes` and add a reference
* to each of the nodes. The nodes are released when this object is destroyed.
*/