diff options
author | Pieter Wuille <pieter.wuille@gmail.com> | 2014-01-10 13:23:26 +0100 |
---|---|---|
committer | Pieter Wuille <pieter.wuille@gmail.com> | 2014-02-08 16:52:19 +0100 |
commit | f59d8f0b644d49324cabd19c58cf2262d49e1392 (patch) | |
tree | b699fa0e1f6a0ca3fc09ef0ef50c4f30a6ad5b9c /src | |
parent | 95e66247ebaac88dadd081f850ebf86c71831e61 (diff) |
Per-peer block download tracking and stalled download detection.
Keep track of which block is being requested (and to be requested) from
each peer, and limit the number of blocks in-flight per peer. In addition,
detect stalled downloads, and disconnect if they persist for too long.
This means blocks are never requested twice, and should eliminate duplicate
downloads during synchronization.
Diffstat (limited to 'src')
-rw-r--r-- | src/main.cpp | 149 | ||||
-rw-r--r-- | src/main.h | 8 | ||||
-rw-r--r-- | src/net.h | 2 | ||||
-rw-r--r-- | src/test/DoS_tests.cpp | 1 |
4 files changed, 131 insertions, 29 deletions
diff --git a/src/main.cpp b/src/main.cpp index d33ad2fa95..b2873f2a10 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -112,6 +112,16 @@ uint32_t nBlockSequenceId = 1; // Sources of received blocks, to be able to send them reject messages or ban // them, if processing happens afterwards. Protected by cs_main. map<uint256, NodeId> mapBlockSource; + +// Blocks that are in flight, and that are in the queue to be downloaded. +// Protected by cs_main. +struct QueuedBlock { + uint256 hash; + int64_t nTime; // Time of "getdata" request in microseconds. + int nQueuedBefore; // Number of blocks in flight at the time of request. +}; +map<uint256, pair<NodeId, list<QueuedBlock>::iterator> > mapBlocksInFlight; +map<uint256, pair<NodeId, list<uint256>::iterator> > mapBlocksToDownload; } ////////////////////////////////////////////////////////////////////////////// @@ -195,10 +205,20 @@ struct CNodeState { std::string name; // List of asynchronously-determined block rejections to notify this peer about. std::vector<CBlockReject> rejects; + list<QueuedBlock> vBlocksInFlight; + int nBlocksInFlight; + list<uint256> vBlocksToDownload; + int nBlocksToDownload; + int64_t nLastBlockReceive; + int64_t nLastBlockProcess; CNodeState() { nMisbehavior = 0; fShouldBan = false; + nBlocksToDownload = 0; + nBlocksInFlight = 0; + nLastBlockReceive = 0; + nLastBlockProcess = 0; } }; @@ -227,8 +247,71 @@ void InitializeNode(NodeId nodeid, const CNode *pnode) { void FinalizeNode(NodeId nodeid) { LOCK(cs_main); + CNodeState *state = State(nodeid); + + BOOST_FOREACH(const QueuedBlock& entry, state->vBlocksInFlight) + mapBlocksInFlight.erase(entry.hash); + BOOST_FOREACH(const uint256& hash, state->vBlocksToDownload) + mapBlocksToDownload.erase(hash); + mapNodeState.erase(nodeid); } + +// Requires cs_main. +void MarkBlockAsReceived(const uint256 &hash, NodeId nodeFrom = -1) { + map<uint256, pair<NodeId, list<uint256>::iterator> >::iterator itToDownload = mapBlocksToDownload.find(hash); + if (itToDownload != mapBlocksToDownload.end()) { + CNodeState *state = State(itToDownload->second.first); + state->vBlocksToDownload.erase(itToDownload->second.second); + state->nBlocksToDownload--; + mapBlocksToDownload.erase(itToDownload); + } + + map<uint256, pair<NodeId, list<QueuedBlock>::iterator> >::iterator itInFlight = mapBlocksInFlight.find(hash); + if (itInFlight != mapBlocksInFlight.end()) { + CNodeState *state = State(itInFlight->second.first); + state->vBlocksInFlight.erase(itInFlight->second.second); + state->nBlocksInFlight--; + if (itInFlight->second.first == nodeFrom) + state->nLastBlockReceive = GetTimeMicros(); + mapBlocksInFlight.erase(itInFlight); + } + +} + +// Requires cs_main. +bool AddBlockToQueue(NodeId nodeid, const uint256 &hash) { + if (mapBlocksToDownload.count(hash) || mapBlocksInFlight.count(hash)) + return false; + + CNodeState *state = State(nodeid); + if (state == NULL) + return false; + + list<uint256>::iterator it = state->vBlocksToDownload.insert(state->vBlocksToDownload.end(), hash); + state->nBlocksToDownload++; + if (state->nBlocksToDownload > 5000) + Misbehaving(nodeid, 10); + mapBlocksToDownload[hash] = std::make_pair(nodeid, it); + return true; +} + +// Requires cs_main. +void MarkBlockAsInFlight(NodeId nodeid, const uint256 &hash) { + CNodeState *state = State(nodeid); + assert(state != NULL); + + // Make sure it's not listed somewhere already. + MarkBlockAsReceived(hash); + + QueuedBlock newentry = {hash, GetTimeMicros(), state->nBlocksInFlight}; + if (state->nBlocksInFlight == 0) + state->nLastBlockReceive = newentry.nTime; // Reset when a first request is sent. + list<QueuedBlock>::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(), newentry); + state->nBlocksInFlight++; + mapBlocksInFlight[hash] = std::make_pair(nodeid, it); +} + } bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) { @@ -1299,6 +1382,7 @@ void CheckForkWarningConditionsOnNewFork(CBlockIndex* pindexNewForkTip) CheckForkWarningConditions(); } +// Requires cs_main. void Misbehaving(NodeId pnode, int howmuch) { if (howmuch == 0) @@ -2021,7 +2105,6 @@ bool AddToBlockIndex(CBlock& block, CValidationState& state, const CDiskBlockPos pindexNew->nSequenceId = nBlockSequenceId++; } assert(pindexNew); - mapAlreadyAskedFor.erase(CInv(MSG_BLOCK, hash)); map<uint256, CBlockIndex*>::iterator mi = mapBlockIndex.insert(make_pair(hash, pindexNew)).first; pindexNew->phashBlock = &((*mi).first); map<uint256, CBlockIndex*>::iterator miPrev = mapBlockIndex.find(block.hashPrevBlock); @@ -2367,11 +2450,8 @@ bool ProcessBlock(CValidationState &state, CNode* pfrom, CBlock* pblock, CDiskBl return state.Invalid(error("ProcessBlock() : already have block (orphan) %s", hash.ToString()), 0, "duplicate"); // Preliminary checks - if (!CheckBlock(*pblock, state)) { - if (state.CorruptionPossible()) - mapAlreadyAskedFor.erase(CInv(MSG_BLOCK, hash)); + if (!CheckBlock(*pblock, state)) return error("ProcessBlock() : CheckBlock FAILED"); - } CBlockIndex* pcheckpoint = Checkpoints::GetLastCheckpoint(mapBlockIndex); if (pcheckpoint && pblock->hashPrevBlock != (chainActive.Tip() ? chainActive.Tip()->GetBlockHash() : uint256(0))) @@ -3223,7 +3303,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) return true; } - + State(pfrom->GetId())->nLastBlockProcess = GetTimeMicros(); @@ -3426,15 +3506,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) return error("message inv size() = %"PRIszu"", vInv.size()); } - // find last block in inv vector - unsigned int nLastBlock = (unsigned int)(-1); - for (unsigned int nInv = 0; nInv < vInv.size(); nInv++) { - if (vInv[vInv.size() - 1 - nInv].type == MSG_BLOCK) { - nLastBlock = vInv.size() - 1 - nInv; - break; - } - } - LOCK(cs_main); for (unsigned int nInv = 0; nInv < vInv.size(); nInv++) @@ -3448,17 +3519,14 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) LogPrint("net", " got inventory: %s %s\n", inv.ToString(), fAlreadyHave ? "have" : "new"); if (!fAlreadyHave) { - if (!fImporting && !fReindex) - pfrom->AskFor(inv); + if (!fImporting && !fReindex) { + if (inv.type == MSG_BLOCK) + AddBlockToQueue(pfrom->GetId(), inv.hash); + else + pfrom->AskFor(inv); + } } else if (inv.type == MSG_BLOCK && mapOrphanBlocks.count(inv.hash)) { PushGetBlocks(pfrom, chainActive.Tip(), GetOrphanRoot(inv.hash)); - } else if (nInv == nLastBlock) { - // In case we are on a very long side-chain, it is possible that we already have - // the last block in an inv bundle sent in response to getblocks. Try to detect - // this situation and push another getblocks to continue. - PushGetBlocks(pfrom, mapBlockIndex[inv.hash], uint256(0)); - if (fDebug) - LogPrintf("force request: %s\n", inv.ToString()); } // Track requests for our stuff @@ -3665,6 +3733,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) LOCK(cs_main); // Remember who we got this block from. mapBlockSource[inv.hash] = pfrom->GetId(); + MarkBlockAsReceived(inv.hash, pfrom->GetId()); CValidationState state; ProcessBlock(state, pfrom, &block); @@ -4192,12 +4261,38 @@ bool SendMessages(CNode* pto, bool fSendTrickle) pto->PushMessage("inv", vInv); + // Detect stalled peers. Require that blocks are in flight, we haven't + // received a (requested) block in one minute, and that all blocks are + // in flight for over two minutes, since we first had a chance to + // process an incoming block. + int64_t nNow = GetTimeMicros(); + if (!pto->fDisconnect && state.nBlocksInFlight && + state.nLastBlockReceive < state.nLastBlockProcess - BLOCK_DOWNLOAD_TIMEOUT*1000000 && + state.vBlocksInFlight.front().nTime < state.nLastBlockProcess - 2*BLOCK_DOWNLOAD_TIMEOUT*1000000) { + LogPrintf("Peer %s is stalling block download, disconnecting\n", state.name.c_str()); + pto->fDisconnect = true; + } + // - // Message: getdata + // Message: getdata (blocks) // vector<CInv> vGetData; - int64_t nNow = GetTime() * 1000000; - while (!pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow) + while (!pto->fDisconnect && state.nBlocksToDownload && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + uint256 hash = state.vBlocksToDownload.front(); + vGetData.push_back(CInv(MSG_BLOCK, hash)); + MarkBlockAsInFlight(pto->GetId(), hash); + LogPrint("net", "Requesting block %s from %s\n", hash.ToString().c_str(), state.name.c_str()); + if (vGetData.size() >= 1000) + { + pto->PushMessage("getdata", vGetData); + vGetData.clear(); + } + } + + // + // Message: getdata (non-blocks) + // + while (!pto->fDisconnect && !pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow) { const CInv& inv = (*pto->mapAskFor.begin()).second; if (!AlreadyHave(inv)) diff --git a/src/main.h b/src/main.h index 05210e5164..159a665466 100644 --- a/src/main.h +++ b/src/main.h @@ -59,6 +59,11 @@ static const int COINBASE_MATURITY = 100; static const unsigned int LOCKTIME_THRESHOLD = 500000000; // Tue Nov 5 00:53:20 1985 UTC /** Maximum number of script-checking threads allowed */ static const int MAX_SCRIPTCHECK_THREADS = 16; +/** Number of blocks that can be requested at any given time from a single peer. */ +static const int MAX_BLOCKS_IN_TRANSIT_PER_PEER = 128; +/** Timeout in seconds before considering a block download peer unresponsive. */ +static const unsigned int BLOCK_DOWNLOAD_TIMEOUT = 60; + #ifdef USE_UPNP static const int fHaveUPnP = true; #else @@ -182,6 +187,9 @@ bool VerifySignature(const CCoins& txFrom, const CTransaction& txTo, unsigned in bool AbortNode(const std::string &msg); /** Get statistics from node state */ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats); +/** Increase a node's misbehavior score. */ +void Misbehaving(NodeId nodeid, int howmuch); + /** (try to) add transaction to memory pool **/ bool AcceptToMemoryPool(CTxMemPool& pool, CValidationState &state, const CTransaction &tx, bool fLimitFree, @@ -427,7 +427,7 @@ public: LogPrint("net", "askfor %s %"PRId64" (%s)\n", inv.ToString().c_str(), nRequestTime, DateTimeStrFormat("%H:%M:%S", nRequestTime/1000000).c_str()); // Make sure not to reuse time indexes to keep things in the same order - int64_t nNow = (GetTime() - 1) * 1000000; + int64_t nNow = GetTimeMicros() - 1000000; static int64_t nLastTime; ++nLastTime; nNow = std::max(nNow, nLastTime); diff --git a/src/test/DoS_tests.cpp b/src/test/DoS_tests.cpp index fbca09b4dc..7bf7b19b54 100644 --- a/src/test/DoS_tests.cpp +++ b/src/test/DoS_tests.cpp @@ -21,7 +21,6 @@ // Tests this internal-to-main.cpp method: extern bool AddOrphanTx(const CTransaction& tx); extern unsigned int LimitOrphanTxSize(unsigned int nMaxOrphans); -extern void Misbehaving(NodeId nodeid, int howmuch); extern std::map<uint256, CTransaction> mapOrphanTransactions; extern std::map<uint256, std::set<uint256> > mapOrphanTransactionsByPrev; |