From 818075adaca9104539f056cdb858e135a49de2ed Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 4 Dec 2017 17:10:45 -0500 Subject: Create new mutex for orphans, no cs_main in PLV::BlockConnected This should (marginally) speed up validationinterface queue draining by avoiding a cs_main lock in one client. --- src/net_processing.cpp | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 20426eaceb..d1a89ed91c 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -51,12 +51,13 @@ struct COrphanTx { NodeId fromPeer; int64_t nTimeExpire; }; -std::map mapOrphanTransactions GUARDED_BY(cs_main); -std::map::iterator, IteratorComparator>> mapOrphanTransactionsByPrev GUARDED_BY(cs_main); -void EraseOrphansFor(NodeId peer) EXCLUSIVE_LOCKS_REQUIRED(cs_main); +static CCriticalSection g_cs_orphans; +std::map mapOrphanTransactions GUARDED_BY(g_cs_orphans); +std::map::iterator, IteratorComparator>> mapOrphanTransactionsByPrev GUARDED_BY(g_cs_orphans); +void EraseOrphansFor(NodeId peer); -static size_t vExtraTxnForCompactIt = 0; -static std::vector> vExtraTxnForCompact GUARDED_BY(cs_main); +static size_t vExtraTxnForCompactIt GUARDED_BY(g_cs_orphans) = 0; +static std::vector> vExtraTxnForCompact GUARDED_BY(g_cs_orphans); static const uint64_t RANDOMIZER_ID_ADDRESS_RELAY = 0x3cac0035b5866b90ULL; // SHA256("main address relay")[0:8] @@ -127,7 +128,7 @@ namespace { int g_outbound_peers_with_protect_from_disconnect = 0; /** When our tip was last updated. */ - int64_t g_last_tip_update = 0; + std::atomic g_last_tip_update(0); /** Relay map, protected by cs_main. */ typedef std::map MapRelay; @@ -631,7 +632,7 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) { // mapOrphanTransactions // -void AddToCompactExtraTransactions(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +void AddToCompactExtraTransactions(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(g_cs_orphans) { size_t max_extra_txn = gArgs.GetArg("-blockreconstructionextratxn", DEFAULT_BLOCK_RECONSTRUCTION_EXTRA_TXN); if (max_extra_txn <= 0) @@ -642,7 +643,7 @@ void AddToCompactExtraTransactions(const CTransactionRef& tx) EXCLUSIVE_LOCKS_RE vExtraTxnForCompactIt = (vExtraTxnForCompactIt + 1) % max_extra_txn; } -bool AddOrphanTx(const CTransactionRef& tx, NodeId peer) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +bool AddOrphanTx(const CTransactionRef& tx, NodeId peer) EXCLUSIVE_LOCKS_REQUIRED(g_cs_orphans) { const uint256& hash = tx->GetHash(); if (mapOrphanTransactions.count(hash)) @@ -675,7 +676,7 @@ bool AddOrphanTx(const CTransactionRef& tx, NodeId peer) EXCLUSIVE_LOCKS_REQUIRE return true; } -int static EraseOrphanTx(uint256 hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +int static EraseOrphanTx(uint256 hash) EXCLUSIVE_LOCKS_REQUIRED(g_cs_orphans) { std::map::iterator it = mapOrphanTransactions.find(hash); if (it == mapOrphanTransactions.end()) @@ -695,6 +696,7 @@ int static EraseOrphanTx(uint256 hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main) void EraseOrphansFor(NodeId peer) { + LOCK(g_cs_orphans); int nErased = 0; std::map::iterator iter = mapOrphanTransactions.begin(); while (iter != mapOrphanTransactions.end()) @@ -709,8 +711,10 @@ void EraseOrphansFor(NodeId peer) } -unsigned int LimitOrphanTxSize(unsigned int nMaxOrphans) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +unsigned int LimitOrphanTxSize(unsigned int nMaxOrphans) { + LOCK(g_cs_orphans); + unsigned int nEvicted = 0; static int64_t nNextSweep; int64_t nNow = GetTime(); @@ -804,7 +808,7 @@ PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, CScheduler &schedu } void PeerLogicValidation::BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindex, const std::vector& vtxConflicted) { - LOCK(cs_main); + LOCK(g_cs_orphans); std::vector vOrphanErase; @@ -971,9 +975,13 @@ bool static AlreadyHave(const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main) recentRejects->reset(); } + { + LOCK(g_cs_orphans); + if (mapOrphanTransactions.count(inv.hash)) return true; + } + return recentRejects->contains(inv.hash) || mempool.exists(inv.hash) || - mapOrphanTransactions.count(inv.hash) || pcoinsTip->HaveCoinInCache(COutPoint(inv.hash, 0)) || // Best effort: only try output 0 and 1 pcoinsTip->HaveCoinInCache(COutPoint(inv.hash, 1)); } @@ -2101,7 +2109,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr CInv inv(MSG_TX, tx.GetHash()); pfrom->AddInventoryKnown(inv); - LOCK(cs_main); + LOCK2(cs_main, g_cs_orphans); bool fMissingInputs = false; CValidationState state; @@ -2324,7 +2332,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr bool fBlockReconstructed = false; { - LOCK(cs_main); + LOCK2(cs_main, g_cs_orphans); // If AcceptBlockHeader returned true, it set pindex assert(pindex); UpdateBlockAvailability(pfrom->GetId(), pindex->GetBlockHash()); -- cgit v1.2.3 From 66aa1d58a158991a8014a91335b5bc9c00062f56 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 24 Dec 2017 11:58:20 -0500 Subject: Refactor ProcessGetData in anticipation of avoiding cs_main for ABC --- src/net_processing.cpp | 321 +++++++++++++++++++++++++------------------------ 1 file changed, 166 insertions(+), 155 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index d1a89ed91c..3225d429b9 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -1038,182 +1038,193 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman* connma connman->ForEachNodeThen(std::move(sortfunc), std::move(pushfunc)); } -void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman* connman, const std::atomic& interruptMsgProc) +void static ProcessGetBlockData(CNode* pfrom, const Consensus::Params& consensusParams, const CInv& inv, CConnman* connman, const std::atomic& interruptMsgProc) { - std::deque::iterator it = pfrom->vRecvGetData.begin(); - std::vector vNotFound; - const CNetMsgMaker msgMaker(pfrom->GetSendVersion()); LOCK(cs_main); - while (it != pfrom->vRecvGetData.end()) { - // Don't bother if send buffer is too full to respond anyway - if (pfrom->fPauseSend) - break; + bool send = false; + std::shared_ptr a_recent_block; + std::shared_ptr a_recent_compact_block; + bool fWitnessesPresentInARecentCompactBlock; + { + LOCK(cs_most_recent_block); + a_recent_block = most_recent_block; + a_recent_compact_block = most_recent_compact_block; + fWitnessesPresentInARecentCompactBlock = fWitnessesPresentInMostRecentCompactBlock; + } - const CInv &inv = *it; + { + BlockMap::iterator mi = mapBlockIndex.find(inv.hash); + if (mi != mapBlockIndex.end()) { - if (interruptMsgProc) - return; + if (mi->second->nChainTx && !mi->second->IsValid(BLOCK_VALID_SCRIPTS) && + mi->second->IsValid(BLOCK_VALID_TREE)) { + // If we have the block and all of its parents, but have not yet validated it, + // we might be in the middle of connecting it (ie in the unlock of cs_main + // before ActivateBestChain but after AcceptBlock). + // In this case, we need to run ActivateBestChain prior to checking the relay + // conditions below. + CValidationState dummy; + ActivateBestChain(dummy, Params(), a_recent_block); + } + } + } + BlockMap::iterator mi = mapBlockIndex.find(inv.hash); + if (mi != mapBlockIndex.end()) { + send = BlockRequestAllowed(mi->second, consensusParams); + if (!send) { + LogPrint(BCLog::NET, "%s: ignoring request from peer=%i for old block that isn't in the main chain\n", __func__, pfrom->GetId()); + } + } + const CNetMsgMaker msgMaker(pfrom->GetSendVersion()); + // disconnect node in case we have reached the outbound limit for serving historical blocks + // never disconnect whitelisted nodes + if (send && connman->OutboundTargetReached(true) && ( ((pindexBestHeader != nullptr) && (pindexBestHeader->GetBlockTime() - mi->second->GetBlockTime() > HISTORICAL_BLOCK_AGE)) || inv.type == MSG_FILTERED_BLOCK) && !pfrom->fWhitelisted) + { + LogPrint(BCLog::NET, "historical block serving limit reached, disconnect peer=%d\n", pfrom->GetId()); - it++; + //disconnect node + pfrom->fDisconnect = true; + send = false; + } + // Avoid leaking prune-height by never sending blocks below the NODE_NETWORK_LIMITED threshold + if (send && !pfrom->fWhitelisted && ( + (((pfrom->GetLocalServices() & NODE_NETWORK_LIMITED) == NODE_NETWORK_LIMITED) && ((pfrom->GetLocalServices() & NODE_NETWORK) != NODE_NETWORK) && (chainActive.Tip()->nHeight - mi->second->nHeight > (int)NODE_NETWORK_LIMITED_MIN_BLOCKS + 2 /* add two blocks buffer extension for possible races */) ) + )) { + LogPrint(BCLog::NET, "Ignore block request below NODE_NETWORK_LIMITED threshold from peer=%d\n", pfrom->GetId()); - if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK || inv.type == MSG_WITNESS_BLOCK) + //disconnect node and prevent it from stalling (would otherwise wait for the missing block) + pfrom->fDisconnect = true; + send = false; + } + // Pruned nodes may have deleted the block, so check whether + // it's available before trying to send. + if (send && (mi->second->nStatus & BLOCK_HAVE_DATA)) + { + std::shared_ptr pblock; + if (a_recent_block && a_recent_block->GetHash() == (*mi).second->GetBlockHash()) { + pblock = a_recent_block; + } else { + // Send block from disk + std::shared_ptr pblockRead = std::make_shared(); + if (!ReadBlockFromDisk(*pblockRead, (*mi).second, consensusParams)) + assert(!"cannot load block from disk"); + pblock = pblockRead; + } + if (inv.type == MSG_BLOCK) + connman->PushMessage(pfrom, msgMaker.Make(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::BLOCK, *pblock)); + else if (inv.type == MSG_WITNESS_BLOCK) + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::BLOCK, *pblock)); + else if (inv.type == MSG_FILTERED_BLOCK) + { + bool sendMerkleBlock = false; + CMerkleBlock merkleBlock; { - bool send = false; - BlockMap::iterator mi = mapBlockIndex.find(inv.hash); - std::shared_ptr a_recent_block; - std::shared_ptr a_recent_compact_block; - bool fWitnessesPresentInARecentCompactBlock; - { - LOCK(cs_most_recent_block); - a_recent_block = most_recent_block; - a_recent_compact_block = most_recent_compact_block; - fWitnessesPresentInARecentCompactBlock = fWitnessesPresentInMostRecentCompactBlock; + LOCK(pfrom->cs_filter); + if (pfrom->pfilter) { + sendMerkleBlock = true; + merkleBlock = CMerkleBlock(*pblock, *pfrom->pfilter); } - if (mi != mapBlockIndex.end()) - { - if (mi->second->nChainTx && !mi->second->IsValid(BLOCK_VALID_SCRIPTS) && - mi->second->IsValid(BLOCK_VALID_TREE)) { - // If we have the block and all of its parents, but have not yet validated it, - // we might be in the middle of connecting it (ie in the unlock of cs_main - // before ActivateBestChain but after AcceptBlock). - // In this case, we need to run ActivateBestChain prior to checking the relay - // conditions below. - CValidationState dummy; - ActivateBestChain(dummy, Params(), a_recent_block); - } - send = BlockRequestAllowed(mi->second, consensusParams); - if (!send) { - LogPrint(BCLog::NET, "%s: ignoring request from peer=%i for old block that isn't in the main chain\n", __func__, pfrom->GetId()); - } + } + if (sendMerkleBlock) { + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::MERKLEBLOCK, merkleBlock)); + // CMerkleBlock just contains hashes, so also push any transactions in the block the client did not see + // This avoids hurting performance by pointlessly requiring a round-trip + // Note that there is currently no way for a node to request any single transactions we didn't send here - + // they must either disconnect and retry or request the full block. + // Thus, the protocol spec specified allows for us to provide duplicate txn here, + // however we MUST always provide at least what the remote peer needs + typedef std::pair PairType; + for (PairType& pair : merkleBlock.vMatchedTxn) + connman->PushMessage(pfrom, msgMaker.Make(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::TX, *pblock->vtx[pair.first])); + } + // else + // no response + } + else if (inv.type == MSG_CMPCT_BLOCK) + { + // If a peer is asking for old blocks, we're almost guaranteed + // they won't have a useful mempool to match against a compact block, + // and we don't feel like constructing the object for them, so + // instead we respond with the full, non-compact block. + bool fPeerWantsWitness = State(pfrom->GetId())->fWantsCmpctWitness; + int nSendFlags = fPeerWantsWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS; + if (CanDirectFetch(consensusParams) && mi->second->nHeight >= chainActive.Height() - MAX_CMPCTBLOCK_DEPTH) { + if ((fPeerWantsWitness || !fWitnessesPresentInARecentCompactBlock) && a_recent_compact_block && a_recent_compact_block->header.GetHash() == mi->second->GetBlockHash()) { + connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, *a_recent_compact_block)); + } else { + CBlockHeaderAndShortTxIDs cmpctblock(*pblock, fPeerWantsWitness); + connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock)); } - // disconnect node in case we have reached the outbound limit for serving historical blocks - // never disconnect whitelisted nodes - if (send && connman->OutboundTargetReached(true) && ( ((pindexBestHeader != nullptr) && (pindexBestHeader->GetBlockTime() - mi->second->GetBlockTime() > HISTORICAL_BLOCK_AGE)) || inv.type == MSG_FILTERED_BLOCK) && !pfrom->fWhitelisted) - { - LogPrint(BCLog::NET, "historical block serving limit reached, disconnect peer=%d\n", pfrom->GetId()); + } else { + connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCK, *pblock)); + } + } - //disconnect node - pfrom->fDisconnect = true; - send = false; - } - // Avoid leaking prune-height by never sending blocks below the NODE_NETWORK_LIMITED threshold - if (send && !pfrom->fWhitelisted && ( - (((pfrom->GetLocalServices() & NODE_NETWORK_LIMITED) == NODE_NETWORK_LIMITED) && ((pfrom->GetLocalServices() & NODE_NETWORK) != NODE_NETWORK) && (chainActive.Tip()->nHeight - mi->second->nHeight > (int)NODE_NETWORK_LIMITED_MIN_BLOCKS + 2 /* add two blocks buffer extension for possible races */) ) - )) { - LogPrint(BCLog::NET, "Ignore block request below NODE_NETWORK_LIMITED threshold from peer=%d\n", pfrom->GetId()); + // Trigger the peer node to send a getblocks request for the next batch of inventory + if (inv.hash == pfrom->hashContinue) + { + // Bypass PushInventory, this must send even if redundant, + // and we want it right after the last block so they don't + // wait for other stuff first. + std::vector vInv; + vInv.push_back(CInv(MSG_BLOCK, chainActive.Tip()->GetBlockHash())); + connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::INV, vInv)); + pfrom->hashContinue.SetNull(); + } + } +} - //disconnect node and prevent it from stalling (would otherwise wait for the missing block) - pfrom->fDisconnect = true; - send = false; - } - // Pruned nodes may have deleted the block, so check whether - // it's available before trying to send. - if (send && (mi->second->nStatus & BLOCK_HAVE_DATA)) - { - std::shared_ptr pblock; - if (a_recent_block && a_recent_block->GetHash() == (*mi).second->GetBlockHash()) { - pblock = a_recent_block; - } else { - // Send block from disk - std::shared_ptr pblockRead = std::make_shared(); - if (!ReadBlockFromDisk(*pblockRead, (*mi).second, consensusParams)) - assert(!"cannot load block from disk"); - pblock = pblockRead; - } - if (inv.type == MSG_BLOCK) - connman->PushMessage(pfrom, msgMaker.Make(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::BLOCK, *pblock)); - else if (inv.type == MSG_WITNESS_BLOCK) - connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::BLOCK, *pblock)); - else if (inv.type == MSG_FILTERED_BLOCK) - { - bool sendMerkleBlock = false; - CMerkleBlock merkleBlock; - { - LOCK(pfrom->cs_filter); - if (pfrom->pfilter) { - sendMerkleBlock = true; - merkleBlock = CMerkleBlock(*pblock, *pfrom->pfilter); - } - } - if (sendMerkleBlock) { - connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::MERKLEBLOCK, merkleBlock)); - // CMerkleBlock just contains hashes, so also push any transactions in the block the client did not see - // This avoids hurting performance by pointlessly requiring a round-trip - // Note that there is currently no way for a node to request any single transactions we didn't send here - - // they must either disconnect and retry or request the full block. - // Thus, the protocol spec specified allows for us to provide duplicate txn here, - // however we MUST always provide at least what the remote peer needs - typedef std::pair PairType; - for (PairType& pair : merkleBlock.vMatchedTxn) - connman->PushMessage(pfrom, msgMaker.Make(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::TX, *pblock->vtx[pair.first])); - } - // else - // no response - } - else if (inv.type == MSG_CMPCT_BLOCK) - { - // If a peer is asking for old blocks, we're almost guaranteed - // they won't have a useful mempool to match against a compact block, - // and we don't feel like constructing the object for them, so - // instead we respond with the full, non-compact block. - bool fPeerWantsWitness = State(pfrom->GetId())->fWantsCmpctWitness; - int nSendFlags = fPeerWantsWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS; - if (CanDirectFetch(consensusParams) && mi->second->nHeight >= chainActive.Height() - MAX_CMPCTBLOCK_DEPTH) { - if ((fPeerWantsWitness || !fWitnessesPresentInARecentCompactBlock) && a_recent_compact_block && a_recent_compact_block->header.GetHash() == mi->second->GetBlockHash()) { - connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, *a_recent_compact_block)); - } else { - CBlockHeaderAndShortTxIDs cmpctblock(*pblock, fPeerWantsWitness); - connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock)); - } - } else { - connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCK, *pblock)); - } - } +void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman* connman, const std::atomic& interruptMsgProc) +{ + std::deque::iterator it = pfrom->vRecvGetData.begin(); + std::vector vNotFound; + const CNetMsgMaker msgMaker(pfrom->GetSendVersion()); + { + LOCK(cs_main); - // Trigger the peer node to send a getblocks request for the next batch of inventory - if (inv.hash == pfrom->hashContinue) - { - // Bypass PushInventory, this must send even if redundant, - // and we want it right after the last block so they don't - // wait for other stuff first. - std::vector vInv; - vInv.push_back(CInv(MSG_BLOCK, chainActive.Tip()->GetBlockHash())); - connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::INV, vInv)); - pfrom->hashContinue.SetNull(); - } - } - } - else if (inv.type == MSG_TX || inv.type == MSG_WITNESS_TX) - { - // Send stream from relay memory - bool push = false; - auto mi = mapRelay.find(inv.hash); - int nSendFlags = (inv.type == MSG_TX ? SERIALIZE_TRANSACTION_NO_WITNESS : 0); - if (mi != mapRelay.end()) { - connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::TX, *mi->second)); + while (it != pfrom->vRecvGetData.end() && (it->type == MSG_TX || it->type == MSG_WITNESS_TX)) { + if (interruptMsgProc) + return; + // Don't bother if send buffer is too full to respond anyway + if (pfrom->fPauseSend) + break; + + const CInv &inv = *it; + it++; + + // Send stream from relay memory + bool push = false; + auto mi = mapRelay.find(inv.hash); + int nSendFlags = (inv.type == MSG_TX ? SERIALIZE_TRANSACTION_NO_WITNESS : 0); + if (mi != mapRelay.end()) { + connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::TX, *mi->second)); + push = true; + } else if (pfrom->timeLastMempoolReq) { + auto txinfo = mempool.info(inv.hash); + // To protect privacy, do not answer getdata using the mempool when + // that TX couldn't have been INVed in reply to a MEMPOOL request. + if (txinfo.tx && txinfo.nTime <= pfrom->timeLastMempoolReq) { + connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::TX, *txinfo.tx)); push = true; - } else if (pfrom->timeLastMempoolReq) { - auto txinfo = mempool.info(inv.hash); - // To protect privacy, do not answer getdata using the mempool when - // that TX couldn't have been INVed in reply to a MEMPOOL request. - if (txinfo.tx && txinfo.nTime <= pfrom->timeLastMempoolReq) { - connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::TX, *txinfo.tx)); - push = true; - } - } - if (!push) { - vNotFound.push_back(inv); } } + if (!push) { + vNotFound.push_back(inv); + } // Track requests for our stuff. GetMainSignals().Inventory(inv.hash); + } - if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK || inv.type == MSG_WITNESS_BLOCK) - break; + if (it != pfrom->vRecvGetData.end()) { + const CInv &inv = *it; + it++; + if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK || inv.type == MSG_WITNESS_BLOCK) { + ProcessGetBlockData(pfrom, consensusParams, inv, connman, interruptMsgProc); + } } - } + } // release cs_main pfrom->vRecvGetData.erase(pfrom->vRecvGetData.begin(), it); -- cgit v1.2.3 From a7348960389af9d86983e767b4aea2c7778ab726 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 24 Dec 2017 11:59:02 -0500 Subject: Avoid cs_main in net_processing ActivateBestChain calls --- src/net_processing.cpp | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 3225d429b9..76e82a5ef6 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -1040,8 +1040,6 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman* connma void static ProcessGetBlockData(CNode* pfrom, const Consensus::Params& consensusParams, const CInv& inv, CConnman* connman, const std::atomic& interruptMsgProc) { - LOCK(cs_main); - bool send = false; std::shared_ptr a_recent_block; std::shared_ptr a_recent_compact_block; @@ -1053,7 +1051,9 @@ void static ProcessGetBlockData(CNode* pfrom, const Consensus::Params& consensus fWitnessesPresentInARecentCompactBlock = fWitnessesPresentInMostRecentCompactBlock; } + bool need_activate_chain = false; { + LOCK(cs_main); BlockMap::iterator mi = mapBlockIndex.find(inv.hash); if (mi != mapBlockIndex.end()) { @@ -1064,11 +1064,16 @@ void static ProcessGetBlockData(CNode* pfrom, const Consensus::Params& consensus // before ActivateBestChain but after AcceptBlock). // In this case, we need to run ActivateBestChain prior to checking the relay // conditions below. - CValidationState dummy; - ActivateBestChain(dummy, Params(), a_recent_block); + need_activate_chain = true; } } + } // release cs_main before calling ActivateBestChain + if (need_activate_chain) { + CValidationState dummy; + ActivateBestChain(dummy, Params(), a_recent_block); } + + LOCK(cs_main); BlockMap::iterator mi = mapBlockIndex.find(inv.hash); if (mi != mapBlockIndex.end()) { send = BlockRequestAllowed(mi->second, consensusParams); @@ -1177,6 +1182,8 @@ void static ProcessGetBlockData(CNode* pfrom, const Consensus::Params& consensus void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman* connman, const std::atomic& interruptMsgProc) { + AssertLockNotHeld(cs_main); + std::deque::iterator it = pfrom->vRecvGetData.begin(); std::vector vNotFound; const CNetMsgMaker msgMaker(pfrom->GetSendVersion()); @@ -1216,15 +1223,15 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam // Track requests for our stuff. GetMainSignals().Inventory(inv.hash); } + } // release cs_main - if (it != pfrom->vRecvGetData.end()) { - const CInv &inv = *it; - it++; - if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK || inv.type == MSG_WITNESS_BLOCK) { - ProcessGetBlockData(pfrom, consensusParams, inv, connman, interruptMsgProc); - } + if (it != pfrom->vRecvGetData.end()) { + const CInv &inv = *it; + it++; + if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK || inv.type == MSG_WITNESS_BLOCK) { + ProcessGetBlockData(pfrom, consensusParams, inv, connman, interruptMsgProc); } - } // release cs_main + } pfrom->vRecvGetData.erase(pfrom->vRecvGetData.begin(), it); @@ -2027,7 +2034,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr inv.type = State(pfrom->GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK; inv.hash = req.blockhash; pfrom->vRecvGetData.push_back(inv); - ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); + // The message processing loop will go around again (without pausing) and we'll respond then (without cs_main) return true; } -- cgit v1.2.3 From a99b76f26958829c2b9ca4ffa5b1d81912b8acc7 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 4 Dec 2017 18:25:57 -0500 Subject: Require no cs_main lock for ProcessNewBlock/ActivateBestChain This requires the removal of some very liberal (incorrect) cs_mains sprinkled in some tests. It adds some chainActive.Tip() races, but the tests are all single-threaded anyway. --- src/test/miner_tests.cpp | 38 ++++++++++++++++++++---------------- src/test/txvalidationcache_tests.cpp | 21 ++++++++++---------- src/validation.cpp | 3 +++ src/wallet/test/wallet_tests.cpp | 17 ++++++++++------ 4 files changed, 46 insertions(+), 33 deletions(-) diff --git a/src/test/miner_tests.cpp b/src/test/miner_tests.cpp index b1a2032ea8..f968869bf5 100644 --- a/src/test/miner_tests.cpp +++ b/src/test/miner_tests.cpp @@ -205,7 +205,6 @@ BOOST_AUTO_TEST_CASE(CreateNewBlock_validity) entry.nFee = 11; entry.nHeight = 11; - LOCK(cs_main); fCheckpointsEnabled = false; // Simple block creation, nothing special yet: @@ -218,27 +217,32 @@ BOOST_AUTO_TEST_CASE(CreateNewBlock_validity) for (unsigned int i = 0; i < sizeof(blockinfo)/sizeof(*blockinfo); ++i) { CBlock *pblock = &pblocktemplate->block; // pointer for convenience - pblock->nVersion = 1; - pblock->nTime = chainActive.Tip()->GetMedianTimePast()+1; - CMutableTransaction txCoinbase(*pblock->vtx[0]); - txCoinbase.nVersion = 1; - txCoinbase.vin[0].scriptSig = CScript(); - txCoinbase.vin[0].scriptSig.push_back(blockinfo[i].extranonce); - txCoinbase.vin[0].scriptSig.push_back(chainActive.Height()); - txCoinbase.vout.resize(1); // Ignore the (optional) segwit commitment added by CreateNewBlock (as the hardcoded nonces don't account for this) - txCoinbase.vout[0].scriptPubKey = CScript(); - pblock->vtx[0] = MakeTransactionRef(std::move(txCoinbase)); - if (txFirst.size() == 0) - baseheight = chainActive.Height(); - if (txFirst.size() < 4) - txFirst.push_back(pblock->vtx[0]); - pblock->hashMerkleRoot = BlockMerkleRoot(*pblock); - pblock->nNonce = blockinfo[i].nonce; + { + LOCK(cs_main); + pblock->nVersion = 1; + pblock->nTime = chainActive.Tip()->GetMedianTimePast()+1; + CMutableTransaction txCoinbase(*pblock->vtx[0]); + txCoinbase.nVersion = 1; + txCoinbase.vin[0].scriptSig = CScript(); + txCoinbase.vin[0].scriptSig.push_back(blockinfo[i].extranonce); + txCoinbase.vin[0].scriptSig.push_back(chainActive.Height()); + txCoinbase.vout.resize(1); // Ignore the (optional) segwit commitment added by CreateNewBlock (as the hardcoded nonces don't account for this) + txCoinbase.vout[0].scriptPubKey = CScript(); + pblock->vtx[0] = MakeTransactionRef(std::move(txCoinbase)); + if (txFirst.size() == 0) + baseheight = chainActive.Height(); + if (txFirst.size() < 4) + txFirst.push_back(pblock->vtx[0]); + pblock->hashMerkleRoot = BlockMerkleRoot(*pblock); + pblock->nNonce = blockinfo[i].nonce; + } std::shared_ptr shared_pblock = std::make_shared(*pblock); BOOST_CHECK(ProcessNewBlock(chainparams, shared_pblock, true, nullptr)); pblock->hashPrevBlock = pblock->GetHash(); } + LOCK(cs_main); + // Just to make sure we can still make simple blocks BOOST_CHECK(pblocktemplate = AssemblerForTest(chainparams).CreateNewBlock(scriptPubKey)); diff --git a/src/test/txvalidationcache_tests.cpp b/src/test/txvalidationcache_tests.cpp index fe8cb6126e..10acabe0be 100644 --- a/src/test/txvalidationcache_tests.cpp +++ b/src/test/txvalidationcache_tests.cpp @@ -66,7 +66,6 @@ BOOST_FIXTURE_TEST_CASE(tx_mempool_block_doublespend, TestChain100Setup) // Test 1: block with both of those transactions should be rejected. block = CreateAndProcessBlock(spends, scriptPubKey); - LOCK(cs_main); BOOST_CHECK(chainActive.Tip()->GetBlockHash() != block.GetHash()); // Test 2: ... and should be rejected if spend1 is in the memory pool @@ -190,12 +189,12 @@ BOOST_FIXTURE_TEST_CASE(checkinputs_test, TestChain100Setup) spend_tx.vin[0].scriptSig << vchSig; } - LOCK(cs_main); - // Test that invalidity under a set of flags doesn't preclude validity // under other (eg consensus) flags. // spend_tx is invalid according to DERSIG { + LOCK(cs_main); + CValidationState state; PrecomputedTransactionData ptd_spend_tx(spend_tx); @@ -213,15 +212,17 @@ BOOST_FIXTURE_TEST_CASE(checkinputs_test, TestChain100Setup) // test later that block validation works fine in the absence of cached // successes. ValidateCheckInputsForAllFlags(spend_tx, SCRIPT_VERIFY_DERSIG | SCRIPT_VERIFY_LOW_S | SCRIPT_VERIFY_STRICTENC, false); + } - // And if we produce a block with this tx, it should be valid (DERSIG not - // enabled yet), even though there's no cache entry. - CBlock block; + // And if we produce a block with this tx, it should be valid (DERSIG not + // enabled yet), even though there's no cache entry. + CBlock block; - block = CreateAndProcessBlock({spend_tx}, p2pk_scriptPubKey); - BOOST_CHECK(chainActive.Tip()->GetBlockHash() == block.GetHash()); - BOOST_CHECK(pcoinsTip->GetBestBlock() == block.GetHash()); - } + block = CreateAndProcessBlock({spend_tx}, p2pk_scriptPubKey); + BOOST_CHECK(chainActive.Tip()->GetBlockHash() == block.GetHash()); + BOOST_CHECK(pcoinsTip->GetBestBlock() == block.GetHash()); + + LOCK(cs_main); // Test P2SH: construct a transaction that is valid without P2SH, and // then test validity with P2SH. diff --git a/src/validation.cpp b/src/validation.cpp index 75c40b22fc..71a51c499c 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -2559,6 +2559,7 @@ bool CChainState::ActivateBestChain(CValidationState &state, const CChainParams& // far from a guarantee. Things in the P2P/RPC will often end up calling // us in the middle of ProcessNewBlock - do not assume pblock is set // sanely for performance or correctness! + AssertLockNotHeld(cs_main); CBlockIndex *pindexMostWork = nullptr; CBlockIndex *pindexNewTip = nullptr; @@ -3383,6 +3384,8 @@ bool CChainState::AcceptBlock(const std::shared_ptr& pblock, CVali bool ProcessNewBlock(const CChainParams& chainparams, const std::shared_ptr pblock, bool fForceProcessing, bool *fNewBlock) { + AssertLockNotHeld(cs_main); + { CBlockIndex *pindex = nullptr; if (fNewBlock) *fNewBlock = false; diff --git a/src/wallet/test/wallet_tests.cpp b/src/wallet/test/wallet_tests.cpp index 80e31a1ce0..8e9362c649 100644 --- a/src/wallet/test/wallet_tests.cpp +++ b/src/wallet/test/wallet_tests.cpp @@ -370,8 +370,6 @@ static void AddKey(CWallet& wallet, const CKey& key) BOOST_FIXTURE_TEST_CASE(rescan, TestChain100Setup) { - LOCK(cs_main); - // Cap last block file size, and mine new block in a new block file. CBlockIndex* const nullBlock = nullptr; CBlockIndex* oldTip = chainActive.Tip(); @@ -379,6 +377,8 @@ BOOST_FIXTURE_TEST_CASE(rescan, TestChain100Setup) CreateAndProcessBlock({}, GetScriptForRawPubKey(coinbaseKey.GetPubKey())); CBlockIndex* newTip = chainActive.Tip(); + LOCK(cs_main); + // Verify ScanForWalletTransactions picks up transactions in both the old // and new block files. { @@ -447,8 +447,6 @@ BOOST_FIXTURE_TEST_CASE(rescan, TestChain100Setup) // than or equal to key birthday. BOOST_FIXTURE_TEST_CASE(importwallet_rescan, TestChain100Setup) { - LOCK(cs_main); - // Create two blocks with same timestamp to verify that importwallet rescan // will pick up both blocks, not just the first. const int64_t BLOCK_TIME = chainActive.Tip()->GetBlockTimeMax() + 5; @@ -462,6 +460,8 @@ BOOST_FIXTURE_TEST_CASE(importwallet_rescan, TestChain100Setup) SetMockTime(KEY_TIME); coinbaseTxns.emplace_back(*CreateAndProcessBlock({}, GetScriptForRawPubKey(coinbaseKey.GetPubKey())).vtx[0]); + LOCK(cs_main); + // Import key into wallet and call dumpwallet to create backup file. { CWallet wallet; @@ -627,10 +627,15 @@ public: BOOST_CHECK(wallet->CreateTransaction({recipient}, wtx, reservekey, fee, changePos, error, dummy)); CValidationState state; BOOST_CHECK(wallet->CommitTransaction(wtx, reservekey, nullptr, state)); + CMutableTransaction blocktx; + { + LOCK(wallet->cs_wallet); + blocktx = CMutableTransaction(*wallet->mapWallet.at(wtx.GetHash()).tx); + } + CreateAndProcessBlock({CMutableTransaction(blocktx)}, GetScriptForRawPubKey(coinbaseKey.GetPubKey())); LOCK(wallet->cs_wallet); auto it = wallet->mapWallet.find(wtx.GetHash()); BOOST_CHECK(it != wallet->mapWallet.end()); - CreateAndProcessBlock({CMutableTransaction(*it->second.tx)}, GetScriptForRawPubKey(coinbaseKey.GetPubKey())); it->second.SetMerkleBranch(chainActive.Tip(), 1); return it->second; } @@ -641,7 +646,6 @@ public: BOOST_FIXTURE_TEST_CASE(ListCoins, ListCoinsTestingSetup) { std::string coinbaseAddress = coinbaseKey.GetPubKey().GetID().ToString(); - LOCK2(cs_main, wallet->cs_wallet); // Confirm ListCoins initially returns 1 coin grouped under coinbaseKey // address. @@ -669,6 +673,7 @@ BOOST_FIXTURE_TEST_CASE(ListCoins, ListCoinsTestingSetup) BOOST_CHECK_EQUAL(available.size(), 2); for (const auto& group : list) { for (const auto& coin : group.second) { + LOCK(wallet->cs_wallet); wallet->LockCoin(COutPoint(coin.tx->GetHash(), coin.i)); } } -- cgit v1.2.3 From 5a933cefcc5e0595a1ec46fc5ea287aa163ecd3f Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 4 Dec 2017 18:31:36 -0500 Subject: Add an interface to get the queue depth out of CValidationInterface --- src/scheduler.cpp | 5 +++++ src/scheduler.h | 2 ++ src/validationinterface.cpp | 5 +++++ src/validationinterface.h | 2 ++ 4 files changed, 14 insertions(+) diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 260f6fa60e..7d5ec14610 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -206,3 +206,8 @@ void SingleThreadedSchedulerClient::EmptyQueue() { should_continue = !m_callbacks_pending.empty(); } } + +size_t SingleThreadedSchedulerClient::CallbacksPending() { + LOCK(m_cs_callbacks_pending); + return m_callbacks_pending.size(); +} diff --git a/src/scheduler.h b/src/scheduler.h index b99f165343..39ecb849b9 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -108,6 +108,8 @@ public: // Processes all remaining queue members on the calling thread, blocking until queue is empty // Must be called after the CScheduler has no remaining processing threads! void EmptyQueue(); + + size_t CallbacksPending(); }; #endif diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp index abbd8cc4d2..1ddff4b335 100644 --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -54,6 +54,11 @@ void CMainSignals::FlushBackgroundCallbacks() { } } +size_t CMainSignals::CallbacksPending() { + if (!m_internals) return 0; + return m_internals->m_schedulerClient.CallbacksPending(); +} + void CMainSignals::RegisterWithMempoolSignals(CTxMemPool& pool) { pool.NotifyEntryRemoved.connect(boost::bind(&CMainSignals::MempoolEntryRemoved, this, _1, _2)); } diff --git a/src/validationinterface.h b/src/validationinterface.h index 7b5d138414..164059e3b9 100644 --- a/src/validationinterface.h +++ b/src/validationinterface.h @@ -131,6 +131,8 @@ public: /** Call any remaining callbacks on the calling thread */ void FlushBackgroundCallbacks(); + size_t CallbacksPending(); + /** Register with mempool to call TransactionRemovedFromMempool callbacks */ void RegisterWithMempoolSignals(CTxMemPool& pool); /** Unregister with mempool */ -- cgit v1.2.3 From 36137497f1e2b3324ca84550f4f295dcd605d1fa Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 4 Dec 2017 18:57:55 -0500 Subject: Block ActivateBestChain to empty validationinterface queue --- src/test/test_bitcoin.cpp | 6 +++--- src/validation.cpp | 13 +++++++++++++ 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/src/test/test_bitcoin.cpp b/src/test/test_bitcoin.cpp index f52c8ccc21..6bd228a63a 100644 --- a/src/test/test_bitcoin.cpp +++ b/src/test/test_bitcoin.cpp @@ -69,9 +69,9 @@ TestingSetup::TestingSetup(const std::string& chainName) : BasicTestingSetup(cha fs::create_directories(pathTemp); gArgs.ForceSetArg("-datadir", pathTemp.string()); - // Note that because we don't bother running a scheduler thread here, - // callbacks via CValidationInterface are unreliable, but that's OK, - // our unit tests aren't testing multiple parts of the code at once. + // We have to run a scheduler thread to prevent ActivateBestChain + // from blocking due to queue overrun. + threadGroup.create_thread(boost::bind(&CScheduler::serviceQueue, &scheduler)); GetMainSignals().RegisterBackgroundSignalScheduler(scheduler); mempool.setSanityCheck(1.0); diff --git a/src/validation.cpp b/src/validation.cpp index 71a51c499c..23ce6509d5 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -40,6 +40,7 @@ #include #include +#include #include #include @@ -2566,6 +2567,18 @@ bool CChainState::ActivateBestChain(CValidationState &state, const CChainParams& int nStopAtHeight = gArgs.GetArg("-stopatheight", DEFAULT_STOPATHEIGHT); do { boost::this_thread::interruption_point(); + + if (GetMainSignals().CallbacksPending() > 10) { + // Block until the validation queue drains. This should largely + // never happen in normal operation, however may happen during + // reindex, causing memory blowup if we run too far ahead. + std::promise promise; + CallFunctionInValidationInterfaceQueue([&promise] { + promise.set_value(); + }); + promise.get_future().wait(); + } + if (ShutdownRequested()) break; -- cgit v1.2.3 From 97d2b09c124e6e5803f7fd4503348d9710d1260f Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 24 Dec 2017 12:13:13 -0500 Subject: Add helper to wait for validation interface queue to catch up --- src/validation.cpp | 6 +----- src/validationinterface.cpp | 12 ++++++++++++ src/validationinterface.h | 10 ++++++++++ src/wallet/wallet.cpp | 7 +------ 4 files changed, 24 insertions(+), 11 deletions(-) diff --git a/src/validation.cpp b/src/validation.cpp index 23ce6509d5..1c024ce2ae 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -2572,11 +2572,7 @@ bool CChainState::ActivateBestChain(CValidationState &state, const CChainParams& // Block until the validation queue drains. This should largely // never happen in normal operation, however may happen during // reindex, causing memory blowup if we run too far ahead. - std::promise promise; - CallFunctionInValidationInterfaceQueue([&promise] { - promise.set_value(); - }); - promise.get_future().wait(); + SyncWithValidationInterfaceQueue(); } if (ShutdownRequested()) diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp index 1ddff4b335..1e13465bf7 100644 --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -11,9 +11,11 @@ #include #include #include +#include #include #include +#include #include @@ -118,6 +120,16 @@ void CallFunctionInValidationInterfaceQueue(std::function func) { g_signals.m_internals->m_schedulerClient.AddToProcessQueue(std::move(func)); } +void SyncWithValidationInterfaceQueue() { + AssertLockNotHeld(cs_main); + // Block until the validation queue drains + std::promise promise; + CallFunctionInValidationInterfaceQueue([&promise] { + promise.set_value(); + }); + promise.get_future().wait(); +} + void CMainSignals::MempoolEntryRemoved(CTransactionRef ptx, MemPoolRemovalReason reason) { if (reason != MemPoolRemovalReason::BLOCK && reason != MemPoolRemovalReason::CONFLICT) { m_internals->m_schedulerClient.AddToProcessQueue([ptx, this] { diff --git a/src/validationinterface.h b/src/validationinterface.h index 164059e3b9..f416f6678f 100644 --- a/src/validationinterface.h +++ b/src/validationinterface.h @@ -42,6 +42,16 @@ void UnregisterAllValidationInterfaces(); * will result in a deadlock (that DEBUG_LOCKORDER will miss). */ void CallFunctionInValidationInterfaceQueue(std::function func); +/** + * This is a synonym for the following, which asserts certain locks are not + * held: + * std::promise promise; + * CallFunctionInValidationInterfaceQueue([&promise] { + * promise.set_value(); + * }); + * promise.get_future().wait(); + */ +void SyncWithValidationInterfaceQueue(); class CValidationInterface { protected: diff --git a/src/wallet/wallet.cpp b/src/wallet/wallet.cpp index dafd708d09..cf9bb2909f 100644 --- a/src/wallet/wallet.cpp +++ b/src/wallet/wallet.cpp @@ -1292,12 +1292,7 @@ void CWallet::BlockUntilSyncedToCurrentChain() { // ...otherwise put a callback in the validation interface queue and wait // for the queue to drain enough to execute it (indicating we are caught up // at least with the time we entered this function). - - std::promise promise; - CallFunctionInValidationInterfaceQueue([&promise] { - promise.set_value(); - }); - promise.get_future().wait(); + SyncWithValidationInterfaceQueue(); } -- cgit v1.2.3