diff options
Diffstat (limited to 'src/net.cpp')
-rw-r--r-- | src/net.cpp | 348 |
1 files changed, 225 insertions, 123 deletions
diff --git a/src/net.cpp b/src/net.cpp index bfa2738e45..6b2ef5f43d 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -35,6 +35,7 @@ #include <util/threadinterrupt.h> #include <util/trace.h> #include <util/translation.h> +#include <util/vector.h> #ifdef WIN32 #include <string.h> @@ -157,39 +158,34 @@ uint16_t GetListenPort() return static_cast<uint16_t>(gArgs.GetIntArg("-port", Params().GetDefaultPort())); } -// find 'best' local address for a particular peer -bool GetLocal(CService& addr, const CNode& peer) +// Determine the "best" local address for a particular peer. +[[nodiscard]] static std::optional<CService> GetLocal(const CNode& peer) { - if (!fListen) - return false; + if (!fListen) return std::nullopt; + std::optional<CService> addr; int nBestScore = -1; int nBestReachability = -1; { LOCK(g_maplocalhost_mutex); - for (const auto& entry : mapLocalHost) - { + for (const auto& [local_addr, local_service_info] : mapLocalHost) { // For privacy reasons, don't advertise our privacy-network address // to other networks and don't advertise our other-network address // to privacy networks. - const Network our_net{entry.first.GetNetwork()}; - const Network peers_net{peer.ConnectedThroughNetwork()}; - if (our_net != peers_net && - (our_net == NET_ONION || our_net == NET_I2P || - peers_net == NET_ONION || peers_net == NET_I2P)) { + if (local_addr.GetNetwork() != peer.ConnectedThroughNetwork() + && (local_addr.IsPrivacyNet() || peer.IsConnectedThroughPrivacyNet())) { continue; } - int nScore = entry.second.nScore; - int nReachability = entry.first.GetReachabilityFrom(peer.addr); - if (nReachability > nBestReachability || (nReachability == nBestReachability && nScore > nBestScore)) - { - addr = CService(entry.first, entry.second.nPort); + const int nScore{local_service_info.nScore}; + const int nReachability{local_addr.GetReachabilityFrom(peer.addr)}; + if (nReachability > nBestReachability || (nReachability == nBestReachability && nScore > nBestScore)) { + addr.emplace(CService{local_addr, local_service_info.nPort}); nBestReachability = nReachability; nBestScore = nScore; } } } - return nBestScore >= 0; + return addr; } //! Convert the serialized seeds into usable address objects. @@ -215,17 +211,13 @@ static std::vector<CAddress> ConvertSeeds(const std::vector<uint8_t> &vSeedsIn) return vSeedsOut; } -// get best local address for a particular peer as a CAddress -// Otherwise, return the unroutable 0.0.0.0 but filled in with +// Determine the "best" local address for a particular peer. +// If none, return the unroutable 0.0.0.0 but filled in with // the normal parameters, since the IP may be changed to a useful // one by discovery. CService GetLocalAddress(const CNode& peer) { - CService addr; - if (GetLocal(addr, peer)) { - return addr; - } - return CService{CNetAddr(), GetListenPort()}; + return GetLocal(peer).value_or(CService{CNetAddr(), GetListenPort()}); } static int GetnScore(const CService& addr) @@ -236,7 +228,7 @@ static int GetnScore(const CService& addr) } // Is our peer's addrLocal potentially useful as an external IP source? -bool IsPeerAddrLocalGood(CNode *pnode) +[[nodiscard]] static bool IsPeerAddrLocalGood(CNode *pnode) { CService addrLocal = pnode->GetAddrLocal(); return fDiscover && pnode->addr.IsRoutable() && addrLocal.IsRoutable() && @@ -288,7 +280,7 @@ std::optional<CService> GetLocalAddrForPeer(CNode& node) CService MaybeFlipIPv6toCJDNS(const CService& service) { CService ret{service}; - if (ret.m_net == NET_IPV6 && ret.m_addr[0] == 0xfc && IsReachable(NET_CJDNS)) { + if (ret.IsIPv6() && ret.HasCJDNSPrefix() && IsReachable(NET_CJDNS)) { ret.m_net = NET_CJDNS; } return ret; @@ -447,7 +439,7 @@ static CAddress GetBindAddress(const Sock& sock) return addr_bind; } -CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type) +CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport) { AssertLockNotHeld(m_unused_i2p_sessions_mutex); assert(conn_type != ConnectionType::INBOUND); @@ -465,7 +457,8 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo } } - LogPrintLevel(BCLog::NET, BCLog::Level::Debug, "trying connection %s lastseen=%.1fhrs\n", + LogPrintLevel(BCLog::NET, BCLog::Level::Debug, "trying %s connection %s lastseen=%.1fhrs\n", + use_v2transport ? "v2" : "v1", pszDest ? pszDest : addrConnect.ToStringAddrPort(), Ticks<HoursDouble>(pszDest ? 0h : Now<NodeSeconds>() - addrConnect.nTime)); @@ -504,7 +497,7 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo const bool use_proxy{GetProxy(addrConnect.GetNetwork(), proxy)}; bool proxyConnectionFailed = false; - if (addrConnect.GetNetwork() == NET_I2P && use_proxy) { + if (addrConnect.IsI2P() && use_proxy) { i2p::Connection conn; if (m_i2p_sam_session) { @@ -588,6 +581,7 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo CNodeOptions{ .i2p_sam_session = std::move(i2p_transient_session), .recv_flood_size = nReceiveFloodSize, + .use_v2transport = use_v2transport, }); pnode->AddRef(); @@ -636,6 +630,11 @@ Network CNode::ConnectedThroughNetwork() const return m_inbound_onion ? NET_ONION : addr.GetNetClass(); } +bool CNode::IsConnectedThroughPrivacyNet() const +{ + return m_inbound_onion || addr.IsPrivacyNet(); +} + #undef X #define X(name) stats.name = name void CNode::CopyStats(CNodeStats& stats) @@ -668,6 +667,9 @@ void CNode::CopyStats(CNodeStats& stats) LOCK(cs_vRecv); X(mapRecvBytesPerMsgType); X(nRecvBytes); + Transport::Info info = m_transport->GetInfo(); + stats.m_transport_type = info.transport_type; + if (info.session_id) stats.m_session_id = HexStr(*info.session_id); } X(m_permission_flags); @@ -735,6 +737,11 @@ V1Transport::V1Transport(const NodeId node_id, int nTypeIn, int nVersionIn) noex Reset(); } +Transport::Info V1Transport::GetInfo() const noexcept +{ + return {.transport_type = TransportProtocolType::V1, .session_id = {}}; +} + int V1Transport::readHeader(Span<const uint8_t> msg_bytes) { AssertLockHeld(m_recv_mutex); @@ -858,7 +865,7 @@ bool V1Transport::SetMessageToSend(CSerializedNetMsg& msg) noexcept // serialize header m_header_to_send.clear(); - CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, m_header_to_send, 0, hdr}; + CVectorWriter{INIT_PROTO_VERSION, m_header_to_send, 0, hdr}; // update state m_message_to_send = std::move(msg); @@ -899,8 +906,7 @@ void V1Transport::MarkBytesSent(size_t bytes_sent) noexcept m_bytes_sent = 0; } else if (!m_sending_header && m_bytes_sent == m_message_to_send.data.size()) { // We're done sending a message's data. Wipe the data vector to reduce memory consumption. - m_message_to_send.data.clear(); - m_message_to_send.data.shrink_to_fit(); + ClearShrink(m_message_to_send.data); m_bytes_sent = 0; } } @@ -1006,8 +1012,7 @@ void V2Transport::StartSendingHandshake() noexcept m_send_buffer.resize(EllSwiftPubKey::size() + m_send_garbage.size()); std::copy(std::begin(m_cipher.GetOurPubKey()), std::end(m_cipher.GetOurPubKey()), MakeWritableByteSpan(m_send_buffer).begin()); std::copy(m_send_garbage.begin(), m_send_garbage.end(), m_send_buffer.begin() + EllSwiftPubKey::size()); - // We cannot wipe m_send_garbage as it will still be used to construct the garbage - // authentication packet. + // We cannot wipe m_send_garbage as it will still be used as AAD later in the handshake. } V2Transport::V2Transport(NodeId nodeid, bool initiating, int type_in, int version_in, const CKey& key, Span<const std::byte> ent32, std::vector<uint8_t> garbage) noexcept : @@ -1041,9 +1046,6 @@ void V2Transport::SetReceiveState(RecvState recv_state) noexcept Assume(recv_state == RecvState::GARB_GARBTERM); break; case RecvState::GARB_GARBTERM: - Assume(recv_state == RecvState::GARBAUTH); - break; - case RecvState::GARBAUTH: Assume(recv_state == RecvState::VERSION); break; case RecvState::VERSION: @@ -1123,8 +1125,8 @@ void V2Transport::ProcessReceivedMaybeV1Bytes() noexcept SetReceiveState(RecvState::V1); SetSendState(SendState::V1); // Reset v2 transport buffers to save memory. - m_recv_buffer = {}; - m_send_buffer = {}; + ClearShrink(m_recv_buffer); + ClearShrink(m_send_buffer); } else { // We have not received enough to distinguish v1 from v2 yet. Wait until more bytes come. } @@ -1175,25 +1177,15 @@ bool V2Transport::ProcessReceivedKeyBytes() noexcept m_cipher.GetSendGarbageTerminator().end(), MakeWritableByteSpan(m_send_buffer).last(BIP324Cipher::GARBAGE_TERMINATOR_LEN).begin()); - // Construct garbage authentication packet in the send buffer (using the garbage data which - // is still there). - m_send_buffer.resize(m_send_buffer.size() + BIP324Cipher::EXPANSION); - m_cipher.Encrypt( - /*contents=*/{}, - /*aad=*/MakeByteSpan(m_send_garbage), - /*ignore=*/false, - /*output=*/MakeWritableByteSpan(m_send_buffer).last(BIP324Cipher::EXPANSION)); - // We no longer need the garbage. - m_send_garbage.clear(); - m_send_garbage.shrink_to_fit(); - - // Construct version packet in the send buffer. + // Construct version packet in the send buffer, with the sent garbage data as AAD. m_send_buffer.resize(m_send_buffer.size() + BIP324Cipher::EXPANSION + VERSION_CONTENTS.size()); m_cipher.Encrypt( /*contents=*/VERSION_CONTENTS, - /*aad=*/{}, + /*aad=*/MakeByteSpan(m_send_garbage), /*ignore=*/false, /*output=*/MakeWritableByteSpan(m_send_buffer).last(BIP324Cipher::EXPANSION + VERSION_CONTENTS.size())); + // We no longer need the garbage. + ClearShrink(m_send_garbage); } else { // We still have to receive more key bytes. } @@ -1207,11 +1199,11 @@ bool V2Transport::ProcessReceivedGarbageBytes() noexcept Assume(m_recv_buffer.size() <= MAX_GARBAGE_LEN + BIP324Cipher::GARBAGE_TERMINATOR_LEN); if (m_recv_buffer.size() >= BIP324Cipher::GARBAGE_TERMINATOR_LEN) { if (MakeByteSpan(m_recv_buffer).last(BIP324Cipher::GARBAGE_TERMINATOR_LEN) == m_cipher.GetReceiveGarbageTerminator()) { - // Garbage terminator received. Switch to receiving garbage authentication packet. - m_recv_garbage = std::move(m_recv_buffer); - m_recv_garbage.resize(m_recv_garbage.size() - BIP324Cipher::GARBAGE_TERMINATOR_LEN); + // Garbage terminator received. Store garbage to authenticate it as AAD later. + m_recv_aad = std::move(m_recv_buffer); + m_recv_aad.resize(m_recv_aad.size() - BIP324Cipher::GARBAGE_TERMINATOR_LEN); m_recv_buffer.clear(); - SetReceiveState(RecvState::GARBAUTH); + SetReceiveState(RecvState::VERSION); } else if (m_recv_buffer.size() == MAX_GARBAGE_LEN + BIP324Cipher::GARBAGE_TERMINATOR_LEN) { // We've reached the maximum length for garbage + garbage terminator, and the // terminator still does not match. Abort. @@ -1230,8 +1222,7 @@ bool V2Transport::ProcessReceivedGarbageBytes() noexcept bool V2Transport::ProcessReceivedPacketBytes() noexcept { AssertLockHeld(m_recv_mutex); - Assume(m_recv_state == RecvState::GARBAUTH || m_recv_state == RecvState::VERSION || - m_recv_state == RecvState::APP); + Assume(m_recv_state == RecvState::VERSION || m_recv_state == RecvState::APP); // The maximum permitted contents length for a packet, consisting of: // - 0x00 byte: indicating long message type encoding @@ -1254,50 +1245,42 @@ bool V2Transport::ProcessReceivedPacketBytes() noexcept // as GetMaxBytesToProcess only allows up to LENGTH_LEN into the buffer before that point. m_recv_decode_buffer.resize(m_recv_len); bool ignore{false}; - Span<const std::byte> aad; - if (m_recv_state == RecvState::GARBAUTH) aad = MakeByteSpan(m_recv_garbage); bool ret = m_cipher.Decrypt( /*input=*/MakeByteSpan(m_recv_buffer).subspan(BIP324Cipher::LENGTH_LEN), - /*aad=*/aad, + /*aad=*/MakeByteSpan(m_recv_aad), /*ignore=*/ignore, /*contents=*/MakeWritableByteSpan(m_recv_decode_buffer)); if (!ret) { LogPrint(BCLog::NET, "V2 transport error: packet decryption failure (%u bytes), peer=%d\n", m_recv_len, m_nodeid); return false; } + // We have decrypted a valid packet with the AAD we expected, so clear the expected AAD. + ClearShrink(m_recv_aad); // Feed the last 4 bytes of the Poly1305 authentication tag (and its timing) into our RNG. RandAddEvent(ReadLE32(m_recv_buffer.data() + m_recv_buffer.size() - 4)); - // At this point we have a valid packet decrypted into m_recv_decode_buffer. Depending on - // the current state, decide what to do with it. - switch (m_recv_state) { - case RecvState::GARBAUTH: - // Ignore flag does not matter for garbage authentication. Any valid packet functions - // as authentication. Receive and process the version packet next. - SetReceiveState(RecvState::VERSION); - m_recv_garbage = {}; - break; - case RecvState::VERSION: - if (!ignore) { + // At this point we have a valid packet decrypted into m_recv_decode_buffer. If it's not a + // decoy, which we simply ignore, use the current state to decide what to do with it. + if (!ignore) { + switch (m_recv_state) { + case RecvState::VERSION: // Version message received; transition to application phase. The contents is // ignored, but can be used for future extensions. SetReceiveState(RecvState::APP); - } - break; - case RecvState::APP: - if (!ignore) { + break; + case RecvState::APP: // Application message decrypted correctly. It can be extracted using GetMessage(). SetReceiveState(RecvState::APP_READY); + break; + default: + // Any other state is invalid (this function should not have been called). + Assume(false); } - break; - default: - // Any other state is invalid (this function should not have been called). - Assume(false); } // Wipe the receive buffer where the next packet will be received into. - m_recv_buffer = {}; + ClearShrink(m_recv_buffer); // In all but APP_READY state, we can wipe the decoded contents. - if (m_recv_state != RecvState::APP_READY) m_recv_decode_buffer = {}; + if (m_recv_state != RecvState::APP_READY) ClearShrink(m_recv_decode_buffer); } else { // We either have less than 3 bytes, so we don't know the packet's length yet, or more // than 3 bytes but less than the packet's full ciphertext. Wait until those arrive. @@ -1328,7 +1311,6 @@ size_t V2Transport::GetMaxBytesToProcess() noexcept case RecvState::GARB_GARBTERM: // Process garbage bytes one by one (because terminator may appear anywhere). return 1; - case RecvState::GARBAUTH: case RecvState::VERSION: case RecvState::APP: // These three states all involve decoding a packet. Process the length descriptor first, @@ -1382,7 +1364,6 @@ bool V2Transport::ReceivedBytes(Span<const uint8_t>& msg_bytes) noexcept // bytes). m_recv_buffer.reserve(MAX_GARBAGE_LEN + BIP324Cipher::GARBAGE_TERMINATOR_LEN); break; - case RecvState::GARBAUTH: case RecvState::VERSION: case RecvState::APP: { // During states where a packet is being received, as much as is expected but never @@ -1426,7 +1407,6 @@ bool V2Transport::ReceivedBytes(Span<const uint8_t>& msg_bytes) noexcept if (!ProcessReceivedGarbageBytes()) return false; break; - case RecvState::GARBAUTH: case RecvState::VERSION: case RecvState::APP: if (!ProcessReceivedPacketBytes()) return false; @@ -1511,7 +1491,7 @@ CNetMessage V2Transport::GetReceivedMessage(std::chrono::microseconds time, bool LogPrint(BCLog::NET, "V2 transport error: invalid message type (%u bytes contents), peer=%d\n", m_recv_decode_buffer.size(), m_nodeid); reject_message = true; } - m_recv_decode_buffer = {}; + ClearShrink(m_recv_decode_buffer); SetReceiveState(RecvState::APP); return msg; @@ -1545,7 +1525,7 @@ bool V2Transport::SetMessageToSend(CSerializedNetMsg& msg) noexcept m_cipher.Encrypt(MakeByteSpan(contents), {}, false, MakeWritableByteSpan(m_send_buffer)); m_send_type = msg.m_type; // Release memory - msg.data = {}; + ClearShrink(msg.data); return true; } @@ -1572,15 +1552,39 @@ void V2Transport::MarkBytesSent(size_t bytes_sent) noexcept LOCK(m_send_mutex); if (m_send_state == SendState::V1) return m_v1_fallback.MarkBytesSent(bytes_sent); + if (m_send_state == SendState::AWAITING_KEY && m_send_pos == 0 && bytes_sent > 0) { + LogPrint(BCLog::NET, "start sending v2 handshake to peer=%d\n", m_nodeid); + } + m_send_pos += bytes_sent; Assume(m_send_pos <= m_send_buffer.size()); + if (m_send_pos >= CMessageHeader::HEADER_SIZE) { + m_sent_v1_header_worth = true; + } // Wipe the buffer when everything is sent. if (m_send_pos == m_send_buffer.size()) { m_send_pos = 0; - m_send_buffer = {}; + ClearShrink(m_send_buffer); } } +bool V2Transport::ShouldReconnectV1() const noexcept +{ + AssertLockNotHeld(m_send_mutex); + AssertLockNotHeld(m_recv_mutex); + // Only outgoing connections need reconnection. + if (!m_initiating) return false; + + LOCK(m_recv_mutex); + // We only reconnect in the very first state and when the receive buffer is empty. Together + // these conditions imply nothing has been received so far. + if (m_recv_state != RecvState::KEY) return false; + if (!m_recv_buffer.empty()) return false; + // Check if we've sent enough for the other side to disconnect us (if it was V1). + LOCK(m_send_mutex); + return m_sent_v1_header_worth; +} + size_t V2Transport::GetSendMemoryUsage() const noexcept { AssertLockNotHeld(m_send_mutex); @@ -1590,6 +1594,27 @@ size_t V2Transport::GetSendMemoryUsage() const noexcept return sizeof(m_send_buffer) + memusage::DynamicUsage(m_send_buffer); } +Transport::Info V2Transport::GetInfo() const noexcept +{ + AssertLockNotHeld(m_recv_mutex); + LOCK(m_recv_mutex); + if (m_recv_state == RecvState::V1) return m_v1_fallback.GetInfo(); + + Transport::Info info; + + // Do not report v2 and session ID until the version packet has been received + // and verified (confirming that the other side very likely has the same keys as us). + if (m_recv_state != RecvState::KEY_MAYBE_V1 && m_recv_state != RecvState::KEY && + m_recv_state != RecvState::GARB_GARBTERM && m_recv_state != RecvState::VERSION) { + info.transport_type = TransportProtocolType::V2; + info.session_id = uint256(MakeUCharSpan(m_cipher.GetSessionID())); + } else { + info.transport_type = TransportProtocolType::DETECTING; + } + + return info; +} + std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const { auto it = node.vSendMsg.begin(); @@ -1639,7 +1664,9 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const // Notify transport that bytes have been processed. node.m_transport->MarkBytesSent(nBytes); // Update statistics per message type. - node.AccountForSentBytes(msg_type, nBytes); + if (!msg_type.empty()) { // don't report v2 handshake bytes for now + node.AccountForSentBytes(msg_type, nBytes); + } nSentSize += nBytes; if ((size_t)nBytes != data.size()) { // could not send full message; stop sending more @@ -1822,6 +1849,10 @@ void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr<Sock>&& sock, } const bool inbound_onion = std::find(m_onion_binds.begin(), m_onion_binds.end(), addr_bind) != m_onion_binds.end(); + // The V2Transport transparently falls back to V1 behavior when an incoming V1 connection is + // detected, so use it whenever we signal NODE_P2P_V2. + const bool use_v2transport(nodeServices & NODE_P2P_V2); + CNode* pnode = new CNode(id, std::move(sock), addr, @@ -1835,6 +1866,7 @@ void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr<Sock>&& sock, .permission_flags = permission_flags, .prefer_evict = discouraged, .recv_flood_size = nReceiveFloodSize, + .use_v2transport = use_v2transport, }); pnode->AddRef(); m_msgproc->InitializeNode(*pnode, nodeServices); @@ -1883,12 +1915,19 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ CSemaphoreGrant grant(*semOutbound, true); if (!grant) return false; - OpenNetworkConnection(CAddress(), false, &grant, address.c_str(), conn_type); + OpenNetworkConnection(CAddress(), false, std::move(grant), address.c_str(), conn_type, /*use_v2transport=*/false); return true; } void CConnman::DisconnectNodes() { + AssertLockNotHeld(m_nodes_mutex); + AssertLockNotHeld(m_reconnections_mutex); + + // Use a temporary variable to accumulate desired reconnections, so we don't need + // m_reconnections_mutex while holding m_nodes_mutex. + decltype(m_reconnections) reconnections_to_add; + { LOCK(m_nodes_mutex); @@ -1911,6 +1950,19 @@ void CConnman::DisconnectNodes() // remove from m_nodes m_nodes.erase(remove(m_nodes.begin(), m_nodes.end(), pnode), m_nodes.end()); + // Add to reconnection list if appropriate. We don't reconnect right here, because + // the creation of a connection is a blocking operation (up to several seconds), + // and we don't want to hold up the socket handler thread for that long. + if (pnode->m_transport->ShouldReconnectV1()) { + reconnections_to_add.push_back({ + .addr_connect = pnode->addr, + .grant = std::move(pnode->grantOutbound), + .destination = pnode->m_dest, + .conn_type = pnode->m_conn_type, + .use_v2transport = false}); + LogPrint(BCLog::NET, "retrying with v1 transport protocol for peer=%d\n", pnode->GetId()); + } + // release outbound grant (if any) pnode->grantOutbound.Release(); @@ -1938,6 +1990,11 @@ void CConnman::DisconnectNodes() } } } + { + // Move entries from reconnections_to_add to m_reconnections. + LOCK(m_reconnections_mutex); + m_reconnections.splice(m_reconnections.end(), std::move(reconnections_to_add)); + } } void CConnman::NotifyNumConnectionsChanged() @@ -2315,9 +2372,9 @@ void CConnman::ProcessAddrFetch() m_addr_fetches.pop_front(); } CAddress addr; - CSemaphoreGrant grant(*semOutbound, true); + CSemaphoreGrant grant(*semOutbound, /*fTry=*/true); if (grant) { - OpenNetworkConnection(addr, false, &grant, strDest.c_str(), ConnectionType::ADDR_FETCH); + OpenNetworkConnection(addr, false, std::move(grant), strDest.c_str(), ConnectionType::ADDR_FETCH, /*use_v2transport=*/false); } } @@ -2410,6 +2467,7 @@ bool CConnman::MaybePickPreferredNetwork(std::optional<Network>& network) void CConnman::ThreadOpenConnections(const std::vector<std::string> connect) { AssertLockNotHeld(m_unused_i2p_sessions_mutex); + AssertLockNotHeld(m_reconnections_mutex); FastRandomContext rng; // Connect to specific addresses if (!connect.empty()) @@ -2419,7 +2477,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect) for (const std::string& strAddr : connect) { CAddress addr(CService(), NODE_NONE); - OpenNetworkConnection(addr, false, nullptr, strAddr.c_str(), ConnectionType::MANUAL); + OpenNetworkConnection(addr, false, {}, strAddr.c_str(), ConnectionType::MANUAL, /*use_v2transport=*/false); for (int i = 0; i < 10 && i < nLoop; i++) { if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) @@ -2453,6 +2511,8 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect) if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) return; + PerformReconnections(); + CSemaphoreGrant grant(*semOutbound); if (interruptNet) return; @@ -2473,7 +2533,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect) // Perform cheap checks before locking a mutex. else if (!dnsseed && !use_seednodes) { LOCK(m_added_nodes_mutex); - if (m_added_nodes.empty()) { + if (m_added_node_params.empty()) { add_fixed_seeds_now = true; LogPrintf("Adding fixed seeds as -dnsseed=0 (or IPv4/IPv6 connections are disabled via -onlynet) and neither -addnode nor -seednode are provided\n"); } @@ -2722,7 +2782,9 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect) // Don't record addrman failure attempts when node is offline. This can be identified since all local // network connections (if any) belong in the same netgroup, and the size of `outbound_ipv46_peer_netgroups` would only be 1. const bool count_failures{((int)outbound_ipv46_peer_netgroups.size() + outbound_privacy_network_peers) >= std::min(nMaxConnections - 1, 2)}; - OpenNetworkConnection(addrConnect, count_failures, &grant, /*strDest=*/nullptr, conn_type); + // Use BIP324 transport when both us and them have NODE_V2_P2P set. + const bool use_v2transport(addrConnect.nServices & GetLocalServices() & NODE_P2P_V2); + OpenNetworkConnection(addrConnect, count_failures, std::move(grant), /*strDest=*/nullptr, conn_type, use_v2transport); } } } @@ -2744,11 +2806,11 @@ std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo() const { std::vector<AddedNodeInfo> ret; - std::list<std::string> lAddresses(0); + std::list<AddedNodeParams> lAddresses(0); { LOCK(m_added_nodes_mutex); - ret.reserve(m_added_nodes.size()); - std::copy(m_added_nodes.cbegin(), m_added_nodes.cend(), std::back_inserter(lAddresses)); + ret.reserve(m_added_node_params.size()); + std::copy(m_added_node_params.cbegin(), m_added_node_params.cend(), std::back_inserter(lAddresses)); } @@ -2768,9 +2830,9 @@ std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo() const } } - for (const std::string& strAddNode : lAddresses) { - CService service(LookupNumeric(strAddNode, GetDefaultPort(strAddNode))); - AddedNodeInfo addedNode{strAddNode, CService(), false, false}; + for (const auto& addr : lAddresses) { + CService service(LookupNumeric(addr.m_added_node, GetDefaultPort(addr.m_added_node))); + AddedNodeInfo addedNode{addr, CService(), false, false}; if (service.IsValid()) { // strAddNode is an IP:port auto it = mapConnected.find(service); @@ -2781,7 +2843,7 @@ std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo() const } } else { // strAddNode is a name - auto it = mapConnectedByName.find(strAddNode); + auto it = mapConnectedByName.find(addr.m_added_node); if (it != mapConnectedByName.end()) { addedNode.resolvedAddress = it->second.second; addedNode.fConnected = true; @@ -2797,6 +2859,7 @@ std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo() const void CConnman::ThreadOpenAddedConnections() { AssertLockNotHeld(m_unused_i2p_sessions_mutex); + AssertLockNotHeld(m_reconnections_mutex); while (true) { CSemaphoreGrant grant(*semAddnode); @@ -2804,26 +2867,28 @@ void CConnman::ThreadOpenAddedConnections() bool tried = false; for (const AddedNodeInfo& info : vInfo) { if (!info.fConnected) { - if (!grant.TryAcquire()) { + if (!grant) { // If we've used up our semaphore and need a new one, let's not wait here since while we are waiting // the addednodeinfo state might change. break; } tried = true; CAddress addr(CService(), NODE_NONE); - OpenNetworkConnection(addr, false, &grant, info.strAddedNode.c_str(), ConnectionType::MANUAL); - if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) - return; + OpenNetworkConnection(addr, false, std::move(grant), info.m_params.m_added_node.c_str(), ConnectionType::MANUAL, info.m_params.m_use_v2transport); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) return; + grant = CSemaphoreGrant(*semAddnode, /*fTry=*/true); } } // Retry every 60 seconds if a connection was attempted, otherwise two seconds if (!interruptNet.sleep_for(std::chrono::seconds(tried ? 60 : 2))) return; + // See if any reconnections are desired. + PerformReconnections(); } } // if successful, this moves the passed grant to the constructed node -void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant *grantOutbound, const char *pszDest, ConnectionType conn_type) +void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant&& grant_outbound, const char *pszDest, ConnectionType conn_type, bool use_v2transport) { AssertLockNotHeld(m_unused_i2p_sessions_mutex); assert(conn_type != ConnectionType::INBOUND); @@ -2845,12 +2910,11 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai } else if (FindNode(std::string(pszDest))) return; - CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure, conn_type); + CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure, conn_type, use_v2transport); if (!pnode) return; - if (grantOutbound) - grantOutbound->MoveTo(pnode->grantOutbound); + pnode->grantOutbound = std::move(grant_outbound); m_msgproc->InitializeNode(*pnode, nLocalServices); { @@ -3403,23 +3467,23 @@ std::vector<CAddress> CConnman::GetAddresses(CNode& requestor, size_t max_addres return cache_entry.m_addrs_response_cache; } -bool CConnman::AddNode(const std::string& strNode) +bool CConnman::AddNode(const AddedNodeParams& add) { LOCK(m_added_nodes_mutex); - for (const std::string& it : m_added_nodes) { - if (strNode == it) return false; + for (const auto& it : m_added_node_params) { + if (add.m_added_node == it.m_added_node) return false; } - m_added_nodes.push_back(strNode); + m_added_node_params.push_back(add); return true; } bool CConnman::RemoveAddedNode(const std::string& strNode) { LOCK(m_added_nodes_mutex); - for(std::vector<std::string>::iterator it = m_added_nodes.begin(); it != m_added_nodes.end(); ++it) { - if (strNode == *it) { - m_added_nodes.erase(it); + for (auto it = m_added_node_params.begin(); it != m_added_node_params.end(); ++it) { + if (strNode == it->m_added_node) { + m_added_node_params.erase(it); return true; } } @@ -3607,6 +3671,15 @@ ServiceFlags CConnman::GetLocalServices() const return nLocalServices; } +static std::unique_ptr<Transport> MakeTransport(NodeId id, bool use_v2transport, bool inbound) noexcept +{ + if (use_v2transport) { + return std::make_unique<V2Transport>(id, /*initiating=*/!inbound, SER_NETWORK, INIT_PROTO_VERSION); + } else { + return std::make_unique<V1Transport>(id, SER_NETWORK, INIT_PROTO_VERSION); + } +} + CNode::CNode(NodeId idIn, std::shared_ptr<Sock> sock, const CAddress& addrIn, @@ -3617,13 +3690,14 @@ CNode::CNode(NodeId idIn, ConnectionType conn_type_in, bool inbound_onion, CNodeOptions&& node_opts) - : m_transport{std::make_unique<V1Transport>(idIn, SER_NETWORK, INIT_PROTO_VERSION)}, + : m_transport{MakeTransport(idIn, node_opts.use_v2transport, conn_type_in == ConnectionType::INBOUND)}, m_permission_flags{node_opts.permission_flags}, m_sock{sock}, m_connected{GetTime<std::chrono::seconds>()}, addr{addrIn}, addrBind{addrBindIn}, m_addr_name{addrNameIn.empty() ? addr.ToStringAddrPort() : addrNameIn}, + m_dest(addrNameIn), m_inbound_onion{inbound_onion}, m_prefer_evict{node_opts.prefer_evict}, nKeyedNetGroup{nKeyedNetGroupIn}, @@ -3754,10 +3828,38 @@ uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& address) const return GetDeterministicRandomizer(RANDOMIZER_ID_NETGROUP).Write(vchNetGroup).Finalize(); } -void CaptureMessageToFile(const CAddress& addr, - const std::string& msg_type, - Span<const unsigned char> data, - bool is_incoming) +void CConnman::PerformReconnections() +{ + AssertLockNotHeld(m_reconnections_mutex); + AssertLockNotHeld(m_unused_i2p_sessions_mutex); + while (true) { + // Move first element of m_reconnections to todo (avoiding an allocation inside the lock). + decltype(m_reconnections) todo; + { + LOCK(m_reconnections_mutex); + if (m_reconnections.empty()) break; + todo.splice(todo.end(), m_reconnections, m_reconnections.begin()); + } + + auto& item = *todo.begin(); + OpenNetworkConnection(item.addr_connect, + // We only reconnect if the first attempt to connect succeeded at + // connection time, but then failed after the CNode object was + // created. Since we already know connecting is possible, do not + // count failure to reconnect. + /*fCountFailure=*/false, + std::move(item.grant), + item.destination.empty() ? nullptr : item.destination.c_str(), + item.conn_type, + item.use_v2transport); + } +} + +// Dump binary message to file, with timestamp. +static void CaptureMessageToFile(const CAddress& addr, + const std::string& msg_type, + Span<const unsigned char> data, + bool is_incoming) { // Note: This function captures the message at the time of processing, // not at socket receive/send time. |