aboutsummaryrefslogtreecommitdiff
path: root/src/net_processing.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/net_processing.cpp')
-rw-r--r--src/net_processing.cpp343
1 files changed, 225 insertions, 118 deletions
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
index 3a956e89e7..03568283fb 100644
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -101,7 +101,7 @@ namespace {
/** Blocks that are in flight, and that are in the queue to be downloaded. Protected by cs_main. */
struct QueuedBlock {
uint256 hash;
- CBlockIndex* pindex; //!< Optional.
+ const CBlockIndex* pindex; //!< Optional.
bool fValidatedHeaders; //!< Whether this block has validated headers at the time of request.
std::unique_ptr<PartiallyDownloadedBlock> partialBlock; //!< Optional, used for CMPCTBLOCK downloads
};
@@ -156,13 +156,13 @@ struct CNodeState {
//! List of asynchronously-determined block rejections to notify this peer about.
std::vector<CBlockReject> rejects;
//! The best known block we know this peer has announced.
- CBlockIndex *pindexBestKnownBlock;
+ const CBlockIndex *pindexBestKnownBlock;
//! The hash of the last unknown block this peer has announced.
uint256 hashLastUnknownBlock;
//! The last full block we both have.
- CBlockIndex *pindexLastCommonBlock;
+ const CBlockIndex *pindexLastCommonBlock;
//! The best header we have sent our peer.
- CBlockIndex *pindexBestHeaderSent;
+ const CBlockIndex *pindexBestHeaderSent;
//! Length of current-streak of unconnecting headers announcements
int nUnconnectingHeaders;
//! Whether we've started headers synchronization with this peer.
@@ -331,7 +331,7 @@ bool MarkBlockAsReceived(const uint256& hash) {
// Requires cs_main.
// returns false, still setting pit, if the block was already in flight from the same peer
// pit will only be valid as long as the same cs_main lock is being held
-bool MarkBlockAsInFlight(NodeId nodeid, const uint256& hash, const Consensus::Params& consensusParams, CBlockIndex *pindex = NULL, list<QueuedBlock>::iterator **pit = NULL) {
+bool MarkBlockAsInFlight(NodeId nodeid, const uint256& hash, const Consensus::Params& consensusParams, const CBlockIndex *pindex = NULL, list<QueuedBlock>::iterator **pit = NULL) {
CNodeState *state = State(nodeid);
assert(state != NULL);
@@ -395,33 +395,38 @@ void UpdateBlockAvailability(NodeId nodeid, const uint256 &hash) {
}
}
-void MaybeSetPeerAsAnnouncingHeaderAndIDs(const CNodeState* nodestate, CNode* pfrom, CConnman& connman) {
- if (!nodestate->fSupportsDesiredCmpctVersion) {
+void MaybeSetPeerAsAnnouncingHeaderAndIDs(NodeId nodeid, CConnman& connman) {
+ AssertLockHeld(cs_main);
+ CNodeState* nodestate = State(nodeid);
+ if (!nodestate || !nodestate->fSupportsDesiredCmpctVersion) {
// Never ask from peers who can't provide witnesses.
return;
}
if (nodestate->fProvidesHeaderAndIDs) {
for (std::list<NodeId>::iterator it = lNodesAnnouncingHeaderAndIDs.begin(); it != lNodesAnnouncingHeaderAndIDs.end(); it++) {
- if (*it == pfrom->GetId()) {
+ if (*it == nodeid) {
lNodesAnnouncingHeaderAndIDs.erase(it);
- lNodesAnnouncingHeaderAndIDs.push_back(pfrom->GetId());
+ lNodesAnnouncingHeaderAndIDs.push_back(nodeid);
return;
}
}
- bool fAnnounceUsingCMPCTBLOCK = false;
- uint64_t nCMPCTBLOCKVersion = (pfrom->GetLocalServices() & NODE_WITNESS) ? 2 : 1;
- if (lNodesAnnouncingHeaderAndIDs.size() >= 3) {
- // As per BIP152, we only get 3 of our peers to announce
- // blocks using compact encodings.
- connman.ForNode(lNodesAnnouncingHeaderAndIDs.front(), [&connman, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion](CNode* pnodeStop){
- connman.PushMessage(pnodeStop, CNetMsgMaker(pnodeStop->GetSendVersion()).Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion));
- return true;
- });
- lNodesAnnouncingHeaderAndIDs.pop_front();
- }
- fAnnounceUsingCMPCTBLOCK = true;
- connman.PushMessage(pfrom, CNetMsgMaker(pfrom->GetSendVersion()).Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion));
- lNodesAnnouncingHeaderAndIDs.push_back(pfrom->GetId());
+ connman.ForNode(nodeid, [&connman](CNode* pfrom){
+ bool fAnnounceUsingCMPCTBLOCK = false;
+ uint64_t nCMPCTBLOCKVersion = (pfrom->GetLocalServices() & NODE_WITNESS) ? 2 : 1;
+ if (lNodesAnnouncingHeaderAndIDs.size() >= 3) {
+ // As per BIP152, we only get 3 of our peers to announce
+ // blocks using compact encodings.
+ connman.ForNode(lNodesAnnouncingHeaderAndIDs.front(), [&connman, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion](CNode* pnodeStop){
+ connman.PushMessage(pnodeStop, CNetMsgMaker(pnodeStop->GetSendVersion()).Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion));
+ return true;
+ });
+ lNodesAnnouncingHeaderAndIDs.pop_front();
+ }
+ fAnnounceUsingCMPCTBLOCK = true;
+ connman.PushMessage(pfrom, CNetMsgMaker(pfrom->GetSendVersion()).Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion));
+ lNodesAnnouncingHeaderAndIDs.push_back(pfrom->GetId());
+ return true;
+ });
}
}
@@ -432,7 +437,7 @@ bool CanDirectFetch(const Consensus::Params &consensusParams)
}
// Requires cs_main
-bool PeerHasHeader(CNodeState *state, CBlockIndex *pindex)
+bool PeerHasHeader(CNodeState *state, const CBlockIndex *pindex)
{
if (state->pindexBestKnownBlock && pindex == state->pindexBestKnownBlock->GetAncestor(pindex->nHeight))
return true;
@@ -443,7 +448,7 @@ bool PeerHasHeader(CNodeState *state, CBlockIndex *pindex)
/** Find the last common ancestor two blocks have.
* Both pa and pb must be non-NULL. */
-CBlockIndex* LastCommonAncestor(CBlockIndex* pa, CBlockIndex* pb) {
+const CBlockIndex* LastCommonAncestor(const CBlockIndex* pa, const CBlockIndex* pb) {
if (pa->nHeight > pb->nHeight) {
pa = pa->GetAncestor(pb->nHeight);
} else if (pb->nHeight > pa->nHeight) {
@@ -462,7 +467,7 @@ CBlockIndex* LastCommonAncestor(CBlockIndex* pa, CBlockIndex* pb) {
/** Update pindexLastCommonBlock and add not-in-flight missing successors to vBlocks, until it has
* at most count entries. */
-void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vector<CBlockIndex*>& vBlocks, NodeId& nodeStaller, const Consensus::Params& consensusParams) {
+void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vector<const CBlockIndex*>& vBlocks, NodeId& nodeStaller, const Consensus::Params& consensusParams) {
if (count == 0)
return;
@@ -490,8 +495,8 @@ void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vector<CBl
if (state->pindexLastCommonBlock == state->pindexBestKnownBlock)
return;
- std::vector<CBlockIndex*> vToFetch;
- CBlockIndex *pindexWalk = state->pindexLastCommonBlock;
+ std::vector<const CBlockIndex*> vToFetch;
+ const CBlockIndex *pindexWalk = state->pindexLastCommonBlock;
// Never fetch further than the best block we know the peer has, or more than BLOCK_DOWNLOAD_WINDOW + 1 beyond the last
// linked block we have in common with this peer. The +1 is so we can detect stalling, namely if we would be able to
// download that next block if the window were 1 larger.
@@ -514,7 +519,7 @@ void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vector<CBl
// are not yet downloaded and not in flight to vBlocks. In the mean time, update
// pindexLastCommonBlock as long as all ancestors are already downloaded, or if it's
// already part of our chain (and therefore don't need it even if pruned).
- BOOST_FOREACH(CBlockIndex* pindex, vToFetch) {
+ BOOST_FOREACH(const CBlockIndex* pindex, vToFetch) {
if (!pindex->IsValid(BLOCK_VALID_TREE)) {
// We consider the chain that this peer is on invalid.
return;
@@ -647,7 +652,7 @@ void EraseOrphansFor(NodeId peer)
nErased += EraseOrphanTx(maybeErase->second.tx->GetHash());
}
}
- if (nErased > 0) LogPrint("mempool", "Erased %d orphan tx from peer %d\n", nErased, peer);
+ if (nErased > 0) LogPrint("mempool", "Erased %d orphan tx from peer=%d\n", nErased, peer);
}
@@ -752,6 +757,51 @@ void PeerLogicValidation::SyncTransaction(const CTransaction& tx, const CBlockIn
}
}
+static CCriticalSection cs_most_recent_block;
+static std::shared_ptr<const CBlock> most_recent_block;
+static std::shared_ptr<const CBlockHeaderAndShortTxIDs> most_recent_compact_block;
+static uint256 most_recent_block_hash;
+
+void PeerLogicValidation::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock>& pblock) {
+ std::shared_ptr<const CBlockHeaderAndShortTxIDs> pcmpctblock = std::make_shared<const CBlockHeaderAndShortTxIDs> (*pblock, true);
+ CNetMsgMaker msgMaker(PROTOCOL_VERSION);
+
+ LOCK(cs_main);
+
+ static int nHighestFastAnnounce = 0;
+ if (pindex->nHeight <= nHighestFastAnnounce)
+ return;
+ nHighestFastAnnounce = pindex->nHeight;
+
+ bool fWitnessEnabled = IsWitnessEnabled(pindex->pprev, Params().GetConsensus());
+ uint256 hashBlock(pblock->GetHash());
+
+ {
+ LOCK(cs_most_recent_block);
+ most_recent_block_hash = hashBlock;
+ most_recent_block = pblock;
+ most_recent_compact_block = pcmpctblock;
+ }
+
+ connman->ForEachNode([this, &pcmpctblock, pindex, &msgMaker, fWitnessEnabled, &hashBlock](CNode* pnode) {
+ // TODO: Avoid the repeated-serialization here
+ if (pnode->nVersion < INVALID_CB_NO_BAN_VERSION || pnode->fDisconnect)
+ return;
+ ProcessBlockAvailability(pnode->GetId());
+ CNodeState &state = *State(pnode->GetId());
+ // If the peer has, or we announced to them the previous block already,
+ // but we don't think they have this one, go ahead and announce it
+ if (state.fPreferHeaderAndIDs && (!fWitnessEnabled || state.fWantsCmpctWitness) &&
+ !PeerHasHeader(&state, pindex) && PeerHasHeader(&state, pindex->pprev)) {
+
+ LogPrint("net", "%s sending header-and-ids %s to peer=%d\n", "PeerLogicValidation::NewPoWValidBlock",
+ hashBlock.ToString(), pnode->id);
+ connman->PushMessage(pnode, msgMaker.Make(NetMsgType::CMPCTBLOCK, *pcmpctblock));
+ state.pindexBestHeaderSent = pindex;
+ }
+ });
+}
+
void PeerLogicValidation::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {
const int nNewHeight = pindexNew->nHeight;
connman->SetBestHeight(nNewHeight);
@@ -777,6 +827,7 @@ void PeerLogicValidation::UpdatedBlockTip(const CBlockIndex *pindexNew, const CB
}
}
});
+ connman->WakeMessageHandler();
}
nTimeBestReceived = GetTime();
@@ -798,6 +849,11 @@ void PeerLogicValidation::BlockChecked(const CBlock& block, const CValidationSta
Misbehaving(it->second.first, nDoS);
}
}
+ else if (state.IsValid() && !IsInitialBlockDownload() && mapBlocksInFlight.count(hash) == mapBlocksInFlight.size()) {
+ if (it != mapBlockSource.end()) {
+ MaybeSetPeerAsAnnouncingHeaderAndIDs(it->second.first, *connman);
+ }
+ }
if (it != mapBlockSource.end())
mapBlockSource.erase(it);
}
@@ -889,14 +945,13 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connma
void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic<bool>& interruptMsgProc)
{
std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin();
- unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
vector<CInv> vNotFound;
CNetMsgMaker msgMaker(pfrom->GetSendVersion());
LOCK(cs_main);
while (it != pfrom->vRecvGetData.end()) {
// Don't bother if send buffer is too full to respond anyway
- if (pfrom->nSendSize >= nMaxSendBufferSize)
+ if (pfrom->fPauseSend)
break;
const CInv &inv = *it;
@@ -912,6 +967,21 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
BlockMap::iterator mi = mapBlockIndex.find(inv.hash);
if (mi != mapBlockIndex.end())
{
+ if (mi->second->nChainTx && !mi->second->IsValid(BLOCK_VALID_SCRIPTS) &&
+ mi->second->IsValid(BLOCK_VALID_TREE)) {
+ // If we have the block and all of its parents, but have not yet validated it,
+ // we might be in the middle of connecting it (ie in the unlock of cs_main
+ // before ActivateBestChain but after AcceptBlock).
+ // In this case, we need to run ActivateBestChain prior to checking the relay
+ // conditions below.
+ std::shared_ptr<const CBlock> a_recent_block;
+ {
+ LOCK(cs_most_recent_block);
+ a_recent_block = most_recent_block;
+ }
+ CValidationState dummy;
+ ActivateBestChain(dummy, Params(), a_recent_block);
+ }
if (chainActive.Contains(mi->second)) {
send = true;
} else {
@@ -1049,7 +1119,7 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
}
}
-uint32_t GetFetchFlags(CNode* pfrom, CBlockIndex* pprev, const Consensus::Params& chainparams) {
+uint32_t GetFetchFlags(CNode* pfrom, const CBlockIndex* pprev, const Consensus::Params& chainparams) {
uint32_t nFetchFlags = 0;
if ((pfrom->GetLocalServices() & NODE_WITNESS) && State(pfrom->GetId())->fHaveWitness) {
nFetchFlags |= MSG_WITNESS_FLAG;
@@ -1057,10 +1127,25 @@ uint32_t GetFetchFlags(CNode* pfrom, CBlockIndex* pprev, const Consensus::Params
return nFetchFlags;
}
+inline void static SendBlockTransactions(const CBlock& block, const BlockTransactionsRequest& req, CNode* pfrom, CConnman& connman) {
+ BlockTransactions resp(req);
+ for (size_t i = 0; i < req.indexes.size(); i++) {
+ if (req.indexes[i] >= block.vtx.size()) {
+ LOCK(cs_main);
+ Misbehaving(pfrom->GetId(), 100);
+ LogPrintf("Peer %d sent us a getblocktxn with out-of-bounds tx indices", pfrom->id);
+ return;
+ }
+ resp.txn[i] = block.vtx[req.indexes[i]];
+ }
+ LOCK(cs_main);
+ CNetMsgMaker msgMaker(pfrom->GetSendVersion());
+ int nSendFlags = State(pfrom->GetId())->fWantsCmpctWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS;
+ connman.PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCKTXN, resp));
+}
+
bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman, std::atomic<bool>& interruptMsgProc)
{
- unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
-
LogPrint("net", "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->id);
if (IsArgSet("-dropmessagestest") && GetRand(GetArg("-dropmessagestest", 0)) == 0)
{
@@ -1413,11 +1498,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// Track requests for our stuff
GetMainSignals().Inventory(inv.hash);
-
- if (pfrom->nSendSize > (nMaxSendBufferSize * 2)) {
- Misbehaving(pfrom->GetId(), 50);
- return error("send buffer size() = %u", pfrom->nSendSize);
- }
}
if (!vToFetch.empty())
@@ -1453,10 +1533,27 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
uint256 hashStop;
vRecv >> locator >> hashStop;
+ // We might have announced the currently-being-connected tip using a
+ // compact block, which resulted in the peer sending a getblocks
+ // request, which we would otherwise respond to without the new block.
+ // To avoid this situation we simply verify that we are on our best
+ // known chain now. This is super overkill, but we handle it better
+ // for getheaders requests, and there are no known nodes which support
+ // compact blocks but still use getblocks to request blocks.
+ {
+ std::shared_ptr<const CBlock> a_recent_block;
+ {
+ LOCK(cs_most_recent_block);
+ a_recent_block = most_recent_block;
+ }
+ CValidationState dummy;
+ ActivateBestChain(dummy, Params(), a_recent_block);
+ }
+
LOCK(cs_main);
// Find the last block the caller has in the main chain
- CBlockIndex* pindex = FindForkInGlobalIndex(chainActive, locator);
+ const CBlockIndex* pindex = FindForkInGlobalIndex(chainActive, locator);
// Send the rest of the chain
if (pindex)
@@ -1496,6 +1593,18 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
BlockTransactionsRequest req;
vRecv >> req;
+ std::shared_ptr<const CBlock> recent_block;
+ {
+ LOCK(cs_most_recent_block);
+ if (most_recent_block_hash == req.blockhash)
+ recent_block = most_recent_block;
+ // Unlock cs_most_recent_block to avoid cs_main lock inversion
+ }
+ if (recent_block) {
+ SendBlockTransactions(*recent_block, req, pfrom, connman);
+ return true;
+ }
+
LOCK(cs_main);
BlockMap::iterator it = mapBlockIndex.find(req.blockhash);
@@ -1525,17 +1634,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
bool ret = ReadBlockFromDisk(block, it->second, chainparams.GetConsensus());
assert(ret);
- BlockTransactions resp(req);
- for (size_t i = 0; i < req.indexes.size(); i++) {
- if (req.indexes[i] >= block.vtx.size()) {
- Misbehaving(pfrom->GetId(), 100);
- LogPrintf("Peer %d sent us a getblocktxn with out-of-bounds tx indices", pfrom->id);
- return true;
- }
- resp.txn[i] = block.vtx[req.indexes[i]];
- }
- int nSendFlags = State(pfrom->GetId())->fWantsCmpctWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS;
- connman.PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCKTXN, resp));
+ SendBlockTransactions(block, req, pfrom, connman);
}
@@ -1552,7 +1651,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
}
CNodeState *nodestate = State(pfrom->GetId());
- CBlockIndex* pindex = NULL;
+ const CBlockIndex* pindex = NULL;
if (locator.IsNull())
{
// If locator is null, return the hashStop block
@@ -1583,6 +1682,14 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// if our peer has chainActive.Tip() (and thus we are sending an empty
// headers message). In both cases it's safe to update
// pindexBestHeaderSent to be our tip.
+ //
+ // It is important that we simply reset the BestHeaderSent value here,
+ // and not max(BestHeaderSent, newHeaderSent). We might have announced
+ // the currently-being-connected tip using a compact block, which
+ // resulted in the peer sending a headers request, which we respond to
+ // without the new block. By resetting the BestHeaderSent, we ensure we
+ // will re-announce the new block via headers (or compact blocks again)
+ // in the SendMessages logic.
nodestate->pindexBestHeaderSent = pindex ? pindex : chainActive.Tip();
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::HEADERS, vHeaders));
}
@@ -1715,6 +1822,9 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
LogPrint("mempool", "mapOrphan overflow, removed %u tx\n", nEvicted);
} else {
LogPrint("mempool", "not keeping orphan with rejected parents %s\n",tx.GetHash().ToString());
+ // We will continue to reject this tx since it has rejected
+ // parents so avoid re-requesting it from other peers.
+ recentRejects->insert(tx.GetHash());
}
} else {
if (!tx.HasWitness() && !state.CorruptionPossible()) {
@@ -1775,7 +1885,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
}
}
- CBlockIndex *pindex = NULL;
+ const CBlockIndex *pindex = NULL;
CValidationState state;
if (!ProcessNewBlockHeaders({cmpctblock.header}, state, chainparams, &pindex)) {
int nDoS;
@@ -1873,12 +1983,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
return true;
}
- if (!fAlreadyInFlight && mapBlocksInFlight.size() == 1 && pindex->pprev->IsValid(BLOCK_VALID_CHAIN)) {
- // We seem to be rather well-synced, so it appears pfrom was the first to provide us
- // with this block! Let's get them to announce using compact blocks in the future.
- MaybeSetPeerAsAnnouncingHeaderAndIDs(nodestate, pfrom, connman);
- }
-
BlockTransactionsRequest req;
for (size_t i = 0; i < cmpctblock.BlockTxCount(); i++) {
if (!partialBlock.IsTxAvailable(i))
@@ -2051,7 +2155,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
return true;
}
- CBlockIndex *pindexLast = NULL;
+ const CBlockIndex *pindexLast = NULL;
{
LOCK(cs_main);
CNodeState *nodestate = State(pfrom->GetId());
@@ -2128,8 +2232,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// If this set of headers is valid and ends in a block with at least as
// much work as our tip, download as much as possible.
if (fCanDirectFetch && pindexLast->IsValid(BLOCK_VALID_TREE) && chainActive.Tip()->nChainWork <= pindexLast->nChainWork) {
- vector<CBlockIndex *> vToFetch;
- CBlockIndex *pindexWalk = pindexLast;
+ vector<const CBlockIndex *> vToFetch;
+ const CBlockIndex *pindexWalk = pindexLast;
// Calculate all the blocks we'd need to switch to pindexLast, up to a limit.
while (pindexWalk && !chainActive.Contains(pindexWalk) && vToFetch.size() <= MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
if (!(pindexWalk->nStatus & BLOCK_HAVE_DATA) &&
@@ -2151,7 +2255,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
} else {
vector<CInv> vGetData;
// Download as much as possible, from earliest to latest.
- BOOST_REVERSE_FOREACH(CBlockIndex *pindex, vToFetch) {
+ BOOST_REVERSE_FOREACH(const CBlockIndex *pindex, vToFetch) {
if (nodestate->nBlocksInFlight >= MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
// Can't download any more from this peer
break;
@@ -2168,9 +2272,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
}
if (vGetData.size() > 0) {
if (nodestate->fSupportsDesiredCmpctVersion && vGetData.size() == 1 && mapBlocksInFlight.size() == 1 && pindexLast->pprev->IsValid(BLOCK_VALID_CHAIN)) {
- // We seem to be rather well-synced, so it appears pfrom was the first to provide us
- // with this block! Let's get them to announce using compact blocks in the future.
- MaybeSetPeerAsAnnouncingHeaderAndIDs(nodestate, pfrom, connman);
// In any case, we want to download using a compact block, not a regular one
vGetData[0] = CInv(MSG_CMPCT_BLOCK, vGetData[0].hash);
}
@@ -2447,14 +2548,9 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
return true;
}
-// requires LOCK(cs_vRecvMsg)
bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc)
{
const CChainParams& chainparams = Params();
- unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
- //if (fDebug)
- // LogPrintf("%s(%u messages)\n", __func__, pfrom->vRecvMsg.size());
-
//
// Message format
// (4) message start
@@ -2463,40 +2559,40 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
// (4) checksum
// (x) data
//
- bool fOk = true;
+ bool fMoreWork = false;
if (!pfrom->vRecvGetData.empty())
ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc);
+ if (pfrom->fDisconnect)
+ return false;
+
// this maintains the order of responses
- if (!pfrom->vRecvGetData.empty()) return fOk;
+ if (!pfrom->vRecvGetData.empty()) return true;
- std::deque<CNetMessage>::iterator it = pfrom->vRecvMsg.begin();
- while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) {
// Don't bother if send buffer is too full to respond anyway
- if (pfrom->nSendSize >= nMaxSendBufferSize)
- break;
-
- // get next message
- CNetMessage& msg = *it;
-
- //if (fDebug)
- // LogPrintf("%s(message %u msgsz, %u bytes, complete:%s)\n", __func__,
- // msg.hdr.nMessageSize, msg.vRecv.size(),
- // msg.complete() ? "Y" : "N");
-
- // end, if an incomplete message is found
- if (!msg.complete())
- break;
-
- // at this point, any failure means we can delete the current message
- it++;
+ if (pfrom->fPauseSend)
+ return false;
+ std::list<CNetMessage> msgs;
+ {
+ LOCK(pfrom->cs_vProcessMsg);
+ if (pfrom->vProcessMsg.empty())
+ return false;
+ // Just take one message
+ msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
+ pfrom->nProcessQueueSize -= msgs.front().vRecv.size() + CMessageHeader::HEADER_SIZE;
+ pfrom->fPauseRecv = pfrom->nProcessQueueSize > connman.GetReceiveFloodSize();
+ fMoreWork = !pfrom->vProcessMsg.empty();
+ }
+ CNetMessage& msg(msgs.front());
+
+ msg.SetVersion(pfrom->GetRecvVersion());
// Scan for message start
if (memcmp(msg.hdr.pchMessageStart, chainparams.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) {
LogPrintf("PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", SanitizeString(msg.hdr.GetCommand()), pfrom->id);
- fOk = false;
- break;
+ pfrom->fDisconnect = true;
+ return false;
}
// Read header
@@ -2504,7 +2600,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
if (!hdr.IsValid(chainparams.MessageStart()))
{
LogPrintf("PROCESSMESSAGE: ERRORS IN HEADER %s peer=%d\n", SanitizeString(hdr.GetCommand()), pfrom->id);
- continue;
+ return fMoreWork;
}
string strCommand = hdr.GetCommand();
@@ -2520,7 +2616,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
SanitizeString(strCommand), nMessageSize,
HexStr(hash.begin(), hash.begin()+CMessageHeader::CHECKSUM_SIZE),
HexStr(hdr.pchChecksum, hdr.pchChecksum+CMessageHeader::CHECKSUM_SIZE));
- continue;
+ return fMoreWork;
}
// Process message
@@ -2529,7 +2625,9 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
{
fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc);
if (interruptMsgProc)
- return true;
+ return false;
+ if (!pfrom->vRecvGetData.empty())
+ fMoreWork = true;
}
catch (const std::ios_base::failure& e)
{
@@ -2563,14 +2661,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
if (!fRet)
LogPrintf("%s(%s, %u bytes) FAILED peer=%d\n", __func__, SanitizeString(strCommand), nMessageSize, pfrom->id);
- break;
- }
-
- // In case the connection got shut down, its receive buffer was wiped
- if (!pfrom->fDisconnect)
- pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it);
-
- return fOk;
+ return fMoreWork;
}
class CompareInvMempoolOrder
@@ -2742,7 +2833,7 @@ bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interruptMsg
bool fRevertToInv = ((!state.fPreferHeaders &&
(!state.fPreferHeaderAndIDs || pto->vBlockHashesToAnnounce.size() > 1)) ||
pto->vBlockHashesToAnnounce.size() > MAX_BLOCKS_TO_ANNOUNCE);
- CBlockIndex *pBestIndex = NULL; // last header queued for delivery
+ const CBlockIndex *pBestIndex = NULL; // last header queued for delivery
ProcessBlockAvailability(pto->id); // ensure pindexBestKnownBlock is up-to-date
if (!fRevertToInv) {
@@ -2753,7 +2844,7 @@ bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interruptMsg
BOOST_FOREACH(const uint256 &hash, pto->vBlockHashesToAnnounce) {
BlockMap::iterator mi = mapBlockIndex.find(hash);
assert(mi != mapBlockIndex.end());
- CBlockIndex *pindex = mi->second;
+ const CBlockIndex *pindex = mi->second;
if (chainActive[pindex->nHeight] != pindex) {
// Bail out if we reorged away from this block
fRevertToInv = true;
@@ -2797,15 +2888,31 @@ bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interruptMsg
if (vHeaders.size() == 1 && state.fPreferHeaderAndIDs) {
// We only send up to 1 block as header-and-ids, as otherwise
// probably means we're doing an initial-ish-sync or they're slow
- LogPrint("net", "%s sending header-and-ids %s to peer %d\n", __func__,
+ LogPrint("net", "%s sending header-and-ids %s to peer=%d\n", __func__,
vHeaders.front().GetHash().ToString(), pto->id);
- //TODO: Shouldn't need to reload block from disk, but requires refactor
- CBlock block;
- bool ret = ReadBlockFromDisk(block, pBestIndex, consensusParams);
- assert(ret);
- CBlockHeaderAndShortTxIDs cmpctblock(block, state.fWantsCmpctWitness);
+
int nSendFlags = state.fWantsCmpctWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS;
- connman.PushMessage(pto, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock));
+
+ bool fGotBlockFromCache = false;
+ {
+ LOCK(cs_most_recent_block);
+ if (most_recent_block_hash == pBestIndex->GetBlockHash()) {
+ if (state.fWantsCmpctWitness)
+ connman.PushMessage(pto, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, *most_recent_compact_block));
+ else {
+ CBlockHeaderAndShortTxIDs cmpctblock(*most_recent_block, state.fWantsCmpctWitness);
+ connman.PushMessage(pto, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock));
+ }
+ fGotBlockFromCache = true;
+ }
+ }
+ if (!fGotBlockFromCache) {
+ CBlock block;
+ bool ret = ReadBlockFromDisk(block, pBestIndex, consensusParams);
+ assert(ret);
+ CBlockHeaderAndShortTxIDs cmpctblock(block, state.fWantsCmpctWitness);
+ connman.PushMessage(pto, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock));
+ }
state.pindexBestHeaderSent = pBestIndex;
} else if (state.fPreferHeaders) {
if (vHeaders.size() > 1) {
@@ -2830,7 +2937,7 @@ bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interruptMsg
const uint256 &hashToAnnounce = pto->vBlockHashesToAnnounce.back();
BlockMap::iterator mi = mapBlockIndex.find(hashToAnnounce);
assert(mi != mapBlockIndex.end());
- CBlockIndex *pindex = mi->second;
+ const CBlockIndex *pindex = mi->second;
// Warn if we're announcing a block that is not on the main chain.
// This should be very rare and could be optimized out.
@@ -3015,10 +3122,10 @@ bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interruptMsg
//
vector<CInv> vGetData;
if (!pto->fClient && (fFetch || !IsInitialBlockDownload()) && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
- vector<CBlockIndex*> vToDownload;
+ vector<const CBlockIndex*> vToDownload;
NodeId staller = -1;
FindNextBlocksToDownload(pto->GetId(), MAX_BLOCKS_IN_TRANSIT_PER_PEER - state.nBlocksInFlight, vToDownload, staller, consensusParams);
- BOOST_FOREACH(CBlockIndex *pindex, vToDownload) {
+ BOOST_FOREACH(const CBlockIndex *pindex, vToDownload) {
uint32_t nFetchFlags = GetFetchFlags(pto, pindex->pprev, consensusParams);
vGetData.push_back(CInv(MSG_BLOCK | nFetchFlags, pindex->GetBlockHash()));
MarkBlockAsInFlight(pto->GetId(), pindex->GetBlockHash(), consensusParams, pindex);