diff options
Diffstat (limited to 'src/net_processing.cpp')
-rw-r--r-- | src/net_processing.cpp | 418 |
1 files changed, 257 insertions, 161 deletions
diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 74700580ad..4dfd77c6cf 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -16,10 +16,12 @@ #include <hash.h> #include <headerssync.h> #include <index/blockfilterindex.h> +#include <kernel/mempool_entry.h> #include <merkleblock.h> #include <netbase.h> #include <netmessagemaker.h> #include <node/blockstorage.h> +#include <node/txreconciliation.h> #include <policy/fees.h> #include <policy/policy.h> #include <policy/settings.h> @@ -264,19 +266,14 @@ struct Peer { /** The feerate in the most recent BIP133 `feefilter` message sent to the peer. * It is *not* a p2p protocol violation for the peer to send us * transactions with a lower fee rate than this. See BIP133. */ - CAmount m_fee_filter_sent{0}; + CAmount m_fee_filter_sent GUARDED_BY(NetEventsInterface::g_msgproc_mutex){0}; /** Timestamp after which we will send the next BIP133 `feefilter` message * to the peer. */ - std::chrono::microseconds m_next_send_feefilter{0}; + std::chrono::microseconds m_next_send_feefilter GUARDED_BY(NetEventsInterface::g_msgproc_mutex){0}; struct TxRelay { mutable RecursiveMutex m_bloom_filter_mutex; - /** Whether the peer wishes to receive transaction announcements. - * - * This is initially set based on the fRelay flag in the received - * `version` message. If initially set to false, it can only be flipped - * to true if we have offered the peer NODE_BLOOM services and it sends - * us a `filterload` or `filterclear` message. See BIP37. */ + /** Whether we relay transactions to this peer. */ bool m_relay_txs GUARDED_BY(m_bloom_filter_mutex){false}; /** A bloom filter for which transactions to announce to the peer. See BIP37. */ std::unique_ptr<CBloomFilter> m_bloom_filter PT_GUARDED_BY(m_bloom_filter_mutex) GUARDED_BY(m_bloom_filter_mutex){nullptr}; @@ -298,7 +295,7 @@ struct Peer { std::atomic<std::chrono::seconds> m_last_mempool_req{0s}; /** The next time after which we will send an `inv` message containing * transaction announcements to this peer. */ - std::chrono::microseconds m_next_inv_send_time{0}; + std::chrono::microseconds m_next_inv_send_time GUARDED_BY(m_tx_inventory_mutex){0}; /** Minimum fee rate with which to filter transaction announcements to this node. See BIP133. */ std::atomic<CAmount> m_fee_filter_received{0}; @@ -319,7 +316,7 @@ struct Peer { }; /** A vector of addresses to send to the peer, limited to MAX_ADDR_TO_SEND. */ - std::vector<CAddress> m_addrs_to_send; + std::vector<CAddress> m_addrs_to_send GUARDED_BY(NetEventsInterface::g_msgproc_mutex); /** Probabilistic filter to track recent addr messages relayed with this * peer. Used to avoid relaying redundant addresses to this peer. * @@ -329,7 +326,7 @@ struct Peer { * * Presence of this filter must correlate with m_addr_relay_enabled. **/ - std::unique_ptr<CRollingBloomFilter> m_addr_known; + std::unique_ptr<CRollingBloomFilter> m_addr_known GUARDED_BY(NetEventsInterface::g_msgproc_mutex); /** Whether we are participating in address relay with this connection. * * We set this bool to true for outbound peers (other than @@ -346,7 +343,7 @@ struct Peer { * initialized.*/ std::atomic_bool m_addr_relay_enabled{false}; /** Whether a getaddr request to this peer is outstanding. */ - bool m_getaddr_sent{false}; + bool m_getaddr_sent GUARDED_BY(NetEventsInterface::g_msgproc_mutex){false}; /** Guards address sending timers. */ mutable Mutex m_addr_send_times_mutex; /** Time point to send the next ADDR message to this peer. */ @@ -357,22 +354,19 @@ struct Peer { * messages, indicating a preference to receive ADDRv2 instead of ADDR ones. */ std::atomic_bool m_wants_addrv2{false}; /** Whether this peer has already sent us a getaddr message. */ - bool m_getaddr_recvd{false}; + bool m_getaddr_recvd GUARDED_BY(NetEventsInterface::g_msgproc_mutex){false}; /** Number of addresses that can be processed from this peer. Start at 1 to * permit self-announcement. */ - double m_addr_token_bucket{1.0}; + double m_addr_token_bucket GUARDED_BY(NetEventsInterface::g_msgproc_mutex){1.0}; /** When m_addr_token_bucket was last updated */ - std::chrono::microseconds m_addr_token_timestamp{GetTime<std::chrono::microseconds>()}; + std::chrono::microseconds m_addr_token_timestamp GUARDED_BY(NetEventsInterface::g_msgproc_mutex){GetTime<std::chrono::microseconds>()}; /** Total number of addresses that were dropped due to rate limiting. */ std::atomic<uint64_t> m_addr_rate_limited{0}; /** Total number of addresses that were processed (excludes rate-limited ones). */ std::atomic<uint64_t> m_addr_processed{0}; - /** Set of txids to reconsider once their parent transactions have been accepted **/ - std::set<uint256> m_orphan_work_set GUARDED_BY(g_cs_orphans); - /** Whether we've sent this peer a getheaders in response to an inv prior to initial-headers-sync completing */ - bool m_inv_triggered_getheaders_before_sync{false}; + bool m_inv_triggered_getheaders_before_sync GUARDED_BY(NetEventsInterface::g_msgproc_mutex){false}; /** Protects m_getdata_requests **/ Mutex m_getdata_requests_mutex; @@ -380,7 +374,7 @@ struct Peer { std::deque<CInv> m_getdata_requests GUARDED_BY(m_getdata_requests_mutex); /** Time of the last getheaders message to this peer */ - NodeClock::time_point m_last_getheaders_timestamp{}; + NodeClock::time_point m_last_getheaders_timestamp GUARDED_BY(NetEventsInterface::g_msgproc_mutex){}; /** Protects m_headers_sync **/ Mutex m_headers_sync_mutex; @@ -399,9 +393,7 @@ struct Peer { private: Mutex m_tx_relay_mutex; - /** Transaction relay data. Will be a nullptr if we're not relaying - * transactions with this peer (e.g. if it's a block-relay-only peer or - * the peer has sent us fRelay=false with bloom filters disabled). */ + /** Transaction relay data. May be a nullptr. */ std::unique_ptr<TxRelay> m_tx_relay GUARDED_BY(m_tx_relay_mutex); }; @@ -515,9 +507,9 @@ public: void InitializeNode(CNode& node, ServiceFlags our_services) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void FinalizeNode(const CNode& node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_headers_presync_mutex); bool ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt) override - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex); - bool SendMessages(CNode* pto) override EXCLUSIVE_LOCKS_REQUIRED(pto->cs_sendProcessing) - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex); + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex); + bool SendMessages(CNode* pto) override + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, g_msgproc_mutex); /** Implement PeerManager */ void StartScheduledTasks(CScheduler& scheduler) override; @@ -532,12 +524,12 @@ public: void UnitTestMisbehaving(NodeId peer_id, int howmuch) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex) { Misbehaving(*Assert(GetPeerRef(peer_id)), howmuch, ""); }; void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv, const std::chrono::microseconds time_received, const std::atomic<bool>& interruptMsgProc) override - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex); + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex); void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) override; private: /** Consider evicting an outbound peer based on the amount of time they've been behind our tip */ - void ConsiderEviction(CNode& pto, Peer& peer, std::chrono::seconds time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + void ConsiderEviction(CNode& pto, Peer& peer, std::chrono::seconds time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_msgproc_mutex); /** If we have extra outbound peers, try to disconnect the one with the oldest block announcement */ void EvictExtraOutboundPeers(std::chrono::seconds now) EXCLUSIVE_LOCKS_REQUIRED(cs_main); @@ -589,8 +581,17 @@ private: */ bool MaybeDiscourageAndDisconnect(CNode& pnode, Peer& peer); - void ProcessOrphanTx(std::set<uint256>& orphan_work_set) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans) - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + /** + * Reconsider orphan transactions after a parent has been accepted to the mempool. + * + * @peer[in] peer The peer whose orphan transactions we will reconsider. Generally only one + * orphan will be reconsidered on each call of this function. This set + * may be added to if accepting an orphan causes its children to be + * reconsidered. + * @return True if there are still orphans in this peer's work set. + */ + bool ProcessOrphanTx(Peer& peer) + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, cs_main); /** Process a single headers message from a peer. * * @param[in] pfrom CNode of the peer @@ -601,7 +602,7 @@ private: void ProcessHeadersMessage(CNode& pfrom, Peer& peer, std::vector<CBlockHeader>&& headers, bool via_compact_block) - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_headers_presync_mutex); + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_headers_presync_mutex, g_msgproc_mutex); /** Various helpers for headers processing, invoked by ProcessHeadersMessage() */ /** Return true if headers are continuous and have valid proof-of-work (DoS points assigned on failure) */ bool CheckHeadersPoW(const std::vector<CBlockHeader>& headers, const Consensus::Params& consensusParams, Peer& peer); @@ -610,7 +611,7 @@ private: /** Deal with state tracking and headers sync for peers that send the * occasional non-connecting header (this can happen due to BIP 130 headers * announcements for blocks interacting with the 2hr (MAX_FUTURE_BLOCK_TIME) rule). */ - void HandleFewUnconnectingHeaders(CNode& pfrom, Peer& peer, const std::vector<CBlockHeader>& headers); + void HandleFewUnconnectingHeaders(CNode& pfrom, Peer& peer, const std::vector<CBlockHeader>& headers) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); /** Return true if the headers connect to each other, false otherwise */ bool CheckHeadersAreContinuous(const std::vector<CBlockHeader>& headers) const; /** Try to continue a low-work headers sync that has already begun. @@ -633,7 +634,7 @@ private: */ bool IsContinuationOfLowWorkHeadersSync(Peer& peer, CNode& pfrom, std::vector<CBlockHeader>& headers) - EXCLUSIVE_LOCKS_REQUIRED(peer.m_headers_sync_mutex, !m_headers_presync_mutex); + EXCLUSIVE_LOCKS_REQUIRED(peer.m_headers_sync_mutex, !m_headers_presync_mutex, g_msgproc_mutex); /** Check work on a headers chain to be processed, and if insufficient, * initiate our anti-DoS headers sync mechanism. * @@ -642,14 +643,13 @@ private: * @param[in] chain_start_header Where these headers connect in our index. * @param[in,out] headers The headers to be processed. * - * @return True if chain was low work and a headers sync was - * initiated (and headers will be empty after calling); false - * otherwise. + * @return True if chain was low work (headers will be empty after + * calling); false otherwise. */ bool TryLowWorkHeadersSync(Peer& peer, CNode& pfrom, const CBlockIndex* chain_start_header, std::vector<CBlockHeader>& headers) - EXCLUSIVE_LOCKS_REQUIRED(!peer.m_headers_sync_mutex, !m_peer_mutex, !m_headers_presync_mutex); + EXCLUSIVE_LOCKS_REQUIRED(!peer.m_headers_sync_mutex, !m_peer_mutex, !m_headers_presync_mutex, g_msgproc_mutex); /** Return true if the given header is an ancestor of * m_chainman.m_best_header or our current tip */ @@ -659,11 +659,11 @@ private: * We don't issue a getheaders message if we have a recent one outstanding. * This returns true if a getheaders is actually sent, and false otherwise. */ - bool MaybeSendGetHeaders(CNode& pfrom, const CBlockLocator& locator, Peer& peer); + bool MaybeSendGetHeaders(CNode& pfrom, const CBlockLocator& locator, Peer& peer) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); /** Potentially fetch blocks from this peer upon receipt of a new headers tip */ - void HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, const CBlockIndex* pindexLast); + void HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, const CBlockIndex& last_header); /** Update peer state based on received headers message */ - void UpdatePeerStateForReceivedHeaders(CNode& pfrom, const CBlockIndex *pindexLast, bool received_new_header, bool may_have_more_headers); + void UpdatePeerStateForReceivedHeaders(CNode& pfrom, const CBlockIndex& last_header, bool received_new_header, bool may_have_more_headers); void SendBlockTransactions(CNode& pfrom, Peer& peer, const CBlock& block, const BlockTransactionsRequest& req); @@ -683,10 +683,10 @@ private: void MaybeSendPing(CNode& node_to, Peer& peer, std::chrono::microseconds now); /** Send `addr` messages on a regular schedule. */ - void MaybeSendAddr(CNode& node, Peer& peer, std::chrono::microseconds current_time); + void MaybeSendAddr(CNode& node, Peer& peer, std::chrono::microseconds current_time) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); /** Send a single `sendheaders` message, after we have completed headers sync with a peer. */ - void MaybeSendSendHeaders(CNode& node, Peer& peer); + void MaybeSendSendHeaders(CNode& node, Peer& peer) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); /** Relay (gossip) an address to a few randomly chosen nodes. * @@ -695,10 +695,10 @@ private: * @param[in] fReachable Whether the address' network is reachable. We relay unreachable * addresses less. */ - void RelayAddress(NodeId originator, const CAddress& addr, bool fReachable) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void RelayAddress(NodeId originator, const CAddress& addr, bool fReachable) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex); /** Send `feefilter` message. */ - void MaybeSendFeefilter(CNode& node, Peer& peer, std::chrono::microseconds current_time); + void MaybeSendFeefilter(CNode& node, Peer& peer, std::chrono::microseconds current_time) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); const CChainParams& m_chainparams; CConnman& m_connman; @@ -708,6 +708,7 @@ private: ChainstateManager& m_chainman; CTxMemPool& m_mempool; TxRequestTracker m_txrequest GUARDED_BY(::cs_main); + std::unique_ptr<TxReconciliationTracker> m_txreconciliation; /** The height of the best chain */ std::atomic<int> m_best_height{-1}; @@ -751,7 +752,7 @@ private: int nSyncStarted GUARDED_BY(cs_main) = 0; /** Hash of the last block we received via INV */ - uint256 m_last_block_inv_triggering_headers_sync{}; + uint256 m_last_block_inv_triggering_headers_sync GUARDED_BY(g_msgproc_mutex){}; /** * Sources of received blocks, saved to be able punish them when processing @@ -863,7 +864,7 @@ private: std::atomic_bool m_headers_presync_should_signal{false}; /** Height of the highest block announced using BIP 152 high-bandwidth mode. */ - int m_highest_fast_announce{0}; + int m_highest_fast_announce GUARDED_BY(::cs_main){0}; /** Have we requested this block from a peer */ bool IsBlockRequested(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main); @@ -924,14 +925,14 @@ private: /** Storage for orphan information */ TxOrphanage m_orphanage; - void AddToCompactExtraTransactions(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(g_cs_orphans); + void AddToCompactExtraTransactions(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); /** Orphan/conflicted/etc transactions that are kept for compact block reconstruction. * The last -blockreconstructionextratxn/DEFAULT_BLOCK_RECONSTRUCTION_EXTRA_TXN of * these are kept in a ring buffer */ - std::vector<std::pair<uint256, CTransactionRef>> vExtraTxnForCompact GUARDED_BY(g_cs_orphans); + std::vector<std::pair<uint256, CTransactionRef>> vExtraTxnForCompact GUARDED_BY(g_msgproc_mutex); /** Offset into vExtraTxnForCompact to insert the next tx */ - size_t vExtraTxnForCompactIt GUARDED_BY(g_cs_orphans) = 0; + size_t vExtraTxnForCompactIt GUARDED_BY(g_msgproc_mutex) = 0; /** Check whether the last unknown block a peer advertised is not yet known. */ void ProcessBlockAvailability(NodeId nodeid) EXCLUSIVE_LOCKS_REQUIRED(cs_main); @@ -1010,7 +1011,10 @@ private: * @return True if address relay is enabled with peer * False if address relay is disallowed */ - bool SetupAddressRelay(const CNode& node, Peer& peer); + bool SetupAddressRelay(const CNode& node, Peer& peer) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); + + void AddAddressKnown(Peer& peer, const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); + void PushAddress(Peer& peer, const CAddress& addr, FastRandomContext& insecure_rand) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); }; const CNodeState* PeerManagerImpl::State(NodeId pnode) const EXCLUSIVE_LOCKS_REQUIRED(cs_main) @@ -1036,13 +1040,13 @@ static bool IsAddrCompatible(const Peer& peer, const CAddress& addr) return peer.m_wants_addrv2 || addr.IsAddrV1Compatible(); } -static void AddAddressKnown(Peer& peer, const CAddress& addr) +void PeerManagerImpl::AddAddressKnown(Peer& peer, const CAddress& addr) { assert(peer.m_addr_known); peer.m_addr_known->insert(addr.GetKey()); } -static void PushAddress(Peer& peer, const CAddress& addr, FastRandomContext& insecure_rand) +void PeerManagerImpl::PushAddress(Peer& peer, const CAddress& addr, FastRandomContext& insecure_rand) { // Known checking here is only to save space from duplicates. // Before sending, we'll filter it again for known addresses that were @@ -1291,7 +1295,7 @@ void PeerManagerImpl::FindNextBlocksToDownload(const Peer& peer, unsigned int co // Make sure pindexBestKnownBlock is up to date, we'll need it. ProcessBlockAvailability(peer.m_id); - if (state->pindexBestKnownBlock == nullptr || state->pindexBestKnownBlock->nChainWork < m_chainman.ActiveChain().Tip()->nChainWork || state->pindexBestKnownBlock->nChainWork < nMinimumChainWork) { + if (state->pindexBestKnownBlock == nullptr || state->pindexBestKnownBlock->nChainWork < m_chainman.ActiveChain().Tip()->nChainWork || state->pindexBestKnownBlock->nChainWork < m_chainman.MinimumChainWork()) { // This peer has nothing interesting. return; } @@ -1380,7 +1384,7 @@ void PeerManagerImpl::PushNodeVersion(CNode& pnode, const Peer& peer) CService addr_you = addr.IsRoutable() && !IsProxy(addr) && addr.IsAddrV1Compatible() ? addr : CService(); uint64_t your_services{addr.nServices}; - const bool tx_relay = !m_ignore_incoming_txs && !pnode.IsBlockOnlyConn() && !pnode.IsFeelerConn(); + const bool tx_relay{!RejectIncomingTxs(pnode)}; m_connman.PushMessage(&pnode, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::VERSION, PROTOCOL_VERSION, my_services, nTime, your_services, addr_you, // Together the pre-version-31402 serialization of CAddress "addrYou" (without nTime) my_services, CService(), // Together the pre-version-31402 serialization of CAddress "addrMe" (without nTime) @@ -1492,8 +1496,9 @@ void PeerManagerImpl::FinalizeNode(const CNode& node) for (const QueuedBlock& entry : state->vBlocksInFlight) { mapBlocksInFlight.erase(entry.pindex->GetBlockHash()); } - WITH_LOCK(g_cs_orphans, m_orphanage.EraseForPeer(nodeid)); + m_orphanage.EraseForPeer(nodeid); m_txrequest.DisconnectedPeer(nodeid); + if (m_txreconciliation) m_txreconciliation->ForgetPeer(nodeid); m_num_preferred_download_peers -= state->fPreferredDownload; m_peers_downloading_from -= (state->nBlocksInFlight != 0); assert(m_peers_downloading_from >= 0); @@ -1738,6 +1743,8 @@ std::optional<std::string> PeerManagerImpl::FetchBlock(NodeId peer_id, const CBl LOCK(cs_main); // Mark block as in-flight unless it already is (for this peer). + // If the peer does not send us a block, vBlocksInFlight remains non-empty, + // causing us to timeout and disconnect. // If a block was already in-flight for a different peer, its BLOCKTXN // response will be dropped. if (!BlockRequested(peer_id, block_index)) return "Already requested from this peer"; @@ -1778,6 +1785,11 @@ PeerManagerImpl::PeerManagerImpl(CConnman& connman, AddrMan& addrman, m_mempool(pool), m_ignore_incoming_txs(ignore_incoming_txs) { + // While Erlay support is incomplete, it must be enabled explicitly via -txreconciliation. + // This argument can go away after Erlay support is complete. + if (gArgs.GetBoolArg("-txreconciliation", DEFAULT_TXRECONCILIATION_ENABLE)) { + m_txreconciliation = std::make_unique<TxReconciliationTracker>(TXRECONCILIATION_VERSION); + } } void PeerManagerImpl::StartScheduledTasks(CScheduler& scheduler) @@ -2007,8 +2019,15 @@ void PeerManagerImpl::RelayTransaction(const uint256& txid, const uint256& wtxid auto tx_relay = peer.GetTxRelay(); if (!tx_relay) continue; - const uint256& hash{peer.m_wtxid_relay ? wtxid : txid}; LOCK(tx_relay->m_tx_inventory_mutex); + // Only queue transactions for announcement once the version handshake + // is completed. The time of arrival for these transactions is + // otherwise at risk of leaking to a spy, if the spy is able to + // distinguish transactions received during the handshake from the rest + // in the announcement. + if (tx_relay->m_next_inv_send_time == 0s) continue; + + const uint256& hash{peer.m_wtxid_relay ? wtxid : txid}; if (!tx_relay->m_tx_inventory_known_filter.contains(hash)) { tx_relay->m_tx_inventory_to_send.insert(hash); } @@ -2281,9 +2300,9 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic std::vector<uint256> parent_ids_to_add; { LOCK(m_mempool.cs); - auto txiter = m_mempool.GetIter(tx->GetHash()); - if (txiter) { - const CTxMemPoolEntry::Parents& parents = (*txiter)->GetMemPoolParentsConst(); + auto tx_iter = m_mempool.GetIter(tx->GetHash()); + if (tx_iter) { + const CTxMemPoolEntry::Parents& parents = (*tx_iter)->GetMemPoolParentsConst(); parent_ids_to_add.reserve(parents.size()); for (const CTxMemPoolEntry& parent : parents) { if (parent.GetTime() > now - UNCONDITIONAL_RELAY_DELAY) { @@ -2386,7 +2405,7 @@ arith_uint256 PeerManagerImpl::GetAntiDoSWorkThreshold() // near our tip. near_chaintip_work = tip->nChainWork - std::min<arith_uint256>(144*GetBlockProof(*tip), tip->nChainWork); } - return std::max(near_chaintip_work, arith_uint256(nMinimumChainWork)); + return std::max(near_chaintip_work, m_chainman.MinimumChainWork()); } /** @@ -2557,16 +2576,20 @@ bool PeerManagerImpl::TryLowWorkHeadersSync(Peer& peer, CNode& pfrom, const CBlo peer.m_headers_sync.reset(new HeadersSyncState(peer.m_id, m_chainparams.GetConsensus(), chain_start_header, minimum_chain_work)); - // Now a HeadersSyncState object for tracking this synchronization is created, - // process the headers using it as normal. - return IsContinuationOfLowWorkHeadersSync(peer, pfrom, headers); + // Now a HeadersSyncState object for tracking this synchronization + // is created, process the headers using it as normal. Failures are + // handled inside of IsContinuationOfLowWorkHeadersSync. + (void)IsContinuationOfLowWorkHeadersSync(peer, pfrom, headers); } else { LogPrint(BCLog::NET, "Ignoring low-work chain (height=%u) from peer=%d\n", chain_start_header->nHeight + headers.size(), pfrom.GetId()); - // Since this is a low-work headers chain, no further processing is required. - headers = {}; - return true; } + + // The peer has not yet given us a chain that meets our work threshold, + // so we want to prevent further processing of the headers in any case. + headers = {}; + return true; } + return false; } @@ -2599,22 +2622,21 @@ bool PeerManagerImpl::MaybeSendGetHeaders(CNode& pfrom, const CBlockLocator& loc } /* - * Given a new headers tip ending in pindexLast, potentially request blocks towards that tip. + * Given a new headers tip ending in last_header, potentially request blocks towards that tip. * We require that the given tip have at least as much work as our tip, and for * our current tip to be "close to synced" (see CanDirectFetch()). */ -void PeerManagerImpl::HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, const CBlockIndex* pindexLast) +void PeerManagerImpl::HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, const CBlockIndex& last_header) { const CNetMsgMaker msgMaker(pfrom.GetCommonVersion()); LOCK(cs_main); CNodeState *nodestate = State(pfrom.GetId()); - if (CanDirectFetch() && pindexLast->IsValid(BLOCK_VALID_TREE) && m_chainman.ActiveChain().Tip()->nChainWork <= pindexLast->nChainWork) { - + if (CanDirectFetch() && last_header.IsValid(BLOCK_VALID_TREE) && m_chainman.ActiveChain().Tip()->nChainWork <= last_header.nChainWork) { std::vector<const CBlockIndex*> vToFetch; - const CBlockIndex *pindexWalk = pindexLast; - // Calculate all the blocks we'd need to switch to pindexLast, up to a limit. + const CBlockIndex* pindexWalk{&last_header}; + // Calculate all the blocks we'd need to switch to last_header, up to a limit. while (pindexWalk && !m_chainman.ActiveChain().Contains(pindexWalk) && vToFetch.size() <= MAX_BLOCKS_IN_TRANSIT_PER_PEER) { if (!(pindexWalk->nStatus & BLOCK_HAVE_DATA) && !IsBlockRequested(pindexWalk->GetBlockHash()) && @@ -2630,8 +2652,8 @@ void PeerManagerImpl::HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, c // direct fetch and rely on parallel download instead. if (!m_chainman.ActiveChain().Contains(pindexWalk)) { LogPrint(BCLog::NET, "Large reorg, won't direct fetch to %s (%d)\n", - pindexLast->GetBlockHash().ToString(), - pindexLast->nHeight); + last_header.GetBlockHash().ToString(), + last_header.nHeight); } else { std::vector<CInv> vGetData; // Download as much as possible, from earliest to latest. @@ -2648,14 +2670,15 @@ void PeerManagerImpl::HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, c } if (vGetData.size() > 1) { LogPrint(BCLog::NET, "Downloading blocks toward %s (%d) via headers direct fetch\n", - pindexLast->GetBlockHash().ToString(), pindexLast->nHeight); + last_header.GetBlockHash().ToString(), + last_header.nHeight); } if (vGetData.size() > 0) { if (!m_ignore_incoming_txs && nodestate->m_provides_cmpctblocks && vGetData.size() == 1 && mapBlocksInFlight.size() == 1 && - pindexLast->pprev->IsValid(BLOCK_VALID_CHAIN)) { + last_header.pprev->IsValid(BLOCK_VALID_CHAIN)) { // In any case, we want to download using a compact block, not a regular one vGetData[0] = CInv(MSG_CMPCT_BLOCK, vGetData[0].hash); } @@ -2666,12 +2689,12 @@ void PeerManagerImpl::HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, c } /** - * Given receipt of headers from a peer ending in pindexLast, along with + * Given receipt of headers from a peer ending in last_header, along with * whether that header was new and whether the headers message was full, * update the state we keep for the peer. */ void PeerManagerImpl::UpdatePeerStateForReceivedHeaders(CNode& pfrom, - const CBlockIndex *pindexLast, bool received_new_header, bool may_have_more_headers) + const CBlockIndex& last_header, bool received_new_header, bool may_have_more_headers) { LOCK(cs_main); CNodeState *nodestate = State(pfrom.GetId()); @@ -2680,14 +2703,13 @@ void PeerManagerImpl::UpdatePeerStateForReceivedHeaders(CNode& pfrom, } nodestate->nUnconnectingHeaders = 0; - assert(pindexLast); - UpdateBlockAvailability(pfrom.GetId(), pindexLast->GetBlockHash()); + UpdateBlockAvailability(pfrom.GetId(), last_header.GetBlockHash()); // From here, pindexBestKnownBlock should be guaranteed to be non-null, // because it is set in UpdateBlockAvailability. Some nullptr checks // are still present, however, as belt-and-suspenders. - if (received_new_header && pindexLast->nChainWork > m_chainman.ActiveChain().Tip()->nChainWork) { + if (received_new_header && last_header.nChainWork > m_chainman.ActiveChain().Tip()->nChainWork) { nodestate->m_last_block_announcement = GetTime(); } @@ -2696,14 +2718,14 @@ void PeerManagerImpl::UpdatePeerStateForReceivedHeaders(CNode& pfrom, if (m_chainman.ActiveChainstate().IsInitialBlockDownload() && !may_have_more_headers) { // If the peer has no more headers to give us, then we know we have // their tip. - if (nodestate->pindexBestKnownBlock && nodestate->pindexBestKnownBlock->nChainWork < nMinimumChainWork) { + if (nodestate->pindexBestKnownBlock && nodestate->pindexBestKnownBlock->nChainWork < m_chainman.MinimumChainWork()) { // This peer has too little work on their headers chain to help // us sync -- disconnect if it is an outbound disconnection // candidate. - // Note: We compare their tip to nMinimumChainWork (rather than + // Note: We compare their tip to the minimum chain work (rather than // m_chainman.ActiveChain().Tip()) because we won't start block download // until we have a headers chain that has at least - // nMinimumChainWork, even if a peer has a chain past our tip, + // the minimum chain work, even if a peer has a chain past our tip, // as an anti-DoS measure. if (pfrom.IsOutboundOrBlockRelayConn()) { LogPrintf("Disconnecting outbound peer %d -- headers chain has insufficient work\n", pfrom.GetId()); @@ -2843,7 +2865,7 @@ void PeerManagerImpl::ProcessHeadersMessage(CNode& pfrom, Peer& peer, // If we don't have the last header, then this peer will have given us // something new (if these headers are valid). - bool received_new_header{last_received_header != nullptr}; + bool received_new_header{last_received_header == nullptr}; // Now process all the headers. BlockValidationState state; @@ -2853,7 +2875,7 @@ void PeerManagerImpl::ProcessHeadersMessage(CNode& pfrom, Peer& peer, return; } } - Assume(pindexLast); + assert(pindexLast); // Consider fetching more headers if we are not using our headers-sync mechanism. if (nCount == MAX_HEADERS_RESULTS && !have_headers_sync) { @@ -2864,41 +2886,32 @@ void PeerManagerImpl::ProcessHeadersMessage(CNode& pfrom, Peer& peer, } } - UpdatePeerStateForReceivedHeaders(pfrom, pindexLast, received_new_header, nCount == MAX_HEADERS_RESULTS); + UpdatePeerStateForReceivedHeaders(pfrom, *pindexLast, received_new_header, nCount == MAX_HEADERS_RESULTS); // Consider immediately downloading blocks. - HeadersDirectFetchBlocks(pfrom, peer, pindexLast); + HeadersDirectFetchBlocks(pfrom, peer, *pindexLast); return; } -/** - * Reconsider orphan transactions after a parent has been accepted to the mempool. - * - * @param[in,out] orphan_work_set The set of orphan transactions to reconsider. Generally only one - * orphan will be reconsidered on each call of this function. This set - * may be added to if accepting an orphan causes its children to be - * reconsidered. - */ -void PeerManagerImpl::ProcessOrphanTx(std::set<uint256>& orphan_work_set) +bool PeerManagerImpl::ProcessOrphanTx(Peer& peer) { + AssertLockHeld(g_msgproc_mutex); AssertLockHeld(cs_main); - AssertLockHeld(g_cs_orphans); - - while (!orphan_work_set.empty()) { - const uint256 orphanHash = *orphan_work_set.begin(); - orphan_work_set.erase(orphan_work_set.begin()); - const auto [porphanTx, from_peer] = m_orphanage.GetTx(orphanHash); - if (porphanTx == nullptr) continue; + CTransactionRef porphanTx = nullptr; + NodeId from_peer = -1; + bool more = false; + while (CTransactionRef porphanTx = m_orphanage.GetTxToReconsider(peer.m_id, from_peer, more)) { const MempoolAcceptResult result = m_chainman.ProcessTransaction(porphanTx); const TxValidationState& state = result.m_state; + const uint256& orphanHash = porphanTx->GetHash(); if (result.m_result_type == MempoolAcceptResult::ResultType::VALID) { LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString()); RelayTransaction(orphanHash, porphanTx->GetWitnessHash()); - m_orphanage.AddChildrenToWorkSet(*porphanTx, orphan_work_set); + m_orphanage.AddChildrenToWorkSet(*porphanTx, peer.m_id); m_orphanage.EraseTx(orphanHash); for (const CTransactionRef& removedTx : result.m_replaced_transactions.value()) { AddToCompactExtraTransactions(removedTx); @@ -2949,6 +2962,8 @@ void PeerManagerImpl::ProcessOrphanTx(std::set<uint256>& orphan_work_set) break; } } + + return more; } bool PeerManagerImpl::PrepareBlockFilterRequest(CNode& node, Peer& peer, @@ -3135,6 +3150,8 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, const std::chrono::microseconds time_received, const std::atomic<bool>& interruptMsgProc) { + AssertLockHeld(g_msgproc_mutex); + LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(msg_type), vRecv.size(), pfrom.GetId()); PeerRef peer = GetPeerRef(pfrom.GetId()); @@ -3236,8 +3253,6 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, m_connman.PushMessage(&pfrom, msg_maker.Make(NetMsgType::SENDADDRV2)); } - m_connman.PushMessage(&pfrom, msg_maker.Make(NetMsgType::VERACK)); - pfrom.m_has_all_wanted_services = HasAllDesirableServiceFlags(nServices); peer->m_their_services = nServices; pfrom.SetAddrLocal(addrMe); @@ -3247,11 +3262,14 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, } peer->m_starting_height = starting_height; - // We only initialize the m_tx_relay data structure if: - // - this isn't an outbound block-relay-only connection; and - // - fRelay=true or we're offering NODE_BLOOM to this peer - // (NODE_BLOOM means that the peer may turn on tx relay later) + // Only initialize the Peer::TxRelay m_relay_txs data structure if: + // - this isn't an outbound block-relay-only connection, and + // - this isn't an outbound feeler connection, and + // - fRelay=true (the peer wishes to receive transaction announcements) + // or we're offering NODE_BLOOM to this peer. NODE_BLOOM means that + // the peer may turn on transaction relay later. if (!pfrom.IsBlockOnlyConn() && + !pfrom.IsFeelerConn() && (fRelay || (peer->m_our_services & NODE_BLOOM))) { auto* const tx_relay = peer->SetTxRelay(); { @@ -3261,6 +3279,22 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, if (fRelay) pfrom.m_relays_txs = true; } + if (greatest_common_version >= WTXID_RELAY_VERSION && m_txreconciliation) { + // Per BIP-330, we announce txreconciliation support if: + // - protocol version per the peer's VERSION message supports WTXID_RELAY; + // - transaction relay is supported per the peer's VERSION message (see m_relays_txs); + // - this is not a block-relay-only connection and not a feeler (see m_relays_txs); + // - this is not an addr fetch connection; + // - we are not in -blocksonly mode. + if (pfrom.m_relays_txs && !pfrom.IsAddrFetchConn() && !m_ignore_incoming_txs) { + const uint64_t recon_salt = m_txreconciliation->PreRegisterPeer(pfrom.GetId()); + m_connman.PushMessage(&pfrom, msg_maker.Make(NetMsgType::SENDTXRCNCL, + TXRECONCILIATION_VERSION, recon_salt)); + } + } + + m_connman.PushMessage(&pfrom, msg_maker.Make(NetMsgType::VERACK)); + // Potentially mark this peer as a preferred download peer. { LOCK(cs_main); @@ -3269,39 +3303,20 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, m_num_preferred_download_peers += state->fPreferredDownload; } - // Self advertisement & GETADDR logic - if (!pfrom.IsInboundConn() && SetupAddressRelay(pfrom, *peer)) { - // For outbound peers, we try to relay our address (so that other - // nodes can try to find us more quickly, as we have no guarantee - // that an outbound peer is even aware of how to reach us) and do a - // one-time address fetch (to help populate/update our addrman). If - // we're starting up for the first time, our addrman may be pretty - // empty and no one will know who we are, so these mechanisms are - // important to help us connect to the network. - // + // Attempt to initialize address relay for outbound peers and use result + // to decide whether to send GETADDR, so that we don't send it to + // inbound or outbound block-relay-only peers. + bool send_getaddr{false}; + if (!pfrom.IsInboundConn()) { + send_getaddr = SetupAddressRelay(pfrom, *peer); + } + if (send_getaddr) { + // Do a one-time address fetch to help populate/update our addrman. + // If we're starting up for the first time, our addrman may be pretty + // empty, so this mechanism is important to help us connect to the network. // We skip this for block-relay-only peers. We want to avoid // potentially leaking addr information and we do not want to // indicate to the peer that we will participate in addr relay. - if (fListen && !m_chainman.ActiveChainstate().IsInitialBlockDownload()) - { - CAddress addr{GetLocalAddress(pfrom.addr), peer->m_our_services, Now<NodeSeconds>()}; - FastRandomContext insecure_rand; - if (addr.IsRoutable()) - { - LogPrint(BCLog::NET, "ProcessMessages: advertising address %s\n", addr.ToString()); - PushAddress(*peer, addr, insecure_rand); - } else if (IsPeerAddrLocalGood(&pfrom)) { - // Override just the address with whatever the peer sees us as. - // Leave the port in addr as it was returned by GetLocalAddress() - // above, as this is an outbound connection and the peer cannot - // observe our listening port. - addr.SetIP(addrMe); - LogPrint(BCLog::NET, "ProcessMessages: advertising address %s\n", addr.ToString()); - PushAddress(*peer, addr, insecure_rand); - } - } - - // Get recent addresses m_connman.PushMessage(&pfrom, CNetMsgMaker(greatest_common_version).Make(NetMsgType::GETADDR)); peer->m_getaddr_sent = true; // When requesting a getaddr, accept an additional MAX_ADDR_TO_SEND addresses in response @@ -3388,6 +3403,30 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // they may wish to request compact blocks from us m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::SENDCMPCT, /*high_bandwidth=*/false, /*version=*/CMPCTBLOCKS_VERSION)); } + + if (m_txreconciliation) { + if (!peer->m_wtxid_relay || !m_txreconciliation->IsPeerRegistered(pfrom.GetId())) { + // We could have optimistically pre-registered/registered the peer. In that case, + // we should forget about the reconciliation state here if this wasn't followed + // by WTXIDRELAY (since WTXIDRELAY can't be announced later). + m_txreconciliation->ForgetPeer(pfrom.GetId()); + } + } + + if (auto tx_relay = peer->GetTxRelay()) { + // `TxRelay::m_tx_inventory_to_send` must be empty before the + // version handshake is completed as + // `TxRelay::m_next_inv_send_time` is first initialised in + // `SendMessages` after the verack is received. Any transactions + // received during the version handshake would otherwise + // immediately be advertised without random delay, potentially + // leaking the time of arrival to a spy. + Assume(WITH_LOCK( + tx_relay->m_tx_inventory_mutex, + return tx_relay->m_tx_inventory_to_send.empty() && + tx_relay->m_next_inv_send_time == 0s)); + } + pfrom.fSuccessfullyConnected = true; return; } @@ -3451,6 +3490,61 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, return; } + // Received from a peer demonstrating readiness to announce transactions via reconciliations. + // This feature negotiation must happen between VERSION and VERACK to avoid relay problems + // from switching announcement protocols after the connection is up. + if (msg_type == NetMsgType::SENDTXRCNCL) { + if (!m_txreconciliation) { + LogPrintLevel(BCLog::NET, BCLog::Level::Debug, "sendtxrcncl from peer=%d ignored, as our node does not have txreconciliation enabled\n", pfrom.GetId()); + return; + } + + if (pfrom.fSuccessfullyConnected) { + LogPrintLevel(BCLog::NET, BCLog::Level::Debug, "sendtxrcncl received after verack from peer=%d; disconnecting\n", pfrom.GetId()); + pfrom.fDisconnect = true; + return; + } + + // Peer must not offer us reconciliations if we specified no tx relay support in VERSION. + if (RejectIncomingTxs(pfrom)) { + LogPrintLevel(BCLog::NET, BCLog::Level::Debug, "sendtxrcncl received from peer=%d to which we indicated no tx relay; disconnecting\n", pfrom.GetId()); + pfrom.fDisconnect = true; + return; + } + + // Peer must not offer us reconciliations if they specified no tx relay support in VERSION. + // This flag might also be false in other cases, but the RejectIncomingTxs check above + // eliminates them, so that this flag fully represents what we are looking for. + if (!pfrom.m_relays_txs) { + LogPrintLevel(BCLog::NET, BCLog::Level::Debug, "sendtxrcncl received from peer=%d which indicated no tx relay to us; disconnecting\n", pfrom.GetId()); + pfrom.fDisconnect = true; + return; + } + + uint32_t peer_txreconcl_version; + uint64_t remote_salt; + vRecv >> peer_txreconcl_version >> remote_salt; + + const ReconciliationRegisterResult result = m_txreconciliation->RegisterPeer(pfrom.GetId(), pfrom.IsInboundConn(), + peer_txreconcl_version, remote_salt); + switch (result) { + case ReconciliationRegisterResult::NOT_FOUND: + LogPrintLevel(BCLog::NET, BCLog::Level::Debug, "Ignore unexpected txreconciliation signal from peer=%d\n", pfrom.GetId()); + break; + case ReconciliationRegisterResult::SUCCESS: + break; + case ReconciliationRegisterResult::ALREADY_REGISTERED: + LogPrintLevel(BCLog::NET, BCLog::Level::Debug, "txreconciliation protocol violation from peer=%d (sendtxrcncl received from already registered peer); disconnecting\n", pfrom.GetId()); + pfrom.fDisconnect = true; + return; + case ReconciliationRegisterResult::PROTOCOL_VIOLATION: + LogPrintLevel(BCLog::NET, BCLog::Level::Debug, "txreconciliation protocol violation from peer=%d; disconnecting\n", pfrom.GetId()); + pfrom.fDisconnect = true; + return; + } + return; + } + if (!pfrom.fSuccessfullyConnected) { LogPrint(BCLog::NET, "Unsupported message \"%s\" prior to verack from peer=%d\n", SanitizeString(msg_type), pfrom.GetId()); return; @@ -3805,12 +3899,12 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // Note that if we were to be on a chain that forks from the checkpointed // chain, then serving those headers to a peer that has seen the // checkpointed chain would cause that peer to disconnect us. Requiring - // that our chainwork exceed nMinimumChainWork is a protection against + // that our chainwork exceed the minimum chain work is a protection against // being fed a bogus chain when we started up for the first time and // getting partitioned off the honest network for serving that chain to // others. if (m_chainman.ActiveTip() == nullptr || - (m_chainman.ActiveTip()->nChainWork < nMinimumChainWork && !pfrom.HasPermission(NetPermissionFlags::Download))) { + (m_chainman.ActiveTip()->nChainWork < m_chainman.MinimumChainWork() && !pfrom.HasPermission(NetPermissionFlags::Download))) { LogPrint(BCLog::NET, "Ignoring getheaders from peer=%d because active chain has too little work; sending empty response\n", pfrom.GetId()); // Just respond with an empty headers message, to tell the peer to // go away but not treat us as unresponsive. @@ -3898,7 +3992,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, AddKnownTx(*peer, txid); } - LOCK2(cs_main, g_cs_orphans); + LOCK(cs_main); m_txrequest.ReceivedResponse(pfrom.GetId(), txid); if (tx.HasWitness()) m_txrequest.ReceivedResponse(pfrom.GetId(), wtxid); @@ -3939,7 +4033,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, m_txrequest.ForgetTxHash(tx.GetHash()); m_txrequest.ForgetTxHash(tx.GetWitnessHash()); RelayTransaction(tx.GetHash(), tx.GetWitnessHash()); - m_orphanage.AddChildrenToWorkSet(tx, peer->m_orphan_work_set); + m_orphanage.AddChildrenToWorkSet(tx, peer->m_id); pfrom.m_last_tx_time = GetTime<std::chrono::seconds>(); @@ -3953,7 +4047,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, } // Recursively process any orphan transactions that depended on this one - ProcessOrphanTx(peer->m_orphan_work_set); + ProcessOrphanTx(*peer); } else if (state.GetResult() == TxValidationResult::TX_MISSING_INPUTS) { @@ -4134,7 +4228,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, bool fBlockReconstructed = false; { - LOCK2(cs_main, g_cs_orphans); + LOCK(cs_main); // If AcceptBlockHeader returned true, it set pindex assert(pindex); UpdateBlockAvailability(pfrom.GetId(), pindex->GetBlockHash()); @@ -4274,7 +4368,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // (eg disk space). Because we only try to reconstruct blocks when // we're close to caught up (via the CanDirectFetch() requirement // above, combined with the behavior of not requesting blocks until - // we have a chain with at least nMinimumChainWork), and we ignore + // we have a chain with at least the minimum chain work), and we ignore // compact blocks with less work than our tip, it is safe to treat // reconstructed compact blocks as having been requested. ProcessBlock(pfrom, pblock, /*force_processing=*/true, /*min_pow_checked=*/true); @@ -4748,6 +4842,8 @@ bool PeerManagerImpl::MaybeDiscourageAndDisconnect(CNode& pnode, Peer& peer) bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgProc) { + AssertLockHeld(g_msgproc_mutex); + bool fMoreWork = false; PeerRef peer = GetPeerRef(pfrom->GetId()); @@ -4760,16 +4856,17 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt } } + bool has_more_orphans; { - LOCK2(cs_main, g_cs_orphans); - if (!peer->m_orphan_work_set.empty()) { - ProcessOrphanTx(peer->m_orphan_work_set); - } + LOCK(cs_main); + has_more_orphans = ProcessOrphanTx(*peer); } if (pfrom->fDisconnect) return false; + if (has_more_orphans) return true; + // this maintains the order of responses // and prevents m_getdata_requests to grow unbounded { @@ -4777,11 +4874,6 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt if (!peer->m_getdata_requests.empty()) return true; } - { - LOCK(g_cs_orphans); - if (!peer->m_orphan_work_set.empty()) return true; - } - // Don't bother if send buffer is too full to respond anyway if (pfrom->fPauseSend) return false; @@ -5099,7 +5191,7 @@ void PeerManagerImpl::MaybeSendAddr(CNode& node, Peer& peer, std::chrono::micros // Remove addr records that the peer already knows about, and add new // addrs to the m_addr_known filter on the same pass. - auto addr_already_known = [&peer](const CAddress& addr) { + auto addr_already_known = [&peer](const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) { bool ret = peer.m_addr_known->contains(addr.GetKey()); if (!ret) peer.m_addr_known->insert(addr.GetKey()); return ret; @@ -5138,7 +5230,7 @@ void PeerManagerImpl::MaybeSendSendHeaders(CNode& node, Peer& peer) LOCK(cs_main); CNodeState &state = *State(node.GetId()); if (state.pindexBestKnownBlock != nullptr && - state.pindexBestKnownBlock->nChainWork > nMinimumChainWork) { + state.pindexBestKnownBlock->nChainWork > m_chainman.MinimumChainWork()) { // Tell our peer we prefer to receive headers rather than inv's // We send this to non-NODE NETWORK peers as well, because even // non-NODE NETWORK peers can announce blocks (such as pruning @@ -5217,6 +5309,7 @@ bool PeerManagerImpl::RejectIncomingTxs(const CNode& peer) const { // block-relay-only peers may never send txs to us if (peer.IsBlockOnlyConn()) return true; + if (peer.IsFeelerConn()) return true; // In -blocksonly mode, peers need the 'relay' permission to send txs to us if (m_ignore_incoming_txs && !peer.HasPermission(NetPermissionFlags::Relay)) return true; return false; @@ -5230,8 +5323,9 @@ bool PeerManagerImpl::SetupAddressRelay(const CNode& node, Peer& peer) if (node.IsBlockOnlyConn()) return false; if (!peer.m_addr_relay_enabled.exchange(true)) { - // First addr message we have received from the peer, initialize - // m_addr_known + // During version message processing (non-block-relay-only outbound peers) + // or on first addr-related message we have received (inbound peers), initialize + // m_addr_known. peer.m_addr_known = std::make_unique<CRollingBloomFilter>(5000, 0.001); } @@ -5240,6 +5334,8 @@ bool PeerManagerImpl::SetupAddressRelay(const CNode& node, Peer& peer) bool PeerManagerImpl::SendMessages(CNode* pto) { + AssertLockHeld(g_msgproc_mutex); + PeerRef peer = GetPeerRef(pto->GetId()); if (!peer) return false; const Consensus::Params& consensusParams = m_chainparams.GetConsensus(); |