aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPieter Wuille <pieter.wuille@gmail.com>2014-01-10 13:23:26 +0100
committerPieter Wuille <pieter.wuille@gmail.com>2014-02-08 16:52:19 +0100
commitf59d8f0b644d49324cabd19c58cf2262d49e1392 (patch)
treeb699fa0e1f6a0ca3fc09ef0ef50c4f30a6ad5b9c /src
parent95e66247ebaac88dadd081f850ebf86c71831e61 (diff)
Per-peer block download tracking and stalled download detection.
Keep track of which block is being requested (and to be requested) from each peer, and limit the number of blocks in-flight per peer. In addition, detect stalled downloads, and disconnect if they persist for too long. This means blocks are never requested twice, and should eliminate duplicate downloads during synchronization.
Diffstat (limited to 'src')
-rw-r--r--src/main.cpp149
-rw-r--r--src/main.h8
-rw-r--r--src/net.h2
-rw-r--r--src/test/DoS_tests.cpp1
4 files changed, 131 insertions, 29 deletions
diff --git a/src/main.cpp b/src/main.cpp
index d33ad2fa95..b2873f2a10 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -112,6 +112,16 @@ uint32_t nBlockSequenceId = 1;
// Sources of received blocks, to be able to send them reject messages or ban
// them, if processing happens afterwards. Protected by cs_main.
map<uint256, NodeId> mapBlockSource;
+
+// Blocks that are in flight, and that are in the queue to be downloaded.
+// Protected by cs_main.
+struct QueuedBlock {
+ uint256 hash;
+ int64_t nTime; // Time of "getdata" request in microseconds.
+ int nQueuedBefore; // Number of blocks in flight at the time of request.
+};
+map<uint256, pair<NodeId, list<QueuedBlock>::iterator> > mapBlocksInFlight;
+map<uint256, pair<NodeId, list<uint256>::iterator> > mapBlocksToDownload;
}
//////////////////////////////////////////////////////////////////////////////
@@ -195,10 +205,20 @@ struct CNodeState {
std::string name;
// List of asynchronously-determined block rejections to notify this peer about.
std::vector<CBlockReject> rejects;
+ list<QueuedBlock> vBlocksInFlight;
+ int nBlocksInFlight;
+ list<uint256> vBlocksToDownload;
+ int nBlocksToDownload;
+ int64_t nLastBlockReceive;
+ int64_t nLastBlockProcess;
CNodeState() {
nMisbehavior = 0;
fShouldBan = false;
+ nBlocksToDownload = 0;
+ nBlocksInFlight = 0;
+ nLastBlockReceive = 0;
+ nLastBlockProcess = 0;
}
};
@@ -227,8 +247,71 @@ void InitializeNode(NodeId nodeid, const CNode *pnode) {
void FinalizeNode(NodeId nodeid) {
LOCK(cs_main);
+ CNodeState *state = State(nodeid);
+
+ BOOST_FOREACH(const QueuedBlock& entry, state->vBlocksInFlight)
+ mapBlocksInFlight.erase(entry.hash);
+ BOOST_FOREACH(const uint256& hash, state->vBlocksToDownload)
+ mapBlocksToDownload.erase(hash);
+
mapNodeState.erase(nodeid);
}
+
+// Requires cs_main.
+void MarkBlockAsReceived(const uint256 &hash, NodeId nodeFrom = -1) {
+ map<uint256, pair<NodeId, list<uint256>::iterator> >::iterator itToDownload = mapBlocksToDownload.find(hash);
+ if (itToDownload != mapBlocksToDownload.end()) {
+ CNodeState *state = State(itToDownload->second.first);
+ state->vBlocksToDownload.erase(itToDownload->second.second);
+ state->nBlocksToDownload--;
+ mapBlocksToDownload.erase(itToDownload);
+ }
+
+ map<uint256, pair<NodeId, list<QueuedBlock>::iterator> >::iterator itInFlight = mapBlocksInFlight.find(hash);
+ if (itInFlight != mapBlocksInFlight.end()) {
+ CNodeState *state = State(itInFlight->second.first);
+ state->vBlocksInFlight.erase(itInFlight->second.second);
+ state->nBlocksInFlight--;
+ if (itInFlight->second.first == nodeFrom)
+ state->nLastBlockReceive = GetTimeMicros();
+ mapBlocksInFlight.erase(itInFlight);
+ }
+
+}
+
+// Requires cs_main.
+bool AddBlockToQueue(NodeId nodeid, const uint256 &hash) {
+ if (mapBlocksToDownload.count(hash) || mapBlocksInFlight.count(hash))
+ return false;
+
+ CNodeState *state = State(nodeid);
+ if (state == NULL)
+ return false;
+
+ list<uint256>::iterator it = state->vBlocksToDownload.insert(state->vBlocksToDownload.end(), hash);
+ state->nBlocksToDownload++;
+ if (state->nBlocksToDownload > 5000)
+ Misbehaving(nodeid, 10);
+ mapBlocksToDownload[hash] = std::make_pair(nodeid, it);
+ return true;
+}
+
+// Requires cs_main.
+void MarkBlockAsInFlight(NodeId nodeid, const uint256 &hash) {
+ CNodeState *state = State(nodeid);
+ assert(state != NULL);
+
+ // Make sure it's not listed somewhere already.
+ MarkBlockAsReceived(hash);
+
+ QueuedBlock newentry = {hash, GetTimeMicros(), state->nBlocksInFlight};
+ if (state->nBlocksInFlight == 0)
+ state->nLastBlockReceive = newentry.nTime; // Reset when a first request is sent.
+ list<QueuedBlock>::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(), newentry);
+ state->nBlocksInFlight++;
+ mapBlocksInFlight[hash] = std::make_pair(nodeid, it);
+}
+
}
bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) {
@@ -1299,6 +1382,7 @@ void CheckForkWarningConditionsOnNewFork(CBlockIndex* pindexNewForkTip)
CheckForkWarningConditions();
}
+// Requires cs_main.
void Misbehaving(NodeId pnode, int howmuch)
{
if (howmuch == 0)
@@ -2021,7 +2105,6 @@ bool AddToBlockIndex(CBlock& block, CValidationState& state, const CDiskBlockPos
pindexNew->nSequenceId = nBlockSequenceId++;
}
assert(pindexNew);
- mapAlreadyAskedFor.erase(CInv(MSG_BLOCK, hash));
map<uint256, CBlockIndex*>::iterator mi = mapBlockIndex.insert(make_pair(hash, pindexNew)).first;
pindexNew->phashBlock = &((*mi).first);
map<uint256, CBlockIndex*>::iterator miPrev = mapBlockIndex.find(block.hashPrevBlock);
@@ -2367,11 +2450,8 @@ bool ProcessBlock(CValidationState &state, CNode* pfrom, CBlock* pblock, CDiskBl
return state.Invalid(error("ProcessBlock() : already have block (orphan) %s", hash.ToString()), 0, "duplicate");
// Preliminary checks
- if (!CheckBlock(*pblock, state)) {
- if (state.CorruptionPossible())
- mapAlreadyAskedFor.erase(CInv(MSG_BLOCK, hash));
+ if (!CheckBlock(*pblock, state))
return error("ProcessBlock() : CheckBlock FAILED");
- }
CBlockIndex* pcheckpoint = Checkpoints::GetLastCheckpoint(mapBlockIndex);
if (pcheckpoint && pblock->hashPrevBlock != (chainActive.Tip() ? chainActive.Tip()->GetBlockHash() : uint256(0)))
@@ -3223,7 +3303,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
return true;
}
-
+ State(pfrom->GetId())->nLastBlockProcess = GetTimeMicros();
@@ -3426,15 +3506,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
return error("message inv size() = %"PRIszu"", vInv.size());
}
- // find last block in inv vector
- unsigned int nLastBlock = (unsigned int)(-1);
- for (unsigned int nInv = 0; nInv < vInv.size(); nInv++) {
- if (vInv[vInv.size() - 1 - nInv].type == MSG_BLOCK) {
- nLastBlock = vInv.size() - 1 - nInv;
- break;
- }
- }
-
LOCK(cs_main);
for (unsigned int nInv = 0; nInv < vInv.size(); nInv++)
@@ -3448,17 +3519,14 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
LogPrint("net", " got inventory: %s %s\n", inv.ToString(), fAlreadyHave ? "have" : "new");
if (!fAlreadyHave) {
- if (!fImporting && !fReindex)
- pfrom->AskFor(inv);
+ if (!fImporting && !fReindex) {
+ if (inv.type == MSG_BLOCK)
+ AddBlockToQueue(pfrom->GetId(), inv.hash);
+ else
+ pfrom->AskFor(inv);
+ }
} else if (inv.type == MSG_BLOCK && mapOrphanBlocks.count(inv.hash)) {
PushGetBlocks(pfrom, chainActive.Tip(), GetOrphanRoot(inv.hash));
- } else if (nInv == nLastBlock) {
- // In case we are on a very long side-chain, it is possible that we already have
- // the last block in an inv bundle sent in response to getblocks. Try to detect
- // this situation and push another getblocks to continue.
- PushGetBlocks(pfrom, mapBlockIndex[inv.hash], uint256(0));
- if (fDebug)
- LogPrintf("force request: %s\n", inv.ToString());
}
// Track requests for our stuff
@@ -3665,6 +3733,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
LOCK(cs_main);
// Remember who we got this block from.
mapBlockSource[inv.hash] = pfrom->GetId();
+ MarkBlockAsReceived(inv.hash, pfrom->GetId());
CValidationState state;
ProcessBlock(state, pfrom, &block);
@@ -4192,12 +4261,38 @@ bool SendMessages(CNode* pto, bool fSendTrickle)
pto->PushMessage("inv", vInv);
+ // Detect stalled peers. Require that blocks are in flight, we haven't
+ // received a (requested) block in one minute, and that all blocks are
+ // in flight for over two minutes, since we first had a chance to
+ // process an incoming block.
+ int64_t nNow = GetTimeMicros();
+ if (!pto->fDisconnect && state.nBlocksInFlight &&
+ state.nLastBlockReceive < state.nLastBlockProcess - BLOCK_DOWNLOAD_TIMEOUT*1000000 &&
+ state.vBlocksInFlight.front().nTime < state.nLastBlockProcess - 2*BLOCK_DOWNLOAD_TIMEOUT*1000000) {
+ LogPrintf("Peer %s is stalling block download, disconnecting\n", state.name.c_str());
+ pto->fDisconnect = true;
+ }
+
//
- // Message: getdata
+ // Message: getdata (blocks)
//
vector<CInv> vGetData;
- int64_t nNow = GetTime() * 1000000;
- while (!pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow)
+ while (!pto->fDisconnect && state.nBlocksToDownload && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
+ uint256 hash = state.vBlocksToDownload.front();
+ vGetData.push_back(CInv(MSG_BLOCK, hash));
+ MarkBlockAsInFlight(pto->GetId(), hash);
+ LogPrint("net", "Requesting block %s from %s\n", hash.ToString().c_str(), state.name.c_str());
+ if (vGetData.size() >= 1000)
+ {
+ pto->PushMessage("getdata", vGetData);
+ vGetData.clear();
+ }
+ }
+
+ //
+ // Message: getdata (non-blocks)
+ //
+ while (!pto->fDisconnect && !pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow)
{
const CInv& inv = (*pto->mapAskFor.begin()).second;
if (!AlreadyHave(inv))
diff --git a/src/main.h b/src/main.h
index 05210e5164..159a665466 100644
--- a/src/main.h
+++ b/src/main.h
@@ -59,6 +59,11 @@ static const int COINBASE_MATURITY = 100;
static const unsigned int LOCKTIME_THRESHOLD = 500000000; // Tue Nov 5 00:53:20 1985 UTC
/** Maximum number of script-checking threads allowed */
static const int MAX_SCRIPTCHECK_THREADS = 16;
+/** Number of blocks that can be requested at any given time from a single peer. */
+static const int MAX_BLOCKS_IN_TRANSIT_PER_PEER = 128;
+/** Timeout in seconds before considering a block download peer unresponsive. */
+static const unsigned int BLOCK_DOWNLOAD_TIMEOUT = 60;
+
#ifdef USE_UPNP
static const int fHaveUPnP = true;
#else
@@ -182,6 +187,9 @@ bool VerifySignature(const CCoins& txFrom, const CTransaction& txTo, unsigned in
bool AbortNode(const std::string &msg);
/** Get statistics from node state */
bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats);
+/** Increase a node's misbehavior score. */
+void Misbehaving(NodeId nodeid, int howmuch);
+
/** (try to) add transaction to memory pool **/
bool AcceptToMemoryPool(CTxMemPool& pool, CValidationState &state, const CTransaction &tx, bool fLimitFree,
diff --git a/src/net.h b/src/net.h
index a181e275ca..0307a0ccb2 100644
--- a/src/net.h
+++ b/src/net.h
@@ -427,7 +427,7 @@ public:
LogPrint("net", "askfor %s %"PRId64" (%s)\n", inv.ToString().c_str(), nRequestTime, DateTimeStrFormat("%H:%M:%S", nRequestTime/1000000).c_str());
// Make sure not to reuse time indexes to keep things in the same order
- int64_t nNow = (GetTime() - 1) * 1000000;
+ int64_t nNow = GetTimeMicros() - 1000000;
static int64_t nLastTime;
++nLastTime;
nNow = std::max(nNow, nLastTime);
diff --git a/src/test/DoS_tests.cpp b/src/test/DoS_tests.cpp
index fbca09b4dc..7bf7b19b54 100644
--- a/src/test/DoS_tests.cpp
+++ b/src/test/DoS_tests.cpp
@@ -21,7 +21,6 @@
// Tests this internal-to-main.cpp method:
extern bool AddOrphanTx(const CTransaction& tx);
extern unsigned int LimitOrphanTxSize(unsigned int nMaxOrphans);
-extern void Misbehaving(NodeId nodeid, int howmuch);
extern std::map<uint256, CTransaction> mapOrphanTransactions;
extern std::map<uint256, std::set<uint256> > mapOrphanTransactionsByPrev;