diff options
author | Wladimir J. van der Laan <laanwj@protonmail.com> | 2020-10-14 17:42:30 +0200 |
---|---|---|
committer | Wladimir J. van der Laan <laanwj@protonmail.com> | 2020-10-14 18:36:59 +0200 |
commit | c2c4dbaebd955ad2829364f7fa5b8169ca1ba6b9 (patch) | |
tree | 2932ddeda0c1c8721df357632c7310b6b68073bf /src/net_processing.cpp | |
parent | 99a1d572eabca89790216b3919a237e07063a376 (diff) | |
parent | fd9a0060f028a4c01bd88f58777dea34bdcbafd1 (diff) |
Merge #19988: Overhaul transaction request logic
fd9a0060f028a4c01bd88f58777dea34bdcbafd1 Report and verify expirations (Pieter Wuille)
86f50ed10f66b5535f0162cf0026456a9e3f8963 Delete limitedmap as it is unused now (Pieter Wuille)
cc16fff3e476a9378d2176b3c1b83ad12b1b052a Make txid delay penalty also apply to fetches of orphan's parents (Pieter Wuille)
173a1d2d3f824b83777ac713e89bee69fd87692d Expedite removal of tx requests that are no longer needed (Pieter Wuille)
de11b0a4eff20da3e3ca52dc90948b5253d329c5 Reduce MAX_PEER_TX_ANNOUNCEMENTS for non-PF_RELAY peers (Pieter Wuille)
242d16477df1a024c7126bad23dde39cad217eca Change transaction request logic to use txrequest (Pieter Wuille)
5b03121d60527a193a84c339151481f9c9c1962b Add txrequest fuzz tests (Pieter Wuille)
3c7fe0e5a0ee1abf4dc263ae5310e68253c866e1 Add txrequest unit tests (Pieter Wuille)
da3b8fde03f2e8060bb7ff3bff17175dab85f0cd Add txrequest module (Pieter Wuille)
Pull request description:
This replaces the transaction request logic with an encapsulated class that maintains all the state surrounding it. By keeping it stand alone, it can be easily tested (using included unit tests and fuzz tests).
The major changes are:
* Announcements from outbound (and whitelisted) peers are now always preferred over those from inbound peers. This used to be the case for the first request (by delaying the first request from inbound peers), and a bias afters. The 2s delay for requests from inbound peers still exists, but after that, if viable outbound peers remain for any given transaction, they will always be tried first.
* No more hard cap of 100 in flight transactions per peer, as there is less need for it (memory usage is linear in the number of announcements, but independent from the number in flight, and CPU usage isn't affected by it). Furthermore, if only one peer announces a transaction, and it has over 100 in flight already, we still want to request it from them. The cap is replaced with a rule that announcements from such overloaded peers get an additional 2s delay (possibly combined with the existing 2s delays for inbound connections, and for txid peers when wtxid peers are available).
* The limit of 100000 tracked announcements is reduced to 5000; this was excessive. This can be bypassed using the PF_RELAY permission (to accommodate locally dumping a batch of many transactions).
This replaces #19184, rebased on #18044 and with many small changes.
ACKs for top commit:
ariard:
Code Review ACK fd9a006. I've reviewed the new TxRequestTracker, its integration in net_processing, unit/functional/fuzzing test coverage. I looked more for soundness of new specification rather than functional consistency with old transaction request logic.
MarcoFalke:
Approach ACK fd9a0060f028a4c01bd88f58777dea34bdcbafd1 🏹
naumenkogs:
Code Review ACK fd9a006. I've reviewed everything, mostly to see how this stuff works at the lower level (less documentation-wise, more implementation-wise), and to try breaking it with unexpected sequences of events.
jnewbery:
utACK fd9a0060f028a4c01bd88f58777dea34bdcbafd1
jonatack:
WIP light ACK fd9a0060f028a4c01bd88f58777dea34bdcbafd1 have read the code, verified that each commit is hygienic, e.g. debug build clean and tests green, and have been running a node on and off with this branch and grepping the net debug log. Am still unpacking the discussion hidden by GitHub by fetching it via the API and connecting the dots, storing notes and suggestions in a local branch; at this point none are blockers.
ryanofsky:
Light code review ACK fd9a0060f028a4c01bd88f58777dea34bdcbafd1, looking at txrequest implementation, unit test implementation, and net_processing integration, just trying to understand how it works and looking for anything potentially confusing in the implementation. Didn't look at functional tests or catch up on review discussion. Just a sanity check review focused on:
Tree-SHA512: ea7b52710371498b59d9c9cfb5230dd544fe9c6cb699e69178dea641646104f38a0b5ec7f5f0dbf1eb579b7ec25a31ea420593eff3b7556433daf92d4b0f0dd7
Diffstat (limited to 'src/net_processing.cpp')
-rw-r--r-- | src/net_processing.cpp | 310 |
1 files changed, 88 insertions, 222 deletions
diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 9ad3f5d6f4..74f3390fee 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -72,22 +72,22 @@ static constexpr std::chrono::minutes PING_INTERVAL{2}; static const unsigned int MAX_LOCATOR_SZ = 101; /** The maximum number of entries in an 'inv' protocol message */ static const unsigned int MAX_INV_SZ = 50000; -/** Maximum number of in-flight transactions from a peer */ -static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100; -/** Maximum number of announced transactions from a peer */ -static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ; -/** How many microseconds to delay requesting transactions via txids, if we have wtxid-relaying peers */ -static constexpr std::chrono::microseconds TXID_RELAY_DELAY{std::chrono::seconds{2}}; -/** How many microseconds to delay requesting transactions from inbound peers */ -static constexpr std::chrono::microseconds INBOUND_PEER_TX_DELAY{std::chrono::seconds{2}}; +/** Maximum number of in-flight transaction requests from a peer. It is not a hard limit, but the threshold at which + * point the OVERLOADED_PEER_TX_DELAY kicks in. */ +static constexpr int32_t MAX_PEER_TX_REQUEST_IN_FLIGHT = 100; +/** Maximum number of transactions to consider for requesting, per peer. It provides a reasonable DoS limit to + * per-peer memory usage spent on announcements, while covering peers continuously sending INVs at the maximum + * rate (by our own policy, see INVENTORY_BROADCAST_PER_SECOND) for several minutes, while not receiving + * the actual transaction (from any peer) in response to requests for them. */ +static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 5000; +/** How long to delay requesting transactions via txids, if we have wtxid-relaying peers */ +static constexpr auto TXID_RELAY_DELAY = std::chrono::seconds{2}; +/** How long to delay requesting transactions from non-preferred peers */ +static constexpr auto NONPREF_PEER_TX_DELAY = std::chrono::seconds{2}; +/** How long to delay requesting transactions from overloaded peers (see MAX_PEER_TX_REQUEST_IN_FLIGHT). */ +static constexpr auto OVERLOADED_PEER_TX_DELAY = std::chrono::seconds{2}; /** How long to wait (in microseconds) before downloading a transaction from an additional peer */ static constexpr std::chrono::microseconds GETDATA_TX_INTERVAL{std::chrono::seconds{60}}; -/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */ -static constexpr std::chrono::microseconds MAX_GETDATA_RANDOM_DELAY{std::chrono::seconds{2}}; -/** How long to wait (in microseconds) before expiring an in-flight getdata request to a peer */ -static constexpr std::chrono::microseconds TX_EXPIRY_INTERVAL{GETDATA_TX_INTERVAL * 10}; -static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY, -"To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY"); /** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */ static const unsigned int MAX_GETDATA_SZ = 1000; /** Number of blocks that can be requested at any given time from a single peer. */ @@ -375,69 +375,6 @@ struct CNodeState { //! Time of last new block announcement int64_t m_last_block_announcement; - /* - * State associated with transaction download. - * - * Tx download algorithm: - * - * When inv comes in, queue up (process_time, txid) inside the peer's - * CNodeState (m_tx_process_time) as long as m_tx_announced for the peer - * isn't too big (MAX_PEER_TX_ANNOUNCEMENTS). - * - * The process_time for a transaction is set to nNow for outbound peers, - * nNow + 2 seconds for inbound peers. This is the time at which we'll - * consider trying to request the transaction from the peer in - * SendMessages(). The delay for inbound peers is to allow outbound peers - * a chance to announce before we request from inbound peers, to prevent - * an adversary from using inbound connections to blind us to a - * transaction (InvBlock). - * - * When we call SendMessages() for a given peer, - * we will loop over the transactions in m_tx_process_time, looking - * at the transactions whose process_time <= nNow. We'll request each - * such transaction that we don't have already and that hasn't been - * requested from another peer recently, up until we hit the - * MAX_PEER_TX_IN_FLIGHT limit for the peer. Then we'll update - * g_already_asked_for for each requested txid, storing the time of the - * GETDATA request. We use g_already_asked_for to coordinate transaction - * requests amongst our peers. - * - * For transactions that we still need but we have already recently - * requested from some other peer, we'll reinsert (process_time, txid) - * back into the peer's m_tx_process_time at the point in the future at - * which the most recent GETDATA request would time out (ie - * GETDATA_TX_INTERVAL + the request time stored in g_already_asked_for). - * We add an additional delay for inbound peers, again to prefer - * attempting download from outbound peers first. - * We also add an extra small random delay up to 2 seconds - * to avoid biasing some peers over others. (e.g., due to fixed ordering - * of peer processing in ThreadMessageHandler). - * - * When we receive a transaction from a peer, we remove the txid from the - * peer's m_tx_in_flight set and from their recently announced set - * (m_tx_announced). We also clear g_already_asked_for for that entry, so - * that if somehow the transaction is not accepted but also not added to - * the reject filter, then we will eventually redownload from other - * peers. - */ - struct TxDownloadState { - /* Track when to attempt download of announced transactions (process - * time in micros -> txid) - */ - std::multimap<std::chrono::microseconds, GenTxid> m_tx_process_time; - - //! Store all the transactions a peer has recently announced - std::set<uint256> m_tx_announced; - - //! Store transactions which were requested by us, with timestamp - std::map<uint256, std::chrono::microseconds> m_tx_in_flight; - - //! Periodically check for stuck getdata requests - std::chrono::microseconds m_check_expiry_timer{0}; - }; - - TxDownloadState m_tx_download; - //! Whether this peer is an inbound connection bool m_is_inbound; @@ -478,9 +415,6 @@ struct CNodeState { } }; -// Keeps track of the time (in microseconds) when transactions were requested last time -limitedmap<uint256, std::chrono::microseconds> g_already_asked_for GUARDED_BY(cs_main)(MAX_INV_SZ); - /** Map maintaining per-node state. */ static std::map<NodeId, CNodeState> mapNodeState GUARDED_BY(cs_main); @@ -817,73 +751,35 @@ static void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vec } } -void EraseTxRequest(const GenTxid& gtxid) EXCLUSIVE_LOCKS_REQUIRED(cs_main) -{ - g_already_asked_for.erase(gtxid.GetHash()); -} - -std::chrono::microseconds GetTxRequestTime(const GenTxid& gtxid) EXCLUSIVE_LOCKS_REQUIRED(cs_main) -{ - auto it = g_already_asked_for.find(gtxid.GetHash()); - if (it != g_already_asked_for.end()) { - return it->second; - } - return {}; -} - -void UpdateTxRequestTime(const GenTxid& gtxid, std::chrono::microseconds request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) -{ - auto it = g_already_asked_for.find(gtxid.GetHash()); - if (it == g_already_asked_for.end()) { - g_already_asked_for.insert(std::make_pair(gtxid.GetHash(), request_time)); - } else { - g_already_asked_for.update(it, request_time); - } -} - -std::chrono::microseconds CalculateTxGetDataTime(const GenTxid& gtxid, std::chrono::microseconds current_time, bool use_inbound_delay, bool use_txid_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main) -{ - std::chrono::microseconds process_time; - const auto last_request_time = GetTxRequestTime(gtxid); - // First time requesting this tx - if (last_request_time.count() == 0) { - process_time = current_time; - } else { - // Randomize the delay to avoid biasing some peers over others (such as due to - // fixed ordering of peer processing in ThreadMessageHandler) - process_time = last_request_time + GETDATA_TX_INTERVAL + GetRandMicros(MAX_GETDATA_RANDOM_DELAY); - } - - // We delay processing announcements from inbound peers - if (use_inbound_delay) process_time += INBOUND_PEER_TX_DELAY; - - // We delay processing announcements from peers that use txid-relay (instead of wtxid) - if (use_txid_delay) process_time += TXID_RELAY_DELAY; - - return process_time; -} +} // namespace -void RequestTx(CNodeState* state, const GenTxid& gtxid, std::chrono::microseconds current_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +void PeerManager::AddTxAnnouncement(const CNode& node, const GenTxid& gtxid, std::chrono::microseconds current_time) { - CNodeState::TxDownloadState& peer_download_state = state->m_tx_download; - if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS || - peer_download_state.m_tx_process_time.size() >= MAX_PEER_TX_ANNOUNCEMENTS || - peer_download_state.m_tx_announced.count(gtxid.GetHash())) { - // Too many queued announcements from this peer, or we already have - // this announcement + AssertLockHeld(::cs_main); // For m_txrequest + NodeId nodeid = node.GetId(); + if (!node.HasPermission(PF_RELAY) && m_txrequest.Count(nodeid) >= MAX_PEER_TX_ANNOUNCEMENTS) { + // Too many queued announcements from this peer return; } - peer_download_state.m_tx_announced.insert(gtxid.GetHash()); - - // Calculate the time to try requesting this transaction. Use - // fPreferredDownload as a proxy for outbound peers. - const auto process_time = CalculateTxGetDataTime(gtxid, current_time, !state->fPreferredDownload, !state->m_wtxid_relay && g_wtxid_relay_peers > 0); - - peer_download_state.m_tx_process_time.emplace(process_time, gtxid); + const CNodeState* state = State(nodeid); + + // Decide the TxRequestTracker parameters for this announcement: + // - "preferred": if fPreferredDownload is set (= outbound, or PF_NOBAN permission) + // - "reqtime": current time plus delays for: + // - NONPREF_PEER_TX_DELAY for announcements from non-preferred connections + // - TXID_RELAY_DELAY for txid announcements while wtxid peers are available + // - OVERLOADED_PEER_TX_DELAY for announcements from peers which have at least + // MAX_PEER_TX_REQUEST_IN_FLIGHT requests in flight (and don't have PF_RELAY). + auto delay = std::chrono::microseconds{0}; + const bool preferred = state->fPreferredDownload; + if (!preferred) delay += NONPREF_PEER_TX_DELAY; + if (!gtxid.IsWtxid() && g_wtxid_relay_peers > 0) delay += TXID_RELAY_DELAY; + const bool overloaded = !node.HasPermission(PF_RELAY) && + m_txrequest.CountInFlight(nodeid) >= MAX_PEER_TX_REQUEST_IN_FLIGHT; + if (overloaded) delay += OVERLOADED_PEER_TX_DELAY; + m_txrequest.ReceivedInv(nodeid, gtxid, preferred, current_time + delay); } -} // namespace - // This function is used for testing the stale tip eviction logic, see // denialofservice_tests.cpp void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) @@ -900,6 +796,7 @@ void PeerManager::InitializeNode(CNode *pnode) { { LOCK(cs_main); mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(addr, pnode->IsInboundConn(), pnode->IsManualConn())); + assert(m_txrequest.Count(nodeid) == 0); } { PeerRef peer = std::make_shared<Peer>(nodeid); @@ -957,6 +854,7 @@ void PeerManager::FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) { mapBlocksInFlight.erase(entry.hash); } EraseOrphansFor(nodeid); + m_txrequest.DisconnectedPeer(nodeid); nPreferredDownload -= state->fPreferredDownload; nPeersWithValidatedDownloads -= (state->nBlocksInFlightValidHeaders != 0); assert(nPeersWithValidatedDownloads >= 0); @@ -974,6 +872,7 @@ void PeerManager::FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) { assert(nPeersWithValidatedDownloads == 0); assert(g_outbound_peers_with_protect_from_disconnect == 0); assert(g_wtxid_relay_peers == 0); + assert(m_txrequest.Size() == 0); } LogPrint(BCLog::NET, "Cleared nodestate for peer=%d\n", nodeid); } @@ -1286,7 +1185,8 @@ PeerManager::PeerManager(const CChainParams& chainparams, CConnman& connman, Ban /** * Evict orphan txn pool entries (EraseOrphanTx) based on a newly connected - * block. Also save the time of the last tip update. + * block, remember the recently confirmed transactions, and delete tracked + * announcements for them. Also save the time of the last tip update. */ void PeerManager::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex) { @@ -1330,6 +1230,13 @@ void PeerManager::BlockConnected(const std::shared_ptr<const CBlock>& pblock, co } } } + { + LOCK(cs_main); + for (const auto& ptx : pblock->vtx) { + m_txrequest.ForgetTxHash(ptx->GetHash()); + m_txrequest.ForgetTxHash(ptx->GetWitnessHash()); + } + } } void PeerManager::BlockDisconnected(const std::shared_ptr<const CBlock> &block, const CBlockIndex* pindex) @@ -2770,7 +2677,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat pfrom.fDisconnect = true; return; } else if (!fAlreadyHave && !m_chainman.ActiveChainstate().IsInitialBlockDownload()) { - RequestTx(State(pfrom.GetId()), gtxid, current_time); + AddTxAnnouncement(pfrom, gtxid, current_time); } } else { LogPrint(BCLog::NET, "Unknown inv type \"%s\" received from peer=%d\n", inv.ToString(), pfrom.GetId()); @@ -3024,11 +2931,8 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat TxValidationState state; - for (const GenTxid& gtxid : {GenTxid(false, txid), GenTxid(true, wtxid)}) { - nodestate->m_tx_download.m_tx_announced.erase(gtxid.GetHash()); - nodestate->m_tx_download.m_tx_in_flight.erase(gtxid.GetHash()); - EraseTxRequest(gtxid); - } + m_txrequest.ReceivedResponse(pfrom.GetId(), txid); + if (tx.HasWitness()) m_txrequest.ReceivedResponse(pfrom.GetId(), wtxid); std::list<CTransactionRef> lRemovedTxn; @@ -3047,6 +2951,10 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat if (!AlreadyHaveTx(GenTxid(/* is_wtxid=*/true, wtxid), m_mempool) && AcceptToMemoryPool(m_mempool, state, ptx, &lRemovedTxn, false /* bypass_limits */)) { m_mempool.check(&::ChainstateActive().CoinsTip()); + // As this version of the transaction was acceptable, we can forget about any + // requests for it. + m_txrequest.ForgetTxHash(tx.GetHash()); + m_txrequest.ForgetTxHash(tx.GetWitnessHash()); RelayTransaction(tx.GetHash(), tx.GetWitnessHash(), m_connman); for (unsigned int i = 0; i < tx.vout.size(); i++) { auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(txid, i)); @@ -3102,10 +3010,14 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat // protocol for getting all unconfirmed parents. const GenTxid gtxid{/* is_wtxid=*/false, parent_txid}; pfrom.AddKnownTx(parent_txid); - if (!AlreadyHaveTx(gtxid, m_mempool)) RequestTx(State(pfrom.GetId()), gtxid, current_time); + if (!AlreadyHaveTx(gtxid, m_mempool)) AddTxAnnouncement(pfrom, gtxid, current_time); } AddOrphanTx(ptx, pfrom.GetId()); + // Once added to the orphan pool, a tx is considered AlreadyHave, and we shouldn't request it anymore. + m_txrequest.ForgetTxHash(tx.GetHash()); + m_txrequest.ForgetTxHash(tx.GetWitnessHash()); + // DoS prevention: do not allow mapOrphanTransactions to grow unbounded (see CVE-2012-3789) unsigned int nMaxOrphanTx = (unsigned int)std::max((int64_t)0, gArgs.GetArg("-maxorphantx", DEFAULT_MAX_ORPHAN_TRANSACTIONS)); unsigned int nEvicted = LimitOrphanTxSize(nMaxOrphanTx); @@ -3122,6 +3034,8 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat // from any of our non-wtxidrelay peers. recentRejects->insert(tx.GetHash()); recentRejects->insert(tx.GetWitnessHash()); + m_txrequest.ForgetTxHash(tx.GetHash()); + m_txrequest.ForgetTxHash(tx.GetWitnessHash()); } } else { if (state.GetResult() != TxValidationResult::TX_WITNESS_STRIPPED) { @@ -3140,6 +3054,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat // if we start doing this too early. assert(recentRejects); recentRejects->insert(tx.GetWitnessHash()); + m_txrequest.ForgetTxHash(tx.GetWitnessHash()); // If the transaction failed for TX_INPUTS_NOT_STANDARD, // then we know that the witness was irrelevant to the policy // failure, since this check depends only on the txid @@ -3150,6 +3065,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat // parent-fetching by txid via the orphan-handling logic). if (state.GetResult() == TxValidationResult::TX_INPUTS_NOT_STANDARD && tx.GetWitnessHash() != tx.GetHash()) { recentRejects->insert(tx.GetHash()); + m_txrequest.ForgetTxHash(tx.GetHash()); } if (RecursiveDynamicUsage(*ptx) < 100000) { AddToCompactExtraTransactions(ptx); @@ -3790,24 +3706,15 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat } if (msg_type == NetMsgType::NOTFOUND) { - // Remove the NOTFOUND transactions from the peer - LOCK(cs_main); - CNodeState *state = State(pfrom.GetId()); std::vector<CInv> vInv; vRecv >> vInv; - if (vInv.size() <= MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + if (vInv.size() <= MAX_PEER_TX_ANNOUNCEMENTS + MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + LOCK(::cs_main); for (CInv &inv : vInv) { if (inv.IsGenTxMsg()) { - // If we receive a NOTFOUND message for a txid we requested, erase - // it from our data structures for this peer. - auto in_flight_it = state->m_tx_download.m_tx_in_flight.find(inv.hash); - if (in_flight_it == state->m_tx_download.m_tx_in_flight.end()) { - // Skip any further work if this is a spurious NOTFOUND - // message. - continue; - } - state->m_tx_download.m_tx_in_flight.erase(in_flight_it); - state->m_tx_download.m_tx_announced.erase(inv.hash); + // If we receive a NOTFOUND message for a tx we requested, mark the announcement for it as + // completed in TxRequestTracker. + m_txrequest.ReceivedResponse(pfrom.GetId(), inv.hash); } } } @@ -4582,67 +4489,26 @@ bool PeerManager::SendMessages(CNode* pto) // // Message: getdata (non-blocks) // - - // For robustness, expire old requests after a long timeout, so that - // we can resume downloading transactions from a peer even if they - // were unresponsive in the past. - // Eventually we should consider disconnecting peers, but this is - // conservative. - if (state.m_tx_download.m_check_expiry_timer <= current_time) { - for (auto it=state.m_tx_download.m_tx_in_flight.begin(); it != state.m_tx_download.m_tx_in_flight.end();) { - if (it->second <= current_time - TX_EXPIRY_INTERVAL) { - LogPrint(BCLog::NET, "timeout of inflight tx %s from peer=%d\n", it->first.ToString(), pto->GetId()); - state.m_tx_download.m_tx_announced.erase(it->first); - state.m_tx_download.m_tx_in_flight.erase(it++); - } else { - ++it; - } - } - // On average, we do this check every TX_EXPIRY_INTERVAL. Randomize - // so that we're not doing this for all peers at the same time. - state.m_tx_download.m_check_expiry_timer = current_time + TX_EXPIRY_INTERVAL / 2 + GetRandMicros(TX_EXPIRY_INTERVAL); - } - - auto& tx_process_time = state.m_tx_download.m_tx_process_time; - while (!tx_process_time.empty() && tx_process_time.begin()->first <= current_time && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) { - const GenTxid gtxid = tx_process_time.begin()->second; - // Erase this entry from tx_process_time (it may be added back for - // processing at a later time, see below) - tx_process_time.erase(tx_process_time.begin()); - CInv inv(gtxid.IsWtxid() ? MSG_WTX : (MSG_TX | GetFetchFlags(*pto)), gtxid.GetHash()); - if (!AlreadyHaveTx(ToGenTxid(inv), m_mempool)) { - // If this transaction was last requested more than 1 minute ago, - // then request. - const auto last_request_time = GetTxRequestTime(gtxid); - if (last_request_time <= current_time - GETDATA_TX_INTERVAL) { - LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId()); - vGetData.push_back(inv); - if (vGetData.size() >= MAX_GETDATA_SZ) { - m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); - vGetData.clear(); - } - UpdateTxRequestTime(gtxid, current_time); - state.m_tx_download.m_tx_in_flight.emplace(gtxid.GetHash(), current_time); - } else { - // This transaction is in flight from someone else; queue - // up processing to happen after the download times out - // (with a slight delay for inbound peers, to prefer - // requests to outbound peers). - // Don't apply the txid-delay to re-requests of a - // transaction; the heuristic of delaying requests to - // txid-relay peers is to save bandwidth on initial - // announcement of a transaction, and doesn't make sense - // for a followup request if our first peer times out (and - // would open us up to an attacker using inbound - // wtxid-relay to prevent us from requesting transactions - // from outbound txid-relay peers). - const auto next_process_time = CalculateTxGetDataTime(gtxid, current_time, !state.fPreferredDownload, false); - tx_process_time.emplace(next_process_time, gtxid); + std::vector<std::pair<NodeId, GenTxid>> expired; + auto requestable = m_txrequest.GetRequestable(pto->GetId(), current_time, &expired); + for (const auto& entry : expired) { + LogPrint(BCLog::NET, "timeout of inflight %s %s from peer=%d\n", entry.second.IsWtxid() ? "wtx" : "tx", + entry.second.GetHash().ToString(), entry.first); + } + for (const GenTxid& gtxid : requestable) { + if (!AlreadyHaveTx(gtxid, m_mempool)) { + LogPrint(BCLog::NET, "Requesting %s %s peer=%d\n", gtxid.IsWtxid() ? "wtx" : "tx", + gtxid.GetHash().ToString(), pto->GetId()); + vGetData.emplace_back(gtxid.IsWtxid() ? MSG_WTX : (MSG_TX | GetFetchFlags(*pto)), gtxid.GetHash()); + if (vGetData.size() >= MAX_GETDATA_SZ) { + m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); + vGetData.clear(); } + m_txrequest.RequestedTx(pto->GetId(), gtxid.GetHash(), current_time + GETDATA_TX_INTERVAL); } else { - // We have already seen this transaction, no need to download. - state.m_tx_download.m_tx_announced.erase(gtxid.GetHash()); - state.m_tx_download.m_tx_in_flight.erase(gtxid.GetHash()); + // We have already seen this transaction, no need to download. This is just a belt-and-suspenders, as + // this should already be called whenever a transaction becomes AlreadyHaveTx(). + m_txrequest.ForgetTxHash(gtxid.GetHash()); } } |