aboutsummaryrefslogtreecommitdiff
path: root/src/main.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/main.cpp')
-rw-r--r--src/main.cpp174
1 files changed, 135 insertions, 39 deletions
diff --git a/src/main.cpp b/src/main.cpp
index 7afaa9e7e2..836c86483a 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -51,7 +51,7 @@ unsigned int nCoinCacheSize = 5000;
/** Fees smaller than this (in satoshi) are considered zero fee (for transaction creation) */
int64_t CTransaction::nMinTxFee = 10000; // Override with -mintxfee
-/** Fees smaller than this (in satoshi) are considered zero fee (for relaying) */
+/** Fees smaller than this (in satoshi) are considered zero fee (for relaying and mining) */
int64_t CTransaction::nMinRelayTxFee = 1000;
static CMedianFilter<int> cPeerBlockCounts(8, 0); // Amount of blocks that other nodes claim to have
@@ -111,6 +111,16 @@ uint32_t nBlockSequenceId = 1;
// Sources of received blocks, to be able to send them reject messages or ban
// them, if processing happens afterwards. Protected by cs_main.
map<uint256, NodeId> mapBlockSource;
+
+// Blocks that are in flight, and that are in the queue to be downloaded.
+// Protected by cs_main.
+struct QueuedBlock {
+ uint256 hash;
+ int64_t nTime; // Time of "getdata" request in microseconds.
+ int nQueuedBefore; // Number of blocks in flight at the time of request.
+};
+map<uint256, pair<NodeId, list<QueuedBlock>::iterator> > mapBlocksInFlight;
+map<uint256, pair<NodeId, list<uint256>::iterator> > mapBlocksToDownload;
}
//////////////////////////////////////////////////////////////////////////////
@@ -194,10 +204,20 @@ struct CNodeState {
std::string name;
// List of asynchronously-determined block rejections to notify this peer about.
std::vector<CBlockReject> rejects;
+ list<QueuedBlock> vBlocksInFlight;
+ int nBlocksInFlight;
+ list<uint256> vBlocksToDownload;
+ int nBlocksToDownload;
+ int64_t nLastBlockReceive;
+ int64_t nLastBlockProcess;
CNodeState() {
nMisbehavior = 0;
fShouldBan = false;
+ nBlocksToDownload = 0;
+ nBlocksInFlight = 0;
+ nLastBlockReceive = 0;
+ nLastBlockProcess = 0;
}
};
@@ -226,8 +246,71 @@ void InitializeNode(NodeId nodeid, const CNode *pnode) {
void FinalizeNode(NodeId nodeid) {
LOCK(cs_main);
+ CNodeState *state = State(nodeid);
+
+ BOOST_FOREACH(const QueuedBlock& entry, state->vBlocksInFlight)
+ mapBlocksInFlight.erase(entry.hash);
+ BOOST_FOREACH(const uint256& hash, state->vBlocksToDownload)
+ mapBlocksToDownload.erase(hash);
+
mapNodeState.erase(nodeid);
}
+
+// Requires cs_main.
+void MarkBlockAsReceived(const uint256 &hash, NodeId nodeFrom = -1) {
+ map<uint256, pair<NodeId, list<uint256>::iterator> >::iterator itToDownload = mapBlocksToDownload.find(hash);
+ if (itToDownload != mapBlocksToDownload.end()) {
+ CNodeState *state = State(itToDownload->second.first);
+ state->vBlocksToDownload.erase(itToDownload->second.second);
+ state->nBlocksToDownload--;
+ mapBlocksToDownload.erase(itToDownload);
+ }
+
+ map<uint256, pair<NodeId, list<QueuedBlock>::iterator> >::iterator itInFlight = mapBlocksInFlight.find(hash);
+ if (itInFlight != mapBlocksInFlight.end()) {
+ CNodeState *state = State(itInFlight->second.first);
+ state->vBlocksInFlight.erase(itInFlight->second.second);
+ state->nBlocksInFlight--;
+ if (itInFlight->second.first == nodeFrom)
+ state->nLastBlockReceive = GetTimeMicros();
+ mapBlocksInFlight.erase(itInFlight);
+ }
+
+}
+
+// Requires cs_main.
+bool AddBlockToQueue(NodeId nodeid, const uint256 &hash) {
+ if (mapBlocksToDownload.count(hash) || mapBlocksInFlight.count(hash))
+ return false;
+
+ CNodeState *state = State(nodeid);
+ if (state == NULL)
+ return false;
+
+ list<uint256>::iterator it = state->vBlocksToDownload.insert(state->vBlocksToDownload.end(), hash);
+ state->nBlocksToDownload++;
+ if (state->nBlocksToDownload > 5000)
+ Misbehaving(nodeid, 10);
+ mapBlocksToDownload[hash] = std::make_pair(nodeid, it);
+ return true;
+}
+
+// Requires cs_main.
+void MarkBlockAsInFlight(NodeId nodeid, const uint256 &hash) {
+ CNodeState *state = State(nodeid);
+ assert(state != NULL);
+
+ // Make sure it's not listed somewhere already.
+ MarkBlockAsReceived(hash);
+
+ QueuedBlock newentry = {hash, GetTimeMicros(), state->nBlocksInFlight};
+ if (state->nBlocksInFlight == 0)
+ state->nLastBlockReceive = newentry.nTime; // Reset when a first request is sent.
+ list<QueuedBlock>::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(), newentry);
+ state->nBlocksInFlight++;
+ mapBlocksInFlight[hash] = std::make_pair(nodeid, it);
+}
+
}
bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) {
@@ -1310,6 +1393,7 @@ void CheckForkWarningConditionsOnNewFork(CBlockIndex* pindexNewForkTip)
CheckForkWarningConditions();
}
+// Requires cs_main.
void Misbehaving(NodeId pnode, int howmuch)
{
if (howmuch == 0)
@@ -1739,7 +1823,7 @@ bool ConnectBlock(CBlock& block, CValidationState& state, CBlockIndex* pindex, C
return state.DoS(100,
error("ConnectBlock() : coinbase pays too much (actual=%d vs limit=%d)",
block.vtx[0].GetValueOut(), GetBlockValue(pindex->nHeight, nFees)),
- REJECT_INVALID, "bad-cb-amount");
+ REJECT_INVALID, "bad-cb-amount");
if (!control.Wait())
return state.DoS(100, false);
@@ -2049,7 +2133,6 @@ bool AddToBlockIndex(CBlock& block, CValidationState& state, const CDiskBlockPos
pindexNew->nSequenceId = nBlockSequenceId++;
}
assert(pindexNew);
- mapAlreadyAskedFor.erase(CInv(MSG_BLOCK, hash));
map<uint256, CBlockIndex*>::iterator mi = mapBlockIndex.insert(make_pair(hash, pindexNew)).first;
pindexNew->phashBlock = &((*mi).first);
map<uint256, CBlockIndex*>::iterator miPrev = mapBlockIndex.find(block.hashPrevBlock);
@@ -2400,11 +2483,8 @@ bool ProcessBlock(CValidationState &state, CNode* pfrom, CBlock* pblock, CDiskBl
return state.Invalid(error("ProcessBlock() : already have block (orphan) %s", hash.ToString()), 0, "duplicate");
// Preliminary checks
- if (!CheckBlock(*pblock, state)) {
- if (state.CorruptionPossible())
- mapAlreadyAskedFor.erase(CInv(MSG_BLOCK, hash));
+ if (!CheckBlock(*pblock, state))
return error("ProcessBlock() : CheckBlock FAILED");
- }
CBlockIndex* pcheckpoint = Checkpoints::GetLastCheckpoint(mapBlockIndex);
if (pcheckpoint && pblock->hashPrevBlock != (chainActive.Tip() ? chainActive.Tip()->GetBlockHash() : uint256(0)))
@@ -2764,9 +2844,10 @@ bool static LoadBlockIndexDB()
if (it == mapBlockIndex.end())
return true;
chainActive.SetTip(it->second);
- LogPrintf("LoadBlockIndexDB(): hashBestChain=%s height=%d date=%s\n",
+ LogPrintf("LoadBlockIndexDB(): hashBestChain=%s height=%d date=%s progress=%f\n",
chainActive.Tip()->GetBlockHash().ToString(), chainActive.Height(),
- DateTimeStrFormat("%Y-%m-%d %H:%M:%S", chainActive.Tip()->GetBlockTime()));
+ DateTimeStrFormat("%Y-%m-%d %H:%M:%S", chainActive.Tip()->GetBlockTime()),
+ Checkpoints::GuessVerificationProgress(chainActive.Tip()));
return true;
}
@@ -3163,14 +3244,14 @@ void static ProcessGetData(CNode* pfrom)
int nHeight = mi->second->nHeight;
CBlockIndex* pcheckpoint = Checkpoints::GetLastCheckpoint(mapBlockIndex);
if (pcheckpoint && nHeight < pcheckpoint->nHeight) {
- if (!chainActive.Contains(mi->second))
- {
- LogPrintf("ProcessGetData(): ignoring request for old block that isn't in the main chain\n");
- } else {
- send = true;
- }
+ if (!chainActive.Contains(mi->second))
+ {
+ LogPrintf("ProcessGetData(): ignoring request for old block that isn't in the main chain\n");
+ } else {
+ send = true;
+ }
} else {
- send = true;
+ send = true;
}
}
if (send)
@@ -3274,7 +3355,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
return true;
}
-
+ State(pfrom->GetId())->nLastBlockProcess = GetTimeMicros();
@@ -3477,15 +3558,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
return error("message inv size() = %"PRIszu"", vInv.size());
}
- // find last block in inv vector
- unsigned int nLastBlock = (unsigned int)(-1);
- for (unsigned int nInv = 0; nInv < vInv.size(); nInv++) {
- if (vInv[vInv.size() - 1 - nInv].type == MSG_BLOCK) {
- nLastBlock = vInv.size() - 1 - nInv;
- break;
- }
- }
-
LOCK(cs_main);
for (unsigned int nInv = 0; nInv < vInv.size(); nInv++)
@@ -3499,17 +3571,14 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
LogPrint("net", " got inventory: %s %s\n", inv.ToString(), fAlreadyHave ? "have" : "new");
if (!fAlreadyHave) {
- if (!fImporting && !fReindex)
- pfrom->AskFor(inv);
+ if (!fImporting && !fReindex) {
+ if (inv.type == MSG_BLOCK)
+ AddBlockToQueue(pfrom->GetId(), inv.hash);
+ else
+ pfrom->AskFor(inv);
+ }
} else if (inv.type == MSG_BLOCK && mapOrphanBlocks.count(inv.hash)) {
PushGetBlocks(pfrom, chainActive.Tip(), GetOrphanRoot(inv.hash));
- } else if (nInv == nLastBlock) {
- // In case we are on a very long side-chain, it is possible that we already have
- // the last block in an inv bundle sent in response to getblocks. Try to detect
- // this situation and push another getblocks to continue.
- PushGetBlocks(pfrom, mapBlockIndex[inv.hash], uint256(0));
- if (fDebug)
- LogPrintf("force request: %s\n", inv.ToString());
}
// Track requests for our stuff
@@ -3690,7 +3759,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
}
int nDoS = 0;
if (state.IsInvalid(nDoS))
- {
+ {
LogPrint("mempool", "%s from %s %s was not accepted into the memory pool: %s\n", tx.GetHash().ToString(),
pfrom->addr.ToString(), pfrom->cleanSubVer,
state.GetRejectReason());
@@ -3716,6 +3785,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
LOCK(cs_main);
// Remember who we got this block from.
mapBlockSource[inv.hash] = pfrom->GetId();
+ MarkBlockAsReceived(inv.hash, pfrom->GetId());
CValidationState state;
ProcessBlock(state, pfrom, &block);
@@ -4243,12 +4313,38 @@ bool SendMessages(CNode* pto, bool fSendTrickle)
pto->PushMessage("inv", vInv);
+ // Detect stalled peers. Require that blocks are in flight, we haven't
+ // received a (requested) block in one minute, and that all blocks are
+ // in flight for over two minutes, since we first had a chance to
+ // process an incoming block.
+ int64_t nNow = GetTimeMicros();
+ if (!pto->fDisconnect && state.nBlocksInFlight &&
+ state.nLastBlockReceive < state.nLastBlockProcess - BLOCK_DOWNLOAD_TIMEOUT*1000000 &&
+ state.vBlocksInFlight.front().nTime < state.nLastBlockProcess - 2*BLOCK_DOWNLOAD_TIMEOUT*1000000) {
+ LogPrintf("Peer %s is stalling block download, disconnecting\n", state.name.c_str());
+ pto->fDisconnect = true;
+ }
+
//
- // Message: getdata
+ // Message: getdata (blocks)
//
vector<CInv> vGetData;
- int64_t nNow = GetTime() * 1000000;
- while (!pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow)
+ while (!pto->fDisconnect && state.nBlocksToDownload && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
+ uint256 hash = state.vBlocksToDownload.front();
+ vGetData.push_back(CInv(MSG_BLOCK, hash));
+ MarkBlockAsInFlight(pto->GetId(), hash);
+ LogPrint("net", "Requesting block %s from %s\n", hash.ToString().c_str(), state.name.c_str());
+ if (vGetData.size() >= 1000)
+ {
+ pto->PushMessage("getdata", vGetData);
+ vGetData.clear();
+ }
+ }
+
+ //
+ // Message: getdata (non-blocks)
+ //
+ while (!pto->fDisconnect && !pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow)
{
const CInv& inv = (*pto->mapAskFor.begin()).second;
if (!AlreadyHave(inv))