Diffstat (limited to 'src/net.cpp')
-rw-r--r-- | src/net.cpp | 1030 |
1 file changed, 919 insertions, 111 deletions
diff --git a/src/net.cpp b/src/net.cpp index 282c8fb741..3955005dfa 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -19,6 +19,7 @@ #include <crypto/sha256.h> #include <i2p.h> #include <logging.h> +#include <memusage.h> #include <net_permissions.h> #include <netaddress.h> #include <netbase.h> @@ -30,7 +31,6 @@ #include <util/fs.h> #include <util/sock.h> #include <util/strencodings.h> -#include <util/syscall_sandbox.h> #include <util/thread.h> #include <util/threadinterrupt.h> #include <util/trace.h> @@ -38,18 +38,12 @@ #ifdef WIN32 #include <string.h> -#else -#include <fcntl.h> #endif #if HAVE_DECL_GETIFADDRS && HAVE_DECL_FREEIFADDRS #include <ifaddrs.h> #endif -#ifdef USE_POLL -#include <poll.h> -#endif - #include <algorithm> #include <array> #include <cstdint> @@ -90,6 +84,9 @@ static constexpr std::chrono::seconds MAX_UPLOAD_TIMEFRAME{60 * 60 * 24}; // A random time period (0 to 1 seconds) is added to feeler connections to prevent synchronization. static constexpr auto FEELER_SLEEP_WINDOW{1s}; +/** Frequency to attempt extra connections to reachable networks we're not connected to yet **/ +static constexpr auto EXTRA_NETWORK_PEER_INTERVAL{5min}; + /** Used to pass flags to the Bind() function */ enum BindFlags { BF_NONE = 0, @@ -120,6 +117,14 @@ std::map<CNetAddr, LocalServiceInfo> mapLocalHost GUARDED_BY(g_maplocalhost_mute static bool vfLimited[NET_MAX] GUARDED_BY(g_maplocalhost_mutex) = {}; std::string strSubVersion; +size_t CSerializedNetMsg::GetMemoryUsage() const noexcept +{ + // Don't count the dynamic memory used for the m_type string, by assuming it fits in the + // "small string" optimization area (which stores data inside the object itself, up to some + // size; 15 bytes in modern libstdc++). + return sizeof(*this) + memusage::DynamicUsage(data); +} + void CConnman::AddAddrFetch(const std::string& strDest) { LOCK(m_addr_fetches_mutex); @@ -153,7 +158,7 @@ uint16_t GetListenPort() } // find 'best' local address for a particular peer -bool GetLocal(CService& addr, const CNetAddr *paddrPeer) +bool GetLocal(CService& addr, const CNode& peer) { if (!fListen) return false; @@ -164,8 +169,18 @@ bool GetLocal(CService& addr, const CNetAddr *paddrPeer) LOCK(g_maplocalhost_mutex); for (const auto& entry : mapLocalHost) { + // For privacy reasons, don't advertise our privacy-network address + // to other networks and don't advertise our other-network address + // to privacy networks. 
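
The new CSerializedNetMsg::GetMemoryUsage() above deliberately counts only the data vector and not the m_type string, on the assumption that message type names (at most 12 characters) stay within the string's small-string buffer. A minimal standalone sketch of how one could sanity-check that assumption; StoredInline is a hypothetical helper, not part of this change, and the pointer comparison is a practical rather than strictly portable test:

    #include <cassert>
    #include <string>

    // Returns true when the string keeps its characters inside the object itself
    // (small-string optimization), i.e. there is no separate heap allocation that
    // GetMemoryUsage() would be failing to count.
    static bool StoredInline(const std::string& s)
    {
        const char* obj_begin = reinterpret_cast<const char*>(&s);
        const char* obj_end = obj_begin + sizeof(s);
        return s.data() >= obj_begin && s.data() < obj_end;
    }

    int main()
    {
        // Message type names are at most 12 characters (CMessageHeader::COMMAND_SIZE),
        // which fits the 15-byte inline buffer of modern libstdc++ (and libc++) strings.
        assert(StoredInline(std::string{"sendcmpct"}));
    }
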
+ const Network our_net{entry.first.GetNetwork()}; + const Network peers_net{peer.ConnectedThroughNetwork()}; + if (our_net != peers_net && + (our_net == NET_ONION || our_net == NET_I2P || + peers_net == NET_ONION || peers_net == NET_I2P)) { + continue; + } int nScore = entry.second.nScore; - int nReachability = entry.first.GetReachabilityFrom(paddrPeer); + int nReachability = entry.first.GetReachabilityFrom(peer.addr); if (nReachability > nBestReachability || (nReachability == nBestReachability && nScore > nBestScore)) { addr = CService(entry.first, entry.second.nPort); @@ -187,7 +202,8 @@ static std::vector<CAddress> ConvertSeeds(const std::vector<uint8_t> &vSeedsIn) const auto one_week{7 * 24h}; std::vector<CAddress> vSeedsOut; FastRandomContext rng; - CDataStream s(vSeedsIn, SER_NETWORK, PROTOCOL_VERSION | ADDRV2_FORMAT); + DataStream underlying_stream{vSeedsIn}; + ParamsStream s{CAddress::V2_NETWORK, underlying_stream}; while (!s.eof()) { CService endpoint; s >> endpoint; @@ -203,10 +219,10 @@ static std::vector<CAddress> ConvertSeeds(const std::vector<uint8_t> &vSeedsIn) // Otherwise, return the unroutable 0.0.0.0 but filled in with // the normal parameters, since the IP may be changed to a useful // one by discovery. -CService GetLocalAddress(const CNetAddr& addrPeer) +CService GetLocalAddress(const CNode& peer) { CService addr; - if (GetLocal(addr, &addrPeer)) { + if (GetLocal(addr, peer)) { return addr; } return CService{CNetAddr(), GetListenPort()}; @@ -229,7 +245,7 @@ bool IsPeerAddrLocalGood(CNode *pnode) std::optional<CService> GetLocalAddrForPeer(CNode& node) { - CService addrLocal{GetLocalAddress(node.addr)}; + CService addrLocal{GetLocalAddress(node)}; if (gArgs.GetBoolArg("-addrmantest", false)) { // use IPv4 loopback during addrmantest addrLocal = CService(LookupNumeric("127.0.0.1", GetListenPort())); @@ -675,16 +691,15 @@ bool CNode::ReceiveMsgBytes(Span<const uint8_t> msg_bytes, bool& complete) nRecvBytes += msg_bytes.size(); while (msg_bytes.size() > 0) { // absorb network data - int handled = m_deserializer->Read(msg_bytes); - if (handled < 0) { - // Serious header problem, disconnect from the peer. + if (!m_transport->ReceivedBytes(msg_bytes)) { + // Serious transport problem, disconnect from the peer. return false; } - if (m_deserializer->Complete()) { + if (m_transport->ReceivedMessageComplete()) { // decompose a transport agnostic CNetMessage from the deserializer bool reject_message{false}; - CNetMessage msg = m_deserializer->GetMessage(time, reject_message); + CNetMessage msg = m_transport->GetReceivedMessage(time, reject_message); if (reject_message) { // Message deserialization failed. Drop the message but don't disconnect the peer. 
// store the size of the corrupt message @@ -711,8 +726,18 @@ bool CNode::ReceiveMsgBytes(Span<const uint8_t> msg_bytes, bool& complete) return true; } -int V1TransportDeserializer::readHeader(Span<const uint8_t> msg_bytes) +V1Transport::V1Transport(const NodeId node_id, int nTypeIn, int nVersionIn) noexcept : + m_node_id(node_id), hdrbuf(nTypeIn, nVersionIn), vRecv(nTypeIn, nVersionIn) { + assert(std::size(Params().MessageStart()) == std::size(m_magic_bytes)); + std::copy(std::begin(Params().MessageStart()), std::end(Params().MessageStart()), m_magic_bytes); + LOCK(m_recv_mutex); + Reset(); +} + +int V1Transport::readHeader(Span<const uint8_t> msg_bytes) +{ + AssertLockHeld(m_recv_mutex); // copy data to temporary parsing buffer unsigned int nRemaining = CMessageHeader::HEADER_SIZE - nHdrPos; unsigned int nCopy = std::min<unsigned int>(nRemaining, msg_bytes.size()); @@ -734,7 +759,7 @@ int V1TransportDeserializer::readHeader(Span<const uint8_t> msg_bytes) } // Check start string, network magic - if (memcmp(hdr.pchMessageStart, m_chain_params.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) { + if (memcmp(hdr.pchMessageStart, m_magic_bytes, CMessageHeader::MESSAGE_START_SIZE) != 0) { LogPrint(BCLog::NET, "Header error: Wrong MessageStart %s received, peer=%d\n", HexStr(hdr.pchMessageStart), m_node_id); return -1; } @@ -751,8 +776,9 @@ int V1TransportDeserializer::readHeader(Span<const uint8_t> msg_bytes) return nCopy; } -int V1TransportDeserializer::readData(Span<const uint8_t> msg_bytes) +int V1Transport::readData(Span<const uint8_t> msg_bytes) { + AssertLockHeld(m_recv_mutex); unsigned int nRemaining = hdr.nMessageSize - nDataPos; unsigned int nCopy = std::min<unsigned int>(nRemaining, msg_bytes.size()); @@ -768,19 +794,22 @@ int V1TransportDeserializer::readData(Span<const uint8_t> msg_bytes) return nCopy; } -const uint256& V1TransportDeserializer::GetMessageHash() const +const uint256& V1Transport::GetMessageHash() const { - assert(Complete()); + AssertLockHeld(m_recv_mutex); + assert(CompleteInternal()); if (data_hash.IsNull()) hasher.Finalize(data_hash); return data_hash; } -CNetMessage V1TransportDeserializer::GetMessage(const std::chrono::microseconds time, bool& reject_message) +CNetMessage V1Transport::GetReceivedMessage(const std::chrono::microseconds time, bool& reject_message) { + AssertLockNotHeld(m_recv_mutex); // Initialize out parameter reject_message = false; // decompose a single CNetMessage from the TransportDeserializer + LOCK(m_recv_mutex); CNetMessage msg(std::move(vRecv)); // store message type string, time, and sizes @@ -813,53 +842,787 @@ CNetMessage V1TransportDeserializer::GetMessage(const std::chrono::microseconds return msg; } -void V1TransportSerializer::prepareForTransport(CSerializedNetMsg& msg, std::vector<unsigned char>& header) const +bool V1Transport::SetMessageToSend(CSerializedNetMsg& msg) noexcept { + AssertLockNotHeld(m_send_mutex); + // Determine whether a new message can be set. 
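
For reference, the 24-byte v1 header that readHeader() parses above, and that V1Transport::SetMessageToSend() below rebuilds, has the following fixed layout. The struct is an illustrative stand-in for CMessageHeader, shown only to make the size bookkeeping concrete:

    #include <cstdint>

    // Approximate on-the-wire layout of a v1 message header (24 bytes).
    struct V1HeaderLayout {
        uint8_t  magic[4];      // network magic, compared against m_magic_bytes
        char     command[12];   // null-padded message type, e.g. "version" + 5 zero bytes
        uint32_t payload_size;  // little-endian; checked against MAX_PROTOCOL_MESSAGE_LENGTH
        uint8_t  checksum[4];   // first 4 bytes of SHA256(SHA256(payload))
    };
    static_assert(sizeof(V1HeaderLayout) == 24,
                  "MESSAGE_START_SIZE + COMMAND_SIZE + 4 + 4 bytes");
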
+ LOCK(m_send_mutex); + if (m_sending_header || m_bytes_sent < m_message_to_send.data.size()) return false; + // create dbl-sha256 checksum uint256 hash = Hash(msg.data); // create header - CMessageHeader hdr(Params().MessageStart(), msg.m_type.c_str(), msg.data.size()); + CMessageHeader hdr(m_magic_bytes, msg.m_type.c_str(), msg.data.size()); memcpy(hdr.pchChecksum, hash.begin(), CMessageHeader::CHECKSUM_SIZE); // serialize header - header.reserve(CMessageHeader::HEADER_SIZE); - CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, header, 0, hdr}; + m_header_to_send.clear(); + CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, m_header_to_send, 0, hdr}; + + // update state + m_message_to_send = std::move(msg); + m_sending_header = true; + m_bytes_sent = 0; + return true; +} + +Transport::BytesToSend V1Transport::GetBytesToSend(bool have_next_message) const noexcept +{ + AssertLockNotHeld(m_send_mutex); + LOCK(m_send_mutex); + if (m_sending_header) { + return {Span{m_header_to_send}.subspan(m_bytes_sent), + // We have more to send after the header if the message has payload, or if there + // is a next message after that. + have_next_message || !m_message_to_send.data.empty(), + m_message_to_send.m_type + }; + } else { + return {Span{m_message_to_send.data}.subspan(m_bytes_sent), + // We only have more to send after this message's payload if there is another + // message. + have_next_message, + m_message_to_send.m_type + }; + } +} + +void V1Transport::MarkBytesSent(size_t bytes_sent) noexcept +{ + AssertLockNotHeld(m_send_mutex); + LOCK(m_send_mutex); + m_bytes_sent += bytes_sent; + if (m_sending_header && m_bytes_sent == m_header_to_send.size()) { + // We're done sending a message's header. Switch to sending its data bytes. + m_sending_header = false; + m_bytes_sent = 0; + } else if (!m_sending_header && m_bytes_sent == m_message_to_send.data.size()) { + // We're done sending a message's data. Wipe the data vector to reduce memory consumption. + m_message_to_send.data.clear(); + m_message_to_send.data.shrink_to_fit(); + m_bytes_sent = 0; + } +} + +size_t V1Transport::GetSendMemoryUsage() const noexcept +{ + AssertLockNotHeld(m_send_mutex); + LOCK(m_send_mutex); + // Don't count sending-side fields besides m_message_to_send, as they're all small and bounded. + return m_message_to_send.GetMemoryUsage(); +} + +namespace { + +/** List of short messages as defined in BIP324, in order. + * + * Only message types that are actually implemented in this codebase need to be listed, as other + * messages get ignored anyway - whether we know how to decode them or not. 
+ */ +const std::array<std::string, 33> V2_MESSAGE_IDS = { + "", // 12 bytes follow encoding the message type like in V1 + NetMsgType::ADDR, + NetMsgType::BLOCK, + NetMsgType::BLOCKTXN, + NetMsgType::CMPCTBLOCK, + NetMsgType::FEEFILTER, + NetMsgType::FILTERADD, + NetMsgType::FILTERCLEAR, + NetMsgType::FILTERLOAD, + NetMsgType::GETBLOCKS, + NetMsgType::GETBLOCKTXN, + NetMsgType::GETDATA, + NetMsgType::GETHEADERS, + NetMsgType::HEADERS, + NetMsgType::INV, + NetMsgType::MEMPOOL, + NetMsgType::MERKLEBLOCK, + NetMsgType::NOTFOUND, + NetMsgType::PING, + NetMsgType::PONG, + NetMsgType::SENDCMPCT, + NetMsgType::TX, + NetMsgType::GETCFILTERS, + NetMsgType::CFILTER, + NetMsgType::GETCFHEADERS, + NetMsgType::CFHEADERS, + NetMsgType::GETCFCHECKPT, + NetMsgType::CFCHECKPT, + NetMsgType::ADDRV2, + // Unimplemented message types that are assigned in BIP324: + "", + "", + "", + "" +}; + +class V2MessageMap +{ + std::unordered_map<std::string, uint8_t> m_map; + +public: + V2MessageMap() noexcept + { + for (size_t i = 1; i < std::size(V2_MESSAGE_IDS); ++i) { + m_map.emplace(V2_MESSAGE_IDS[i], i); + } + } + + std::optional<uint8_t> operator()(const std::string& message_name) const noexcept + { + auto it = m_map.find(message_name); + if (it == m_map.end()) return std::nullopt; + return it->second; + } +}; + +const V2MessageMap V2_MESSAGE_MAP; + +} // namespace + +V2Transport::V2Transport(NodeId nodeid, bool initiating, int type_in, int version_in) noexcept : + m_cipher{}, m_initiating{initiating}, m_nodeid{nodeid}, + m_v1_fallback{nodeid, type_in, version_in}, m_recv_type{type_in}, m_recv_version{version_in}, + m_recv_state{initiating ? RecvState::KEY : RecvState::KEY_MAYBE_V1}, + m_send_state{initiating ? SendState::AWAITING_KEY : SendState::MAYBE_V1} +{ + // Construct garbage (including its length) using a FastRandomContext. + FastRandomContext rng; + size_t garbage_len = rng.randrange(MAX_GARBAGE_LEN + 1); + // Initialize the send buffer with ellswift pubkey + garbage. + m_send_buffer.resize(EllSwiftPubKey::size() + garbage_len); + std::copy(std::begin(m_cipher.GetOurPubKey()), std::end(m_cipher.GetOurPubKey()), MakeWritableByteSpan(m_send_buffer).begin()); + rng.fillrand(MakeWritableByteSpan(m_send_buffer).subspan(EllSwiftPubKey::size())); +} + +V2Transport::V2Transport(NodeId nodeid, bool initiating, int type_in, int version_in, const CKey& key, Span<const std::byte> ent32, Span<const uint8_t> garbage) noexcept : + m_cipher{key, ent32}, m_initiating{initiating}, m_nodeid{nodeid}, + m_v1_fallback{nodeid, type_in, version_in}, m_recv_type{type_in}, m_recv_version{version_in}, + m_recv_state{initiating ? RecvState::KEY : RecvState::KEY_MAYBE_V1}, + m_send_state{initiating ? SendState::AWAITING_KEY : SendState::MAYBE_V1} +{ + assert(garbage.size() <= MAX_GARBAGE_LEN); + // Initialize the send buffer with ellswift pubkey + provided garbage. + m_send_buffer.resize(EllSwiftPubKey::size() + garbage.size()); + std::copy(std::begin(m_cipher.GetOurPubKey()), std::end(m_cipher.GetOurPubKey()), MakeWritableByteSpan(m_send_buffer).begin()); + std::copy(garbage.begin(), garbage.end(), m_send_buffer.begin() + EllSwiftPubKey::size()); +} + +void V2Transport::SetReceiveState(RecvState recv_state) noexcept +{ + AssertLockHeld(m_recv_mutex); + // Enforce allowed state transitions. 
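
The V2_MESSAGE_IDS table and V2MessageMap above implement BIP324's 1-byte message type aliases: ID 0 means a 12-byte type string follows, IDs 1 and up index into the table. A condensed, self-contained sketch of the lookup, which SetMessageToSend() later uses to choose between the short and long encodings; the table here is truncated for illustration, the real one is the 33-entry array above:

    #include <array>
    #include <cstdint>
    #include <optional>
    #include <string>
    #include <unordered_map>

    // Truncated stand-in for V2_MESSAGE_IDS; index 0 is reserved for the long encoding.
    static const std::array<std::string, 4> kShortIds = {"", "addr", "block", "blocktxn"};

    static std::optional<uint8_t> ShortIdFor(const std::string& msg_type)
    {
        static const std::unordered_map<std::string, uint8_t> map = [] {
            std::unordered_map<std::string, uint8_t> m;
            for (size_t i = 1; i < kShortIds.size(); ++i) m.emplace(kShortIds[i], static_cast<uint8_t>(i));
            return m;
        }();
        const auto it = map.find(msg_type);
        if (it == map.end()) return std::nullopt; // fall back to 0x00 + 12-byte type name
        return it->second;
    }
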
+ switch (m_recv_state) { + case RecvState::KEY_MAYBE_V1: + Assume(recv_state == RecvState::KEY || recv_state == RecvState::V1); + break; + case RecvState::KEY: + Assume(recv_state == RecvState::GARB_GARBTERM); + break; + case RecvState::GARB_GARBTERM: + Assume(recv_state == RecvState::GARBAUTH); + break; + case RecvState::GARBAUTH: + Assume(recv_state == RecvState::VERSION); + break; + case RecvState::VERSION: + Assume(recv_state == RecvState::APP); + break; + case RecvState::APP: + Assume(recv_state == RecvState::APP_READY); + break; + case RecvState::APP_READY: + Assume(recv_state == RecvState::APP); + break; + case RecvState::V1: + Assume(false); // V1 state cannot be left + break; + } + // Change state. + m_recv_state = recv_state; +} + +void V2Transport::SetSendState(SendState send_state) noexcept +{ + AssertLockHeld(m_send_mutex); + // Enforce allowed state transitions. + switch (m_send_state) { + case SendState::MAYBE_V1: + Assume(send_state == SendState::V1 || send_state == SendState::AWAITING_KEY); + break; + case SendState::AWAITING_KEY: + Assume(send_state == SendState::READY); + break; + case SendState::READY: + case SendState::V1: + Assume(false); // Final states + break; + } + // Change state. + m_send_state = send_state; +} + +bool V2Transport::ReceivedMessageComplete() const noexcept +{ + AssertLockNotHeld(m_recv_mutex); + LOCK(m_recv_mutex); + if (m_recv_state == RecvState::V1) return m_v1_fallback.ReceivedMessageComplete(); + + return m_recv_state == RecvState::APP_READY; +} + +void V2Transport::ProcessReceivedMaybeV1Bytes() noexcept +{ + AssertLockHeld(m_recv_mutex); + AssertLockNotHeld(m_send_mutex); + Assume(m_recv_state == RecvState::KEY_MAYBE_V1); + // We still have to determine if this is a v1 or v2 connection. The bytes being received could + // be the beginning of either a v1 packet (network magic + "version\x00"), or of a v2 public + // key. BIP324 specifies that a mismatch with this 12-byte string should trigger sending of the + // key. + std::array<uint8_t, V1_PREFIX_LEN> v1_prefix = {0, 0, 0, 0, 'v', 'e', 'r', 's', 'i', 'o', 'n', 0}; + std::copy(std::begin(Params().MessageStart()), std::end(Params().MessageStart()), v1_prefix.begin()); + Assume(m_recv_buffer.size() <= v1_prefix.size()); + if (!std::equal(m_recv_buffer.begin(), m_recv_buffer.end(), v1_prefix.begin())) { + // Mismatch with v1 prefix, so we can assume a v2 connection. + SetReceiveState(RecvState::KEY); // Convert to KEY state, leaving received bytes around. + // Transition the sender to AWAITING_KEY state (if not already). + LOCK(m_send_mutex); + SetSendState(SendState::AWAITING_KEY); + } else if (m_recv_buffer.size() == v1_prefix.size()) { + // Full match with the v1 prefix, so fall back to v1 behavior. + LOCK(m_send_mutex); + Span<const uint8_t> feedback{m_recv_buffer}; + // Feed already received bytes to v1 transport. It should always accept these, because it's + // less than the size of a v1 header, and these are the first bytes fed to m_v1_fallback. + bool ret = m_v1_fallback.ReceivedBytes(feedback); + Assume(feedback.empty()); + Assume(ret); + SetReceiveState(RecvState::V1); + SetSendState(SendState::V1); + // Reset v2 transport buffers to save memory. + m_recv_buffer = {}; + m_send_buffer = {}; + } else { + // We have not received enough to distinguish v1 from v2 yet. Wait until more bytes come. 
+ } +} + +bool V2Transport::ProcessReceivedKeyBytes() noexcept +{ + AssertLockHeld(m_recv_mutex); + AssertLockNotHeld(m_send_mutex); + Assume(m_recv_state == RecvState::KEY); + Assume(m_recv_buffer.size() <= EllSwiftPubKey::size()); + + // As a special exception, if bytes 4-16 of the key on a responder connection match the + // corresponding bytes of a V1 version message, but bytes 0-4 don't match the network magic + // (if they did, we'd have switched to V1 state already), assume this is a peer from + // another network, and disconnect them. They will almost certainly disconnect us too when + // they receive our uniformly random key and garbage, but detecting this case specially + // means we can log it. + static constexpr std::array<uint8_t, 12> MATCH = {'v', 'e', 'r', 's', 'i', 'o', 'n', 0, 0, 0, 0, 0}; + static constexpr size_t OFFSET = sizeof(CMessageHeader::MessageStartChars); + if (!m_initiating && m_recv_buffer.size() >= OFFSET + MATCH.size()) { + if (std::equal(MATCH.begin(), MATCH.end(), m_recv_buffer.begin() + OFFSET)) { + LogPrint(BCLog::NET, "V2 transport error: V1 peer with wrong MessageStart %s\n", + HexStr(Span(m_recv_buffer).first(OFFSET))); + return false; + } + } + + if (m_recv_buffer.size() == EllSwiftPubKey::size()) { + // Other side's key has been fully received, and can now be Diffie-Hellman combined with + // our key to initialize the encryption ciphers. + + // Initialize the ciphers. + EllSwiftPubKey ellswift(MakeByteSpan(m_recv_buffer)); + LOCK(m_send_mutex); + m_cipher.Initialize(ellswift, m_initiating); + + // Switch receiver state to GARB_GARBTERM. + SetReceiveState(RecvState::GARB_GARBTERM); + m_recv_buffer.clear(); + + // Switch sender state to READY. + SetSendState(SendState::READY); + + // Append the garbage terminator to the send buffer. + size_t garbage_len = m_send_buffer.size() - EllSwiftPubKey::size(); + m_send_buffer.resize(m_send_buffer.size() + BIP324Cipher::GARBAGE_TERMINATOR_LEN); + std::copy(m_cipher.GetSendGarbageTerminator().begin(), + m_cipher.GetSendGarbageTerminator().end(), + MakeWritableByteSpan(m_send_buffer).last(BIP324Cipher::GARBAGE_TERMINATOR_LEN).begin()); + + // Construct garbage authentication packet in the send buffer (using the garbage data which + // is still there). + m_send_buffer.resize(m_send_buffer.size() + BIP324Cipher::EXPANSION); + m_cipher.Encrypt( + /*contents=*/{}, + /*aad=*/MakeByteSpan(m_send_buffer).subspan(EllSwiftPubKey::size(), garbage_len), + /*ignore=*/false, + /*output=*/MakeWritableByteSpan(m_send_buffer).last(BIP324Cipher::EXPANSION)); + + // Construct version packet in the send buffer. + m_send_buffer.resize(m_send_buffer.size() + BIP324Cipher::EXPANSION + VERSION_CONTENTS.size()); + m_cipher.Encrypt( + /*contents=*/VERSION_CONTENTS, + /*aad=*/{}, + /*ignore=*/false, + /*output=*/MakeWritableByteSpan(m_send_buffer).last(BIP324Cipher::EXPANSION + VERSION_CONTENTS.size())); + } else { + // We still have to receive more key bytes. + } + return true; +} + +bool V2Transport::ProcessReceivedGarbageBytes() noexcept +{ + AssertLockHeld(m_recv_mutex); + Assume(m_recv_state == RecvState::GARB_GARBTERM); + Assume(m_recv_buffer.size() <= MAX_GARBAGE_LEN + BIP324Cipher::GARBAGE_TERMINATOR_LEN); + if (m_recv_buffer.size() >= BIP324Cipher::GARBAGE_TERMINATOR_LEN) { + if (MakeByteSpan(m_recv_buffer).last(BIP324Cipher::GARBAGE_TERMINATOR_LEN) == m_cipher.GetReceiveGarbageTerminator()) { + // Garbage terminator received. Switch to receiving garbage authentication packet. 
+ m_recv_garbage = std::move(m_recv_buffer); + m_recv_garbage.resize(m_recv_garbage.size() - BIP324Cipher::GARBAGE_TERMINATOR_LEN); + m_recv_buffer.clear(); + SetReceiveState(RecvState::GARBAUTH); + } else if (m_recv_buffer.size() == MAX_GARBAGE_LEN + BIP324Cipher::GARBAGE_TERMINATOR_LEN) { + // We've reached the maximum length for garbage + garbage terminator, and the + // terminator still does not match. Abort. + LogPrint(BCLog::NET, "V2 transport error: missing garbage terminator, peer=%d\n", m_nodeid); + return false; + } else { + // We still need to receive more garbage and/or garbage terminator bytes. + } + } else { + // We have less than GARBAGE_TERMINATOR_LEN (16) bytes, so we certainly need to receive + // more first. + } + return true; +} + +bool V2Transport::ProcessReceivedPacketBytes() noexcept +{ + AssertLockHeld(m_recv_mutex); + Assume(m_recv_state == RecvState::GARBAUTH || m_recv_state == RecvState::VERSION || + m_recv_state == RecvState::APP); + + // The maximum permitted contents length for a packet, consisting of: + // - 0x00 byte: indicating long message type encoding + // - 12 bytes of message type + // - payload + static constexpr size_t MAX_CONTENTS_LEN = + 1 + CMessageHeader::COMMAND_SIZE + + std::min<size_t>(MAX_SIZE, MAX_PROTOCOL_MESSAGE_LENGTH); + + if (m_recv_buffer.size() == BIP324Cipher::LENGTH_LEN) { + // Length descriptor received. + m_recv_len = m_cipher.DecryptLength(MakeByteSpan(m_recv_buffer)); + if (m_recv_len > MAX_CONTENTS_LEN) { + LogPrint(BCLog::NET, "V2 transport error: packet too large (%u bytes), peer=%d\n", m_recv_len, m_nodeid); + return false; + } + } else if (m_recv_buffer.size() > BIP324Cipher::LENGTH_LEN && m_recv_buffer.size() == m_recv_len + BIP324Cipher::EXPANSION) { + // Ciphertext received, decrypt it into m_recv_decode_buffer. + // Note that it is impossible to reach this branch without hitting the branch above first, + // as GetMaxBytesToProcess only allows up to LENGTH_LEN into the buffer before that point. + m_recv_decode_buffer.resize(m_recv_len); + bool ignore{false}; + Span<const std::byte> aad; + if (m_recv_state == RecvState::GARBAUTH) aad = MakeByteSpan(m_recv_garbage); + bool ret = m_cipher.Decrypt( + /*input=*/MakeByteSpan(m_recv_buffer).subspan(BIP324Cipher::LENGTH_LEN), + /*aad=*/aad, + /*ignore=*/ignore, + /*contents=*/MakeWritableByteSpan(m_recv_decode_buffer)); + if (!ret) { + LogPrint(BCLog::NET, "V2 transport error: packet decryption failure (%u bytes), peer=%d\n", m_recv_len, m_nodeid); + return false; + } + // Feed the last 4 bytes of the Poly1305 authentication tag (and its timing) into our RNG. + RandAddEvent(ReadLE32(m_recv_buffer.data() + m_recv_buffer.size() - 4)); + + // At this point we have a valid packet decrypted into m_recv_decode_buffer. Depending on + // the current state, decide what to do with it. + switch (m_recv_state) { + case RecvState::GARBAUTH: + // Ignore flag does not matter for garbage authentication. Any valid packet functions + // as authentication. Receive and process the version packet next. + SetReceiveState(RecvState::VERSION); + m_recv_garbage = {}; + break; + case RecvState::VERSION: + if (!ignore) { + // Version message received; transition to application phase. The contents is + // ignored, but can be used for future extensions. + SetReceiveState(RecvState::APP); + } + break; + case RecvState::APP: + if (!ignore) { + // Application message decrypted correctly. It can be extracted using GetMessage(). 
+ SetReceiveState(RecvState::APP_READY); + } + break; + default: + // Any other state is invalid (this function should not have been called). + Assume(false); + } + // Wipe the receive buffer where the next packet will be received into. + m_recv_buffer = {}; + // In all but APP_READY state, we can wipe the decoded contents. + if (m_recv_state != RecvState::APP_READY) m_recv_decode_buffer = {}; + } else { + // We either have less than 3 bytes, so we don't know the packet's length yet, or more + // than 3 bytes but less than the packet's full ciphertext. Wait until those arrive. + } + return true; +} + +size_t V2Transport::GetMaxBytesToProcess() noexcept +{ + AssertLockHeld(m_recv_mutex); + switch (m_recv_state) { + case RecvState::KEY_MAYBE_V1: + // During the KEY_MAYBE_V1 state we do not allow more than the length of v1 prefix into the + // receive buffer. + Assume(m_recv_buffer.size() <= V1_PREFIX_LEN); + // As long as we're not sure if this is a v1 or v2 connection, don't receive more than what + // is strictly necessary to distinguish the two (12 bytes). If we permitted more than + // the v1 header size (24 bytes), we may not be able to feed the already-received bytes + // back into the m_v1_fallback V1 transport. + return V1_PREFIX_LEN - m_recv_buffer.size(); + case RecvState::KEY: + // During the KEY state, we only allow the 64-byte key into the receive buffer. + Assume(m_recv_buffer.size() <= EllSwiftPubKey::size()); + // As long as we have not received the other side's public key, don't receive more than + // that (64 bytes), as garbage follows, and locating the garbage terminator requires the + // key exchange first. + return EllSwiftPubKey::size() - m_recv_buffer.size(); + case RecvState::GARB_GARBTERM: + // Process garbage bytes one by one (because terminator may appear anywhere). + return 1; + case RecvState::GARBAUTH: + case RecvState::VERSION: + case RecvState::APP: + // These three states all involve decoding a packet. Process the length descriptor first, + // so that we know where the current packet ends (and we don't process bytes from the next + // packet or decoy yet). Then, process the ciphertext bytes of the current packet. + if (m_recv_buffer.size() < BIP324Cipher::LENGTH_LEN) { + return BIP324Cipher::LENGTH_LEN - m_recv_buffer.size(); + } else { + // Note that BIP324Cipher::EXPANSION is the total difference between contents size + // and encoded packet size, which includes the 3 bytes due to the packet length. + // When transitioning from receiving the packet length to receiving its ciphertext, + // the encrypted packet length is left in the receive buffer. + return BIP324Cipher::EXPANSION + m_recv_len - m_recv_buffer.size(); + } + case RecvState::APP_READY: + // No bytes can be processed until GetMessage() is called. + return 0; + case RecvState::V1: + // Not allowed (must be dealt with by the caller). + Assume(false); + return 0; + } + Assume(false); // unreachable + return 0; } -size_t CConnman::SocketSendData(CNode& node) const +bool V2Transport::ReceivedBytes(Span<const uint8_t>& msg_bytes) noexcept +{ + AssertLockNotHeld(m_recv_mutex); + /** How many bytes to allocate in the receive buffer at most above what is received so far. */ + static constexpr size_t MAX_RESERVE_AHEAD = 256 * 1024; + + LOCK(m_recv_mutex); + if (m_recv_state == RecvState::V1) return m_v1_fallback.ReceivedBytes(msg_bytes); + + // Process the provided bytes in msg_bytes in a loop. 
In each iteration a nonzero number of + // bytes (decided by GetMaxBytesToProcess) are taken from the beginning om msg_bytes, and + // appended to m_recv_buffer. Then, depending on the receiver state, one of the + // ProcessReceived*Bytes functions is called to process the bytes in that buffer. + while (!msg_bytes.empty()) { + // Decide how many bytes to copy from msg_bytes to m_recv_buffer. + size_t max_read = GetMaxBytesToProcess(); + + // Reserve space in the buffer if there is not enough. + if (m_recv_buffer.size() + std::min(msg_bytes.size(), max_read) > m_recv_buffer.capacity()) { + switch (m_recv_state) { + case RecvState::KEY_MAYBE_V1: + case RecvState::KEY: + case RecvState::GARB_GARBTERM: + // During the initial states (key/garbage), allocate once to fit the maximum (4111 + // bytes). + m_recv_buffer.reserve(MAX_GARBAGE_LEN + BIP324Cipher::GARBAGE_TERMINATOR_LEN); + break; + case RecvState::GARBAUTH: + case RecvState::VERSION: + case RecvState::APP: { + // During states where a packet is being received, as much as is expected but never + // more than MAX_RESERVE_AHEAD bytes in addition to what is received so far. + // This means attackers that want to cause us to waste allocated memory are limited + // to MAX_RESERVE_AHEAD above the largest allowed message contents size, and to + // MAX_RESERVE_AHEAD more than they've actually sent us. + size_t alloc_add = std::min(max_read, msg_bytes.size() + MAX_RESERVE_AHEAD); + m_recv_buffer.reserve(m_recv_buffer.size() + alloc_add); + break; + } + case RecvState::APP_READY: + // The buffer is empty in this state. + Assume(m_recv_buffer.empty()); + break; + case RecvState::V1: + // Should have bailed out above. + Assume(false); + break; + } + } + + // Can't read more than provided input. + max_read = std::min(msg_bytes.size(), max_read); + // Copy data to buffer. + m_recv_buffer.insert(m_recv_buffer.end(), UCharCast(msg_bytes.data()), UCharCast(msg_bytes.data() + max_read)); + msg_bytes = msg_bytes.subspan(max_read); + + // Process data in the buffer. + switch (m_recv_state) { + case RecvState::KEY_MAYBE_V1: + ProcessReceivedMaybeV1Bytes(); + if (m_recv_state == RecvState::V1) return true; + break; + + case RecvState::KEY: + if (!ProcessReceivedKeyBytes()) return false; + break; + + case RecvState::GARB_GARBTERM: + if (!ProcessReceivedGarbageBytes()) return false; + break; + + case RecvState::GARBAUTH: + case RecvState::VERSION: + case RecvState::APP: + if (!ProcessReceivedPacketBytes()) return false; + break; + + case RecvState::APP_READY: + return true; + + case RecvState::V1: + // We should have bailed out before. + Assume(false); + break; + } + // Make sure we have made progress before continuing. + Assume(max_read > 0); + } + + return true; +} + +std::optional<std::string> V2Transport::GetMessageType(Span<const uint8_t>& contents) noexcept +{ + if (contents.size() == 0) return std::nullopt; // Empty contents + uint8_t first_byte = contents[0]; + contents = contents.subspan(1); // Strip first byte. + + if (first_byte != 0) { + // Short (1 byte) encoding. + if (first_byte < std::size(V2_MESSAGE_IDS)) { + // Valid short message id. + return V2_MESSAGE_IDS[first_byte]; + } else { + // Unknown short message id. + return std::nullopt; + } + } + + if (contents.size() < CMessageHeader::COMMAND_SIZE) { + return std::nullopt; // Long encoding needs 12 message type bytes. 
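
Putting the constants used throughout the receive path above together: BIP324Cipher::LENGTH_LEN and BIP324Cipher::EXPANSION describe a fixed per-packet overhead, which is why ProcessReceivedPacketBytes() waits for m_recv_len + EXPANSION bytes before decrypting. A small arithmetic sketch; the constant values are assumed from BIP324 rather than restated in this diff:

    #include <cstddef>

    // Assumed BIP324 per-packet overhead: 3-byte encrypted length, 1-byte packet
    // header carrying the ignore bit, 16-byte Poly1305 tag.
    constexpr std::size_t kLengthLen = 3;
    constexpr std::size_t kHeaderLen = 1;
    constexpr std::size_t kTagLen = 16;
    constexpr std::size_t kExpansion = kLengthLen + kHeaderLen + kTagLen; // 20 bytes

    // A packet carrying `contents` bytes occupies contents + kExpansion bytes on the wire.
    constexpr std::size_t EncodedPacketSize(std::size_t contents) { return contents + kExpansion; }
    static_assert(EncodedPacketSize(0) == 20,
                  "an empty packet, e.g. the garbage authentication packet, is 20 bytes");
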
+ } + + size_t msg_type_len{0}; + while (msg_type_len < CMessageHeader::COMMAND_SIZE && contents[msg_type_len] != 0) { + // Verify that message type bytes before the first 0x00 are in range. + if (contents[msg_type_len] < ' ' || contents[msg_type_len] > 0x7F) { + return {}; + } + ++msg_type_len; + } + std::string ret{reinterpret_cast<const char*>(contents.data()), msg_type_len}; + while (msg_type_len < CMessageHeader::COMMAND_SIZE) { + // Verify that message type bytes after the first 0x00 are also 0x00. + if (contents[msg_type_len] != 0) return {}; + ++msg_type_len; + } + // Strip message type bytes of contents. + contents = contents.subspan(CMessageHeader::COMMAND_SIZE); + return {std::move(ret)}; +} + +CNetMessage V2Transport::GetReceivedMessage(std::chrono::microseconds time, bool& reject_message) noexcept +{ + AssertLockNotHeld(m_recv_mutex); + LOCK(m_recv_mutex); + if (m_recv_state == RecvState::V1) return m_v1_fallback.GetReceivedMessage(time, reject_message); + + Assume(m_recv_state == RecvState::APP_READY); + Span<const uint8_t> contents{m_recv_decode_buffer}; + auto msg_type = GetMessageType(contents); + CDataStream ret(m_recv_type, m_recv_version); + CNetMessage msg{std::move(ret)}; + // Note that BIP324Cipher::EXPANSION also includes the length descriptor size. + msg.m_raw_message_size = m_recv_decode_buffer.size() + BIP324Cipher::EXPANSION; + if (msg_type) { + reject_message = false; + msg.m_type = std::move(*msg_type); + msg.m_time = time; + msg.m_message_size = contents.size(); + msg.m_recv.resize(contents.size()); + std::copy(contents.begin(), contents.end(), UCharCast(msg.m_recv.data())); + } else { + LogPrint(BCLog::NET, "V2 transport error: invalid message type (%u bytes contents), peer=%d\n", m_recv_decode_buffer.size(), m_nodeid); + reject_message = true; + } + m_recv_decode_buffer = {}; + SetReceiveState(RecvState::APP); + + return msg; +} + +bool V2Transport::SetMessageToSend(CSerializedNetMsg& msg) noexcept +{ + AssertLockNotHeld(m_send_mutex); + LOCK(m_send_mutex); + if (m_send_state == SendState::V1) return m_v1_fallback.SetMessageToSend(msg); + // We only allow adding a new message to be sent when in the READY state (so the packet cipher + // is available) and the send buffer is empty. This limits the number of messages in the send + // buffer to just one, and leaves the responsibility for queueing them up to the caller. + if (!(m_send_state == SendState::READY && m_send_buffer.empty())) return false; + // Construct contents (encoding message type + payload). + std::vector<uint8_t> contents; + auto short_message_id = V2_MESSAGE_MAP(msg.m_type); + if (short_message_id) { + contents.resize(1 + msg.data.size()); + contents[0] = *short_message_id; + std::copy(msg.data.begin(), msg.data.end(), contents.begin() + 1); + } else { + // Initialize with zeroes, and then write the message type string starting at offset 1. + // This means contents[0] and the unused positions in contents[1..13] remain 0x00. + contents.resize(1 + CMessageHeader::COMMAND_SIZE + msg.data.size(), 0); + std::copy(msg.m_type.begin(), msg.m_type.end(), contents.data() + 1); + std::copy(msg.data.begin(), msg.data.end(), contents.begin() + 1 + CMessageHeader::COMMAND_SIZE); + } + // Construct ciphertext in send buffer. 
+ m_send_buffer.resize(contents.size() + BIP324Cipher::EXPANSION); + m_cipher.Encrypt(MakeByteSpan(contents), {}, false, MakeWritableByteSpan(m_send_buffer)); + m_send_type = msg.m_type; + // Release memory + msg.data = {}; + return true; +} + +Transport::BytesToSend V2Transport::GetBytesToSend(bool have_next_message) const noexcept +{ + AssertLockNotHeld(m_send_mutex); + LOCK(m_send_mutex); + if (m_send_state == SendState::V1) return m_v1_fallback.GetBytesToSend(have_next_message); + + // We do not send anything in MAYBE_V1 state (as we don't know if the peer is v1 or v2), + // despite there being data in the send buffer in that state. + if (m_send_state == SendState::MAYBE_V1) return {{}, false, m_send_type}; + Assume(m_send_pos <= m_send_buffer.size()); + return { + Span{m_send_buffer}.subspan(m_send_pos), + // We only have more to send after the current m_send_buffer if there is a (next) + // message to be sent, and we're capable of sending packets. */ + have_next_message && m_send_state == SendState::READY, + m_send_type + }; +} + +void V2Transport::MarkBytesSent(size_t bytes_sent) noexcept +{ + AssertLockNotHeld(m_send_mutex); + LOCK(m_send_mutex); + if (m_send_state == SendState::V1) return m_v1_fallback.MarkBytesSent(bytes_sent); + + m_send_pos += bytes_sent; + Assume(m_send_pos <= m_send_buffer.size()); + // Only wipe the buffer when everything is sent in the READY state. In the AWAITING_KEY state + // we still need the garbage that's in the send buffer to construct the garbage authentication + // packet. + if (m_send_state == SendState::READY && m_send_pos == m_send_buffer.size()) { + m_send_pos = 0; + m_send_buffer = {}; + } +} + +size_t V2Transport::GetSendMemoryUsage() const noexcept +{ + AssertLockNotHeld(m_send_mutex); + LOCK(m_send_mutex); + if (m_send_state == SendState::V1) return m_v1_fallback.GetSendMemoryUsage(); + + return sizeof(m_send_buffer) + memusage::DynamicUsage(m_send_buffer); +} + +std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const { auto it = node.vSendMsg.begin(); size_t nSentSize = 0; - - while (it != node.vSendMsg.end()) { - const auto& data = *it; - assert(data.size() > node.nSendOffset); + bool data_left{false}; //!< second return value (whether unsent data remains) + std::optional<bool> expected_more; + + while (true) { + if (it != node.vSendMsg.end()) { + // If possible, move one message from the send queue to the transport. This fails when + // there is an existing message still being sent, or (for v2 transports) when the + // handshake has not yet completed. + size_t memusage = it->GetMemoryUsage(); + if (node.m_transport->SetMessageToSend(*it)) { + // Update memory usage of send buffer (as *it will be deleted). + node.m_send_memusage -= memusage; + ++it; + } + } + const auto& [data, more, msg_type] = node.m_transport->GetBytesToSend(it != node.vSendMsg.end()); + // We rely on the 'more' value returned by GetBytesToSend to correctly predict whether more + // bytes are still to be sent, to correctly set the MSG_MORE flag. As a sanity check, + // verify that the previously returned 'more' was correct. + if (expected_more.has_value()) Assume(!data.empty() == *expected_more); + expected_more = more; + data_left = !data.empty(); // will be overwritten on next loop if all of data gets sent int nBytes = 0; - { + if (!data.empty()) { LOCK(node.m_sock_mutex); + // There is no socket in case we've already disconnected, or in test cases without + // real connections. 
In these cases, we bail out immediately and just leave things + // in the send queue and transport. if (!node.m_sock) { break; } int flags = MSG_NOSIGNAL | MSG_DONTWAIT; #ifdef MSG_MORE - if (it + 1 != node.vSendMsg.end()) { + if (more) { flags |= MSG_MORE; } #endif - nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()) + node.nSendOffset, data.size() - node.nSendOffset, flags); + nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()), data.size(), flags); } if (nBytes > 0) { node.m_last_send = GetTime<std::chrono::seconds>(); node.nSendBytes += nBytes; - node.nSendOffset += nBytes; + // Notify transport that bytes have been processed. + node.m_transport->MarkBytesSent(nBytes); + // Update statistics per message type. + node.AccountForSentBytes(msg_type, nBytes); nSentSize += nBytes; - if (node.nSendOffset == data.size()) { - node.nSendOffset = 0; - node.nSendSize -= data.size(); - node.fPauseSend = node.nSendSize > nSendBufferMaxSize; - it++; - } else { + if ((size_t)nBytes != data.size()) { // could not send full message; stop sending more break; } @@ -872,17 +1635,17 @@ size_t CConnman::SocketSendData(CNode& node) const node.CloseSocketDisconnect(); } } - // couldn't send anything at all break; } } + node.fPauseSend = node.m_send_memusage + node.m_transport->GetSendMemoryUsage() > nSendBufferMaxSize; + if (it == node.vSendMsg.end()) { - assert(node.nSendOffset == 0); - assert(node.nSendSize == 0); + assert(node.m_send_memusage == 0); } node.vSendMsg.erase(node.vSendMsg.begin(), it); - return nSentSize; + return {nSentSize, data_left}; } /** Try to find a connection to evict when the node is full. @@ -1135,6 +1898,9 @@ void CConnman::DisconnectNodes() // close socket and cleanup pnode->CloseSocketDisconnect(); + // update connection count by network + if (pnode->IsManualOrFullOutboundConn()) --m_network_conn_counts[pnode->addr.GetNetwork()]; + // hold in disconnected pool until all refs are released pnode->Release(); m_nodes_disconnected.push_back(pnode); @@ -1217,37 +1983,23 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes) } for (CNode* pnode : nodes) { - // Implement the following logic: - // * If there is data to send, select() for sending data. As this only - // happens when optimistic write failed, we choose to first drain the - // write buffer in this case before receiving more. This avoids - // needlessly queueing received data, if the remote peer is not themselves - // receiving data. This means properly utilizing TCP flow control signalling. - // * Otherwise, if there is space left in the receive buffer, select() for - // receiving data. - // * Hand off all complete messages to the processor, to be handled without - // blocking here. - bool select_recv = !pnode->fPauseRecv; bool select_send; { LOCK(pnode->cs_vSend); - select_send = !pnode->vSendMsg.empty(); + // Sending is possible if either there are bytes to send right now, or if there will be + // once a potential message from vSendMsg is handed to the transport. GetBytesToSend + // determines both of these in a single call. 
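
The SetMessageToSend()/GetBytesToSend()/MarkBytesSent() trio used by SocketSendData() above defines the new sending contract: the queue hands the transport at most one message at a time, and the socket loop drains whatever bytes the transport currently exposes. A deliberately simplified, self-contained model of that caller-side loop; the real GetBytesToSend() additionally returns a "more" hint and the message type for per-type accounting, and Msg, MiniTransport and Drain are illustrative stand-ins:

    #include <cstddef>
    #include <cstdint>
    #include <deque>
    #include <span>
    #include <string>
    #include <vector>

    struct Msg { std::string type; std::vector<uint8_t> data; };

    struct MiniTransport {
        std::vector<uint8_t> buf;   // bytes not yet handed to the socket
        std::size_t pos{0};

        bool SetMessageToSend(Msg& m)
        {
            if (pos < buf.size()) return false;  // previous message still being sent
            buf = std::move(m.data);             // a real transport adds framing/encryption here
            pos = 0;
            return true;
        }
        std::span<const uint8_t> GetBytesToSend() const { return {buf.data() + pos, buf.size() - pos}; }
        void MarkBytesSent(std::size_t n) { pos += n; }
    };

    // Mirrors the shape of CConnman::SocketSendData(): feed queued messages to the
    // transport one at a time, then push its pending bytes to the wire.
    template <typename SendFn>
    void Drain(std::deque<Msg>& queue, MiniTransport& transport, SendFn&& wire_send)
    {
        while (true) {
            if (!queue.empty() && transport.SetMessageToSend(queue.front())) queue.pop_front();
            const auto bytes = transport.GetBytesToSend();
            if (bytes.empty()) break;
            const std::size_t sent = wire_send(bytes); // e.g. a send(2) call; may be partial
            transport.MarkBytesSent(sent);
            if (sent < bytes.size()) break;            // socket buffer full; retry later
        }
    }
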
+ const auto& [to_send, more, _msg_type] = pnode->m_transport->GetBytesToSend(!pnode->vSendMsg.empty()); + select_send = !to_send.empty() || more; } + if (!select_recv && !select_send) continue; LOCK(pnode->m_sock_mutex); - if (!pnode->m_sock) { - continue; - } - - Sock::Event requested{0}; - if (select_send) { - requested = Sock::SEND; - } else if (select_recv) { - requested = Sock::RECV; + if (pnode->m_sock) { + Sock::Event event = (select_send ? Sock::SEND : 0) | (select_recv ? Sock::RECV : 0); + events_per_sock.emplace(pnode->m_sock, Sock::Events{event}); } - - events_per_sock.emplace(pnode->m_sock, Sock::Events{requested}); } return events_per_sock; @@ -1308,6 +2060,24 @@ void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes, errorSet = it->second.occurred & Sock::ERR; } } + + if (sendSet) { + // Send data + auto [bytes_sent, data_left] = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode)); + if (bytes_sent) { + RecordBytesSent(bytes_sent); + + // If both receiving and (non-optimistic) sending were possible, we first attempt + // sending. If that succeeds, but does not fully drain the send queue, do not + // attempt to receive. This avoids needlessly queueing data if the remote peer + // is slow at receiving data, by means of TCP flow control. We only do this when + // sending actually succeeded to make sure progress is always made; otherwise a + // deadlock would be possible when both sides have data to send, but neither is + // receiving. + if (data_left) recvSet = false; + } + } + if (recvSet || errorSet) { // typical socket buffer is 8K-64K @@ -1354,12 +2124,6 @@ void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes, } } - if (sendSet) { - // Send data - size_t bytes_sent = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode)); - if (bytes_sent) RecordBytesSent(bytes_sent); - } - if (InactivityCheck(*pnode)) pnode->fDisconnect = true; } } @@ -1381,7 +2145,6 @@ void CConnman::ThreadSocketHandler() { AssertLockNotHeld(m_total_bytes_sent_mutex); - SetSyscallSandboxPolicy(SyscallSandboxPolicy::NET); while (!interruptNet) { DisconnectNodes(); @@ -1401,7 +2164,6 @@ void CConnman::WakeMessageHandler() void CConnman::ThreadDNSAddressSeed() { - SetSyscallSandboxPolicy(SyscallSandboxPolicy::INITIALIZATION_DNS_SEED); FastRandomContext rng; std::vector<std::string> seeds = Params().DNSSeeds(); Shuffle(seeds.begin(), seeds.end(), rng); @@ -1604,10 +2366,31 @@ std::unordered_set<Network> CConnman::GetReachableEmptyNetworks() const return networks; } +bool CConnman::MultipleManualOrFullOutboundConns(Network net) const +{ + AssertLockHeld(m_nodes_mutex); + return m_network_conn_counts[net] > 1; +} + +bool CConnman::MaybePickPreferredNetwork(std::optional<Network>& network) +{ + std::array<Network, 5> nets{NET_IPV4, NET_IPV6, NET_ONION, NET_I2P, NET_CJDNS}; + Shuffle(nets.begin(), nets.end(), FastRandomContext()); + + LOCK(m_nodes_mutex); + for (const auto net : nets) { + if (IsReachable(net) && m_network_conn_counts[net] == 0 && addrman.Size(net) != 0) { + network = net; + return true; + } + } + + return false; +} + void CConnman::ThreadOpenConnections(const std::vector<std::string> connect) { AssertLockNotHeld(m_unused_i2p_sessions_mutex); - SetSyscallSandboxPolicy(SyscallSandboxPolicy::NET_OPEN_CONNECTION); FastRandomContext rng; // Connect to specific addresses if (!connect.empty()) @@ -1635,8 +2418,10 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect) // Minimum time before next feeler connection (in microseconds). 
auto next_feeler = GetExponentialRand(start, FEELER_INTERVAL); auto next_extra_block_relay = GetExponentialRand(start, EXTRA_BLOCK_RELAY_ONLY_PEER_INTERVAL); + auto next_extra_network_peer{GetExponentialRand(start, EXTRA_NETWORK_PEER_INTERVAL)}; const bool dnsseed = gArgs.GetBoolArg("-dnsseed", DEFAULT_DNSSEED); bool add_fixed_seeds = gArgs.GetBoolArg("-fixedseeds", DEFAULT_FIXEDSEEDS); + const bool use_seednodes{gArgs.IsArgSet("-seednode")}; if (!add_fixed_seeds) { LogPrintf("Fixed seeds are disabled\n"); @@ -1666,12 +2451,12 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect) LogPrintf("Adding fixed seeds as 60 seconds have passed and addrman is empty for at least one reachable network\n"); } - // Checking !dnsseed is cheaper before locking 2 mutexes. - if (!add_fixed_seeds_now && !dnsseed) { - LOCK2(m_addr_fetches_mutex, m_added_nodes_mutex); - if (m_addr_fetches.empty() && m_added_nodes.empty()) { + // Perform cheap checks before locking a mutex. + else if (!dnsseed && !use_seednodes) { + LOCK(m_added_nodes_mutex); + if (m_added_nodes.empty()) { add_fixed_seeds_now = true; - LogPrintf("Adding fixed seeds as -dnsseed=0 (or IPv4/IPv6 connections are disabled via -onlynet), -addnode is not provided and all -seednode(s) attempted\n"); + LogPrintf("Adding fixed seeds as -dnsseed=0 (or IPv4/IPv6 connections are disabled via -onlynet) and neither -addnode nor -seednode are provided\n"); } } @@ -1746,6 +2531,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect) auto now = GetTime<std::chrono::microseconds>(); bool anchor = false; bool fFeeler = false; + std::optional<Network> preferred_net; // Determine what type of connection to open. Opening // BLOCK_RELAY connections to addresses from anchors.dat gets the highest @@ -1795,6 +2581,17 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect) next_feeler = GetExponentialRand(now, FEELER_INTERVAL); conn_type = ConnectionType::FEELER; fFeeler = true; + } else if (nOutboundFullRelay == m_max_outbound_full_relay && + m_max_outbound_full_relay == MAX_OUTBOUND_FULL_RELAY_CONNECTIONS && + now > next_extra_network_peer && + MaybePickPreferredNetwork(preferred_net)) { + // Full outbound connection management: Attempt to get at least one + // outbound peer from each reachable network by making extra connections + // and then protecting "only" peers from a network during outbound eviction. + // This is not attempted if the user changed -maxconnections to a value + // so low that less than MAX_OUTBOUND_FULL_RELAY_CONNECTIONS are made, + // to prevent interactions with otherwise protected outbound peers. + next_extra_network_peer = GetExponentialRand(now, EXTRA_NETWORK_PEER_INTERVAL); } else { // skip to next iteration of while loop continue; @@ -1848,12 +2645,15 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect) } } else { // Not a feeler - std::tie(addr, addr_last_try) = addrman.Select(); + // If preferred_net has a value set, pick an extra outbound + // peer from that network. The eviction logic in net_processing + // ensures that a peer from another network will be evicted. 
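
All of the next_* timers above, including the new next_extra_network_peer, are drawn with GetExponentialRand: the next attempt is scheduled an exponentially distributed interval after now, with the configured constant (e.g. EXTRA_NETWORK_PEER_INTERVAL) as the mean. A stand-alone illustration of that scheduling; this is not GetExponentialRand's actual implementation:

    #include <chrono>
    #include <cstdint>
    #include <random>

    // Schedule the next event an Exp(mean = average_interval) duration from now.
    // Randomized intervals keep periodic tasks (feelers, extra network peers, ...)
    // unpredictable to an observer while preserving the configured average rate.
    std::chrono::microseconds NextEvent(std::chrono::microseconds now,
                                        std::chrono::microseconds average_interval)
    {
        static std::mt19937_64 rng{std::random_device{}()};
        std::exponential_distribution<double> dist{1.0 / average_interval.count()};
        return now + std::chrono::microseconds{static_cast<int64_t>(dist(rng))};
    }
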
+ std::tie(addr, addr_last_try) = addrman.Select(false, preferred_net); } // Require outbound IPv4/IPv6 connections, other than feelers, to be to distinct network groups if (!fFeeler && outbound_ipv46_peer_netgroups.count(m_netgroupman.GetGroup(addr))) { - break; + continue; } // if we selected an invalid or local address, restart @@ -1895,6 +2695,9 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect) } LogPrint(BCLog::NET, "Making feeler connection to %s\n", addrConnect.ToStringAddrPort()); } + + if (preferred_net != std::nullopt) LogPrint(BCLog::NET, "Making network specific connection to %s on %s.\n", addrConnect.ToStringAddrPort(), GetNetworkName(preferred_net.value())); + // Record addrman failure attempts when node has at least 2 persistent outbound connections to peers with // different netgroups in ipv4/ipv6 networks + all peers in Tor/I2P/CJDNS networks. // Don't record addrman failure attempts when node is offline. This can be identified since all local @@ -1975,7 +2778,6 @@ std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo() const void CConnman::ThreadOpenAddedConnections() { AssertLockNotHeld(m_unused_i2p_sessions_mutex); - SetSyscallSandboxPolicy(SyscallSandboxPolicy::NET_ADD_CONNECTION); while (true) { CSemaphoreGrant grant(*semAddnode); @@ -2035,6 +2837,9 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai { LOCK(m_nodes_mutex); m_nodes.push_back(pnode); + + // update connection count by network + if (pnode->IsManualOrFullOutboundConn()) ++m_network_conn_counts[pnode->addr.GetNetwork()]; } } @@ -2044,7 +2849,6 @@ void CConnman::ThreadMessageHandler() { LOCK(NetEventsInterface::g_msgproc_mutex); - SetSyscallSandboxPolicy(SyscallSandboxPolicy::MESSAGE_HANDLER); while (!flagInterruptMsgProc) { bool fMoreWork = false; @@ -2530,7 +3334,7 @@ std::vector<CAddress> CConnman::GetAddresses(CNode& requestor, size_t max_addres auto local_socket_bytes = requestor.addrBind.GetAddrBytes(); uint64_t cache_id = GetDeterministicRandomizer(RANDOMIZER_ID_ADDRCACHE) .Write(requestor.ConnectedThroughNetwork()) - .Write(local_socket_bytes.data(), local_socket_bytes.size()) + .Write(local_socket_bytes) // For outbound connections, the port of the bound address is randomly // assigned by the OS and would therefore not be useful for seeding. .Write(requestor.IsInboundConn() ? 
requestor.addrBind.GetPort() : 0) @@ -2783,8 +3587,7 @@ CNode::CNode(NodeId idIn, ConnectionType conn_type_in, bool inbound_onion, CNodeOptions&& node_opts) - : m_deserializer{std::make_unique<V1TransportDeserializer>(V1TransportDeserializer(Params(), idIn, SER_NETWORK, INIT_PROTO_VERSION))}, - m_serializer{std::make_unique<V1TransportSerializer>(V1TransportSerializer())}, + : m_transport{std::make_unique<V1Transport>(idIn, SER_NETWORK, INIT_PROTO_VERSION)}, m_permission_flags{node_opts.permission_flags}, m_sock{sock}, m_connected{GetTime<std::chrono::seconds>()}, @@ -2867,26 +3670,31 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) msg.data.data() ); - // make sure we use the appropriate network transport format - std::vector<unsigned char> serializedHeader; - pnode->m_serializer->prepareForTransport(msg, serializedHeader); - size_t nTotalSize = nMessageSize + serializedHeader.size(); - size_t nBytesSent = 0; { LOCK(pnode->cs_vSend); - bool optimisticSend(pnode->vSendMsg.empty()); - - //log total amount of bytes per message type - pnode->AccountForSentBytes(msg.m_type, nTotalSize); - pnode->nSendSize += nTotalSize; - - if (pnode->nSendSize > nSendBufferMaxSize) pnode->fPauseSend = true; - pnode->vSendMsg.push_back(std::move(serializedHeader)); - if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data)); - - // If write queue empty, attempt "optimistic write" - if (optimisticSend) nBytesSent = SocketSendData(*pnode); + // Check if the transport still has unsent bytes, and indicate to it that we're about to + // give it a message to send. + const auto& [to_send, more, _msg_type] = + pnode->m_transport->GetBytesToSend(/*have_next_message=*/true); + const bool queue_was_empty{to_send.empty() && pnode->vSendMsg.empty()}; + + // Update memory usage of send buffer. + pnode->m_send_memusage += msg.GetMemoryUsage(); + if (pnode->m_send_memusage + pnode->m_transport->GetSendMemoryUsage() > nSendBufferMaxSize) pnode->fPauseSend = true; + // Move message to vSendMsg queue. + pnode->vSendMsg.push_back(std::move(msg)); + + // If there was nothing to send before, and there is now (predicted by the "more" value + // returned by the GetBytesToSend call above), attempt "optimistic write": + // because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually + // doing a send, try sending from the calling thread if the queue was empty before. + // With a V1Transport, more will always be true here, because adding a message always + // results in sendable bytes there, but with V2Transport this is not the case (it may + // still be in the handshake). 
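
PushMessage() here and SocketSendData() earlier form the two halves of the new send-buffer accounting: PushMessage() adds each message's GetMemoryUsage() to m_send_memusage and trips fPauseSend when queue plus transport usage exceeds the limit, while SocketSendData() subtracts it once the transport takes the message and re-evaluates the pause flag afterwards. A toy model of that bookkeeping, with stand-in names rather than the real CNode members:

    #include <cstddef>

    struct SendBufferAccounting {
        std::size_t queue_usage{0}; // mirrors CNode::m_send_memusage
        bool paused{false};         // mirrors CNode::fPauseSend

        // PushMessage(): message queued, usage added, pause re-evaluated.
        void Pushed(std::size_t msg_usage, std::size_t transport_usage, std::size_t limit)
        {
            queue_usage += msg_usage;
            paused = queue_usage + transport_usage > limit;
        }
        // SocketSendData(): SetMessageToSend() succeeded, the queue no longer owns the message.
        void HandedToTransport(std::size_t msg_usage) { queue_usage -= msg_usage; }
        // End of SocketSendData(): pause reflects what is still queued plus transport buffers.
        void AfterSendLoop(std::size_t transport_usage, std::size_t limit)
        {
            paused = queue_usage + transport_usage > limit;
        }
    };
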
+ if (queue_was_empty && more) { + std::tie(nBytesSent, std::ignore) = SocketSendData(*pnode); + } } if (nBytesSent) RecordBytesSent(nBytesSent); } @@ -2913,7 +3721,7 @@ uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& address) const { std::vector<unsigned char> vchNetGroup(m_netgroupman.GetGroup(address)); - return GetDeterministicRandomizer(RANDOMIZER_ID_NETGROUP).Write(vchNetGroup.data(), vchNetGroup.size()).Finalize(); + return GetDeterministicRandomizer(RANDOMIZER_ID_NETGROUP).Write(vchNetGroup).Finalize(); } void CaptureMessageToFile(const CAddress& addr, @@ -2938,13 +3746,13 @@ void CaptureMessageToFile(const CAddress& addr, AutoFile f{fsbridge::fopen(path, "ab")}; ser_writedata64(f, now.count()); - f.write(MakeByteSpan(msg_type)); + f << Span{msg_type}; for (auto i = msg_type.length(); i < CMessageHeader::COMMAND_SIZE; ++i) { f << uint8_t{'\0'}; } uint32_t size = data.size(); ser_writedata32(f, size); - f.write(AsBytes(data)); + f << data; } std::function<void(const CAddress& addr, |