aboutsummaryrefslogtreecommitdiff
path: root/src/net.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/net.cpp')
-rw-r--r--src/net.cpp348
1 files changed, 225 insertions, 123 deletions
diff --git a/src/net.cpp b/src/net.cpp
index bfa2738e45..6b2ef5f43d 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -35,6 +35,7 @@
#include <util/threadinterrupt.h>
#include <util/trace.h>
#include <util/translation.h>
+#include <util/vector.h>
#ifdef WIN32
#include <string.h>
@@ -157,39 +158,34 @@ uint16_t GetListenPort()
return static_cast<uint16_t>(gArgs.GetIntArg("-port", Params().GetDefaultPort()));
}
-// find 'best' local address for a particular peer
-bool GetLocal(CService& addr, const CNode& peer)
+// Determine the "best" local address for a particular peer.
+[[nodiscard]] static std::optional<CService> GetLocal(const CNode& peer)
{
- if (!fListen)
- return false;
+ if (!fListen) return std::nullopt;
+ std::optional<CService> addr;
int nBestScore = -1;
int nBestReachability = -1;
{
LOCK(g_maplocalhost_mutex);
- for (const auto& entry : mapLocalHost)
- {
+ for (const auto& [local_addr, local_service_info] : mapLocalHost) {
// For privacy reasons, don't advertise our privacy-network address
// to other networks and don't advertise our other-network address
// to privacy networks.
- const Network our_net{entry.first.GetNetwork()};
- const Network peers_net{peer.ConnectedThroughNetwork()};
- if (our_net != peers_net &&
- (our_net == NET_ONION || our_net == NET_I2P ||
- peers_net == NET_ONION || peers_net == NET_I2P)) {
+ if (local_addr.GetNetwork() != peer.ConnectedThroughNetwork()
+ && (local_addr.IsPrivacyNet() || peer.IsConnectedThroughPrivacyNet())) {
continue;
}
- int nScore = entry.second.nScore;
- int nReachability = entry.first.GetReachabilityFrom(peer.addr);
- if (nReachability > nBestReachability || (nReachability == nBestReachability && nScore > nBestScore))
- {
- addr = CService(entry.first, entry.second.nPort);
+ const int nScore{local_service_info.nScore};
+ const int nReachability{local_addr.GetReachabilityFrom(peer.addr)};
+ if (nReachability > nBestReachability || (nReachability == nBestReachability && nScore > nBestScore)) {
+ addr.emplace(CService{local_addr, local_service_info.nPort});
nBestReachability = nReachability;
nBestScore = nScore;
}
}
}
- return nBestScore >= 0;
+ return addr;
}
//! Convert the serialized seeds into usable address objects.
@@ -215,17 +211,13 @@ static std::vector<CAddress> ConvertSeeds(const std::vector<uint8_t> &vSeedsIn)
return vSeedsOut;
}
-// get best local address for a particular peer as a CAddress
-// Otherwise, return the unroutable 0.0.0.0 but filled in with
+// Determine the "best" local address for a particular peer.
+// If none, return the unroutable 0.0.0.0 but filled in with
// the normal parameters, since the IP may be changed to a useful
// one by discovery.
CService GetLocalAddress(const CNode& peer)
{
- CService addr;
- if (GetLocal(addr, peer)) {
- return addr;
- }
- return CService{CNetAddr(), GetListenPort()};
+ return GetLocal(peer).value_or(CService{CNetAddr(), GetListenPort()});
}
static int GetnScore(const CService& addr)
@@ -236,7 +228,7 @@ static int GetnScore(const CService& addr)
}
// Is our peer's addrLocal potentially useful as an external IP source?
-bool IsPeerAddrLocalGood(CNode *pnode)
+[[nodiscard]] static bool IsPeerAddrLocalGood(CNode *pnode)
{
CService addrLocal = pnode->GetAddrLocal();
return fDiscover && pnode->addr.IsRoutable() && addrLocal.IsRoutable() &&
@@ -288,7 +280,7 @@ std::optional<CService> GetLocalAddrForPeer(CNode& node)
CService MaybeFlipIPv6toCJDNS(const CService& service)
{
CService ret{service};
- if (ret.m_net == NET_IPV6 && ret.m_addr[0] == 0xfc && IsReachable(NET_CJDNS)) {
+ if (ret.IsIPv6() && ret.HasCJDNSPrefix() && IsReachable(NET_CJDNS)) {
ret.m_net = NET_CJDNS;
}
return ret;
@@ -447,7 +439,7 @@ static CAddress GetBindAddress(const Sock& sock)
return addr_bind;
}
-CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type)
+CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport)
{
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
assert(conn_type != ConnectionType::INBOUND);
@@ -465,7 +457,8 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
}
}
- LogPrintLevel(BCLog::NET, BCLog::Level::Debug, "trying connection %s lastseen=%.1fhrs\n",
+ LogPrintLevel(BCLog::NET, BCLog::Level::Debug, "trying %s connection %s lastseen=%.1fhrs\n",
+ use_v2transport ? "v2" : "v1",
pszDest ? pszDest : addrConnect.ToStringAddrPort(),
Ticks<HoursDouble>(pszDest ? 0h : Now<NodeSeconds>() - addrConnect.nTime));
@@ -504,7 +497,7 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
const bool use_proxy{GetProxy(addrConnect.GetNetwork(), proxy)};
bool proxyConnectionFailed = false;
- if (addrConnect.GetNetwork() == NET_I2P && use_proxy) {
+ if (addrConnect.IsI2P() && use_proxy) {
i2p::Connection conn;
if (m_i2p_sam_session) {
@@ -588,6 +581,7 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
CNodeOptions{
.i2p_sam_session = std::move(i2p_transient_session),
.recv_flood_size = nReceiveFloodSize,
+ .use_v2transport = use_v2transport,
});
pnode->AddRef();
@@ -636,6 +630,11 @@ Network CNode::ConnectedThroughNetwork() const
return m_inbound_onion ? NET_ONION : addr.GetNetClass();
}
+bool CNode::IsConnectedThroughPrivacyNet() const
+{
+ return m_inbound_onion || addr.IsPrivacyNet();
+}
+
#undef X
#define X(name) stats.name = name
void CNode::CopyStats(CNodeStats& stats)
@@ -668,6 +667,9 @@ void CNode::CopyStats(CNodeStats& stats)
LOCK(cs_vRecv);
X(mapRecvBytesPerMsgType);
X(nRecvBytes);
+ Transport::Info info = m_transport->GetInfo();
+ stats.m_transport_type = info.transport_type;
+ if (info.session_id) stats.m_session_id = HexStr(*info.session_id);
}
X(m_permission_flags);
@@ -735,6 +737,11 @@ V1Transport::V1Transport(const NodeId node_id, int nTypeIn, int nVersionIn) noex
Reset();
}
+Transport::Info V1Transport::GetInfo() const noexcept
+{
+ return {.transport_type = TransportProtocolType::V1, .session_id = {}};
+}
+
int V1Transport::readHeader(Span<const uint8_t> msg_bytes)
{
AssertLockHeld(m_recv_mutex);
@@ -858,7 +865,7 @@ bool V1Transport::SetMessageToSend(CSerializedNetMsg& msg) noexcept
// serialize header
m_header_to_send.clear();
- CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, m_header_to_send, 0, hdr};
+ CVectorWriter{INIT_PROTO_VERSION, m_header_to_send, 0, hdr};
// update state
m_message_to_send = std::move(msg);
@@ -899,8 +906,7 @@ void V1Transport::MarkBytesSent(size_t bytes_sent) noexcept
m_bytes_sent = 0;
} else if (!m_sending_header && m_bytes_sent == m_message_to_send.data.size()) {
// We're done sending a message's data. Wipe the data vector to reduce memory consumption.
- m_message_to_send.data.clear();
- m_message_to_send.data.shrink_to_fit();
+ ClearShrink(m_message_to_send.data);
m_bytes_sent = 0;
}
}
@@ -1006,8 +1012,7 @@ void V2Transport::StartSendingHandshake() noexcept
m_send_buffer.resize(EllSwiftPubKey::size() + m_send_garbage.size());
std::copy(std::begin(m_cipher.GetOurPubKey()), std::end(m_cipher.GetOurPubKey()), MakeWritableByteSpan(m_send_buffer).begin());
std::copy(m_send_garbage.begin(), m_send_garbage.end(), m_send_buffer.begin() + EllSwiftPubKey::size());
- // We cannot wipe m_send_garbage as it will still be used to construct the garbage
- // authentication packet.
+ // We cannot wipe m_send_garbage as it will still be used as AAD later in the handshake.
}
V2Transport::V2Transport(NodeId nodeid, bool initiating, int type_in, int version_in, const CKey& key, Span<const std::byte> ent32, std::vector<uint8_t> garbage) noexcept :
@@ -1041,9 +1046,6 @@ void V2Transport::SetReceiveState(RecvState recv_state) noexcept
Assume(recv_state == RecvState::GARB_GARBTERM);
break;
case RecvState::GARB_GARBTERM:
- Assume(recv_state == RecvState::GARBAUTH);
- break;
- case RecvState::GARBAUTH:
Assume(recv_state == RecvState::VERSION);
break;
case RecvState::VERSION:
@@ -1123,8 +1125,8 @@ void V2Transport::ProcessReceivedMaybeV1Bytes() noexcept
SetReceiveState(RecvState::V1);
SetSendState(SendState::V1);
// Reset v2 transport buffers to save memory.
- m_recv_buffer = {};
- m_send_buffer = {};
+ ClearShrink(m_recv_buffer);
+ ClearShrink(m_send_buffer);
} else {
// We have not received enough to distinguish v1 from v2 yet. Wait until more bytes come.
}
@@ -1175,25 +1177,15 @@ bool V2Transport::ProcessReceivedKeyBytes() noexcept
m_cipher.GetSendGarbageTerminator().end(),
MakeWritableByteSpan(m_send_buffer).last(BIP324Cipher::GARBAGE_TERMINATOR_LEN).begin());
- // Construct garbage authentication packet in the send buffer (using the garbage data which
- // is still there).
- m_send_buffer.resize(m_send_buffer.size() + BIP324Cipher::EXPANSION);
- m_cipher.Encrypt(
- /*contents=*/{},
- /*aad=*/MakeByteSpan(m_send_garbage),
- /*ignore=*/false,
- /*output=*/MakeWritableByteSpan(m_send_buffer).last(BIP324Cipher::EXPANSION));
- // We no longer need the garbage.
- m_send_garbage.clear();
- m_send_garbage.shrink_to_fit();
-
- // Construct version packet in the send buffer.
+ // Construct version packet in the send buffer, with the sent garbage data as AAD.
m_send_buffer.resize(m_send_buffer.size() + BIP324Cipher::EXPANSION + VERSION_CONTENTS.size());
m_cipher.Encrypt(
/*contents=*/VERSION_CONTENTS,
- /*aad=*/{},
+ /*aad=*/MakeByteSpan(m_send_garbage),
/*ignore=*/false,
/*output=*/MakeWritableByteSpan(m_send_buffer).last(BIP324Cipher::EXPANSION + VERSION_CONTENTS.size()));
+ // We no longer need the garbage.
+ ClearShrink(m_send_garbage);
} else {
// We still have to receive more key bytes.
}
@@ -1207,11 +1199,11 @@ bool V2Transport::ProcessReceivedGarbageBytes() noexcept
Assume(m_recv_buffer.size() <= MAX_GARBAGE_LEN + BIP324Cipher::GARBAGE_TERMINATOR_LEN);
if (m_recv_buffer.size() >= BIP324Cipher::GARBAGE_TERMINATOR_LEN) {
if (MakeByteSpan(m_recv_buffer).last(BIP324Cipher::GARBAGE_TERMINATOR_LEN) == m_cipher.GetReceiveGarbageTerminator()) {
- // Garbage terminator received. Switch to receiving garbage authentication packet.
- m_recv_garbage = std::move(m_recv_buffer);
- m_recv_garbage.resize(m_recv_garbage.size() - BIP324Cipher::GARBAGE_TERMINATOR_LEN);
+ // Garbage terminator received. Store garbage to authenticate it as AAD later.
+ m_recv_aad = std::move(m_recv_buffer);
+ m_recv_aad.resize(m_recv_aad.size() - BIP324Cipher::GARBAGE_TERMINATOR_LEN);
m_recv_buffer.clear();
- SetReceiveState(RecvState::GARBAUTH);
+ SetReceiveState(RecvState::VERSION);
} else if (m_recv_buffer.size() == MAX_GARBAGE_LEN + BIP324Cipher::GARBAGE_TERMINATOR_LEN) {
// We've reached the maximum length for garbage + garbage terminator, and the
// terminator still does not match. Abort.
@@ -1230,8 +1222,7 @@ bool V2Transport::ProcessReceivedGarbageBytes() noexcept
bool V2Transport::ProcessReceivedPacketBytes() noexcept
{
AssertLockHeld(m_recv_mutex);
- Assume(m_recv_state == RecvState::GARBAUTH || m_recv_state == RecvState::VERSION ||
- m_recv_state == RecvState::APP);
+ Assume(m_recv_state == RecvState::VERSION || m_recv_state == RecvState::APP);
// The maximum permitted contents length for a packet, consisting of:
// - 0x00 byte: indicating long message type encoding
@@ -1254,50 +1245,42 @@ bool V2Transport::ProcessReceivedPacketBytes() noexcept
// as GetMaxBytesToProcess only allows up to LENGTH_LEN into the buffer before that point.
m_recv_decode_buffer.resize(m_recv_len);
bool ignore{false};
- Span<const std::byte> aad;
- if (m_recv_state == RecvState::GARBAUTH) aad = MakeByteSpan(m_recv_garbage);
bool ret = m_cipher.Decrypt(
/*input=*/MakeByteSpan(m_recv_buffer).subspan(BIP324Cipher::LENGTH_LEN),
- /*aad=*/aad,
+ /*aad=*/MakeByteSpan(m_recv_aad),
/*ignore=*/ignore,
/*contents=*/MakeWritableByteSpan(m_recv_decode_buffer));
if (!ret) {
LogPrint(BCLog::NET, "V2 transport error: packet decryption failure (%u bytes), peer=%d\n", m_recv_len, m_nodeid);
return false;
}
+ // We have decrypted a valid packet with the AAD we expected, so clear the expected AAD.
+ ClearShrink(m_recv_aad);
// Feed the last 4 bytes of the Poly1305 authentication tag (and its timing) into our RNG.
RandAddEvent(ReadLE32(m_recv_buffer.data() + m_recv_buffer.size() - 4));
- // At this point we have a valid packet decrypted into m_recv_decode_buffer. Depending on
- // the current state, decide what to do with it.
- switch (m_recv_state) {
- case RecvState::GARBAUTH:
- // Ignore flag does not matter for garbage authentication. Any valid packet functions
- // as authentication. Receive and process the version packet next.
- SetReceiveState(RecvState::VERSION);
- m_recv_garbage = {};
- break;
- case RecvState::VERSION:
- if (!ignore) {
+ // At this point we have a valid packet decrypted into m_recv_decode_buffer. If it's not a
+ // decoy, which we simply ignore, use the current state to decide what to do with it.
+ if (!ignore) {
+ switch (m_recv_state) {
+ case RecvState::VERSION:
// Version message received; transition to application phase. The contents is
// ignored, but can be used for future extensions.
SetReceiveState(RecvState::APP);
- }
- break;
- case RecvState::APP:
- if (!ignore) {
+ break;
+ case RecvState::APP:
// Application message decrypted correctly. It can be extracted using GetMessage().
SetReceiveState(RecvState::APP_READY);
+ break;
+ default:
+ // Any other state is invalid (this function should not have been called).
+ Assume(false);
}
- break;
- default:
- // Any other state is invalid (this function should not have been called).
- Assume(false);
}
// Wipe the receive buffer where the next packet will be received into.
- m_recv_buffer = {};
+ ClearShrink(m_recv_buffer);
// In all but APP_READY state, we can wipe the decoded contents.
- if (m_recv_state != RecvState::APP_READY) m_recv_decode_buffer = {};
+ if (m_recv_state != RecvState::APP_READY) ClearShrink(m_recv_decode_buffer);
} else {
// We either have less than 3 bytes, so we don't know the packet's length yet, or more
// than 3 bytes but less than the packet's full ciphertext. Wait until those arrive.
@@ -1328,7 +1311,6 @@ size_t V2Transport::GetMaxBytesToProcess() noexcept
case RecvState::GARB_GARBTERM:
// Process garbage bytes one by one (because terminator may appear anywhere).
return 1;
- case RecvState::GARBAUTH:
case RecvState::VERSION:
case RecvState::APP:
// These three states all involve decoding a packet. Process the length descriptor first,
@@ -1382,7 +1364,6 @@ bool V2Transport::ReceivedBytes(Span<const uint8_t>& msg_bytes) noexcept
// bytes).
m_recv_buffer.reserve(MAX_GARBAGE_LEN + BIP324Cipher::GARBAGE_TERMINATOR_LEN);
break;
- case RecvState::GARBAUTH:
case RecvState::VERSION:
case RecvState::APP: {
// During states where a packet is being received, as much as is expected but never
@@ -1426,7 +1407,6 @@ bool V2Transport::ReceivedBytes(Span<const uint8_t>& msg_bytes) noexcept
if (!ProcessReceivedGarbageBytes()) return false;
break;
- case RecvState::GARBAUTH:
case RecvState::VERSION:
case RecvState::APP:
if (!ProcessReceivedPacketBytes()) return false;
@@ -1511,7 +1491,7 @@ CNetMessage V2Transport::GetReceivedMessage(std::chrono::microseconds time, bool
LogPrint(BCLog::NET, "V2 transport error: invalid message type (%u bytes contents), peer=%d\n", m_recv_decode_buffer.size(), m_nodeid);
reject_message = true;
}
- m_recv_decode_buffer = {};
+ ClearShrink(m_recv_decode_buffer);
SetReceiveState(RecvState::APP);
return msg;
@@ -1545,7 +1525,7 @@ bool V2Transport::SetMessageToSend(CSerializedNetMsg& msg) noexcept
m_cipher.Encrypt(MakeByteSpan(contents), {}, false, MakeWritableByteSpan(m_send_buffer));
m_send_type = msg.m_type;
// Release memory
- msg.data = {};
+ ClearShrink(msg.data);
return true;
}
@@ -1572,15 +1552,39 @@ void V2Transport::MarkBytesSent(size_t bytes_sent) noexcept
LOCK(m_send_mutex);
if (m_send_state == SendState::V1) return m_v1_fallback.MarkBytesSent(bytes_sent);
+ if (m_send_state == SendState::AWAITING_KEY && m_send_pos == 0 && bytes_sent > 0) {
+ LogPrint(BCLog::NET, "start sending v2 handshake to peer=%d\n", m_nodeid);
+ }
+
m_send_pos += bytes_sent;
Assume(m_send_pos <= m_send_buffer.size());
+ if (m_send_pos >= CMessageHeader::HEADER_SIZE) {
+ m_sent_v1_header_worth = true;
+ }
// Wipe the buffer when everything is sent.
if (m_send_pos == m_send_buffer.size()) {
m_send_pos = 0;
- m_send_buffer = {};
+ ClearShrink(m_send_buffer);
}
}
+bool V2Transport::ShouldReconnectV1() const noexcept
+{
+ AssertLockNotHeld(m_send_mutex);
+ AssertLockNotHeld(m_recv_mutex);
+ // Only outgoing connections need reconnection.
+ if (!m_initiating) return false;
+
+ LOCK(m_recv_mutex);
+ // We only reconnect in the very first state and when the receive buffer is empty. Together
+ // these conditions imply nothing has been received so far.
+ if (m_recv_state != RecvState::KEY) return false;
+ if (!m_recv_buffer.empty()) return false;
+ // Check if we've sent enough for the other side to disconnect us (if it was V1).
+ LOCK(m_send_mutex);
+ return m_sent_v1_header_worth;
+}
+
size_t V2Transport::GetSendMemoryUsage() const noexcept
{
AssertLockNotHeld(m_send_mutex);
@@ -1590,6 +1594,27 @@ size_t V2Transport::GetSendMemoryUsage() const noexcept
return sizeof(m_send_buffer) + memusage::DynamicUsage(m_send_buffer);
}
+Transport::Info V2Transport::GetInfo() const noexcept
+{
+ AssertLockNotHeld(m_recv_mutex);
+ LOCK(m_recv_mutex);
+ if (m_recv_state == RecvState::V1) return m_v1_fallback.GetInfo();
+
+ Transport::Info info;
+
+ // Do not report v2 and session ID until the version packet has been received
+ // and verified (confirming that the other side very likely has the same keys as us).
+ if (m_recv_state != RecvState::KEY_MAYBE_V1 && m_recv_state != RecvState::KEY &&
+ m_recv_state != RecvState::GARB_GARBTERM && m_recv_state != RecvState::VERSION) {
+ info.transport_type = TransportProtocolType::V2;
+ info.session_id = uint256(MakeUCharSpan(m_cipher.GetSessionID()));
+ } else {
+ info.transport_type = TransportProtocolType::DETECTING;
+ }
+
+ return info;
+}
+
std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
{
auto it = node.vSendMsg.begin();
@@ -1639,7 +1664,9 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
// Notify transport that bytes have been processed.
node.m_transport->MarkBytesSent(nBytes);
// Update statistics per message type.
- node.AccountForSentBytes(msg_type, nBytes);
+ if (!msg_type.empty()) { // don't report v2 handshake bytes for now
+ node.AccountForSentBytes(msg_type, nBytes);
+ }
nSentSize += nBytes;
if ((size_t)nBytes != data.size()) {
// could not send full message; stop sending more
@@ -1822,6 +1849,10 @@ void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr<Sock>&& sock,
}
const bool inbound_onion = std::find(m_onion_binds.begin(), m_onion_binds.end(), addr_bind) != m_onion_binds.end();
+ // The V2Transport transparently falls back to V1 behavior when an incoming V1 connection is
+ // detected, so use it whenever we signal NODE_P2P_V2.
+ const bool use_v2transport(nodeServices & NODE_P2P_V2);
+
CNode* pnode = new CNode(id,
std::move(sock),
addr,
@@ -1835,6 +1866,7 @@ void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr<Sock>&& sock,
.permission_flags = permission_flags,
.prefer_evict = discouraged,
.recv_flood_size = nReceiveFloodSize,
+ .use_v2transport = use_v2transport,
});
pnode->AddRef();
m_msgproc->InitializeNode(*pnode, nodeServices);
@@ -1883,12 +1915,19 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ
CSemaphoreGrant grant(*semOutbound, true);
if (!grant) return false;
- OpenNetworkConnection(CAddress(), false, &grant, address.c_str(), conn_type);
+ OpenNetworkConnection(CAddress(), false, std::move(grant), address.c_str(), conn_type, /*use_v2transport=*/false);
return true;
}
void CConnman::DisconnectNodes()
{
+ AssertLockNotHeld(m_nodes_mutex);
+ AssertLockNotHeld(m_reconnections_mutex);
+
+ // Use a temporary variable to accumulate desired reconnections, so we don't need
+ // m_reconnections_mutex while holding m_nodes_mutex.
+ decltype(m_reconnections) reconnections_to_add;
+
{
LOCK(m_nodes_mutex);
@@ -1911,6 +1950,19 @@ void CConnman::DisconnectNodes()
// remove from m_nodes
m_nodes.erase(remove(m_nodes.begin(), m_nodes.end(), pnode), m_nodes.end());
+ // Add to reconnection list if appropriate. We don't reconnect right here, because
+ // the creation of a connection is a blocking operation (up to several seconds),
+ // and we don't want to hold up the socket handler thread for that long.
+ if (pnode->m_transport->ShouldReconnectV1()) {
+ reconnections_to_add.push_back({
+ .addr_connect = pnode->addr,
+ .grant = std::move(pnode->grantOutbound),
+ .destination = pnode->m_dest,
+ .conn_type = pnode->m_conn_type,
+ .use_v2transport = false});
+ LogPrint(BCLog::NET, "retrying with v1 transport protocol for peer=%d\n", pnode->GetId());
+ }
+
// release outbound grant (if any)
pnode->grantOutbound.Release();
@@ -1938,6 +1990,11 @@ void CConnman::DisconnectNodes()
}
}
}
+ {
+ // Move entries from reconnections_to_add to m_reconnections.
+ LOCK(m_reconnections_mutex);
+ m_reconnections.splice(m_reconnections.end(), std::move(reconnections_to_add));
+ }
}
void CConnman::NotifyNumConnectionsChanged()
@@ -2315,9 +2372,9 @@ void CConnman::ProcessAddrFetch()
m_addr_fetches.pop_front();
}
CAddress addr;
- CSemaphoreGrant grant(*semOutbound, true);
+ CSemaphoreGrant grant(*semOutbound, /*fTry=*/true);
if (grant) {
- OpenNetworkConnection(addr, false, &grant, strDest.c_str(), ConnectionType::ADDR_FETCH);
+ OpenNetworkConnection(addr, false, std::move(grant), strDest.c_str(), ConnectionType::ADDR_FETCH, /*use_v2transport=*/false);
}
}
@@ -2410,6 +2467,7 @@ bool CConnman::MaybePickPreferredNetwork(std::optional<Network>& network)
void CConnman::ThreadOpenConnections(const std::vector<std::string> connect)
{
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
+ AssertLockNotHeld(m_reconnections_mutex);
FastRandomContext rng;
// Connect to specific addresses
if (!connect.empty())
@@ -2419,7 +2477,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect)
for (const std::string& strAddr : connect)
{
CAddress addr(CService(), NODE_NONE);
- OpenNetworkConnection(addr, false, nullptr, strAddr.c_str(), ConnectionType::MANUAL);
+ OpenNetworkConnection(addr, false, {}, strAddr.c_str(), ConnectionType::MANUAL, /*use_v2transport=*/false);
for (int i = 0; i < 10 && i < nLoop; i++)
{
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
@@ -2453,6 +2511,8 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect)
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
return;
+ PerformReconnections();
+
CSemaphoreGrant grant(*semOutbound);
if (interruptNet)
return;
@@ -2473,7 +2533,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect)
// Perform cheap checks before locking a mutex.
else if (!dnsseed && !use_seednodes) {
LOCK(m_added_nodes_mutex);
- if (m_added_nodes.empty()) {
+ if (m_added_node_params.empty()) {
add_fixed_seeds_now = true;
LogPrintf("Adding fixed seeds as -dnsseed=0 (or IPv4/IPv6 connections are disabled via -onlynet) and neither -addnode nor -seednode are provided\n");
}
@@ -2722,7 +2782,9 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect)
// Don't record addrman failure attempts when node is offline. This can be identified since all local
// network connections (if any) belong in the same netgroup, and the size of `outbound_ipv46_peer_netgroups` would only be 1.
const bool count_failures{((int)outbound_ipv46_peer_netgroups.size() + outbound_privacy_network_peers) >= std::min(nMaxConnections - 1, 2)};
- OpenNetworkConnection(addrConnect, count_failures, &grant, /*strDest=*/nullptr, conn_type);
+ // Use BIP324 transport when both us and them have NODE_V2_P2P set.
+ const bool use_v2transport(addrConnect.nServices & GetLocalServices() & NODE_P2P_V2);
+ OpenNetworkConnection(addrConnect, count_failures, std::move(grant), /*strDest=*/nullptr, conn_type, use_v2transport);
}
}
}
@@ -2744,11 +2806,11 @@ std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo() const
{
std::vector<AddedNodeInfo> ret;
- std::list<std::string> lAddresses(0);
+ std::list<AddedNodeParams> lAddresses(0);
{
LOCK(m_added_nodes_mutex);
- ret.reserve(m_added_nodes.size());
- std::copy(m_added_nodes.cbegin(), m_added_nodes.cend(), std::back_inserter(lAddresses));
+ ret.reserve(m_added_node_params.size());
+ std::copy(m_added_node_params.cbegin(), m_added_node_params.cend(), std::back_inserter(lAddresses));
}
@@ -2768,9 +2830,9 @@ std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo() const
}
}
- for (const std::string& strAddNode : lAddresses) {
- CService service(LookupNumeric(strAddNode, GetDefaultPort(strAddNode)));
- AddedNodeInfo addedNode{strAddNode, CService(), false, false};
+ for (const auto& addr : lAddresses) {
+ CService service(LookupNumeric(addr.m_added_node, GetDefaultPort(addr.m_added_node)));
+ AddedNodeInfo addedNode{addr, CService(), false, false};
if (service.IsValid()) {
// strAddNode is an IP:port
auto it = mapConnected.find(service);
@@ -2781,7 +2843,7 @@ std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo() const
}
} else {
// strAddNode is a name
- auto it = mapConnectedByName.find(strAddNode);
+ auto it = mapConnectedByName.find(addr.m_added_node);
if (it != mapConnectedByName.end()) {
addedNode.resolvedAddress = it->second.second;
addedNode.fConnected = true;
@@ -2797,6 +2859,7 @@ std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo() const
void CConnman::ThreadOpenAddedConnections()
{
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
+ AssertLockNotHeld(m_reconnections_mutex);
while (true)
{
CSemaphoreGrant grant(*semAddnode);
@@ -2804,26 +2867,28 @@ void CConnman::ThreadOpenAddedConnections()
bool tried = false;
for (const AddedNodeInfo& info : vInfo) {
if (!info.fConnected) {
- if (!grant.TryAcquire()) {
+ if (!grant) {
// If we've used up our semaphore and need a new one, let's not wait here since while we are waiting
// the addednodeinfo state might change.
break;
}
tried = true;
CAddress addr(CService(), NODE_NONE);
- OpenNetworkConnection(addr, false, &grant, info.strAddedNode.c_str(), ConnectionType::MANUAL);
- if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
- return;
+ OpenNetworkConnection(addr, false, std::move(grant), info.m_params.m_added_node.c_str(), ConnectionType::MANUAL, info.m_params.m_use_v2transport);
+ if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) return;
+ grant = CSemaphoreGrant(*semAddnode, /*fTry=*/true);
}
}
// Retry every 60 seconds if a connection was attempted, otherwise two seconds
if (!interruptNet.sleep_for(std::chrono::seconds(tried ? 60 : 2)))
return;
+ // See if any reconnections are desired.
+ PerformReconnections();
}
}
// if successful, this moves the passed grant to the constructed node
-void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant *grantOutbound, const char *pszDest, ConnectionType conn_type)
+void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant&& grant_outbound, const char *pszDest, ConnectionType conn_type, bool use_v2transport)
{
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
assert(conn_type != ConnectionType::INBOUND);
@@ -2845,12 +2910,11 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
} else if (FindNode(std::string(pszDest)))
return;
- CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure, conn_type);
+ CNode* pnode = ConnectNode(addrConnect, pszDest, fCountFailure, conn_type, use_v2transport);
if (!pnode)
return;
- if (grantOutbound)
- grantOutbound->MoveTo(pnode->grantOutbound);
+ pnode->grantOutbound = std::move(grant_outbound);
m_msgproc->InitializeNode(*pnode, nLocalServices);
{
@@ -3403,23 +3467,23 @@ std::vector<CAddress> CConnman::GetAddresses(CNode& requestor, size_t max_addres
return cache_entry.m_addrs_response_cache;
}
-bool CConnman::AddNode(const std::string& strNode)
+bool CConnman::AddNode(const AddedNodeParams& add)
{
LOCK(m_added_nodes_mutex);
- for (const std::string& it : m_added_nodes) {
- if (strNode == it) return false;
+ for (const auto& it : m_added_node_params) {
+ if (add.m_added_node == it.m_added_node) return false;
}
- m_added_nodes.push_back(strNode);
+ m_added_node_params.push_back(add);
return true;
}
bool CConnman::RemoveAddedNode(const std::string& strNode)
{
LOCK(m_added_nodes_mutex);
- for(std::vector<std::string>::iterator it = m_added_nodes.begin(); it != m_added_nodes.end(); ++it) {
- if (strNode == *it) {
- m_added_nodes.erase(it);
+ for (auto it = m_added_node_params.begin(); it != m_added_node_params.end(); ++it) {
+ if (strNode == it->m_added_node) {
+ m_added_node_params.erase(it);
return true;
}
}
@@ -3607,6 +3671,15 @@ ServiceFlags CConnman::GetLocalServices() const
return nLocalServices;
}
+static std::unique_ptr<Transport> MakeTransport(NodeId id, bool use_v2transport, bool inbound) noexcept
+{
+ if (use_v2transport) {
+ return std::make_unique<V2Transport>(id, /*initiating=*/!inbound, SER_NETWORK, INIT_PROTO_VERSION);
+ } else {
+ return std::make_unique<V1Transport>(id, SER_NETWORK, INIT_PROTO_VERSION);
+ }
+}
+
CNode::CNode(NodeId idIn,
std::shared_ptr<Sock> sock,
const CAddress& addrIn,
@@ -3617,13 +3690,14 @@ CNode::CNode(NodeId idIn,
ConnectionType conn_type_in,
bool inbound_onion,
CNodeOptions&& node_opts)
- : m_transport{std::make_unique<V1Transport>(idIn, SER_NETWORK, INIT_PROTO_VERSION)},
+ : m_transport{MakeTransport(idIn, node_opts.use_v2transport, conn_type_in == ConnectionType::INBOUND)},
m_permission_flags{node_opts.permission_flags},
m_sock{sock},
m_connected{GetTime<std::chrono::seconds>()},
addr{addrIn},
addrBind{addrBindIn},
m_addr_name{addrNameIn.empty() ? addr.ToStringAddrPort() : addrNameIn},
+ m_dest(addrNameIn),
m_inbound_onion{inbound_onion},
m_prefer_evict{node_opts.prefer_evict},
nKeyedNetGroup{nKeyedNetGroupIn},
@@ -3754,10 +3828,38 @@ uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& address) const
return GetDeterministicRandomizer(RANDOMIZER_ID_NETGROUP).Write(vchNetGroup).Finalize();
}
-void CaptureMessageToFile(const CAddress& addr,
- const std::string& msg_type,
- Span<const unsigned char> data,
- bool is_incoming)
+void CConnman::PerformReconnections()
+{
+ AssertLockNotHeld(m_reconnections_mutex);
+ AssertLockNotHeld(m_unused_i2p_sessions_mutex);
+ while (true) {
+ // Move first element of m_reconnections to todo (avoiding an allocation inside the lock).
+ decltype(m_reconnections) todo;
+ {
+ LOCK(m_reconnections_mutex);
+ if (m_reconnections.empty()) break;
+ todo.splice(todo.end(), m_reconnections, m_reconnections.begin());
+ }
+
+ auto& item = *todo.begin();
+ OpenNetworkConnection(item.addr_connect,
+ // We only reconnect if the first attempt to connect succeeded at
+ // connection time, but then failed after the CNode object was
+ // created. Since we already know connecting is possible, do not
+ // count failure to reconnect.
+ /*fCountFailure=*/false,
+ std::move(item.grant),
+ item.destination.empty() ? nullptr : item.destination.c_str(),
+ item.conn_type,
+ item.use_v2transport);
+ }
+}
+
+// Dump binary message to file, with timestamp.
+static void CaptureMessageToFile(const CAddress& addr,
+ const std::string& msg_type,
+ Span<const unsigned char> data,
+ bool is_incoming)
{
// Note: This function captures the message at the time of processing,
// not at socket receive/send time.