diff options
Diffstat (limited to 'src/net_processing.cpp')
-rw-r--r-- | src/net_processing.cpp | 172 |
1 files changed, 124 insertions, 48 deletions
diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 4a5ae0e69c..72c403a57e 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -50,7 +50,8 @@ struct IteratorComparator }; struct COrphanTx { - CTransaction tx; + // When modifying, adapt the copy of this definition in tests/DoS_tests. + CTransactionRef tx; NodeId fromPeer; int64_t nTimeExpire; }; @@ -585,9 +586,9 @@ void UnregisterNodeSignals(CNodeSignals& nodeSignals) // mapOrphanTransactions // -bool AddOrphanTx(const CTransaction& tx, NodeId peer) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +bool AddOrphanTx(const CTransactionRef& tx, NodeId peer) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { - uint256 hash = tx.GetHash(); + const uint256& hash = tx->GetHash(); if (mapOrphanTransactions.count(hash)) return false; @@ -598,7 +599,7 @@ bool AddOrphanTx(const CTransaction& tx, NodeId peer) EXCLUSIVE_LOCKS_REQUIRED(c // have been mined or received. // 100 orphans, each of which is at most 99,999 bytes big is // at most 10 megabytes of orphans and somewhat more byprev index (in the worst case): - unsigned int sz = GetTransactionWeight(tx); + unsigned int sz = GetTransactionWeight(*tx); if (sz >= MAX_STANDARD_TX_WEIGHT) { LogPrint("mempool", "ignoring large orphan tx (size: %u, hash: %s)\n", sz, hash.ToString()); @@ -607,7 +608,7 @@ bool AddOrphanTx(const CTransaction& tx, NodeId peer) EXCLUSIVE_LOCKS_REQUIRED(c auto ret = mapOrphanTransactions.emplace(hash, COrphanTx{tx, peer, GetTime() + ORPHAN_TX_EXPIRE_TIME}); assert(ret.second); - BOOST_FOREACH(const CTxIn& txin, tx.vin) { + BOOST_FOREACH(const CTxIn& txin, tx->vin) { mapOrphanTransactionsByPrev[txin.prevout].insert(ret.first); } @@ -621,7 +622,7 @@ int static EraseOrphanTx(uint256 hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main) map<uint256, COrphanTx>::iterator it = mapOrphanTransactions.find(hash); if (it == mapOrphanTransactions.end()) return 0; - BOOST_FOREACH(const CTxIn& txin, it->second.tx.vin) + BOOST_FOREACH(const CTxIn& txin, it->second.tx->vin) { auto itPrev = mapOrphanTransactionsByPrev.find(txin.prevout); if (itPrev == mapOrphanTransactionsByPrev.end()) @@ -643,7 +644,7 @@ void EraseOrphansFor(NodeId peer) map<uint256, COrphanTx>::iterator maybeErase = iter++; // increment to avoid iterator becoming invalid if (maybeErase->second.fromPeer == peer) { - nErased += EraseOrphanTx(maybeErase->second.tx.GetHash()); + nErased += EraseOrphanTx(maybeErase->second.tx->GetHash()); } } if (nErased > 0) LogPrint("mempool", "Erased %d orphan tx from peer %d\n", nErased, peer); @@ -664,7 +665,7 @@ unsigned int LimitOrphanTxSize(unsigned int nMaxOrphans) EXCLUSIVE_LOCKS_REQUIRE { map<uint256, COrphanTx>::iterator maybeErase = iter++; if (maybeErase->second.nTimeExpire <= nNow) { - nErased += EraseOrphanTx(maybeErase->second.tx.GetHash()); + nErased += EraseOrphanTx(maybeErase->second.tx->GetHash()); } else { nMinExpTime = std::min(maybeErase->second.nTimeExpire, nMinExpTime); } @@ -735,7 +736,7 @@ void PeerLogicValidation::SyncTransaction(const CTransaction& tx, const CBlockIn auto itByPrev = mapOrphanTransactionsByPrev.find(tx.vin[j].prevout); if (itByPrev == mapOrphanTransactionsByPrev.end()) continue; for (auto mi = itByPrev->second.begin(); mi != itByPrev->second.end(); ++mi) { - const CTransaction& orphanTx = (*mi)->second.tx; + const CTransaction& orphanTx = *(*mi)->second.tx; const uint256& orphanHash = orphanTx.GetHash(); vOrphanErase.push_back(orphanHash); } @@ -885,7 +886,7 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connma connman.ForEachNodeThen(std::move(sortfunc), std::move(pushfunc)); } -void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman) +void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic<bool>& interruptMsgProc) { std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin(); unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); @@ -900,7 +901,9 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam const CInv &inv = *it; { - boost::this_thread::interruption_point(); + if (interruptMsgProc) + return; + it++; if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK || inv.type == MSG_WITNESS_BLOCK) @@ -1054,12 +1057,12 @@ uint32_t GetFetchFlags(CNode* pfrom, CBlockIndex* pprev, const Consensus::Params return nFetchFlags; } -bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman) +bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman, std::atomic<bool>& interruptMsgProc) { unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); LogPrint("net", "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->id); - if (mapArgs.count("-dropmessagestest") && GetRand(atoi(mapArgs["-dropmessagestest"])) == 0) + if (IsArgSet("-dropmessagestest") && GetRand(GetArg("-dropmessagestest", 0)) == 0) { LogPrintf("dropmessagestest DROPPING RECV MESSAGE\n"); return true; @@ -1294,7 +1297,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nSince = nNow - 10 * 60; BOOST_FOREACH(CAddress& addr, vAddr) { - boost::this_thread::interruption_point(); + if (interruptMsgProc) + return true; if ((addr.nServices & REQUIRED_SERVICES) != REQUIRED_SERVICES) continue; @@ -1376,7 +1380,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, { CInv &inv = vInv[nInv]; - boost::this_thread::interruption_point(); + if (interruptMsgProc) + return true; bool fAlreadyHave = AlreadyHave(inv); LogPrint("net", "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom->id); @@ -1438,7 +1443,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, LogPrint("net", "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom->id); pfrom->vRecvGetData.insert(pfrom->vRecvGetData.end(), vInv.begin(), vInv.end()); - ProcessGetData(pfrom, chainparams.GetConsensus(), connman); + ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); } @@ -1512,12 +1517,13 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, inv.type = State(pfrom->GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK; inv.hash = req.blockhash; pfrom->vRecvGetData.push_back(inv); - ProcessGetData(pfrom, chainparams.GetConsensus(), connman); + ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); return true; } CBlock block; - assert(ReadBlockFromDisk(block, it->second, chainparams.GetConsensus())); + bool ret = ReadBlockFromDisk(block, it->second, chainparams.GetConsensus()); + assert(ret); BlockTransactions resp(req); for (size_t i = 0; i < req.indexes.size(); i++) { @@ -1594,8 +1600,9 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, deque<COutPoint> vWorkQueue; vector<uint256> vEraseQueue; - CTransaction tx; - vRecv >> tx; + CTransactionRef ptx; + vRecv >> ptx; + const CTransaction& tx = *ptx; CInv inv(MSG_TX, tx.GetHash()); pfrom->AddInventoryKnown(inv); @@ -1608,7 +1615,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, pfrom->setAskFor.erase(inv.hash); mapAlreadyAskedFor.erase(inv.hash); - if (!AlreadyHave(inv) && AcceptToMemoryPool(mempool, state, tx, true, &fMissingInputs)) { + if (!AlreadyHave(inv) && AcceptToMemoryPool(mempool, state, ptx, true, &fMissingInputs)) { mempool.check(pcoinsTip); RelayTransaction(tx, connman); for (unsigned int i = 0; i < tx.vout.size(); i++) { @@ -1633,7 +1640,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, mi != itByPrev->second.end(); ++mi) { - const CTransaction& orphanTx = (*mi)->second.tx; + const CTransactionRef& porphanTx = (*mi)->second.tx; + const CTransaction& orphanTx = *porphanTx; const uint256& orphanHash = orphanTx.GetHash(); NodeId fromPeer = (*mi)->second.fromPeer; bool fMissingInputs2 = false; @@ -1645,7 +1653,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, if (setMisbehaving.count(fromPeer)) continue; - if (AcceptToMemoryPool(mempool, stateDummy, orphanTx, true, &fMissingInputs2)) { + if (AcceptToMemoryPool(mempool, stateDummy, porphanTx, true, &fMissingInputs2)) { LogPrint("mempool", " accepted orphan tx %s\n", orphanHash.ToString()); RelayTransaction(orphanTx, connman); for (unsigned int i = 0; i < orphanTx.vout.size(); i++) { @@ -1667,7 +1675,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, // Probably non-standard or insufficient fee/priority LogPrint("mempool", " removed orphan tx %s\n", orphanHash.ToString()); vEraseQueue.push_back(orphanHash); - if (orphanTx.wit.IsNull() && !stateDummy.CorruptionPossible()) { + if (!orphanTx.HasWitness() && !stateDummy.CorruptionPossible()) { // Do not use rejection cache for witness transactions or // witness-stripped transactions, as they can have been malleated. // See https://github.com/bitcoin/bitcoin/issues/8279 for details. @@ -1698,7 +1706,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, pfrom->AddInventoryKnown(_inv); if (!AlreadyHave(_inv)) pfrom->AskFor(_inv); } - AddOrphanTx(tx, pfrom->GetId()); + AddOrphanTx(ptx, pfrom->GetId()); // DoS prevention: do not allow mapOrphanTransactions to grow unbounded unsigned int nMaxOrphanTx = (unsigned int)std::max((int64_t)0, GetArg("-maxorphantx", DEFAULT_MAX_ORPHAN_TRANSACTIONS)); @@ -1712,7 +1720,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, recentRejects->insert(tx.GetHash()); } } else { - if (tx.wit.IsNull() && !state.CorruptionPossible()) { + if (!tx.HasWitness() && !state.CorruptionPossible()) { // Do not use rejection cache for witness transactions or // witness-stripped transactions, as they can have been malleated. // See https://github.com/bitcoin/bitcoin/issues/8279 for details. @@ -1784,6 +1792,24 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, } } + // When we succeed in decoding a block's txids from a cmpctblock + // message we typically jump to the BLOCKTXN handling code, with a + // dummy (empty) BLOCKTXN message, to re-use the logic there in + // completing processing of the putative block (without cs_main). + bool fProcessBLOCKTXN = false; + CDataStream blockTxnMsg(SER_NETWORK, PROTOCOL_VERSION); + + // If we end up treating this as a plain headers message, call that as well + // without cs_main. + bool fRevertToHeaderProcessing = false; + CDataStream vHeadersMsg(SER_NETWORK, PROTOCOL_VERSION); + + // Keep a CBlock for "optimistic" compactblock reconstructions (see + // below) + std::shared_ptr<CBlock> pblock = std::make_shared<CBlock>(); + bool fBlockReconstructed = false; + + { LOCK(cs_main); // If AcceptBlockHeader returned true, it set pindex assert(pindex); @@ -1865,13 +1891,29 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, // Dirty hack to jump to BLOCKTXN code (TODO: move message handling into their own functions) BlockTransactions txn; txn.blockhash = cmpctblock.header.GetHash(); - CDataStream blockTxnMsg(SER_NETWORK, PROTOCOL_VERSION); blockTxnMsg << txn; - return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, connman); + fProcessBLOCKTXN = true; } else { req.blockhash = pindex->GetBlockHash(); connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETBLOCKTXN, req)); } + } else { + // This block is either already in flight from a different + // peer, or this peer has too many blocks outstanding to + // download from. + // Optimistically try to reconstruct anyway since we might be + // able to without any round trips. + PartiallyDownloadedBlock tempBlock(&mempool); + ReadStatus status = tempBlock.InitData(cmpctblock); + if (status != READ_STATUS_OK) { + // TODO: don't ignore failures + return true; + } + std::vector<CTransactionRef> dummy; + status = tempBlock.FillBlock(*pblock, dummy); + if (status == READ_STATUS_OK) { + fBlockReconstructed = true; + } } } else { if (fAlreadyInFlight) { @@ -1886,11 +1928,40 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, // Dirty hack to process as if it were just a headers message (TODO: move message handling into their own functions) std::vector<CBlock> headers; headers.push_back(cmpctblock.header); - CDataStream vHeadersMsg(SER_NETWORK, PROTOCOL_VERSION); vHeadersMsg << headers; - return ProcessMessage(pfrom, NetMsgType::HEADERS, vHeadersMsg, nTimeReceived, chainparams, connman); + fRevertToHeaderProcessing = true; + } + } + } // cs_main + + if (fProcessBLOCKTXN) + return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, connman, interruptMsgProc); + + if (fRevertToHeaderProcessing) + return ProcessMessage(pfrom, NetMsgType::HEADERS, vHeadersMsg, nTimeReceived, chainparams, connman, interruptMsgProc); + + if (fBlockReconstructed) { + // If we got here, we were able to optimistically reconstruct a + // block that is in flight from some other peer. + { + LOCK(cs_main); + mapBlockSource.emplace(pblock->GetHash(), std::make_pair(pfrom->GetId(), false)); + } + bool fNewBlock = false; + ProcessNewBlock(chainparams, pblock, true, &fNewBlock); + if (fNewBlock) + pfrom->nLastBlockTime = GetTime(); + + LOCK(cs_main); // hold cs_main for CBlockIndex::IsValid() + if (pindex->IsValid(BLOCK_VALID_TRANSACTIONS)) { + // Clear download state for this block, which is in + // process from some other peer. We do this after calling + // ProcessNewBlock so that a malleated cmpctblock announcement + // can't be used to interfere with block relay. + MarkBlockAsReceived(pblock->GetHash()); } } + } else if (strCommand == NetMsgType::BLOCKTXN && !fImporting && !fReindex) // Ignore blocks received while importing @@ -1898,7 +1969,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, BlockTransactions resp; vRecv >> resp; - CBlock block; + std::shared_ptr<CBlock> pblock = std::make_shared<CBlock>(); bool fBlockRead = false; { LOCK(cs_main); @@ -1911,7 +1982,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, } PartiallyDownloadedBlock& partialBlock = *it->second.second->partialBlock; - ReadStatus status = partialBlock.FillBlock(block, resp.txn); + ReadStatus status = partialBlock.FillBlock(*pblock, resp.txn); if (status == READ_STATUS_INVALID) { MarkBlockAsReceived(resp.blockhash); // Reset in-flight state in case of whitelist Misbehaving(pfrom->GetId(), 100); @@ -1954,7 +2025,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, bool fNewBlock = false; // Since we requested this block (it was in mapBlocksInFlight), force it to be processed, // even if it would not be a candidate for new tip (missing previous block, chain not long enough, etc) - ProcessNewBlock(chainparams, &block, true, NULL, &fNewBlock); + ProcessNewBlock(chainparams, pblock, true, &fNewBlock); if (fNewBlock) pfrom->nLastBlockTime = GetTime(); } @@ -2115,17 +2186,17 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, else if (strCommand == NetMsgType::BLOCK && !fImporting && !fReindex) // Ignore blocks received while importing { - CBlock block; - vRecv >> block; + std::shared_ptr<CBlock> pblock = std::make_shared<CBlock>(); + vRecv >> *pblock; - LogPrint("net", "received block %s peer=%d\n", block.GetHash().ToString(), pfrom->id); + LogPrint("net", "received block %s peer=%d\n", pblock->GetHash().ToString(), pfrom->id); // Process all blocks from whitelisted peers, even if not requested, // unless we're still syncing with the network. // Such an unrequested block may still be processed, subject to the // conditions in AcceptBlock(). bool forceProcessing = pfrom->fWhitelisted && !IsInitialBlockDownload(); - const uint256 hash(block.GetHash()); + const uint256 hash(pblock->GetHash()); { LOCK(cs_main); // Also always process if we requested the block explicitly, as we may @@ -2136,7 +2207,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, mapBlockSource.emplace(hash, std::make_pair(pfrom->GetId(), true)); } bool fNewBlock = false; - ProcessNewBlock(chainparams, &block, forceProcessing, NULL, &fNewBlock); + ProcessNewBlock(chainparams, pblock, forceProcessing, &fNewBlock); if (fNewBlock) pfrom->nLastBlockTime = GetTime(); } @@ -2380,7 +2451,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, } // requires LOCK(cs_vRecvMsg) -bool ProcessMessages(CNode* pfrom, CConnman& connman) +bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc) { const CChainParams& chainparams = Params(); unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); @@ -2398,7 +2469,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman) bool fOk = true; if (!pfrom->vRecvGetData.empty()) - ProcessGetData(pfrom, chainparams.GetConsensus(), connman); + ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); // this maintains the order of responses if (!pfrom->vRecvGetData.empty()) return fOk; @@ -2459,8 +2530,9 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman) bool fRet = false; try { - fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman); - boost::this_thread::interruption_point(); + fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc); + if (interruptMsgProc) + return true; } catch (const std::ios_base::failure& e) { @@ -2485,9 +2557,6 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman) PrintExceptionContinue(&e, "ProcessMessages()"); } } - catch (const boost::thread_interrupted&) { - throw; - } catch (const std::exception& e) { PrintExceptionContinue(&e, "ProcessMessages()"); } catch (...) { @@ -2524,7 +2593,7 @@ public: } }; -bool SendMessages(CNode* pto, CConnman& connman) +bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interruptMsgProc) { const Consensus::Params& consensusParams = Params().GetConsensus(); { @@ -2578,6 +2647,8 @@ bool SendMessages(CNode* pto, CConnman& connman) state.fShouldBan = false; if (pto->fWhitelisted) LogPrintf("Warning: not punishing whitelisted peer %s!\n", pto->addr.ToString()); + else if (pto->fAddnode) + LogPrintf("Warning: not punishing addnoded peer %s!\n", pto->addr.ToString()); else { pto->fDisconnect = true; if (pto->addr.IsLocal()) @@ -2733,7 +2804,8 @@ bool SendMessages(CNode* pto, CConnman& connman) vHeaders.front().GetHash().ToString(), pto->id); //TODO: Shouldn't need to reload block from disk, but requires refactor CBlock block; - assert(ReadBlockFromDisk(block, pBestIndex, consensusParams)); + bool ret = ReadBlockFromDisk(block, pBestIndex, consensusParams); + assert(ret); CBlockHeaderAndShortTxIDs cmpctblock(block, state.fWantsCmpctWitness); int nSendFlags = state.fWantsCmpctWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS; connman.PushMessage(pto, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock)); @@ -2998,8 +3070,12 @@ bool SendMessages(CNode* pto, CConnman& connman) CAmount currentFilter = mempool.GetMinFee(GetArg("-maxmempool", DEFAULT_MAX_MEMPOOL_SIZE) * 1000000).GetFeePerK(); int64_t timeNow = GetTimeMicros(); if (timeNow > pto->nextSendTimeFeeFilter) { - static FeeFilterRounder filterRounder(::minRelayTxFee); + static CFeeRate default_feerate(DEFAULT_MIN_RELAY_TX_FEE); + static FeeFilterRounder filterRounder(default_feerate); CAmount filterToSend = filterRounder.round(currentFilter); + // If we don't allow free transactions, then we always have a fee filter of at least minRelayTxFee + if (GetArg("-limitfreerelay", DEFAULT_LIMITFREERELAY) <= 0) + filterToSend = std::max(filterToSend, ::minRelayTxFee.GetFeePerK()); if (filterToSend != pto->lastSentFeeFilter) { connman.PushMessage(pto, msgMaker.Make(NetMsgType::FEEFILTER, filterToSend)); pto->lastSentFeeFilter = filterToSend; |