diff options
author | Matt Corallo <git@bluematt.me> | 2017-12-24 11:58:20 -0500 |
---|---|---|
committer | Matt Corallo <git@bluematt.me> | 2017-12-24 13:08:38 -0500 |
commit | 66aa1d58a158991a8014a91335b5bc9c00062f56 (patch) | |
tree | 422b97c1a541a694d980be42b31dd5d6197121ef | |
parent | 818075adaca9104539f056cdb858e135a49de2ed (diff) |
Refactor ProcessGetData in anticipation of avoiding cs_main for ABC
-rw-r--r-- | src/net_processing.cpp | 321 |
1 files changed, 166 insertions, 155 deletions
diff --git a/src/net_processing.cpp b/src/net_processing.cpp index d1a89ed91c..3225d429b9 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -1038,182 +1038,193 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman* connma connman->ForEachNodeThen(std::move(sortfunc), std::move(pushfunc)); } -void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman* connman, const std::atomic<bool>& interruptMsgProc) +void static ProcessGetBlockData(CNode* pfrom, const Consensus::Params& consensusParams, const CInv& inv, CConnman* connman, const std::atomic<bool>& interruptMsgProc) { - std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin(); - std::vector<CInv> vNotFound; - const CNetMsgMaker msgMaker(pfrom->GetSendVersion()); LOCK(cs_main); - while (it != pfrom->vRecvGetData.end()) { - // Don't bother if send buffer is too full to respond anyway - if (pfrom->fPauseSend) - break; + bool send = false; + std::shared_ptr<const CBlock> a_recent_block; + std::shared_ptr<const CBlockHeaderAndShortTxIDs> a_recent_compact_block; + bool fWitnessesPresentInARecentCompactBlock; + { + LOCK(cs_most_recent_block); + a_recent_block = most_recent_block; + a_recent_compact_block = most_recent_compact_block; + fWitnessesPresentInARecentCompactBlock = fWitnessesPresentInMostRecentCompactBlock; + } - const CInv &inv = *it; + { + BlockMap::iterator mi = mapBlockIndex.find(inv.hash); + if (mi != mapBlockIndex.end()) { - if (interruptMsgProc) - return; + if (mi->second->nChainTx && !mi->second->IsValid(BLOCK_VALID_SCRIPTS) && + mi->second->IsValid(BLOCK_VALID_TREE)) { + // If we have the block and all of its parents, but have not yet validated it, + // we might be in the middle of connecting it (ie in the unlock of cs_main + // before ActivateBestChain but after AcceptBlock). + // In this case, we need to run ActivateBestChain prior to checking the relay + // conditions below. + CValidationState dummy; + ActivateBestChain(dummy, Params(), a_recent_block); + } + } + } + BlockMap::iterator mi = mapBlockIndex.find(inv.hash); + if (mi != mapBlockIndex.end()) { + send = BlockRequestAllowed(mi->second, consensusParams); + if (!send) { + LogPrint(BCLog::NET, "%s: ignoring request from peer=%i for old block that isn't in the main chain\n", __func__, pfrom->GetId()); + } + } + const CNetMsgMaker msgMaker(pfrom->GetSendVersion()); + // disconnect node in case we have reached the outbound limit for serving historical blocks + // never disconnect whitelisted nodes + if (send && connman->OutboundTargetReached(true) && ( ((pindexBestHeader != nullptr) && (pindexBestHeader->GetBlockTime() - mi->second->GetBlockTime() > HISTORICAL_BLOCK_AGE)) || inv.type == MSG_FILTERED_BLOCK) && !pfrom->fWhitelisted) + { + LogPrint(BCLog::NET, "historical block serving limit reached, disconnect peer=%d\n", pfrom->GetId()); - it++; + //disconnect node + pfrom->fDisconnect = true; + send = false; + } + // Avoid leaking prune-height by never sending blocks below the NODE_NETWORK_LIMITED threshold + if (send && !pfrom->fWhitelisted && ( + (((pfrom->GetLocalServices() & NODE_NETWORK_LIMITED) == NODE_NETWORK_LIMITED) && ((pfrom->GetLocalServices() & NODE_NETWORK) != NODE_NETWORK) && (chainActive.Tip()->nHeight - mi->second->nHeight > (int)NODE_NETWORK_LIMITED_MIN_BLOCKS + 2 /* add two blocks buffer extension for possible races */) ) + )) { + LogPrint(BCLog::NET, "Ignore block request below NODE_NETWORK_LIMITED threshold from peer=%d\n", pfrom->GetId()); - if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK || inv.type == MSG_WITNESS_BLOCK) + //disconnect node and prevent it from stalling (would otherwise wait for the missing block) + pfrom->fDisconnect = true; + send = false; + } + // Pruned nodes may have deleted the block, so check whether + // it's available before trying to send. + if (send && (mi->second->nStatus & BLOCK_HAVE_DATA)) + { + std::shared_ptr<const CBlock> pblock; + if (a_recent_block && a_recent_block->GetHash() == (*mi).second->GetBlockHash()) { + pblock = a_recent_block; + } else { + // Send block from disk + std::shared_ptr<CBlock> pblockRead = std::make_shared<CBlock>(); + if (!ReadBlockFromDisk(*pblockRead, (*mi).second, consensusParams)) + assert(!"cannot load block from disk"); + pblock = pblockRead; + } + if (inv.type == MSG_BLOCK) + connman->PushMessage(pfrom, msgMaker.Make(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::BLOCK, *pblock)); + else if (inv.type == MSG_WITNESS_BLOCK) + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::BLOCK, *pblock)); + else if (inv.type == MSG_FILTERED_BLOCK) + { + bool sendMerkleBlock = false; + CMerkleBlock merkleBlock; { - bool send = false; - BlockMap::iterator mi = mapBlockIndex.find(inv.hash); - std::shared_ptr<const CBlock> a_recent_block; - std::shared_ptr<const CBlockHeaderAndShortTxIDs> a_recent_compact_block; - bool fWitnessesPresentInARecentCompactBlock; - { - LOCK(cs_most_recent_block); - a_recent_block = most_recent_block; - a_recent_compact_block = most_recent_compact_block; - fWitnessesPresentInARecentCompactBlock = fWitnessesPresentInMostRecentCompactBlock; + LOCK(pfrom->cs_filter); + if (pfrom->pfilter) { + sendMerkleBlock = true; + merkleBlock = CMerkleBlock(*pblock, *pfrom->pfilter); } - if (mi != mapBlockIndex.end()) - { - if (mi->second->nChainTx && !mi->second->IsValid(BLOCK_VALID_SCRIPTS) && - mi->second->IsValid(BLOCK_VALID_TREE)) { - // If we have the block and all of its parents, but have not yet validated it, - // we might be in the middle of connecting it (ie in the unlock of cs_main - // before ActivateBestChain but after AcceptBlock). - // In this case, we need to run ActivateBestChain prior to checking the relay - // conditions below. - CValidationState dummy; - ActivateBestChain(dummy, Params(), a_recent_block); - } - send = BlockRequestAllowed(mi->second, consensusParams); - if (!send) { - LogPrint(BCLog::NET, "%s: ignoring request from peer=%i for old block that isn't in the main chain\n", __func__, pfrom->GetId()); - } + } + if (sendMerkleBlock) { + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::MERKLEBLOCK, merkleBlock)); + // CMerkleBlock just contains hashes, so also push any transactions in the block the client did not see + // This avoids hurting performance by pointlessly requiring a round-trip + // Note that there is currently no way for a node to request any single transactions we didn't send here - + // they must either disconnect and retry or request the full block. + // Thus, the protocol spec specified allows for us to provide duplicate txn here, + // however we MUST always provide at least what the remote peer needs + typedef std::pair<unsigned int, uint256> PairType; + for (PairType& pair : merkleBlock.vMatchedTxn) + connman->PushMessage(pfrom, msgMaker.Make(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::TX, *pblock->vtx[pair.first])); + } + // else + // no response + } + else if (inv.type == MSG_CMPCT_BLOCK) + { + // If a peer is asking for old blocks, we're almost guaranteed + // they won't have a useful mempool to match against a compact block, + // and we don't feel like constructing the object for them, so + // instead we respond with the full, non-compact block. + bool fPeerWantsWitness = State(pfrom->GetId())->fWantsCmpctWitness; + int nSendFlags = fPeerWantsWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS; + if (CanDirectFetch(consensusParams) && mi->second->nHeight >= chainActive.Height() - MAX_CMPCTBLOCK_DEPTH) { + if ((fPeerWantsWitness || !fWitnessesPresentInARecentCompactBlock) && a_recent_compact_block && a_recent_compact_block->header.GetHash() == mi->second->GetBlockHash()) { + connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, *a_recent_compact_block)); + } else { + CBlockHeaderAndShortTxIDs cmpctblock(*pblock, fPeerWantsWitness); + connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock)); } - // disconnect node in case we have reached the outbound limit for serving historical blocks - // never disconnect whitelisted nodes - if (send && connman->OutboundTargetReached(true) && ( ((pindexBestHeader != nullptr) && (pindexBestHeader->GetBlockTime() - mi->second->GetBlockTime() > HISTORICAL_BLOCK_AGE)) || inv.type == MSG_FILTERED_BLOCK) && !pfrom->fWhitelisted) - { - LogPrint(BCLog::NET, "historical block serving limit reached, disconnect peer=%d\n", pfrom->GetId()); + } else { + connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCK, *pblock)); + } + } - //disconnect node - pfrom->fDisconnect = true; - send = false; - } - // Avoid leaking prune-height by never sending blocks below the NODE_NETWORK_LIMITED threshold - if (send && !pfrom->fWhitelisted && ( - (((pfrom->GetLocalServices() & NODE_NETWORK_LIMITED) == NODE_NETWORK_LIMITED) && ((pfrom->GetLocalServices() & NODE_NETWORK) != NODE_NETWORK) && (chainActive.Tip()->nHeight - mi->second->nHeight > (int)NODE_NETWORK_LIMITED_MIN_BLOCKS + 2 /* add two blocks buffer extension for possible races */) ) - )) { - LogPrint(BCLog::NET, "Ignore block request below NODE_NETWORK_LIMITED threshold from peer=%d\n", pfrom->GetId()); + // Trigger the peer node to send a getblocks request for the next batch of inventory + if (inv.hash == pfrom->hashContinue) + { + // Bypass PushInventory, this must send even if redundant, + // and we want it right after the last block so they don't + // wait for other stuff first. + std::vector<CInv> vInv; + vInv.push_back(CInv(MSG_BLOCK, chainActive.Tip()->GetBlockHash())); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::INV, vInv)); + pfrom->hashContinue.SetNull(); + } + } +} - //disconnect node and prevent it from stalling (would otherwise wait for the missing block) - pfrom->fDisconnect = true; - send = false; - } - // Pruned nodes may have deleted the block, so check whether - // it's available before trying to send. - if (send && (mi->second->nStatus & BLOCK_HAVE_DATA)) - { - std::shared_ptr<const CBlock> pblock; - if (a_recent_block && a_recent_block->GetHash() == (*mi).second->GetBlockHash()) { - pblock = a_recent_block; - } else { - // Send block from disk - std::shared_ptr<CBlock> pblockRead = std::make_shared<CBlock>(); - if (!ReadBlockFromDisk(*pblockRead, (*mi).second, consensusParams)) - assert(!"cannot load block from disk"); - pblock = pblockRead; - } - if (inv.type == MSG_BLOCK) - connman->PushMessage(pfrom, msgMaker.Make(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::BLOCK, *pblock)); - else if (inv.type == MSG_WITNESS_BLOCK) - connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::BLOCK, *pblock)); - else if (inv.type == MSG_FILTERED_BLOCK) - { - bool sendMerkleBlock = false; - CMerkleBlock merkleBlock; - { - LOCK(pfrom->cs_filter); - if (pfrom->pfilter) { - sendMerkleBlock = true; - merkleBlock = CMerkleBlock(*pblock, *pfrom->pfilter); - } - } - if (sendMerkleBlock) { - connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::MERKLEBLOCK, merkleBlock)); - // CMerkleBlock just contains hashes, so also push any transactions in the block the client did not see - // This avoids hurting performance by pointlessly requiring a round-trip - // Note that there is currently no way for a node to request any single transactions we didn't send here - - // they must either disconnect and retry or request the full block. - // Thus, the protocol spec specified allows for us to provide duplicate txn here, - // however we MUST always provide at least what the remote peer needs - typedef std::pair<unsigned int, uint256> PairType; - for (PairType& pair : merkleBlock.vMatchedTxn) - connman->PushMessage(pfrom, msgMaker.Make(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::TX, *pblock->vtx[pair.first])); - } - // else - // no response - } - else if (inv.type == MSG_CMPCT_BLOCK) - { - // If a peer is asking for old blocks, we're almost guaranteed - // they won't have a useful mempool to match against a compact block, - // and we don't feel like constructing the object for them, so - // instead we respond with the full, non-compact block. - bool fPeerWantsWitness = State(pfrom->GetId())->fWantsCmpctWitness; - int nSendFlags = fPeerWantsWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS; - if (CanDirectFetch(consensusParams) && mi->second->nHeight >= chainActive.Height() - MAX_CMPCTBLOCK_DEPTH) { - if ((fPeerWantsWitness || !fWitnessesPresentInARecentCompactBlock) && a_recent_compact_block && a_recent_compact_block->header.GetHash() == mi->second->GetBlockHash()) { - connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, *a_recent_compact_block)); - } else { - CBlockHeaderAndShortTxIDs cmpctblock(*pblock, fPeerWantsWitness); - connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock)); - } - } else { - connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCK, *pblock)); - } - } +void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman* connman, const std::atomic<bool>& interruptMsgProc) +{ + std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin(); + std::vector<CInv> vNotFound; + const CNetMsgMaker msgMaker(pfrom->GetSendVersion()); + { + LOCK(cs_main); - // Trigger the peer node to send a getblocks request for the next batch of inventory - if (inv.hash == pfrom->hashContinue) - { - // Bypass PushInventory, this must send even if redundant, - // and we want it right after the last block so they don't - // wait for other stuff first. - std::vector<CInv> vInv; - vInv.push_back(CInv(MSG_BLOCK, chainActive.Tip()->GetBlockHash())); - connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::INV, vInv)); - pfrom->hashContinue.SetNull(); - } - } - } - else if (inv.type == MSG_TX || inv.type == MSG_WITNESS_TX) - { - // Send stream from relay memory - bool push = false; - auto mi = mapRelay.find(inv.hash); - int nSendFlags = (inv.type == MSG_TX ? SERIALIZE_TRANSACTION_NO_WITNESS : 0); - if (mi != mapRelay.end()) { - connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::TX, *mi->second)); + while (it != pfrom->vRecvGetData.end() && (it->type == MSG_TX || it->type == MSG_WITNESS_TX)) { + if (interruptMsgProc) + return; + // Don't bother if send buffer is too full to respond anyway + if (pfrom->fPauseSend) + break; + + const CInv &inv = *it; + it++; + + // Send stream from relay memory + bool push = false; + auto mi = mapRelay.find(inv.hash); + int nSendFlags = (inv.type == MSG_TX ? SERIALIZE_TRANSACTION_NO_WITNESS : 0); + if (mi != mapRelay.end()) { + connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::TX, *mi->second)); + push = true; + } else if (pfrom->timeLastMempoolReq) { + auto txinfo = mempool.info(inv.hash); + // To protect privacy, do not answer getdata using the mempool when + // that TX couldn't have been INVed in reply to a MEMPOOL request. + if (txinfo.tx && txinfo.nTime <= pfrom->timeLastMempoolReq) { + connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::TX, *txinfo.tx)); push = true; - } else if (pfrom->timeLastMempoolReq) { - auto txinfo = mempool.info(inv.hash); - // To protect privacy, do not answer getdata using the mempool when - // that TX couldn't have been INVed in reply to a MEMPOOL request. - if (txinfo.tx && txinfo.nTime <= pfrom->timeLastMempoolReq) { - connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::TX, *txinfo.tx)); - push = true; - } - } - if (!push) { - vNotFound.push_back(inv); } } + if (!push) { + vNotFound.push_back(inv); + } // Track requests for our stuff. GetMainSignals().Inventory(inv.hash); + } - if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK || inv.type == MSG_WITNESS_BLOCK) - break; + if (it != pfrom->vRecvGetData.end()) { + const CInv &inv = *it; + it++; + if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK || inv.type == MSG_WITNESS_BLOCK) { + ProcessGetBlockData(pfrom, consensusParams, inv, connman, interruptMsgProc); + } } - } + } // release cs_main pfrom->vRecvGetData.erase(pfrom->vRecvGetData.begin(), it); |