aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWladimir J. van der Laan <laanwj@protonmail.com>2020-09-23 13:39:16 +0200
committerWladimir J. van der Laan <laanwj@protonmail.com>2020-09-23 13:55:24 +0200
commit9e217f5a6f08c50aff9975aa4c079e2aab2fe371 (patch)
treec5e80b46876a8963fced24f20b33313642a83f8f
parent82198938255783d2593f42a652efc9921550d86d (diff)
parent759d94e70f6844443106404882c7b105f3a4dba7 (diff)
Merge #19572: ZMQ: Create "sequence" notifier, enabling client-side mempool tracking
759d94e70f6844443106404882c7b105f3a4dba7 Update zmq notification documentation and sample consumer (Gregory Sanders) 68c3c7e1bdd00bbe7d70592a8eb39520fa3f87f1 Add functional tests for zmq sequence topic and mempool sequence logic (Gregory Sanders) e76fc2b84d065c9d06010d0a10b316f1f9d36fb9 Add 'sequence' zmq publisher to track all block (dis)connects, mempool deltas (Gregory Sanders) 1b615e61bfc464f215a1b48e6e27d1e8fc16b2d1 zmq test: Actually make reorg occur (Gregory Sanders) Pull request description: This PR creates a new ZMQ notifier that gives a "total hash history" of block (dis)connection, mempool addition/substraction, all in one pipeline. It also exposes a "mempool sequence number" to both this notifier and `getrawmempool` results, which allows the consumer to use the results together without confusion about ordering of results and without excessive `getrawmempool` polling. See the functional test `interfaces_zmq.py::test_mempool_sync` which shows the proposed user flow for the client-side tracking of mempool contents and confirmations. Inspired by https://github.com/bitcoin/bitcoin/pull/19462#issuecomment-656140421 Alternative to https://github.com/bitcoin/bitcoin/pull/19462 due to noted deficiencies in current zmq notification streams. Also fixes a legacy zmq test that didn't actually trigger a reorg because of identical blocks being generated on each side of the split(oops) ACKs for top commit: laanwj: Code review ACK 759d94e70f6844443106404882c7b105f3a4dba7 Tree-SHA512: 9daf0d7d996190f3a68ff40340a687519323d7a6c51dcb26be457fbc013217ea7b62fbd0700b74b654433d2e370704feb61e5584399290692464fcfcb72ce3b7
-rwxr-xr-xcontrib/zmq/zmq_sub.py19
-rw-r--r--doc/zmq.md29
-rw-r--r--src/init.cpp4
-rw-r--r--src/interfaces/chain.cpp10
-rw-r--r--src/interfaces/chain.h4
-rw-r--r--src/rpc/blockchain.cpp41
-rw-r--r--src/rpc/blockchain.h2
-rw-r--r--src/rpc/client.cpp1
-rw-r--r--src/txmempool.cpp6
-rw-r--r--src/txmempool.h14
-rw-r--r--src/validation.cpp2
-rw-r--r--src/validationinterface.cpp12
-rw-r--r--src/validationinterface.h9
-rw-r--r--src/wallet/wallet.cpp6
-rw-r--r--src/wallet/wallet.h4
-rw-r--r--src/zmq/zmqabstractnotifier.cpp20
-rw-r--r--src/zmq/zmqabstractnotifier.h10
-rw-r--r--src/zmq/zmqnotificationinterface.cpp41
-rw-r--r--src/zmq/zmqnotificationinterface.h3
-rw-r--r--src/zmq/zmqpublishnotifier.cpp49
-rw-r--r--src/zmq/zmqpublishnotifier.h9
-rwxr-xr-xtest/functional/interface_zmq.py320
22 files changed, 552 insertions, 63 deletions
diff --git a/contrib/zmq/zmq_sub.py b/contrib/zmq/zmq_sub.py
index 06893407f5..8b8503331d 100755
--- a/contrib/zmq/zmq_sub.py
+++ b/contrib/zmq/zmq_sub.py
@@ -11,7 +11,8 @@
-zmqpubrawtx=tcp://127.0.0.1:28332 \
-zmqpubrawblock=tcp://127.0.0.1:28332 \
-zmqpubhashtx=tcp://127.0.0.1:28332 \
- -zmqpubhashblock=tcp://127.0.0.1:28332
+ -zmqpubhashblock=tcp://127.0.0.1:28332 \
+ -zmqpubsequence=tcp://127.0.0.1:28332
We use the asyncio library here. `self.handle()` installs itself as a
future at the end of the function. Since it never returns with the event
@@ -47,16 +48,14 @@ class ZMQHandler():
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtx")
+ self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "sequence")
self.zmqSubSocket.connect("tcp://127.0.0.1:%i" % port)
async def handle(self) :
- msg = await self.zmqSubSocket.recv_multipart()
- topic = msg[0]
- body = msg[1]
+ topic, body, seq = await self.zmqSubSocket.recv_multipart()
sequence = "Unknown"
- if len(msg[-1]) == 4:
- msgSequence = struct.unpack('<I', msg[-1])[-1]
- sequence = str(msgSequence)
+ if len(seq) == 4:
+ sequence = str(struct.unpack('<I', seq)[-1])
if topic == b"hashblock":
print('- HASH BLOCK ('+sequence+') -')
print(binascii.hexlify(body))
@@ -69,6 +68,12 @@ class ZMQHandler():
elif topic == b"rawtx":
print('- RAW TX ('+sequence+') -')
print(binascii.hexlify(body))
+ elif topic == b"sequence":
+ hash = binascii.hexlify(body[:32])
+ label = chr(body[32])
+ mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
+ print('- SEQUENCE ('+sequence+') -')
+ print(hash, label, mempool_sequence)
# schedule ourselves to receive the next message
asyncio.ensure_future(self.handle())
diff --git a/doc/zmq.md b/doc/zmq.md
index 835dea23f2..f003c90d3a 100644
--- a/doc/zmq.md
+++ b/doc/zmq.md
@@ -63,6 +63,7 @@ Currently, the following notifications are supported:
-zmqpubhashblock=address
-zmqpubrawblock=address
-zmqpubrawtx=address
+ -zmqpubsequence=address
The socket type is PUB and the address must be a valid ZeroMQ socket
address. The same address can be used in more than one notification.
@@ -74,6 +75,7 @@ The option to set the PUB socket's outbound message high water mark
-zmqpubhashblockhwm=n
-zmqpubrawblockhwm=n
-zmqpubrawtxhwm=n
+ -zmqpubsequencehwm=address
The high water mark value must be an integer greater than or equal to 0.
@@ -87,7 +89,15 @@ Each PUB notification has a topic and body, where the header
corresponds to the notification type. For instance, for the
notification `-zmqpubhashtx` the topic is `hashtx` (no null
terminator) and the body is the transaction hash (32
-bytes).
+bytes) for all but `sequence` topic. For `sequence`, the body
+is structured as the following based on the type of message:
+
+ <32-byte hash>C : Blockhash connected
+ <32-byte hash>D : Blockhash disconnected
+ <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason
+ <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool
+
+Where the 8-byte uints correspond to the mempool sequence number.
These options can also be provided in bitcoin.conf.
@@ -124,13 +134,20 @@ No authentication or authorization is done on connecting clients; it
is assumed that the ZeroMQ port is exposed only to trusted entities,
using other means such as firewalling.
-Note that when the block chain tip changes, a reorganisation may occur
-and just the tip will be notified. It is up to the subscriber to
-retrieve the chain from the last known block to the new tip. Also note
-that no notification occurs if the tip was in the active chain - this
-is the case after calling invalidateblock RPC.
+Note that for `*block` topics, when the block chain tip changes,
+a reorganisation may occur and just the tip will be notified.
+It is up to the subscriber to retrieve the chain from the last known
+block to the new tip. Also note that no notification will occur if the tip
+was in the active chain--as would be the case after calling invalidateblock RPC.
+In contrast, the `sequence` topic publishes all block connections and
+disconnections.
There are several possibilities that ZMQ notification can get lost
during transmission depending on the communication type you are
using. Bitcoind appends an up-counting sequence number to each
notification which allows listeners to detect lost notifications.
+
+The `sequence` topic refers specifically to the mempool sequence
+number, which is also published along with all mempool events. This
+is a different sequence value than in ZMQ itself in order to allow a total
+ordering of mempool events to be constructed.
diff --git a/src/init.cpp b/src/init.cpp
index 023aa9aba5..132acbfc81 100644
--- a/src/init.cpp
+++ b/src/init.cpp
@@ -488,19 +488,23 @@ void SetupServerArgs(NodeContext& node)
argsman.AddArg("-zmqpubhashtx=<address>", "Enable publish hash transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawblock=<address>", "Enable publish raw block in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
+ argsman.AddArg("-zmqpubsequence=<address>", "Enable publish hash block and tx sequence in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashblockhwm=<n>", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashtxhwm=<n>", strprintf("Set publish hash transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawblockhwm=<n>", strprintf("Set publish raw block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxhwm=<n>", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
+ argsman.AddArg("-zmqpubsequencehwm=<n>", strprintf("Set publish hash sequence message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
#else
hidden_args.emplace_back("-zmqpubhashblock=<address>");
hidden_args.emplace_back("-zmqpubhashtx=<address>");
hidden_args.emplace_back("-zmqpubrawblock=<address>");
hidden_args.emplace_back("-zmqpubrawtx=<address>");
+ hidden_args.emplace_back("-zmqpubsequence=<n>");
hidden_args.emplace_back("-zmqpubhashblockhwm=<n>");
hidden_args.emplace_back("-zmqpubhashtxhwm=<n>");
hidden_args.emplace_back("-zmqpubrawblockhwm=<n>");
hidden_args.emplace_back("-zmqpubrawtxhwm=<n>");
+ hidden_args.emplace_back("-zmqpubsequencehwm=<n>");
#endif
argsman.AddArg("-checkblocks=<n>", strprintf("How many blocks to check at startup (default: %u, 0 = all)", DEFAULT_CHECKBLOCKS), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
diff --git a/src/interfaces/chain.cpp b/src/interfaces/chain.cpp
index 13d885a20d..4c5ebe66fc 100644
--- a/src/interfaces/chain.cpp
+++ b/src/interfaces/chain.cpp
@@ -59,13 +59,13 @@ public:
explicit NotificationsProxy(std::shared_ptr<Chain::Notifications> notifications)
: m_notifications(std::move(notifications)) {}
virtual ~NotificationsProxy() = default;
- void TransactionAddedToMempool(const CTransactionRef& tx) override
+ void TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) override
{
- m_notifications->transactionAddedToMempool(tx);
+ m_notifications->transactionAddedToMempool(tx, mempool_sequence);
}
- void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override
+ void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override
{
- m_notifications->transactionRemovedFromMempool(tx, reason);
+ m_notifications->transactionRemovedFromMempool(tx, reason, mempool_sequence);
}
void BlockConnected(const std::shared_ptr<const CBlock>& block, const CBlockIndex* index) override
{
@@ -405,7 +405,7 @@ public:
if (!m_node.mempool) return;
LOCK2(::cs_main, m_node.mempool->cs);
for (const CTxMemPoolEntry& entry : m_node.mempool->mapTx) {
- notifications.transactionAddedToMempool(entry.GetSharedTx());
+ notifications.transactionAddedToMempool(entry.GetSharedTx(), 0 /* mempool_sequence */);
}
}
NodeContext& m_node;
diff --git a/src/interfaces/chain.h b/src/interfaces/chain.h
index 6e50ccb27a..85d09be0f3 100644
--- a/src/interfaces/chain.h
+++ b/src/interfaces/chain.h
@@ -242,8 +242,8 @@ public:
{
public:
virtual ~Notifications() {}
- virtual void transactionAddedToMempool(const CTransactionRef& tx) {}
- virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {}
+ virtual void transactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) {}
+ virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {}
virtual void blockConnected(const CBlock& block, int height) {}
virtual void blockDisconnected(const CBlock& block, int height) {}
virtual void updatedBlockTip() {}
diff --git a/src/rpc/blockchain.cpp b/src/rpc/blockchain.cpp
index 9b464bee72..0bb7342db0 100644
--- a/src/rpc/blockchain.cpp
+++ b/src/rpc/blockchain.cpp
@@ -500,9 +500,12 @@ static void entryToJSON(const CTxMemPool& pool, UniValue& info, const CTxMemPool
info.pushKV("unbroadcast", pool.IsUnbroadcastTx(tx.GetHash()));
}
-UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose)
+UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose, bool include_mempool_sequence)
{
if (verbose) {
+ if (include_mempool_sequence) {
+ throw JSONRPCError(RPC_INVALID_PARAMETER, "Verbose results cannot contain mempool sequence values.");
+ }
LOCK(pool.cs);
UniValue o(UniValue::VOBJ);
for (const CTxMemPoolEntry& e : pool.mapTx) {
@@ -516,14 +519,25 @@ UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose)
}
return o;
} else {
+ uint64_t mempool_sequence;
std::vector<uint256> vtxid;
- pool.queryHashes(vtxid);
-
+ {
+ LOCK(pool.cs);
+ pool.queryHashes(vtxid);
+ mempool_sequence = pool.GetSequence();
+ }
UniValue a(UniValue::VARR);
for (const uint256& hash : vtxid)
a.push_back(hash.ToString());
- return a;
+ if (!include_mempool_sequence) {
+ return a;
+ } else {
+ UniValue o(UniValue::VOBJ);
+ o.pushKV("txids", a);
+ o.pushKV("mempool_sequence", mempool_sequence);
+ return o;
+ }
}
}
@@ -534,6 +548,7 @@ static RPCHelpMan getrawmempool()
"\nHint: use getmempoolentry to fetch a specific transaction from the mempool.\n",
{
{"verbose", RPCArg::Type::BOOL, /* default */ "false", "True for a json object, false for array of transaction ids"},
+ {"mempool_sequence", RPCArg::Type::BOOL, /* default */ "false", "If verbose=false, returns a json object with transaction list and mempool sequence number attached."},
},
{
RPCResult{"for verbose = false",
@@ -546,6 +561,15 @@ static RPCHelpMan getrawmempool()
{
{RPCResult::Type::OBJ, "transactionid", "", MempoolEntryDescription()},
}},
+ RPCResult{"for verbose = false and mempool_sequence = true",
+ RPCResult::Type::OBJ, "", "",
+ {
+ {RPCResult::Type::ARR, "txids", "",
+ {
+ {RPCResult::Type::STR_HEX, "", "The transaction id"},
+ }},
+ {RPCResult::Type::NUM, "mempool_sequence", "The mempool sequence value."},
+ }},
},
RPCExamples{
HelpExampleCli("getrawmempool", "true")
@@ -557,7 +581,12 @@ static RPCHelpMan getrawmempool()
if (!request.params[0].isNull())
fVerbose = request.params[0].get_bool();
- return MempoolToJSON(EnsureMemPool(request.context), fVerbose);
+ bool include_mempool_sequence = false;
+ if (!request.params[1].isNull()) {
+ include_mempool_sequence = request.params[1].get_bool();
+ }
+
+ return MempoolToJSON(EnsureMemPool(request.context), fVerbose, include_mempool_sequence);
},
};
}
@@ -2451,7 +2480,7 @@ static const CRPCCommand commands[] =
{ "blockchain", "getmempooldescendants", &getmempooldescendants, {"txid","verbose"} },
{ "blockchain", "getmempoolentry", &getmempoolentry, {"txid"} },
{ "blockchain", "getmempoolinfo", &getmempoolinfo, {} },
- { "blockchain", "getrawmempool", &getrawmempool, {"verbose"} },
+ { "blockchain", "getrawmempool", &getrawmempool, {"verbose", "mempool_sequence"} },
{ "blockchain", "gettxout", &gettxout, {"txid","n","include_mempool"} },
{ "blockchain", "gettxoutsetinfo", &gettxoutsetinfo, {"hash_type"} },
{ "blockchain", "pruneblockchain", &pruneblockchain, {"height"} },
diff --git a/src/rpc/blockchain.h b/src/rpc/blockchain.h
index 5c9a43b13e..5b362bf211 100644
--- a/src/rpc/blockchain.h
+++ b/src/rpc/blockchain.h
@@ -43,7 +43,7 @@ UniValue blockToJSON(const CBlock& block, const CBlockIndex* tip, const CBlockIn
UniValue MempoolInfoToJSON(const CTxMemPool& pool);
/** Mempool to JSON */
-UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose = false);
+UniValue MempoolToJSON(const CTxMemPool& pool, bool verbose = false, bool include_mempool_sequence = false);
/** Block header to JSON */
UniValue blockheaderToJSON(const CBlockIndex* tip, const CBlockIndex* blockindex) LOCKS_EXCLUDED(cs_main);
diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp
index 6ef3294132..3c432464f2 100644
--- a/src/rpc/client.cpp
+++ b/src/rpc/client.cpp
@@ -142,6 +142,7 @@ static const CRPCConvertParam vRPCConvertParams[] =
{ "pruneblockchain", 0, "height" },
{ "keypoolrefill", 0, "newsize" },
{ "getrawmempool", 0, "verbose" },
+ { "getrawmempool", 1, "mempool_sequence" },
{ "estimatesmartfee", 0, "conf_target" },
{ "estimaterawfee", 0, "conf_target" },
{ "estimaterawfee", 1, "threshold" },
diff --git a/src/txmempool.cpp b/src/txmempool.cpp
index 6a525c97db..0c2b731967 100644
--- a/src/txmempool.cpp
+++ b/src/txmempool.cpp
@@ -409,12 +409,16 @@ void CTxMemPool::addUnchecked(const CTxMemPoolEntry &entry, setEntries &setAnces
void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason)
{
+ // We increment mempool sequence value no matter removal reason
+ // even if not directly reported below.
+ uint64_t mempool_sequence = GetAndIncrementSequence();
+
if (reason != MemPoolRemovalReason::BLOCK) {
// Notify clients that a transaction has been removed from the mempool
// for any reason except being included in a block. Clients interested
// in transactions included in blocks can subscribe to the BlockConnected
// notification.
- GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason);
+ GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason, mempool_sequence);
}
const uint256 hash = it->GetTx().GetHash();
diff --git a/src/txmempool.h b/src/txmempool.h
index 664fb5986a..f513f14af6 100644
--- a/src/txmempool.h
+++ b/src/txmempool.h
@@ -501,6 +501,11 @@ private:
mutable uint64_t m_epoch;
mutable bool m_has_epoch_guard;
+ // In-memory counter for external mempool tracking purposes.
+ // This number is incremented once every time a transaction
+ // is added or removed from the mempool for any reason.
+ mutable uint64_t m_sequence_number{1};
+
void trackPackageRemoved(const CFeeRate& rate) EXCLUSIVE_LOCKS_REQUIRED(cs);
bool m_is_loaded GUARDED_BY(cs){false};
@@ -776,6 +781,15 @@ public:
return m_unbroadcast_txids.count(txid) != 0;
}
+ /** Guards this internal counter for external reporting */
+ uint64_t GetAndIncrementSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) {
+ return m_sequence_number++;
+ }
+
+ uint64_t GetSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) {
+ return m_sequence_number;
+ }
+
private:
/** UpdateForDescendants is used by UpdateTransactionsFromBlock to update
* the descendants for a single transaction that has been added to the
diff --git a/src/validation.cpp b/src/validation.cpp
index a96913e3a0..7020b59cb8 100644
--- a/src/validation.cpp
+++ b/src/validation.cpp
@@ -1058,7 +1058,7 @@ bool MemPoolAccept::AcceptSingleTransaction(const CTransactionRef& ptx, ATMPArgs
if (!Finalize(args, workspace)) return false;
- GetMainSignals().TransactionAddedToMempool(ptx);
+ GetMainSignals().TransactionAddedToMempool(ptx, m_pool.GetAndIncrementSequence());
return true;
}
diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp
index 3dfbcc581c..1e07ff23ae 100644
--- a/src/validationinterface.cpp
+++ b/src/validationinterface.cpp
@@ -199,18 +199,18 @@ void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockInd
fInitialDownload);
}
-void CMainSignals::TransactionAddedToMempool(const CTransactionRef& tx) {
- auto event = [tx, this] {
- m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx); });
+void CMainSignals::TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) {
+ auto event = [tx, mempool_sequence, this] {
+ m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx, mempool_sequence); });
};
ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s wtxid=%s", __func__,
tx->GetHash().ToString(),
tx->GetWitnessHash().ToString());
}
-void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {
- auto event = [tx, reason, this] {
- m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason); });
+void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {
+ auto event = [tx, reason, mempool_sequence, this] {
+ m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason, mempool_sequence); });
};
ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s wtxid=%s", __func__,
tx->GetHash().ToString(),
diff --git a/src/validationinterface.h b/src/validationinterface.h
index e96f2883fc..7c3ce00fbc 100644
--- a/src/validationinterface.h
+++ b/src/validationinterface.h
@@ -97,7 +97,8 @@ protected:
*
* Called on a background thread.
*/
- virtual void TransactionAddedToMempool(const CTransactionRef& tx) {}
+ virtual void TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) {}
+
/**
* Notifies listeners of a transaction leaving mempool.
*
@@ -130,7 +131,7 @@ protected:
*
* Called on a background thread.
*/
- virtual void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {}
+ virtual void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {}
/**
* Notifies listeners of a block being connected.
* Provides a vector of transactions evicted from the mempool as a result.
@@ -197,8 +198,8 @@ public:
void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload);
- void TransactionAddedToMempool(const CTransactionRef&);
- void TransactionRemovedFromMempool(const CTransactionRef&, MemPoolRemovalReason);
+ void TransactionAddedToMempool(const CTransactionRef&, uint64_t mempool_sequence);
+ void TransactionRemovedFromMempool(const CTransactionRef&, MemPoolRemovalReason, uint64_t mempool_sequence);
void BlockConnected(const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex);
void BlockDisconnected(const std::shared_ptr<const CBlock> &, const CBlockIndex* pindex);
void ChainStateFlushed(const CBlockLocator &);
diff --git a/src/wallet/wallet.cpp b/src/wallet/wallet.cpp
index 73e11a5b52..66857dbb39 100644
--- a/src/wallet/wallet.cpp
+++ b/src/wallet/wallet.cpp
@@ -1177,7 +1177,7 @@ void CWallet::SyncTransaction(const CTransactionRef& ptx, CWalletTx::Confirmatio
MarkInputsDirty(ptx);
}
-void CWallet::transactionAddedToMempool(const CTransactionRef& tx) {
+void CWallet::transactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) {
LOCK(cs_wallet);
SyncTransaction(tx, {CWalletTx::Status::UNCONFIRMED, /* block height */ 0, /* block hash */ {}, /* index */ 0});
@@ -1187,7 +1187,7 @@ void CWallet::transactionAddedToMempool(const CTransactionRef& tx) {
}
}
-void CWallet::transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {
+void CWallet::transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {
LOCK(cs_wallet);
auto it = mapWallet.find(tx->GetHash());
if (it != mapWallet.end()) {
@@ -1234,7 +1234,7 @@ void CWallet::blockConnected(const CBlock& block, int height)
m_last_block_processed = block_hash;
for (size_t index = 0; index < block.vtx.size(); index++) {
SyncTransaction(block.vtx[index], {CWalletTx::Status::CONFIRMED, height, block_hash, (int)index});
- transactionRemovedFromMempool(block.vtx[index], MemPoolRemovalReason::BLOCK);
+ transactionRemovedFromMempool(block.vtx[index], MemPoolRemovalReason::BLOCK, 0 /* mempool_sequence */);
}
}
diff --git a/src/wallet/wallet.h b/src/wallet/wallet.h
index c54480612a..f15712dd0e 100644
--- a/src/wallet/wallet.h
+++ b/src/wallet/wallet.h
@@ -900,7 +900,7 @@ public:
CWalletTx* AddToWallet(CTransactionRef tx, const CWalletTx::Confirmation& confirm, const UpdateWalletTxFn& update_wtx=nullptr, bool fFlushOnClose=true);
bool LoadToWallet(const uint256& hash, const UpdateWalletTxFn& fill_wtx) EXCLUSIVE_LOCKS_REQUIRED(cs_wallet);
- void transactionAddedToMempool(const CTransactionRef& tx) override;
+ void transactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) override;
void blockConnected(const CBlock& block, int height) override;
void blockDisconnected(const CBlock& block, int height) override;
void updatedBlockTip() override;
@@ -922,7 +922,7 @@ public:
uint256 last_failed_block;
};
ScanResult ScanForWalletTransactions(const uint256& start_block, int start_height, Optional<int> max_height, const WalletRescanReserver& reserver, bool fUpdate);
- void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override;
+ void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override;
void ReacceptWalletTransactions() EXCLUSIVE_LOCKS_REQUIRED(cs_wallet);
void ResendWalletTransactions();
struct Balance {
diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp
index 0d0428f3c0..3938f6fd2c 100644
--- a/src/zmq/zmqabstractnotifier.cpp
+++ b/src/zmq/zmqabstractnotifier.cpp
@@ -22,3 +22,23 @@ bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/
{
return true;
}
+
+bool CZMQAbstractNotifier::NotifyBlockConnect(const CBlockIndex * /*CBlockIndex*/)
+{
+ return true;
+}
+
+bool CZMQAbstractNotifier::NotifyBlockDisconnect(const CBlockIndex * /*CBlockIndex*/)
+{
+ return true;
+}
+
+bool CZMQAbstractNotifier::NotifyTransactionAcceptance(const CTransaction &/*transaction*/, uint64_t mempool_sequence)
+{
+ return true;
+}
+
+bool CZMQAbstractNotifier::NotifyTransactionRemoval(const CTransaction &/*transaction*/, uint64_t mempool_sequence)
+{
+ return true;
+}
diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h
index 34d7e5ef03..dddba8d6b6 100644
--- a/src/zmq/zmqabstractnotifier.h
+++ b/src/zmq/zmqabstractnotifier.h
@@ -44,7 +44,17 @@ public:
virtual bool Initialize(void *pcontext) = 0;
virtual void Shutdown() = 0;
+ // Notifies of ConnectTip result, i.e., new active tip only
virtual bool NotifyBlock(const CBlockIndex *pindex);
+ // Notifies of every block connection
+ virtual bool NotifyBlockConnect(const CBlockIndex *pindex);
+ // Notifies of every block disconnection
+ virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex);
+ // Notifies of every mempool acceptance
+ virtual bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence);
+ // Notifies of every mempool removal, except inclusion in blocks
+ virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence);
+ // Notifies of transactions added to mempool or appearing in blocks
virtual bool NotifyTransaction(const CTransaction &transaction);
protected:
diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp
index a22772baed..a7e9a34269 100644
--- a/src/zmq/zmqnotificationinterface.cpp
+++ b/src/zmq/zmqnotificationinterface.cpp
@@ -36,6 +36,7 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create()
factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
+ factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
for (const auto& entry : factories)
@@ -140,31 +141,53 @@ void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, co
});
}
-void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx)
+void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, uint64_t mempool_sequence)
{
- // Used by BlockConnected and BlockDisconnected as well, because they're
- // all the same external callback.
const CTransaction& tx = *ptx;
- TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
- return notifier->NotifyTransaction(tx);
+ TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
+ return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
+ });
+}
+
+void CZMQNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, uint64_t mempool_sequence)
+{
+ // Called for all non-block inclusion reasons
+ const CTransaction& tx = *ptx;
+
+ TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
+ return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
});
}
void CZMQNotificationInterface::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
{
for (const CTransactionRef& ptx : pblock->vtx) {
- // Do a normal notify for each transaction added in the block
- TransactionAddedToMempool(ptx);
+ const CTransaction& tx = *ptx;
+ TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
+ return notifier->NotifyTransaction(tx);
+ });
}
+
+ // Next we notify BlockConnect listeners for *all* blocks
+ TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) {
+ return notifier->NotifyBlockConnect(pindexConnected);
+ });
}
void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
{
for (const CTransactionRef& ptx : pblock->vtx) {
- // Do a normal notify for each transaction removed in block disconnection
- TransactionAddedToMempool(ptx);
+ const CTransaction& tx = *ptx;
+ TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
+ return notifier->NotifyTransaction(tx);
+ });
}
+
+ // Next we notify BlockDisconnect listeners for *all* blocks
+ TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) {
+ return notifier->NotifyBlockDisconnect(pindexDisconnected);
+ });
}
CZMQNotificationInterface* g_zmq_notification_interface = nullptr;
diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h
index 0686960ed4..788a383517 100644
--- a/src/zmq/zmqnotificationinterface.h
+++ b/src/zmq/zmqnotificationinterface.h
@@ -26,7 +26,8 @@ protected:
void Shutdown();
// CValidationInterface
- void TransactionAddedToMempool(const CTransactionRef& tx) override;
+ void TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) override;
+ void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override;
void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected) override;
void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected) override;
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp
index d4d21b05ba..a0e7a0a600 100644
--- a/src/zmq/zmqpublishnotifier.cpp
+++ b/src/zmq/zmqpublishnotifier.cpp
@@ -26,6 +26,7 @@ static const char *MSG_HASHBLOCK = "hashblock";
static const char *MSG_HASHTX = "hashtx";
static const char *MSG_RAWBLOCK = "rawblock";
static const char *MSG_RAWTX = "rawtx";
+static const char *MSG_SEQUENCE = "sequence";
// Internal function to send multipart message
static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
@@ -225,3 +226,51 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr
ss << transaction;
return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
}
+
+
+// TODO: Dedup this code to take label char, log string
+bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex)
+{
+ uint256 hash = pindex->GetBlockHash();
+ LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s\n", hash.GetHex());
+ char data[sizeof(uint256)+1];
+ for (unsigned int i = 0; i < sizeof(uint256); i++)
+ data[sizeof(uint256) - 1 - i] = hash.begin()[i];
+ data[sizeof(data) - 1] = 'C'; // Block (C)onnect
+ return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
+}
+
+bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pindex)
+{
+ uint256 hash = pindex->GetBlockHash();
+ LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s\n", hash.GetHex());
+ char data[sizeof(uint256)+1];
+ for (unsigned int i = 0; i < sizeof(uint256); i++)
+ data[sizeof(uint256) - 1 - i] = hash.begin()[i];
+ data[sizeof(data) - 1] = 'D'; // Block (D)isconnect
+ return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
+}
+
+bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
+{
+ uint256 hash = transaction.GetHash();
+ LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s\n", hash.GetHex());
+ unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1];
+ for (unsigned int i = 0; i < sizeof(uint256); i++)
+ data[sizeof(uint256) - 1 - i] = hash.begin()[i];
+ data[sizeof(uint256)] = 'A'; // Mempool (A)cceptance
+ WriteLE64(data+sizeof(uint256)+1, mempool_sequence);
+ return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
+}
+
+bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
+{
+ uint256 hash = transaction.GetHash();
+ LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s\n", hash.GetHex());
+ unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1];
+ for (unsigned int i = 0; i < sizeof(uint256); i++)
+ data[sizeof(uint256) - 1 - i] = hash.begin()[i];
+ data[sizeof(uint256)] = 'R'; // Mempool (R)emoval
+ WriteLE64(data+sizeof(uint256)+1, mempool_sequence);
+ return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
+}
diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h
index eb9ae881be..f13ed6f537 100644
--- a/src/zmq/zmqpublishnotifier.h
+++ b/src/zmq/zmqpublishnotifier.h
@@ -52,4 +52,13 @@ public:
bool NotifyTransaction(const CTransaction &transaction) override;
};
+class CZMQPublishSequenceNotifier : public CZMQAbstractPublishNotifier
+{
+public:
+ bool NotifyBlockConnect(const CBlockIndex *pindex) override;
+ bool NotifyBlockDisconnect(const CBlockIndex *pindex) override;
+ bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) override;
+ bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) override;
+};
+
#endif // BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H
diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py
index ef4780cacb..17032a3b83 100755
--- a/test/functional/interface_zmq.py
+++ b/test/functional/interface_zmq.py
@@ -5,13 +5,24 @@
"""Test the ZMQ notification interface."""
import struct
-from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE
+from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE, ADDRESS_BCRT1_P2WSH_OP_TRUE
+from test_framework.blocktools import create_block, create_coinbase, add_witness_commitment
from test_framework.test_framework import BitcoinTestFramework
-from test_framework.messages import CTransaction, hash256
-from test_framework.util import assert_equal, connect_nodes
+from test_framework.messages import CTransaction, hash256, FromHex
+from test_framework.util import (
+ assert_equal,
+ connect_nodes,
+ assert_raises_rpc_error,
+)
from io import BytesIO
from time import sleep
+# Test may be skipped and not have zmq installed
+try:
+ import zmq
+except ImportError:
+ pass
+
def hash256_reversed(byte_str):
return hash256(byte_str)[::-1]
@@ -21,7 +32,6 @@ class ZMQSubscriber:
self.socket = socket
self.topic = topic
- import zmq
self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)
def receive(self):
@@ -33,6 +43,22 @@ class ZMQSubscriber:
self.sequence += 1
return body
+ def receive_sequence(self):
+ topic, body, seq = self.socket.recv_multipart()
+ # Topic should match the subscriber topic.
+ assert_equal(topic, self.topic)
+ # Sequence should be incremental.
+ assert_equal(struct.unpack('<I', seq)[-1], self.sequence)
+ self.sequence += 1
+ hash = body[:32].hex()
+ label = chr(body[32])
+ mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
+ if mempool_sequence is not None:
+ assert label == "A" or label == "R"
+ else:
+ assert label == "D" or label == "C"
+ return (hash, label, mempool_sequence)
+
class ZMQTest (BitcoinTestFramework):
def set_test_params(self):
@@ -43,10 +69,11 @@ class ZMQTest (BitcoinTestFramework):
self.skip_if_no_bitcoind_zmq()
def run_test(self):
- import zmq
self.ctx = zmq.Context()
try:
self.test_basic()
+ self.test_sequence()
+ self.test_mempool_sync()
self.test_reorg()
finally:
# Destroy the ZMQ context.
@@ -54,7 +81,6 @@ class ZMQTest (BitcoinTestFramework):
self.ctx.destroy(linger=None)
def test_basic(self):
- import zmq
# Invalid zmq arguments don't take down the node, see #17185.
self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])
@@ -146,7 +172,6 @@ class ZMQTest (BitcoinTestFramework):
self.log.info("Skipping reorg test because wallet is disabled")
return
- import zmq
address = 'tcp://127.0.0.1:28333'
services = [b"hashblock", b"hashtx"]
@@ -177,8 +202,8 @@ class ZMQTest (BitcoinTestFramework):
assert_equal(hashtx.receive().hex(), payment_txid)
assert_equal(hashtx.receive().hex(), disconnect_cb)
- # Generate 2 blocks in nodes[1]
- connect_blocks = self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_UNSPENDABLE)
+ # Generate 2 blocks in nodes[1] to a different address to ensure split
+ connect_blocks = self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_P2WSH_OP_TRUE)
# nodes[0] will reorg chain after connecting back nodes[1]
connect_nodes(self.nodes[0], 1)
@@ -204,5 +229,282 @@ class ZMQTest (BitcoinTestFramework):
# And the current tip
assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[0])["tx"][0])
+ def test_sequence(self):
+ """
+ Sequence zmq notifications give every blockhash and txhash in order
+ of processing, regardless of IBD, re-orgs, etc.
+ Format of messages:
+ <32-byte hash>C : Blockhash connected
+ <32-byte hash>D : Blockhash disconnected
+ <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason
+ <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool
+ """
+ self.log.info("Testing 'sequence' publisher")
+ address = 'tcp://127.0.0.1:28333'
+ socket = self.ctx.socket(zmq.SUB)
+ socket.set(zmq.RCVTIMEO, 60000)
+ seq = ZMQSubscriber(socket, b'sequence')
+
+ self.restart_node(0, ['-zmqpub%s=%s' % (seq.topic.decode(), address)])
+ socket.connect(address)
+ # Relax so that the subscriber is ready before publishing zmq messages
+ sleep(0.2)
+
+ # Mempool sequence number starts at 1
+ seq_num = 1
+
+ # Generate 1 block in nodes[0] and receive all notifications
+ dc_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0]
+
+ # Note: We are not notified of any block transactions, coinbase or mined
+ assert_equal((self.nodes[0].getbestblockhash(), "C", None), seq.receive_sequence())
+
+ # Generate 2 blocks in nodes[1] to a different address to ensure a chain split
+ self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_P2WSH_OP_TRUE)
+
+ # nodes[0] will reorg chain after connecting back nodes[1]
+ connect_nodes(self.nodes[0], 1)
+
+ # Then we receive all block (dis)connect notifications for the 2 block reorg
+ assert_equal((dc_block, "D", None), seq.receive_sequence())
+ block_count = self.nodes[1].getblockcount()
+ assert_equal((self.nodes[1].getblockhash(block_count-1), "C", None), seq.receive_sequence())
+ assert_equal((self.nodes[1].getblockhash(block_count), "C", None), seq.receive_sequence())
+
+ # Rest of test requires wallet functionality
+ if self.is_wallet_compiled():
+ self.log.info("Wait for tx from second node")
+ payment_txid = self.nodes[1].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=5.0, replaceable=True)
+ self.sync_all()
+ self.log.info("Testing sequence notifications with mempool sequence values")
+
+ # Should receive the broadcasted txid.
+ assert_equal((payment_txid, "A", seq_num), seq.receive_sequence())
+ seq_num += 1
+
+ self.log.info("Testing RBF notification")
+ # Replace it to test eviction/addition notification
+ rbf_info = self.nodes[1].bumpfee(payment_txid)
+ self.sync_all()
+ assert_equal((payment_txid, "R", seq_num), seq.receive_sequence())
+ seq_num += 1
+ assert_equal((rbf_info["txid"], "A", seq_num), seq.receive_sequence())
+ seq_num += 1
+
+ # Doesn't get published when mined, make a block and tx to "flush" the possibility
+ # though the mempool sequence number does go up by the number of transactions
+ # removed from the mempool by the block mining it.
+ mempool_size = len(self.nodes[0].getrawmempool())
+ c_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0]
+ self.sync_all()
+ # Make sure the number of mined transactions matches the number of txs out of mempool
+ mempool_size_delta = mempool_size - len(self.nodes[0].getrawmempool())
+ assert_equal(len(self.nodes[0].getblock(c_block)["tx"])-1, mempool_size_delta)
+ seq_num += mempool_size_delta
+ payment_txid_2 = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
+ self.sync_all()
+ assert_equal((c_block, "C", None), seq.receive_sequence())
+ assert_equal((payment_txid_2, "A", seq_num), seq.receive_sequence())
+ seq_num += 1
+
+ # Spot check getrawmempool results that they only show up when asked for
+ assert type(self.nodes[0].getrawmempool()) is list
+ assert type(self.nodes[0].getrawmempool(mempool_sequence=False)) is list
+ assert "mempool_sequence" not in self.nodes[0].getrawmempool(verbose=True)
+ assert_raises_rpc_error(-8, "Verbose results cannot contain mempool sequence values.", self.nodes[0].getrawmempool, True, True)
+ assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], seq_num)
+
+ self.log.info("Testing reorg notifications")
+ # Manually invalidate the last block to test mempool re-entry
+ # N.B. This part could be made more lenient in exact ordering
+ # since it greatly depends on inner-workings of blocks/mempool
+ # during "deep" re-orgs. Probably should "re-construct"
+ # blockchain/mempool state from notifications instead.
+ block_count = self.nodes[0].getblockcount()
+ best_hash = self.nodes[0].getbestblockhash()
+ self.nodes[0].invalidateblock(best_hash)
+ sleep(2) # Bit of room to make sure transaction things happened
+
+ # Make sure getrawmempool mempool_sequence results aren't "queued" but immediately reflective
+ # of the time they were gathered.
+ assert self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] > seq_num
+
+ assert_equal((best_hash, "D", None), seq.receive_sequence())
+ assert_equal((rbf_info["txid"], "A", seq_num), seq.receive_sequence())
+ seq_num += 1
+
+ # Other things may happen but aren't wallet-deterministic so we don't test for them currently
+ self.nodes[0].reconsiderblock(best_hash)
+ self.nodes[1].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
+ self.sync_all()
+
+ self.log.info("Evict mempool transaction by block conflict")
+ orig_txid = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0, replaceable=True)
+
+ # More to be simply mined
+ more_tx = []
+ for _ in range(5):
+ more_tx.append(self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 0.1))
+
+ raw_tx = self.nodes[0].getrawtransaction(orig_txid)
+ bump_info = self.nodes[0].bumpfee(orig_txid)
+ # Mine the pre-bump tx
+ block = create_block(int(self.nodes[0].getbestblockhash(), 16), create_coinbase(self.nodes[0].getblockcount()+1))
+ tx = FromHex(CTransaction(), raw_tx)
+ block.vtx.append(tx)
+ for txid in more_tx:
+ tx = FromHex(CTransaction(), self.nodes[0].getrawtransaction(txid))
+ block.vtx.append(tx)
+ add_witness_commitment(block)
+ block.solve()
+ assert_equal(self.nodes[0].submitblock(block.serialize().hex()), None)
+ tip = self.nodes[0].getbestblockhash()
+ assert_equal(int(tip, 16), block.sha256)
+ orig_txid_2 = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0, replaceable=True)
+
+ # Flush old notifications until evicted tx original entry
+ (hash_str, label, mempool_seq) = seq.receive_sequence()
+ while hash_str != orig_txid:
+ (hash_str, label, mempool_seq) = seq.receive_sequence()
+ mempool_seq += 1
+
+ # Added original tx
+ assert_equal(label, "A")
+ # More transactions to be simply mined
+ for i in range(len(more_tx)):
+ assert_equal((more_tx[i], "A", mempool_seq), seq.receive_sequence())
+ mempool_seq += 1
+ # Bumped by rbf
+ assert_equal((orig_txid, "R", mempool_seq), seq.receive_sequence())
+ mempool_seq += 1
+ assert_equal((bump_info["txid"], "A", mempool_seq), seq.receive_sequence())
+ mempool_seq += 1
+ # Conflict announced first, then block
+ assert_equal((bump_info["txid"], "R", mempool_seq), seq.receive_sequence())
+ mempool_seq += 1
+ assert_equal((tip, "C", None), seq.receive_sequence())
+ mempool_seq += len(more_tx)
+ # Last tx
+ assert_equal((orig_txid_2, "A", mempool_seq), seq.receive_sequence())
+ mempool_seq += 1
+ self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
+ self.sync_all() # want to make sure we didn't break "consensus" for other tests
+
+ def test_mempool_sync(self):
+ """
+ Use sequence notification plus getrawmempool sequence results to "sync mempool"
+ """
+ if not self.is_wallet_compiled():
+ self.log.info("Skipping mempool sync test")
+ return
+
+ self.log.info("Testing 'mempool sync' usage of sequence notifier")
+ address = 'tcp://127.0.0.1:28333'
+ socket = self.ctx.socket(zmq.SUB)
+ socket.set(zmq.RCVTIMEO, 60000)
+ seq = ZMQSubscriber(socket, b'sequence')
+
+ self.restart_node(0, ['-zmqpub%s=%s' % (seq.topic.decode(), address)])
+ connect_nodes(self.nodes[0], 1)
+ socket.connect(address)
+ # Relax so that the subscriber is ready before publishing zmq messages
+ sleep(0.2)
+
+ # In-memory counter, should always start at 1
+ next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
+ assert_equal(next_mempool_seq, 1)
+
+ # Some transactions have been happening but we aren't consuming zmq notifications yet
+ # or we lost a ZMQ message somehow and want to start over
+ txids = []
+ num_txs = 5
+ for _ in range(num_txs):
+ txids.append(self.nodes[1].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0, replaceable=True))
+ self.sync_all()
+
+ # 1) Consume backlog until we get a mempool sequence number
+ (hash_str, label, zmq_mem_seq) = seq.receive_sequence()
+ while zmq_mem_seq is None:
+ (hash_str, label, zmq_mem_seq) = seq.receive_sequence()
+
+ assert label == "A" or label == "R"
+ assert hash_str is not None
+
+ # 2) We need to "seed" our view of the mempool
+ mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
+ mempool_view = set(mempool_snapshot["txids"])
+ get_raw_seq = mempool_snapshot["mempool_sequence"]
+ assert_equal(get_raw_seq, 6)
+ # Snapshot may be too old compared to zmq message we read off latest
+ while zmq_mem_seq >= get_raw_seq:
+ sleep(2)
+ mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
+ mempool_view = set(mempool_snapshot["txids"])
+ get_raw_seq = mempool_snapshot["mempool_sequence"]
+
+ # Things continue to happen in the "interim" while waiting for snapshot results
+ # We have node 0 do all these to avoid p2p races with RBF announcements
+ for _ in range(num_txs):
+ txids.append(self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=0.1, replaceable=True))
+ self.nodes[0].bumpfee(txids[-1])
+ self.sync_all()
+ self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
+ final_txid = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=0.1, replaceable=True)
+
+ # 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot
+ while True:
+ if zmq_mem_seq == get_raw_seq - 1:
+ break
+ (hash_str, label, mempool_sequence) = seq.receive_sequence()
+ if mempool_sequence is not None:
+ zmq_mem_seq = mempool_sequence
+ if zmq_mem_seq > get_raw_seq:
+ raise Exception("We somehow jumped mempool sequence numbers! zmq_mem_seq: {} > get_raw_seq: {}".format(zmq_mem_seq, get_raw_seq))
+
+ # 4) Moving forward, we apply the delta to our local view
+ # remaining txs(5) + 1 rbf(A+R) + 1 block connect + 1 final tx
+ expected_sequence = get_raw_seq
+ r_gap = 0
+ for _ in range(num_txs + 2 + 1 + 1):
+ (hash_str, label, mempool_sequence) = seq.receive_sequence()
+ if mempool_sequence is not None:
+ if mempool_sequence != expected_sequence:
+ # Detected "R" gap, means this a conflict eviction, and mempool tx are being evicted before its
+ # position in the incoming block message "C"
+ if label == "R":
+ assert mempool_sequence > expected_sequence
+ r_gap += mempool_sequence - expected_sequence
+ else:
+ raise Exception("WARNING: txhash has unexpected mempool sequence value: {} vs expected {}".format(mempool_sequence, expected_sequence))
+ if label == "A":
+ assert hash_str not in mempool_view
+ mempool_view.add(hash_str)
+ expected_sequence = mempool_sequence + 1
+ elif label == "R":
+ assert hash_str in mempool_view
+ mempool_view.remove(hash_str)
+ expected_sequence = mempool_sequence + 1
+ elif label == "C":
+ # (Attempt to) remove all txids from known block connects
+ block_txids = self.nodes[0].getblock(hash_str)["tx"][1:]
+ for txid in block_txids:
+ if txid in mempool_view:
+ expected_sequence += 1
+ mempool_view.remove(txid)
+ expected_sequence -= r_gap
+ r_gap = 0
+ elif label == "D":
+ # Not useful for mempool tracking per se
+ continue
+ else:
+ raise Exception("Unexpected ZMQ sequence label!")
+
+ assert_equal(self.nodes[0].getrawmempool(), [final_txid])
+ assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], expected_sequence)
+
+ # 5) If you miss a zmq/mempool sequence number, go back to step (2)
+
+ self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
+
if __name__ == '__main__':
ZMQTest().main()