Diffstat (limited to 'src/net_processing.cpp')
-rw-r--r-- | src/net_processing.cpp | 343 |
1 file changed, 225 insertions, 118 deletions
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
index b1e606abae..36a5257635 100644
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -104,7 +104,7 @@ namespace {
 /** Blocks that are in flight, and that are in the queue to be downloaded. Protected by cs_main. */
 struct QueuedBlock {
     uint256 hash;
-    CBlockIndex* pindex;  //!< Optional.
+    const CBlockIndex* pindex;  //!< Optional.
     bool fValidatedHeaders;  //!< Whether this block has validated headers at the time of request.
     std::unique_ptr<PartiallyDownloadedBlock> partialBlock;  //!< Optional, used for CMPCTBLOCK downloads
 };
@@ -159,13 +159,13 @@ struct CNodeState {
     //! List of asynchronously-determined block rejections to notify this peer about.
     std::vector<CBlockReject> rejects;
     //! The best known block we know this peer has announced.
-    CBlockIndex *pindexBestKnownBlock;
+    const CBlockIndex *pindexBestKnownBlock;
     //! The hash of the last unknown block this peer has announced.
     uint256 hashLastUnknownBlock;
     //! The last full block we both have.
-    CBlockIndex *pindexLastCommonBlock;
+    const CBlockIndex *pindexLastCommonBlock;
     //! The best header we have sent our peer.
-    CBlockIndex *pindexBestHeaderSent;
+    const CBlockIndex *pindexBestHeaderSent;
     //! Length of current-streak of unconnecting headers announcements
     int nUnconnectingHeaders;
     //! Whether we've started headers synchronization with this peer.
@@ -334,7 +334,7 @@ bool MarkBlockAsReceived(const uint256& hash) {
 // Requires cs_main.
 // returns false, still setting pit, if the block was already in flight from the same peer
 // pit will only be valid as long as the same cs_main lock is being held
-bool MarkBlockAsInFlight(NodeId nodeid, const uint256& hash, const Consensus::Params& consensusParams, CBlockIndex *pindex = NULL, list<QueuedBlock>::iterator **pit = NULL) {
+bool MarkBlockAsInFlight(NodeId nodeid, const uint256& hash, const Consensus::Params& consensusParams, const CBlockIndex *pindex = NULL, list<QueuedBlock>::iterator **pit = NULL) {
     CNodeState *state = State(nodeid);
     assert(state != NULL);
 
@@ -398,33 +398,38 @@ void UpdateBlockAvailability(NodeId nodeid, const uint256 &hash) {
     }
 }
 
-void MaybeSetPeerAsAnnouncingHeaderAndIDs(const CNodeState* nodestate, CNode* pfrom, CConnman& connman) {
-    if (!nodestate->fSupportsDesiredCmpctVersion) {
+void MaybeSetPeerAsAnnouncingHeaderAndIDs(NodeId nodeid, CConnman& connman) {
+    AssertLockHeld(cs_main);
+    CNodeState* nodestate = State(nodeid);
+    if (!nodestate || !nodestate->fSupportsDesiredCmpctVersion) {
         // Never ask from peers who can't provide witnesses.
         return;
     }
     if (nodestate->fProvidesHeaderAndIDs) {
         for (std::list<NodeId>::iterator it = lNodesAnnouncingHeaderAndIDs.begin(); it != lNodesAnnouncingHeaderAndIDs.end(); it++) {
-            if (*it == pfrom->GetId()) {
+            if (*it == nodeid) {
                 lNodesAnnouncingHeaderAndIDs.erase(it);
-                lNodesAnnouncingHeaderAndIDs.push_back(pfrom->GetId());
+                lNodesAnnouncingHeaderAndIDs.push_back(nodeid);
                 return;
             }
         }
-        bool fAnnounceUsingCMPCTBLOCK = false;
-        uint64_t nCMPCTBLOCKVersion = (pfrom->GetLocalServices() & NODE_WITNESS) ? 2 : 1;
-        if (lNodesAnnouncingHeaderAndIDs.size() >= 3) {
-            // As per BIP152, we only get 3 of our peers to announce
-            // blocks using compact encodings.
-            connman.ForNode(lNodesAnnouncingHeaderAndIDs.front(), [&connman, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion](CNode* pnodeStop){
-                connman.PushMessage(pnodeStop, CNetMsgMaker(pnodeStop->GetSendVersion()).Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion));
-                return true;
-            });
-            lNodesAnnouncingHeaderAndIDs.pop_front();
-        }
-        fAnnounceUsingCMPCTBLOCK = true;
-        connman.PushMessage(pfrom, CNetMsgMaker(pfrom->GetSendVersion()).Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion));
-        lNodesAnnouncingHeaderAndIDs.push_back(pfrom->GetId());
+        connman.ForNode(nodeid, [&connman](CNode* pfrom){
+            bool fAnnounceUsingCMPCTBLOCK = false;
+            uint64_t nCMPCTBLOCKVersion = (pfrom->GetLocalServices() & NODE_WITNESS) ? 2 : 1;
+            if (lNodesAnnouncingHeaderAndIDs.size() >= 3) {
+                // As per BIP152, we only get 3 of our peers to announce
+                // blocks using compact encodings.
+                connman.ForNode(lNodesAnnouncingHeaderAndIDs.front(), [&connman, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion](CNode* pnodeStop){
+                    connman.PushMessage(pnodeStop, CNetMsgMaker(pnodeStop->GetSendVersion()).Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion));
+                    return true;
+                });
+                lNodesAnnouncingHeaderAndIDs.pop_front();
+            }
+            fAnnounceUsingCMPCTBLOCK = true;
+            connman.PushMessage(pfrom, CNetMsgMaker(pfrom->GetSendVersion()).Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion));
+            lNodesAnnouncingHeaderAndIDs.push_back(pfrom->GetId());
+            return true;
+        });
     }
 }
@@ -435,7 +440,7 @@ bool CanDirectFetch(const Consensus::Params &consensusParams)
 }
 
 // Requires cs_main
-bool PeerHasHeader(CNodeState *state, CBlockIndex *pindex)
+bool PeerHasHeader(CNodeState *state, const CBlockIndex *pindex)
 {
     if (state->pindexBestKnownBlock && pindex == state->pindexBestKnownBlock->GetAncestor(pindex->nHeight))
         return true;
@@ -446,7 +451,7 @@ bool PeerHasHeader(CNodeState *state, CBlockIndex *pindex)
 
 /** Find the last common ancestor two blocks have.
  *  Both pa and pb must be non-NULL. */
-CBlockIndex* LastCommonAncestor(CBlockIndex* pa, CBlockIndex* pb) {
+const CBlockIndex* LastCommonAncestor(const CBlockIndex* pa, const CBlockIndex* pb) {
     if (pa->nHeight > pb->nHeight) {
         pa = pa->GetAncestor(pb->nHeight);
     } else if (pb->nHeight > pa->nHeight) {
@@ -465,7 +470,7 @@ CBlockIndex* LastCommonAncestor(CBlockIndex* pa, CBlockIndex* pb) {
 
 /** Update pindexLastCommonBlock and add not-in-flight missing successors to vBlocks, until it has
  *  at most count entries. */
-void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vector<CBlockIndex*>& vBlocks, NodeId& nodeStaller, const Consensus::Params& consensusParams) {
+void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vector<const CBlockIndex*>& vBlocks, NodeId& nodeStaller, const Consensus::Params& consensusParams) {
     if (count == 0)
         return;
 
@@ -493,8 +498,8 @@ void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vector<CBl
     if (state->pindexLastCommonBlock == state->pindexBestKnownBlock)
         return;
 
-    std::vector<CBlockIndex*> vToFetch;
-    CBlockIndex *pindexWalk = state->pindexLastCommonBlock;
+    std::vector<const CBlockIndex*> vToFetch;
+    const CBlockIndex *pindexWalk = state->pindexLastCommonBlock;
     // Never fetch further than the best block we know the peer has, or more than BLOCK_DOWNLOAD_WINDOW + 1 beyond the last
     // linked block we have in common with this peer. The +1 is so we can detect stalling, namely if we would be able to
     // download that next block if the window were 1 larger.
@@ -517,7 +522,7 @@ void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vector<CBl
     // are not yet downloaded and not in flight to vBlocks. In the mean time, update
     // pindexLastCommonBlock as long as all ancestors are already downloaded, or if it's
     // already part of our chain (and therefore don't need it even if pruned).
-    BOOST_FOREACH(CBlockIndex* pindex, vToFetch) {
+    BOOST_FOREACH(const CBlockIndex* pindex, vToFetch) {
         if (!pindex->IsValid(BLOCK_VALID_TREE)) {
             // We consider the chain that this peer is on invalid.
             return;
@@ -663,7 +668,7 @@ void EraseOrphansFor(NodeId peer)
             nErased += EraseOrphanTx(maybeErase->second.tx->GetHash());
         }
     }
-    if (nErased > 0) LogPrint("mempool", "Erased %d orphan tx from peer %d\n", nErased, peer);
+    if (nErased > 0) LogPrint("mempool", "Erased %d orphan tx from peer=%d\n", nErased, peer);
 }
 
@@ -768,6 +773,51 @@ void PeerLogicValidation::SyncTransaction(const CTransaction& tx, const CBlockIn
     }
 }
 
+static CCriticalSection cs_most_recent_block;
+static std::shared_ptr<const CBlock> most_recent_block;
+static std::shared_ptr<const CBlockHeaderAndShortTxIDs> most_recent_compact_block;
+static uint256 most_recent_block_hash;
+
+void PeerLogicValidation::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock>& pblock) {
+    std::shared_ptr<const CBlockHeaderAndShortTxIDs> pcmpctblock = std::make_shared<const CBlockHeaderAndShortTxIDs> (*pblock, true);
+    CNetMsgMaker msgMaker(PROTOCOL_VERSION);
+
+    LOCK(cs_main);
+
+    static int nHighestFastAnnounce = 0;
+    if (pindex->nHeight <= nHighestFastAnnounce)
+        return;
+    nHighestFastAnnounce = pindex->nHeight;
+
+    bool fWitnessEnabled = IsWitnessEnabled(pindex->pprev, Params().GetConsensus());
+    uint256 hashBlock(pblock->GetHash());
+
+    {
+        LOCK(cs_most_recent_block);
+        most_recent_block_hash = hashBlock;
+        most_recent_block = pblock;
+        most_recent_compact_block = pcmpctblock;
+    }
+
+    connman->ForEachNode([this, &pcmpctblock, pindex, &msgMaker, fWitnessEnabled, &hashBlock](CNode* pnode) {
+        // TODO: Avoid the repeated-serialization here
+        if (pnode->nVersion < INVALID_CB_NO_BAN_VERSION || pnode->fDisconnect)
+            return;
+        ProcessBlockAvailability(pnode->GetId());
+        CNodeState &state = *State(pnode->GetId());
+        // If the peer has, or we announced to them the previous block already,
+        // but we don't think they have this one, go ahead and announce it
+        if (state.fPreferHeaderAndIDs && (!fWitnessEnabled || state.fWantsCmpctWitness) &&
+                !PeerHasHeader(&state, pindex) && PeerHasHeader(&state, pindex->pprev)) {
+
+            LogPrint("net", "%s sending header-and-ids %s to peer=%d\n", "PeerLogicValidation::NewPoWValidBlock",
+                    hashBlock.ToString(), pnode->id);
+            connman->PushMessage(pnode, msgMaker.Make(NetMsgType::CMPCTBLOCK, *pcmpctblock));
+            state.pindexBestHeaderSent = pindex;
+        }
+    });
+}
+
 void PeerLogicValidation::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {
     const int nNewHeight = pindexNew->nHeight;
     connman->SetBestHeight(nNewHeight);
@@ -793,6 +843,7 @@ void PeerLogicValidation::UpdatedBlockTip(const CBlockIndex *pindexNew, const CB
                 }
             }
         });
+        connman->WakeMessageHandler();
     }
 
     nTimeBestReceived = GetTime();
@@ -814,6 +865,11 @@ void PeerLogicValidation::BlockChecked(const CBlock& block, const CValidationSta
             Misbehaving(it->second.first, nDoS);
         }
     }
+    else if (state.IsValid() && !IsInitialBlockDownload() && mapBlocksInFlight.count(hash) == mapBlocksInFlight.size()) {
+        if (it != mapBlockSource.end()) {
+            MaybeSetPeerAsAnnouncingHeaderAndIDs(it->second.first, *connman);
+        }
+    }
     if (it != mapBlockSource.end())
         mapBlockSource.erase(it);
 }
@@ -905,14 +961,13 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connma
 void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic<bool>& interruptMsgProc)
 {
     std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin();
-    unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
     vector<CInv> vNotFound;
     CNetMsgMaker msgMaker(pfrom->GetSendVersion());
 
     LOCK(cs_main);
 
     while (it != pfrom->vRecvGetData.end()) {
         // Don't bother if send buffer is too full to respond anyway
-        if (pfrom->nSendSize >= nMaxSendBufferSize)
+        if (pfrom->fPauseSend)
             break;
 
         const CInv &inv = *it;
@@ -928,6 +983,21 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
             BlockMap::iterator mi = mapBlockIndex.find(inv.hash);
             if (mi != mapBlockIndex.end())
             {
+                if (mi->second->nChainTx && !mi->second->IsValid(BLOCK_VALID_SCRIPTS) &&
+                        mi->second->IsValid(BLOCK_VALID_TREE)) {
+                    // If we have the block and all of its parents, but have not yet validated it,
+                    // we might be in the middle of connecting it (ie in the unlock of cs_main
+                    // before ActivateBestChain but after AcceptBlock).
+                    // In this case, we need to run ActivateBestChain prior to checking the relay
+                    // conditions below.
+                    std::shared_ptr<const CBlock> a_recent_block;
+                    {
+                        LOCK(cs_most_recent_block);
+                        a_recent_block = most_recent_block;
+                    }
+                    CValidationState dummy;
+                    ActivateBestChain(dummy, Params(), a_recent_block);
+                }
                 if (chainActive.Contains(mi->second)) {
                     send = true;
                 } else {
@@ -1065,7 +1135,7 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
     }
 }
 
-uint32_t GetFetchFlags(CNode* pfrom, CBlockIndex* pprev, const Consensus::Params& chainparams) {
+uint32_t GetFetchFlags(CNode* pfrom, const CBlockIndex* pprev, const Consensus::Params& chainparams) {
     uint32_t nFetchFlags = 0;
     if ((pfrom->GetLocalServices() & NODE_WITNESS) && State(pfrom->GetId())->fHaveWitness) {
         nFetchFlags |= MSG_WITNESS_FLAG;
@@ -1073,10 +1143,25 @@ uint32_t GetFetchFlags(CNode* pfrom, CBlockIndex* pprev, const Consensus::Params
     return nFetchFlags;
 }
 
+inline void static SendBlockTransactions(const CBlock& block, const BlockTransactionsRequest& req, CNode* pfrom, CConnman& connman) {
+    BlockTransactions resp(req);
+    for (size_t i = 0; i < req.indexes.size(); i++) {
+        if (req.indexes[i] >= block.vtx.size()) {
+            LOCK(cs_main);
+            Misbehaving(pfrom->GetId(), 100);
+            LogPrintf("Peer %d sent us a getblocktxn with out-of-bounds tx indices", pfrom->id);
+            return;
+        }
+        resp.txn[i] = block.vtx[req.indexes[i]];
+    }
+    LOCK(cs_main);
+    CNetMsgMaker msgMaker(pfrom->GetSendVersion());
+    int nSendFlags = State(pfrom->GetId())->fWantsCmpctWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS;
+    connman.PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCKTXN, resp));
+}
+
 bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman, std::atomic<bool>& interruptMsgProc)
 {
-    unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
-
     LogPrint("net", "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->id);
     if (IsArgSet("-dropmessagestest") && GetRand(GetArg("-dropmessagestest", 0)) == 0)
     {
@@ -1429,11 +1514,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
 
             // Track requests for our stuff
             GetMainSignals().Inventory(inv.hash);
-
-            if (pfrom->nSendSize > (nMaxSendBufferSize * 2)) {
-                Misbehaving(pfrom->GetId(), 50);
-                return error("send buffer size() = %u", pfrom->nSendSize);
-            }
         }
 
         if (!vToFetch.empty())
@@ -1469,10 +1549,27 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
         uint256 hashStop;
         vRecv >> locator >> hashStop;
 
+        // We might have announced the currently-being-connected tip using a
+        // compact block, which resulted in the peer sending a getblocks
+        // request, which we would otherwise respond to without the new block.
+        // To avoid this situation we simply verify that we are on our best
+        // known chain now. This is super overkill, but we handle it better
+        // for getheaders requests, and there are no known nodes which support
+        // compact blocks but still use getblocks to request blocks.
+        {
+            std::shared_ptr<const CBlock> a_recent_block;
+            {
+                LOCK(cs_most_recent_block);
+                a_recent_block = most_recent_block;
+            }
+            CValidationState dummy;
+            ActivateBestChain(dummy, Params(), a_recent_block);
+        }
+
         LOCK(cs_main);
 
         // Find the last block the caller has in the main chain
-        CBlockIndex* pindex = FindForkInGlobalIndex(chainActive, locator);
+        const CBlockIndex* pindex = FindForkInGlobalIndex(chainActive, locator);
 
         // Send the rest of the chain
         if (pindex)
@@ -1512,6 +1609,18 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
         BlockTransactionsRequest req;
         vRecv >> req;
 
+        std::shared_ptr<const CBlock> recent_block;
+        {
+            LOCK(cs_most_recent_block);
+            if (most_recent_block_hash == req.blockhash)
+                recent_block = most_recent_block;
+            // Unlock cs_most_recent_block to avoid cs_main lock inversion
+        }
+        if (recent_block) {
+            SendBlockTransactions(*recent_block, req, pfrom, connman);
+            return true;
+        }
+
         LOCK(cs_main);
 
         BlockMap::iterator it = mapBlockIndex.find(req.blockhash);
@@ -1541,17 +1650,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
         bool ret = ReadBlockFromDisk(block, it->second, chainparams.GetConsensus());
         assert(ret);
 
-        BlockTransactions resp(req);
-        for (size_t i = 0; i < req.indexes.size(); i++) {
-            if (req.indexes[i] >= block.vtx.size()) {
-                Misbehaving(pfrom->GetId(), 100);
-                LogPrintf("Peer %d sent us a getblocktxn with out-of-bounds tx indices", pfrom->id);
-                return true;
-            }
-            resp.txn[i] = block.vtx[req.indexes[i]];
-        }
-        int nSendFlags = State(pfrom->GetId())->fWantsCmpctWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS;
-        connman.PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCKTXN, resp));
+        SendBlockTransactions(block, req, pfrom, connman);
     }
@@ -1568,7 +1667,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
         }
 
         CNodeState *nodestate = State(pfrom->GetId());
-        CBlockIndex* pindex = NULL;
+        const CBlockIndex* pindex = NULL;
         if (locator.IsNull())
         {
             // If locator is null, return the hashStop block
@@ -1599,6 +1698,14 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
         // if our peer has chainActive.Tip() (and thus we are sending an empty
        // headers message). In both cases it's safe to update
        // pindexBestHeaderSent to be our tip.
+        //
+        // It is important that we simply reset the BestHeaderSent value here,
+        // and not max(BestHeaderSent, newHeaderSent). We might have announced
+        // the currently-being-connected tip using a compact block, which
+        // resulted in the peer sending a headers request, which we respond to
+        // without the new block. By resetting the BestHeaderSent, we ensure we
+        // will re-announce the new block via headers (or compact blocks again)
+        // in the SendMessages logic.
         nodestate->pindexBestHeaderSent = pindex ? pindex : chainActive.Tip();
         connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::HEADERS, vHeaders));
     }
@@ -1733,6 +1840,9 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
                 LogPrint("mempool", "mapOrphan overflow, removed %u tx\n", nEvicted);
             } else {
                 LogPrint("mempool", "not keeping orphan with rejected parents %s\n",tx.GetHash().ToString());
+                // We will continue to reject this tx since it has rejected
+                // parents so avoid re-requesting it from other peers.
+                recentRejects->insert(tx.GetHash());
             }
         } else {
             if (!tx.HasWitness() && !state.CorruptionPossible()) {
@@ -1802,7 +1912,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
             }
         }
 
-        CBlockIndex *pindex = NULL;
+        const CBlockIndex *pindex = NULL;
         CValidationState state;
         if (!ProcessNewBlockHeaders({cmpctblock.header}, state, chainparams, &pindex)) {
             int nDoS;
@@ -1900,12 +2010,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
                 return true;
             }
 
-            if (!fAlreadyInFlight && mapBlocksInFlight.size() == 1 && pindex->pprev->IsValid(BLOCK_VALID_CHAIN)) {
-                // We seem to be rather well-synced, so it appears pfrom was the first to provide us
-                // with this block! Let's get them to announce using compact blocks in the future.
-                MaybeSetPeerAsAnnouncingHeaderAndIDs(nodestate, pfrom, connman);
-            }
-
             BlockTransactionsRequest req;
             for (size_t i = 0; i < cmpctblock.BlockTxCount(); i++) {
                 if (!partialBlock.IsTxAvailable(i))
@@ -2078,7 +2182,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
            return true;
         }
 
-        CBlockIndex *pindexLast = NULL;
+        const CBlockIndex *pindexLast = NULL;
         {
             LOCK(cs_main);
             CNodeState *nodestate = State(pfrom->GetId());
@@ -2155,8 +2259,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
         // If this set of headers is valid and ends in a block with at least as
        // much work as our tip, download as much as possible.
         if (fCanDirectFetch && pindexLast->IsValid(BLOCK_VALID_TREE) && chainActive.Tip()->nChainWork <= pindexLast->nChainWork) {
-            vector<CBlockIndex *> vToFetch;
-            CBlockIndex *pindexWalk = pindexLast;
+            vector<const CBlockIndex *> vToFetch;
+            const CBlockIndex *pindexWalk = pindexLast;
             // Calculate all the blocks we'd need to switch to pindexLast, up to a limit.
             while (pindexWalk && !chainActive.Contains(pindexWalk) && vToFetch.size() <= MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
                 if (!(pindexWalk->nStatus & BLOCK_HAVE_DATA) &&
@@ -2178,7 +2282,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
             } else {
                 vector<CInv> vGetData;
                 // Download as much as possible, from earliest to latest.
-                BOOST_REVERSE_FOREACH(CBlockIndex *pindex, vToFetch) {
+                BOOST_REVERSE_FOREACH(const CBlockIndex *pindex, vToFetch) {
                     if (nodestate->nBlocksInFlight >= MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
                         // Can't download any more from this peer
                         break;
@@ -2195,9 +2299,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
                 }
                 if (vGetData.size() > 0) {
                     if (nodestate->fSupportsDesiredCmpctVersion && vGetData.size() == 1 && mapBlocksInFlight.size() == 1 && pindexLast->pprev->IsValid(BLOCK_VALID_CHAIN)) {
-                        // We seem to be rather well-synced, so it appears pfrom was the first to provide us
-                        // with this block! Let's get them to announce using compact blocks in the future.
-                        MaybeSetPeerAsAnnouncingHeaderAndIDs(nodestate, pfrom, connman);
                         // In any case, we want to download using a compact block, not a regular one
                         vGetData[0] = CInv(MSG_CMPCT_BLOCK, vGetData[0].hash);
                     }
@@ -2474,14 +2575,9 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
     return true;
 }
 
-// requires LOCK(cs_vRecvMsg)
 bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc)
 {
     const CChainParams& chainparams = Params();
-    unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
-    //if (fDebug)
-    //    LogPrintf("%s(%u messages)\n", __func__, pfrom->vRecvMsg.size());
-
     //
     // Message format
     //  (4) message start
@@ -2490,40 +2586,40 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
     //  (4) checksum
     //  (x) data
     //
-    bool fOk = true;
+    bool fMoreWork = false;
 
     if (!pfrom->vRecvGetData.empty())
         ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc);
 
+    if (pfrom->fDisconnect)
+        return false;
+
     // this maintains the order of responses
-    if (!pfrom->vRecvGetData.empty()) return fOk;
+    if (!pfrom->vRecvGetData.empty()) return true;
 
-    std::deque<CNetMessage>::iterator it = pfrom->vRecvMsg.begin();
-    while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) {
         // Don't bother if send buffer is too full to respond anyway
-        if (pfrom->nSendSize >= nMaxSendBufferSize)
-            break;
-
-        // get next message
-        CNetMessage& msg = *it;
-
-        //if (fDebug)
-        //    LogPrintf("%s(message %u msgsz, %u bytes, complete:%s)\n", __func__,
-        //              msg.hdr.nMessageSize, msg.vRecv.size(),
-        //              msg.complete() ? "Y" : "N");
-
-        // end, if an incomplete message is found
-        if (!msg.complete())
-            break;
-
-        // at this point, any failure means we can delete the current message
-        it++;
+    if (pfrom->fPauseSend)
+        return false;
+
+    std::list<CNetMessage> msgs;
+    {
+        LOCK(pfrom->cs_vProcessMsg);
+        if (pfrom->vProcessMsg.empty())
+            return false;
+        // Just take one message
+        msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
+        pfrom->nProcessQueueSize -= msgs.front().vRecv.size() + CMessageHeader::HEADER_SIZE;
+        pfrom->fPauseRecv = pfrom->nProcessQueueSize > connman.GetReceiveFloodSize();
+        fMoreWork = !pfrom->vProcessMsg.empty();
+    }
+    CNetMessage& msg(msgs.front());
+
+    msg.SetVersion(pfrom->GetRecvVersion());
     // Scan for message start
     if (memcmp(msg.hdr.pchMessageStart, chainparams.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) {
         LogPrintf("PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", SanitizeString(msg.hdr.GetCommand()), pfrom->id);
-        fOk = false;
-        break;
+        pfrom->fDisconnect = true;
+        return false;
     }
 
     // Read header
@@ -2531,7 +2627,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
     if (!hdr.IsValid(chainparams.MessageStart()))
    {
         LogPrintf("PROCESSMESSAGE: ERRORS IN HEADER %s peer=%d\n", SanitizeString(hdr.GetCommand()), pfrom->id);
-        continue;
+        return fMoreWork;
     }
     string strCommand = hdr.GetCommand();
@@ -2547,7 +2643,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
            SanitizeString(strCommand), nMessageSize,
            HexStr(hash.begin(), hash.begin()+CMessageHeader::CHECKSUM_SIZE),
            HexStr(hdr.pchChecksum, hdr.pchChecksum+CMessageHeader::CHECKSUM_SIZE));
-        continue;
+        return fMoreWork;
     }
 
     // Process message
@@ -2556,7 +2652,9 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
    {
         fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc);
         if (interruptMsgProc)
-            return true;
+            return false;
+        if (!pfrom->vRecvGetData.empty())
+            fMoreWork = true;
     }
     catch (const std::ios_base::failure& e)
    {
@@ -2590,14 +2688,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
     if (!fRet)
         LogPrintf("%s(%s, %u bytes) FAILED peer=%d\n", __func__, SanitizeString(strCommand), nMessageSize, pfrom->id);
 
-        break;
-    }
-
-    // In case the connection got shut down, its receive buffer was wiped
-    if (!pfrom->fDisconnect)
-        pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it);
-
-    return fOk;
+    return fMoreWork;
 }
 
 class CompareInvMempoolOrder
@@ -2769,7 +2860,7 @@ bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interruptMsg
         bool fRevertToInv = ((!state.fPreferHeaders &&
                              (!state.fPreferHeaderAndIDs || pto->vBlockHashesToAnnounce.size() > 1)) ||
                             pto->vBlockHashesToAnnounce.size() > MAX_BLOCKS_TO_ANNOUNCE);
-        CBlockIndex *pBestIndex = NULL; // last header queued for delivery
+        const CBlockIndex *pBestIndex = NULL; // last header queued for delivery
         ProcessBlockAvailability(pto->id); // ensure pindexBestKnownBlock is up-to-date
 
         if (!fRevertToInv) {
@@ -2780,7 +2871,7 @@ bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interruptMsg
             BOOST_FOREACH(const uint256 &hash, pto->vBlockHashesToAnnounce) {
                 BlockMap::iterator mi = mapBlockIndex.find(hash);
                 assert(mi != mapBlockIndex.end());
-                CBlockIndex *pindex = mi->second;
+                const CBlockIndex *pindex = mi->second;
                 if (chainActive[pindex->nHeight] != pindex) {
                     // Bail out if we reorged away from this block
                     fRevertToInv = true;
@@ -2824,15 +2915,31 @@ bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interruptMsg
             if (vHeaders.size() == 1 && state.fPreferHeaderAndIDs) {
                 // We only send up to 1 block as header-and-ids, as otherwise
                 // probably means we're doing an initial-ish-sync or they're slow
-                LogPrint("net", "%s sending header-and-ids %s to peer %d\n", __func__,
+                LogPrint("net", "%s sending header-and-ids %s to peer=%d\n", __func__,
                         vHeaders.front().GetHash().ToString(), pto->id);
-                //TODO: Shouldn't need to reload block from disk, but requires refactor
-                CBlock block;
-                bool ret = ReadBlockFromDisk(block, pBestIndex, consensusParams);
-                assert(ret);
-                CBlockHeaderAndShortTxIDs cmpctblock(block, state.fWantsCmpctWitness);
+
                 int nSendFlags = state.fWantsCmpctWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS;
-                connman.PushMessage(pto, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock));
+
+                bool fGotBlockFromCache = false;
+                {
+                    LOCK(cs_most_recent_block);
+                    if (most_recent_block_hash == pBestIndex->GetBlockHash()) {
+                        if (state.fWantsCmpctWitness)
+                            connman.PushMessage(pto, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, *most_recent_compact_block));
+                        else {
+                            CBlockHeaderAndShortTxIDs cmpctblock(*most_recent_block, state.fWantsCmpctWitness);
+                            connman.PushMessage(pto, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock));
+                        }
+                        fGotBlockFromCache = true;
+                    }
+                }
+                if (!fGotBlockFromCache) {
+                    CBlock block;
+                    bool ret = ReadBlockFromDisk(block, pBestIndex, consensusParams);
+                    assert(ret);
+                    CBlockHeaderAndShortTxIDs cmpctblock(block, state.fWantsCmpctWitness);
+                    connman.PushMessage(pto, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock));
+                }
                 state.pindexBestHeaderSent = pBestIndex;
             } else if (state.fPreferHeaders) {
                 if (vHeaders.size() > 1) {
@@ -2857,7 +2964,7 @@ bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interruptMsg
             const uint256 &hashToAnnounce = pto->vBlockHashesToAnnounce.back();
             BlockMap::iterator mi = mapBlockIndex.find(hashToAnnounce);
             assert(mi != mapBlockIndex.end());
-            CBlockIndex *pindex = mi->second;
+            const CBlockIndex *pindex = mi->second;
 
             // Warn if we're announcing a block that is not on the main chain.
             // This should be very rare and could be optimized out.
@@ -3042,10 +3149,10 @@ bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interruptMsg
        //
         vector<CInv> vGetData;
         if (!pto->fClient && (fFetch || !IsInitialBlockDownload()) && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
-            vector<CBlockIndex*> vToDownload;
+            vector<const CBlockIndex*> vToDownload;
             NodeId staller = -1;
             FindNextBlocksToDownload(pto->GetId(), MAX_BLOCKS_IN_TRANSIT_PER_PEER - state.nBlocksInFlight, vToDownload, staller, consensusParams);
-            BOOST_FOREACH(CBlockIndex *pindex, vToDownload) {
+            BOOST_FOREACH(const CBlockIndex *pindex, vToDownload) {
                 uint32_t nFetchFlags = GetFetchFlags(pto, pindex->pprev, consensusParams);
                 vGetData.push_back(CInv(MSG_BLOCK | nFetchFlags, pindex->GetBlockHash()));
                 MarkBlockAsInFlight(pto->GetId(), pindex->GetBlockHash(), consensusParams, pindex);
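The MaybeSetPeerAsAnnouncingHeaderAndIDs() changes above keep a rolling list of at most three peers that are asked to announce new blocks via compact blocks (BIP152 high-bandwidth mode): a peer that proves useful again is moved to the back of the list, and the oldest entry is downgraded once the list is full. The following is a minimal stand-alone sketch of only that bookkeeping; PromoteToCompactAnnouncer and g_announcers are illustrative names, not part of this patch, and the real code also sends the SENDCMPCT messages through CConnman::ForNode.

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <list>

using NodeId = int64_t;

// Hypothetical stand-in for the patch's lNodesAnnouncingHeaderAndIDs list:
// at most three peers are kept in "high-bandwidth" compact-block mode.
static std::list<NodeId> g_announcers;

// Called when a peer proves useful (e.g. it was first to deliver a block).
// Returns the peer that should be downgraded (told to stop announcing via
// compact blocks), or -1 if nobody has to be dropped.
NodeId PromoteToCompactAnnouncer(NodeId nodeid)
{
    // Already selected: refresh its position so it is evicted last.
    auto it = std::find(g_announcers.begin(), g_announcers.end(), nodeid);
    if (it != g_announcers.end()) {
        g_announcers.erase(it);
        g_announcers.push_back(nodeid);
        return -1;
    }
    NodeId evicted = -1;
    if (g_announcers.size() >= 3) {      // BIP152 suggests at most 3 such peers
        evicted = g_announcers.front();  // drop the least recently useful one
        g_announcers.pop_front();
    }
    g_announcers.push_back(nodeid);      // newest useful peer becomes an announcer
    return evicted;
}

int main()
{
    for (NodeId id : {10, 11, 12, 13, 10, 14}) {
        NodeId evicted = PromoteToCompactAnnouncer(id);
        std::printf("promoted %lld, evicted %lld\n", (long long)id, (long long)evicted);
    }
}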
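The new cs_most_recent_block / most_recent_block / most_recent_compact_block globals cache the most recently connected block (and its compact form) behind a dedicated lock, so GETBLOCKTXN handling and SendMessages can serve it without re-reading it from disk and with only a short-lived lock that is never held together with cs_main. Below is a rough sketch of that caching pattern under assumed placeholder types (Block, CompactBlock); it is not the Core API.

#include <cstdio>
#include <memory>
#include <mutex>
#include <string>

// Placeholder types standing in for CBlock / CBlockHeaderAndShortTxIDs.
struct Block { std::string hash; };
struct CompactBlock { std::string hash; explicit CompactBlock(const Block& b) : hash(b.hash) {} };

// Small cache guarded by its own mutex, mirroring cs_most_recent_block:
// the writer publishes shared_ptrs, readers copy them out under a brief lock.
static std::mutex g_cache_mutex;
static std::shared_ptr<const Block> g_recent_block;
static std::shared_ptr<const CompactBlock> g_recent_compact;
static std::string g_recent_hash;

void PublishNewBlock(const std::shared_ptr<const Block>& block)
{
    auto compact = std::make_shared<const CompactBlock>(*block);  // build outside the lock
    std::lock_guard<std::mutex> lock(g_cache_mutex);
    g_recent_hash = block->hash;
    g_recent_block = block;
    g_recent_compact = compact;
}

// Returns the cached block if it matches, else nullptr (caller falls back to disk).
std::shared_ptr<const Block> GetRecentBlock(const std::string& hash)
{
    std::lock_guard<std::mutex> lock(g_cache_mutex);  // never held across other locks
    return (hash == g_recent_hash) ? g_recent_block : nullptr;
}

int main()
{
    PublishNewBlock(std::make_shared<const Block>(Block{"abc123"}));
    std::printf("hit: %d, miss: %d\n",
                GetRecentBlock("abc123") != nullptr,
                GetRecentBlock("ffff") != nullptr);
}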
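The reworked ProcessMessages() no longer loops over vRecvMsg: it splices exactly one complete message off the per-peer queue under cs_vProcessMsg, updates the flood-control bookkeeping (fPauseRecv), handles the message outside the lock, and tells the caller whether more work is already queued so the message handler can come straight back without sleeping. A rough stand-alone sketch of that one-message-per-call pattern, using illustrative names only (Peer, ProcessOneMessage, RECV_FLOOD_LIMIT):

#include <cstdio>
#include <list>
#include <mutex>
#include <string>

// Minimal stand-in for a per-peer receive queue.
struct Peer {
    std::mutex queue_mutex;
    std::list<std::string> process_queue;  // complete messages awaiting processing
    size_t queue_bytes = 0;
    bool pause_recv = false;               // set when the queue is over the flood limit
};

static const size_t RECV_FLOOD_LIMIT = 1024;

// Process at most one message; return true if the caller should call again
// immediately because more messages are already queued.
bool ProcessOneMessage(Peer& peer)
{
    std::string msg;
    bool more_work = false;
    {
        std::lock_guard<std::mutex> lock(peer.queue_mutex);
        if (peer.process_queue.empty())
            return false;
        // Take just one message, like the patch's splice() of vProcessMsg.
        msg = std::move(peer.process_queue.front());
        peer.process_queue.pop_front();
        peer.queue_bytes -= msg.size();
        peer.pause_recv = peer.queue_bytes > RECV_FLOOD_LIMIT;
        more_work = !peer.process_queue.empty();
    }
    std::printf("handling message: %s\n", msg.c_str());  // handling happens outside the lock
    return more_work;
}

int main()
{
    Peer peer;
    for (const char* m : {"version", "verack", "ping"}) {
        peer.process_queue.emplace_back(m);
        peer.queue_bytes += peer.process_queue.back().size();
    }
    while (ProcessOneMessage(peer)) {}
}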