diff options
author | fanquake <fanquake@gmail.com> | 2020-04-29 16:12:10 +0800 |
---|---|---|
committer | fanquake <fanquake@gmail.com> | 2020-04-29 16:32:37 +0800 |
commit | 0ef0d33f7562c3b7f9c021549e70b3b4dbcc504c (patch) | |
tree | 670e6e6fbda19683160f8c90cc7e23d42025b0a1 /src | |
parent | ba348dbc518b8e082a5dc3a225432fdacf859a13 (diff) | |
parent | 50fc4df6c4e8a84bdda13ade7bed7a2131796f00 (diff) |
Merge #18038: P2P: Mempool tracks locally submitted transactions to improve wallet privacy
50fc4df6c4e8a84bdda13ade7bed7a2131796f00 [mempool] Persist unbroadcast set to mempool.dat (Amiti Uttarwar)
297a1785360c4db662a7f3d3ade7b6b503258d39 [test] Integration tests for unbroadcast functionality (Amiti Uttarwar)
6851502472d3625416f0e7796e9f2a0379d14d49 [refactor/test] Extract P2PTxInvStore into test framework (Amiti Uttarwar)
dc1da48dc5e5526215561311c184a8cbc345ecdc [wallet] Update the rebroadcast frequency to be ~1/day. (Amiti Uttarwar)
e25e42f20a3aa39651fbc1f9fa3df1a49f1f5868 [p2p] Reattempt initial send of unbroadcast transactions (Amiti Uttarwar)
7e93eecce3bc5a1b7bb0284e06f9e2e69454f5ba [util] Add method that returns random time in milliseconds (Amiti Uttarwar)
89eeb4a3335f8e871cc3f5286af4546dff66172a [mempool] Track "unbroadcast" transactions (Amiti Uttarwar)
Pull request description:
This PR introduces mempool tracking of unbroadcast transactions and periodic reattempts at initial broadcast. This is a part of the rebroadcast project, and a standalone privacy win.
The current rebroadcast logic is terrible for privacy because 1. only the source wallet rebroadcasts transactions and 2. it does so quite frequently. In the current system, if a user submits a transaction that does not immediately get broadcast to the network (eg. they are offline), this "rebroadcast" behavior is the safety net that can actually serve as the initial broadcast. So, keeping the attempts frequent is important for initial delivery within a reasonable timespan.
This PR aims to improve # 2 by reducing the wallet rebroadcast frequency to ~1/day from ~1/15 min. It achieves this by separating the notion of initial broadcast from rebroadcasts. With these changes, the mempool tracks locally submitted transactions & periodically reattempts initial broadcast. Transactions submitted via the wallet or RPC are added to an "unbroadcast" set & are removed when a peer sends a `getdata` request, or the transaction is removed from the mempool. Every 10-15 minutes, the node reattempts an initial broadcast. This enables reducing the wallet rebroadcast frequency while ensuring the transactions will be propagated to the network.
For privacy improvements around # 1, please see #16698.
Thank you to gmaxwell for the idea of how to break out this subset of functionality (https://github.com/bitcoin/bitcoin/pull/16698#issuecomment-571399346)
ACKs for top commit:
fjahr:
Code review ACK 50fc4df6c4e8a84bdda13ade7bed7a2131796f00
MarcoFalke:
ACK 50fc4df6c4e8a84bdda13ade7bed7a2131796f00, I think this is ready for merge now 👻
amitiuttarwar:
The current tip `50fc4df` currently has 6 ACKs on it, so I've opened #18807 to address the last bits.
jnewbery:
utACK 50fc4df6c4e8a84bdda13ade7bed7a2131796f00.
ariard:
Code Review ACK 50fc4df (minor points no need to invalid other ACKs)
robot-visions:
ACK 50fc4df6c4e8a84bdda13ade7bed7a2131796f00
sipa:
utACK 50fc4df6c4e8a84bdda13ade7bed7a2131796f00
naumenkogs:
utACK 50fc4df
Tree-SHA512: 2dd935d645d5e209f8abf87bfaa3ef0e4492705ce7e89ea64279cb27ffd37f4727fa94ad62d41be331177332f8edbebf3c7f4972f8cda10dd951b80a28ab3c0f
Diffstat (limited to 'src')
-rw-r--r-- | src/net_processing.cpp | 27 | ||||
-rw-r--r-- | src/net_processing.h | 2 | ||||
-rw-r--r-- | src/node/transaction.cpp | 4 | ||||
-rw-r--r-- | src/random.cpp | 5 | ||||
-rw-r--r-- | src/random.h | 1 | ||||
-rw-r--r-- | src/txmempool.cpp | 11 | ||||
-rw-r--r-- | src/txmempool.h | 18 | ||||
-rw-r--r-- | src/validation.cpp | 18 | ||||
-rw-r--r-- | src/wallet/wallet.cpp | 3 |
9 files changed, 85 insertions, 4 deletions
diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 26327ac6eb..50a8a8a882 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -810,6 +810,19 @@ void PeerLogicValidation::InitializeNode(CNode *pnode) { PushNodeVersion(pnode, connman, GetTime()); } +void PeerLogicValidation::ReattemptInitialBroadcast(CScheduler& scheduler) const +{ + std::set<uint256> unbroadcast_txids = m_mempool.GetUnbroadcastTxs(); + + for (const uint256& txid : unbroadcast_txids) { + RelayTransaction(txid, *connman); + } + + // schedule next run for 10-15 minutes in the future + const std::chrono::milliseconds delta = std::chrono::minutes{10} + GetRandMillis(std::chrono::minutes{5}); + scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta); +} + void PeerLogicValidation::FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) { fUpdateConnectionTime = false; LOCK(cs_main); @@ -1159,6 +1172,10 @@ PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, BanMan* banman, CS // timer. static_assert(EXTRA_PEER_CHECK_INTERVAL < STALE_CHECK_INTERVAL, "peer eviction timer should be less than stale tip check timer"); scheduler.scheduleEvery([this, consensusParams] { this->CheckForStaleTipAndEvictPeers(consensusParams); }, std::chrono::seconds{EXTRA_PEER_CHECK_INTERVAL}); + + // schedule next run for 10-15 minutes in the future + const std::chrono::milliseconds delta = std::chrono::minutes{10} + GetRandMillis(std::chrono::minutes{5}); + scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta); } /** @@ -1587,7 +1604,7 @@ void static ProcessGetBlockData(CNode* pfrom, const CChainParams& chainparams, c } } -void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnman* connman, const CTxMemPool& mempool, const std::atomic<bool>& interruptMsgProc) LOCKS_EXCLUDED(cs_main) +void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnman* connman, CTxMemPool& mempool, const std::atomic<bool>& interruptMsgProc) LOCKS_EXCLUDED(cs_main) { AssertLockNotHeld(cs_main); @@ -1636,7 +1653,13 @@ void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnm push = true; } } - if (!push) { + + if (push) { + // We interpret fulfilling a GETDATA for a transaction as a + // successful initial broadcast and remove it from our + // unbroadcast set. + mempool.RemoveUnbroadcastTx(inv.hash); + } else { vNotFound.push_back(inv); } } diff --git a/src/net_processing.h b/src/net_processing.h index 3d9bc65a3a..a85d5e7c70 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -76,6 +76,8 @@ public: void CheckForStaleTipAndEvictPeers(const Consensus::Params &consensusParams); /** If we have extra outbound peers, try to disconnect the one with the oldest block announcement */ void EvictExtraOutboundPeers(int64_t time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + /** Retrieve unbroadcast transactions from the mempool and reattempt sending to peers */ + void ReattemptInitialBroadcast(CScheduler& scheduler) const; private: int64_t m_stale_tip_check_time; //!< Next time to check for stale tip diff --git a/src/node/transaction.cpp b/src/node/transaction.cpp index 201406ce3b..3841d8687d 100644 --- a/src/node/transaction.cpp +++ b/src/node/transaction.cpp @@ -78,6 +78,10 @@ TransactionError BroadcastTransaction(NodeContext& node, const CTransactionRef t } if (relay) { + // the mempool tracks locally submitted transactions to make a + // best-effort of initial broadcast + node.mempool->AddUnbroadcastTx(hashTx); + RelayTransaction(hashTx, *node.connman); } diff --git a/src/random.cpp b/src/random.cpp index 765239e1f5..b408b1e13e 100644 --- a/src/random.cpp +++ b/src/random.cpp @@ -592,6 +592,11 @@ std::chrono::microseconds GetRandMicros(std::chrono::microseconds duration_max) return std::chrono::microseconds{GetRand(duration_max.count())}; } +std::chrono::milliseconds GetRandMillis(std::chrono::milliseconds duration_max) noexcept +{ + return std::chrono::milliseconds{GetRand(duration_max.count())}; +} + int GetRandInt(int nMax) noexcept { return GetRand(nMax); diff --git a/src/random.h b/src/random.h index 0c5008685b..690125079b 100644 --- a/src/random.h +++ b/src/random.h @@ -69,6 +69,7 @@ void GetRandBytes(unsigned char* buf, int num) noexcept; uint64_t GetRand(uint64_t nMax) noexcept; std::chrono::microseconds GetRandMicros(std::chrono::microseconds duration_max) noexcept; +std::chrono::milliseconds GetRandMillis(std::chrono::milliseconds duration_max) noexcept; int GetRandInt(int nMax) noexcept; uint256 GetRandHash() noexcept; diff --git a/src/txmempool.cpp b/src/txmempool.cpp index 102e50dc5c..c5c0208d8f 100644 --- a/src/txmempool.cpp +++ b/src/txmempool.cpp @@ -417,6 +417,8 @@ void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason) for (const CTxIn& txin : it->GetTx().vin) mapNextTx.erase(txin.prevout); + RemoveUnbroadcastTx(hash, true /* add logging because unchecked */ ); + if (vTxHashes.size() > 1) { vTxHashes[it->vTxHashesIdx] = std::move(vTxHashes.back()); vTxHashes[it->vTxHashesIdx].second->vTxHashesIdx = it->vTxHashesIdx; @@ -919,6 +921,15 @@ size_t CTxMemPool::DynamicMemoryUsage() const { return memusage::MallocUsage(sizeof(CTxMemPoolEntry) + 12 * sizeof(void*)) * mapTx.size() + memusage::DynamicUsage(mapNextTx) + memusage::DynamicUsage(mapDeltas) + memusage::DynamicUsage(mapLinks) + memusage::DynamicUsage(vTxHashes) + cachedInnerUsage; } +void CTxMemPool::RemoveUnbroadcastTx(const uint256& txid, const bool unchecked) { + LOCK(cs); + + if (m_unbroadcast_txids.erase(txid)) + { + LogPrint(BCLog::MEMPOOL, "Removed %i from set of unbroadcast txns%s\n", txid.GetHex(), (unchecked ? " before confirmation that txn was sent out" : "")); + } +} + void CTxMemPool::RemoveStaged(setEntries &stage, bool updateDescendants, MemPoolRemovalReason reason) { AssertLockHeld(cs); UpdateForRemoveFromMempool(stage, updateDescendants); diff --git a/src/txmempool.h b/src/txmempool.h index 3dae0a04c7..4bee78b8d6 100644 --- a/src/txmempool.h +++ b/src/txmempool.h @@ -549,6 +549,9 @@ private: std::vector<indexed_transaction_set::const_iterator> GetSortedDepthAndScore() const EXCLUSIVE_LOCKS_REQUIRED(cs); + /** track locally submitted transactions to periodically retry initial broadcast */ + std::set<uint256> m_unbroadcast_txids GUARDED_BY(cs); + public: indirectmap<COutPoint, const CTransaction*> mapNextTx GUARDED_BY(cs); std::map<uint256, CAmount> mapDeltas; @@ -698,6 +701,21 @@ public: size_t DynamicMemoryUsage() const; + /** Adds a transaction to the unbroadcast set */ + void AddUnbroadcastTx(const uint256& txid) { + LOCK(cs); + m_unbroadcast_txids.insert(txid); + } + + /** Removes a transaction from the unbroadcast set */ + void RemoveUnbroadcastTx(const uint256& txid, const bool unchecked = false); + + /** Returns transactions in unbroadcast set */ + const std::set<uint256> GetUnbroadcastTxs() const { + LOCK(cs); + return m_unbroadcast_txids; + } + private: /** UpdateForDescendants is used by UpdateTransactionsFromBlock to update * the descendants for a single transaction that has been added to the diff --git a/src/validation.cpp b/src/validation.cpp index 25975e3e31..d18803b342 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -4998,6 +4998,7 @@ bool LoadMempool(CTxMemPool& pool) int64_t expired = 0; int64_t failed = 0; int64_t already_there = 0; + int64_t unbroadcast = 0; int64_t nNow = GetTime(); try { @@ -5051,12 +5052,21 @@ bool LoadMempool(CTxMemPool& pool) for (const auto& i : mapDeltas) { pool.PrioritiseTransaction(i.first, i.second); } + + std::set<uint256> unbroadcast_txids; + file >> unbroadcast_txids; + unbroadcast = unbroadcast_txids.size(); + + for (const auto& txid : unbroadcast_txids) { + pool.AddUnbroadcastTx(txid); + } + } catch (const std::exception& e) { LogPrintf("Failed to deserialize mempool data on disk: %s. Continuing anyway.\n", e.what()); return false; } - LogPrintf("Imported mempool transactions from disk: %i succeeded, %i failed, %i expired, %i already there\n", count, failed, expired, already_there); + LogPrintf("Imported mempool transactions from disk: %i succeeded, %i failed, %i expired, %i already there, %i waiting for initial broadcast\n", count, failed, expired, already_there, unbroadcast); return true; } @@ -5066,6 +5076,7 @@ bool DumpMempool(const CTxMemPool& pool) std::map<uint256, CAmount> mapDeltas; std::vector<TxMempoolInfo> vinfo; + std::set<uint256> unbroadcast_txids; static Mutex dump_mutex; LOCK(dump_mutex); @@ -5076,6 +5087,7 @@ bool DumpMempool(const CTxMemPool& pool) mapDeltas[i.first] = i.second; } vinfo = pool.infoAll(); + unbroadcast_txids = pool.GetUnbroadcastTxs(); } int64_t mid = GetTimeMicros(); @@ -5100,6 +5112,10 @@ bool DumpMempool(const CTxMemPool& pool) } file << mapDeltas; + + LogPrintf("Writing %d unbroadcast transactions to disk.\n", unbroadcast_txids.size()); + file << unbroadcast_txids; + if (!FileCommit(file.Get())) throw std::runtime_error("FileCommit failed"); file.fclose(); diff --git a/src/wallet/wallet.cpp b/src/wallet/wallet.cpp index b6f25de64e..6d46841cee 100644 --- a/src/wallet/wallet.cpp +++ b/src/wallet/wallet.cpp @@ -1993,7 +1993,8 @@ void CWallet::ResendWalletTransactions() // that these are our transactions. if (GetTime() < nNextResend || !fBroadcastTransactions) return; bool fFirst = (nNextResend == 0); - nNextResend = GetTime() + GetRand(30 * 60); + // resend 12-36 hours from now, ~1 day on average. + nNextResend = GetTime() + (12 * 60 * 60) + GetRand(24 * 60 * 60); if (fFirst) return; // Only do it if there's been a new block since last time |