aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/interfaces/chain.cpp8
-rw-r--r--src/interfaces/chain.h13
-rw-r--r--src/net.h2
-rw-r--r--src/net_processing.cpp140
-rw-r--r--src/wallet/rpcdump.cpp18
-rw-r--r--src/wallet/wallet.cpp16
-rw-r--r--src/wallet/wallet.h2
-rwxr-xr-xtest/functional/wallet_balance.py12
-rwxr-xr-xtest/functional/wallet_resendwallettransactions.py63
9 files changed, 190 insertions, 84 deletions
diff --git a/src/interfaces/chain.cpp b/src/interfaces/chain.cpp
index da810bc5e6..d3d9e86e8d 100644
--- a/src/interfaces/chain.cpp
+++ b/src/interfaces/chain.cpp
@@ -8,6 +8,7 @@
#include <chainparams.h>
#include <primitives/block.h>
#include <sync.h>
+#include <txmempool.h>
#include <uint256.h>
#include <util/system.h>
#include <validation.h>
@@ -177,6 +178,13 @@ public:
LOCK(cs_main);
return GuessVerificationProgress(Params().TxData(), LookupBlockIndex(block_hash));
}
+ void requestMempoolTransactions(std::function<void(const CTransactionRef&)> fn) override
+ {
+ LOCK2(::cs_main, ::mempool.cs);
+ for (const CTxMemPoolEntry& entry : ::mempool.mapTx) {
+ fn(entry.GetSharedTx());
+ }
+ }
};
} // namespace
diff --git a/src/interfaces/chain.h b/src/interfaces/chain.h
index 3a54b9164e..57e9703758 100644
--- a/src/interfaces/chain.h
+++ b/src/interfaces/chain.h
@@ -16,6 +16,9 @@ class CBlock;
class CScheduler;
class uint256;
struct CBlockLocator;
+class CTransaction;
+
+using CTransactionRef = std::shared_ptr<const CTransaction>;
namespace interfaces {
@@ -127,6 +130,16 @@ public:
//! Estimate fraction of total transactions verified if blocks up to
//! the specified block hash are verified.
virtual double guessVerificationProgress(const uint256& block_hash) = 0;
+
+ //! Synchronously send TransactionAddedToMempool notifications about all
+ //! current mempool transactions to the specified handler and return after
+ //! the last one is sent. These notifications aren't coordinated with async
+ //! notifications sent by handleNotifications, so out of date async
+ //! notifications from handleNotifications can arrive during and after
+ //! synchronous notifications from requestMempoolTransactions. Clients need
+ //! to be prepared to handle this by ignoring notifications about unknown
+ //! removed transactions and already added new transactions.
+ virtual void requestMempoolTransactions(std::function<void(const CTransactionRef&)> fn) = 0;
};
//! Interface to let node manage chain clients (wallets, or maybe tools for
diff --git a/src/net.h b/src/net.h
index f4a90e01f1..f1d09f5934 100644
--- a/src/net.h
+++ b/src/net.h
@@ -739,6 +739,8 @@ public:
CAmount lastSentFeeFilter{0};
int64_t nextSendTimeFeeFilter{0};
+ std::set<uint256> orphan_work_set;
+
CNode(NodeId id, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn, SOCKET hSocketIn, const CAddress &addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const CAddress &addrBindIn, const std::string &addrNameIn = "", bool fInboundIn = false);
~CNode();
CNode(const CNode&) = delete;
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
index a37451e39a..53ff6a52ac 100644
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -1713,6 +1713,67 @@ bool static ProcessHeadersMessage(CNode *pfrom, CConnman *connman, const std::ve
return true;
}
+void static ProcessOrphanTx(CConnman* connman, std::set<uint256>& orphan_work_set, std::list<CTransactionRef>& removed_txn) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans)
+{
+ AssertLockHeld(cs_main);
+ AssertLockHeld(g_cs_orphans);
+ std::set<NodeId> setMisbehaving;
+ bool done = false;
+ while (!done && !orphan_work_set.empty()) {
+ const uint256 orphanHash = *orphan_work_set.begin();
+ orphan_work_set.erase(orphan_work_set.begin());
+
+ auto orphan_it = mapOrphanTransactions.find(orphanHash);
+ if (orphan_it == mapOrphanTransactions.end()) continue;
+
+ const CTransactionRef porphanTx = orphan_it->second.tx;
+ const CTransaction& orphanTx = *porphanTx;
+ NodeId fromPeer = orphan_it->second.fromPeer;
+ bool fMissingInputs2 = false;
+ // Use a dummy CValidationState so someone can't setup nodes to counter-DoS based on orphan
+ // resolution (that is, feeding people an invalid transaction based on LegitTxX in order to get
+ // anyone relaying LegitTxX banned)
+ CValidationState stateDummy;
+
+ if (setMisbehaving.count(fromPeer)) continue;
+ if (AcceptToMemoryPool(mempool, stateDummy, porphanTx, &fMissingInputs2, &removed_txn, false /* bypass_limits */, 0 /* nAbsurdFee */)) {
+ LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString());
+ RelayTransaction(orphanTx, connman);
+ for (unsigned int i = 0; i < orphanTx.vout.size(); i++) {
+ auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(orphanHash, i));
+ if (it_by_prev != mapOrphanTransactionsByPrev.end()) {
+ for (const auto& elem : it_by_prev->second) {
+ orphan_work_set.insert(elem->first);
+ }
+ }
+ }
+ EraseOrphanTx(orphanHash);
+ done = true;
+ } else if (!fMissingInputs2) {
+ int nDos = 0;
+ if (stateDummy.IsInvalid(nDos) && nDos > 0) {
+ // Punish peer that gave us an invalid orphan tx
+ Misbehaving(fromPeer, nDos);
+ setMisbehaving.insert(fromPeer);
+ LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s\n", orphanHash.ToString());
+ }
+ // Has inputs but not accepted to mempool
+ // Probably non-standard or insufficient fee
+ LogPrint(BCLog::MEMPOOL, " removed orphan tx %s\n", orphanHash.ToString());
+ if (!orphanTx.HasWitness() && !stateDummy.CorruptionPossible()) {
+ // Do not use rejection cache for witness transactions or
+ // witness-stripped transactions, as they can have been malleated.
+ // See https://github.com/bitcoin/bitcoin/issues/8279 for details.
+ assert(recentRejects);
+ recentRejects->insert(orphanHash);
+ }
+ EraseOrphanTx(orphanHash);
+ done = true;
+ }
+ mempool.check(pcoinsTip.get());
+ }
+}
+
bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman* connman, const std::atomic<bool>& interruptMsgProc, bool enable_bip61)
{
LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->GetId());
@@ -2342,8 +2403,6 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
return true;
}
- std::deque<COutPoint> vWorkQueue;
- std::vector<uint256> vEraseQueue;
CTransactionRef ptx;
vRecv >> ptx;
const CTransaction& tx = *ptx;
@@ -2368,7 +2427,12 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
mempool.check(pcoinsTip.get());
RelayTransaction(tx, connman);
for (unsigned int i = 0; i < tx.vout.size(); i++) {
- vWorkQueue.emplace_back(inv.hash, i);
+ auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(inv.hash, i));
+ if (it_by_prev != mapOrphanTransactionsByPrev.end()) {
+ for (const auto& elem : it_by_prev->second) {
+ pfrom->orphan_work_set.insert(elem->first);
+ }
+ }
}
pfrom->nLastTXTime = GetTime();
@@ -2379,65 +2443,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
mempool.size(), mempool.DynamicMemoryUsage() / 1000);
// Recursively process any orphan transactions that depended on this one
- std::set<NodeId> setMisbehaving;
- while (!vWorkQueue.empty()) {
- auto itByPrev = mapOrphanTransactionsByPrev.find(vWorkQueue.front());
- vWorkQueue.pop_front();
- if (itByPrev == mapOrphanTransactionsByPrev.end())
- continue;
- for (auto mi = itByPrev->second.begin();
- mi != itByPrev->second.end();
- ++mi)
- {
- const CTransactionRef& porphanTx = (*mi)->second.tx;
- const CTransaction& orphanTx = *porphanTx;
- const uint256& orphanHash = orphanTx.GetHash();
- NodeId fromPeer = (*mi)->second.fromPeer;
- bool fMissingInputs2 = false;
- // Use a dummy CValidationState so someone can't setup nodes to counter-DoS based on orphan
- // resolution (that is, feeding people an invalid transaction based on LegitTxX in order to get
- // anyone relaying LegitTxX banned)
- CValidationState stateDummy;
-
-
- if (setMisbehaving.count(fromPeer))
- continue;
- if (AcceptToMemoryPool(mempool, stateDummy, porphanTx, &fMissingInputs2, &lRemovedTxn, false /* bypass_limits */, 0 /* nAbsurdFee */)) {
- LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString());
- RelayTransaction(orphanTx, connman);
- for (unsigned int i = 0; i < orphanTx.vout.size(); i++) {
- vWorkQueue.emplace_back(orphanHash, i);
- }
- vEraseQueue.push_back(orphanHash);
- }
- else if (!fMissingInputs2)
- {
- int nDos = 0;
- if (stateDummy.IsInvalid(nDos) && nDos > 0)
- {
- // Punish peer that gave us an invalid orphan tx
- Misbehaving(fromPeer, nDos);
- setMisbehaving.insert(fromPeer);
- LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s\n", orphanHash.ToString());
- }
- // Has inputs but not accepted to mempool
- // Probably non-standard or insufficient fee
- LogPrint(BCLog::MEMPOOL, " removed orphan tx %s\n", orphanHash.ToString());
- vEraseQueue.push_back(orphanHash);
- if (!orphanTx.HasWitness() && !stateDummy.CorruptionPossible()) {
- // Do not use rejection cache for witness transactions or
- // witness-stripped transactions, as they can have been malleated.
- // See https://github.com/bitcoin/bitcoin/issues/8279 for details.
- assert(recentRejects);
- recentRejects->insert(orphanHash);
- }
- }
- mempool.check(pcoinsTip.get());
- }
- }
-
- for (const uint256& hash : vEraseQueue)
- EraseOrphanTx(hash);
+ ProcessOrphanTx(connman, pfrom->orphan_work_set, lRemovedTxn);
}
else if (fMissingInputs)
{
@@ -3145,11 +3151,21 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& inter
if (!pfrom->vRecvGetData.empty())
ProcessGetData(pfrom, chainparams, connman, interruptMsgProc);
+ if (!pfrom->orphan_work_set.empty()) {
+ std::list<CTransactionRef> removed_txn;
+ LOCK2(cs_main, g_cs_orphans);
+ ProcessOrphanTx(connman, pfrom->orphan_work_set, removed_txn);
+ for (const CTransactionRef& removedTx : removed_txn) {
+ AddToCompactExtraTransactions(removedTx);
+ }
+ }
+
if (pfrom->fDisconnect)
return false;
// this maintains the order of responses
if (!pfrom->vRecvGetData.empty()) return true;
+ if (!pfrom->orphan_work_set.empty()) return true;
// Don't bother if send buffer is too full to respond anyway
if (pfrom->fPauseSend)
diff --git a/src/wallet/rpcdump.cpp b/src/wallet/rpcdump.cpp
index 5b0592e06d..bdf901dee4 100644
--- a/src/wallet/rpcdump.cpp
+++ b/src/wallet/rpcdump.cpp
@@ -346,7 +346,11 @@ UniValue importaddress(const JSONRPCRequest& request)
if (fRescan)
{
RescanWallet(*pwallet, reserver);
- pwallet->ReacceptWalletTransactions();
+ {
+ auto locked_chain = pwallet->chain().lock();
+ LOCK(pwallet->cs_wallet);
+ pwallet->ReacceptWalletTransactions(*locked_chain);
+ }
}
return NullUniValue;
@@ -529,7 +533,11 @@ UniValue importpubkey(const JSONRPCRequest& request)
if (fRescan)
{
RescanWallet(*pwallet, reserver);
- pwallet->ReacceptWalletTransactions();
+ {
+ auto locked_chain = pwallet->chain().lock();
+ LOCK(pwallet->cs_wallet);
+ pwallet->ReacceptWalletTransactions(*locked_chain);
+ }
}
return NullUniValue;
@@ -1464,7 +1472,11 @@ UniValue importmulti(const JSONRPCRequest& mainRequest)
}
if (fRescan && fRunScan && requests.size()) {
int64_t scannedTime = pwallet->RescanFromTime(nLowestTimestamp, reserver, true /* update */);
- pwallet->ReacceptWalletTransactions();
+ {
+ auto locked_chain = pwallet->chain().lock();
+ LOCK(pwallet->cs_wallet);
+ pwallet->ReacceptWalletTransactions(*locked_chain);
+ }
if (pwallet->IsAbortingRescan()) {
throw JSONRPCError(RPC_MISC_ERROR, "Rescan aborted by user.");
diff --git a/src/wallet/wallet.cpp b/src/wallet/wallet.cpp
index 388422bec8..b1b2a63166 100644
--- a/src/wallet/wallet.cpp
+++ b/src/wallet/wallet.cpp
@@ -1859,13 +1859,11 @@ CWallet::ScanResult CWallet::ScanForWalletTransactions(const uint256& start_bloc
return result;
}
-void CWallet::ReacceptWalletTransactions()
+void CWallet::ReacceptWalletTransactions(interfaces::Chain::Lock& locked_chain)
{
// If transactions aren't being broadcasted, don't let them into local mempool either
if (!fBroadcastTransactions)
return;
- auto locked_chain = chain().lock();
- LOCK(cs_wallet);
std::map<int64_t, CWalletTx*> mapSorted;
// Sort pending wallet transactions based on their initial wallet insertion order
@@ -1875,7 +1873,7 @@ void CWallet::ReacceptWalletTransactions()
CWalletTx& wtx = item.second;
assert(wtx.GetHash() == wtxid);
- int nDepth = wtx.GetDepthInMainChain(*locked_chain);
+ int nDepth = wtx.GetDepthInMainChain(locked_chain);
if (!wtx.IsCoinBase() && (nDepth == 0 && !wtx.isAbandoned())) {
mapSorted.insert(std::make_pair(wtx.nOrderPos, &wtx));
@@ -1886,7 +1884,7 @@ void CWallet::ReacceptWalletTransactions()
for (const std::pair<const int64_t, CWalletTx*>& item : mapSorted) {
CWalletTx& wtx = *(item.second);
CValidationState state;
- wtx.AcceptToMemoryPool(*locked_chain, maxTxFee, state);
+ wtx.AcceptToMemoryPool(locked_chain, maxTxFee, state);
}
}
@@ -4425,9 +4423,15 @@ std::shared_ptr<CWallet> CWallet::CreateWalletFromFile(interfaces::Chain& chain,
void CWallet::postInitProcess()
{
+ auto locked_chain = chain().lock();
+ LOCK(cs_wallet);
+
// Add wallet transactions that aren't already in a block to mempool
// Do this here as mempool requires genesis block to be loaded
- ReacceptWalletTransactions();
+ ReacceptWalletTransactions(*locked_chain);
+
+ // Update wallet transactions with current mempool transactions.
+ chain().requestMempoolTransactions([this](const CTransactionRef& tx) { TransactionAddedToMempool(tx); });
}
bool CWallet::BackupWallet(const std::string& strDest)
diff --git a/src/wallet/wallet.h b/src/wallet/wallet.h
index 2a5d6caaf8..9a4d252618 100644
--- a/src/wallet/wallet.h
+++ b/src/wallet/wallet.h
@@ -943,7 +943,7 @@ public:
};
ScanResult ScanForWalletTransactions(const uint256& first_block, const uint256& last_block, const WalletRescanReserver& reserver, bool fUpdate);
void TransactionRemovedFromMempool(const CTransactionRef &ptx) override;
- void ReacceptWalletTransactions();
+ void ReacceptWalletTransactions(interfaces::Chain::Lock& locked_chain) EXCLUSIVE_LOCKS_REQUIRED(cs_wallet);
void ResendWalletTransactions(int64_t nBestBlockTime, CConnman* connman) override EXCLUSIVE_LOCKS_REQUIRED(cs_main);
// ResendWalletTransactionsBefore may only be called if fBroadcastTransactions!
std::vector<uint256> ResendWalletTransactionsBefore(interfaces::Chain::Lock& locked_chain, int64_t nTime, CConnman* connman);
diff --git a/test/functional/wallet_balance.py b/test/functional/wallet_balance.py
index 05c97e0340..3a65b684c8 100755
--- a/test/functional/wallet_balance.py
+++ b/test/functional/wallet_balance.py
@@ -129,5 +129,17 @@ class WalletTest(BitcoinTestFramework):
# getbalance with minconf=2 will show the new balance.
assert_equal(self.nodes[1].getbalance(minconf=2), Decimal('0'))
+ # check mempool transactions count for wallet unconfirmed balance after
+ # dynamically loading the wallet.
+ before = self.nodes[1].getunconfirmedbalance()
+ dst = self.nodes[1].getnewaddress()
+ self.nodes[1].unloadwallet('')
+ self.nodes[0].sendtoaddress(dst, 0.1)
+ self.sync_all()
+ self.nodes[1].loadwallet('')
+ after = self.nodes[1].getunconfirmedbalance()
+ assert_equal(before + Decimal('0.1'), after)
+
+
if __name__ == '__main__':
WalletTest().main()
diff --git a/test/functional/wallet_resendwallettransactions.py b/test/functional/wallet_resendwallettransactions.py
index 00bf58d709..8aafa94c2e 100755
--- a/test/functional/wallet_resendwallettransactions.py
+++ b/test/functional/wallet_resendwallettransactions.py
@@ -2,31 +2,70 @@
# Copyright (c) 2017-2018 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
-"""Test resendwallettransactions RPC."""
+"""Test that the wallet resends transactions periodically."""
+from collections import defaultdict
+import time
+from test_framework.blocktools import create_block, create_coinbase
+from test_framework.messages import ToHex
+from test_framework.mininode import P2PInterface, mininode_lock
from test_framework.test_framework import BitcoinTestFramework
-from test_framework.util import assert_equal, assert_raises_rpc_error
+from test_framework.util import assert_equal, wait_until
+
+class P2PStoreTxInvs(P2PInterface):
+ def __init__(self):
+ super().__init__()
+ self.tx_invs_received = defaultdict(int)
+
+ def on_inv(self, message):
+ # Store how many times invs have been received for each tx.
+ for i in message.inv:
+ if i.type == 1:
+ # save txid
+ self.tx_invs_received[i.hash] += 1
class ResendWalletTransactionsTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 1
- self.extra_args = [['--walletbroadcast=false']]
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def run_test(self):
- # Should raise RPC_WALLET_ERROR (-4) if walletbroadcast is disabled.
- assert_raises_rpc_error(-4, "Error: Wallet transaction broadcasting is disabled with -walletbroadcast", self.nodes[0].resendwallettransactions)
+ node = self.nodes[0] # alias
+
+ node.add_p2p_connection(P2PStoreTxInvs())
+
+ self.log.info("Create a new transaction and wait until it's broadcast")
+ txid = int(node.sendtoaddress(node.getnewaddress(), 1), 16)
+
+ # Can take a few seconds due to transaction trickling
+ wait_until(lambda: node.p2p.tx_invs_received[txid] >= 1, lock=mininode_lock)
+
+ # Add a second peer since txs aren't rebroadcast to the same peer (see filterInventoryKnown)
+ node.add_p2p_connection(P2PStoreTxInvs())
+
+ self.log.info("Create a block")
+ # Create and submit a block without the transaction.
+ # Transactions are only rebroadcast if there has been a block at least five minutes
+ # after the last time we tried to broadcast. Use mocktime and give an extra minute to be sure.
+ block_time = int(time.time()) + 6 * 60
+ node.setmocktime(block_time)
+ block = create_block(int(node.getbestblockhash(), 16), create_coinbase(node.getblockchaininfo()['blocks']), block_time)
+ block.nVersion = 3
+ block.rehash()
+ block.solve()
+ node.submitblock(ToHex(block))
- # Should return an empty array if there aren't unconfirmed wallet transactions.
- self.stop_node(0)
- self.start_node(0, extra_args=[])
- assert_equal(self.nodes[0].resendwallettransactions(), [])
+ # Transaction should not be rebroadcast
+ node.p2ps[1].sync_with_ping()
+ assert_equal(node.p2ps[1].tx_invs_received[txid], 0)
- # Should return an array with the unconfirmed wallet transaction.
- txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1)
- assert_equal(self.nodes[0].resendwallettransactions(), [txid])
+ self.log.info("Transaction should be rebroadcast after 30 minutes")
+ # Use mocktime and give an extra 5 minutes to be sure.
+ rebroadcast_time = int(time.time()) + 41 * 60
+ node.setmocktime(rebroadcast_time)
+ wait_until(lambda: node.p2ps[1].tx_invs_received[txid] >= 1, lock=mininode_lock)
if __name__ == '__main__':
ResendWalletTransactionsTest().main()