aboutsummaryrefslogtreecommitdiff
path: root/src/net_processing.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/net_processing.cpp')
-rw-r--r--src/net_processing.cpp232
1 files changed, 136 insertions, 96 deletions
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
index ce3b037f60..92d0c48c2c 100644
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -235,6 +235,36 @@ struct Peer {
/** Whether this peer relays txs via wtxid */
std::atomic<bool> m_wtxid_relay{false};
+ struct TxRelay {
+ mutable RecursiveMutex cs_filter;
+ // We use fRelayTxes for two purposes -
+ // a) it allows us to not relay tx invs before receiving the peer's version message
+ // b) the peer may tell us in its version message that we should not relay tx invs
+ // unless it loads a bloom filter.
+ bool fRelayTxes GUARDED_BY(cs_filter){false};
+ std::unique_ptr<CBloomFilter> pfilter PT_GUARDED_BY(cs_filter) GUARDED_BY(cs_filter){nullptr};
+
+ mutable RecursiveMutex cs_tx_inventory;
+ CRollingBloomFilter filterInventoryKnown GUARDED_BY(cs_tx_inventory){50000, 0.000001};
+ // Set of transaction ids we still have to announce.
+ // They are sorted by the mempool before relay, so the order is not important.
+ std::set<uint256> setInventoryTxToSend;
+ // Used for BIP35 mempool sending
+ bool fSendMempool GUARDED_BY(cs_tx_inventory){false};
+ // Last time a "MEMPOOL" request was serviced.
+ std::atomic<std::chrono::seconds> m_last_mempool_req{0s};
+ std::chrono::microseconds nNextInvSend{0};
+
+ /** Minimum fee rate with which to filter inv's to this node */
+ std::atomic<CAmount> minFeeFilter{0};
+ CAmount lastSentFeeFilter{0};
+ std::chrono::microseconds m_next_send_feefilter{0};
+ };
+
+ /** Transaction relay data. Will be a nullptr if we're not relaying
+ * transactions with this peer (e.g. if it's a block-relay-only peer) */
+ std::unique_ptr<TxRelay> m_tx_relay;
+
/** A vector of addresses to send to the peer, limited to MAX_ADDR_TO_SEND. */
std::vector<CAddress> m_addrs_to_send;
/** Probabilistic filter to track recent addr messages relayed with this
@@ -293,8 +323,9 @@ struct Peer {
/** Work queue of items requested by this peer **/
std::deque<CInv> m_getdata_requests GUARDED_BY(m_getdata_requests_mutex);
- explicit Peer(NodeId id)
+ explicit Peer(NodeId id, bool tx_relay)
: m_id(id)
+ , m_tx_relay(tx_relay ? std::make_unique<TxRelay>() : nullptr)
{}
};
@@ -394,7 +425,7 @@ private:
EXCLUSIVE_LOCKS_REQUIRED(::cs_main);
/** Send a version message to a peer */
- void PushNodeVersion(CNode& pnode);
+ void PushNodeVersion(CNode& pnode, Peer& peer);
/** Send a ping message every PING_INTERVAL or if requested via RPC. May
* mark the peer to be disconnected if a ping has timed out.
@@ -415,7 +446,7 @@ private:
void RelayAddress(NodeId originator, const CAddress& addr, bool fReachable);
/** Send `feefilter` message. */
- void MaybeSendFeefilter(CNode& node, std::chrono::microseconds current_time);
+ void MaybeSendFeefilter(CNode& node, Peer& peer, std::chrono::microseconds current_time);
const CChainParams& m_chainparams;
CConnman& m_connman;
@@ -823,6 +854,14 @@ static void PushAddress(Peer& peer, const CAddress& addr, FastRandomContext& ins
}
}
+static void AddKnownTx(Peer& peer, const uint256& hash)
+{
+ if (peer.m_tx_relay != nullptr) {
+ LOCK(peer.m_tx_relay->cs_tx_inventory);
+ peer.m_tx_relay->filterInventoryKnown.insert(hash);
+ }
+}
+
static void UpdatePreferredDownload(const CNode& node, CNodeState* state) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
{
nPreferredDownload -= state->fPreferredDownload;
@@ -1118,7 +1157,7 @@ void PeerManagerImpl::FindNextBlocksToDownload(NodeId nodeid, unsigned int count
} // namespace
-void PeerManagerImpl::PushNodeVersion(CNode& pnode)
+void PeerManagerImpl::PushNodeVersion(CNode& pnode, Peer& peer)
{
// Note that pnode->GetLocalServices() is a reflection of the local
// services we were offering when the CNode object was created for this
@@ -1133,7 +1172,7 @@ void PeerManagerImpl::PushNodeVersion(CNode& pnode)
CService addr_you = addr.IsRoutable() && !IsProxy(addr) && addr.IsAddrV1Compatible() ? addr : CService();
uint64_t your_services{addr.nServices};
- const bool tx_relay = !m_ignore_incoming_txs && pnode.m_tx_relay != nullptr && !pnode.IsFeelerConn();
+ const bool tx_relay = !m_ignore_incoming_txs && peer.m_tx_relay != nullptr && !pnode.IsFeelerConn();
m_connman.PushMessage(&pnode, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::VERSION, PROTOCOL_VERSION, my_services, nTime,
your_services, addr_you, // Together the pre-version-31402 serialization of CAddress "addrYou" (without nTime)
my_services, CService(), // Together the pre-version-31402 serialization of CAddress "addrMe" (without nTime)
@@ -1190,13 +1229,13 @@ void PeerManagerImpl::InitializeNode(CNode *pnode)
mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(pnode->IsInboundConn()));
assert(m_txrequest.Count(nodeid) == 0);
}
+ PeerRef peer = std::make_shared<Peer>(nodeid, /*tx_relay=*/ !pnode->IsBlockOnlyConn());
{
- PeerRef peer = std::make_shared<Peer>(nodeid);
LOCK(m_peer_mutex);
- m_peer_map.emplace_hint(m_peer_map.end(), nodeid, std::move(peer));
+ m_peer_map.emplace_hint(m_peer_map.end(), nodeid, peer);
}
if (!pnode->IsInboundConn()) {
- PushNodeVersion(*pnode);
+ PushNodeVersion(*pnode, *peer);
}
}
@@ -1326,6 +1365,14 @@ bool PeerManagerImpl::GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) c
ping_wait = GetTime<std::chrono::microseconds>() - peer->m_ping_start.load();
}
+ if (peer->m_tx_relay != nullptr) {
+ stats.fRelayTxes = WITH_LOCK(peer->m_tx_relay->cs_filter, return peer->m_tx_relay->fRelayTxes);
+ stats.minFeeFilter = peer->m_tx_relay->minFeeFilter.load();
+ } else {
+ stats.fRelayTxes = false;
+ stats.minFeeFilter = 0;
+ }
+
stats.m_ping_wait = ping_wait;
stats.m_addr_processed = peer->m_addr_processed.load();
stats.m_addr_rate_limited = peer->m_addr_rate_limited.load();
@@ -1738,23 +1785,17 @@ void PeerManagerImpl::SendPings()
void PeerManagerImpl::RelayTransaction(const uint256& txid, const uint256& wtxid)
{
- std::map<const NodeId, const uint256&> relay_peers;
- {
- // Don't hold m_peer_mutex while calling ForEachNode() to avoid an
- // m_peer_mutex/cs_vNodes lock inversion. During shutdown, FinalizeNode()
- // is called while holding cs_vNodes.
- LOCK(m_peer_mutex);
- for (auto& it : m_peer_map) {
- relay_peers.emplace(it.first, it.second->m_wtxid_relay ? wtxid : txid);
- }
- }
-
- m_connman.ForEachNode([&relay_peers](CNode* node) {
- auto it = relay_peers.find(node->GetId());
- if (it == relay_peers.end()) return; // Should never happen
+ LOCK(m_peer_mutex);
+ for(auto& it : m_peer_map) {
+ Peer& peer = *it.second;
+ if (!peer.m_tx_relay) continue;
- node->PushTxInventory(it->second);
- });
+ const uint256& hash{peer.m_wtxid_relay ? wtxid : txid};
+ LOCK(peer.m_tx_relay->cs_tx_inventory);
+ if (!peer.m_tx_relay->filterInventoryKnown.contains(hash)) {
+ peer.m_tx_relay->setInventoryTxToSend.insert(hash);
+ }
+ };
}
void PeerManagerImpl::RelayAddress(NodeId originator,
@@ -1900,11 +1941,11 @@ void PeerManagerImpl::ProcessGetBlockData(CNode& pfrom, Peer& peer, const CInv&
} else if (inv.IsMsgFilteredBlk()) {
bool sendMerkleBlock = false;
CMerkleBlock merkleBlock;
- if (pfrom.m_tx_relay != nullptr) {
- LOCK(pfrom.m_tx_relay->cs_filter);
- if (pfrom.m_tx_relay->pfilter) {
+ if (peer.m_tx_relay != nullptr) {
+ LOCK(peer.m_tx_relay->cs_filter);
+ if (peer.m_tx_relay->pfilter) {
sendMerkleBlock = true;
- merkleBlock = CMerkleBlock(*pblock, *pfrom.m_tx_relay->pfilter);
+ merkleBlock = CMerkleBlock(*pblock, *peer.m_tx_relay->pfilter);
}
}
if (sendMerkleBlock) {
@@ -1993,7 +2034,7 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic
const auto now{GetTime<std::chrono::seconds>()};
// Get last mempool request time
- const auto mempool_req = pfrom.m_tx_relay != nullptr ? pfrom.m_tx_relay->m_last_mempool_req.load() : std::chrono::seconds::min();
+ const auto mempool_req = peer.m_tx_relay != nullptr ? peer.m_tx_relay->m_last_mempool_req.load() : std::chrono::seconds::min();
// Process as many TX items from the front of the getdata queue as
// possible, since they're common and it's efficient to batch process
@@ -2006,7 +2047,7 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic
const CInv &inv = *it++;
- if (pfrom.m_tx_relay == nullptr) {
+ if (peer.m_tx_relay == nullptr) {
// Ignore GETDATA requests for transactions from blocks-only peers.
continue;
}
@@ -2034,7 +2075,7 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic
}
for (const uint256& parent_txid : parent_ids_to_add) {
// Relaying a transaction with a recent but unconfirmed parent.
- if (WITH_LOCK(pfrom.m_tx_relay->cs_tx_inventory, return !pfrom.m_tx_relay->filterInventoryKnown.contains(parent_txid))) {
+ if (WITH_LOCK(peer.m_tx_relay->cs_tx_inventory, return !peer.m_tx_relay->filterInventoryKnown.contains(parent_txid))) {
LOCK(cs_main);
State(pfrom.GetId())->m_recently_announced_invs.insert(parent_txid);
}
@@ -2630,7 +2671,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
// Inbound peers send us their version message when they connect.
// We send our version message in response.
if (pfrom.IsInboundConn()) {
- PushNodeVersion(pfrom);
+ PushNodeVersion(pfrom, *peer);
}
// Change version
@@ -2669,9 +2710,9 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
// set nodes not capable of serving the complete blockchain history as "limited nodes"
pfrom.m_limited_node = (!(nServices & NODE_NETWORK) && (nServices & NODE_NETWORK_LIMITED));
- if (pfrom.m_tx_relay != nullptr) {
- LOCK(pfrom.m_tx_relay->cs_filter);
- pfrom.m_tx_relay->fRelayTxes = fRelay; // set to true after we get the first filter* message
+ if (peer->m_tx_relay != nullptr) {
+ LOCK(peer->m_tx_relay->cs_filter);
+ peer->m_tx_relay->fRelayTxes = fRelay; // set to true after we get the first filter* message
if (fRelay) pfrom.m_relays_txs = true;
}
@@ -2998,7 +3039,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
// Reject tx INVs when the -blocksonly setting is enabled, or this is a
// block-relay-only peer
- bool reject_tx_invs{m_ignore_incoming_txs || (pfrom.m_tx_relay == nullptr)};
+ bool reject_tx_invs{m_ignore_incoming_txs || (peer->m_tx_relay == nullptr)};
// Allow peers with relay permission to send data other than blocks in blocks only mode
if (pfrom.HasPermission(NetPermissionFlags::Relay)) {
@@ -3045,7 +3086,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
const bool fAlreadyHave = AlreadyHaveTx(gtxid);
LogPrint(BCLog::NET, "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom.GetId());
- pfrom.AddKnownTx(inv.hash);
+ AddKnownTx(*peer, inv.hash);
if (!fAlreadyHave && !m_chainman.ActiveChainstate().IsInitialBlockDownload()) {
AddTxAnnouncement(pfrom, gtxid, current_time);
}
@@ -3275,8 +3316,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
// Stop processing the transaction early if
// 1) We are in blocks only mode and peer has no relay permission
// 2) This peer is a block-relay-only peer
- if ((m_ignore_incoming_txs && !pfrom.HasPermission(NetPermissionFlags::Relay)) || (pfrom.m_tx_relay == nullptr))
- {
+ if ((m_ignore_incoming_txs && !pfrom.HasPermission(NetPermissionFlags::Relay)) || (peer->m_tx_relay == nullptr)) {
LogPrint(BCLog::NET, "transaction sent in violation of protocol peer=%d\n", pfrom.GetId());
pfrom.fDisconnect = true;
return;
@@ -3295,14 +3335,14 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
const uint256& wtxid = ptx->GetWitnessHash();
const uint256& hash = peer->m_wtxid_relay ? wtxid : txid;
- pfrom.AddKnownTx(hash);
+ AddKnownTx(*peer, hash);
if (peer->m_wtxid_relay && txid != wtxid) {
// Insert txid into filterInventoryKnown, even for
// wtxidrelay peers. This prevents re-adding of
// unconfirmed parents to the recently_announced
// filter, when a child tx is requested. See
// ProcessGetData().
- pfrom.AddKnownTx(txid);
+ AddKnownTx(*peer, txid);
}
LOCK2(cs_main, g_cs_orphans);
@@ -3392,7 +3432,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
// Eventually we should replace this with an improved
// protocol for getting all unconfirmed parents.
const auto gtxid{GenTxid::Txid(parent_txid)};
- pfrom.AddKnownTx(parent_txid);
+ AddKnownTx(*peer, parent_txid);
if (!AlreadyHaveTx(gtxid)) AddTxAnnouncement(pfrom, gtxid, current_time);
}
@@ -3888,9 +3928,9 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
return;
}
- if (pfrom.m_tx_relay != nullptr) {
- LOCK(pfrom.m_tx_relay->cs_tx_inventory);
- pfrom.m_tx_relay->fSendMempool = true;
+ if (peer->m_tx_relay != nullptr) {
+ LOCK(peer->m_tx_relay->cs_tx_inventory);
+ peer->m_tx_relay->fSendMempool = true;
}
return;
}
@@ -3984,12 +4024,12 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
// There is no excuse for sending a too-large filter
Misbehaving(pfrom.GetId(), 100, "too-large bloom filter");
}
- else if (pfrom.m_tx_relay != nullptr)
+ else if (peer->m_tx_relay != nullptr)
{
- LOCK(pfrom.m_tx_relay->cs_filter);
- pfrom.m_tx_relay->pfilter.reset(new CBloomFilter(filter));
+ LOCK(peer->m_tx_relay->cs_filter);
+ peer->m_tx_relay->pfilter.reset(new CBloomFilter(filter));
pfrom.m_bloom_filter_loaded = true;
- pfrom.m_tx_relay->fRelayTxes = true;
+ peer->m_tx_relay->fRelayTxes = true;
pfrom.m_relays_txs = true;
}
return;
@@ -4009,10 +4049,10 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
bool bad = false;
if (vData.size() > MAX_SCRIPT_ELEMENT_SIZE) {
bad = true;
- } else if (pfrom.m_tx_relay != nullptr) {
- LOCK(pfrom.m_tx_relay->cs_filter);
- if (pfrom.m_tx_relay->pfilter) {
- pfrom.m_tx_relay->pfilter->insert(vData);
+ } else if (peer->m_tx_relay != nullptr) {
+ LOCK(peer->m_tx_relay->cs_filter);
+ if (peer->m_tx_relay->pfilter) {
+ peer->m_tx_relay->pfilter->insert(vData);
} else {
bad = true;
}
@@ -4029,13 +4069,13 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
pfrom.fDisconnect = true;
return;
}
- if (pfrom.m_tx_relay == nullptr) {
+ if (peer->m_tx_relay == nullptr) {
return;
}
- LOCK(pfrom.m_tx_relay->cs_filter);
- pfrom.m_tx_relay->pfilter = nullptr;
+ LOCK(peer->m_tx_relay->cs_filter);
+ peer->m_tx_relay->pfilter = nullptr;
pfrom.m_bloom_filter_loaded = false;
- pfrom.m_tx_relay->fRelayTxes = true;
+ peer->m_tx_relay->fRelayTxes = true;
pfrom.m_relays_txs = true;
return;
}
@@ -4044,8 +4084,8 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
CAmount newFeeFilter = 0;
vRecv >> newFeeFilter;
if (MoneyRange(newFeeFilter)) {
- if (pfrom.m_tx_relay != nullptr) {
- pfrom.m_tx_relay->minFeeFilter = newFeeFilter;
+ if (peer->m_tx_relay != nullptr) {
+ peer->m_tx_relay->minFeeFilter = newFeeFilter;
}
LogPrint(BCLog::NET, "received: feefilter of %s from peer=%d\n", CFeeRate(newFeeFilter).ToString(), pfrom.GetId());
}
@@ -4503,10 +4543,10 @@ void PeerManagerImpl::MaybeSendAddr(CNode& node, Peer& peer, std::chrono::micros
}
}
-void PeerManagerImpl::MaybeSendFeefilter(CNode& pto, std::chrono::microseconds current_time)
+void PeerManagerImpl::MaybeSendFeefilter(CNode& pto, Peer& peer, std::chrono::microseconds current_time)
{
if (m_ignore_incoming_txs) return;
- if (!pto.m_tx_relay) return;
+ if (!peer.m_tx_relay) return;
if (pto.GetCommonVersion() < FEEFILTER_VERSION) return;
// peers with the forcerelay permission should not filter txs to us
if (pto.HasPermission(NetPermissionFlags::ForceRelay)) return;
@@ -4520,27 +4560,27 @@ void PeerManagerImpl::MaybeSendFeefilter(CNode& pto, std::chrono::microseconds c
currentFilter = MAX_MONEY;
} else {
static const CAmount MAX_FILTER{g_filter_rounder.round(MAX_MONEY)};
- if (pto.m_tx_relay->lastSentFeeFilter == MAX_FILTER) {
+ if (peer.m_tx_relay->lastSentFeeFilter == MAX_FILTER) {
// Send the current filter if we sent MAX_FILTER previously
// and made it out of IBD.
- pto.m_tx_relay->m_next_send_feefilter = 0us;
+ peer.m_tx_relay->m_next_send_feefilter = 0us;
}
}
- if (current_time > pto.m_tx_relay->m_next_send_feefilter) {
+ if (current_time > peer.m_tx_relay->m_next_send_feefilter) {
CAmount filterToSend = g_filter_rounder.round(currentFilter);
// We always have a fee filter of at least minRelayTxFee
filterToSend = std::max(filterToSend, ::minRelayTxFee.GetFeePerK());
- if (filterToSend != pto.m_tx_relay->lastSentFeeFilter) {
+ if (filterToSend != peer.m_tx_relay->lastSentFeeFilter) {
m_connman.PushMessage(&pto, CNetMsgMaker(pto.GetCommonVersion()).Make(NetMsgType::FEEFILTER, filterToSend));
- pto.m_tx_relay->lastSentFeeFilter = filterToSend;
+ peer.m_tx_relay->lastSentFeeFilter = filterToSend;
}
- pto.m_tx_relay->m_next_send_feefilter = GetExponentialRand(current_time, AVG_FEEFILTER_BROADCAST_INTERVAL);
+ peer.m_tx_relay->m_next_send_feefilter = GetExponentialRand(current_time, AVG_FEEFILTER_BROADCAST_INTERVAL);
}
// If the fee filter has changed substantially and it's still more than MAX_FEEFILTER_CHANGE_DELAY
// until scheduled broadcast, then move the broadcast to within MAX_FEEFILTER_CHANGE_DELAY.
- else if (current_time + MAX_FEEFILTER_CHANGE_DELAY < pto.m_tx_relay->m_next_send_feefilter &&
- (currentFilter < 3 * pto.m_tx_relay->lastSentFeeFilter / 4 || currentFilter > 4 * pto.m_tx_relay->lastSentFeeFilter / 3)) {
- pto.m_tx_relay->m_next_send_feefilter = current_time + GetRandomDuration<std::chrono::microseconds>(MAX_FEEFILTER_CHANGE_DELAY);
+ else if (current_time + MAX_FEEFILTER_CHANGE_DELAY < peer.m_tx_relay->m_next_send_feefilter &&
+ (currentFilter < 3 * peer.m_tx_relay->lastSentFeeFilter / 4 || currentFilter > 4 * peer.m_tx_relay->lastSentFeeFilter / 3)) {
+ peer.m_tx_relay->m_next_send_feefilter = current_time + GetRandomDuration<std::chrono::microseconds>(MAX_FEEFILTER_CHANGE_DELAY);
}
}
@@ -4807,45 +4847,45 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
peer->m_blocks_for_inv_relay.clear();
}
- if (pto->m_tx_relay != nullptr) {
- LOCK(pto->m_tx_relay->cs_tx_inventory);
+ if (peer->m_tx_relay != nullptr) {
+ LOCK(peer->m_tx_relay->cs_tx_inventory);
// Check whether periodic sends should happen
bool fSendTrickle = pto->HasPermission(NetPermissionFlags::NoBan);
- if (pto->m_tx_relay->nNextInvSend < current_time) {
+ if (peer->m_tx_relay->nNextInvSend < current_time) {
fSendTrickle = true;
if (pto->IsInboundConn()) {
- pto->m_tx_relay->nNextInvSend = NextInvToInbounds(current_time, INBOUND_INVENTORY_BROADCAST_INTERVAL);
+ peer->m_tx_relay->nNextInvSend = NextInvToInbounds(current_time, INBOUND_INVENTORY_BROADCAST_INTERVAL);
} else {
- pto->m_tx_relay->nNextInvSend = GetExponentialRand(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL);
+ peer->m_tx_relay->nNextInvSend = GetExponentialRand(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL);
}
}
// Time to send but the peer has requested we not relay transactions.
if (fSendTrickle) {
- LOCK(pto->m_tx_relay->cs_filter);
- if (!pto->m_tx_relay->fRelayTxes) pto->m_tx_relay->setInventoryTxToSend.clear();
+ LOCK(peer->m_tx_relay->cs_filter);
+ if (!peer->m_tx_relay->fRelayTxes) peer->m_tx_relay->setInventoryTxToSend.clear();
}
// Respond to BIP35 mempool requests
- if (fSendTrickle && pto->m_tx_relay->fSendMempool) {
+ if (fSendTrickle && peer->m_tx_relay->fSendMempool) {
auto vtxinfo = m_mempool.infoAll();
- pto->m_tx_relay->fSendMempool = false;
- const CFeeRate filterrate{pto->m_tx_relay->minFeeFilter.load()};
+ peer->m_tx_relay->fSendMempool = false;
+ const CFeeRate filterrate{peer->m_tx_relay->minFeeFilter.load()};
- LOCK(pto->m_tx_relay->cs_filter);
+ LOCK(peer->m_tx_relay->cs_filter);
for (const auto& txinfo : vtxinfo) {
const uint256& hash = peer->m_wtxid_relay ? txinfo.tx->GetWitnessHash() : txinfo.tx->GetHash();
CInv inv(peer->m_wtxid_relay ? MSG_WTX : MSG_TX, hash);
- pto->m_tx_relay->setInventoryTxToSend.erase(hash);
+ peer->m_tx_relay->setInventoryTxToSend.erase(hash);
// Don't send transactions that peers will not put into their mempool
if (txinfo.fee < filterrate.GetFee(txinfo.vsize)) {
continue;
}
- if (pto->m_tx_relay->pfilter) {
- if (!pto->m_tx_relay->pfilter->IsRelevantAndUpdate(*txinfo.tx)) continue;
+ if (peer->m_tx_relay->pfilter) {
+ if (!peer->m_tx_relay->pfilter->IsRelevantAndUpdate(*txinfo.tx)) continue;
}
- pto->m_tx_relay->filterInventoryKnown.insert(hash);
+ peer->m_tx_relay->filterInventoryKnown.insert(hash);
// Responses to MEMPOOL requests bypass the m_recently_announced_invs filter.
vInv.push_back(inv);
if (vInv.size() == MAX_INV_SZ) {
@@ -4853,18 +4893,18 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
vInv.clear();
}
}
- pto->m_tx_relay->m_last_mempool_req = std::chrono::duration_cast<std::chrono::seconds>(current_time);
+ peer->m_tx_relay->m_last_mempool_req = std::chrono::duration_cast<std::chrono::seconds>(current_time);
}
// Determine transactions to relay
if (fSendTrickle) {
// Produce a vector with all candidates for sending
std::vector<std::set<uint256>::iterator> vInvTx;
- vInvTx.reserve(pto->m_tx_relay->setInventoryTxToSend.size());
- for (std::set<uint256>::iterator it = pto->m_tx_relay->setInventoryTxToSend.begin(); it != pto->m_tx_relay->setInventoryTxToSend.end(); it++) {
+ vInvTx.reserve(peer->m_tx_relay->setInventoryTxToSend.size());
+ for (std::set<uint256>::iterator it = peer->m_tx_relay->setInventoryTxToSend.begin(); it != peer->m_tx_relay->setInventoryTxToSend.end(); it++) {
vInvTx.push_back(it);
}
- const CFeeRate filterrate{pto->m_tx_relay->minFeeFilter.load()};
+ const CFeeRate filterrate{peer->m_tx_relay->minFeeFilter.load()};
// Topologically and fee-rate sort the inventory we send for privacy and priority reasons.
// A heap is used so that not all items need sorting if only a few are being sent.
CompareInvMempoolOrder compareInvMempoolOrder(&m_mempool, peer->m_wtxid_relay);
@@ -4872,7 +4912,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
// No reason to drain out at many times the network's capacity,
// especially since we have many peers and some will draw much shorter delays.
unsigned int nRelayedTransactions = 0;
- LOCK(pto->m_tx_relay->cs_filter);
+ LOCK(peer->m_tx_relay->cs_filter);
while (!vInvTx.empty() && nRelayedTransactions < INVENTORY_BROADCAST_MAX) {
// Fetch the top element from the heap
std::pop_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder);
@@ -4881,9 +4921,9 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
uint256 hash = *it;
CInv inv(peer->m_wtxid_relay ? MSG_WTX : MSG_TX, hash);
// Remove it from the to-be-sent set
- pto->m_tx_relay->setInventoryTxToSend.erase(it);
+ peer->m_tx_relay->setInventoryTxToSend.erase(it);
// Check if not in the filter already
- if (pto->m_tx_relay->filterInventoryKnown.contains(hash)) {
+ if (peer->m_tx_relay->filterInventoryKnown.contains(hash)) {
continue;
}
// Not in the mempool anymore? don't bother sending it.
@@ -4897,7 +4937,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
if (txinfo.fee < filterrate.GetFee(txinfo.vsize)) {
continue;
}
- if (pto->m_tx_relay->pfilter && !pto->m_tx_relay->pfilter->IsRelevantAndUpdate(*txinfo.tx)) continue;
+ if (peer->m_tx_relay->pfilter && !peer->m_tx_relay->pfilter->IsRelevantAndUpdate(*txinfo.tx)) continue;
// Send
State(pto->GetId())->m_recently_announced_invs.insert(hash);
vInv.push_back(inv);
@@ -4924,14 +4964,14 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
vInv.clear();
}
- pto->m_tx_relay->filterInventoryKnown.insert(hash);
+ peer->m_tx_relay->filterInventoryKnown.insert(hash);
if (hash != txid) {
// Insert txid into filterInventoryKnown, even for
// wtxidrelay peers. This prevents re-adding of
// unconfirmed parents to the recently_announced
// filter, when a child tx is requested. See
// ProcessGetData().
- pto->m_tx_relay->filterInventoryKnown.insert(txid);
+ peer->m_tx_relay->filterInventoryKnown.insert(txid);
}
}
}
@@ -5052,6 +5092,6 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
if (!vGetData.empty())
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));
} // release cs_main
- MaybeSendFeefilter(*pto, current_time);
+ MaybeSendFeefilter(*pto, *peer, current_time);
return true;
}