From 89eeb4a3335f8e871cc3f5286af4546dff66172a Mon Sep 17 00:00:00 2001 From: Amiti Uttarwar Date: Wed, 29 Jan 2020 08:12:59 -0800 Subject: [mempool] Track "unbroadcast" transactions - Mempool tracks locally submitted transactions (wallet or rpc) - Transactions are removed from set when the node receives a GETDATA request from a peer, or if the transaction is removed from the mempool. --- src/net_processing.cpp | 10 ++++++++-- src/node/transaction.cpp | 4 ++++ src/txmempool.cpp | 11 +++++++++++ src/txmempool.h | 18 ++++++++++++++++++ 4 files changed, 41 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/net_processing.cpp b/src/net_processing.cpp index ab430cbe19..4756d35fe5 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -1556,7 +1556,7 @@ void static ProcessGetBlockData(CNode* pfrom, const CChainParams& chainparams, c } } -void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnman* connman, const CTxMemPool& mempool, const std::atomic& interruptMsgProc) LOCKS_EXCLUDED(cs_main) +void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnman* connman, CTxMemPool& mempool, const std::atomic& interruptMsgProc) LOCKS_EXCLUDED(cs_main) { AssertLockNotHeld(cs_main); @@ -1605,7 +1605,13 @@ void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnm push = true; } } - if (!push) { + + if (push) { + // We interpret fulfilling a GETDATA for a transaction as a + // successful initial broadcast and remove it from our + // unbroadcast set. + mempool.RemoveUnbroadcastTx(inv.hash); + } else { vNotFound.push_back(inv); } } diff --git a/src/node/transaction.cpp b/src/node/transaction.cpp index 201406ce3b..3841d8687d 100644 --- a/src/node/transaction.cpp +++ b/src/node/transaction.cpp @@ -78,6 +78,10 @@ TransactionError BroadcastTransaction(NodeContext& node, const CTransactionRef t } if (relay) { + // the mempool tracks locally submitted transactions to make a + // best-effort of initial broadcast + node.mempool->AddUnbroadcastTx(hashTx); + RelayTransaction(hashTx, *node.connman); } diff --git a/src/txmempool.cpp b/src/txmempool.cpp index 47b0d39ea4..a6701d0f8c 100644 --- a/src/txmempool.cpp +++ b/src/txmempool.cpp @@ -417,6 +417,8 @@ void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason) for (const CTxIn& txin : it->GetTx().vin) mapNextTx.erase(txin.prevout); + RemoveUnbroadcastTx(hash, true /* add logging because unchecked */ ); + if (vTxHashes.size() > 1) { vTxHashes[it->vTxHashesIdx] = std::move(vTxHashes.back()); vTxHashes[it->vTxHashesIdx].second->vTxHashesIdx = it->vTxHashesIdx; @@ -919,6 +921,15 @@ size_t CTxMemPool::DynamicMemoryUsage() const { return memusage::MallocUsage(sizeof(CTxMemPoolEntry) + 12 * sizeof(void*)) * mapTx.size() + memusage::DynamicUsage(mapNextTx) + memusage::DynamicUsage(mapDeltas) + memusage::DynamicUsage(mapLinks) + memusage::DynamicUsage(vTxHashes) + cachedInnerUsage; } +void CTxMemPool::RemoveUnbroadcastTx(const uint256& txid, const bool unchecked) { + LOCK(cs); + + if (m_unbroadcast_txids.erase(txid)) + { + LogPrint(BCLog::MEMPOOL, "Removed %i from set of unbroadcast txns%s\n", txid.GetHex(), (unchecked ? " before confirmation that txn was sent out" : "")); + } +} + void CTxMemPool::RemoveStaged(setEntries &stage, bool updateDescendants, MemPoolRemovalReason reason) { AssertLockHeld(cs); UpdateForRemoveFromMempool(stage, updateDescendants); diff --git a/src/txmempool.h b/src/txmempool.h index 3dae0a04c7..4bee78b8d6 100644 --- a/src/txmempool.h +++ b/src/txmempool.h @@ -549,6 +549,9 @@ private: std::vector GetSortedDepthAndScore() const EXCLUSIVE_LOCKS_REQUIRED(cs); + /** track locally submitted transactions to periodically retry initial broadcast */ + std::set m_unbroadcast_txids GUARDED_BY(cs); + public: indirectmap mapNextTx GUARDED_BY(cs); std::map mapDeltas; @@ -698,6 +701,21 @@ public: size_t DynamicMemoryUsage() const; + /** Adds a transaction to the unbroadcast set */ + void AddUnbroadcastTx(const uint256& txid) { + LOCK(cs); + m_unbroadcast_txids.insert(txid); + } + + /** Removes a transaction from the unbroadcast set */ + void RemoveUnbroadcastTx(const uint256& txid, const bool unchecked = false); + + /** Returns transactions in unbroadcast set */ + const std::set GetUnbroadcastTxs() const { + LOCK(cs); + return m_unbroadcast_txids; + } + private: /** UpdateForDescendants is used by UpdateTransactionsFromBlock to update * the descendants for a single transaction that has been added to the -- cgit v1.2.3 From 7e93eecce3bc5a1b7bb0284e06f9e2e69454f5ba Mon Sep 17 00:00:00 2001 From: Amiti Uttarwar Date: Wed, 22 Apr 2020 18:35:07 -0700 Subject: [util] Add method that returns random time in milliseconds --- src/random.cpp | 5 +++++ src/random.h | 1 + 2 files changed, 6 insertions(+) (limited to 'src') diff --git a/src/random.cpp b/src/random.cpp index 2a27e6ba0d..f1032feaaf 100644 --- a/src/random.cpp +++ b/src/random.cpp @@ -592,6 +592,11 @@ std::chrono::microseconds GetRandMicros(std::chrono::microseconds duration_max) return std::chrono::microseconds{GetRand(duration_max.count())}; } +std::chrono::milliseconds GetRandMillis(std::chrono::milliseconds duration_max) noexcept +{ + return std::chrono::milliseconds{GetRand(duration_max.count())}; +} + int GetRandInt(int nMax) noexcept { return GetRand(nMax); diff --git a/src/random.h b/src/random.h index 518a5cd3e3..46e3ecc34c 100644 --- a/src/random.h +++ b/src/random.h @@ -69,6 +69,7 @@ void GetRandBytes(unsigned char* buf, int num) noexcept; uint64_t GetRand(uint64_t nMax) noexcept; std::chrono::microseconds GetRandMicros(std::chrono::microseconds duration_max) noexcept; +std::chrono::milliseconds GetRandMillis(std::chrono::milliseconds duration_max) noexcept; int GetRandInt(int nMax) noexcept; uint256 GetRandHash() noexcept; -- cgit v1.2.3 From e25e42f20a3aa39651fbc1f9fa3df1a49f1f5868 Mon Sep 17 00:00:00 2001 From: Amiti Uttarwar Date: Wed, 29 Jan 2020 08:19:27 -0800 Subject: [p2p] Reattempt initial send of unbroadcast transactions Every 10-15 minutes, the scheduler kicks off a job that queues unbroadcast transactions onto each node. --- src/net_processing.cpp | 17 +++++++++++++++++ src/net_processing.h | 2 ++ 2 files changed, 19 insertions(+) (limited to 'src') diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 4756d35fe5..d641c2b0cb 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -779,6 +779,19 @@ void PeerLogicValidation::InitializeNode(CNode *pnode) { PushNodeVersion(pnode, connman, GetTime()); } +void PeerLogicValidation::ReattemptInitialBroadcast(CScheduler& scheduler) const +{ + std::set unbroadcast_txids = m_mempool.GetUnbroadcastTxs(); + + for (const uint256& txid : unbroadcast_txids) { + RelayTransaction(txid, *connman); + } + + // schedule next run for 10-15 minutes in the future + const std::chrono::milliseconds delta = std::chrono::minutes{10} + GetRandMillis(std::chrono::minutes{5}); + scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta); +} + void PeerLogicValidation::FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) { fUpdateConnectionTime = false; LOCK(cs_main); @@ -1128,6 +1141,10 @@ PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, BanMan* banman, CS // timer. static_assert(EXTRA_PEER_CHECK_INTERVAL < STALE_CHECK_INTERVAL, "peer eviction timer should be less than stale tip check timer"); scheduler.scheduleEvery([this, consensusParams] { this->CheckForStaleTipAndEvictPeers(consensusParams); }, std::chrono::seconds{EXTRA_PEER_CHECK_INTERVAL}); + + // schedule next run for 10-15 minutes in the future + const std::chrono::milliseconds delta = std::chrono::minutes{10} + GetRandMillis(std::chrono::minutes{5}); + scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta); } /** diff --git a/src/net_processing.h b/src/net_processing.h index 65e3963c41..d02678c7c3 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -75,6 +75,8 @@ public: void CheckForStaleTipAndEvictPeers(const Consensus::Params &consensusParams); /** If we have extra outbound peers, try to disconnect the one with the oldest block announcement */ void EvictExtraOutboundPeers(int64_t time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + /** Retrieve unbroadcast transactions from the mempool and reattempt sending to peers */ + void ReattemptInitialBroadcast(CScheduler& scheduler) const; private: int64_t m_stale_tip_check_time; //!< Next time to check for stale tip -- cgit v1.2.3 From dc1da48dc5e5526215561311c184a8cbc345ecdc Mon Sep 17 00:00:00 2001 From: Amiti Uttarwar Date: Wed, 29 Jan 2020 08:21:02 -0800 Subject: [wallet] Update the rebroadcast frequency to be ~1/day. Since the mempool unbroadcast mechanism handles the reattempts for initial broadcast, the wallet rebroadcast attempts can be much less frequent (previously ~1/30 min) --- src/wallet/wallet.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/wallet/wallet.cpp b/src/wallet/wallet.cpp index 9a972febab..6eee0ccfc6 100644 --- a/src/wallet/wallet.cpp +++ b/src/wallet/wallet.cpp @@ -1978,7 +1978,8 @@ void CWallet::ResendWalletTransactions() // that these are our transactions. if (GetTime() < nNextResend || !fBroadcastTransactions) return; bool fFirst = (nNextResend == 0); - nNextResend = GetTime() + GetRand(30 * 60); + // resend 12-36 hours from now, ~1 day on average. + nNextResend = GetTime() + (12 * 60 * 60) + GetRand(24 * 60 * 60); if (fFirst) return; // Only do it if there's been a new block since last time -- cgit v1.2.3 From 50fc4df6c4e8a84bdda13ade7bed7a2131796f00 Mon Sep 17 00:00:00 2001 From: Amiti Uttarwar Date: Tue, 17 Mar 2020 10:39:25 -0700 Subject: [mempool] Persist unbroadcast set to mempool.dat Ensure that the unbroadcast set will still be meaningful if the node is restarted. --- src/validation.cpp | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/validation.cpp b/src/validation.cpp index 7ee94f8657..8282402d9a 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -4970,6 +4970,7 @@ bool LoadMempool(CTxMemPool& pool) int64_t expired = 0; int64_t failed = 0; int64_t already_there = 0; + int64_t unbroadcast = 0; int64_t nNow = GetTime(); try { @@ -5023,12 +5024,21 @@ bool LoadMempool(CTxMemPool& pool) for (const auto& i : mapDeltas) { pool.PrioritiseTransaction(i.first, i.second); } + + std::set unbroadcast_txids; + file >> unbroadcast_txids; + unbroadcast = unbroadcast_txids.size(); + + for (const auto& txid : unbroadcast_txids) { + pool.AddUnbroadcastTx(txid); + } + } catch (const std::exception& e) { LogPrintf("Failed to deserialize mempool data on disk: %s. Continuing anyway.\n", e.what()); return false; } - LogPrintf("Imported mempool transactions from disk: %i succeeded, %i failed, %i expired, %i already there\n", count, failed, expired, already_there); + LogPrintf("Imported mempool transactions from disk: %i succeeded, %i failed, %i expired, %i already there, %i waiting for initial broadcast\n", count, failed, expired, already_there, unbroadcast); return true; } @@ -5038,6 +5048,7 @@ bool DumpMempool(const CTxMemPool& pool) std::map mapDeltas; std::vector vinfo; + std::set unbroadcast_txids; static Mutex dump_mutex; LOCK(dump_mutex); @@ -5048,6 +5059,7 @@ bool DumpMempool(const CTxMemPool& pool) mapDeltas[i.first] = i.second; } vinfo = pool.infoAll(); + unbroadcast_txids = pool.GetUnbroadcastTxs(); } int64_t mid = GetTimeMicros(); @@ -5072,6 +5084,10 @@ bool DumpMempool(const CTxMemPool& pool) } file << mapDeltas; + + LogPrintf("Writing %d unbroadcast transactions to disk.\n", unbroadcast_txids.size()); + file << unbroadcast_txids; + if (!FileCommit(file.Get())) throw std::runtime_error("FileCommit failed"); file.fclose(); -- cgit v1.2.3