aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Newbery <john@johnnewbery.com>2021-01-27 22:34:43 +0000
committerJohn Newbery <john@johnnewbery.com>2022-03-18 11:21:48 +0000
commit785f55f7eeab0dedbeb8e0d0b459f3bdc538b621 (patch)
treeb2854cfbc864e0af7ded5da8663cdc866268af18
parent36346703f8558d6781c079c29ddece5a97477beb (diff)
downloadbitcoin-785f55f7eeab0dedbeb8e0d0b459f3bdc538b621.tar.xz
[net processing] Move m_wtxid_relay to Peer
Also, remove cs_main guard from m_wtxid_relay_peers and make it atomic. This should be fine since we don't need m_wtxid_relay_peers to be synchronized with m_wtxid_relay exactly at all times. After this change, RelayTransaction no longer requires cs_main.
-rw-r--r--src/net_processing.cpp76
1 files changed, 35 insertions, 41 deletions
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
index 8008c209bc..ce3b037f60 100644
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -232,6 +232,9 @@ struct Peer {
/** Whether a ping has been requested by the user */
std::atomic<bool> m_ping_queued{false};
+ /** Whether this peer relays txs via wtxid */
+ std::atomic<bool> m_wtxid_relay{false};
+
/** A vector of addresses to send to the peer, limited to MAX_ADDR_TO_SEND. */
std::vector<CAddress> m_addrs_to_send;
/** Probabilistic filter to track recent addr messages relayed with this
@@ -331,9 +334,6 @@ public:
const std::chrono::microseconds time_received, const std::atomic<bool>& interruptMsgProc) override;
private:
- void _RelayTransaction(const uint256& txid, const uint256& wtxid)
- EXCLUSIVE_LOCKS_REQUIRED(cs_main);
-
/** Consider evicting an outbound peer based on the amount of time they've been behind our tip */
void ConsiderEviction(CNode& pto, std::chrono::seconds time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
@@ -464,7 +464,7 @@ private:
std::map<uint256, std::pair<NodeId, bool>> mapBlockSource GUARDED_BY(cs_main);
/** Number of peers with wtxid relay. */
- int m_wtxid_relay_peers GUARDED_BY(cs_main) = 0;
+ std::atomic<int> m_wtxid_relay_peers{0};
/** Number of outbound peers with m_chain_sync.m_protect. */
int m_outbound_peers_with_protect_from_disconnect GUARDED_BY(cs_main) = 0;
@@ -779,9 +779,6 @@ struct CNodeState {
//! A rolling bloom filter of all announced tx CInvs to this peer.
CRollingBloomFilter m_recently_announced_invs = CRollingBloomFilter{INVENTORY_MAX_RECENT_RELAY, 0.000001};
- //! Whether this peer relays txs via wtxid
- bool m_wtxid_relay{false};
-
CNodeState(bool is_inbound) : m_is_inbound(is_inbound) {}
};
@@ -1211,8 +1208,7 @@ void PeerManagerImpl::ReattemptInitialBroadcast(CScheduler& scheduler)
CTransactionRef tx = m_mempool.get(txid);
if (tx != nullptr) {
- LOCK(cs_main);
- _RelayTransaction(txid, tx->GetWitnessHash());
+ RelayTransaction(txid, tx->GetWitnessHash());
} else {
m_mempool.RemoveUnbroadcastTx(txid, true);
}
@@ -1239,6 +1235,8 @@ void PeerManagerImpl::FinalizeNode(const CNode& node)
PeerRef peer = RemovePeer(nodeid);
assert(peer != nullptr);
misbehavior = WITH_LOCK(peer->m_misbehavior_mutex, return peer->m_misbehavior_score);
+ m_wtxid_relay_peers -= peer->m_wtxid_relay;
+ assert(m_wtxid_relay_peers >= 0);
}
CNodeState *state = State(nodeid);
assert(state != nullptr);
@@ -1256,8 +1254,6 @@ void PeerManagerImpl::FinalizeNode(const CNode& node)
assert(m_peers_downloading_from >= 0);
m_outbound_peers_with_protect_from_disconnect -= state->m_chain_sync.m_protect;
assert(m_outbound_peers_with_protect_from_disconnect >= 0);
- m_wtxid_relay_peers -= state->m_wtxid_relay;
- assert(m_wtxid_relay_peers >= 0);
mapNodeState.erase(nodeid);
@@ -1742,21 +1738,22 @@ void PeerManagerImpl::SendPings()
void PeerManagerImpl::RelayTransaction(const uint256& txid, const uint256& wtxid)
{
- WITH_LOCK(cs_main, _RelayTransaction(txid, wtxid););
-}
+ std::map<const NodeId, const uint256&> relay_peers;
+ {
+ // Don't hold m_peer_mutex while calling ForEachNode() to avoid an
+ // m_peer_mutex/cs_vNodes lock inversion. During shutdown, FinalizeNode()
+ // is called while holding cs_vNodes.
+ LOCK(m_peer_mutex);
+ for (auto& it : m_peer_map) {
+ relay_peers.emplace(it.first, it.second->m_wtxid_relay ? wtxid : txid);
+ }
+ }
-void PeerManagerImpl::_RelayTransaction(const uint256& txid, const uint256& wtxid)
-{
- m_connman.ForEachNode([&txid, &wtxid](CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(::cs_main) {
- AssertLockHeld(::cs_main);
+ m_connman.ForEachNode([&relay_peers](CNode* node) {
+ auto it = relay_peers.find(node->GetId());
+ if (it == relay_peers.end()) return; // Should never happen
- CNodeState* state = State(pnode->GetId());
- if (state == nullptr) return;
- if (state->m_wtxid_relay) {
- pnode->PushTxInventory(wtxid);
- } else {
- pnode->PushTxInventory(txid);
- }
+ node->PushTxInventory(it->second);
});
}
@@ -2317,7 +2314,7 @@ void PeerManagerImpl::ProcessOrphanTx(std::set<uint256>& orphan_work_set)
if (result.m_result_type == MempoolAcceptResult::ResultType::VALID) {
LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString());
- _RelayTransaction(orphanHash, porphanTx->GetWitnessHash());
+ RelayTransaction(orphanHash, porphanTx->GetWitnessHash());
m_orphanage.AddChildrenToWorkSet(*porphanTx, orphan_work_set);
m_orphanage.EraseTx(orphanHash);
for (const CTransactionRef& removedTx : result.m_replaced_transactions.value()) {
@@ -2864,9 +2861,8 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
return;
}
if (pfrom.GetCommonVersion() >= WTXID_RELAY_VERSION) {
- LOCK(cs_main);
- if (!State(pfrom.GetId())->m_wtxid_relay) {
- State(pfrom.GetId())->m_wtxid_relay = true;
+ if (!peer->m_wtxid_relay) {
+ peer->m_wtxid_relay = true;
m_wtxid_relay_peers++;
} else {
LogPrint(BCLog::NET, "ignoring duplicate wtxidrelay from peer=%d\n", pfrom.GetId());
@@ -3020,7 +3016,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
// Ignore INVs that don't match wtxidrelay setting.
// Note that orphan parent fetching always uses MSG_TX GETDATAs regardless of the wtxidrelay setting.
// This is fine as no INV messages are involved in that process.
- if (State(pfrom.GetId())->m_wtxid_relay) {
+ if (peer->m_wtxid_relay) {
if (inv.IsMsgTx()) continue;
} else {
if (inv.IsMsgWtx()) continue;
@@ -3298,13 +3294,9 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
const uint256& txid = ptx->GetHash();
const uint256& wtxid = ptx->GetWitnessHash();
- LOCK2(cs_main, g_cs_orphans);
-
- CNodeState* nodestate = State(pfrom.GetId());
-
- const uint256& hash = nodestate->m_wtxid_relay ? wtxid : txid;
+ const uint256& hash = peer->m_wtxid_relay ? wtxid : txid;
pfrom.AddKnownTx(hash);
- if (nodestate->m_wtxid_relay && txid != wtxid) {
+ if (peer->m_wtxid_relay && txid != wtxid) {
// Insert txid into filterInventoryKnown, even for
// wtxidrelay peers. This prevents re-adding of
// unconfirmed parents to the recently_announced
@@ -3313,6 +3305,8 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
pfrom.AddKnownTx(txid);
}
+ LOCK2(cs_main, g_cs_orphans);
+
m_txrequest.ReceivedResponse(pfrom.GetId(), txid);
if (tx.HasWitness()) m_txrequest.ReceivedResponse(pfrom.GetId(), wtxid);
@@ -3337,7 +3331,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
LogPrintf("Not relaying non-mempool transaction %s from forcerelay peer=%d\n", tx.GetHash().ToString(), pfrom.GetId());
} else {
LogPrintf("Force relaying tx %s from peer=%d\n", tx.GetHash().ToString(), pfrom.GetId());
- _RelayTransaction(tx.GetHash(), tx.GetWitnessHash());
+ RelayTransaction(tx.GetHash(), tx.GetWitnessHash());
}
}
return;
@@ -3351,7 +3345,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
// requests for it.
m_txrequest.ForgetTxHash(tx.GetHash());
m_txrequest.ForgetTxHash(tx.GetWitnessHash());
- _RelayTransaction(tx.GetHash(), tx.GetWitnessHash());
+ RelayTransaction(tx.GetHash(), tx.GetWitnessHash());
m_orphanage.AddChildrenToWorkSet(tx, peer->m_orphan_work_set);
pfrom.m_last_tx_time = GetTime<std::chrono::seconds>();
@@ -4841,8 +4835,8 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
LOCK(pto->m_tx_relay->cs_filter);
for (const auto& txinfo : vtxinfo) {
- const uint256& hash = state.m_wtxid_relay ? txinfo.tx->GetWitnessHash() : txinfo.tx->GetHash();
- CInv inv(state.m_wtxid_relay ? MSG_WTX : MSG_TX, hash);
+ const uint256& hash = peer->m_wtxid_relay ? txinfo.tx->GetWitnessHash() : txinfo.tx->GetHash();
+ CInv inv(peer->m_wtxid_relay ? MSG_WTX : MSG_TX, hash);
pto->m_tx_relay->setInventoryTxToSend.erase(hash);
// Don't send transactions that peers will not put into their mempool
if (txinfo.fee < filterrate.GetFee(txinfo.vsize)) {
@@ -4873,7 +4867,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
const CFeeRate filterrate{pto->m_tx_relay->minFeeFilter.load()};
// Topologically and fee-rate sort the inventory we send for privacy and priority reasons.
// A heap is used so that not all items need sorting if only a few are being sent.
- CompareInvMempoolOrder compareInvMempoolOrder(&m_mempool, state.m_wtxid_relay);
+ CompareInvMempoolOrder compareInvMempoolOrder(&m_mempool, peer->m_wtxid_relay);
std::make_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder);
// No reason to drain out at many times the network's capacity,
// especially since we have many peers and some will draw much shorter delays.
@@ -4885,7 +4879,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
std::set<uint256>::iterator it = vInvTx.back();
vInvTx.pop_back();
uint256 hash = *it;
- CInv inv(state.m_wtxid_relay ? MSG_WTX : MSG_TX, hash);
+ CInv inv(peer->m_wtxid_relay ? MSG_WTX : MSG_TX, hash);
// Remove it from the to-be-sent set
pto->m_tx_relay->setInventoryTxToSend.erase(it);
// Check if not in the filter already