aboutsummaryrefslogtreecommitdiff
path: root/src/net_processing.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/net_processing.cpp')
-rw-r--r--src/net_processing.cpp361
1 files changed, 130 insertions, 231 deletions
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
index 490ae8db82..f14db379fb 100644
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -23,6 +23,7 @@
#include <random.h>
#include <reverse_iterator.h>
#include <scheduler.h>
+#include <streams.h>
#include <tinyformat.h>
#include <txmempool.h>
#include <util/check.h> // For NDEBUG compile time check
@@ -71,22 +72,22 @@ static constexpr std::chrono::minutes PING_INTERVAL{2};
static const unsigned int MAX_LOCATOR_SZ = 101;
/** The maximum number of entries in an 'inv' protocol message */
static const unsigned int MAX_INV_SZ = 50000;
-/** Maximum number of in-flight transactions from a peer */
-static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100;
-/** Maximum number of announced transactions from a peer */
-static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ;
-/** How many microseconds to delay requesting transactions via txids, if we have wtxid-relaying peers */
-static constexpr std::chrono::microseconds TXID_RELAY_DELAY{std::chrono::seconds{2}};
-/** How many microseconds to delay requesting transactions from inbound peers */
-static constexpr std::chrono::microseconds INBOUND_PEER_TX_DELAY{std::chrono::seconds{2}};
+/** Maximum number of in-flight transaction requests from a peer. It is not a hard limit, but the threshold at which
+ * point the OVERLOADED_PEER_TX_DELAY kicks in. */
+static constexpr int32_t MAX_PEER_TX_REQUEST_IN_FLIGHT = 100;
+/** Maximum number of transactions to consider for requesting, per peer. It provides a reasonable DoS limit to
+ * per-peer memory usage spent on announcements, while covering peers continuously sending INVs at the maximum
+ * rate (by our own policy, see INVENTORY_BROADCAST_PER_SECOND) for several minutes, while not receiving
+ * the actual transaction (from any peer) in response to requests for them. */
+static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 5000;
+/** How long to delay requesting transactions via txids, if we have wtxid-relaying peers */
+static constexpr auto TXID_RELAY_DELAY = std::chrono::seconds{2};
+/** How long to delay requesting transactions from non-preferred peers */
+static constexpr auto NONPREF_PEER_TX_DELAY = std::chrono::seconds{2};
+/** How long to delay requesting transactions from overloaded peers (see MAX_PEER_TX_REQUEST_IN_FLIGHT). */
+static constexpr auto OVERLOADED_PEER_TX_DELAY = std::chrono::seconds{2};
/** How long to wait (in microseconds) before downloading a transaction from an additional peer */
static constexpr std::chrono::microseconds GETDATA_TX_INTERVAL{std::chrono::seconds{60}};
-/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */
-static constexpr std::chrono::microseconds MAX_GETDATA_RANDOM_DELAY{std::chrono::seconds{2}};
-/** How long to wait (in microseconds) before expiring an in-flight getdata request to a peer */
-static constexpr std::chrono::microseconds TX_EXPIRY_INTERVAL{GETDATA_TX_INTERVAL * 10};
-static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY,
-"To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY");
/** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */
static const unsigned int MAX_GETDATA_SZ = 1000;
/** Number of blocks that can be requested at any given time from a single peer. */
@@ -374,69 +375,6 @@ struct CNodeState {
//! Time of last new block announcement
int64_t m_last_block_announcement;
- /*
- * State associated with transaction download.
- *
- * Tx download algorithm:
- *
- * When inv comes in, queue up (process_time, txid) inside the peer's
- * CNodeState (m_tx_process_time) as long as m_tx_announced for the peer
- * isn't too big (MAX_PEER_TX_ANNOUNCEMENTS).
- *
- * The process_time for a transaction is set to nNow for outbound peers,
- * nNow + 2 seconds for inbound peers. This is the time at which we'll
- * consider trying to request the transaction from the peer in
- * SendMessages(). The delay for inbound peers is to allow outbound peers
- * a chance to announce before we request from inbound peers, to prevent
- * an adversary from using inbound connections to blind us to a
- * transaction (InvBlock).
- *
- * When we call SendMessages() for a given peer,
- * we will loop over the transactions in m_tx_process_time, looking
- * at the transactions whose process_time <= nNow. We'll request each
- * such transaction that we don't have already and that hasn't been
- * requested from another peer recently, up until we hit the
- * MAX_PEER_TX_IN_FLIGHT limit for the peer. Then we'll update
- * g_already_asked_for for each requested txid, storing the time of the
- * GETDATA request. We use g_already_asked_for to coordinate transaction
- * requests amongst our peers.
- *
- * For transactions that we still need but we have already recently
- * requested from some other peer, we'll reinsert (process_time, txid)
- * back into the peer's m_tx_process_time at the point in the future at
- * which the most recent GETDATA request would time out (ie
- * GETDATA_TX_INTERVAL + the request time stored in g_already_asked_for).
- * We add an additional delay for inbound peers, again to prefer
- * attempting download from outbound peers first.
- * We also add an extra small random delay up to 2 seconds
- * to avoid biasing some peers over others. (e.g., due to fixed ordering
- * of peer processing in ThreadMessageHandler).
- *
- * When we receive a transaction from a peer, we remove the txid from the
- * peer's m_tx_in_flight set and from their recently announced set
- * (m_tx_announced). We also clear g_already_asked_for for that entry, so
- * that if somehow the transaction is not accepted but also not added to
- * the reject filter, then we will eventually redownload from other
- * peers.
- */
- struct TxDownloadState {
- /* Track when to attempt download of announced transactions (process
- * time in micros -> txid)
- */
- std::multimap<std::chrono::microseconds, GenTxid> m_tx_process_time;
-
- //! Store all the transactions a peer has recently announced
- std::set<uint256> m_tx_announced;
-
- //! Store transactions which were requested by us, with timestamp
- std::map<uint256, std::chrono::microseconds> m_tx_in_flight;
-
- //! Periodically check for stuck getdata requests
- std::chrono::microseconds m_check_expiry_timer{0};
- };
-
- TxDownloadState m_tx_download;
-
//! Whether this peer is an inbound connection
bool m_is_inbound;
@@ -477,9 +415,6 @@ struct CNodeState {
}
};
-// Keeps track of the time (in microseconds) when transactions were requested last time
-limitedmap<uint256, std::chrono::microseconds> g_already_asked_for GUARDED_BY(cs_main)(MAX_INV_SZ);
-
/** Map maintaining per-node state. */
static std::map<NodeId, CNodeState> mapNodeState GUARDED_BY(cs_main);
@@ -816,73 +751,35 @@ static void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vec
}
}
-void EraseTxRequest(const GenTxid& gtxid) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
-{
- g_already_asked_for.erase(gtxid.GetHash());
-}
-
-std::chrono::microseconds GetTxRequestTime(const GenTxid& gtxid) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
-{
- auto it = g_already_asked_for.find(gtxid.GetHash());
- if (it != g_already_asked_for.end()) {
- return it->second;
- }
- return {};
-}
-
-void UpdateTxRequestTime(const GenTxid& gtxid, std::chrono::microseconds request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
-{
- auto it = g_already_asked_for.find(gtxid.GetHash());
- if (it == g_already_asked_for.end()) {
- g_already_asked_for.insert(std::make_pair(gtxid.GetHash(), request_time));
- } else {
- g_already_asked_for.update(it, request_time);
- }
-}
-
-std::chrono::microseconds CalculateTxGetDataTime(const GenTxid& gtxid, std::chrono::microseconds current_time, bool use_inbound_delay, bool use_txid_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
-{
- std::chrono::microseconds process_time;
- const auto last_request_time = GetTxRequestTime(gtxid);
- // First time requesting this tx
- if (last_request_time.count() == 0) {
- process_time = current_time;
- } else {
- // Randomize the delay to avoid biasing some peers over others (such as due to
- // fixed ordering of peer processing in ThreadMessageHandler)
- process_time = last_request_time + GETDATA_TX_INTERVAL + GetRandMicros(MAX_GETDATA_RANDOM_DELAY);
- }
-
- // We delay processing announcements from inbound peers
- if (use_inbound_delay) process_time += INBOUND_PEER_TX_DELAY;
-
- // We delay processing announcements from peers that use txid-relay (instead of wtxid)
- if (use_txid_delay) process_time += TXID_RELAY_DELAY;
-
- return process_time;
-}
+} // namespace
-void RequestTx(CNodeState* state, const GenTxid& gtxid, std::chrono::microseconds current_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
+void PeerManager::AddTxAnnouncement(const CNode& node, const GenTxid& gtxid, std::chrono::microseconds current_time)
{
- CNodeState::TxDownloadState& peer_download_state = state->m_tx_download;
- if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS ||
- peer_download_state.m_tx_process_time.size() >= MAX_PEER_TX_ANNOUNCEMENTS ||
- peer_download_state.m_tx_announced.count(gtxid.GetHash())) {
- // Too many queued announcements from this peer, or we already have
- // this announcement
+ AssertLockHeld(::cs_main); // For m_txrequest
+ NodeId nodeid = node.GetId();
+ if (!node.HasPermission(PF_RELAY) && m_txrequest.Count(nodeid) >= MAX_PEER_TX_ANNOUNCEMENTS) {
+ // Too many queued announcements from this peer
return;
}
- peer_download_state.m_tx_announced.insert(gtxid.GetHash());
-
- // Calculate the time to try requesting this transaction. Use
- // fPreferredDownload as a proxy for outbound peers.
- const auto process_time = CalculateTxGetDataTime(gtxid, current_time, !state->fPreferredDownload, !state->m_wtxid_relay && g_wtxid_relay_peers > 0);
-
- peer_download_state.m_tx_process_time.emplace(process_time, gtxid);
+ const CNodeState* state = State(nodeid);
+
+ // Decide the TxRequestTracker parameters for this announcement:
+ // - "preferred": if fPreferredDownload is set (= outbound, or PF_NOBAN permission)
+ // - "reqtime": current time plus delays for:
+ // - NONPREF_PEER_TX_DELAY for announcements from non-preferred connections
+ // - TXID_RELAY_DELAY for txid announcements while wtxid peers are available
+ // - OVERLOADED_PEER_TX_DELAY for announcements from peers which have at least
+ // MAX_PEER_TX_REQUEST_IN_FLIGHT requests in flight (and don't have PF_RELAY).
+ auto delay = std::chrono::microseconds{0};
+ const bool preferred = state->fPreferredDownload;
+ if (!preferred) delay += NONPREF_PEER_TX_DELAY;
+ if (!gtxid.IsWtxid() && g_wtxid_relay_peers > 0) delay += TXID_RELAY_DELAY;
+ const bool overloaded = !node.HasPermission(PF_RELAY) &&
+ m_txrequest.CountInFlight(nodeid) >= MAX_PEER_TX_REQUEST_IN_FLIGHT;
+ if (overloaded) delay += OVERLOADED_PEER_TX_DELAY;
+ m_txrequest.ReceivedInv(nodeid, gtxid, preferred, current_time + delay);
}
-} // namespace
-
// This function is used for testing the stale tip eviction logic, see
// denialofservice_tests.cpp
void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds)
@@ -899,6 +796,7 @@ void PeerManager::InitializeNode(CNode *pnode) {
{
LOCK(cs_main);
mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(addr, pnode->IsInboundConn(), pnode->IsManualConn()));
+ assert(m_txrequest.Count(nodeid) == 0);
}
{
PeerRef peer = std::make_shared<Peer>(nodeid);
@@ -956,6 +854,7 @@ void PeerManager::FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) {
mapBlocksInFlight.erase(entry.hash);
}
EraseOrphansFor(nodeid);
+ m_txrequest.DisconnectedPeer(nodeid);
nPreferredDownload -= state->fPreferredDownload;
nPeersWithValidatedDownloads -= (state->nBlocksInFlightValidHeaders != 0);
assert(nPeersWithValidatedDownloads >= 0);
@@ -973,6 +872,7 @@ void PeerManager::FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) {
assert(nPeersWithValidatedDownloads == 0);
assert(g_outbound_peers_with_protect_from_disconnect == 0);
assert(g_wtxid_relay_peers == 0);
+ assert(m_txrequest.Size() == 0);
}
LogPrint(BCLog::NET, "Cleared nodestate for peer=%d\n", nodeid);
}
@@ -1285,7 +1185,8 @@ PeerManager::PeerManager(const CChainParams& chainparams, CConnman& connman, Ban
/**
* Evict orphan txn pool entries (EraseOrphanTx) based on a newly connected
- * block. Also save the time of the last tip update.
+ * block, remember the recently confirmed transactions, and delete tracked
+ * announcements for them. Also save the time of the last tip update.
*/
void PeerManager::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex)
{
@@ -1329,6 +1230,13 @@ void PeerManager::BlockConnected(const std::shared_ptr<const CBlock>& pblock, co
}
}
}
+ {
+ LOCK(cs_main);
+ for (const auto& ptx : pblock->vtx) {
+ m_txrequest.ForgetTxHash(ptx->GetHash());
+ m_txrequest.ForgetTxHash(ptx->GetWitnessHash());
+ }
+ }
}
void PeerManager::BlockDisconnected(const std::shared_ptr<const CBlock> &block, const CBlockIndex* pindex)
@@ -1527,6 +1435,7 @@ void RelayTransaction(const uint256& txid, const uint256& wtxid, const CConnman&
static void RelayAddress(const CAddress& addr, bool fReachable, const CConnman& connman)
{
+ if (!fReachable && !addr.IsRelayable()) return;
// Relay to a limited number of other nodes
// Use deterministic randomness to send to the same nodes for 24 hours
@@ -2435,11 +2344,16 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
pfrom.SetCommonVersion(greatest_common_version);
pfrom.nVersion = nVersion;
+ const CNetMsgMaker msg_maker(greatest_common_version);
+
if (greatest_common_version >= WTXID_RELAY_VERSION) {
- m_connman.PushMessage(&pfrom, CNetMsgMaker(greatest_common_version).Make(NetMsgType::WTXIDRELAY));
+ m_connman.PushMessage(&pfrom, msg_maker.Make(NetMsgType::WTXIDRELAY));
}
- m_connman.PushMessage(&pfrom, CNetMsgMaker(greatest_common_version).Make(NetMsgType::VERACK));
+ m_connman.PushMessage(&pfrom, msg_maker.Make(NetMsgType::VERACK));
+
+ // Signal ADDRv2 support (BIP155).
+ m_connman.PushMessage(&pfrom, msg_maker.Make(NetMsgType::SENDADDRV2));
pfrom.nServices = nServices;
pfrom.SetAddrLocal(addrMe);
@@ -2548,8 +2462,9 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
// At this point, the outgoing message serialization version can't change.
const CNetMsgMaker msgMaker(pfrom.GetCommonVersion());
- if (msg_type == NetMsgType::VERACK)
- {
+ if (msg_type == NetMsgType::VERACK) {
+ if (pfrom.fSuccessfullyConnected) return;
+
if (!pfrom.IsInboundConn()) {
// Mark this node as currently connected, so we update its timestamp later.
LOCK(cs_main);
@@ -2608,16 +2523,25 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
return;
}
- if (msg_type == NetMsgType::ADDR) {
+ if (msg_type == NetMsgType::ADDR || msg_type == NetMsgType::ADDRV2) {
+ int stream_version = vRecv.GetVersion();
+ if (msg_type == NetMsgType::ADDRV2) {
+ // Add ADDRV2_FORMAT to the version so that the CNetAddr and CAddress
+ // unserialize methods know that an address in v2 format is coming.
+ stream_version |= ADDRV2_FORMAT;
+ }
+
+ OverrideStream<CDataStream> s(&vRecv, vRecv.GetType(), stream_version);
std::vector<CAddress> vAddr;
- vRecv >> vAddr;
+
+ s >> vAddr;
if (!pfrom.RelayAddrsWithConn()) {
return;
}
if (vAddr.size() > MAX_ADDR_TO_SEND)
{
- Misbehaving(pfrom.GetId(), 20, strprintf("addr message size = %u", vAddr.size()));
+ Misbehaving(pfrom.GetId(), 20, strprintf("%s message size = %u", msg_type, vAddr.size()));
return;
}
@@ -2661,6 +2585,11 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
return;
}
+ if (msg_type == NetMsgType::SENDADDRV2) {
+ pfrom.m_wants_addrv2 = true;
+ return;
+ }
+
if (msg_type == NetMsgType::SENDHEADERS) {
LOCK(cs_main);
State(pfrom.GetId())->fPreferHeaders = true;
@@ -2749,7 +2678,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
pfrom.fDisconnect = true;
return;
} else if (!fAlreadyHave && !m_chainman.ActiveChainstate().IsInitialBlockDownload()) {
- RequestTx(State(pfrom.GetId()), gtxid, current_time);
+ AddTxAnnouncement(pfrom, gtxid, current_time);
}
} else {
LogPrint(BCLog::NET, "Unknown inv type \"%s\" received from peer=%d\n", inv.ToString(), pfrom.GetId());
@@ -3003,11 +2932,8 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
TxValidationState state;
- for (const GenTxid& gtxid : {GenTxid(false, txid), GenTxid(true, wtxid)}) {
- nodestate->m_tx_download.m_tx_announced.erase(gtxid.GetHash());
- nodestate->m_tx_download.m_tx_in_flight.erase(gtxid.GetHash());
- EraseTxRequest(gtxid);
- }
+ m_txrequest.ReceivedResponse(pfrom.GetId(), txid);
+ if (tx.HasWitness()) m_txrequest.ReceivedResponse(pfrom.GetId(), wtxid);
std::list<CTransactionRef> lRemovedTxn;
@@ -3026,6 +2952,10 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
if (!AlreadyHaveTx(GenTxid(/* is_wtxid=*/true, wtxid), m_mempool) &&
AcceptToMemoryPool(m_mempool, state, ptx, &lRemovedTxn, false /* bypass_limits */)) {
m_mempool.check(&::ChainstateActive().CoinsTip());
+ // As this version of the transaction was acceptable, we can forget about any
+ // requests for it.
+ m_txrequest.ForgetTxHash(tx.GetHash());
+ m_txrequest.ForgetTxHash(tx.GetWitnessHash());
RelayTransaction(tx.GetHash(), tx.GetWitnessHash(), m_connman);
for (unsigned int i = 0; i < tx.vout.size(); i++) {
auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(txid, i));
@@ -3081,10 +3011,14 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
// protocol for getting all unconfirmed parents.
const GenTxid gtxid{/* is_wtxid=*/false, parent_txid};
pfrom.AddKnownTx(parent_txid);
- if (!AlreadyHaveTx(gtxid, m_mempool)) RequestTx(State(pfrom.GetId()), gtxid, current_time);
+ if (!AlreadyHaveTx(gtxid, m_mempool)) AddTxAnnouncement(pfrom, gtxid, current_time);
}
AddOrphanTx(ptx, pfrom.GetId());
+ // Once added to the orphan pool, a tx is considered AlreadyHave, and we shouldn't request it anymore.
+ m_txrequest.ForgetTxHash(tx.GetHash());
+ m_txrequest.ForgetTxHash(tx.GetWitnessHash());
+
// DoS prevention: do not allow mapOrphanTransactions to grow unbounded (see CVE-2012-3789)
unsigned int nMaxOrphanTx = (unsigned int)std::max((int64_t)0, gArgs.GetArg("-maxorphantx", DEFAULT_MAX_ORPHAN_TRANSACTIONS));
unsigned int nEvicted = LimitOrphanTxSize(nMaxOrphanTx);
@@ -3101,6 +3035,8 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
// from any of our non-wtxidrelay peers.
recentRejects->insert(tx.GetHash());
recentRejects->insert(tx.GetWitnessHash());
+ m_txrequest.ForgetTxHash(tx.GetHash());
+ m_txrequest.ForgetTxHash(tx.GetWitnessHash());
}
} else {
if (state.GetResult() != TxValidationResult::TX_WITNESS_STRIPPED) {
@@ -3119,6 +3055,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
// if we start doing this too early.
assert(recentRejects);
recentRejects->insert(tx.GetWitnessHash());
+ m_txrequest.ForgetTxHash(tx.GetWitnessHash());
// If the transaction failed for TX_INPUTS_NOT_STANDARD,
// then we know that the witness was irrelevant to the policy
// failure, since this check depends only on the txid
@@ -3129,6 +3066,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
// parent-fetching by txid via the orphan-handling logic).
if (state.GetResult() == TxValidationResult::TX_INPUTS_NOT_STANDARD && tx.GetWitnessHash() != tx.GetHash()) {
recentRejects->insert(tx.GetHash());
+ m_txrequest.ForgetTxHash(tx.GetHash());
}
if (RecursiveDynamicUsage(*ptx) < 100000) {
AddToCompactExtraTransactions(ptx);
@@ -3769,24 +3707,15 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
}
if (msg_type == NetMsgType::NOTFOUND) {
- // Remove the NOTFOUND transactions from the peer
- LOCK(cs_main);
- CNodeState *state = State(pfrom.GetId());
std::vector<CInv> vInv;
vRecv >> vInv;
- if (vInv.size() <= MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
+ if (vInv.size() <= MAX_PEER_TX_ANNOUNCEMENTS + MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
+ LOCK(::cs_main);
for (CInv &inv : vInv) {
if (inv.IsGenTxMsg()) {
- // If we receive a NOTFOUND message for a txid we requested, erase
- // it from our data structures for this peer.
- auto in_flight_it = state->m_tx_download.m_tx_in_flight.find(inv.hash);
- if (in_flight_it == state->m_tx_download.m_tx_in_flight.end()) {
- // Skip any further work if this is a spurious NOTFOUND
- // message.
- continue;
- }
- state->m_tx_download.m_tx_in_flight.erase(in_flight_it);
- state->m_tx_download.m_tx_announced.erase(inv.hash);
+ // If we receive a NOTFOUND message for a tx we requested, mark the announcement for it as
+ // completed in TxRequestTracker.
+ m_txrequest.ReceivedResponse(pfrom.GetId(), inv.hash);
}
}
}
@@ -4117,6 +4046,17 @@ bool PeerManager::SendMessages(CNode* pto)
std::vector<CAddress> vAddr;
vAddr.reserve(pto->vAddrToSend.size());
assert(pto->m_addr_known);
+
+ const char* msg_type;
+ int make_flags;
+ if (pto->m_wants_addrv2) {
+ msg_type = NetMsgType::ADDRV2;
+ make_flags = ADDRV2_FORMAT;
+ } else {
+ msg_type = NetMsgType::ADDR;
+ make_flags = 0;
+ }
+
for (const CAddress& addr : pto->vAddrToSend)
{
if (!pto->m_addr_known->contains(addr.GetKey()))
@@ -4126,14 +4066,14 @@ bool PeerManager::SendMessages(CNode* pto)
// receiver rejects addr messages larger than MAX_ADDR_TO_SEND
if (vAddr.size() >= MAX_ADDR_TO_SEND)
{
- m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::ADDR, vAddr));
+ m_connman.PushMessage(pto, msgMaker.Make(make_flags, msg_type, vAddr));
vAddr.clear();
}
}
}
pto->vAddrToSend.clear();
if (!vAddr.empty())
- m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::ADDR, vAddr));
+ m_connman.PushMessage(pto, msgMaker.Make(make_flags, msg_type, vAddr));
// we only send the big addr message once
if (pto->vAddrToSend.capacity() > 40)
pto->vAddrToSend.shrink_to_fit();
@@ -4550,67 +4490,26 @@ bool PeerManager::SendMessages(CNode* pto)
//
// Message: getdata (non-blocks)
//
-
- // For robustness, expire old requests after a long timeout, so that
- // we can resume downloading transactions from a peer even if they
- // were unresponsive in the past.
- // Eventually we should consider disconnecting peers, but this is
- // conservative.
- if (state.m_tx_download.m_check_expiry_timer <= current_time) {
- for (auto it=state.m_tx_download.m_tx_in_flight.begin(); it != state.m_tx_download.m_tx_in_flight.end();) {
- if (it->second <= current_time - TX_EXPIRY_INTERVAL) {
- LogPrint(BCLog::NET, "timeout of inflight tx %s from peer=%d\n", it->first.ToString(), pto->GetId());
- state.m_tx_download.m_tx_announced.erase(it->first);
- state.m_tx_download.m_tx_in_flight.erase(it++);
- } else {
- ++it;
- }
- }
- // On average, we do this check every TX_EXPIRY_INTERVAL. Randomize
- // so that we're not doing this for all peers at the same time.
- state.m_tx_download.m_check_expiry_timer = current_time + TX_EXPIRY_INTERVAL / 2 + GetRandMicros(TX_EXPIRY_INTERVAL);
- }
-
- auto& tx_process_time = state.m_tx_download.m_tx_process_time;
- while (!tx_process_time.empty() && tx_process_time.begin()->first <= current_time && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) {
- const GenTxid gtxid = tx_process_time.begin()->second;
- // Erase this entry from tx_process_time (it may be added back for
- // processing at a later time, see below)
- tx_process_time.erase(tx_process_time.begin());
- CInv inv(gtxid.IsWtxid() ? MSG_WTX : (MSG_TX | GetFetchFlags(*pto)), gtxid.GetHash());
- if (!AlreadyHaveTx(ToGenTxid(inv), m_mempool)) {
- // If this transaction was last requested more than 1 minute ago,
- // then request.
- const auto last_request_time = GetTxRequestTime(gtxid);
- if (last_request_time <= current_time - GETDATA_TX_INTERVAL) {
- LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId());
- vGetData.push_back(inv);
- if (vGetData.size() >= MAX_GETDATA_SZ) {
- m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));
- vGetData.clear();
- }
- UpdateTxRequestTime(gtxid, current_time);
- state.m_tx_download.m_tx_in_flight.emplace(gtxid.GetHash(), current_time);
- } else {
- // This transaction is in flight from someone else; queue
- // up processing to happen after the download times out
- // (with a slight delay for inbound peers, to prefer
- // requests to outbound peers).
- // Don't apply the txid-delay to re-requests of a
- // transaction; the heuristic of delaying requests to
- // txid-relay peers is to save bandwidth on initial
- // announcement of a transaction, and doesn't make sense
- // for a followup request if our first peer times out (and
- // would open us up to an attacker using inbound
- // wtxid-relay to prevent us from requesting transactions
- // from outbound txid-relay peers).
- const auto next_process_time = CalculateTxGetDataTime(gtxid, current_time, !state.fPreferredDownload, false);
- tx_process_time.emplace(next_process_time, gtxid);
+ std::vector<std::pair<NodeId, GenTxid>> expired;
+ auto requestable = m_txrequest.GetRequestable(pto->GetId(), current_time, &expired);
+ for (const auto& entry : expired) {
+ LogPrint(BCLog::NET, "timeout of inflight %s %s from peer=%d\n", entry.second.IsWtxid() ? "wtx" : "tx",
+ entry.second.GetHash().ToString(), entry.first);
+ }
+ for (const GenTxid& gtxid : requestable) {
+ if (!AlreadyHaveTx(gtxid, m_mempool)) {
+ LogPrint(BCLog::NET, "Requesting %s %s peer=%d\n", gtxid.IsWtxid() ? "wtx" : "tx",
+ gtxid.GetHash().ToString(), pto->GetId());
+ vGetData.emplace_back(gtxid.IsWtxid() ? MSG_WTX : (MSG_TX | GetFetchFlags(*pto)), gtxid.GetHash());
+ if (vGetData.size() >= MAX_GETDATA_SZ) {
+ m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));
+ vGetData.clear();
}
+ m_txrequest.RequestedTx(pto->GetId(), gtxid.GetHash(), current_time + GETDATA_TX_INTERVAL);
} else {
- // We have already seen this transaction, no need to download.
- state.m_tx_download.m_tx_announced.erase(gtxid.GetHash());
- state.m_tx_download.m_tx_in_flight.erase(gtxid.GetHash());
+ // We have already seen this transaction, no need to download. This is just a belt-and-suspenders, as
+ // this should already be called whenever a transaction becomes AlreadyHaveTx().
+ m_txrequest.ForgetTxHash(gtxid.GetHash());
}
}