aboutsummaryrefslogtreecommitdiff
path: root/src/net_processing.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/net_processing.cpp')
-rw-r--r--src/net_processing.cpp211
1 files changed, 187 insertions, 24 deletions
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
index 62b7d4e966..18dd2f010c 100644
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -64,12 +64,28 @@ static constexpr int STALE_RELAY_AGE_LIMIT = 30 * 24 * 60 * 60;
/// Age after which a block is considered historical for purposes of rate
/// limiting block relay. Set to one week, denominated in seconds.
static constexpr int HISTORICAL_BLOCK_AGE = 7 * 24 * 60 * 60;
+/** Maximum number of in-flight transactions from a peer */
+static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100;
+/** Maximum number of announced transactions from a peer */
+static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ;
+/** How many microseconds to delay requesting transactions from inbound peers */
+static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000;
+/** How long to wait (in microseconds) before downloading a transaction from an additional peer */
+static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000;
+/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */
+static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000;
+static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY,
+"To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY");
+/** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */
+static const unsigned int MAX_GETDATA_SZ = 1000;
+
struct COrphanTx {
// When modifying, adapt the copy of this definition in tests/DoS_tests.
CTransactionRef tx;
NodeId fromPeer;
int64_t nTimeExpire;
+ size_t list_pos;
};
CCriticalSection g_cs_orphans;
std::map<uint256, COrphanTx> mapOrphanTransactions GUARDED_BY(g_cs_orphans);
@@ -171,6 +187,8 @@ namespace {
};
std::map<COutPoint, std::set<std::map<uint256, COrphanTx>::iterator, IteratorComparator>> mapOrphanTransactionsByPrev GUARDED_BY(g_cs_orphans);
+ std::vector<std::map<uint256, COrphanTx>::iterator> g_orphan_list GUARDED_BY(g_cs_orphans); //! For random eviction
+
static size_t vExtraTxnForCompactIt GUARDED_BY(g_cs_orphans) = 0;
static std::vector<std::pair<uint256, CTransactionRef>> vExtraTxnForCompact GUARDED_BY(g_cs_orphans);
} // namespace
@@ -274,6 +292,66 @@ struct CNodeState {
//! Time of last new block announcement
int64_t m_last_block_announcement;
+ /*
+ * State associated with transaction download.
+ *
+ * Tx download algorithm:
+ *
+ * When inv comes in, queue up (process_time, txid) inside the peer's
+ * CNodeState (m_tx_process_time) as long as m_tx_announced for the peer
+ * isn't too big (MAX_PEER_TX_ANNOUNCEMENTS).
+ *
+ * The process_time for a transaction is set to nNow for outbound peers,
+ * nNow + 2 seconds for inbound peers. This is the time at which we'll
+ * consider trying to request the transaction from the peer in
+ * SendMessages(). The delay for inbound peers is to allow outbound peers
+ * a chance to announce before we request from inbound peers, to prevent
+ * an adversary from using inbound connections to blind us to a
+ * transaction (InvBlock).
+ *
+ * When we call SendMessages() for a given peer,
+ * we will loop over the transactions in m_tx_process_time, looking
+ * at the transactions whose process_time <= nNow. We'll request each
+ * such transaction that we don't have already and that hasn't been
+ * requested from another peer recently, up until we hit the
+ * MAX_PEER_TX_IN_FLIGHT limit for the peer. Then we'll update
+ * g_already_asked_for for each requested txid, storing the time of the
+ * GETDATA request. We use g_already_asked_for to coordinate transaction
+ * requests amongst our peers.
+ *
+ * For transactions that we still need but we have already recently
+ * requested from some other peer, we'll reinsert (process_time, txid)
+ * back into the peer's m_tx_process_time at the point in the future at
+ * which the most recent GETDATA request would time out (ie
+ * GETDATA_TX_INTERVAL + the request time stored in g_already_asked_for).
+ * We add an additional delay for inbound peers, again to prefer
+ * attempting download from outbound peers first.
+ * We also add an extra small random delay up to 2 seconds
+ * to avoid biasing some peers over others. (e.g., due to fixed ordering
+ * of peer processing in ThreadMessageHandler).
+ *
+ * When we receive a transaction from a peer, we remove the txid from the
+ * peer's m_tx_in_flight set and from their recently announced set
+ * (m_tx_announced). We also clear g_already_asked_for for that entry, so
+ * that if somehow the transaction is not accepted but also not added to
+ * the reject filter, then we will eventually redownload from other
+ * peers.
+ */
+ struct TxDownloadState {
+ /* Track when to attempt download of announced transactions (process
+ * time in micros -> txid)
+ */
+ std::multimap<int64_t, uint256> m_tx_process_time;
+
+ //! Store all the transactions a peer has recently announced
+ std::set<uint256> m_tx_announced;
+
+ //! Store transactions which were requested by us
+ std::set<uint256> m_tx_in_flight;
+ };
+
+ TxDownloadState m_tx_download;
+
CNodeState(CAddress addrIn, std::string addrNameIn) : address(addrIn), name(addrNameIn) {
fCurrentlyConnected = false;
nMisbehavior = 0;
@@ -301,6 +379,9 @@ struct CNodeState {
}
};
+// Keeps track of the time (in microseconds) when transactions were requested last time
+limitedmap<uint256, int64_t> g_already_asked_for GUARDED_BY(cs_main)(MAX_INV_SZ);
+
/** Map maintaining per-node state. */
static std::map<NodeId, CNodeState> mapNodeState GUARDED_BY(cs_main);
@@ -591,6 +672,58 @@ static void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vec
}
}
+void EraseTxRequest(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
+{
+ g_already_asked_for.erase(txid);
+}
+
+int64_t GetTxRequestTime(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
+{
+ auto it = g_already_asked_for.find(txid);
+ if (it != g_already_asked_for.end()) {
+ return it->second;
+ }
+ return 0;
+}
+
+void UpdateTxRequestTime(const uint256& txid, int64_t request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
+{
+ auto it = g_already_asked_for.find(txid);
+ if (it == g_already_asked_for.end()) {
+ g_already_asked_for.insert(std::make_pair(txid, request_time));
+ } else {
+ g_already_asked_for.update(it, request_time);
+ }
+}
+
+
+void RequestTx(CNodeState* state, const uint256& txid, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
+{
+ CNodeState::TxDownloadState& peer_download_state = state->m_tx_download;
+ if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS || peer_download_state.m_tx_announced.count(txid)) {
+ // Too many queued announcements from this peer, or we already have
+ // this announcement
+ return;
+ }
+ peer_download_state.m_tx_announced.insert(txid);
+
+ int64_t process_time;
+ int64_t last_request_time = GetTxRequestTime(txid);
+ // First time requesting this tx
+ if (last_request_time == 0) {
+ process_time = nNow;
+ } else {
+ // Randomize the delay to avoid biasing some peers over others (such as due to
+ // fixed ordering of peer processing in ThreadMessageHandler)
+ process_time = last_request_time + GETDATA_TX_INTERVAL + GetRand(MAX_GETDATA_RANDOM_DELAY);
+ }
+
+ // We delay processing announcements from non-preferred (eg inbound) peers
+ if (!state->fPreferredDownload) process_time += INBOUND_PEER_TX_DELAY;
+
+ peer_download_state.m_tx_process_time.emplace(process_time, txid);
+}
+
} // namespace
// This function is used for testing the stale tip eviction logic, see
@@ -707,8 +840,9 @@ bool AddOrphanTx(const CTransactionRef& tx, NodeId peer) EXCLUSIVE_LOCKS_REQUIRE
return false;
}
- auto ret = mapOrphanTransactions.emplace(hash, COrphanTx{tx, peer, GetTime() + ORPHAN_TX_EXPIRE_TIME});
+ auto ret = mapOrphanTransactions.emplace(hash, COrphanTx{tx, peer, GetTime() + ORPHAN_TX_EXPIRE_TIME, g_orphan_list.size()});
assert(ret.second);
+ g_orphan_list.push_back(ret.first);
for (const CTxIn& txin : tx->vin) {
mapOrphanTransactionsByPrev[txin.prevout].insert(ret.first);
}
@@ -734,6 +868,18 @@ int static EraseOrphanTx(uint256 hash) EXCLUSIVE_LOCKS_REQUIRED(g_cs_orphans)
if (itPrev->second.empty())
mapOrphanTransactionsByPrev.erase(itPrev);
}
+
+ size_t old_pos = it->second.list_pos;
+ assert(g_orphan_list[old_pos] == it);
+ if (old_pos + 1 != g_orphan_list.size()) {
+ // Unless we're deleting the last entry in g_orphan_list, move the last
+ // entry to the position we're deleting.
+ auto it_last = g_orphan_list.back();
+ g_orphan_list[old_pos] = it_last;
+ it_last->second.list_pos = old_pos;
+ }
+ g_orphan_list.pop_back();
+
mapOrphanTransactions.erase(it);
return 1;
}
@@ -784,11 +930,8 @@ unsigned int LimitOrphanTxSize(unsigned int nMaxOrphans)
while (mapOrphanTransactions.size() > nMaxOrphans)
{
// Evict a random orphan:
- uint256 randomhash = rng.rand256();
- std::map<uint256, COrphanTx>::iterator it = mapOrphanTransactions.lower_bound(randomhash);
- if (it == mapOrphanTransactions.end())
- it = mapOrphanTransactions.begin();
- EraseOrphanTx(it->first);
+ size_t randompos = rng.randrange(g_orphan_list.size());
+ EraseOrphanTx(g_orphan_list[randompos]->first);
++nEvicted;
}
return nEvicted;
@@ -1945,6 +2088,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
LOCK(cs_main);
uint32_t nFetchFlags = GetFetchFlags(pfrom);
+ int64_t nNow = GetTimeMicros();
for (CInv &inv : vInv)
{
@@ -1976,7 +2120,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
if (fBlocksOnly) {
LogPrint(BCLog::NET, "transaction (%s) inv sent in violation of protocol peer=%d\n", inv.hash.ToString(), pfrom->GetId());
} else if (!fAlreadyHave && !fImporting && !fReindex && !IsInitialBlockDownload()) {
- pfrom->AskFor(inv);
+ RequestTx(State(pfrom->GetId()), inv.hash, nNow);
}
}
}
@@ -2211,8 +2355,10 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
bool fMissingInputs = false;
CValidationState state;
- pfrom->setAskFor.erase(inv.hash);
- mapAlreadyAskedFor.erase(inv.hash);
+ CNodeState* nodestate = State(pfrom->GetId());
+ nodestate->m_tx_download.m_tx_announced.erase(inv.hash);
+ nodestate->m_tx_download.m_tx_in_flight.erase(inv.hash);
+ EraseTxRequest(inv.hash);
std::list<CTransactionRef> lRemovedTxn;
@@ -2303,10 +2449,12 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
}
if (!fRejectedParents) {
uint32_t nFetchFlags = GetFetchFlags(pfrom);
+ int64_t nNow = GetTimeMicros();
+
for (const CTxIn& txin : tx.vin) {
CInv _inv(MSG_TX | nFetchFlags, txin.prevout.hash);
pfrom->AddInventoryKnown(_inv);
- if (!AlreadyHave(_inv)) pfrom->AskFor(_inv);
+ if (!AlreadyHave(_inv)) RequestTx(State(pfrom->GetId()), _inv.hash, nNow);
}
AddOrphanTx(ptx, pfrom->GetId());
@@ -3731,24 +3879,39 @@ bool PeerLogicValidation::SendMessages(CNode* pto)
//
// Message: getdata (non-blocks)
//
- while (!pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow)
- {
- const CInv& inv = (*pto->mapAskFor.begin()).second;
- if (!AlreadyHave(inv))
- {
- LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId());
- vGetData.push_back(inv);
- if (vGetData.size() >= 1000)
- {
- connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));
- vGetData.clear();
+ auto& tx_process_time = state.m_tx_download.m_tx_process_time;
+ while (!tx_process_time.empty() && tx_process_time.begin()->first <= nNow && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) {
+ const uint256& txid = tx_process_time.begin()->second;
+ CInv inv(MSG_TX | GetFetchFlags(pto), txid);
+ if (!AlreadyHave(inv)) {
+ // If this transaction was last requested more than 1 minute ago,
+ // then request.
+ int64_t last_request_time = GetTxRequestTime(inv.hash);
+ if (last_request_time <= nNow - GETDATA_TX_INTERVAL) {
+ LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId());
+ vGetData.push_back(inv);
+ if (vGetData.size() >= MAX_GETDATA_SZ) {
+ connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));
+ vGetData.clear();
+ }
+ UpdateTxRequestTime(inv.hash, nNow);
+ state.m_tx_download.m_tx_in_flight.insert(inv.hash);
+ } else {
+ // This transaction is in flight from someone else; queue
+ // up processing to happen after the download times out
+ // (with a slight delay for inbound peers, to prefer
+ // requests to outbound peers).
+ RequestTx(&state, txid, nNow);
}
} else {
- //If we're not going to ask, don't expect a response.
- pto->setAskFor.erase(inv.hash);
+ // We have already seen this transaction, no need to download.
+ state.m_tx_download.m_tx_announced.erase(inv.hash);
+ state.m_tx_download.m_tx_in_flight.erase(inv.hash);
}
- pto->mapAskFor.erase(pto->mapAskFor.begin());
+ tx_process_time.erase(tx_process_time.begin());
}
+
+
if (!vGetData.empty())
connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));