diff options
Diffstat (limited to 'src/net_processing.cpp')
-rw-r--r-- | src/net_processing.cpp | 95 |
1 files changed, 59 insertions, 36 deletions
diff --git a/src/net_processing.cpp b/src/net_processing.cpp index e10694ed18..ccfbb77fcd 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -51,7 +51,7 @@ struct IteratorComparator struct COrphanTx { // When modifying, adapt the copy of this definition in tests/DoS_tests. - CTransaction tx; + CTransactionRef tx; NodeId fromPeer; int64_t nTimeExpire; }; @@ -586,9 +586,9 @@ void UnregisterNodeSignals(CNodeSignals& nodeSignals) // mapOrphanTransactions // -bool AddOrphanTx(const CTransaction& tx, NodeId peer) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +bool AddOrphanTx(const CTransactionRef& tx, NodeId peer) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { - uint256 hash = tx.GetHash(); + const uint256& hash = tx->GetHash(); if (mapOrphanTransactions.count(hash)) return false; @@ -599,7 +599,7 @@ bool AddOrphanTx(const CTransaction& tx, NodeId peer) EXCLUSIVE_LOCKS_REQUIRED(c // have been mined or received. // 100 orphans, each of which is at most 99,999 bytes big is // at most 10 megabytes of orphans and somewhat more byprev index (in the worst case): - unsigned int sz = GetTransactionWeight(tx); + unsigned int sz = GetTransactionWeight(*tx); if (sz >= MAX_STANDARD_TX_WEIGHT) { LogPrint("mempool", "ignoring large orphan tx (size: %u, hash: %s)\n", sz, hash.ToString()); @@ -608,7 +608,7 @@ bool AddOrphanTx(const CTransaction& tx, NodeId peer) EXCLUSIVE_LOCKS_REQUIRED(c auto ret = mapOrphanTransactions.emplace(hash, COrphanTx{tx, peer, GetTime() + ORPHAN_TX_EXPIRE_TIME}); assert(ret.second); - BOOST_FOREACH(const CTxIn& txin, tx.vin) { + BOOST_FOREACH(const CTxIn& txin, tx->vin) { mapOrphanTransactionsByPrev[txin.prevout].insert(ret.first); } @@ -622,7 +622,7 @@ int static EraseOrphanTx(uint256 hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main) map<uint256, COrphanTx>::iterator it = mapOrphanTransactions.find(hash); if (it == mapOrphanTransactions.end()) return 0; - BOOST_FOREACH(const CTxIn& txin, it->second.tx.vin) + BOOST_FOREACH(const CTxIn& txin, it->second.tx->vin) { auto itPrev = mapOrphanTransactionsByPrev.find(txin.prevout); if (itPrev == mapOrphanTransactionsByPrev.end()) @@ -644,7 +644,7 @@ void EraseOrphansFor(NodeId peer) map<uint256, COrphanTx>::iterator maybeErase = iter++; // increment to avoid iterator becoming invalid if (maybeErase->second.fromPeer == peer) { - nErased += EraseOrphanTx(maybeErase->second.tx.GetHash()); + nErased += EraseOrphanTx(maybeErase->second.tx->GetHash()); } } if (nErased > 0) LogPrint("mempool", "Erased %d orphan tx from peer %d\n", nErased, peer); @@ -665,7 +665,7 @@ unsigned int LimitOrphanTxSize(unsigned int nMaxOrphans) EXCLUSIVE_LOCKS_REQUIRE { map<uint256, COrphanTx>::iterator maybeErase = iter++; if (maybeErase->second.nTimeExpire <= nNow) { - nErased += EraseOrphanTx(maybeErase->second.tx.GetHash()); + nErased += EraseOrphanTx(maybeErase->second.tx->GetHash()); } else { nMinExpTime = std::min(maybeErase->second.nTimeExpire, nMinExpTime); } @@ -736,7 +736,7 @@ void PeerLogicValidation::SyncTransaction(const CTransaction& tx, const CBlockIn auto itByPrev = mapOrphanTransactionsByPrev.find(tx.vin[j].prevout); if (itByPrev == mapOrphanTransactionsByPrev.end()) continue; for (auto mi = itByPrev->second.begin(); mi != itByPrev->second.end(); ++mi) { - const CTransaction& orphanTx = (*mi)->second.tx; + const CTransaction& orphanTx = *(*mi)->second.tx; const uint256& orphanHash = orphanTx.GetHash(); vOrphanErase.push_back(orphanHash); } @@ -886,7 +886,7 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connma connman.ForEachNodeThen(std::move(sortfunc), std::move(pushfunc)); } -void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman) +void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic<bool>& interruptMsgProc) { std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin(); unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); @@ -901,7 +901,9 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam const CInv &inv = *it; { - boost::this_thread::interruption_point(); + if (interruptMsgProc) + return; + it++; if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK || inv.type == MSG_WITNESS_BLOCK) @@ -1055,12 +1057,12 @@ uint32_t GetFetchFlags(CNode* pfrom, CBlockIndex* pprev, const Consensus::Params return nFetchFlags; } -bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman) +bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman, std::atomic<bool>& interruptMsgProc) { unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); LogPrint("net", "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->id); - if (mapArgs.count("-dropmessagestest") && GetRand(atoi(mapArgs["-dropmessagestest"])) == 0) + if (IsArgSet("-dropmessagestest") && GetRand(GetArg("-dropmessagestest", 0)) == 0) { LogPrintf("dropmessagestest DROPPING RECV MESSAGE\n"); return true; @@ -1295,7 +1297,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nSince = nNow - 10 * 60; BOOST_FOREACH(CAddress& addr, vAddr) { - boost::this_thread::interruption_point(); + if (interruptMsgProc) + return true; if ((addr.nServices & REQUIRED_SERVICES) != REQUIRED_SERVICES) continue; @@ -1377,7 +1380,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, { CInv &inv = vInv[nInv]; - boost::this_thread::interruption_point(); + if (interruptMsgProc) + return true; bool fAlreadyHave = AlreadyHave(inv); LogPrint("net", "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom->id); @@ -1439,7 +1443,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, LogPrint("net", "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom->id); pfrom->vRecvGetData.insert(pfrom->vRecvGetData.end(), vInv.begin(), vInv.end()); - ProcessGetData(pfrom, chainparams.GetConsensus(), connman); + ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); } @@ -1513,7 +1517,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, inv.type = State(pfrom->GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK; inv.hash = req.blockhash; pfrom->vRecvGetData.push_back(inv); - ProcessGetData(pfrom, chainparams.GetConsensus(), connman); + ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); return true; } @@ -1596,7 +1600,9 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, deque<COutPoint> vWorkQueue; vector<uint256> vEraseQueue; - CTransaction tx(deserialize, vRecv); + CTransactionRef ptx; + vRecv >> ptx; + const CTransaction& tx = *ptx; CInv inv(MSG_TX, tx.GetHash()); pfrom->AddInventoryKnown(inv); @@ -1609,7 +1615,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, pfrom->setAskFor.erase(inv.hash); mapAlreadyAskedFor.erase(inv.hash); - if (!AlreadyHave(inv) && AcceptToMemoryPool(mempool, state, tx, true, &fMissingInputs)) { + if (!AlreadyHave(inv) && AcceptToMemoryPool(mempool, state, ptx, true, &fMissingInputs)) { mempool.check(pcoinsTip); RelayTransaction(tx, connman); for (unsigned int i = 0; i < tx.vout.size(); i++) { @@ -1634,7 +1640,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, mi != itByPrev->second.end(); ++mi) { - const CTransaction& orphanTx = (*mi)->second.tx; + const CTransactionRef& porphanTx = (*mi)->second.tx; + const CTransaction& orphanTx = *porphanTx; const uint256& orphanHash = orphanTx.GetHash(); NodeId fromPeer = (*mi)->second.fromPeer; bool fMissingInputs2 = false; @@ -1646,7 +1653,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, if (setMisbehaving.count(fromPeer)) continue; - if (AcceptToMemoryPool(mempool, stateDummy, orphanTx, true, &fMissingInputs2)) { + if (AcceptToMemoryPool(mempool, stateDummy, porphanTx, true, &fMissingInputs2)) { LogPrint("mempool", " accepted orphan tx %s\n", orphanHash.ToString()); RelayTransaction(orphanTx, connman); for (unsigned int i = 0; i < orphanTx.vout.size(); i++) { @@ -1668,7 +1675,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, // Probably non-standard or insufficient fee/priority LogPrint("mempool", " removed orphan tx %s\n", orphanHash.ToString()); vEraseQueue.push_back(orphanHash); - if (orphanTx.wit.IsNull() && !stateDummy.CorruptionPossible()) { + if (!orphanTx.HasWitness() && !stateDummy.CorruptionPossible()) { // Do not use rejection cache for witness transactions or // witness-stripped transactions, as they can have been malleated. // See https://github.com/bitcoin/bitcoin/issues/8279 for details. @@ -1699,7 +1706,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, pfrom->AddInventoryKnown(_inv); if (!AlreadyHave(_inv)) pfrom->AskFor(_inv); } - AddOrphanTx(tx, pfrom->GetId()); + AddOrphanTx(ptx, pfrom->GetId()); // DoS prevention: do not allow mapOrphanTransactions to grow unbounded unsigned int nMaxOrphanTx = (unsigned int)std::max((int64_t)0, GetArg("-maxorphantx", DEFAULT_MAX_ORPHAN_TRANSACTIONS)); @@ -1710,7 +1717,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, LogPrint("mempool", "not keeping orphan with rejected parents %s\n",tx.GetHash().ToString()); } } else { - if (tx.wit.IsNull() && !state.CorruptionPossible()) { + if (!tx.HasWitness() && !state.CorruptionPossible()) { // Do not use rejection cache for witness transactions or // witness-stripped transactions, as they can have been malleated. // See https://github.com/bitcoin/bitcoin/issues/8279 for details. @@ -1782,11 +1789,24 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, } } + // When we succeed in decoding a block's txids from a cmpctblock + // message we typically jump to the BLOCKTXN handling code, with a + // dummy (empty) BLOCKTXN message, to re-use the logic there in + // completing processing of the putative block (without cs_main). + bool fProcessBLOCKTXN = false; + CDataStream blockTxnMsg(SER_NETWORK, PROTOCOL_VERSION); + + // If we end up treating this as a plain headers message, call that as well + // without cs_main. + bool fRevertToHeaderProcessing = false; + CDataStream vHeadersMsg(SER_NETWORK, PROTOCOL_VERSION); + // Keep a CBlock for "optimistic" compactblock reconstructions (see // below) std::shared_ptr<CBlock> pblock = std::make_shared<CBlock>(); bool fBlockReconstructed = false; + { LOCK(cs_main); // If AcceptBlockHeader returned true, it set pindex assert(pindex); @@ -1868,9 +1888,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, // Dirty hack to jump to BLOCKTXN code (TODO: move message handling into their own functions) BlockTransactions txn; txn.blockhash = cmpctblock.header.GetHash(); - CDataStream blockTxnMsg(SER_NETWORK, PROTOCOL_VERSION); blockTxnMsg << txn; - return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, connman); + fProcessBLOCKTXN = true; } else { req.blockhash = pindex->GetBlockHash(); connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETBLOCKTXN, req)); @@ -1906,11 +1925,17 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, // Dirty hack to process as if it were just a headers message (TODO: move message handling into their own functions) std::vector<CBlock> headers; headers.push_back(cmpctblock.header); - CDataStream vHeadersMsg(SER_NETWORK, PROTOCOL_VERSION); vHeadersMsg << headers; - return ProcessMessage(pfrom, NetMsgType::HEADERS, vHeadersMsg, nTimeReceived, chainparams, connman); + fRevertToHeaderProcessing = true; } } + } // cs_main + + if (fProcessBLOCKTXN) + return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, connman, interruptMsgProc); + + if (fRevertToHeaderProcessing) + return ProcessMessage(pfrom, NetMsgType::HEADERS, vHeadersMsg, nTimeReceived, chainparams, connman, interruptMsgProc); if (fBlockReconstructed) { // If we got here, we were able to optimistically reconstruct a @@ -2423,7 +2448,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, } // requires LOCK(cs_vRecvMsg) -bool ProcessMessages(CNode* pfrom, CConnman& connman) +bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc) { const CChainParams& chainparams = Params(); unsigned int nMaxSendBufferSize = connman.GetSendBufferSize(); @@ -2441,7 +2466,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman) bool fOk = true; if (!pfrom->vRecvGetData.empty()) - ProcessGetData(pfrom, chainparams.GetConsensus(), connman); + ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); // this maintains the order of responses if (!pfrom->vRecvGetData.empty()) return fOk; @@ -2502,8 +2527,9 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman) bool fRet = false; try { - fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman); - boost::this_thread::interruption_point(); + fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc); + if (interruptMsgProc) + return true; } catch (const std::ios_base::failure& e) { @@ -2528,9 +2554,6 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman) PrintExceptionContinue(&e, "ProcessMessages()"); } } - catch (const boost::thread_interrupted&) { - throw; - } catch (const std::exception& e) { PrintExceptionContinue(&e, "ProcessMessages()"); } catch (...) { @@ -2567,7 +2590,7 @@ public: } }; -bool SendMessages(CNode* pto, CConnman& connman) +bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interruptMsgProc) { const Consensus::Params& consensusParams = Params().GetConsensus(); { |