Diffstat (limited to 'src/net.cpp')
-rw-r--r--  src/net.cpp  1030
1 file changed, 919 insertions, 111 deletions
diff --git a/src/net.cpp b/src/net.cpp
index 282c8fb741..3955005dfa 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -19,6 +19,7 @@
#include <crypto/sha256.h>
#include <i2p.h>
#include <logging.h>
+#include <memusage.h>
#include <net_permissions.h>
#include <netaddress.h>
#include <netbase.h>
@@ -30,7 +31,6 @@
#include <util/fs.h>
#include <util/sock.h>
#include <util/strencodings.h>
-#include <util/syscall_sandbox.h>
#include <util/thread.h>
#include <util/threadinterrupt.h>
#include <util/trace.h>
@@ -38,18 +38,12 @@
#ifdef WIN32
#include <string.h>
-#else
-#include <fcntl.h>
#endif
#if HAVE_DECL_GETIFADDRS && HAVE_DECL_FREEIFADDRS
#include <ifaddrs.h>
#endif
-#ifdef USE_POLL
-#include <poll.h>
-#endif
-
#include <algorithm>
#include <array>
#include <cstdint>
@@ -90,6 +84,9 @@ static constexpr std::chrono::seconds MAX_UPLOAD_TIMEFRAME{60 * 60 * 24};
// A random time period (0 to 1 seconds) is added to feeler connections to prevent synchronization.
static constexpr auto FEELER_SLEEP_WINDOW{1s};
+/** Frequency to attempt extra connections to reachable networks we're not connected to yet **/
+static constexpr auto EXTRA_NETWORK_PEER_INTERVAL{5min};
+
/** Used to pass flags to the Bind() function */
enum BindFlags {
BF_NONE = 0,
@@ -120,6 +117,14 @@ std::map<CNetAddr, LocalServiceInfo> mapLocalHost GUARDED_BY(g_maplocalhost_mute
static bool vfLimited[NET_MAX] GUARDED_BY(g_maplocalhost_mutex) = {};
std::string strSubVersion;
+size_t CSerializedNetMsg::GetMemoryUsage() const noexcept
+{
+ // Don't count the dynamic memory used for the m_type string, by assuming it fits in the
+ // "small string" optimization area (which stores data inside the object itself, up to some
+ // size; 15 bytes in modern libstdc++).
+ return sizeof(*this) + memusage::DynamicUsage(data);
+}
+
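For context on the accounting above: memusage::DynamicUsage(data) charges the vector's heap allocation, i.e. its capacity rather than its size. A minimal stand-alone sketch of that idea (a simplification, not the actual memusage implementation, which additionally rounds allocations up to malloc bucket sizes):

    #include <cstddef>
    #include <vector>

    // Hypothetical simplification: charge the heap block backing the vector,
    // i.e. capacity() elements, ignoring allocator bucket rounding.
    template <typename T>
    size_t ApproxVectorDynamicUsage(const std::vector<T>& v)
    {
        return v.capacity() * sizeof(T);
    }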
void CConnman::AddAddrFetch(const std::string& strDest)
{
LOCK(m_addr_fetches_mutex);
@@ -153,7 +158,7 @@ uint16_t GetListenPort()
}
// find 'best' local address for a particular peer
-bool GetLocal(CService& addr, const CNetAddr *paddrPeer)
+bool GetLocal(CService& addr, const CNode& peer)
{
if (!fListen)
return false;
@@ -164,8 +169,18 @@ bool GetLocal(CService& addr, const CNetAddr *paddrPeer)
LOCK(g_maplocalhost_mutex);
for (const auto& entry : mapLocalHost)
{
+ // For privacy reasons, don't advertise our privacy-network address
+ // to other networks and don't advertise our other-network address
+ // to privacy networks.
+ const Network our_net{entry.first.GetNetwork()};
+ const Network peers_net{peer.ConnectedThroughNetwork()};
+ if (our_net != peers_net &&
+ (our_net == NET_ONION || our_net == NET_I2P ||
+ peers_net == NET_ONION || peers_net == NET_I2P)) {
+ continue;
+ }
int nScore = entry.second.nScore;
- int nReachability = entry.first.GetReachabilityFrom(paddrPeer);
+ int nReachability = entry.first.GetReachabilityFrom(peer.addr);
if (nReachability > nBestReachability || (nReachability == nBestReachability && nScore > nBestScore))
{
addr = CService(entry.first, entry.second.nPort);
@@ -187,7 +202,8 @@ static std::vector<CAddress> ConvertSeeds(const std::vector<uint8_t> &vSeedsIn)
const auto one_week{7 * 24h};
std::vector<CAddress> vSeedsOut;
FastRandomContext rng;
- CDataStream s(vSeedsIn, SER_NETWORK, PROTOCOL_VERSION | ADDRV2_FORMAT);
+ DataStream underlying_stream{vSeedsIn};
+ ParamsStream s{CAddress::V2_NETWORK, underlying_stream};
while (!s.eof()) {
CService endpoint;
s >> endpoint;
@@ -203,10 +219,10 @@ static std::vector<CAddress> ConvertSeeds(const std::vector<uint8_t> &vSeedsIn)
// Otherwise, return the unroutable 0.0.0.0 but filled in with
// the normal parameters, since the IP may be changed to a useful
// one by discovery.
-CService GetLocalAddress(const CNetAddr& addrPeer)
+CService GetLocalAddress(const CNode& peer)
{
CService addr;
- if (GetLocal(addr, &addrPeer)) {
+ if (GetLocal(addr, peer)) {
return addr;
}
return CService{CNetAddr(), GetListenPort()};
@@ -229,7 +245,7 @@ bool IsPeerAddrLocalGood(CNode *pnode)
std::optional<CService> GetLocalAddrForPeer(CNode& node)
{
- CService addrLocal{GetLocalAddress(node.addr)};
+ CService addrLocal{GetLocalAddress(node)};
if (gArgs.GetBoolArg("-addrmantest", false)) {
// use IPv4 loopback during addrmantest
addrLocal = CService(LookupNumeric("127.0.0.1", GetListenPort()));
@@ -675,16 +691,15 @@ bool CNode::ReceiveMsgBytes(Span<const uint8_t> msg_bytes, bool& complete)
nRecvBytes += msg_bytes.size();
while (msg_bytes.size() > 0) {
// absorb network data
- int handled = m_deserializer->Read(msg_bytes);
- if (handled < 0) {
- // Serious header problem, disconnect from the peer.
+ if (!m_transport->ReceivedBytes(msg_bytes)) {
+ // Serious transport problem, disconnect from the peer.
return false;
}
- if (m_deserializer->Complete()) {
+ if (m_transport->ReceivedMessageComplete()) {
// decompose a transport agnostic CNetMessage from the deserializer
bool reject_message{false};
- CNetMessage msg = m_deserializer->GetMessage(time, reject_message);
+ CNetMessage msg = m_transport->GetReceivedMessage(time, reject_message);
if (reject_message) {
// Message deserialization failed. Drop the message but don't disconnect the peer.
// store the size of the corrupt message
@@ -711,8 +726,18 @@ bool CNode::ReceiveMsgBytes(Span<const uint8_t> msg_bytes, bool& complete)
return true;
}
-int V1TransportDeserializer::readHeader(Span<const uint8_t> msg_bytes)
+V1Transport::V1Transport(const NodeId node_id, int nTypeIn, int nVersionIn) noexcept :
+ m_node_id(node_id), hdrbuf(nTypeIn, nVersionIn), vRecv(nTypeIn, nVersionIn)
{
+ assert(std::size(Params().MessageStart()) == std::size(m_magic_bytes));
+ std::copy(std::begin(Params().MessageStart()), std::end(Params().MessageStart()), m_magic_bytes);
+ LOCK(m_recv_mutex);
+ Reset();
+}
+
+int V1Transport::readHeader(Span<const uint8_t> msg_bytes)
+{
+ AssertLockHeld(m_recv_mutex);
// copy data to temporary parsing buffer
unsigned int nRemaining = CMessageHeader::HEADER_SIZE - nHdrPos;
unsigned int nCopy = std::min<unsigned int>(nRemaining, msg_bytes.size());
@@ -734,7 +759,7 @@ int V1TransportDeserializer::readHeader(Span<const uint8_t> msg_bytes)
}
// Check start string, network magic
- if (memcmp(hdr.pchMessageStart, m_chain_params.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) {
+ if (memcmp(hdr.pchMessageStart, m_magic_bytes, CMessageHeader::MESSAGE_START_SIZE) != 0) {
LogPrint(BCLog::NET, "Header error: Wrong MessageStart %s received, peer=%d\n", HexStr(hdr.pchMessageStart), m_node_id);
return -1;
}
@@ -751,8 +776,9 @@ int V1TransportDeserializer::readHeader(Span<const uint8_t> msg_bytes)
return nCopy;
}
-int V1TransportDeserializer::readData(Span<const uint8_t> msg_bytes)
+int V1Transport::readData(Span<const uint8_t> msg_bytes)
{
+ AssertLockHeld(m_recv_mutex);
unsigned int nRemaining = hdr.nMessageSize - nDataPos;
unsigned int nCopy = std::min<unsigned int>(nRemaining, msg_bytes.size());
@@ -768,19 +794,22 @@ int V1TransportDeserializer::readData(Span<const uint8_t> msg_bytes)
return nCopy;
}
-const uint256& V1TransportDeserializer::GetMessageHash() const
+const uint256& V1Transport::GetMessageHash() const
{
- assert(Complete());
+ AssertLockHeld(m_recv_mutex);
+ assert(CompleteInternal());
if (data_hash.IsNull())
hasher.Finalize(data_hash);
return data_hash;
}
-CNetMessage V1TransportDeserializer::GetMessage(const std::chrono::microseconds time, bool& reject_message)
+CNetMessage V1Transport::GetReceivedMessage(const std::chrono::microseconds time, bool& reject_message)
{
+ AssertLockNotHeld(m_recv_mutex);
// Initialize out parameter
reject_message = false;
// decompose a single CNetMessage from the TransportDeserializer
+ LOCK(m_recv_mutex);
CNetMessage msg(std::move(vRecv));
// store message type string, time, and sizes
@@ -813,53 +842,787 @@ CNetMessage V1TransportDeserializer::GetMessage(const std::chrono::microseconds
return msg;
}
-void V1TransportSerializer::prepareForTransport(CSerializedNetMsg& msg, std::vector<unsigned char>& header) const
+bool V1Transport::SetMessageToSend(CSerializedNetMsg& msg) noexcept
{
+ AssertLockNotHeld(m_send_mutex);
+ // Determine whether a new message can be set.
+ LOCK(m_send_mutex);
+ if (m_sending_header || m_bytes_sent < m_message_to_send.data.size()) return false;
+
// create dbl-sha256 checksum
uint256 hash = Hash(msg.data);
// create header
- CMessageHeader hdr(Params().MessageStart(), msg.m_type.c_str(), msg.data.size());
+ CMessageHeader hdr(m_magic_bytes, msg.m_type.c_str(), msg.data.size());
memcpy(hdr.pchChecksum, hash.begin(), CMessageHeader::CHECKSUM_SIZE);
// serialize header
- header.reserve(CMessageHeader::HEADER_SIZE);
- CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, header, 0, hdr};
+ m_header_to_send.clear();
+ CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, m_header_to_send, 0, hdr};
+
+ // update state
+ m_message_to_send = std::move(msg);
+ m_sending_header = true;
+ m_bytes_sent = 0;
+ return true;
+}
+
+Transport::BytesToSend V1Transport::GetBytesToSend(bool have_next_message) const noexcept
+{
+ AssertLockNotHeld(m_send_mutex);
+ LOCK(m_send_mutex);
+ if (m_sending_header) {
+ return {Span{m_header_to_send}.subspan(m_bytes_sent),
+ // We have more to send after the header if the message has payload, or if there
+ // is a next message after that.
+ have_next_message || !m_message_to_send.data.empty(),
+ m_message_to_send.m_type
+ };
+ } else {
+ return {Span{m_message_to_send.data}.subspan(m_bytes_sent),
+ // We only have more to send after this message's payload if there is another
+ // message.
+ have_next_message,
+ m_message_to_send.m_type
+ };
+ }
+}
+
+void V1Transport::MarkBytesSent(size_t bytes_sent) noexcept
+{
+ AssertLockNotHeld(m_send_mutex);
+ LOCK(m_send_mutex);
+ m_bytes_sent += bytes_sent;
+ if (m_sending_header && m_bytes_sent == m_header_to_send.size()) {
+ // We're done sending a message's header. Switch to sending its data bytes.
+ m_sending_header = false;
+ m_bytes_sent = 0;
+ } else if (!m_sending_header && m_bytes_sent == m_message_to_send.data.size()) {
+ // We're done sending a message's data. Wipe the data vector to reduce memory consumption.
+ m_message_to_send.data.clear();
+ m_message_to_send.data.shrink_to_fit();
+ m_bytes_sent = 0;
+ }
+}
+
+size_t V1Transport::GetSendMemoryUsage() const noexcept
+{
+ AssertLockNotHeld(m_send_mutex);
+ LOCK(m_send_mutex);
+ // Don't count sending-side fields besides m_message_to_send, as they're all small and bounded.
+ return m_message_to_send.GetMemoryUsage();
+}
+
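For orientation, the three send-side methods above (SetMessageToSend, GetBytesToSend, MarkBytesSent) are meant to be driven in a loop by the socket code; CConnman::SocketSendData further down in this diff is the real driver. A hedged sketch of such a loop, with DrainOneMessage and send_some as hypothetical names, assuming the declarations from net.h plus <functional>:

    // Illustrative only: hand one queued message to a Transport and push its
    // bytes out until the transport has nothing left or the socket backs up.
    void DrainOneMessage(Transport& transport, CSerializedNetMsg msg,
                         const std::function<size_t(Span<const uint8_t>)>& send_some)
    {
        if (!transport.SetMessageToSend(msg)) return; // previous message still pending
        while (true) {
            const auto& [bytes, more, _msg_type] = transport.GetBytesToSend(/*have_next_message=*/false);
            if (bytes.empty()) break;             // header and payload fully handed off
            const size_t sent = send_some(bytes); // may accept fewer bytes than offered
            transport.MarkBytesSent(sent);
            if (sent < bytes.size()) break;       // socket buffer full; retry on the next poll
        }
    }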
+namespace {
+
+/** List of short messages as defined in BIP324, in order.
+ *
+ * Only message types that are actually implemented in this codebase need to be listed, as other
+ * messages get ignored anyway - whether we know how to decode them or not.
+ */
+const std::array<std::string, 33> V2_MESSAGE_IDS = {
+ "", // 12 bytes follow encoding the message type like in V1
+ NetMsgType::ADDR,
+ NetMsgType::BLOCK,
+ NetMsgType::BLOCKTXN,
+ NetMsgType::CMPCTBLOCK,
+ NetMsgType::FEEFILTER,
+ NetMsgType::FILTERADD,
+ NetMsgType::FILTERCLEAR,
+ NetMsgType::FILTERLOAD,
+ NetMsgType::GETBLOCKS,
+ NetMsgType::GETBLOCKTXN,
+ NetMsgType::GETDATA,
+ NetMsgType::GETHEADERS,
+ NetMsgType::HEADERS,
+ NetMsgType::INV,
+ NetMsgType::MEMPOOL,
+ NetMsgType::MERKLEBLOCK,
+ NetMsgType::NOTFOUND,
+ NetMsgType::PING,
+ NetMsgType::PONG,
+ NetMsgType::SENDCMPCT,
+ NetMsgType::TX,
+ NetMsgType::GETCFILTERS,
+ NetMsgType::CFILTER,
+ NetMsgType::GETCFHEADERS,
+ NetMsgType::CFHEADERS,
+ NetMsgType::GETCFCHECKPT,
+ NetMsgType::CFCHECKPT,
+ NetMsgType::ADDRV2,
+ // Unimplemented message types that are assigned in BIP324:
+ "",
+ "",
+ "",
+ ""
+};
+
+class V2MessageMap
+{
+ std::unordered_map<std::string, uint8_t> m_map;
+
+public:
+ V2MessageMap() noexcept
+ {
+ for (size_t i = 1; i < std::size(V2_MESSAGE_IDS); ++i) {
+ m_map.emplace(V2_MESSAGE_IDS[i], i);
+ }
+ }
+
+ std::optional<uint8_t> operator()(const std::string& message_name) const noexcept
+ {
+ auto it = m_map.find(message_name);
+ if (it == m_map.end()) return std::nullopt;
+ return it->second;
+ }
+};
+
+const V2MessageMap V2_MESSAGE_MAP;
+
+} // namespace
+
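As a quick, purely illustrative check of the table and map above: a known type resolves to its 1-byte id, which indexes straight back into V2_MESSAGE_IDS, while unknown types get no short id and therefore use the long 12-byte encoding.

    // Illustrative only (assumes the definitions above and <cassert>):
    void ShortIdRoundTripExample()
    {
        const auto id = V2_MESSAGE_MAP(NetMsgType::TX); // 21, per the table above
        assert(id && V2_MESSAGE_IDS[*id] == NetMsgType::TX);
        assert(!V2_MESSAGE_MAP("not-a-real-message")); // no short id -> long encoding
    }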
+V2Transport::V2Transport(NodeId nodeid, bool initiating, int type_in, int version_in) noexcept :
+ m_cipher{}, m_initiating{initiating}, m_nodeid{nodeid},
+ m_v1_fallback{nodeid, type_in, version_in}, m_recv_type{type_in}, m_recv_version{version_in},
+ m_recv_state{initiating ? RecvState::KEY : RecvState::KEY_MAYBE_V1},
+ m_send_state{initiating ? SendState::AWAITING_KEY : SendState::MAYBE_V1}
+{
+ // Construct garbage (including its length) using a FastRandomContext.
+ FastRandomContext rng;
+ size_t garbage_len = rng.randrange(MAX_GARBAGE_LEN + 1);
+ // Initialize the send buffer with ellswift pubkey + garbage.
+ m_send_buffer.resize(EllSwiftPubKey::size() + garbage_len);
+ std::copy(std::begin(m_cipher.GetOurPubKey()), std::end(m_cipher.GetOurPubKey()), MakeWritableByteSpan(m_send_buffer).begin());
+ rng.fillrand(MakeWritableByteSpan(m_send_buffer).subspan(EllSwiftPubKey::size()));
+}
+
+V2Transport::V2Transport(NodeId nodeid, bool initiating, int type_in, int version_in, const CKey& key, Span<const std::byte> ent32, Span<const uint8_t> garbage) noexcept :
+ m_cipher{key, ent32}, m_initiating{initiating}, m_nodeid{nodeid},
+ m_v1_fallback{nodeid, type_in, version_in}, m_recv_type{type_in}, m_recv_version{version_in},
+ m_recv_state{initiating ? RecvState::KEY : RecvState::KEY_MAYBE_V1},
+ m_send_state{initiating ? SendState::AWAITING_KEY : SendState::MAYBE_V1}
+{
+ assert(garbage.size() <= MAX_GARBAGE_LEN);
+ // Initialize the send buffer with ellswift pubkey + provided garbage.
+ m_send_buffer.resize(EllSwiftPubKey::size() + garbage.size());
+ std::copy(std::begin(m_cipher.GetOurPubKey()), std::end(m_cipher.GetOurPubKey()), MakeWritableByteSpan(m_send_buffer).begin());
+ std::copy(garbage.begin(), garbage.end(), m_send_buffer.begin() + EllSwiftPubKey::size());
+}
+
+void V2Transport::SetReceiveState(RecvState recv_state) noexcept
+{
+ AssertLockHeld(m_recv_mutex);
+ // Enforce allowed state transitions.
+ switch (m_recv_state) {
+ case RecvState::KEY_MAYBE_V1:
+ Assume(recv_state == RecvState::KEY || recv_state == RecvState::V1);
+ break;
+ case RecvState::KEY:
+ Assume(recv_state == RecvState::GARB_GARBTERM);
+ break;
+ case RecvState::GARB_GARBTERM:
+ Assume(recv_state == RecvState::GARBAUTH);
+ break;
+ case RecvState::GARBAUTH:
+ Assume(recv_state == RecvState::VERSION);
+ break;
+ case RecvState::VERSION:
+ Assume(recv_state == RecvState::APP);
+ break;
+ case RecvState::APP:
+ Assume(recv_state == RecvState::APP_READY);
+ break;
+ case RecvState::APP_READY:
+ Assume(recv_state == RecvState::APP);
+ break;
+ case RecvState::V1:
+ Assume(false); // V1 state cannot be left
+ break;
+ }
+ // Change state.
+ m_recv_state = recv_state;
+}
+
+void V2Transport::SetSendState(SendState send_state) noexcept
+{
+ AssertLockHeld(m_send_mutex);
+ // Enforce allowed state transitions.
+ switch (m_send_state) {
+ case SendState::MAYBE_V1:
+ Assume(send_state == SendState::V1 || send_state == SendState::AWAITING_KEY);
+ break;
+ case SendState::AWAITING_KEY:
+ Assume(send_state == SendState::READY);
+ break;
+ case SendState::READY:
+ case SendState::V1:
+ Assume(false); // Final states
+ break;
+ }
+ // Change state.
+ m_send_state = send_state;
+}
+
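Taken together, the two state machines enforced above are, in compact form (V1 is terminal in both, and only reachable when we are the responder, i.e. when the peer might still be speaking v1):

    recv: KEY_MAYBE_V1 -> {KEY, V1};  KEY -> GARB_GARBTERM -> GARBAUTH -> VERSION -> APP <-> APP_READY
    send: MAYBE_V1 -> {AWAITING_KEY, V1};  AWAITING_KEY -> READY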
+bool V2Transport::ReceivedMessageComplete() const noexcept
+{
+ AssertLockNotHeld(m_recv_mutex);
+ LOCK(m_recv_mutex);
+ if (m_recv_state == RecvState::V1) return m_v1_fallback.ReceivedMessageComplete();
+
+ return m_recv_state == RecvState::APP_READY;
+}
+
+void V2Transport::ProcessReceivedMaybeV1Bytes() noexcept
+{
+ AssertLockHeld(m_recv_mutex);
+ AssertLockNotHeld(m_send_mutex);
+ Assume(m_recv_state == RecvState::KEY_MAYBE_V1);
+ // We still have to determine if this is a v1 or v2 connection. The bytes being received could
+ // be the beginning of either a v1 packet (network magic + "version\x00\x00\x00\x00\x00"), or of
+ // a v2 public key. BIP324 specifies that a mismatch with this 16-byte string should trigger
+ // sending of the key.
+ std::array<uint8_t, V1_PREFIX_LEN> v1_prefix = {0, 0, 0, 0, 'v', 'e', 'r', 's', 'i', 'o', 'n', 0, 0, 0, 0, 0};
+ std::copy(std::begin(Params().MessageStart()), std::end(Params().MessageStart()), v1_prefix.begin());
+ Assume(m_recv_buffer.size() <= v1_prefix.size());
+ if (!std::equal(m_recv_buffer.begin(), m_recv_buffer.end(), v1_prefix.begin())) {
+ // Mismatch with v1 prefix, so we can assume a v2 connection.
+ SetReceiveState(RecvState::KEY); // Convert to KEY state, leaving received bytes around.
+ // Transition the sender to AWAITING_KEY state (if not already).
+ LOCK(m_send_mutex);
+ SetSendState(SendState::AWAITING_KEY);
+ } else if (m_recv_buffer.size() == v1_prefix.size()) {
+ // Full match with the v1 prefix, so fall back to v1 behavior.
+ LOCK(m_send_mutex);
+ Span<const uint8_t> feedback{m_recv_buffer};
+ // Feed already received bytes to v1 transport. It should always accept these, because it's
+ // less than the size of a v1 header, and these are the first bytes fed to m_v1_fallback.
+ bool ret = m_v1_fallback.ReceivedBytes(feedback);
+ Assume(feedback.empty());
+ Assume(ret);
+ SetReceiveState(RecvState::V1);
+ SetSendState(SendState::V1);
+ // Reset v2 transport buffers to save memory.
+ m_recv_buffer = {};
+ m_send_buffer = {};
+ } else {
+ // We have not received enough to distinguish v1 from v2 yet. Wait until more bytes come.
+ }
+}
+
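Concretely, on mainnet the 16-byte v1 prefix is the magic f9 be b4 d9 followed by "version" and five 0x00 bytes. Since an EllSwift-encoded public key is computationally indistinguishable from 64 uniformly random bytes, a genuine v2 initiator matches even the first four prefix bytes only with probability about 2^-32, so in practice the V1 fallback is only taken for real v1 peers.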
+bool V2Transport::ProcessReceivedKeyBytes() noexcept
+{
+ AssertLockHeld(m_recv_mutex);
+ AssertLockNotHeld(m_send_mutex);
+ Assume(m_recv_state == RecvState::KEY);
+ Assume(m_recv_buffer.size() <= EllSwiftPubKey::size());
+
+ // As a special exception, if bytes 4-16 of the key on a responder connection match the
+ // corresponding bytes of a V1 version message, but bytes 0-4 don't match the network magic
+ // (if they did, we'd have switched to V1 state already), assume this is a peer from
+ // another network, and disconnect them. They will almost certainly disconnect us too when
+ // they receive our uniformly random key and garbage, but detecting this case specially
+ // means we can log it.
+ static constexpr std::array<uint8_t, 12> MATCH = {'v', 'e', 'r', 's', 'i', 'o', 'n', 0, 0, 0, 0, 0};
+ static constexpr size_t OFFSET = sizeof(CMessageHeader::MessageStartChars);
+ if (!m_initiating && m_recv_buffer.size() >= OFFSET + MATCH.size()) {
+ if (std::equal(MATCH.begin(), MATCH.end(), m_recv_buffer.begin() + OFFSET)) {
+ LogPrint(BCLog::NET, "V2 transport error: V1 peer with wrong MessageStart %s\n",
+ HexStr(Span(m_recv_buffer).first(OFFSET)));
+ return false;
+ }
+ }
+
+ if (m_recv_buffer.size() == EllSwiftPubKey::size()) {
+ // Other side's key has been fully received, and can now be Diffie-Hellman combined with
+ // our key to initialize the encryption ciphers.
+
+ // Initialize the ciphers.
+ EllSwiftPubKey ellswift(MakeByteSpan(m_recv_buffer));
+ LOCK(m_send_mutex);
+ m_cipher.Initialize(ellswift, m_initiating);
+
+ // Switch receiver state to GARB_GARBTERM.
+ SetReceiveState(RecvState::GARB_GARBTERM);
+ m_recv_buffer.clear();
+
+ // Switch sender state to READY.
+ SetSendState(SendState::READY);
+
+ // Append the garbage terminator to the send buffer.
+ size_t garbage_len = m_send_buffer.size() - EllSwiftPubKey::size();
+ m_send_buffer.resize(m_send_buffer.size() + BIP324Cipher::GARBAGE_TERMINATOR_LEN);
+ std::copy(m_cipher.GetSendGarbageTerminator().begin(),
+ m_cipher.GetSendGarbageTerminator().end(),
+ MakeWritableByteSpan(m_send_buffer).last(BIP324Cipher::GARBAGE_TERMINATOR_LEN).begin());
+
+ // Construct garbage authentication packet in the send buffer (using the garbage data which
+ // is still there).
+ m_send_buffer.resize(m_send_buffer.size() + BIP324Cipher::EXPANSION);
+ m_cipher.Encrypt(
+ /*contents=*/{},
+ /*aad=*/MakeByteSpan(m_send_buffer).subspan(EllSwiftPubKey::size(), garbage_len),
+ /*ignore=*/false,
+ /*output=*/MakeWritableByteSpan(m_send_buffer).last(BIP324Cipher::EXPANSION));
+
+ // Construct version packet in the send buffer.
+ m_send_buffer.resize(m_send_buffer.size() + BIP324Cipher::EXPANSION + VERSION_CONTENTS.size());
+ m_cipher.Encrypt(
+ /*contents=*/VERSION_CONTENTS,
+ /*aad=*/{},
+ /*ignore=*/false,
+ /*output=*/MakeWritableByteSpan(m_send_buffer).last(BIP324Cipher::EXPANSION + VERSION_CONTENTS.size()));
+ } else {
+ // We still have to receive more key bytes.
+ }
+ return true;
+}
+
+bool V2Transport::ProcessReceivedGarbageBytes() noexcept
+{
+ AssertLockHeld(m_recv_mutex);
+ Assume(m_recv_state == RecvState::GARB_GARBTERM);
+ Assume(m_recv_buffer.size() <= MAX_GARBAGE_LEN + BIP324Cipher::GARBAGE_TERMINATOR_LEN);
+ if (m_recv_buffer.size() >= BIP324Cipher::GARBAGE_TERMINATOR_LEN) {
+ if (MakeByteSpan(m_recv_buffer).last(BIP324Cipher::GARBAGE_TERMINATOR_LEN) == m_cipher.GetReceiveGarbageTerminator()) {
+ // Garbage terminator received. Switch to receiving garbage authentication packet.
+ m_recv_garbage = std::move(m_recv_buffer);
+ m_recv_garbage.resize(m_recv_garbage.size() - BIP324Cipher::GARBAGE_TERMINATOR_LEN);
+ m_recv_buffer.clear();
+ SetReceiveState(RecvState::GARBAUTH);
+ } else if (m_recv_buffer.size() == MAX_GARBAGE_LEN + BIP324Cipher::GARBAGE_TERMINATOR_LEN) {
+ // We've reached the maximum length for garbage + garbage terminator, and the
+ // terminator still does not match. Abort.
+ LogPrint(BCLog::NET, "V2 transport error: missing garbage terminator, peer=%d\n", m_nodeid);
+ return false;
+ } else {
+ // We still need to receive more garbage and/or garbage terminator bytes.
+ }
+ } else {
+ // We have less than GARBAGE_TERMINATOR_LEN (16) bytes, so we certainly need to receive
+ // more first.
+ }
+ return true;
+}
+
+bool V2Transport::ProcessReceivedPacketBytes() noexcept
+{
+ AssertLockHeld(m_recv_mutex);
+ Assume(m_recv_state == RecvState::GARBAUTH || m_recv_state == RecvState::VERSION ||
+ m_recv_state == RecvState::APP);
+
+ // The maximum permitted contents length for a packet, consisting of:
+ // - 0x00 byte: indicating long message type encoding
+ // - 12 bytes of message type
+ // - payload
+ static constexpr size_t MAX_CONTENTS_LEN =
+ 1 + CMessageHeader::COMMAND_SIZE +
+ std::min<size_t>(MAX_SIZE, MAX_PROTOCOL_MESSAGE_LENGTH);
+
+ if (m_recv_buffer.size() == BIP324Cipher::LENGTH_LEN) {
+ // Length descriptor received.
+ m_recv_len = m_cipher.DecryptLength(MakeByteSpan(m_recv_buffer));
+ if (m_recv_len > MAX_CONTENTS_LEN) {
+ LogPrint(BCLog::NET, "V2 transport error: packet too large (%u bytes), peer=%d\n", m_recv_len, m_nodeid);
+ return false;
+ }
+ } else if (m_recv_buffer.size() > BIP324Cipher::LENGTH_LEN && m_recv_buffer.size() == m_recv_len + BIP324Cipher::EXPANSION) {
+ // Ciphertext received, decrypt it into m_recv_decode_buffer.
+ // Note that it is impossible to reach this branch without hitting the branch above first,
+ // as GetMaxBytesToProcess only allows up to LENGTH_LEN into the buffer before that point.
+ m_recv_decode_buffer.resize(m_recv_len);
+ bool ignore{false};
+ Span<const std::byte> aad;
+ if (m_recv_state == RecvState::GARBAUTH) aad = MakeByteSpan(m_recv_garbage);
+ bool ret = m_cipher.Decrypt(
+ /*input=*/MakeByteSpan(m_recv_buffer).subspan(BIP324Cipher::LENGTH_LEN),
+ /*aad=*/aad,
+ /*ignore=*/ignore,
+ /*contents=*/MakeWritableByteSpan(m_recv_decode_buffer));
+ if (!ret) {
+ LogPrint(BCLog::NET, "V2 transport error: packet decryption failure (%u bytes), peer=%d\n", m_recv_len, m_nodeid);
+ return false;
+ }
+ // Feed the last 4 bytes of the Poly1305 authentication tag (and its timing) into our RNG.
+ RandAddEvent(ReadLE32(m_recv_buffer.data() + m_recv_buffer.size() - 4));
+
+ // At this point we have a valid packet decrypted into m_recv_decode_buffer. Depending on
+ // the current state, decide what to do with it.
+ switch (m_recv_state) {
+ case RecvState::GARBAUTH:
+ // Ignore flag does not matter for garbage authentication. Any valid packet functions
+ // as authentication. Receive and process the version packet next.
+ SetReceiveState(RecvState::VERSION);
+ m_recv_garbage = {};
+ break;
+ case RecvState::VERSION:
+ if (!ignore) {
+ // Version message received; transition to application phase. The contents is
+ // ignored, but can be used for future extensions.
+ SetReceiveState(RecvState::APP);
+ }
+ break;
+ case RecvState::APP:
+ if (!ignore) {
+ // Application message decrypted correctly. It can be extracted using GetMessage().
+ SetReceiveState(RecvState::APP_READY);
+ }
+ break;
+ default:
+ // Any other state is invalid (this function should not have been called).
+ Assume(false);
+ }
+ // Wipe the receive buffer where the next packet will be received into.
+ m_recv_buffer = {};
+ // In all but APP_READY state, we can wipe the decoded contents.
+ if (m_recv_state != RecvState::APP_READY) m_recv_decode_buffer = {};
+ } else {
+ // We either have less than 3 bytes, so we don't know the packet's length yet, or more
+ // than 3 bytes but less than the packet's full ciphertext. Wait until those arrive.
+ }
+ return true;
+}
+
+size_t V2Transport::GetMaxBytesToProcess() noexcept
+{
+ AssertLockHeld(m_recv_mutex);
+ switch (m_recv_state) {
+ case RecvState::KEY_MAYBE_V1:
+ // During the KEY_MAYBE_V1 state we do not allow more than the length of v1 prefix into the
+ // receive buffer.
+ Assume(m_recv_buffer.size() <= V1_PREFIX_LEN);
+ // As long as we're not sure if this is a v1 or v2 connection, don't receive more than what
+ // is strictly necessary to distinguish the two (16 bytes). If we permitted more than
+ // the v1 header size (24 bytes), we may not be able to feed the already-received bytes
+ // back into the m_v1_fallback V1 transport.
+ return V1_PREFIX_LEN - m_recv_buffer.size();
+ case RecvState::KEY:
+ // During the KEY state, we only allow the 64-byte key into the receive buffer.
+ Assume(m_recv_buffer.size() <= EllSwiftPubKey::size());
+ // As long as we have not received the other side's public key, don't receive more than
+ // that (64 bytes), as garbage follows, and locating the garbage terminator requires the
+ // key exchange first.
+ return EllSwiftPubKey::size() - m_recv_buffer.size();
+ case RecvState::GARB_GARBTERM:
+ // Process garbage bytes one by one (because terminator may appear anywhere).
+ return 1;
+ case RecvState::GARBAUTH:
+ case RecvState::VERSION:
+ case RecvState::APP:
+ // These three states all involve decoding a packet. Process the length descriptor first,
+ // so that we know where the current packet ends (and we don't process bytes from the next
+ // packet or decoy yet). Then, process the ciphertext bytes of the current packet.
+ if (m_recv_buffer.size() < BIP324Cipher::LENGTH_LEN) {
+ return BIP324Cipher::LENGTH_LEN - m_recv_buffer.size();
+ } else {
+ // Note that BIP324Cipher::EXPANSION is the total difference between contents size
+ // and encoded packet size, which includes the 3 bytes due to the packet length.
+ // When transitioning from receiving the packet length to receiving its ciphertext,
+ // the encrypted packet length is left in the receive buffer.
+ return BIP324Cipher::EXPANSION + m_recv_len - m_recv_buffer.size();
+ }
+ case RecvState::APP_READY:
+ // No bytes can be processed until GetMessage() is called.
+ return 0;
+ case RecvState::V1:
+ // Not allowed (must be dealt with by the caller).
+ Assume(false);
+ return 0;
+ }
+ Assume(false); // unreachable
+ return 0;
}
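As a worked example of the packet arithmetic above (assuming BIP324's constants: LENGTH_LEN = 3 and EXPANSION = 20, i.e. 3 length bytes + 1 header byte + 16 tag bytes): a packet carrying 100 bytes of contents occupies 120 bytes on the wire. With an empty receive buffer the function returns 3 (just the length field); once those 3 bytes have been received and decrypted to m_recv_len = 100, it returns 20 + 100 - 3 = 117, which is exactly the remainder of this packet and nothing of the next.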
-size_t CConnman::SocketSendData(CNode& node) const
+bool V2Transport::ReceivedBytes(Span<const uint8_t>& msg_bytes) noexcept
+{
+ AssertLockNotHeld(m_recv_mutex);
+ /** How many bytes to allocate in the receive buffer at most above what is received so far. */
+ static constexpr size_t MAX_RESERVE_AHEAD = 256 * 1024;
+
+ LOCK(m_recv_mutex);
+ if (m_recv_state == RecvState::V1) return m_v1_fallback.ReceivedBytes(msg_bytes);
+
+ // Process the provided bytes in msg_bytes in a loop. In each iteration a nonzero number of
+ // bytes (decided by GetMaxBytesToProcess) are taken from the beginning of msg_bytes, and
+ // appended to m_recv_buffer. Then, depending on the receiver state, one of the
+ // ProcessReceived*Bytes functions is called to process the bytes in that buffer.
+ while (!msg_bytes.empty()) {
+ // Decide how many bytes to copy from msg_bytes to m_recv_buffer.
+ size_t max_read = GetMaxBytesToProcess();
+
+ // Reserve space in the buffer if there is not enough.
+ if (m_recv_buffer.size() + std::min(msg_bytes.size(), max_read) > m_recv_buffer.capacity()) {
+ switch (m_recv_state) {
+ case RecvState::KEY_MAYBE_V1:
+ case RecvState::KEY:
+ case RecvState::GARB_GARBTERM:
+ // During the initial states (key/garbage), allocate once to fit the maximum (4111
+ // bytes).
+ m_recv_buffer.reserve(MAX_GARBAGE_LEN + BIP324Cipher::GARBAGE_TERMINATOR_LEN);
+ break;
+ case RecvState::GARBAUTH:
+ case RecvState::VERSION:
+ case RecvState::APP: {
+ // During states where a packet is being received, as much as is expected but never
+ // more than MAX_RESERVE_AHEAD bytes in addition to what is received so far.
+ // This means attackers that want to cause us to waste allocated memory are limited
+ // to MAX_RESERVE_AHEAD above the largest allowed message contents size, and to
+ // MAX_RESERVE_AHEAD more than they've actually sent us.
+ size_t alloc_add = std::min(max_read, msg_bytes.size() + MAX_RESERVE_AHEAD);
+ m_recv_buffer.reserve(m_recv_buffer.size() + alloc_add);
+ break;
+ }
+ case RecvState::APP_READY:
+ // The buffer is empty in this state.
+ Assume(m_recv_buffer.empty());
+ break;
+ case RecvState::V1:
+ // Should have bailed out above.
+ Assume(false);
+ break;
+ }
+ }
+
+ // Can't read more than provided input.
+ max_read = std::min(msg_bytes.size(), max_read);
+ // Copy data to buffer.
+ m_recv_buffer.insert(m_recv_buffer.end(), UCharCast(msg_bytes.data()), UCharCast(msg_bytes.data() + max_read));
+ msg_bytes = msg_bytes.subspan(max_read);
+
+ // Process data in the buffer.
+ switch (m_recv_state) {
+ case RecvState::KEY_MAYBE_V1:
+ ProcessReceivedMaybeV1Bytes();
+ if (m_recv_state == RecvState::V1) return true;
+ break;
+
+ case RecvState::KEY:
+ if (!ProcessReceivedKeyBytes()) return false;
+ break;
+
+ case RecvState::GARB_GARBTERM:
+ if (!ProcessReceivedGarbageBytes()) return false;
+ break;
+
+ case RecvState::GARBAUTH:
+ case RecvState::VERSION:
+ case RecvState::APP:
+ if (!ProcessReceivedPacketBytes()) return false;
+ break;
+
+ case RecvState::APP_READY:
+ return true;
+
+ case RecvState::V1:
+ // We should have bailed out before.
+ Assume(false);
+ break;
+ }
+ // Make sure we have made progress before continuing.
+ Assume(max_read > 0);
+ }
+
+ return true;
+}
+
+std::optional<std::string> V2Transport::GetMessageType(Span<const uint8_t>& contents) noexcept
+{
+ if (contents.size() == 0) return std::nullopt; // Empty contents
+ uint8_t first_byte = contents[0];
+ contents = contents.subspan(1); // Strip first byte.
+
+ if (first_byte != 0) {
+ // Short (1 byte) encoding.
+ if (first_byte < std::size(V2_MESSAGE_IDS)) {
+ // Valid short message id.
+ return V2_MESSAGE_IDS[first_byte];
+ } else {
+ // Unknown short message id.
+ return std::nullopt;
+ }
+ }
+
+ if (contents.size() < CMessageHeader::COMMAND_SIZE) {
+ return std::nullopt; // Long encoding needs 12 message type bytes.
+ }
+
+ size_t msg_type_len{0};
+ while (msg_type_len < CMessageHeader::COMMAND_SIZE && contents[msg_type_len] != 0) {
+ // Verify that message type bytes before the first 0x00 are in range.
+ if (contents[msg_type_len] < ' ' || contents[msg_type_len] > 0x7F) {
+ return {};
+ }
+ ++msg_type_len;
+ }
+ std::string ret{reinterpret_cast<const char*>(contents.data()), msg_type_len};
+ while (msg_type_len < CMessageHeader::COMMAND_SIZE) {
+ // Verify that message type bytes after the first 0x00 are also 0x00.
+ if (contents[msg_type_len] != 0) return {};
+ ++msg_type_len;
+ }
+ // Strip message type bytes of contents.
+ contents = contents.subspan(CMessageHeader::COMMAND_SIZE);
+ return {std::move(ret)};
+}
+
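To make the two encodings concrete, here is an illustrative helper (hypothetical, not part of the patch; assumes the surrounding headers) that builds the decrypted contents of a "ping" message in both forms; GetMessageType() would return "ping" for either and strip 1 or 1 + 12 leading bytes respectively:

    // Illustrative only. 18 is the index of NetMsgType::PING in V2_MESSAGE_IDS above;
    // nonce8 stands for ping's 8-byte payload.
    std::pair<std::vector<uint8_t>, std::vector<uint8_t>> ExamplePingContents(Span<const uint8_t> nonce8)
    {
        // Short form: 1-byte id, then the payload.
        std::vector<uint8_t> short_form{18};
        short_form.insert(short_form.end(), nonce8.begin(), nonce8.end());
        // Long form: 0x00, then the type zero-padded to 12 bytes, then the payload.
        std::vector<uint8_t> long_form{0, 'p', 'i', 'n', 'g', 0, 0, 0, 0, 0, 0, 0, 0};
        long_form.insert(long_form.end(), nonce8.begin(), nonce8.end());
        return {short_form, long_form};
    }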
+CNetMessage V2Transport::GetReceivedMessage(std::chrono::microseconds time, bool& reject_message) noexcept
+{
+ AssertLockNotHeld(m_recv_mutex);
+ LOCK(m_recv_mutex);
+ if (m_recv_state == RecvState::V1) return m_v1_fallback.GetReceivedMessage(time, reject_message);
+
+ Assume(m_recv_state == RecvState::APP_READY);
+ Span<const uint8_t> contents{m_recv_decode_buffer};
+ auto msg_type = GetMessageType(contents);
+ CDataStream ret(m_recv_type, m_recv_version);
+ CNetMessage msg{std::move(ret)};
+ // Note that BIP324Cipher::EXPANSION also includes the length descriptor size.
+ msg.m_raw_message_size = m_recv_decode_buffer.size() + BIP324Cipher::EXPANSION;
+ if (msg_type) {
+ reject_message = false;
+ msg.m_type = std::move(*msg_type);
+ msg.m_time = time;
+ msg.m_message_size = contents.size();
+ msg.m_recv.resize(contents.size());
+ std::copy(contents.begin(), contents.end(), UCharCast(msg.m_recv.data()));
+ } else {
+ LogPrint(BCLog::NET, "V2 transport error: invalid message type (%u bytes contents), peer=%d\n", m_recv_decode_buffer.size(), m_nodeid);
+ reject_message = true;
+ }
+ m_recv_decode_buffer = {};
+ SetReceiveState(RecvState::APP);
+
+ return msg;
+}
+
+bool V2Transport::SetMessageToSend(CSerializedNetMsg& msg) noexcept
+{
+ AssertLockNotHeld(m_send_mutex);
+ LOCK(m_send_mutex);
+ if (m_send_state == SendState::V1) return m_v1_fallback.SetMessageToSend(msg);
+ // We only allow adding a new message to be sent when in the READY state (so the packet cipher
+ // is available) and the send buffer is empty. This limits the number of messages in the send
+ // buffer to just one, and leaves the responsibility for queueing them up to the caller.
+ if (!(m_send_state == SendState::READY && m_send_buffer.empty())) return false;
+ // Construct contents (encoding message type + payload).
+ std::vector<uint8_t> contents;
+ auto short_message_id = V2_MESSAGE_MAP(msg.m_type);
+ if (short_message_id) {
+ contents.resize(1 + msg.data.size());
+ contents[0] = *short_message_id;
+ std::copy(msg.data.begin(), msg.data.end(), contents.begin() + 1);
+ } else {
+ // Initialize with zeroes, and then write the message type string starting at offset 1.
+ // This means contents[0] and the unused positions in contents[1..13] remain 0x00.
+ contents.resize(1 + CMessageHeader::COMMAND_SIZE + msg.data.size(), 0);
+ std::copy(msg.m_type.begin(), msg.m_type.end(), contents.data() + 1);
+ std::copy(msg.data.begin(), msg.data.end(), contents.begin() + 1 + CMessageHeader::COMMAND_SIZE);
+ }
+ // Construct ciphertext in send buffer.
+ m_send_buffer.resize(contents.size() + BIP324Cipher::EXPANSION);
+ m_cipher.Encrypt(MakeByteSpan(contents), {}, false, MakeWritableByteSpan(m_send_buffer));
+ m_send_type = msg.m_type;
+ // Release memory
+ msg.data = {};
+ return true;
+}
+
+Transport::BytesToSend V2Transport::GetBytesToSend(bool have_next_message) const noexcept
+{
+ AssertLockNotHeld(m_send_mutex);
+ LOCK(m_send_mutex);
+ if (m_send_state == SendState::V1) return m_v1_fallback.GetBytesToSend(have_next_message);
+
+ // We do not send anything in MAYBE_V1 state (as we don't know if the peer is v1 or v2),
+ // despite there being data in the send buffer in that state.
+ if (m_send_state == SendState::MAYBE_V1) return {{}, false, m_send_type};
+ Assume(m_send_pos <= m_send_buffer.size());
+ return {
+ Span{m_send_buffer}.subspan(m_send_pos),
+ // We only have more to send after the current m_send_buffer if there is a (next)
+ // message to be sent, and we're capable of sending packets.
+ have_next_message && m_send_state == SendState::READY,
+ m_send_type
+ };
+}
+
+void V2Transport::MarkBytesSent(size_t bytes_sent) noexcept
+{
+ AssertLockNotHeld(m_send_mutex);
+ LOCK(m_send_mutex);
+ if (m_send_state == SendState::V1) return m_v1_fallback.MarkBytesSent(bytes_sent);
+
+ m_send_pos += bytes_sent;
+ Assume(m_send_pos <= m_send_buffer.size());
+ // Only wipe the buffer when everything is sent in the READY state. In the AWAITING_KEY state
+ // we still need the garbage that's in the send buffer to construct the garbage authentication
+ // packet.
+ if (m_send_state == SendState::READY && m_send_pos == m_send_buffer.size()) {
+ m_send_pos = 0;
+ m_send_buffer = {};
+ }
+}
+
+size_t V2Transport::GetSendMemoryUsage() const noexcept
+{
+ AssertLockNotHeld(m_send_mutex);
+ LOCK(m_send_mutex);
+ if (m_send_state == SendState::V1) return m_v1_fallback.GetSendMemoryUsage();
+
+ return sizeof(m_send_buffer) + memusage::DynamicUsage(m_send_buffer);
+}
+
+std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
{
auto it = node.vSendMsg.begin();
size_t nSentSize = 0;
-
- while (it != node.vSendMsg.end()) {
- const auto& data = *it;
- assert(data.size() > node.nSendOffset);
+ bool data_left{false}; //!< second return value (whether unsent data remains)
+ std::optional<bool> expected_more;
+
+ while (true) {
+ if (it != node.vSendMsg.end()) {
+ // If possible, move one message from the send queue to the transport. This fails when
+ // there is an existing message still being sent, or (for v2 transports) when the
+ // handshake has not yet completed.
+ size_t memusage = it->GetMemoryUsage();
+ if (node.m_transport->SetMessageToSend(*it)) {
+ // Update memory usage of send buffer (as *it will be deleted).
+ node.m_send_memusage -= memusage;
+ ++it;
+ }
+ }
+ const auto& [data, more, msg_type] = node.m_transport->GetBytesToSend(it != node.vSendMsg.end());
+ // We rely on the 'more' value returned by GetBytesToSend to correctly predict whether more
+ // bytes are still to be sent, to correctly set the MSG_MORE flag. As a sanity check,
+ // verify that the previously returned 'more' was correct.
+ if (expected_more.has_value()) Assume(!data.empty() == *expected_more);
+ expected_more = more;
+ data_left = !data.empty(); // will be overwritten on next loop if all of data gets sent
int nBytes = 0;
- {
+ if (!data.empty()) {
LOCK(node.m_sock_mutex);
+ // There is no socket in case we've already disconnected, or in test cases without
+ // real connections. In these cases, we bail out immediately and just leave things
+ // in the send queue and transport.
if (!node.m_sock) {
break;
}
int flags = MSG_NOSIGNAL | MSG_DONTWAIT;
#ifdef MSG_MORE
- if (it + 1 != node.vSendMsg.end()) {
+ if (more) {
flags |= MSG_MORE;
}
#endif
- nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()) + node.nSendOffset, data.size() - node.nSendOffset, flags);
+ nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()), data.size(), flags);
}
if (nBytes > 0) {
node.m_last_send = GetTime<std::chrono::seconds>();
node.nSendBytes += nBytes;
- node.nSendOffset += nBytes;
+ // Notify transport that bytes have been processed.
+ node.m_transport->MarkBytesSent(nBytes);
+ // Update statistics per message type.
+ node.AccountForSentBytes(msg_type, nBytes);
nSentSize += nBytes;
- if (node.nSendOffset == data.size()) {
- node.nSendOffset = 0;
- node.nSendSize -= data.size();
- node.fPauseSend = node.nSendSize > nSendBufferMaxSize;
- it++;
- } else {
+ if ((size_t)nBytes != data.size()) {
// could not send full message; stop sending more
break;
}
@@ -872,17 +1635,17 @@ size_t CConnman::SocketSendData(CNode& node) const
node.CloseSocketDisconnect();
}
}
- // couldn't send anything at all
break;
}
}
+ node.fPauseSend = node.m_send_memusage + node.m_transport->GetSendMemoryUsage() > nSendBufferMaxSize;
+
if (it == node.vSendMsg.end()) {
- assert(node.nSendOffset == 0);
- assert(node.nSendSize == 0);
+ assert(node.m_send_memusage == 0);
}
node.vSendMsg.erase(node.vSendMsg.begin(), it);
- return nSentSize;
+ return {nSentSize, data_left};
}
/** Try to find a connection to evict when the node is full.
@@ -1135,6 +1898,9 @@ void CConnman::DisconnectNodes()
// close socket and cleanup
pnode->CloseSocketDisconnect();
+ // update connection count by network
+ if (pnode->IsManualOrFullOutboundConn()) --m_network_conn_counts[pnode->addr.GetNetwork()];
+
// hold in disconnected pool until all refs are released
pnode->Release();
m_nodes_disconnected.push_back(pnode);
@@ -1217,37 +1983,23 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes)
}
for (CNode* pnode : nodes) {
- // Implement the following logic:
- // * If there is data to send, select() for sending data. As this only
- // happens when optimistic write failed, we choose to first drain the
- // write buffer in this case before receiving more. This avoids
- // needlessly queueing received data, if the remote peer is not themselves
- // receiving data. This means properly utilizing TCP flow control signalling.
- // * Otherwise, if there is space left in the receive buffer, select() for
- // receiving data.
- // * Hand off all complete messages to the processor, to be handled without
- // blocking here.
-
bool select_recv = !pnode->fPauseRecv;
bool select_send;
{
LOCK(pnode->cs_vSend);
- select_send = !pnode->vSendMsg.empty();
+ // Sending is possible if either there are bytes to send right now, or if there will be
+ // once a potential message from vSendMsg is handed to the transport. GetBytesToSend
+ // determines both of these in a single call.
+ const auto& [to_send, more, _msg_type] = pnode->m_transport->GetBytesToSend(!pnode->vSendMsg.empty());
+ select_send = !to_send.empty() || more;
}
+ if (!select_recv && !select_send) continue;
LOCK(pnode->m_sock_mutex);
- if (!pnode->m_sock) {
- continue;
- }
-
- Sock::Event requested{0};
- if (select_send) {
- requested = Sock::SEND;
- } else if (select_recv) {
- requested = Sock::RECV;
+ if (pnode->m_sock) {
+ Sock::Event event = (select_send ? Sock::SEND : 0) | (select_recv ? Sock::RECV : 0);
+ events_per_sock.emplace(pnode->m_sock, Sock::Events{event});
}
-
- events_per_sock.emplace(pnode->m_sock, Sock::Events{requested});
}
return events_per_sock;
@@ -1308,6 +2060,24 @@ void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
errorSet = it->second.occurred & Sock::ERR;
}
}
+
+ if (sendSet) {
+ // Send data
+ auto [bytes_sent, data_left] = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode));
+ if (bytes_sent) {
+ RecordBytesSent(bytes_sent);
+
+ // If both receiving and (non-optimistic) sending were possible, we first attempt
+ // sending. If that succeeds, but does not fully drain the send queue, do not
+ // attempt to receive. This avoids needlessly queueing data if the remote peer
+ // is slow at receiving data, by means of TCP flow control. We only do this when
+ // sending actually succeeded to make sure progress is always made; otherwise a
+ // deadlock would be possible when both sides have data to send, but neither is
+ // receiving.
+ if (data_left) recvSet = false;
+ }
+ }
+
if (recvSet || errorSet)
{
// typical socket buffer is 8K-64K
@@ -1354,12 +2124,6 @@ void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
}
}
- if (sendSet) {
- // Send data
- size_t bytes_sent = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode));
- if (bytes_sent) RecordBytesSent(bytes_sent);
- }
-
if (InactivityCheck(*pnode)) pnode->fDisconnect = true;
}
}
@@ -1381,7 +2145,6 @@ void CConnman::ThreadSocketHandler()
{
AssertLockNotHeld(m_total_bytes_sent_mutex);
- SetSyscallSandboxPolicy(SyscallSandboxPolicy::NET);
while (!interruptNet)
{
DisconnectNodes();
@@ -1401,7 +2164,6 @@ void CConnman::WakeMessageHandler()
void CConnman::ThreadDNSAddressSeed()
{
- SetSyscallSandboxPolicy(SyscallSandboxPolicy::INITIALIZATION_DNS_SEED);
FastRandomContext rng;
std::vector<std::string> seeds = Params().DNSSeeds();
Shuffle(seeds.begin(), seeds.end(), rng);
@@ -1604,10 +2366,31 @@ std::unordered_set<Network> CConnman::GetReachableEmptyNetworks() const
return networks;
}
+bool CConnman::MultipleManualOrFullOutboundConns(Network net) const
+{
+ AssertLockHeld(m_nodes_mutex);
+ return m_network_conn_counts[net] > 1;
+}
+
+bool CConnman::MaybePickPreferredNetwork(std::optional<Network>& network)
+{
+ std::array<Network, 5> nets{NET_IPV4, NET_IPV6, NET_ONION, NET_I2P, NET_CJDNS};
+ Shuffle(nets.begin(), nets.end(), FastRandomContext());
+
+ LOCK(m_nodes_mutex);
+ for (const auto net : nets) {
+ if (IsReachable(net) && m_network_conn_counts[net] == 0 && addrman.Size(net) != 0) {
+ network = net;
+ return true;
+ }
+ }
+
+ return false;
+}
+
void CConnman::ThreadOpenConnections(const std::vector<std::string> connect)
{
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
- SetSyscallSandboxPolicy(SyscallSandboxPolicy::NET_OPEN_CONNECTION);
FastRandomContext rng;
// Connect to specific addresses
if (!connect.empty())
@@ -1635,8 +2418,10 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect)
// Minimum time before next feeler connection (in microseconds).
auto next_feeler = GetExponentialRand(start, FEELER_INTERVAL);
auto next_extra_block_relay = GetExponentialRand(start, EXTRA_BLOCK_RELAY_ONLY_PEER_INTERVAL);
+ auto next_extra_network_peer{GetExponentialRand(start, EXTRA_NETWORK_PEER_INTERVAL)};
const bool dnsseed = gArgs.GetBoolArg("-dnsseed", DEFAULT_DNSSEED);
bool add_fixed_seeds = gArgs.GetBoolArg("-fixedseeds", DEFAULT_FIXEDSEEDS);
+ const bool use_seednodes{gArgs.IsArgSet("-seednode")};
if (!add_fixed_seeds) {
LogPrintf("Fixed seeds are disabled\n");
@@ -1666,12 +2451,12 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect)
LogPrintf("Adding fixed seeds as 60 seconds have passed and addrman is empty for at least one reachable network\n");
}
- // Checking !dnsseed is cheaper before locking 2 mutexes.
- if (!add_fixed_seeds_now && !dnsseed) {
- LOCK2(m_addr_fetches_mutex, m_added_nodes_mutex);
- if (m_addr_fetches.empty() && m_added_nodes.empty()) {
+ // Perform cheap checks before locking a mutex.
+ else if (!dnsseed && !use_seednodes) {
+ LOCK(m_added_nodes_mutex);
+ if (m_added_nodes.empty()) {
add_fixed_seeds_now = true;
- LogPrintf("Adding fixed seeds as -dnsseed=0 (or IPv4/IPv6 connections are disabled via -onlynet), -addnode is not provided and all -seednode(s) attempted\n");
+ LogPrintf("Adding fixed seeds as -dnsseed=0 (or IPv4/IPv6 connections are disabled via -onlynet) and neither -addnode nor -seednode are provided\n");
}
}
@@ -1746,6 +2531,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect)
auto now = GetTime<std::chrono::microseconds>();
bool anchor = false;
bool fFeeler = false;
+ std::optional<Network> preferred_net;
// Determine what type of connection to open. Opening
// BLOCK_RELAY connections to addresses from anchors.dat gets the highest
@@ -1795,6 +2581,17 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect)
next_feeler = GetExponentialRand(now, FEELER_INTERVAL);
conn_type = ConnectionType::FEELER;
fFeeler = true;
+ } else if (nOutboundFullRelay == m_max_outbound_full_relay &&
+ m_max_outbound_full_relay == MAX_OUTBOUND_FULL_RELAY_CONNECTIONS &&
+ now > next_extra_network_peer &&
+ MaybePickPreferredNetwork(preferred_net)) {
+ // Full outbound connection management: Attempt to get at least one
+ // outbound peer from each reachable network by making extra connections
+ // and then protecting "only" peers from a network during outbound eviction.
+ // This is not attempted if the user changed -maxconnections to a value
+ // so low that less than MAX_OUTBOUND_FULL_RELAY_CONNECTIONS are made,
+ // to prevent interactions with otherwise protected outbound peers.
+ next_extra_network_peer = GetExponentialRand(now, EXTRA_NETWORK_PEER_INTERVAL);
} else {
// skip to next iteration of while loop
continue;
@@ -1848,12 +2645,15 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect)
}
} else {
// Not a feeler
- std::tie(addr, addr_last_try) = addrman.Select();
+ // If preferred_net has a value set, pick an extra outbound
+ // peer from that network. The eviction logic in net_processing
+ // ensures that a peer from another network will be evicted.
+ std::tie(addr, addr_last_try) = addrman.Select(false, preferred_net);
}
// Require outbound IPv4/IPv6 connections, other than feelers, to be to distinct network groups
if (!fFeeler && outbound_ipv46_peer_netgroups.count(m_netgroupman.GetGroup(addr))) {
- break;
+ continue;
}
// if we selected an invalid or local address, restart
@@ -1895,6 +2695,9 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect)
}
LogPrint(BCLog::NET, "Making feeler connection to %s\n", addrConnect.ToStringAddrPort());
}
+
+ if (preferred_net != std::nullopt) LogPrint(BCLog::NET, "Making network specific connection to %s on %s.\n", addrConnect.ToStringAddrPort(), GetNetworkName(preferred_net.value()));
+
// Record addrman failure attempts when node has at least 2 persistent outbound connections to peers with
// different netgroups in ipv4/ipv6 networks + all peers in Tor/I2P/CJDNS networks.
// Don't record addrman failure attempts when node is offline. This can be identified since all local
@@ -1975,7 +2778,6 @@ std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo() const
void CConnman::ThreadOpenAddedConnections()
{
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
- SetSyscallSandboxPolicy(SyscallSandboxPolicy::NET_ADD_CONNECTION);
while (true)
{
CSemaphoreGrant grant(*semAddnode);
@@ -2035,6 +2837,9 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
{
LOCK(m_nodes_mutex);
m_nodes.push_back(pnode);
+
+ // update connection count by network
+ if (pnode->IsManualOrFullOutboundConn()) ++m_network_conn_counts[pnode->addr.GetNetwork()];
}
}
@@ -2044,7 +2849,6 @@ void CConnman::ThreadMessageHandler()
{
LOCK(NetEventsInterface::g_msgproc_mutex);
- SetSyscallSandboxPolicy(SyscallSandboxPolicy::MESSAGE_HANDLER);
while (!flagInterruptMsgProc)
{
bool fMoreWork = false;
@@ -2530,7 +3334,7 @@ std::vector<CAddress> CConnman::GetAddresses(CNode& requestor, size_t max_addres
auto local_socket_bytes = requestor.addrBind.GetAddrBytes();
uint64_t cache_id = GetDeterministicRandomizer(RANDOMIZER_ID_ADDRCACHE)
.Write(requestor.ConnectedThroughNetwork())
- .Write(local_socket_bytes.data(), local_socket_bytes.size())
+ .Write(local_socket_bytes)
// For outbound connections, the port of the bound address is randomly
// assigned by the OS and would therefore not be useful for seeding.
.Write(requestor.IsInboundConn() ? requestor.addrBind.GetPort() : 0)
@@ -2783,8 +3587,7 @@ CNode::CNode(NodeId idIn,
ConnectionType conn_type_in,
bool inbound_onion,
CNodeOptions&& node_opts)
- : m_deserializer{std::make_unique<V1TransportDeserializer>(V1TransportDeserializer(Params(), idIn, SER_NETWORK, INIT_PROTO_VERSION))},
- m_serializer{std::make_unique<V1TransportSerializer>(V1TransportSerializer())},
+ : m_transport{std::make_unique<V1Transport>(idIn, SER_NETWORK, INIT_PROTO_VERSION)},
m_permission_flags{node_opts.permission_flags},
m_sock{sock},
m_connected{GetTime<std::chrono::seconds>()},
@@ -2867,26 +3670,31 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
msg.data.data()
);
- // make sure we use the appropriate network transport format
- std::vector<unsigned char> serializedHeader;
- pnode->m_serializer->prepareForTransport(msg, serializedHeader);
- size_t nTotalSize = nMessageSize + serializedHeader.size();
-
size_t nBytesSent = 0;
{
LOCK(pnode->cs_vSend);
- bool optimisticSend(pnode->vSendMsg.empty());
-
- //log total amount of bytes per message type
- pnode->AccountForSentBytes(msg.m_type, nTotalSize);
- pnode->nSendSize += nTotalSize;
-
- if (pnode->nSendSize > nSendBufferMaxSize) pnode->fPauseSend = true;
- pnode->vSendMsg.push_back(std::move(serializedHeader));
- if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data));
-
- // If write queue empty, attempt "optimistic write"
- if (optimisticSend) nBytesSent = SocketSendData(*pnode);
+ // Check if the transport still has unsent bytes, and indicate to it that we're about to
+ // give it a message to send.
+ const auto& [to_send, more, _msg_type] =
+ pnode->m_transport->GetBytesToSend(/*have_next_message=*/true);
+ const bool queue_was_empty{to_send.empty() && pnode->vSendMsg.empty()};
+
+ // Update memory usage of send buffer.
+ pnode->m_send_memusage += msg.GetMemoryUsage();
+ if (pnode->m_send_memusage + pnode->m_transport->GetSendMemoryUsage() > nSendBufferMaxSize) pnode->fPauseSend = true;
+ // Move message to vSendMsg queue.
+ pnode->vSendMsg.push_back(std::move(msg));
+
+ // If there was nothing to send before, and there is now (predicted by the "more" value
+ // returned by the GetBytesToSend call above), attempt "optimistic write":
+ // because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually
+ // doing a send, try sending from the calling thread if the queue was empty before.
+ // With a V1Transport, more will always be true here, because adding a message always
+ // results in sendable bytes there, but with V2Transport this is not the case (it may
+ // still be in the handshake).
+ if (queue_was_empty && more) {
+ std::tie(nBytesSent, std::ignore) = SocketSendData(*pnode);
+ }
}
if (nBytesSent) RecordBytesSent(nBytesSent);
}
@@ -2913,7 +3721,7 @@ uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& address) const
{
std::vector<unsigned char> vchNetGroup(m_netgroupman.GetGroup(address));
- return GetDeterministicRandomizer(RANDOMIZER_ID_NETGROUP).Write(vchNetGroup.data(), vchNetGroup.size()).Finalize();
+ return GetDeterministicRandomizer(RANDOMIZER_ID_NETGROUP).Write(vchNetGroup).Finalize();
}
void CaptureMessageToFile(const CAddress& addr,
@@ -2938,13 +3746,13 @@ void CaptureMessageToFile(const CAddress& addr,
AutoFile f{fsbridge::fopen(path, "ab")};
ser_writedata64(f, now.count());
- f.write(MakeByteSpan(msg_type));
+ f << Span{msg_type};
for (auto i = msg_type.length(); i < CMessageHeader::COMMAND_SIZE; ++i) {
f << uint8_t{'\0'};
}
uint32_t size = data.size();
ser_writedata32(f, size);
- f.write(AsBytes(data));
+ f << data;
}
std::function<void(const CAddress& addr,