diff options
Diffstat (limited to 'src/net_processing.cpp')
-rw-r--r-- | src/net_processing.cpp | 393 |
1 files changed, 238 insertions, 155 deletions
diff --git a/src/net_processing.cpp b/src/net_processing.cpp index a246c8fa1c..c97f7ced46 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -169,6 +169,14 @@ void EraseOrphansFor(NodeId peer); // Internal stuff namespace { +/** Blocks that are in flight, and that are in the queue to be downloaded. */ +struct QueuedBlock { + uint256 hash; + const CBlockIndex* pindex; //!< Optional. + bool fValidatedHeaders; //!< Whether this block has validated headers at the time of request. + std::unique_ptr<PartiallyDownloadedBlock> partialBlock; //!< Optional, used for CMPCTBLOCK downloads +}; + /** * Data structure for an individual peer. This struct is not protected by * cs_main since it does not contain validation-critical data. @@ -211,6 +219,13 @@ struct Peer { /** This peer's reported block height when we connected */ std::atomic<int> m_starting_height{-1}; + /** The pong reply we're expecting, or 0 if no pong expected. */ + std::atomic<uint64_t> m_ping_nonce_sent{0}; + /** When the last ping was sent, or 0 if no ping was ever sent */ + std::atomic<std::chrono::microseconds> m_ping_start{0us}; + /** Whether a ping has been requested by the user */ + std::atomic<bool> m_ping_queued{false}; + /** Set of txids to reconsider once their parent transactions have been accepted **/ std::set<uint256> m_orphan_work_set GUARDED_BY(g_cs_orphans); @@ -248,6 +263,7 @@ public: void CheckForStaleTipAndEvictPeers() override; bool GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) override; bool IgnoresIncomingTxs() override { return m_ignore_incoming_txs; } + void SendPings() override; void SetBestHeight(int height) override { m_best_height = height; }; void Misbehaving(const NodeId pnode, const int howmuch, const std::string& message) override; void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv, @@ -294,9 +310,10 @@ private: /** Maybe disconnect a peer and discourage future connections from its address. * * @param[in] pnode The node to check. + * @param[in] peer The peer object to check. * @return True if the peer was marked for disconnection in this function */ - bool MaybeDiscourageAndDisconnect(CNode& pnode); + bool MaybeDiscourageAndDisconnect(CNode& pnode, Peer& peer); void ProcessOrphanTx(std::set<uint256>& orphan_work_set) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans); /** Process a single headers message from a peer. */ @@ -315,6 +332,10 @@ private: /** Send a version message to a peer */ void PushNodeVersion(CNode& pnode, int64_t nTime); + /** Send a ping message every PING_INTERVAL or if requested via RPC. May + * mark the peer to be disconnected if a ping has timed out. */ + void MaybeSendPing(CNode& node_to, Peer& peer); + const CChainParams& m_chainparams; CConnman& m_connman; /** Pointer to this node's banman. May be nullptr - check existence before dereferencing. */ @@ -345,10 +366,7 @@ private: * their own locks. */ std::map<NodeId, PeerRef> m_peer_map GUARDED_BY(m_peer_mutex); -}; -} // namespace -namespace { /** Number of nodes with fSyncStarted. */ int nSyncStarted GUARDED_BY(cs_main) = 0; @@ -360,6 +378,14 @@ namespace { */ std::map<uint256, std::pair<NodeId, bool>> mapBlockSource GUARDED_BY(cs_main); + /** Number of peers with wtxid relay. */ + int m_wtxid_relay_peers GUARDED_BY(cs_main) = 0; + + /** Number of outbound peers with m_chain_sync.m_protect. */ + int m_outbound_peers_with_protect_from_disconnect GUARDED_BY(cs_main) = 0; + + bool AlreadyHaveTx(const GenTxid& gtxid) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + /** * Filter for transactions that were recently rejected by * AcceptToMemoryPool. These are not rerequested until the chain tip @@ -402,35 +428,36 @@ namespace { * We use this to avoid requesting transactions that have already been * confirnmed. */ - Mutex g_cs_recent_confirmed_transactions; - std::unique_ptr<CRollingBloomFilter> g_recent_confirmed_transactions GUARDED_BY(g_cs_recent_confirmed_transactions); - - /** Blocks that are in flight, and that are in the queue to be downloaded. */ - struct QueuedBlock { - uint256 hash; - const CBlockIndex* pindex; //!< Optional. - bool fValidatedHeaders; //!< Whether this block has validated headers at the time of request. - std::unique_ptr<PartiallyDownloadedBlock> partialBlock; //!< Optional, used for CMPCTBLOCK downloads - }; - std::map<uint256, std::pair<NodeId, std::list<QueuedBlock>::iterator> > mapBlocksInFlight GUARDED_BY(cs_main); + Mutex m_recent_confirmed_transactions_mutex; + std::unique_ptr<CRollingBloomFilter> m_recent_confirmed_transactions GUARDED_BY(m_recent_confirmed_transactions_mutex); - /** Stack of nodes which we have set to announce using compact blocks */ - std::list<NodeId> lNodesAnnouncingHeaderAndIDs GUARDED_BY(cs_main); + /* Returns a bool indicating whether we requested this block. + * Also used if a block was /not/ received and timed out or started with another peer + */ + bool MarkBlockAsReceived(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main); - /** Number of preferable block download peers. */ - int nPreferredDownload GUARDED_BY(cs_main) = 0; + /* Mark a block as in flight + * Returns false, still setting pit, if the block was already in flight from the same peer + * pit will only be valid as long as the same cs_main lock is being held + */ + bool MarkBlockAsInFlight(NodeId nodeid, const uint256& hash, const CBlockIndex* pindex = nullptr, std::list<QueuedBlock>::iterator** pit = nullptr) EXCLUSIVE_LOCKS_REQUIRED(cs_main); - /** Number of peers from which we're downloading blocks. */ - int nPeersWithValidatedDownloads GUARDED_BY(cs_main) = 0; + bool TipMayBeStale() EXCLUSIVE_LOCKS_REQUIRED(cs_main); - /** Number of peers with wtxid relay. */ - int g_wtxid_relay_peers GUARDED_BY(cs_main) = 0; + /** Update pindexLastCommonBlock and add not-in-flight missing successors to vBlocks, until it has + * at most count entries. + */ + void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vector<const CBlockIndex*>& vBlocks, NodeId& nodeStaller) EXCLUSIVE_LOCKS_REQUIRED(cs_main); - /** Number of outbound peers with m_chain_sync.m_protect. */ - int g_outbound_peers_with_protect_from_disconnect GUARDED_BY(cs_main) = 0; + std::map<uint256, std::pair<NodeId, std::list<QueuedBlock>::iterator> > mapBlocksInFlight GUARDED_BY(cs_main); /** When our tip was last updated. */ - std::atomic<int64_t> g_last_tip_update(0); + std::atomic<int64_t> m_last_tip_update{0}; + + /** Determine whether or not a peer can request a transaction, and return it (or nullptr if not found or not allowed). */ + CTransactionRef FindTxForGetData(const CNode& peer, const GenTxid& gtxid, const std::chrono::seconds mempool_req, const std::chrono::seconds now) LOCKS_EXCLUDED(cs_main); + + void ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic<bool>& interruptMsgProc) EXCLUSIVE_LOCKS_REQUIRED(!cs_main, peer.m_getdata_requests_mutex); /** Relay map (txid or wtxid -> CTransactionRef) */ typedef std::map<uint256, CTransactionRef> MapRelay; @@ -438,6 +465,28 @@ namespace { /** Expiration-time ordered list of (expire time, relay map entry) pairs. */ std::deque<std::pair<int64_t, MapRelay::iterator>> vRelayExpiration GUARDED_BY(cs_main); + /** + * When a peer sends us a valid block, instruct it to announce blocks to us + * using CMPCTBLOCK if possible by adding its nodeid to the end of + * lNodesAnnouncingHeaderAndIDs, and keeping that list under a certain size by + * removing the first element if necessary. + */ + void MaybeSetPeerAsAnnouncingHeaderAndIDs(NodeId nodeid) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + + /** Stack of nodes which we have set to announce using compact blocks */ + std::list<NodeId> lNodesAnnouncingHeaderAndIDs GUARDED_BY(cs_main); + + /** Number of peers from which we're downloading blocks. */ + int nPeersWithValidatedDownloads GUARDED_BY(cs_main) = 0; + +}; +} // namespace + +namespace { + + /** Number of preferable block download peers. */ + int nPreferredDownload GUARDED_BY(cs_main) = 0; + struct IteratorComparator { template<typename I> @@ -610,9 +659,8 @@ static void UpdatePreferredDownload(const CNode& node, CNodeState* state) EXCLUS nPreferredDownload += state->fPreferredDownload; } -// Returns a bool indicating whether we requested this block. -// Also used if a block was /not/ received and timed out or started with another peer -static bool MarkBlockAsReceived(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { +bool PeerManagerImpl::MarkBlockAsReceived(const uint256& hash) +{ std::map<uint256, std::pair<NodeId, std::list<QueuedBlock>::iterator> >::iterator itInFlight = mapBlocksInFlight.find(hash); if (itInFlight != mapBlocksInFlight.end()) { CNodeState *state = State(itInFlight->second.first); @@ -635,9 +683,8 @@ static bool MarkBlockAsReceived(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs return false; } -// returns false, still setting pit, if the block was already in flight from the same peer -// pit will only be valid as long as the same cs_main lock is being held -static bool MarkBlockAsInFlight(CTxMemPool& mempool, NodeId nodeid, const uint256& hash, const CBlockIndex* pindex = nullptr, std::list<QueuedBlock>::iterator** pit = nullptr) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { +bool PeerManagerImpl::MarkBlockAsInFlight(NodeId nodeid, const uint256& hash, const CBlockIndex* pindex, std::list<QueuedBlock>::iterator** pit) +{ CNodeState *state = State(nodeid); assert(state != nullptr); @@ -654,7 +701,7 @@ static bool MarkBlockAsInFlight(CTxMemPool& mempool, NodeId nodeid, const uint25 MarkBlockAsReceived(hash); std::list<QueuedBlock>::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(), - {hash, pindex, pindex != nullptr, std::unique_ptr<PartiallyDownloadedBlock>(pit ? new PartiallyDownloadedBlock(&mempool) : nullptr)}); + {hash, pindex, pindex != nullptr, std::unique_ptr<PartiallyDownloadedBlock>(pit ? new PartiallyDownloadedBlock(&m_mempool) : nullptr)}); state->nBlocksInFlight++; state->nBlocksInFlightValidHeaders += it->fValidatedHeaders; if (state->nBlocksInFlight == 1) { @@ -705,13 +752,7 @@ static void UpdateBlockAvailability(NodeId nodeid, const uint256 &hash) EXCLUSIV } } -/** - * When a peer sends us a valid block, instruct it to announce blocks to us - * using CMPCTBLOCK if possible by adding its nodeid to the end of - * lNodesAnnouncingHeaderAndIDs, and keeping that list under a certain size by - * removing the first element if necessary. - */ -static void MaybeSetPeerAsAnnouncingHeaderAndIDs(NodeId nodeid, CConnman& connman) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +void PeerManagerImpl::MaybeSetPeerAsAnnouncingHeaderAndIDs(NodeId nodeid) { AssertLockHeld(cs_main); CNodeState* nodestate = State(nodeid); @@ -727,21 +768,21 @@ static void MaybeSetPeerAsAnnouncingHeaderAndIDs(NodeId nodeid, CConnman& connma return; } } - connman.ForNode(nodeid, [&connman](CNode* pfrom) EXCLUSIVE_LOCKS_REQUIRED(::cs_main) { + m_connman.ForNode(nodeid, [this](CNode* pfrom) EXCLUSIVE_LOCKS_REQUIRED(::cs_main) { AssertLockHeld(::cs_main); uint64_t nCMPCTBLOCKVersion = (pfrom->GetLocalServices() & NODE_WITNESS) ? 2 : 1; if (lNodesAnnouncingHeaderAndIDs.size() >= 3) { // As per BIP152, we only get 3 of our peers to announce // blocks using compact encodings. - connman.ForNode(lNodesAnnouncingHeaderAndIDs.front(), [&connman, nCMPCTBLOCKVersion](CNode* pnodeStop){ - connman.PushMessage(pnodeStop, CNetMsgMaker(pnodeStop->GetCommonVersion()).Make(NetMsgType::SENDCMPCT, /*fAnnounceUsingCMPCTBLOCK=*/false, nCMPCTBLOCKVersion)); + m_connman.ForNode(lNodesAnnouncingHeaderAndIDs.front(), [this, nCMPCTBLOCKVersion](CNode* pnodeStop){ + m_connman.PushMessage(pnodeStop, CNetMsgMaker(pnodeStop->GetCommonVersion()).Make(NetMsgType::SENDCMPCT, /*fAnnounceUsingCMPCTBLOCK=*/false, nCMPCTBLOCKVersion)); // save BIP152 bandwidth state: we select peer to be low-bandwidth pnodeStop->m_bip152_highbandwidth_to = false; return true; }); lNodesAnnouncingHeaderAndIDs.pop_front(); } - connman.PushMessage(pfrom, CNetMsgMaker(pfrom->GetCommonVersion()).Make(NetMsgType::SENDCMPCT, /*fAnnounceUsingCMPCTBLOCK=*/true, nCMPCTBLOCKVersion)); + m_connman.PushMessage(pfrom, CNetMsgMaker(pfrom->GetCommonVersion()).Make(NetMsgType::SENDCMPCT, /*fAnnounceUsingCMPCTBLOCK=*/true, nCMPCTBLOCKVersion)); // save BIP152 bandwidth state: we select peer to be high-bandwidth pfrom->m_bip152_highbandwidth_to = true; lNodesAnnouncingHeaderAndIDs.push_back(pfrom->GetId()); @@ -750,13 +791,14 @@ static void MaybeSetPeerAsAnnouncingHeaderAndIDs(NodeId nodeid, CConnman& connma } } -static bool TipMayBeStale(const Consensus::Params &consensusParams) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +bool PeerManagerImpl::TipMayBeStale() { AssertLockHeld(cs_main); - if (g_last_tip_update == 0) { - g_last_tip_update = GetTime(); + const Consensus::Params& consensusParams = m_chainparams.GetConsensus(); + if (m_last_tip_update == 0) { + m_last_tip_update = GetTime(); } - return g_last_tip_update < GetTime() - consensusParams.nPowTargetSpacing * 3 && mapBlocksInFlight.empty(); + return m_last_tip_update < GetTime() - consensusParams.nPowTargetSpacing * 3 && mapBlocksInFlight.empty(); } static bool CanDirectFetch(const Consensus::Params &consensusParams) EXCLUSIVE_LOCKS_REQUIRED(cs_main) @@ -773,9 +815,7 @@ static bool PeerHasHeader(CNodeState *state, const CBlockIndex *pindex) EXCLUSIV return false; } -/** Update pindexLastCommonBlock and add not-in-flight missing successors to vBlocks, until it has - * at most count entries. */ -static void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vector<const CBlockIndex*>& vBlocks, NodeId& nodeStaller, const Consensus::Params& consensusParams) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +void PeerManagerImpl::FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vector<const CBlockIndex*>& vBlocks, NodeId& nodeStaller) { if (count == 0) return; @@ -804,6 +844,7 @@ static void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vec if (state->pindexLastCommonBlock == state->pindexBestKnownBlock) return; + const Consensus::Params& consensusParams = m_chainparams.GetConsensus(); std::vector<const CBlockIndex*> vToFetch; const CBlockIndex *pindexWalk = state->pindexLastCommonBlock; // Never fetch further than the best block we know the peer has, or more than BLOCK_DOWNLOAD_WINDOW + 1 beyond the last @@ -911,7 +952,7 @@ void PeerManagerImpl::AddTxAnnouncement(const CNode& node, const GenTxid& gtxid, auto delay = std::chrono::microseconds{0}; const bool preferred = state->fPreferredDownload; if (!preferred) delay += NONPREF_PEER_TX_DELAY; - if (!gtxid.IsWtxid() && g_wtxid_relay_peers > 0) delay += TXID_RELAY_DELAY; + if (!gtxid.IsWtxid() && m_wtxid_relay_peers > 0) delay += TXID_RELAY_DELAY; const bool overloaded = !node.HasPermission(PF_RELAY) && m_txrequest.CountInFlight(nodeid) >= MAX_PEER_TX_REQUEST_IN_FLIGHT; if (overloaded) delay += OVERLOADED_PEER_TX_DELAY; @@ -1004,10 +1045,10 @@ void PeerManagerImpl::FinalizeNode(const CNode& node, bool& fUpdateConnectionTim nPreferredDownload -= state->fPreferredDownload; nPeersWithValidatedDownloads -= (state->nBlocksInFlightValidHeaders != 0); assert(nPeersWithValidatedDownloads >= 0); - g_outbound_peers_with_protect_from_disconnect -= state->m_chain_sync.m_protect; - assert(g_outbound_peers_with_protect_from_disconnect >= 0); - g_wtxid_relay_peers -= state->m_wtxid_relay; - assert(g_wtxid_relay_peers >= 0); + m_outbound_peers_with_protect_from_disconnect -= state->m_chain_sync.m_protect; + assert(m_outbound_peers_with_protect_from_disconnect >= 0); + m_wtxid_relay_peers -= state->m_wtxid_relay; + assert(m_wtxid_relay_peers >= 0); mapNodeState.erase(nodeid); @@ -1016,8 +1057,8 @@ void PeerManagerImpl::FinalizeNode(const CNode& node, bool& fUpdateConnectionTim assert(mapBlocksInFlight.empty()); assert(nPreferredDownload == 0); assert(nPeersWithValidatedDownloads == 0); - assert(g_outbound_peers_with_protect_from_disconnect == 0); - assert(g_wtxid_relay_peers == 0); + assert(m_outbound_peers_with_protect_from_disconnect == 0); + assert(m_wtxid_relay_peers == 0); assert(m_txrequest.Size() == 0); } LogPrint(BCLog::NET, "Cleared nodestate for peer=%d\n", nodeid); @@ -1060,6 +1101,18 @@ bool PeerManagerImpl::GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) PeerRef peer = GetPeerRef(nodeid); if (peer == nullptr) return false; stats.m_starting_height = peer->m_starting_height; + // It is common for nodes with good ping times to suddenly become lagged, + // due to a new block arriving or other large transfer. + // Merely reporting pingtime might fool the caller into thinking the node was still responsive, + // since pingtime does not update until the ping is complete, which might take a while. + // So, if a ping is taking an unusually long time in flight, + // the caller can immediately detect that this is happening. + std::chrono::microseconds ping_wait{0}; + if ((0 != peer->m_ping_nonce_sent) && (0 != peer->m_ping_start.load().count())) { + ping_wait = GetTime<std::chrono::microseconds>() - peer->m_ping_start.load(); + } + + stats.m_ping_wait_usec = count_microseconds(ping_wait); return true; } @@ -1344,7 +1397,7 @@ PeerManagerImpl::PeerManagerImpl(const CChainParams& chainparams, CConnman& conn // The false positive rate of 1/1M should come out to less than 1 // transaction per day that would be inadvertently ignored (which is the // same probability that we have in the reject filter). - g_recent_confirmed_transactions.reset(new CRollingBloomFilter(48000, 0.000001)); + m_recent_confirmed_transactions.reset(new CRollingBloomFilter(48000, 0.000001)); // Stale tip checking and peer eviction are on two different timers, but we // don't want them to get out of sync due to drift in the scheduler, so we @@ -1394,14 +1447,14 @@ void PeerManagerImpl::BlockConnected(const std::shared_ptr<const CBlock>& pblock LogPrint(BCLog::MEMPOOL, "Erased %d orphan tx included or conflicted by block\n", nErased); } - g_last_tip_update = GetTime(); + m_last_tip_update = GetTime(); } { - LOCK(g_cs_recent_confirmed_transactions); + LOCK(m_recent_confirmed_transactions_mutex); for (const auto& ptx : pblock->vtx) { - g_recent_confirmed_transactions->insert(ptx->GetHash()); + m_recent_confirmed_transactions->insert(ptx->GetHash()); if (ptx->GetHash() != ptx->GetWitnessHash()) { - g_recent_confirmed_transactions->insert(ptx->GetWitnessHash()); + m_recent_confirmed_transactions->insert(ptx->GetWitnessHash()); } } } @@ -1424,8 +1477,8 @@ void PeerManagerImpl::BlockDisconnected(const std::shared_ptr<const CBlock> &blo // block's worth of transactions in it, but that should be fine, since // presumably the most common case of relaying a confirmed transaction // should be just after a new block containing it is found. - LOCK(g_cs_recent_confirmed_transactions); - g_recent_confirmed_transactions->reset(); + LOCK(m_recent_confirmed_transactions_mutex); + m_recent_confirmed_transactions->reset(); } // All of the following cache a recent block, and are protected by cs_most_recent_block @@ -1550,7 +1603,7 @@ void PeerManagerImpl::BlockChecked(const CBlock& block, const BlockValidationSta !::ChainstateActive().IsInitialBlockDownload() && mapBlocksInFlight.count(hash) == mapBlocksInFlight.size()) { if (it != mapBlockSource.end()) { - MaybeSetPeerAsAnnouncingHeaderAndIDs(it->second.first, m_connman); + MaybeSetPeerAsAnnouncingHeaderAndIDs(it->second.first); } } if (it != mapBlockSource.end()) @@ -1563,7 +1616,7 @@ void PeerManagerImpl::BlockChecked(const CBlock& block, const BlockValidationSta // -bool static AlreadyHaveTx(const GenTxid& gtxid, const CTxMemPool& mempool) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +bool PeerManagerImpl::AlreadyHaveTx(const GenTxid& gtxid) { assert(recentRejects); if (::ChainActive().Tip()->GetBlockHash() != hashRecentRejectsChainTip) { @@ -1587,11 +1640,11 @@ bool static AlreadyHaveTx(const GenTxid& gtxid, const CTxMemPool& mempool) EXCLU } { - LOCK(g_cs_recent_confirmed_transactions); - if (g_recent_confirmed_transactions->contains(hash)) return true; + LOCK(m_recent_confirmed_transactions_mutex); + if (m_recent_confirmed_transactions->contains(hash)) return true; } - return recentRejects->contains(hash) || mempool.exists(gtxid); + return recentRejects->contains(hash) || m_mempool.exists(gtxid); } bool static AlreadyHaveBlock(const uint256& block_hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main) @@ -1599,6 +1652,12 @@ bool static AlreadyHaveBlock(const uint256& block_hash) EXCLUSIVE_LOCKS_REQUIRED return g_chainman.m_blockman.LookupBlockIndex(block_hash) != nullptr; } +void PeerManagerImpl::SendPings() +{ + LOCK(m_peer_mutex); + for(auto& it : m_peer_map) it.second->m_ping_queued = true; +} + void RelayTransaction(const uint256& txid, const uint256& wtxid, const CConnman& connman) { connman.ForEachNode([&txid, &wtxid](CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(::cs_main) { @@ -1825,10 +1884,9 @@ void static ProcessGetBlockData(CNode& pfrom, Peer& peer, const CChainParams& ch } } -//! Determine whether or not a peer can request a transaction, and return it (or nullptr if not found or not allowed). -static CTransactionRef FindTxForGetData(const CTxMemPool& mempool, const CNode& peer, const GenTxid& gtxid, const std::chrono::seconds mempool_req, const std::chrono::seconds now) LOCKS_EXCLUDED(cs_main) +CTransactionRef PeerManagerImpl::FindTxForGetData(const CNode& peer, const GenTxid& gtxid, const std::chrono::seconds mempool_req, const std::chrono::seconds now) { - auto txinfo = mempool.info(gtxid); + auto txinfo = m_mempool.info(gtxid); if (txinfo.tx) { // If a TX could have been INVed in reply to a MEMPOOL request, // or is older than UNCONDITIONAL_RELAY_DELAY, permit the request @@ -1853,7 +1911,7 @@ static CTransactionRef FindTxForGetData(const CTxMemPool& mempool, const CNode& return {}; } -void static ProcessGetData(CNode& pfrom, Peer& peer, const CChainParams& chainparams, CConnman& connman, CTxMemPool& mempool, const std::atomic<bool>& interruptMsgProc) EXCLUSIVE_LOCKS_REQUIRED(!cs_main, peer.m_getdata_requests_mutex) +void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic<bool>& interruptMsgProc) { AssertLockNotHeld(cs_main); @@ -1882,17 +1940,17 @@ void static ProcessGetData(CNode& pfrom, Peer& peer, const CChainParams& chainpa continue; } - CTransactionRef tx = FindTxForGetData(mempool, pfrom, ToGenTxid(inv), mempool_req, now); + CTransactionRef tx = FindTxForGetData(pfrom, ToGenTxid(inv), mempool_req, now); if (tx) { // WTX and WITNESS_TX imply we serialize with witness int nSendFlags = (inv.IsMsgTx() ? SERIALIZE_TRANSACTION_NO_WITNESS : 0); - connman.PushMessage(&pfrom, msgMaker.Make(nSendFlags, NetMsgType::TX, *tx)); - mempool.RemoveUnbroadcastTx(tx->GetHash()); + m_connman.PushMessage(&pfrom, msgMaker.Make(nSendFlags, NetMsgType::TX, *tx)); + m_mempool.RemoveUnbroadcastTx(tx->GetHash()); // As we're going to send tx, make sure its unconfirmed parents are made requestable. std::vector<uint256> parent_ids_to_add; { - LOCK(mempool.cs); - auto txiter = mempool.GetIter(tx->GetHash()); + LOCK(m_mempool.cs); + auto txiter = m_mempool.GetIter(tx->GetHash()); if (txiter) { const CTxMemPoolEntry::Parents& parents = (*txiter)->GetMemPoolParentsConst(); parent_ids_to_add.reserve(parents.size()); @@ -1920,7 +1978,7 @@ void static ProcessGetData(CNode& pfrom, Peer& peer, const CChainParams& chainpa if (it != peer.m_getdata_requests.end() && !pfrom.fPauseSend) { const CInv &inv = *it++; if (inv.IsGenBlkMsg()) { - ProcessGetBlockData(pfrom, peer, chainparams, inv, connman); + ProcessGetBlockData(pfrom, peer, m_chainparams, inv, m_connman); } // else: If the first item on the queue is an unknown type, we erase it // and continue processing the queue on the next call. @@ -1943,7 +2001,7 @@ void static ProcessGetData(CNode& pfrom, Peer& peer, const CChainParams& chainpa // In normal operation, we often send NOTFOUND messages for parents of // transactions that we relay; if a peer is missing a parent, they may // assume we have them and request the parents from us. - connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::NOTFOUND, vNotFound)); + m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::NOTFOUND, vNotFound)); } } @@ -2102,7 +2160,7 @@ void PeerManagerImpl::ProcessHeadersMessage(CNode& pfrom, const Peer& peer, } uint32_t nFetchFlags = GetFetchFlags(pfrom); vGetData.push_back(CInv(MSG_BLOCK | nFetchFlags, pindex->GetBlockHash())); - MarkBlockAsInFlight(m_mempool, pfrom.GetId(), pindex->GetBlockHash(), pindex); + MarkBlockAsInFlight(pfrom.GetId(), pindex->GetBlockHash(), pindex); LogPrint(BCLog::NET, "Requesting block %s from peer=%d\n", pindex->GetBlockHash().ToString(), pfrom.GetId()); } @@ -2146,10 +2204,10 @@ void PeerManagerImpl::ProcessHeadersMessage(CNode& pfrom, const Peer& peer, // thus always subject to eviction under the bad/lagging chain logic. // See ChainSyncTimeoutState. if (!pfrom.fDisconnect && pfrom.IsFullOutboundConn() && nodestate->pindexBestKnownBlock != nullptr) { - if (g_outbound_peers_with_protect_from_disconnect < MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT && nodestate->pindexBestKnownBlock->nChainWork >= ::ChainActive().Tip()->nChainWork && !nodestate->m_chain_sync.m_protect) { + if (m_outbound_peers_with_protect_from_disconnect < MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT && nodestate->pindexBestKnownBlock->nChainWork >= ::ChainActive().Tip()->nChainWork && !nodestate->m_chain_sync.m_protect) { LogPrint(BCLog::NET, "Protecting outbound peer=%d from eviction\n", pfrom.GetId()); nodestate->m_chain_sync.m_protect = true; - ++g_outbound_peers_with_protect_from_disconnect; + ++m_outbound_peers_with_protect_from_disconnect; } } } @@ -2178,10 +2236,10 @@ void PeerManagerImpl::ProcessOrphanTx(std::set<uint256>& orphan_work_set) if (orphan_it == mapOrphanTransactions.end()) continue; const CTransactionRef porphanTx = orphan_it->second.tx; - TxValidationState state; - std::list<CTransactionRef> removed_txn; + const MempoolAcceptResult result = AcceptToMemoryPool(::ChainstateActive(), m_mempool, porphanTx, false /* bypass_limits */); + const TxValidationState& state = result.m_state; - if (AcceptToMemoryPool(m_mempool, state, porphanTx, &removed_txn, false /* bypass_limits */)) { + if (result.m_result_type == MempoolAcceptResult::ResultType::VALID) { LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString()); RelayTransaction(orphanHash, porphanTx->GetWitnessHash(), m_connman); for (unsigned int i = 0; i < porphanTx->vout.size(); i++) { @@ -2193,7 +2251,7 @@ void PeerManagerImpl::ProcessOrphanTx(std::set<uint256>& orphan_work_set) } } EraseOrphanTx(orphanHash); - for (const CTransactionRef& removedTx : removed_txn) { + for (const CTransactionRef& removedTx : result.m_replaced_transactions.value()) { AddToCompactExtraTransactions(removedTx); } break; @@ -2489,6 +2547,9 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, bool fRelay = true; vRecv >> nVersion >> nServiceInt >> nTime >> addrMe; + if (nTime < 0) { + nTime = 0; + } nServices = ServiceFlags(nServiceInt); if (!pfrom.IsInboundConn()) { @@ -2758,7 +2819,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, LOCK(cs_main); if (!State(pfrom.GetId())->m_wtxid_relay) { State(pfrom.GetId())->m_wtxid_relay = true; - g_wtxid_relay_peers++; + m_wtxid_relay_peers++; } else { LogPrint(BCLog::NET, "ignoring duplicate wtxidrelay from peer=%d\n", pfrom.GetId()); } @@ -2901,7 +2962,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, } } else if (inv.IsGenTxMsg()) { const GenTxid gtxid = ToGenTxid(inv); - const bool fAlreadyHave = AlreadyHaveTx(gtxid, m_mempool); + const bool fAlreadyHave = AlreadyHaveTx(gtxid); LogPrint(BCLog::NET, "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom.GetId()); pfrom.AddKnownTx(inv.hash); @@ -2943,7 +3004,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, { LOCK(peer->m_getdata_requests_mutex); peer->m_getdata_requests.insert(peer->m_getdata_requests.end(), vInv.begin(), vInv.end()); - ProcessGetData(pfrom, *peer, m_chainparams, m_connman, m_mempool, interruptMsgProc); + ProcessGetData(pfrom, *peer, interruptMsgProc); } return; @@ -3182,7 +3243,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // already; and an adversary can already relay us old transactions // (older than our recency filter) if trying to DoS us, without any need // for witness malleation. - if (AlreadyHaveTx(GenTxid(/* is_wtxid=*/true, wtxid), m_mempool)) { + if (AlreadyHaveTx(GenTxid(/* is_wtxid=*/true, wtxid))) { if (pfrom.HasPermission(PF_FORCERELAY)) { // Always relay transactions received from peers with forcerelay // permission, even if they were already in the mempool, allowing @@ -3197,10 +3258,10 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, return; } - TxValidationState state; - std::list<CTransactionRef> lRemovedTxn; + const MempoolAcceptResult result = AcceptToMemoryPool(::ChainstateActive(), m_mempool, ptx, false /* bypass_limits */); + const TxValidationState& state = result.m_state; - if (AcceptToMemoryPool(m_mempool, state, ptx, &lRemovedTxn, false /* bypass_limits */)) { + if (result.m_result_type == MempoolAcceptResult::ResultType::VALID) { m_mempool.check(&::ChainstateActive().CoinsTip()); // As this version of the transaction was acceptable, we can forget about any // requests for it. @@ -3223,7 +3284,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, tx.GetHash().ToString(), m_mempool.size(), m_mempool.DynamicMemoryUsage() / 1000); - for (const CTransactionRef& removedTx : lRemovedTxn) { + for (const CTransactionRef& removedTx : result.m_replaced_transactions.value()) { AddToCompactExtraTransactions(removedTx); } @@ -3261,7 +3322,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // protocol for getting all unconfirmed parents. const GenTxid gtxid{/* is_wtxid=*/false, parent_txid}; pfrom.AddKnownTx(parent_txid); - if (!AlreadyHaveTx(gtxid, m_mempool)) AddTxAnnouncement(pfrom, gtxid, current_time); + if (!AlreadyHaveTx(gtxid)) AddTxAnnouncement(pfrom, gtxid, current_time); } AddOrphanTx(ptx, pfrom.GetId()); @@ -3451,7 +3512,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, if ((!fAlreadyInFlight && nodestate->nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) || (fAlreadyInFlight && blockInFlightIt->second.first == pfrom.GetId())) { std::list<QueuedBlock>::iterator* queuedBlockIt = nullptr; - if (!MarkBlockAsInFlight(m_mempool, pfrom.GetId(), pindex->GetBlockHash(), pindex, &queuedBlockIt)) { + if (!MarkBlockAsInFlight(pfrom.GetId(), pindex->GetBlockHash(), pindex, &queuedBlockIt)) { if (!(*queuedBlockIt)->partialBlock) (*queuedBlockIt)->partialBlock.reset(new PartiallyDownloadedBlock(&m_mempool)); else { @@ -3807,15 +3868,14 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, vRecv >> nonce; // Only process pong message if there is an outstanding ping (old ping without nonce should never pong) - if (pfrom.nPingNonceSent != 0) { - if (nonce == pfrom.nPingNonceSent) { + if (peer->m_ping_nonce_sent != 0) { + if (nonce == peer->m_ping_nonce_sent) { // Matching pong received, this ping is no longer outstanding bPingFinished = true; - const auto ping_time = ping_end - pfrom.m_ping_start.load(); + const auto ping_time = ping_end - peer->m_ping_start.load(); if (ping_time.count() >= 0) { - // Successful ping time measurement, replace previous - pfrom.nPingUsecTime = count_microseconds(ping_time); - pfrom.nMinPingUsecTime = std::min(pfrom.nMinPingUsecTime.load(), count_microseconds(ping_time)); + // Let connman know about this successful ping-pong + pfrom.PongReceived(ping_time); } else { // This should never happen sProblem = "Timing mishap"; @@ -3842,12 +3902,12 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, LogPrint(BCLog::NET, "pong peer=%d: %s, %x expected, %x received, %u bytes\n", pfrom.GetId(), sProblem, - pfrom.nPingNonceSent, + peer->m_ping_nonce_sent, nonce, nAvail); } if (bPingFinished) { - pfrom.nPingNonceSent = 0; + peer->m_ping_nonce_sent = 0; } return; } @@ -3966,43 +4026,40 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, return; } -bool PeerManagerImpl::MaybeDiscourageAndDisconnect(CNode& pnode) +bool PeerManagerImpl::MaybeDiscourageAndDisconnect(CNode& pnode, Peer& peer) { - const NodeId peer_id{pnode.GetId()}; - PeerRef peer = GetPeerRef(peer_id); - if (peer == nullptr) return false; - { - LOCK(peer->m_misbehavior_mutex); + LOCK(peer.m_misbehavior_mutex); // There's nothing to do if the m_should_discourage flag isn't set - if (!peer->m_should_discourage) return false; + if (!peer.m_should_discourage) return false; - peer->m_should_discourage = false; + peer.m_should_discourage = false; } // peer.m_misbehavior_mutex if (pnode.HasPermission(PF_NOBAN)) { // We never disconnect or discourage peers for bad behavior if they have the NOBAN permission flag - LogPrintf("Warning: not punishing noban peer %d!\n", peer_id); + LogPrintf("Warning: not punishing noban peer %d!\n", peer.m_id); return false; } if (pnode.IsManualConn()) { // We never disconnect or discourage manual peers for bad behavior - LogPrintf("Warning: not punishing manually connected peer %d!\n", peer_id); + LogPrintf("Warning: not punishing manually connected peer %d!\n", peer.m_id); return false; } if (pnode.addr.IsLocal()) { // We disconnect local peers for bad behavior but don't discourage (since that would discourage // all peers on the same local address) - LogPrintf("Warning: disconnecting but not discouraging local peer %d!\n", peer_id); + LogPrint(BCLog::NET, "Warning: disconnecting but not discouraging %s peer %d!\n", + pnode.m_inbound_onion ? "inbound onion" : "local", peer.m_id); pnode.fDisconnect = true; return true; } // Normal case: Disconnect the peer and discourage all nodes sharing the address - LogPrint(BCLog::NET, "Disconnecting and discouraging peer %d!\n", peer_id); + LogPrint(BCLog::NET, "Disconnecting and discouraging peer %d!\n", peer.m_id); if (m_banman) m_banman->Discourage(pnode.addr); m_connman.DisconnectNode(pnode.addr); return true; @@ -4018,7 +4075,7 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt { LOCK(peer->m_getdata_requests_mutex); if (!peer->m_getdata_requests.empty()) { - ProcessGetData(*pfrom, *peer, m_chainparams, m_connman, m_mempool, interruptMsgProc); + ProcessGetData(*pfrom, *peer, interruptMsgProc); } } @@ -4250,8 +4307,8 @@ void PeerManagerImpl::CheckForStaleTipAndEvictPeers() if (time_in_seconds > m_stale_tip_check_time) { // Check whether our tip is stale, and if so, allow using an extra // outbound peer - if (!fImporting && !fReindex && m_connman.GetNetworkActive() && m_connman.GetUseAddrmanOutgoing() && TipMayBeStale(m_chainparams.GetConsensus())) { - LogPrintf("Potential stale tip detected, will try using extra outbound peer (last tip update: %d seconds ago)\n", time_in_seconds - g_last_tip_update); + if (!fImporting && !fReindex && m_connman.GetNetworkActive() && m_connman.GetUseAddrmanOutgoing() && TipMayBeStale()) { + LogPrintf("Potential stale tip detected, will try using extra outbound peer (last tip update: %d seconds ago)\n", time_in_seconds - m_last_tip_update); m_connman.SetTryNewOutboundPeer(true); } else if (m_connman.GetTryNewOutboundPeer()) { m_connman.SetTryNewOutboundPeer(false); @@ -4265,6 +4322,50 @@ void PeerManagerImpl::CheckForStaleTipAndEvictPeers() } } +void PeerManagerImpl::MaybeSendPing(CNode& node_to, Peer& peer) +{ + // Use mockable time for ping timeouts. + // This means that setmocktime may cause pings to time out. + auto now = GetTime<std::chrono::microseconds>(); + + if (m_connman.RunInactivityChecks(node_to) && peer.m_ping_nonce_sent && + now > peer.m_ping_start.load() + std::chrono::seconds{TIMEOUT_INTERVAL}) { + LogPrint(BCLog::NET, "ping timeout: %fs peer=%d\n", 0.000001 * count_microseconds(now - peer.m_ping_start.load()), peer.m_id); + node_to.fDisconnect = true; + return; + } + + const CNetMsgMaker msgMaker(node_to.GetCommonVersion()); + bool pingSend = false; + + if (peer.m_ping_queued) { + // RPC ping request by user + pingSend = true; + } + + if (peer.m_ping_nonce_sent == 0 && now > peer.m_ping_start.load() + PING_INTERVAL) { + // Ping automatically sent as a latency probe & keepalive. + pingSend = true; + } + + if (pingSend) { + uint64_t nonce = 0; + while (nonce == 0) { + GetRandBytes((unsigned char*)&nonce, sizeof(nonce)); + } + peer.m_ping_queued = false; + peer.m_ping_start = now; + if (node_to.GetCommonVersion() > BIP0031_VERSION) { + peer.m_ping_nonce_sent = nonce; + m_connman.PushMessage(&node_to, msgMaker.Make(NetMsgType::PING, nonce)); + } else { + // Peer is too old to support ping command with nonce, pong will never arrive. + peer.m_ping_nonce_sent = 0; + m_connman.PushMessage(&node_to, msgMaker.Make(NetMsgType::PING)); + } + } +} + namespace { class CompareInvMempoolOrder { @@ -4289,11 +4390,12 @@ public: bool PeerManagerImpl::SendMessages(CNode* pto) { PeerRef peer = GetPeerRef(pto->GetId()); + if (!peer) return false; const Consensus::Params& consensusParams = m_chainparams.GetConsensus(); // We must call MaybeDiscourageAndDisconnect first, to ensure that we'll // disconnect misbehaving peers even before the version handshake is complete. - if (MaybeDiscourageAndDisconnect(*pto)) return true; + if (MaybeDiscourageAndDisconnect(*pto, *peer)) return true; // Don't send anything until the version handshake is complete if (!pto->fSuccessfullyConnected || pto->fDisconnect) @@ -4302,34 +4404,10 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // If we get here, the outgoing message serialization version is set and can't change. const CNetMsgMaker msgMaker(pto->GetCommonVersion()); - // - // Message: ping - // - bool pingSend = false; - if (pto->fPingQueued) { - // RPC ping request by user - pingSend = true; - } - if (pto->nPingNonceSent == 0 && pto->m_ping_start.load() + PING_INTERVAL < GetTime<std::chrono::microseconds>()) { - // Ping automatically sent as a latency probe & keepalive. - pingSend = true; - } - if (pingSend) { - uint64_t nonce = 0; - while (nonce == 0) { - GetRandBytes((unsigned char*)&nonce, sizeof(nonce)); - } - pto->fPingQueued = false; - pto->m_ping_start = GetTime<std::chrono::microseconds>(); - if (pto->GetCommonVersion() > BIP0031_VERSION) { - pto->nPingNonceSent = nonce; - m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::PING, nonce)); - } else { - // Peer is too old to support ping command with nonce, pong will never arrive. - pto->nPingNonceSent = 0; - m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::PING)); - } - } + MaybeSendPing(*pto, *peer); + + // MaybeSendPing may have marked peer for disconnection + if (pto->fDisconnect) return true; { LOCK(cs_main); @@ -4339,7 +4417,9 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // Address refresh broadcast auto current_time = GetTime<std::chrono::microseconds>(); - if (pto->RelayAddrsWithConn() && !::ChainstateActive().IsInitialBlockDownload() && pto->m_next_local_addr_send < current_time) { + if (fListen && pto->RelayAddrsWithConn() && + !::ChainstateActive().IsInitialBlockDownload() && + pto->m_next_local_addr_send < current_time) { // If we've sent before, clear the bloom filter for the peer, so that our // self-announcement will actually go out. // This might be unnecessary if the bloom filter has already rolled @@ -4349,7 +4429,10 @@ bool PeerManagerImpl::SendMessages(CNode* pto) if (pto->m_next_local_addr_send != 0us) { pto->m_addr_known->reset(); } - AdvertiseLocal(pto); + if (Optional<CAddress> local_addr = GetLocalAddrForPeer(pto)) { + FastRandomContext insecure_rand; + pto->PushAddress(*local_addr, insecure_rand); + } pto->m_next_local_addr_send = PoissonNextSend(current_time, AVG_LOCAL_ADDRESS_BROADCAST_INTERVAL); } @@ -4778,11 +4861,11 @@ bool PeerManagerImpl::SendMessages(CNode* pto) if (!pto->fClient && ((fFetch && !pto->m_limited_node) || !::ChainstateActive().IsInitialBlockDownload()) && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) { std::vector<const CBlockIndex*> vToDownload; NodeId staller = -1; - FindNextBlocksToDownload(pto->GetId(), MAX_BLOCKS_IN_TRANSIT_PER_PEER - state.nBlocksInFlight, vToDownload, staller, consensusParams); + FindNextBlocksToDownload(pto->GetId(), MAX_BLOCKS_IN_TRANSIT_PER_PEER - state.nBlocksInFlight, vToDownload, staller); for (const CBlockIndex *pindex : vToDownload) { uint32_t nFetchFlags = GetFetchFlags(*pto); vGetData.push_back(CInv(MSG_BLOCK | nFetchFlags, pindex->GetBlockHash())); - MarkBlockAsInFlight(m_mempool, pto->GetId(), pindex->GetBlockHash(), pindex); + MarkBlockAsInFlight(pto->GetId(), pindex->GetBlockHash(), pindex); LogPrint(BCLog::NET, "Requesting block %s (%d) peer=%d\n", pindex->GetBlockHash().ToString(), pindex->nHeight, pto->GetId()); } @@ -4804,7 +4887,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) entry.second.GetHash().ToString(), entry.first); } for (const GenTxid& gtxid : requestable) { - if (!AlreadyHaveTx(gtxid, m_mempool)) { + if (!AlreadyHaveTx(gtxid)) { LogPrint(BCLog::NET, "Requesting %s %s peer=%d\n", gtxid.IsWtxid() ? "wtx" : "tx", gtxid.GetHash().ToString(), pto->GetId()); vGetData.emplace_back(gtxid.IsWtxid() ? MSG_WTX : (MSG_TX | GetFetchFlags(*pto)), gtxid.GetHash()); |