diff options
Diffstat (limited to 'src/main.cpp')
-rw-r--r-- | src/main.cpp | 174 |
1 files changed, 135 insertions, 39 deletions
diff --git a/src/main.cpp b/src/main.cpp index 7afaa9e7e2..836c86483a 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -51,7 +51,7 @@ unsigned int nCoinCacheSize = 5000; /** Fees smaller than this (in satoshi) are considered zero fee (for transaction creation) */ int64_t CTransaction::nMinTxFee = 10000; // Override with -mintxfee -/** Fees smaller than this (in satoshi) are considered zero fee (for relaying) */ +/** Fees smaller than this (in satoshi) are considered zero fee (for relaying and mining) */ int64_t CTransaction::nMinRelayTxFee = 1000; static CMedianFilter<int> cPeerBlockCounts(8, 0); // Amount of blocks that other nodes claim to have @@ -111,6 +111,16 @@ uint32_t nBlockSequenceId = 1; // Sources of received blocks, to be able to send them reject messages or ban // them, if processing happens afterwards. Protected by cs_main. map<uint256, NodeId> mapBlockSource; + +// Blocks that are in flight, and that are in the queue to be downloaded. +// Protected by cs_main. +struct QueuedBlock { + uint256 hash; + int64_t nTime; // Time of "getdata" request in microseconds. + int nQueuedBefore; // Number of blocks in flight at the time of request. +}; +map<uint256, pair<NodeId, list<QueuedBlock>::iterator> > mapBlocksInFlight; +map<uint256, pair<NodeId, list<uint256>::iterator> > mapBlocksToDownload; } ////////////////////////////////////////////////////////////////////////////// @@ -194,10 +204,20 @@ struct CNodeState { std::string name; // List of asynchronously-determined block rejections to notify this peer about. std::vector<CBlockReject> rejects; + list<QueuedBlock> vBlocksInFlight; + int nBlocksInFlight; + list<uint256> vBlocksToDownload; + int nBlocksToDownload; + int64_t nLastBlockReceive; + int64_t nLastBlockProcess; CNodeState() { nMisbehavior = 0; fShouldBan = false; + nBlocksToDownload = 0; + nBlocksInFlight = 0; + nLastBlockReceive = 0; + nLastBlockProcess = 0; } }; @@ -226,8 +246,71 @@ void InitializeNode(NodeId nodeid, const CNode *pnode) { void FinalizeNode(NodeId nodeid) { LOCK(cs_main); + CNodeState *state = State(nodeid); + + BOOST_FOREACH(const QueuedBlock& entry, state->vBlocksInFlight) + mapBlocksInFlight.erase(entry.hash); + BOOST_FOREACH(const uint256& hash, state->vBlocksToDownload) + mapBlocksToDownload.erase(hash); + mapNodeState.erase(nodeid); } + +// Requires cs_main. +void MarkBlockAsReceived(const uint256 &hash, NodeId nodeFrom = -1) { + map<uint256, pair<NodeId, list<uint256>::iterator> >::iterator itToDownload = mapBlocksToDownload.find(hash); + if (itToDownload != mapBlocksToDownload.end()) { + CNodeState *state = State(itToDownload->second.first); + state->vBlocksToDownload.erase(itToDownload->second.second); + state->nBlocksToDownload--; + mapBlocksToDownload.erase(itToDownload); + } + + map<uint256, pair<NodeId, list<QueuedBlock>::iterator> >::iterator itInFlight = mapBlocksInFlight.find(hash); + if (itInFlight != mapBlocksInFlight.end()) { + CNodeState *state = State(itInFlight->second.first); + state->vBlocksInFlight.erase(itInFlight->second.second); + state->nBlocksInFlight--; + if (itInFlight->second.first == nodeFrom) + state->nLastBlockReceive = GetTimeMicros(); + mapBlocksInFlight.erase(itInFlight); + } + +} + +// Requires cs_main. +bool AddBlockToQueue(NodeId nodeid, const uint256 &hash) { + if (mapBlocksToDownload.count(hash) || mapBlocksInFlight.count(hash)) + return false; + + CNodeState *state = State(nodeid); + if (state == NULL) + return false; + + list<uint256>::iterator it = state->vBlocksToDownload.insert(state->vBlocksToDownload.end(), hash); + state->nBlocksToDownload++; + if (state->nBlocksToDownload > 5000) + Misbehaving(nodeid, 10); + mapBlocksToDownload[hash] = std::make_pair(nodeid, it); + return true; +} + +// Requires cs_main. +void MarkBlockAsInFlight(NodeId nodeid, const uint256 &hash) { + CNodeState *state = State(nodeid); + assert(state != NULL); + + // Make sure it's not listed somewhere already. + MarkBlockAsReceived(hash); + + QueuedBlock newentry = {hash, GetTimeMicros(), state->nBlocksInFlight}; + if (state->nBlocksInFlight == 0) + state->nLastBlockReceive = newentry.nTime; // Reset when a first request is sent. + list<QueuedBlock>::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(), newentry); + state->nBlocksInFlight++; + mapBlocksInFlight[hash] = std::make_pair(nodeid, it); +} + } bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) { @@ -1310,6 +1393,7 @@ void CheckForkWarningConditionsOnNewFork(CBlockIndex* pindexNewForkTip) CheckForkWarningConditions(); } +// Requires cs_main. void Misbehaving(NodeId pnode, int howmuch) { if (howmuch == 0) @@ -1739,7 +1823,7 @@ bool ConnectBlock(CBlock& block, CValidationState& state, CBlockIndex* pindex, C return state.DoS(100, error("ConnectBlock() : coinbase pays too much (actual=%d vs limit=%d)", block.vtx[0].GetValueOut(), GetBlockValue(pindex->nHeight, nFees)), - REJECT_INVALID, "bad-cb-amount"); + REJECT_INVALID, "bad-cb-amount"); if (!control.Wait()) return state.DoS(100, false); @@ -2049,7 +2133,6 @@ bool AddToBlockIndex(CBlock& block, CValidationState& state, const CDiskBlockPos pindexNew->nSequenceId = nBlockSequenceId++; } assert(pindexNew); - mapAlreadyAskedFor.erase(CInv(MSG_BLOCK, hash)); map<uint256, CBlockIndex*>::iterator mi = mapBlockIndex.insert(make_pair(hash, pindexNew)).first; pindexNew->phashBlock = &((*mi).first); map<uint256, CBlockIndex*>::iterator miPrev = mapBlockIndex.find(block.hashPrevBlock); @@ -2400,11 +2483,8 @@ bool ProcessBlock(CValidationState &state, CNode* pfrom, CBlock* pblock, CDiskBl return state.Invalid(error("ProcessBlock() : already have block (orphan) %s", hash.ToString()), 0, "duplicate"); // Preliminary checks - if (!CheckBlock(*pblock, state)) { - if (state.CorruptionPossible()) - mapAlreadyAskedFor.erase(CInv(MSG_BLOCK, hash)); + if (!CheckBlock(*pblock, state)) return error("ProcessBlock() : CheckBlock FAILED"); - } CBlockIndex* pcheckpoint = Checkpoints::GetLastCheckpoint(mapBlockIndex); if (pcheckpoint && pblock->hashPrevBlock != (chainActive.Tip() ? chainActive.Tip()->GetBlockHash() : uint256(0))) @@ -2764,9 +2844,10 @@ bool static LoadBlockIndexDB() if (it == mapBlockIndex.end()) return true; chainActive.SetTip(it->second); - LogPrintf("LoadBlockIndexDB(): hashBestChain=%s height=%d date=%s\n", + LogPrintf("LoadBlockIndexDB(): hashBestChain=%s height=%d date=%s progress=%f\n", chainActive.Tip()->GetBlockHash().ToString(), chainActive.Height(), - DateTimeStrFormat("%Y-%m-%d %H:%M:%S", chainActive.Tip()->GetBlockTime())); + DateTimeStrFormat("%Y-%m-%d %H:%M:%S", chainActive.Tip()->GetBlockTime()), + Checkpoints::GuessVerificationProgress(chainActive.Tip())); return true; } @@ -3163,14 +3244,14 @@ void static ProcessGetData(CNode* pfrom) int nHeight = mi->second->nHeight; CBlockIndex* pcheckpoint = Checkpoints::GetLastCheckpoint(mapBlockIndex); if (pcheckpoint && nHeight < pcheckpoint->nHeight) { - if (!chainActive.Contains(mi->second)) - { - LogPrintf("ProcessGetData(): ignoring request for old block that isn't in the main chain\n"); - } else { - send = true; - } + if (!chainActive.Contains(mi->second)) + { + LogPrintf("ProcessGetData(): ignoring request for old block that isn't in the main chain\n"); + } else { + send = true; + } } else { - send = true; + send = true; } } if (send) @@ -3274,7 +3355,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) return true; } - + State(pfrom->GetId())->nLastBlockProcess = GetTimeMicros(); @@ -3477,15 +3558,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) return error("message inv size() = %"PRIszu"", vInv.size()); } - // find last block in inv vector - unsigned int nLastBlock = (unsigned int)(-1); - for (unsigned int nInv = 0; nInv < vInv.size(); nInv++) { - if (vInv[vInv.size() - 1 - nInv].type == MSG_BLOCK) { - nLastBlock = vInv.size() - 1 - nInv; - break; - } - } - LOCK(cs_main); for (unsigned int nInv = 0; nInv < vInv.size(); nInv++) @@ -3499,17 +3571,14 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) LogPrint("net", " got inventory: %s %s\n", inv.ToString(), fAlreadyHave ? "have" : "new"); if (!fAlreadyHave) { - if (!fImporting && !fReindex) - pfrom->AskFor(inv); + if (!fImporting && !fReindex) { + if (inv.type == MSG_BLOCK) + AddBlockToQueue(pfrom->GetId(), inv.hash); + else + pfrom->AskFor(inv); + } } else if (inv.type == MSG_BLOCK && mapOrphanBlocks.count(inv.hash)) { PushGetBlocks(pfrom, chainActive.Tip(), GetOrphanRoot(inv.hash)); - } else if (nInv == nLastBlock) { - // In case we are on a very long side-chain, it is possible that we already have - // the last block in an inv bundle sent in response to getblocks. Try to detect - // this situation and push another getblocks to continue. - PushGetBlocks(pfrom, mapBlockIndex[inv.hash], uint256(0)); - if (fDebug) - LogPrintf("force request: %s\n", inv.ToString()); } // Track requests for our stuff @@ -3690,7 +3759,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) } int nDoS = 0; if (state.IsInvalid(nDoS)) - { + { LogPrint("mempool", "%s from %s %s was not accepted into the memory pool: %s\n", tx.GetHash().ToString(), pfrom->addr.ToString(), pfrom->cleanSubVer, state.GetRejectReason()); @@ -3716,6 +3785,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) LOCK(cs_main); // Remember who we got this block from. mapBlockSource[inv.hash] = pfrom->GetId(); + MarkBlockAsReceived(inv.hash, pfrom->GetId()); CValidationState state; ProcessBlock(state, pfrom, &block); @@ -4243,12 +4313,38 @@ bool SendMessages(CNode* pto, bool fSendTrickle) pto->PushMessage("inv", vInv); + // Detect stalled peers. Require that blocks are in flight, we haven't + // received a (requested) block in one minute, and that all blocks are + // in flight for over two minutes, since we first had a chance to + // process an incoming block. + int64_t nNow = GetTimeMicros(); + if (!pto->fDisconnect && state.nBlocksInFlight && + state.nLastBlockReceive < state.nLastBlockProcess - BLOCK_DOWNLOAD_TIMEOUT*1000000 && + state.vBlocksInFlight.front().nTime < state.nLastBlockProcess - 2*BLOCK_DOWNLOAD_TIMEOUT*1000000) { + LogPrintf("Peer %s is stalling block download, disconnecting\n", state.name.c_str()); + pto->fDisconnect = true; + } + // - // Message: getdata + // Message: getdata (blocks) // vector<CInv> vGetData; - int64_t nNow = GetTime() * 1000000; - while (!pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow) + while (!pto->fDisconnect && state.nBlocksToDownload && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + uint256 hash = state.vBlocksToDownload.front(); + vGetData.push_back(CInv(MSG_BLOCK, hash)); + MarkBlockAsInFlight(pto->GetId(), hash); + LogPrint("net", "Requesting block %s from %s\n", hash.ToString().c_str(), state.name.c_str()); + if (vGetData.size() >= 1000) + { + pto->PushMessage("getdata", vGetData); + vGetData.clear(); + } + } + + // + // Message: getdata (non-blocks) + // + while (!pto->fDisconnect && !pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow) { const CInv& inv = (*pto->mapAskFor.begin()).second; if (!AlreadyHave(inv)) |