// Copyright (c) 2009-2010 Satoshi Nakamoto // Copyright (c) 2009-2022 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #if defined(HAVE_CONFIG_H) #include #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #ifdef WIN32 #include #endif #if HAVE_DECL_GETIFADDRS && HAVE_DECL_FREEIFADDRS #include #endif #include #include #include #include #include #include #include /** Maximum number of block-relay-only anchor connections */ static constexpr size_t MAX_BLOCK_RELAY_ONLY_ANCHORS = 2; static_assert (MAX_BLOCK_RELAY_ONLY_ANCHORS <= static_cast(MAX_BLOCK_RELAY_ONLY_CONNECTIONS), "MAX_BLOCK_RELAY_ONLY_ANCHORS must not exceed MAX_BLOCK_RELAY_ONLY_CONNECTIONS."); /** Anchor IP address database file name */ const char* const ANCHORS_DATABASE_FILENAME = "anchors.dat"; // How often to dump addresses to peers.dat static constexpr std::chrono::minutes DUMP_PEERS_INTERVAL{15}; /** Number of DNS seeds to query when the number of connections is low. */ static constexpr int DNSSEEDS_TO_QUERY_AT_ONCE = 3; /** How long to delay before querying DNS seeds * * If we have more than THRESHOLD entries in addrman, then it's likely * that we got those addresses from having previously connected to the P2P * network, and that we'll be able to successfully reconnect to the P2P * network via contacting one of them. So if that's the case, spend a * little longer trying to connect to known peers before querying the * DNS seeds. */ static constexpr std::chrono::seconds DNSSEEDS_DELAY_FEW_PEERS{11}; static constexpr std::chrono::minutes DNSSEEDS_DELAY_MANY_PEERS{5}; static constexpr int DNSSEEDS_DELAY_PEER_THRESHOLD = 1000; // "many" vs "few" peers /** The default timeframe for -maxuploadtarget. 1 day. */ static constexpr std::chrono::seconds MAX_UPLOAD_TIMEFRAME{60 * 60 * 24}; // A random time period (0 to 1 seconds) is added to feeler connections to prevent synchronization. static constexpr auto FEELER_SLEEP_WINDOW{1s}; /** Frequency to attempt extra connections to reachable networks we're not connected to yet **/ static constexpr auto EXTRA_NETWORK_PEER_INTERVAL{5min}; /** Used to pass flags to the Bind() function */ enum BindFlags { BF_NONE = 0, BF_REPORT_ERROR = (1U << 0), /** * Do not call AddLocal() for our special addresses, e.g., for incoming * Tor connections, to prevent gossiping them over the network. */ BF_DONT_ADVERTISE = (1U << 1), }; // The set of sockets cannot be modified while waiting // The sleep time needs to be small to avoid new sockets stalling static const uint64_t SELECT_TIMEOUT_MILLISECONDS = 50; const std::string NET_MESSAGE_TYPE_OTHER = "*other*"; static const uint64_t RANDOMIZER_ID_NETGROUP = 0x6c0edd8036ef4036ULL; // SHA256("netgroup")[0:8] static const uint64_t RANDOMIZER_ID_LOCALHOSTNONCE = 0xd93e69e2bbfa5735ULL; // SHA256("localhostnonce")[0:8] static const uint64_t RANDOMIZER_ID_ADDRCACHE = 0x1cf2e4ddd306dda9ULL; // SHA256("addrcache")[0:8] // // Global state variables // bool fDiscover = true; bool fListen = true; GlobalMutex g_maplocalhost_mutex; std::map mapLocalHost GUARDED_BY(g_maplocalhost_mutex); std::string strSubVersion; size_t CSerializedNetMsg::GetMemoryUsage() const noexcept { // Don't count the dynamic memory used for the m_type string, by assuming it fits in the // "small string" optimization area (which stores data inside the object itself, up to some // size; 15 bytes in modern libstdc++). return sizeof(*this) + memusage::DynamicUsage(data); } void CConnman::AddAddrFetch(const std::string& strDest) { LOCK(m_addr_fetches_mutex); m_addr_fetches.push_back(strDest); } uint16_t GetListenPort() { // If -bind= is provided with ":port" part, use that (first one if multiple are provided). for (const std::string& bind_arg : gArgs.GetArgs("-bind")) { constexpr uint16_t dummy_port = 0; const std::optional bind_addr{Lookup(bind_arg, dummy_port, /*fAllowLookup=*/false)}; if (bind_addr.has_value() && bind_addr->GetPort() != dummy_port) return bind_addr->GetPort(); } // Otherwise, if -whitebind= without NetPermissionFlags::NoBan is provided, use that // (-whitebind= is required to have ":port"). for (const std::string& whitebind_arg : gArgs.GetArgs("-whitebind")) { NetWhitebindPermissions whitebind; bilingual_str error; if (NetWhitebindPermissions::TryParse(whitebind_arg, whitebind, error)) { if (!NetPermissions::HasFlag(whitebind.m_flags, NetPermissionFlags::NoBan)) { return whitebind.m_service.GetPort(); } } } // Otherwise, if -port= is provided, use that. Otherwise use the default port. return static_cast(gArgs.GetIntArg("-port", Params().GetDefaultPort())); } // Determine the "best" local address for a particular peer. [[nodiscard]] static std::optional GetLocal(const CNode& peer) { if (!fListen) return std::nullopt; std::optional addr; int nBestScore = -1; int nBestReachability = -1; { LOCK(g_maplocalhost_mutex); for (const auto& [local_addr, local_service_info] : mapLocalHost) { // For privacy reasons, don't advertise our privacy-network address // to other networks and don't advertise our other-network address // to privacy networks. if (local_addr.GetNetwork() != peer.ConnectedThroughNetwork() && (local_addr.IsPrivacyNet() || peer.IsConnectedThroughPrivacyNet())) { continue; } const int nScore{local_service_info.nScore}; const int nReachability{local_addr.GetReachabilityFrom(peer.addr)}; if (nReachability > nBestReachability || (nReachability == nBestReachability && nScore > nBestScore)) { addr.emplace(CService{local_addr, local_service_info.nPort}); nBestReachability = nReachability; nBestScore = nScore; } } } return addr; } //! Convert the serialized seeds into usable address objects. static std::vector ConvertSeeds(const std::vector &vSeedsIn) { // It'll only connect to one or two seed nodes because once it connects, // it'll get a pile of addresses with newer timestamps. // Seed nodes are given a random 'last seen time' of between one and two // weeks ago. const auto one_week{7 * 24h}; std::vector vSeedsOut; FastRandomContext rng; DataStream underlying_stream{vSeedsIn}; ParamsStream s{CAddress::V2_NETWORK, underlying_stream}; while (!s.eof()) { CService endpoint; s >> endpoint; CAddress addr{endpoint, GetDesirableServiceFlags(NODE_NONE)}; addr.nTime = rng.rand_uniform_delay(Now() - one_week, -one_week); LogPrint(BCLog::NET, "Added hardcoded seed: %s\n", addr.ToStringAddrPort()); vSeedsOut.push_back(addr); } return vSeedsOut; } // Determine the "best" local address for a particular peer. // If none, return the unroutable 0.0.0.0 but filled in with // the normal parameters, since the IP may be changed to a useful // one by discovery. CService GetLocalAddress(const CNode& peer) { return GetLocal(peer).value_or(CService{CNetAddr(), GetListenPort()}); } static int GetnScore(const CService& addr) { LOCK(g_maplocalhost_mutex); const auto it = mapLocalHost.find(addr); return (it != mapLocalHost.end()) ? it->second.nScore : 0; } // Is our peer's addrLocal potentially useful as an external IP source? [[nodiscard]] static bool IsPeerAddrLocalGood(CNode *pnode) { CService addrLocal = pnode->GetAddrLocal(); return fDiscover && pnode->addr.IsRoutable() && addrLocal.IsRoutable() && g_reachable_nets.Contains(addrLocal); } std::optional GetLocalAddrForPeer(CNode& node) { CService addrLocal{GetLocalAddress(node)}; if (gArgs.GetBoolArg("-addrmantest", false)) { // use IPv4 loopback during addrmantest addrLocal = CService(LookupNumeric("127.0.0.1", GetListenPort())); } // If discovery is enabled, sometimes give our peer the address it // tells us that it sees us as in case it has a better idea of our // address than we do. FastRandomContext rng; if (IsPeerAddrLocalGood(&node) && (!addrLocal.IsRoutable() || rng.randbits((GetnScore(addrLocal) > LOCAL_MANUAL) ? 3 : 1) == 0)) { if (node.IsInboundConn()) { // For inbound connections, assume both the address and the port // as seen from the peer. addrLocal = CService{node.GetAddrLocal()}; } else { // For outbound connections, assume just the address as seen from // the peer and leave the port in `addrLocal` as returned by // `GetLocalAddress()` above. The peer has no way to observe our // listening port when we have initiated the connection. addrLocal.SetIP(node.GetAddrLocal()); } } if (addrLocal.IsRoutable() || gArgs.GetBoolArg("-addrmantest", false)) { LogPrint(BCLog::NET, "Advertising address %s to peer=%d\n", addrLocal.ToStringAddrPort(), node.GetId()); return addrLocal; } // Address is unroutable. Don't advertise. return std::nullopt; } // learn a new local address bool AddLocal(const CService& addr_, int nScore) { CService addr{MaybeFlipIPv6toCJDNS(addr_)}; if (!addr.IsRoutable()) return false; if (!fDiscover && nScore < LOCAL_MANUAL) return false; if (!g_reachable_nets.Contains(addr)) return false; LogPrintf("AddLocal(%s,%i)\n", addr.ToStringAddrPort(), nScore); { LOCK(g_maplocalhost_mutex); const auto [it, is_newly_added] = mapLocalHost.emplace(addr, LocalServiceInfo()); LocalServiceInfo &info = it->second; if (is_newly_added || nScore >= info.nScore) { info.nScore = nScore + (is_newly_added ? 0 : 1); info.nPort = addr.GetPort(); } } return true; } bool AddLocal(const CNetAddr &addr, int nScore) { return AddLocal(CService(addr, GetListenPort()), nScore); } void RemoveLocal(const CService& addr) { LOCK(g_maplocalhost_mutex); LogPrintf("RemoveLocal(%s)\n", addr.ToStringAddrPort()); mapLocalHost.erase(addr); } /** vote for a local address */ bool SeenLocal(const CService& addr) { LOCK(g_maplocalhost_mutex); const auto it = mapLocalHost.find(addr); if (it == mapLocalHost.end()) return false; ++it->second.nScore; return true; } /** check whether a given address is potentially local */ bool IsLocal(const CService& addr) { LOCK(g_maplocalhost_mutex); return mapLocalHost.count(addr) > 0; } CNode* CConnman::FindNode(const CNetAddr& ip) { LOCK(m_nodes_mutex); for (CNode* pnode : m_nodes) { if (static_cast(pnode->addr) == ip) { return pnode; } } return nullptr; } CNode* CConnman::FindNode(const std::string& addrName) { LOCK(m_nodes_mutex); for (CNode* pnode : m_nodes) { if (pnode->m_addr_name == addrName) { return pnode; } } return nullptr; } CNode* CConnman::FindNode(const CService& addr) { LOCK(m_nodes_mutex); for (CNode* pnode : m_nodes) { if (static_cast(pnode->addr) == addr) { return pnode; } } return nullptr; } bool CConnman::AlreadyConnectedToAddress(const CAddress& addr) { return FindNode(static_cast(addr)) || FindNode(addr.ToStringAddrPort()); } bool CConnman::CheckIncomingNonce(uint64_t nonce) { LOCK(m_nodes_mutex); for (const CNode* pnode : m_nodes) { if (!pnode->fSuccessfullyConnected && !pnode->IsInboundConn() && pnode->GetLocalNonce() == nonce) return false; } return true; } /** Get the bind address for a socket as CAddress */ static CAddress GetBindAddress(const Sock& sock) { CAddress addr_bind; struct sockaddr_storage sockaddr_bind; socklen_t sockaddr_bind_len = sizeof(sockaddr_bind); if (!sock.GetSockName((struct sockaddr*)&sockaddr_bind, &sockaddr_bind_len)) { addr_bind.SetSockAddr((const struct sockaddr*)&sockaddr_bind); } else { LogPrintLevel(BCLog::NET, BCLog::Level::Warning, "getsockname failed\n"); } return addr_bind; } CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport) { AssertLockNotHeld(m_unused_i2p_sessions_mutex); assert(conn_type != ConnectionType::INBOUND); if (pszDest == nullptr) { if (IsLocal(addrConnect)) return nullptr; // Look for an existing connection CNode* pnode = FindNode(static_cast(addrConnect)); if (pnode) { LogPrintf("Failed to open new connection, already connected\n"); return nullptr; } } LogPrintLevel(BCLog::NET, BCLog::Level::Debug, "trying %s connection %s lastseen=%.1fhrs\n", use_v2transport ? "v2" : "v1", pszDest ? pszDest : addrConnect.ToStringAddrPort(), Ticks(pszDest ? 0h : Now() - addrConnect.nTime)); // Resolve const uint16_t default_port{pszDest != nullptr ? GetDefaultPort(pszDest) : m_params.GetDefaultPort()}; if (pszDest) { const std::vector resolved{Lookup(pszDest, default_port, fNameLookup && !HaveNameProxy(), 256)}; if (!resolved.empty()) { const CService& rnd{resolved[GetRand(resolved.size())]}; addrConnect = CAddress{MaybeFlipIPv6toCJDNS(rnd), NODE_NONE}; if (!addrConnect.IsValid()) { LogPrint(BCLog::NET, "Resolver returned invalid address %s for %s\n", addrConnect.ToStringAddrPort(), pszDest); return nullptr; } // It is possible that we already have a connection to the IP/port pszDest resolved to. // In that case, drop the connection that was just created. LOCK(m_nodes_mutex); CNode* pnode = FindNode(static_cast(addrConnect)); if (pnode) { LogPrintf("Failed to open new connection, already connected\n"); return nullptr; } } } // Connect bool connected = false; std::unique_ptr sock; Proxy proxy; CAddress addr_bind; assert(!addr_bind.IsValid()); std::unique_ptr i2p_transient_session; if (addrConnect.IsValid()) { const bool use_proxy{GetProxy(addrConnect.GetNetwork(), proxy)}; bool proxyConnectionFailed = false; if (addrConnect.IsI2P() && use_proxy) { i2p::Connection conn; if (m_i2p_sam_session) { connected = m_i2p_sam_session->Connect(addrConnect, conn, proxyConnectionFailed); } else { { LOCK(m_unused_i2p_sessions_mutex); if (m_unused_i2p_sessions.empty()) { i2p_transient_session = std::make_unique(proxy.proxy, &interruptNet); } else { i2p_transient_session.swap(m_unused_i2p_sessions.front()); m_unused_i2p_sessions.pop(); } } connected = i2p_transient_session->Connect(addrConnect, conn, proxyConnectionFailed); if (!connected) { LOCK(m_unused_i2p_sessions_mutex); if (m_unused_i2p_sessions.size() < MAX_UNUSED_I2P_SESSIONS_SIZE) { m_unused_i2p_sessions.emplace(i2p_transient_session.release()); } } } if (connected) { sock = std::move(conn.sock); addr_bind = CAddress{conn.me, NODE_NONE}; } } else if (use_proxy) { sock = CreateSock(proxy.proxy); if (!sock) { return nullptr; } connected = ConnectThroughProxy(proxy, addrConnect.ToStringAddr(), addrConnect.GetPort(), *sock, nConnectTimeout, proxyConnectionFailed); } else { // no proxy needed (none set for target network) sock = CreateSock(addrConnect); if (!sock) { return nullptr; } connected = ConnectSocketDirectly(addrConnect, *sock, nConnectTimeout, conn_type == ConnectionType::MANUAL); } if (!proxyConnectionFailed) { // If a connection to the node was attempted, and failure (if any) is not caused by a problem connecting to // the proxy, mark this as an attempt. addrman.Attempt(addrConnect, fCountFailure); } } else if (pszDest && GetNameProxy(proxy)) { sock = CreateSock(proxy.proxy); if (!sock) { return nullptr; } std::string host; uint16_t port{default_port}; SplitHostPort(std::string(pszDest), port, host); bool proxyConnectionFailed; connected = ConnectThroughProxy(proxy, host, port, *sock, nConnectTimeout, proxyConnectionFailed); } if (!connected) { return nullptr; } // Add node NodeId id = GetNewNodeId(); uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize(); if (!addr_bind.IsValid()) { addr_bind = GetBindAddress(*sock); } CNode* pnode = new CNode(id, std::move(sock), addrConnect, CalculateKeyedNetGroup(addrConnect), nonce, addr_bind, pszDest ? pszDest : "", conn_type, /*inbound_onion=*/false, CNodeOptions{ .i2p_sam_session = std::move(i2p_transient_session), .recv_flood_size = nReceiveFloodSize, .use_v2transport = use_v2transport, }); pnode->AddRef(); // We're making a new connection, harvest entropy from the time (and our peer count) RandAddEvent((uint32_t)id); return pnode; } void CNode::CloseSocketDisconnect() { fDisconnect = true; LOCK(m_sock_mutex); if (m_sock) { LogPrint(BCLog::NET, "disconnecting peer=%d\n", id); m_sock.reset(); } m_i2p_sam_session.reset(); } void CConnman::AddWhitelistPermissionFlags(NetPermissionFlags& flags, const CNetAddr &addr) const { for (const auto& subnet : vWhitelistedRange) { if (subnet.m_subnet.Match(addr)) NetPermissions::AddFlag(flags, subnet.m_flags); } } CService CNode::GetAddrLocal() const { AssertLockNotHeld(m_addr_local_mutex); LOCK(m_addr_local_mutex); return addrLocal; } void CNode::SetAddrLocal(const CService& addrLocalIn) { AssertLockNotHeld(m_addr_local_mutex); LOCK(m_addr_local_mutex); if (addrLocal.IsValid()) { error("Addr local already set for node: %i. Refusing to change from %s to %s", id, addrLocal.ToStringAddrPort(), addrLocalIn.ToStringAddrPort()); } else { addrLocal = addrLocalIn; } } Network CNode::ConnectedThroughNetwork() const { return m_inbound_onion ? NET_ONION : addr.GetNetClass(); } bool CNode::IsConnectedThroughPrivacyNet() const { return m_inbound_onion || addr.IsPrivacyNet(); } #undef X #define X(name) stats.name = name void CNode::CopyStats(CNodeStats& stats) { stats.nodeid = this->GetId(); X(addr); X(addrBind); stats.m_network = ConnectedThroughNetwork(); X(m_last_send); X(m_last_recv); X(m_last_tx_time); X(m_last_block_time); X(m_connected); X(nTimeOffset); X(m_addr_name); X(nVersion); { LOCK(m_subver_mutex); X(cleanSubVer); } stats.fInbound = IsInboundConn(); X(m_bip152_highbandwidth_to); X(m_bip152_highbandwidth_from); { LOCK(cs_vSend); X(mapSendBytesPerMsgType); X(nSendBytes); } { LOCK(cs_vRecv); X(mapRecvBytesPerMsgType); X(nRecvBytes); Transport::Info info = m_transport->GetInfo(); stats.m_transport_type = info.transport_type; if (info.session_id) stats.m_session_id = HexStr(*info.session_id); } X(m_permission_flags); X(m_last_ping_time); X(m_min_ping_time); // Leave string empty if addrLocal invalid (not filled in yet) CService addrLocalUnlocked = GetAddrLocal(); stats.addrLocal = addrLocalUnlocked.IsValid() ? addrLocalUnlocked.ToStringAddrPort() : ""; X(m_conn_type); } #undef X bool CNode::ReceiveMsgBytes(Span msg_bytes, bool& complete) { complete = false; const auto time = GetTime(); LOCK(cs_vRecv); m_last_recv = std::chrono::duration_cast(time); nRecvBytes += msg_bytes.size(); while (msg_bytes.size() > 0) { // absorb network data if (!m_transport->ReceivedBytes(msg_bytes)) { // Serious transport problem, disconnect from the peer. return false; } if (m_transport->ReceivedMessageComplete()) { // decompose a transport agnostic CNetMessage from the deserializer bool reject_message{false}; CNetMessage msg = m_transport->GetReceivedMessage(time, reject_message); if (reject_message) { // Message deserialization failed. Drop the message but don't disconnect the peer. // store the size of the corrupt message mapRecvBytesPerMsgType.at(NET_MESSAGE_TYPE_OTHER) += msg.m_raw_message_size; continue; } // Store received bytes per message type. // To prevent a memory DOS, only allow known message types. auto i = mapRecvBytesPerMsgType.find(msg.m_type); if (i == mapRecvBytesPerMsgType.end()) { i = mapRecvBytesPerMsgType.find(NET_MESSAGE_TYPE_OTHER); } assert(i != mapRecvBytesPerMsgType.end()); i->second += msg.m_raw_message_size; // push the message to the process queue, vRecvMsg.push_back(std::move(msg)); complete = true; } } return true; } V1Transport::V1Transport(const NodeId node_id, int nTypeIn, int nVersionIn) noexcept : m_node_id(node_id), hdrbuf(nTypeIn, nVersionIn), vRecv(nTypeIn, nVersionIn) { assert(std::size(Params().MessageStart()) == std::size(m_magic_bytes)); m_magic_bytes = Params().MessageStart(); LOCK(m_recv_mutex); Reset(); } Transport::Info V1Transport::GetInfo() const noexcept { return {.transport_type = TransportProtocolType::V1, .session_id = {}}; } int V1Transport::readHeader(Span msg_bytes) { AssertLockHeld(m_recv_mutex); // copy data to temporary parsing buffer unsigned int nRemaining = CMessageHeader::HEADER_SIZE - nHdrPos; unsigned int nCopy = std::min(nRemaining, msg_bytes.size()); memcpy(&hdrbuf[nHdrPos], msg_bytes.data(), nCopy); nHdrPos += nCopy; // if header incomplete, exit if (nHdrPos < CMessageHeader::HEADER_SIZE) return nCopy; // deserialize to CMessageHeader try { hdrbuf >> hdr; } catch (const std::exception&) { LogPrint(BCLog::NET, "Header error: Unable to deserialize, peer=%d\n", m_node_id); return -1; } // Check start string, network magic if (hdr.pchMessageStart != m_magic_bytes) { LogPrint(BCLog::NET, "Header error: Wrong MessageStart %s received, peer=%d\n", HexStr(hdr.pchMessageStart), m_node_id); return -1; } // reject messages larger than MAX_SIZE or MAX_PROTOCOL_MESSAGE_LENGTH if (hdr.nMessageSize > MAX_SIZE || hdr.nMessageSize > MAX_PROTOCOL_MESSAGE_LENGTH) { LogPrint(BCLog::NET, "Header error: Size too large (%s, %u bytes), peer=%d\n", SanitizeString(hdr.GetCommand()), hdr.nMessageSize, m_node_id); return -1; } // switch state to reading message data in_data = true; return nCopy; } int V1Transport::readData(Span msg_bytes) { AssertLockHeld(m_recv_mutex); unsigned int nRemaining = hdr.nMessageSize - nDataPos; unsigned int nCopy = std::min(nRemaining, msg_bytes.size()); if (vRecv.size() < nDataPos + nCopy) { // Allocate up to 256 KiB ahead, but never more than the total message size. vRecv.resize(std::min(hdr.nMessageSize, nDataPos + nCopy + 256 * 1024)); } hasher.Write(msg_bytes.first(nCopy)); memcpy(&vRecv[nDataPos], msg_bytes.data(), nCopy); nDataPos += nCopy; return nCopy; } const uint256& V1Transport::GetMessageHash() const { AssertLockHeld(m_recv_mutex); assert(CompleteInternal()); if (data_hash.IsNull()) hasher.Finalize(data_hash); return data_hash; } CNetMessage V1Transport::GetReceivedMessage(const std::chrono::microseconds time, bool& reject_message) { AssertLockNotHeld(m_recv_mutex); // Initialize out parameter reject_message = false; // decompose a single CNetMessage from the TransportDeserializer LOCK(m_recv_mutex); CNetMessage msg(std::move(vRecv)); // store message type string, time, and sizes msg.m_type = hdr.GetCommand(); msg.m_time = time; msg.m_message_size = hdr.nMessageSize; msg.m_raw_message_size = hdr.nMessageSize + CMessageHeader::HEADER_SIZE; uint256 hash = GetMessageHash(); // We just received a message off the wire, harvest entropy from the time (and the message checksum) RandAddEvent(ReadLE32(hash.begin())); // Check checksum and header message type string if (memcmp(hash.begin(), hdr.pchChecksum, CMessageHeader::CHECKSUM_SIZE) != 0) { LogPrint(BCLog::NET, "Header error: Wrong checksum (%s, %u bytes), expected %s was %s, peer=%d\n", SanitizeString(msg.m_type), msg.m_message_size, HexStr(Span{hash}.first(CMessageHeader::CHECKSUM_SIZE)), HexStr(hdr.pchChecksum), m_node_id); reject_message = true; } else if (!hdr.IsCommandValid()) { LogPrint(BCLog::NET, "Header error: Invalid message type (%s, %u bytes), peer=%d\n", SanitizeString(hdr.GetCommand()), msg.m_message_size, m_node_id); reject_message = true; } // Always reset the network deserializer (prepare for the next message) Reset(); return msg; } bool V1Transport::SetMessageToSend(CSerializedNetMsg& msg) noexcept { AssertLockNotHeld(m_send_mutex); // Determine whether a new message can be set. LOCK(m_send_mutex); if (m_sending_header || m_bytes_sent < m_message_to_send.data.size()) return false; // create dbl-sha256 checksum uint256 hash = Hash(msg.data); // create header CMessageHeader hdr(m_magic_bytes, msg.m_type.c_str(), msg.data.size()); memcpy(hdr.pchChecksum, hash.begin(), CMessageHeader::CHECKSUM_SIZE); // serialize header m_header_to_send.clear(); CVectorWriter{INIT_PROTO_VERSION, m_header_to_send, 0, hdr}; // update state m_message_to_send = std::move(msg); m_sending_header = true; m_bytes_sent = 0; return true; } Transport::BytesToSend V1Transport::GetBytesToSend(bool have_next_message) const noexcept { AssertLockNotHeld(m_send_mutex); LOCK(m_send_mutex); if (m_sending_header) { return {Span{m_header_to_send}.subspan(m_bytes_sent), // We have more to send after the header if the message has payload, or if there // is a next message after that. have_next_message || !m_message_to_send.data.empty(), m_message_to_send.m_type }; } else { return {Span{m_message_to_send.data}.subspan(m_bytes_sent), // We only have more to send after this message's payload if there is another // message. have_next_message, m_message_to_send.m_type }; } } void V1Transport::MarkBytesSent(size_t bytes_sent) noexcept { AssertLockNotHeld(m_send_mutex); LOCK(m_send_mutex); m_bytes_sent += bytes_sent; if (m_sending_header && m_bytes_sent == m_header_to_send.size()) { // We're done sending a message's header. Switch to sending its data bytes. m_sending_header = false; m_bytes_sent = 0; } else if (!m_sending_header && m_bytes_sent == m_message_to_send.data.size()) { // We're done sending a message's data. Wipe the data vector to reduce memory consumption. ClearShrink(m_message_to_send.data); m_bytes_sent = 0; } } size_t V1Transport::GetSendMemoryUsage() const noexcept { AssertLockNotHeld(m_send_mutex); LOCK(m_send_mutex); // Don't count sending-side fields besides m_message_to_send, as they're all small and bounded. return m_message_to_send.GetMemoryUsage(); } namespace { /** List of short messages as defined in BIP324, in order. * * Only message types that are actually implemented in this codebase need to be listed, as other * messages get ignored anyway - whether we know how to decode them or not. */ const std::array V2_MESSAGE_IDS = { "", // 12 bytes follow encoding the message type like in V1 NetMsgType::ADDR, NetMsgType::BLOCK, NetMsgType::BLOCKTXN, NetMsgType::CMPCTBLOCK, NetMsgType::FEEFILTER, NetMsgType::FILTERADD, NetMsgType::FILTERCLEAR, NetMsgType::FILTERLOAD, NetMsgType::GETBLOCKS, NetMsgType::GETBLOCKTXN, NetMsgType::GETDATA, NetMsgType::GETHEADERS, NetMsgType::HEADERS, NetMsgType::INV, NetMsgType::MEMPOOL, NetMsgType::MERKLEBLOCK, NetMsgType::NOTFOUND, NetMsgType::PING, NetMsgType::PONG, NetMsgType::SENDCMPCT, NetMsgType::TX, NetMsgType::GETCFILTERS, NetMsgType::CFILTER, NetMsgType::GETCFHEADERS, NetMsgType::CFHEADERS, NetMsgType::GETCFCHECKPT, NetMsgType::CFCHECKPT, NetMsgType::ADDRV2, // Unimplemented message types that are assigned in BIP324: "", "", "", "" }; class V2MessageMap { std::unordered_map m_map; public: V2MessageMap() noexcept { for (size_t i = 1; i < std::size(V2_MESSAGE_IDS); ++i) { m_map.emplace(V2_MESSAGE_IDS[i], i); } } std::optional operator()(const std::string& message_name) const noexcept { auto it = m_map.find(message_name); if (it == m_map.end()) return std::nullopt; return it->second; } }; const V2MessageMap V2_MESSAGE_MAP; CKey GenerateRandomKey() noexcept { CKey key; key.MakeNewKey(/*fCompressed=*/true); return key; } std::vector GenerateRandomGarbage() noexcept { std::vector ret; FastRandomContext rng; ret.resize(rng.randrange(V2Transport::MAX_GARBAGE_LEN + 1)); rng.fillrand(MakeWritableByteSpan(ret)); return ret; } } // namespace void V2Transport::StartSendingHandshake() noexcept { AssertLockHeld(m_send_mutex); Assume(m_send_state == SendState::AWAITING_KEY); Assume(m_send_buffer.empty()); // Initialize the send buffer with ellswift pubkey + provided garbage. m_send_buffer.resize(EllSwiftPubKey::size() + m_send_garbage.size()); std::copy(std::begin(m_cipher.GetOurPubKey()), std::end(m_cipher.GetOurPubKey()), MakeWritableByteSpan(m_send_buffer).begin()); std::copy(m_send_garbage.begin(), m_send_garbage.end(), m_send_buffer.begin() + EllSwiftPubKey::size()); // We cannot wipe m_send_garbage as it will still be used as AAD later in the handshake. } V2Transport::V2Transport(NodeId nodeid, bool initiating, int type_in, int version_in, const CKey& key, Span ent32, std::vector garbage) noexcept : m_cipher{key, ent32}, m_initiating{initiating}, m_nodeid{nodeid}, m_v1_fallback{nodeid, type_in, version_in}, m_recv_type{type_in}, m_recv_version{version_in}, m_recv_state{initiating ? RecvState::KEY : RecvState::KEY_MAYBE_V1}, m_send_garbage{std::move(garbage)}, m_send_state{initiating ? SendState::AWAITING_KEY : SendState::MAYBE_V1} { Assume(m_send_garbage.size() <= MAX_GARBAGE_LEN); // Start sending immediately if we're the initiator of the connection. if (initiating) { LOCK(m_send_mutex); StartSendingHandshake(); } } V2Transport::V2Transport(NodeId nodeid, bool initiating, int type_in, int version_in) noexcept : V2Transport{nodeid, initiating, type_in, version_in, GenerateRandomKey(), MakeByteSpan(GetRandHash()), GenerateRandomGarbage()} { } void V2Transport::SetReceiveState(RecvState recv_state) noexcept { AssertLockHeld(m_recv_mutex); // Enforce allowed state transitions. switch (m_recv_state) { case RecvState::KEY_MAYBE_V1: Assume(recv_state == RecvState::KEY || recv_state == RecvState::V1); break; case RecvState::KEY: Assume(recv_state == RecvState::GARB_GARBTERM); break; case RecvState::GARB_GARBTERM: Assume(recv_state == RecvState::VERSION); break; case RecvState::VERSION: Assume(recv_state == RecvState::APP); break; case RecvState::APP: Assume(recv_state == RecvState::APP_READY); break; case RecvState::APP_READY: Assume(recv_state == RecvState::APP); break; case RecvState::V1: Assume(false); // V1 state cannot be left break; } // Change state. m_recv_state = recv_state; } void V2Transport::SetSendState(SendState send_state) noexcept { AssertLockHeld(m_send_mutex); // Enforce allowed state transitions. switch (m_send_state) { case SendState::MAYBE_V1: Assume(send_state == SendState::V1 || send_state == SendState::AWAITING_KEY); break; case SendState::AWAITING_KEY: Assume(send_state == SendState::READY); break; case SendState::READY: case SendState::V1: Assume(false); // Final states break; } // Change state. m_send_state = send_state; } bool V2Transport::ReceivedMessageComplete() const noexcept { AssertLockNotHeld(m_recv_mutex); LOCK(m_recv_mutex); if (m_recv_state == RecvState::V1) return m_v1_fallback.ReceivedMessageComplete(); return m_recv_state == RecvState::APP_READY; } void V2Transport::ProcessReceivedMaybeV1Bytes() noexcept { AssertLockHeld(m_recv_mutex); AssertLockNotHeld(m_send_mutex); Assume(m_recv_state == RecvState::KEY_MAYBE_V1); // We still have to determine if this is a v1 or v2 connection. The bytes being received could // be the beginning of either a v1 packet (network magic + "version\x00\x00\x00\x00\x00"), or // of a v2 public key. BIP324 specifies that a mismatch with this 16-byte string should trigger // sending of the key. std::array v1_prefix = {0, 0, 0, 0, 'v', 'e', 'r', 's', 'i', 'o', 'n', 0, 0, 0, 0, 0}; std::copy(std::begin(Params().MessageStart()), std::end(Params().MessageStart()), v1_prefix.begin()); Assume(m_recv_buffer.size() <= v1_prefix.size()); if (!std::equal(m_recv_buffer.begin(), m_recv_buffer.end(), v1_prefix.begin())) { // Mismatch with v1 prefix, so we can assume a v2 connection. SetReceiveState(RecvState::KEY); // Convert to KEY state, leaving received bytes around. // Transition the sender to AWAITING_KEY state and start sending. LOCK(m_send_mutex); SetSendState(SendState::AWAITING_KEY); StartSendingHandshake(); } else if (m_recv_buffer.size() == v1_prefix.size()) { // Full match with the v1 prefix, so fall back to v1 behavior. LOCK(m_send_mutex); Span feedback{m_recv_buffer}; // Feed already received bytes to v1 transport. It should always accept these, because it's // less than the size of a v1 header, and these are the first bytes fed to m_v1_fallback. bool ret = m_v1_fallback.ReceivedBytes(feedback); Assume(feedback.empty()); Assume(ret); SetReceiveState(RecvState::V1); SetSendState(SendState::V1); // Reset v2 transport buffers to save memory. ClearShrink(m_recv_buffer); ClearShrink(m_send_buffer); } else { // We have not received enough to distinguish v1 from v2 yet. Wait until more bytes come. } } bool V2Transport::ProcessReceivedKeyBytes() noexcept { AssertLockHeld(m_recv_mutex); AssertLockNotHeld(m_send_mutex); Assume(m_recv_state == RecvState::KEY); Assume(m_recv_buffer.size() <= EllSwiftPubKey::size()); // As a special exception, if bytes 4-16 of the key on a responder connection match the // corresponding bytes of a V1 version message, but bytes 0-4 don't match the network magic // (if they did, we'd have switched to V1 state already), assume this is a peer from // another network, and disconnect them. They will almost certainly disconnect us too when // they receive our uniformly random key and garbage, but detecting this case specially // means we can log it. static constexpr std::array MATCH = {'v', 'e', 'r', 's', 'i', 'o', 'n', 0, 0, 0, 0, 0}; static constexpr size_t OFFSET = std::tuple_size_v; if (!m_initiating && m_recv_buffer.size() >= OFFSET + MATCH.size()) { if (std::equal(MATCH.begin(), MATCH.end(), m_recv_buffer.begin() + OFFSET)) { LogPrint(BCLog::NET, "V2 transport error: V1 peer with wrong MessageStart %s\n", HexStr(Span(m_recv_buffer).first(OFFSET))); return false; } } if (m_recv_buffer.size() == EllSwiftPubKey::size()) { // Other side's key has been fully received, and can now be Diffie-Hellman combined with // our key to initialize the encryption ciphers. // Initialize the ciphers. EllSwiftPubKey ellswift(MakeByteSpan(m_recv_buffer)); LOCK(m_send_mutex); m_cipher.Initialize(ellswift, m_initiating); // Switch receiver state to GARB_GARBTERM. SetReceiveState(RecvState::GARB_GARBTERM); m_recv_buffer.clear(); // Switch sender state to READY. SetSendState(SendState::READY); // Append the garbage terminator to the send buffer. m_send_buffer.resize(m_send_buffer.size() + BIP324Cipher::GARBAGE_TERMINATOR_LEN); std::copy(m_cipher.GetSendGarbageTerminator().begin(), m_cipher.GetSendGarbageTerminator().end(), MakeWritableByteSpan(m_send_buffer).last(BIP324Cipher::GARBAGE_TERMINATOR_LEN).begin()); // Construct version packet in the send buffer, with the sent garbage data as AAD. m_send_buffer.resize(m_send_buffer.size() + BIP324Cipher::EXPANSION + VERSION_CONTENTS.size()); m_cipher.Encrypt( /*contents=*/VERSION_CONTENTS, /*aad=*/MakeByteSpan(m_send_garbage), /*ignore=*/false, /*output=*/MakeWritableByteSpan(m_send_buffer).last(BIP324Cipher::EXPANSION + VERSION_CONTENTS.size())); // We no longer need the garbage. ClearShrink(m_send_garbage); } else { // We still have to receive more key bytes. } return true; } bool V2Transport::ProcessReceivedGarbageBytes() noexcept { AssertLockHeld(m_recv_mutex); Assume(m_recv_state == RecvState::GARB_GARBTERM); Assume(m_recv_buffer.size() <= MAX_GARBAGE_LEN + BIP324Cipher::GARBAGE_TERMINATOR_LEN); if (m_recv_buffer.size() >= BIP324Cipher::GARBAGE_TERMINATOR_LEN) { if (MakeByteSpan(m_recv_buffer).last(BIP324Cipher::GARBAGE_TERMINATOR_LEN) == m_cipher.GetReceiveGarbageTerminator()) { // Garbage terminator received. Store garbage to authenticate it as AAD later. m_recv_aad = std::move(m_recv_buffer); m_recv_aad.resize(m_recv_aad.size() - BIP324Cipher::GARBAGE_TERMINATOR_LEN); m_recv_buffer.clear(); SetReceiveState(RecvState::VERSION); } else if (m_recv_buffer.size() == MAX_GARBAGE_LEN + BIP324Cipher::GARBAGE_TERMINATOR_LEN) { // We've reached the maximum length for garbage + garbage terminator, and the // terminator still does not match. Abort. LogPrint(BCLog::NET, "V2 transport error: missing garbage terminator, peer=%d\n", m_nodeid); return false; } else { // We still need to receive more garbage and/or garbage terminator bytes. } } else { // We have less than GARBAGE_TERMINATOR_LEN (16) bytes, so we certainly need to receive // more first. } return true; } bool V2Transport::ProcessReceivedPacketBytes() noexcept { AssertLockHeld(m_recv_mutex); Assume(m_recv_state == RecvState::VERSION || m_recv_state == RecvState::APP); // The maximum permitted contents length for a packet, consisting of: // - 0x00 byte: indicating long message type encoding // - 12 bytes of message type // - payload static constexpr size_t MAX_CONTENTS_LEN = 1 + CMessageHeader::COMMAND_SIZE + std::min(MAX_SIZE, MAX_PROTOCOL_MESSAGE_LENGTH); if (m_recv_buffer.size() == BIP324Cipher::LENGTH_LEN) { // Length descriptor received. m_recv_len = m_cipher.DecryptLength(MakeByteSpan(m_recv_buffer)); if (m_recv_len > MAX_CONTENTS_LEN) { LogPrint(BCLog::NET, "V2 transport error: packet too large (%u bytes), peer=%d\n", m_recv_len, m_nodeid); return false; } } else if (m_recv_buffer.size() > BIP324Cipher::LENGTH_LEN && m_recv_buffer.size() == m_recv_len + BIP324Cipher::EXPANSION) { // Ciphertext received, decrypt it into m_recv_decode_buffer. // Note that it is impossible to reach this branch without hitting the branch above first, // as GetMaxBytesToProcess only allows up to LENGTH_LEN into the buffer before that point. m_recv_decode_buffer.resize(m_recv_len); bool ignore{false}; bool ret = m_cipher.Decrypt( /*input=*/MakeByteSpan(m_recv_buffer).subspan(BIP324Cipher::LENGTH_LEN), /*aad=*/MakeByteSpan(m_recv_aad), /*ignore=*/ignore, /*contents=*/MakeWritableByteSpan(m_recv_decode_buffer)); if (!ret) { LogPrint(BCLog::NET, "V2 transport error: packet decryption failure (%u bytes), peer=%d\n", m_recv_len, m_nodeid); return false; } // We have decrypted a valid packet with the AAD we expected, so clear the expected AAD. ClearShrink(m_recv_aad); // Feed the last 4 bytes of the Poly1305 authentication tag (and its timing) into our RNG. RandAddEvent(ReadLE32(m_recv_buffer.data() + m_recv_buffer.size() - 4)); // At this point we have a valid packet decrypted into m_recv_decode_buffer. If it's not a // decoy, which we simply ignore, use the current state to decide what to do with it. if (!ignore) { switch (m_recv_state) { case RecvState::VERSION: // Version message received; transition to application phase. The contents is // ignored, but can be used for future extensions. SetReceiveState(RecvState::APP); break; case RecvState::APP: // Application message decrypted correctly. It can be extracted using GetMessage(). SetReceiveState(RecvState::APP_READY); break; default: // Any other state is invalid (this function should not have been called). Assume(false); } } // Wipe the receive buffer where the next packet will be received into. ClearShrink(m_recv_buffer); // In all but APP_READY state, we can wipe the decoded contents. if (m_recv_state != RecvState::APP_READY) ClearShrink(m_recv_decode_buffer); } else { // We either have less than 3 bytes, so we don't know the packet's length yet, or more // than 3 bytes but less than the packet's full ciphertext. Wait until those arrive. } return true; } size_t V2Transport::GetMaxBytesToProcess() noexcept { AssertLockHeld(m_recv_mutex); switch (m_recv_state) { case RecvState::KEY_MAYBE_V1: // During the KEY_MAYBE_V1 state we do not allow more than the length of v1 prefix into the // receive buffer. Assume(m_recv_buffer.size() <= V1_PREFIX_LEN); // As long as we're not sure if this is a v1 or v2 connection, don't receive more than what // is strictly necessary to distinguish the two (16 bytes). If we permitted more than // the v1 header size (24 bytes), we may not be able to feed the already-received bytes // back into the m_v1_fallback V1 transport. return V1_PREFIX_LEN - m_recv_buffer.size(); case RecvState::KEY: // During the KEY state, we only allow the 64-byte key into the receive buffer. Assume(m_recv_buffer.size() <= EllSwiftPubKey::size()); // As long as we have not received the other side's public key, don't receive more than // that (64 bytes), as garbage follows, and locating the garbage terminator requires the // key exchange first. return EllSwiftPubKey::size() - m_recv_buffer.size(); case RecvState::GARB_GARBTERM: // Process garbage bytes one by one (because terminator may appear anywhere). return 1; case RecvState::VERSION: case RecvState::APP: // These three states all involve decoding a packet. Process the length descriptor first, // so that we know where the current packet ends (and we don't process bytes from the next // packet or decoy yet). Then, process the ciphertext bytes of the current packet. if (m_recv_buffer.size() < BIP324Cipher::LENGTH_LEN) { return BIP324Cipher::LENGTH_LEN - m_recv_buffer.size(); } else { // Note that BIP324Cipher::EXPANSION is the total difference between contents size // and encoded packet size, which includes the 3 bytes due to the packet length. // When transitioning from receiving the packet length to receiving its ciphertext, // the encrypted packet length is left in the receive buffer. return BIP324Cipher::EXPANSION + m_recv_len - m_recv_buffer.size(); } case RecvState::APP_READY: // No bytes can be processed until GetMessage() is called. return 0; case RecvState::V1: // Not allowed (must be dealt with by the caller). Assume(false); return 0; } Assume(false); // unreachable return 0; } bool V2Transport::ReceivedBytes(Span& msg_bytes) noexcept { AssertLockNotHeld(m_recv_mutex); /** How many bytes to allocate in the receive buffer at most above what is received so far. */ static constexpr size_t MAX_RESERVE_AHEAD = 256 * 1024; LOCK(m_recv_mutex); if (m_recv_state == RecvState::V1) return m_v1_fallback.ReceivedBytes(msg_bytes); // Process the provided bytes in msg_bytes in a loop. In each iteration a nonzero number of // bytes (decided by GetMaxBytesToProcess) are taken from the beginning om msg_bytes, and // appended to m_recv_buffer. Then, depending on the receiver state, one of the // ProcessReceived*Bytes functions is called to process the bytes in that buffer. while (!msg_bytes.empty()) { // Decide how many bytes to copy from msg_bytes to m_recv_buffer. size_t max_read = GetMaxBytesToProcess(); // Reserve space in the buffer if there is not enough. if (m_recv_buffer.size() + std::min(msg_bytes.size(), max_read) > m_recv_buffer.capacity()) { switch (m_recv_state) { case RecvState::KEY_MAYBE_V1: case RecvState::KEY: case RecvState::GARB_GARBTERM: // During the initial states (key/garbage), allocate once to fit the maximum (4111 // bytes). m_recv_buffer.reserve(MAX_GARBAGE_LEN + BIP324Cipher::GARBAGE_TERMINATOR_LEN); break; case RecvState::VERSION: case RecvState::APP: { // During states where a packet is being received, as much as is expected but never // more than MAX_RESERVE_AHEAD bytes in addition to what is received so far. // This means attackers that want to cause us to waste allocated memory are limited // to MAX_RESERVE_AHEAD above the largest allowed message contents size, and to // MAX_RESERVE_AHEAD more than they've actually sent us. size_t alloc_add = std::min(max_read, msg_bytes.size() + MAX_RESERVE_AHEAD); m_recv_buffer.reserve(m_recv_buffer.size() + alloc_add); break; } case RecvState::APP_READY: // The buffer is empty in this state. Assume(m_recv_buffer.empty()); break; case RecvState::V1: // Should have bailed out above. Assume(false); break; } } // Can't read more than provided input. max_read = std::min(msg_bytes.size(), max_read); // Copy data to buffer. m_recv_buffer.insert(m_recv_buffer.end(), UCharCast(msg_bytes.data()), UCharCast(msg_bytes.data() + max_read)); msg_bytes = msg_bytes.subspan(max_read); // Process data in the buffer. switch (m_recv_state) { case RecvState::KEY_MAYBE_V1: ProcessReceivedMaybeV1Bytes(); if (m_recv_state == RecvState::V1) return true; break; case RecvState::KEY: if (!ProcessReceivedKeyBytes()) return false; break; case RecvState::GARB_GARBTERM: if (!ProcessReceivedGarbageBytes()) return false; break; case RecvState::VERSION: case RecvState::APP: if (!ProcessReceivedPacketBytes()) return false; break; case RecvState::APP_READY: return true; case RecvState::V1: // We should have bailed out before. Assume(false); break; } // Make sure we have made progress before continuing. Assume(max_read > 0); } return true; } std::optional V2Transport::GetMessageType(Span& contents) noexcept { if (contents.size() == 0) return std::nullopt; // Empty contents uint8_t first_byte = contents[0]; contents = contents.subspan(1); // Strip first byte. if (first_byte != 0) { // Short (1 byte) encoding. if (first_byte < std::size(V2_MESSAGE_IDS)) { // Valid short message id. return V2_MESSAGE_IDS[first_byte]; } else { // Unknown short message id. return std::nullopt; } } if (contents.size() < CMessageHeader::COMMAND_SIZE) { return std::nullopt; // Long encoding needs 12 message type bytes. } size_t msg_type_len{0}; while (msg_type_len < CMessageHeader::COMMAND_SIZE && contents[msg_type_len] != 0) { // Verify that message type bytes before the first 0x00 are in range. if (contents[msg_type_len] < ' ' || contents[msg_type_len] > 0x7F) { return {}; } ++msg_type_len; } std::string ret{reinterpret_cast(contents.data()), msg_type_len}; while (msg_type_len < CMessageHeader::COMMAND_SIZE) { // Verify that message type bytes after the first 0x00 are also 0x00. if (contents[msg_type_len] != 0) return {}; ++msg_type_len; } // Strip message type bytes of contents. contents = contents.subspan(CMessageHeader::COMMAND_SIZE); return ret; } CNetMessage V2Transport::GetReceivedMessage(std::chrono::microseconds time, bool& reject_message) noexcept { AssertLockNotHeld(m_recv_mutex); LOCK(m_recv_mutex); if (m_recv_state == RecvState::V1) return m_v1_fallback.GetReceivedMessage(time, reject_message); Assume(m_recv_state == RecvState::APP_READY); Span contents{m_recv_decode_buffer}; auto msg_type = GetMessageType(contents); CDataStream ret(m_recv_type, m_recv_version); CNetMessage msg{std::move(ret)}; // Note that BIP324Cipher::EXPANSION also includes the length descriptor size. msg.m_raw_message_size = m_recv_decode_buffer.size() + BIP324Cipher::EXPANSION; if (msg_type) { reject_message = false; msg.m_type = std::move(*msg_type); msg.m_time = time; msg.m_message_size = contents.size(); msg.m_recv.resize(contents.size()); std::copy(contents.begin(), contents.end(), UCharCast(msg.m_recv.data())); } else { LogPrint(BCLog::NET, "V2 transport error: invalid message type (%u bytes contents), peer=%d\n", m_recv_decode_buffer.size(), m_nodeid); reject_message = true; } ClearShrink(m_recv_decode_buffer); SetReceiveState(RecvState::APP); return msg; } bool V2Transport::SetMessageToSend(CSerializedNetMsg& msg) noexcept { AssertLockNotHeld(m_send_mutex); LOCK(m_send_mutex); if (m_send_state == SendState::V1) return m_v1_fallback.SetMessageToSend(msg); // We only allow adding a new message to be sent when in the READY state (so the packet cipher // is available) and the send buffer is empty. This limits the number of messages in the send // buffer to just one, and leaves the responsibility for queueing them up to the caller. if (!(m_send_state == SendState::READY && m_send_buffer.empty())) return false; // Construct contents (encoding message type + payload). std::vector contents; auto short_message_id = V2_MESSAGE_MAP(msg.m_type); if (short_message_id) { contents.resize(1 + msg.data.size()); contents[0] = *short_message_id; std::copy(msg.data.begin(), msg.data.end(), contents.begin() + 1); } else { // Initialize with zeroes, and then write the message type string starting at offset 1. // This means contents[0] and the unused positions in contents[1..13] remain 0x00. contents.resize(1 + CMessageHeader::COMMAND_SIZE + msg.data.size(), 0); std::copy(msg.m_type.begin(), msg.m_type.end(), contents.data() + 1); std::copy(msg.data.begin(), msg.data.end(), contents.begin() + 1 + CMessageHeader::COMMAND_SIZE); } // Construct ciphertext in send buffer. m_send_buffer.resize(contents.size() + BIP324Cipher::EXPANSION); m_cipher.Encrypt(MakeByteSpan(contents), {}, false, MakeWritableByteSpan(m_send_buffer)); m_send_type = msg.m_type; // Release memory ClearShrink(msg.data); return true; } Transport::BytesToSend V2Transport::GetBytesToSend(bool have_next_message) const noexcept { AssertLockNotHeld(m_send_mutex); LOCK(m_send_mutex); if (m_send_state == SendState::V1) return m_v1_fallback.GetBytesToSend(have_next_message); if (m_send_state == SendState::MAYBE_V1) Assume(m_send_buffer.empty()); Assume(m_send_pos <= m_send_buffer.size()); return { Span{m_send_buffer}.subspan(m_send_pos), // We only have more to send after the current m_send_buffer if there is a (next) // message to be sent, and we're capable of sending packets. */ have_next_message && m_send_state == SendState::READY, m_send_type }; } void V2Transport::MarkBytesSent(size_t bytes_sent) noexcept { AssertLockNotHeld(m_send_mutex); LOCK(m_send_mutex); if (m_send_state == SendState::V1) return m_v1_fallback.MarkBytesSent(bytes_sent); if (m_send_state == SendState::AWAITING_KEY && m_send_pos == 0 && bytes_sent > 0) { LogPrint(BCLog::NET, "start sending v2 handshake to peer=%d\n", m_nodeid); } m_send_pos += bytes_sent; Assume(m_send_pos <= m_send_buffer.size()); if (m_send_pos >= CMessageHeader::HEADER_SIZE) { m_sent_v1_header_worth = true; } // Wipe the buffer when everything is sent. if (m_send_pos == m_send_buffer.size()) { m_send_pos = 0; ClearShrink(m_send_buffer); } } bool V2Transport::ShouldReconnectV1() const noexcept { AssertLockNotHeld(m_send_mutex); AssertLockNotHeld(m_recv_mutex); // Only outgoing connections need reconnection. if (!m_initiating) return false; LOCK(m_recv_mutex); // We only reconnect in the very first state and when the receive buffer is empty. Together // these conditions imply nothing has been received so far. if (m_recv_state != RecvState::KEY) return false; if (!m_recv_buffer.empty()) return false; // Check if we've sent enough for the other side to disconnect us (if it was V1). LOCK(m_send_mutex); return m_sent_v1_header_worth; } size_t V2Transport::GetSendMemoryUsage() const noexcept { AssertLockNotHeld(m_send_mutex); LOCK(m_send_mutex); if (m_send_state == SendState::V1) return m_v1_fallback.GetSendMemoryUsage(); return sizeof(m_send_buffer) + memusage::DynamicUsage(m_send_buffer); } Transport::Info V2Transport::GetInfo() const noexcept { AssertLockNotHeld(m_recv_mutex); LOCK(m_recv_mutex); if (m_recv_state == RecvState::V1) return m_v1_fallback.GetInfo(); Transport::Info info; // Do not report v2 and session ID until the version packet has been received // and verified (confirming that the other side very likely has the same keys as us). if (m_recv_state != RecvState::KEY_MAYBE_V1 && m_recv_state != RecvState::KEY && m_recv_state != RecvState::GARB_GARBTERM && m_recv_state != RecvState::VERSION) { info.transport_type = TransportProtocolType::V2; info.session_id = uint256(MakeUCharSpan(m_cipher.GetSessionID())); } else { info.transport_type = TransportProtocolType::DETECTING; } return info; } std::pair CConnman::SocketSendData(CNode& node) const { auto it = node.vSendMsg.begin(); size_t nSentSize = 0; bool data_left{false}; //!< second return value (whether unsent data remains) std::optional expected_more; while (true) { if (it != node.vSendMsg.end()) { // If possible, move one message from the send queue to the transport. This fails when // there is an existing message still being sent, or (for v2 transports) when the // handshake has not yet completed. size_t memusage = it->GetMemoryUsage(); if (node.m_transport->SetMessageToSend(*it)) { // Update memory usage of send buffer (as *it will be deleted). node.m_send_memusage -= memusage; ++it; } } const auto& [data, more, msg_type] = node.m_transport->GetBytesToSend(it != node.vSendMsg.end()); // We rely on the 'more' value returned by GetBytesToSend to correctly predict whether more // bytes are still to be sent, to correctly set the MSG_MORE flag. As a sanity check, // verify that the previously returned 'more' was correct. if (expected_more.has_value()) Assume(!data.empty() == *expected_more); expected_more = more; data_left = !data.empty(); // will be overwritten on next loop if all of data gets sent int nBytes = 0; if (!data.empty()) { LOCK(node.m_sock_mutex); // There is no socket in case we've already disconnected, or in test cases without // real connections. In these cases, we bail out immediately and just leave things // in the send queue and transport. if (!node.m_sock) { break; } int flags = MSG_NOSIGNAL | MSG_DONTWAIT; #ifdef MSG_MORE if (more) { flags |= MSG_MORE; } #endif nBytes = node.m_sock->Send(reinterpret_cast(data.data()), data.size(), flags); } if (nBytes > 0) { node.m_last_send = GetTime(); node.nSendBytes += nBytes; // Notify transport that bytes have been processed. node.m_transport->MarkBytesSent(nBytes); // Update statistics per message type. if (!msg_type.empty()) { // don't report v2 handshake bytes for now node.AccountForSentBytes(msg_type, nBytes); } nSentSize += nBytes; if ((size_t)nBytes != data.size()) { // could not send full message; stop sending more break; } } else { if (nBytes < 0) { // error int nErr = WSAGetLastError(); if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) { LogPrint(BCLog::NET, "socket send error for peer=%d: %s\n", node.GetId(), NetworkErrorString(nErr)); node.CloseSocketDisconnect(); } } break; } } node.fPauseSend = node.m_send_memusage + node.m_transport->GetSendMemoryUsage() > nSendBufferMaxSize; if (it == node.vSendMsg.end()) { assert(node.m_send_memusage == 0); } node.vSendMsg.erase(node.vSendMsg.begin(), it); return {nSentSize, data_left}; } /** Try to find a connection to evict when the node is full. * Extreme care must be taken to avoid opening the node to attacker * triggered network partitioning. * The strategy used here is to protect a small number of peers * for each of several distinct characteristics which are difficult * to forge. In order to partition a node the attacker must be * simultaneously better at all of them than honest peers. */ bool CConnman::AttemptToEvictConnection() { std::vector vEvictionCandidates; { LOCK(m_nodes_mutex); for (const CNode* node : m_nodes) { if (node->fDisconnect) continue; NodeEvictionCandidate candidate{ .id = node->GetId(), .m_connected = node->m_connected, .m_min_ping_time = node->m_min_ping_time, .m_last_block_time = node->m_last_block_time, .m_last_tx_time = node->m_last_tx_time, .fRelevantServices = node->m_has_all_wanted_services, .m_relay_txs = node->m_relays_txs.load(), .fBloomFilter = node->m_bloom_filter_loaded.load(), .nKeyedNetGroup = node->nKeyedNetGroup, .prefer_evict = node->m_prefer_evict, .m_is_local = node->addr.IsLocal(), .m_network = node->ConnectedThroughNetwork(), .m_noban = node->HasPermission(NetPermissionFlags::NoBan), .m_conn_type = node->m_conn_type, }; vEvictionCandidates.push_back(candidate); } } const std::optional node_id_to_evict = SelectNodeToEvict(std::move(vEvictionCandidates)); if (!node_id_to_evict) { return false; } LOCK(m_nodes_mutex); for (CNode* pnode : m_nodes) { if (pnode->GetId() == *node_id_to_evict) { LogPrint(BCLog::NET, "selected %s connection for eviction peer=%d; disconnecting\n", pnode->ConnectionTypeAsString(), pnode->GetId()); pnode->fDisconnect = true; return true; } } return false; } void CConnman::AcceptConnection(const ListenSocket& hListenSocket) { struct sockaddr_storage sockaddr; socklen_t len = sizeof(sockaddr); auto sock = hListenSocket.sock->Accept((struct sockaddr*)&sockaddr, &len); CAddress addr; if (!sock) { const int nErr = WSAGetLastError(); if (nErr != WSAEWOULDBLOCK) { LogPrintf("socket error accept failed: %s\n", NetworkErrorString(nErr)); } return; } if (!addr.SetSockAddr((const struct sockaddr*)&sockaddr)) { LogPrintLevel(BCLog::NET, BCLog::Level::Warning, "Unknown socket family\n"); } else { addr = CAddress{MaybeFlipIPv6toCJDNS(addr), NODE_NONE}; } const CAddress addr_bind{MaybeFlipIPv6toCJDNS(GetBindAddress(*sock)), NODE_NONE}; NetPermissionFlags permission_flags = NetPermissionFlags::None; hListenSocket.AddSocketPermissionFlags(permission_flags); CreateNodeFromAcceptedSocket(std::move(sock), permission_flags, addr_bind, addr); } void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr&& sock, NetPermissionFlags permission_flags, const CAddress& addr_bind, const CAddress& addr) { int nInbound = 0; AddWhitelistPermissionFlags(permission_flags, addr); if (NetPermissions::HasFlag(permission_flags, NetPermissionFlags::Implicit)) { NetPermissions::ClearFlag(permission_flags, NetPermissionFlags::Implicit); if (gArgs.GetBoolArg("-whitelistforcerelay", DEFAULT_WHITELISTFORCERELAY)) NetPermissions::AddFlag(permission_flags, NetPermissionFlags::ForceRelay); if (gArgs.GetBoolArg("-whitelistrelay", DEFAULT_WHITELISTRELAY)) NetPermissions::AddFlag(permission_flags, NetPermissionFlags::Relay); NetPermissions::AddFlag(permission_flags, NetPermissionFlags::Mempool); NetPermissions::AddFlag(permission_flags, NetPermissionFlags::NoBan); } { LOCK(m_nodes_mutex); for (const CNode* pnode : m_nodes) { if (pnode->IsInboundConn()) nInbound++; } } if (!fNetworkActive) { LogPrint(BCLog::NET, "connection from %s dropped: not accepting new connections\n", addr.ToStringAddrPort()); return; } if (!sock->IsSelectable()) { LogPrintf("connection from %s dropped: non-selectable socket\n", addr.ToStringAddrPort()); return; } // According to the internet TCP_NODELAY is not carried into accepted sockets // on all platforms. Set it again here just to be sure. const int on{1}; if (sock->SetSockOpt(IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)) == SOCKET_ERROR) { LogPrint(BCLog::NET, "connection from %s: unable to set TCP_NODELAY, continuing anyway\n", addr.ToStringAddrPort()); } // Don't accept connections from banned peers. bool banned = m_banman && m_banman->IsBanned(addr); if (!NetPermissions::HasFlag(permission_flags, NetPermissionFlags::NoBan) && banned) { LogPrint(BCLog::NET, "connection from %s dropped (banned)\n", addr.ToStringAddrPort()); return; } // Only accept connections from discouraged peers if our inbound slots aren't (almost) full. bool discouraged = m_banman && m_banman->IsDiscouraged(addr); if (!NetPermissions::HasFlag(permission_flags, NetPermissionFlags::NoBan) && nInbound + 1 >= m_max_inbound && discouraged) { LogPrint(BCLog::NET, "connection from %s dropped (discouraged)\n", addr.ToStringAddrPort()); return; } if (nInbound >= m_max_inbound) { if (!AttemptToEvictConnection()) { // No connection to evict, disconnect the new connection LogPrint(BCLog::NET, "failed to find an eviction candidate - connection dropped (full)\n"); return; } } NodeId id = GetNewNodeId(); uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize(); ServiceFlags nodeServices = nLocalServices; if (NetPermissions::HasFlag(permission_flags, NetPermissionFlags::BloomFilter)) { nodeServices = static_cast(nodeServices | NODE_BLOOM); } const bool inbound_onion = std::find(m_onion_binds.begin(), m_onion_binds.end(), addr_bind) != m_onion_binds.end(); // The V2Transport transparently falls back to V1 behavior when an incoming V1 connection is // detected, so use it whenever we signal NODE_P2P_V2. const bool use_v2transport(nodeServices & NODE_P2P_V2); CNode* pnode = new CNode(id, std::move(sock), addr, CalculateKeyedNetGroup(addr), nonce, addr_bind, /*addrNameIn=*/"", ConnectionType::INBOUND, inbound_onion, CNodeOptions{ .permission_flags = permission_flags, .prefer_evict = discouraged, .recv_flood_size = nReceiveFloodSize, .use_v2transport = use_v2transport, }); pnode->AddRef(); m_msgproc->InitializeNode(*pnode, nodeServices); LogPrint(BCLog::NET, "connection from %s accepted\n", addr.ToStringAddrPort()); { LOCK(m_nodes_mutex); m_nodes.push_back(pnode); } // We received a new connection, harvest entropy from the time (and our peer count) RandAddEvent((uint32_t)id); } bool CConnman::AddConnection(const std::string& address, ConnectionType conn_type) { AssertLockNotHeld(m_unused_i2p_sessions_mutex); std::optional max_connections; switch (conn_type) { case ConnectionType::INBOUND: case ConnectionType::MANUAL: return false; case ConnectionType::OUTBOUND_FULL_RELAY: max_connections = m_max_outbound_full_relay; break; case ConnectionType::BLOCK_RELAY: max_connections = m_max_outbound_block_relay; break; // no limit for ADDR_FETCH because -seednode has no limit either case ConnectionType::ADDR_FETCH: break; // no limit for FEELER connections since they're short-lived case ConnectionType::FEELER: break; } // no default case, so the compiler can warn about missing cases // Count existing connections int existing_connections = WITH_LOCK(m_nodes_mutex, return std::count_if(m_nodes.begin(), m_nodes.end(), [conn_type](CNode* node) { return node->m_conn_type == conn_type; });); // Max connections of specified type already exist if (max_connections != std::nullopt && existing_connections >= max_connections) return false; // Max total outbound connections already exist CSemaphoreGrant grant(*semOutbound, true); if (!grant) return false; OpenNetworkConnection(CAddress(), false, std::move(grant), address.c_str(), conn_type, /*use_v2transport=*/false); return true; } void CConnman::DisconnectNodes() { AssertLockNotHeld(m_nodes_mutex); AssertLockNotHeld(m_reconnections_mutex); // Use a temporary variable to accumulate desired reconnections, so we don't need // m_reconnections_mutex while holding m_nodes_mutex. decltype(m_reconnections) reconnections_to_add; { LOCK(m_nodes_mutex); if (!fNetworkActive) { // Disconnect any connected nodes for (CNode* pnode : m_nodes) { if (!pnode->fDisconnect) { LogPrint(BCLog::NET, "Network not active, dropping peer=%d\n", pnode->GetId()); pnode->fDisconnect = true; } } } // Disconnect unused nodes std::vector nodes_copy = m_nodes; for (CNode* pnode : nodes_copy) { if (pnode->fDisconnect) { // remove from m_nodes m_nodes.erase(remove(m_nodes.begin(), m_nodes.end(), pnode), m_nodes.end()); // Add to reconnection list if appropriate. We don't reconnect right here, because // the creation of a connection is a blocking operation (up to several seconds), // and we don't want to hold up the socket handler thread for that long. if (pnode->m_transport->ShouldReconnectV1()) { reconnections_to_add.push_back({ .addr_connect = pnode->addr, .grant = std::move(pnode->grantOutbound), .destination = pnode->m_dest, .conn_type = pnode->m_conn_type, .use_v2transport = false}); LogPrint(BCLog::NET, "retrying with v1 transport protocol for peer=%d\n", pnode->GetId()); } // release outbound grant (if any) pnode->grantOutbound.Release(); // close socket and cleanup pnode->CloseSocketDisconnect(); // update connection count by network if (pnode->IsManualOrFullOutboundConn()) --m_network_conn_counts[pnode->addr.GetNetwork()]; // hold in disconnected pool until all refs are released pnode->Release(); m_nodes_disconnected.push_back(pnode); } } } { // Delete disconnected nodes std::list nodes_disconnected_copy = m_nodes_disconnected; for (CNode* pnode : nodes_disconnected_copy) { // Destroy the object only after other threads have stopped using it. if (pnode->GetRefCount() <= 0) { m_nodes_disconnected.remove(pnode); DeleteNode(pnode); } } } { // Move entries from reconnections_to_add to m_reconnections. LOCK(m_reconnections_mutex); m_reconnections.splice(m_reconnections.end(), std::move(reconnections_to_add)); } } void CConnman::NotifyNumConnectionsChanged() { size_t nodes_size; { LOCK(m_nodes_mutex); nodes_size = m_nodes.size(); } if(nodes_size != nPrevNodeCount) { nPrevNodeCount = nodes_size; if (m_client_interface) { m_client_interface->NotifyNumConnectionsChanged(nodes_size); } } } bool CConnman::ShouldRunInactivityChecks(const CNode& node, std::chrono::seconds now) const { return node.m_connected + m_peer_connect_timeout < now; } bool CConnman::InactivityCheck(const CNode& node) const { // Tests that see disconnects after using mocktime can start nodes with a // large timeout. For example, -peertimeout=999999999. const auto now{GetTime()}; const auto last_send{node.m_last_send.load()}; const auto last_recv{node.m_last_recv.load()}; if (!ShouldRunInactivityChecks(node, now)) return false; if (last_recv.count() == 0 || last_send.count() == 0) { LogPrint(BCLog::NET, "socket no message in first %i seconds, %d %d peer=%d\n", count_seconds(m_peer_connect_timeout), last_recv.count() != 0, last_send.count() != 0, node.GetId()); return true; } if (now > last_send + TIMEOUT_INTERVAL) { LogPrint(BCLog::NET, "socket sending timeout: %is peer=%d\n", count_seconds(now - last_send), node.GetId()); return true; } if (now > last_recv + TIMEOUT_INTERVAL) { LogPrint(BCLog::NET, "socket receive timeout: %is peer=%d\n", count_seconds(now - last_recv), node.GetId()); return true; } if (!node.fSuccessfullyConnected) { LogPrint(BCLog::NET, "version handshake timeout peer=%d\n", node.GetId()); return true; } return false; } Sock::EventsPerSock CConnman::GenerateWaitSockets(Span nodes) { Sock::EventsPerSock events_per_sock; for (const ListenSocket& hListenSocket : vhListenSocket) { events_per_sock.emplace(hListenSocket.sock, Sock::Events{Sock::RECV}); } for (CNode* pnode : nodes) { bool select_recv = !pnode->fPauseRecv; bool select_send; { LOCK(pnode->cs_vSend); // Sending is possible if either there are bytes to send right now, or if there will be // once a potential message from vSendMsg is handed to the transport. GetBytesToSend // determines both of these in a single call. const auto& [to_send, more, _msg_type] = pnode->m_transport->GetBytesToSend(!pnode->vSendMsg.empty()); select_send = !to_send.empty() || more; } if (!select_recv && !select_send) continue; LOCK(pnode->m_sock_mutex); if (pnode->m_sock) { Sock::Event event = (select_send ? Sock::SEND : 0) | (select_recv ? Sock::RECV : 0); events_per_sock.emplace(pnode->m_sock, Sock::Events{event}); } } return events_per_sock; } void CConnman::SocketHandler() { AssertLockNotHeld(m_total_bytes_sent_mutex); Sock::EventsPerSock events_per_sock; { const NodesSnapshot snap{*this, /*shuffle=*/false}; const auto timeout = std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS); // Check for the readiness of the already connected sockets and the // listening sockets in one call ("readiness" as in poll(2) or // select(2)). If none are ready, wait for a short while and return // empty sets. events_per_sock = GenerateWaitSockets(snap.Nodes()); if (events_per_sock.empty() || !events_per_sock.begin()->first->WaitMany(timeout, events_per_sock)) { interruptNet.sleep_for(timeout); } // Service (send/receive) each of the already connected nodes. SocketHandlerConnected(snap.Nodes(), events_per_sock); } // Accept new connections from listening sockets. SocketHandlerListening(events_per_sock); } void CConnman::SocketHandlerConnected(const std::vector& nodes, const Sock::EventsPerSock& events_per_sock) { AssertLockNotHeld(m_total_bytes_sent_mutex); for (CNode* pnode : nodes) { if (interruptNet) return; // // Receive // bool recvSet = false; bool sendSet = false; bool errorSet = false; { LOCK(pnode->m_sock_mutex); if (!pnode->m_sock) { continue; } const auto it = events_per_sock.find(pnode->m_sock); if (it != events_per_sock.end()) { recvSet = it->second.occurred & Sock::RECV; sendSet = it->second.occurred & Sock::SEND; errorSet = it->second.occurred & Sock::ERR; } } if (sendSet) { // Send data auto [bytes_sent, data_left] = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode)); if (bytes_sent) { RecordBytesSent(bytes_sent); // If both receiving and (non-optimistic) sending were possible, we first attempt // sending. If that succeeds, but does not fully drain the send queue, do not // attempt to receive. This avoids needlessly queueing data if the remote peer // is slow at receiving data, by means of TCP flow control. We only do this when // sending actually succeeded to make sure progress is always made; otherwise a // deadlock would be possible when both sides have data to send, but neither is // receiving. if (data_left) recvSet = false; } } if (recvSet || errorSet) { // typical socket buffer is 8K-64K uint8_t pchBuf[0x10000]; int nBytes = 0; { LOCK(pnode->m_sock_mutex); if (!pnode->m_sock) { continue; } nBytes = pnode->m_sock->Recv(pchBuf, sizeof(pchBuf), MSG_DONTWAIT); } if (nBytes > 0) { bool notify = false; if (!pnode->ReceiveMsgBytes({pchBuf, (size_t)nBytes}, notify)) { pnode->CloseSocketDisconnect(); } RecordBytesRecv(nBytes); if (notify) { pnode->MarkReceivedMsgsForProcessing(); WakeMessageHandler(); } } else if (nBytes == 0) { // socket closed gracefully if (!pnode->fDisconnect) { LogPrint(BCLog::NET, "socket closed for peer=%d\n", pnode->GetId()); } pnode->CloseSocketDisconnect(); } else if (nBytes < 0) { // error int nErr = WSAGetLastError(); if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) { if (!pnode->fDisconnect) { LogPrint(BCLog::NET, "socket recv error for peer=%d: %s\n", pnode->GetId(), NetworkErrorString(nErr)); } pnode->CloseSocketDisconnect(); } } } if (InactivityCheck(*pnode)) pnode->fDisconnect = true; } } void CConnman::SocketHandlerListening(const Sock::EventsPerSock& events_per_sock) { for (const ListenSocket& listen_socket : vhListenSocket) { if (interruptNet) { return; } const auto it = events_per_sock.find(listen_socket.sock); if (it != events_per_sock.end() && it->second.occurred & Sock::RECV) { AcceptConnection(listen_socket); } } } void CConnman::ThreadSocketHandler() { AssertLockNotHeld(m_total_bytes_sent_mutex); while (!interruptNet) { DisconnectNodes(); NotifyNumConnectionsChanged(); SocketHandler(); } } void CConnman::WakeMessageHandler() { { LOCK(mutexMsgProc); fMsgProcWake = true; } condMsgProc.notify_one(); } void CConnman::ThreadDNSAddressSeed() { FastRandomContext rng; std::vector seeds = m_params.DNSSeeds(); Shuffle(seeds.begin(), seeds.end(), rng); int seeds_right_now = 0; // Number of seeds left before testing if we have enough connections int found = 0; if (gArgs.GetBoolArg("-forcednsseed", DEFAULT_FORCEDNSSEED)) { // When -forcednsseed is provided, query all. seeds_right_now = seeds.size(); } else if (addrman.Size() == 0) { // If we have no known peers, query all. // This will occur on the first run, or if peers.dat has been // deleted. seeds_right_now = seeds.size(); } // goal: only query DNS seed if address need is acute // * If we have a reasonable number of peers in addrman, spend // some time trying them first. This improves user privacy by // creating fewer identifying DNS requests, reduces trust by // giving seeds less influence on the network topology, and // reduces traffic to the seeds. // * When querying DNS seeds query a few at once, this ensures // that we don't give DNS seeds the ability to eclipse nodes // that query them. // * If we continue having problems, eventually query all the // DNS seeds, and if that fails too, also try the fixed seeds. // (done in ThreadOpenConnections) const std::chrono::seconds seeds_wait_time = (addrman.Size() >= DNSSEEDS_DELAY_PEER_THRESHOLD ? DNSSEEDS_DELAY_MANY_PEERS : DNSSEEDS_DELAY_FEW_PEERS); for (const std::string& seed : seeds) { if (seeds_right_now == 0) { seeds_right_now += DNSSEEDS_TO_QUERY_AT_ONCE; if (addrman.Size() > 0) { LogPrintf("Waiting %d seconds before querying DNS seeds.\n", seeds_wait_time.count()); std::chrono::seconds to_wait = seeds_wait_time; while (to_wait.count() > 0) { // if sleeping for the MANY_PEERS interval, wake up // early to see if we have enough peers and can stop // this thread entirely freeing up its resources std::chrono::seconds w = std::min(DNSSEEDS_DELAY_FEW_PEERS, to_wait); if (!interruptNet.sleep_for(w)) return; to_wait -= w; int nRelevant = 0; { LOCK(m_nodes_mutex); for (const CNode* pnode : m_nodes) { if (pnode->fSuccessfullyConnected && pnode->IsFullOutboundConn()) ++nRelevant; } } if (nRelevant >= 2) { if (found > 0) { LogPrintf("%d addresses found from DNS seeds\n", found); LogPrintf("P2P peers available. Finished DNS seeding.\n"); } else { LogPrintf("P2P peers available. Skipped DNS seeding.\n"); } return; } } } } if (interruptNet) return; // hold off on querying seeds if P2P network deactivated if (!fNetworkActive) { LogPrintf("Waiting for network to be reactivated before querying DNS seeds.\n"); do { if (!interruptNet.sleep_for(std::chrono::seconds{1})) return; } while (!fNetworkActive); } LogPrintf("Loading addresses from DNS seed %s\n", seed); // If -proxy is in use, we make an ADDR_FETCH connection to the DNS resolved peer address // for the base dns seed domain in chainparams if (HaveNameProxy()) { AddAddrFetch(seed); } else { std::vector vAdd; ServiceFlags requiredServiceBits = GetDesirableServiceFlags(NODE_NONE); std::string host = strprintf("x%x.%s", requiredServiceBits, seed); CNetAddr resolveSource; if (!resolveSource.SetInternal(host)) { continue; } unsigned int nMaxIPs = 256; // Limits number of IPs learned from a DNS seed const auto addresses{LookupHost(host, nMaxIPs, true)}; if (!addresses.empty()) { for (const CNetAddr& ip : addresses) { CAddress addr = CAddress(CService(ip, m_params.GetDefaultPort()), requiredServiceBits); addr.nTime = rng.rand_uniform_delay(Now() - 3 * 24h, -4 * 24h); // use a random age between 3 and 7 days old vAdd.push_back(addr); found++; } addrman.Add(vAdd, resolveSource); } else { // If the seed does not support a subdomain with our desired service bits, // we make an ADDR_FETCH connection to the DNS resolved peer address for the // base dns seed domain in chainparams AddAddrFetch(seed); } } --seeds_right_now; } LogPrintf("%d addresses found from DNS seeds\n", found); } void CConnman::DumpAddresses() { const auto start{SteadyClock::now()}; DumpPeerAddresses(::gArgs, addrman); LogPrint(BCLog::NET, "Flushed %d addresses to peers.dat %dms\n", addrman.Size(), Ticks(SteadyClock::now() - start)); } void CConnman::ProcessAddrFetch() { AssertLockNotHeld(m_unused_i2p_sessions_mutex); std::string strDest; { LOCK(m_addr_fetches_mutex); if (m_addr_fetches.empty()) return; strDest = m_addr_fetches.front(); m_addr_fetches.pop_front(); } CAddress addr; CSemaphoreGrant grant(*semOutbound, /*fTry=*/true); if (grant) { OpenNetworkConnection(addr, false, std::move(grant), strDest.c_str(), ConnectionType::ADDR_FETCH, /*use_v2transport=*/false); } } bool CConnman::GetTryNewOutboundPeer() const { return m_try_another_outbound_peer; } void CConnman::SetTryNewOutboundPeer(bool flag) { m_try_another_outbound_peer = flag; LogPrint(BCLog::NET, "setting try another outbound peer=%s\n", flag ? "true" : "false"); } void CConnman::StartExtraBlockRelayPeers() { LogPrint(BCLog::NET, "enabling extra block-relay-only peers\n"); m_start_extra_block_relay_peers = true; } // Return the number of peers we have over our outbound connection limit // Exclude peers that are marked for disconnect, or are going to be // disconnected soon (eg ADDR_FETCH and FEELER) // Also exclude peers that haven't finished initial connection handshake yet // (so that we don't decide we're over our desired connection limit, and then // evict some peer that has finished the handshake) int CConnman::GetExtraFullOutboundCount() const { int full_outbound_peers = 0; { LOCK(m_nodes_mutex); for (const CNode* pnode : m_nodes) { if (pnode->fSuccessfullyConnected && !pnode->fDisconnect && pnode->IsFullOutboundConn()) { ++full_outbound_peers; } } } return std::max(full_outbound_peers - m_max_outbound_full_relay, 0); } int CConnman::GetExtraBlockRelayCount() const { int block_relay_peers = 0; { LOCK(m_nodes_mutex); for (const CNode* pnode : m_nodes) { if (pnode->fSuccessfullyConnected && !pnode->fDisconnect && pnode->IsBlockOnlyConn()) { ++block_relay_peers; } } } return std::max(block_relay_peers - m_max_outbound_block_relay, 0); } std::unordered_set CConnman::GetReachableEmptyNetworks() const { std::unordered_set networks{}; for (int n = 0; n < NET_MAX; n++) { enum Network net = (enum Network)n; if (net == NET_UNROUTABLE || net == NET_INTERNAL) continue; if (g_reachable_nets.Contains(net) && addrman.Size(net, std::nullopt) == 0) { networks.insert(net); } } return networks; } bool CConnman::MultipleManualOrFullOutboundConns(Network net) const { AssertLockHeld(m_nodes_mutex); return m_network_conn_counts[net] > 1; } bool CConnman::MaybePickPreferredNetwork(std::optional& network) { std::array nets{NET_IPV4, NET_IPV6, NET_ONION, NET_I2P, NET_CJDNS}; Shuffle(nets.begin(), nets.end(), FastRandomContext()); LOCK(m_nodes_mutex); for (const auto net : nets) { if (g_reachable_nets.Contains(net) && m_network_conn_counts[net] == 0 && addrman.Size(net) != 0) { network = net; return true; } } return false; } void CConnman::ThreadOpenConnections(const std::vector connect) { AssertLockNotHeld(m_unused_i2p_sessions_mutex); AssertLockNotHeld(m_reconnections_mutex); FastRandomContext rng; // Connect to specific addresses if (!connect.empty()) { for (int64_t nLoop = 0;; nLoop++) { for (const std::string& strAddr : connect) { CAddress addr(CService(), NODE_NONE); OpenNetworkConnection(addr, false, {}, strAddr.c_str(), ConnectionType::MANUAL, /*use_v2transport=*/false); for (int i = 0; i < 10 && i < nLoop; i++) { if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) return; } } if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) return; } } // Initiate network connections auto start = GetTime(); // Minimum time before next feeler connection (in microseconds). auto next_feeler = GetExponentialRand(start, FEELER_INTERVAL); auto next_extra_block_relay = GetExponentialRand(start, EXTRA_BLOCK_RELAY_ONLY_PEER_INTERVAL); auto next_extra_network_peer{GetExponentialRand(start, EXTRA_NETWORK_PEER_INTERVAL)}; const bool dnsseed = gArgs.GetBoolArg("-dnsseed", DEFAULT_DNSSEED); bool add_fixed_seeds = gArgs.GetBoolArg("-fixedseeds", DEFAULT_FIXEDSEEDS); const bool use_seednodes{gArgs.IsArgSet("-seednode")}; if (!add_fixed_seeds) { LogPrintf("Fixed seeds are disabled\n"); } while (!interruptNet) { ProcessAddrFetch(); if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) return; PerformReconnections(); CSemaphoreGrant grant(*semOutbound); if (interruptNet) return; const std::unordered_set fixed_seed_networks{GetReachableEmptyNetworks()}; if (add_fixed_seeds && !fixed_seed_networks.empty()) { // When the node starts with an empty peers.dat, there are a few other sources of peers before // we fallback on to fixed seeds: -dnsseed, -seednode, -addnode // If none of those are available, we fallback on to fixed seeds immediately, else we allow // 60 seconds for any of those sources to populate addrman. bool add_fixed_seeds_now = false; // It is cheapest to check if enough time has passed first. if (GetTime() > start + std::chrono::minutes{1}) { add_fixed_seeds_now = true; LogPrintf("Adding fixed seeds as 60 seconds have passed and addrman is empty for at least one reachable network\n"); } // Perform cheap checks before locking a mutex. else if (!dnsseed && !use_seednodes) { LOCK(m_added_nodes_mutex); if (m_added_node_params.empty()) { add_fixed_seeds_now = true; LogPrintf("Adding fixed seeds as -dnsseed=0 (or IPv4/IPv6 connections are disabled via -onlynet) and neither -addnode nor -seednode are provided\n"); } } if (add_fixed_seeds_now) { std::vector seed_addrs{ConvertSeeds(m_params.FixedSeeds())}; // We will not make outgoing connections to peers that are unreachable // (e.g. because of -onlynet configuration). // Therefore, we do not add them to addrman in the first place. // In case previously unreachable networks become reachable // (e.g. in case of -onlynet changes by the user), fixed seeds will // be loaded only for networks for which we have no addresses. seed_addrs.erase(std::remove_if(seed_addrs.begin(), seed_addrs.end(), [&fixed_seed_networks](const CAddress& addr) { return fixed_seed_networks.count(addr.GetNetwork()) == 0; }), seed_addrs.end()); CNetAddr local; local.SetInternal("fixedseeds"); addrman.Add(seed_addrs, local); add_fixed_seeds = false; LogPrintf("Added %d fixed seeds from reachable networks.\n", seed_addrs.size()); } } // // Choose an address to connect to based on most recently seen // CAddress addrConnect; // Only connect out to one peer per ipv4/ipv6 network group (/16 for IPv4). int nOutboundFullRelay = 0; int nOutboundBlockRelay = 0; int outbound_privacy_network_peers = 0; std::set> outbound_ipv46_peer_netgroups; { LOCK(m_nodes_mutex); for (const CNode* pnode : m_nodes) { if (pnode->IsFullOutboundConn()) nOutboundFullRelay++; if (pnode->IsBlockOnlyConn()) nOutboundBlockRelay++; // Make sure our persistent outbound slots to ipv4/ipv6 peers belong to different netgroups. switch (pnode->m_conn_type) { // We currently don't take inbound connections into account. Since they are // free to make, an attacker could make them to prevent us from connecting to // certain peers. case ConnectionType::INBOUND: // Short-lived outbound connections should not affect how we select outbound // peers from addrman. case ConnectionType::ADDR_FETCH: case ConnectionType::FEELER: break; case ConnectionType::MANUAL: case ConnectionType::OUTBOUND_FULL_RELAY: case ConnectionType::BLOCK_RELAY: const CAddress address{pnode->addr}; if (address.IsTor() || address.IsI2P() || address.IsCJDNS()) { // Since our addrman-groups for these networks are // random, without relation to the route we // take to connect to these peers or to the // difficulty in obtaining addresses with diverse // groups, we don't worry about diversity with // respect to our addrman groups when connecting to // these networks. ++outbound_privacy_network_peers; } else { outbound_ipv46_peer_netgroups.insert(m_netgroupman.GetGroup(address)); } } // no default case, so the compiler can warn about missing cases } } ConnectionType conn_type = ConnectionType::OUTBOUND_FULL_RELAY; auto now = GetTime(); bool anchor = false; bool fFeeler = false; std::optional preferred_net; // Determine what type of connection to open. Opening // BLOCK_RELAY connections to addresses from anchors.dat gets the highest // priority. Then we open OUTBOUND_FULL_RELAY priority until we // meet our full-relay capacity. Then we open BLOCK_RELAY connection // until we hit our block-relay-only peer limit. // GetTryNewOutboundPeer() gets set when a stale tip is detected, so we // try opening an additional OUTBOUND_FULL_RELAY connection. If none of // these conditions are met, check to see if it's time to try an extra // block-relay-only peer (to confirm our tip is current, see below) or the next_feeler // timer to decide if we should open a FEELER. if (!m_anchors.empty() && (nOutboundBlockRelay < m_max_outbound_block_relay)) { conn_type = ConnectionType::BLOCK_RELAY; anchor = true; } else if (nOutboundFullRelay < m_max_outbound_full_relay) { // OUTBOUND_FULL_RELAY } else if (nOutboundBlockRelay < m_max_outbound_block_relay) { conn_type = ConnectionType::BLOCK_RELAY; } else if (GetTryNewOutboundPeer()) { // OUTBOUND_FULL_RELAY } else if (now > next_extra_block_relay && m_start_extra_block_relay_peers) { // Periodically connect to a peer (using regular outbound selection // methodology from addrman) and stay connected long enough to sync // headers, but not much else. // // Then disconnect the peer, if we haven't learned anything new. // // The idea is to make eclipse attacks very difficult to pull off, // because every few minutes we're finding a new peer to learn headers // from. // // This is similar to the logic for trying extra outbound (full-relay) // peers, except: // - we do this all the time on an exponential timer, rather than just when // our tip is stale // - we potentially disconnect our next-youngest block-relay-only peer, if our // newest block-relay-only peer delivers a block more recently. // See the eviction logic in net_processing.cpp. // // Because we can promote these connections to block-relay-only // connections, they do not get their own ConnectionType enum // (similar to how we deal with extra outbound peers). next_extra_block_relay = GetExponentialRand(now, EXTRA_BLOCK_RELAY_ONLY_PEER_INTERVAL); conn_type = ConnectionType::BLOCK_RELAY; } else if (now > next_feeler) { next_feeler = GetExponentialRand(now, FEELER_INTERVAL); conn_type = ConnectionType::FEELER; fFeeler = true; } else if (nOutboundFullRelay == m_max_outbound_full_relay && m_max_outbound_full_relay == MAX_OUTBOUND_FULL_RELAY_CONNECTIONS && now > next_extra_network_peer && MaybePickPreferredNetwork(preferred_net)) { // Full outbound connection management: Attempt to get at least one // outbound peer from each reachable network by making extra connections // and then protecting "only" peers from a network during outbound eviction. // This is not attempted if the user changed -maxconnections to a value // so low that less than MAX_OUTBOUND_FULL_RELAY_CONNECTIONS are made, // to prevent interactions with otherwise protected outbound peers. next_extra_network_peer = GetExponentialRand(now, EXTRA_NETWORK_PEER_INTERVAL); } else { // skip to next iteration of while loop continue; } addrman.ResolveCollisions(); const auto current_time{NodeClock::now()}; int nTries = 0; while (!interruptNet) { if (anchor && !m_anchors.empty()) { const CAddress addr = m_anchors.back(); m_anchors.pop_back(); if (!addr.IsValid() || IsLocal(addr) || !g_reachable_nets.Contains(addr) || !HasAllDesirableServiceFlags(addr.nServices) || outbound_ipv46_peer_netgroups.count(m_netgroupman.GetGroup(addr))) continue; addrConnect = addr; LogPrint(BCLog::NET, "Trying to make an anchor connection to %s\n", addrConnect.ToStringAddrPort()); break; } // If we didn't find an appropriate destination after trying 100 addresses fetched from addrman, // stop this loop, and let the outer loop run again (which sleeps, adds seed nodes, recalculates // already-connected network ranges, ...) before trying new addrman addresses. nTries++; if (nTries > 100) break; CAddress addr; NodeSeconds addr_last_try{0s}; if (fFeeler) { // First, try to get a tried table collision address. This returns // an empty (invalid) address if there are no collisions to try. std::tie(addr, addr_last_try) = addrman.SelectTriedCollision(); if (!addr.IsValid()) { // No tried table collisions. Select a new table address // for our feeler. std::tie(addr, addr_last_try) = addrman.Select(true); } else if (AlreadyConnectedToAddress(addr)) { // If test-before-evict logic would have us connect to a // peer that we're already connected to, just mark that // address as Good(). We won't be able to initiate the // connection anyway, so this avoids inadvertently evicting // a currently-connected peer. addrman.Good(addr); // Select a new table address for our feeler instead. std::tie(addr, addr_last_try) = addrman.Select(true); } } else { // Not a feeler // If preferred_net has a value set, pick an extra outbound // peer from that network. The eviction logic in net_processing // ensures that a peer from another network will be evicted. std::tie(addr, addr_last_try) = addrman.Select(false, preferred_net); } // Require outbound IPv4/IPv6 connections, other than feelers, to be to distinct network groups if (!fFeeler && outbound_ipv46_peer_netgroups.count(m_netgroupman.GetGroup(addr))) { continue; } // if we selected an invalid or local address, restart if (!addr.IsValid() || IsLocal(addr)) { break; } if (!g_reachable_nets.Contains(addr)) { continue; } // only consider very recently tried nodes after 30 failed attempts if (current_time - addr_last_try < 10min && nTries < 30) { continue; } // for non-feelers, require all the services we'll want, // for feelers, only require they be a full node (only because most // SPV clients don't have a good address DB available) if (!fFeeler && !HasAllDesirableServiceFlags(addr.nServices)) { continue; } else if (fFeeler && !MayHaveUsefulAddressDB(addr.nServices)) { continue; } // Do not connect to bad ports, unless 50 invalid addresses have been selected already. if (nTries < 50 && (addr.IsIPv4() || addr.IsIPv6()) && IsBadPort(addr.GetPort())) { continue; } addrConnect = addr; break; } if (addrConnect.IsValid()) { if (fFeeler) { // Add small amount of random noise before connection to avoid synchronization. if (!interruptNet.sleep_for(rng.rand_uniform_duration(FEELER_SLEEP_WINDOW))) { return; } LogPrint(BCLog::NET, "Making feeler connection to %s\n", addrConnect.ToStringAddrPort()); } if (preferred_net != std::nullopt) LogPrint(BCLog::NET, "Making network specific connection to %s on %s.\n", addrConnect.ToStringAddrPort(), GetNetworkName(preferred_net.value())); // Record addrman failure attempts when node has at least 2 persistent outbound connections to peers with // different netgroups in ipv4/ipv6 networks + all peers in Tor/I2P/CJDNS networks. // Don't record addrman failure attempts when node is offline. This can be identified since all local // network connections (if any) belong in the same netgroup, and the size of `outbound_ipv46_peer_netgroups` would only be 1. const bool count_failures{((int)outbound_ipv46_peer_netgroups.size() + outbound_privacy_network_peers) >= std::min(m_max_automatic_connections - 1, 2)}; // Use BIP324 transport when both us and them have NODE_V2_P2P set. const bool use_v2transport(addrConnect.nServices & GetLocalServices() & NODE_P2P_V2); OpenNetworkConnection(addrConnect, count_failures, std::move(grant), /*strDest=*/nullptr, conn_type, use_v2transport); } } } std::vector CConnman::GetCurrentBlockRelayOnlyConns() const { std::vector ret; LOCK(m_nodes_mutex); for (const CNode* pnode : m_nodes) { if (pnode->IsBlockOnlyConn()) { ret.push_back(pnode->addr); } } return ret; } std::vector CConnman::GetAddedNodeInfo() const { std::vector ret; std::list lAddresses(0); { LOCK(m_added_nodes_mutex); ret.reserve(m_added_node_params.size()); std::copy(m_added_node_params.cbegin(), m_added_node_params.cend(), std::back_inserter(lAddresses)); } // Build a map of all already connected addresses (by IP:port and by name) to inbound/outbound and resolved CService std::map mapConnected; std::map> mapConnectedByName; { LOCK(m_nodes_mutex); for (const CNode* pnode : m_nodes) { if (pnode->addr.IsValid()) { mapConnected[pnode->addr] = pnode->IsInboundConn(); } std::string addrName{pnode->m_addr_name}; if (!addrName.empty()) { mapConnectedByName[std::move(addrName)] = std::make_pair(pnode->IsInboundConn(), static_cast(pnode->addr)); } } } for (const auto& addr : lAddresses) { CService service(LookupNumeric(addr.m_added_node, GetDefaultPort(addr.m_added_node))); AddedNodeInfo addedNode{addr, CService(), false, false}; if (service.IsValid()) { // strAddNode is an IP:port auto it = mapConnected.find(service); if (it != mapConnected.end()) { addedNode.resolvedAddress = service; addedNode.fConnected = true; addedNode.fInbound = it->second; } } else { // strAddNode is a name auto it = mapConnectedByName.find(addr.m_added_node); if (it != mapConnectedByName.end()) { addedNode.resolvedAddress = it->second.second; addedNode.fConnected = true; addedNode.fInbound = it->second.first; } } ret.emplace_back(std::move(addedNode)); } return ret; } void CConnman::ThreadOpenAddedConnections() { AssertLockNotHeld(m_unused_i2p_sessions_mutex); AssertLockNotHeld(m_reconnections_mutex); while (true) { CSemaphoreGrant grant(*semAddnode); std::vector vInfo = GetAddedNodeInfo(); bool tried = false; for (const AddedNodeInfo& info : vInfo) { if (!info.fConnected) { if (!grant) { // If we've used up our semaphore and need a new one, let's not wait here since while we are waiting // the addednodeinfo state might change. break; } tried = true; CAddress addr(CService(), NODE_NONE); OpenNetworkConnection(addr, false, std::move(grant), info.m_params.m_added_node.c_str(), ConnectionType::MANUAL, info.m_params.m_use_v2transport); if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) return; grant = CSemaphoreGrant(*semAddnode, /*fTry=*/true); } } // Retry every 60 seconds if a connection was attempted, otherwise two seconds if (!interruptNet.sleep_for(std::chrono::seconds(tried ? 60 : 2))) return; // See if any reconnections are desired. PerformReconnections(); } } // if successful, this moves the passed grant to the constructed node void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant&& grant_outbound, const char *pszDest, ConnectionType conn_type, bool use_v2transport) { AssertLockNotHeld(m_unused_i2p_sessions_mutex); assert(conn_type != ConnectionType::INBOUND); // // Initiate outbound network connection // if (interruptNet) { return; } if (!fNetworkActive) { return; } if (!pszDest) { bool banned_or_discouraged = m_banman && (m_banman->IsDiscouraged(addrConnect) || m_banman->IsBanned(addrConnect)); if (IsLocal(addrConnect) || banned_or_discouraged || AlreadyConnectedToAddress(addrConnect)) { return; } } else if (FindNode(std::string(pszDest))) return; CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure, conn_type, use_v2transport); if (!pnode) return; pnode->grantOutbound = std::move(grant_outbound); m_msgproc->InitializeNode(*pnode, nLocalServices); { LOCK(m_nodes_mutex); m_nodes.push_back(pnode); // update connection count by network if (pnode->IsManualOrFullOutboundConn()) ++m_network_conn_counts[pnode->addr.GetNetwork()]; } } Mutex NetEventsInterface::g_msgproc_mutex; void CConnman::ThreadMessageHandler() { LOCK(NetEventsInterface::g_msgproc_mutex); while (!flagInterruptMsgProc) { bool fMoreWork = false; { // Randomize the order in which we process messages from/to our peers. // This prevents attacks in which an attacker exploits having multiple // consecutive connections in the m_nodes list. const NodesSnapshot snap{*this, /*shuffle=*/true}; for (CNode* pnode : snap.Nodes()) { if (pnode->fDisconnect) continue; // Receive messages bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, flagInterruptMsgProc); fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); if (flagInterruptMsgProc) return; // Send messages m_msgproc->SendMessages(pnode); if (flagInterruptMsgProc) return; } } WAIT_LOCK(mutexMsgProc, lock); if (!fMoreWork) { condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this]() EXCLUSIVE_LOCKS_REQUIRED(mutexMsgProc) { return fMsgProcWake; }); } fMsgProcWake = false; } } void CConnman::ThreadI2PAcceptIncoming() { static constexpr auto err_wait_begin = 1s; static constexpr auto err_wait_cap = 5min; auto err_wait = err_wait_begin; bool advertising_listen_addr = false; i2p::Connection conn; auto SleepOnFailure = [&]() { interruptNet.sleep_for(err_wait); if (err_wait < err_wait_cap) { err_wait += 1s; } }; while (!interruptNet) { if (!m_i2p_sam_session->Listen(conn)) { if (advertising_listen_addr && conn.me.IsValid()) { RemoveLocal(conn.me); advertising_listen_addr = false; } SleepOnFailure(); continue; } if (!advertising_listen_addr) { AddLocal(conn.me, LOCAL_MANUAL); advertising_listen_addr = true; } if (!m_i2p_sam_session->Accept(conn)) { SleepOnFailure(); continue; } CreateNodeFromAcceptedSocket(std::move(conn.sock), NetPermissionFlags::None, CAddress{conn.me, NODE_NONE}, CAddress{conn.peer, NODE_NONE}); err_wait = err_wait_begin; } } bool CConnman::BindListenPort(const CService& addrBind, bilingual_str& strError, NetPermissionFlags permissions) { int nOne = 1; // Create socket for listening for incoming connections struct sockaddr_storage sockaddr; socklen_t len = sizeof(sockaddr); if (!addrBind.GetSockAddr((struct sockaddr*)&sockaddr, &len)) { strError = strprintf(Untranslated("Bind address family for %s not supported"), addrBind.ToStringAddrPort()); LogPrintLevel(BCLog::NET, BCLog::Level::Error, "%s\n", strError.original); return false; } std::unique_ptr sock = CreateSock(addrBind); if (!sock) { strError = strprintf(Untranslated("Couldn't open socket for incoming connections (socket returned error %s)"), NetworkErrorString(WSAGetLastError())); LogPrintLevel(BCLog::NET, BCLog::Level::Error, "%s\n", strError.original); return false; } // Allow binding if the port is still in TIME_WAIT state after // the program was closed and restarted. if (sock->SetSockOpt(SOL_SOCKET, SO_REUSEADDR, (sockopt_arg_type)&nOne, sizeof(int)) == SOCKET_ERROR) { strError = strprintf(Untranslated("Error setting SO_REUSEADDR on socket: %s, continuing anyway"), NetworkErrorString(WSAGetLastError())); LogPrintf("%s\n", strError.original); } // some systems don't have IPV6_V6ONLY but are always v6only; others do have the option // and enable it by default or not. Try to enable it, if possible. if (addrBind.IsIPv6()) { #ifdef IPV6_V6ONLY if (sock->SetSockOpt(IPPROTO_IPV6, IPV6_V6ONLY, (sockopt_arg_type)&nOne, sizeof(int)) == SOCKET_ERROR) { strError = strprintf(Untranslated("Error setting IPV6_V6ONLY on socket: %s, continuing anyway"), NetworkErrorString(WSAGetLastError())); LogPrintf("%s\n", strError.original); } #endif #ifdef WIN32 int nProtLevel = PROTECTION_LEVEL_UNRESTRICTED; if (sock->SetSockOpt(IPPROTO_IPV6, IPV6_PROTECTION_LEVEL, (const char*)&nProtLevel, sizeof(int)) == SOCKET_ERROR) { strError = strprintf(Untranslated("Error setting IPV6_PROTECTION_LEVEL on socket: %s, continuing anyway"), NetworkErrorString(WSAGetLastError())); LogPrintf("%s\n", strError.original); } #endif } if (sock->Bind(reinterpret_cast(&sockaddr), len) == SOCKET_ERROR) { int nErr = WSAGetLastError(); if (nErr == WSAEADDRINUSE) strError = strprintf(_("Unable to bind to %s on this computer. %s is probably already running."), addrBind.ToStringAddrPort(), PACKAGE_NAME); else strError = strprintf(_("Unable to bind to %s on this computer (bind returned error %s)"), addrBind.ToStringAddrPort(), NetworkErrorString(nErr)); LogPrintLevel(BCLog::NET, BCLog::Level::Error, "%s\n", strError.original); return false; } LogPrintf("Bound to %s\n", addrBind.ToStringAddrPort()); // Listen for incoming connections if (sock->Listen(SOMAXCONN) == SOCKET_ERROR) { strError = strprintf(_("Listening for incoming connections failed (listen returned error %s)"), NetworkErrorString(WSAGetLastError())); LogPrintLevel(BCLog::NET, BCLog::Level::Error, "%s\n", strError.original); return false; } vhListenSocket.emplace_back(std::move(sock), permissions); return true; } void Discover() { if (!fDiscover) return; #ifdef WIN32 // Get local host IP char pszHostName[256] = ""; if (gethostname(pszHostName, sizeof(pszHostName)) != SOCKET_ERROR) { const std::vector addresses{LookupHost(pszHostName, 0, true)}; for (const CNetAddr& addr : addresses) { if (AddLocal(addr, LOCAL_IF)) LogPrintf("%s: %s - %s\n", __func__, pszHostName, addr.ToStringAddr()); } } #elif (HAVE_DECL_GETIFADDRS && HAVE_DECL_FREEIFADDRS) // Get local host ip struct ifaddrs* myaddrs; if (getifaddrs(&myaddrs) == 0) { for (struct ifaddrs* ifa = myaddrs; ifa != nullptr; ifa = ifa->ifa_next) { if (ifa->ifa_addr == nullptr) continue; if ((ifa->ifa_flags & IFF_UP) == 0) continue; if (strcmp(ifa->ifa_name, "lo") == 0) continue; if (strcmp(ifa->ifa_name, "lo0") == 0) continue; if (ifa->ifa_addr->sa_family == AF_INET) { struct sockaddr_in* s4 = (struct sockaddr_in*)(ifa->ifa_addr); CNetAddr addr(s4->sin_addr); if (AddLocal(addr, LOCAL_IF)) LogPrintf("%s: IPv4 %s: %s\n", __func__, ifa->ifa_name, addr.ToStringAddr()); } else if (ifa->ifa_addr->sa_family == AF_INET6) { struct sockaddr_in6* s6 = (struct sockaddr_in6*)(ifa->ifa_addr); CNetAddr addr(s6->sin6_addr); if (AddLocal(addr, LOCAL_IF)) LogPrintf("%s: IPv6 %s: %s\n", __func__, ifa->ifa_name, addr.ToStringAddr()); } } freeifaddrs(myaddrs); } #endif } void CConnman::SetNetworkActive(bool active) { LogPrintf("%s: %s\n", __func__, active); if (fNetworkActive == active) { return; } fNetworkActive = active; if (m_client_interface) { m_client_interface->NotifyNetworkActiveChanged(fNetworkActive); } } CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In, AddrMan& addrman_in, const NetGroupManager& netgroupman, const CChainParams& params, bool network_active) : addrman(addrman_in) , m_netgroupman{netgroupman} , nSeed0(nSeed0In) , nSeed1(nSeed1In) , m_params(params) { SetTryNewOutboundPeer(false); Options connOptions; Init(connOptions); SetNetworkActive(network_active); } NodeId CConnman::GetNewNodeId() { return nLastNodeId.fetch_add(1, std::memory_order_relaxed); } uint16_t CConnman::GetDefaultPort(Network net) const { return net == NET_I2P ? I2P_SAM31_PORT : m_params.GetDefaultPort(); } uint16_t CConnman::GetDefaultPort(const std::string& addr) const { CNetAddr a; return a.SetSpecial(addr) ? GetDefaultPort(a.GetNetwork()) : m_params.GetDefaultPort(); } bool CConnman::Bind(const CService& addr_, unsigned int flags, NetPermissionFlags permissions) { const CService addr{MaybeFlipIPv6toCJDNS(addr_)}; bilingual_str strError; if (!BindListenPort(addr, strError, permissions)) { if ((flags & BF_REPORT_ERROR) && m_client_interface) { m_client_interface->ThreadSafeMessageBox(strError, "", CClientUIInterface::MSG_ERROR); } return false; } if (addr.IsRoutable() && fDiscover && !(flags & BF_DONT_ADVERTISE) && !NetPermissions::HasFlag(permissions, NetPermissionFlags::NoBan)) { AddLocal(addr, LOCAL_BIND); } return true; } bool CConnman::InitBinds(const Options& options) { bool fBound = false; for (const auto& addrBind : options.vBinds) { fBound |= Bind(addrBind, BF_REPORT_ERROR, NetPermissionFlags::None); } for (const auto& addrBind : options.vWhiteBinds) { fBound |= Bind(addrBind.m_service, BF_REPORT_ERROR, addrBind.m_flags); } for (const auto& addr_bind : options.onion_binds) { fBound |= Bind(addr_bind, BF_DONT_ADVERTISE, NetPermissionFlags::None); } if (options.bind_on_any) { struct in_addr inaddr_any; inaddr_any.s_addr = htonl(INADDR_ANY); struct in6_addr inaddr6_any = IN6ADDR_ANY_INIT; fBound |= Bind(CService(inaddr6_any, GetListenPort()), BF_NONE, NetPermissionFlags::None); fBound |= Bind(CService(inaddr_any, GetListenPort()), !fBound ? BF_REPORT_ERROR : BF_NONE, NetPermissionFlags::None); } return fBound; } bool CConnman::Start(CScheduler& scheduler, const Options& connOptions) { AssertLockNotHeld(m_total_bytes_sent_mutex); Init(connOptions); if (fListen && !InitBinds(connOptions)) { if (m_client_interface) { m_client_interface->ThreadSafeMessageBox( _("Failed to listen on any port. Use -listen=0 if you want this."), "", CClientUIInterface::MSG_ERROR); } return false; } Proxy i2p_sam; if (GetProxy(NET_I2P, i2p_sam) && connOptions.m_i2p_accept_incoming) { m_i2p_sam_session = std::make_unique(gArgs.GetDataDirNet() / "i2p_private_key", i2p_sam.proxy, &interruptNet); } for (const auto& strDest : connOptions.vSeedNodes) { AddAddrFetch(strDest); } if (m_use_addrman_outgoing) { // Load addresses from anchors.dat m_anchors = ReadAnchors(gArgs.GetDataDirNet() / ANCHORS_DATABASE_FILENAME); if (m_anchors.size() > MAX_BLOCK_RELAY_ONLY_ANCHORS) { m_anchors.resize(MAX_BLOCK_RELAY_ONLY_ANCHORS); } LogPrintf("%i block-relay-only anchors will be tried for connections.\n", m_anchors.size()); } if (m_client_interface) { m_client_interface->InitMessage(_("Starting network threads…").translated); } fAddressesInitialized = true; if (semOutbound == nullptr) { // initialize semaphore semOutbound = std::make_unique(std::min(m_max_automatic_outbound, m_max_automatic_connections)); } if (semAddnode == nullptr) { // initialize semaphore semAddnode = std::make_unique(m_max_addnode); } // // Start threads // assert(m_msgproc); interruptNet.reset(); flagInterruptMsgProc = false; { LOCK(mutexMsgProc); fMsgProcWake = false; } // Send and receive from sockets, accept connections threadSocketHandler = std::thread(&util::TraceThread, "net", [this] { ThreadSocketHandler(); }); if (!gArgs.GetBoolArg("-dnsseed", DEFAULT_DNSSEED)) LogPrintf("DNS seeding disabled\n"); else threadDNSAddressSeed = std::thread(&util::TraceThread, "dnsseed", [this] { ThreadDNSAddressSeed(); }); // Initiate manual connections threadOpenAddedConnections = std::thread(&util::TraceThread, "addcon", [this] { ThreadOpenAddedConnections(); }); if (connOptions.m_use_addrman_outgoing && !connOptions.m_specified_outgoing.empty()) { if (m_client_interface) { m_client_interface->ThreadSafeMessageBox( _("Cannot provide specific connections and have addrman find outgoing connections at the same time."), "", CClientUIInterface::MSG_ERROR); } return false; } if (connOptions.m_use_addrman_outgoing || !connOptions.m_specified_outgoing.empty()) { threadOpenConnections = std::thread( &util::TraceThread, "opencon", [this, connect = connOptions.m_specified_outgoing] { ThreadOpenConnections(connect); }); } // Process messages threadMessageHandler = std::thread(&util::TraceThread, "msghand", [this] { ThreadMessageHandler(); }); if (m_i2p_sam_session) { threadI2PAcceptIncoming = std::thread(&util::TraceThread, "i2paccept", [this] { ThreadI2PAcceptIncoming(); }); } // Dump network addresses scheduler.scheduleEvery([this] { DumpAddresses(); }, DUMP_PEERS_INTERVAL); return true; } class CNetCleanup { public: CNetCleanup() = default; ~CNetCleanup() { #ifdef WIN32 // Shutdown Windows Sockets WSACleanup(); #endif } }; static CNetCleanup instance_of_cnetcleanup; void CConnman::Interrupt() { { LOCK(mutexMsgProc); flagInterruptMsgProc = true; } condMsgProc.notify_all(); interruptNet(); g_socks5_interrupt(); if (semOutbound) { for (int i=0; ipost(); } } if (semAddnode) { for (int i=0; ipost(); } } } void CConnman::StopThreads() { if (threadI2PAcceptIncoming.joinable()) { threadI2PAcceptIncoming.join(); } if (threadMessageHandler.joinable()) threadMessageHandler.join(); if (threadOpenConnections.joinable()) threadOpenConnections.join(); if (threadOpenAddedConnections.joinable()) threadOpenAddedConnections.join(); if (threadDNSAddressSeed.joinable()) threadDNSAddressSeed.join(); if (threadSocketHandler.joinable()) threadSocketHandler.join(); } void CConnman::StopNodes() { if (fAddressesInitialized) { DumpAddresses(); fAddressesInitialized = false; if (m_use_addrman_outgoing) { // Anchor connections are only dumped during clean shutdown. std::vector anchors_to_dump = GetCurrentBlockRelayOnlyConns(); if (anchors_to_dump.size() > MAX_BLOCK_RELAY_ONLY_ANCHORS) { anchors_to_dump.resize(MAX_BLOCK_RELAY_ONLY_ANCHORS); } DumpAnchors(gArgs.GetDataDirNet() / ANCHORS_DATABASE_FILENAME, anchors_to_dump); } } // Delete peer connections. std::vector nodes; WITH_LOCK(m_nodes_mutex, nodes.swap(m_nodes)); for (CNode* pnode : nodes) { pnode->CloseSocketDisconnect(); DeleteNode(pnode); } for (CNode* pnode : m_nodes_disconnected) { DeleteNode(pnode); } m_nodes_disconnected.clear(); vhListenSocket.clear(); semOutbound.reset(); semAddnode.reset(); } void CConnman::DeleteNode(CNode* pnode) { assert(pnode); m_msgproc->FinalizeNode(*pnode); delete pnode; } CConnman::~CConnman() { Interrupt(); Stop(); } std::vector CConnman::GetAddresses(size_t max_addresses, size_t max_pct, std::optional network) const { std::vector addresses = addrman.GetAddr(max_addresses, max_pct, network); if (m_banman) { addresses.erase(std::remove_if(addresses.begin(), addresses.end(), [this](const CAddress& addr){return m_banman->IsDiscouraged(addr) || m_banman->IsBanned(addr);}), addresses.end()); } return addresses; } std::vector CConnman::GetAddresses(CNode& requestor, size_t max_addresses, size_t max_pct) { auto local_socket_bytes = requestor.addrBind.GetAddrBytes(); uint64_t cache_id = GetDeterministicRandomizer(RANDOMIZER_ID_ADDRCACHE) .Write(requestor.ConnectedThroughNetwork()) .Write(local_socket_bytes) // For outbound connections, the port of the bound address is randomly // assigned by the OS and would therefore not be useful for seeding. .Write(requestor.IsInboundConn() ? requestor.addrBind.GetPort() : 0) .Finalize(); const auto current_time = GetTime(); auto r = m_addr_response_caches.emplace(cache_id, CachedAddrResponse{}); CachedAddrResponse& cache_entry = r.first->second; if (cache_entry.m_cache_entry_expiration < current_time) { // If emplace() added new one it has expiration 0. cache_entry.m_addrs_response_cache = GetAddresses(max_addresses, max_pct, /*network=*/std::nullopt); // Choosing a proper cache lifetime is a trade-off between the privacy leak minimization // and the usefulness of ADDR responses to honest users. // // Longer cache lifetime makes it more difficult for an attacker to scrape // enough AddrMan data to maliciously infer something useful. // By the time an attacker scraped enough AddrMan records, most of // the records should be old enough to not leak topology info by // e.g. analyzing real-time changes in timestamps. // // It takes only several hundred requests to scrape everything from an AddrMan containing 100,000 nodes, // so ~24 hours of cache lifetime indeed makes the data less inferable by the time // most of it could be scraped (considering that timestamps are updated via // ADDR self-announcements and when nodes communicate). // We also should be robust to those attacks which may not require scraping *full* victim's AddrMan // (because even several timestamps of the same handful of nodes may leak privacy). // // On the other hand, longer cache lifetime makes ADDR responses // outdated and less useful for an honest requestor, e.g. if most nodes // in the ADDR response are no longer active. // // However, the churn in the network is known to be rather low. Since we consider // nodes to be "terrible" (see IsTerrible()) if the timestamps are older than 30 days, // max. 24 hours of "penalty" due to cache shouldn't make any meaningful difference // in terms of the freshness of the response. cache_entry.m_cache_entry_expiration = current_time + std::chrono::hours(21) + GetRandMillis(std::chrono::hours(6)); } return cache_entry.m_addrs_response_cache; } bool CConnman::AddNode(const AddedNodeParams& add) { LOCK(m_added_nodes_mutex); for (const auto& it : m_added_node_params) { if (add.m_added_node == it.m_added_node) return false; } m_added_node_params.push_back(add); return true; } bool CConnman::RemoveAddedNode(const std::string& strNode) { LOCK(m_added_nodes_mutex); for (auto it = m_added_node_params.begin(); it != m_added_node_params.end(); ++it) { if (strNode == it->m_added_node) { m_added_node_params.erase(it); return true; } } return false; } size_t CConnman::GetNodeCount(ConnectionDirection flags) const { LOCK(m_nodes_mutex); if (flags == ConnectionDirection::Both) // Shortcut if we want total return m_nodes.size(); int nNum = 0; for (const auto& pnode : m_nodes) { if (flags & (pnode->IsInboundConn() ? ConnectionDirection::In : ConnectionDirection::Out)) { nNum++; } } return nNum; } uint32_t CConnman::GetMappedAS(const CNetAddr& addr) const { return m_netgroupman.GetMappedAS(addr); } void CConnman::GetNodeStats(std::vector& vstats) const { vstats.clear(); LOCK(m_nodes_mutex); vstats.reserve(m_nodes.size()); for (CNode* pnode : m_nodes) { vstats.emplace_back(); pnode->CopyStats(vstats.back()); vstats.back().m_mapped_as = GetMappedAS(pnode->addr); } } bool CConnman::DisconnectNode(const std::string& strNode) { LOCK(m_nodes_mutex); if (CNode* pnode = FindNode(strNode)) { LogPrint(BCLog::NET, "disconnect by address%s matched peer=%d; disconnecting\n", (fLogIPs ? strprintf("=%s", strNode) : ""), pnode->GetId()); pnode->fDisconnect = true; return true; } return false; } bool CConnman::DisconnectNode(const CSubNet& subnet) { bool disconnected = false; LOCK(m_nodes_mutex); for (CNode* pnode : m_nodes) { if (subnet.Match(pnode->addr)) { LogPrint(BCLog::NET, "disconnect by subnet%s matched peer=%d; disconnecting\n", (fLogIPs ? strprintf("=%s", subnet.ToString()) : ""), pnode->GetId()); pnode->fDisconnect = true; disconnected = true; } } return disconnected; } bool CConnman::DisconnectNode(const CNetAddr& addr) { return DisconnectNode(CSubNet(addr)); } bool CConnman::DisconnectNode(NodeId id) { LOCK(m_nodes_mutex); for(CNode* pnode : m_nodes) { if (id == pnode->GetId()) { LogPrint(BCLog::NET, "disconnect by id peer=%d; disconnecting\n", pnode->GetId()); pnode->fDisconnect = true; return true; } } return false; } void CConnman::RecordBytesRecv(uint64_t bytes) { nTotalBytesRecv += bytes; } void CConnman::RecordBytesSent(uint64_t bytes) { AssertLockNotHeld(m_total_bytes_sent_mutex); LOCK(m_total_bytes_sent_mutex); nTotalBytesSent += bytes; const auto now = GetTime(); if (nMaxOutboundCycleStartTime + MAX_UPLOAD_TIMEFRAME < now) { // timeframe expired, reset cycle nMaxOutboundCycleStartTime = now; nMaxOutboundTotalBytesSentInCycle = 0; } nMaxOutboundTotalBytesSentInCycle += bytes; } uint64_t CConnman::GetMaxOutboundTarget() const { AssertLockNotHeld(m_total_bytes_sent_mutex); LOCK(m_total_bytes_sent_mutex); return nMaxOutboundLimit; } std::chrono::seconds CConnman::GetMaxOutboundTimeframe() const { return MAX_UPLOAD_TIMEFRAME; } std::chrono::seconds CConnman::GetMaxOutboundTimeLeftInCycle() const { AssertLockNotHeld(m_total_bytes_sent_mutex); LOCK(m_total_bytes_sent_mutex); return GetMaxOutboundTimeLeftInCycle_(); } std::chrono::seconds CConnman::GetMaxOutboundTimeLeftInCycle_() const { AssertLockHeld(m_total_bytes_sent_mutex); if (nMaxOutboundLimit == 0) return 0s; if (nMaxOutboundCycleStartTime.count() == 0) return MAX_UPLOAD_TIMEFRAME; const std::chrono::seconds cycleEndTime = nMaxOutboundCycleStartTime + MAX_UPLOAD_TIMEFRAME; const auto now = GetTime(); return (cycleEndTime < now) ? 0s : cycleEndTime - now; } bool CConnman::OutboundTargetReached(bool historicalBlockServingLimit) const { AssertLockNotHeld(m_total_bytes_sent_mutex); LOCK(m_total_bytes_sent_mutex); if (nMaxOutboundLimit == 0) return false; if (historicalBlockServingLimit) { // keep a large enough buffer to at least relay each block once const std::chrono::seconds timeLeftInCycle = GetMaxOutboundTimeLeftInCycle_(); const uint64_t buffer = timeLeftInCycle / std::chrono::minutes{10} * MAX_BLOCK_SERIALIZED_SIZE; if (buffer >= nMaxOutboundLimit || nMaxOutboundTotalBytesSentInCycle >= nMaxOutboundLimit - buffer) return true; } else if (nMaxOutboundTotalBytesSentInCycle >= nMaxOutboundLimit) return true; return false; } uint64_t CConnman::GetOutboundTargetBytesLeft() const { AssertLockNotHeld(m_total_bytes_sent_mutex); LOCK(m_total_bytes_sent_mutex); if (nMaxOutboundLimit == 0) return 0; return (nMaxOutboundTotalBytesSentInCycle >= nMaxOutboundLimit) ? 0 : nMaxOutboundLimit - nMaxOutboundTotalBytesSentInCycle; } uint64_t CConnman::GetTotalBytesRecv() const { return nTotalBytesRecv; } uint64_t CConnman::GetTotalBytesSent() const { AssertLockNotHeld(m_total_bytes_sent_mutex); LOCK(m_total_bytes_sent_mutex); return nTotalBytesSent; } ServiceFlags CConnman::GetLocalServices() const { return nLocalServices; } static std::unique_ptr MakeTransport(NodeId id, bool use_v2transport, bool inbound) noexcept { if (use_v2transport) { return std::make_unique(id, /*initiating=*/!inbound, SER_NETWORK, INIT_PROTO_VERSION); } else { return std::make_unique(id, SER_NETWORK, INIT_PROTO_VERSION); } } CNode::CNode(NodeId idIn, std::shared_ptr sock, const CAddress& addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const CAddress& addrBindIn, const std::string& addrNameIn, ConnectionType conn_type_in, bool inbound_onion, CNodeOptions&& node_opts) : m_transport{MakeTransport(idIn, node_opts.use_v2transport, conn_type_in == ConnectionType::INBOUND)}, m_permission_flags{node_opts.permission_flags}, m_sock{sock}, m_connected{GetTime()}, addr{addrIn}, addrBind{addrBindIn}, m_addr_name{addrNameIn.empty() ? addr.ToStringAddrPort() : addrNameIn}, m_dest(addrNameIn), m_inbound_onion{inbound_onion}, m_prefer_evict{node_opts.prefer_evict}, nKeyedNetGroup{nKeyedNetGroupIn}, m_conn_type{conn_type_in}, id{idIn}, nLocalHostNonce{nLocalHostNonceIn}, m_recv_flood_size{node_opts.recv_flood_size}, m_i2p_sam_session{std::move(node_opts.i2p_sam_session)} { if (inbound_onion) assert(conn_type_in == ConnectionType::INBOUND); for (const std::string &msg : getAllNetMessageTypes()) mapRecvBytesPerMsgType[msg] = 0; mapRecvBytesPerMsgType[NET_MESSAGE_TYPE_OTHER] = 0; if (fLogIPs) { LogPrint(BCLog::NET, "Added connection to %s peer=%d\n", m_addr_name, id); } else { LogPrint(BCLog::NET, "Added connection peer=%d\n", id); } } void CNode::MarkReceivedMsgsForProcessing() { AssertLockNotHeld(m_msg_process_queue_mutex); size_t nSizeAdded = 0; for (const auto& msg : vRecvMsg) { // vRecvMsg contains only completed CNetMessage // the single possible partially deserialized message are held by TransportDeserializer nSizeAdded += msg.m_raw_message_size; } LOCK(m_msg_process_queue_mutex); m_msg_process_queue.splice(m_msg_process_queue.end(), vRecvMsg); m_msg_process_queue_size += nSizeAdded; fPauseRecv = m_msg_process_queue_size > m_recv_flood_size; } std::optional> CNode::PollMessage() { LOCK(m_msg_process_queue_mutex); if (m_msg_process_queue.empty()) return std::nullopt; std::list msgs; // Just take one message msgs.splice(msgs.begin(), m_msg_process_queue, m_msg_process_queue.begin()); m_msg_process_queue_size -= msgs.front().m_raw_message_size; fPauseRecv = m_msg_process_queue_size > m_recv_flood_size; return std::make_pair(std::move(msgs.front()), !m_msg_process_queue.empty()); } bool CConnman::NodeFullyConnected(const CNode* pnode) { return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect; } void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) { AssertLockNotHeld(m_total_bytes_sent_mutex); size_t nMessageSize = msg.data.size(); LogPrint(BCLog::NET, "sending %s (%d bytes) peer=%d\n", msg.m_type, nMessageSize, pnode->GetId()); if (gArgs.GetBoolArg("-capturemessages", false)) { CaptureMessage(pnode->addr, msg.m_type, msg.data, /*is_incoming=*/false); } TRACE6(net, outbound_message, pnode->GetId(), pnode->m_addr_name.c_str(), pnode->ConnectionTypeAsString().c_str(), msg.m_type.c_str(), msg.data.size(), msg.data.data() ); size_t nBytesSent = 0; { LOCK(pnode->cs_vSend); // Check if the transport still has unsent bytes, and indicate to it that we're about to // give it a message to send. const auto& [to_send, more, _msg_type] = pnode->m_transport->GetBytesToSend(/*have_next_message=*/true); const bool queue_was_empty{to_send.empty() && pnode->vSendMsg.empty()}; // Update memory usage of send buffer. pnode->m_send_memusage += msg.GetMemoryUsage(); if (pnode->m_send_memusage + pnode->m_transport->GetSendMemoryUsage() > nSendBufferMaxSize) pnode->fPauseSend = true; // Move message to vSendMsg queue. pnode->vSendMsg.push_back(std::move(msg)); // If there was nothing to send before, and there is now (predicted by the "more" value // returned by the GetBytesToSend call above), attempt "optimistic write": // because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually // doing a send, try sending from the calling thread if the queue was empty before. // With a V1Transport, more will always be true here, because adding a message always // results in sendable bytes there, but with V2Transport this is not the case (it may // still be in the handshake). if (queue_was_empty && more) { std::tie(nBytesSent, std::ignore) = SocketSendData(*pnode); } } if (nBytesSent) RecordBytesSent(nBytesSent); } bool CConnman::ForNode(NodeId id, std::function func) { CNode* found = nullptr; LOCK(m_nodes_mutex); for (auto&& pnode : m_nodes) { if(pnode->GetId() == id) { found = pnode; break; } } return found != nullptr && NodeFullyConnected(found) && func(found); } CSipHasher CConnman::GetDeterministicRandomizer(uint64_t id) const { return CSipHasher(nSeed0, nSeed1).Write(id); } uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& address) const { std::vector vchNetGroup(m_netgroupman.GetGroup(address)); return GetDeterministicRandomizer(RANDOMIZER_ID_NETGROUP).Write(vchNetGroup).Finalize(); } void CConnman::PerformReconnections() { AssertLockNotHeld(m_reconnections_mutex); AssertLockNotHeld(m_unused_i2p_sessions_mutex); while (true) { // Move first element of m_reconnections to todo (avoiding an allocation inside the lock). decltype(m_reconnections) todo; { LOCK(m_reconnections_mutex); if (m_reconnections.empty()) break; todo.splice(todo.end(), m_reconnections, m_reconnections.begin()); } auto& item = *todo.begin(); OpenNetworkConnection(item.addr_connect, // We only reconnect if the first attempt to connect succeeded at // connection time, but then failed after the CNode object was // created. Since we already know connecting is possible, do not // count failure to reconnect. /*fCountFailure=*/false, std::move(item.grant), item.destination.empty() ? nullptr : item.destination.c_str(), item.conn_type, item.use_v2transport); } } // Dump binary message to file, with timestamp. static void CaptureMessageToFile(const CAddress& addr, const std::string& msg_type, Span data, bool is_incoming) { // Note: This function captures the message at the time of processing, // not at socket receive/send time. // This ensures that the messages are always in order from an application // layer (processing) perspective. auto now = GetTime(); // Windows folder names cannot include a colon std::string clean_addr = addr.ToStringAddrPort(); std::replace(clean_addr.begin(), clean_addr.end(), ':', '_'); fs::path base_path = gArgs.GetDataDirNet() / "message_capture" / fs::u8path(clean_addr); fs::create_directories(base_path); fs::path path = base_path / (is_incoming ? "msgs_recv.dat" : "msgs_sent.dat"); AutoFile f{fsbridge::fopen(path, "ab")}; ser_writedata64(f, now.count()); f << Span{msg_type}; for (auto i = msg_type.length(); i < CMessageHeader::COMMAND_SIZE; ++i) { f << uint8_t{'\0'}; } uint32_t size = data.size(); ser_writedata32(f, size); f << data; } std::function data, bool is_incoming)> CaptureMessage = CaptureMessageToFile;