aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/release-notes-19988.md9
-rw-r--r--src/Makefile.am3
-rw-r--r--src/Makefile.test.include9
-rw-r--r--src/limitedmap.h100
-rw-r--r--src/net.h1
-rw-r--r--src/net_permissions.cpp2
-rw-r--r--src/net_permissions.h1
-rw-r--r--src/net_processing.cpp310
-rw-r--r--src/net_processing.h8
-rw-r--r--src/primitives/transaction.h4
-rw-r--r--src/test/fuzz/txrequest.cpp374
-rw-r--r--src/test/limitedmap_tests.cpp101
-rw-r--r--src/test/txrequest_tests.cpp738
-rw-r--r--src/txrequest.cpp748
-rw-r--r--src/txrequest.h211
-rw-r--r--src/uint256.cpp1
-rw-r--r--src/uint256.h1
-rwxr-xr-xtest/functional/p2p_tx_download.py123
18 files changed, 2297 insertions, 447 deletions
diff --git a/doc/release-notes-19988.md b/doc/release-notes-19988.md
new file mode 100644
index 0000000000..ef26eb3032
--- /dev/null
+++ b/doc/release-notes-19988.md
@@ -0,0 +1,9 @@
+P2P changes
+-----------
+
+The size of the set of transactions that peers have announced and we consider
+for requests has been reduced from 100000 to 5000 (per peer), and further
+announcements will be ignored when that limit is reached. If you need to
+dump (very) large batches of transactions, exceptions can be made for trusted
+peers using the "relay" network permission. For localhost for example it can
+be enabled using the command line option `-whitelist=relay@127.0.0.1`.
diff --git a/src/Makefile.am b/src/Makefile.am
index aa63b5f516..1dd821f968 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -151,7 +151,6 @@ BITCOIN_CORE_H = \
interfaces/wallet.h \
key.h \
key_io.h \
- limitedmap.h \
logging.h \
logging/timer.h \
memusage.h \
@@ -215,6 +214,7 @@ BITCOIN_CORE_H = \
timedata.h \
torcontrol.h \
txdb.h \
+ txrequest.h \
txmempool.h \
undo.h \
util/asmap.h \
@@ -327,6 +327,7 @@ libbitcoin_server_a_SOURCES = \
timedata.cpp \
torcontrol.cpp \
txdb.cpp \
+ txrequest.cpp \
txmempool.cpp \
validation.cpp \
validationinterface.cpp \
diff --git a/src/Makefile.test.include b/src/Makefile.test.include
index 06dde87ddd..3af30db4db 100644
--- a/src/Makefile.test.include
+++ b/src/Makefile.test.include
@@ -151,6 +151,7 @@ FUZZ_TARGETS = \
test/fuzz/tx_in_deserialize \
test/fuzz/tx_out \
test/fuzz/txoutcompressor_deserialize \
+ test/fuzz/txrequest \
test/fuzz/txundo_deserialize \
test/fuzz/uint160_deserialize \
test/fuzz/uint256_deserialize
@@ -235,7 +236,6 @@ BITCOIN_TESTS =\
test/interfaces_tests.cpp \
test/key_io_tests.cpp \
test/key_tests.cpp \
- test/limitedmap_tests.cpp \
test/logging_tests.cpp \
test/dbwrapper_tests.cpp \
test/validation_tests.cpp \
@@ -275,6 +275,7 @@ BITCOIN_TESTS =\
test/torcontrol_tests.cpp \
test/transaction_tests.cpp \
test/txindex_tests.cpp \
+ test/txrequest_tests.cpp \
test/txvalidation_tests.cpp \
test/txvalidationcache_tests.cpp \
test/uint256_tests.cpp \
@@ -1214,6 +1215,12 @@ test_fuzz_txoutcompressor_deserialize_LDADD = $(FUZZ_SUITE_LD_COMMON)
test_fuzz_txoutcompressor_deserialize_LDFLAGS = $(FUZZ_SUITE_LDFLAGS_COMMON)
test_fuzz_txoutcompressor_deserialize_SOURCES = test/fuzz/deserialize.cpp
+test_fuzz_txrequest_CPPFLAGS = $(AM_CPPFLAGS) $(BITCOIN_INCLUDES)
+test_fuzz_txrequest_CXXFLAGS = $(AM_CXXFLAGS) $(PIE_FLAGS)
+test_fuzz_txrequest_LDADD = $(FUZZ_SUITE_LD_COMMON)
+test_fuzz_txrequest_LDFLAGS = $(FUZZ_SUITE_LDFLAGS_COMMON)
+test_fuzz_txrequest_SOURCES = test/fuzz/txrequest.cpp
+
test_fuzz_txundo_deserialize_CPPFLAGS = $(AM_CPPFLAGS) $(BITCOIN_INCLUDES) -DTXUNDO_DESERIALIZE=1
test_fuzz_txundo_deserialize_CXXFLAGS = $(AM_CXXFLAGS) $(PIE_FLAGS)
test_fuzz_txundo_deserialize_LDADD = $(FUZZ_SUITE_LD_COMMON)
diff --git a/src/limitedmap.h b/src/limitedmap.h
deleted file mode 100644
index 7d66964e36..0000000000
--- a/src/limitedmap.h
+++ /dev/null
@@ -1,100 +0,0 @@
-// Copyright (c) 2012-2018 The Bitcoin Core developers
-// Distributed under the MIT software license, see the accompanying
-// file COPYING or http://www.opensource.org/licenses/mit-license.php.
-
-#ifndef BITCOIN_LIMITEDMAP_H
-#define BITCOIN_LIMITEDMAP_H
-
-#include <assert.h>
-#include <map>
-
-/** STL-like map container that only keeps the N elements with the highest value. */
-template <typename K, typename V>
-class limitedmap
-{
-public:
- typedef K key_type;
- typedef V mapped_type;
- typedef std::pair<const key_type, mapped_type> value_type;
- typedef typename std::map<K, V>::const_iterator const_iterator;
- typedef typename std::map<K, V>::size_type size_type;
-
-protected:
- std::map<K, V> map;
- typedef typename std::map<K, V>::iterator iterator;
- std::multimap<V, iterator> rmap;
- typedef typename std::multimap<V, iterator>::iterator rmap_iterator;
- size_type nMaxSize;
-
-public:
- explicit limitedmap(size_type nMaxSizeIn)
- {
- assert(nMaxSizeIn > 0);
- nMaxSize = nMaxSizeIn;
- }
- const_iterator begin() const { return map.begin(); }
- const_iterator end() const { return map.end(); }
- size_type size() const { return map.size(); }
- bool empty() const { return map.empty(); }
- const_iterator find(const key_type& k) const { return map.find(k); }
- size_type count(const key_type& k) const { return map.count(k); }
- void insert(const value_type& x)
- {
- std::pair<iterator, bool> ret = map.insert(x);
- if (ret.second) {
- if (map.size() > nMaxSize) {
- map.erase(rmap.begin()->second);
- rmap.erase(rmap.begin());
- }
- rmap.insert(make_pair(x.second, ret.first));
- }
- }
- void erase(const key_type& k)
- {
- iterator itTarget = map.find(k);
- if (itTarget == map.end())
- return;
- std::pair<rmap_iterator, rmap_iterator> itPair = rmap.equal_range(itTarget->second);
- for (rmap_iterator it = itPair.first; it != itPair.second; ++it)
- if (it->second == itTarget) {
- rmap.erase(it);
- map.erase(itTarget);
- return;
- }
- // Shouldn't ever get here
- assert(0);
- }
- void update(const_iterator itIn, const mapped_type& v)
- {
- // Using map::erase() with empty range instead of map::find() to get a non-const iterator,
- // since it is a constant time operation in C++11. For more details, see
- // https://stackoverflow.com/questions/765148/how-to-remove-constness-of-const-iterator
- iterator itTarget = map.erase(itIn, itIn);
-
- if (itTarget == map.end())
- return;
- std::pair<rmap_iterator, rmap_iterator> itPair = rmap.equal_range(itTarget->second);
- for (rmap_iterator it = itPair.first; it != itPair.second; ++it)
- if (it->second == itTarget) {
- rmap.erase(it);
- itTarget->second = v;
- rmap.insert(make_pair(v, itTarget));
- return;
- }
- // Shouldn't ever get here
- assert(0);
- }
- size_type max_size() const { return nMaxSize; }
- size_type max_size(size_type s)
- {
- assert(s > 0);
- while (map.size() > s) {
- map.erase(rmap.begin()->second);
- rmap.erase(rmap.begin());
- }
- nMaxSize = s;
- return nMaxSize;
- }
-};
-
-#endif // BITCOIN_LIMITEDMAP_H
diff --git a/src/net.h b/src/net.h
index 7c63516394..4ba872d02c 100644
--- a/src/net.h
+++ b/src/net.h
@@ -14,7 +14,6 @@
#include <compat.h>
#include <crypto/siphash.h>
#include <hash.h>
-#include <limitedmap.h>
#include <net_permissions.h>
#include <netaddress.h>
#include <optional.h>
diff --git a/src/net_permissions.cpp b/src/net_permissions.cpp
index 53648deb40..d40fdfb113 100644
--- a/src/net_permissions.cpp
+++ b/src/net_permissions.cpp
@@ -12,7 +12,7 @@ const std::vector<std::string> NET_PERMISSIONS_DOC{
"bloomfilter (allow requesting BIP37 filtered blocks and transactions)",
"noban (do not ban for misbehavior; implies download)",
"forcerelay (relay transactions that are already in the mempool; implies relay)",
- "relay (relay even in -blocksonly mode)",
+ "relay (relay even in -blocksonly mode, and unlimited transaction announcements)",
"mempool (allow requesting BIP35 mempool contents)",
"download (allow getheaders during IBD, no disconnect after maxuploadtarget limit)",
"addr (responses to GETADDR avoid hitting the cache and contain random records with the most up-to-date info)"
diff --git a/src/net_permissions.h b/src/net_permissions.h
index 5b68f635a7..bba0ea1695 100644
--- a/src/net_permissions.h
+++ b/src/net_permissions.h
@@ -19,6 +19,7 @@ enum NetPermissionFlags {
// Can query bloomfilter even if -peerbloomfilters is false
PF_BLOOMFILTER = (1U << 1),
// Relay and accept transactions from this peer, even if -blocksonly is true
+ // This peer is also not subject to limits on how many transaction INVs are tracked
PF_RELAY = (1U << 3),
// Always relay transactions from this peer, even if already in mempool
// Keep parameter interaction: forcerelay implies relay
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
index 9ad3f5d6f4..74f3390fee 100644
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -72,22 +72,22 @@ static constexpr std::chrono::minutes PING_INTERVAL{2};
static const unsigned int MAX_LOCATOR_SZ = 101;
/** The maximum number of entries in an 'inv' protocol message */
static const unsigned int MAX_INV_SZ = 50000;
-/** Maximum number of in-flight transactions from a peer */
-static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100;
-/** Maximum number of announced transactions from a peer */
-static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ;
-/** How many microseconds to delay requesting transactions via txids, if we have wtxid-relaying peers */
-static constexpr std::chrono::microseconds TXID_RELAY_DELAY{std::chrono::seconds{2}};
-/** How many microseconds to delay requesting transactions from inbound peers */
-static constexpr std::chrono::microseconds INBOUND_PEER_TX_DELAY{std::chrono::seconds{2}};
+/** Maximum number of in-flight transaction requests from a peer. It is not a hard limit, but the threshold at which
+ * point the OVERLOADED_PEER_TX_DELAY kicks in. */
+static constexpr int32_t MAX_PEER_TX_REQUEST_IN_FLIGHT = 100;
+/** Maximum number of transactions to consider for requesting, per peer. It provides a reasonable DoS limit to
+ * per-peer memory usage spent on announcements, while covering peers continuously sending INVs at the maximum
+ * rate (by our own policy, see INVENTORY_BROADCAST_PER_SECOND) for several minutes, while not receiving
+ * the actual transaction (from any peer) in response to requests for them. */
+static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 5000;
+/** How long to delay requesting transactions via txids, if we have wtxid-relaying peers */
+static constexpr auto TXID_RELAY_DELAY = std::chrono::seconds{2};
+/** How long to delay requesting transactions from non-preferred peers */
+static constexpr auto NONPREF_PEER_TX_DELAY = std::chrono::seconds{2};
+/** How long to delay requesting transactions from overloaded peers (see MAX_PEER_TX_REQUEST_IN_FLIGHT). */
+static constexpr auto OVERLOADED_PEER_TX_DELAY = std::chrono::seconds{2};
/** How long to wait (in microseconds) before downloading a transaction from an additional peer */
static constexpr std::chrono::microseconds GETDATA_TX_INTERVAL{std::chrono::seconds{60}};
-/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */
-static constexpr std::chrono::microseconds MAX_GETDATA_RANDOM_DELAY{std::chrono::seconds{2}};
-/** How long to wait (in microseconds) before expiring an in-flight getdata request to a peer */
-static constexpr std::chrono::microseconds TX_EXPIRY_INTERVAL{GETDATA_TX_INTERVAL * 10};
-static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY,
-"To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY");
/** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */
static const unsigned int MAX_GETDATA_SZ = 1000;
/** Number of blocks that can be requested at any given time from a single peer. */
@@ -375,69 +375,6 @@ struct CNodeState {
//! Time of last new block announcement
int64_t m_last_block_announcement;
- /*
- * State associated with transaction download.
- *
- * Tx download algorithm:
- *
- * When inv comes in, queue up (process_time, txid) inside the peer's
- * CNodeState (m_tx_process_time) as long as m_tx_announced for the peer
- * isn't too big (MAX_PEER_TX_ANNOUNCEMENTS).
- *
- * The process_time for a transaction is set to nNow for outbound peers,
- * nNow + 2 seconds for inbound peers. This is the time at which we'll
- * consider trying to request the transaction from the peer in
- * SendMessages(). The delay for inbound peers is to allow outbound peers
- * a chance to announce before we request from inbound peers, to prevent
- * an adversary from using inbound connections to blind us to a
- * transaction (InvBlock).
- *
- * When we call SendMessages() for a given peer,
- * we will loop over the transactions in m_tx_process_time, looking
- * at the transactions whose process_time <= nNow. We'll request each
- * such transaction that we don't have already and that hasn't been
- * requested from another peer recently, up until we hit the
- * MAX_PEER_TX_IN_FLIGHT limit for the peer. Then we'll update
- * g_already_asked_for for each requested txid, storing the time of the
- * GETDATA request. We use g_already_asked_for to coordinate transaction
- * requests amongst our peers.
- *
- * For transactions that we still need but we have already recently
- * requested from some other peer, we'll reinsert (process_time, txid)
- * back into the peer's m_tx_process_time at the point in the future at
- * which the most recent GETDATA request would time out (ie
- * GETDATA_TX_INTERVAL + the request time stored in g_already_asked_for).
- * We add an additional delay for inbound peers, again to prefer
- * attempting download from outbound peers first.
- * We also add an extra small random delay up to 2 seconds
- * to avoid biasing some peers over others. (e.g., due to fixed ordering
- * of peer processing in ThreadMessageHandler).
- *
- * When we receive a transaction from a peer, we remove the txid from the
- * peer's m_tx_in_flight set and from their recently announced set
- * (m_tx_announced). We also clear g_already_asked_for for that entry, so
- * that if somehow the transaction is not accepted but also not added to
- * the reject filter, then we will eventually redownload from other
- * peers.
- */
- struct TxDownloadState {
- /* Track when to attempt download of announced transactions (process
- * time in micros -> txid)
- */
- std::multimap<std::chrono::microseconds, GenTxid> m_tx_process_time;
-
- //! Store all the transactions a peer has recently announced
- std::set<uint256> m_tx_announced;
-
- //! Store transactions which were requested by us, with timestamp
- std::map<uint256, std::chrono::microseconds> m_tx_in_flight;
-
- //! Periodically check for stuck getdata requests
- std::chrono::microseconds m_check_expiry_timer{0};
- };
-
- TxDownloadState m_tx_download;
-
//! Whether this peer is an inbound connection
bool m_is_inbound;
@@ -478,9 +415,6 @@ struct CNodeState {
}
};
-// Keeps track of the time (in microseconds) when transactions were requested last time
-limitedmap<uint256, std::chrono::microseconds> g_already_asked_for GUARDED_BY(cs_main)(MAX_INV_SZ);
-
/** Map maintaining per-node state. */
static std::map<NodeId, CNodeState> mapNodeState GUARDED_BY(cs_main);
@@ -817,73 +751,35 @@ static void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vec
}
}
-void EraseTxRequest(const GenTxid& gtxid) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
-{
- g_already_asked_for.erase(gtxid.GetHash());
-}
-
-std::chrono::microseconds GetTxRequestTime(const GenTxid& gtxid) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
-{
- auto it = g_already_asked_for.find(gtxid.GetHash());
- if (it != g_already_asked_for.end()) {
- return it->second;
- }
- return {};
-}
-
-void UpdateTxRequestTime(const GenTxid& gtxid, std::chrono::microseconds request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
-{
- auto it = g_already_asked_for.find(gtxid.GetHash());
- if (it == g_already_asked_for.end()) {
- g_already_asked_for.insert(std::make_pair(gtxid.GetHash(), request_time));
- } else {
- g_already_asked_for.update(it, request_time);
- }
-}
-
-std::chrono::microseconds CalculateTxGetDataTime(const GenTxid& gtxid, std::chrono::microseconds current_time, bool use_inbound_delay, bool use_txid_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
-{
- std::chrono::microseconds process_time;
- const auto last_request_time = GetTxRequestTime(gtxid);
- // First time requesting this tx
- if (last_request_time.count() == 0) {
- process_time = current_time;
- } else {
- // Randomize the delay to avoid biasing some peers over others (such as due to
- // fixed ordering of peer processing in ThreadMessageHandler)
- process_time = last_request_time + GETDATA_TX_INTERVAL + GetRandMicros(MAX_GETDATA_RANDOM_DELAY);
- }
-
- // We delay processing announcements from inbound peers
- if (use_inbound_delay) process_time += INBOUND_PEER_TX_DELAY;
-
- // We delay processing announcements from peers that use txid-relay (instead of wtxid)
- if (use_txid_delay) process_time += TXID_RELAY_DELAY;
-
- return process_time;
-}
+} // namespace
-void RequestTx(CNodeState* state, const GenTxid& gtxid, std::chrono::microseconds current_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
+void PeerManager::AddTxAnnouncement(const CNode& node, const GenTxid& gtxid, std::chrono::microseconds current_time)
{
- CNodeState::TxDownloadState& peer_download_state = state->m_tx_download;
- if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS ||
- peer_download_state.m_tx_process_time.size() >= MAX_PEER_TX_ANNOUNCEMENTS ||
- peer_download_state.m_tx_announced.count(gtxid.GetHash())) {
- // Too many queued announcements from this peer, or we already have
- // this announcement
+ AssertLockHeld(::cs_main); // For m_txrequest
+ NodeId nodeid = node.GetId();
+ if (!node.HasPermission(PF_RELAY) && m_txrequest.Count(nodeid) >= MAX_PEER_TX_ANNOUNCEMENTS) {
+ // Too many queued announcements from this peer
return;
}
- peer_download_state.m_tx_announced.insert(gtxid.GetHash());
-
- // Calculate the time to try requesting this transaction. Use
- // fPreferredDownload as a proxy for outbound peers.
- const auto process_time = CalculateTxGetDataTime(gtxid, current_time, !state->fPreferredDownload, !state->m_wtxid_relay && g_wtxid_relay_peers > 0);
-
- peer_download_state.m_tx_process_time.emplace(process_time, gtxid);
+ const CNodeState* state = State(nodeid);
+
+ // Decide the TxRequestTracker parameters for this announcement:
+ // - "preferred": if fPreferredDownload is set (= outbound, or PF_NOBAN permission)
+ // - "reqtime": current time plus delays for:
+ // - NONPREF_PEER_TX_DELAY for announcements from non-preferred connections
+ // - TXID_RELAY_DELAY for txid announcements while wtxid peers are available
+ // - OVERLOADED_PEER_TX_DELAY for announcements from peers which have at least
+ // MAX_PEER_TX_REQUEST_IN_FLIGHT requests in flight (and don't have PF_RELAY).
+ auto delay = std::chrono::microseconds{0};
+ const bool preferred = state->fPreferredDownload;
+ if (!preferred) delay += NONPREF_PEER_TX_DELAY;
+ if (!gtxid.IsWtxid() && g_wtxid_relay_peers > 0) delay += TXID_RELAY_DELAY;
+ const bool overloaded = !node.HasPermission(PF_RELAY) &&
+ m_txrequest.CountInFlight(nodeid) >= MAX_PEER_TX_REQUEST_IN_FLIGHT;
+ if (overloaded) delay += OVERLOADED_PEER_TX_DELAY;
+ m_txrequest.ReceivedInv(nodeid, gtxid, preferred, current_time + delay);
}
-} // namespace
-
// This function is used for testing the stale tip eviction logic, see
// denialofservice_tests.cpp
void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds)
@@ -900,6 +796,7 @@ void PeerManager::InitializeNode(CNode *pnode) {
{
LOCK(cs_main);
mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(addr, pnode->IsInboundConn(), pnode->IsManualConn()));
+ assert(m_txrequest.Count(nodeid) == 0);
}
{
PeerRef peer = std::make_shared<Peer>(nodeid);
@@ -957,6 +854,7 @@ void PeerManager::FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) {
mapBlocksInFlight.erase(entry.hash);
}
EraseOrphansFor(nodeid);
+ m_txrequest.DisconnectedPeer(nodeid);
nPreferredDownload -= state->fPreferredDownload;
nPeersWithValidatedDownloads -= (state->nBlocksInFlightValidHeaders != 0);
assert(nPeersWithValidatedDownloads >= 0);
@@ -974,6 +872,7 @@ void PeerManager::FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) {
assert(nPeersWithValidatedDownloads == 0);
assert(g_outbound_peers_with_protect_from_disconnect == 0);
assert(g_wtxid_relay_peers == 0);
+ assert(m_txrequest.Size() == 0);
}
LogPrint(BCLog::NET, "Cleared nodestate for peer=%d\n", nodeid);
}
@@ -1286,7 +1185,8 @@ PeerManager::PeerManager(const CChainParams& chainparams, CConnman& connman, Ban
/**
* Evict orphan txn pool entries (EraseOrphanTx) based on a newly connected
- * block. Also save the time of the last tip update.
+ * block, remember the recently confirmed transactions, and delete tracked
+ * announcements for them. Also save the time of the last tip update.
*/
void PeerManager::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex)
{
@@ -1330,6 +1230,13 @@ void PeerManager::BlockConnected(const std::shared_ptr<const CBlock>& pblock, co
}
}
}
+ {
+ LOCK(cs_main);
+ for (const auto& ptx : pblock->vtx) {
+ m_txrequest.ForgetTxHash(ptx->GetHash());
+ m_txrequest.ForgetTxHash(ptx->GetWitnessHash());
+ }
+ }
}
void PeerManager::BlockDisconnected(const std::shared_ptr<const CBlock> &block, const CBlockIndex* pindex)
@@ -2770,7 +2677,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
pfrom.fDisconnect = true;
return;
} else if (!fAlreadyHave && !m_chainman.ActiveChainstate().IsInitialBlockDownload()) {
- RequestTx(State(pfrom.GetId()), gtxid, current_time);
+ AddTxAnnouncement(pfrom, gtxid, current_time);
}
} else {
LogPrint(BCLog::NET, "Unknown inv type \"%s\" received from peer=%d\n", inv.ToString(), pfrom.GetId());
@@ -3024,11 +2931,8 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
TxValidationState state;
- for (const GenTxid& gtxid : {GenTxid(false, txid), GenTxid(true, wtxid)}) {
- nodestate->m_tx_download.m_tx_announced.erase(gtxid.GetHash());
- nodestate->m_tx_download.m_tx_in_flight.erase(gtxid.GetHash());
- EraseTxRequest(gtxid);
- }
+ m_txrequest.ReceivedResponse(pfrom.GetId(), txid);
+ if (tx.HasWitness()) m_txrequest.ReceivedResponse(pfrom.GetId(), wtxid);
std::list<CTransactionRef> lRemovedTxn;
@@ -3047,6 +2951,10 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
if (!AlreadyHaveTx(GenTxid(/* is_wtxid=*/true, wtxid), m_mempool) &&
AcceptToMemoryPool(m_mempool, state, ptx, &lRemovedTxn, false /* bypass_limits */)) {
m_mempool.check(&::ChainstateActive().CoinsTip());
+ // As this version of the transaction was acceptable, we can forget about any
+ // requests for it.
+ m_txrequest.ForgetTxHash(tx.GetHash());
+ m_txrequest.ForgetTxHash(tx.GetWitnessHash());
RelayTransaction(tx.GetHash(), tx.GetWitnessHash(), m_connman);
for (unsigned int i = 0; i < tx.vout.size(); i++) {
auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(txid, i));
@@ -3102,10 +3010,14 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
// protocol for getting all unconfirmed parents.
const GenTxid gtxid{/* is_wtxid=*/false, parent_txid};
pfrom.AddKnownTx(parent_txid);
- if (!AlreadyHaveTx(gtxid, m_mempool)) RequestTx(State(pfrom.GetId()), gtxid, current_time);
+ if (!AlreadyHaveTx(gtxid, m_mempool)) AddTxAnnouncement(pfrom, gtxid, current_time);
}
AddOrphanTx(ptx, pfrom.GetId());
+ // Once added to the orphan pool, a tx is considered AlreadyHave, and we shouldn't request it anymore.
+ m_txrequest.ForgetTxHash(tx.GetHash());
+ m_txrequest.ForgetTxHash(tx.GetWitnessHash());
+
// DoS prevention: do not allow mapOrphanTransactions to grow unbounded (see CVE-2012-3789)
unsigned int nMaxOrphanTx = (unsigned int)std::max((int64_t)0, gArgs.GetArg("-maxorphantx", DEFAULT_MAX_ORPHAN_TRANSACTIONS));
unsigned int nEvicted = LimitOrphanTxSize(nMaxOrphanTx);
@@ -3122,6 +3034,8 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
// from any of our non-wtxidrelay peers.
recentRejects->insert(tx.GetHash());
recentRejects->insert(tx.GetWitnessHash());
+ m_txrequest.ForgetTxHash(tx.GetHash());
+ m_txrequest.ForgetTxHash(tx.GetWitnessHash());
}
} else {
if (state.GetResult() != TxValidationResult::TX_WITNESS_STRIPPED) {
@@ -3140,6 +3054,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
// if we start doing this too early.
assert(recentRejects);
recentRejects->insert(tx.GetWitnessHash());
+ m_txrequest.ForgetTxHash(tx.GetWitnessHash());
// If the transaction failed for TX_INPUTS_NOT_STANDARD,
// then we know that the witness was irrelevant to the policy
// failure, since this check depends only on the txid
@@ -3150,6 +3065,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
// parent-fetching by txid via the orphan-handling logic).
if (state.GetResult() == TxValidationResult::TX_INPUTS_NOT_STANDARD && tx.GetWitnessHash() != tx.GetHash()) {
recentRejects->insert(tx.GetHash());
+ m_txrequest.ForgetTxHash(tx.GetHash());
}
if (RecursiveDynamicUsage(*ptx) < 100000) {
AddToCompactExtraTransactions(ptx);
@@ -3790,24 +3706,15 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
}
if (msg_type == NetMsgType::NOTFOUND) {
- // Remove the NOTFOUND transactions from the peer
- LOCK(cs_main);
- CNodeState *state = State(pfrom.GetId());
std::vector<CInv> vInv;
vRecv >> vInv;
- if (vInv.size() <= MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
+ if (vInv.size() <= MAX_PEER_TX_ANNOUNCEMENTS + MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
+ LOCK(::cs_main);
for (CInv &inv : vInv) {
if (inv.IsGenTxMsg()) {
- // If we receive a NOTFOUND message for a txid we requested, erase
- // it from our data structures for this peer.
- auto in_flight_it = state->m_tx_download.m_tx_in_flight.find(inv.hash);
- if (in_flight_it == state->m_tx_download.m_tx_in_flight.end()) {
- // Skip any further work if this is a spurious NOTFOUND
- // message.
- continue;
- }
- state->m_tx_download.m_tx_in_flight.erase(in_flight_it);
- state->m_tx_download.m_tx_announced.erase(inv.hash);
+ // If we receive a NOTFOUND message for a tx we requested, mark the announcement for it as
+ // completed in TxRequestTracker.
+ m_txrequest.ReceivedResponse(pfrom.GetId(), inv.hash);
}
}
}
@@ -4582,67 +4489,26 @@ bool PeerManager::SendMessages(CNode* pto)
//
// Message: getdata (non-blocks)
//
-
- // For robustness, expire old requests after a long timeout, so that
- // we can resume downloading transactions from a peer even if they
- // were unresponsive in the past.
- // Eventually we should consider disconnecting peers, but this is
- // conservative.
- if (state.m_tx_download.m_check_expiry_timer <= current_time) {
- for (auto it=state.m_tx_download.m_tx_in_flight.begin(); it != state.m_tx_download.m_tx_in_flight.end();) {
- if (it->second <= current_time - TX_EXPIRY_INTERVAL) {
- LogPrint(BCLog::NET, "timeout of inflight tx %s from peer=%d\n", it->first.ToString(), pto->GetId());
- state.m_tx_download.m_tx_announced.erase(it->first);
- state.m_tx_download.m_tx_in_flight.erase(it++);
- } else {
- ++it;
- }
- }
- // On average, we do this check every TX_EXPIRY_INTERVAL. Randomize
- // so that we're not doing this for all peers at the same time.
- state.m_tx_download.m_check_expiry_timer = current_time + TX_EXPIRY_INTERVAL / 2 + GetRandMicros(TX_EXPIRY_INTERVAL);
- }
-
- auto& tx_process_time = state.m_tx_download.m_tx_process_time;
- while (!tx_process_time.empty() && tx_process_time.begin()->first <= current_time && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) {
- const GenTxid gtxid = tx_process_time.begin()->second;
- // Erase this entry from tx_process_time (it may be added back for
- // processing at a later time, see below)
- tx_process_time.erase(tx_process_time.begin());
- CInv inv(gtxid.IsWtxid() ? MSG_WTX : (MSG_TX | GetFetchFlags(*pto)), gtxid.GetHash());
- if (!AlreadyHaveTx(ToGenTxid(inv), m_mempool)) {
- // If this transaction was last requested more than 1 minute ago,
- // then request.
- const auto last_request_time = GetTxRequestTime(gtxid);
- if (last_request_time <= current_time - GETDATA_TX_INTERVAL) {
- LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId());
- vGetData.push_back(inv);
- if (vGetData.size() >= MAX_GETDATA_SZ) {
- m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));
- vGetData.clear();
- }
- UpdateTxRequestTime(gtxid, current_time);
- state.m_tx_download.m_tx_in_flight.emplace(gtxid.GetHash(), current_time);
- } else {
- // This transaction is in flight from someone else; queue
- // up processing to happen after the download times out
- // (with a slight delay for inbound peers, to prefer
- // requests to outbound peers).
- // Don't apply the txid-delay to re-requests of a
- // transaction; the heuristic of delaying requests to
- // txid-relay peers is to save bandwidth on initial
- // announcement of a transaction, and doesn't make sense
- // for a followup request if our first peer times out (and
- // would open us up to an attacker using inbound
- // wtxid-relay to prevent us from requesting transactions
- // from outbound txid-relay peers).
- const auto next_process_time = CalculateTxGetDataTime(gtxid, current_time, !state.fPreferredDownload, false);
- tx_process_time.emplace(next_process_time, gtxid);
+ std::vector<std::pair<NodeId, GenTxid>> expired;
+ auto requestable = m_txrequest.GetRequestable(pto->GetId(), current_time, &expired);
+ for (const auto& entry : expired) {
+ LogPrint(BCLog::NET, "timeout of inflight %s %s from peer=%d\n", entry.second.IsWtxid() ? "wtx" : "tx",
+ entry.second.GetHash().ToString(), entry.first);
+ }
+ for (const GenTxid& gtxid : requestable) {
+ if (!AlreadyHaveTx(gtxid, m_mempool)) {
+ LogPrint(BCLog::NET, "Requesting %s %s peer=%d\n", gtxid.IsWtxid() ? "wtx" : "tx",
+ gtxid.GetHash().ToString(), pto->GetId());
+ vGetData.emplace_back(gtxid.IsWtxid() ? MSG_WTX : (MSG_TX | GetFetchFlags(*pto)), gtxid.GetHash());
+ if (vGetData.size() >= MAX_GETDATA_SZ) {
+ m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));
+ vGetData.clear();
}
+ m_txrequest.RequestedTx(pto->GetId(), gtxid.GetHash(), current_time + GETDATA_TX_INTERVAL);
} else {
- // We have already seen this transaction, no need to download.
- state.m_tx_download.m_tx_announced.erase(gtxid.GetHash());
- state.m_tx_download.m_tx_in_flight.erase(gtxid.GetHash());
+ // We have already seen this transaction, no need to download. This is just a belt-and-suspenders, as
+ // this should already be called whenever a transaction becomes AlreadyHaveTx().
+ m_txrequest.ForgetTxHash(gtxid.GetHash());
}
}
diff --git a/src/net_processing.h b/src/net_processing.h
index 946a5f4715..578660355a 100644
--- a/src/net_processing.h
+++ b/src/net_processing.h
@@ -9,6 +9,7 @@
#include <consensus/params.h>
#include <net.h>
#include <sync.h>
+#include <txrequest.h>
#include <validationinterface.h>
class BlockTransactionsRequest;
@@ -127,12 +128,19 @@ private:
void SendBlockTransactions(CNode& pfrom, const CBlock& block, const BlockTransactionsRequest& req);
+ /** Register with TxRequestTracker that an INV has been received from a
+ * peer. The announcement parameters are decided in PeerManager and then
+ * passed to TxRequestTracker. */
+ void AddTxAnnouncement(const CNode& node, const GenTxid& gtxid, std::chrono::microseconds current_time)
+ EXCLUSIVE_LOCKS_REQUIRED(::cs_main);
+
const CChainParams& m_chainparams;
CConnman& m_connman;
/** Pointer to this node's banman. May be nullptr - check existence before dereferencing. */
BanMan* const m_banman;
ChainstateManager& m_chainman;
CTxMemPool& m_mempool;
+ TxRequestTracker m_txrequest GUARDED_BY(::cs_main);
int64_t m_stale_tip_check_time; //!< Next time to check for stale tip
};
diff --git a/src/primitives/transaction.h b/src/primitives/transaction.h
index 77cb1781a4..00544f64fe 100644
--- a/src/primitives/transaction.h
+++ b/src/primitives/transaction.h
@@ -399,8 +399,8 @@ template <typename Tx> static inline CTransactionRef MakeTransactionRef(Tx&& txI
/** A generic txid reference (txid or wtxid). */
class GenTxid
{
- const bool m_is_wtxid;
- const uint256 m_hash;
+ bool m_is_wtxid;
+ uint256 m_hash;
public:
GenTxid(bool is_wtxid, const uint256& hash) : m_is_wtxid(is_wtxid), m_hash(hash) {}
bool IsWtxid() const { return m_is_wtxid; }
diff --git a/src/test/fuzz/txrequest.cpp b/src/test/fuzz/txrequest.cpp
new file mode 100644
index 0000000000..9529ad3274
--- /dev/null
+++ b/src/test/fuzz/txrequest.cpp
@@ -0,0 +1,374 @@
+// Copyright (c) 2020 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#include <crypto/common.h>
+#include <crypto/sha256.h>
+#include <crypto/siphash.h>
+#include <primitives/transaction.h>
+#include <test/fuzz/fuzz.h>
+#include <txrequest.h>
+
+#include <bitset>
+#include <cstdint>
+#include <queue>
+#include <vector>
+
+namespace {
+
+constexpr int MAX_TXHASHES = 16;
+constexpr int MAX_PEERS = 16;
+
+//! Randomly generated GenTxids used in this test (length is MAX_TXHASHES).
+uint256 TXHASHES[MAX_TXHASHES];
+
+//! Precomputed random durations (positive and negative, each ~exponentially distributed).
+std::chrono::microseconds DELAYS[256];
+
+struct Initializer
+{
+ Initializer()
+ {
+ for (uint8_t txhash = 0; txhash < MAX_TXHASHES; txhash += 1) {
+ CSHA256().Write(&txhash, 1).Finalize(TXHASHES[txhash].begin());
+ }
+ int i = 0;
+ // DELAYS[N] for N=0..15 is just N microseconds.
+ for (; i < 16; ++i) {
+ DELAYS[i] = std::chrono::microseconds{i};
+ }
+ // DELAYS[N] for N=16..127 has randomly-looking but roughly exponentially increasing values up to
+ // 198.416453 seconds.
+ for (; i < 128; ++i) {
+ int diff_bits = ((i - 10) * 2) / 9;
+ uint64_t diff = 1 + (CSipHasher(0, 0).Write(i).Finalize() >> (64 - diff_bits));
+ DELAYS[i] = DELAYS[i - 1] + std::chrono::microseconds{diff};
+ }
+ // DELAYS[N] for N=128..255 are negative delays with the same magnitude as N=0..127.
+ for (; i < 256; ++i) {
+ DELAYS[i] = -DELAYS[255 - i];
+ }
+ }
+} g_initializer;
+
+/** Tester class for TxRequestTracker
+ *
+ * It includes a naive reimplementation of its behavior, for a limited set
+ * of MAX_TXHASHES distinct txids, and MAX_PEERS peer identifiers.
+ *
+ * All of the public member functions perform the same operation on
+ * an actual TxRequestTracker and on the state of the reimplementation.
+ * The output of GetRequestable is compared with the expected value
+ * as well.
+ *
+ * Check() calls the TxRequestTracker's sanity check, plus compares the
+ * output of the constant accessors (Size(), CountLoad(), CountTracked())
+ * with expected values.
+ */
+class Tester
+{
+ //! TxRequestTracker object being tested.
+ TxRequestTracker m_tracker;
+
+ //! States for txid/peer combinations in the naive data structure.
+ enum class State {
+ NOTHING, //!< Absence of this txid/peer combination
+
+ // Note that this implementation does not distinguish between DELAYED/READY/BEST variants of CANDIDATE.
+ CANDIDATE,
+ REQUESTED,
+ COMPLETED,
+ };
+
+ //! Sequence numbers, incremented whenever a new CANDIDATE is added.
+ uint64_t m_current_sequence{0};
+
+ //! List of future 'events' (all inserted reqtimes/exptimes). This is used to implement AdvanceToEvent.
+ std::priority_queue<std::chrono::microseconds, std::vector<std::chrono::microseconds>,
+ std::greater<std::chrono::microseconds>> m_events;
+
+ //! Information about a txhash/peer combination.
+ struct Announcement
+ {
+ std::chrono::microseconds m_time;
+ uint64_t m_sequence;
+ State m_state{State::NOTHING};
+ bool m_preferred;
+ bool m_is_wtxid;
+ uint64_t m_priority; //!< Precomputed priority.
+ };
+
+ //! Information about all txhash/peer combination.
+ Announcement m_announcements[MAX_TXHASHES][MAX_PEERS];
+
+ //! The current time; can move forward and backward.
+ std::chrono::microseconds m_now{244466666};
+
+ //! Delete txhashes whose only announcements are COMPLETED.
+ void Cleanup(int txhash)
+ {
+ bool all_nothing = true;
+ for (int peer = 0; peer < MAX_PEERS; ++peer) {
+ const Announcement& ann = m_announcements[txhash][peer];
+ if (ann.m_state != State::NOTHING) {
+ if (ann.m_state != State::COMPLETED) return;
+ all_nothing = false;
+ }
+ }
+ if (all_nothing) return;
+ for (int peer = 0; peer < MAX_PEERS; ++peer) {
+ m_announcements[txhash][peer].m_state = State::NOTHING;
+ }
+ }
+
+ //! Find the current best peer to request from for a txhash (or -1 if none).
+ int GetSelected(int txhash) const
+ {
+ int ret = -1;
+ uint64_t ret_priority = 0;
+ for (int peer = 0; peer < MAX_PEERS; ++peer) {
+ const Announcement& ann = m_announcements[txhash][peer];
+ // Return -1 if there already is a (non-expired) in-flight request.
+ if (ann.m_state == State::REQUESTED) return -1;
+ // If it's a viable candidate, see if it has lower priority than the best one so far.
+ if (ann.m_state == State::CANDIDATE && ann.m_time <= m_now) {
+ if (ret == -1 || ann.m_priority > ret_priority) {
+ std::tie(ret, ret_priority) = std::tie(peer, ann.m_priority);
+ }
+ }
+ }
+ return ret;
+ }
+
+public:
+ Tester() : m_tracker(true) {}
+
+ std::chrono::microseconds Now() const { return m_now; }
+
+ void AdvanceTime(std::chrono::microseconds offset)
+ {
+ m_now += offset;
+ while (!m_events.empty() && m_events.top() <= m_now) m_events.pop();
+ }
+
+ void AdvanceToEvent()
+ {
+ while (!m_events.empty() && m_events.top() <= m_now) m_events.pop();
+ if (!m_events.empty()) {
+ m_now = m_events.top();
+ m_events.pop();
+ }
+ }
+
+ void DisconnectedPeer(int peer)
+ {
+ // Apply to naive structure: all announcements for that peer are wiped.
+ for (int txhash = 0; txhash < MAX_TXHASHES; ++txhash) {
+ if (m_announcements[txhash][peer].m_state != State::NOTHING) {
+ m_announcements[txhash][peer].m_state = State::NOTHING;
+ Cleanup(txhash);
+ }
+ }
+
+ // Call TxRequestTracker's implementation.
+ m_tracker.DisconnectedPeer(peer);
+ }
+
+ void ForgetTxHash(int txhash)
+ {
+ // Apply to naive structure: all announcements for that txhash are wiped.
+ for (int peer = 0; peer < MAX_PEERS; ++peer) {
+ m_announcements[txhash][peer].m_state = State::NOTHING;
+ }
+ Cleanup(txhash);
+
+ // Call TxRequestTracker's implementation.
+ m_tracker.ForgetTxHash(TXHASHES[txhash]);
+ }
+
+ void ReceivedInv(int peer, int txhash, bool is_wtxid, bool preferred, std::chrono::microseconds reqtime)
+ {
+ // Apply to naive structure: if no announcement for txidnum/peer combination
+ // already, create a new CANDIDATE; otherwise do nothing.
+ Announcement& ann = m_announcements[txhash][peer];
+ if (ann.m_state == State::NOTHING) {
+ ann.m_preferred = preferred;
+ ann.m_state = State::CANDIDATE;
+ ann.m_time = reqtime;
+ ann.m_is_wtxid = is_wtxid;
+ ann.m_sequence = m_current_sequence++;
+ ann.m_priority = m_tracker.ComputePriority(TXHASHES[txhash], peer, ann.m_preferred);
+
+ // Add event so that AdvanceToEvent can quickly jump to the point where its reqtime passes.
+ if (reqtime > m_now) m_events.push(reqtime);
+ }
+
+ // Call TxRequestTracker's implementation.
+ m_tracker.ReceivedInv(peer, GenTxid{is_wtxid, TXHASHES[txhash]}, preferred, reqtime);
+ }
+
+ void RequestedTx(int peer, int txhash, std::chrono::microseconds exptime)
+ {
+ // Apply to naive structure: if a CANDIDATE announcement exists for peer/txhash,
+ // convert it to REQUESTED, and change any existing REQUESTED announcement for the same txhash to COMPLETED.
+ if (m_announcements[txhash][peer].m_state == State::CANDIDATE) {
+ for (int peer2 = 0; peer2 < MAX_PEERS; ++peer2) {
+ if (m_announcements[txhash][peer2].m_state == State::REQUESTED) {
+ m_announcements[txhash][peer2].m_state = State::COMPLETED;
+ }
+ }
+ m_announcements[txhash][peer].m_state = State::REQUESTED;
+ m_announcements[txhash][peer].m_time = exptime;
+ }
+
+ // Add event so that AdvanceToEvent can quickly jump to the point where its exptime passes.
+ if (exptime > m_now) m_events.push(exptime);
+
+ // Call TxRequestTracker's implementation.
+ m_tracker.RequestedTx(peer, TXHASHES[txhash], exptime);
+ }
+
+ void ReceivedResponse(int peer, int txhash)
+ {
+ // Apply to naive structure: convert anything to COMPLETED.
+ if (m_announcements[txhash][peer].m_state != State::NOTHING) {
+ m_announcements[txhash][peer].m_state = State::COMPLETED;
+ Cleanup(txhash);
+ }
+
+ // Call TxRequestTracker's implementation.
+ m_tracker.ReceivedResponse(peer, TXHASHES[txhash]);
+ }
+
+ void GetRequestable(int peer)
+ {
+ // Implement using naive structure:
+
+ //! list of (sequence number, txhash, is_wtxid) tuples.
+ std::vector<std::tuple<uint64_t, int, bool>> result;
+ std::vector<std::pair<NodeId, GenTxid>> expected_expired;
+ for (int txhash = 0; txhash < MAX_TXHASHES; ++txhash) {
+ // Mark any expired REQUESTED announcements as COMPLETED.
+ for (int peer2 = 0; peer2 < MAX_PEERS; ++peer2) {
+ Announcement& ann2 = m_announcements[txhash][peer2];
+ if (ann2.m_state == State::REQUESTED && ann2.m_time <= m_now) {
+ expected_expired.emplace_back(peer2, GenTxid{ann2.m_is_wtxid, TXHASHES[txhash]});
+ ann2.m_state = State::COMPLETED;
+ break;
+ }
+ }
+ // And delete txids with only COMPLETED announcements left.
+ Cleanup(txhash);
+ // CANDIDATEs for which this announcement has the highest priority get returned.
+ const Announcement& ann = m_announcements[txhash][peer];
+ if (ann.m_state == State::CANDIDATE && GetSelected(txhash) == peer) {
+ result.emplace_back(ann.m_sequence, txhash, ann.m_is_wtxid);
+ }
+ }
+ // Sort the results by sequence number.
+ std::sort(result.begin(), result.end());
+ std::sort(expected_expired.begin(), expected_expired.end());
+
+ // Compare with TxRequestTracker's implementation.
+ std::vector<std::pair<NodeId, GenTxid>> expired;
+ const auto actual = m_tracker.GetRequestable(peer, m_now, &expired);
+ std::sort(expired.begin(), expired.end());
+ assert(expired == expected_expired);
+
+ m_tracker.PostGetRequestableSanityCheck(m_now);
+ assert(result.size() == actual.size());
+ for (size_t pos = 0; pos < actual.size(); ++pos) {
+ assert(TXHASHES[std::get<1>(result[pos])] == actual[pos].GetHash());
+ assert(std::get<2>(result[pos]) == actual[pos].IsWtxid());
+ }
+ }
+
+ void Check()
+ {
+ // Compare CountTracked and CountLoad with naive structure.
+ size_t total = 0;
+ for (int peer = 0; peer < MAX_PEERS; ++peer) {
+ size_t tracked = 0;
+ size_t inflight = 0;
+ size_t candidates = 0;
+ for (int txhash = 0; txhash < MAX_TXHASHES; ++txhash) {
+ tracked += m_announcements[txhash][peer].m_state != State::NOTHING;
+ inflight += m_announcements[txhash][peer].m_state == State::REQUESTED;
+ candidates += m_announcements[txhash][peer].m_state == State::CANDIDATE;
+ }
+ assert(m_tracker.Count(peer) == tracked);
+ assert(m_tracker.CountInFlight(peer) == inflight);
+ assert(m_tracker.CountCandidates(peer) == candidates);
+ total += tracked;
+ }
+ // Compare Size.
+ assert(m_tracker.Size() == total);
+
+ // Invoke internal consistency check of TxRequestTracker object.
+ m_tracker.SanityCheck();
+ }
+};
+} // namespace
+
+void test_one_input(const std::vector<uint8_t>& buffer)
+{
+ // Tester object (which encapsulates a TxRequestTracker).
+ Tester tester;
+
+ // Decode the input as a sequence of instructions with parameters
+ auto it = buffer.begin();
+ while (it != buffer.end()) {
+ int cmd = *(it++) % 11;
+ int peer, txidnum, delaynum;
+ switch (cmd) {
+ case 0: // Make time jump to the next event (m_time of CANDIDATE or REQUESTED)
+ tester.AdvanceToEvent();
+ break;
+ case 1: // Change time
+ delaynum = it == buffer.end() ? 0 : *(it++);
+ tester.AdvanceTime(DELAYS[delaynum]);
+ break;
+ case 2: // Query for requestable txs
+ peer = it == buffer.end() ? 0 : *(it++) % MAX_PEERS;
+ tester.GetRequestable(peer);
+ break;
+ case 3: // Peer went offline
+ peer = it == buffer.end() ? 0 : *(it++) % MAX_PEERS;
+ tester.DisconnectedPeer(peer);
+ break;
+ case 4: // No longer need tx
+ txidnum = it == buffer.end() ? 0 : *(it++);
+ tester.ForgetTxHash(txidnum % MAX_TXHASHES);
+ break;
+ case 5: // Received immediate preferred inv
+ case 6: // Same, but non-preferred.
+ peer = it == buffer.end() ? 0 : *(it++) % MAX_PEERS;
+ txidnum = it == buffer.end() ? 0 : *(it++);
+ tester.ReceivedInv(peer, txidnum % MAX_TXHASHES, (txidnum / MAX_TXHASHES) & 1, cmd & 1,
+ std::chrono::microseconds::min());
+ break;
+ case 7: // Received delayed preferred inv
+ case 8: // Same, but non-preferred.
+ peer = it == buffer.end() ? 0 : *(it++) % MAX_PEERS;
+ txidnum = it == buffer.end() ? 0 : *(it++);
+ delaynum = it == buffer.end() ? 0 : *(it++);
+ tester.ReceivedInv(peer, txidnum % MAX_TXHASHES, (txidnum / MAX_TXHASHES) & 1, cmd & 1,
+ tester.Now() + DELAYS[delaynum]);
+ break;
+ case 9: // Requested tx from peer
+ peer = it == buffer.end() ? 0 : *(it++) % MAX_PEERS;
+ txidnum = it == buffer.end() ? 0 : *(it++);
+ delaynum = it == buffer.end() ? 0 : *(it++);
+ tester.RequestedTx(peer, txidnum % MAX_TXHASHES, tester.Now() + DELAYS[delaynum]);
+ break;
+ case 10: // Received response
+ peer = it == buffer.end() ? 0 : *(it++) % MAX_PEERS;
+ txidnum = it == buffer.end() ? 0 : *(it++);
+ tester.ReceivedResponse(peer, txidnum % MAX_TXHASHES);
+ break;
+ default:
+ assert(false);
+ }
+ }
+ tester.Check();
+}
diff --git a/src/test/limitedmap_tests.cpp b/src/test/limitedmap_tests.cpp
deleted file mode 100644
index ea18debbd3..0000000000
--- a/src/test/limitedmap_tests.cpp
+++ /dev/null
@@ -1,101 +0,0 @@
-// Copyright (c) 2012-2019 The Bitcoin Core developers
-// Distributed under the MIT software license, see the accompanying
-// file COPYING or http://www.opensource.org/licenses/mit-license.php.
-
-#include <limitedmap.h>
-
-#include <test/util/setup_common.h>
-
-#include <boost/test/unit_test.hpp>
-
-BOOST_FIXTURE_TEST_SUITE(limitedmap_tests, BasicTestingSetup)
-
-BOOST_AUTO_TEST_CASE(limitedmap_test)
-{
- // create a limitedmap capped at 10 items
- limitedmap<int, int> map(10);
-
- // check that the max size is 10
- BOOST_CHECK(map.max_size() == 10);
-
- // check that it's empty
- BOOST_CHECK(map.size() == 0);
-
- // insert (-1, -1)
- map.insert(std::pair<int, int>(-1, -1));
-
- // make sure that the size is updated
- BOOST_CHECK(map.size() == 1);
-
- // make sure that the new item is in the map
- BOOST_CHECK(map.count(-1) == 1);
-
- // insert 10 new items
- for (int i = 0; i < 10; i++) {
- map.insert(std::pair<int, int>(i, i + 1));
- }
-
- // make sure that the map now contains 10 items...
- BOOST_CHECK(map.size() == 10);
-
- // ...and that the first item has been discarded
- BOOST_CHECK(map.count(-1) == 0);
-
- // iterate over the map, both with an index and an iterator
- limitedmap<int, int>::const_iterator it = map.begin();
- for (int i = 0; i < 10; i++) {
- // make sure the item is present
- BOOST_CHECK(map.count(i) == 1);
-
- // use the iterator to check for the expected key and value
- BOOST_CHECK(it->first == i);
- BOOST_CHECK(it->second == i + 1);
-
- // use find to check for the value
- BOOST_CHECK(map.find(i)->second == i + 1);
-
- // update and recheck
- map.update(it, i + 2);
- BOOST_CHECK(map.find(i)->second == i + 2);
-
- it++;
- }
-
- // check that we've exhausted the iterator
- BOOST_CHECK(it == map.end());
-
- // resize the map to 5 items
- map.max_size(5);
-
- // check that the max size and size are now 5
- BOOST_CHECK(map.max_size() == 5);
- BOOST_CHECK(map.size() == 5);
-
- // check that items less than 5 have been discarded
- // and items greater than 5 are retained
- for (int i = 0; i < 10; i++) {
- if (i < 5) {
- BOOST_CHECK(map.count(i) == 0);
- } else {
- BOOST_CHECK(map.count(i) == 1);
- }
- }
-
- // erase some items not in the map
- for (int i = 100; i < 1000; i += 100) {
- map.erase(i);
- }
-
- // check that the size is unaffected
- BOOST_CHECK(map.size() == 5);
-
- // erase the remaining elements
- for (int i = 5; i < 10; i++) {
- map.erase(i);
- }
-
- // check that the map is now empty
- BOOST_CHECK(map.empty());
-}
-
-BOOST_AUTO_TEST_SUITE_END()
diff --git a/src/test/txrequest_tests.cpp b/src/test/txrequest_tests.cpp
new file mode 100644
index 0000000000..1d137b03b1
--- /dev/null
+++ b/src/test/txrequest_tests.cpp
@@ -0,0 +1,738 @@
+// Copyright (c) 2020 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+
+#include <txrequest.h>
+#include <uint256.h>
+
+#include <test/util/setup_common.h>
+
+#include <algorithm>
+#include <functional>
+#include <vector>
+
+#include <boost/test/unit_test.hpp>
+
+BOOST_FIXTURE_TEST_SUITE(txrequest_tests, BasicTestingSetup)
+
+namespace {
+
+constexpr std::chrono::microseconds MIN_TIME = std::chrono::microseconds::min();
+constexpr std::chrono::microseconds MAX_TIME = std::chrono::microseconds::max();
+constexpr std::chrono::microseconds MICROSECOND = std::chrono::microseconds{1};
+constexpr std::chrono::microseconds NO_TIME = std::chrono::microseconds{0};
+
+/** An Action is a function to call at a particular (simulated) timestamp. */
+using Action = std::pair<std::chrono::microseconds, std::function<void()>>;
+
+/** Object that stores actions from multiple interleaved scenarios, and data shared across them.
+ *
+ * The Scenario below is used to fill this.
+ */
+struct Runner
+{
+ /** The TxRequestTracker being tested. */
+ TxRequestTracker txrequest;
+
+ /** List of actions to be executed (in order of increasing timestamp). */
+ std::vector<Action> actions;
+
+ /** Which node ids have been assigned already (to prevent reuse). */
+ std::set<NodeId> peerset;
+
+ /** Which txhashes have been assigned already (to prevent reuse). */
+ std::set<uint256> txhashset;
+
+ /** Which (peer, gtxid) combinations are known to be expired. These need to be accumulated here instead of
+ * checked directly in the GetRequestable return value to avoid introducing a dependency between the various
+ * parallel tests. */
+ std::multiset<std::pair<NodeId, GenTxid>> expired;
+};
+
+std::chrono::microseconds RandomTime8s() { return std::chrono::microseconds{1 + InsecureRandBits(23)}; }
+std::chrono::microseconds RandomTime1y() { return std::chrono::microseconds{1 + InsecureRandBits(45)}; }
+
+/** A proxy for a Runner that helps build a sequence of consecutive test actions on a TxRequestTracker.
+ *
+ * Each Scenario is a proxy through which actions for the (sequential) execution of various tests are added to a
+ * Runner. The actions from multiple scenarios are then run concurrently, resulting in these tests being performed
+ * against a TxRequestTracker in parallel. Every test has its own unique txhashes and NodeIds which are not
+ * reused in other tests, and thus they should be independent from each other. Running them in parallel however
+ * means that we verify the behavior (w.r.t. one test's txhashes and NodeIds) even when the state of the data
+ * structure is more complicated due to the presence of other tests.
+ */
+class Scenario
+{
+ Runner& m_runner;
+ std::chrono::microseconds m_now;
+ std::string m_testname;
+
+public:
+ Scenario(Runner& runner, std::chrono::microseconds starttime) : m_runner(runner), m_now(starttime) {}
+
+ /** Set a name for the current test, to give more clear error messages. */
+ void SetTestName(std::string testname)
+ {
+ m_testname = std::move(testname);
+ }
+
+ /** Advance this Scenario's time; this affects the timestamps newly scheduled events get. */
+ void AdvanceTime(std::chrono::microseconds amount)
+ {
+ assert(amount.count() >= 0);
+ m_now += amount;
+ }
+
+ /** Schedule a ForgetTxHash call at the Scheduler's current time. */
+ void ForgetTxHash(const uint256& txhash)
+ {
+ auto& runner = m_runner;
+ runner.actions.emplace_back(m_now, [=,&runner]() {
+ runner.txrequest.ForgetTxHash(txhash);
+ runner.txrequest.SanityCheck();
+ });
+ }
+
+ /** Schedule a ReceivedInv call at the Scheduler's current time. */
+ void ReceivedInv(NodeId peer, const GenTxid& gtxid, bool pref, std::chrono::microseconds reqtime)
+ {
+ auto& runner = m_runner;
+ runner.actions.emplace_back(m_now, [=,&runner]() {
+ runner.txrequest.ReceivedInv(peer, gtxid, pref, reqtime);
+ runner.txrequest.SanityCheck();
+ });
+ }
+
+ /** Schedule a DisconnectedPeer call at the Scheduler's current time. */
+ void DisconnectedPeer(NodeId peer)
+ {
+ auto& runner = m_runner;
+ runner.actions.emplace_back(m_now, [=,&runner]() {
+ runner.txrequest.DisconnectedPeer(peer);
+ runner.txrequest.SanityCheck();
+ });
+ }
+
+ /** Schedule a RequestedTx call at the Scheduler's current time. */
+ void RequestedTx(NodeId peer, const uint256& txhash, std::chrono::microseconds exptime)
+ {
+ auto& runner = m_runner;
+ runner.actions.emplace_back(m_now, [=,&runner]() {
+ runner.txrequest.RequestedTx(peer, txhash, exptime);
+ runner.txrequest.SanityCheck();
+ });
+ }
+
+ /** Schedule a ReceivedResponse call at the Scheduler's current time. */
+ void ReceivedResponse(NodeId peer, const uint256& txhash)
+ {
+ auto& runner = m_runner;
+ runner.actions.emplace_back(m_now, [=,&runner]() {
+ runner.txrequest.ReceivedResponse(peer, txhash);
+ runner.txrequest.SanityCheck();
+ });
+ }
+
+ /** Schedule calls to verify the TxRequestTracker's state at the Scheduler's current time.
+ *
+ * @param peer The peer whose state will be inspected.
+ * @param expected The expected return value for GetRequestable(peer)
+ * @param candidates The expected return value CountCandidates(peer)
+ * @param inflight The expected return value CountInFlight(peer)
+ * @param completed The expected return value of Count(peer), minus candidates and inflight.
+ * @param checkname An arbitrary string to include in error messages, for test identificatrion.
+ * @param offset Offset with the current time to use (must be <= 0). This allows simulations of time going
+ * backwards (but note that the ordering of this event only follows the scenario's m_now.
+ */
+ void Check(NodeId peer, const std::vector<GenTxid>& expected, size_t candidates, size_t inflight,
+ size_t completed, const std::string& checkname,
+ std::chrono::microseconds offset = std::chrono::microseconds{0})
+ {
+ const auto comment = m_testname + " " + checkname;
+ auto& runner = m_runner;
+ const auto now = m_now;
+ assert(offset.count() <= 0);
+ runner.actions.emplace_back(m_now, [=,&runner]() {
+ std::vector<std::pair<NodeId, GenTxid>> expired_now;
+ auto ret = runner.txrequest.GetRequestable(peer, now + offset, &expired_now);
+ for (const auto& entry : expired_now) runner.expired.insert(entry);
+ runner.txrequest.SanityCheck();
+ runner.txrequest.PostGetRequestableSanityCheck(now + offset);
+ size_t total = candidates + inflight + completed;
+ size_t real_total = runner.txrequest.Count(peer);
+ size_t real_candidates = runner.txrequest.CountCandidates(peer);
+ size_t real_inflight = runner.txrequest.CountInFlight(peer);
+ BOOST_CHECK_MESSAGE(real_total == total, strprintf("[" + comment + "] total %i (%i expected)", real_total, total));
+ BOOST_CHECK_MESSAGE(real_inflight == inflight, strprintf("[" + comment + "] inflight %i (%i expected)", real_inflight, inflight));
+ BOOST_CHECK_MESSAGE(real_candidates == candidates, strprintf("[" + comment + "] candidates %i (%i expected)", real_candidates, candidates));
+ BOOST_CHECK_MESSAGE(ret == expected, "[" + comment + "] mismatching requestables");
+ });
+ }
+
+ /** Verify that an announcement for gtxid by peer has expired some time before this check is scheduled.
+ *
+ * Every expected expiration should be accounted for through exactly one call to this function.
+ */
+ void CheckExpired(NodeId peer, GenTxid gtxid)
+ {
+ const auto& testname = m_testname;
+ auto& runner = m_runner;
+ runner.actions.emplace_back(m_now, [=,&runner]() {
+ auto it = runner.expired.find(std::pair<NodeId, GenTxid>{peer, gtxid});
+ BOOST_CHECK_MESSAGE(it != runner.expired.end(), "[" + testname + "] missing expiration");
+ if (it != runner.expired.end()) runner.expired.erase(it);
+ });
+ }
+
+ /** Generate a random txhash, whose priorities for certain peers are constrained.
+ *
+ * For example, NewTxHash({{p1,p2,p3},{p2,p4,p5}}) will generate a txhash T such that both:
+ * - priority(p1,T) > priority(p2,T) > priority(p3,T)
+ * - priority(p2,T) > priority(p4,T) > priority(p5,T)
+ * where priority is the predicted internal TxRequestTracker's priority, assuming all announcements
+ * are within the same preferredness class.
+ */
+ uint256 NewTxHash(const std::vector<std::vector<NodeId>>& orders = {})
+ {
+ uint256 ret;
+ bool ok;
+ do {
+ ret = InsecureRand256();
+ ok = true;
+ for (const auto& order : orders) {
+ for (size_t pos = 1; pos < order.size(); ++pos) {
+ uint64_t prio_prev = m_runner.txrequest.ComputePriority(ret, order[pos - 1], true);
+ uint64_t prio_cur = m_runner.txrequest.ComputePriority(ret, order[pos], true);
+ if (prio_prev <= prio_cur) {
+ ok = false;
+ break;
+ }
+ }
+ if (!ok) break;
+ }
+ if (ok) {
+ ok = m_runner.txhashset.insert(ret).second;
+ }
+ } while(!ok);
+ return ret;
+ }
+
+ /** Generate a random GenTxid; the txhash follows NewTxHash; the is_wtxid flag is random. */
+ GenTxid NewGTxid(const std::vector<std::vector<NodeId>>& orders = {})
+ {
+ return {InsecureRandBool(), NewTxHash(orders)};
+ }
+
+ /** Generate a new random NodeId to use as peer. The same NodeId is never returned twice
+ * (across all Scenarios combined). */
+ NodeId NewPeer()
+ {
+ bool ok;
+ NodeId ret;
+ do {
+ ret = InsecureRandBits(63);
+ ok = m_runner.peerset.insert(ret).second;
+ } while(!ok);
+ return ret;
+ }
+
+ std::chrono::microseconds Now() const { return m_now; }
+};
+
+/** Add to scenario a test with a single tx announced by a single peer.
+ *
+ * config is an integer in [0, 32), which controls which variant of the test is used.
+ */
+void BuildSingleTest(Scenario& scenario, int config)
+{
+ auto peer = scenario.NewPeer();
+ auto gtxid = scenario.NewGTxid();
+ bool immediate = config & 1;
+ bool preferred = config & 2;
+ auto delay = immediate ? NO_TIME : RandomTime8s();
+
+ scenario.SetTestName(strprintf("Single(config=%i)", config));
+
+ // Receive an announcement, either immediately requestable or delayed.
+ scenario.ReceivedInv(peer, gtxid, preferred, immediate ? MIN_TIME : scenario.Now() + delay);
+ if (immediate) {
+ scenario.Check(peer, {gtxid}, 1, 0, 0, "s1");
+ } else {
+ scenario.Check(peer, {}, 1, 0, 0, "s2");
+ scenario.AdvanceTime(delay - MICROSECOND);
+ scenario.Check(peer, {}, 1, 0, 0, "s3");
+ scenario.AdvanceTime(MICROSECOND);
+ scenario.Check(peer, {gtxid}, 1, 0, 0, "s4");
+ }
+
+ if (config >> 3) { // We'll request the transaction
+ scenario.AdvanceTime(RandomTime8s());
+ auto expiry = RandomTime8s();
+ scenario.Check(peer, {gtxid}, 1, 0, 0, "s5");
+ scenario.RequestedTx(peer, gtxid.GetHash(), scenario.Now() + expiry);
+ scenario.Check(peer, {}, 0, 1, 0, "s6");
+
+ if ((config >> 3) == 1) { // The request will time out
+ scenario.AdvanceTime(expiry - MICROSECOND);
+ scenario.Check(peer, {}, 0, 1, 0, "s7");
+ scenario.AdvanceTime(MICROSECOND);
+ scenario.Check(peer, {}, 0, 0, 0, "s8");
+ scenario.CheckExpired(peer, gtxid);
+ return;
+ } else {
+ scenario.AdvanceTime(std::chrono::microseconds{InsecureRandRange(expiry.count())});
+ scenario.Check(peer, {}, 0, 1, 0, "s9");
+ if ((config >> 3) == 3) { // A response will arrive for the transaction
+ scenario.ReceivedResponse(peer, gtxid.GetHash());
+ scenario.Check(peer, {}, 0, 0, 0, "s10");
+ return;
+ }
+ }
+ }
+
+ if (config & 4) { // The peer will go offline
+ scenario.DisconnectedPeer(peer);
+ } else { // The transaction is no longer needed
+ scenario.ForgetTxHash(gtxid.GetHash());
+ }
+ scenario.Check(peer, {}, 0, 0, 0, "s11");
+}
+
+/** Add to scenario a test with a single tx announced by two peers, to verify the
+ * right peer is selected for requests.
+ *
+ * config is an integer in [0, 32), which controls which variant of the test is used.
+ */
+void BuildPriorityTest(Scenario& scenario, int config)
+{
+ scenario.SetTestName(strprintf("Priority(config=%i)", config));
+
+ // Two peers. They will announce in order {peer1, peer2}.
+ auto peer1 = scenario.NewPeer(), peer2 = scenario.NewPeer();
+ // Construct a transaction that under random rules would be preferred by peer2 or peer1,
+ // depending on configuration.
+ bool prio1 = config & 1;
+ auto gtxid = prio1 ? scenario.NewGTxid({{peer1, peer2}}) : scenario.NewGTxid({{peer2, peer1}});
+ bool pref1 = config & 2, pref2 = config & 4;
+
+ scenario.ReceivedInv(peer1, gtxid, pref1, MIN_TIME);
+ scenario.Check(peer1, {gtxid}, 1, 0, 0, "p1");
+ if (InsecureRandBool()) {
+ scenario.AdvanceTime(RandomTime8s());
+ scenario.Check(peer1, {gtxid}, 1, 0, 0, "p2");
+ }
+
+ scenario.ReceivedInv(peer2, gtxid, pref2, MIN_TIME);
+ bool stage2_prio =
+ // At this point, peer2 will be given priority if:
+ // - It is preferred and peer1 is not
+ (pref2 && !pref1) ||
+ // - They're in the same preference class,
+ // and the randomized priority favors peer2 over peer1.
+ (pref1 == pref2 && !prio1);
+ NodeId priopeer = stage2_prio ? peer2 : peer1, otherpeer = stage2_prio ? peer1 : peer2;
+ scenario.Check(otherpeer, {}, 1, 0, 0, "p3");
+ scenario.Check(priopeer, {gtxid}, 1, 0, 0, "p4");
+ if (InsecureRandBool()) scenario.AdvanceTime(RandomTime8s());
+ scenario.Check(otherpeer, {}, 1, 0, 0, "p5");
+ scenario.Check(priopeer, {gtxid}, 1, 0, 0, "p6");
+
+ // We possibly request from the selected peer.
+ if (config & 8) {
+ scenario.RequestedTx(priopeer, gtxid.GetHash(), MAX_TIME);
+ scenario.Check(priopeer, {}, 0, 1, 0, "p7");
+ scenario.Check(otherpeer, {}, 1, 0, 0, "p8");
+ if (InsecureRandBool()) scenario.AdvanceTime(RandomTime8s());
+ }
+
+ // The peer which was selected (or requested from) now goes offline, or a NOTFOUND is received from them.
+ if (config & 16) {
+ scenario.DisconnectedPeer(priopeer);
+ } else {
+ scenario.ReceivedResponse(priopeer, gtxid.GetHash());
+ }
+ if (InsecureRandBool()) scenario.AdvanceTime(RandomTime8s());
+ scenario.Check(priopeer, {}, 0, 0, !(config & 16), "p8");
+ scenario.Check(otherpeer, {gtxid}, 1, 0, 0, "p9");
+ if (InsecureRandBool()) scenario.AdvanceTime(RandomTime8s());
+
+ // Now the other peer goes offline.
+ scenario.DisconnectedPeer(otherpeer);
+ if (InsecureRandBool()) scenario.AdvanceTime(RandomTime8s());
+ scenario.Check(peer1, {}, 0, 0, 0, "p10");
+ scenario.Check(peer2, {}, 0, 0, 0, "p11");
+}
+
+/** Add to scenario a randomized test in which N peers announce the same transaction, to verify
+ * the order in which they are requested. */
+void BuildBigPriorityTest(Scenario& scenario, int peers)
+{
+ scenario.SetTestName(strprintf("BigPriority(peers=%i)", peers));
+
+ // We will have N peers announce the same transaction.
+ std::map<NodeId, bool> preferred;
+ std::vector<NodeId> pref_peers, npref_peers;
+ int num_pref = InsecureRandRange(peers + 1) ; // Some preferred, ...
+ int num_npref = peers - num_pref; // some not preferred.
+ for (int i = 0; i < num_pref; ++i) {
+ pref_peers.push_back(scenario.NewPeer());
+ preferred[pref_peers.back()] = true;
+ }
+ for (int i = 0; i < num_npref; ++i) {
+ npref_peers.push_back(scenario.NewPeer());
+ preferred[npref_peers.back()] = false;
+ }
+ // Make a list of all peers, in order of intended request order (concatenation of pref_peers and npref_peers).
+ std::vector<NodeId> request_order;
+ for (int i = 0; i < num_pref; ++i) request_order.push_back(pref_peers[i]);
+ for (int i = 0; i < num_npref; ++i) request_order.push_back(npref_peers[i]);
+
+ // Determine the announcement order randomly.
+ std::vector<NodeId> announce_order = request_order;
+ Shuffle(announce_order.begin(), announce_order.end(), g_insecure_rand_ctx);
+
+ // Find a gtxid whose txhash prioritization is consistent with the required ordering within pref_peers and
+ // within npref_peers.
+ auto gtxid = scenario.NewGTxid({pref_peers, npref_peers});
+
+ // Decide reqtimes in opposite order of the expected request order. This means that as time passes we expect the
+ // to-be-requested-from-peer will change every time a subsequent reqtime is passed.
+ std::map<NodeId, std::chrono::microseconds> reqtimes;
+ auto reqtime = scenario.Now();
+ for (int i = peers - 1; i >= 0; --i) {
+ reqtime += RandomTime8s();
+ reqtimes[request_order[i]] = reqtime;
+ }
+
+ // Actually announce from all peers simultaneously (but in announce_order).
+ for (const auto peer : announce_order) {
+ scenario.ReceivedInv(peer, gtxid, preferred[peer], reqtimes[peer]);
+ }
+ for (const auto peer : announce_order) {
+ scenario.Check(peer, {}, 1, 0, 0, "b1");
+ }
+
+ // Let time pass and observe the to-be-requested-from peer change, from nonpreferred to preferred, and from
+ // high priority to low priority within each class.
+ for (int i = peers - 1; i >= 0; --i) {
+ scenario.AdvanceTime(reqtimes[request_order[i]] - scenario.Now() - MICROSECOND);
+ scenario.Check(request_order[i], {}, 1, 0, 0, "b2");
+ scenario.AdvanceTime(MICROSECOND);
+ scenario.Check(request_order[i], {gtxid}, 1, 0, 0, "b3");
+ }
+
+ // Peers now in random order go offline, or send NOTFOUNDs. At every point in time the new to-be-requested-from
+ // peer should be the best remaining one, so verify this after every response.
+ for (int i = 0; i < peers; ++i) {
+ if (InsecureRandBool()) scenario.AdvanceTime(RandomTime8s());
+ const int pos = InsecureRandRange(request_order.size());
+ const auto peer = request_order[pos];
+ request_order.erase(request_order.begin() + pos);
+ if (InsecureRandBool()) {
+ scenario.DisconnectedPeer(peer);
+ scenario.Check(peer, {}, 0, 0, 0, "b4");
+ } else {
+ scenario.ReceivedResponse(peer, gtxid.GetHash());
+ scenario.Check(peer, {}, 0, 0, request_order.size() > 0, "b5");
+ }
+ if (request_order.size()) {
+ scenario.Check(request_order[0], {gtxid}, 1, 0, 0, "b6");
+ }
+ }
+
+ // Everything is gone in the end.
+ for (const auto peer : announce_order) {
+ scenario.Check(peer, {}, 0, 0, 0, "b7");
+ }
+}
+
+/** Add to scenario a test with one peer announcing two transactions, to verify they are
+ * fetched in announcement order.
+ *
+ * config is an integer in [0, 4) inclusive, and selects the variant of the test.
+ */
+void BuildRequestOrderTest(Scenario& scenario, int config)
+{
+ scenario.SetTestName(strprintf("RequestOrder(config=%i)", config));
+
+ auto peer = scenario.NewPeer();
+ auto gtxid1 = scenario.NewGTxid();
+ auto gtxid2 = scenario.NewGTxid();
+
+ auto reqtime2 = scenario.Now() + RandomTime8s();
+ auto reqtime1 = reqtime2 + RandomTime8s();
+
+ scenario.ReceivedInv(peer, gtxid1, config & 1, reqtime1);
+ // Simulate time going backwards by giving the second announcement an earlier reqtime.
+ scenario.ReceivedInv(peer, gtxid2, config & 2, reqtime2);
+
+ scenario.AdvanceTime(reqtime2 - MICROSECOND - scenario.Now());
+ scenario.Check(peer, {}, 2, 0, 0, "o1");
+ scenario.AdvanceTime(MICROSECOND);
+ scenario.Check(peer, {gtxid2}, 2, 0, 0, "o2");
+ scenario.AdvanceTime(reqtime1 - MICROSECOND - scenario.Now());
+ scenario.Check(peer, {gtxid2}, 2, 0, 0, "o3");
+ scenario.AdvanceTime(MICROSECOND);
+ // Even with time going backwards in between announcements, the return value of GetRequestable is in
+ // announcement order.
+ scenario.Check(peer, {gtxid1, gtxid2}, 2, 0, 0, "o4");
+
+ scenario.DisconnectedPeer(peer);
+ scenario.Check(peer, {}, 0, 0, 0, "o5");
+}
+
+/** Add to scenario a test that verifies behavior related to both txid and wtxid with the same
+ * hash being announced.
+ *
+ * config is an integer in [0, 4) inclusive, and selects the variant of the test used.
+*/
+void BuildWtxidTest(Scenario& scenario, int config)
+{
+ scenario.SetTestName(strprintf("Wtxid(config=%i)", config));
+
+ auto peerT = scenario.NewPeer();
+ auto peerW = scenario.NewPeer();
+ auto txhash = scenario.NewTxHash();
+ GenTxid txid{false, txhash};
+ GenTxid wtxid{true, txhash};
+
+ auto reqtimeT = InsecureRandBool() ? MIN_TIME : scenario.Now() + RandomTime8s();
+ auto reqtimeW = InsecureRandBool() ? MIN_TIME : scenario.Now() + RandomTime8s();
+
+ // Announce txid first or wtxid first.
+ if (config & 1) {
+ scenario.ReceivedInv(peerT, txid, config & 2, reqtimeT);
+ if (InsecureRandBool()) scenario.AdvanceTime(RandomTime8s());
+ scenario.ReceivedInv(peerW, wtxid, !(config & 2), reqtimeW);
+ } else {
+ scenario.ReceivedInv(peerW, wtxid, !(config & 2), reqtimeW);
+ if (InsecureRandBool()) scenario.AdvanceTime(RandomTime8s());
+ scenario.ReceivedInv(peerT, txid, config & 2, reqtimeT);
+ }
+
+ // Let time pass if needed, and check that the preferred announcement (txid or wtxid)
+ // is correctly to-be-requested (and with the correct wtxidness).
+ auto max_reqtime = std::max(reqtimeT, reqtimeW);
+ if (max_reqtime > scenario.Now()) scenario.AdvanceTime(max_reqtime - scenario.Now());
+ if (config & 2) {
+ scenario.Check(peerT, {txid}, 1, 0, 0, "w1");
+ scenario.Check(peerW, {}, 1, 0, 0, "w2");
+ } else {
+ scenario.Check(peerT, {}, 1, 0, 0, "w3");
+ scenario.Check(peerW, {wtxid}, 1, 0, 0, "w4");
+ }
+
+ // Let the preferred announcement be requested. It's not going to be delivered.
+ auto expiry = RandomTime8s();
+ if (config & 2) {
+ scenario.RequestedTx(peerT, txid.GetHash(), scenario.Now() + expiry);
+ scenario.Check(peerT, {}, 0, 1, 0, "w5");
+ scenario.Check(peerW, {}, 1, 0, 0, "w6");
+ } else {
+ scenario.RequestedTx(peerW, wtxid.GetHash(), scenario.Now() + expiry);
+ scenario.Check(peerT, {}, 1, 0, 0, "w7");
+ scenario.Check(peerW, {}, 0, 1, 0, "w8");
+ }
+
+ // After reaching expiration time of the preferred announcement, verify that the
+ // remaining one is requestable
+ scenario.AdvanceTime(expiry);
+ if (config & 2) {
+ scenario.Check(peerT, {}, 0, 0, 1, "w9");
+ scenario.Check(peerW, {wtxid}, 1, 0, 0, "w10");
+ scenario.CheckExpired(peerT, txid);
+ } else {
+ scenario.Check(peerT, {txid}, 1, 0, 0, "w11");
+ scenario.Check(peerW, {}, 0, 0, 1, "w12");
+ scenario.CheckExpired(peerW, wtxid);
+ }
+
+ // If a good transaction with either that hash as wtxid or txid arrives, both
+ // announcements are gone.
+ if (InsecureRandBool()) scenario.AdvanceTime(RandomTime8s());
+ scenario.ForgetTxHash(txhash);
+ scenario.Check(peerT, {}, 0, 0, 0, "w13");
+ scenario.Check(peerW, {}, 0, 0, 0, "w14");
+}
+
+/** Add to scenario a test that exercises clocks that go backwards. */
+void BuildTimeBackwardsTest(Scenario& scenario)
+{
+ auto peer1 = scenario.NewPeer();
+ auto peer2 = scenario.NewPeer();
+ auto gtxid = scenario.NewGTxid({{peer1, peer2}});
+
+ // Announce from peer2.
+ auto reqtime = scenario.Now() + RandomTime8s();
+ scenario.ReceivedInv(peer2, gtxid, true, reqtime);
+ scenario.Check(peer2, {}, 1, 0, 0, "r1");
+ scenario.AdvanceTime(reqtime - scenario.Now());
+ scenario.Check(peer2, {gtxid}, 1, 0, 0, "r2");
+ // Check that if the clock goes backwards by 1us, the transaction would stop being requested.
+ scenario.Check(peer2, {}, 1, 0, 0, "r3", -MICROSECOND);
+ // But it reverts to being requested if time goes forward again.
+ scenario.Check(peer2, {gtxid}, 1, 0, 0, "r4");
+
+ // Announce from peer1.
+ if (InsecureRandBool()) scenario.AdvanceTime(RandomTime8s());
+ scenario.ReceivedInv(peer1, gtxid, true, MAX_TIME);
+ scenario.Check(peer2, {gtxid}, 1, 0, 0, "r5");
+ scenario.Check(peer1, {}, 1, 0, 0, "r6");
+
+ // Request from peer1.
+ if (InsecureRandBool()) scenario.AdvanceTime(RandomTime8s());
+ auto expiry = scenario.Now() + RandomTime8s();
+ scenario.RequestedTx(peer1, gtxid.GetHash(), expiry);
+ scenario.Check(peer1, {}, 0, 1, 0, "r7");
+ scenario.Check(peer2, {}, 1, 0, 0, "r8");
+
+ // Expiration passes.
+ scenario.AdvanceTime(expiry - scenario.Now());
+ scenario.Check(peer1, {}, 0, 0, 1, "r9");
+ scenario.Check(peer2, {gtxid}, 1, 0, 0, "r10"); // Request goes back to peer2.
+ scenario.CheckExpired(peer1, gtxid);
+ scenario.Check(peer1, {}, 0, 0, 1, "r11", -MICROSECOND); // Going back does not unexpire.
+ scenario.Check(peer2, {gtxid}, 1, 0, 0, "r12", -MICROSECOND);
+
+ // Peer2 goes offline, meaning no viable announcements remain.
+ if (InsecureRandBool()) scenario.AdvanceTime(RandomTime8s());
+ scenario.DisconnectedPeer(peer2);
+ scenario.Check(peer1, {}, 0, 0, 0, "r13");
+ scenario.Check(peer2, {}, 0, 0, 0, "r14");
+}
+
+/** Add to scenario a test that involves RequestedTx() calls for txhashes not returned by GetRequestable. */
+void BuildWeirdRequestsTest(Scenario& scenario)
+{
+ auto peer1 = scenario.NewPeer();
+ auto peer2 = scenario.NewPeer();
+ auto gtxid1 = scenario.NewGTxid({{peer1, peer2}});
+ auto gtxid2 = scenario.NewGTxid({{peer2, peer1}});
+
+ // Announce gtxid1 by peer1.
+ scenario.ReceivedInv(peer1, gtxid1, true, MIN_TIME);
+ scenario.Check(peer1, {gtxid1}, 1, 0, 0, "q1");
+
+ // Announce gtxid2 by peer2.
+ if (InsecureRandBool()) scenario.AdvanceTime(RandomTime8s());
+ scenario.ReceivedInv(peer2, gtxid2, true, MIN_TIME);
+ scenario.Check(peer1, {gtxid1}, 1, 0, 0, "q2");
+ scenario.Check(peer2, {gtxid2}, 1, 0, 0, "q3");
+
+ // We request gtxid2 from *peer1* - no effect.
+ if (InsecureRandBool()) scenario.AdvanceTime(RandomTime8s());
+ scenario.RequestedTx(peer1, gtxid2.GetHash(), MAX_TIME);
+ scenario.Check(peer1, {gtxid1}, 1, 0, 0, "q4");
+ scenario.Check(peer2, {gtxid2}, 1, 0, 0, "q5");
+
+ // Now request gtxid1 from peer1 - marks it as REQUESTED.
+ if (InsecureRandBool()) scenario.AdvanceTime(RandomTime8s());
+ auto expiryA = scenario.Now() + RandomTime8s();
+ scenario.RequestedTx(peer1, gtxid1.GetHash(), expiryA);
+ scenario.Check(peer1, {}, 0, 1, 0, "q6");
+ scenario.Check(peer2, {gtxid2}, 1, 0, 0, "q7");
+
+ // Request it a second time - nothing happens, as it's already REQUESTED.
+ auto expiryB = expiryA + RandomTime8s();
+ scenario.RequestedTx(peer1, gtxid1.GetHash(), expiryB);
+ scenario.Check(peer1, {}, 0, 1, 0, "q8");
+ scenario.Check(peer2, {gtxid2}, 1, 0, 0, "q9");
+
+ // Also announce gtxid1 from peer2 now, so that the txhash isn't forgotten when the peer1 request expires.
+ scenario.ReceivedInv(peer2, gtxid1, true, MIN_TIME);
+ scenario.Check(peer1, {}, 0, 1, 0, "q10");
+ scenario.Check(peer2, {gtxid2}, 2, 0, 0, "q11");
+
+ // When reaching expiryA, it expires (not expiryB, which is later).
+ scenario.AdvanceTime(expiryA - scenario.Now());
+ scenario.Check(peer1, {}, 0, 0, 1, "q12");
+ scenario.Check(peer2, {gtxid2, gtxid1}, 2, 0, 0, "q13");
+ scenario.CheckExpired(peer1, gtxid1);
+
+ // Requesting it yet again from peer1 doesn't do anything, as it's already COMPLETED.
+ if (InsecureRandBool()) scenario.AdvanceTime(RandomTime8s());
+ scenario.RequestedTx(peer1, gtxid1.GetHash(), MAX_TIME);
+ scenario.Check(peer1, {}, 0, 0, 1, "q14");
+ scenario.Check(peer2, {gtxid2, gtxid1}, 2, 0, 0, "q15");
+
+ // Now announce gtxid2 from peer1.
+ if (InsecureRandBool()) scenario.AdvanceTime(RandomTime8s());
+ scenario.ReceivedInv(peer1, gtxid2, true, MIN_TIME);
+ scenario.Check(peer1, {}, 1, 0, 1, "q16");
+ scenario.Check(peer2, {gtxid2, gtxid1}, 2, 0, 0, "q17");
+
+ // And request it from peer1 (weird as peer2 has the preference).
+ if (InsecureRandBool()) scenario.AdvanceTime(RandomTime8s());
+ scenario.RequestedTx(peer1, gtxid2.GetHash(), MAX_TIME);
+ scenario.Check(peer1, {}, 0, 1, 1, "q18");
+ scenario.Check(peer2, {gtxid1}, 2, 0, 0, "q19");
+
+ // If peer2 now (normally) requests gtxid2, the existing request by peer1 becomes COMPLETED.
+ if (InsecureRandBool()) scenario.AdvanceTime(RandomTime8s());
+ scenario.RequestedTx(peer2, gtxid2.GetHash(), MAX_TIME);
+ scenario.Check(peer1, {}, 0, 0, 2, "q20");
+ scenario.Check(peer2, {gtxid1}, 1, 1, 0, "q21");
+
+ // If peer2 goes offline, no viable announcements remain.
+ scenario.DisconnectedPeer(peer2);
+ scenario.Check(peer1, {}, 0, 0, 0, "q22");
+ scenario.Check(peer2, {}, 0, 0, 0, "q23");
+}
+
+void TestInterleavedScenarios()
+{
+ // Create a list of functions which add tests to scenarios.
+ std::vector<std::function<void(Scenario&)>> builders;
+ // Add instances of every test, for every configuration.
+ for (int n = 0; n < 64; ++n) {
+ builders.emplace_back([n](Scenario& scenario){ BuildWtxidTest(scenario, n); });
+ builders.emplace_back([n](Scenario& scenario){ BuildRequestOrderTest(scenario, n & 3); });
+ builders.emplace_back([n](Scenario& scenario){ BuildSingleTest(scenario, n & 31); });
+ builders.emplace_back([n](Scenario& scenario){ BuildPriorityTest(scenario, n & 31); });
+ builders.emplace_back([n](Scenario& scenario){ BuildBigPriorityTest(scenario, (n & 7) + 1); });
+ builders.emplace_back([](Scenario& scenario){ BuildTimeBackwardsTest(scenario); });
+ builders.emplace_back([](Scenario& scenario){ BuildWeirdRequestsTest(scenario); });
+ }
+ // Randomly shuffle all those functions.
+ Shuffle(builders.begin(), builders.end(), g_insecure_rand_ctx);
+
+ Runner runner;
+ auto starttime = RandomTime1y();
+ // Construct many scenarios, and run (up to) 10 randomly-chosen tests consecutively in each.
+ while (builders.size()) {
+ // Introduce some variation in the start time of each scenario, so they don't all start off
+ // concurrently, but get a more random interleaving.
+ auto scenario_start = starttime + RandomTime8s() + RandomTime8s() + RandomTime8s();
+ Scenario scenario(runner, scenario_start);
+ for (int j = 0; builders.size() && j < 10; ++j) {
+ builders.back()(scenario);
+ builders.pop_back();
+ }
+ }
+ // Sort all the actions from all those scenarios chronologically, resulting in the actions from
+ // distinct scenarios to become interleaved. Use stable_sort so that actions from one scenario
+ // aren't reordered w.r.t. each other.
+ std::stable_sort(runner.actions.begin(), runner.actions.end(), [](const Action& a1, const Action& a2) {
+ return a1.first < a2.first;
+ });
+
+ // Run all actions from all scenarios, in order.
+ for (auto& action : runner.actions) {
+ action.second();
+ }
+
+ BOOST_CHECK_EQUAL(runner.txrequest.Size(), 0U);
+ BOOST_CHECK(runner.expired.empty());
+}
+
+} // namespace
+
+BOOST_AUTO_TEST_CASE(TxRequestTest)
+{
+ for (int i = 0; i < 5; ++i) {
+ TestInterleavedScenarios();
+ }
+}
+
+BOOST_AUTO_TEST_SUITE_END()
diff --git a/src/txrequest.cpp b/src/txrequest.cpp
new file mode 100644
index 0000000000..494786c201
--- /dev/null
+++ b/src/txrequest.cpp
@@ -0,0 +1,748 @@
+// Copyright (c) 2020 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#include <txrequest.h>
+
+#include <crypto/siphash.h>
+#include <net.h>
+#include <primitives/transaction.h>
+#include <random.h>
+#include <uint256.h>
+#include <util/memory.h>
+
+#include <boost/multi_index_container.hpp>
+#include <boost/multi_index/ordered_index.hpp>
+
+#include <chrono>
+#include <unordered_map>
+#include <utility>
+
+#include <assert.h>
+
+namespace {
+
+/** The various states a (txhash,peer) pair can be in.
+ *
+ * Note that CANDIDATE is split up into 3 substates (DELAYED, BEST, READY), allowing more efficient implementation.
+ * Also note that the sorting order of ByTxHashView relies on the specific order of values in this enum.
+ *
+ * Expected behaviour is:
+ * - When first announced by a peer, the state is CANDIDATE_DELAYED until reqtime is reached.
+ * - Announcements that have reached their reqtime but not been requested will be either CANDIDATE_READY or
+ * CANDIDATE_BEST. Neither of those has an expiration time; they remain in that state until they're requested or
+ * no longer needed. CANDIDATE_READY announcements are promoted to CANDIDATE_BEST when they're the best one left.
+ * - When requested, an announcement will be in state REQUESTED until expiry is reached.
+ * - If expiry is reached, or the peer replies to the request (either with NOTFOUND or the tx), the state becomes
+ * COMPLETED.
+ */
+enum class State : uint8_t {
+ /** A CANDIDATE announcement whose reqtime is in the future. */
+ CANDIDATE_DELAYED,
+ /** A CANDIDATE announcement that's not CANDIDATE_DELAYED or CANDIDATE_BEST. */
+ CANDIDATE_READY,
+ /** The best CANDIDATE for a given txhash; only if there is no REQUESTED announcement already for that txhash.
+ * The CANDIDATE_BEST is the highest-priority announcement among all CANDIDATE_READY (and _BEST) ones for that
+ * txhash. */
+ CANDIDATE_BEST,
+ /** A REQUESTED announcement. */
+ REQUESTED,
+ /** A COMPLETED announcement. */
+ COMPLETED,
+};
+
+//! Type alias for sequence numbers.
+using SequenceNumber = uint64_t;
+
+/** An announcement. This is the data we track for each txid or wtxid that is announced to us by each peer. */
+struct Announcement {
+ /** Txid or wtxid that was announced. */
+ const uint256 m_txhash;
+ /** For CANDIDATE_{DELAYED,BEST,READY} the reqtime; for REQUESTED the expiry. */
+ std::chrono::microseconds m_time;
+ /** What peer the request was from. */
+ const NodeId m_peer;
+ /** What sequence number this announcement has. */
+ const SequenceNumber m_sequence : 59;
+ /** Whether the request is preferred. */
+ const bool m_preferred : 1;
+ /** Whether this is a wtxid request. */
+ const bool m_is_wtxid : 1;
+
+ /** What state this announcement is in. */
+ State m_state : 3;
+
+ /** Whether this announcement is selected. There can be at most 1 selected peer per txhash. */
+ bool IsSelected() const
+ {
+ return m_state == State::CANDIDATE_BEST || m_state == State::REQUESTED;
+ }
+
+ /** Whether this announcement is waiting for a certain time to pass. */
+ bool IsWaiting() const
+ {
+ return m_state == State::REQUESTED || m_state == State::CANDIDATE_DELAYED;
+ }
+
+ /** Whether this announcement can feasibly be selected if the current IsSelected() one disappears. */
+ bool IsSelectable() const
+ {
+ return m_state == State::CANDIDATE_READY || m_state == State::CANDIDATE_BEST;
+ }
+
+ /** Construct a new announcement from scratch, initially in CANDIDATE_DELAYED state. */
+ Announcement(const GenTxid& gtxid, NodeId peer, bool preferred, std::chrono::microseconds reqtime,
+ SequenceNumber sequence) :
+ m_txhash(gtxid.GetHash()), m_time(reqtime), m_peer(peer), m_sequence(sequence), m_preferred(preferred),
+ m_is_wtxid(gtxid.IsWtxid()), m_state(State::CANDIDATE_DELAYED) {}
+};
+
+//! Type alias for priorities.
+using Priority = uint64_t;
+
+/** A functor with embedded salt that computes priority of an announcement.
+ *
+ * Higher priorities are selected first.
+ */
+class PriorityComputer {
+ const uint64_t m_k0, m_k1;
+public:
+ explicit PriorityComputer(bool deterministic) :
+ m_k0{deterministic ? 0 : GetRand(0xFFFFFFFFFFFFFFFF)},
+ m_k1{deterministic ? 0 : GetRand(0xFFFFFFFFFFFFFFFF)} {}
+
+ Priority operator()(const uint256& txhash, NodeId peer, bool preferred) const
+ {
+ uint64_t low_bits = CSipHasher(m_k0, m_k1).Write(txhash.begin(), txhash.size()).Write(peer).Finalize() >> 1;
+ return low_bits | uint64_t{preferred} << 63;
+ }
+
+ Priority operator()(const Announcement& ann) const
+ {
+ return operator()(ann.m_txhash, ann.m_peer, ann.m_preferred);
+ }
+};
+
+// Definitions for the 3 indexes used in the main data structure.
+//
+// Each index has a By* type to identify it, a By*View data type to represent the view of announcement it is sorted
+// by, and an By*ViewExtractor type to convert an announcement into the By*View type.
+// See https://www.boost.org/doc/libs/1_58_0/libs/multi_index/doc/reference/key_extraction.html#key_extractors
+// for more information about the key extraction concept.
+
+// The ByPeer index is sorted by (peer, state == CANDIDATE_BEST, txhash)
+//
+// Uses:
+// * Looking up existing announcements by peer/txhash, by checking both (peer, false, txhash) and
+// (peer, true, txhash).
+// * Finding all CANDIDATE_BEST announcements for a given peer in GetRequestable.
+struct ByPeer {};
+using ByPeerView = std::tuple<NodeId, bool, const uint256&>;
+struct ByPeerViewExtractor
+{
+ using result_type = ByPeerView;
+ result_type operator()(const Announcement& ann) const
+ {
+ return ByPeerView{ann.m_peer, ann.m_state == State::CANDIDATE_BEST, ann.m_txhash};
+ }
+};
+
+// The ByTxHash index is sorted by (txhash, state, priority).
+//
+// Note: priority == 0 whenever state != CANDIDATE_READY.
+//
+// Uses:
+// * Deleting all announcements with a given txhash in ForgetTxHash.
+// * Finding the best CANDIDATE_READY to convert to CANDIDATE_BEST, when no other CANDIDATE_READY or REQUESTED
+// announcement exists for that txhash.
+// * Determining when no more non-COMPLETED announcements for a given txhash exist, so the COMPLETED ones can be
+// deleted.
+struct ByTxHash {};
+using ByTxHashView = std::tuple<const uint256&, State, Priority>;
+class ByTxHashViewExtractor {
+ const PriorityComputer& m_computer;
+public:
+ ByTxHashViewExtractor(const PriorityComputer& computer) : m_computer(computer) {}
+ using result_type = ByTxHashView;
+ result_type operator()(const Announcement& ann) const
+ {
+ const Priority prio = (ann.m_state == State::CANDIDATE_READY) ? m_computer(ann) : 0;
+ return ByTxHashView{ann.m_txhash, ann.m_state, prio};
+ }
+};
+
+enum class WaitState {
+ //! Used for announcements that need efficient testing of "is their timestamp in the future?".
+ FUTURE_EVENT,
+ //! Used for announcements whose timestamp is not relevant.
+ NO_EVENT,
+ //! Used for announcements that need efficient testing of "is their timestamp in the past?".
+ PAST_EVENT,
+};
+
+WaitState GetWaitState(const Announcement& ann)
+{
+ if (ann.IsWaiting()) return WaitState::FUTURE_EVENT;
+ if (ann.IsSelectable()) return WaitState::PAST_EVENT;
+ return WaitState::NO_EVENT;
+}
+
+// The ByTime index is sorted by (wait_state, time).
+//
+// All announcements with a timestamp in the future can be found by iterating the index forward from the beginning.
+// All announcements with a timestamp in the past can be found by iterating the index backwards from the end.
+//
+// Uses:
+// * Finding CANDIDATE_DELAYED announcements whose reqtime has passed, and REQUESTED announcements whose expiry has
+// passed.
+// * Finding CANDIDATE_READY/BEST announcements whose reqtime is in the future (when the clock time went backwards).
+struct ByTime {};
+using ByTimeView = std::pair<WaitState, std::chrono::microseconds>;
+struct ByTimeViewExtractor
+{
+ using result_type = ByTimeView;
+ result_type operator()(const Announcement& ann) const
+ {
+ return ByTimeView{GetWaitState(ann), ann.m_time};
+ }
+};
+
+/** Data type for the main data structure (Announcement objects with ByPeer/ByTxHash/ByTime indexes). */
+using Index = boost::multi_index_container<
+ Announcement,
+ boost::multi_index::indexed_by<
+ boost::multi_index::ordered_unique<boost::multi_index::tag<ByPeer>, ByPeerViewExtractor>,
+ boost::multi_index::ordered_non_unique<boost::multi_index::tag<ByTxHash>, ByTxHashViewExtractor>,
+ boost::multi_index::ordered_non_unique<boost::multi_index::tag<ByTime>, ByTimeViewExtractor>
+ >
+>;
+
+/** Helper type to simplify syntax of iterator types. */
+template<typename Tag>
+using Iter = typename Index::index<Tag>::type::iterator;
+
+/** Per-peer statistics object. */
+struct PeerInfo {
+ size_t m_total = 0; //!< Total number of announcements for this peer.
+ size_t m_completed = 0; //!< Number of COMPLETED announcements for this peer.
+ size_t m_requested = 0; //!< Number of REQUESTED announcements for this peer.
+};
+
+/** Per-txhash statistics object. Only used for sanity checking. */
+struct TxHashInfo
+{
+ //! Number of CANDIDATE_DELAYED announcements for this txhash.
+ size_t m_candidate_delayed = 0;
+ //! Number of CANDIDATE_READY announcements for this txhash.
+ size_t m_candidate_ready = 0;
+ //! Number of CANDIDATE_BEST announcements for this txhash (at most one).
+ size_t m_candidate_best = 0;
+ //! Number of REQUESTED announcements for this txhash (at most one; mutually exclusive with CANDIDATE_BEST).
+ size_t m_requested = 0;
+ //! The priority of the CANDIDATE_BEST announcement if one exists, or max() otherwise.
+ Priority m_priority_candidate_best = std::numeric_limits<Priority>::max();
+ //! The highest priority of all CANDIDATE_READY announcements (or min() if none exist).
+ Priority m_priority_best_candidate_ready = std::numeric_limits<Priority>::min();
+ //! All peers we have an announcement for this txhash for.
+ std::vector<NodeId> m_peers;
+};
+
+/** Compare two PeerInfo objects. Only used for sanity checking. */
+bool operator==(const PeerInfo& a, const PeerInfo& b)
+{
+ return std::tie(a.m_total, a.m_completed, a.m_requested) ==
+ std::tie(b.m_total, b.m_completed, b.m_requested);
+};
+
+/** (Re)compute the PeerInfo map from the index. Only used for sanity checking. */
+std::unordered_map<NodeId, PeerInfo> RecomputePeerInfo(const Index& index)
+{
+ std::unordered_map<NodeId, PeerInfo> ret;
+ for (const Announcement& ann : index) {
+ PeerInfo& info = ret[ann.m_peer];
+ ++info.m_total;
+ info.m_requested += (ann.m_state == State::REQUESTED);
+ info.m_completed += (ann.m_state == State::COMPLETED);
+ }
+ return ret;
+}
+
+/** Compute the TxHashInfo map. Only used for sanity checking. */
+std::map<uint256, TxHashInfo> ComputeTxHashInfo(const Index& index, const PriorityComputer& computer)
+{
+ std::map<uint256, TxHashInfo> ret;
+ for (const Announcement& ann : index) {
+ TxHashInfo& info = ret[ann.m_txhash];
+ // Classify how many announcements of each state we have for this txhash.
+ info.m_candidate_delayed += (ann.m_state == State::CANDIDATE_DELAYED);
+ info.m_candidate_ready += (ann.m_state == State::CANDIDATE_READY);
+ info.m_candidate_best += (ann.m_state == State::CANDIDATE_BEST);
+ info.m_requested += (ann.m_state == State::REQUESTED);
+ // And track the priority of the best CANDIDATE_READY/CANDIDATE_BEST announcements.
+ if (ann.m_state == State::CANDIDATE_BEST) {
+ info.m_priority_candidate_best = computer(ann);
+ }
+ if (ann.m_state == State::CANDIDATE_READY) {
+ info.m_priority_best_candidate_ready = std::max(info.m_priority_best_candidate_ready, computer(ann));
+ }
+ // Also keep track of which peers this txhash has an announcement for (so we can detect duplicates).
+ info.m_peers.push_back(ann.m_peer);
+ }
+ return ret;
+}
+
+GenTxid ToGenTxid(const Announcement& ann)
+{
+ return {ann.m_is_wtxid, ann.m_txhash};
+}
+
+} // namespace
+
+/** Actual implementation for TxRequestTracker's data structure. */
+class TxRequestTracker::Impl {
+ //! The current sequence number. Increases for every announcement. This is used to sort txhashes returned by
+ //! GetRequestable in announcement order.
+ SequenceNumber m_current_sequence{0};
+
+ //! This tracker's priority computer.
+ const PriorityComputer m_computer;
+
+ //! This tracker's main data structure. See SanityCheck() for the invariants that apply to it.
+ Index m_index;
+
+ //! Map with this tracker's per-peer statistics.
+ std::unordered_map<NodeId, PeerInfo> m_peerinfo;
+
+public:
+ void SanityCheck() const
+ {
+ // Recompute m_peerdata from m_index. This verifies the data in it as it should just be caching statistics
+ // on m_index. It also verifies the invariant that no PeerInfo announcements with m_total==0 exist.
+ assert(m_peerinfo == RecomputePeerInfo(m_index));
+
+ // Calculate per-txhash statistics from m_index, and validate invariants.
+ for (auto& item : ComputeTxHashInfo(m_index, m_computer)) {
+ TxHashInfo& info = item.second;
+
+ // Cannot have only COMPLETED peer (txhash should have been forgotten already)
+ assert(info.m_candidate_delayed + info.m_candidate_ready + info.m_candidate_best + info.m_requested > 0);
+
+ // Can have at most 1 CANDIDATE_BEST/REQUESTED peer
+ assert(info.m_candidate_best + info.m_requested <= 1);
+
+ // If there are any CANDIDATE_READY announcements, there must be exactly one CANDIDATE_BEST or REQUESTED
+ // announcement.
+ if (info.m_candidate_ready > 0) {
+ assert(info.m_candidate_best + info.m_requested == 1);
+ }
+
+ // If there is both a CANDIDATE_READY and a CANDIDATE_BEST announcement, the CANDIDATE_BEST one must be
+ // at least as good (equal or higher priority) as the best CANDIDATE_READY.
+ if (info.m_candidate_ready && info.m_candidate_best) {
+ assert(info.m_priority_candidate_best >= info.m_priority_best_candidate_ready);
+ }
+
+ // No txhash can have been announced by the same peer twice.
+ std::sort(info.m_peers.begin(), info.m_peers.end());
+ assert(std::adjacent_find(info.m_peers.begin(), info.m_peers.end()) == info.m_peers.end());
+ }
+ }
+
+ void PostGetRequestableSanityCheck(std::chrono::microseconds now) const
+ {
+ for (const Announcement& ann : m_index) {
+ if (ann.IsWaiting()) {
+ // REQUESTED and CANDIDATE_DELAYED must have a time in the future (they should have been converted
+ // to COMPLETED/CANDIDATE_READY respectively).
+ assert(ann.m_time > now);
+ } else if (ann.IsSelectable()) {
+ // CANDIDATE_READY and CANDIDATE_BEST cannot have a time in the future (they should have remained
+ // CANDIDATE_DELAYED, or should have been converted back to it if time went backwards).
+ assert(ann.m_time <= now);
+ }
+ }
+ }
+
+private:
+ //! Wrapper around Index::...::erase that keeps m_peerinfo up to date.
+ template<typename Tag>
+ Iter<Tag> Erase(Iter<Tag> it)
+ {
+ auto peerit = m_peerinfo.find(it->m_peer);
+ peerit->second.m_completed -= it->m_state == State::COMPLETED;
+ peerit->second.m_requested -= it->m_state == State::REQUESTED;
+ if (--peerit->second.m_total == 0) m_peerinfo.erase(peerit);
+ return m_index.get<Tag>().erase(it);
+ }
+
+ //! Wrapper around Index::...::modify that keeps m_peerinfo up to date.
+ template<typename Tag, typename Modifier>
+ void Modify(Iter<Tag> it, Modifier modifier)
+ {
+ auto peerit = m_peerinfo.find(it->m_peer);
+ peerit->second.m_completed -= it->m_state == State::COMPLETED;
+ peerit->second.m_requested -= it->m_state == State::REQUESTED;
+ m_index.get<Tag>().modify(it, std::move(modifier));
+ peerit->second.m_completed += it->m_state == State::COMPLETED;
+ peerit->second.m_requested += it->m_state == State::REQUESTED;
+ }
+
+ //! Convert a CANDIDATE_DELAYED announcement into a CANDIDATE_READY. If this makes it the new best
+ //! CANDIDATE_READY (and no REQUESTED exists) and better than the CANDIDATE_BEST (if any), it becomes the new
+ //! CANDIDATE_BEST.
+ void PromoteCandidateReady(Iter<ByTxHash> it)
+ {
+ assert(it != m_index.get<ByTxHash>().end());
+ assert(it->m_state == State::CANDIDATE_DELAYED);
+ // Convert CANDIDATE_DELAYED to CANDIDATE_READY first.
+ Modify<ByTxHash>(it, [](Announcement& ann){ ann.m_state = State::CANDIDATE_READY; });
+ // The following code relies on the fact that the ByTxHash is sorted by txhash, and then by state (first
+ // _DELAYED, then _READY, then _BEST/REQUESTED). Within the _READY announcements, the best one (highest
+ // priority) comes last. Thus, if an existing _BEST exists for the same txhash that this announcement may
+ // be preferred over, it must immediately follow the newly created _READY.
+ auto it_next = std::next(it);
+ if (it_next == m_index.get<ByTxHash>().end() || it_next->m_txhash != it->m_txhash ||
+ it_next->m_state == State::COMPLETED) {
+ // This is the new best CANDIDATE_READY, and there is no IsSelected() announcement for this txhash
+ // already.
+ Modify<ByTxHash>(it, [](Announcement& ann){ ann.m_state = State::CANDIDATE_BEST; });
+ } else if (it_next->m_state == State::CANDIDATE_BEST) {
+ Priority priority_old = m_computer(*it_next);
+ Priority priority_new = m_computer(*it);
+ if (priority_new > priority_old) {
+ // There is a CANDIDATE_BEST announcement already, but this one is better.
+ Modify<ByTxHash>(it_next, [](Announcement& ann){ ann.m_state = State::CANDIDATE_READY; });
+ Modify<ByTxHash>(it, [](Announcement& ann){ ann.m_state = State::CANDIDATE_BEST; });
+ }
+ }
+ }
+
+ //! Change the state of an announcement to something non-IsSelected(). If it was IsSelected(), the next best
+ //! announcement will be marked CANDIDATE_BEST.
+ void ChangeAndReselect(Iter<ByTxHash> it, State new_state)
+ {
+ assert(new_state == State::COMPLETED || new_state == State::CANDIDATE_DELAYED);
+ assert(it != m_index.get<ByTxHash>().end());
+ if (it->IsSelected() && it != m_index.get<ByTxHash>().begin()) {
+ auto it_prev = std::prev(it);
+ // The next best CANDIDATE_READY, if any, immediately precedes the REQUESTED or CANDIDATE_BEST
+ // announcement in the ByTxHash index.
+ if (it_prev->m_txhash == it->m_txhash && it_prev->m_state == State::CANDIDATE_READY) {
+ // If one such CANDIDATE_READY exists (for this txhash), convert it to CANDIDATE_BEST.
+ Modify<ByTxHash>(it_prev, [](Announcement& ann){ ann.m_state = State::CANDIDATE_BEST; });
+ }
+ }
+ Modify<ByTxHash>(it, [new_state](Announcement& ann){ ann.m_state = new_state; });
+ }
+
+ //! Check if 'it' is the only announcement for a given txhash that isn't COMPLETED.
+ bool IsOnlyNonCompleted(Iter<ByTxHash> it)
+ {
+ assert(it != m_index.get<ByTxHash>().end());
+ assert(it->m_state != State::COMPLETED); // Not allowed to call this on COMPLETED announcements.
+
+ // This announcement has a predecessor that belongs to the same txhash. Due to ordering, and the
+ // fact that 'it' is not COMPLETED, its predecessor cannot be COMPLETED here.
+ if (it != m_index.get<ByTxHash>().begin() && std::prev(it)->m_txhash == it->m_txhash) return false;
+
+ // This announcement has a successor that belongs to the same txhash, and is not COMPLETED.
+ if (std::next(it) != m_index.get<ByTxHash>().end() && std::next(it)->m_txhash == it->m_txhash &&
+ std::next(it)->m_state != State::COMPLETED) return false;
+
+ return true;
+ }
+
+ /** Convert any announcement to a COMPLETED one. If there are no non-COMPLETED announcements left for this
+ * txhash, they are deleted. If this was a REQUESTED announcement, and there are other CANDIDATEs left, the
+ * best one is made CANDIDATE_BEST. Returns whether the announcement still exists. */
+ bool MakeCompleted(Iter<ByTxHash> it)
+ {
+ assert(it != m_index.get<ByTxHash>().end());
+
+ // Nothing to be done if it's already COMPLETED.
+ if (it->m_state == State::COMPLETED) return true;
+
+ if (IsOnlyNonCompleted(it)) {
+ // This is the last non-COMPLETED announcement for this txhash. Delete all.
+ uint256 txhash = it->m_txhash;
+ do {
+ it = Erase<ByTxHash>(it);
+ } while (it != m_index.get<ByTxHash>().end() && it->m_txhash == txhash);
+ return false;
+ }
+
+ // Mark the announcement COMPLETED, and select the next best announcement (the first CANDIDATE_READY) if
+ // needed.
+ ChangeAndReselect(it, State::COMPLETED);
+
+ return true;
+ }
+
+ //! Make the data structure consistent with a given point in time:
+ //! - REQUESTED annoucements with expiry <= now are turned into COMPLETED.
+ //! - CANDIDATE_DELAYED announcements with reqtime <= now are turned into CANDIDATE_{READY,BEST}.
+ //! - CANDIDATE_{READY,BEST} announcements with reqtime > now are turned into CANDIDATE_DELAYED.
+ void SetTimePoint(std::chrono::microseconds now, std::vector<std::pair<NodeId, GenTxid>>* expired)
+ {
+ if (expired) expired->clear();
+
+ // Iterate over all CANDIDATE_DELAYED and REQUESTED from old to new, as long as they're in the past,
+ // and convert them to CANDIDATE_READY and COMPLETED respectively.
+ while (!m_index.empty()) {
+ auto it = m_index.get<ByTime>().begin();
+ if (it->m_state == State::CANDIDATE_DELAYED && it->m_time <= now) {
+ PromoteCandidateReady(m_index.project<ByTxHash>(it));
+ } else if (it->m_state == State::REQUESTED && it->m_time <= now) {
+ if (expired) expired->emplace_back(it->m_peer, ToGenTxid(*it));
+ MakeCompleted(m_index.project<ByTxHash>(it));
+ } else {
+ break;
+ }
+ }
+
+ while (!m_index.empty()) {
+ // If time went backwards, we may need to demote CANDIDATE_BEST and CANDIDATE_READY announcements back
+ // to CANDIDATE_DELAYED. This is an unusual edge case, and unlikely to matter in production. However,
+ // it makes it much easier to specify and test TxRequestTracker::Impl's behaviour.
+ auto it = std::prev(m_index.get<ByTime>().end());
+ if (it->IsSelectable() && it->m_time > now) {
+ ChangeAndReselect(m_index.project<ByTxHash>(it), State::CANDIDATE_DELAYED);
+ } else {
+ break;
+ }
+ }
+ }
+
+public:
+ Impl(bool deterministic) :
+ m_computer(deterministic),
+ // Explicitly initialize m_index as we need to pass a reference to m_computer to ByTxHashViewExtractor.
+ m_index(boost::make_tuple(
+ boost::make_tuple(ByPeerViewExtractor(), std::less<ByPeerView>()),
+ boost::make_tuple(ByTxHashViewExtractor(m_computer), std::less<ByTxHashView>()),
+ boost::make_tuple(ByTimeViewExtractor(), std::less<ByTimeView>())
+ )) {}
+
+ // Disable copying and assigning (a default copy won't work due the stateful ByTxHashViewExtractor).
+ Impl(const Impl&) = delete;
+ Impl& operator=(const Impl&) = delete;
+
+ void DisconnectedPeer(NodeId peer)
+ {
+ auto& index = m_index.get<ByPeer>();
+ auto it = index.lower_bound(ByPeerView{peer, false, uint256::ZERO});
+ while (it != index.end() && it->m_peer == peer) {
+ // Check what to continue with after this iteration. 'it' will be deleted in what follows, so we need to
+ // decide what to continue with afterwards. There are a number of cases to consider:
+ // - std::next(it) is end() or belongs to a different peer. In that case, this is the last iteration
+ // of the loop (denote this by setting it_next to end()).
+ // - 'it' is not the only non-COMPLETED announcement for its txhash. This means it will be deleted, but
+ // no other Announcement objects will be modified. Continue with std::next(it) if it belongs to the
+ // same peer, but decide this ahead of time (as 'it' may change position in what follows).
+ // - 'it' is the only non-COMPLETED announcement for its txhash. This means it will be deleted along
+ // with all other announcements for the same txhash - which may include std::next(it). However, other
+ // than 'it', no announcements for the same peer can be affected (due to (peer, txhash) uniqueness).
+ // In other words, the situation where std::next(it) is deleted can only occur if std::next(it)
+ // belongs to a different peer but the same txhash as 'it'. This is covered by the first bulletpoint
+ // already, and we'll have set it_next to end().
+ auto it_next = (std::next(it) == index.end() || std::next(it)->m_peer != peer) ? index.end() :
+ std::next(it);
+ // If the announcement isn't already COMPLETED, first make it COMPLETED (which will mark other
+ // CANDIDATEs as CANDIDATE_BEST, or delete all of a txhash's announcements if no non-COMPLETED ones are
+ // left).
+ if (MakeCompleted(m_index.project<ByTxHash>(it))) {
+ // Then actually delete the announcement (unless it was already deleted by MakeCompleted).
+ Erase<ByPeer>(it);
+ }
+ it = it_next;
+ }
+ }
+
+ void ForgetTxHash(const uint256& txhash)
+ {
+ auto it = m_index.get<ByTxHash>().lower_bound(ByTxHashView{txhash, State::CANDIDATE_DELAYED, 0});
+ while (it != m_index.get<ByTxHash>().end() && it->m_txhash == txhash) {
+ it = Erase<ByTxHash>(it);
+ }
+ }
+
+ void ReceivedInv(NodeId peer, const GenTxid& gtxid, bool preferred,
+ std::chrono::microseconds reqtime)
+ {
+ // Bail out if we already have a CANDIDATE_BEST announcement for this (txhash, peer) combination. The case
+ // where there is a non-CANDIDATE_BEST announcement already will be caught by the uniqueness property of the
+ // ByPeer index when we try to emplace the new object below.
+ if (m_index.get<ByPeer>().count(ByPeerView{peer, true, gtxid.GetHash()})) return;
+
+ // Try creating the announcement with CANDIDATE_DELAYED state (which will fail due to the uniqueness
+ // of the ByPeer index if a non-CANDIDATE_BEST announcement already exists with the same txhash and peer).
+ // Bail out in that case.
+ auto ret = m_index.get<ByPeer>().emplace(gtxid, peer, preferred, reqtime, m_current_sequence);
+ if (!ret.second) return;
+
+ // Update accounting metadata.
+ ++m_peerinfo[peer].m_total;
+ ++m_current_sequence;
+ }
+
+ //! Find the GenTxids to request now from peer.
+ std::vector<GenTxid> GetRequestable(NodeId peer, std::chrono::microseconds now,
+ std::vector<std::pair<NodeId, GenTxid>>* expired)
+ {
+ // Move time.
+ SetTimePoint(now, expired);
+
+ // Find all CANDIDATE_BEST announcements for this peer.
+ std::vector<const Announcement*> selected;
+ auto it_peer = m_index.get<ByPeer>().lower_bound(ByPeerView{peer, true, uint256::ZERO});
+ while (it_peer != m_index.get<ByPeer>().end() && it_peer->m_peer == peer &&
+ it_peer->m_state == State::CANDIDATE_BEST) {
+ selected.emplace_back(&*it_peer);
+ ++it_peer;
+ }
+
+ // Sort by sequence number.
+ std::sort(selected.begin(), selected.end(), [](const Announcement* a, const Announcement* b) {
+ return a->m_sequence < b->m_sequence;
+ });
+
+ // Convert to GenTxid and return.
+ std::vector<GenTxid> ret;
+ ret.reserve(selected.size());
+ std::transform(selected.begin(), selected.end(), std::back_inserter(ret), [](const Announcement* ann) {
+ return ToGenTxid(*ann);
+ });
+ return ret;
+ }
+
+ void RequestedTx(NodeId peer, const uint256& txhash, std::chrono::microseconds expiry)
+ {
+ auto it = m_index.get<ByPeer>().find(ByPeerView{peer, true, txhash});
+ if (it == m_index.get<ByPeer>().end()) {
+ // There is no CANDIDATE_BEST announcement, look for a _READY or _DELAYED instead. If the caller only
+ // ever invokes RequestedTx with the values returned by GetRequestable, and no other non-const functions
+ // other than ForgetTxHash and GetRequestable in between, this branch will never execute (as txhashes
+ // returned by GetRequestable always correspond to CANDIDATE_BEST announcements).
+
+ it = m_index.get<ByPeer>().find(ByPeerView{peer, false, txhash});
+ if (it == m_index.get<ByPeer>().end() || (it->m_state != State::CANDIDATE_DELAYED &&
+ it->m_state != State::CANDIDATE_READY)) {
+ // There is no CANDIDATE announcement tracked for this peer, so we have nothing to do. Either this
+ // txhash wasn't tracked at all (and the caller should have called ReceivedInv), or it was already
+ // requested and/or completed for other reasons and this is just a superfluous RequestedTx call.
+ return;
+ }
+
+ // Look for an existing CANDIDATE_BEST or REQUESTED with the same txhash. We only need to do this if the
+ // found announcement had a different state than CANDIDATE_BEST. If it did, invariants guarantee that no
+ // other CANDIDATE_BEST or REQUESTED can exist.
+ auto it_old = m_index.get<ByTxHash>().lower_bound(ByTxHashView{txhash, State::CANDIDATE_BEST, 0});
+ if (it_old != m_index.get<ByTxHash>().end() && it_old->m_txhash == txhash) {
+ if (it_old->m_state == State::CANDIDATE_BEST) {
+ // The data structure's invariants require that there can be at most one CANDIDATE_BEST or one
+ // REQUESTED announcement per txhash (but not both simultaneously), so we have to convert any
+ // existing CANDIDATE_BEST to another CANDIDATE_* when constructing another REQUESTED.
+ // It doesn't matter whether we pick CANDIDATE_READY or _DELAYED here, as SetTimePoint()
+ // will correct it at GetRequestable() time. If time only goes forward, it will always be
+ // _READY, so pick that to avoid extra work in SetTimePoint().
+ Modify<ByTxHash>(it_old, [](Announcement& ann) { ann.m_state = State::CANDIDATE_READY; });
+ } else if (it_old->m_state == State::REQUESTED) {
+ // As we're no longer waiting for a response to the previous REQUESTED announcement, convert it
+ // to COMPLETED. This also helps guaranteeing progress.
+ Modify<ByTxHash>(it_old, [](Announcement& ann) { ann.m_state = State::COMPLETED; });
+ }
+ }
+ }
+
+ Modify<ByPeer>(it, [expiry](Announcement& ann) {
+ ann.m_state = State::REQUESTED;
+ ann.m_time = expiry;
+ });
+ }
+
+ void ReceivedResponse(NodeId peer, const uint256& txhash)
+ {
+ // We need to search the ByPeer index for both (peer, false, txhash) and (peer, true, txhash).
+ auto it = m_index.get<ByPeer>().find(ByPeerView{peer, false, txhash});
+ if (it == m_index.get<ByPeer>().end()) {
+ it = m_index.get<ByPeer>().find(ByPeerView{peer, true, txhash});
+ }
+ if (it != m_index.get<ByPeer>().end()) MakeCompleted(m_index.project<ByTxHash>(it));
+ }
+
+ size_t CountInFlight(NodeId peer) const
+ {
+ auto it = m_peerinfo.find(peer);
+ if (it != m_peerinfo.end()) return it->second.m_requested;
+ return 0;
+ }
+
+ size_t CountCandidates(NodeId peer) const
+ {
+ auto it = m_peerinfo.find(peer);
+ if (it != m_peerinfo.end()) return it->second.m_total - it->second.m_requested - it->second.m_completed;
+ return 0;
+ }
+
+ size_t Count(NodeId peer) const
+ {
+ auto it = m_peerinfo.find(peer);
+ if (it != m_peerinfo.end()) return it->second.m_total;
+ return 0;
+ }
+
+ //! Count how many announcements are being tracked in total across all peers and transactions.
+ size_t Size() const { return m_index.size(); }
+
+ uint64_t ComputePriority(const uint256& txhash, NodeId peer, bool preferred) const
+ {
+ // Return Priority as a uint64_t as Priority is internal.
+ return uint64_t{m_computer(txhash, peer, preferred)};
+ }
+
+};
+
+TxRequestTracker::TxRequestTracker(bool deterministic) :
+ m_impl{MakeUnique<TxRequestTracker::Impl>(deterministic)} {}
+
+TxRequestTracker::~TxRequestTracker() = default;
+
+void TxRequestTracker::ForgetTxHash(const uint256& txhash) { m_impl->ForgetTxHash(txhash); }
+void TxRequestTracker::DisconnectedPeer(NodeId peer) { m_impl->DisconnectedPeer(peer); }
+size_t TxRequestTracker::CountInFlight(NodeId peer) const { return m_impl->CountInFlight(peer); }
+size_t TxRequestTracker::CountCandidates(NodeId peer) const { return m_impl->CountCandidates(peer); }
+size_t TxRequestTracker::Count(NodeId peer) const { return m_impl->Count(peer); }
+size_t TxRequestTracker::Size() const { return m_impl->Size(); }
+void TxRequestTracker::SanityCheck() const { m_impl->SanityCheck(); }
+
+void TxRequestTracker::PostGetRequestableSanityCheck(std::chrono::microseconds now) const
+{
+ m_impl->PostGetRequestableSanityCheck(now);
+}
+
+void TxRequestTracker::ReceivedInv(NodeId peer, const GenTxid& gtxid, bool preferred,
+ std::chrono::microseconds reqtime)
+{
+ m_impl->ReceivedInv(peer, gtxid, preferred, reqtime);
+}
+
+void TxRequestTracker::RequestedTx(NodeId peer, const uint256& txhash, std::chrono::microseconds expiry)
+{
+ m_impl->RequestedTx(peer, txhash, expiry);
+}
+
+void TxRequestTracker::ReceivedResponse(NodeId peer, const uint256& txhash)
+{
+ m_impl->ReceivedResponse(peer, txhash);
+}
+
+std::vector<GenTxid> TxRequestTracker::GetRequestable(NodeId peer, std::chrono::microseconds now,
+ std::vector<std::pair<NodeId, GenTxid>>* expired)
+{
+ return m_impl->GetRequestable(peer, now, expired);
+}
+
+uint64_t TxRequestTracker::ComputePriority(const uint256& txhash, NodeId peer, bool preferred) const
+{
+ return m_impl->ComputePriority(txhash, peer, preferred);
+}
diff --git a/src/txrequest.h b/src/txrequest.h
new file mode 100644
index 0000000000..cd3042c87e
--- /dev/null
+++ b/src/txrequest.h
@@ -0,0 +1,211 @@
+// Copyright (c) 2020 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#ifndef BITCOIN_TXREQUEST_H
+#define BITCOIN_TXREQUEST_H
+
+#include <primitives/transaction.h>
+#include <net.h> // For NodeId
+#include <uint256.h>
+
+#include <chrono>
+#include <vector>
+
+#include <stdint.h>
+
+/** Data structure to keep track of, and schedule, transaction downloads from peers.
+ *
+ * === Specification ===
+ *
+ * We keep track of which peers have announced which transactions, and use that to determine which requests
+ * should go to which peer, when, and in what order.
+ *
+ * The following information is tracked per peer/tx combination ("announcement"):
+ * - Which peer announced it (through their NodeId)
+ * - The txid or wtxid of the transaction (collectively called "txhash" in what follows)
+ * - Whether it was a tx or wtx announcement (see BIP339).
+ * - What the earliest permitted time is that that transaction can be requested from that peer (called "reqtime").
+ * - Whether it's from a "preferred" peer or not. Which announcements get this flag is determined by the caller, but
+ * this is designed for outbound peers, or other peers that we have a higher level of trust in. Even when the
+ * peers' preferredness changes, the preferred flag of existing announcements from that peer won't change.
+ * - Whether or not the transaction was requested already, and if so, when it times out (called "expiry").
+ * - Whether or not the transaction request failed already (timed out, or invalid transaction or NOTFOUND was
+ * received).
+ *
+ * Transaction requests are then assigned to peers, following these rules:
+ *
+ * - No transaction is requested as long as another request for the same txhash is outstanding (it needs to fail
+ * first by passing expiry, or a NOTFOUND or invalid transaction has to be received for it).
+ *
+ * Rationale: to avoid wasting bandwidth on multiple copies of the same transaction. Note that this only works
+ * per txhash, so if the same transaction is announced both through txid and wtxid, we have no means
+ * to prevent fetching both (the caller can however mitigate this by delaying one, see further).
+ *
+ * - The same transaction is never requested twice from the same peer, unless the announcement was forgotten in
+ * between, and re-announced. Announcements are forgotten only:
+ * - If a peer goes offline, all its announcements are forgotten.
+ * - If a transaction has been successfully received, or is otherwise no longer needed, the caller can call
+ * ForgetTxHash, which removes all announcements across all peers with the specified txhash.
+ * - If for a given txhash only already-failed announcements remain, they are all forgotten.
+ *
+ * Rationale: giving a peer multiple chances to announce a transaction would allow them to bias requests in their
+ * favor, worsening transaction censoring attacks. The flip side is that as long as an attacker manages
+ * to prevent us from receiving a transaction, failed announcements (including those from honest peers)
+ * will linger longer, increasing memory usage somewhat. The impact of this is limited by imposing a
+ * cap on the number of tracked announcements per peer. As failed requests in response to announcements
+ * from honest peers should be rare, this almost solely hinders attackers.
+ * Transaction censoring attacks can be done by announcing transactions quickly while not answering
+ * requests for them. See https://allquantor.at/blockchainbib/pdf/miller2015topology.pdf for more
+ * information.
+ *
+ * - Transactions are not requested from a peer until its reqtime has passed.
+ *
+ * Rationale: enable the calling code to define a delay for less-than-ideal peers, so that (presumed) better
+ * peers have a chance to give their announcement first.
+ *
+ * - If multiple viable candidate peers exist according to the above rules, pick a peer as follows:
+ *
+ * - If any preferred peers are available, non-preferred peers are not considered for what follows.
+ *
+ * Rationale: preferred peers are more trusted by us, so are less likely to be under attacker control.
+ *
+ * - Pick a uniformly random peer among the candidates.
+ *
+ * Rationale: random assignments are hard to influence for attackers.
+ *
+ * Together these rules strike a balance between being fast in non-adverserial conditions and minimizing
+ * susceptibility to censorship attacks. An attacker that races the network:
+ * - Will be unsuccessful if all preferred connections are honest (and there is at least one preferred connection).
+ * - If there are P preferred connections of which Ph>=1 are honest, the attacker can delay us from learning
+ * about a transaction by k expiration periods, where k ~ 1 + NHG(N=P-1,K=P-Ph-1,r=1), which has mean
+ * P/(Ph+1) (where NHG stands for Negative Hypergeometric distribution). The "1 +" is due to the fact that the
+ * attacker can be the first to announce through a preferred connection in this scenario, which very likely means
+ * they get the first request.
+ * - If all P preferred connections are to the attacker, and there are NP non-preferred connections of which NPh>=1
+ * are honest, where we assume that the attacker can disconnect and reconnect those connections, the distribution
+ * becomes k ~ P + NB(p=1-NPh/NP,r=1) (where NB stands for Negative Binomial distribution), which has mean
+ * P-1+NP/NPh.
+ *
+ * Complexity:
+ * - Memory usage is proportional to the total number of tracked announcements (Size()) plus the number of
+ * peers with a nonzero number of tracked announcements.
+ * - CPU usage is generally logarithmic in the total number of tracked announcements, plus the number of
+ * announcements affected by an operation (amortized O(1) per announcement).
+ */
+class TxRequestTracker {
+ // Avoid littering this header file with implementation details.
+ class Impl;
+ const std::unique_ptr<Impl> m_impl;
+
+public:
+ //! Construct a TxRequestTracker.
+ explicit TxRequestTracker(bool deterministic = false);
+ ~TxRequestTracker();
+
+ // Conceptually, the data structure consists of a collection of "announcements", one for each peer/txhash
+ // combination:
+ //
+ // - CANDIDATE announcements represent transactions that were announced by a peer, and that become available for
+ // download after their reqtime has passed.
+ //
+ // - REQUESTED announcements represent transactions that have been requested, and which we're awaiting a
+ // response for from that peer. Their expiry value determines when the request times out.
+ //
+ // - COMPLETED announcements represent transactions that have been requested from a peer, and a NOTFOUND or a
+ // transaction was received in response (valid or not), or they timed out. They're only kept around to
+ // prevent requesting them again. If only COMPLETED announcements for a given txhash remain (so no CANDIDATE
+ // or REQUESTED ones), all of them are deleted (this is an invariant, and maintained by all operations below).
+ //
+ // The operations below manipulate the data structure.
+
+ /** Adds a new CANDIDATE announcement.
+ *
+ * Does nothing if one already exists for that (txhash, peer) combination (whether it's CANDIDATE, REQUESTED, or
+ * COMPLETED). Note that the txid/wtxid property is ignored for determining uniqueness, so if an announcement
+ * is added for a wtxid H, while one for txid H from the same peer already exists, it will be ignored. This is
+ * harmless as the txhashes being equal implies it is a non-segwit transaction, so it doesn't matter how it is
+ * fetched. The new announcement is given the specified preferred and reqtime values, and takes its is_wtxid
+ * from the specified gtxid.
+ */
+ void ReceivedInv(NodeId peer, const GenTxid& gtxid, bool preferred,
+ std::chrono::microseconds reqtime);
+
+ /** Deletes all announcements for a given peer.
+ *
+ * It should be called when a peer goes offline.
+ */
+ void DisconnectedPeer(NodeId peer);
+
+ /** Deletes all announcements for a given txhash (both txid and wtxid ones).
+ *
+ * This should be called when a transaction is no longer needed. The caller should ensure that new announcements
+ * for the same txhash will not trigger new ReceivedInv calls, at least in the short term after this call.
+ */
+ void ForgetTxHash(const uint256& txhash);
+
+ /** Find the txids to request now from peer.
+ *
+ * It does the following:
+ * - Convert all REQUESTED announcements (for all txhashes/peers) with (expiry <= now) to COMPLETED ones.
+ * These are returned in expired, if non-nullptr.
+ * - Requestable announcements are selected: CANDIDATE announcements from the specified peer with
+ * (reqtime <= now) for which no existing REQUESTED announcement with the same txhash from a different peer
+ * exists, and for which the specified peer is the best choice among all (reqtime <= now) CANDIDATE
+ * announcements with the same txhash (subject to preferredness rules, and tiebreaking using a deterministic
+ * salted hash of peer and txhash).
+ * - The selected announcements are converted to GenTxids using their is_wtxid flag, and returned in
+ * announcement order (even if multiple were added at the same time, or when the clock went backwards while
+ * they were being added). This is done to minimize disruption from dependent transactions being requested
+ * out of order: if multiple dependent transactions are announced simultaneously by one peer, and end up
+ * being requested from them, the requests will happen in announcement order.
+ */
+ std::vector<GenTxid> GetRequestable(NodeId peer, std::chrono::microseconds now,
+ std::vector<std::pair<NodeId, GenTxid>>* expired = nullptr);
+
+ /** Marks a transaction as requested, with a specified expiry.
+ *
+ * If no CANDIDATE announcement for the provided peer and txhash exists, this call has no effect. Otherwise:
+ * - That announcement is converted to REQUESTED.
+ * - If any other REQUESTED announcement for the same txhash already existed, it means an unexpected request
+ * was made (GetRequestable will never advise doing so). In this case it is converted to COMPLETED, as we're
+ * no longer waiting for a response to it.
+ */
+ void RequestedTx(NodeId peer, const uint256& txhash, std::chrono::microseconds expiry);
+
+ /** Converts a CANDIDATE or REQUESTED announcement to a COMPLETED one. If no such announcement exists for the
+ * provided peer and txhash, nothing happens.
+ *
+ * It should be called whenever a transaction or NOTFOUND was received from a peer. When the transaction is
+ * not needed entirely anymore, ForgetTxhash should be called instead of, or in addition to, this call.
+ */
+ void ReceivedResponse(NodeId peer, const uint256& txhash);
+
+ // The operations below inspect the data structure.
+
+ /** Count how many REQUESTED announcements a peer has. */
+ size_t CountInFlight(NodeId peer) const;
+
+ /** Count how many CANDIDATE announcements a peer has. */
+ size_t CountCandidates(NodeId peer) const;
+
+ /** Count how many announcements a peer has (REQUESTED, CANDIDATE, and COMPLETED combined). */
+ size_t Count(NodeId peer) const;
+
+ /** Count how many announcements are being tracked in total across all peers and transaction hashes. */
+ size_t Size() const;
+
+ /** Access to the internal priority computation (testing only) */
+ uint64_t ComputePriority(const uint256& txhash, NodeId peer, bool preferred) const;
+
+ /** Run internal consistency check (testing only). */
+ void SanityCheck() const;
+
+ /** Run a time-dependent internal consistency check (testing only).
+ *
+ * This can only be called immediately after GetRequestable, with the same 'now' parameter.
+ */
+ void PostGetRequestableSanityCheck(std::chrono::microseconds now) const;
+};
+
+#endif // BITCOIN_TXREQUEST_H
diff --git a/src/uint256.cpp b/src/uint256.cpp
index d074df2f20..f358b62903 100644
--- a/src/uint256.cpp
+++ b/src/uint256.cpp
@@ -80,4 +80,5 @@ template std::string base_blob<256>::ToString() const;
template void base_blob<256>::SetHex(const char*);
template void base_blob<256>::SetHex(const std::string&);
+const uint256 uint256::ZERO(0);
const uint256 uint256::ONE(1);
diff --git a/src/uint256.h b/src/uint256.h
index c55cb31456..ceae70707e 100644
--- a/src/uint256.h
+++ b/src/uint256.h
@@ -126,6 +126,7 @@ public:
constexpr uint256() {}
constexpr explicit uint256(uint8_t v) : base_blob<256>(v) {}
explicit uint256(const std::vector<unsigned char>& vch) : base_blob<256>(vch) {}
+ static const uint256 ZERO;
static const uint256 ONE;
};
diff --git a/test/functional/p2p_tx_download.py b/test/functional/p2p_tx_download.py
index 5c3f021b3f..16d9302db8 100755
--- a/test/functional/p2p_tx_download.py
+++ b/test/functional/p2p_tx_download.py
@@ -42,15 +42,15 @@ class TestP2PConn(P2PInterface):
# Constants from net_processing
GETDATA_TX_INTERVAL = 60 # seconds
-MAX_GETDATA_RANDOM_DELAY = 2 # seconds
INBOUND_PEER_TX_DELAY = 2 # seconds
TXID_RELAY_DELAY = 2 # seconds
+OVERLOADED_PEER_DELAY = 2 # seconds
MAX_GETDATA_IN_FLIGHT = 100
-TX_EXPIRY_INTERVAL = GETDATA_TX_INTERVAL * 10
+MAX_PEER_TX_ANNOUNCEMENTS = 5000
# Python test constants
NUM_INBOUND = 10
-MAX_GETDATA_INBOUND_WAIT = GETDATA_TX_INTERVAL + MAX_GETDATA_RANDOM_DELAY + INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY
+MAX_GETDATA_INBOUND_WAIT = GETDATA_TX_INTERVAL + INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY
class TxDownloadTest(BitcoinTestFramework):
@@ -121,14 +121,12 @@ class TxDownloadTest(BitcoinTestFramework):
# * the first time it is re-requested from the outbound peer, plus
# * 2 seconds to avoid races
assert self.nodes[1].getpeerinfo()[0]['inbound'] == False
- timeout = 2 + (MAX_GETDATA_RANDOM_DELAY + INBOUND_PEER_TX_DELAY) + (
- GETDATA_TX_INTERVAL + MAX_GETDATA_RANDOM_DELAY)
+ timeout = 2 + INBOUND_PEER_TX_DELAY + GETDATA_TX_INTERVAL
self.log.info("Tx should be received at node 1 after {} seconds".format(timeout))
self.sync_mempools(timeout=timeout)
def test_in_flight_max(self):
- self.log.info("Test that we don't request more than {} transactions from any peer, every {} minutes".format(
- MAX_GETDATA_IN_FLIGHT, TX_EXPIRY_INTERVAL / 60))
+ self.log.info("Test that we don't load peers with more than {} transaction requests immediately".format(MAX_GETDATA_IN_FLIGHT))
txids = [i for i in range(MAX_GETDATA_IN_FLIGHT + 2)]
p = self.nodes[0].p2ps[0]
@@ -136,31 +134,120 @@ class TxDownloadTest(BitcoinTestFramework):
with p2p_lock:
p.tx_getdata_count = 0
- p.send_message(msg_inv([CInv(t=MSG_WTX, h=i) for i in txids]))
+ mock_time = int(time.time() + 1)
+ self.nodes[0].setmocktime(mock_time)
+ for i in range(MAX_GETDATA_IN_FLIGHT):
+ p.send_message(msg_inv([CInv(t=MSG_WTX, h=txids[i])]))
+ p.sync_with_ping()
+ mock_time += INBOUND_PEER_TX_DELAY
+ self.nodes[0].setmocktime(mock_time)
p.wait_until(lambda: p.tx_getdata_count >= MAX_GETDATA_IN_FLIGHT)
+ for i in range(MAX_GETDATA_IN_FLIGHT, len(txids)):
+ p.send_message(msg_inv([CInv(t=MSG_WTX, h=txids[i])]))
+ p.sync_with_ping()
+ self.log.info("No more than {} requests should be seen within {} seconds after announcement".format(MAX_GETDATA_IN_FLIGHT, INBOUND_PEER_TX_DELAY + OVERLOADED_PEER_DELAY - 1))
+ self.nodes[0].setmocktime(mock_time + INBOUND_PEER_TX_DELAY + OVERLOADED_PEER_DELAY - 1)
+ p.sync_with_ping()
with p2p_lock:
assert_equal(p.tx_getdata_count, MAX_GETDATA_IN_FLIGHT)
+ self.log.info("If we wait {} seconds after announcement, we should eventually get more requests".format(INBOUND_PEER_TX_DELAY + OVERLOADED_PEER_DELAY))
+ self.nodes[0].setmocktime(mock_time + INBOUND_PEER_TX_DELAY + OVERLOADED_PEER_DELAY)
+ p.wait_until(lambda: p.tx_getdata_count == len(txids))
- self.log.info("Now check that if we send a NOTFOUND for a transaction, we'll get one more request")
- p.send_message(msg_notfound(vec=[CInv(t=MSG_WTX, h=txids[0])]))
- p.wait_until(lambda: p.tx_getdata_count >= MAX_GETDATA_IN_FLIGHT + 1, timeout=10)
+ def test_expiry_fallback(self):
+ self.log.info('Check that expiry will select another peer for download')
+ WTXID = 0xffaa
+ peer1 = self.nodes[0].add_p2p_connection(TestP2PConn())
+ peer2 = self.nodes[0].add_p2p_connection(TestP2PConn())
+ for p in [peer1, peer2]:
+ p.send_message(msg_inv([CInv(t=MSG_WTX, h=WTXID)]))
+ # One of the peers is asked for the tx
+ peer2.wait_until(lambda: sum(p.tx_getdata_count for p in [peer1, peer2]) == 1)
with p2p_lock:
- assert_equal(p.tx_getdata_count, MAX_GETDATA_IN_FLIGHT + 1)
+ peer_expiry, peer_fallback = (peer1, peer2) if peer1.tx_getdata_count == 1 else (peer2, peer1)
+ assert_equal(peer_fallback.tx_getdata_count, 0)
+ self.nodes[0].setmocktime(int(time.time()) + GETDATA_TX_INTERVAL + 1) # Wait for request to peer_expiry to expire
+ peer_fallback.wait_until(lambda: peer_fallback.tx_getdata_count >= 1, timeout=1)
+ with p2p_lock:
+ assert_equal(peer_fallback.tx_getdata_count, 1)
+ self.restart_node(0) # reset mocktime
- WAIT_TIME = TX_EXPIRY_INTERVAL // 2 + TX_EXPIRY_INTERVAL
- self.log.info("if we wait about {} minutes, we should eventually get more requests".format(WAIT_TIME / 60))
- self.nodes[0].setmocktime(int(time.time() + WAIT_TIME))
- p.wait_until(lambda: p.tx_getdata_count == MAX_GETDATA_IN_FLIGHT + 2)
- self.nodes[0].setmocktime(0)
+ def test_disconnect_fallback(self):
+ self.log.info('Check that disconnect will select another peer for download')
+ WTXID = 0xffbb
+ peer1 = self.nodes[0].add_p2p_connection(TestP2PConn())
+ peer2 = self.nodes[0].add_p2p_connection(TestP2PConn())
+ for p in [peer1, peer2]:
+ p.send_message(msg_inv([CInv(t=MSG_WTX, h=WTXID)]))
+ # One of the peers is asked for the tx
+ peer2.wait_until(lambda: sum(p.tx_getdata_count for p in [peer1, peer2]) == 1)
+ with p2p_lock:
+ peer_disconnect, peer_fallback = (peer1, peer2) if peer1.tx_getdata_count == 1 else (peer2, peer1)
+ assert_equal(peer_fallback.tx_getdata_count, 0)
+ peer_disconnect.peer_disconnect()
+ peer_disconnect.wait_for_disconnect()
+ peer_fallback.wait_until(lambda: peer_fallback.tx_getdata_count >= 1, timeout=1)
+ with p2p_lock:
+ assert_equal(peer_fallback.tx_getdata_count, 1)
+
+ def test_notfound_fallback(self):
+ self.log.info('Check that notfounds will select another peer for download immediately')
+ WTXID = 0xffdd
+ peer1 = self.nodes[0].add_p2p_connection(TestP2PConn())
+ peer2 = self.nodes[0].add_p2p_connection(TestP2PConn())
+ for p in [peer1, peer2]:
+ p.send_message(msg_inv([CInv(t=MSG_WTX, h=WTXID)]))
+ # One of the peers is asked for the tx
+ peer2.wait_until(lambda: sum(p.tx_getdata_count for p in [peer1, peer2]) == 1)
+ with p2p_lock:
+ peer_notfound, peer_fallback = (peer1, peer2) if peer1.tx_getdata_count == 1 else (peer2, peer1)
+ assert_equal(peer_fallback.tx_getdata_count, 0)
+ peer_notfound.send_and_ping(msg_notfound(vec=[CInv(MSG_WTX, WTXID)])) # Send notfound, so that fallback peer is selected
+ peer_fallback.wait_until(lambda: peer_fallback.tx_getdata_count >= 1, timeout=1)
+ with p2p_lock:
+ assert_equal(peer_fallback.tx_getdata_count, 1)
+
+ def test_preferred_inv(self):
+ self.log.info('Check that invs from preferred peers are downloaded immediately')
+ self.restart_node(0, extra_args=['-whitelist=noban@127.0.0.1'])
+ peer = self.nodes[0].add_p2p_connection(TestP2PConn())
+ peer.send_message(msg_inv([CInv(t=MSG_WTX, h=0xff00ff00)]))
+ peer.wait_until(lambda: peer.tx_getdata_count >= 1, timeout=1)
+ with p2p_lock:
+ assert_equal(peer.tx_getdata_count, 1)
+
+ def test_large_inv_batch(self):
+ self.log.info('Test how large inv batches are handled with relay permission')
+ self.restart_node(0, extra_args=['-whitelist=relay@127.0.0.1'])
+ peer = self.nodes[0].add_p2p_connection(TestP2PConn())
+ peer.send_message(msg_inv([CInv(t=MSG_WTX, h=wtxid) for wtxid in range(MAX_PEER_TX_ANNOUNCEMENTS + 1)]))
+ peer.wait_until(lambda: peer.tx_getdata_count == MAX_PEER_TX_ANNOUNCEMENTS + 1)
+
+ self.log.info('Test how large inv batches are handled without relay permission')
+ self.restart_node(0)
+ peer = self.nodes[0].add_p2p_connection(TestP2PConn())
+ peer.send_message(msg_inv([CInv(t=MSG_WTX, h=wtxid) for wtxid in range(MAX_PEER_TX_ANNOUNCEMENTS + 1)]))
+ peer.wait_until(lambda: peer.tx_getdata_count == MAX_PEER_TX_ANNOUNCEMENTS)
+ peer.sync_with_ping()
+ with p2p_lock:
+ assert_equal(peer.tx_getdata_count, MAX_PEER_TX_ANNOUNCEMENTS)
def test_spurious_notfound(self):
self.log.info('Check that spurious notfound is ignored')
self.nodes[0].p2ps[0].send_message(msg_notfound(vec=[CInv(MSG_TX, 1)]))
def run_test(self):
+ # Run tests without mocktime that only need one peer-connection first, to avoid restarting the nodes
+ self.test_expiry_fallback()
+ self.test_disconnect_fallback()
+ self.test_notfound_fallback()
+ self.test_preferred_inv()
+ self.test_large_inv_batch()
+ self.test_spurious_notfound()
+
# Run each test against new bitcoind instances, as setting mocktimes has long-term effects on when
# the next trickle relay event happens.
- for test in [self.test_spurious_notfound, self.test_in_flight_max, self.test_inv_block, self.test_tx_requests]:
+ for test in [self.test_in_flight_max, self.test_inv_block, self.test_tx_requests]:
self.stop_nodes()
self.start_nodes()
self.connect_nodes(1, 0)