aboutsummaryrefslogtreecommitdiff
path: root/src/main.cpp
diff options
context:
space:
mode:
authorPieter Wuille <pieter.wuille@gmail.com>2014-01-10 13:23:26 +0100
committerPieter Wuille <pieter.wuille@gmail.com>2014-02-08 16:52:19 +0100
commitf59d8f0b644d49324cabd19c58cf2262d49e1392 (patch)
treeb699fa0e1f6a0ca3fc09ef0ef50c4f30a6ad5b9c /src/main.cpp
parent95e66247ebaac88dadd081f850ebf86c71831e61 (diff)
Per-peer block download tracking and stalled download detection.
Keep track of which block is being requested (and to be requested) from each peer, and limit the number of blocks in-flight per peer. In addition, detect stalled downloads, and disconnect if they persist for too long. This means blocks are never requested twice, and should eliminate duplicate downloads during synchronization.
Diffstat (limited to 'src/main.cpp')
-rw-r--r--src/main.cpp149
1 files changed, 122 insertions, 27 deletions
diff --git a/src/main.cpp b/src/main.cpp
index d33ad2fa95..b2873f2a10 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -112,6 +112,16 @@ uint32_t nBlockSequenceId = 1;
// Sources of received blocks, to be able to send them reject messages or ban
// them, if processing happens afterwards. Protected by cs_main.
map<uint256, NodeId> mapBlockSource;
+
+// Blocks that are in flight, and that are in the queue to be downloaded.
+// Protected by cs_main.
+struct QueuedBlock {
+ uint256 hash;
+ int64_t nTime; // Time of "getdata" request in microseconds.
+ int nQueuedBefore; // Number of blocks in flight at the time of request.
+};
+map<uint256, pair<NodeId, list<QueuedBlock>::iterator> > mapBlocksInFlight;
+map<uint256, pair<NodeId, list<uint256>::iterator> > mapBlocksToDownload;
}
//////////////////////////////////////////////////////////////////////////////
@@ -195,10 +205,20 @@ struct CNodeState {
std::string name;
// List of asynchronously-determined block rejections to notify this peer about.
std::vector<CBlockReject> rejects;
+ list<QueuedBlock> vBlocksInFlight;
+ int nBlocksInFlight;
+ list<uint256> vBlocksToDownload;
+ int nBlocksToDownload;
+ int64_t nLastBlockReceive;
+ int64_t nLastBlockProcess;
CNodeState() {
nMisbehavior = 0;
fShouldBan = false;
+ nBlocksToDownload = 0;
+ nBlocksInFlight = 0;
+ nLastBlockReceive = 0;
+ nLastBlockProcess = 0;
}
};
@@ -227,8 +247,71 @@ void InitializeNode(NodeId nodeid, const CNode *pnode) {
void FinalizeNode(NodeId nodeid) {
LOCK(cs_main);
+ CNodeState *state = State(nodeid);
+
+ BOOST_FOREACH(const QueuedBlock& entry, state->vBlocksInFlight)
+ mapBlocksInFlight.erase(entry.hash);
+ BOOST_FOREACH(const uint256& hash, state->vBlocksToDownload)
+ mapBlocksToDownload.erase(hash);
+
mapNodeState.erase(nodeid);
}
+
+// Requires cs_main.
+void MarkBlockAsReceived(const uint256 &hash, NodeId nodeFrom = -1) {
+ map<uint256, pair<NodeId, list<uint256>::iterator> >::iterator itToDownload = mapBlocksToDownload.find(hash);
+ if (itToDownload != mapBlocksToDownload.end()) {
+ CNodeState *state = State(itToDownload->second.first);
+ state->vBlocksToDownload.erase(itToDownload->second.second);
+ state->nBlocksToDownload--;
+ mapBlocksToDownload.erase(itToDownload);
+ }
+
+ map<uint256, pair<NodeId, list<QueuedBlock>::iterator> >::iterator itInFlight = mapBlocksInFlight.find(hash);
+ if (itInFlight != mapBlocksInFlight.end()) {
+ CNodeState *state = State(itInFlight->second.first);
+ state->vBlocksInFlight.erase(itInFlight->second.second);
+ state->nBlocksInFlight--;
+ if (itInFlight->second.first == nodeFrom)
+ state->nLastBlockReceive = GetTimeMicros();
+ mapBlocksInFlight.erase(itInFlight);
+ }
+
+}
+
+// Requires cs_main.
+bool AddBlockToQueue(NodeId nodeid, const uint256 &hash) {
+ if (mapBlocksToDownload.count(hash) || mapBlocksInFlight.count(hash))
+ return false;
+
+ CNodeState *state = State(nodeid);
+ if (state == NULL)
+ return false;
+
+ list<uint256>::iterator it = state->vBlocksToDownload.insert(state->vBlocksToDownload.end(), hash);
+ state->nBlocksToDownload++;
+ if (state->nBlocksToDownload > 5000)
+ Misbehaving(nodeid, 10);
+ mapBlocksToDownload[hash] = std::make_pair(nodeid, it);
+ return true;
+}
+
+// Requires cs_main.
+void MarkBlockAsInFlight(NodeId nodeid, const uint256 &hash) {
+ CNodeState *state = State(nodeid);
+ assert(state != NULL);
+
+ // Make sure it's not listed somewhere already.
+ MarkBlockAsReceived(hash);
+
+ QueuedBlock newentry = {hash, GetTimeMicros(), state->nBlocksInFlight};
+ if (state->nBlocksInFlight == 0)
+ state->nLastBlockReceive = newentry.nTime; // Reset when a first request is sent.
+ list<QueuedBlock>::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(), newentry);
+ state->nBlocksInFlight++;
+ mapBlocksInFlight[hash] = std::make_pair(nodeid, it);
+}
+
}
bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) {
@@ -1299,6 +1382,7 @@ void CheckForkWarningConditionsOnNewFork(CBlockIndex* pindexNewForkTip)
CheckForkWarningConditions();
}
+// Requires cs_main.
void Misbehaving(NodeId pnode, int howmuch)
{
if (howmuch == 0)
@@ -2021,7 +2105,6 @@ bool AddToBlockIndex(CBlock& block, CValidationState& state, const CDiskBlockPos
pindexNew->nSequenceId = nBlockSequenceId++;
}
assert(pindexNew);
- mapAlreadyAskedFor.erase(CInv(MSG_BLOCK, hash));
map<uint256, CBlockIndex*>::iterator mi = mapBlockIndex.insert(make_pair(hash, pindexNew)).first;
pindexNew->phashBlock = &((*mi).first);
map<uint256, CBlockIndex*>::iterator miPrev = mapBlockIndex.find(block.hashPrevBlock);
@@ -2367,11 +2450,8 @@ bool ProcessBlock(CValidationState &state, CNode* pfrom, CBlock* pblock, CDiskBl
return state.Invalid(error("ProcessBlock() : already have block (orphan) %s", hash.ToString()), 0, "duplicate");
// Preliminary checks
- if (!CheckBlock(*pblock, state)) {
- if (state.CorruptionPossible())
- mapAlreadyAskedFor.erase(CInv(MSG_BLOCK, hash));
+ if (!CheckBlock(*pblock, state))
return error("ProcessBlock() : CheckBlock FAILED");
- }
CBlockIndex* pcheckpoint = Checkpoints::GetLastCheckpoint(mapBlockIndex);
if (pcheckpoint && pblock->hashPrevBlock != (chainActive.Tip() ? chainActive.Tip()->GetBlockHash() : uint256(0)))
@@ -3223,7 +3303,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
return true;
}
-
+ State(pfrom->GetId())->nLastBlockProcess = GetTimeMicros();
@@ -3426,15 +3506,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
return error("message inv size() = %"PRIszu"", vInv.size());
}
- // find last block in inv vector
- unsigned int nLastBlock = (unsigned int)(-1);
- for (unsigned int nInv = 0; nInv < vInv.size(); nInv++) {
- if (vInv[vInv.size() - 1 - nInv].type == MSG_BLOCK) {
- nLastBlock = vInv.size() - 1 - nInv;
- break;
- }
- }
-
LOCK(cs_main);
for (unsigned int nInv = 0; nInv < vInv.size(); nInv++)
@@ -3448,17 +3519,14 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
LogPrint("net", " got inventory: %s %s\n", inv.ToString(), fAlreadyHave ? "have" : "new");
if (!fAlreadyHave) {
- if (!fImporting && !fReindex)
- pfrom->AskFor(inv);
+ if (!fImporting && !fReindex) {
+ if (inv.type == MSG_BLOCK)
+ AddBlockToQueue(pfrom->GetId(), inv.hash);
+ else
+ pfrom->AskFor(inv);
+ }
} else if (inv.type == MSG_BLOCK && mapOrphanBlocks.count(inv.hash)) {
PushGetBlocks(pfrom, chainActive.Tip(), GetOrphanRoot(inv.hash));
- } else if (nInv == nLastBlock) {
- // In case we are on a very long side-chain, it is possible that we already have
- // the last block in an inv bundle sent in response to getblocks. Try to detect
- // this situation and push another getblocks to continue.
- PushGetBlocks(pfrom, mapBlockIndex[inv.hash], uint256(0));
- if (fDebug)
- LogPrintf("force request: %s\n", inv.ToString());
}
// Track requests for our stuff
@@ -3665,6 +3733,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
LOCK(cs_main);
// Remember who we got this block from.
mapBlockSource[inv.hash] = pfrom->GetId();
+ MarkBlockAsReceived(inv.hash, pfrom->GetId());
CValidationState state;
ProcessBlock(state, pfrom, &block);
@@ -4192,12 +4261,38 @@ bool SendMessages(CNode* pto, bool fSendTrickle)
pto->PushMessage("inv", vInv);
+ // Detect stalled peers. Require that blocks are in flight, we haven't
+ // received a (requested) block in one minute, and that all blocks are
+ // in flight for over two minutes, since we first had a chance to
+ // process an incoming block.
+ int64_t nNow = GetTimeMicros();
+ if (!pto->fDisconnect && state.nBlocksInFlight &&
+ state.nLastBlockReceive < state.nLastBlockProcess - BLOCK_DOWNLOAD_TIMEOUT*1000000 &&
+ state.vBlocksInFlight.front().nTime < state.nLastBlockProcess - 2*BLOCK_DOWNLOAD_TIMEOUT*1000000) {
+ LogPrintf("Peer %s is stalling block download, disconnecting\n", state.name.c_str());
+ pto->fDisconnect = true;
+ }
+
//
- // Message: getdata
+ // Message: getdata (blocks)
//
vector<CInv> vGetData;
- int64_t nNow = GetTime() * 1000000;
- while (!pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow)
+ while (!pto->fDisconnect && state.nBlocksToDownload && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
+ uint256 hash = state.vBlocksToDownload.front();
+ vGetData.push_back(CInv(MSG_BLOCK, hash));
+ MarkBlockAsInFlight(pto->GetId(), hash);
+ LogPrint("net", "Requesting block %s from %s\n", hash.ToString().c_str(), state.name.c_str());
+ if (vGetData.size() >= 1000)
+ {
+ pto->PushMessage("getdata", vGetData);
+ vGetData.clear();
+ }
+ }
+
+ //
+ // Message: getdata (non-blocks)
+ //
+ while (!pto->fDisconnect && !pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow)
{
const CInv& inv = (*pto->mapAskFor.begin()).second;
if (!AlreadyHave(inv))