aboutsummaryrefslogtreecommitdiff
path: root/src/index/txindex.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/index/txindex.cpp')
-rw-r--r--src/index/txindex.cpp423
1 files changed, 200 insertions, 223 deletions
diff --git a/src/index/txindex.cpp b/src/index/txindex.cpp
index 0bb553ee6a..e106b9b420 100644
--- a/src/index/txindex.cpp
+++ b/src/index/txindex.cpp
@@ -2,258 +2,261 @@
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
-#include <chainparams.h>
#include <index/txindex.h>
#include <init.h>
-#include <tinyformat.h>
#include <ui_interface.h>
#include <util.h>
#include <validation.h>
-#include <warnings.h>
-constexpr int64_t SYNC_LOG_INTERVAL = 30; // seconds
-constexpr int64_t SYNC_LOCATOR_WRITE_INTERVAL = 30; // seconds
+#include <boost/thread.hpp>
-std::unique_ptr<TxIndex> g_txindex;
-
-template<typename... Args>
-static void FatalError(const char* fmt, const Args&... args)
-{
- std::string strMessage = tfm::format(fmt, args...);
- SetMiscWarning(strMessage);
- LogPrintf("*** %s\n", strMessage);
- uiInterface.ThreadSafeMessageBox(
- "Error: A fatal internal error occurred, see debug.log for details",
- "", CClientUIInterface::MSG_ERROR);
- StartShutdown();
-}
+constexpr char DB_BEST_BLOCK = 'B';
+constexpr char DB_TXINDEX = 't';
+constexpr char DB_TXINDEX_BLOCK = 'T';
-TxIndex::TxIndex(std::unique_ptr<TxIndexDB> db) :
- m_db(std::move(db)), m_synced(false), m_best_block_index(nullptr)
-{}
+std::unique_ptr<TxIndex> g_txindex;
-TxIndex::~TxIndex()
+struct CDiskTxPos : public CDiskBlockPos
{
- Interrupt();
- Stop();
-}
+ unsigned int nTxOffset; // after header
-bool TxIndex::Init()
-{
- LOCK(cs_main);
+ ADD_SERIALIZE_METHODS;
- // Attempt to migrate txindex from the old database to the new one. Even if
- // chain_tip is null, the node could be reindexing and we still want to
- // delete txindex records in the old database.
- if (!m_db->MigrateData(*pblocktree, chainActive.GetLocator())) {
- return false;
+ template <typename Stream, typename Operation>
+ inline void SerializationOp(Stream& s, Operation ser_action) {
+ READWRITEAS(CDiskBlockPos, *this);
+ READWRITE(VARINT(nTxOffset));
}
- CBlockLocator locator;
- if (!m_db->ReadBestBlock(locator)) {
- locator.SetNull();
+ CDiskTxPos(const CDiskBlockPos &blockIn, unsigned int nTxOffsetIn) : CDiskBlockPos(blockIn.nFile, blockIn.nPos), nTxOffset(nTxOffsetIn) {
}
- m_best_block_index = FindForkInGlobalIndex(chainActive, locator);
- m_synced = m_best_block_index.load() == chainActive.Tip();
- return true;
-}
-
-static const CBlockIndex* NextSyncBlock(const CBlockIndex* pindex_prev)
-{
- AssertLockHeld(cs_main);
-
- if (!pindex_prev) {
- return chainActive.Genesis();
+ CDiskTxPos() {
+ SetNull();
}
- const CBlockIndex* pindex = chainActive.Next(pindex_prev);
- if (pindex) {
- return pindex;
+ void SetNull() {
+ CDiskBlockPos::SetNull();
+ nTxOffset = 0;
}
-
- return chainActive.Next(chainActive.FindFork(pindex_prev));
-}
-
-void TxIndex::ThreadSync()
+};
+
+/**
+ * Access to the txindex database (indexes/txindex/)
+ *
+ * The database stores a block locator of the chain the database is synced to
+ * so that the TxIndex can efficiently determine the point it last stopped at.
+ * A locator is used instead of a simple hash of the chain tip because blocks
+ * and block index entries may not be flushed to disk until after this database
+ * is updated.
+ */
+class TxIndex::DB : public BaseIndex::DB
{
- const CBlockIndex* pindex = m_best_block_index.load();
- if (!m_synced) {
- auto& consensus_params = Params().GetConsensus();
-
- int64_t last_log_time = 0;
- int64_t last_locator_write_time = 0;
- while (true) {
- if (m_interrupt) {
- WriteBestBlock(pindex);
- return;
- }
+public:
+ explicit DB(size_t n_cache_size, bool f_memory = false, bool f_wipe = false);
- {
- LOCK(cs_main);
- const CBlockIndex* pindex_next = NextSyncBlock(pindex);
- if (!pindex_next) {
- WriteBestBlock(pindex);
- m_best_block_index = pindex;
- m_synced = true;
- break;
- }
- pindex = pindex_next;
- }
+ /// Read the disk location of the transaction data with the given hash. Returns false if the
+ /// transaction hash is not indexed.
+ bool ReadTxPos(const uint256& txid, CDiskTxPos& pos) const;
- int64_t current_time = GetTime();
- if (last_log_time + SYNC_LOG_INTERVAL < current_time) {
- LogPrintf("Syncing txindex with block chain from height %d\n", pindex->nHeight);
- last_log_time = current_time;
- }
+ /// Write a batch of transaction positions to the DB.
+ bool WriteTxs(const std::vector<std::pair<uint256, CDiskTxPos>>& v_pos);
- if (last_locator_write_time + SYNC_LOCATOR_WRITE_INTERVAL < current_time) {
- WriteBestBlock(pindex);
- last_locator_write_time = current_time;
- }
+ /// Migrate txindex data from the block tree DB, where it may be for older nodes that have not
+ /// been upgraded yet to the new database.
+ bool MigrateData(CBlockTreeDB& block_tree_db, const CBlockLocator& best_locator);
+};
- CBlock block;
- if (!ReadBlockFromDisk(block, pindex, consensus_params)) {
- FatalError("%s: Failed to read block %s from disk",
- __func__, pindex->GetBlockHash().ToString());
- return;
- }
- if (!WriteBlock(block, pindex)) {
- FatalError("%s: Failed to write block %s to tx index database",
- __func__, pindex->GetBlockHash().ToString());
- return;
- }
- }
- }
+TxIndex::DB::DB(size_t n_cache_size, bool f_memory, bool f_wipe) :
+ BaseIndex::DB(GetDataDir() / "indexes" / "txindex", n_cache_size, f_memory, f_wipe)
+{}
- if (pindex) {
- LogPrintf("txindex is enabled at height %d\n", pindex->nHeight);
- } else {
- LogPrintf("txindex is enabled\n");
- }
+bool TxIndex::DB::ReadTxPos(const uint256 &txid, CDiskTxPos& pos) const
+{
+ return Read(std::make_pair(DB_TXINDEX, txid), pos);
}
-bool TxIndex::WriteBlock(const CBlock& block, const CBlockIndex* pindex)
+bool TxIndex::DB::WriteTxs(const std::vector<std::pair<uint256, CDiskTxPos>>& v_pos)
{
- CDiskTxPos pos(pindex->GetBlockPos(), GetSizeOfCompactSize(block.vtx.size()));
- std::vector<std::pair<uint256, CDiskTxPos>> vPos;
- vPos.reserve(block.vtx.size());
- for (const auto& tx : block.vtx) {
- vPos.emplace_back(tx->GetHash(), pos);
- pos.nTxOffset += ::GetSerializeSize(*tx, SER_DISK, CLIENT_VERSION);
+ CDBBatch batch(*this);
+ for (const auto& tuple : v_pos) {
+ batch.Write(std::make_pair(DB_TXINDEX, tuple.first), tuple.second);
}
- return m_db->WriteTxs(vPos);
+ return WriteBatch(batch);
}
-bool TxIndex::WriteBestBlock(const CBlockIndex* block_index)
+/*
+ * Safely persist a transfer of data from the old txindex database to the new one, and compact the
+ * range of keys updated. This is used internally by MigrateData.
+ */
+static void WriteTxIndexMigrationBatches(CDBWrapper& newdb, CDBWrapper& olddb,
+ CDBBatch& batch_newdb, CDBBatch& batch_olddb,
+ const std::pair<unsigned char, uint256>& begin_key,
+ const std::pair<unsigned char, uint256>& end_key)
{
- LOCK(cs_main);
- if (!m_db->WriteBestBlock(chainActive.GetLocator(block_index))) {
- return error("%s: Failed to write locator to disk", __func__);
- }
- return true;
+ // Sync new DB changes to disk before deleting from old DB.
+ newdb.WriteBatch(batch_newdb, /*fSync=*/ true);
+ olddb.WriteBatch(batch_olddb);
+ olddb.CompactRange(begin_key, end_key);
+
+ batch_newdb.Clear();
+ batch_olddb.Clear();
}
-void TxIndex::BlockConnected(const std::shared_ptr<const CBlock>& block, const CBlockIndex* pindex,
- const std::vector<CTransactionRef>& txn_conflicted)
+bool TxIndex::DB::MigrateData(CBlockTreeDB& block_tree_db, const CBlockLocator& best_locator)
{
- if (!m_synced) {
- return;
- }
-
- const CBlockIndex* best_block_index = m_best_block_index.load();
- if (!best_block_index) {
- if (pindex->nHeight != 0) {
- FatalError("%s: First block connected is not the genesis block (height=%d)",
- __func__, pindex->nHeight);
- return;
+ // The prior implementation of txindex was always in sync with block index
+ // and presence was indicated with a boolean DB flag. If the flag is set,
+ // this means the txindex from a previous version is valid and in sync with
+ // the chain tip. The first step of the migration is to unset the flag and
+ // write the chain hash to a separate key, DB_TXINDEX_BLOCK. After that, the
+ // index entries are copied over in batches to the new database. Finally,
+ // DB_TXINDEX_BLOCK is erased from the old database and the block hash is
+ // written to the new database.
+ //
+ // Unsetting the boolean flag ensures that if the node is downgraded to a
+ // previous version, it will not see a corrupted, partially migrated index
+ // -- it will see that the txindex is disabled. When the node is upgraded
+ // again, the migration will pick up where it left off and sync to the block
+ // with hash DB_TXINDEX_BLOCK.
+ bool f_legacy_flag = false;
+ block_tree_db.ReadFlag("txindex", f_legacy_flag);
+ if (f_legacy_flag) {
+ if (!block_tree_db.Write(DB_TXINDEX_BLOCK, best_locator)) {
+ return error("%s: cannot write block indicator", __func__);
}
- } else {
- // Ensure block connects to an ancestor of the current best block. This should be the case
- // most of the time, but may not be immediately after the the sync thread catches up and sets
- // m_synced. Consider the case where there is a reorg and the blocks on the stale branch are
- // in the ValidationInterface queue backlog even after the sync thread has caught up to the
- // new chain tip. In this unlikely event, log a warning and let the queue clear.
- if (best_block_index->GetAncestor(pindex->nHeight - 1) != pindex->pprev) {
- LogPrintf("%s: WARNING: Block %s does not connect to an ancestor of " /* Continued */
- "known best chain (tip=%s); not updating txindex\n",
- __func__, pindex->GetBlockHash().ToString(),
- best_block_index->GetBlockHash().ToString());
- return;
+ if (!block_tree_db.WriteFlag("txindex", false)) {
+ return error("%s: cannot write block index db flag", __func__);
}
}
- if (WriteBlock(*block, pindex)) {
- m_best_block_index = pindex;
- } else {
- FatalError("%s: Failed to write block %s to txindex",
- __func__, pindex->GetBlockHash().ToString());
- return;
+ CBlockLocator locator;
+ if (!block_tree_db.Read(DB_TXINDEX_BLOCK, locator)) {
+ return true;
}
-}
-void TxIndex::ChainStateFlushed(const CBlockLocator& locator)
-{
- if (!m_synced) {
- return;
- }
+ int64_t count = 0;
+ LogPrintf("Upgrading txindex database... [0%%]\n");
+ uiInterface.ShowProgress(_("Upgrading txindex database"), 0, true);
+ int report_done = 0;
+ const size_t batch_size = 1 << 24; // 16 MiB
+
+ CDBBatch batch_newdb(*this);
+ CDBBatch batch_olddb(block_tree_db);
+
+ std::pair<unsigned char, uint256> key;
+ std::pair<unsigned char, uint256> begin_key{DB_TXINDEX, uint256()};
+ std::pair<unsigned char, uint256> prev_key = begin_key;
+
+ bool interrupted = false;
+ std::unique_ptr<CDBIterator> cursor(block_tree_db.NewIterator());
+ for (cursor->Seek(begin_key); cursor->Valid(); cursor->Next()) {
+ boost::this_thread::interruption_point();
+ if (ShutdownRequested()) {
+ interrupted = true;
+ break;
+ }
- const uint256& locator_tip_hash = locator.vHave.front();
- const CBlockIndex* locator_tip_index;
- {
- LOCK(cs_main);
- locator_tip_index = LookupBlockIndex(locator_tip_hash);
- }
+ if (!cursor->GetKey(key)) {
+ return error("%s: cannot get key from valid cursor", __func__);
+ }
+ if (key.first != DB_TXINDEX) {
+ break;
+ }
+
+ // Log progress every 10%.
+ if (++count % 256 == 0) {
+ // Since txids are uniformly random and traversed in increasing order, the high 16 bits
+ // of the hash can be used to estimate the current progress.
+ const uint256& txid = key.second;
+ uint32_t high_nibble =
+ (static_cast<uint32_t>(*(txid.begin() + 0)) << 8) +
+ (static_cast<uint32_t>(*(txid.begin() + 1)) << 0);
+ int percentage_done = (int)(high_nibble * 100.0 / 65536.0 + 0.5);
+
+ uiInterface.ShowProgress(_("Upgrading txindex database"), percentage_done, true);
+ if (report_done < percentage_done/10) {
+ LogPrintf("Upgrading txindex database... [%d%%]\n", percentage_done);
+ report_done = percentage_done/10;
+ }
+ }
- if (!locator_tip_index) {
- FatalError("%s: First block (hash=%s) in locator was not found",
- __func__, locator_tip_hash.ToString());
- return;
+ CDiskTxPos value;
+ if (!cursor->GetValue(value)) {
+ return error("%s: cannot parse txindex record", __func__);
+ }
+ batch_newdb.Write(key, value);
+ batch_olddb.Erase(key);
+
+ if (batch_newdb.SizeEstimate() > batch_size || batch_olddb.SizeEstimate() > batch_size) {
+ // NOTE: it's OK to delete the key pointed at by the current DB cursor while iterating
+ // because LevelDB iterators are guaranteed to provide a consistent view of the
+ // underlying data, like a lightweight snapshot.
+ WriteTxIndexMigrationBatches(*this, block_tree_db,
+ batch_newdb, batch_olddb,
+ prev_key, key);
+ prev_key = key;
+ }
}
- // This checks that ChainStateFlushed callbacks are received after BlockConnected. The check may fail
- // immediately after the the sync thread catches up and sets m_synced. Consider the case where
- // there is a reorg and the blocks on the stale branch are in the ValidationInterface queue
- // backlog even after the sync thread has caught up to the new chain tip. In this unlikely
- // event, log a warning and let the queue clear.
- const CBlockIndex* best_block_index = m_best_block_index.load();
- if (best_block_index->GetAncestor(locator_tip_index->nHeight) != locator_tip_index) {
- LogPrintf("%s: WARNING: Locator contains block (hash=%s) not on known best " /* Continued */
- "chain (tip=%s); not writing txindex locator\n",
- __func__, locator_tip_hash.ToString(),
- best_block_index->GetBlockHash().ToString());
- return;
+ // If these final DB batches complete the migration, write the best block
+ // hash marker to the new database and delete from the old one. This signals
+ // that the former is fully caught up to that point in the blockchain and
+ // that all txindex entries have been removed from the latter.
+ if (!interrupted) {
+ batch_olddb.Erase(DB_TXINDEX_BLOCK);
+ batch_newdb.Write(DB_BEST_BLOCK, locator);
}
- if (!m_db->WriteBestBlock(locator)) {
- error("%s: Failed to write locator to disk", __func__);
+ WriteTxIndexMigrationBatches(*this, block_tree_db,
+ batch_newdb, batch_olddb,
+ begin_key, key);
+
+ if (interrupted) {
+ LogPrintf("[CANCELLED].\n");
+ return false;
}
+
+ uiInterface.ShowProgress("", 100, false);
+
+ LogPrintf("[DONE].\n");
+ return true;
}
-bool TxIndex::BlockUntilSyncedToCurrentChain()
+TxIndex::TxIndex(size_t n_cache_size, bool f_memory, bool f_wipe)
+ : m_db(MakeUnique<TxIndex::DB>(n_cache_size, f_memory, f_wipe))
+{}
+
+TxIndex::~TxIndex() {}
+
+bool TxIndex::Init()
{
- AssertLockNotHeld(cs_main);
+ LOCK(cs_main);
- if (!m_synced) {
+ // Attempt to migrate txindex from the old database to the new one. Even if
+ // chain_tip is null, the node could be reindexing and we still want to
+ // delete txindex records in the old database.
+ if (!m_db->MigrateData(*pblocktree, chainActive.GetLocator())) {
return false;
}
- {
- // Skip the queue-draining stuff if we know we're caught up with
- // chainActive.Tip().
- LOCK(cs_main);
- const CBlockIndex* chain_tip = chainActive.Tip();
- const CBlockIndex* best_block_index = m_best_block_index.load();
- if (best_block_index->GetAncestor(chain_tip->nHeight) == chain_tip) {
- return true;
- }
- }
+ return BaseIndex::Init();
+}
- LogPrintf("%s: txindex is catching up on block notifications\n", __func__);
- SyncWithValidationInterfaceQueue();
- return true;
+bool TxIndex::WriteBlock(const CBlock& block, const CBlockIndex* pindex)
+{
+ CDiskTxPos pos(pindex->GetBlockPos(), GetSizeOfCompactSize(block.vtx.size()));
+ std::vector<std::pair<uint256, CDiskTxPos>> vPos;
+ vPos.reserve(block.vtx.size());
+ for (const auto& tx : block.vtx) {
+ vPos.emplace_back(tx->GetHash(), pos);
+ pos.nTxOffset += ::GetSerializeSize(*tx, SER_DISK, CLIENT_VERSION);
+ }
+ return m_db->WriteTxs(vPos);
}
+BaseIndex::DB& TxIndex::GetDB() const { return *m_db; }
+
bool TxIndex::FindTx(const uint256& tx_hash, uint256& block_hash, CTransactionRef& tx) const
{
CDiskTxPos postx;
@@ -268,7 +271,9 @@ bool TxIndex::FindTx(const uint256& tx_hash, uint256& block_hash, CTransactionRe
CBlockHeader header;
try {
file >> header;
- fseek(file.Get(), postx.nTxOffset, SEEK_CUR);
+ if (fseek(file.Get(), postx.nTxOffset, SEEK_CUR)) {
+ return error("%s: fseek(...) failed", __func__);
+ }
file >> tx;
} catch (const std::exception& e) {
return error("%s: Deserialize or I/O error - %s", __func__, e.what());
@@ -279,31 +284,3 @@ bool TxIndex::FindTx(const uint256& tx_hash, uint256& block_hash, CTransactionRe
block_hash = header.GetHash();
return true;
}
-
-void TxIndex::Interrupt()
-{
- m_interrupt();
-}
-
-void TxIndex::Start()
-{
- // Need to register this ValidationInterface before running Init(), so that
- // callbacks are not missed if Init sets m_synced to true.
- RegisterValidationInterface(this);
- if (!Init()) {
- FatalError("%s: txindex failed to initialize", __func__);
- return;
- }
-
- m_thread_sync = std::thread(&TraceThread<std::function<void()>>, "txindex",
- std::bind(&TxIndex::ThreadSync, this));
-}
-
-void TxIndex::Stop()
-{
- UnregisterValidationInterface(this);
-
- if (m_thread_sync.joinable()) {
- m_thread_sync.join();
- }
-}