diff options
Diffstat (limited to 'src/leveldb/db')
-rw-r--r-- | src/leveldb/db/corruption_test.cc | 26 | ||||
-rw-r--r-- | src/leveldb/db/db_bench.cc | 8 | ||||
-rw-r--r-- | src/leveldb/db/db_impl.cc | 99 | ||||
-rw-r--r-- | src/leveldb/db/db_impl.h | 9 | ||||
-rw-r--r-- | src/leveldb/db/db_iter.cc | 9 | ||||
-rw-r--r-- | src/leveldb/db/db_test.cc | 142 | ||||
-rw-r--r-- | src/leveldb/db/filename.cc | 9 | ||||
-rw-r--r-- | src/leveldb/db/filename.h | 5 | ||||
-rw-r--r-- | src/leveldb/db/filename_test.cc | 1 | ||||
-rw-r--r-- | src/leveldb/db/repair.cc | 159 | ||||
-rw-r--r-- | src/leveldb/db/table_cache.cc | 6 | ||||
-rw-r--r-- | src/leveldb/db/version_set.cc | 33 | ||||
-rw-r--r-- | src/leveldb/db/version_set.h | 2 |
13 files changed, 319 insertions, 189 deletions
diff --git a/src/leveldb/db/corruption_test.cc b/src/leveldb/db/corruption_test.cc index b37ffdfe64..96afc68913 100644 --- a/src/leveldb/db/corruption_test.cc +++ b/src/leveldb/db/corruption_test.cc @@ -75,7 +75,13 @@ class CorruptionTest { Slice key = Key(i, &key_space); batch.Clear(); batch.Put(key, Value(i, &value_space)); - ASSERT_OK(db_->Write(WriteOptions(), &batch)); + WriteOptions options; + // Corrupt() doesn't work without this sync on windows; stat reports 0 for + // the file size. + if (i == n - 1) { + options.sync = true; + } + ASSERT_OK(db_->Write(options, &batch)); } } @@ -125,7 +131,7 @@ class CorruptionTest { FileType type; std::string fname; int picked_number = -1; - for (int i = 0; i < filenames.size(); i++) { + for (size_t i = 0; i < filenames.size(); i++) { if (ParseFileName(filenames[i], &number, &type) && type == filetype && int(number) > picked_number) { // Pick latest file @@ -238,6 +244,22 @@ TEST(CorruptionTest, TableFile) { Check(90, 99); } +TEST(CorruptionTest, TableFileRepair) { + options_.block_size = 2 * kValueSize; // Limit scope of corruption + options_.paranoid_checks = true; + Reopen(); + Build(100); + DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); + dbi->TEST_CompactMemTable(); + dbi->TEST_CompactRange(0, NULL, NULL); + dbi->TEST_CompactRange(1, NULL, NULL); + + Corrupt(kTableFile, 100, 1); + RepairDB(); + Reopen(); + Check(95, 99); +} + TEST(CorruptionTest, TableFileIndexData) { Build(10000); // Enough to build multiple Tables DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); diff --git a/src/leveldb/db/db_bench.cc b/src/leveldb/db/db_bench.cc index 7abdf87587..fc46d89693 100644 --- a/src/leveldb/db/db_bench.cc +++ b/src/leveldb/db/db_bench.cc @@ -128,7 +128,7 @@ class RandomGenerator { pos_ = 0; } - Slice Generate(int len) { + Slice Generate(size_t len) { if (pos_ + len > data_.size()) { pos_ = 0; assert(len < data_.size()); @@ -139,11 +139,11 @@ class RandomGenerator { }; static Slice TrimSpace(Slice s) { - int start = 0; + size_t start = 0; while (start < s.size() && isspace(s[start])) { start++; } - int limit = s.size(); + size_t limit = s.size(); while (limit > start && isspace(s[limit-1])) { limit--; } @@ -399,7 +399,7 @@ class Benchmark { heap_counter_(0) { std::vector<std::string> files; Env::Default()->GetChildren(FLAGS_db, &files); - for (int i = 0; i < files.size(); i++) { + for (size_t i = 0; i < files.size(); i++) { if (Slice(files[i]).starts_with("heap-")) { Env::Default()->DeleteFile(std::string(FLAGS_db) + "/" + files[i]); } diff --git a/src/leveldb/db/db_impl.cc b/src/leveldb/db/db_impl.cc index fa1351038b..faf5e7d7ba 100644 --- a/src/leveldb/db/db_impl.cc +++ b/src/leveldb/db/db_impl.cc @@ -133,8 +133,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) seed_(0), tmp_batch_(new WriteBatch), bg_compaction_scheduled_(false), - manual_compaction_(NULL), - consecutive_compaction_errors_(0) { + manual_compaction_(NULL) { mem_->Ref(); has_imm_.Release_Store(NULL); @@ -217,6 +216,12 @@ void DBImpl::MaybeIgnoreError(Status* s) const { } void DBImpl::DeleteObsoleteFiles() { + if (!bg_error_.ok()) { + // After a background error, we don't know whether a new version may + // or may not have been committed, so we cannot safely garbage collect. + return; + } + // Make a set of all of the live files std::set<uint64_t> live = pending_outputs_; versions_->AddLiveFiles(&live); @@ -495,7 +500,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, return s; } -Status DBImpl::CompactMemTable() { +void DBImpl::CompactMemTable() { mutex_.AssertHeld(); assert(imm_ != NULL); @@ -523,9 +528,9 @@ Status DBImpl::CompactMemTable() { imm_ = NULL; has_imm_.Release_Store(NULL); DeleteObsoleteFiles(); + } else { + RecordBackgroundError(s); } - - return s; } void DBImpl::CompactRange(const Slice* begin, const Slice* end) { @@ -568,16 +573,18 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { } MutexLock l(&mutex_); - while (!manual.done) { - while (manual_compaction_ != NULL) { - bg_cv_.Wait(); - } - manual_compaction_ = &manual; - MaybeScheduleCompaction(); - while (manual_compaction_ == &manual) { + while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) { + if (manual_compaction_ == NULL) { // Idle + manual_compaction_ = &manual; + MaybeScheduleCompaction(); + } else { // Running either my compaction or another compaction. bg_cv_.Wait(); } } + if (manual_compaction_ == &manual) { + // Cancel my manual compaction since we aborted early for some reason. + manual_compaction_ = NULL; + } } Status DBImpl::TEST_CompactMemTable() { @@ -596,12 +603,22 @@ Status DBImpl::TEST_CompactMemTable() { return s; } +void DBImpl::RecordBackgroundError(const Status& s) { + mutex_.AssertHeld(); + if (bg_error_.ok()) { + bg_error_ = s; + bg_cv_.SignalAll(); + } +} + void DBImpl::MaybeScheduleCompaction() { mutex_.AssertHeld(); if (bg_compaction_scheduled_) { // Already scheduled } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions + } else if (!bg_error_.ok()) { + // Already got an error; no more changes } else if (imm_ == NULL && manual_compaction_ == NULL && !versions_->NeedsCompaction()) { @@ -619,30 +636,12 @@ void DBImpl::BGWork(void* db) { void DBImpl::BackgroundCall() { MutexLock l(&mutex_); assert(bg_compaction_scheduled_); - if (!shutting_down_.Acquire_Load()) { - Status s = BackgroundCompaction(); - if (s.ok()) { - // Success - consecutive_compaction_errors_ = 0; - } else if (shutting_down_.Acquire_Load()) { - // Error most likely due to shutdown; do not wait - } else { - // Wait a little bit before retrying background compaction in - // case this is an environmental problem and we do not want to - // chew up resources for failed compactions for the duration of - // the problem. - bg_cv_.SignalAll(); // In case a waiter can proceed despite the error - Log(options_.info_log, "Waiting after background compaction error: %s", - s.ToString().c_str()); - mutex_.Unlock(); - ++consecutive_compaction_errors_; - int seconds_to_sleep = 1; - for (int i = 0; i < 3 && i < consecutive_compaction_errors_ - 1; ++i) { - seconds_to_sleep *= 2; - } - env_->SleepForMicroseconds(seconds_to_sleep * 1000000); - mutex_.Lock(); - } + if (shutting_down_.Acquire_Load()) { + // No more background work when shutting down. + } else if (!bg_error_.ok()) { + // No more background work after a background error. + } else { + BackgroundCompaction(); } bg_compaction_scheduled_ = false; @@ -653,11 +652,12 @@ void DBImpl::BackgroundCall() { bg_cv_.SignalAll(); } -Status DBImpl::BackgroundCompaction() { +void DBImpl::BackgroundCompaction() { mutex_.AssertHeld(); if (imm_ != NULL) { - return CompactMemTable(); + CompactMemTable(); + return; } Compaction* c; @@ -691,6 +691,9 @@ Status DBImpl::BackgroundCompaction() { c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, f->largest); status = versions_->LogAndApply(c->edit(), &mutex_); + if (!status.ok()) { + RecordBackgroundError(status); + } VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", static_cast<unsigned long long>(f->number), @@ -701,6 +704,9 @@ Status DBImpl::BackgroundCompaction() { } else { CompactionState* compact = new CompactionState(c); status = DoCompactionWork(compact); + if (!status.ok()) { + RecordBackgroundError(status); + } CleanupCompaction(compact); c->ReleaseInputs(); DeleteObsoleteFiles(); @@ -714,9 +720,6 @@ Status DBImpl::BackgroundCompaction() { } else { Log(options_.info_log, "Compaction error: %s", status.ToString().c_str()); - if (options_.paranoid_checks && bg_error_.ok()) { - bg_error_ = status; - } } if (is_manual) { @@ -732,7 +735,6 @@ Status DBImpl::BackgroundCompaction() { } manual_compaction_ = NULL; } - return status; } void DBImpl::CleanupCompaction(CompactionState* compact) { @@ -1002,6 +1004,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { if (status.ok()) { status = InstallCompactionResults(compact); } + if (!status.ok()) { + RecordBackgroundError(status); + } VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp)); @@ -1185,13 +1190,23 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { { mutex_.Unlock(); status = log_->AddRecord(WriteBatchInternal::Contents(updates)); + bool sync_error = false; if (status.ok() && options.sync) { status = logfile_->Sync(); + if (!status.ok()) { + sync_error = true; + } } if (status.ok()) { status = WriteBatchInternal::InsertInto(updates, mem_); } mutex_.Lock(); + if (sync_error) { + // The state of the log file is indeterminate: the log record we + // just added may or may not show up when the DB is re-opened. + // So we force the DB into a mode where all future writes fail. + RecordBackgroundError(status); + } } if (updates == tmp_batch_) tmp_batch_->Clear(); diff --git a/src/leveldb/db/db_impl.h b/src/leveldb/db/db_impl.h index 75fd30abe9..cfc998164a 100644 --- a/src/leveldb/db/db_impl.h +++ b/src/leveldb/db/db_impl.h @@ -87,8 +87,8 @@ class DBImpl : public DB { // Compact the in-memory write buffer to disk. Switches to a new // log-file/memtable and writes a new descriptor iff successful. - Status CompactMemTable() - EXCLUSIVE_LOCKS_REQUIRED(mutex_); + // Errors are recorded in bg_error_. + void CompactMemTable() EXCLUSIVE_LOCKS_REQUIRED(mutex_); Status RecoverLogFile(uint64_t log_number, VersionEdit* edit, @@ -102,10 +102,12 @@ class DBImpl : public DB { EXCLUSIVE_LOCKS_REQUIRED(mutex_); WriteBatch* BuildBatchGroup(Writer** last_writer); + void RecordBackgroundError(const Status& s); + void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); static void BGWork(void* db); void BackgroundCall(); - Status BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); void CleanupCompaction(CompactionState* compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_); Status DoCompactionWork(CompactionState* compact) @@ -170,7 +172,6 @@ class DBImpl : public DB { // Have we encountered a background error in paranoid mode? Status bg_error_; - int consecutive_compaction_errors_; // Per level compaction stats. stats_[level] stores the stats for // compactions that produced data for the specified "level". diff --git a/src/leveldb/db/db_iter.cc b/src/leveldb/db/db_iter.cc index 071a54e3f4..3b2035e9e3 100644 --- a/src/leveldb/db/db_iter.cc +++ b/src/leveldb/db/db_iter.cc @@ -161,12 +161,13 @@ void DBIter::Next() { saved_key_.clear(); return; } + // saved_key_ already contains the key to skip past. + } else { + // Store in saved_key_ the current key so we skip it below. + SaveKey(ExtractUserKey(iter_->key()), &saved_key_); } - // Temporarily use saved_key_ as storage for key to skip. - std::string* skip = &saved_key_; - SaveKey(ExtractUserKey(iter_->key()), skip); - FindNextUserEntry(true, skip); + FindNextUserEntry(true, &saved_key_); } void DBIter::FindNextUserEntry(bool skipping, std::string* skip) { diff --git a/src/leveldb/db/db_test.cc b/src/leveldb/db/db_test.cc index 49aae04dbd..280b01c14b 100644 --- a/src/leveldb/db/db_test.cc +++ b/src/leveldb/db/db_test.cc @@ -57,8 +57,11 @@ void DelayMilliseconds(int millis) { // Special Env used to delay background operations class SpecialEnv : public EnvWrapper { public: - // sstable Sync() calls are blocked while this pointer is non-NULL. - port::AtomicPointer delay_sstable_sync_; + // sstable/log Sync() calls are blocked while this pointer is non-NULL. + port::AtomicPointer delay_data_sync_; + + // sstable/log Sync() calls return an error. + port::AtomicPointer data_sync_error_; // Simulate no-space errors while this pointer is non-NULL. port::AtomicPointer no_space_; @@ -75,11 +78,9 @@ class SpecialEnv : public EnvWrapper { bool count_random_reads_; AtomicCounter random_read_counter_; - AtomicCounter sleep_counter_; - AtomicCounter sleep_time_counter_; - explicit SpecialEnv(Env* base) : EnvWrapper(base) { - delay_sstable_sync_.Release_Store(NULL); + delay_data_sync_.Release_Store(NULL); + data_sync_error_.Release_Store(NULL); no_space_.Release_Store(NULL); non_writable_.Release_Store(NULL); count_random_reads_ = false; @@ -88,17 +89,17 @@ class SpecialEnv : public EnvWrapper { } Status NewWritableFile(const std::string& f, WritableFile** r) { - class SSTableFile : public WritableFile { + class DataFile : public WritableFile { private: SpecialEnv* env_; WritableFile* base_; public: - SSTableFile(SpecialEnv* env, WritableFile* base) + DataFile(SpecialEnv* env, WritableFile* base) : env_(env), base_(base) { } - ~SSTableFile() { delete base_; } + ~DataFile() { delete base_; } Status Append(const Slice& data) { if (env_->no_space_.Acquire_Load() != NULL) { // Drop writes on the floor @@ -110,7 +111,10 @@ class SpecialEnv : public EnvWrapper { Status Close() { return base_->Close(); } Status Flush() { return base_->Flush(); } Status Sync() { - while (env_->delay_sstable_sync_.Acquire_Load() != NULL) { + if (env_->data_sync_error_.Acquire_Load() != NULL) { + return Status::IOError("simulated data sync error"); + } + while (env_->delay_data_sync_.Acquire_Load() != NULL) { DelayMilliseconds(100); } return base_->Sync(); @@ -147,8 +151,9 @@ class SpecialEnv : public EnvWrapper { Status s = target()->NewWritableFile(f, r); if (s.ok()) { - if (strstr(f.c_str(), ".sst") != NULL) { - *r = new SSTableFile(this, *r); + if (strstr(f.c_str(), ".ldb") != NULL || + strstr(f.c_str(), ".log") != NULL) { + *r = new DataFile(this, *r); } else if (strstr(f.c_str(), "MANIFEST") != NULL) { *r = new ManifestFile(this, *r); } @@ -179,12 +184,6 @@ class SpecialEnv : public EnvWrapper { } return s; } - - virtual void SleepForMicroseconds(int micros) { - sleep_counter_.Increment(); - sleep_time_counter_.IncrementBy(micros); - } - }; class DBTest { @@ -322,7 +321,7 @@ class DBTest { } // Check reverse iteration results are the reverse of forward results - int matched = 0; + size_t matched = 0; for (iter->SeekToLast(); iter->Valid(); iter->Prev()) { ASSERT_LT(matched, forward.size()); ASSERT_EQ(IterStatus(iter), forward[forward.size() - matched - 1]); @@ -484,6 +483,24 @@ class DBTest { } return false; } + + // Returns number of files renamed. + int RenameLDBToSST() { + std::vector<std::string> filenames; + ASSERT_OK(env_->GetChildren(dbname_, &filenames)); + uint64_t number; + FileType type; + int files_renamed = 0; + for (size_t i = 0; i < filenames.size(); i++) { + if (ParseFileName(filenames[i], &number, &type) && type == kTableFile) { + const std::string from = TableFileName(dbname_, number); + const std::string to = SSTTableFileName(dbname_, number); + ASSERT_OK(env_->RenameFile(from, to)); + files_renamed++; + } + } + return files_renamed; + } }; TEST(DBTest, Empty) { @@ -525,11 +542,11 @@ TEST(DBTest, GetFromImmutableLayer) { ASSERT_OK(Put("foo", "v1")); ASSERT_EQ("v1", Get("foo")); - env_->delay_sstable_sync_.Release_Store(env_); // Block sync calls + env_->delay_data_sync_.Release_Store(env_); // Block sync calls Put("k1", std::string(100000, 'x')); // Fill memtable Put("k2", std::string(100000, 'y')); // Trigger compaction ASSERT_EQ("v1", Get("foo")); - env_->delay_sstable_sync_.Release_Store(NULL); // Release sync calls + env_->delay_data_sync_.Release_Store(NULL); // Release sync calls } while (ChangeOptions()); } @@ -1516,41 +1533,13 @@ TEST(DBTest, NoSpace) { Compact("a", "z"); const int num_files = CountFiles(); env_->no_space_.Release_Store(env_); // Force out-of-space errors - env_->sleep_counter_.Reset(); - for (int i = 0; i < 5; i++) { + for (int i = 0; i < 10; i++) { for (int level = 0; level < config::kNumLevels-1; level++) { dbfull()->TEST_CompactRange(level, NULL, NULL); } } env_->no_space_.Release_Store(NULL); ASSERT_LT(CountFiles(), num_files + 3); - - // Check that compaction attempts slept after errors - ASSERT_GE(env_->sleep_counter_.Read(), 5); -} - -TEST(DBTest, ExponentialBackoff) { - Options options = CurrentOptions(); - options.env = env_; - Reopen(&options); - - ASSERT_OK(Put("foo", "v1")); - ASSERT_EQ("v1", Get("foo")); - Compact("a", "z"); - env_->non_writable_.Release_Store(env_); // Force errors for new files - env_->sleep_counter_.Reset(); - env_->sleep_time_counter_.Reset(); - for (int i = 0; i < 5; i++) { - dbfull()->TEST_CompactRange(2, NULL, NULL); - } - env_->non_writable_.Release_Store(NULL); - - // Wait for compaction to finish - DelayMilliseconds(1000); - - ASSERT_GE(env_->sleep_counter_.Read(), 5); - ASSERT_LT(env_->sleep_counter_.Read(), 10); - ASSERT_GE(env_->sleep_time_counter_.Read(), 10e6); } TEST(DBTest, NonWritableFileSystem) { @@ -1573,6 +1562,37 @@ TEST(DBTest, NonWritableFileSystem) { env_->non_writable_.Release_Store(NULL); } +TEST(DBTest, WriteSyncError) { + // Check that log sync errors cause the DB to disallow future writes. + + // (a) Cause log sync calls to fail + Options options = CurrentOptions(); + options.env = env_; + Reopen(&options); + env_->data_sync_error_.Release_Store(env_); + + // (b) Normal write should succeed + WriteOptions w; + ASSERT_OK(db_->Put(w, "k1", "v1")); + ASSERT_EQ("v1", Get("k1")); + + // (c) Do a sync write; should fail + w.sync = true; + ASSERT_TRUE(!db_->Put(w, "k2", "v2").ok()); + ASSERT_EQ("v1", Get("k1")); + ASSERT_EQ("NOT_FOUND", Get("k2")); + + // (d) make sync behave normally + env_->data_sync_error_.Release_Store(NULL); + + // (e) Do a non-sync write; should fail + w.sync = false; + ASSERT_TRUE(!db_->Put(w, "k3", "v3").ok()); + ASSERT_EQ("v1", Get("k1")); + ASSERT_EQ("NOT_FOUND", Get("k2")); + ASSERT_EQ("NOT_FOUND", Get("k3")); +} + TEST(DBTest, ManifestWriteError) { // Test for the following problem: // (a) Compaction produces file F @@ -1632,6 +1652,22 @@ TEST(DBTest, MissingSSTFile) { << s.ToString(); } +TEST(DBTest, StillReadSST) { + ASSERT_OK(Put("foo", "bar")); + ASSERT_EQ("bar", Get("foo")); + + // Dump the memtable to disk. + dbfull()->TEST_CompactMemTable(); + ASSERT_EQ("bar", Get("foo")); + Close(); + ASSERT_GT(RenameLDBToSST(), 0); + Options options = CurrentOptions(); + options.paranoid_checks = true; + Status s = TryReopen(&options); + ASSERT_TRUE(s.ok()); + ASSERT_EQ("bar", Get("foo")); +} + TEST(DBTest, FilesDeletedAfterCompaction) { ASSERT_OK(Put("foo", "v2")); Compact("a", "z"); @@ -1663,7 +1699,7 @@ TEST(DBTest, BloomFilter) { dbfull()->TEST_CompactMemTable(); // Prevent auto compactions triggered by seeks - env_->delay_sstable_sync_.Release_Store(env_); + env_->delay_data_sync_.Release_Store(env_); // Lookup present keys. Should rarely read from small sstable. env_->random_read_counter_.Reset(); @@ -1684,7 +1720,7 @@ TEST(DBTest, BloomFilter) { fprintf(stderr, "%d missing => %d reads\n", N, reads); ASSERT_LE(reads, 3*N/100); - env_->delay_sstable_sync_.Release_Store(NULL); + env_->delay_data_sync_.Release_Store(NULL); Close(); delete options.block_cache; delete options.filter_policy; @@ -1744,7 +1780,7 @@ static void MTThreadBody(void* arg) { ASSERT_EQ(k, key); ASSERT_GE(w, 0); ASSERT_LT(w, kNumThreads); - ASSERT_LE(c, reinterpret_cast<uintptr_t>( + ASSERT_LE(static_cast<uintptr_t>(c), reinterpret_cast<uintptr_t>( t->state->counter[w].Acquire_Load())); } } diff --git a/src/leveldb/db/filename.cc b/src/leveldb/db/filename.cc index 3c4d49f64e..da32946d99 100644 --- a/src/leveldb/db/filename.cc +++ b/src/leveldb/db/filename.cc @@ -31,6 +31,11 @@ std::string LogFileName(const std::string& name, uint64_t number) { std::string TableFileName(const std::string& name, uint64_t number) { assert(number > 0); + return MakeFileName(name, number, "ldb"); +} + +std::string SSTTableFileName(const std::string& name, uint64_t number) { + assert(number > 0); return MakeFileName(name, number, "sst"); } @@ -71,7 +76,7 @@ std::string OldInfoLogFileName(const std::string& dbname) { // dbname/LOG // dbname/LOG.old // dbname/MANIFEST-[0-9]+ -// dbname/[0-9]+.(log|sst) +// dbname/[0-9]+.(log|sst|ldb) bool ParseFileName(const std::string& fname, uint64_t* number, FileType* type) { @@ -106,7 +111,7 @@ bool ParseFileName(const std::string& fname, Slice suffix = rest; if (suffix == Slice(".log")) { *type = kLogFile; - } else if (suffix == Slice(".sst")) { + } else if (suffix == Slice(".sst") || suffix == Slice(".ldb")) { *type = kTableFile; } else if (suffix == Slice(".dbtmp")) { *type = kTempFile; diff --git a/src/leveldb/db/filename.h b/src/leveldb/db/filename.h index d5d09b1146..87a752605d 100644 --- a/src/leveldb/db/filename.h +++ b/src/leveldb/db/filename.h @@ -37,6 +37,11 @@ extern std::string LogFileName(const std::string& dbname, uint64_t number); // "dbname". extern std::string TableFileName(const std::string& dbname, uint64_t number); +// Return the legacy file name for an sstable with the specified number +// in the db named by "dbname". The result will be prefixed with +// "dbname". +extern std::string SSTTableFileName(const std::string& dbname, uint64_t number); + // Return the name of the descriptor file for the db named by // "dbname" and the specified incarnation number. The result will be // prefixed with "dbname". diff --git a/src/leveldb/db/filename_test.cc b/src/leveldb/db/filename_test.cc index 5a26da4728..a32556deaf 100644 --- a/src/leveldb/db/filename_test.cc +++ b/src/leveldb/db/filename_test.cc @@ -27,6 +27,7 @@ TEST(FileNameTest, Parse) { { "100.log", 100, kLogFile }, { "0.log", 0, kLogFile }, { "0.sst", 0, kTableFile }, + { "0.ldb", 0, kTableFile }, { "CURRENT", 0, kCurrentFile }, { "LOCK", 0, kDBLockFile }, { "MANIFEST-2", 2, kDescriptorFile }, diff --git a/src/leveldb/db/repair.cc b/src/leveldb/db/repair.cc index 022d52f3de..96c9b37af1 100644 --- a/src/leveldb/db/repair.cc +++ b/src/leveldb/db/repair.cc @@ -244,60 +244,133 @@ class Repairer { void ExtractMetaData() { std::vector<TableInfo> kept; for (size_t i = 0; i < table_numbers_.size(); i++) { - TableInfo t; - t.meta.number = table_numbers_[i]; - Status status = ScanTable(&t); - if (!status.ok()) { - std::string fname = TableFileName(dbname_, table_numbers_[i]); - Log(options_.info_log, "Table #%llu: ignoring %s", - (unsigned long long) table_numbers_[i], - status.ToString().c_str()); - ArchiveFile(fname); - } else { - tables_.push_back(t); - } + ScanTable(table_numbers_[i]); } } - Status ScanTable(TableInfo* t) { - std::string fname = TableFileName(dbname_, t->meta.number); + Iterator* NewTableIterator(const FileMetaData& meta) { + // Same as compaction iterators: if paranoid_checks are on, turn + // on checksum verification. + ReadOptions r; + r.verify_checksums = options_.paranoid_checks; + return table_cache_->NewIterator(r, meta.number, meta.file_size); + } + + void ScanTable(uint64_t number) { + TableInfo t; + t.meta.number = number; + std::string fname = TableFileName(dbname_, number); + Status status = env_->GetFileSize(fname, &t.meta.file_size); + if (!status.ok()) { + // Try alternate file name. + fname = SSTTableFileName(dbname_, number); + Status s2 = env_->GetFileSize(fname, &t.meta.file_size); + if (s2.ok()) { + status = Status::OK(); + } + } + if (!status.ok()) { + ArchiveFile(TableFileName(dbname_, number)); + ArchiveFile(SSTTableFileName(dbname_, number)); + Log(options_.info_log, "Table #%llu: dropped: %s", + (unsigned long long) t.meta.number, + status.ToString().c_str()); + return; + } + + // Extract metadata by scanning through table. int counter = 0; - Status status = env_->GetFileSize(fname, &t->meta.file_size); - if (status.ok()) { - Iterator* iter = table_cache_->NewIterator( - ReadOptions(), t->meta.number, t->meta.file_size); - bool empty = true; - ParsedInternalKey parsed; - t->max_sequence = 0; - for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { - Slice key = iter->key(); - if (!ParseInternalKey(key, &parsed)) { - Log(options_.info_log, "Table #%llu: unparsable key %s", - (unsigned long long) t->meta.number, - EscapeString(key).c_str()); - continue; - } + Iterator* iter = NewTableIterator(t.meta); + bool empty = true; + ParsedInternalKey parsed; + t.max_sequence = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + Slice key = iter->key(); + if (!ParseInternalKey(key, &parsed)) { + Log(options_.info_log, "Table #%llu: unparsable key %s", + (unsigned long long) t.meta.number, + EscapeString(key).c_str()); + continue; + } - counter++; - if (empty) { - empty = false; - t->meta.smallest.DecodeFrom(key); - } - t->meta.largest.DecodeFrom(key); - if (parsed.sequence > t->max_sequence) { - t->max_sequence = parsed.sequence; - } + counter++; + if (empty) { + empty = false; + t.meta.smallest.DecodeFrom(key); } - if (!iter->status().ok()) { - status = iter->status(); + t.meta.largest.DecodeFrom(key); + if (parsed.sequence > t.max_sequence) { + t.max_sequence = parsed.sequence; } - delete iter; } + if (!iter->status().ok()) { + status = iter->status(); + } + delete iter; Log(options_.info_log, "Table #%llu: %d entries %s", - (unsigned long long) t->meta.number, + (unsigned long long) t.meta.number, counter, status.ToString().c_str()); - return status; + + if (status.ok()) { + tables_.push_back(t); + } else { + RepairTable(fname, t); // RepairTable archives input file. + } + } + + void RepairTable(const std::string& src, TableInfo t) { + // We will copy src contents to a new table and then rename the + // new table over the source. + + // Create builder. + std::string copy = TableFileName(dbname_, next_file_number_++); + WritableFile* file; + Status s = env_->NewWritableFile(copy, &file); + if (!s.ok()) { + return; + } + TableBuilder* builder = new TableBuilder(options_, file); + + // Copy data. + Iterator* iter = NewTableIterator(t.meta); + int counter = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + builder->Add(iter->key(), iter->value()); + counter++; + } + delete iter; + + ArchiveFile(src); + if (counter == 0) { + builder->Abandon(); // Nothing to save + } else { + s = builder->Finish(); + if (s.ok()) { + t.meta.file_size = builder->FileSize(); + } + } + delete builder; + builder = NULL; + + if (s.ok()) { + s = file->Close(); + } + delete file; + file = NULL; + + if (counter > 0 && s.ok()) { + std::string orig = TableFileName(dbname_, t.meta.number); + s = env_->RenameFile(copy, orig); + if (s.ok()) { + Log(options_.info_log, "Table #%llu: %d entries repaired", + (unsigned long long) t.meta.number, counter); + tables_.push_back(t); + } + } + if (!s.ok()) { + env_->DeleteFile(copy); + } } Status WriteDescriptor() { diff --git a/src/leveldb/db/table_cache.cc b/src/leveldb/db/table_cache.cc index 497db27076..e3d82cd3ea 100644 --- a/src/leveldb/db/table_cache.cc +++ b/src/leveldb/db/table_cache.cc @@ -54,6 +54,12 @@ Status TableCache::FindTable(uint64_t file_number, uint64_t file_size, RandomAccessFile* file = NULL; Table* table = NULL; s = env_->NewRandomAccessFile(fname, &file); + if (!s.ok()) { + std::string old_fname = SSTTableFileName(dbname_, file_number); + if (env_->NewRandomAccessFile(old_fname, &file).ok()) { + s = Status::OK(); + } + } if (s.ok()) { s = Table::Open(*options_, file, file_size, &table); } diff --git a/src/leveldb/db/version_set.cc b/src/leveldb/db/version_set.cc index 66d73be71f..517edd3b18 100644 --- a/src/leveldb/db/version_set.cc +++ b/src/leveldb/db/version_set.cc @@ -876,12 +876,6 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { } if (!s.ok()) { Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str()); - if (ManifestContains(record)) { - Log(options_->info_log, - "MANIFEST contains log record despite error; advancing to new " - "version to prevent mismatch between in-memory and logged state"); - s = Status::OK(); - } } } @@ -889,8 +883,6 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { // new CURRENT file that points to it. if (s.ok() && !new_manifest_file.empty()) { s = SetCurrentFile(env_, dbname_, manifest_file_number_); - // No need to double-check MANIFEST in case of error since it - // will be discarded below. } mu->Lock(); @@ -1124,31 +1116,6 @@ const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const { return scratch->buffer; } -// Return true iff the manifest contains the specified record. -bool VersionSet::ManifestContains(const std::string& record) const { - std::string fname = DescriptorFileName(dbname_, manifest_file_number_); - Log(options_->info_log, "ManifestContains: checking %s\n", fname.c_str()); - SequentialFile* file = NULL; - Status s = env_->NewSequentialFile(fname, &file); - if (!s.ok()) { - Log(options_->info_log, "ManifestContains: %s\n", s.ToString().c_str()); - return false; - } - log::Reader reader(file, NULL, true/*checksum*/, 0); - Slice r; - std::string scratch; - bool result = false; - while (reader.ReadRecord(&r, &scratch)) { - if (r == Slice(record)) { - result = true; - break; - } - } - delete file; - Log(options_->info_log, "ManifestContains: result = %d\n", result ? 1 : 0); - return result; -} - uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) { uint64_t result = 0; for (int level = 0; level < config::kNumLevels; level++) { diff --git a/src/leveldb/db/version_set.h b/src/leveldb/db/version_set.h index 20de0e2629..8dc14b8e01 100644 --- a/src/leveldb/db/version_set.h +++ b/src/leveldb/db/version_set.h @@ -292,8 +292,6 @@ class VersionSet { void AppendVersion(Version* v); - bool ManifestContains(const std::string& record) const; - Env* const env_; const std::string dbname_; const Options* const options_; |