diff options
Diffstat (limited to 'src/index/txindex.cpp')
-rw-r--r-- | src/index/txindex.cpp | 423 |
1 files changed, 200 insertions, 223 deletions
diff --git a/src/index/txindex.cpp b/src/index/txindex.cpp index 0bb553ee6a..e106b9b420 100644 --- a/src/index/txindex.cpp +++ b/src/index/txindex.cpp @@ -2,258 +2,261 @@ // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. -#include <chainparams.h> #include <index/txindex.h> #include <init.h> -#include <tinyformat.h> #include <ui_interface.h> #include <util.h> #include <validation.h> -#include <warnings.h> -constexpr int64_t SYNC_LOG_INTERVAL = 30; // seconds -constexpr int64_t SYNC_LOCATOR_WRITE_INTERVAL = 30; // seconds +#include <boost/thread.hpp> -std::unique_ptr<TxIndex> g_txindex; - -template<typename... Args> -static void FatalError(const char* fmt, const Args&... args) -{ - std::string strMessage = tfm::format(fmt, args...); - SetMiscWarning(strMessage); - LogPrintf("*** %s\n", strMessage); - uiInterface.ThreadSafeMessageBox( - "Error: A fatal internal error occurred, see debug.log for details", - "", CClientUIInterface::MSG_ERROR); - StartShutdown(); -} +constexpr char DB_BEST_BLOCK = 'B'; +constexpr char DB_TXINDEX = 't'; +constexpr char DB_TXINDEX_BLOCK = 'T'; -TxIndex::TxIndex(std::unique_ptr<TxIndexDB> db) : - m_db(std::move(db)), m_synced(false), m_best_block_index(nullptr) -{} +std::unique_ptr<TxIndex> g_txindex; -TxIndex::~TxIndex() +struct CDiskTxPos : public CDiskBlockPos { - Interrupt(); - Stop(); -} + unsigned int nTxOffset; // after header -bool TxIndex::Init() -{ - LOCK(cs_main); + ADD_SERIALIZE_METHODS; - // Attempt to migrate txindex from the old database to the new one. Even if - // chain_tip is null, the node could be reindexing and we still want to - // delete txindex records in the old database. - if (!m_db->MigrateData(*pblocktree, chainActive.GetLocator())) { - return false; + template <typename Stream, typename Operation> + inline void SerializationOp(Stream& s, Operation ser_action) { + READWRITEAS(CDiskBlockPos, *this); + READWRITE(VARINT(nTxOffset)); } - CBlockLocator locator; - if (!m_db->ReadBestBlock(locator)) { - locator.SetNull(); + CDiskTxPos(const CDiskBlockPos &blockIn, unsigned int nTxOffsetIn) : CDiskBlockPos(blockIn.nFile, blockIn.nPos), nTxOffset(nTxOffsetIn) { } - m_best_block_index = FindForkInGlobalIndex(chainActive, locator); - m_synced = m_best_block_index.load() == chainActive.Tip(); - return true; -} - -static const CBlockIndex* NextSyncBlock(const CBlockIndex* pindex_prev) -{ - AssertLockHeld(cs_main); - - if (!pindex_prev) { - return chainActive.Genesis(); + CDiskTxPos() { + SetNull(); } - const CBlockIndex* pindex = chainActive.Next(pindex_prev); - if (pindex) { - return pindex; + void SetNull() { + CDiskBlockPos::SetNull(); + nTxOffset = 0; } - - return chainActive.Next(chainActive.FindFork(pindex_prev)); -} - -void TxIndex::ThreadSync() +}; + +/** + * Access to the txindex database (indexes/txindex/) + * + * The database stores a block locator of the chain the database is synced to + * so that the TxIndex can efficiently determine the point it last stopped at. + * A locator is used instead of a simple hash of the chain tip because blocks + * and block index entries may not be flushed to disk until after this database + * is updated. + */ +class TxIndex::DB : public BaseIndex::DB { - const CBlockIndex* pindex = m_best_block_index.load(); - if (!m_synced) { - auto& consensus_params = Params().GetConsensus(); - - int64_t last_log_time = 0; - int64_t last_locator_write_time = 0; - while (true) { - if (m_interrupt) { - WriteBestBlock(pindex); - return; - } +public: + explicit DB(size_t n_cache_size, bool f_memory = false, bool f_wipe = false); - { - LOCK(cs_main); - const CBlockIndex* pindex_next = NextSyncBlock(pindex); - if (!pindex_next) { - WriteBestBlock(pindex); - m_best_block_index = pindex; - m_synced = true; - break; - } - pindex = pindex_next; - } + /// Read the disk location of the transaction data with the given hash. Returns false if the + /// transaction hash is not indexed. + bool ReadTxPos(const uint256& txid, CDiskTxPos& pos) const; - int64_t current_time = GetTime(); - if (last_log_time + SYNC_LOG_INTERVAL < current_time) { - LogPrintf("Syncing txindex with block chain from height %d\n", pindex->nHeight); - last_log_time = current_time; - } + /// Write a batch of transaction positions to the DB. + bool WriteTxs(const std::vector<std::pair<uint256, CDiskTxPos>>& v_pos); - if (last_locator_write_time + SYNC_LOCATOR_WRITE_INTERVAL < current_time) { - WriteBestBlock(pindex); - last_locator_write_time = current_time; - } + /// Migrate txindex data from the block tree DB, where it may be for older nodes that have not + /// been upgraded yet to the new database. + bool MigrateData(CBlockTreeDB& block_tree_db, const CBlockLocator& best_locator); +}; - CBlock block; - if (!ReadBlockFromDisk(block, pindex, consensus_params)) { - FatalError("%s: Failed to read block %s from disk", - __func__, pindex->GetBlockHash().ToString()); - return; - } - if (!WriteBlock(block, pindex)) { - FatalError("%s: Failed to write block %s to tx index database", - __func__, pindex->GetBlockHash().ToString()); - return; - } - } - } +TxIndex::DB::DB(size_t n_cache_size, bool f_memory, bool f_wipe) : + BaseIndex::DB(GetDataDir() / "indexes" / "txindex", n_cache_size, f_memory, f_wipe) +{} - if (pindex) { - LogPrintf("txindex is enabled at height %d\n", pindex->nHeight); - } else { - LogPrintf("txindex is enabled\n"); - } +bool TxIndex::DB::ReadTxPos(const uint256 &txid, CDiskTxPos& pos) const +{ + return Read(std::make_pair(DB_TXINDEX, txid), pos); } -bool TxIndex::WriteBlock(const CBlock& block, const CBlockIndex* pindex) +bool TxIndex::DB::WriteTxs(const std::vector<std::pair<uint256, CDiskTxPos>>& v_pos) { - CDiskTxPos pos(pindex->GetBlockPos(), GetSizeOfCompactSize(block.vtx.size())); - std::vector<std::pair<uint256, CDiskTxPos>> vPos; - vPos.reserve(block.vtx.size()); - for (const auto& tx : block.vtx) { - vPos.emplace_back(tx->GetHash(), pos); - pos.nTxOffset += ::GetSerializeSize(*tx, SER_DISK, CLIENT_VERSION); + CDBBatch batch(*this); + for (const auto& tuple : v_pos) { + batch.Write(std::make_pair(DB_TXINDEX, tuple.first), tuple.second); } - return m_db->WriteTxs(vPos); + return WriteBatch(batch); } -bool TxIndex::WriteBestBlock(const CBlockIndex* block_index) +/* + * Safely persist a transfer of data from the old txindex database to the new one, and compact the + * range of keys updated. This is used internally by MigrateData. + */ +static void WriteTxIndexMigrationBatches(CDBWrapper& newdb, CDBWrapper& olddb, + CDBBatch& batch_newdb, CDBBatch& batch_olddb, + const std::pair<unsigned char, uint256>& begin_key, + const std::pair<unsigned char, uint256>& end_key) { - LOCK(cs_main); - if (!m_db->WriteBestBlock(chainActive.GetLocator(block_index))) { - return error("%s: Failed to write locator to disk", __func__); - } - return true; + // Sync new DB changes to disk before deleting from old DB. + newdb.WriteBatch(batch_newdb, /*fSync=*/ true); + olddb.WriteBatch(batch_olddb); + olddb.CompactRange(begin_key, end_key); + + batch_newdb.Clear(); + batch_olddb.Clear(); } -void TxIndex::BlockConnected(const std::shared_ptr<const CBlock>& block, const CBlockIndex* pindex, - const std::vector<CTransactionRef>& txn_conflicted) +bool TxIndex::DB::MigrateData(CBlockTreeDB& block_tree_db, const CBlockLocator& best_locator) { - if (!m_synced) { - return; - } - - const CBlockIndex* best_block_index = m_best_block_index.load(); - if (!best_block_index) { - if (pindex->nHeight != 0) { - FatalError("%s: First block connected is not the genesis block (height=%d)", - __func__, pindex->nHeight); - return; + // The prior implementation of txindex was always in sync with block index + // and presence was indicated with a boolean DB flag. If the flag is set, + // this means the txindex from a previous version is valid and in sync with + // the chain tip. The first step of the migration is to unset the flag and + // write the chain hash to a separate key, DB_TXINDEX_BLOCK. After that, the + // index entries are copied over in batches to the new database. Finally, + // DB_TXINDEX_BLOCK is erased from the old database and the block hash is + // written to the new database. + // + // Unsetting the boolean flag ensures that if the node is downgraded to a + // previous version, it will not see a corrupted, partially migrated index + // -- it will see that the txindex is disabled. When the node is upgraded + // again, the migration will pick up where it left off and sync to the block + // with hash DB_TXINDEX_BLOCK. + bool f_legacy_flag = false; + block_tree_db.ReadFlag("txindex", f_legacy_flag); + if (f_legacy_flag) { + if (!block_tree_db.Write(DB_TXINDEX_BLOCK, best_locator)) { + return error("%s: cannot write block indicator", __func__); } - } else { - // Ensure block connects to an ancestor of the current best block. This should be the case - // most of the time, but may not be immediately after the the sync thread catches up and sets - // m_synced. Consider the case where there is a reorg and the blocks on the stale branch are - // in the ValidationInterface queue backlog even after the sync thread has caught up to the - // new chain tip. In this unlikely event, log a warning and let the queue clear. - if (best_block_index->GetAncestor(pindex->nHeight - 1) != pindex->pprev) { - LogPrintf("%s: WARNING: Block %s does not connect to an ancestor of " /* Continued */ - "known best chain (tip=%s); not updating txindex\n", - __func__, pindex->GetBlockHash().ToString(), - best_block_index->GetBlockHash().ToString()); - return; + if (!block_tree_db.WriteFlag("txindex", false)) { + return error("%s: cannot write block index db flag", __func__); } } - if (WriteBlock(*block, pindex)) { - m_best_block_index = pindex; - } else { - FatalError("%s: Failed to write block %s to txindex", - __func__, pindex->GetBlockHash().ToString()); - return; + CBlockLocator locator; + if (!block_tree_db.Read(DB_TXINDEX_BLOCK, locator)) { + return true; } -} -void TxIndex::ChainStateFlushed(const CBlockLocator& locator) -{ - if (!m_synced) { - return; - } + int64_t count = 0; + LogPrintf("Upgrading txindex database... [0%%]\n"); + uiInterface.ShowProgress(_("Upgrading txindex database"), 0, true); + int report_done = 0; + const size_t batch_size = 1 << 24; // 16 MiB + + CDBBatch batch_newdb(*this); + CDBBatch batch_olddb(block_tree_db); + + std::pair<unsigned char, uint256> key; + std::pair<unsigned char, uint256> begin_key{DB_TXINDEX, uint256()}; + std::pair<unsigned char, uint256> prev_key = begin_key; + + bool interrupted = false; + std::unique_ptr<CDBIterator> cursor(block_tree_db.NewIterator()); + for (cursor->Seek(begin_key); cursor->Valid(); cursor->Next()) { + boost::this_thread::interruption_point(); + if (ShutdownRequested()) { + interrupted = true; + break; + } - const uint256& locator_tip_hash = locator.vHave.front(); - const CBlockIndex* locator_tip_index; - { - LOCK(cs_main); - locator_tip_index = LookupBlockIndex(locator_tip_hash); - } + if (!cursor->GetKey(key)) { + return error("%s: cannot get key from valid cursor", __func__); + } + if (key.first != DB_TXINDEX) { + break; + } + + // Log progress every 10%. + if (++count % 256 == 0) { + // Since txids are uniformly random and traversed in increasing order, the high 16 bits + // of the hash can be used to estimate the current progress. + const uint256& txid = key.second; + uint32_t high_nibble = + (static_cast<uint32_t>(*(txid.begin() + 0)) << 8) + + (static_cast<uint32_t>(*(txid.begin() + 1)) << 0); + int percentage_done = (int)(high_nibble * 100.0 / 65536.0 + 0.5); + + uiInterface.ShowProgress(_("Upgrading txindex database"), percentage_done, true); + if (report_done < percentage_done/10) { + LogPrintf("Upgrading txindex database... [%d%%]\n", percentage_done); + report_done = percentage_done/10; + } + } - if (!locator_tip_index) { - FatalError("%s: First block (hash=%s) in locator was not found", - __func__, locator_tip_hash.ToString()); - return; + CDiskTxPos value; + if (!cursor->GetValue(value)) { + return error("%s: cannot parse txindex record", __func__); + } + batch_newdb.Write(key, value); + batch_olddb.Erase(key); + + if (batch_newdb.SizeEstimate() > batch_size || batch_olddb.SizeEstimate() > batch_size) { + // NOTE: it's OK to delete the key pointed at by the current DB cursor while iterating + // because LevelDB iterators are guaranteed to provide a consistent view of the + // underlying data, like a lightweight snapshot. + WriteTxIndexMigrationBatches(*this, block_tree_db, + batch_newdb, batch_olddb, + prev_key, key); + prev_key = key; + } } - // This checks that ChainStateFlushed callbacks are received after BlockConnected. The check may fail - // immediately after the the sync thread catches up and sets m_synced. Consider the case where - // there is a reorg and the blocks on the stale branch are in the ValidationInterface queue - // backlog even after the sync thread has caught up to the new chain tip. In this unlikely - // event, log a warning and let the queue clear. - const CBlockIndex* best_block_index = m_best_block_index.load(); - if (best_block_index->GetAncestor(locator_tip_index->nHeight) != locator_tip_index) { - LogPrintf("%s: WARNING: Locator contains block (hash=%s) not on known best " /* Continued */ - "chain (tip=%s); not writing txindex locator\n", - __func__, locator_tip_hash.ToString(), - best_block_index->GetBlockHash().ToString()); - return; + // If these final DB batches complete the migration, write the best block + // hash marker to the new database and delete from the old one. This signals + // that the former is fully caught up to that point in the blockchain and + // that all txindex entries have been removed from the latter. + if (!interrupted) { + batch_olddb.Erase(DB_TXINDEX_BLOCK); + batch_newdb.Write(DB_BEST_BLOCK, locator); } - if (!m_db->WriteBestBlock(locator)) { - error("%s: Failed to write locator to disk", __func__); + WriteTxIndexMigrationBatches(*this, block_tree_db, + batch_newdb, batch_olddb, + begin_key, key); + + if (interrupted) { + LogPrintf("[CANCELLED].\n"); + return false; } + + uiInterface.ShowProgress("", 100, false); + + LogPrintf("[DONE].\n"); + return true; } -bool TxIndex::BlockUntilSyncedToCurrentChain() +TxIndex::TxIndex(size_t n_cache_size, bool f_memory, bool f_wipe) + : m_db(MakeUnique<TxIndex::DB>(n_cache_size, f_memory, f_wipe)) +{} + +TxIndex::~TxIndex() {} + +bool TxIndex::Init() { - AssertLockNotHeld(cs_main); + LOCK(cs_main); - if (!m_synced) { + // Attempt to migrate txindex from the old database to the new one. Even if + // chain_tip is null, the node could be reindexing and we still want to + // delete txindex records in the old database. + if (!m_db->MigrateData(*pblocktree, chainActive.GetLocator())) { return false; } - { - // Skip the queue-draining stuff if we know we're caught up with - // chainActive.Tip(). - LOCK(cs_main); - const CBlockIndex* chain_tip = chainActive.Tip(); - const CBlockIndex* best_block_index = m_best_block_index.load(); - if (best_block_index->GetAncestor(chain_tip->nHeight) == chain_tip) { - return true; - } - } + return BaseIndex::Init(); +} - LogPrintf("%s: txindex is catching up on block notifications\n", __func__); - SyncWithValidationInterfaceQueue(); - return true; +bool TxIndex::WriteBlock(const CBlock& block, const CBlockIndex* pindex) +{ + CDiskTxPos pos(pindex->GetBlockPos(), GetSizeOfCompactSize(block.vtx.size())); + std::vector<std::pair<uint256, CDiskTxPos>> vPos; + vPos.reserve(block.vtx.size()); + for (const auto& tx : block.vtx) { + vPos.emplace_back(tx->GetHash(), pos); + pos.nTxOffset += ::GetSerializeSize(*tx, SER_DISK, CLIENT_VERSION); + } + return m_db->WriteTxs(vPos); } +BaseIndex::DB& TxIndex::GetDB() const { return *m_db; } + bool TxIndex::FindTx(const uint256& tx_hash, uint256& block_hash, CTransactionRef& tx) const { CDiskTxPos postx; @@ -268,7 +271,9 @@ bool TxIndex::FindTx(const uint256& tx_hash, uint256& block_hash, CTransactionRe CBlockHeader header; try { file >> header; - fseek(file.Get(), postx.nTxOffset, SEEK_CUR); + if (fseek(file.Get(), postx.nTxOffset, SEEK_CUR)) { + return error("%s: fseek(...) failed", __func__); + } file >> tx; } catch (const std::exception& e) { return error("%s: Deserialize or I/O error - %s", __func__, e.what()); @@ -279,31 +284,3 @@ bool TxIndex::FindTx(const uint256& tx_hash, uint256& block_hash, CTransactionRe block_hash = header.GetHash(); return true; } - -void TxIndex::Interrupt() -{ - m_interrupt(); -} - -void TxIndex::Start() -{ - // Need to register this ValidationInterface before running Init(), so that - // callbacks are not missed if Init sets m_synced to true. - RegisterValidationInterface(this); - if (!Init()) { - FatalError("%s: txindex failed to initialize", __func__); - return; - } - - m_thread_sync = std::thread(&TraceThread<std::function<void()>>, "txindex", - std::bind(&TxIndex::ThreadSync, this)); -} - -void TxIndex::Stop() -{ - UnregisterValidationInterface(this); - - if (m_thread_sync.joinable()) { - m_thread_sync.join(); - } -} |